// camel_core/lifecycle/domain/route_runtime.rs

use crate::lifecycle::domain::DomainError;
use crate::lifecycle::domain::RuntimeEvent;

/// Lifecycle state of a single route as tracked by [`RouteRuntimeAggregate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// Initial state of a freshly created aggregate.
    Registered,
    /// Start has been requested; only produced when replaying a
    /// `RouteStartRequested` event via `state_from_event`.
    Starting,
    /// The route is running (reached via start, resume, or reload).
    Started,
    /// The route was suspended while it was started.
    Suspended,
    /// Declared for completeness; no code visible in this module assigns it.
    Stopping,
    /// The route has been stopped.
    Stopped,
    /// The route failed; the payload is the error message that was reported.
    Failed(String),
}

/// Command that can be applied to a route aggregate through `apply_command`.
///
/// The source states listed per variant are the ones `apply_command` accepts;
/// any other state yields an invalid-transition error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Start the route; accepted from `Registered` or `Stopped`.
    Start,
    /// Stop the route; accepted from `Started`, `Suspended`, or `Failed`.
    Stop,
    /// Suspend the route; accepted from `Started` only.
    Suspend,
    /// Resume the route; accepted from `Suspended` only.
    Resume,
    /// Reload the route; accepted from `Started`, `Suspended`, `Stopped`, or
    /// `Failed`, and always ends in `Started`.
    Reload,
    /// Mark the route as failed with the given error message; accepted from
    /// every state.
    Fail(String),
    /// Remove the route; accepted from `Registered` or `Stopped`.
    Remove,
}

