1use crate::lifecycle::domain::DomainError;
2use crate::lifecycle::domain::RuntimeEvent;
3
/// Runtime state of a single route as tracked by `RouteRuntimeAggregate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// Initial state of an aggregate created with `RouteRuntimeAggregate::new`.
    Registered,
    /// A start has been requested (`begin_start`) but not yet confirmed.
    Starting,
    /// The route is running.
    Started,
    /// The route was suspended while `Started`.
    Suspended,
    /// Stop in progress. No method in this module assigns this state; it can
    /// only arrive through `from_snapshot`.
    Stopping,
    /// The route has been stopped.
    Stopped,
    /// The route failed; the payload is the error text supplied by the caller.
    Failed(String),
}
14
/// Commands accepted by `RouteRuntimeAggregate::apply_command`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Bring a `Registered` or `Stopped` route to `Started`.
    Start,
    /// Bring a `Started`, `Suspended` or `Failed` route to `Stopped`.
    Stop,
    /// Move a `Started` route to `Suspended`.
    Suspend,
    /// Return a `Suspended` route to `Started`.
    Resume,
    /// Reload the route; on success the aggregate ends up `Started`.
    Reload,
    /// Record a failure; the payload becomes the text stored in `Failed`.
    Fail(String),
    /// Remove the route; only accepted while `Registered` or `Stopped`.
    Remove,
}
25
26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28 route_id: String,
29 state: RouteRuntimeState,
30 version: u64,
31}
32
33impl RouteRuntimeAggregate {
34 pub fn new(route_id: impl Into<String>) -> Self {
35 Self {
36 route_id: route_id.into(),
37 state: RouteRuntimeState::Registered,
38 version: 0,
39 }
40 }
41
42 pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45 let route_id = route_id.into();
46 let aggregate = Self::new(route_id.clone());
47 let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48 (aggregate, events)
49 }
50
51 pub fn from_snapshot(
52 route_id: impl Into<String>,
53 state: RouteRuntimeState,
54 version: u64,
55 ) -> Self {
56 Self {
57 route_id: route_id.into(),
58 state,
59 version,
60 }
61 }
62
63 pub fn state(&self) -> &RouteRuntimeState {
64 &self.state
65 }
66
67 pub fn version(&self) -> u64 {
68 self.version
69 }
70
71 pub fn route_id(&self) -> &str {
72 &self.route_id
73 }
74
75 pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78 match event {
79 RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80 RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81 RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82 RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83 RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84 RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85 RuntimeEvent::RouteFailed { error, .. } => {
86 Some(RouteRuntimeState::Failed(error.clone()))
87 }
88 RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89 RuntimeEvent::RouteRemoved { .. } => None,
90 }
91 }
92
93 pub fn apply_command(
94 &mut self,
95 cmd: RouteLifecycleCommand,
96 ) -> Result<Vec<RuntimeEvent>, DomainError> {
97 let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98 from: format!("{from:?}"),
99 to: to.to_string(),
100 };
101
102 let events = match cmd {
103 RouteLifecycleCommand::Start => match self.state {
104 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105 self.state = RouteRuntimeState::Started;
106 vec![
107 RuntimeEvent::RouteStartRequested {
108 route_id: self.route_id.clone(),
109 },
110 RuntimeEvent::RouteStarted {
111 route_id: self.route_id.clone(),
112 },
113 ]
114 }
115 RouteRuntimeState::Started => vec![],
116 _ => return Err(invalid(&self.state, "Started")),
117 },
118 RouteLifecycleCommand::Stop => match self.state {
119 RouteRuntimeState::Started
120 | RouteRuntimeState::Suspended
121 | RouteRuntimeState::Failed(_) => {
122 self.state = RouteRuntimeState::Stopped;
123 vec![RuntimeEvent::RouteStopped {
124 route_id: self.route_id.clone(),
125 }]
126 }
127 RouteRuntimeState::Stopped => vec![],
128 _ => return Err(invalid(&self.state, "Stopped")),
129 },
130 RouteLifecycleCommand::Suspend => match self.state {
131 RouteRuntimeState::Started => {
132 self.state = RouteRuntimeState::Suspended;
133 vec![RuntimeEvent::RouteSuspended {
134 route_id: self.route_id.clone(),
135 }]
136 }
137 RouteRuntimeState::Suspended => vec![],
138 _ => return Err(invalid(&self.state, "Suspended")),
139 },
140 RouteLifecycleCommand::Resume => match self.state {
141 RouteRuntimeState::Suspended => {
142 self.state = RouteRuntimeState::Started;
143 vec![RuntimeEvent::RouteResumed {
144 route_id: self.route_id.clone(),
145 }]
146 }
147 RouteRuntimeState::Started => vec![],
148 _ => return Err(invalid(&self.state, "Started")),
149 },
150 RouteLifecycleCommand::Reload => match self.state {
151 RouteRuntimeState::Started
152 | RouteRuntimeState::Suspended
153 | RouteRuntimeState::Stopped
154 | RouteRuntimeState::Failed(_) => {
155 self.state = RouteRuntimeState::Started;
156 vec![RuntimeEvent::RouteReloaded {
157 route_id: self.route_id.clone(),
158 }]
159 }
160 _ => return Err(invalid(&self.state, "Started")),
161 },
162 RouteLifecycleCommand::Fail(error) => {
163 self.state = RouteRuntimeState::Failed(error.clone());
164 vec![RuntimeEvent::RouteFailed {
165 route_id: self.route_id.clone(),
166 error,
167 }]
168 }
169 RouteLifecycleCommand::Remove => match self.state {
170 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
171 vec![RuntimeEvent::RouteRemoved {
172 route_id: self.route_id.clone(),
173 }]
174 }
175 _ => {
176 return Err(invalid(&self.state, "Removed"));
177 }
178 },
179 };
180
181 self.version += 1;
182 Ok(events)
183 }
184
185 pub fn begin_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
190 match self.state {
191 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
192 self.version += 1;
193 self.state = RouteRuntimeState::Starting;
194 Ok(vec![RuntimeEvent::RouteStartRequested {
195 route_id: self.route_id.clone(),
196 }])
197 }
198 RouteRuntimeState::Starting | RouteRuntimeState::Started => Ok(vec![]),
199 _ => Err(DomainError::InvalidTransition {
200 from: format!("{:?}", self.state),
201 to: "Starting".to_string(),
202 }),
203 }
204 }
205
206 pub fn confirm_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
209 match self.state {
210 RouteRuntimeState::Starting => {
211 self.state = RouteRuntimeState::Started;
212 Ok(vec![RuntimeEvent::RouteStarted {
213 route_id: self.route_id.clone(),
214 }])
215 }
216 RouteRuntimeState::Started => Ok(vec![]),
217 _ => Err(DomainError::InvalidTransition {
218 from: format!("{:?}", self.state),
219 to: "Started".to_string(),
220 }),
221 }
222 }
223
224 pub fn fail(&mut self, error: String) -> Vec<RuntimeEvent> {
228 self.state = RouteRuntimeState::Failed(error.clone());
229 self.version += 1;
230 vec![RuntimeEvent::RouteFailed {
231 route_id: self.route_id.clone(),
232 error,
233 }]
234 }
235
236 pub fn state_label(&self) -> &'static str {
238 match self.state {
239 RouteRuntimeState::Registered => "Registered",
240 RouteRuntimeState::Starting => "Starting",
241 RouteRuntimeState::Started => "Started",
242 RouteRuntimeState::Suspended => "Suspended",
243 RouteRuntimeState::Stopping => "Stopping",
244 RouteRuntimeState::Stopped => "Stopped",
245 RouteRuntimeState::Failed(_) => "Failed",
246 }
247 }
248}
249
#[cfg(test)]
mod tests {
    use super::*;

