use futures::future::BoxFuture;
use tokio::sync::mpsc;
use crate::rhythm::Rhythm;
pub struct EventTrigger<T> {
tx: mpsc::Sender<T>,
}
impl<T: Send + 'static> EventTrigger<T> {
pub fn fire(&self, payload: T) {
let _ = self.tx.blocking_send(payload);
}
}
pub struct EventRhythm<T> {
rx: mpsc::Receiver<T>,
}
#[async_trait::async_trait]
impl<T: Send + 'static> Rhythm for EventRhythm<T> {
type Yield = T;
type Feed = ();
type Output = ();
type Error = crate::RoplatError;
async fn drive<N, F>(&mut self, mut nodes: N, mut op_domain: F)
where
N: Send + Sync,
F: for<'a> FnMut(&'a mut N, Self::Yield) -> BoxFuture<'a, Self::Feed> + Send,
{
while let Some(payload) = self.rx.recv().await {
op_domain(&mut nodes, payload).await;
}
}
}
pub fn create_event_channel<T>(buffer_size: usize) -> (EventTrigger<T>, EventRhythm<T>) {
let (tx, rx) = mpsc::channel(buffer_size);
(EventTrigger { tx }, EventRhythm { rx })
}