use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use serde_json::Value;
use crate::agent::DeepAgentError;
/// Boxed predicate over the workflow state. Stored in `WorkflowStep::condition` and
/// evaluated before the step's action; a `false` result causes the step to be skipped.
type ConditionFn = Box<dyn Fn(&Value) -> bool + Send + Sync>;
/// Result alias for this module, with `DeepAgentError` as the error type.
pub type Result<T> = std::result::Result<T, DeepAgentError>;
/// Lifecycle state of a single workflow step.
///
/// `WorkflowExecutor` in this file only reports `Completed`, `Failed`, `Skipped`
/// and `TimedOut`; it never produces `Pending` or `Running`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StepStatus {
    /// The step has not started yet.
    Pending,
    /// The step is currently executing.
    Running,
    /// The step's action finished successfully.
    Completed,
    /// The step's action returned an error on every attempt.
    Failed,
    /// The step's condition evaluated to `false`, so its action was not run.
    Skipped,
    /// The step's timeout had elapsed before an attempt could start.
    TimedOut,
}

impl std::fmt::Display for StepStatus {
    /// Writes the lowercase, snake_case label of the status (for example `timed_out`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Skipped => "skipped",
            Self::TimedOut => "timed_out",
        };
        f.write_str(label)
    }
}
/// The operation a workflow step performs when the executor runs it.
pub enum StepAction {
/// Calls the closure with a clone of the current state. On `Ok` the returned value
/// replaces the state; on `Err` the attempt counts as failed and may be retried.
Transform(Box<dyn Fn(Value) -> std::result::Result<Value, String> + Send + Sync>),
/// Appends `{"type": "agent", "content": <text>}` to the state's `"messages"` array
/// (creating the array if the key is missing) when the state is a JSON object.
Message(String),
/// Carries a list of step IDs. The executor in this file does not run them; it only
/// echoes the list in the step output under `"parallel_steps"`.
Parallel(Vec<String>),
/// Pushes a clone of the current state onto the executor's `checkpoints`.
Checkpoint,
}
impl std::fmt::Debug for StepAction {
    /// Formats the action for diagnostics. The transform closure is opaque, so it is
    /// rendered as the placeholder `<fn>`; the other variants show their payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Payload-free renderings are constant strings.
            Self::Transform(_) => f.write_str("Transform(<fn>)"),
            Self::Checkpoint => f.write_str("Checkpoint"),
            Self::Message(text) => write!(f, "Message({:?})", text),
            Self::Parallel(step_ids) => write!(f, "Parallel({:?})", step_ids),
        }
    }
}
/// A single node of a workflow: an action plus the metadata that controls when and
/// how the executor runs it.
pub struct WorkflowStep {
/// Identifier of the step; `Workflow::validate` rejects workflows with duplicate IDs.
pub id: String,
/// Name of the step. Not interpreted by the executor.
pub name: String,
/// Optional free-form description. Not interpreted by the executor.
pub description: Option<String>,
/// What the step does when it runs.
pub action: StepAction,
/// IDs of the steps that must have been processed before this step becomes ready.
pub dependencies: Vec<String>,
/// Optional guard evaluated against the current state; when it returns `false`
/// the step is reported as skipped and its action is not run.
pub condition: Option<ConditionFn>,
/// Optional time budget, measured from the start of the step and checked before
/// each attempt. A running action is not interrupted.
pub timeout: Option<Duration>,
/// Number of additional attempts after the first one fails.
pub retry_count: usize,
}
impl std::fmt::Debug for WorkflowStep {
    /// Formats every field of the step. The condition closure cannot be printed, so
    /// the `condition` field shows only whether one is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_condition = self.condition.is_some();

        let mut out = f.debug_struct("WorkflowStep");
        out.field("id", &self.id);
        out.field("name", &self.name);
        out.field("description", &self.description);
        out.field("action", &self.action);
        out.field("dependencies", &self.dependencies);
        out.field("condition", &has_condition);
        out.field("timeout", &self.timeout);
        out.field("retry_count", &self.retry_count);
        out.finish()
    }
}
impl WorkflowStep {
    /// Creates a step with the given ID, name and action.
    ///
    /// The step starts with no description, no dependencies, no condition, no
    /// timeout and a retry count of zero.
    pub fn new(id: impl Into<String>, name: impl Into<String>, action: StepAction) -> Self {
        let id = id.into();
        let name = name.into();
        Self {
            id,
            name,
            action,
            description: None,
            dependencies: Vec::new(),
            condition: None,
            timeout: None,
            retry_count: 0,
        }
    }

    /// Sets the description, replacing any previous one.
    pub fn with_description(self, desc: impl Into<String>) -> Self {
        Self {
            description: Some(desc.into()),
            ..self
        }
    }

