camel_core/lifecycle/domain/
route_runtime.rs1use crate::lifecycle::domain::DomainError;
2use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state tracked for a single route by [`RouteRuntimeAggregate`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// State of a freshly constructed aggregate (see `RouteRuntimeAggregate::new`).
    Registered,
    /// A start has been requested; `state_from_event` maps `RouteStartRequested` here.
    Starting,
    /// The route is running; reached via start, resume or reload.
    Started,
    /// The route was suspended from `Started` and can be resumed.
    Suspended,
    /// Shutdown in progress. NOTE(review): no transition in this file produces
    /// this state — presumably set by code outside this module; confirm.
    Stopping,
    /// The route is stopped; it may be started again, reloaded or removed.
    Stopped,
    /// The route failed; the payload is the error text carried by the failure.
    Failed(String),
}
14
/// Commands accepted by [`RouteRuntimeAggregate::apply_command`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Start the route: accepted from `Registered` or `Stopped`, a no-op when `Started`.
    Start,
    /// Stop the route: accepted from `Started`, `Suspended` or `Failed`, a no-op when `Stopped`.
    Stop,
    /// Suspend the route: accepted from `Started`, a no-op when `Suspended`.
    Suspend,
    /// Resume the route: accepted from `Suspended`, a no-op when `Started`.
    Resume,
    /// Reload the route: accepted from `Started`, `Suspended`, `Stopped` or `Failed`;
    /// the route ends up `Started`.
    Reload,
    /// Mark the route as failed with the given error text; accepted in every state.
    Fail(String),
    /// Remove the route: accepted from `Registered` or `Stopped` only.
    Remove,
}
25
26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28 route_id: String,
29 state: RouteRuntimeState,
30 version: u64,
31}
32
33impl RouteRuntimeAggregate {
34 pub fn new(route_id: impl Into<String>) -> Self {
35 Self {
36 route_id: route_id.into(),
37 state: RouteRuntimeState::Registered,
38 version: 0,
39 }
40 }
41
42 pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45 let route_id = route_id.into();
46 let aggregate = Self::new(route_id.clone());
47 let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48 (aggregate, events)
49 }
50
51 pub fn from_snapshot(
52 route_id: impl Into<String>,
53 state: RouteRuntimeState,
54 version: u64,
55 ) -> Self {
56 Self {
57 route_id: route_id.into(),
58 state,
59 version,
60 }
61 }
62
63 pub fn state(&self) -> &RouteRuntimeState {
64 &self.state
65 }
66
67 pub fn version(&self) -> u64 {
68 self.version
69 }
70
71 pub fn route_id(&self) -> &str {
72 &self.route_id
73 }
74
75 pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78 match event {
79 RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80 RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81 RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82 RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83 RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84 RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85 RuntimeEvent::RouteFailed { error, .. } => {
86 Some(RouteRuntimeState::Failed(error.clone()))
87 }
88 RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89 RuntimeEvent::RouteRemoved { .. } => None,
90 }
91 }
92
93 pub fn apply_command(
94 &mut self,
95 cmd: RouteLifecycleCommand,
96 ) -> Result<Vec<RuntimeEvent>, DomainError> {
97 let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98 from: format!("{from:?}"),
99 to: to.to_string(),
100 };
101
102 let events = match cmd {
103 RouteLifecycleCommand::Start => match self.state {
104 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105 self.state = RouteRuntimeState::Started;
106 vec![
107 RuntimeEvent::RouteStartRequested {
108 route_id: self.route_id.clone(),
109 },
110 RuntimeEvent::RouteStarted {
111 route_id: self.route_id.clone(),
112 },
113 ]
114 }
115 RouteRuntimeState::Started => vec![],
116 _ => return Err(invalid(&self.state, "Started")),
117 },
118 RouteLifecycleCommand::Stop => match self.state {
119 RouteRuntimeState::Started
120 | RouteRuntimeState::Suspended
121 | RouteRuntimeState::Failed(_) => {
122 self.state = RouteRuntimeState::Stopped;
123 vec![RuntimeEvent::RouteStopped {
124 route_id: self.route_id.clone(),
125 }]
126 }
127 RouteRuntimeState::Stopped => vec![],
128 _ => return Err(invalid(&self.state, "Stopped")),
129 },
130 RouteLifecycleCommand::Suspend => match self.state {
131 RouteRuntimeState::Started => {
132 self.state = RouteRuntimeState::Suspended;
133 vec![RuntimeEvent::RouteSuspended {
134 route_id: self.route_id.clone(),
135 }]
136 }
137 RouteRuntimeState::Suspended => vec![],
138 _ => return Err(invalid(&self.state, "Suspended")),
139 },
140 RouteLifecycleCommand::Resume => match self.state {
141 RouteRuntimeState::Suspended => {
142 self.state = RouteRuntimeState::Started;
143 vec![RuntimeEvent::RouteResumed {
144 route_id: self.route_id.clone(),
145 }]
146 }
147 RouteRuntimeState::Started => vec![],
148 _ => return Err(invalid(&self.state, "Started")),
149 },
150 RouteLifecycleCommand::Reload => match self.state {
151 RouteRuntimeState::Started
152 | RouteRuntimeState::Suspended
153 | RouteRuntimeState::Stopped
154 | RouteRuntimeState::Failed(_) => {
155 self.state = RouteRuntimeState::Started;
156 vec![RuntimeEvent::RouteReloaded {
157 route_id: self.route_id.clone(),
158 }]
159 }
160 _ => return Err(invalid(&self.state, "Started")),
161 },
162 RouteLifecycleCommand::Fail(error) => {
163 self.state = RouteRuntimeState::Failed(error.clone());
164 vec![RuntimeEvent::RouteFailed {
165 route_id: self.route_id.clone(),
166 error,
167 }]
168 }
169 RouteLifecycleCommand::Remove => match self.state {
170 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
171 vec![RuntimeEvent::RouteRemoved {
172 route_id: self.route_id.clone(),
173 }]
174 }
175 _ => {
176 return Err(invalid(&self.state, "Removed"));
177 }
178 },
179 };
180
181 self.version += 1;
182 Ok(events)
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Route id shared by the command tests.
    const ROUTE: &str = "r1";

    /// Builds a fresh aggregate and drives it through `setup`, panicking if a
    /// step is rejected.
    fn aggregate_after(setup: &[RouteLifecycleCommand]) -> RouteRuntimeAggregate {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        for cmd in setup {
            agg.apply_command(cmd.clone()).unwrap();
        }
        agg
    }

    /// Asserts that `events` is exactly one `RouteRemoved` for the test route.
    fn assert_single_removed(events: &[RuntimeEvent]) {
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    /// Asserts that `repeat`, applied after `setup`, emits nothing and leaves
    /// the aggregate in `settled`.
    fn assert_idempotent(
        setup: &[RouteLifecycleCommand],
        repeat: RouteLifecycleCommand,
        settled: RouteRuntimeState,
    ) {
        let mut agg = aggregate_after(setup);
        let events = agg.apply_command(repeat).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &settled);
    }

    /// Asserts that `cmd`, applied to `agg`, is rejected as an invalid transition.
    fn assert_rejected(mut agg: RouteRuntimeAggregate, cmd: RouteLifecycleCommand) {
        let err = agg.apply_command(cmd).unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
    }

    #[test]
    fn cannot_suspend_when_stopped() {
        let stopped = RouteRuntimeAggregate::from_snapshot(ROUTE, RouteRuntimeState::Stopped, 0);
        assert_rejected(stopped, RouteLifecycleCommand::Suspend);
    }

    #[test]
    fn start_emits_route_started_event() {
        let mut agg = aggregate_after(&[]);
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let saw_started = events
            .iter()
            .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"));
        assert!(saw_started);
    }

    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register(ROUTE);
        assert_eq!(agg.route_id(), "r1");
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
        ));
    }

    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = aggregate_after(&[]);
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn remove_from_started_is_invalid() {
        let started = aggregate_after(&[RouteLifecycleCommand::Start]);
        assert_rejected(started, RouteLifecycleCommand::Remove);
    }

    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = aggregate_after(&[
            RouteLifecycleCommand::Start,
            RouteLifecycleCommand::Stop,
        ]);
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn start_from_started_is_idempotent() {
        assert_idempotent(
            &[RouteLifecycleCommand::Start],
            RouteLifecycleCommand::Start,
            RouteRuntimeState::Started,
        );
    }

    #[test]
    fn stop_from_stopped_is_idempotent() {
        assert_idempotent(
            &[RouteLifecycleCommand::Start, RouteLifecycleCommand::Stop],
            RouteLifecycleCommand::Stop,
            RouteRuntimeState::Stopped,
        );
    }

    #[test]
    fn suspend_from_suspended_is_idempotent() {
        assert_idempotent(
            &[RouteLifecycleCommand::Start, RouteLifecycleCommand::Suspend],
            RouteLifecycleCommand::Suspend,
            RouteRuntimeState::Suspended,
        );
    }

    #[test]
    fn resume_from_started_is_idempotent() {
        assert_idempotent(
            &[RouteLifecycleCommand::Start],
            RouteLifecycleCommand::Resume,
            RouteRuntimeState::Started,
        );
    }

    #[test]
    fn state_from_event_maps_all_variants() {
        let id = || String::from("r");

        // One row per event variant: (event, state expected after it).
        let cases = [
            (
                RuntimeEvent::RouteRegistered { route_id: id() },
                Some(RouteRuntimeState::Registered),
            ),
            (
                RuntimeEvent::RouteStartRequested { route_id: id() },
                Some(RouteRuntimeState::Starting),
            ),
            (
                RuntimeEvent::RouteStarted { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteStopped { route_id: id() },
                Some(RouteRuntimeState::Stopped),
            ),
            (
                RuntimeEvent::RouteSuspended { route_id: id() },
                Some(RouteRuntimeState::Suspended),
            ),
            (
                RuntimeEvent::RouteResumed { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteFailed {
                    route_id: id(),
                    error: String::from("e"),
                },
                Some(RouteRuntimeState::Failed(String::from("e"))),
            ),
            (
                RuntimeEvent::RouteReloaded { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (RuntimeEvent::RouteRemoved { route_id: id() }, None),
        ];

        for (event, expected) in &cases {
            assert_eq!(RouteRuntimeAggregate::state_from_event(event), *expected);
        }
    }
}