Skip to main content

camel_core/lifecycle/domain/
route_runtime.rs

1use crate::lifecycle::domain::DomainError;
2use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state of a single route as tracked by its runtime aggregate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// The route is known but has not been started yet; the state of a fresh aggregate.
    Registered,
    /// A start has been requested and is awaiting confirmation (two-phase start).
    Starting,
    /// The route is running.
    Started,
    /// The route was running and has been suspended; it can be resumed.
    Suspended,
    /// The route is on its way to `Stopped`.
    /// NOTE(review): no transition visible in this file enters this state — confirm where
    /// it is produced.
    Stopping,
    /// The route has been stopped; it may be started again or removed.
    Stopped,
    /// The route failed; the payload is the error message that was reported.
    Failed(String),
}
14
/// Command that asks a route aggregate to move through its lifecycle.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Start the route; accepted from `Registered` or `Stopped`.
    Start,
    /// Stop the route; accepted from `Started`, `Suspended` or `Failed`.
    Stop,
    /// Suspend the route; accepted from `Started`.
    Suspend,
    /// Resume the route; accepted from `Suspended`.
    Resume,
    /// Reload the route, leaving it `Started`; accepted from `Started`, `Suspended`,
    /// `Stopped` or `Failed`.
    Reload,
    /// Mark the route as failed with the given error message; accepted from any state.
    Fail(String),
    /// Remove the route; accepted from `Registered` or `Stopped`.
    Remove,
}
25
26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28    route_id: String,
29    state: RouteRuntimeState,
30    version: u64,
31}
32
33impl RouteRuntimeAggregate {
34    pub fn new(route_id: impl Into<String>) -> Self {
35        Self {
36            route_id: route_id.into(),
37            state: RouteRuntimeState::Registered,
38            version: 0,
39        }
40    }
41
42    /// Constructor that returns both the aggregate and the initial `RouteRegistered` event.
43    /// Used by command handlers — avoids duplicating event construction outside the domain.
44    pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45        let route_id = route_id.into();
46        let aggregate = Self::new(route_id.clone());
47        let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48        (aggregate, events)
49    }
50
51    pub fn from_snapshot(
52        route_id: impl Into<String>,
53        state: RouteRuntimeState,
54        version: u64,
55    ) -> Self {
56        Self {
57            route_id: route_id.into(),
58            state,
59            version,
60        }
61    }
62
63    pub fn state(&self) -> &RouteRuntimeState {
64        &self.state
65    }
66
67    pub fn version(&self) -> u64 {
68        self.version
69    }
70
71    pub fn route_id(&self) -> &str {
72        &self.route_id
73    }
74
75    /// Map a persisted RuntimeEvent to the resulting RouteRuntimeState.
76    /// Used for event replay — avoids duplicating the state machine outside the domain.
77    pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78        match event {
79            RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80            RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81            RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82            RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83            RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84            RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85            RuntimeEvent::RouteFailed { error, .. } => {
86                Some(RouteRuntimeState::Failed(error.clone()))
87            }
88            RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89            RuntimeEvent::RouteRemoved { .. } => None,
90        }
91    }
92
93    pub fn apply_command(
94        &mut self,
95        cmd: RouteLifecycleCommand,
96    ) -> Result<Vec<RuntimeEvent>, DomainError> {
97        let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98            from: format!("{from:?}"),
99            to: to.to_string(),
100        };
101
102        let events = match cmd {
103            RouteLifecycleCommand::Start => match self.state {
104                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105                    self.state = RouteRuntimeState::Started;
106                    vec![
107                        RuntimeEvent::RouteStartRequested {
108                            route_id: self.route_id.clone(),
109                        },
110                        RuntimeEvent::RouteStarted {
111                            route_id: self.route_id.clone(),
112                        },
113                    ]
114                }
115                RouteRuntimeState::Started => vec![],
116                _ => return Err(invalid(&self.state, "Started")),
117            },
118            RouteLifecycleCommand::Stop => match self.state {
119                RouteRuntimeState::Started
120                | RouteRuntimeState::Suspended
121                | RouteRuntimeState::Failed(_) => {
122                    self.state = RouteRuntimeState::Stopped;
123                    vec![RuntimeEvent::RouteStopped {
124                        route_id: self.route_id.clone(),
125                    }]
126                }
127                RouteRuntimeState::Stopped => vec![],
128                _ => return Err(invalid(&self.state, "Stopped")),
129            },
130            RouteLifecycleCommand::Suspend => match self.state {
131                RouteRuntimeState::Started => {
132                    self.state = RouteRuntimeState::Suspended;
133                    vec![RuntimeEvent::RouteSuspended {
134                        route_id: self.route_id.clone(),
135                    }]
136                }
137                RouteRuntimeState::Suspended => vec![],
138                _ => return Err(invalid(&self.state, "Suspended")),
139            },
140            RouteLifecycleCommand::Resume => match self.state {
141                RouteRuntimeState::Suspended => {
142                    self.state = RouteRuntimeState::Started;
143                    vec![RuntimeEvent::RouteResumed {
144                        route_id: self.route_id.clone(),
145                    }]
146                }
147                RouteRuntimeState::Started => vec![],
148                _ => return Err(invalid(&self.state, "Started")),
149            },
150            RouteLifecycleCommand::Reload => match self.state {
151                RouteRuntimeState::Started
152                | RouteRuntimeState::Suspended
153                | RouteRuntimeState::Stopped
154                | RouteRuntimeState::Failed(_) => {
155                    self.state = RouteRuntimeState::Started;
156                    vec![RuntimeEvent::RouteReloaded {
157                        route_id: self.route_id.clone(),
158                    }]
159                }
160                _ => return Err(invalid(&self.state, "Started")),
161            },
162            RouteLifecycleCommand::Fail(error) => {
163                self.state = RouteRuntimeState::Failed(error.clone());
164                vec![RuntimeEvent::RouteFailed {
165                    route_id: self.route_id.clone(),
166                    error,
167                }]
168            }
169            RouteLifecycleCommand::Remove => match self.state {
170                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
171                    vec![RuntimeEvent::RouteRemoved {
172                        route_id: self.route_id.clone(),
173                    }]
174                }
175                _ => {
176                    return Err(invalid(&self.state, "Removed"));
177                }
178            },
179        };
180
181        self.version += 1;
182        Ok(events)
183    }
184
185    // --- Two-phase lifecycle methods ---
186
187    /// Phase 1 of two-phase start: transition to Starting.
188    /// Idempotent if already Starting or Started.
189    pub fn begin_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
190        match self.state {
191            RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
192                self.version += 1;
193                self.state = RouteRuntimeState::Starting;
194                Ok(vec![RuntimeEvent::RouteStartRequested {
195                    route_id: self.route_id.clone(),
196                }])
197            }
198            RouteRuntimeState::Starting | RouteRuntimeState::Started => Ok(vec![]),
199            _ => Err(DomainError::InvalidTransition {
200                from: format!("{:?}", self.state),
201                to: "Starting".to_string(),
202            }),
203        }
204    }
205
206    /// Phase 2 of two-phase start: transition from Starting to Started.
207    /// Idempotent if already Started.
208    pub fn confirm_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
209        match self.state {
210            RouteRuntimeState::Starting => {
211                self.state = RouteRuntimeState::Started;
212                Ok(vec![RuntimeEvent::RouteStarted {
213                    route_id: self.route_id.clone(),
214                }])
215            }
216            RouteRuntimeState::Started => Ok(vec![]),
217            _ => Err(DomainError::InvalidTransition {
218                from: format!("{:?}", self.state),
219                to: "Started".to_string(),
220            }),
221        }
222    }
223
224    /// Transition to Failed state from any state.
225    /// Increments version to stay consistent with event replay
226    /// (the replay path increments version for RouteFailed events).
227    pub fn fail(&mut self, error: String) -> Vec<RuntimeEvent> {
228        self.state = RouteRuntimeState::Failed(error.clone());
229        self.version += 1;
230        vec![RuntimeEvent::RouteFailed {
231            route_id: self.route_id.clone(),
232            error,
233        }]
234    }
235
236    /// Human-readable label for the current lifecycle state.
237    pub fn state_label(&self) -> &'static str {
238        match self.state {
239            RouteRuntimeState::Registered => "Registered",
240            RouteRuntimeState::Starting => "Starting",
241            RouteRuntimeState::Started => "Started",
242            RouteRuntimeState::Suspended => "Suspended",
243            RouteRuntimeState::Stopping => "Stopping",
244            RouteRuntimeState::Stopped => "Stopped",
245            RouteRuntimeState::Failed(_) => "Failed",
246        }
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::RouteLifecycleCommand as Cmd;
    use super::RouteRuntimeState as State;
    use super::*;

    /// Route id shared by every aggregate built in these tests.
    const ROUTE: &str = "r1";

    /// Restores an aggregate for `ROUTE` directly in `state` at `version`.
    fn snapshot(state: State, version: u64) -> RouteRuntimeAggregate {
        RouteRuntimeAggregate::from_snapshot(ROUTE, state, version)
    }

    /// Builds a fresh aggregate and feeds it `cmds` in order; every command must be accepted.
    fn driven(cmds: impl IntoIterator<Item = Cmd>) -> RouteRuntimeAggregate {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        for cmd in cmds {
            agg.apply_command(cmd).unwrap();
        }
        agg
    }

    /// Asserts that `events` is exactly one `RouteRemoved` event for `ROUTE`.
    fn assert_single_removed(events: &[RuntimeEvent]) {
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == ROUTE
        ));
    }

    // --- apply_command tests ---

    #[test]
    fn cannot_suspend_when_stopped() {
        let mut agg = snapshot(State::Stopped, 0);
        let err = agg.apply_command(Cmd::Suspend).unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    #[test]
    fn start_emits_route_started_event() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let events = agg.apply_command(Cmd::Start).unwrap();
        let has_started = events
            .iter()
            .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == ROUTE));
        assert!(has_started);
    }

    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register(ROUTE);
        assert_eq!(agg.route_id(), ROUTE);
        assert_eq!(agg.state(), &State::Registered);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == ROUTE
        ));
    }

    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let events = agg.apply_command(Cmd::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn remove_from_started_is_invalid() {
        let mut agg = driven([Cmd::Start]);
        let err = agg.apply_command(Cmd::Remove).unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = driven([Cmd::Start, Cmd::Stop]);
        let events = agg.apply_command(Cmd::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn start_from_started_is_idempotent() {
        let mut agg = driven([Cmd::Start]);
        let events = agg.apply_command(Cmd::Start).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &State::Started);
    }

    #[test]
    fn stop_from_stopped_is_idempotent() {
        let mut agg = driven([Cmd::Start, Cmd::Stop]);
        let events = agg.apply_command(Cmd::Stop).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &State::Stopped);
    }

    #[test]
    fn suspend_from_suspended_is_idempotent() {
        let mut agg = driven([Cmd::Start, Cmd::Suspend]);
        let events = agg.apply_command(Cmd::Suspend).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &State::Suspended);
    }

    #[test]
    fn resume_from_started_is_idempotent() {
        let mut agg = driven([Cmd::Start]);
        let events = agg.apply_command(Cmd::Resume).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &State::Started);
    }

    // --- begin_start tests ---

    #[test]
    fn begin_start_from_registered_transitions_to_starting() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let version_before = agg.version();
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), State::Starting);
        assert_eq!(
            events,
            vec![RuntimeEvent::RouteStartRequested {
                route_id: ROUTE.into()
            }]
        );
        assert_eq!(
            agg.version(),
            version_before + 1,
            "version must increment by 1 in begin_start"
        );
    }

    #[test]
    fn begin_start_from_stopped_transitions_to_starting() {
        let mut agg = snapshot(State::Stopped, 1);
        let version_before = agg.version();
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), State::Starting);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            events[0],
            RuntimeEvent::RouteStartRequested { .. }
        ));
        assert_eq!(
            agg.version(),
            version_before + 1,
            "version must increment by 1 in begin_start"
        );
    }

    #[test]
    fn begin_start_idempotent_on_starting() {
        let mut agg = snapshot(State::Starting, 1);
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), State::Starting);
        assert!(events.is_empty());
    }

    #[test]
    fn begin_start_idempotent_on_started() {
        let mut agg = snapshot(State::Started, 2);
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), State::Started);
        assert!(events.is_empty());
    }

    #[test]
    fn begin_start_rejects_invalid_states() {
        let rejected = [State::Suspended, State::Failed("err".into())];
        for state in rejected {
            let mut agg = snapshot(state.clone(), 1);
            assert!(agg.begin_start().is_err());
        }
    }

    // --- confirm_start tests ---

    #[test]
    fn confirm_start_from_starting_transitions_to_started() {
        let mut agg = snapshot(State::Starting, 1);
        let events = agg.confirm_start().unwrap();
        assert_eq!(*agg.state(), State::Started);
        assert_eq!(
            events,
            vec![RuntimeEvent::RouteStarted {
                route_id: ROUTE.into()
            }]
        );
    }

    #[test]
    fn confirm_start_idempotent_on_started() {
        let mut agg = snapshot(State::Started, 2);
        let events = agg.confirm_start().unwrap();
        assert!(events.is_empty());
        assert_eq!(*agg.state(), State::Started);
    }

    #[test]
    fn confirm_start_rejects_non_starting() {
        let mut agg = snapshot(State::Registered, 0);
        assert!(agg.confirm_start().is_err());
    }

    // --- fail tests ---

    #[test]
    fn fail_from_any_state() {
        let origins = [
            State::Registered,
            State::Starting,
            State::Started,
            State::Stopped,
            State::Suspended,
            State::Stopping,
        ];
        for origin in origins {
            let mut agg = snapshot(origin, 1);
            let version_before = agg.version();
            let events = agg.fail("crash".into());
            assert_eq!(*agg.state(), State::Failed("crash".into()));
            assert_eq!(
                agg.version(),
                version_before + 1,
                "fail() must increment version to match replay"
            );
            assert_eq!(events.len(), 1);
            assert!(matches!(
                &events[0],
                RuntimeEvent::RouteFailed { route_id, error }
                    if route_id == ROUTE && error == "crash"
            ));
        }
    }

    // --- state_label / state_from_event tests ---

    #[test]
    fn state_label_covers_all_states() {
        let cases = [
            (RouteRuntimeAggregate::new(ROUTE), "Registered"),
            (snapshot(State::Starting, 1), "Starting"),
            (snapshot(State::Started, 2), "Started"),
            (snapshot(State::Suspended, 2), "Suspended"),
            (snapshot(State::Stopping, 2), "Stopping"),
            (snapshot(State::Stopped, 2), "Stopped"),
            (snapshot(State::Failed("e".into()), 2), "Failed"),
        ];
        for (agg, label) in cases {
            assert_eq!(agg.state_label(), label);
        }
    }

    #[test]
    fn state_from_event_maps_all_variants() {
        let id = || String::from("r");
        let cases = [
            (
                RuntimeEvent::RouteRegistered { route_id: id() },
                Some(State::Registered),
            ),
            (
                RuntimeEvent::RouteStartRequested { route_id: id() },
                Some(State::Starting),
            ),
            (
                RuntimeEvent::RouteStarted { route_id: id() },
                Some(State::Started),
            ),
            (
                RuntimeEvent::RouteStopped { route_id: id() },
                Some(State::Stopped),
            ),
            (
                RuntimeEvent::RouteSuspended { route_id: id() },
                Some(State::Suspended),
            ),
            (
                RuntimeEvent::RouteResumed { route_id: id() },
                Some(State::Started),
            ),
            (
                RuntimeEvent::RouteFailed {
                    route_id: id(),
                    error: "e".into(),
                },
                Some(State::Failed("e".into())),
            ),
            (
                RuntimeEvent::RouteReloaded { route_id: id() },
                Some(State::Started),
            ),
            (RuntimeEvent::RouteRemoved { route_id: id() }, None),
        ];
        for (event, expected) in cases {
            assert_eq!(RouteRuntimeAggregate::state_from_event(&event), expected);
        }
    }
}