    /// Appends a single dependency ID to the step.
    pub fn with_dependency(self, dep_id: impl Into<String>) -> Self {
        let mut step = self;
        step.dependencies.push(dep_id.into());
        step
    }

    /// Appends all IDs in `deps` to the step's existing dependencies.
    pub fn with_dependencies(self, deps: Vec<String>) -> Self {
        let mut step = self;
        step.dependencies.extend(deps);
        step
    }

    /// Sets the guard predicate, replacing any previous one.
    pub fn with_condition(self, cond: impl Fn(&Value) -> bool + Send + Sync + 'static) -> Self {
        Self {
            condition: Some(Box::new(cond)),
            ..self
        }
    }

    /// Sets the time budget for the step.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Sets how many extra attempts are made after the first one fails.
    pub fn with_retry_count(self, count: usize) -> Self {
        Self {
            retry_count: count,
            ..self
        }
    }
}
/// Outcome of running (or skipping) one workflow step.
#[derive(Debug, Clone)]
pub struct StepResult {
/// ID of the step this result belongs to.
pub step_id: String,
/// Final status reported for the step.
pub status: StepStatus,
/// Output produced by the action; `None` when the step was skipped, failed or
/// timed out.
pub output: Option<Value>,
/// Time elapsed since the step began, including condition evaluation and retries.
pub duration: Duration,
/// Explanation for a non-completed step: the skip reason, the timeout message, or
/// the error returned by the last failed transform attempt.
pub error: Option<String>,
}
/// A named collection of steps whose `dependencies` form a dependency graph.
pub struct Workflow {
/// Identifier of the workflow.
pub id: String,
/// Name of the workflow.
pub name: String,
/// Steps in insertion order.
pub steps: Vec<WorkflowStep>,
}
impl Workflow {
    /// Creates an empty workflow with the given identifier and name.
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            name: name.into(),
            steps: Vec::new(),
        }
    }

    /// Appends `step` and returns `self` so calls can be chained.
    ///
    /// No checks are performed here; use [`Workflow::validate`] to verify the graph.
    pub fn add_step(&mut self, step: WorkflowStep) -> &mut Self {
        self.steps.push(step);
        self
    }

    /// Checks that the steps form a well-defined dependency graph.
    ///
    /// # Errors
    ///
    /// Returns `DeepAgentError::ConfigError` when
    /// - two steps share the same ID,
    /// - a step depends on an ID that is not part of the workflow,
    /// - a step lists itself as a dependency, or
    /// - the dependencies contain a cycle.
    pub fn validate(&self) -> Result<()> {
        // Collapsing the IDs into a set exposes duplicates through the length mismatch.
        let ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
        if ids.len() != self.steps.len() {
            return Err(DeepAgentError::ConfigError(
                "Workflow contains duplicate step IDs".to_string(),
            ));
        }

        for step in &self.steps {
            for dep in &step.dependencies {
                if !ids.contains(dep.as_str()) {
                    return Err(DeepAgentError::ConfigError(format!(
                        "Step '{}' depends on non-existent step '{}'",
                        step.id, dep
                    )));
                }
            }
            if step.dependencies.contains(&step.id) {
                return Err(DeepAgentError::ConfigError(format!(
                    "Step '{}' depends on itself",
                    step.id
                )));
            }
        }

        // Cycle detection via Kahn's algorithm: repeatedly remove steps with no
        // unmet dependencies; any step that is never removed sits on a cycle.
        let mut in_degree: HashMap<&str, usize> = HashMap::new();
        let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
        for step in &self.steps {
            in_degree.entry(step.id.as_str()).or_insert(0);
            adj.entry(step.id.as_str()).or_default();
            for dep in &step.dependencies {
                // Edge dep -> step: finishing `dep` unblocks `step`.
                adj.entry(dep.as_str()).or_default().push(step.id.as_str());
                *in_degree.entry(step.id.as_str()).or_insert(0) += 1;
            }
        }

        // Seed the queue with every step that has no dependencies.
        let mut queue: VecDeque<&str> = in_degree
            .iter()
            .filter(|(_, deg)| **deg == 0)
            .map(|(&id, _)| id)
            .collect();

        let mut visited = 0usize;
        while let Some(node) = queue.pop_front() {
            visited += 1;
            if let Some(neighbors) = adj.get(node) {
                for &neighbor in neighbors {
                    if let Some(deg) = in_degree.get_mut(neighbor) {
                        *deg -= 1;
                        if *deg == 0 {
                            queue.push_back(neighbor);
                        }
                    }
                }
            }
        }

        if visited != self.steps.len() {
            return Err(DeepAgentError::ConfigError(
                "Workflow contains a dependency cycle".to_string(),
            ));
        }
        Ok(())
    }

    /// Returns, in insertion order, the steps that are not in `completed` and whose
    /// dependencies are all contained in `completed`.
    pub fn get_ready_steps(&self, completed: &HashSet<String>) -> Vec<&WorkflowStep> {
        self.steps
            .iter()
            .filter(|step| {
                !completed.contains(&step.id)
                    && step.dependencies.iter().all(|dep| completed.contains(dep))
            })
            .collect()
    }

    /// Returns `true` when the workflow has at least one step and every step ID is
    /// contained in `completed`. An empty workflow is never complete.
    pub fn is_complete(&self, completed: &HashSet<String>) -> bool {
        !self.steps.is_empty() && self.steps.iter().all(|s| completed.contains(&s.id))
    }
}
/// Aggregate outcome of one `WorkflowExecutor::execute` call.
#[derive(Debug, Clone)]
pub struct WorkflowResult {
/// Per-step results keyed by step ID.
pub results: HashMap<String, StepResult>,
/// The state after the last executed step.
pub final_state: Value,
/// Time elapsed between the end of validation and the end of execution.
pub total_duration: Duration,
}
impl WorkflowResult {
    /// Returns the results whose status is `Completed`, in map iteration order.
    pub fn succeeded(&self) -> Vec<&StepResult> {
        let mut done = Vec::new();
        for outcome in self.results.values() {
            if matches!(outcome.status, StepStatus::Completed) {
                done.push(outcome);
            }
        }
        done
    }

