// meerkat_mobkit/console_aggregator/state.rs

/// States of the send state machine driven by [`SendState::apply`].
///
/// The only legal path is `Requested` -> `AcceptedPersisted` -> `Dispatching`
/// -> (`Delivered` | `DeliveryFailed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendState {
    /// Start of the legal path; accepts only [`SendTransition::PersistAccepted`].
    Requested,
    /// Accepts only [`SendTransition::StartDispatch`].
    AcceptedPersisted,
    /// Accepts [`SendTransition::MarkDelivered`] or
    /// [`SendTransition::MarkDeliveryFailed`].
    Dispatching,
    /// Terminal: every transition is rejected from here.
    Delivered,
    /// Terminal: every transition is rejected from here.
    DeliveryFailed,
}

/// Inputs accepted by [`SendState::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendTransition {
    /// Legal only from [`SendState::Requested`].
    PersistAccepted,
    /// Legal only from [`SendState::AcceptedPersisted`].
    StartDispatch,
    /// Legal only from [`SendState::Dispatching`].
    MarkDelivered,
    /// Legal only from [`SendState::Dispatching`].
    MarkDeliveryFailed,
}

/// Effect tags returned next to the new state by [`SendState::apply`].
///
/// The reducer only names the effects; executing them is left to the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendEffect {
    /// Returned when the send is accepted (`PersistAccepted`).
    AppendFrame,
    /// Returned when dispatch starts and when a delivery outcome is recorded.
    UpdateFrameStatus,
    /// Returned on acceptance and on either delivery outcome.
    EmitFrame,
    /// Returned when dispatch starts (`StartDispatch`).
    DispatchSend,
}

impl SendState {
    /// Applies `transition` to the current state.
    ///
    /// Returns the next state together with the effects for that step, in the
    /// order they are listed in the transition table below.
    ///
    /// # Errors
    ///
    /// Returns `Err("illegal send state transition")` for every
    /// `(state, transition)` pair that is not part of the legal path,
    /// including any transition out of `Delivered` or `DeliveryFailed`.
    pub fn apply(
        self,
        transition: SendTransition,
    ) -> Result<(Self, Vec<SendEffect>), &'static str> {
        let (next, effects) = match (self, transition) {
            (Self::Requested, SendTransition::PersistAccepted) => (
                Self::AcceptedPersisted,
                vec![SendEffect::AppendFrame, SendEffect::EmitFrame],
            ),
            (Self::AcceptedPersisted, SendTransition::StartDispatch) => (
                Self::Dispatching,
                vec![SendEffect::UpdateFrameStatus, SendEffect::DispatchSend],
            ),
            // Both delivery outcomes produce the same effects; only the
            // resulting terminal state differs.
            (Self::Dispatching, SendTransition::MarkDelivered) => (
                Self::Delivered,
                vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
            ),
            (Self::Dispatching, SendTransition::MarkDeliveryFailed) => (
                Self::DeliveryFailed,
                vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
            ),
            _ => return Err("illegal send state transition"),
        };
        Ok((next, effects))
    }
}
52
/// States of the source-ingestion state machine driven by
/// [`SourceIngestionState::apply`].
///
/// The legal path is `Registered` -> `Backfilling` -> `CaughtUp` -> `Live`.
/// From `Live` a gap may be detected, after which the machine either recovers
/// back into `Backfilling` or moves to `ReplayUnavailable` (which can itself
/// recover into `Backfilling`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionState {
    /// Start of the legal path; accepts only `StartBackfill`.
    Registered,
    /// Accepts only `BackfillComplete`.
    Backfilling,
    /// Accepts only `StartLive`.
    CaughtUp,
    /// Accepts only `DetectGap`.
    Live,
    /// Accepts `Recover` or `MarkReplayUnavailable`.
    GapDetected,
    /// Accepts only `Recover`.
    ReplayUnavailable,
}

/// Inputs accepted by [`SourceIngestionState::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionTransition {
    /// Legal only from [`SourceIngestionState::Registered`].
    StartBackfill,
    /// Legal only from [`SourceIngestionState::Backfilling`].
    BackfillComplete,
    /// Legal only from [`SourceIngestionState::CaughtUp`].
    StartLive,
    /// Legal only from [`SourceIngestionState::Live`].
    DetectGap,
    /// Legal only from [`SourceIngestionState::GapDetected`].
    MarkReplayUnavailable,
    /// Legal from [`SourceIngestionState::GapDetected`] and
    /// [`SourceIngestionState::ReplayUnavailable`]; both lead back to
    /// `Backfilling`.
    Recover,
}

