1use crate::event_queue::{Events, Token};
53
/// Single-owner notification set that remembers which tokens have been marked.
///
/// A bitset provides constant-time duplicate detection, while a separate list
/// records the marked token indices in the order they were first marked.
#[derive(Debug)]
pub struct LocalNotify {
    /// One bit per token index, packed 64 to a word; a set bit means the token
    /// is currently queued in `dispatch_list`.
    bits: Vec<u64>,

    /// Indices of the tokens marked since they were last drained, in mark order.
    dispatch_list: Vec<usize>,

    /// Number of valid token indices; indices `0..num_tokens` may be marked.
    num_tokens: usize,
}
80
81impl LocalNotify {
82 pub fn with_capacity(capacity: usize) -> Self {
84 Self {
85 bits: vec![0u64; capacity.div_ceil(64)],
86 dispatch_list: Vec::with_capacity(capacity),
87 num_tokens: 0,
88 }
89 }
90
91 pub fn register(&mut self) -> Token {
95 let idx = self.num_tokens;
96 self.num_tokens += 1;
97 let word = idx / 64;
98 if word >= self.bits.len() {
99 self.bits.push(0);
100 }
101 Token::new(idx)
102 }
103
104 pub fn ensure_capacity(&mut self, idx: usize) {
110 if idx >= self.num_tokens {
111 self.num_tokens = idx + 1;
112 }
113 let word = idx / 64;
114 if word >= self.bits.len() {
115 self.bits.resize(word + 1, 0);
116 }
117 }
118
119 #[inline]
127 pub fn mark(&mut self, token: Token) {
128 let idx = token.index();
129 assert!(
130 idx < self.num_tokens,
131 "token index {} out of range ({})",
132 idx,
133 self.num_tokens,
134 );
135 let word = idx / 64;
136 let bit = 1u64 << (idx % 64);
137 if self.bits[word] & bit == 0 {
140 self.bits[word] |= bit;
141 self.dispatch_list.push(idx);
142 }
143 }
144
145 #[inline]
153 pub fn poll(&mut self, events: &mut Events) {
154 self.poll_limit(events, usize::MAX);
155 }
156
157 #[inline]
164 pub fn poll_limit(&mut self, events: &mut Events, limit: usize) {
165 events.clear();
166 let drain_count = self.dispatch_list.len().min(limit);
167 for &idx in &self.dispatch_list[..drain_count] {
168 events.push(Token::new(idx));
169 }
170 if drain_count == self.dispatch_list.len() {
171 self.bits.fill(0);
173 self.dispatch_list.clear();
174 } else {
175 for &idx in &self.dispatch_list[..drain_count] {
181 self.bits[idx / 64] &= !(1 << (idx % 64));
182 }
183 self.dispatch_list.drain(..drain_count);
184 }
185 }
186
187 pub fn has_notified(&self) -> bool {
189 !self.dispatch_list.is_empty()
190 }
191
192 pub fn notified_count(&self) -> usize {
194 self.dispatch_list.len()
195 }
196
197 pub fn capacity(&self) -> usize {
199 self.num_tokens
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a notifier and an event buffer with the given initial capacities.
    fn fixture(notify_capacity: usize, events_capacity: usize) -> (LocalNotify, Events) {
        (
            LocalNotify::with_capacity(notify_capacity),
            Events::with_capacity(events_capacity),
        )
    }

    /// Registers `count` tokens and returns them in registration order.
    fn register_many(notify: &mut LocalNotify, count: usize) -> Vec<Token> {
        (0..count).map(|_| notify.register()).collect()
    }

    // A marked token is reported once and the notifier is empty afterwards.
    #[test]
    fn register_and_mark() {
        let (mut notify, mut events) = fixture(4, 4);
        let token = notify.register();

        notify.mark(token);
        assert!(notify.has_notified());

        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(events.as_slice()[0], token);

        assert!(!notify.has_notified());
    }

    // Marking the same token repeatedly yields a single event.
    #[test]
    fn dedup() {
        let (mut notify, mut events) = fixture(4, 4);
        let token = notify.register();

        for _ in 0..3 {
            notify.mark(token);
        }

        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    // Only the marked subset of the registered tokens is reported.
    #[test]
    fn multiple_tokens() {
        let (mut notify, mut events) = fixture(4, 4);
        let tokens = register_many(&mut notify, 3);
        let (first, last) = (tokens[0], tokens[2]);

        notify.mark(first);
        notify.mark(last);
        notify.poll(&mut events);

        assert_eq!(events.len(), 2);
        let seen = events.as_slice();
        assert!(seen.contains(&first));
        assert!(seen.contains(&last));
    }

    // Events come out in the order the tokens were marked.
    #[test]
    fn mark_order_preserved() {
        let (mut notify, mut events) = fixture(4, 4);
        let tokens = register_many(&mut notify, 3);
        let order = [tokens[2], tokens[0], tokens[1]];

        for &token in &order {
            notify.mark(token);
        }

        notify.poll(&mut events);
        assert_eq!(events.as_slice(), &order);
    }

    // A token can be marked again after it has been polled.
    #[test]
    fn multiple_cycles() {
        let (mut notify, mut events) = fixture(4, 4);
        let token = notify.register();

        for _cycle in 0..2 {
            notify.mark(token);
            notify.poll(&mut events);
            assert_eq!(events.len(), 1);
        }
    }

    // Polling with nothing marked produces no events.
    #[test]
    fn no_marks_empty_poll() {
        let (mut notify, mut events) = fixture(4, 4);
        let _registered = notify.register();

        notify.poll(&mut events);

        assert!(events.is_empty());
        assert!(!notify.has_notified());
    }

    // A notifier created with zero capacity still grows on registration.
    #[test]
    fn zero_capacity() {
        let (mut notify, mut events) = fixture(0, 4);
        let token = notify.register();

        notify.mark(token);
        notify.poll(&mut events);

        assert_eq!(events.len(), 1);
    }

    // Tokens on either side of the 64-bit word boundaries behave like any other.
    #[test]
    fn word_boundary_tokens() {
        let (mut notify, mut events) = fixture(0, 256);
        let tokens = register_many(&mut notify, 130);
        let boundary: Vec<Token> = [0usize, 63, 64, 127, 128]
            .iter()
            .map(|&i| tokens[i])
            .collect();

        for &token in &boundary {
            notify.mark(token);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
        for token in &boundary {
            assert!(events.as_slice().contains(token));
        }

        // Second round: the bits must have been cleared by the first poll.
        for &token in &boundary {
            notify.mark(token);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
    }

    // Registering far more tokens than the initial capacity works.
    #[test]
    fn grows_beyond_initial_capacity() {
        let (mut notify, mut events) = fixture(2, 256);
        let tokens = register_many(&mut notify, 200);

        for &token in &tokens {
            notify.mark(token);
        }

        notify.poll(&mut events);
        assert_eq!(events.len(), 200);
    }

    // A limited poll leaves the remainder queued for the next poll.
    #[test]
    fn poll_limit_partial() {
        let (mut notify, mut events) = fixture(8, 8);
        let tokens = register_many(&mut notify, 5);
        for &token in &tokens {
            notify.mark(token);
        }

        notify.poll_limit(&mut events, 2);
        assert_eq!(events.len(), 2);
        assert_eq!(notify.notified_count(), 3);

        notify.poll(&mut events);
        assert_eq!(events.len(), 3);
        assert!(!notify.has_notified());
    }

    // A limit larger than the queue drains everything.
    #[test]
    fn poll_limit_exceeds_count() {
        let (mut notify, mut events) = fixture(4, 4);
        let token = notify.register();
        notify.mark(token);

        notify.poll_limit(&mut events, 100);

        assert_eq!(events.len(), 1);
        assert!(!notify.has_notified());
    }

    // The count tracks distinct marked tokens, ignoring duplicates.
    #[test]
    fn notified_count() {
        let mut notify = LocalNotify::with_capacity(4);
        let first = notify.register();
        let second = notify.register();

        assert_eq!(notify.notified_count(), 0);

        let steps = [(first, 1usize), (second, 2), (first, 2)];
        for &(token, expected) in &steps {
            notify.mark(token);
            assert_eq!(notify.notified_count(), expected);
        }
    }

    // `capacity` reports registrations, not the constructor hint.
    #[test]
    fn capacity_tracks_registrations() {
        let mut notify = LocalNotify::with_capacity(4);
        assert_eq!(notify.capacity(), 0);

        for _ in 0..2 {
            notify.register();
        }
        assert_eq!(notify.capacity(), 2);
    }
}