Skip to main content

radicle_tui/
store.rs

1use std::marker::PhantomData;
2use std::time::Duration;
3
4use tokio::sync::{
5    broadcast,
6    mpsc::{UnboundedReceiver, UnboundedSender},
7};
8
9use super::{Exit, Interrupted, Share, Terminator};
10
11const STORE_TICK_RATE: Duration = Duration::from_millis(1000);
12
13/// The main state trait for the ability to update an applications' state.
14/// Implementations should handle user-defined application messages as well as ticks.
15pub trait Update<M> {
16    type Return;
17
18    /// Handle a user-defined application message and return an `Exit` object
19    /// in case the received message requested the application to also quit.
20    fn update(&mut self, message: M) -> Option<Exit<Self::Return>>;
21
22    /// Handle recurring tick.
23    fn tick(&mut self) {}
24}
25
26/// The `Store` updates the applications' state concurrently. It handles
27/// messages coming from the frontend and updates the state accordingly.
28pub struct Store<S, M, R>
29where
30    S: Update<M, Return = R> + Share,
31{
32    state_tx: UnboundedSender<S>,
33    _phantom: PhantomData<(M, R)>,
34}
35
36impl<S, M, R> Store<S, M, R>
37where
38    S: Update<M, Return = R> + Share,
39    R: Share,
40{
41    pub fn new(tx: UnboundedSender<S>) -> Self {
42        Self {
43            state_tx: tx,
44            _phantom: PhantomData,
45        }
46    }
47}
48
49impl<S, M, R> Store<S, M, R>
50where
51    S: Update<M, Return = R> + Share,
52    M: Share,
53    R: Share,
54{
55    /// By calling `main_loop`, the store will wait for new messages coming
56    /// from the frontend and update the applications' state accordingly. It will
57    /// also tick with the defined `STORE_TICK_RATE`.
58    /// Updated states are then being send to the state message channel.
59    pub async fn run(
60        self,
61        mut state: S,
62        mut terminator: Terminator<R>,
63        mut message_rx: broadcast::Receiver<M>,
64        mut work_rx: UnboundedReceiver<M>,
65        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
66    ) -> anyhow::Result<Interrupted<R>> {
67        // Send the initial state once
68        self.state_tx.send(state.clone())?;
69
70        let mut ticker = tokio::time::interval(STORE_TICK_RATE);
71
72        let result = loop {
73            tokio::select! {
74                // Handle the messages coming from the frontend
75                // and process them to do async operations
76                Ok(message) = message_rx.recv() => {
77                    if let Some(exit) = state.update(message) {
78                        let interrupted = Interrupted::User { payload: exit.value };
79                        let _ = terminator.terminate(interrupted.clone());
80
81                        break interrupted;
82                    }
83                    self.state_tx.send(state.clone())?;
84                },
85                Some(message) = work_rx.recv() => {
86                    state.update(message);
87                    self.state_tx.send(state.clone())?;
88                },
89                // Tick to terminate the select every N milliseconds
90                _ = ticker.tick() => {
91                    state.tick();
92                },
93                // Catch and handle interrupt signal to gracefully shutdown
94                Ok(interrupted) = interrupt_rx.recv() => {
95                    break interrupted;
96                }
97            }
98        };
99
100        Ok(result)
101    }
102}