simple_rc_async/sync/
broadcast.rs

1use std::{borrow::BorrowMut, cell::{Cell, RefCell}, ops::Deref, os::unix::thread, pin::Pin, rc::{Rc, Weak}, task::{Context, LocalWaker, Poll}};
2
3use futures_core::Future;
4
5pub(crate) struct Channel<T: Clone> {
6    result: Poll<T>,
7    waker: Vec<LocalWaker>,
8}
9
10#[derive(Clone)]
11pub struct Reciever<T: Clone>(Weak<RefCell<Channel<T>>>);
12
13impl<T: Clone + std::fmt::Debug> futures_core::stream::Stream for Reciever<T> {
14    type Item = T;
15
16    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
17        if let Some(this) = self.0.upgrade() {
18            Sender(this).poll_ref(cx).map(|x| Some(x))
19        } else {
20            Poll::Ready(None)
21        }
22    }
23}
24
25#[derive(Clone)]
26pub struct Sender<T: Clone>(Rc<RefCell<Channel<T>>>);
27
28impl<T: Clone + std::fmt::Debug> std::future::Future for Sender<T> {
29    type Output = T;
30
31    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32        self.poll_ref(cx)
33    }
34}
35
36
37impl<T: Clone + std::fmt::Debug> Sender<T> {
38    pub fn poll_ref(&self, cx: &mut Context<'_>) -> Poll<T> {
39        assert!(self.0.as_ptr() as usize >= 0x200);
40        let mut this = self.0.deref().borrow_mut();
41        let result = std::mem::replace(&mut this.result, Poll::Pending);
42        if result.is_pending() { this.waker.push(cx.local_waker().clone()); }
43        result
44    }
45    pub fn new() -> Self {
46        Self(Rc::new(RefCell::new(Channel {
47            result: Poll::Pending,
48            waker: Vec::new(),
49        })))
50    }
51    fn retrieve_wakers(&self) -> Vec<LocalWaker> {
52        let mut lock = (*self.0).borrow_mut();
53        std::mem::replace(&mut lock.waker, Vec::new())
54    }
55    pub fn send(&self, v: T) {
56        let wakers = self.retrieve_wakers();
57        let pointer = self.0.clone();
58        for waker in wakers {
59            {
60                assert!(Rc::ptr_eq(&self.0, &pointer));
61                let a = &*pointer;
62                let mut b = a.borrow_mut();
63                b.result = Poll::Ready(v.clone());
64            }
65            waker.wake();
66        }
67    }
68
69    pub fn reciever(&self) -> Reciever<T> {
70        Reciever(Rc::downgrade(&self.0))
71    }
72}
73
74pub fn channel<T: Clone + std::fmt::Debug>() -> Sender<T> {
75    let a = Sender::new();
76    assert!(a.0.as_ptr() as usize >= 0x200);
77    a
78}
79
80#[derive(Clone)]
81pub enum MaybeReady<T: Clone> {
82    Ready(T),
83    Pending(Sender<T>),
84}
85
86impl<T: Clone + std::fmt::Debug> MaybeReady<T> {
87    pub fn pending() -> Self {
88        Self::Pending(channel())
89    }
90    pub fn ready(t: T) -> Self {
91        Self::Ready(t)
92    }
93    pub fn is_ready(&self) -> bool {
94        matches!(self, Self::Ready(_))
95    }
96    pub fn set(&mut self, t: T) {
97        if let Self::Pending(ref sd) = std::mem::replace(self, Self::Ready(t.clone())) {
98            sd.send(t.clone());
99        }
100    }
101    pub fn sender(&mut self, t: T) -> Option<Sender<T>> {
102        let res = if let Self::Pending(ref sd) = self {
103            Some(sd.clone())
104        } else { None };
105        *self = Self::Ready(t);
106        res
107    }
108    pub fn poll(&self) -> Poll<T> {
109        match self {
110            MaybeReady::Ready(a) => Poll::Ready(a.clone()),
111            MaybeReady::Pending(_) => Poll::Pending,
112        }
113    }
114    pub fn poll_opt(&self) -> Option<T> {
115        match self {
116            MaybeReady::Ready(a) => Some(a.clone()),
117            MaybeReady::Pending(_) => None,
118        }
119    }
120    pub async fn get(&self) -> T {
121        match self {
122            MaybeReady::Ready(a) => a.clone(),
123            MaybeReady::Pending(sender) => sender.clone().await,
124        }
125    }
126}
127
128#[cfg(test)]
129mod test {
130    use std::task::Poll;
131
132    use futures::StreamExt;
133
134    use crate::task;
135
136    use super::channel;
137
138    #[test]
139    fn test() {
140        let sd = channel::<usize>();
141        eprintln!("Creating handle2");
142        let mut rv2 = sd.reciever();
143        let handle2 = task::spawn(async move {
144            eprintln!("  handle2 0");
145            let a = rv2.next().await.unwrap();
146            eprintln!("  handle2 1");
147            let b = rv2.next().await.unwrap();
148            eprintln!("  handle2 2");
149            let c = rv2.next().await.unwrap();
150            eprintln!("  handle2 3");
151            a + b + c
152        });
153        eprintln!("Creating handle");
154        let mut rv = sd.reciever();
155        let handle = task::spawn(async move {
156            eprintln!("  handle 0");
157            let a = rv.next().await.unwrap();
158            eprintln!("  handle 1");
159            let b = rv.next().await.unwrap();
160            eprintln!("  handle 2");
161            a + b + handle2.await
162        });
163        assert!(!handle.is_ready());
164        eprintln!("Sending 1");
165        let _ = sd.send(1);
166        assert!(!handle.is_ready());
167        eprintln!("Sending 2");
168        let _ = sd.send(2);
169        assert!(!handle.is_ready());
170        eprintln!("Sending 3");
171        let _ = sd.send(3);
172        assert!(handle.is_ready());
173        assert!(handle.poll_rc_nocx() == Poll::Ready(9));
174    }
175
176    // #[test]
177    // fn test2() {
178    //     let cell1 = MaybeReady::pending();
179    //     let cell2 = MaybeReady::pending();
180    //     let handle1 = task::spawn(async { rv.await });
181    //     let handle2 = task::spawn(async { handle1.await });
182    //     assert!(!handle2.is_ready());
183    //     assert!(handle2.poll_rc_nocx() == Poll::Pending);
184    //     let _ = sd.send(1);
185    //     assert!(handle2.is_ready());
186    //     assert!(handle2.poll_rc_nocx() == Poll::Ready(1));a
187    // }
188}
189