// camel_core/lifecycle/domain/route_runtime.rs
use crate::lifecycle::domain::DomainError;
use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state tracked for a single route.
///
/// The `Debug` rendering of a state is what ends up in the `from` field of
/// the invalid-transition error built by `RouteRuntimeAggregate::apply_command`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// Initial state assigned by `RouteRuntimeAggregate::new`.
    Registered,
    /// State that `state_from_event` derives from a `RouteStartRequested`
    /// event. `apply_command` never leaves the aggregate in this state: a
    /// successful `Start` goes straight to `Started`.
    Starting,
    /// The route has been started, resumed or reloaded.
    Started,
    /// The route was suspended while it was `Started`.
    Suspended,
    /// Declared for completeness; nothing in this file produces this state.
    Stopping,
    /// The route has been stopped.
    Stopped,
    /// The route failed; the payload is the error message carried by the
    /// `Fail` command or the `RouteFailed` event.
    Failed(String),
}
14
/// Commands accepted by `RouteRuntimeAggregate::apply_command`.
///
/// The per-variant notes describe the transitions implemented by
/// `apply_command` in this file; any other source state is rejected with an
/// invalid-transition error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Accepted from `Registered` or `Stopped`; the route becomes `Started`.
    Start,
    /// Accepted from `Started`, `Suspended` or `Failed`; the route becomes
    /// `Stopped`.
    Stop,
    /// Accepted from `Started` only; the route becomes `Suspended`.
    Suspend,
    /// Accepted from `Suspended` only; the route becomes `Started`.
    Resume,
    /// Accepted from `Started`, `Suspended`, `Stopped` or `Failed`; the route
    /// becomes `Started`.
    Reload,
    /// Accepted in every state; the route becomes `Failed` with this message.
    Fail(String),
    /// Accepted from `Registered` or `Stopped`; emits a removal event and
    /// leaves the stored state unchanged.
    Remove,
}
25
26#[derive(Debug, Clone)]
27pub struct RouteRuntimeAggregate {
28 route_id: String,
29 state: RouteRuntimeState,
30 version: u64,
31}
32
33impl RouteRuntimeAggregate {
34 pub fn new(route_id: impl Into<String>) -> Self {
35 Self {
36 route_id: route_id.into(),
37 state: RouteRuntimeState::Registered,
38 version: 0,
39 }
40 }
41
42 pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
45 let route_id = route_id.into();
46 let aggregate = Self::new(route_id.clone());
47 let events = vec![RuntimeEvent::RouteRegistered { route_id }];
48 (aggregate, events)
49 }
50
51 pub fn from_snapshot(
52 route_id: impl Into<String>,
53 state: RouteRuntimeState,
54 version: u64,
55 ) -> Self {
56 Self {
57 route_id: route_id.into(),
58 state,
59 version,
60 }
61 }
62
63 pub fn state(&self) -> &RouteRuntimeState {
64 &self.state
65 }
66
67 pub fn version(&self) -> u64 {
68 self.version
69 }
70
71 pub fn route_id(&self) -> &str {
72 &self.route_id
73 }
74
75 pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
78 match event {
79 RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
80 RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
81 RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
82 RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
83 RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
84 RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
85 RuntimeEvent::RouteFailed { error, .. } => {
86 Some(RouteRuntimeState::Failed(error.clone()))
87 }
88 RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
89 RuntimeEvent::RouteRemoved { .. } => None,
90 }
91 }
92
93 pub fn apply_command(
94 &mut self,
95 cmd: RouteLifecycleCommand,
96 ) -> Result<Vec<RuntimeEvent>, DomainError> {
97 let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
98 from: format!("{from:?}"),
99 to: to.to_string(),
100 };
101
102 let events = match cmd {
103 RouteLifecycleCommand::Start => match self.state {
104 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
105 self.state = RouteRuntimeState::Started;
106 vec![
107 RuntimeEvent::RouteStartRequested {
108 route_id: self.route_id.clone(),
109 },
110 RuntimeEvent::RouteStarted {
111 route_id: self.route_id.clone(),
112 },
113 ]
114 }
115 _ => return Err(invalid(&self.state, "Started")),
116 },
117 RouteLifecycleCommand::Stop => match self.state {
118 RouteRuntimeState::Started
119 | RouteRuntimeState::Suspended
120 | RouteRuntimeState::Failed(_) => {
121 self.state = RouteRuntimeState::Stopped;
122 vec![RuntimeEvent::RouteStopped {
123 route_id: self.route_id.clone(),
124 }]
125 }
126 _ => return Err(invalid(&self.state, "Stopped")),
127 },
128 RouteLifecycleCommand::Suspend => match self.state {
129 RouteRuntimeState::Started => {
130 self.state = RouteRuntimeState::Suspended;
131 vec![RuntimeEvent::RouteSuspended {
132 route_id: self.route_id.clone(),
133 }]
134 }
135 _ => return Err(invalid(&self.state, "Suspended")),
136 },
137 RouteLifecycleCommand::Resume => match self.state {
138 RouteRuntimeState::Suspended => {
139 self.state = RouteRuntimeState::Started;
140 vec![RuntimeEvent::RouteResumed {
141 route_id: self.route_id.clone(),
142 }]
143 }
144 _ => return Err(invalid(&self.state, "Started")),
145 },
146 RouteLifecycleCommand::Reload => match self.state {
147 RouteRuntimeState::Started
148 | RouteRuntimeState::Suspended
149 | RouteRuntimeState::Stopped
150 | RouteRuntimeState::Failed(_) => {
151 self.state = RouteRuntimeState::Started;
152 vec![RuntimeEvent::RouteReloaded {
153 route_id: self.route_id.clone(),
154 }]
155 }
156 _ => return Err(invalid(&self.state, "Started")),
157 },
158 RouteLifecycleCommand::Fail(error) => {
159 self.state = RouteRuntimeState::Failed(error.clone());
160 vec![RuntimeEvent::RouteFailed {
161 route_id: self.route_id.clone(),
162 error,
163 }]
164 }
165 RouteLifecycleCommand::Remove => match self.state {
166 RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
167 vec![RuntimeEvent::RouteRemoved {
168 route_id: self.route_id.clone(),
169 }]
170 }
171 _ => {
172 return Err(invalid(&self.state, "Removed"));
173 }
174 },
175 };
176
177 self.version += 1;
178 Ok(events)
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh aggregate for route `r1`, in `Registered` at version 0.
    fn fresh() -> RouteRuntimeAggregate {
        RouteRuntimeAggregate::new("r1")
    }

    /// Asserts that `events` is exactly one `RouteRemoved` for route `r1`.
    fn assert_single_removed(events: &[RuntimeEvent]) {
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
        ));
    }

    #[test]
    fn cannot_suspend_when_stopped() {
        // Same starting point as a new aggregate whose state was forced to Stopped.
        let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopped, 0);
        let outcome = agg.apply_command(RouteLifecycleCommand::Suspend);
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("invalid transition"));
    }

    #[test]
    fn start_emits_route_started_event() {
        let mut agg = fresh();
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let saw_started = events
            .iter()
            .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"));
        assert!(saw_started);
    }

    #[test]
    fn register_returns_aggregate_and_event() {
        let (agg, events) = RouteRuntimeAggregate::register("r1");
        assert_eq!(agg.route_id(), "r1");
        assert_eq!(agg.state(), &RouteRuntimeState::Registered);
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
        ));
    }

    #[test]
    fn remove_from_registered_emits_removed() {
        let mut agg = fresh();
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn remove_from_started_is_invalid() {
        let mut agg = fresh();
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        let outcome = agg.apply_command(RouteLifecycleCommand::Remove);
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("invalid transition"));
    }

    #[test]
    fn remove_from_stopped_emits_removed() {
        let mut agg = fresh();
        agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
        let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
        assert_single_removed(&events);
    }

    #[test]
    fn state_from_event_maps_all_variants() {
        let id = || String::from("r");

        // One row per event variant: (event, state expected after it).
        let cases = [
            (
                RuntimeEvent::RouteRegistered { route_id: id() },
                Some(RouteRuntimeState::Registered),
            ),
            (
                RuntimeEvent::RouteStartRequested { route_id: id() },
                Some(RouteRuntimeState::Starting),
            ),
            (
                RuntimeEvent::RouteStarted { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteStopped { route_id: id() },
                Some(RouteRuntimeState::Stopped),
            ),
            (
                RuntimeEvent::RouteSuspended { route_id: id() },
                Some(RouteRuntimeState::Suspended),
            ),
            (
                RuntimeEvent::RouteResumed { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (
                RuntimeEvent::RouteFailed {
                    route_id: id(),
                    error: String::from("e"),
                },
                Some(RouteRuntimeState::Failed(String::from("e"))),
            ),
            (
                RuntimeEvent::RouteReloaded { route_id: id() },
                Some(RouteRuntimeState::Started),
            ),
            (RuntimeEvent::RouteRemoved { route_id: id() }, None),
        ];

        for (index, (event, expected)) in cases.iter().enumerate() {
            let actual = RouteRuntimeAggregate::state_from_event(event);
            assert_eq!(&actual, expected, "case {index}");
        }
    }
}