// camel_core/lifecycle/domain/route_runtime.rs

use crate::lifecycle::domain::DomainError;
use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state of a single route as tracked by [`RouteRuntimeAggregate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// Initial state assigned by `RouteRuntimeAggregate::new`; also the state
    /// replayed from a `RouteRegistered` event.
    Registered,
    /// State replayed from a `RouteStartRequested` event. `apply_command` never
    /// leaves the aggregate in this state: `Start` moves straight to `Started`.
    Starting,
    /// The route is running (reached via `Start`, `Resume` or `Reload`).
    Started,
    /// The route was suspended from `Started`.
    Suspended,
    /// Not assigned or matched explicitly anywhere in this file.
    // NOTE(review): presumably reserved for an asynchronous stop — confirm.
    Stopping,
    /// The route was stopped; it may be started, reloaded or removed from here.
    Stopped,
    /// The route failed; the payload is the error text carried by the `Fail`
    /// command or the `RouteFailed` event.
    Failed(String),
}
14
/// Commands accepted by `RouteRuntimeAggregate::apply_command`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Start the route. Valid from `Registered` or `Stopped`; a no-op when
    /// already `Started`.
    Start,
    /// Stop the route. Valid from `Started`, `Suspended` or `Failed`; a no-op
    /// when already `Stopped`.
    Stop,
    /// Suspend the route. Valid from `Started`; a no-op when already `Suspended`.
    Suspend,
    /// Resume the route. Valid from `Suspended`; a no-op when already `Started`.
    Resume,
    /// Reload the route. Valid from `Started`, `Suspended`, `Stopped` or
    /// `Failed`; the aggregate ends up `Started`.
    Reload,
    /// Mark the route as failed with the given error text. Accepted in any state.
    Fail(String),
    /// Remove the route. Valid from `Registered` or `Stopped` only.
    Remove,
}
25
26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28    route_id: String,
29    state: RouteRuntimeState,
30    version: u64,
31}
32
33impl RouteRuntimeAggregate {
34    pub fn new(route_id: impl Into<String>) -> Self {
35        Self {
36            route_id: route_id.into(),
37            state: RouteRuntimeState::Registered,
38            version: 0,
39        }
40    }
41
42    /// Constructor that returns both the aggregate and the initial `RouteRegistered` event.
43    /// Used by command handlers — avoids duplicating event construction outside the domain.
44    pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45        let route_id = route_id.into();
46        let aggregate = Self::new(route_id.clone());
47        let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48        (aggregate, events)
49    }
50
51    pub fn from_snapshot(
52        route_id: impl Into<String>,
53        state: RouteRuntimeState,
54        version: u64,
55    ) -> Self {
56        Self {
57            route_id: route_id.into(),
58            state,
59            version,
60        }
61    }
62
63    pub fn state(&self) -> &RouteRuntimeState {
64        &self.state
65    }
66
67    pub fn version(&self) -> u64 {
68        self.version
69    }
70
71    pub fn route_id(&self) -> &str {
72        &self.route_id
73    }
74
75    /// Map a persisted RuntimeEvent to the resulting RouteRuntimeState.
76    /// Used for event replay — avoids duplicating the state machine outside the domain.
77    pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78        match event {
79            RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80            RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81            RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82            RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83            RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84            RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85            RuntimeEvent::RouteFailed { error, .. } => {
86                Some(RouteRuntimeState::Failed(error.clone()))
87            }
88            RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89            RuntimeEvent::RouteRemoved { .. } => None,
90        }
91    }
92
93    pub fn apply_command(
94        &mut self,
95        cmd: RouteLifecycleCommand,
96    ) -> Result<Vec<RuntimeEvent>, DomainError> {
97        let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98            from: format!("{from:?}"),
99            to: to.to_string(),
100        };
101
102        let events = match cmd {
103            RouteLifecycleCommand::Start => match self.state {
104                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105                    self.state = RouteRuntimeState::Started;
106                    vec![
107                        RuntimeEvent::RouteStartRequested {
108                            route_id: self.route_id.clone(),
109                        },
110                        RuntimeEvent::RouteStarted {
111                            route_id: self.route_id.clone(),
112                        },
113                    ]
114                }
115                RouteRuntimeState::Started => vec![],
116                _ => return Err(invalid(&self.state, "Started")),
117            },
118            RouteLifecycleCommand::Stop => match self.state {
119                RouteRuntimeState::Started
120                | RouteRuntimeState::Suspended
121                | RouteRuntimeState::Failed(_) => {
122                    self.state = RouteRuntimeState::Stopped;
123                    vec![RuntimeEvent::RouteStopped {
124                        route_id: self.route_id.clone(),
125                    }]
126                }
127                RouteRuntimeState::Stopped => vec![],
128                _ => return Err(invalid(&self.state, "Stopped")),
129            },
130            RouteLifecycleCommand::Suspend => match self.state {
131                RouteRuntimeState::Started => {
132                    self.state = RouteRuntimeState::Suspended;
133                    vec![RuntimeEvent::RouteSuspended {
134                        route_id: self.route_id.clone(),
135                    }]
136                }
137                RouteRuntimeState::Suspended => vec![],
138                _ => return Err(invalid(&self.state, "Suspended")),
139            },
140            RouteLifecycleCommand::Resume => match self.state {
141                RouteRuntimeState::Suspended => {
142                    self.state = RouteRuntimeState::Started;
143                    vec![RuntimeEvent::RouteResumed {
144                        route_id: self.route_id.clone(),
145                    }]
146                }
147                RouteRuntimeState::Started => vec![],
148                _ => return Err(invalid(&self.state, "Started")),
149            },
150            RouteLifecycleCommand::Reload => match self.state {
151                RouteRuntimeState::Started
152                | RouteRuntimeState::Suspended
153                | RouteRuntimeState::Stopped
154                | RouteRuntimeState::Failed(_) => {
155                    self.state = RouteRuntimeState::Started;
156                    vec![RuntimeEvent::RouteReloaded {
157                        route_id: self.route_id.clone(),
158                    }]
159                }
160                _ => return Err(invalid(&self.state, "Started")),
161            },
162            RouteLifecycleCommand::Fail(error) => {
163                self.state = RouteRuntimeState::Failed(error.clone());
164                vec![RuntimeEvent::RouteFailed {
165                    route_id: self.route_id.clone(),
166                    error,
167                }]
168            }
169            RouteLifecycleCommand::Remove => match self.state {
170                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
171                    vec![RuntimeEvent::RouteRemoved {
172                        route_id: self.route_id.clone(),
173                    }]
174                }
175                _ => {
176                    return Err(invalid(&self.state, "Removed"));
177                }
178            },
179        };
180
181        self.version += 1;
182        Ok(events)
183    }
184}
185
/// Unit tests for the route lifecycle state machine and the event → state mapping.
#[cfg(test)]
mod tests {
    use super::*;

    // Suspend is only valid from `Started`; `Stopped` must be rejected.
    #[test]
    fn cannot_suspend_when_stopped() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.state = RouteRuntimeState::Stopped;
        let err = agg
            .apply_command(RouteLifecycleCommand::Suspend)
            .unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    // Start from `Registered` must include a `RouteStarted` event for the route.
    #[test]
    fn start_emits_route_started_event() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        assert!(
            events
                .iter()
                .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"))
        );
    }

    // `register` yields a `Registered` aggregate plus exactly one `RouteRegistered` event.
    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register("r1");
        assert_eq!(agg.route_id(), "r1");
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
        ));
    }

    // Remove is allowed directly from `Registered`.
    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    // A running route must be stopped before it can be removed.
    #[test]
    fn remove_from_started_is_invalid() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let err = agg
            .apply_command(RouteLifecycleCommand::Remove)
            .unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    // Remove is allowed once the route has been stopped.
    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    // Repeating a command whose target state is already reached emits nothing.
    #[test]
    fn start_from_started_is_idempotent() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Started);
    }

    #[test]
    fn stop_from_stopped_is_idempotent() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Stopped);
    }

    #[test]
    fn suspend_from_suspended_is_idempotent() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Suspended);
    }

    // Resume on an already running route is treated as a no-op, not an error.
    #[test]
    fn resume_from_started_is_idempotent() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Resume).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Started);
    }

    // One assertion per `RuntimeEvent` variant handled by `state_from_event`.
    #[test]
    fn state_from_event_maps_all_variants() {
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRegistered {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Registered)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStartRequested {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Starting)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStarted {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStopped {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Stopped)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteSuspended {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Suspended)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteResumed {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteFailed {
                route_id: "r".into(),
                error: "e".into()
            }),
            Some(RouteRuntimeState::Failed("e".into()))
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteReloaded {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        // A removed route has no corresponding state.
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRemoved {
                route_id: "r".into()
            }),
            None
        );
    }
}