/// Lifecycle state of a single send, advanced one step at a time by [`SendState::apply`].
///
/// The only paths `apply` accepts are
/// `Requested -> AcceptedPersisted -> Dispatching -> Delivered` and
/// `Requested -> AcceptedPersisted -> Dispatching -> DeliveryFailed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendState {
    /// Entry point of the transition table: no transition leads back here.
    /// Accepts only [`SendTransition::PersistAccepted`].
    Requested,
    /// Reached from `Requested` via [`SendTransition::PersistAccepted`].
    /// Accepts only [`SendTransition::StartDispatch`].
    AcceptedPersisted,
    /// Reached from `AcceptedPersisted` via [`SendTransition::StartDispatch`].
    /// Accepts [`SendTransition::MarkDelivered`] or [`SendTransition::MarkDeliveryFailed`].
    Dispatching,
    /// Reached from `Dispatching` via [`SendTransition::MarkDelivered`].
    /// `apply` rejects every transition from this state.
    Delivered,
    /// Reached from `Dispatching` via [`SendTransition::MarkDeliveryFailed`].
    /// `apply` rejects every transition from this state.
    DeliveryFailed,
}
9
/// Input accepted by [`SendState::apply`]; each variant is legal from exactly one state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendTransition {
    /// Legal only from `SendState::Requested`; moves to `SendState::AcceptedPersisted`.
    PersistAccepted,
    /// Legal only from `SendState::AcceptedPersisted`; moves to `SendState::Dispatching`.
    StartDispatch,
    /// Legal only from `SendState::Dispatching`; moves to `SendState::Delivered`.
    MarkDelivered,
    /// Legal only from `SendState::Dispatching`; moves to `SendState::DeliveryFailed`.
    MarkDeliveryFailed,
}
17
/// Effect tags returned by [`SendState::apply`] alongside the next state.
///
/// `apply` only lists these values; it performs none of them itself.
/// NOTE(review): presumably the caller executes them in the returned order — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendEffect {
    /// Returned first for `SendTransition::PersistAccepted`.
    AppendFrame,
    /// Returned first for `StartDispatch`, `MarkDelivered` and `MarkDeliveryFailed`.
    UpdateFrameStatus,
    /// Returned last for `PersistAccepted`, `MarkDelivered` and `MarkDeliveryFailed`.
    EmitFrame,
    /// Returned last for `SendTransition::StartDispatch`.
    DispatchSend,
}
25
26impl SendState {
27 pub fn apply(
28 self,
29 transition: SendTransition,
30 ) -> Result<(Self, Vec<SendEffect>), &'static str> {
31 match (self, transition) {
32 (Self::Requested, SendTransition::PersistAccepted) => Ok((
33 Self::AcceptedPersisted,
34 vec![SendEffect::AppendFrame, SendEffect::EmitFrame],
35 )),
36 (Self::AcceptedPersisted, SendTransition::StartDispatch) => Ok((
37 Self::Dispatching,
38 vec![SendEffect::UpdateFrameStatus, SendEffect::DispatchSend],
39 )),
40 (Self::Dispatching, SendTransition::MarkDelivered) => Ok((
41 Self::Delivered,
42 vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
43 )),
44 (Self::Dispatching, SendTransition::MarkDeliveryFailed) => Ok((
45 Self::DeliveryFailed,
46 vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
47 )),
48 _ => Err("illegal send state transition"),
49 }
50 }
51}
52
/// Ingestion state of a source, advanced by [`SourceIngestionState::apply`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SourceIngestionState {
    /// Entry point of the transition table: no transition leads back here.
    /// Accepts only [`SourceIngestionTransition::StartBackfill`].
    Registered,
    /// Reached from `Registered` via `StartBackfill`, or from `GapDetected` /
    /// `ReplayUnavailable` via `Recover`. Accepts only `BackfillComplete`.
    Backfilling,
    /// Reached from `Backfilling` via `BackfillComplete`. Accepts only `StartLive`.
    CaughtUp,
    /// Reached from `CaughtUp` via `StartLive`. Accepts only `DetectGap`.
    Live,
    /// Reached from `Live` via `DetectGap`. Accepts `Recover` or `MarkReplayUnavailable`.
    GapDetected,
    /// Reached from `GapDetected` via `MarkReplayUnavailable`. Accepts only `Recover`.
    ReplayUnavailable,
}
62
/// Input accepted by [`SourceIngestionState::apply`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SourceIngestionTransition {
    /// Legal only from `Registered`; moves to `Backfilling`.
    StartBackfill,
    /// Legal only from `Backfilling`; moves to `CaughtUp`.
    BackfillComplete,
    /// Legal only from `CaughtUp`; moves to `Live`.
    StartLive,
    /// Legal only from `Live`; moves to `GapDetected`.
    DetectGap,
    /// Legal only from `GapDetected`; moves to `ReplayUnavailable`.
    MarkReplayUnavailable,
    /// Legal from `GapDetected` and from `ReplayUnavailable`; both move back to `Backfilling`.
    Recover,
}
72
/// Effect tags returned by [`SourceIngestionState::apply`] alongside the next state.
///
/// `apply` only lists these values; it performs none of them itself. The `DetectGap`
/// transition returns no effect at all.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SourceIngestionEffect {
    /// Returned with every step that enters `Backfilling` (`StartBackfill` and `Recover`).
    QuerySourceHistory,
    /// Returned with `BackfillComplete`.
    AppendFrame,
    /// Returned with `StartLive`.
    SubscribeLive,
    /// Returned with the transition of the same name,
    /// `SourceIngestionTransition::MarkReplayUnavailable`.
    MarkReplayUnavailable,
}
80
81impl SourceIngestionState {
82 pub fn apply(
83 self,
84 transition: SourceIngestionTransition,
85 ) -> Result<(Self, Vec<SourceIngestionEffect>), &'static str> {
86 match (self, transition) {
87 (Self::Registered, SourceIngestionTransition::StartBackfill) => Ok((
88 Self::Backfilling,
89 vec![SourceIngestionEffect::QuerySourceHistory],
90 )),
91 (Self::Backfilling, SourceIngestionTransition::BackfillComplete) => {
92 Ok((Self::CaughtUp, vec![SourceIngestionEffect::AppendFrame]))
93 }
94 (Self::CaughtUp, SourceIngestionTransition::StartLive) => {
95 Ok((Self::Live, vec![SourceIngestionEffect::SubscribeLive]))
96 }
97 (Self::Live, SourceIngestionTransition::DetectGap) => {
98 Ok((Self::GapDetected, Vec::new()))
99 }
100 (Self::GapDetected, SourceIngestionTransition::Recover) => Ok((
101 Self::Backfilling,
102 vec![SourceIngestionEffect::QuerySourceHistory],
103 )),
104 (Self::GapDetected, SourceIngestionTransition::MarkReplayUnavailable) => Ok((
105 Self::ReplayUnavailable,
106 vec![SourceIngestionEffect::MarkReplayUnavailable],
107 )),
108 (Self::ReplayUnavailable, SourceIngestionTransition::Recover) => Ok((
109 Self::Backfilling,
110 vec![SourceIngestionEffect::QuerySourceHistory],
111 )),
112 _ => Err("illegal source ingestion state transition"),
113 }
114 }
115}
116
/// State of a replay subscription, advanced by [`ReplaySubscriptionState::apply`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplaySubscriptionState {
    /// Entry point of the transition table: no transition leads back here.
    /// Accepts `SnapshotComplete` and `Close`.
    Snapshotting,
    /// Reached from `Snapshotting` via `SnapshotComplete` or from `Lagged` via `RecoverLive`.
    /// Accepts `MarkLagged` and `Close`.
    Live,
    /// Reached from `Live` via `MarkLagged`. Accepts `RecoverLive` and `Close`.
    Lagged,
    /// Reached from any other state via `Close`. `apply` rejects every transition from
    /// this state, including a second `Close`.
    Closed,
}
124
/// Input accepted by [`ReplaySubscriptionState::apply`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplaySubscriptionTransition {
    /// Legal only from `Snapshotting`; moves to `Live`.
    SnapshotComplete,
    /// Legal only from `Live`; moves to `Lagged`.
    MarkLagged,
    /// Legal only from `Lagged`; moves back to `Live`.
    RecoverLive,
    /// Legal from every state except `Closed`; moves to `Closed`.
    Close,
}
132
/// Effect tags returned by [`ReplaySubscriptionState::apply`] alongside the next state.
///
/// `apply` only lists these values; it performs none of them itself. The `RecoverLive`
/// transition returns no effect at all.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplaySubscriptionEffect {
    /// First of the three effects returned with `SnapshotComplete`.
    EmitSnapshotStarted,
    /// Second of the three effects returned with `SnapshotComplete`.
    EmitReplayFrames,
    /// Third of the three effects returned with `SnapshotComplete`.
    EmitSnapshotComplete,
    /// Returned with `MarkLagged`.
    EmitLagged,
    /// Returned with `Close`.
    CloseStream,
}
141
142impl ReplaySubscriptionState {
143 pub fn apply(
144 self,
145 transition: ReplaySubscriptionTransition,
146 ) -> Result<(Self, Vec<ReplaySubscriptionEffect>), &'static str> {
147 match (self, transition) {
148 (Self::Snapshotting, ReplaySubscriptionTransition::SnapshotComplete) => Ok((
149 Self::Live,
150 vec![
151 ReplaySubscriptionEffect::EmitSnapshotStarted,
152 ReplaySubscriptionEffect::EmitReplayFrames,
153 ReplaySubscriptionEffect::EmitSnapshotComplete,
154 ],
155 )),
156 (Self::Live, ReplaySubscriptionTransition::MarkLagged) => {
157 Ok((Self::Lagged, vec![ReplaySubscriptionEffect::EmitLagged]))
158 }
159 (Self::Lagged, ReplaySubscriptionTransition::RecoverLive) => {
160 Ok((Self::Live, Vec::new()))
161 }
162 (_, ReplaySubscriptionTransition::Close) if self != Self::Closed => {
163 Ok((Self::Closed, vec![ReplaySubscriptionEffect::CloseStream]))
164 }
165 _ => Err("illegal replay subscription state transition"),
166 }
167 }
168}
169
#[cfg(test)]
// The tests call `expect` so that a rejected transition fails with a message naming the step.
// NOTE(review): presumably `clippy::expect_used` is denied elsewhere in the crate — confirm.
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Walks a send through `Requested -> AcceptedPersisted -> Dispatching -> Delivered`
    /// and checks the resulting state plus one effect at every step.
    #[test]
    fn send_reducer_allows_happy_path() {
        let (accepted, persist_effects) = SendState::Requested
            .apply(SendTransition::PersistAccepted)
            .expect("requested -> accepted should be legal");
        assert_eq!(accepted, SendState::AcceptedPersisted);
        assert!(persist_effects
            .iter()
            .any(|effect| *effect == SendEffect::AppendFrame));

        let (dispatching, dispatch_effects) = accepted
            .apply(SendTransition::StartDispatch)
            .expect("accepted -> dispatching should be legal");
        assert_eq!(dispatching, SendState::Dispatching);
        assert!(dispatch_effects
            .iter()
            .any(|effect| *effect == SendEffect::DispatchSend));

        let (delivered, delivery_effects) = dispatching
            .apply(SendTransition::MarkDelivered)
            .expect("dispatching -> delivered should be legal");
        assert_eq!(delivered, SendState::Delivered);
        assert!(delivery_effects
            .iter()
            .any(|effect| *effect == SendEffect::UpdateFrameStatus));
    }

    /// A delivered send cannot be dispatched again.
    #[test]
    fn send_reducer_rejects_terminal_to_dispatch() {
        let outcome = SendState::Delivered.apply(SendTransition::StartDispatch);
        assert_eq!(outcome, Err("illegal send state transition"));
    }

    /// Walks a source through `Registered -> Backfilling -> CaughtUp -> Live` and checks
    /// that going live returns the `SubscribeLive` effect.
    #[test]
    fn source_ingestion_reducer_models_backfill_to_live() {
        let (backfilling, _) = SourceIngestionState::Registered
            .apply(SourceIngestionTransition::StartBackfill)
            .expect("registered -> backfilling");
        let (caught_up, _) = backfilling
            .apply(SourceIngestionTransition::BackfillComplete)
            .expect("backfilling -> caught up");
        let (live, live_effects) = caught_up
            .apply(SourceIngestionTransition::StartLive)
            .expect("caught up -> live");

        assert_eq!(live, SourceIngestionState::Live);
        assert!(live_effects
            .iter()
            .any(|effect| *effect == SourceIngestionEffect::SubscribeLive));
    }

    /// Completing the snapshot returns the three snapshot effects in a fixed order.
    #[test]
    fn replay_subscription_reducer_emits_snapshot_boundary() {
        let expected_effects = vec![
            ReplaySubscriptionEffect::EmitSnapshotStarted,
            ReplaySubscriptionEffect::EmitReplayFrames,
            ReplaySubscriptionEffect::EmitSnapshotComplete,
        ];

        let (live, snapshot_effects) = ReplaySubscriptionState::Snapshotting
            .apply(ReplaySubscriptionTransition::SnapshotComplete)
            .expect("snapshotting -> live");

        assert_eq!(live, ReplaySubscriptionState::Live);
        assert_eq!(snapshot_effects, expected_effects);
    }
}