use mako_engine::{
builder::EngineBuilder,
error::{EngineError, WorkflowError},
event_store::InMemoryEventStore,
ids::TenantId,
snapshot::InMemorySnapshotStore,
version::WorkflowId,
workflow::{CommandContext, EventPayload, Workflow, WorkflowOutput},
};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "data")]
enum ActivationEvent {
Registered { name: String },
Activated,
Deactivated { reason: String },
}
impl EventPayload for ActivationEvent {
fn event_type(&self) -> &'static str {
match self {
Self::Registered { .. } => "Registered",
Self::Activated => "Activated",
Self::Deactivated { .. } => "Deactivated",
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
enum ActivationStatus {
#[default]
New,
Registered,
Active,
Inactive,
}
impl ActivationStatus {
fn as_str(&self) -> &'static str {
match self {
Self::New => "New",
Self::Registered => "Registered",
Self::Active => "Active",
Self::Inactive => "Inactive",
}
}
}
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
struct ActivationState {
status: ActivationStatus,
name: Option<String>,
}
#[derive(Clone)]
enum ActivationCommand {
Register { name: String },
Activate,
Deactivate { reason: String },
}
impl mako_engine::workflow::CommandPayload for ActivationCommand {}
struct ActivationWorkflow;
impl Workflow for ActivationWorkflow {
type State = ActivationState;
type Event = ActivationEvent;
type Command = ActivationCommand;
fn apply(mut state: Self::State, event: &Self::Event) -> Self::State {
match event {
ActivationEvent::Registered { name } => {
state.status = ActivationStatus::Registered;
state.name = Some(name.clone());
}
ActivationEvent::Activated => {
state.status = ActivationStatus::Active;
}
ActivationEvent::Deactivated { .. } => {
state.status = ActivationStatus::Inactive;
}
}
state
}
fn handle(
state: &Self::State,
command: Self::Command,
) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
match command {
ActivationCommand::Register { name } => {
if state.status != ActivationStatus::New {
return Err(WorkflowError::invalid_state("New", state.status.as_str()));
}
Ok(vec![ActivationEvent::Registered { name }].into())
}
ActivationCommand::Activate => {
if state.status != ActivationStatus::Registered {
return Err(WorkflowError::invalid_state(
"Registered",
state.status.as_str(),
));
}
Ok(vec![ActivationEvent::Activated].into())
}
ActivationCommand::Deactivate { reason } => {
if state.status != ActivationStatus::Active {
return Err(WorkflowError::invalid_state(
"Active",
state.status.as_str(),
));
}
Ok(vec![ActivationEvent::Deactivated { reason }].into())
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("╔══════════════════════════════════════════════════════════════╗");
println!("║ mako-engine — Process lifecycle example ║");
println!("╚══════════════════════════════════════════════════════════════╝");
println!();
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.with_snapshot_store(InMemorySnapshotStore::new())
.build();
const SNAP_INTERVAL: u64 = 2;
println!("Phase 1: Create process and run two commands");
println!("─────────────────────────────────────────────");
let process = ctx.spawn::<ActivationWorkflow>(
TenantId::new(),
WorkflowId::new("activation", "FV2025-04-01"),
);
let identity = process.identity();
println!(" Stream : {}", process.stream_id());
println!(" Workflow: {}", process.workflow_id());
println!();
let envs = process
.execute(ActivationCommand::Register {
name: "Messstelle 12345".to_owned(),
})
.await?;
for env in &envs {
println!(" ✓ {} (seq {})", env.event_type, env.sequence_number);
}
let envs = process.execute(ActivationCommand::Activate).await?;
for env in &envs {
println!(" ✓ {} (seq {})", env.event_type, env.sequence_number);
}
let snapped = process
.take_snapshot(ctx.snapshot_store(), SNAP_INTERVAL)
.await?;
println!(
" 📸 Snapshot taken: {snapped} (seq {})",
process.event_count().await?
);
let state = process.state_with_snapshot(ctx.snapshot_store()).await?;
println!(" State after phase 1: {}", state.status.as_str());
println!();
println!("Phase 2: Simulate service restart — resume via ProcessIdentity");
println!("────────────────────────────────────────────────────────────────");
let identity_json = serde_json::to_string(&identity).unwrap();
println!(" identity JSON: {identity_json}");
let identity_back: mako_engine::ids::ProcessIdentity =
serde_json::from_str(&identity_json).unwrap();
println!(" Resuming stream: {}", identity_back.stream_id());
let resumed = ctx.resume::<ActivationWorkflow>(identity_back);
let resumed_state = resumed.state_with_snapshot(ctx.snapshot_store()).await?;
println!(
" ✓ State after resume (snapshot-accelerated): {} (name: {})",
resumed_state.status.as_str(),
resumed_state.name.as_deref().unwrap_or("-")
);
println!(" ✓ Event count: {}", resumed.event_count().await?);
let cmd_ctx = CommandContext::new(
resumed.tenant_id(),
resumed.process_id(),
resumed.workflow_id().clone(),
);
let envs = resumed
.execute_with(
ActivationCommand::Deactivate {
reason: "Planned maintenance".to_owned(),
},
cmd_ctx,
)
.await?;
for env in &envs {
println!(" ✓ {} (seq {})", env.event_type, env.sequence_number);
}
println!();
println!("Phase 3: execute_with_retry — automatic version-conflict retry");
println!("─────────────────────────────────────────────────────────────");
let p2 = ctx.spawn::<ActivationWorkflow>(
TenantId::new(),
WorkflowId::new("activation", "FV2025-04-01"),
);
p2.execute(ActivationCommand::Register {
name: "P2-Messstelle".to_owned(),
})
.await?;
let envs = p2
.execute_with_retry(ActivationCommand::Activate, 3)
.await?;
println!(
" ✓ execute_with_retry succeeded (max_attempts=3): {} at seq {}",
envs[0].event_type, envs[0].sequence_number
);
println!();
println!("Phase 4: Error classification with EngineError::is_* helpers");
println!("──────────────────────────────────────────────────────────────");
let err: EngineError = resumed
.execute(ActivationCommand::Activate)
.await
.unwrap_err();
println!(" Raw error : {err}");
println!(" is_workflow_error() : {}", err.is_workflow_error());
println!(" is_version_conflict() : {}", err.is_version_conflict());
if let Some(we) = err.as_workflow_error() {
println!(" WorkflowError variant : {we}");
println!(" is_invalid_state() : {}", we.is_invalid_state());
println!(" is_rejected() : {}", we.is_rejected());
}
println!();
let final_state = resumed.state_with_snapshot(ctx.snapshot_store()).await?;
let total_events = resumed.event_count().await?;
println!("══════════════════════════════════════════════════════════════════");
println!(" Final state : {}", final_state.status.as_str());
println!(" Total events : {total_events}");
println!(" Checks passed: process resumption, snapshot-accelerated replay,");
println!(" execute_with_retry, error classification.");
println!("══════════════════════════════════════════════════════════════════");
Ok(())
}