1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use crate::Event;
use futures_core::stream::Stream;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::cell::RefCell;
use std::sync::Arc;

/// A stream of `Event`s drained from a shared `EventBuffer`.
///
/// The buffer sits behind a reference-counted handle so that crate-internal
/// code can obtain a second handle to the same buffer through `buffer()`.
pub struct EventStream {
    // NOTE(review): `RefCell` is not `Sync`, so this `Arc` is neither `Send`
    // nor `Sync`; `Rc` would express the same sharing — confirm against callers.
    buffer: Arc<RefCell<EventBuffer>>,
}

impl EventStream {
    /// Creates a stream whose buffer holds no events and no registered waker.
    pub fn new() -> Self {
        let empty = EventBuffer {
            events: VecDeque::new(),
            waker: None,
        };
        Self {
            buffer: Arc::new(RefCell::new(empty)),
        }
    }

    /// Returns another handle to the buffer this stream reads from.
    ///
    /// Only the reference count is bumped; the returned handle and the stream
    /// refer to the same `EventBuffer`.
    pub(crate) fn buffer(&self) -> Arc<RefCell<EventBuffer>> {
        Arc::clone(&self.buffer)
    }
}

impl Default for EventStream {
    /// Equivalent to [`EventStream::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Stream for EventStream {
    type Item = Event;

    /// Yields the oldest buffered event, or registers the polling task's waker
    /// in the buffer and returns `Poll::Pending` when no event is queued.
    ///
    /// This implementation never returns `Poll::Ready(None)`.
    ///
    /// # Panics
    ///
    /// Panics if the shared buffer is already borrowed when this is called.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut buffer = self.buffer.borrow_mut();

        if let Some(event) = buffer.events.pop_front() {
            return Poll::Ready(Some(event));
        }

        // Skip the clone when the stored waker already wakes this task;
        // `will_wake` is best-effort, so a false negative merely re-clones.
        let current = cx.waker();
        let already_registered = buffer
            .waker
            .as_ref()
            .map_or(false, |stored| stored.will_wake(current));
        if !already_registered {
            buffer.waker = Some(current.clone());
        }

        Poll::Pending
    }
}

/// Queue of events awaiting delivery, plus the waker to notify when one arrives.
pub(crate) struct EventBuffer {
    /// Pending events in arrival order: pushed at the back, popped from the front.
    events: VecDeque<Event>,
    /// Waker stored by `EventStream::poll_next` when it found the queue empty;
    /// taken and woken by `push`.
    waker: Option<Waker>,
}

impl EventBuffer {
    pub fn push(&mut self, event: Event) {
        self.events.push_back(event);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}