makepad_futures/channel/
mpsc.rs

1use {
2    crate::Stream,
3    std::{
4        collections::VecDeque,
5        error, fmt,
6        pin::Pin,
7        sync::{Arc, Mutex},
8        task::{Context, Poll, Waker},
9    },
10};
11
12#[derive(Debug)]
13pub struct UnboundedSender<T> {
14    channel: Arc<Mutex<UnboundedChannel<T>>>,
15}
16
17impl<T> UnboundedSender<T> {
18    pub fn send(&self, message: T) -> Result<(), SendError<T>> {
19        let mut channel = self.channel.lock().unwrap();
20        if channel.is_closed {
21            return Err(SendError(message));
22        }
23        channel.message_queue.push_back(message);
24        if let Some(recv_task) = channel.recv_task.take() {
25            recv_task.wake();
26        }
27        Ok(())
28    }
29}
30
31impl<T> Clone for UnboundedSender<T> {
32    fn clone(&self) -> Self {
33        let mut channel = self.channel.lock().unwrap();
34        channel.sender_count += 1;
35        Self {
36            channel: self.channel.clone(),
37        }
38    }
39}
40
41impl<T> Drop for UnboundedSender<T> {
42    fn drop(&mut self) {
43        let mut channel = self.channel.lock().unwrap();
44        channel.sender_count -= 1;
45        if channel.sender_count == 0 {
46            if let Some(recv_task) = channel.recv_task.take() {
47                recv_task.wake();
48            }
49        }
50    }
51}
52
53#[derive(Clone, Eq, PartialEq)]
54pub struct SendError<T>(pub T);
55
56impl<T> error::Error for SendError<T> {}
57
58impl<T> fmt::Debug for SendError<T> {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.debug_struct("SendError").finish_non_exhaustive()
61    }
62}
63
64impl<T> fmt::Display for SendError<T> {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        write!(f, "sending on a closed channel")
67    }
68}
69
70#[derive(Debug)]
71pub struct UnboundedReceiver<T> {
72    channel: Arc<Mutex<UnboundedChannel<T>>>,
73}
74
75impl<T> Drop for UnboundedReceiver<T> {
76    fn drop(&mut self) {
77        let mut channel = self.channel.lock().unwrap();
78        channel.is_closed = true;
79        channel.message_queue.clear();
80    }
81}
82
83impl<T> Stream for UnboundedReceiver<T> {
84    type Item = T;
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        let mut channel = self.channel.lock().unwrap();
88        match channel.message_queue.pop_front() {
89            Some(message) => Poll::Ready(Some(message)),
90            None => {
91                if channel.sender_count == 0 {
92                    Poll::Ready(None)
93                } else {
94                    channel.recv_task = Some(cx.waker().clone());
95                    Poll::Pending
96                }
97            }
98        }
99    }
100}
101
102#[derive(Debug)]
103struct UnboundedChannel<T> {
104    is_closed: bool,
105    sender_count: usize,
106    message_queue: VecDeque<T>,
107    recv_task: Option<Waker>,
108}
109
110pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
111    let channel = Arc::new(Mutex::new(UnboundedChannel {
112        is_closed: false,
113        sender_count: 1,
114        message_queue: VecDeque::new(),
115        recv_task: None,
116    }));
117    (
118        UnboundedSender {
119            channel: channel.clone(),
120        },
121        UnboundedReceiver { channel },
122    )
123}