use std::future::Future;
use tokio::sync::mpsc;
use crate::rhythm::Rhythm;
/// Producer-side handle of an event channel.
///
/// Wraps the sending half of a bounded `tokio::sync::mpsc` channel; payloads
/// pushed through it are received by the paired [`EventRhythm`].
pub struct EventTrigger<T> {
// Sending half of the channel created in `create_event_channel`.
tx: mpsc::Sender<T>,
}
impl<T: Send + 'static> EventTrigger<T> {
    /// Pushes `payload` into the event channel from synchronous code.
    ///
    /// Delivery is best-effort: the call goes through tokio's
    /// `Sender::blocking_send`, which waits on the calling thread while the
    /// channel has no free capacity, and any send error is deliberately
    /// discarded together with the rejected payload.
    ///
    /// # Panics
    ///
    /// `blocking_send` is documented to panic when invoked from within an
    /// asynchronous execution context, so this method must only be called
    /// from plain (non-async) threads.
    pub fn fire(&self, payload: T) {
        match self.tx.blocking_send(payload) {
            Ok(()) => {}
            // The send failed; the error (which carries the payload back)
            // is dropped on purpose, exactly like a fire-and-forget signal.
            Err(_undelivered) => {}
        }
    }
}
/// Consumer-side handle of an event channel.
///
/// Holds the receiving half of the `tokio::sync::mpsc` channel and implements
/// [`Rhythm`], yielding each received payload to the operation domain.
pub struct EventRhythm<T> {
// Receiving half of the channel created in `create_event_channel`.
rx: mpsc::Receiver<T>,
}
/// Event-driven [`Rhythm`]: the operation domain runs once per received payload.
impl<T: Send + 'static> Rhythm for EventRhythm<T> {
    /// Each received payload is handed to the operation domain as-is.
    type Yield = T;
    /// The operation domain feeds nothing back into the rhythm.
    type Feed = ();
    /// The drive loop itself produces no value.
    type Output = ();
    /// Error type named for the trait; `drive` below never constructs one.
    type Error = crate::RoplatError;

    /// Invokes `op_domain` for every payload until `recv` returns `None`.
    ///
    /// `nodes` is moved into each invocation and replaced by whatever that
    /// invocation hands back, so the state is threaded through the whole run.
    /// Payloads are processed strictly one at a time: the next `recv` is not
    /// awaited until the previous `op_domain` future has completed.
    ///
    /// Returns `()` together with the `nodes` value produced by the final
    /// invocation (or the original `nodes` if no payload was ever received).
    async fn drive<N, F, Fut>(&mut self, nodes: N, mut op_domain: F) -> (Self::Output, N)
    where
        N: Send,
        F: FnMut(N, Self::Yield) -> Fut + Send,
        Fut: Future<Output = (Self::Feed, N)> + Send,
    {
        let mut state = nodes;
        loop {
            match self.rx.recv().await {
                Some(event) => {
                    let ((), next_state) = op_domain(state, event).await;
                    state = next_state;
                }
                // `None` from `recv` ends the rhythm; hand the state back.
                None => break ((), state),
            }
        }
    }
}
/// Builds a connected [`EventTrigger`] / [`EventRhythm`] pair.
///
/// Both halves share one bounded `tokio::sync::mpsc` channel whose capacity
/// is `buffer_size` messages.
///
/// # Panics
///
/// `tokio::sync::mpsc::channel` is documented to panic when the requested
/// capacity is `0`, so `buffer_size` must be at least `1`.
pub fn create_event_channel<T>(buffer_size: usize) -> (EventTrigger<T>, EventRhythm<T>) {
    let (sender, receiver) = mpsc::channel::<T>(buffer_size);
    let trigger = EventTrigger { tx: sender };
    let rhythm = EventRhythm { rx: receiver };
    (trigger, rhythm)
}