    /// Route id shared by every test aggregate.
    const ROUTE: &str = "r1";

    /// Builds a fresh aggregate and drives it through `commands`; panics if
    /// any of the setup commands is rejected.
    fn aggregate_after(commands: &[RouteLifecycleCommand]) -> RouteRuntimeAggregate {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        for cmd in commands {
            agg.apply_command(cmd.clone())
                .expect("setup command should be accepted");
        }
        agg
    }

    /// Restores an aggregate for `ROUTE` directly in `state` at `version`.
    fn snapshot(state: RouteRuntimeState, version: u64) -> RouteRuntimeAggregate {
        RouteRuntimeAggregate::from_snapshot(ROUTE, state, version)
    }

    /// The single event expected from a successful `Remove`.
    fn removed_event() -> Vec<RuntimeEvent> {
        vec![RuntimeEvent::RouteRemoved {
            route_id: ROUTE.into(),
        }]
    }

    // ---- apply_command: rejected transitions --------------------------------

    #[test]
    fn cannot_suspend_when_stopped() {
        let mut agg = snapshot(RouteRuntimeState::Stopped, 0);
        let result = agg.apply_command(RouteLifecycleCommand::Suspend);
        let message = result.unwrap_err().to_string();
        assert!(message.contains("invalid transition"));
    }

