1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
use futures::channel::mpsc::{self, TrySendError};
use futures::{Future, StreamExt};
use polyhorn_core::{Link, UseAsync, UseReference};

#[doc(hidden)]
pub use polyhorn_core::{use_id, Key};

/// Shorthand for [`UseChannel::use_channel`] that supplies the key
/// automatically.
///
/// The key is built with `Key::from(use_id!())`, so callers only pass the
/// manager and the task closure.
// NOTE(review): `use_id!` is defined in `polyhorn_core`; presumably it yields
// an identifier unique to the call site -- confirm against that crate.
#[macro_export]
macro_rules! use_channel {
    ($manager:expr, $task:expr) => {
        $crate::UseChannel::use_channel(
            $manager,
            $crate::Key::from($crate::use_id!()),
            $task,
        )
    };
}

/// Sending half of a channel created by [`UseChannel::use_channel`].
///
/// Thin newtype around the bounded `futures` mpsc sender.
pub struct Sender<T: Send + 'static>(mpsc::Sender<T>);

impl<T: Send + 'static> Sender<T> {
    /// Attempts to enqueue `message` by forwarding to the wrapped sender's
    /// `try_send`.
    ///
    /// # Errors
    ///
    /// Returns the [`TrySendError`] reported by the underlying sender when
    /// the message could not be enqueued.
    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
        let Sender(inner) = self;
        inner.try_send(message)
    }
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, while cloning the wrapped sender does not need one.
impl<T: Send + 'static> Clone for Sender<T> {
    /// Produces another handle by cloning the wrapped sender.
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

/// Receiving half of a channel created by [`UseChannel::use_channel`].
///
/// Thin newtype around the bounded `futures` mpsc receiver; it is handed to
/// the task closure passed to `use_channel`.
pub struct Receiver<T: Send + 'static>(mpsc::Receiver<T>);

impl<T: Send + 'static> Receiver<T> {
    /// Returns a future resolving to the next item of the wrapped receiver
    /// stream (`Some(message)`), or `None` once that stream has ended.
    ///
    /// The future borrows this receiver mutably for as long as it lives.
    pub fn next(&mut self) -> impl Future<Output = Option<T>> + '_ {
        let Receiver(stream) = self;
        stream.next()
    }
}

/// Hook that pairs a message channel with an asynchronous task consuming it.
pub trait UseChannel {
    /// Registers a channel under `key` and returns its sending half.
    ///
    /// `closure` is given the [`Receiver`] and returns the future that
    /// processes incoming messages.
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>;
}

/// Blanket implementation for every manager that offers references, async
/// tasks and linking.
impl<M> UseChannel for M
where
    M: UseAsync + UseReference + Link,
{
    /// Obtains the sender stored under `key` (creating the channel inside the
    /// reference initializer), schedules the consumer task under the same
    /// key, and returns a clone of the stored sender.
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>,
    {
        /// Buffer size handed to `mpsc::channel`.
        const BUFFER_SIZE: usize = 1024;

        // Filled in only if the initializer below actually runs.
        // NOTE(review): presumably `use_reference` invokes the initializer
        // only the first time `key` is seen -- confirm in `polyhorn_core`.
        let mut fresh_receiver = None;

        let reference = self.use_reference(key.clone(), || {
            let (sender, receiver) = mpsc::channel::<T>(BUFFER_SIZE);
            fresh_receiver = Some(receiver);
            sender
        });

        // Copy the stored sender out of the reference so it can be returned.
        let sender = reference.apply(self, |stored| stored.clone());

        self.use_async(key, async move {
            // Without a freshly created receiver there is nothing for the
            // closure to consume, so the task completes immediately.
            if let Some(receiver) = fresh_receiver {
                closure(Receiver(receiver)).await;
            }
        });

        Sender(sender)
    }
}