1use std::{
3 cell::RefCell,
4 collections::BTreeMap,
5 fmt::Debug,
6 pin::Pin,
7 rc::Rc,
8 task::{Context, Poll, Waker},
9};
10
11#[derive(Clone)]
17pub struct Event {
18 inner: Rc<RefCell<Inner>>,
19}
20
21impl Debug for Event {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 let guard = self.inner.try_borrow();
24 match guard {
25 Ok(inner) => f.debug_tuple("Event").field(&inner).finish(),
26 Err(_) => f.debug_tuple("Event").field(&"<locked>").finish(),
27 }
28 }
29}
30
31#[derive(Debug)]
32struct Inner {
33 listeners: BTreeMap<usize, ListenerEntry>,
35
36 next_id: usize,
38
39 notified: usize,
41}
42
43#[derive(Debug, Default)]
44struct ListenerEntry {
45 waker: Option<Waker>,
46 notified: bool,
47}
48
49impl Event {
50 pub fn new() -> Self {
60 Event {
61 inner: Rc::new(RefCell::new(Inner {
62 listeners: BTreeMap::new(),
63 next_id: 0,
64 notified: 0,
65 })),
66 }
67 }
68
69 pub fn listen(&self) -> EventListener {
83 let mut inner = self.inner.borrow_mut();
84 let id = inner.next_id;
85 inner.next_id += 1;
86
87 inner.listeners.insert(id, ListenerEntry::default());
88
89 EventListener {
90 event: Rc::clone(&self.inner),
91 id,
92 }
93 }
94
95 pub fn notify(&self, n: usize) {
115 let mut inner = self.inner.borrow_mut();
116
117 let count = if n == usize::MAX {
118 inner.listeners.len()
119 } else {
120 n.saturating_sub(inner.notified)
121 };
122
123 let mut notified = 0;
124 for entry in inner.listeners.values_mut() {
125 if notified >= count {
126 break;
127 }
128 if entry.notified {
129 continue;
130 }
131 entry.notified = true;
132 if let Some(waker) = entry.waker.take() {
133 waker.wake();
134 }
135 notified += 1;
136 }
137
138 inner.notified += notified;
139 }
140
141 pub fn notify_additional(&self, n: usize) {
155 let mut inner = self.inner.borrow_mut();
156
157 let count = if n == usize::MAX {
158 inner.listeners.len()
159 } else {
160 n.min(inner.listeners.len())
161 };
162
163 let mut notified = 0;
164 for entry in inner.listeners.values_mut() {
165 if notified >= count {
166 break;
167 }
168 if entry.notified {
169 continue;
170 }
171 entry.notified = true;
172 if let Some(waker) = entry.waker.take() {
173 waker.wake();
174 }
175 notified += 1;
176 }
177
178 inner.notified += notified;
179 }
180
181 pub fn notify_all(&self) {
201 self.notify(usize::MAX);
202 }
203}
204
205impl Default for Event {
206 fn default() -> Self {
207 Self::new()
208 }
209}
210
211pub struct EventListener {
215 event: Rc<RefCell<Inner>>,
216 id: usize,
217}
218
219impl EventListener {
220 pub fn is_notified(&self) -> bool {
235 self.event
236 .borrow()
237 .listeners
238 .get(&self.id)
239 .map(|e| e.notified)
240 .unwrap_or(false)
241 }
242}
243
244impl Drop for EventListener {
245 fn drop(&mut self) {
246 let mut inner = self.event.borrow_mut();
247
248 let Some(entry) = inner.listeners.remove(&self.id) else {
250 return;
251 };
252
253 if !entry.notified || inner.notified == 0 {
254 return;
255 }
256
257 inner.notified -= 1;
258
259 let Some(next) = inner.listeners.values_mut().find(|e| !e.notified) else {
260 return;
261 };
262
263 next.notified = true;
264
265 if let Some(waker) = next.waker.take() {
266 waker.wake();
267 }
268
269 inner.notified += 1;
270 }
271}
272
273impl std::future::Future for EventListener {
274 type Output = ();
275
276 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
277 let mut inner = self.event.borrow_mut();
278
279 let Some(entry) = inner.listeners.get_mut(&self.id) else {
280 unreachable!("Entry shouldn't be removed")
281 };
282
283 if entry.notified {
284 return Poll::Ready(());
285 }
286
287 entry.waker = Some(cx.waker().clone());
289
290 Poll::Pending
291 }
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297
298 #[test]
299 fn test_notify() {
300 let event = Event::new();
301 let listener = event.listen();
302
303 assert!(!listener.is_notified());
304 event.notify(1);
305 assert!(listener.is_notified());
306 }
307
308 #[test]
309 fn test_notify_multiple() {
310 let event = Event::new();
311 let listener1 = event.listen();
312 let listener2 = event.listen();
313 let listener3 = event.listen();
314
315 event.notify(2);
316
317 assert!(listener1.is_notified());
318 assert!(listener2.is_notified());
319 assert!(!listener3.is_notified());
320
321 event.notify(2);
322
323 assert!(listener1.is_notified());
324 assert!(listener2.is_notified());
325 assert!(!listener3.is_notified());
326 }
327
328 #[test]
329 fn test_notify_additional() {
330 let event = Event::new();
331 let listener1 = event.listen();
332 let listener2 = event.listen();
333 let listener3 = event.listen();
334 let listener4 = event.listen();
335
336 event.notify(2);
337
338 assert!(listener1.is_notified());
339 assert!(listener2.is_notified());
340 assert!(!listener3.is_notified());
341 assert!(!listener4.is_notified());
342
343 event.notify_additional(2);
344
345 assert!(listener1.is_notified());
346 assert!(listener2.is_notified());
347 assert!(listener3.is_notified());
348 assert!(listener4.is_notified());
349 }
350
351 #[test]
352 fn test_notify_all() {
353 let event = Event::new();
354 let listener1 = event.listen();
355 let listener2 = event.listen();
356 let listener3 = event.listen();
357
358 event.notify(usize::MAX);
359
360 assert!(listener1.is_notified());
361 assert!(listener2.is_notified());
362 assert!(listener3.is_notified());
363 }
364
365 #[test]
366 fn test_notify_drop() {
367 let event = Event::new();
368 let listener1 = event.listen();
369 let listener2 = event.listen();
370 let listener3 = event.listen();
371
372 event.notify(2);
373
374 assert!(listener1.is_notified());
375 assert!(listener2.is_notified());
376 assert!(!listener3.is_notified());
377
378 drop(listener2);
379
380 assert!(listener3.is_notified());
381 }
382
383 #[pollster::test]
384 async fn test_listen_async() {
385 let event = Event::new();
386 let listener = event.listen();
387
388 event.notify(1);
389
390 listener.await
391 }
392}