makepad_futures/channel/
oneshot.rs1use std::{
2 error, fmt,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 task::{Context, Poll, Waker},
7};
8
9#[derive(Clone, Debug)]
10pub struct Sender<T> {
11 channel: Arc<Mutex<Channel<T>>>,
12}
13
14impl<T> Sender<T> {
15 pub fn send(self, message: T) -> Result<(), SendError<T>> {
16 let mut channel = self.channel.lock().unwrap();
17 if channel.is_complete {
18 return Err(SendError(message));
19 }
20 channel.message = Some(message);
21 if let Some(recv_task) = channel.recv_task.take() {
22 recv_task.wake();
23 }
24 Ok(())
25 }
26}
27
28impl<T> Drop for Sender<T> {
29 fn drop(&mut self) {
30 let mut channel = self.channel.lock().unwrap();
31 channel.is_complete = true;
32 if let Some(recv_task) = channel.recv_task.take() {
33 recv_task.wake();
34 }
35 }
36}
37
38#[derive(Clone, Eq, PartialEq)]
39pub struct SendError<T>(pub T);
40
41impl<T> error::Error for SendError<T> {}
42
43impl<T> fmt::Debug for SendError<T> {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 f.debug_struct("SendError").finish_non_exhaustive()
46 }
47}
48
49impl<T> fmt::Display for SendError<T> {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 write!(f, "sending on a closed channel")
52 }
53}
54
55#[derive(Debug)]
56pub struct Receiver<T> {
57 channel: Arc<Mutex<Channel<T>>>,
58}
59
60impl<T> Future for Receiver<T> {
61 type Output = Result<T, RecvError>;
62
63 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64 let mut channel = self.channel.lock().unwrap();
65 match channel.message.take() {
66 Some(message) => Poll::Ready(Ok(message)),
67 None => {
68 if channel.is_complete {
69 Poll::Ready(Err(RecvError { _priv: () }))
70 } else {
71 channel.recv_task = Some(cx.waker().clone());
72 Poll::Pending
73 }
74 }
75 }
76 }
77}
78
79impl<T> Drop for Receiver<T> {
80 fn drop(&mut self) {
81 let mut channel = self.channel.lock().unwrap();
82 channel.is_complete = true;
83 channel.message = None;
84 }
85}
86
87#[derive(Clone, Debug, Eq, PartialEq)]
88pub struct RecvError {
89 _priv: (),
90}
91
92impl error::Error for RecvError {}
93
94impl fmt::Display for RecvError {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 write!(f, "receiving from a closed channel")
97 }
98}
99
100#[derive(Debug)]
101struct Channel<T> {
102 is_complete: bool,
103 message: Option<T>,
104 recv_task: Option<Waker>,
105}
106
107pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
108 let channel = Arc::new(Mutex::new(Channel {
109 is_complete: false,
110 message: None,
111 recv_task: None,
112 }));
113 (
114 Sender {
115 channel: channel.clone(),
116 },
117 Receiver { channel },
118 )
119}