makepad_futures/channel/
oneshot.rs

1use std::{
2    error, fmt,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    task::{Context, Poll, Waker},
7};
8
9#[derive(Clone, Debug)]
10pub struct Sender<T> {
11    channel: Arc<Mutex<Channel<T>>>,
12}
13
14impl<T> Sender<T> {
15    pub fn send(self, message: T) -> Result<(), SendError<T>> {
16        let mut channel = self.channel.lock().unwrap();
17        if channel.is_complete {
18            return Err(SendError(message));
19        }
20        channel.message = Some(message);
21        if let Some(recv_task) = channel.recv_task.take() {
22            recv_task.wake();
23        }
24        Ok(())
25    }
26}
27
28impl<T> Drop for Sender<T> {
29    fn drop(&mut self) {
30        let mut channel = self.channel.lock().unwrap();
31        channel.is_complete = true;
32        if let Some(recv_task) = channel.recv_task.take() {
33            recv_task.wake();
34        }
35    }
36}
37
38#[derive(Clone, Eq, PartialEq)]
39pub struct SendError<T>(pub T);
40
41impl<T> error::Error for SendError<T> {}
42
43impl<T> fmt::Debug for SendError<T> {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        f.debug_struct("SendError").finish_non_exhaustive()
46    }
47}
48
49impl<T> fmt::Display for SendError<T> {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        write!(f, "sending on a closed channel")
52    }
53}
54
55#[derive(Debug)]
56pub struct Receiver<T> {
57    channel: Arc<Mutex<Channel<T>>>,
58}
59
60impl<T> Future for Receiver<T> {
61    type Output = Result<T, RecvError>;
62
63    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64        let mut channel = self.channel.lock().unwrap();
65        match channel.message.take() {
66            Some(message) => Poll::Ready(Ok(message)),
67            None => {
68                if channel.is_complete {
69                    Poll::Ready(Err(RecvError { _priv: () }))
70                } else {
71                    channel.recv_task = Some(cx.waker().clone());
72                    Poll::Pending
73                }
74            }
75        }
76    }
77}
78
79impl<T> Drop for Receiver<T> {
80    fn drop(&mut self) {
81        let mut channel = self.channel.lock().unwrap();
82        channel.is_complete = true;
83        channel.message = None;
84    }
85}
86
87#[derive(Clone, Debug, Eq, PartialEq)]
88pub struct RecvError {
89    _priv: (),
90}
91
92impl error::Error for RecvError {}
93
94impl fmt::Display for RecvError {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        write!(f, "receiving from a closed channel")
97    }
98}
99
100#[derive(Debug)]
101struct Channel<T> {
102    is_complete: bool,
103    message: Option<T>,
104    recv_task: Option<Waker>,
105}
106
107pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
108    let channel = Arc::new(Mutex::new(Channel {
109        is_complete: false,
110        message: None,
111        recv_task: None,
112    }));
113    (
114        Sender {
115            channel: channel.clone(),
116        },
117        Receiver { channel },
118    )
119}