use crate::lifecycle::domain::DomainError;
use crate::lifecycle::domain::RuntimeEvent;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteRuntimeState {
Registered,
Starting,
Started,
Suspended,
Stopping,
Stopped,
Failed(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouteLifecycleCommand {
Start,
Stop,
Suspend,
Resume,
Reload,
Fail(String),
Remove,
}
#[derive(Debug, Clone)]
pub struct RouteRuntimeAggregate {
route_id: String,
state: RouteRuntimeState,
version: u64,
}
impl RouteRuntimeAggregate {
pub fn new(route_id: impl Into<String>) -> Self {
Self {
route_id: route_id.into(),
state: RouteRuntimeState::Registered,
version: 0,
}
}
pub fn register(route_id: impl Into<String>) -> (Self, Vec<RuntimeEvent>) {
let route_id = route_id.into();
let aggregate = Self::new(route_id.clone());
let events = vec![RuntimeEvent::RouteRegistered { route_id }];
(aggregate, events)
}
pub fn from_snapshot(
route_id: impl Into<String>,
state: RouteRuntimeState,
version: u64,
) -> Self {
Self {
route_id: route_id.into(),
state,
version,
}
}
pub fn state(&self) -> &RouteRuntimeState {
&self.state
}
pub fn version(&self) -> u64 {
self.version
}
pub fn route_id(&self) -> &str {
&self.route_id
}
pub fn state_from_event(event: &RuntimeEvent) -> Option<RouteRuntimeState> {
match event {
RuntimeEvent::RouteRegistered { .. } => Some(RouteRuntimeState::Registered),
RuntimeEvent::RouteStartRequested { .. } => Some(RouteRuntimeState::Starting),
RuntimeEvent::RouteStarted { .. } => Some(RouteRuntimeState::Started),
RuntimeEvent::RouteStopped { .. } => Some(RouteRuntimeState::Stopped),
RuntimeEvent::RouteSuspended { .. } => Some(RouteRuntimeState::Suspended),
RuntimeEvent::RouteResumed { .. } => Some(RouteRuntimeState::Started),
RuntimeEvent::RouteFailed { error, .. } => {
Some(RouteRuntimeState::Failed(error.clone()))
}
RuntimeEvent::RouteReloaded { .. } => Some(RouteRuntimeState::Started),
RuntimeEvent::RouteRemoved { .. } => None,
}
}
pub fn apply_command(
&mut self,
cmd: RouteLifecycleCommand,
) -> Result<Vec<RuntimeEvent>, DomainError> {
let invalid = |from: &RouteRuntimeState, to: &str| DomainError::InvalidTransition {
from: format!("{from:?}"),
to: to.to_string(),
};
let events = match cmd {
RouteLifecycleCommand::Start => match self.state {
RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
self.state = RouteRuntimeState::Started;
vec![
RuntimeEvent::RouteStartRequested {
route_id: self.route_id.clone(),
},
RuntimeEvent::RouteStarted {
route_id: self.route_id.clone(),
},
]
}
RouteRuntimeState::Started => vec![],
_ => return Err(invalid(&self.state, "Started")),
},
RouteLifecycleCommand::Stop => match self.state {
RouteRuntimeState::Started
| RouteRuntimeState::Suspended
| RouteRuntimeState::Failed(_) => {
self.state = RouteRuntimeState::Stopped;
vec![RuntimeEvent::RouteStopped {
route_id: self.route_id.clone(),
}]
}
RouteRuntimeState::Stopped => vec![],
_ => return Err(invalid(&self.state, "Stopped")),
},
RouteLifecycleCommand::Suspend => match self.state {
RouteRuntimeState::Started => {
self.state = RouteRuntimeState::Suspended;
vec![RuntimeEvent::RouteSuspended {
route_id: self.route_id.clone(),
}]
}
RouteRuntimeState::Suspended => vec![],
_ => return Err(invalid(&self.state, "Suspended")),
},
RouteLifecycleCommand::Resume => match self.state {
RouteRuntimeState::Suspended => {
self.state = RouteRuntimeState::Started;
vec![RuntimeEvent::RouteResumed {
route_id: self.route_id.clone(),
}]
}
RouteRuntimeState::Started => vec![],
_ => return Err(invalid(&self.state, "Started")),
},
RouteLifecycleCommand::Reload => match self.state {
RouteRuntimeState::Started
| RouteRuntimeState::Suspended
| RouteRuntimeState::Stopped
| RouteRuntimeState::Failed(_) => {
self.state = RouteRuntimeState::Started;
vec![RuntimeEvent::RouteReloaded {
route_id: self.route_id.clone(),
}]
}
_ => return Err(invalid(&self.state, "Started")),
},
RouteLifecycleCommand::Fail(error) => {
self.state = RouteRuntimeState::Failed(error.clone());
vec![RuntimeEvent::RouteFailed {
route_id: self.route_id.clone(),
error,
}]
}
RouteLifecycleCommand::Remove => match self.state {
RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
vec![RuntimeEvent::RouteRemoved {
route_id: self.route_id.clone(),
}]
}
_ => {
return Err(invalid(&self.state, "Removed"));
}
},
};
self.version += 1;
Ok(events)
}
pub fn begin_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
match self.state {
RouteRuntimeState::Registered | RouteRuntimeState::Stopped => {
self.version += 1;
self.state = RouteRuntimeState::Starting;
Ok(vec![RuntimeEvent::RouteStartRequested {
route_id: self.route_id.clone(),
}])
}
RouteRuntimeState::Starting | RouteRuntimeState::Started => Ok(vec![]),
_ => Err(DomainError::InvalidTransition {
from: format!("{:?}", self.state),
to: "Starting".to_string(),
}),
}
}
pub fn confirm_start(&mut self) -> Result<Vec<RuntimeEvent>, DomainError> {
match self.state {
RouteRuntimeState::Starting => {
self.state = RouteRuntimeState::Started;
Ok(vec![RuntimeEvent::RouteStarted {
route_id: self.route_id.clone(),
}])
}
RouteRuntimeState::Started => Ok(vec![]),
_ => Err(DomainError::InvalidTransition {
from: format!("{:?}", self.state),
to: "Started".to_string(),
}),
}
}
pub fn fail(&mut self, error: String) -> Vec<RuntimeEvent> {
self.state = RouteRuntimeState::Failed(error.clone());
self.version += 1;
vec![RuntimeEvent::RouteFailed {
route_id: self.route_id.clone(),
error,
}]
}
pub fn state_label(&self) -> &'static str {
match self.state {
RouteRuntimeState::Registered => "Registered",
RouteRuntimeState::Starting => "Starting",
RouteRuntimeState::Started => "Started",
RouteRuntimeState::Suspended => "Suspended",
RouteRuntimeState::Stopping => "Stopping",
RouteRuntimeState::Stopped => "Stopped",
RouteRuntimeState::Failed(_) => "Failed",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cannot_suspend_when_stopped() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.state = RouteRuntimeState::Stopped;
let err = agg
.apply_command(RouteLifecycleCommand::Suspend)
.unwrap_err();
assert!(err.to_string().contains("invalid transition"));
}
#[test]
fn start_emits_route_started_event() {
let mut agg = RouteRuntimeAggregate::new("r1");
let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
assert!(
events
.iter()
.any(|e| matches!(e, RuntimeEvent::RouteStarted { route_id } if route_id == "r1"))
);
}
#[test]
fn register_returns_aggregate_and_event() {
let (agg, events) = RouteRuntimeAggregate::register("r1");
assert_eq!(agg.route_id(), "r1");
assert_eq!(agg.state(), &RouteRuntimeState::Registered);
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
));
}
#[test]
fn remove_from_registered_emits_removed() {
let mut agg = RouteRuntimeAggregate::new("r1");
let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
));
}
#[test]
fn remove_from_started_is_invalid() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
let err = agg
.apply_command(RouteLifecycleCommand::Remove)
.unwrap_err();
assert!(err.to_string().contains("invalid transition"));
}
#[test]
fn remove_from_stopped_emits_removed() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
let events = agg.apply_command(RouteLifecycleCommand::Remove).unwrap();
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
RuntimeEvent::RouteRemoved { route_id } if route_id == "r1"
));
}
#[test]
fn start_from_started_is_idempotent() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
let events = agg.apply_command(RouteLifecycleCommand::Start).unwrap();
assert!(events.is_empty());
assert_eq!(agg.state(), &RouteRuntimeState::Started);
}
#[test]
fn stop_from_stopped_is_idempotent() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
let events = agg.apply_command(RouteLifecycleCommand::Stop).unwrap();
assert!(events.is_empty());
assert_eq!(agg.state(), &RouteRuntimeState::Stopped);
}
#[test]
fn suspend_from_suspended_is_idempotent() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
let events = agg.apply_command(RouteLifecycleCommand::Suspend).unwrap();
assert!(events.is_empty());
assert_eq!(agg.state(), &RouteRuntimeState::Suspended);
}
#[test]
fn resume_from_started_is_idempotent() {
let mut agg = RouteRuntimeAggregate::new("r1");
agg.apply_command(RouteLifecycleCommand::Start).unwrap();
let events = agg.apply_command(RouteLifecycleCommand::Resume).unwrap();
assert!(events.is_empty());
assert_eq!(agg.state(), &RouteRuntimeState::Started);
}
#[test]
fn begin_start_from_registered_transitions_to_starting() {
let mut agg = RouteRuntimeAggregate::new("r1");
let version_before = agg.version();
let events = agg.begin_start().unwrap();
assert_eq!(*agg.state(), RouteRuntimeState::Starting);
assert_eq!(
events,
vec![RuntimeEvent::RouteStartRequested {
route_id: "r1".into()
}]
);
assert_eq!(
agg.version(),
version_before + 1,
"version must increment by 1 in begin_start"
);
}
#[test]
fn begin_start_from_stopped_transitions_to_starting() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopped, 1);
let version_before = agg.version();
let events = agg.begin_start().unwrap();
assert_eq!(*agg.state(), RouteRuntimeState::Starting);
assert_eq!(events.len(), 1);
assert!(matches!(
events[0],
RuntimeEvent::RouteStartRequested { .. }
));
assert_eq!(
agg.version(),
version_before + 1,
"version must increment by 1 in begin_start"
);
}
#[test]
fn begin_start_idempotent_on_starting() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Starting, 1);
let events = agg.begin_start().unwrap();
assert_eq!(*agg.state(), RouteRuntimeState::Starting);
assert!(events.is_empty());
}
#[test]
fn begin_start_idempotent_on_started() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Started, 2);
let events = agg.begin_start().unwrap();
assert_eq!(*agg.state(), RouteRuntimeState::Started);
assert!(events.is_empty());
}
#[test]
fn begin_start_rejects_invalid_states() {
for state in [
RouteRuntimeState::Suspended,
RouteRuntimeState::Failed("err".into()),
] {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", state.clone(), 1);
assert!(agg.begin_start().is_err());
}
}
#[test]
fn confirm_start_from_starting_transitions_to_started() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Starting, 1);
let events = agg.confirm_start().unwrap();
assert_eq!(*agg.state(), RouteRuntimeState::Started);
assert_eq!(
events,
vec![RuntimeEvent::RouteStarted {
route_id: "r1".into()
}]
);
}
#[test]
fn confirm_start_idempotent_on_started() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Started, 2);
let events = agg.confirm_start().unwrap();
assert!(events.is_empty());
assert_eq!(*agg.state(), RouteRuntimeState::Started);
}
#[test]
fn confirm_start_rejects_non_starting() {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Registered, 0);
assert!(agg.confirm_start().is_err());
}
#[test]
fn fail_from_any_state() {
for state in [
RouteRuntimeState::Registered,
RouteRuntimeState::Starting,
RouteRuntimeState::Started,
RouteRuntimeState::Stopped,
RouteRuntimeState::Suspended,
RouteRuntimeState::Stopping,
] {
let mut agg = RouteRuntimeAggregate::from_snapshot("r1", state, 1);
let version_before = agg.version();
let events = agg.fail("crash".into());
assert_eq!(*agg.state(), RouteRuntimeState::Failed("crash".into()));
assert_eq!(
agg.version(),
version_before + 1,
"fail() must increment version to match replay"
);
assert_eq!(events.len(), 1);
assert!(
matches!(&events[0], RuntimeEvent::RouteFailed { route_id, error } if route_id == "r1" && error == "crash")
);
}
}
#[test]
fn state_label_covers_all_states() {
let agg = RouteRuntimeAggregate::new("r1");
assert_eq!(agg.state_label(), "Registered");
let agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Starting, 1);
assert_eq!(agg.state_label(), "Starting");
let agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Started, 2);
assert_eq!(agg.state_label(), "Started");
let agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Suspended, 2);
assert_eq!(agg.state_label(), "Suspended");
let agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopping, 2);
assert_eq!(agg.state_label(), "Stopping");
let agg = RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Stopped, 2);
assert_eq!(agg.state_label(), "Stopped");
let agg =
RouteRuntimeAggregate::from_snapshot("r1", RouteRuntimeState::Failed("e".into()), 2);
assert_eq!(agg.state_label(), "Failed");
}
#[test]
fn state_from_event_maps_all_variants() {
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRegistered {
route_id: "r".into()
}),
Some(RouteRuntimeState::Registered)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStartRequested {
route_id: "r".into()
}),
Some(RouteRuntimeState::Starting)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStarted {
route_id: "r".into()
}),
Some(RouteRuntimeState::Started)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteStopped {
route_id: "r".into()
}),
Some(RouteRuntimeState::Stopped)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteSuspended {
route_id: "r".into()
}),
Some(RouteRuntimeState::Suspended)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteResumed {
route_id: "r".into()
}),
Some(RouteRuntimeState::Started)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteFailed {
route_id: "r".into(),
error: "e".into()
}),
Some(RouteRuntimeState::Failed("e".into()))
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteReloaded {
route_id: "r".into()
}),
Some(RouteRuntimeState::Started)
);
assert_eq!(
RouteRuntimeAggregate::state_from_event(&RuntimeEvent::RouteRemoved {
route_id: "r".into()
}),
None
);
}
}