1use std::collections::HashMap;
2use std::time::Instant;
3
4use chrono::Utc;
5
6use crate::types::{
7 Edge, EdgeType, ExecutionContext, ExecutionEvent, ExecutionEventType,
8 ExecutionFingerprint, ExecutionStatus, StepLifecycle, StepState, StepType,
9 Workflow, WorkflowError, WorkflowResult,
10};
11
12#[derive(Debug, Clone)]
14pub struct StepExecutionResult {
15 pub step_id: String,
16 pub success: bool,
17 pub output: Option<serde_json::Value>,
18 pub error: Option<String>,
19 pub duration_ms: u64,
20}
21
22pub fn execute_step(
24 step_id: &str,
25 step_type: &StepType,
26 inputs: &HashMap<String, serde_json::Value>,
27 timeout_ms: Option<u64>,
28) -> StepExecutionResult {
29 let start = Instant::now();
30
31 let result = match step_type {
32 StepType::Noop => Ok(serde_json::json!({"status": "noop"})),
33
34 StepType::Expression { expression } => {
35 Ok(serde_json::json!({
36 "expression": expression,
37 "evaluated": true,
38 "inputs": inputs,
39 }))
40 }
41
42 StepType::Command { command, args } => {
43 Ok(serde_json::json!({
45 "command": command,
46 "args": args,
47 "status": "prepared",
48 "note": "Execution delegated to step runner"
49 }))
50 }
51
52 StepType::McpTool { sister, tool, params } => {
53 Ok(serde_json::json!({
54 "sister": sister,
55 "tool": tool,
56 "params": params,
57 "status": "prepared",
58 "note": "Execution delegated to MCP dispatcher"
59 }))
60 }
61
62 StepType::HttpRequest { method, url, headers, body } => {
63 Ok(serde_json::json!({
64 "method": method,
65 "url": url,
66 "headers": headers,
67 "body": body,
68 "status": "prepared",
69 "note": "Execution delegated to HTTP runner"
70 }))
71 }
72
73 StepType::SubWorkflow { workflow_id } => {
74 Ok(serde_json::json!({
75 "sub_workflow_id": workflow_id,
76 "status": "prepared",
77 "note": "Execution delegated to sub-workflow runner"
78 }))
79 }
80
81 StepType::FanOut { destinations, completion_policy } => {
82 Ok(serde_json::json!({
83 "destinations": destinations.len(),
84 "completion_policy": format!("{:?}", completion_policy),
85 "status": "prepared"
86 }))
87 }
88
89 StepType::ApprovalGate { approvers, timeout_ms } => {
90 Ok(serde_json::json!({
91 "approvers": approvers,
92 "timeout_ms": timeout_ms,
93 "status": "waiting_approval"
94 }))
95 }
96 };
97
98 let duration = start.elapsed();
99 let duration_ms = duration.as_millis() as u64;
100
101 if let Some(timeout) = timeout_ms {
103 if duration_ms > timeout {
104 return StepExecutionResult {
105 step_id: step_id.to_string(),
106 success: false,
107 output: None,
108 error: Some(format!("Step timed out after {}ms (limit: {}ms)", duration_ms, timeout)),
109 duration_ms,
110 };
111 }
112 }
113
114 match result {
115 Ok(output) => StepExecutionResult {
116 step_id: step_id.to_string(),
117 success: true,
118 output: Some(output),
119 error: None,
120 duration_ms,
121 },
122 Err(e) => StepExecutionResult {
123 step_id: step_id.to_string(),
124 success: false,
125 output: None,
126 error: Some(e),
127 duration_ms,
128 },
129 }
130}
131
132pub fn apply_step_result(
134 ctx: &mut ExecutionContext,
135 result: &StepExecutionResult,
136) {
137 if let Some(state) = ctx.step_states.get_mut(&result.step_id) {
138 state.lifecycle = if result.success {
139 StepLifecycle::Success
140 } else {
141 StepLifecycle::Failed
142 };
143 state.completed_at = Some(Utc::now());
144 state.duration_ms = Some(result.duration_ms);
145 state.output = result.output.clone();
146 state.error = result.error.clone();
147 state.attempt += 1;
148 }
149}
150
151pub fn next_ready_steps(
153 workflow: &Workflow,
154 ctx: &ExecutionContext,
155) -> Vec<String> {
156 let mut ready = Vec::new();
157
158 for step in &workflow.steps {
159 let state = ctx.step_states.get(&step.id);
160 if state.map_or(true, |s| s.lifecycle != StepLifecycle::Pending) {
161 continue; }
163
164 let deps_satisfied = workflow
166 .edges
167 .iter()
168 .filter(|e| e.to == step.id)
169 .all(|e| {
170 ctx.step_states
171 .get(&e.from)
172 .map_or(false, |s| {
173 s.lifecycle == StepLifecycle::Success
174 || s.lifecycle == StepLifecycle::Skipped
175 })
176 });
177
178 if deps_satisfied {
179 ready.push(step.id.clone());
180 }
181 }
182
183 ready
184}
185
186pub fn is_execution_complete(ctx: &ExecutionContext) -> bool {
188 ctx.step_states.values().all(|s| {
189 matches!(
190 s.lifecycle,
191 StepLifecycle::Success
192 | StepLifecycle::Failed
193 | StepLifecycle::Skipped
194 | StepLifecycle::Cancelled
195 )
196 })
197}
198
199pub fn compute_execution_status(ctx: &ExecutionContext) -> ExecutionStatus {
201 if !is_execution_complete(ctx) {
202 return ctx.status.clone();
203 }
204
205 let has_failures = ctx
206 .step_states
207 .values()
208 .any(|s| s.lifecycle == StepLifecycle::Failed);
209
210 if has_failures {
211 ExecutionStatus::Failed {
212 error: "One or more steps failed".to_string(),
213 }
214 } else {
215 ExecutionStatus::Succeeded
216 }
217}
218
219pub fn build_fingerprint(ctx: &ExecutionContext) -> ExecutionFingerprint {
221 let step_durations: HashMap<String, u64> = ctx
222 .step_states
223 .iter()
224 .filter_map(|(id, s)| s.duration_ms.map(|d| (id.clone(), d)))
225 .collect();
226
227 let step_outcomes: HashMap<String, StepLifecycle> = ctx
228 .step_states
229 .iter()
230 .map(|(id, s)| (id.clone(), s.lifecycle.clone()))
231 .collect();
232
233 let total_duration: u64 = step_durations.values().sum();
234 let retry_count: u32 = ctx
235 .step_states
236 .values()
237 .map(|s| s.attempt.saturating_sub(1))
238 .sum();
239
240 ExecutionFingerprint {
241 execution_id: ctx.execution_id.clone(),
242 workflow_id: ctx.workflow_id.clone(),
243 total_duration_ms: total_duration,
244 step_durations,
245 step_outcomes,
246 retry_count,
247 completed_at: ctx.completed_at.unwrap_or_else(Utc::now),
248 }
249}
250
251pub fn emit_step_event(
253 ctx: &ExecutionContext,
254 step_id: &str,
255 event_type: ExecutionEventType,
256) -> ExecutionEvent {
257 ExecutionEvent {
258 execution_id: ctx.execution_id.clone(),
259 step_id: Some(step_id.to_string()),
260 event_type,
261 timestamp: Utc::now(),
262 data: None,
263 }
264}
265
266pub fn propagate_outputs(
268 workflow: &Workflow,
269 ctx: &ExecutionContext,
270 target_step_id: &str,
271) -> HashMap<String, serde_json::Value> {
272 let mut inputs = HashMap::new();
273
274 for edge in &workflow.edges {
275 if edge.to != target_step_id {
276 continue;
277 }
278
279 if let Some(state) = ctx.step_states.get(&edge.from) {
280 if let Some(output) = &state.output {
281 inputs.insert(edge.from.clone(), output.clone());
282 }
283 }
284 }
285
286 inputs
287}
288
289