Skip to main content

camel_core/lifecycle/domain/
route_runtime.rs

1use crate::CamelError;
2use crate::lifecycle::domain::RuntimeEvent;
3
/// Lifecycle state tracked for a single route.
///
/// Equality is structural: two `Failed` values are equal only when their
/// messages are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
    /// The route is known but has not been started yet.
    Registered,
    /// The route is in the process of starting.
    Starting,
    /// The route is running.
    Started,
    /// The route has been suspended and can be resumed.
    Suspended,
    /// The route is in the process of stopping.
    Stopping,
    /// The route has been stopped.
    Stopped,
    /// The route failed; the payload is the error message describing why.
    Failed(String),
}
14
/// Command requesting a lifecycle change for a route.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
    /// Request that the route be started.
    Start,
    /// Request that the route be stopped.
    Stop,
    /// Request that the route be suspended.
    Suspend,
    /// Request that a suspended route be resumed.
    Resume,
    /// Request that the route be reloaded.
    Reload,
    /// Report that the route failed; the payload is the error message.
    Fail(String),
}
24
25#[derive(Debug, Clone)]
26pub struct RouteRuntimeAggregate {
27    route_id: String,
28    state: RouteRuntimeState,
29    version: u64,
30}
31
32impl RouteRuntimeAggregate {
33    pub fn new(route_id: impl Into<String>) -> Self {
34        Self {
35            route_id: route_id.into(),
36            state: RouteRuntimeState::Registered,
37            version: 0,
38        }
39    }
40
41    pub fn from_snapshot(
42        route_id: impl Into<String>,
43        state: RouteRuntimeState,
44        version: u64,
45    ) -> Self {
46        Self {
47            route_id: route_id.into(),
48            state,
49            version,
50        }
51    }
52
53    pub fn state(&self) -> &RouteRuntimeState {
54        &self.state
55    }
56
57    pub fn version(&self) -> u64 {
58        self.version
59    }
60
61    pub fn route_id(&self) -> &str {
62        &self.route_id
63    }
64
65    pub fn apply_command(
66        &mut self,
67        cmd: RouteLifecycleCommand,
68    ) -> Result<Vec<RuntimeEvent>, CamelError> {
69        let invalid = |from: &RouteRuntimeState, to: &str| {
70            CamelError::ProcessorError(format!("invalid transition: {from:?} -> {to}"))
71        };
72
73        let events = match cmd {
74            RouteLifecycleCommand::Start => match self.state {
75                RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
76                    self.state = RouteRuntimeState::Started;
77                    vec![
78                        RuntimeEvent::RouteStartRequested {
79                            route_id: self.route_id.clone(),
80                        },
81                        RuntimeEvent::RouteStarted {
82                            route_id: self.route_id.clone(),
83                        },
84                    ]
85                }
86                _ => return Err(invalid(&self.state, "Started")),
87            },
88            RouteLifecycleCommand::Stop => match self.state {
89                RouteRuntimeState::Started
90                | RouteRuntimeState::Suspended
91                | RouteRuntimeState::Failed(_) => {
92                    self.state = RouteRuntimeState::Stopped;
93                    vec![RuntimeEvent::RouteStopped {
94                        route_id: self.route_id.clone(),
95                    }]
96                }
97                _ => return Err(invalid(&self.state, "Stopped")),
98            },
99            RouteLifecycleCommand::Suspend => match self.state {
100                RouteRuntimeState::Started => {
101                    self.state = RouteRuntimeState::Suspended;
102                    vec![RuntimeEvent::RouteSuspended {
103                        route_id: self.route_id.clone(),
104                    }]
105                }
106                _ => return Err(invalid(&self.state, "Suspended")),
107            },
108            RouteLifecycleCommand::Resume => match self.state {
109                RouteRuntimeState::Suspended => {
110                    self.state = RouteRuntimeState::Started;
111                    vec![RuntimeEvent::RouteResumed {
112                        route_id: self.route_id.clone(),
113                    }]
114                }
115                _ => return Err(invalid(&self.state, "Started")),
116            },
117            RouteLifecycleCommand::Reload => match self.state {
118                RouteRuntimeState::Started
119                | RouteRuntimeState::Suspended
120                | RouteRuntimeState::Stopped
121                | RouteRuntimeState::Failed(_) => {
122                    self.state = RouteRuntimeState::Started;
123                    vec![RuntimeEvent::RouteReloaded {
124                        route_id: self.route_id.clone(),
125                    }]
126                }
127                _ => return Err(invalid(&self.state, "Started")),
128            },
129            RouteLifecycleCommand::Fail(error) => {
130                self.state = RouteRuntimeState::Failed(error.clone());
131                vec![RuntimeEvent::RouteFailed {
132                    route_id: self.route_id.clone(),
133                    error,
134                }]
135            }
136        };
137
138        self.version += 1;
139        Ok(events)
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// `Suspend` is only valid from `Started`; from `Stopped` it must be
    /// rejected without touching the aggregate.
    #[test]
    fn cannot_suspend_when_stopped() {
        let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopped, 0);
        let err = agg
            .apply_command(RouteLifecycleCommand::Suspend)
            .unwrap_err();
        assert!(err.to_string().contains("invalid transition"));
        // A rejected command leaves state and version as they were.
        assert_eq!(agg.state(), &RouteRuntimeState::Stopped);
        assert_eq!(agg.version(), 0);
    }

    /// Starting a freshly registered route emits `RouteStarted` for that
    /// route, moves it to `Started`, and bumps the version.
    #[test]
    fn start_emits_route_started_event() {
        let mut agg = RouteRuntimeAggregate::new("r1");
        let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
        assert!(
            events
                .iter()
                .any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"))
        );
        assert_eq!(agg.state(), &RouteRuntimeState::Started);
        assert_eq!(agg.version(), 1);
    }
}