1use std::{
3 cell::RefCell,
4 collections::BTreeMap,
5 pin::Pin,
6 rc::Rc,
7 task::{Context, Poll, Waker},
8};
9
10pub struct Event {
16 inner: Rc<RefCell<Inner>>,
17}
18
19struct Inner {
20 listeners: BTreeMap<usize, ListenerEntry>,
22
23 next_id: usize,
25
26 notified: usize,
28}
29
30#[derive(Debug, Default)]
31struct ListenerEntry {
32 waker: Option<Waker>,
33 notified: bool,
34}
35
36impl Event {
37 pub fn new() -> Self {
47 Event {
48 inner: Rc::new(RefCell::new(Inner {
49 listeners: BTreeMap::new(),
50 next_id: 0,
51 notified: 0,
52 })),
53 }
54 }
55
56 pub fn listen(&self) -> EventListener {
70 let mut inner = self.inner.borrow_mut();
71 let id = inner.next_id;
72 inner.next_id += 1;
73
74 inner.listeners.insert(id, ListenerEntry::default());
75
76 EventListener {
77 event: Rc::clone(&self.inner),
78 id,
79 }
80 }
81
82 pub fn notify(&self, n: usize) {
102 let mut inner = self.inner.borrow_mut();
103
104 let count = if n == usize::MAX {
105 inner.listeners.len()
106 } else {
107 n.saturating_sub(inner.notified)
108 };
109
110 let mut notified = 0;
111 for entry in inner.listeners.values_mut() {
112 if notified >= count {
113 break;
114 }
115 if entry.notified {
116 continue;
117 }
118 entry.notified = true;
119 if let Some(waker) = entry.waker.take() {
120 waker.wake();
121 }
122 notified += 1;
123 }
124
125 inner.notified += notified;
126 }
127
128 pub fn notify_additional(&self, n: usize) {
142 let mut inner = self.inner.borrow_mut();
143
144 let count = if n == usize::MAX {
145 inner.listeners.len()
146 } else {
147 n.min(inner.listeners.len())
148 };
149
150 let mut notified = 0;
151 for entry in inner.listeners.values_mut() {
152 if notified >= count {
153 break;
154 }
155 if entry.notified {
156 continue;
157 }
158 entry.notified = true;
159 if let Some(waker) = entry.waker.take() {
160 waker.wake();
161 }
162 notified += 1;
163 }
164
165 inner.notified += notified;
166 }
167
168 pub fn notify_all(&self) {
188 self.notify(usize::MAX);
189 }
190}
191
192impl Default for Event {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198pub struct EventListener {
202 event: Rc<RefCell<Inner>>,
203 id: usize,
204}
205
206impl EventListener {
207 pub fn is_notified(&self) -> bool {
222 self.event
223 .borrow()
224 .listeners
225 .get(&self.id)
226 .map(|e| e.notified)
227 .unwrap_or(false)
228 }
229}
230
231impl Drop for EventListener {
232 fn drop(&mut self) {
233 let mut inner = self.event.borrow_mut();
234
235 let Some(entry) = inner.listeners.remove(&self.id) else {
237 return;
238 };
239
240 if !entry.notified || inner.notified == 0 {
241 return;
242 }
243
244 inner.notified -= 1;
245
246 let Some(next) = inner.listeners.values_mut().find(|e| !e.notified) else {
247 return;
248 };
249
250 next.notified = true;
251
252 if let Some(waker) = next.waker.take() {
253 waker.wake();
254 }
255
256 inner.notified += 1;
257 }
258}
259
260impl std::future::Future for EventListener {
261 type Output = ();
262
263 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
264 let mut inner = self.event.borrow_mut();
265
266 let Some(entry) = inner.listeners.get_mut(&self.id) else {
267 unreachable!("Entry shouldn't be removed")
268 };
269
270 if entry.notified {
271 return Poll::Ready(());
272 }
273
274 entry.waker = Some(cx.waker().clone());
276
277 Poll::Pending
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use super::*;
284
285 #[test]
286 fn test_notify() {
287 let event = Event::new();
288 let listener = event.listen();
289
290 assert!(!listener.is_notified());
291 event.notify(1);
292 assert!(listener.is_notified());
293 }
294
295 #[test]
296 fn test_notify_multiple() {
297 let event = Event::new();
298 let listener1 = event.listen();
299 let listener2 = event.listen();
300 let listener3 = event.listen();
301
302 event.notify(2);
303
304 assert!(listener1.is_notified());
305 assert!(listener2.is_notified());
306 assert!(!listener3.is_notified());
307
308 event.notify(2);
309
310 assert!(listener1.is_notified());
311 assert!(listener2.is_notified());
312 assert!(!listener3.is_notified());
313 }
314
315 #[test]
316 fn test_notify_additional() {
317 let event = Event::new();
318 let listener1 = event.listen();
319 let listener2 = event.listen();
320 let listener3 = event.listen();
321 let listener4 = event.listen();
322
323 event.notify(2);
324
325 assert!(listener1.is_notified());
326 assert!(listener2.is_notified());
327 assert!(!listener3.is_notified());
328 assert!(!listener4.is_notified());
329
330 event.notify_additional(2);
331
332 assert!(listener1.is_notified());
333 assert!(listener2.is_notified());
334 assert!(listener3.is_notified());
335 assert!(listener4.is_notified());
336 }
337
338 #[test]
339 fn test_notify_all() {
340 let event = Event::new();
341 let listener1 = event.listen();
342 let listener2 = event.listen();
343 let listener3 = event.listen();
344
345 event.notify(usize::MAX);
346
347 assert!(listener1.is_notified());
348 assert!(listener2.is_notified());
349 assert!(listener3.is_notified());
350 }
351
352 #[test]
353 fn test_notify_drop() {
354 let event = Event::new();
355 let listener1 = event.listen();
356 let listener2 = event.listen();
357 let listener3 = event.listen();
358
359 event.notify(2);
360
361 assert!(listener1.is_notified());
362 assert!(listener2.is_notified());
363 assert!(!listener3.is_notified());
364
365 drop(listener2);
366
367 assert!(listener3.is_notified());
368 }
369
370 #[pollster::test]
371 async fn test_listen_async() {
372 let event = Event::new();
373 let listener = event.listen();
374
375 event.notify(1);
376
377 listener.await
378 }
379}