/// Effect tags returned next to the new state by
/// [`SourceIngestionState::apply`].
///
/// The reducer only names the effects; executing them is left to the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionEffect {
    /// Returned on every transition that enters `Backfilling`.
    QuerySourceHistory,
    /// Returned when backfill completes.
    AppendFrame,
    /// Returned when the machine goes live.
    SubscribeLive,
    /// Returned when replay is marked unavailable after a gap.
    MarkReplayUnavailable,
}

impl SourceIngestionState {
    /// Applies `transition` to the current state.
    ///
    /// Returns the next state together with the effects for that step.
    /// `DetectGap` is the one legal step that yields an empty effect list.
    ///
    /// # Errors
    ///
    /// Returns `Err("illegal source ingestion state transition")` for every
    /// `(state, transition)` pair not listed in the table below.
    pub fn apply(
        self,
        transition: SourceIngestionTransition,
    ) -> Result<(Self, Vec<SourceIngestionEffect>), &'static str> {
        let (next, effects) = match (self, transition) {
            (Self::Registered, SourceIngestionTransition::StartBackfill) => (
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            ),
            (Self::Backfilling, SourceIngestionTransition::BackfillComplete) => {
                (Self::CaughtUp, vec![SourceIngestionEffect::AppendFrame])
            }
            (Self::CaughtUp, SourceIngestionTransition::StartLive) => {
                (Self::Live, vec![SourceIngestionEffect::SubscribeLive])
            }
            // Detecting a gap only changes state; no effect is requested.
            (Self::Live, SourceIngestionTransition::DetectGap) => {
                (Self::GapDetected, Vec::new())
            }
            (Self::GapDetected, SourceIngestionTransition::Recover) => (
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            ),
            (Self::GapDetected, SourceIngestionTransition::MarkReplayUnavailable) => (
                Self::ReplayUnavailable,
                vec![SourceIngestionEffect::MarkReplayUnavailable],
            ),
            (Self::ReplayUnavailable, SourceIngestionTransition::Recover) => (
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            ),
            _ => return Err("illegal source ingestion state transition"),
        };
        Ok((next, effects))
    }
}
116
/// States of the replay-subscription state machine driven by
/// [`ReplaySubscriptionState::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionState {
    /// Accepts `SnapshotComplete` (-> `Live`) or `Close`.
    Snapshotting,
    /// Accepts `MarkLagged` (-> `Lagged`) or `Close`.
    Live,
    /// Accepts `RecoverLive` (-> `Live`) or `Close`.
    Lagged,
    /// Terminal: every transition, including a second `Close`, is rejected.
    Closed,
}

/// Inputs accepted by [`ReplaySubscriptionState::apply`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionTransition {
    /// Legal only from [`ReplaySubscriptionState::Snapshotting`].
    SnapshotComplete,
    /// Legal only from [`ReplaySubscriptionState::Live`].
    MarkLagged,
    /// Legal only from [`ReplaySubscriptionState::Lagged`].
    RecoverLive,
    /// Legal from every state except [`ReplaySubscriptionState::Closed`].
    Close,
}

/// Effect tags returned next to the new state by
/// [`ReplaySubscriptionState::apply`].
///
/// The reducer only names the effects; executing them is left to the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionEffect {
    /// First of the three effects returned on `SnapshotComplete`.
    EmitSnapshotStarted,
    /// Second of the three effects returned on `SnapshotComplete`.
    EmitReplayFrames,
    /// Third of the three effects returned on `SnapshotComplete`.
    EmitSnapshotComplete,
    /// Returned on `MarkLagged`.
    EmitLagged,
    /// Returned on `Close`.
    CloseStream,
}

