use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::Exit;
use super::task::{Interrupted, Terminator};
/// Period of the interval timer created in `Store::run`; each time that
/// timer fires, `Update::tick` is called on the state.
const STORE_TICK_RATE: Duration = Duration::from_millis(1000);
/// A state that can be advanced by messages of type `M`.
pub trait Update<M> {
/// Type of the value wrapped by the [`Exit`] that [`Update::update`] may return.
type Return;
/// Applies `message` to the state.
///
/// `Store::run` keeps looping while this returns `None`. When it returns
/// `Some(exit)`, `Store::run` stops and reports `exit.value` as the payload
/// of `Interrupted::User`.
fn update(&mut self, message: M) -> Option<Exit<Self::Return>>;
/// Hook invoked by `Store::run` each time its interval timer fires.
/// The default implementation does nothing.
fn tick(&mut self) {}
}
/// Holds the sending half of the unbounded channel on which values of the
/// state type `S` are published.
///
/// `M` is the message type `S` is updated with and `P` is the `Return` type
/// of that update. Neither is stored, so they are only carried through a
/// `PhantomData` marker.
pub struct Store<S, M, P>
where
S: Update<M, Return = P> + Clone + Send + Sync,
P: Clone + Debug + Send + Sync,
{
// Sender used to publish clones of the current state.
state_tx: UnboundedSender<S>,
// Zero-sized marker so the otherwise unused `M` and `P` parameters are accepted.
_phantom: PhantomData<(M, P)>,
}
impl<S, M, P> Store<S, M, P>
where
    S: Update<M, Return = P> + Clone + Send + Sync,
    P: Clone + Debug + Send + Sync,
{
    /// Creates a store together with the receiving half of its state channel.
    ///
    /// The store keeps the sender; every state value it sends on that channel
    /// is delivered to the returned receiver.
    pub fn new() -> (Self, UnboundedReceiver<S>) {
        // The channel's item type is inferred from the `state_tx` field.
        let (state_tx, state_rx) = mpsc::unbounded_channel();
        let store = Self {
            state_tx,
            _phantom: PhantomData,
        };

        (store, state_rx)
    }
}
impl<S, M, P> Store<S, M, P>
where
    S: Update<M, Return = P> + Clone + Debug + Send + Sync + 'static,
    P: Clone + Debug + Send + Sync + 'static,
{
    /// Drives `state` until it asks to exit or an interrupt is received.
    ///
    /// A clone of the initial state is sent on the state channel first. The
    /// loop then reacts to whichever of these happens next:
    ///
    /// * a message arrives on `message_rx`: it is passed to `Update::update`.
    ///   If that returns an exit, its value is wrapped in `Interrupted::User`,
    ///   handed to `terminator`, and returned.
    /// * the interval timer (period `STORE_TICK_RATE`) fires: `Update::tick`
    ///   is called.
    /// * a value is received on `interrupt_rx`: it is returned as is.
    ///
    /// After any event that does not end the loop, a clone of the current
    /// state is sent on the state channel.
    ///
    /// # Errors
    ///
    /// Returns an error if sending a state value on the state channel fails.
    pub async fn run(
        self,
        mut state: S,
        mut terminator: Terminator<P>,
        mut message_rx: UnboundedReceiver<M>,
        mut interrupt_rx: broadcast::Receiver<Interrupted<P>>,
    ) -> anyhow::Result<Interrupted<P>> {
        self.state_tx.send(state.clone())?;

        // Note: the first tick of a tokio interval completes immediately.
        let mut tick_timer = tokio::time::interval(STORE_TICK_RATE);

        loop {
            // `Some(_)` means the loop must stop with that value, `None` means
            // the state should be published and the loop continues. The timer
            // branch has an irrefutable pattern, so `select!` always has at
            // least one enabled branch.
            let stop: Option<Interrupted<P>> = tokio::select! {
                Some(msg) = message_rx.recv() => state.update(msg).map(|exit| {
                    let signal = Interrupted::User { payload: exit.value };
                    // The result of notifying the terminator is deliberately ignored.
                    let _ = terminator.terminate(signal.clone());
                    signal
                }),
                _ = tick_timer.tick() => {
                    state.tick();
                    None
                }
                Ok(signal) = interrupt_rx.recv() => Some(signal),
            };

            if let Some(signal) = stop {
                return Ok(signal);
            }

            self.state_tx.send(state.clone())?;
        }
    }
}