Skip to main content

distri_workflow/
types.rs

1//! Core types for the workflow engine.
2//!
3//! Two layers:
4//!
5//! - **Definition** (`WorkflowDefinition`, `WorkflowStep`): the static
6//!   template — what this workflow IS. Stored alongside the agent
7//!   config; never mutated by execution.
8//! - **Run** (`WorkflowRun`, `WorkflowStepRun`): runtime state for one
9//!   execution — status, current step pointer, shared context, per-step
10//!   result/error/timestamps. Built from a definition via
11//!   `WorkflowRun::new(definition)` and then mutated by the engine.
12//!
13//! Status uses the canonical `distri_types::TaskStatus` everywhere.
14//! Workflow-specific concepts that don't have a 1:1 TaskStatus value
15//! map as follows:
16//!
17//! | concept | TaskStatus | extra signal |
18//! |---|---|---|
19//! | step waiting for input / workflow paused | InputRequired | — |
20//! | step skipped (skip_if / entry-point) | Canceled | note appended |
21//! | step blocked (missing requirement) | Failed | error explains |
22//!
23//! Phase 2b will replace `WorkflowStateStore` with the cloud
24//! `TaskStore` + a `workflow_step_executions` sidecar so runs flow
25//! through the canonical task tree.
26
27use chrono::{DateTime, Utc};
28use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
32// ============================================================================
33// Workflow Definition (template)
34// ============================================================================
35
36/// A workflow is a DAG of steps. The definition is the *template* — no
37/// runtime state lives here. Use `WorkflowRun::new(definition)` to start
38/// an execution.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct WorkflowDefinition {
41    pub id: String,
42    pub steps: Vec<WorkflowStep>,
43    /// JSON Schema describing required inputs for this workflow.
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub input_schema: Option<serde_json::Value>,
46    /// How workflow state is checkpointed between steps.
47    #[serde(default)]
48    pub checkpoint: CheckpointStrategy,
49    /// Named entry points for multi-entry workflows.
50    #[serde(default, skip_serializing_if = "Vec::is_empty")]
51    pub entry_points: Vec<EntryPoint>,
52}
53
/// Slash commands reserved by the channel gateway; a workflow entry point may
/// not declare a slash trigger (name or alias) that collides with one of them.
/// Kept in sync by hand with `distri-gateway`'s `ChannelCommand::parse`.
///
/// Every entry is lowercase and carries its leading `/`;
/// `WorkflowDefinition::validate_channel_surface` lowercases the candidate
/// name before comparing.
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    "/start", "/stop", "/disconnect", "/reset", "/new",
    "/newsession", "/newthread", "/status", "/debug", "/verbose",
    "/help", "/switch", "/workspace", "/context", "/ctx",
];
73
74impl WorkflowDefinition {
75    pub fn new(steps: Vec<WorkflowStep>) -> Self {
76        Self {
77            id: uuid::Uuid::new_v4().to_string(),
78            steps,
79            input_schema: None,
80            checkpoint: CheckpointStrategy::default(),
81            entry_points: vec![],
82        }
83    }
84
85    pub fn with_id(mut self, id: &str) -> Self {
86        self.id = id.to_string();
87        self
88    }
89
90    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91        self.checkpoint = strategy;
92        self
93    }
94
95    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96        self.entry_points = entry_points;
97        self
98    }
99
100    /// Get an entry point by ID.
101    pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102        self.entry_points.iter().find(|ep| ep.id == id)
103    }
104
105    /// Find all step IDs reachable from the given step (inclusive) by following
106    /// depends_on forward. Used by entry-point logic to mark unreachable steps
107    /// as skipped at run start.
108    pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109        use std::collections::{HashSet, VecDeque};
110
111        let mut reachable = HashSet::new();
112        let mut queue = VecDeque::new();
113        queue.push_back(start_step_id.to_string());
114
115        while let Some(current) = queue.pop_front() {
116            if !reachable.insert(current.clone()) {
117                continue;
118            }
119            for step in &self.steps {
120                if step.depends_on.contains(&current) && !reachable.contains(&step.id) {
121                    queue.push_back(step.id.clone());
122                }
123            }
124        }
125
126        reachable
127    }
128
129    /// Validate the channel-command surface declared by entry-point
130    /// triggers. Returns a precise error string on the first problem.
131    pub fn validate_channel_surface(&self) -> Result<(), String> {
132        use distri_types::channel_commands::ChannelTrigger;
133        use std::collections::HashSet;
134
135        let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
136        let mut slash_names: HashSet<String> = HashSet::new();
137        let mut callback_ids: HashSet<String> = HashSet::new();
138        let mut message_count = 0usize;
139
140        for ep in &self.entry_points {
141            if !step_ids.contains(ep.starts_at.as_str()) {
142                return Err(format!(
143                    "entry point '{}' starts_at unknown step '{}'",
144                    ep.id, ep.starts_at
145                ));
146            }
147            let Some(trigger) = &ep.trigger else { continue };
148            match trigger {
149                ChannelTrigger::Slash { name, aliases, .. } => {
150                    for n in std::iter::once(name).chain(aliases.iter()) {
151                        let lower = n.to_lowercase();
152                        if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
153                            return Err(format!("slash command '{n}' shadows a built-in command"));
154                        }
155                        if !slash_names.insert(lower.clone()) {
156                            return Err(format!(
157                                "entry point '{}': slash command '{}' is already declared",
158                                ep.id, n
159                            ));
160                        }
161                    }
162                }
163                ChannelTrigger::Callback { id, .. } => {
164                    // Note: cross-validating a Reply step's ReplyButtonSpec::Callback.callback_data
165                    // against declared callback ids is intentionally NOT done in v1 (out of plan scope).
166                    if !callback_ids.insert(id.clone()) {
167                        return Err(format!(
168                            "entry point '{}': callback id '{}' is already declared",
169                            ep.id, id
170                        ));
171                    }
172                }
173                ChannelTrigger::Message {} => message_count += 1,
174            }
175        }
176        if message_count > 1 {
177            return Err(format!(
178                "workflow declares {message_count} message catch-all entry \
179                 points; at most one is allowed"
180            ));
181        }
182        for step in &self.steps {
183            if let StepKind::Reply {
184                buttons_from,
185                button_template,
186                ..
187            } = &step.kind
188            {
189                if button_template.is_some() != buttons_from.is_some() {
190                    return Err(format!(
191                        "reply step '{}': button_template and buttons_from \
192                         must be set together",
193                        step.id
194                    ));
195                }
196            }
197        }
198        Ok(())
199    }
200
201    /// Detect circular dependencies in the workflow DAG.
202    /// Returns `Err` with the cycle description if found.
203    pub fn detect_cycles(&self) -> Result<(), String> {
204        use std::collections::{HashMap, HashSet};
205
206        let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
207        let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
208        for step in &self.steps {
209            adj.insert(
210                step.id.as_str(),
211                step.depends_on.iter().map(|s| s.as_str()).collect(),
212            );
213        }
214
215        let mut visited = HashSet::new();
216        let mut in_stack = HashSet::new();
217
218        fn dfs<'a>(
219            node: &'a str,
220            adj: &HashMap<&'a str, Vec<&'a str>>,
221            visited: &mut HashSet<&'a str>,
222            in_stack: &mut HashSet<&'a str>,
223            path: &mut Vec<&'a str>,
224        ) -> Result<(), String> {
225            visited.insert(node);
226            in_stack.insert(node);
227            path.push(node);
228
229            if let Some(deps) = adj.get(node) {
230                for &dep in deps {
231                    if !visited.contains(dep) {
232                        dfs(dep, adj, visited, in_stack, path)?;
233                    } else if in_stack.contains(dep) {
234                        let cycle_start = path.iter().position(|&n| n == dep).unwrap();
235                        let cycle: Vec<&str> = path[cycle_start..].to_vec();
236                        return Err(format!(
237                            "Circular dependency detected: {} → {}",
238                            cycle.join(" → "),
239                            dep
240                        ));
241                    }
242                }
243            }
244
245            in_stack.remove(node);
246            path.pop();
247            Ok(())
248        }
249
250        let mut path = Vec::new();
251        for step in &self.steps {
252            if !visited.contains(step.id.as_str()) {
253                dfs(
254                    step.id.as_str(),
255                    &adj,
256                    &mut visited,
257                    &mut in_stack,
258                    &mut path,
259                )?;
260            }
261        }
262
263        // Also check for references to non-existent steps
264        for step in &self.steps {
265            for dep in &step.depends_on {
266                if !step_ids.contains(dep.as_str()) {
267                    return Err(format!(
268                        "Step '{}' depends on '{}' which does not exist",
269                        step.id, dep
270                    ));
271                }
272            }
273        }
274
275        Ok(())
276    }
277}
278
279// ============================================================================
280// Workflow Run (execution state)
281// ============================================================================
282
283fn default_empty_object() -> serde_json::Value {
284    serde_json::json!({})
285}
286
287fn default_now() -> DateTime<Utc> {
288    Utc::now()
289}
290
291/// One execution of a `WorkflowDefinition`. Owns the definition plus all
292/// runtime state — status, shared context, per-step status / result /
293/// error / timestamps. The engine mutates a `WorkflowRun`.
294///
295/// `step_runs` is parallel to `definition.steps` (same length, same
296/// order).
297///
298/// **Wire shape**: `definition` is flattened, so a `WorkflowRun`
299/// serializes as one flat JSON object with the definition fields
300/// (`id`, `steps`, `input_schema`, …) alongside the runtime fields
301/// (`status`, `current_step`, `context`, `step_runs`, …). This
302/// keeps the persisted run row identical to the legacy monolithic
303/// shape.
304#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct WorkflowRun {
306    #[serde(flatten)]
307    pub definition: WorkflowDefinition,
308    #[serde(default)]
309    pub status: WorkflowStatus,
310    #[serde(default)]
311    pub current_step: usize,
312    #[serde(default = "default_empty_object")]
313    pub context: serde_json::Value,
314    #[serde(default)]
315    pub notes: Vec<WorkflowNote>,
316    #[serde(default)]
317    pub step_runs: Vec<WorkflowStepRun>,
318    #[serde(default = "default_now")]
319    pub created_at: DateTime<Utc>,
320    #[serde(default = "default_now")]
321    pub updated_at: DateTime<Utc>,
322}
323
324/// Per-step runtime state. Parallel to `WorkflowDefinition.steps[i]`.
325#[derive(Debug, Clone, Serialize, Deserialize, Default)]
326pub struct WorkflowStepRun {
327    pub step_id: String,
328    #[serde(default)]
329    pub status: StepStatus,
330    #[serde(default, skip_serializing_if = "Option::is_none")]
331    pub result: Option<serde_json::Value>,
332    #[serde(default, skip_serializing_if = "Option::is_none")]
333    pub error: Option<String>,
334    #[serde(default, skip_serializing_if = "Option::is_none")]
335    pub started_at: Option<DateTime<Utc>>,
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub completed_at: Option<DateTime<Utc>>,
338}
339
340impl WorkflowRun {
341    /// Build a fresh run from a definition. All step_runs start `Pending`,
342    /// context is `{}`, status is `Pending`.
343    pub fn new(definition: WorkflowDefinition) -> Self {
344        let step_runs = definition
345            .steps
346            .iter()
347            .map(|s| WorkflowStepRun {
348                step_id: s.id.clone(),
349                ..Default::default()
350            })
351            .collect();
352        Self {
353            definition,
354            status: WorkflowStatus::Pending,
355            current_step: 0,
356            context: serde_json::json!({}),
357            notes: vec![],
358            step_runs,
359            created_at: Utc::now(),
360            updated_at: Utc::now(),
361        }
362    }
363
364    /// Convenience for callers that want to skip the explicit
365    /// definition struct (mostly tests).
366    pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
367        Self::new(WorkflowDefinition::new(steps))
368    }
369
370    pub fn with_context(mut self, context: serde_json::Value) -> Self {
371        self.context = context;
372        self
373    }
374
375    pub fn with_id(mut self, id: &str) -> Self {
376        self.definition.id = id.to_string();
377        self
378    }
379
380    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
381        self.definition.checkpoint = strategy;
382        self
383    }
384
385    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
386        self.definition.entry_points = entry_points;
387        self
388    }
389
390    pub fn id(&self) -> &str {
391        &self.definition.id
392    }
393
394    pub fn steps(&self) -> &[WorkflowStep] {
395        &self.definition.steps
396    }
397
398    pub fn step(&self, idx: usize) -> &WorkflowStep {
399        &self.definition.steps[idx]
400    }
401
402    pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
403        &self.step_runs[idx]
404    }
405
406    pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
407        &mut self.step_runs[idx]
408    }
409
410    pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
411        self.step_runs.iter().find(|s| s.step_id == step_id)
412    }
413
414    pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
415        self.step_runs.iter_mut().find(|s| s.step_id == step_id)
416    }
417
418    /// Apply an entry point: mark steps not reachable from `starts_at` as
419    /// Skipped, pre-populate their results from `preset_results`, and
420    /// merge those into context so downstream `{steps.X}` references work.
421    pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
422        let ep = self
423            .definition
424            .entry_points
425            .iter()
426            .find(|ep| ep.id == entry_point_id)
427            .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
428            .clone();
429
430        if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
431            return Err(format!(
432                "Entry point '{}' references step '{}' which does not exist",
433                entry_point_id, ep.starts_at
434            ));
435        }
436
437        let reachable = self.definition.reachable_from(&ep.starts_at);
438
439        for (i, step) in self.definition.steps.iter().enumerate() {
440            if !reachable.contains(&step.id) {
441                self.step_runs[i].status = StepStatus::Skipped;
442                if let Some(result) = ep.preset_results.get(&step.id) {
443                    self.step_runs[i].result = Some(result.clone());
444                }
445            }
446        }
447
448        if let Some(ctx) = self.context.as_object_mut() {
449            let steps = ctx
450                .entry("steps")
451                .or_insert(serde_json::json!({}))
452                .as_object_mut()
453                .expect("steps must be an object");
454            for (step_id, result) in &ep.preset_results {
455                steps.insert(step_id.clone(), result.clone());
456            }
457        }
458
459        Ok(self)
460    }
461
462    /// Initialize the run with validated input. Input is validated
463    /// against `definition.input_schema` if present, then merged into
464    /// `context`. Status flips to Running.
465    pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
466        if let Some(ref schema_value) = self.definition.input_schema {
467            let validator = jsonschema::validator_for(schema_value)
468                .map_err(|e| format!("Invalid input_schema: {e}"))?;
469
470            if !validator.is_valid(&input) {
471                let errors: Vec<String> = validator
472                    .iter_errors(&input)
473                    .map(|e| format!("{}", e))
474                    .collect();
475                return Err(format!("Input validation failed: {}", errors.join("; ")));
476            }
477        }
478
479        if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
480            for (k, v) in inp {
481                ctx.insert(k.clone(), v.clone());
482            }
483            ctx.insert("input".to_string(), input.clone());
484        }
485
486        self.status = WorkflowStatus::Running;
487        self.updated_at = Utc::now();
488        Ok(self)
489    }
490
491    /// First pending step, if any.
492    pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
493        self.step_runs
494            .iter()
495            .enumerate()
496            .find(|(_, s)| s.status == StepStatus::Pending)
497            .map(|(i, _)| (i, &self.definition.steps[i]))
498    }
499
500    /// All steps that can run now: pending + all dependencies completed.
501    /// Pure query — does not mutate.
502    pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
503        let mut runnable = vec![];
504        for (i, step) in self.definition.steps.iter().enumerate() {
505            if self.step_runs[i].status != StepStatus::Pending {
506                continue;
507            }
508            let deps_met = step.depends_on.iter().all(|dep_id| {
509                self.definition
510                    .steps
511                    .iter()
512                    .zip(self.step_runs.iter())
513                    .any(|(s, sr)| {
514                        &s.id == dep_id
515                            && matches!(sr.status, StepStatus::Done | StepStatus::Skipped)
516                    })
517            });
518            if deps_met {
519                runnable.push((i, step));
520            }
521        }
522        runnable
523    }
524
525    pub fn is_complete(&self) -> bool {
526        self.step_runs.iter().all(|s| {
527            matches!(
528                s.status,
529                StepStatus::Done | StepStatus::Skipped | StepStatus::Blocked
530            )
531        })
532    }
533
534    pub fn is_waiting_for_input(&self) -> bool {
535        self.step_runs
536            .iter()
537            .any(|s| s.status == StepStatus::WaitingForInput)
538    }
539
540    pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
541        self.step_runs
542            .iter()
543            .enumerate()
544            .find(|(_, s)| s.status == StepStatus::WaitingForInput)
545            .map(|(i, _)| (i, &self.definition.steps[i]))
546    }
547
548    /// Resume a paused run by providing input for the waiting step.
549    pub fn resume_step(
550        &mut self,
551        step_id: &str,
552        result: serde_json::Value,
553    ) -> Result<usize, String> {
554        let idx = self
555            .step_runs
556            .iter()
557            .position(|s| s.step_id == step_id && s.status == StepStatus::WaitingForInput)
558            .ok_or_else(|| {
559                format!(
560                    "Step '{}' not found or not in waiting_for_input state",
561                    step_id
562                )
563            })?;
564
565        self.step_runs[idx].status = StepStatus::Done;
566        self.step_runs[idx].result = Some(result.clone());
567        self.step_runs[idx].completed_at = Some(Utc::now());
568
569        if let Some(ctx) = self.context.as_object_mut() {
570            let steps = ctx
571                .entry("steps")
572                .or_insert(serde_json::json!({}))
573                .as_object_mut()
574                .expect("steps must be an object");
575            steps.insert(step_id.to_string(), result);
576        }
577
578        self.status = WorkflowStatus::Running;
579        self.updated_at = Utc::now();
580        Ok(idx)
581    }
582
583    /// Stuck: blocked steps, nothing running, no path forward.
584    pub fn is_stuck(&self) -> bool {
585        let has_blocked = self
586            .step_runs
587            .iter()
588            .any(|s| s.status == StepStatus::Blocked);
589        let has_pending = self
590            .step_runs
591            .iter()
592            .any(|s| s.status == StepStatus::Pending);
593        let has_running = self
594            .step_runs
595            .iter()
596            .any(|s| s.status == StepStatus::Running);
597
598        if !has_blocked || has_running {
599            return false;
600        }
601
602        if !has_pending {
603            return true;
604        }
605
606        !self
607            .definition
608            .steps
609            .iter()
610            .zip(self.step_runs.iter())
611            .any(|(step, run)| {
612                run.status == StepStatus::Pending
613                    && step.depends_on.iter().all(|dep_id| {
614                        self.definition
615                            .steps
616                            .iter()
617                            .zip(self.step_runs.iter())
618                            .any(|(s, sr)| {
619                                &s.id == dep_id
620                                    && matches!(
621                                        sr.status,
622                                        StepStatus::Done
623                                            | StepStatus::Pending
624                                            | StepStatus::Running
625                                    )
626                            })
627                    })
628            })
629    }
630
631    pub fn has_failed(&self) -> bool {
632        self.step_runs
633            .iter()
634            .any(|s| s.status == StepStatus::Failed)
635    }
636
637    /// Append a note to the run's log.
638    pub fn add_note(&mut self, step_id: &str, message: &str) {
639        self.notes.push(WorkflowNote {
640            step_id: step_id.to_string(),
641            message: message.to_string(),
642            at: Utc::now(),
643        });
644        self.updated_at = Utc::now();
645    }
646
647    /// Validate the underlying DAG. Convenience that delegates to the
648    /// definition's `detect_cycles`.
649    pub fn detect_cycles(&self) -> Result<(), String> {
650        self.definition.detect_cycles()
651    }
652}
653
654// ============================================================================
655// Entry Point — named starting positions for multi-entry workflows
656// ============================================================================
657
658/// A named entry point into a workflow.
659/// Allows workflows to be started at different steps depending on context.
660#[derive(Debug, Clone, Serialize, Deserialize)]
661pub struct EntryPoint {
662    /// Unique identifier for this entry point (e.g., "import_from_docs", "grade_only").
663    pub id: String,
664    /// Human-readable label.
665    pub label: String,
666    /// Optional description of when to use this entry point.
667    #[serde(default, skip_serializing_if = "Option::is_none")]
668    pub description: Option<String>,
669    /// The step ID where execution begins.
670    pub starts_at: String,
671    /// Pre-populated step results for steps that are skipped.
672    /// Maps step_id → result value. These steps are marked Done before execution starts.
673    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
674    pub preset_results: HashMap<String, serde_json::Value>,
675    /// Required input fields for this entry point (for UI/validation).
676    #[serde(default, skip_serializing_if = "Vec::is_empty")]
677    pub required_inputs: Vec<String>,
678    /// How a channel user reaches this entry point (slash command,
679    /// callback button, or free-text catch-all). `None` = not channel-
680    /// reachable (e.g. an internal or scheduled entry point).
681    #[serde(default, skip_serializing_if = "Option::is_none")]
682    pub trigger: Option<distri_types::channel_commands::ChannelTrigger>,
683}
684
685// ============================================================================
686// Workflow Step (template)
687// ============================================================================
688
689/// A single step in a workflow. **Template only** — runtime status /
690/// result / timestamps live on `WorkflowStepRun`.
691#[derive(Debug, Clone, Serialize, Deserialize)]
692pub struct WorkflowStep {
693    pub id: String,
694    pub label: String,
695    pub kind: StepKind,
696    /// IDs of steps that must complete before this one can run.
697    #[serde(default)]
698    pub depends_on: Vec<String>,
699    /// Execution mode for this step.
700    #[serde(default)]
701    pub execution: StepExecution,
702    /// Capabilities required to run this step.
703    #[serde(default)]
704    pub requires: Vec<StepRequirement>,
705    /// Optional explicit input mapping for this step.
706    /// Values can reference `{input.X}`, `{steps.step_id.X}`, `{env.X}`.
707    /// If omitted, the step receives the full execution context.
708    #[serde(default, skip_serializing_if = "Option::is_none")]
709    pub input: Option<serde_json::Value>,
710    /// Skip this step if the expression evaluates to true against the workflow context.
711    /// Expression format: `{input.field_name}` — truthy check (field exists and is not null/false/empty).
712    /// Supports negation: `!{input.field_name}` — skip if field is absent/falsy.
713    #[serde(default, skip_serializing_if = "Option::is_none")]
714    pub skip_if: Option<String>,
715}
716
717impl WorkflowStep {
718    fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
719        Self {
720            id: id.to_string(),
721            label: label.to_string(),
722            kind,
723            depends_on: vec![],
724            execution: StepExecution::Sequential,
725            requires: vec![],
726            input: None,
727            skip_if: None,
728        }
729    }
730
731    pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
732        Self::new_step(
733            id,
734            label,
735            StepKind::ApiCall {
736                method: method.to_string(),
737                url: url.to_string(),
738                body: None,
739                headers: None,
740            },
741        )
742    }
743
744    pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
745        Self::new_step(
746            id,
747            label,
748            StepKind::AgentRun {
749                agent_id: agent_id.to_string(),
750                prompt: prompt.to_string(),
751                tools: vec![],
752                skills: vec![],
753                model: None,
754                max_iterations: None,
755            },
756        )
757    }
758
759    pub fn script(id: &str, label: &str, command: &str) -> Self {
760        Self::new_step(
761            id,
762            label,
763            StepKind::Script {
764                command: command.to_string(),
765                args: vec![],
766                cwd: None,
767                env: None,
768                timeout_secs: None,
769                output_format: None,
770                shell: None,
771            },
772        )
773    }
774
775    pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
776        Self::new_step(
777            id,
778            label,
779            StepKind::ToolCall {
780                tool_name: tool_name.to_string(),
781                input,
782                agent_id: None,
783            },
784        )
785    }
786
787    pub fn condition(
788        id: &str,
789        label: &str,
790        expression: &str,
791        if_true: StepKind,
792        if_false: Option<StepKind>,
793    ) -> Self {
794        Self::new_step(
795            id,
796            label,
797            StepKind::Condition {
798                expression: expression.to_string(),
799                if_true: Box::new(if_true),
800                if_false: if_false.map(Box::new),
801            },
802        )
803    }
804
805    pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
806        Self::new_step(
807            id,
808            label,
809            StepKind::Checkpoint {
810                message: message.to_string(),
811            },
812        )
813    }
814
815    pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
816        Self::new_step(
817            id,
818            label,
819            StepKind::WaitForInput {
820                message: message.to_string(),
821                schema: None,
822            },
823        )
824    }
825
826    pub fn with_body(mut self, body: serde_json::Value) -> Self {
827        if let StepKind::ApiCall {
828            body: ref mut b, ..
829        } = self.kind
830        {
831            *b = Some(body);
832        }
833        self
834    }
835
836    pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
837        self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
838        self
839    }
840
841    pub fn parallel(mut self) -> Self {
842        self.execution = StepExecution::Parallel;
843        self
844    }
845
846    pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
847        self.requires = requires;
848        self
849    }
850
851    pub fn with_cwd(mut self, cwd: &str) -> Self {
852        if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
853            *c = Some(cwd.to_string());
854        }
855        self
856    }
857
858    pub fn with_timeout(mut self, secs: u64) -> Self {
859        if let StepKind::Script {
860            timeout_secs: ref mut t,
861            ..
862        } = self.kind
863        {
864            *t = Some(secs);
865        }
866        self
867    }
868
869    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
870        if let StepKind::Script { env: ref mut e, .. } = self.kind {
871            *e = Some(env);
872        }
873        self
874    }
875
876    pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
877        self.input = Some(input);
878        self
879    }
880
881    pub fn with_skip_if(mut self, expression: &str) -> Self {
882        self.skip_if = Some(expression.to_string());
883        self
884    }
885}
886
887// ============================================================================
888// Step Kind — what the step does
889// ============================================================================
890
/// What a step does: one variant per kind of action the engine can run.
///
/// Serialized as an internally tagged enum. The variant name, in
/// snake_case (e.g. `api_call`, `wait_for_input`), goes in a `"type"`
/// field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepKind {
    /// HTTP API call
    ApiCall {
        /// HTTP method, presumably a verb such as `GET` / `POST`.
        /// TODO confirm the accepted spelling against the executor.
        method: String,
        /// Request URL.
        url: String,
        /// Optional request body; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        body: Option<serde_json::Value>,
        /// Optional request headers (name -> value); omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        headers: Option<HashMap<String, String>>,
    },

    /// Shell script / command
    Script {
        /// Command to execute.
        command: String,
        /// Arguments for the command; empty when absent from the input.
        #[serde(default)]
        args: Vec<String>,
        /// Working directory for the command, if any.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cwd: Option<String>,
        /// Extra environment variables for the command, if any.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        env: Option<HashMap<String, String>>,
        /// Timeout in seconds, if any.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_secs: Option<u64>,
        /// How the command's output should be interpreted, if specified.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        output_format: Option<ScriptOutputFormat>,
        /// Shell to run the command with. `None` presumably means the
        /// executor's default; confirm.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        shell: Option<ShellType>,
    },

    /// Delegate to a Distri agent (sub-agent run)
    AgentRun {
        /// Id of the agent to delegate to.
        agent_id: String,
        /// Prompt given to the agent.
        prompt: String,
        /// Tool names for this agent step; empty when absent.
        #[serde(default)]
        tools: Vec<String>,
        /// Skills to load for this agent step
        #[serde(default)]
        skills: Vec<String>,
        /// Override model for this step
        #[serde(default, skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        /// Limit agent loop iterations
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_iterations: Option<u32>,
    },

    /// Single tool invocation — not a full agent loop
    ToolCall {
        /// Tool name (must be registered)
        tool_name: String,
        /// Tool input parameters
        input: serde_json::Value,
        /// Agent context to execute in (for tools needing agent-scoped permissions)
        #[serde(default, skip_serializing_if = "Option::is_none")]
        agent_id: Option<String>,
    },

    /// Conditional branch — evaluates expression against context
    Condition {
        /// Expression evaluated against the workflow context.
        expression: String,
        /// Step kind used when the expression holds.
        if_true: Box<StepKind>,
        /// Step kind used otherwise. `None` presumably means nothing runs
        /// on the false branch; confirm in the executor.
        #[serde(skip_serializing_if = "Option::is_none")]
        if_false: Option<Box<StepKind>>,
    },

    /// No-op / marker step (for documentation or manual checkpoints).
    /// `message` describes the checkpoint.
    Checkpoint { message: String },

    /// Pause execution and wait for external/human input before continuing.
    /// The workflow saves state and stops. A resume call provides the input
    /// as the step result and continues from here.
    WaitForInput {
        /// Message to display to the human (what input is needed)
        message: String,
        /// Optional JSON Schema describing the expected input shape
        #[serde(default, skip_serializing_if = "Option::is_none")]
        schema: Option<serde_json::Value>,
    },

    /// Emit a channel reply (text + optional buttons). Rendered by the
    /// gateway per channel. `text` / button fields support the standard
    /// `{input.x}` / `{steps.id.x}` interpolation; `button_template`
    /// fields additionally support `{item.x}` per `buttons_from`
    /// element.
    Reply {
        /// Reply body text (interpolated as described above).
        text: String,
        /// Static buttons (rows: outer = top-to-bottom).
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
        /// Context path resolving to an array; one button per element.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        buttons_from: Option<String>,
        /// Template applied per `buttons_from` element.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        button_template: Option<distri_types::channel_commands::ReplyButtonSpec>,
    },
}
989
990// ============================================================================
991// Step Requirement — what a step needs to run
992// ============================================================================
993
/// A capability required to execute a step.
/// Uses namespaced skill identifiers:
/// - `native:shell`, `native:browser`, `native:network` — built-in
///   (`StepRequirement::validate` also accepts `native:agent` and
///   `native:tool`)
/// - `{provider}:{service}` — connections (e.g., `google:drive`, `slack:chat`)
///
/// Build one with `StepRequirement::native` / `StepRequirement::connection`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StepRequirement {
    /// Namespaced skill identifier (`namespace:name`).
    pub skill: String,
    /// Required permissions/scopes within the skill. Defaults to empty when
    /// absent from serialized input.
    #[serde(default)]
    pub permissions: Vec<String>,
    /// Optional extra constraints as free-form JSON; omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<serde_json::Value>,
}
1009
1010impl StepRequirement {
1011    /// Create a native skill requirement (prefixed with "native:").
1012    pub fn native(skill: &str) -> Self {
1013        Self {
1014            skill: format!("native:{}", skill),
1015            permissions: vec![],
1016            config: None,
1017        }
1018    }
1019
1020    /// Create a connection requirement (e.g., "google:drive").
1021    pub fn connection(provider: &str, service: &str) -> Self {
1022        Self {
1023            skill: format!("{}:{}", provider, service),
1024            permissions: vec![],
1025            config: None,
1026        }
1027    }
1028
1029    pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1030        self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1031        self
1032    }
1033
1034    /// Get the namespace (part before ':').
1035    pub fn namespace(&self) -> Option<&str> {
1036        self.skill.split(':').next()
1037    }
1038
1039    /// Get the skill name (part after ':').
1040    pub fn skill_name(&self) -> Option<&str> {
1041        self.skill.split(':').nth(1)
1042    }
1043
1044    /// Check if this is a native skill.
1045    pub fn is_native(&self) -> bool {
1046        self.skill.starts_with("native:")
1047    }
1048
1049    /// Validate the requirement. Returns error message if invalid.
1050    pub fn validate(&self) -> Result<(), String> {
1051        if !self.skill.contains(':') {
1052            return Err(format!(
1053                "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1054                self.skill
1055            ));
1056        }
1057
1058        if self.is_native() {
1059            let known = ["shell", "browser", "network", "agent", "tool"];
1060            if let Some(name) = self.skill_name() {
1061                if !known.contains(&name) {
1062                    return Err(format!(
1063                        "Unknown native skill '{}'. Known: {:?}",
1064                        name, known
1065                    ));
1066                }
1067            }
1068        }
1069
1070        Ok(())
1071    }
1072}
1073
1074// ============================================================================
1075// Checkpoint Strategy
1076// ============================================================================
1077
/// How workflow state is checkpointed between steps.
///
/// Serialized internally tagged by a `"type"` field (`internal` /
/// `external`). The `Default` impl is `Internal { ttl_secs: None }`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CheckpointStrategy {
    /// Redis-based, thread+task scoped, auto-TTL.
    Internal {
        /// TTL override in seconds. `None` presumably falls back to the
        /// store's default TTL; confirm against the store.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        ttl_secs: Option<u64>,
    },
    /// Client-registered tool handles persistence.
    /// Tool must support actions: save, load, list.
    /// `tool_name` names that registered tool.
    External { tool_name: String },
}
1091
1092impl Default for CheckpointStrategy {
1093    fn default() -> Self {
1094        CheckpointStrategy::Internal { ttl_secs: None }
1095    }
1096}
1097
/// Metadata about a checkpoint snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
    /// Identifier of this snapshot.
    pub checkpoint_id: String,
    /// Workflow the snapshot belongs to.
    pub workflow_id: String,
    /// Step the snapshot is associated with. Whether this is the step just
    /// finished or the next to run is up to the writer; TODO confirm.
    pub step_id: String,
    /// When the snapshot was created (UTC).
    pub created_at: DateTime<Utc>,
}
1106
1107// ============================================================================
1108// Enums
1109// ============================================================================
1110
/// Top-level run status. Engine-internal; external surfaces translate
/// to `distri_types::TaskStatus` via `From`.
///
/// Serialized in snake_case; `Pending` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowStatus {
    /// Not started yet (the default).
    #[default]
    Pending,
    /// Currently executing steps.
    Running,
    /// Waiting for human/external input (`WaitForInput` step).
    Paused,
    /// Run finished successfully.
    Completed,
    /// Run ended with an error.
    Failed,
    /// All remaining steps are blocked — requirements cannot be met.
    Blocked,
}
1126
1127impl From<WorkflowStatus> for TaskStatus {
1128    fn from(s: WorkflowStatus) -> Self {
1129        match s {
1130            WorkflowStatus::Pending => TaskStatus::Pending,
1131            WorkflowStatus::Running => TaskStatus::Running,
1132            WorkflowStatus::Paused => TaskStatus::InputRequired,
1133            WorkflowStatus::Completed => TaskStatus::Completed,
1134            WorkflowStatus::Failed => TaskStatus::Failed,
1135            // No 1:1 TaskStatus for Blocked — surface as Failed; the
1136            // step-level error fields carry the "missing skills:" reason.
1137            WorkflowStatus::Blocked => TaskStatus::Failed,
1138        }
1139    }
1140}
1141
/// Per-step phase. Engine-internal; richer than `TaskStatus` because
/// the engine cares about the difference between "blocked on a missing
/// requirement" (cannot start) and "failed during execution" (tried,
/// errored). External surfaces translate via `From<StepStatus>`.
///
/// Serialized in snake_case; `Pending` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
    /// Not started yet (the default).
    #[default]
    Pending,
    /// Requirements not met — cannot execute.
    Blocked,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Done,
    /// Executed and errored.
    Failed,
    /// Intentionally not run (e.g. `skip_if` matched).
    Skipped,
    /// Step is waiting for external/human input. Workflow is paused.
    WaitingForInput,
}
1160
1161impl From<StepStatus> for TaskStatus {
1162    fn from(s: StepStatus) -> Self {
1163        match s {
1164            StepStatus::Pending => TaskStatus::Pending,
1165            // No TaskStatus::Blocked — surface as Failed (with
1166            // `step_run.error` carrying the missing-requirement reason).
1167            StepStatus::Blocked => TaskStatus::Failed,
1168            StepStatus::Running => TaskStatus::Running,
1169            StepStatus::Done => TaskStatus::Completed,
1170            StepStatus::Failed => TaskStatus::Failed,
1171            // Intentionally not run; semantically a deliberate cancel.
1172            StepStatus::Skipped => TaskStatus::Canceled,
1173            StepStatus::WaitingForInput => TaskStatus::InputRequired,
1174        }
1175    }
1176}
1177
/// How a step is scheduled relative to the steps around it.
///
/// Serialized in snake_case; `Sequential` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepExecution {
    /// Must wait for previous step to complete.
    #[default]
    Sequential,
    /// Can run in parallel with other parallel steps at the same level.
    Parallel,
}
1187
/// How a `Script` step's output should be interpreted (see
/// `StepKind::Script::output_format`). Serialized in snake_case. The exact
/// handling of each variant lives in the script executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScriptOutputFormat {
    /// Output treated as plain text (presumably).
    Text,
    /// Output presumably parsed as JSON; confirm in the executor.
    Json,
    /// Output presumably streamed incrementally; confirm in the executor.
    Stream,
}
1195
/// Shell used to run a `Script` step's command (see
/// `StepKind::Script::shell`). Serialized in snake_case (`bash`, `sh`, `zsh`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShellType {
    Bash,
    Sh,
    Zsh,
}
1203
1204// ============================================================================
1205// Step Result
1206// ============================================================================
1207
/// Outcome of executing a single step. Presumably produced by step
/// executors and consumed by the engine; confirm against the callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    /// Phase the step ended in.
    pub status: StepStatus,
    /// Value produced by the step, if any.
    pub result: Option<serde_json::Value>,
    /// Error message when the step did not succeed.
    pub error: Option<String>,
    /// Updates to merge into workflow context for subsequent steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_updates: Option<serde_json::Value>,
}
1217
1218impl StepResult {
1219    pub fn done(result: serde_json::Value) -> Self {
1220        Self {
1221            status: StepStatus::Done,
1222            result: Some(result),
1223            error: None,
1224            context_updates: None,
1225        }
1226    }
1227
1228    pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1229        Self {
1230            status: StepStatus::Done,
1231            result: Some(result),
1232            error: None,
1233            context_updates: Some(updates),
1234        }
1235    }
1236
1237    pub fn failed(error: &str) -> Self {
1238        Self {
1239            status: StepStatus::Failed,
1240            result: None,
1241            error: Some(error.to_string()),
1242            context_updates: None,
1243        }
1244    }
1245
1246    pub fn skipped() -> Self {
1247        Self {
1248            status: StepStatus::Skipped,
1249            result: None,
1250            error: None,
1251            context_updates: None,
1252        }
1253    }
1254}
1255
1256// ============================================================================
1257// Workflow Note
1258// ============================================================================
1259
/// A timestamped free-text note attached to a step during a run. For
/// example, the module docs say a note is appended when a step is skipped.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowNote {
    /// Step the note refers to.
    pub step_id: String,
    /// Note text.
    pub message: String,
    /// When the note was recorded (UTC).
    pub at: DateTime<Utc>,
}
1266
1267// ============================================================================
1268// Workflow Run Summary (returned at end of execution)
1269// ============================================================================
1270
/// Snapshot of one finished step in a `WorkflowRunSummary`.
///
/// Surfaces `distri_types::TaskStatus` at the boundary — engine-internal
/// `StepStatus` distinctions (Blocked / Skipped) translate via
/// `StepStatus → TaskStatus`. The original phase is still recoverable
/// from the `error` field for Blocked steps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepSummary {
    /// Step id from the workflow definition.
    pub id: String,
    /// Human-readable step label from the workflow definition.
    pub label: String,
    /// Step outcome translated to the canonical task status.
    pub status: TaskStatus,
    /// Value the step produced, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Error message, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
