use std::marker::PhantomData;
use std::time::Duration;
use tokio::sync::{
broadcast,
mpsc::{UnboundedReceiver, UnboundedSender},
};
use super::{Exit, Interrupted, Share, Terminator};
const STORE_TICK_RATE: Duration = Duration::from_millis(1000);
pub trait Update<M> {
type Return;
fn update(&mut self, message: M) -> Option<Exit<Self::Return>>;
fn tick(&mut self) {}
}
pub struct Store<S, M, R>
where
S: Update<M, Return = R> + Share,
{
state_tx: UnboundedSender<S>,
_phantom: PhantomData<(M, R)>,
}
impl<S, M, R> Store<S, M, R>
where
S: Update<M, Return = R> + Share,
R: Share,
{
pub fn new(tx: UnboundedSender<S>) -> Self {
Self {
state_tx: tx,
_phantom: PhantomData,
}
}
}
impl<S, M, R> Store<S, M, R>
where
S: Update<M, Return = R> + Share,
M: Share,
R: Share,
{
pub async fn run(
self,
mut state: S,
mut terminator: Terminator<R>,
mut message_rx: broadcast::Receiver<M>,
mut work_rx: UnboundedReceiver<M>,
mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
) -> anyhow::Result<Interrupted<R>> {
self.state_tx.send(state.clone())?;
let mut ticker = tokio::time::interval(STORE_TICK_RATE);
let result = loop {
tokio::select! {
Ok(message) = message_rx.recv() => {
if let Some(exit) = state.update(message) {
let interrupted = Interrupted::User { payload: exit.value };
let _ = terminator.terminate(interrupted.clone());
break interrupted;
}
self.state_tx.send(state.clone())?;
},
Some(message) = work_rx.recv() => {
state.update(message);
self.state_tx.send(state.clone())?;
},
_ = ticker.tick() => {
state.tick();
},
Ok(interrupted) = interrupt_rx.recv() => {
break interrupted;
}
}
};
Ok(result)
}
}