// agentic_workflow/engine/dag_exec.rs

1use std::collections::HashMap;
2use std::time::Instant;
3
4use chrono::Utc;
5
6use crate::types::{
7    Edge, EdgeType, ExecutionContext, ExecutionEvent, ExecutionEventType,
8    ExecutionFingerprint, ExecutionStatus, StepLifecycle, StepState, StepType,
9    Workflow, WorkflowError, WorkflowResult,
10};
11
12/// Step execution result with output and timing.
13#[derive(Debug, Clone)]
14pub struct StepExecutionResult {
15    pub step_id: String,
16    pub success: bool,
17    pub output: Option<serde_json::Value>,
18    pub error: Option<String>,
19    pub duration_ms: u64,
20}
21
22/// Execute a single step and capture its result.
23pub fn execute_step(
24    step_id: &str,
25    step_type: &StepType,
26    inputs: &HashMap<String, serde_json::Value>,
27    timeout_ms: Option<u64>,
28) -> StepExecutionResult {
29    let start = Instant::now();
30
31    let result = match step_type {
32        StepType::Noop => Ok(serde_json::json!({"status": "noop"})),
33
34        StepType::Expression { expression } => {
35            Ok(serde_json::json!({
36                "expression": expression,
37                "evaluated": true,
38                "inputs": inputs,
39            }))
40        }
41
42        StepType::Command { command, args } => {
43            // Build command description (don't execute in library — that's the runner's job)
44            Ok(serde_json::json!({
45                "command": command,
46                "args": args,
47                "status": "prepared",
48                "note": "Execution delegated to step runner"
49            }))
50        }
51
52        StepType::McpTool { sister, tool, params } => {
53            Ok(serde_json::json!({
54                "sister": sister,
55                "tool": tool,
56                "params": params,
57                "status": "prepared",
58                "note": "Execution delegated to MCP dispatcher"
59            }))
60        }
61
62        StepType::HttpRequest { method, url, headers, body } => {
63            Ok(serde_json::json!({
64                "method": method,
65                "url": url,
66                "headers": headers,
67                "body": body,
68                "status": "prepared",
69                "note": "Execution delegated to HTTP runner"
70            }))
71        }
72
73        StepType::SubWorkflow { workflow_id } => {
74            Ok(serde_json::json!({
75                "sub_workflow_id": workflow_id,
76                "status": "prepared",
77                "note": "Execution delegated to sub-workflow runner"
78            }))
79        }
80
81        StepType::FanOut { destinations, completion_policy } => {
82            Ok(serde_json::json!({
83                "destinations": destinations.len(),
84                "completion_policy": format!("{:?}", completion_policy),
85                "status": "prepared"
86            }))
87        }
88
89        StepType::ApprovalGate { approvers, timeout_ms } => {
90            Ok(serde_json::json!({
91                "approvers": approvers,
92                "timeout_ms": timeout_ms,
93                "status": "waiting_approval"
94            }))
95        }
96    };
97
98    let duration = start.elapsed();
99    let duration_ms = duration.as_millis() as u64;
100
101    // Check timeout
102    if let Some(timeout) = timeout_ms {
103        if duration_ms > timeout {
104            return StepExecutionResult {
105                step_id: step_id.to_string(),
106                success: false,
107                output: None,
108                error: Some(format!("Step timed out after {}ms (limit: {}ms)", duration_ms, timeout)),
109                duration_ms,
110            };
111        }
112    }
113
114    match result {
115        Ok(output) => StepExecutionResult {
116            step_id: step_id.to_string(),
117            success: true,
118            output: Some(output),
119            error: None,
120            duration_ms,
121        },
122        Err(e) => StepExecutionResult {
123            step_id: step_id.to_string(),
124            success: false,
125            output: None,
126            error: Some(e),
127            duration_ms,
128        },
129    }
130}
131
132/// Apply a step execution result to the execution context.
133pub fn apply_step_result(
134    ctx: &mut ExecutionContext,
135    result: &StepExecutionResult,
136) {
137    if let Some(state) = ctx.step_states.get_mut(&result.step_id) {
138        state.lifecycle = if result.success {
139            StepLifecycle::Success
140        } else {
141            StepLifecycle::Failed
142        };
143        state.completed_at = Some(Utc::now());
144        state.duration_ms = Some(result.duration_ms);
145        state.output = result.output.clone();
146        state.error = result.error.clone();
147        state.attempt += 1;
148    }
149}
150
151/// Get the next ready steps (all dependencies satisfied).
152pub fn next_ready_steps(
153    workflow: &Workflow,
154    ctx: &ExecutionContext,
155) -> Vec<String> {
156    let mut ready = Vec::new();
157
158    for step in &workflow.steps {
159        let state = ctx.step_states.get(&step.id);
160        if state.map_or(true, |s| s.lifecycle != StepLifecycle::Pending) {
161            continue; // Not pending — skip
162        }
163
164        // Check all incoming edges are satisfied
165        let deps_satisfied = workflow
166            .edges
167            .iter()
168            .filter(|e| e.to == step.id)
169            .all(|e| {
170                ctx.step_states
171                    .get(&e.from)
172                    .map_or(false, |s| {
173                        s.lifecycle == StepLifecycle::Success
174                            || s.lifecycle == StepLifecycle::Skipped
175                    })
176            });
177
178        if deps_satisfied {
179            ready.push(step.id.clone());
180        }
181    }
182
183    ready
184}
185
186/// Check if execution is complete (all steps done).
187pub fn is_execution_complete(ctx: &ExecutionContext) -> bool {
188    ctx.step_states.values().all(|s| {
189        matches!(
190            s.lifecycle,
191            StepLifecycle::Success
192                | StepLifecycle::Failed
193                | StepLifecycle::Skipped
194                | StepLifecycle::Cancelled
195        )
196    })
197}
198
199/// Determine overall execution status from step states.
200pub fn compute_execution_status(ctx: &ExecutionContext) -> ExecutionStatus {
201    if !is_execution_complete(ctx) {
202        return ctx.status.clone();
203    }
204
205    let has_failures = ctx
206        .step_states
207        .values()
208        .any(|s| s.lifecycle == StepLifecycle::Failed);
209
210    if has_failures {
211        ExecutionStatus::Failed {
212            error: "One or more steps failed".to_string(),
213        }
214    } else {
215        ExecutionStatus::Succeeded
216    }
217}
218
219/// Build an execution fingerprint from a completed execution.
220pub fn build_fingerprint(ctx: &ExecutionContext) -> ExecutionFingerprint {
221    let step_durations: HashMap<String, u64> = ctx
222        .step_states
223        .iter()
224        .filter_map(|(id, s)| s.duration_ms.map(|d| (id.clone(), d)))
225        .collect();
226
227    let step_outcomes: HashMap<String, StepLifecycle> = ctx
228        .step_states
229        .iter()
230        .map(|(id, s)| (id.clone(), s.lifecycle.clone()))
231        .collect();
232
233    let total_duration: u64 = step_durations.values().sum();
234    let retry_count: u32 = ctx
235        .step_states
236        .values()
237        .map(|s| s.attempt.saturating_sub(1))
238        .sum();
239
240    ExecutionFingerprint {
241        execution_id: ctx.execution_id.clone(),
242        workflow_id: ctx.workflow_id.clone(),
243        total_duration_ms: total_duration,
244        step_durations,
245        step_outcomes,
246        retry_count,
247        completed_at: ctx.completed_at.unwrap_or_else(Utc::now),
248    }
249}
250
251/// Generate execution events for observability.
252pub fn emit_step_event(
253    ctx: &ExecutionContext,
254    step_id: &str,
255    event_type: ExecutionEventType,
256) -> ExecutionEvent {
257    ExecutionEvent {
258        execution_id: ctx.execution_id.clone(),
259        step_id: Some(step_id.to_string()),
260        event_type,
261        timestamp: Utc::now(),
262        data: None,
263    }
264}
265
266/// Pass outputs from completed steps to dependent steps as inputs.
267pub fn propagate_outputs(
268    workflow: &Workflow,
269    ctx: &ExecutionContext,
270    target_step_id: &str,
271) -> HashMap<String, serde_json::Value> {
272    let mut inputs = HashMap::new();
273
274    for edge in &workflow.edges {
275        if edge.to != target_step_id {
276            continue;
277        }
278
279        if let Some(state) = ctx.step_states.get(&edge.from) {
280            if let Some(output) = &state.output {
281                inputs.insert(edge.from.clone(), output.clone());
282            }
283        }
284    }
285
286    inputs
287}
288
// Tests for dag_exec are in tests/phase8_persistence.rs