polyhorn_core/
channel.rs

1use futures::channel::mpsc::{self, TrySendError};
2use futures::{Future, SinkExt, StreamExt};
3
4use crate::{Key, Link, UseAsync, UseReference};
5
/// Shorthand for calling `UseChannel::use_channel` without writing a key by hand.
///
/// The first argument is the manager expression, the second is the task
/// closure. The key passed along is built with `Key::from` from whatever
/// `use_id!()` expands to at the call site.
#[macro_export]
macro_rules! use_channel {
    ($mgr:expr, $closure:expr) => {
        $crate::UseChannel::use_channel(
            $mgr,
            $crate::Key::from($crate::use_id!()),
            $closure,
        )
    };
}
12
13pub struct Sender<T>(mpsc::Sender<T>)
14where
15    T: Send + 'static;
16
17impl<T> Sender<T>
18where
19    T: Send + 'static,
20{
21    pub fn send(&mut self, message: T) -> futures::sink::Send<mpsc::Sender<T>, T> {
22        self.0.send(message)
23    }
24
25    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
26        self.0.try_send(message)
27    }
28}
29
30impl<T> Clone for Sender<T>
31where
32    T: Send + 'static,
33{
34    fn clone(&self) -> Self {
35        Sender(self.0.clone())
36    }
37}
38
39pub struct Receiver<T>(mpsc::Receiver<T>)
40where
41    T: Send + 'static;
42
43impl<T> Receiver<T>
44where
45    T: Send + 'static,
46{
47    pub fn next(&mut self) -> impl Future<Output = Option<T>> + '_ {
48        self.0.next()
49    }
50}
51
52pub trait UseChannel {
53    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
54    where
55        T: Send + 'static,
56        C: FnOnce(Receiver<T>) -> F + 'static,
57        F: Future<Output = ()>;
58}
59
60impl<M> UseChannel for M
61where
62    M: UseAsync + UseReference + Link,
63{
64    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
65    where
66        T: Send + 'static,
67        C: FnOnce(Receiver<T>) -> F + 'static,
68        F: Future<Output = ()>,
69    {
70        let mut rx = None;
71
72        let tx = self.use_reference(key.clone(), || {
73            let (new_tx, new_rx) = mpsc::channel::<T>(1024);
74            rx = Some(new_rx);
75            new_tx
76        });
77
78        let tx = tx.apply(self, |tx| tx.to_owned());
79
80        self.use_async(key, async move {
81            if let Some(rx) = rx.take() {
82                closure(Receiver(rx)).await;
83            }
84        });
85
86        Sender(tx)
87    }
88}