// camel_core/lifecycle/domain/route_runtime.rs
use crate::CamelError;
2use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state of a single route as tracked by its runtime aggregate.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RouteRuntimeState {
    /// State a freshly created aggregate starts in; `Start` is accepted from here.
    Registered,
    /// Declared for the state model; no command handled in this file moves a
    /// route into this state (presumably an in-flight start; confirm with callers).
    Starting,
    /// Reached after a successful `Start`, `Resume`, or `Reload`.
    Started,
    /// Reached after a successful `Suspend`.
    Suspended,
    /// Declared for the state model; no command handled in this file moves a
    /// route into this state (presumably an in-flight stop; confirm with callers).
    Stopping,
    /// Reached after a successful `Stop`.
    Stopped,
    /// Reached after `Fail`; carries the error text supplied with the command.
    Failed(String),
}
14
/// Command that asks a route's runtime aggregate to change lifecycle state.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RouteLifecycleCommand {
    /// Move the route to `Started`; accepted from `Registered` or `Stopped`.
    Start,
    /// Move the route to `Stopped`; accepted from `Started`, `Suspended`, or `Failed`.
    Stop,
    /// Move the route to `Suspended`; accepted only from `Started`.
    Suspend,
    /// Move the route back to `Started`; accepted only from `Suspended`.
    Resume,
    /// Reload the route, leaving it `Started`; accepted from `Started`,
    /// `Suspended`, `Stopped`, or `Failed`.
    Reload,
    /// Mark the route as failed with the given error text; accepted from any state.
    Fail(String),
}
24
25#[derive(Debug, Clone)]
26pub struct RouteRuntimeAggregate {
27 route_id: String,
28 state: RouteRuntimeState,
29 version: u64,
30}
31
32impl RouteRuntimeAggregate {
33 pub fn new(route_id: impl Into<String>) -> Self {
34 Self {
35 route_id: route_id.into(),
36 state: RouteRuntimeState::Registered,
37 version: 0,
38 }
39 }
40
41 pub fn from_snapshot(
42 route_id: impl Into<String>,
43 state: RouteRuntimeState,
44 version: u64,
45 ) -> Self {
46 Self {
47 route_id: route_id.into(),
48 state,
49 version,
50 }
51 }
52
53 pub fn state(&self) -> &RouteRuntimeState {
54 &self.state
55 }
56
57 pub fn version(&self) -> u64 {
58 self.version
59 }
60
61 pub fn route_id(&self) -> &str {
62 &self.route_id
63 }
64
65 pub fn apply_command(
66 &mut self,
67 cmd: RouteLifecycleCommand,
68 ) -> Result<Vec<RuntimeEvent>, CamelError> {
69 let invalid = |from: &RouteRuntimeState, to: &str| {
70 CamelError::ProcessorError(format!("invalid transition: {from:?} -> {to}"))
71 };
72
73 let events = match cmd {
74 RouteLifecycleCommand::Start => match self.state {
75 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
76 self.state = RouteRuntimeState::Started;
77 vec![
78 RuntimeEvent::RouteStartRequested {
79 route_id: self.route_id.clone(),
80 },
81 RuntimeEvent::RouteStarted {
82 route_id: self.route_id.clone(),
83 },
84 ]
85 }
86 _ => return Err(invalid(&self.state, "Started")),
87 },
88 RouteLifecycleCommand::Stop => match self.state {
89 RouteRuntimeState::Started
90 | RouteRuntimeState::Suspended
91 | RouteRuntimeState::Failed(_) => {
92 self.state = RouteRuntimeState::Stopped;
93 vec![RuntimeEvent::RouteStopped {
94 route_id: self.route_id.clone(),
95 }]
96 }
97 _ => return Err(invalid(&self.state, "Stopped")),
98 },
99 RouteLifecycleCommand::Suspend => match self.state {
100 RouteRuntimeState::Started => {
101 self.state = RouteRuntimeState::Suspended;
102 vec![RuntimeEvent::RouteSuspended {
103 route_id: self.route_id.clone(),
104 }]
105 }
106 _ => return Err(invalid(&self.state, "Suspended")),
107 },
108 RouteLifecycleCommand::Resume => match self.state {
109 RouteRuntimeState::Suspended => {
110 self.state = RouteRuntimeState::Started;
111 vec![RuntimeEvent::RouteResumed {
112 route_id: self.route_id.clone(),
113 }]
114 }
115 _ => return Err(invalid(&self.state, "Started")),
116 },
117 RouteLifecycleCommand::Reload => match self.state {
118 RouteRuntimeState::Started
119 | RouteRuntimeState::Suspended
120 | RouteRuntimeState::Stopped
121 | RouteRuntimeState::Failed(_) => {
122 self.state = RouteRuntimeState::Started;
123 vec![RuntimeEvent::RouteReloaded {
124 route_id: self.route_id.clone(),
125 }]
126 }
127 _ => return Err(invalid(&self.state, "Started")),
128 },
129 RouteLifecycleCommand::Fail(error) => {
130 self.state = RouteRuntimeState::Failed(error.clone());
131 vec![RuntimeEvent::RouteFailed {
132 route_id: self.route_id.clone(),
133 error,
134 }]
135 }
136 };
137
138 self.version += 1;
139 Ok(events)
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// `Suspend` is only accepted from `Started`, so a stopped route must reject it.
    #[test]
    fn cannot_suspend_when_stopped() {
        let mut aggregate =
            RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopped, 0);
        let outcome = aggregate.apply_command(RouteLifecycleCommand::Suspend);
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("invalid transition"));
    }

    /// Starting a freshly registered route must report `RouteStarted` for that route.
    #[test]
    fn start_emits_route_started_event() {
        let mut aggregate = RouteRuntimeAggregate::new("r1");
        let emitted = aggregate
            .apply_command(RouteLifecycleCommand::Start)
            .unwrap();
        let saw_started = emitted.iter().any(|event| match event {
            RuntimeEvent::RouteStarted { route_id } => route_id == "r1",
            _ => false,
        });
        assert!(saw_started);
    }
}
167}