impl ReplaySubscriptionState {
    /// Applies `transition` to the current state.
    ///
    /// Returns the next state together with the effects for that step.
    /// `SnapshotComplete` returns the whole snapshot boundary in order
    /// (started, replay frames, complete); `RecoverLive` returns no effects.
    ///
    /// # Errors
    ///
    /// Returns `Err("illegal replay subscription state transition")` for
    /// every `(state, transition)` pair not listed below, including any
    /// transition applied to `Closed`.
    pub fn apply(
        self,
        transition: ReplaySubscriptionTransition,
    ) -> Result<(Self, Vec<ReplaySubscriptionEffect>), &'static str> {
        let (next, effects) = match (self, transition) {
            (Self::Snapshotting, ReplaySubscriptionTransition::SnapshotComplete) => (
                Self::Live,
                vec![
                    ReplaySubscriptionEffect::EmitSnapshotStarted,
                    ReplaySubscriptionEffect::EmitReplayFrames,
                    ReplaySubscriptionEffect::EmitSnapshotComplete,
                ],
            ),
            (Self::Live, ReplaySubscriptionTransition::MarkLagged) => {
                (Self::Lagged, vec![ReplaySubscriptionEffect::EmitLagged])
            }
            (Self::Lagged, ReplaySubscriptionTransition::RecoverLive) => {
                (Self::Live, Vec::new())
            }
            // Any non-closed state may close; the guard makes a second
            // `Close` fall through to the error arm.
            (_, ReplaySubscriptionTransition::Close) if self != Self::Closed => {
                (Self::Closed, vec![ReplaySubscriptionEffect::CloseStream])
            }
            _ => return Err("illegal replay subscription state transition"),
        };
        Ok((next, effects))
    }
}
169
#[cfg(test)]
// `expect` is used deliberately here: a rejected transition should fail the
// test with a message naming the step that was expected to be legal.
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Walks the send reducer along Requested -> Delivered and spot-checks
    /// one effect per step.
    #[test]
    fn send_reducer_allows_happy_path() {
        let (state, effects) = SendState::Requested
            .apply(SendTransition::PersistAccepted)
            .expect("requested -> accepted should be legal");
        assert_eq!(state, SendState::AcceptedPersisted);
        assert!(effects.contains(&SendEffect::AppendFrame));

        let (state, effects) = state
            .apply(SendTransition::StartDispatch)
            .expect("accepted -> dispatching should be legal");
        assert_eq!(state, SendState::Dispatching);
        assert!(effects.contains(&SendEffect::DispatchSend));

        let (state, effects) = state
            .apply(SendTransition::MarkDelivered)
            .expect("dispatching -> delivered should be legal");
        assert_eq!(state, SendState::Delivered);
        assert!(effects.contains(&SendEffect::UpdateFrameStatus));
    }

    /// A delivered send must not be dispatched again.
    #[test]
    fn send_reducer_rejects_terminal_to_dispatch() {
        assert_eq!(
            SendState::Delivered.apply(SendTransition::StartDispatch),
            Err("illegal send state transition")
        );
    }

    /// The failure outcome mirrors the success outcome's effects.
    #[test]
    fn send_reducer_marks_delivery_failed() {
        assert_eq!(
            SendState::Dispatching.apply(SendTransition::MarkDeliveryFailed),
            Ok((
                SendState::DeliveryFailed,
                vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
            ))
        );
    }

    /// Walks source ingestion along Registered -> Live.
    #[test]
    fn source_ingestion_reducer_models_backfill_to_live() {
        let (state, _) = SourceIngestionState::Registered
            .apply(SourceIngestionTransition::StartBackfill)
            .expect("registered -> backfilling");
        let (state, _) = state
            .apply(SourceIngestionTransition::BackfillComplete)
            .expect("backfilling -> caught up");
        let (state, effects) = state
            .apply(SourceIngestionTransition::StartLive)
            .expect("caught up -> live");
        assert_eq!(state, SourceIngestionState::Live);
        assert!(effects.contains(&SourceIngestionEffect::SubscribeLive));
    }

    /// A detected gap carries no effects and recovers by re-entering backfill.
    #[test]
    fn source_ingestion_reducer_recovers_from_gap() {
        let (state, effects) = SourceIngestionState::Live
            .apply(SourceIngestionTransition::DetectGap)
            .expect("live -> gap detected");
        assert_eq!(state, SourceIngestionState::GapDetected);
        assert!(effects.is_empty());

        assert_eq!(
            state.apply(SourceIngestionTransition::Recover),
            Ok((
                SourceIngestionState::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            ))
        );
    }

    /// Completing the snapshot emits the full boundary, in order.
    #[test]
    fn replay_subscription_reducer_emits_snapshot_boundary() {
        let (state, effects) = ReplaySubscriptionState::Snapshotting
            .apply(ReplaySubscriptionTransition::SnapshotComplete)
            .expect("snapshotting -> live");
        assert_eq!(state, ReplaySubscriptionState::Live);
        assert_eq!(
            effects,
            vec![
                ReplaySubscriptionEffect::EmitSnapshotStarted,
                ReplaySubscriptionEffect::EmitReplayFrames,
                ReplaySubscriptionEffect::EmitSnapshotComplete,
            ]
        );
    }

    /// Closing works once; closing an already closed subscription is illegal.
    #[test]
    fn replay_subscription_reducer_closes_once() {
        assert_eq!(
            ReplaySubscriptionState::Live.apply(ReplaySubscriptionTransition::Close),
            Ok((
                ReplaySubscriptionState::Closed,
                vec![ReplaySubscriptionEffect::CloseStream],
            ))
        );
        assert_eq!(
            ReplaySubscriptionState::Closed.apply(ReplaySubscriptionTransition::Close),
            Err("illegal replay subscription state transition")
        );
    }
}