use super::error::ExecutionError;
use super::execution_model::{Execution, Step};
use super::execution_state::{ExecutionState, StepState, WaitReason};
use super::ids::{StepId, StepSource, StepType};
use std::time::Instant;
#[derive(Debug, Clone)]
pub enum ExecutionAction {
Start,
StepStarted {
step_id: StepId,
parent_step_id: Option<StepId>,
step_type: StepType,
name: String,
source: Option<StepSource>,
},
StepCompleted {
step_id: StepId,
output: Option<String>,
duration_ms: u64,
},
StepFailed {
step_id: StepId,
error: ExecutionError,
},
Pause { reason: String },
Resume,
Wait { reason: WaitReason },
InputReceived,
Complete { output: Option<String> },
Fail { error: ExecutionError },
Cancel { reason: String },
}
pub fn reduce(execution: &mut Execution, action: ExecutionAction) -> Result<(), ReducerError> {
match action {
ExecutionAction::Start => {
if execution.state != ExecutionState::Created {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Start".to_string(),
});
}
execution.state = ExecutionState::Running;
execution.started_at = Some(Instant::now());
Ok(())
}
ExecutionAction::StepStarted {
step_id,
parent_step_id,
step_type,
name,
source,
} => {
if !matches!(execution.state, ExecutionState::Running) {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "StepStarted".to_string(),
});
}
let mut step = Step::new(step_type, name);
step.id = step_id;
step.parent_step_id = parent_step_id;
step.state = StepState::Running;
step.started_at = Some(now_millis());
step.source = source;
execution.add_step(step);
Ok(())
}
ExecutionAction::StepCompleted {
step_id,
output,
duration_ms,
} => {
if let Some(step) = execution.get_step_mut(&step_id) {
step.state = StepState::Completed;
step.output = output;
step.duration_ms = Some(duration_ms);
step.ended_at = Some(now_millis());
Ok(())
} else {
Err(ReducerError::StepNotFound(step_id))
}
}
ExecutionAction::StepFailed { step_id, error } => {
if let Some(step) = execution.get_step_mut(&step_id) {
step.state = StepState::Failed;
step.error = Some(error);
step.ended_at = Some(now_millis());
Ok(())
} else {
Err(ReducerError::StepNotFound(step_id))
}
}
ExecutionAction::Pause { reason: _ } => {
if execution.state != ExecutionState::Running {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Pause".to_string(),
});
}
execution.state = ExecutionState::Paused;
Ok(())
}
ExecutionAction::Resume => {
if !execution.state.can_resume() {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Resume".to_string(),
});
}
execution.state = ExecutionState::Running;
Ok(())
}
ExecutionAction::Wait { reason } => {
if execution.state != ExecutionState::Running {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Wait".to_string(),
});
}
execution.state = ExecutionState::Waiting(reason);
Ok(())
}
ExecutionAction::InputReceived => {
if !matches!(execution.state, ExecutionState::Waiting(_)) {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "InputReceived".to_string(),
});
}
execution.state = ExecutionState::Running;
Ok(())
}
ExecutionAction::Complete { output } => {
if execution.state != ExecutionState::Running {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Complete".to_string(),
});
}
execution.state = ExecutionState::Completed;
execution.output = output;
execution.ended_at = Some(Instant::now());
Ok(())
}
ExecutionAction::Fail { error } => {
if execution.state.is_terminal() {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Fail".to_string(),
});
}
execution.state = ExecutionState::Failed;
execution.error = Some(error);
execution.ended_at = Some(Instant::now());
Ok(())
}
ExecutionAction::Cancel { reason: _ } => {
if execution.state.is_terminal() {
return Err(ReducerError::InvalidTransition {
from: execution.state,
action: "Cancel".to_string(),
});
}
execution.state = ExecutionState::Cancelled;
execution.ended_at = Some(Instant::now());
Ok(())
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ReducerError {
#[error("Invalid state transition from {from:?} via {action}")]
InvalidTransition {
from: ExecutionState,
action: String,
},
#[error("Step not found: {0}")]
StepNotFound(StepId),
}
fn now_millis() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::error::ExecutionErrorCategory;
#[test]
fn test_valid_execution_lifecycle() {
let mut exec = Execution::new();
assert!(reduce(&mut exec, ExecutionAction::Start).is_ok());
assert_eq!(exec.state, ExecutionState::Running);
assert!(reduce(
&mut exec,
ExecutionAction::Complete {
output: Some("done".into())
}
)
.is_ok());
assert_eq!(exec.state, ExecutionState::Completed);
assert_eq!(exec.output, Some("done".to_string()));
}
#[test]
fn test_invalid_start_from_running() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
assert!(reduce(&mut exec, ExecutionAction::Start).is_err());
}
#[test]
fn test_pause_resume() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
assert!(reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into()
}
)
.is_ok());
assert_eq!(exec.state, ExecutionState::Paused);
assert!(reduce(&mut exec, ExecutionAction::Resume).is_ok());
assert_eq!(exec.state, ExecutionState::Running);
}
#[test]
fn test_fail_with_structured_error() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let error = ExecutionError::llm(
crate::kernel::error::LlmErrorCode::RateLimit,
"Too many requests",
)
.with_http_status(429);
assert!(reduce(
&mut exec,
ExecutionAction::Fail {
error: error.clone()
}
)
.is_ok());
assert_eq!(exec.state, ExecutionState::Failed);
assert!(exec.error.is_some());
let stored_error = exec.error.unwrap();
assert_eq!(stored_error.category, ExecutionErrorCategory::LlmError);
assert!(stored_error.is_retryable());
assert_eq!(stored_error.http_status, Some(429));
}
#[test]
fn test_step_fail_with_structured_error() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let step_id = StepId::new();
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: step_id.clone(),
parent_step_id: None,
step_type: StepType::ToolNode,
name: "test_tool".into(),
source: None,
},
)
.unwrap();
let error = ExecutionError::tool(
crate::kernel::error::ToolErrorCode::ExecutionFailed,
"Tool crashed",
);
assert!(reduce(
&mut exec,
ExecutionAction::StepFailed {
step_id: step_id.clone(),
error: error.clone(),
}
)
.is_ok());
let step = exec.get_step(&step_id).unwrap();
assert_eq!(step.state, StepState::Failed);
assert!(step.error.is_some());
let stored_error = step.error.as_ref().unwrap();
assert_eq!(stored_error.category, ExecutionErrorCategory::ToolError);
assert!(stored_error.is_retryable());
}
#[test]
fn test_fatal_error_not_retryable() {
let error = ExecutionError::policy_violation("Content blocked");
assert!(!error.is_retryable());
assert!(error.is_fatal());
assert!(!error.should_retry());
}
#[test]
fn test_start_sets_started_at() {
let mut exec = Execution::new();
assert!(exec.started_at.is_none());
reduce(&mut exec, ExecutionAction::Start).unwrap();
assert!(exec.started_at.is_some());
}
#[test]
fn test_complete_sets_ended_at() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
assert!(exec.ended_at.is_none());
reduce(&mut exec, ExecutionAction::Complete { output: None }).unwrap();
assert!(exec.ended_at.is_some());
}
#[test]
fn test_fail_sets_ended_at() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let error = ExecutionError::kernel_internal("Test failure");
reduce(&mut exec, ExecutionAction::Fail { error }).unwrap();
assert!(exec.ended_at.is_some());
}
#[test]
fn test_cancel_sets_ended_at() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "User cancelled".into(),
},
)
.unwrap();
assert!(exec.ended_at.is_some());
}
#[test]
fn test_wait_for_approval() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
)
.unwrap();
assert!(matches!(
exec.state,
ExecutionState::Waiting(WaitReason::Approval)
));
}
#[test]
fn test_wait_for_tool_result() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::External,
},
)
.unwrap();
assert!(matches!(
exec.state,
ExecutionState::Waiting(WaitReason::External)
));
}
#[test]
fn test_wait_for_user_input() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::UserInput,
},
)
.unwrap();
assert!(matches!(
exec.state,
ExecutionState::Waiting(WaitReason::UserInput)
));
}
#[test]
fn test_input_received_resumes_from_waiting() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
)
.unwrap();
reduce(&mut exec, ExecutionAction::InputReceived).unwrap();
assert_eq!(exec.state, ExecutionState::Running);
}
#[test]
fn test_resume_from_waiting() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
)
.unwrap();
reduce(&mut exec, ExecutionAction::Resume).unwrap();
assert_eq!(exec.state, ExecutionState::Running);
}
#[test]
fn test_step_started_adds_step() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let step_id = StepId::from_string("step_test");
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: step_id.clone(),
parent_step_id: None,
step_type: StepType::LlmNode,
name: "test_step".into(),
source: None,
},
)
.unwrap();
assert_eq!(exec.steps.len(), 1);
let step = exec.get_step(&step_id).unwrap();
assert_eq!(step.state, StepState::Running);
assert_eq!(step.name, "test_step");
assert!(step.started_at.is_some());
}
#[test]
fn test_step_completed_sets_output() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let step_id = StepId::from_string("step_complete");
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: step_id.clone(),
parent_step_id: None,
step_type: StepType::LlmNode,
name: "complete_step".into(),
source: None,
},
)
.unwrap();
reduce(
&mut exec,
ExecutionAction::StepCompleted {
step_id: step_id.clone(),
output: Some("Result".into()),
duration_ms: 1000,
},
)
.unwrap();
let step = exec.get_step(&step_id).unwrap();
assert_eq!(step.state, StepState::Completed);
assert_eq!(step.output, Some("Result".to_string()));
assert_eq!(step.duration_ms, Some(1000));
assert!(step.ended_at.is_some());
}
#[test]
fn test_nested_step_with_parent() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let parent_id = StepId::from_string("step_parent");
let child_id = StepId::from_string("step_child");
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: parent_id.clone(),
parent_step_id: None,
step_type: StepType::GraphNode,
name: "parent".into(),
source: None,
},
)
.unwrap();
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: child_id.clone(),
parent_step_id: Some(parent_id.clone()),
step_type: StepType::LlmNode,
name: "child".into(),
source: None,
},
)
.unwrap();
let child = exec.get_step(&child_id).unwrap();
assert_eq!(child.parent_step_id, Some(parent_id));
}
#[test]
fn test_multiple_steps_preserved_order() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let step1 = StepId::from_string("step_1");
let step2 = StepId::from_string("step_2");
let step3 = StepId::from_string("step_3");
for (step_id, name) in [(&step1, "first"), (&step2, "second"), (&step3, "third")] {
reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: step_id.clone(),
parent_step_id: None,
step_type: StepType::FunctionNode,
name: name.into(),
source: None,
},
)
.unwrap();
}
assert_eq!(exec.step_order.len(), 3);
assert_eq!(exec.step_order[0], step1);
assert_eq!(exec.step_order[1], step2);
assert_eq!(exec.step_order[2], step3);
}
#[test]
fn test_cannot_complete_from_created() {
let mut exec = Execution::new();
let result = reduce(&mut exec, ExecutionAction::Complete { output: None });
assert!(result.is_err());
}
#[test]
fn test_cannot_pause_from_created() {
let mut exec = Execution::new();
let result = reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into(),
},
);
assert!(result.is_err());
}
#[test]
fn test_cannot_resume_from_created() {
let mut exec = Execution::new();
let result = reduce(&mut exec, ExecutionAction::Resume);
assert!(result.is_err());
}
#[test]
fn test_cannot_resume_from_running() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let result = reduce(&mut exec, ExecutionAction::Resume);
assert!(result.is_err());
}
#[test]
fn test_cannot_wait_from_paused() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into(),
},
)
.unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
);
assert!(result.is_err());
}
#[test]
fn test_cannot_input_received_when_not_waiting() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let result = reduce(&mut exec, ExecutionAction::InputReceived);
assert!(result.is_err());
}
#[test]
fn test_cannot_fail_from_completed() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(&mut exec, ExecutionAction::Complete { output: None }).unwrap();
let error = ExecutionError::kernel_internal("Too late");
let result = reduce(&mut exec, ExecutionAction::Fail { error });
assert!(result.is_err());
}
#[test]
fn test_cannot_cancel_from_completed() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(&mut exec, ExecutionAction::Complete { output: None }).unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "test".into(),
},
);
assert!(result.is_err());
}
#[test]
fn test_cannot_cancel_from_failed() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let error = ExecutionError::kernel_internal("Failed");
reduce(&mut exec, ExecutionAction::Fail { error }).unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "test".into(),
},
);
assert!(result.is_err());
}
#[test]
fn test_cannot_start_step_when_not_running() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into(),
},
)
.unwrap();
let result = reduce(
&mut exec,
ExecutionAction::StepStarted {
step_id: StepId::new(),
parent_step_id: None,
step_type: StepType::LlmNode,
name: "test".into(),
source: None,
},
);
assert!(result.is_err());
}
#[test]
fn test_step_not_found_error() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let nonexistent = StepId::from_string("step_nonexistent");
let result = reduce(
&mut exec,
ExecutionAction::StepCompleted {
step_id: nonexistent,
output: None,
duration_ms: 0,
},
);
assert!(matches!(result, Err(ReducerError::StepNotFound(_))));
}
#[test]
fn test_step_failed_not_found() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let nonexistent = StepId::from_string("step_missing");
let error = ExecutionError::kernel_internal("Test");
let result = reduce(
&mut exec,
ExecutionAction::StepFailed {
step_id: nonexistent,
error,
},
);
assert!(matches!(result, Err(ReducerError::StepNotFound(_))));
}
#[test]
fn test_can_fail_from_running() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let error = ExecutionError::kernel_internal("Runtime error");
let result = reduce(&mut exec, ExecutionAction::Fail { error });
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Failed);
}
#[test]
fn test_can_fail_from_paused() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into(),
},
)
.unwrap();
let error = ExecutionError::kernel_internal("Error while paused");
let result = reduce(&mut exec, ExecutionAction::Fail { error });
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Failed);
}
#[test]
fn test_can_fail_from_waiting() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
)
.unwrap();
let error = ExecutionError::timeout("Approval timed out");
let result = reduce(&mut exec, ExecutionAction::Fail { error });
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Failed);
}
#[test]
fn test_can_cancel_from_running() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "User request".into(),
},
);
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Cancelled);
}
#[test]
fn test_can_cancel_from_paused() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Pause {
reason: "test".into(),
},
)
.unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "Cancelled".into(),
},
);
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Cancelled);
}
#[test]
fn test_can_cancel_from_waiting() {
let mut exec = Execution::new();
reduce(&mut exec, ExecutionAction::Start).unwrap();
reduce(
&mut exec,
ExecutionAction::Wait {
reason: WaitReason::Approval,
},
)
.unwrap();
let result = reduce(
&mut exec,
ExecutionAction::Cancel {
reason: "Timeout".into(),
},
);
assert!(result.is_ok());
assert_eq!(exec.state, ExecutionState::Cancelled);
}
#[test]
fn test_reducer_error_display_invalid_transition() {
let error = ReducerError::InvalidTransition {
from: ExecutionState::Paused,
action: "Start".to_string(),
};
let display = format!("{}", error);
assert!(display.contains("Paused"));
assert!(display.contains("Start"));
}
#[test]
fn test_reducer_error_display_step_not_found() {
let step_id = StepId::from_string("step_missing");
let error = ReducerError::StepNotFound(step_id);
let display = format!("{}", error);
assert!(display.contains("step_missing"));
}
}