    /// Returns the results whose status is `Failed` or `TimedOut`, in map iteration
    /// order.
    pub fn failed(&self) -> Vec<&StepResult> {
        let mut broken = Vec::new();
        for outcome in self.results.values() {
            if matches!(outcome.status, StepStatus::Failed | StepStatus::TimedOut) {
                broken.push(outcome);
            }
        }
        broken
    }
}
/// Runs workflows step by step and records state snapshots for `Checkpoint` steps.
pub struct WorkflowExecutor {
/// State snapshots pushed by `Checkpoint` steps, in execution order. The executor
/// never clears this list, so it accumulates across `execute` calls.
pub checkpoints: Vec<Value>,
}
impl WorkflowExecutor {
pub fn new() -> Self {
Self {
checkpoints: Vec::new(),
}
}
pub fn execute(&mut self, workflow: &Workflow, initial_state: Value) -> Result<WorkflowResult> {
workflow.validate()?;
let start = Instant::now();
let mut state = initial_state;
let mut results: HashMap<String, StepResult> = HashMap::new();
let mut completed: HashSet<String> = HashSet::new();
let mut processed: HashSet<String> = HashSet::new();
loop {
if workflow.is_complete(&processed) {
break;
}
let ready_ids: Vec<String> = workflow
.get_ready_steps(&processed)
.iter()
.map(|s| s.id.clone())
.collect();
if ready_ids.is_empty() {
break;
}
for step_id in &ready_ids {
let step = workflow.steps.iter().find(|s| s.id == *step_id).unwrap();
let step_result = self.execute_step(step, &mut state);
if step_result.status == StepStatus::Completed
|| step_result.status == StepStatus::Skipped
{
completed.insert(step_id.clone());
}
processed.insert(step_id.clone());
results.insert(step_id.clone(), step_result);
}
}
Ok(WorkflowResult {
results,
final_state: state,
total_duration: start.elapsed(),
})
}
fn execute_step(&mut self, step: &WorkflowStep, state: &mut Value) -> StepResult {
let step_start = Instant::now();
if let Some(ref cond) = step.condition {
if !cond(state) {
return StepResult {
step_id: step.id.clone(),
status: StepStatus::Skipped,
output: None,
duration: step_start.elapsed(),
error: Some("Condition not met".to_string()),
};
}
}
let max_attempts = step.retry_count + 1;
let mut last_error: Option<String> = None;
for _attempt in 0..max_attempts {
if let Some(timeout) = step.timeout {
if step_start.elapsed() >= timeout {
return StepResult {
step_id: step.id.clone(),
status: StepStatus::TimedOut,
output: None,
duration: step_start.elapsed(),
error: Some("Step timed out".to_string()),
};
}
}
match &step.action {
StepAction::Transform(transform) => match transform(state.clone()) {
Ok(new_state) => {
*state = new_state.clone();
return StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: Some(new_state),
duration: step_start.elapsed(),
error: None,
};
}
Err(e) => {
last_error = Some(e);
}
},
StepAction::Message(msg) => {
let messages = state.as_object_mut().and_then(|obj| {
if !obj.contains_key("messages") {
obj.insert("messages".to_string(), Value::Array(Vec::new()));
}
obj.get_mut("messages").and_then(|v| v.as_array_mut())
});
if let Some(msgs) = messages {
msgs.push(serde_json::json!({
"type": "agent",
"content": msg
}));
}
return StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: Some(serde_json::json!({ "message": msg })),
duration: step_start.elapsed(),
error: None,
};
}
StepAction::Parallel(step_ids) => {
return StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: Some(serde_json::json!({ "parallel_steps": step_ids })),
duration: step_start.elapsed(),
error: None,
};
}
StepAction::Checkpoint => {
self.checkpoints.push(state.clone());
return StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: Some(state.clone()),
duration: step_start.elapsed(),
error: None,
};
}
}
}
StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: None,
duration: step_start.elapsed(),
error: last_error,
}
}
}
impl Default for WorkflowExecutor {
fn default() -> Self {
Self::new()
}
}
/// Chainable builder that assembles a `Workflow` and validates it in `build`.
pub struct WorkflowBuilder {
// Identifier passed on to the built workflow.
id: String,
// Name passed on to the built workflow.
name: String,
// Steps collected so far, in insertion order.
steps: Vec<WorkflowStep>,
// `(step_id, dep_id)` pairs recorded by `dependency` and applied in `build`.
deferred_deps: Vec<(String, String)>,
}
impl WorkflowBuilder {
    /// Starts a builder for a workflow with the given identifier and name.
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        let (id, name) = (id.into(), name.into());
        Self {
            id,
            name,
            steps: Vec::new(),
            deferred_deps: Vec::new(),
        }
    }

    /// Adds a fully configured step.
    pub fn add_step(self, step: WorkflowStep) -> Self {
        let mut builder = self;
        builder.steps.push(step);
        builder
    }

    /// Adds a step built from just an ID, a name and an action.
    pub fn add_step_simple(
        self,
        id: impl Into<String>,
        name: impl Into<String>,
        action: StepAction,
    ) -> Self {
        let step = WorkflowStep::new(id, name, action);
        self.add_step(step)
    }

    /// Records that `step_id` depends on `dep_id`. The link is applied in `build`,
    /// so the steps may be added before or after this call.
    pub fn dependency(self, step_id: impl Into<String>, dep_id: impl Into<String>) -> Self {
        let mut builder = self;
        builder.deferred_deps.push((step_id.into(), dep_id.into()));
        builder
    }

    /// Applies the recorded dependencies, assembles the workflow and validates it.
    ///
    /// # Errors
    ///
    /// Returns `DeepAgentError::ConfigError` when a recorded dependency targets a
    /// step that was never added, or when `Workflow::validate` rejects the result.
    pub fn build(self) -> Result<Workflow> {
        let Self {
            id,
            name,
            mut steps,
            deferred_deps,
        } = self;

        for (target_id, dep_id) in deferred_deps {
            let target = match steps.iter_mut().find(|s| s.id == target_id) {
                Some(found) => found,
                None => {
                    return Err(DeepAgentError::ConfigError(format!(
                        "Cannot add dependency: step '{}' not found",
                        target_id
                    )));
                }
            };
            // Skip links that the step already declares.
            let already_listed = target.dependencies.iter().any(|d| *d == dep_id);
            if !already_listed {
                target.dependencies.push(dep_id);
            }
        }

        let mut workflow = Workflow::new(id, name);
        workflow.steps.extend(steps);
        workflow.validate()?;
        Ok(workflow)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
/// Every status renders as its lowercase snake_case label.
#[test]
fn test_step_status_display() {
    let expected = [
        (StepStatus::Pending, "pending"),
        (StepStatus::Running, "running"),
        (StepStatus::Completed, "completed"),
        (StepStatus::Failed, "failed"),
        (StepStatus::Skipped, "skipped"),
        (StepStatus::TimedOut, "timed_out"),
    ];
    for (status, label) in expected.iter() {
        assert_eq!(status.to_string(), *label);
    }
}
/// Equal variants compare equal; different variants do not.
#[test]
fn test_step_status_equality() {
    assert!(StepStatus::Pending == StepStatus::Pending);
    assert!(StepStatus::Pending != StepStatus::Running);
}
/// The `Debug` output of each action variant mentions the variant or its payload.
#[test]
fn test_step_action_debug() {
    let transform = StepAction::Transform(Box::new(|v| Ok(v)));
    let rendered = format!("{:?}", transform);
    assert!(rendered.contains("Transform"));

    let message = StepAction::Message("hello".into());
    let rendered = format!("{:?}", message);
    assert!(rendered.contains("hello"));

    let parallel = StepAction::Parallel(vec!["a".into(), "b".into()]);
    let rendered = format!("{:?}", parallel);
    assert!(rendered.contains("Parallel"));

    let checkpoint = StepAction::Checkpoint;
    let rendered = format!("{:?}", checkpoint);
    assert!(rendered.contains("Checkpoint"));
}
/// A freshly created step carries only its ID, name and action.
#[test]
fn test_workflow_step_new() {
    let fresh = WorkflowStep::new("s1", "Step One", StepAction::Checkpoint);

    assert_eq!(fresh.id, "s1");
    assert_eq!(fresh.name, "Step One");
    assert_eq!(fresh.retry_count, 0);
    assert!(fresh.description.is_none());
    assert!(fresh.dependencies.is_empty());
    assert!(fresh.condition.is_none());
    assert!(fresh.timeout.is_none());
}
/// `with_description` stores the given text.
#[test]
fn test_workflow_step_with_description() {
    let base = WorkflowStep::new("s1", "Step", StepAction::Checkpoint);
    let described = base.with_description("Does things");
    assert_eq!(described.description.as_deref(), Some("Does things"));
}
/// `with_dependency` appends a single dependency ID.
#[test]
fn test_workflow_step_with_dependency() {
    let base = WorkflowStep::new("s2", "Step", StepAction::Checkpoint);
    let dependent = base.with_dependency("s1");
    assert_eq!(dependent.dependencies, vec!["s1"]);
}
/// `with_dependencies` appends every ID in the given list, preserving order.
#[test]
fn test_workflow_step_with_dependencies() {
    let deps = vec!["s1".to_string(), "s2".to_string()];
    let dependent = WorkflowStep::new("s3", "Step", StepAction::Checkpoint).with_dependencies(deps);
    assert_eq!(dependent.dependencies, vec!["s1", "s2"]);
}
/// `with_condition` stores the predicate on the step.
#[test]
fn test_workflow_step_with_condition() {
    let guarded = WorkflowStep::new("s1", "Step", StepAction::Checkpoint)
        .with_condition(|v| v.get("ready").and_then(|r| r.as_bool()).unwrap_or(false));
    assert!(guarded.condition.is_some());
}
/// `with_timeout` stores the given duration.
#[test]
fn test_workflow_step_with_timeout() {
    let budget = Duration::from_secs(30);
    let timed = WorkflowStep::new("s1", "Step", StepAction::Checkpoint).with_timeout(budget);
    assert_eq!(timed.timeout, Some(Duration::from_secs(30)));
}
/// `with_retry_count` stores the given count.
#[test]
fn test_workflow_step_with_retry_count() {
    let base = WorkflowStep::new("s1", "Step", StepAction::Checkpoint);
    let retried = base.with_retry_count(3);
    assert_eq!(retried.retry_count, 3);
}
/// A new workflow keeps its ID and name and starts without steps.
#[test]
fn test_workflow_new() {
    let workflow = Workflow::new("wf-1", "My Workflow");
    assert!(workflow.steps.is_empty());
    assert_eq!(workflow.id, "wf-1");
    assert_eq!(workflow.name, "My Workflow");
}
/// Each `add_step` call appends one step.
#[test]
fn test_workflow_add_step() {
    let mut workflow = Workflow::new("wf", "Test");
    let first = WorkflowStep::new("s1", "Step 1", StepAction::Checkpoint);
    let second = WorkflowStep::new("s2", "Step 2", StepAction::Checkpoint);
    workflow.add_step(first);
    workflow.add_step(second);
    assert_eq!(workflow.steps.len(), 2);
}
/// A two-step chain with an existing dependency validates successfully.
#[test]
fn test_workflow_validate_valid() {
    let root = WorkflowStep::new("s1", "Step 1", StepAction::Checkpoint);
    let child = WorkflowStep::new("s2", "Step 2", StepAction::Checkpoint).with_dependency("s1");

    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(root);
    workflow.add_step(child);

    assert!(workflow.validate().is_ok());
}
/// Depending on an unknown step ID is reported as a "non-existent step" error.
#[test]
fn test_workflow_validate_missing_dependency() {
    let orphan =
        WorkflowStep::new("s1", "Step 1", StepAction::Checkpoint).with_dependency("nonexistent");
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(orphan);

    let message = workflow.validate().unwrap_err().to_string();
    assert!(message.contains("non-existent step"));
}
/// Two steps sharing an ID are reported as duplicates.
#[test]
fn test_workflow_validate_duplicate_ids() {
    let original = WorkflowStep::new("s1", "Step 1", StepAction::Checkpoint);
    let clash = WorkflowStep::new("s1", "Step 1 dup", StepAction::Checkpoint);

    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(original);
    workflow.add_step(clash);

    let message = workflow.validate().unwrap_err().to_string();
    assert!(message.contains("duplicate"));
}
/// A step that lists itself as a dependency is rejected.
#[test]
fn test_workflow_validate_self_dependency() {
    let looped = WorkflowStep::new("s1", "Step 1", StepAction::Checkpoint).with_dependency("s1");
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(looped);

    let message = workflow.validate().unwrap_err().to_string();
    assert!(message.contains("depends on itself"));
}
/// Two steps depending on each other are reported as a cycle.
#[test]
fn test_workflow_validate_cycle() {
    let a = WorkflowStep::new("a", "A", StepAction::Checkpoint).with_dependency("b");
    let b = WorkflowStep::new("b", "B", StepAction::Checkpoint).with_dependency("a");

    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(a);
    workflow.add_step(b);

    let message = workflow.validate().unwrap_err().to_string();
    assert!(message.contains("cycle"));
}
/// Steps without dependencies are all ready when nothing has completed yet.
#[test]
fn test_get_ready_steps_no_deps() {
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(WorkflowStep::new("s1", "A", StepAction::Checkpoint));
    workflow.add_step(WorkflowStep::new("s2", "B", StepAction::Checkpoint));

    let nothing_done = HashSet::new();
    assert_eq!(workflow.get_ready_steps(&nothing_done).len(), 2);
}
/// In a chain, only the step whose dependencies are complete is ready.
#[test]
fn test_get_ready_steps_with_deps() {
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(WorkflowStep::new("s1", "A", StepAction::Checkpoint));
    workflow.add_step(WorkflowStep::new("s2", "B", StepAction::Checkpoint).with_dependency("s1"));
    workflow.add_step(WorkflowStep::new("s3", "C", StepAction::Checkpoint).with_dependency("s2"));

    // Nothing completed: only the root is ready.
    let nothing_done = HashSet::new();
    let ready = workflow.get_ready_steps(&nothing_done);
    assert_eq!(ready.len(), 1);
    assert_eq!(ready[0].id, "s1");

    // Root completed: the next link in the chain becomes ready.
    let root_done: HashSet<String> = std::iter::once("s1".to_string()).collect();
    let ready = workflow.get_ready_steps(&root_done);
    assert_eq!(ready.len(), 1);
    assert_eq!(ready[0].id, "s2");
}
/// The workflow is complete only when every step ID is in the completed set.
#[test]
fn test_is_complete() {
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(WorkflowStep::new("s1", "A", StepAction::Checkpoint));
    workflow.add_step(WorkflowStep::new("s2", "B", StepAction::Checkpoint));

    let none: HashSet<String> = HashSet::new();
    assert!(!workflow.is_complete(&none));

    let partial: HashSet<String> = ["s1"].iter().map(|id| id.to_string()).collect();
    assert!(!workflow.is_complete(&partial));

    let all: HashSet<String> = ["s1", "s2"].iter().map(|id| id.to_string()).collect();
    assert!(workflow.is_complete(&all));
}
/// A workflow without steps is never considered complete.
#[test]
fn test_is_complete_empty_workflow() {
    let workflow = Workflow::new("wf", "Empty");
    let none: HashSet<String> = HashSet::new();
    assert!(!workflow.is_complete(&none));
}
/// A single transform step updates the state and is reported as succeeded.
#[test]
fn test_execute_single_transform() {
    let double = StepAction::Transform(Box::new(|mut v| {
        let x = v.get("x").and_then(|x| x.as_i64()).unwrap_or(0);
        v["x"] = json!(x * 2);
        Ok(v)
    }));
    let mut workflow = Workflow::new("wf", "Test");
    workflow.add_step(WorkflowStep::new("s1", "Double x", double));

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"x": 5})).unwrap();

    assert_eq!(outcome.final_state["x"], 10);
    assert_eq!(outcome.succeeded().len(), 1);
    assert!(outcome.failed().is_empty());
}
/// Two dependent transforms are applied in order: (2 + 10) * 3 = 36.
#[test]
fn test_execute_chained_transforms() {
    let add_ten = StepAction::Transform(Box::new(|mut v| {
        let x = v.get("x").and_then(|x| x.as_i64()).unwrap_or(0);
        v["x"] = json!(x + 10);
        Ok(v)
    }));
    let times_three = StepAction::Transform(Box::new(|mut v| {
        let x = v.get("x").and_then(|x| x.as_i64()).unwrap_or(0);
        v["x"] = json!(x * 3);
        Ok(v)
    }));

    let mut workflow = Workflow::new("wf", "Chain");
    workflow.add_step(WorkflowStep::new("add", "Add 10", add_ten));
    workflow.add_step(WorkflowStep::new("mul", "Multiply by 3", times_three).with_dependency("add"));

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"x": 2})).unwrap();

    assert_eq!(outcome.final_state["x"], 36);
    assert_eq!(outcome.succeeded().len(), 2);
}
/// A step whose condition is false is reported as skipped.
#[test]
fn test_execute_condition_skip() {
    let guarded = WorkflowStep::new("guarded", "Guarded step", StepAction::Message("hi".into()))
        .with_condition(|v| v.get("run_it").and_then(|r| r.as_bool()).unwrap_or(false));
    let mut workflow = Workflow::new("wf", "Cond");
    workflow.add_step(guarded);

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"run_it": false})).unwrap();

    let guarded_result = outcome.results.get("guarded").unwrap();
    assert_eq!(guarded_result.status, StepStatus::Skipped);
}
/// A step whose condition is true runs and completes.
#[test]
fn test_execute_condition_pass() {
    let guarded = WorkflowStep::new("guarded", "Guarded step", StepAction::Message("hi".into()))
        .with_condition(|v| v.get("run_it").and_then(|r| r.as_bool()).unwrap_or(false));
    let mut workflow = Workflow::new("wf", "Cond");
    workflow.add_step(guarded);

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"run_it": true})).unwrap();

    let guarded_result = outcome.results.get("guarded").unwrap();
    assert_eq!(guarded_result.status, StepStatus::Completed);
}
/// A transform that always errors ends as failed and keeps the last error message.
#[test]
fn test_execute_retry_then_fail() {
    let always_err = StepAction::Transform(Box::new(|_| Err("boom".to_string())));
    let flaky = WorkflowStep::new("failing", "Always fails", always_err).with_retry_count(2);
    let mut workflow = Workflow::new("wf", "Retry");
    workflow.add_step(flaky);

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({})).unwrap();

    let failing = outcome.results.get("failing").unwrap();
    assert_eq!(failing.status, StepStatus::Failed);
    assert_eq!(failing.error.as_deref(), Some("boom"));
}
/// A transform that succeeds completes even when retries are configured.
#[test]
fn test_execute_retry_success_on_transform() {
    let identity = StepAction::Transform(Box::new(|v| Ok(v)));
    let step = WorkflowStep::new("ok", "Succeeds", identity).with_retry_count(3);
    let mut workflow = Workflow::new("wf", "Retry");
    workflow.add_step(step);

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"val": 1})).unwrap();

    let ok_result = outcome.results.get("ok").unwrap();
    assert_eq!(ok_result.status, StepStatus::Completed);
}
/// A message step completes and appends its text to the state's `messages` array.
#[test]
fn test_execute_message_action() {
    let say_hello = StepAction::Message("Hello agent".into());
    let mut workflow = Workflow::new("wf", "Msg");
    workflow.add_step(WorkflowStep::new("msg", "Send message", say_hello));

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({})).unwrap();

    let msg_result = outcome.results.get("msg").unwrap();
    assert_eq!(msg_result.status, StepStatus::Completed);

    let recorded = outcome.final_state["messages"].as_array().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0]["content"], "Hello agent");
}
/// A checkpoint step completes and stores a snapshot of the state on the executor.
#[test]
fn test_execute_checkpoint_action() {
    let mut workflow = Workflow::new("wf", "Chk");
    workflow.add_step(WorkflowStep::new("chk", "Save state", StepAction::Checkpoint));

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({"data": 42})).unwrap();

    let chk_result = outcome.results.get("chk").unwrap();
    assert_eq!(chk_result.status, StepStatus::Completed);
    assert_eq!(executor.checkpoints.len(), 1);
    assert_eq!(executor.checkpoints[0]["data"], 42);
}
/// A parallel step completes and echoes its step IDs under `parallel_steps`.
#[test]
fn test_execute_parallel_action() {
    let fan_out = StepAction::Parallel(vec!["a".into(), "b".into()]);
    let mut workflow = Workflow::new("wf", "Par");
    workflow.add_step(WorkflowStep::new("par", "Parallel steps", fan_out));

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({})).unwrap();

    let par_result = outcome.results.get("par").unwrap();
    assert_eq!(par_result.status, StepStatus::Completed);

    let payload = par_result.output.as_ref().unwrap();
    assert!(payload.get("parallel_steps").is_some());
}
/// `succeeded` counts completed steps; `failed` counts failed and timed-out steps.
#[test]
fn test_workflow_result_succeeded_and_failed() {
    // Builds one `(key, StepResult)` map entry with no output.
    let entry = |id: &str, status: StepStatus, millis: u64, error: Option<&str>| {
        let result = StepResult {
            step_id: id.to_string(),
            status,
            output: None,
            duration: Duration::from_millis(millis),
            error: error.map(str::to_string),
        };
        (id.to_string(), result)
    };

    let results: HashMap<String, StepResult> = vec![
        entry("s1", StepStatus::Completed, 10, None),
        entry("s2", StepStatus::Failed, 20, Some("err")),
        entry("s3", StepStatus::TimedOut, 30, Some("timeout")),
    ]
    .into_iter()
    .collect();

    let summary = WorkflowResult {
        results,
        final_state: json!({}),
        total_duration: Duration::from_millis(60),
    };

    assert_eq!(summary.succeeded().len(), 1);
    assert_eq!(summary.failed().len(), 2);
}
/// The builder keeps ID, name and steps, and applies deferred dependencies.
#[test]
fn test_builder_simple() {
    let builder = WorkflowBuilder::new("wf", "Builder test")
        .add_step_simple("s1", "First", StepAction::Checkpoint)
        .add_step_simple("s2", "Second", StepAction::Checkpoint)
        .dependency("s2", "s1");
    let workflow = builder.build().unwrap();

    assert_eq!(workflow.id, "wf");
    assert_eq!(workflow.name, "Builder test");
    assert_eq!(workflow.steps.len(), 2);
    assert_eq!(workflow.steps[1].dependencies, vec!["s1"]);
}
/// Declaring a dependency for a step that was never added makes `build` fail.
#[test]
fn test_builder_invalid_dependency_step() {
    let builder = WorkflowBuilder::new("wf", "Test")
        .add_step_simple("s1", "First", StepAction::Checkpoint)
        .dependency("nonexistent", "s1");
    let built = builder.build();
    assert!(built.is_err());
}
/// A fully configured step passes through the builder unchanged.
#[test]
fn test_builder_with_full_step() {
    let configured = WorkflowStep::new("s1", "Step", StepAction::Checkpoint)
        .with_description("A step")
        .with_retry_count(2)
        .with_timeout(Duration::from_secs(10));

    let workflow = WorkflowBuilder::new("wf", "Test")
        .add_step(configured)
        .build()
        .unwrap();

    let built_step = &workflow.steps[0];
    assert_eq!(built_step.description.as_deref(), Some("A step"));
    assert_eq!(built_step.retry_count, 2);
    assert_eq!(built_step.timeout, Some(Duration::from_secs(10)));
}
/// A three-step chain runs in dependency order: (1 + 10) * 2 = 22.
#[test]
fn test_execute_respects_dependency_order() {
    let set_x = WorkflowStep::new(
        "s1",
        "Set x",
        StepAction::Transform(Box::new(|mut v| {
            v["x"] = json!(1);
            Ok(v)
        })),
    );
    let add_ten = WorkflowStep::new(
        "s2",
        "Add 10",
        StepAction::Transform(Box::new(|mut v| {
            let x = v["x"].as_i64().unwrap_or(0);
            v["x"] = json!(x + 10);
            Ok(v)
        })),
    )
    .with_dependency("s1");
    let double = WorkflowStep::new(
        "s3",
        "Mul 2",
        StepAction::Transform(Box::new(|mut v| {
            let x = v["x"].as_i64().unwrap_or(0);
            v["x"] = json!(x * 2);
            Ok(v)
        })),
    )
    .with_dependency("s2");

    let workflow = WorkflowBuilder::new("wf", "Order test")
        .add_step(set_x)
        .add_step(add_ten)
        .add_step(double)
        .build()
        .unwrap();

    let mut executor = WorkflowExecutor::new();
    let outcome = executor.execute(&workflow, json!({})).unwrap();

    assert_eq!(outcome.final_state["x"], 22);
    assert_eq!(outcome.succeeded().len(), 3);
}
/// The default executor starts without checkpoints.
#[test]
fn test_workflow_executor_default() {
    let fresh = WorkflowExecutor::default();
    assert_eq!(fresh.checkpoints.len(), 0);
}
/// The step's measured duration is recorded and cannot exceed the workflow's total
/// duration, because the step is timed inside the window measured for the workflow.
#[test]
fn test_step_result_has_duration() {
    let mut wf = Workflow::new("wf", "Dur");
    wf.add_step(WorkflowStep::new("s1", "Step", StepAction::Checkpoint));

    let mut executor = WorkflowExecutor::new();
    let result = executor.execute(&wf, json!({})).unwrap();

    let sr = result.results.get("s1").unwrap();
    assert_eq!(sr.step_id, "s1");
    assert_eq!(sr.status, StepStatus::Completed);
    assert!(sr.duration <= result.total_duration);
}
}