1use std::marker::PhantomData;
2use std::time::Duration;
3
4use tokio::sync::{
5 broadcast,
6 mpsc::{UnboundedReceiver, UnboundedSender},
7};
8
9use super::{Exit, Interrupted, Share, Terminator};
10
/// Period of the interval timer that triggers `Update::tick` inside `Store::run`.
const STORE_TICK_RATE: Duration = Duration::from_secs(1);
12
13pub trait Update<M> {
16 type Return;
17
18 fn update(&mut self, message: M) -> Option<Exit<Self::Return>>;
21
22 fn tick(&mut self) {}
24}
25
26pub struct Store<S, M, R>
29where
30 S: Update<M, Return = R> + Share,
31{
32 state_tx: UnboundedSender<S>,
33 _phantom: PhantomData<(M, R)>,
34}
35
36impl<S, M, R> Store<S, M, R>
37where
38 S: Update<M, Return = R> + Share,
39 R: Share,
40{
41 pub fn new(tx: UnboundedSender<S>) -> Self {
42 Self {
43 state_tx: tx,
44 _phantom: PhantomData,
45 }
46 }
47}
48
49impl<S, M, R> Store<S, M, R>
50where
51 S: Update<M, Return = R> + Share,
52 M: Share,
53 R: Share,
54{
55 pub async fn run(
60 self,
61 mut state: S,
62 mut terminator: Terminator<R>,
63 mut message_rx: broadcast::Receiver<M>,
64 mut work_rx: UnboundedReceiver<M>,
65 mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
66 ) -> anyhow::Result<Interrupted<R>> {
67 self.state_tx.send(state.clone())?;
69
70 let mut ticker = tokio::time::interval(STORE_TICK_RATE);
71
72 let result = loop {
73 tokio::select! {
74 Ok(message) = message_rx.recv() => {
77 if let Some(exit) = state.update(message) {
78 let interrupted = Interrupted::User { payload: exit.value };
79 let _ = terminator.terminate(interrupted.clone());
80
81 break interrupted;
82 }
83 self.state_tx.send(state.clone())?;
84 },
85 Some(message) = work_rx.recv() => {
86 state.update(message);
87 self.state_tx.send(state.clone())?;
88 },
89 _ = ticker.tick() => {
91 state.tick();
92 },
93 Ok(interrupted) = interrupt_rx.recv() => {
95 break interrupted;
96 }
97 }
98 };
99
100 Ok(result)
101 }
102}