    #[test]
    fn remove_from_started_is_invalid() {
        let mut agg = aggregate_after(&[RouteLifecycleCommand::Start]);
        let result = agg.apply_command(RouteLifecycleCommand::Remove);
        let message = result.unwrap_err().to_string();
        assert!(message.contains("invalid transition"));
    }

    // ---- apply_command: accepted transitions --------------------------------

    #[test]
    fn start_emits_route_started_event() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let saw_started = events
            .iter()
            .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == ROUTE));
        assert!(saw_started);
    }

    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register(ROUTE);
        assert_eq!(agg.route_id(), ROUTE);
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(
            events,
            vec![RuntimeEvent::RouteRegistered {
                route_id: ROUTE.into()
            }]
        );
    }

    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events, removed_event());
    }

    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = aggregate_after(&[
            RouteLifecycleCommand::Start,
            RouteLifecycleCommand::Stop,
        ]);
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_eq!(events, removed_event());
    }

    // ---- apply_command: idempotent no-ops -----------------------------------

    #[test]
    fn start_from_started_is_idempotent() {
        let mut agg = aggregate_after(&[RouteLifecycleCommand::Start]);
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Started);
    }

    #[test]
    fn stop_from_stopped_is_idempotent() {
        let mut agg = aggregate_after(&[
            RouteLifecycleCommand::Start,
            RouteLifecycleCommand::Stop,
        ]);
        let events = agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Stopped);
    }

    #[test]
    fn suspend_from_suspended_is_idempotent() {
        let mut agg = aggregate_after(&[
            RouteLifecycleCommand::Start,
            RouteLifecycleCommand::Suspend,
        ]);
        let events = agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Suspended);
    }

    #[test]
    fn resume_from_started_is_idempotent() {
        let mut agg = aggregate_after(&[RouteLifecycleCommand::Start]);
        let events = agg.apply_command(RouteLifecycleCommand::Resume).unwrap();
        assert!(events.is_empty());
        assert_eq!(agg.state(), &RouteRuntimeState::Started);
    }

    // ---- begin_start --------------------------------------------------------

    #[test]
    fn begin_start_from_registered_transitions_to_starting() {
        let mut agg = RouteRuntimeAggregate::new(ROUTE);
        let version_before = agg.version();

        let events = agg.begin_start().unwrap();

        assert_eq!(*agg.state(), RouteRuntimeState::Starting);
        assert_eq!(
            events,
            vec![RuntimeEvent::RouteStartRequested {
                route_id: ROUTE.into()
            }]
        );
        assert_eq!(
            agg.version(),
            version_before + 1,
            "version must increment by 1 in begin_start"
        );
    }

    #[test]
    fn begin_start_from_stopped_transitions_to_starting() {
        let mut agg = snapshot(RouteRuntimeState::Stopped, 1);
        let version_before = agg.version();

        let events = agg.begin_start().unwrap();

        assert_eq!(*agg.state(), RouteRuntimeState::Starting);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            events[0],
            RuntimeEvent::RouteStartRequested { .. }
        ));
        assert_eq!(
            agg.version(),
            version_before + 1,
            "version must increment by 1 in begin_start"
        );
    }

    #[test]
    fn begin_start_idempotent_on_starting() {
        let mut agg = snapshot(RouteRuntimeState::Starting, 1);
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), RouteRuntimeState::Starting);
        assert!(events.is_empty());
    }

    #[test]
    fn begin_start_idempotent_on_started() {
        let mut agg = snapshot(RouteRuntimeState::Started, 2);
        let events = agg.begin_start().unwrap();
        assert_eq!(*agg.state(), RouteRuntimeState::Started);
        assert!(events.is_empty());
    }

    #[test]
    fn begin_start_rejects_invalid_states() {
        let rejected = [
            RouteRuntimeState::Suspended,
            RouteRuntimeState::Failed("err".into()),
        ];
        for state in rejected {
            let mut agg = snapshot(state, 1);
            assert!(agg.begin_start().is_err());
        }
    }

    // ---- confirm_start ------------------------------------------------------

    #[test]
    fn confirm_start_from_starting_transitions_to_started() {
        let mut agg = snapshot(RouteRuntimeState::Starting, 1);
        let events = agg.confirm_start().unwrap();
        assert_eq!(*agg.state(), RouteRuntimeState::Started);
        assert_eq!(
            events,
            vec![RuntimeEvent::RouteStarted {
                route_id: ROUTE.into()
            }]
        );
    }

    #[test]
    fn confirm_start_idempotent_on_started() {
        let mut agg = snapshot(RouteRuntimeState::Started, 2);
        let events = agg.confirm_start().unwrap();
        assert!(events.is_empty());
        assert_eq!(*agg.state(), RouteRuntimeState::Started);
    }

    #[test]
    fn confirm_start_rejects_non_starting() {
        let mut agg = snapshot(RouteRuntimeState::Registered, 0);
        assert!(agg.confirm_start().is_err());
    }

    // ---- fail ---------------------------------------------------------------

    #[test]
    fn fail_from_any_state() {
        let starting_points = [
            RouteRuntimeState::Registered,
            RouteRuntimeState::Starting,
            RouteRuntimeState::Started,
            RouteRuntimeState::Stopped,
            RouteRuntimeState::Suspended,
            RouteRuntimeState::Stopping,
        ];
        for state in starting_points {
            let mut agg = snapshot(state, 1);
            let version_before = agg.version();

            let events = agg.fail("crash".into());

            assert_eq!(*agg.state(), RouteRuntimeState::Failed("crash".into()));
            assert_eq!(
                agg.version(),
                version_before + 1,
                "fail() must increment version to match replay"
            );
            assert_eq!(events.len(), 1);
            assert!(matches!(
                &events[0],
                RuntimeEvent::RouteFailed { route_id, error }
                    if route_id == ROUTE && error == "crash"
            ));
        }
    }

    // ---- state_label / state_from_event -------------------------------------

    #[test]
    fn state_label_covers_all_states() {
        // `Registered` is checked through `new` so the constructor default is covered too.
        assert_eq!(RouteRuntimeAggregate::new(ROUTE).state_label(), "Registered");

        let restored = [
            (RouteRuntimeState::Starting, 1, "Starting"),
            (RouteRuntimeState::Started, 2, "Started"),
            (RouteRuntimeState::Suspended, 2, "Suspended"),
            (RouteRuntimeState::Stopping, 2, "Stopping"),
            (RouteRuntimeState::Stopped, 2, "Stopped"),
            (RouteRuntimeState::Failed("e".into()), 2, "Failed"),
        ];
        for (state, version, label) in restored {
            assert_eq!(snapshot(state, version).state_label(), label);
        }
    }

    #[test]
    fn state_from_event_maps_all_variants() {
        let id = || String::from("r");
        let cases = vec![
            (
                RuntimeEvent::RouteRegistered { route_id: id() },
                Some(RouteRuntimeState::Registered),
            ),
            (
                RuntimeEvent::RouteStartRequested { route_id: id() },
                Some(RouteRuntimeState::Starting),
            ),
            (
                RuntimeEvent::RouteStarted { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteStopped { route_id: id() },
                Some(RouteRuntimeState::Stopped),
            ),
            (
                RuntimeEvent::RouteSuspended { route_id: id() },
                Some(RouteRuntimeState::Suspended),
            ),
            (
                RuntimeEvent::RouteResumed { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteFailed {
                    route_id: id(),
                    error: "e".into(),
                },
                Some(RouteRuntimeState::Failed("e".into())),
            ),
            (
                RuntimeEvent::RouteReloaded { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (RuntimeEvent::RouteRemoved { route_id: id() }, None),
        ];

        for (event, expected) in cases {
            assert_eq!(RouteRuntimeAggregate::state_from_event(&event), expected);
        }
    }
}