1287
/// Final summary of a workflow run — id, terminal status, and one row
/// per step. Returned to callers (e.g. the WorkflowAgent invoke result)
/// instead of an ad-hoc JSON shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunSummary {
    /// Id of the run's workflow.
    pub workflow_id: String,
    /// Terminal run status translated to the canonical task status.
    pub status: TaskStatus,
    /// One entry per step, in definition order.
    pub steps: Vec<WorkflowStepSummary>,
}
1297
1298impl WorkflowRunSummary {
1299    /// Build a summary from a finished `WorkflowRun` and its terminal
1300    /// `WorkflowStatus`. Translates statuses to `TaskStatus` at the
1301    /// boundary so consumers don't need to know about the engine's
1302    /// internal enums.
1303    pub fn from_run(run: &WorkflowRun, status: WorkflowStatus) -> Self {
1304        let steps = run
1305            .steps()
1306            .iter()
1307            .zip(run.step_runs.iter())
1308            .map(|(step, sr)| WorkflowStepSummary {
1309                id: step.id.clone(),
1310                label: step.label.clone(),
1311                status: sr.status.into(),
1312                result: sr.result.clone(),
1313                error: sr.error.clone(),
1314            })
1315            .collect();
1316        Self {
1317            workflow_id: run.id().to_string(),
1318            status: status.into(),
1319            steps,
1320        }
1321    }
1322}
1323
1324// ============================================================================
1325// Workflow Events (for streaming to clients)
1326// ============================================================================
1327
/// Events emitted during workflow execution.
///
/// Serialized as an internally tagged enum: the snake_case variant name
/// (e.g. `step_started`) goes in an `"event"` field next to the payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum WorkflowEvent {
    /// Workflow started
    WorkflowStarted {
        /// Workflow the event belongs to.
        workflow_id: String,
        /// Number of steps in the run.
        total_steps: usize,
    },
    /// A step started executing
    StepStarted {
        workflow_id: String,
        /// Id of the step.
        step_id: String,
        /// Human-readable label of the step.
        step_label: String,
    },
    /// A step completed successfully
    StepCompleted {
        workflow_id: String,
        step_id: String,
        step_label: String,
        /// Value the step produced, if any.
        result: Option<serde_json::Value>,
    },
    /// A step failed
    StepFailed {
        workflow_id: String,
        step_id: String,
        step_label: String,
        /// Error message describing the failure.
        error: String,
    },
    /// A step is waiting for external/human input
    StepWaiting {
        workflow_id: String,
        step_id: String,
        step_label: String,
        /// What input is needed, presumably taken from the
        /// `WaitForInput` step's message; confirm.
        message: String,
        /// Optional JSON Schema for the expected input.
        schema: Option<serde_json::Value>,
    },
    /// Workflow completed (all steps done or failed)
    WorkflowCompleted {
        workflow_id: String,
        /// Terminal run status.
        status: WorkflowStatus,
        /// Count of steps that finished successfully (presumably
        /// `StepStatus::Done`; confirm at the emitter).
        steps_done: usize,
        /// Count of steps that failed (exact inclusion rules live at the
        /// emitter).
        steps_failed: usize,
    },
}