use std::{
cell::RefCell,
collections::BTreeMap,
fmt::Debug,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
#[derive(Clone)]
pub struct Event {
inner: Rc<RefCell<Inner>>,
}
impl Debug for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let guard = self.inner.try_borrow();
match guard {
Ok(inner) => f.debug_tuple("Event").field(&inner).finish(),
Err(_) => f.debug_tuple("Event").field(&"<locked>").finish(),
}
}
}
#[derive(Debug)]
struct Inner {
listeners: BTreeMap<usize, ListenerEntry>,
next_id: usize,
notified: usize,
}
#[derive(Debug, Default)]
struct ListenerEntry {
waker: Option<Waker>,
notified: bool,
}
impl Event {
pub fn new() -> Self {
Event {
inner: Rc::new(RefCell::new(Inner {
listeners: BTreeMap::new(),
next_id: 0,
notified: 0,
})),
}
}
pub fn listen(&self) -> EventListener {
let mut inner = self.inner.borrow_mut();
let id = inner.next_id;
inner.next_id += 1;
inner.listeners.insert(id, ListenerEntry::default());
EventListener {
event: Rc::clone(&self.inner),
id,
}
}
pub fn notify(&self, n: usize) {
let mut inner = self.inner.borrow_mut();
let count = if n == usize::MAX {
inner.listeners.len()
} else {
n.saturating_sub(inner.notified)
};
let mut notified = 0;
for entry in inner.listeners.values_mut() {
if notified >= count {
break;
}
if entry.notified {
continue;
}
entry.notified = true;
if let Some(waker) = entry.waker.take() {
waker.wake();
}
notified += 1;
}
inner.notified += notified;
}
pub fn notify_additional(&self, n: usize) {
let mut inner = self.inner.borrow_mut();
let count = if n == usize::MAX {
inner.listeners.len()
} else {
n.min(inner.listeners.len())
};
let mut notified = 0;
for entry in inner.listeners.values_mut() {
if notified >= count {
break;
}
if entry.notified {
continue;
}
entry.notified = true;
if let Some(waker) = entry.waker.take() {
waker.wake();
}
notified += 1;
}
inner.notified += notified;
}
pub fn notify_all(&self) {
self.notify(usize::MAX);
}
}
impl Default for Event {
fn default() -> Self {
Self::new()
}
}
pub struct EventListener {
event: Rc<RefCell<Inner>>,
id: usize,
}
impl EventListener {
pub fn is_notified(&self) -> bool {
self.event
.borrow()
.listeners
.get(&self.id)
.map(|e| e.notified)
.unwrap_or(false)
}
}
impl Drop for EventListener {
fn drop(&mut self) {
let mut inner = self.event.borrow_mut();
let Some(entry) = inner.listeners.remove(&self.id) else {
return;
};
if !entry.notified || inner.notified == 0 {
return;
}
inner.notified -= 1;
let Some(next) = inner.listeners.values_mut().find(|e| !e.notified) else {
return;
};
next.notified = true;
if let Some(waker) = next.waker.take() {
waker.wake();
}
inner.notified += 1;
}
}
impl std::future::Future for EventListener {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = self.event.borrow_mut();
let Some(entry) = inner.listeners.get_mut(&self.id) else {
unreachable!("Entry shouldn't be removed")
};
if entry.notified {
return Poll::Ready(());
}
entry.waker = Some(cx.waker().clone());
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_notify() {
let event = Event::new();
let listener = event.listen();
assert!(!listener.is_notified());
event.notify(1);
assert!(listener.is_notified());
}
#[test]
fn test_notify_multiple() {
let event = Event::new();
let listener1 = event.listen();
let listener2 = event.listen();
let listener3 = event.listen();
event.notify(2);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(!listener3.is_notified());
event.notify(2);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(!listener3.is_notified());
}
#[test]
fn test_notify_additional() {
let event = Event::new();
let listener1 = event.listen();
let listener2 = event.listen();
let listener3 = event.listen();
let listener4 = event.listen();
event.notify(2);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(!listener3.is_notified());
assert!(!listener4.is_notified());
event.notify_additional(2);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(listener3.is_notified());
assert!(listener4.is_notified());
}
#[test]
fn test_notify_all() {
let event = Event::new();
let listener1 = event.listen();
let listener2 = event.listen();
let listener3 = event.listen();
event.notify(usize::MAX);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(listener3.is_notified());
}
#[test]
fn test_notify_drop() {
let event = Event::new();
let listener1 = event.listen();
let listener2 = event.listen();
let listener3 = event.listen();
event.notify(2);
assert!(listener1.is_notified());
assert!(listener2.is_notified());
assert!(!listener3.is_notified());
drop(listener2);
assert!(listener3.is_notified());
}
#[pollster::test]
async fn test_listen_async() {
let event = Event::new();
let listener = event.listen();
event.notify(1);
listener.await
}
}