1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use futures::channel::mpsc::{self, TrySendError};
use futures::{Future, StreamExt};
use polyhorn_core::{Link, UseAsync, UseReference};

#[doc(hidden)]
pub use polyhorn_core::{use_id, Key};

/// Creates a channel hook on `$manager` and returns the sending half.
///
/// Expands to `UseChannel::use_channel($manager, key, $task)`, where `key` is
/// a `Key` converted from whatever `use_id!()` yields at the expansion site
/// (presumably an identifier unique to the call site -- confirm in
/// `polyhorn_core`).
///
/// * `$manager` - the receiver of the hook call; `use_channel` takes
///   `&mut self`, so this is normally a `&mut` manager.
/// * `$task` - a closure that is given the `Receiver` and returns the future
///   that consumes it.
///
/// A trailing comma after `$task` is accepted, so multi-line invocations can
/// be formatted like ordinary function calls.
///
/// ```rust,ignore
/// let mut tx = use_channel!(manager, |mut rx| async move {
///     while let Some(message) = rx.next().await {
///         // handle `message`
///     }
/// });
/// ```
#[macro_export]
macro_rules! use_channel {
    ($manager:expr, $task:expr $(,)?) => {
        $crate::UseChannel::use_channel($manager, $crate::Key::from($crate::use_id!()), $task)
    };
}

/// Sending half of a channel created through `UseChannel::use_channel`.
///
/// Thin newtype around `futures::channel::mpsc::Sender`; the wrapped sender is
/// private, so only the operations re-exposed on this type are available.
pub struct Sender<T: Send + 'static>(mpsc::Sender<T>);

impl<T> Sender<T>
where
    T: Send + 'static,
{
    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
        self.0.try_send(message)
    }
}

impl<T> Clone for Sender<T>
where
    T: Send + 'static,
{
    fn clone(&self) -> Self {
        Sender(self.0.clone())
    }
}

/// Receiving half of a channel created through `UseChannel::use_channel`.
///
/// Thin newtype around `futures::channel::mpsc::Receiver`. An instance is
/// handed to the task closure given to `use_channel`.
pub struct Receiver<T: Send + 'static>(mpsc::Receiver<T>);

impl<T> Receiver<T>
where
    T: Send + 'static,
{
    pub fn next(&mut self) -> impl Future<Output = Option<T>> + '_ {
        self.0.next()
    }
}

/// Hook that pairs a message channel with an asynchronous task consuming it.
///
/// A blanket implementation in this file covers every type that implements
/// `UseAsync`, `UseReference` and `Link`.
pub trait UseChannel {
    /// Sets up a channel associated with `key` and returns its sending half.
    ///
    /// * `key` - identifies this hook's state within the manager.
    /// * `closure` - called with the [`Receiver`] end of the channel; the
    ///   future it returns is the task that processes incoming messages.
    ///
    /// Returns a [`Sender`] for messages of type `T`.
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>;
}

impl<M> UseChannel for M
where
    M: UseAsync + UseReference + Link,
{
    /// Builds the channel on top of `use_reference` and `use_async`.
    ///
    /// The `mpsc` sender is kept in a reference registered under `key`, and a
    /// clone of it is returned wrapped in [`Sender`]. The matching receiver
    /// exists only when the reference initializer ran during this call; in
    /// that case it is wrapped in [`Receiver`] and handed to `closure`, whose
    /// future is awaited inside the task given to `use_async`. Otherwise the
    /// task finishes immediately without calling `closure`.
    ///
    /// Note that the same `key` is used for both the reference and the task.
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>,
    {
        // Buffer size passed to `mpsc::channel`.
        const CHANNEL_BUFFER: usize = 1024;

        // Filled in only if the initializer below is invoked. Presumably
        // `use_reference` skips the initializer when a value already exists
        // for `key` -- confirm against `polyhorn_core`.
        let mut fresh_receiver = None;

        let sender_ref = self.use_reference(key.clone(), || {
            let (sender, receiver) = mpsc::channel::<T>(CHANNEL_BUFFER);
            fresh_receiver = Some(receiver);
            sender
        });

        // Copy the stored sender out of the reference so it can be returned.
        let sender = sender_ref.apply(self, |stored| stored.clone());

        self.use_async(key, async move {
            match fresh_receiver {
                Some(receiver) => closure(Receiver(receiver)).await,
                // No new channel was created in this call: nothing to drive.
                None => {}
            }
        });

        Sender(sender)
    }
}