makepad_futures/channel/
mpsc.rs1use {
2 crate::Stream,
3 std::{
4 collections::VecDeque,
5 error, fmt,
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9 },
10};
11
12#[derive(Debug)]
13pub struct UnboundedSender<T> {
14 channel: Arc<Mutex<UnboundedChannel<T>>>,
15}
16
17impl<T> UnboundedSender<T> {
18 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
19 let mut channel = self.channel.lock().unwrap();
20 if channel.is_closed {
21 return Err(SendError(message));
22 }
23 channel.message_queue.push_back(message);
24 if let Some(recv_task) = channel.recv_task.take() {
25 recv_task.wake();
26 }
27 Ok(())
28 }
29}
30
31impl<T> Clone for UnboundedSender<T> {
32 fn clone(&self) -> Self {
33 let mut channel = self.channel.lock().unwrap();
34 channel.sender_count += 1;
35 Self {
36 channel: self.channel.clone(),
37 }
38 }
39}
40
41impl<T> Drop for UnboundedSender<T> {
42 fn drop(&mut self) {
43 let mut channel = self.channel.lock().unwrap();
44 channel.sender_count -= 1;
45 if channel.sender_count == 0 {
46 if let Some(recv_task) = channel.recv_task.take() {
47 recv_task.wake();
48 }
49 }
50 }
51}
52
53#[derive(Clone, Eq, PartialEq)]
54pub struct SendError<T>(pub T);
55
56impl<T> error::Error for SendError<T> {}
57
58impl<T> fmt::Debug for SendError<T> {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.debug_struct("SendError").finish_non_exhaustive()
61 }
62}
63
64impl<T> fmt::Display for SendError<T> {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 write!(f, "sending on a closed channel")
67 }
68}
69
70#[derive(Debug)]
71pub struct UnboundedReceiver<T> {
72 channel: Arc<Mutex<UnboundedChannel<T>>>,
73}
74
75impl<T> Drop for UnboundedReceiver<T> {
76 fn drop(&mut self) {
77 let mut channel = self.channel.lock().unwrap();
78 channel.is_closed = true;
79 channel.message_queue.clear();
80 }
81}
82
83impl<T> Stream for UnboundedReceiver<T> {
84 type Item = T;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let mut channel = self.channel.lock().unwrap();
88 match channel.message_queue.pop_front() {
89 Some(message) => Poll::Ready(Some(message)),
90 None => {
91 if channel.sender_count == 0 {
92 Poll::Ready(None)
93 } else {
94 channel.recv_task = Some(cx.waker().clone());
95 Poll::Pending
96 }
97 }
98 }
99 }
100}
101
102#[derive(Debug)]
103struct UnboundedChannel<T> {
104 is_closed: bool,
105 sender_count: usize,
106 message_queue: VecDeque<T>,
107 recv_task: Option<Waker>,
108}
109
110pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
111 let channel = Arc::new(Mutex::new(UnboundedChannel {
112 is_closed: false,
113 sender_count: 1,
114 message_queue: VecDeque::new(),
115 recv_task: None,
116 }));
117 (
118 UnboundedSender {
119 channel: channel.clone(),
120 },
121 UnboundedReceiver { channel },
122 )
123}