26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28    route_id: String,
29    state: RouteRuntimeState,
30    version: u64,
31}
32
33impl RouteRuntimeAggregate {
34    pub fn new(route_id: impl Into<String>) -> Self {
35        Self {
36            route_id: route_id.into(),
37            state: RouteRuntimeState::Registered,
38            version: 0,
39        }
40    }
41
42    /// Constructor that returns both the aggregate and the initial `RouteRegistered` event.
43    /// Used by command handlers — avoids duplicating event construction outside the domain.
44    pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45        let route_id = route_id.into();
46        let aggregate = Self::new(route_id.clone());
47        let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48        (aggregate, events)
49    }
50
51    pub fn from_snapshot(
52        route_id: impl Into<String>,
53        state: RouteRuntimeState,
54        version: u64,
55    ) -> Self {
56        Self {
57            route_id: route_id.into(),
58            state,
59            version,
60        }
61    }
62
63    pub fn state(&self) -> &RouteRuntimeState {
64        &self.state
65    }
66
67    pub fn version(&self) -> u64 {
68        self.version
69    }
70
71    pub fn route_id(&self) -> &str {
72        &self.route_id
73    }
74
75    /// Map a persisted RuntimeEvent to the resulting RouteRuntimeState.
76    /// Used for event replay — avoids duplicating the state machine outside the domain.
77    pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78        match event {
79            RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80            RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81            RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82            RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83            RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84            RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85            RuntimeEvent::RouteFailed { error, .. } => {
86                Some(RouteRuntimeState::Failed(error.clone()))
87            }
88            RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89            RuntimeEvent::RouteRemoved { .. } => None,
90        }
91    }
92
93    pub fn apply_command(
94        &mut self,
95        cmd: RouteLifecycleCommand,
96    ) -> Result<Vec<RuntimeEvent>, DomainError> {
97        let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98            from: format!("{from:?}"),
99            to: to.to_string(),
100        };
101
102        let events = match cmd {
103            RouteLifecycleCommand::Start => match self.state {
104                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105                    self.state = RouteRuntimeState::Started;
106                    vec![
107                        RuntimeEvent::RouteStartRequested {
108                            route_id: self.route_id.clone(),
109                        },
110                        RuntimeEvent::RouteStarted {
111                            route_id: self.route_id.clone(),
112                        },
113                    ]
114                }
115                _ => return Err(invalid(&self.state, "Started")),
116            },
117            RouteLifecycleCommand::Stop => match self.state {
118                RouteRuntimeState::Started
119                | RouteRuntimeState::Suspended
120                | RouteRuntimeState::Failed(_) => {
121                    self.state = RouteRuntimeState::Stopped;
122                    vec![RuntimeEvent::RouteStopped {
123                        route_id: self.route_id.clone(),
124                    }]
125                }
126                _ => return Err(invalid(&self.state, "Stopped")),
127            },
128            RouteLifecycleCommand::Suspend => match self.state {
129                RouteRuntimeState::Started => {
130                    self.state = RouteRuntimeState::Suspended;
131                    vec![RuntimeEvent::RouteSuspended {
132                        route_id: self.route_id.clone(),
133                    }]
134                }
135                _ => return Err(invalid(&self.state, "Suspended")),
136            },
137            RouteLifecycleCommand::Resume => match self.state {
138                RouteRuntimeState::Suspended => {
139                    self.state = RouteRuntimeState::Started;
140                    vec![RuntimeEvent::RouteResumed {
141                        route_id: self.route_id.clone(),
142                    }]
143                }
144                _ => return Err(invalid(&self.state, "Started")),
145            },
146            RouteLifecycleCommand::Reload => match self.state {
147                RouteRuntimeState::Started
148                | RouteRuntimeState::Suspended
149                | RouteRuntimeState::Stopped
150                | RouteRuntimeState::Failed(_) => {
151                    self.state = RouteRuntimeState::Started;
152                    vec![RuntimeEvent::RouteReloaded {
153                        route_id: self.route_id.clone(),
154                    }]
155                }
156                _ => return Err(invalid(&self.state, "Started")),
157            },
158            RouteLifecycleCommand::Fail(error) => {
159                self.state = RouteRuntimeState::Failed(error.clone());
160                vec![RuntimeEvent::RouteFailed {
161                    route_id: self.route_id.clone(),
162                    error,
163                }]
164            }
165            RouteLifecycleCommand::Remove => match self.state {
166                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
167                    vec![RuntimeEvent::RouteRemoved {
168                        route_id: self.route_id.clone(),
169                    }]
170                }
171                _ => {
172                    return Err(invalid(&self.state, "Removed"));
173                }
174            },
175        };
176
177        self.version += 1;
178        Ok(events)
179    }
180}
181
/// Unit tests for the route lifecycle state machine and event replay mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cannot_suspend_when_stopped() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.state = RouteRuntimeState::Stopped;
        let err = agg
            .apply_command(RouteLifecycleCommand::Suspend)
            .unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    #[test]
    fn start_emits_route_started_event() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        assert!(
            events
                .iter()
                .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"))
        );
    }

    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register("r1");
        assert_eq!(agg.route_id(), "r1");
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
        ));
    }

    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    #[test]
    fn remove_from_started_is_invalid() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let err = agg
            .apply_command(RouteLifecycleCommand::Remove)
            .unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    // A rejected command must return before any mutation happens.
    #[test]
    fn rejected_command_leaves_state_and_version_unchanged() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        assert!(agg.apply_command(RouteLifecycleCommand::Resume).is_err());
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(agg.version(), 0);
    }

    // The version counts accepted commands, not emitted events (Start emits two).
    #[test]
    fn version_increments_once_per_accepted_command() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
        assert_eq!(agg.version(), 2);
        assert_eq!(agg.state(), &RouteRuntimeState::Suspended);
    }

    #[test]
    fn state_from_event_maps_all_variants() {
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRegistered {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Registered)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStartRequested {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Starting)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStarted {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStopped {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Stopped)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteSuspended {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Suspended)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteResumed {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteFailed {
                route_id: "r".into(),
                error: "e".into()
            }),
            Some(RouteRuntimeState::Failed("e".into()))
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteReloaded {
                route_id: "r".into()
            }),
            Some(RouteRuntimeState::Started)
        );
        assert_eq!(
            RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRemoved {
                route_id: "r".into()
            }),
            None
        );
    }
}