1use futures::channel::mpsc::{self, TrySendError};
2use futures::{Future, SinkExt, StreamExt};
3
4use crate::{Key, Link, UseAsync, UseReference};
5
6#[macro_export]
7macro_rules! use_channel {
8 ($manager:expr, $task:expr) => {
9 $crate::UseChannel::use_channel($manager, $crate::Key::from($crate::use_id!()), $task)
10 };
11}
12
13pub struct Sender<T>(mpsc::Sender<T>)
14where
15 T: Send + 'static;
16
17impl<T> Sender<T>
18where
19 T: Send + 'static,
20{
21 pub fn send(&mut self, message: T) -> futures::sink::Send<mpsc::Sender<T>, T> {
22 self.0.send(message)
23 }
24
25 pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
26 self.0.try_send(message)
27 }
28}
29
30impl<T> Clone for Sender<T>
31where
32 T: Send + 'static,
33{
34 fn clone(&self) -> Self {
35 Sender(self.0.clone())
36 }
37}
38
39pub struct Receiver<T>(mpsc::Receiver<T>)
40where
41 T: Send + 'static;
42
43impl<T> Receiver<T>
44where
45 T: Send + 'static,
46{
47 pub fn next(&mut self) -> impl Future<Output = Option<T>> + '_ {
48 self.0.next()
49 }
50}
51
52pub trait UseChannel {
53 fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
54 where
55 T: Send + 'static,
56 C: FnOnce(Receiver<T>) -> F + 'static,
57 F: Future<Output = ()>;
58}
59
60impl<M> UseChannel for M
61where
62 M: UseAsync + UseReference + Link,
63{
64 fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
65 where
66 T: Send + 'static,
67 C: FnOnce(Receiver<T>) -> F + 'static,
68 F: Future<Output = ()>,
69 {
70 let mut rx = None;
71
72 let tx = self.use_reference(key.clone(), || {
73 let (new_tx, new_rx) = mpsc::channel::<T>(1024);
74 rx = Some(new_rx);
75 new_tx
76 });
77
78 let tx = tx.apply(self, |tx| tx.to_owned());
79
80 self.use_async(key, async move {
81 if let Some(rx) = rx.take() {
82 closure(Receiver(rx)).await;
83 }
84 });
85
86 Sender(tx)
87 }
88}