// iroh_blobs/util/channel.rs

1pub mod oneshot {
2    use std::{
3        future::Future,
4        pin::Pin,
5        task::{Context, Poll},
6    };
7
8    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
9        let (tx, rx) = tokio::sync::oneshot::channel::<T>();
10        (Sender::Tokio(tx), Receiver::Tokio(rx))
11    }
12
13    #[derive(Debug)]
14    pub enum Sender<T> {
15        Tokio(tokio::sync::oneshot::Sender<T>),
16    }
17
18    impl<T> From<Sender<T>> for irpc::channel::oneshot::Sender<T> {
19        fn from(sender: Sender<T>) -> Self {
20            match sender {
21                Sender::Tokio(tx) => tx.into(),
22            }
23        }
24    }
25
26    impl<T> Sender<T> {
27        pub fn send(self, value: T) {
28            match self {
29                Self::Tokio(tx) => tx.send(value).ok(),
30            };
31        }
32    }
33
34    pub enum Receiver<T> {
35        Tokio(tokio::sync::oneshot::Receiver<T>),
36    }
37
38    impl<T> Future for Receiver<T> {
39        type Output = std::result::Result<T, tokio::sync::oneshot::error::RecvError>;
40
41        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42            match self.as_mut().get_mut() {
43                Self::Tokio(rx) => {
44                    if rx.is_terminated() {
45                        // don't panic when polling a terminated receiver
46                        Poll::Pending
47                    } else {
48                        Future::poll(Pin::new(rx), cx)
49                    }
50                }
51            }
52        }
53    }
54}