Skip to main content

distri_workflow/
executor.rs

//! Workflow executor — runs steps sequentially or in parallel batches.
//!
//! Operates on `WorkflowRun` (a definition plus its runtime state). Template
//! fields are read from `run.definition.steps[i]`; runtime mutations are
//! applied to `run.step_runs[i]`. Steps in a parallel batch are currently
//! awaited one after another rather than concurrently.
6
7use crate::resolve;
8use crate::store::WorkflowStateStore;
9use crate::types::*;
10
11/// Receives workflow events during execution.
12#[async_trait::async_trait]
13pub trait EventSink: Send + Sync {
14    async fn emit(&self, event: WorkflowEvent);
15}
16
17/// Event sink that logs to tracing.
18pub struct TracingEventSink;
19
20#[async_trait::async_trait]
21impl EventSink for TracingEventSink {
22    async fn emit(&self, event: WorkflowEvent) {
23        match &event {
24            WorkflowEvent::WorkflowStarted {
25                workflow_id,
26                total_steps,
27            } => {
28                tracing::info!(%workflow_id, total_steps, "workflow started");
29            }
30            WorkflowEvent::StepStarted {
31                step_id,
32                step_label,
33                ..
34            } => {
35                tracing::info!(%step_id, %step_label, "step started");
36            }
37            WorkflowEvent::StepCompleted {
38                step_id,
39                step_label,
40                ..
41            } => {
42                tracing::info!(%step_id, %step_label, "step completed");
43            }
44            WorkflowEvent::StepFailed {
45                step_id,
46                step_label,
47                error,
48                ..
49            } => {
50                tracing::error!(%step_id, %step_label, %error, "step failed");
51            }
52            WorkflowEvent::WorkflowCompleted {
53                workflow_id,
54                status,
55                steps_done,
56                steps_failed,
57            } => {
58                tracing::info!(%workflow_id, ?status, steps_done, steps_failed, "workflow completed");
59            }
60            WorkflowEvent::StepWaiting {
61                step_id,
62                step_label,
63                message,
64                ..
65            } => {
66                tracing::info!(%step_id, %step_label, %message, "step waiting for input");
67            }
68        }
69    }
70}
71
72/// No-op event sink (for testing).
73pub struct NoopEventSink;
74
75#[async_trait::async_trait]
76impl EventSink for NoopEventSink {
77    async fn emit(&self, _event: WorkflowEvent) {}
78}
79
80/// Execute a single workflow step.
81#[async_trait::async_trait]
82pub trait StepExecutor: Send + Sync {
83    /// Execute one step given the step definition and workflow context.
84    async fn execute(
85        &self,
86        step: &WorkflowStep,
87        context: &serde_json::Value,
88    ) -> Result<StepResult, String>;
89
90    /// Check if this executor can satisfy a requirement.
91    /// Default: true (backward compatible — accepts everything).
92    fn supports(&self, _requirement: &StepRequirement) -> bool {
93        true
94    }
95
96    /// Informational: list skills this executor provides.
97    /// Used by UI for introspection, not for execution gating.
98    fn available_skills(&self) -> Vec<StepRequirement> {
99        vec![]
100    }
101}
102
103/// Runs workflows step by step, handling sequential and parallel execution.
104pub struct WorkflowRunner<S: WorkflowStateStore, E: StepExecutor, K: EventSink = NoopEventSink> {
105    pub store: S,
106    pub executor: E,
107    pub events: K,
108}
109
110impl<S: WorkflowStateStore, E: StepExecutor> WorkflowRunner<S, E, NoopEventSink> {
111    pub fn new(store: S, executor: E) -> Self {
112        Self {
113            store,
114            executor,
115            events: NoopEventSink,
116        }
117    }
118}
119
120impl<S: WorkflowStateStore, E: StepExecutor, K: EventSink> WorkflowRunner<S, E, K> {
121    pub fn with_events(store: S, executor: E, events: K) -> Self {
122        Self {
123            store,
124            executor,
125            events,
126        }
127    }
128
129    /// Check which requirements are unmet for a step.
130    fn unmet_requirements<'a>(&self, step: &'a WorkflowStep) -> Vec<&'a StepRequirement> {
131        step.requires
132            .iter()
133            .filter(|r| !self.executor.supports(r))
134            .collect()
135    }
136
137    /// Run the next runnable step(s). Handles both sequential and parallel.
138    /// Returns the results of executed steps.
139    pub async fn run_next(&self, workflow_id: &str) -> Result<Vec<(String, StepResult)>, String> {
140        let mut run = self
141            .store
142            .load(workflow_id)
143            .await?
144            .ok_or("Workflow not found")?;
145
146        if run.is_complete() {
147            run.status = WorkflowStatus::Completed;
148            self.store.save(&run).await?;
149            return Ok(vec![]);
150        }
151
152        if run.has_failed() {
153            return Err("Workflow has failed steps".into());
154        }
155
156        // Evaluate skip_if conditions on pending steps
157        let mut skipped_any = false;
158        for i in 0..run.definition.steps.len() {
159            if run.step_runs[i].status != StepStatus::Pending {
160                continue;
161            }
162            let skip_expr = run.definition.steps[i].skip_if.clone();
163            if let Some(skip_expr) = skip_expr {
164                if resolve::evaluate_skip_condition(&skip_expr, &run.context) {
165                    let step_id = run.definition.steps[i].id.clone();
166                    run.step_runs[i].status = StepStatus::Skipped;
167                    run.step_runs[i].completed_at = Some(chrono::Utc::now());
168                    run.add_note(&step_id, "Skipped by skip_if condition");
169                    skipped_any = true;
170                }
171            }
172        }
173        if skipped_any {
174            self.store.save(&run).await?;
175        }
176
177        // Collect runnable step info
178        let runnable: Vec<(usize, String, StepExecution, WorkflowStep)> = run
179            .runnable_steps()
180            .into_iter()
181            .map(|(i, s)| (i, s.id.clone(), s.execution, s.clone()))
182            .collect();
183
184        if runnable.is_empty() {
185            // Check if we're stuck (all remaining are blocked or depend on blocked)
186            if run.is_stuck() {
187                run.status = WorkflowStatus::Blocked;
188                self.store.save(&run).await?;
189                return Ok(vec![]);
190            }
191            return Err("No runnable steps (all blocked by dependencies)".into());
192        }
193
194        // Check requirements and separate blocked from executable
195        let mut blocked_indices = vec![];
196        let mut executable = vec![];
197
198        for (idx, step_id, exec, step) in runnable {
199            let unmet = self.unmet_requirements(&step);
200            if !unmet.is_empty() {
201                let missing: Vec<String> = unmet.iter().map(|r| r.skill.clone()).collect();
202                blocked_indices.push((idx, missing));
203            } else {
204                executable.push((idx, step_id, exec, step));
205            }
206        }
207
208        // Mark blocked steps
209        for (idx, missing) in &blocked_indices {
210            run.step_runs[*idx].status = StepStatus::Blocked;
211            run.step_runs[*idx].error = Some(format!("Missing skills: {}", missing.join(", ")));
212        }
213
214        if !blocked_indices.is_empty() {
215            self.store.save(&run).await?;
216        }
217
218        if executable.is_empty() {
219            // All runnable steps were blocked
220            if run.is_stuck() {
221                run.status = WorkflowStatus::Blocked;
222                self.store.save(&run).await?;
223            }
224            return Ok(vec![]);
225        }
226
227        // Filter out WaitForInput steps from parallel — they always pause
228        let (wait_steps, non_wait): (Vec<_>, Vec<_>) = executable
229            .into_iter()
230            .partition(|(_, _, _, step)| matches!(step.kind, StepKind::WaitForInput { .. }));
231
232        // If any WaitForInput step is runnable, pause on the first one
233        if !wait_steps.is_empty() {
234            let (idx, step_id, _, step) = &wait_steps[0];
235            let (message, schema) = match &step.kind {
236                StepKind::WaitForInput { message, schema } => (message.clone(), schema.clone()),
237                _ => unreachable!(),
238            };
239            run.step_runs[*idx].status = StepStatus::WaitingForInput;
240            run.step_runs[*idx].started_at = Some(chrono::Utc::now());
241            run.status = WorkflowStatus::Paused;
242            run.current_step = *idx;
243            self.store.save(&run).await?;
244            self.events
245                .emit(WorkflowEvent::StepWaiting {
246                    workflow_id: workflow_id.to_string(),
247                    step_id: step_id.clone(),
248                    step_label: step.label.clone(),
249                    message,
250                    schema,
251                })
252                .await;
253            return Ok(vec![]);
254        }
255
256        let (parallel, sequential): (Vec<_>, Vec<_>) = non_wait
257            .into_iter()
258            .partition(|(_, _, exec, _)| *exec == StepExecution::Parallel);
259
260        let mut results = vec![];
261
262        // Run parallel steps
263        if !parallel.is_empty() {
264            for (idx, _, _, _) in &parallel {
265                run.step_runs[*idx].status = StepStatus::Running;
266                run.step_runs[*idx].started_at = Some(chrono::Utc::now());
267            }
268            run.status = WorkflowStatus::Running;
269            self.store.save(&run).await?;
270
271            for (idx, step_id, _, step) in &parallel {
272                self.events
273                    .emit(WorkflowEvent::StepStarted {
274                        workflow_id: workflow_id.to_string(),
275                        step_id: step_id.clone(),
276                        step_label: step.label.clone(),
277                    })
278                    .await;
279
280                let step_context = resolve::resolve_step_input(step.input.as_ref(), &run.context);
281                let result = self.executor.execute(step, &step_context).await;
282                match result {
283                    Ok(r) if r.status == StepStatus::Failed => {
284                        let error = r.error.clone().unwrap_or_else(|| "Step failed".to_string());
285                        self.events
286                            .emit(WorkflowEvent::StepFailed {
287                                workflow_id: workflow_id.to_string(),
288                                step_id: step_id.clone(),
289                                step_label: step.label.clone(),
290                                error: error.clone(),
291                            })
292                            .await;
293                        self.store.commit_step(workflow_id, *idx, r.clone()).await?;
294                        results.push((step_id.clone(), r));
295                    }
296                    Ok(r) => {
297                        self.events
298                            .emit(WorkflowEvent::StepCompleted {
299                                workflow_id: workflow_id.to_string(),
300                                step_id: step_id.clone(),
301                                step_label: step.label.clone(),
302                                result: r.result.clone(),
303                            })
304                            .await;
305                        self.store.commit_step(workflow_id, *idx, r.clone()).await?;
306                        results.push((step_id.clone(), r));
307                    }
308                    Err(e) => {
309                        self.events
310                            .emit(WorkflowEvent::StepFailed {
311                                workflow_id: workflow_id.to_string(),
312                                step_id: step_id.clone(),
313                                step_label: step.label.clone(),
314                                error: e.clone(),
315                            })
316                            .await;
317                        let failed = StepResult::failed(&e);
318                        self.store
319                            .commit_step(workflow_id, *idx, failed.clone())
320                            .await?;
321                        results.push((step_id.clone(), failed));
322                    }
323                }
324            }
325        }
326
327        // Run first sequential step
328        if !sequential.is_empty() && parallel.is_empty() {
329            let (idx, step_id, _, step) = &sequential[0];
330
331            run.step_runs[*idx].status = StepStatus::Running;
332            run.step_runs[*idx].started_at = Some(chrono::Utc::now());
333            run.status = WorkflowStatus::Running;
334            run.current_step = *idx;
335            self.store.save(&run).await?;
336
337            self.events
338                .emit(WorkflowEvent::StepStarted {
339                    workflow_id: workflow_id.to_string(),
340                    step_id: step_id.clone(),
341                    step_label: step.label.clone(),
342                })
343                .await;
344
345            let step_context = resolve::resolve_step_input(step.input.as_ref(), &run.context);
346            let result = self.executor.execute(step, &step_context).await;
347            match result {
348                Ok(r) if r.status == StepStatus::Failed => {
349                    let error = r.error.clone().unwrap_or_else(|| "Step failed".to_string());
350                    self.events
351                        .emit(WorkflowEvent::StepFailed {
352                            workflow_id: workflow_id.to_string(),
353                            step_id: step_id.clone(),
354                            step_label: step.label.clone(),
355                            error: error.clone(),
356                        })
357                        .await;
358                    self.store.commit_step(workflow_id, *idx, r.clone()).await?;
359                    results.push((step_id.clone(), r));
360                }
361                Ok(r) => {
362                    self.events
363                        .emit(WorkflowEvent::StepCompleted {
364                            workflow_id: workflow_id.to_string(),
365                            step_id: step_id.clone(),
366                            step_label: step.label.clone(),
367                            result: r.result.clone(),
368                        })
369                        .await;
370                    self.store.commit_step(workflow_id, *idx, r.clone()).await?;
371                    results.push((step_id.clone(), r));
372                }
373                Err(e) => {
374                    self.events
375                        .emit(WorkflowEvent::StepFailed {
376                            workflow_id: workflow_id.to_string(),
377                            step_id: step_id.clone(),
378                            step_label: step.label.clone(),
379                            error: e.clone(),
380                        })
381                        .await;
382                    let failed = StepResult::failed(&e);
383                    self.store
384                        .commit_step(workflow_id, *idx, failed.clone())
385                        .await?;
386                    results.push((step_id.clone(), failed));
387                }
388            }
389        }
390
391        // Check if workflow is now complete
392        let run = self.store.load(workflow_id).await?.unwrap();
393        if run.is_complete() {
394            let mut w = run;
395            if w.is_stuck() || w.step_runs.iter().any(|s| s.status == StepStatus::Blocked) {
396                w.status = WorkflowStatus::Blocked;
397            } else {
398                w.status = WorkflowStatus::Completed;
399            }
400            self.store.save(&w).await?;
401        }
402
403        Ok(results)
404    }
405
406    /// Run all steps until completion, failure, blocked, or pause.
407    pub async fn run_all(&self, workflow_id: &str) -> Result<WorkflowStatus, String> {
408        // Validate DAG before executing
409        let run = self
410            .store
411            .load(workflow_id)
412            .await?
413            .ok_or("Workflow not found")?;
414        run.detect_cycles()?;
415
416        // Emit workflow started
417        self.events
418            .emit(WorkflowEvent::WorkflowStarted {
419                workflow_id: workflow_id.to_string(),
420                total_steps: run.definition.steps.len(),
421            })
422            .await;
423
424        loop {
425            let run = self
426                .store
427                .load(workflow_id)
428                .await?
429                .ok_or("Workflow not found")?;
430
431            match run.status {
432                WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Blocked => {
433                    let done = run
434                        .step_runs
435                        .iter()
436                        .filter(|s| s.status == StepStatus::Done)
437                        .count();
438                    let failed = run
439                        .step_runs
440                        .iter()
441                        .filter(|s| s.status == StepStatus::Failed)
442                        .count();
443                    self.events
444                        .emit(WorkflowEvent::WorkflowCompleted {
445                            workflow_id: workflow_id.to_string(),
446                            status: run.status,
447                            steps_done: done,
448                            steps_failed: failed,
449                        })
450                        .await;
451                    return Ok(run.status);
452                }
453                // Paused = waiting for human input. Return without emitting completed.
454                WorkflowStatus::Paused => {
455                    return Ok(WorkflowStatus::Paused);
456                }
457                _ => {}
458            }
459
460            if run.is_complete() {
461                self.events
462                    .emit(WorkflowEvent::WorkflowCompleted {
463                        workflow_id: workflow_id.to_string(),
464                        status: WorkflowStatus::Completed,
465                        steps_done: run.definition.steps.len(),
466                        steps_failed: 0,
467                    })
468                    .await;
469                return Ok(WorkflowStatus::Completed);
470            }
471
472            let results = self.run_next(workflow_id).await?;
473
474            if results.iter().any(|(_, r)| r.status == StepStatus::Failed) {
475                let mut w = self.store.load(workflow_id).await?.unwrap();
476                w.status = WorkflowStatus::Failed;
477                self.store.save(&w).await?;
478                let done = w
479                    .step_runs
480                    .iter()
481                    .filter(|s| s.status == StepStatus::Done)
482                    .count();
483                let failed = w
484                    .step_runs
485                    .iter()
486                    .filter(|s| s.status == StepStatus::Failed)
487                    .count();
488                self.events
489                    .emit(WorkflowEvent::WorkflowCompleted {
490                        workflow_id: workflow_id.to_string(),
491                        status: WorkflowStatus::Failed,
492                        steps_done: done,
493                        steps_failed: failed,
494                    })
495                    .await;
496                return Ok(WorkflowStatus::Failed);
497            }
498
499            if results.is_empty() {
500                let w = self.store.load(workflow_id).await?.unwrap();
501                return Ok(w.status);
502            }
503        }
504    }
505
506    /// Resume a paused workflow by providing input for the waiting step.
507    /// After providing the input, continues running all remaining steps.
508    pub async fn resume(
509        &self,
510        workflow_id: &str,
511        step_id: &str,
512        input: serde_json::Value,
513    ) -> Result<WorkflowStatus, String> {
514        let mut run = self
515            .store
516            .load(workflow_id)
517            .await?
518            .ok_or("Workflow not found")?;
519
520        if run.status != WorkflowStatus::Paused {
521            return Err(format!("Workflow is not paused (status: {:?})", run.status));
522        }
523
524        run.resume_step(step_id, input)?;
525        self.store.save(&run).await?;
526
527        // Continue running remaining steps
528        self.run_all(workflow_id).await
529    }
530
531    /// Get current workflow run state.
532    pub async fn get_state(&self, workflow_id: &str) -> Result<Option<WorkflowRun>, String> {
533        self.store.load(workflow_id).await
534    }
535}