Skip to main content

distri_workflow/
types.rs

//! Core types for the workflow engine.
//!
//! Two layers:
//!
//! - **Definition** (`WorkflowDefinition`, `WorkflowStep`): the static
//!   template — what this workflow IS. Stored alongside the agent
//!   config; never mutated by execution.
//! - **Run** (`WorkflowRun`, `WorkflowStepRun`): runtime state for one
//!   execution — status, current step pointer, shared context, per-step
//!   result/error/timestamps. Built from a definition via
//!   `WorkflowRun::new(definition)` and then mutated by the engine.
//!
//! Status uses the canonical `distri_types::TaskStatus` everywhere.
//! Workflow-specific concepts that don't have a 1:1 `TaskStatus` value
//! map as follows:
//!
//! | concept | TaskStatus | extra signal |
//! |---|---|---|
//! | step waiting for input / workflow paused | `InputRequired` | — |
//! | step skipped (`skip_if` / entry point) | `Canceled` | note appended |
//! | step blocked (missing requirement) | `Failed` | error explains |
//!
//! Phase 2b will replace `WorkflowStateStore` with the cloud
//! `TaskStore` + a `workflow_step_executions` sidecar so runs flow
//! through the canonical task tree.
26
27use chrono::{DateTime, Utc};
28pub use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
32// ============================================================================
33// Workflow Definition (template)
34// ============================================================================
35
36/// A workflow is a DAG of steps. The definition is the *template* — no
37/// runtime state lives here. Use `WorkflowRun::new(definition)` to start
38/// an execution.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct WorkflowDefinition {
41    pub id: String,
42    pub steps: Vec<WorkflowStep>,
43    /// JSON Schema describing required inputs for this workflow.
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub input_schema: Option<serde_json::Value>,
46    /// How workflow state is checkpointed between steps.
47    #[serde(default)]
48    pub checkpoint: CheckpointStrategy,
49    /// Named entry points for multi-entry workflows.
50    #[serde(default, skip_serializing_if = "Vec::is_empty")]
51    pub entry_points: Vec<EntryPoint>,
52}
53
/// Slash commands reserved by the channel gateway.
///
/// Entry-point slash triggers (names and aliases) are lowercased and then
/// checked against this list by `WorkflowDefinition::validate_channel_surface`,
/// so a workflow cannot shadow any of them.
///
/// Keep in sync with `ChannelCommand::parse` in `distri-gateway`.
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    "/start", "/stop", "/disconnect", "/reset",
    "/new", "/newsession", "/newthread",
    "/status", "/debug", "/verbose", "/help",
    "/switch", "/workspace", "/context", "/ctx",
];
73
74impl WorkflowDefinition {
75    pub fn new(steps: Vec<WorkflowStep>) -> Self {
76        Self {
77            id: uuid::Uuid::new_v4().to_string(),
78            steps,
79            input_schema: None,
80            checkpoint: CheckpointStrategy::default(),
81            entry_points: vec![],
82        }
83    }
84
85    pub fn with_id(mut self, id: &str) -> Self {
86        self.id = id.to_string();
87        self
88    }
89
90    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91        self.checkpoint = strategy;
92        self
93    }
94
95    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96        self.entry_points = entry_points;
97        self
98    }
99
100    /// Get an entry point by ID.
101    pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102        self.entry_points.iter().find(|ep| ep.id == id)
103    }
104
105    /// Find all step IDs reachable from the given step (inclusive) by following
106    /// depends_on forward. Used by entry-point logic to mark unreachable steps
107    /// as skipped at run start.
108    pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109        use std::collections::{HashSet, VecDeque};
110
111        let mut reachable = HashSet::new();
112        let mut queue = VecDeque::new();
113        queue.push_back(start_step_id.to_string());
114
115        while let Some(current) = queue.pop_front() {
116            if !reachable.insert(current.clone()) {
117                continue;
118            }
119            for step in &self.steps {
120                if step.depends_on.contains(&current) && !reachable.contains(&step.id) {
121                    queue.push_back(step.id.clone());
122                }
123            }
124        }
125
126        reachable
127    }
128
129    /// Validate the channel-command surface declared by entry-point
130    /// triggers. Returns a precise error string on the first problem.
131    pub fn validate_channel_surface(&self) -> Result<(), String> {
132        use distri_types::WorkflowTrigger;
133        use std::collections::HashSet;
134
135        let step_ids: HashSet<&str> =
136            self.steps.iter().map(|s| s.id.as_str()).collect();
137        let mut slash_names: HashSet<String> = HashSet::new();
138        let mut callback_ids: HashSet<String> = HashSet::new();
139        let mut message_count = 0usize;
140
141        for ep in &self.entry_points {
142            if !step_ids.contains(ep.starts_at.as_str()) {
143                return Err(format!(
144                    "entry point '{}' starts_at unknown step '{}'",
145                    ep.id, ep.starts_at
146                ));
147            }
148            for trigger in &ep.triggers {
149                match trigger {
150                    WorkflowTrigger::Slash { name, aliases, .. } => {
151                        for n in std::iter::once(name).chain(aliases.iter()) {
152                            let lower = n.to_lowercase();
153                            if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
154                                return Err(format!(
155                                    "slash command '{n}' shadows a built-in command"
156                                ));
157                            }
158                            if !slash_names.insert(lower.clone()) {
159                                return Err(format!(
160                                    "entry point '{}': slash command '{}' is already declared",
161                                    ep.id, n
162                                ));
163                            }
164                        }
165                    }
166                    WorkflowTrigger::Callback { id, .. } => {
167                        if !callback_ids.insert(id.clone()) {
168                            return Err(format!(
169                                "entry point '{}': callback id '{}' is already declared",
170                                ep.id, id
171                            ));
172                        }
173                    }
174                    WorkflowTrigger::Message {} => message_count += 1,
175                    // Manual / Schedule / Webhook / Event / Tool don't
176                    // contribute to channel-command surface validation.
177                    _ => {}
178                }
179            }
180        }
181        if message_count > 1 {
182            return Err(format!(
183                "workflow declares {message_count} message catch-all entry \
184                 points; at most one is allowed"
185            ));
186        }
187        for step in &self.steps {
188            if let StepKind::Reply {
189                buttons_from,
190                button_template,
191                ..
192            } = &step.kind
193            {
194                if button_template.is_some() != buttons_from.is_some() {
195                    return Err(format!(
196                        "reply step '{}': button_template and buttons_from \
197                         must be set together",
198                        step.id
199                    ));
200                }
201            }
202        }
203        Ok(())
204    }
205
206    /// Detect circular dependencies in the workflow DAG.
207    /// Returns `Err` with the cycle description if found.
208    pub fn detect_cycles(&self) -> Result<(), String> {
209        use std::collections::{HashMap, HashSet};
210
211        let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
212        let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
213        for step in &self.steps {
214            adj.insert(
215                step.id.as_str(),
216                step.depends_on.iter().map(|s| s.as_str()).collect(),
217            );
218        }
219
220        let mut visited = HashSet::new();
221        let mut in_stack = HashSet::new();
222
223        fn dfs<'a>(
224            node: &'a str,
225            adj: &HashMap<&'a str, Vec<&'a str>>,
226            visited: &mut HashSet<&'a str>,
227            in_stack: &mut HashSet<&'a str>,
228            path: &mut Vec<&'a str>,
229        ) -> Result<(), String> {
230            visited.insert(node);
231            in_stack.insert(node);
232            path.push(node);
233
234            if let Some(deps) = adj.get(node) {
235                for &dep in deps {
236                    if !visited.contains(dep) {
237                        dfs(dep, adj, visited, in_stack, path)?;
238                    } else if in_stack.contains(dep) {
239                        let cycle_start = path.iter().position(|&n| n == dep).unwrap();
240                        let cycle: Vec<&str> = path[cycle_start..].to_vec();
241                        return Err(format!(
242                            "Circular dependency detected: {} → {}",
243                            cycle.join(" → "),
244                            dep
245                        ));
246                    }
247                }
248            }
249
250            in_stack.remove(node);
251            path.pop();
252            Ok(())
253        }
254
255        let mut path = Vec::new();
256        for step in &self.steps {
257            if !visited.contains(step.id.as_str()) {
258                dfs(
259                    step.id.as_str(),
260                    &adj,
261                    &mut visited,
262                    &mut in_stack,
263                    &mut path,
264                )?;
265            }
266        }
267
268        // Also check for references to non-existent steps
269        for step in &self.steps {
270            for dep in &step.depends_on {
271                if !step_ids.contains(dep.as_str()) {
272                    return Err(format!(
273                        "Step '{}' depends on '{}' which does not exist",
274                        step.id, dep
275                    ));
276                }
277            }
278        }
279
280        Ok(())
281    }
282}
283
284
285// ============================================================================
286// Workflow Run (execution state)
287// ============================================================================
288
289fn default_empty_object() -> serde_json::Value {
290    serde_json::json!({})
291}
292
293fn default_now() -> DateTime<Utc> {
294    Utc::now()
295}
296
297/// One execution of a `WorkflowDefinition`. Owns the definition plus all
298/// runtime state — status, shared context, per-step status / result /
299/// error / timestamps. The engine mutates a `WorkflowRun`.
300///
301/// `step_runs` is parallel to `definition.steps` (same length, same
302/// order).
303///
304/// **Wire shape**: `definition` is flattened, so a `WorkflowRun`
305/// serializes as one flat JSON object with the definition fields
306/// (`id`, `steps`, `input_schema`, …) alongside the runtime fields
307/// (`status`, `current_step`, `context`, `step_runs`, …). This
308/// keeps the persisted run row identical to the legacy monolithic
309/// shape.
310#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct WorkflowRun {
312    #[serde(flatten)]
313    pub definition: WorkflowDefinition,
314    #[serde(default)]
315    pub status: TaskStatus,
316    #[serde(default)]
317    pub current_step: usize,
318    #[serde(default = "default_empty_object")]
319    pub context: serde_json::Value,
320    #[serde(default)]
321    pub notes: Vec<WorkflowNote>,
322    #[serde(default)]
323    pub step_runs: Vec<WorkflowStepRun>,
324    #[serde(default = "default_now")]
325    pub created_at: DateTime<Utc>,
326    #[serde(default = "default_now")]
327    pub updated_at: DateTime<Utc>,
328}
329
330/// Per-step runtime state. Parallel to `WorkflowDefinition.steps[i]`.
331#[derive(Debug, Clone, Serialize, Deserialize, Default)]
332pub struct WorkflowStepRun {
333    pub step_id: String,
334    #[serde(default)]
335    pub status: TaskStatus,
336    #[serde(default, skip_serializing_if = "Option::is_none")]
337    pub result: Option<serde_json::Value>,
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub error: Option<String>,
340    #[serde(default, skip_serializing_if = "Option::is_none")]
341    pub started_at: Option<DateTime<Utc>>,
342    #[serde(default, skip_serializing_if = "Option::is_none")]
343    pub completed_at: Option<DateTime<Utc>>,
344}
345
346impl WorkflowRun {
347    /// Build a fresh run from a definition. All step_runs start `Pending`,
348    /// context is `{}`, status is `Pending`.
349    pub fn new(definition: WorkflowDefinition) -> Self {
350        let step_runs = definition
351            .steps
352            .iter()
353            .map(|s| WorkflowStepRun {
354                step_id: s.id.clone(),
355                ..Default::default()
356            })
357            .collect();
358        Self {
359            definition,
360            status: TaskStatus::Pending,
361            current_step: 0,
362            context: serde_json::json!({}),
363            notes: vec![],
364            step_runs,
365            created_at: Utc::now(),
366            updated_at: Utc::now(),
367        }
368    }
369
370    /// Convenience for callers that want to skip the explicit
371    /// definition struct (mostly tests).
372    pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
373        Self::new(WorkflowDefinition::new(steps))
374    }
375
376    pub fn with_context(mut self, context: serde_json::Value) -> Self {
377        self.context = context;
378        self
379    }
380
381    pub fn with_id(mut self, id: &str) -> Self {
382        self.definition.id = id.to_string();
383        self
384    }
385
386    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
387        self.definition.checkpoint = strategy;
388        self
389    }
390
391    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
392        self.definition.entry_points = entry_points;
393        self
394    }
395
396    pub fn id(&self) -> &str {
397        &self.definition.id
398    }
399
400    pub fn steps(&self) -> &[WorkflowStep] {
401        &self.definition.steps
402    }
403
404    pub fn step(&self, idx: usize) -> &WorkflowStep {
405        &self.definition.steps[idx]
406    }
407
408    pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
409        &self.step_runs[idx]
410    }
411
412    pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
413        &mut self.step_runs[idx]
414    }
415
416    pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
417        self.step_runs.iter().find(|s| s.step_id == step_id)
418    }
419
420    pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
421        self.step_runs.iter_mut().find(|s| s.step_id == step_id)
422    }
423
424    /// Apply an entry point: mark steps not reachable from `starts_at` as
425    /// Skipped, pre-populate their results from `preset_results`, and
426    /// merge those into context so downstream `{steps.X}` references work.
427    pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
428        let ep = self
429            .definition
430            .entry_points
431            .iter()
432            .find(|ep| ep.id == entry_point_id)
433            .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
434            .clone();
435
436        if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
437            return Err(format!(
438                "Entry point '{}' references step '{}' which does not exist",
439                entry_point_id, ep.starts_at
440            ));
441        }
442
443        let reachable = self.definition.reachable_from(&ep.starts_at);
444
445        for (i, step) in self.definition.steps.iter().enumerate() {
446            if !reachable.contains(&step.id) {
447                self.step_runs[i].status = TaskStatus::Canceled;
448                if let Some(result) = ep.preset_results.get(&step.id) {
449                    self.step_runs[i].result = Some(result.clone());
450                }
451            }
452        }
453
454        if let Some(ctx) = self.context.as_object_mut() {
455            let steps = ctx
456                .entry("steps")
457                .or_insert(serde_json::json!({}))
458                .as_object_mut()
459                .expect("steps must be an object");
460            for (step_id, result) in &ep.preset_results {
461                steps.insert(step_id.clone(), result.clone());
462            }
463        }
464
465        Ok(self)
466    }
467
468    /// Initialize the run with validated input. Input is validated
469    /// against `definition.input_schema` if present, then merged into
470    /// `context`. Status flips to Running.
471    pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
472        if let Some(ref schema_value) = self.definition.input_schema {
473            let validator = jsonschema::validator_for(schema_value)
474                .map_err(|e| format!("Invalid input_schema: {e}"))?;
475
476            if !validator.is_valid(&input) {
477                let errors: Vec<String> = validator
478                    .iter_errors(&input)
479                    .map(|e| format!("{}", e))
480                    .collect();
481                return Err(format!("Input validation failed: {}", errors.join("; ")));
482            }
483        }
484
485        if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
486            for (k, v) in inp {
487                ctx.insert(k.clone(), v.clone());
488            }
489            ctx.insert("input".to_string(), input.clone());
490        }
491
492        self.status = TaskStatus::Running;
493        self.updated_at = Utc::now();
494        Ok(self)
495    }
496
497    /// First pending step, if any.
498    pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
499        self.step_runs
500            .iter()
501            .enumerate()
502            .find(|(_, s)| s.status == TaskStatus::Pending)
503            .map(|(i, _)| (i, &self.definition.steps[i]))
504    }
505
506    /// All steps that can run now: pending + all dependencies completed.
507    /// Pure query — does not mutate.
508    pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
509        let mut runnable = vec![];
510        for (i, step) in self.definition.steps.iter().enumerate() {
511            if self.step_runs[i].status != TaskStatus::Pending {
512                continue;
513            }
514            let deps_met = step.depends_on.iter().all(|dep_id| {
515                self.definition
516                    .steps
517                    .iter()
518                    .zip(self.step_runs.iter())
519                    .any(|(s, sr)| {
520                        &s.id == dep_id
521                            && matches!(sr.status, TaskStatus::Completed | TaskStatus::Canceled)
522                    })
523            });
524            if deps_met {
525                runnable.push((i, step));
526            }
527        }
528        runnable
529    }
530
531    pub fn is_complete(&self) -> bool {
532        self.step_runs.iter().all(|s| {
533            matches!(
534                s.status,
535                TaskStatus::Completed | TaskStatus::Canceled | TaskStatus::Failed
536            )
537        })
538    }
539
540    pub fn is_waiting_for_input(&self) -> bool {
541        self.step_runs
542            .iter()
543            .any(|s| s.status == TaskStatus::InputRequired)
544    }
545
546    pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
547        self.step_runs
548            .iter()
549            .enumerate()
550            .find(|(_, s)| s.status == TaskStatus::InputRequired)
551            .map(|(i, _)| (i, &self.definition.steps[i]))
552    }
553
554    /// Resume a paused run by providing input for the waiting step.
555    pub fn resume_step(
556        &mut self,
557        step_id: &str,
558        result: serde_json::Value,
559    ) -> Result<usize, String> {
560        let idx = self
561            .step_runs
562            .iter()
563            .position(|s| s.step_id == step_id && s.status == TaskStatus::InputRequired)
564            .ok_or_else(|| {
565                format!(
566                    "Step '{}' not found or not in waiting_for_input state",
567                    step_id
568                )
569            })?;
570
571        self.step_runs[idx].status = TaskStatus::Completed;
572        self.step_runs[idx].result = Some(result.clone());
573        self.step_runs[idx].completed_at = Some(Utc::now());
574
575        if let Some(ctx) = self.context.as_object_mut() {
576            let steps = ctx
577                .entry("steps")
578                .or_insert(serde_json::json!({}))
579                .as_object_mut()
580                .expect("steps must be an object");
581            steps.insert(step_id.to_string(), result);
582        }
583
584        self.status = TaskStatus::Running;
585        self.updated_at = Utc::now();
586        Ok(idx)
587    }
588
589    /// Stuck: blocked steps, nothing running, no path forward.
590    pub fn is_stuck(&self) -> bool {
591        let has_blocked = self
592            .step_runs
593            .iter()
594            .any(|s| s.status == TaskStatus::Failed);
595        let has_pending = self
596            .step_runs
597            .iter()
598            .any(|s| s.status == TaskStatus::Pending);
599        let has_running = self
600            .step_runs
601            .iter()
602            .any(|s| s.status == TaskStatus::Running);
603
604        if !has_blocked || has_running {
605            return false;
606        }
607
608        if !has_pending {
609            return true;
610        }
611
612        !self
613            .definition
614            .steps
615            .iter()
616            .zip(self.step_runs.iter())
617            .any(|(step, run)| {
618                run.status == TaskStatus::Pending
619                    && step.depends_on.iter().all(|dep_id| {
620                        self.definition
621                            .steps
622                            .iter()
623                            .zip(self.step_runs.iter())
624                            .any(|(s, sr)| {
625                                &s.id == dep_id
626                                    && matches!(
627                                        sr.status,
628                                        TaskStatus::Completed
629                                            | TaskStatus::Pending
630                                            | TaskStatus::Running
631                                    )
632                            })
633                    })
634            })
635    }
636
637    pub fn has_failed(&self) -> bool {
638        self.step_runs
639            .iter()
640            .any(|s| s.status == TaskStatus::Failed)
641    }
642
643    /// Append a note to the run's log.
644    pub fn add_note(&mut self, step_id: &str, message: &str) {
645        self.notes.push(WorkflowNote {
646            step_id: step_id.to_string(),
647            message: message.to_string(),
648            at: Utc::now(),
649        });
650        self.updated_at = Utc::now();
651    }
652
653    /// Validate the underlying DAG. Convenience that delegates to the
654    /// definition's `detect_cycles`.
655    pub fn detect_cycles(&self) -> Result<(), String> {
656        self.definition.detect_cycles()
657    }
658}
659
660// ============================================================================
661// Entry Point — named starting positions for multi-entry workflows
662// ============================================================================
663
664/// A named entry point into a workflow.
665/// Allows workflows to be started at different steps depending on context.
666#[derive(Debug, Clone, Serialize, Deserialize)]
667pub struct EntryPoint {
668    /// Unique identifier for this entry point (e.g., "import_from_docs", "grade_only").
669    pub id: String,
670    /// Human-readable label.
671    pub label: String,
672    /// Optional description of when to use this entry point.
673    #[serde(default, skip_serializing_if = "Option::is_none")]
674    pub description: Option<String>,
675    /// The step ID where execution begins.
676    pub starts_at: String,
677    /// Pre-populated step results for steps that are skipped.
678    /// Maps step_id → result value. These steps are marked Done before execution starts.
679    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
680    pub preset_results: HashMap<String, serde_json::Value>,
681    /// Required input fields for this entry point (for UI/validation).
682    #[serde(default, skip_serializing_if = "Vec::is_empty")]
683    pub required_inputs: Vec<String>,
684    /// How this entry point is reached. Empty = default `Manual`
685    /// only (direct API/UI invocation). Multiple triggers can target
686    /// the same entry point — e.g. a slash command and a webhook
687    /// both starting the same run.
688    #[serde(default, skip_serializing_if = "Vec::is_empty")]
689    pub triggers: Vec<distri_types::WorkflowTrigger>,
690}
691
692// ============================================================================
693// Workflow Step (template)
694// ============================================================================
695
696/// A single step in a workflow. **Template only** — runtime status /
697/// result / timestamps live on `WorkflowStepRun`.
698#[derive(Debug, Clone, Serialize, Deserialize)]
699pub struct WorkflowStep {
700    pub id: String,
701    pub label: String,
702    pub kind: StepKind,
703    /// IDs of steps that must complete before this one can run.
704    #[serde(default)]
705    pub depends_on: Vec<String>,
706    /// Execution mode for this step.
707    #[serde(default)]
708    pub execution: StepExecution,
709    /// Capabilities required to run this step.
710    #[serde(default)]
711    pub requires: Vec<StepRequirement>,
712    /// Optional explicit input mapping for this step.
713    /// Values can reference `{input.X}`, `{steps.step_id.X}`, `{env.X}`.
714    /// If omitted, the step receives the full execution context.
715    #[serde(default, skip_serializing_if = "Option::is_none")]
716    pub input: Option<serde_json::Value>,
717    /// Skip this step if the expression evaluates to true against the workflow context.
718    /// Expression format: `{input.field_name}` — truthy check (field exists and is not null/false/empty).
719    /// Supports negation: `!{input.field_name}` — skip if field is absent/falsy.
720    #[serde(default, skip_serializing_if = "Option::is_none")]
721    pub skip_if: Option<String>,
722}
723
724impl WorkflowStep {
725    fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
726        Self {
727            id: id.to_string(),
728            label: label.to_string(),
729            kind,
730            depends_on: vec![],
731            execution: StepExecution::Sequential,
732            requires: vec![],
733            input: None,
734            skip_if: None,
735        }
736    }
737
738    pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
739        Self::new_step(
740            id,
741            label,
742            StepKind::ApiCall {
743                method: method.to_string(),
744                url: url.to_string(),
745                body: None,
746                headers: None,
747            },
748        )
749    }
750
751    pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
752        Self::new_step(
753            id,
754            label,
755            StepKind::AgentRun {
756                agent_id: agent_id.to_string(),
757                prompt: prompt.to_string(),
758                tools: vec![],
759                skills: vec![],
760                model: None,
761                max_iterations: None,
762            },
763        )
764    }
765
766    pub fn script(id: &str, label: &str, command: &str) -> Self {
767        Self::new_step(
768            id,
769            label,
770            StepKind::Script {
771                command: command.to_string(),
772                args: vec![],
773                cwd: None,
774                env: None,
775                timeout_secs: None,
776                output_format: None,
777                shell: None,
778            },
779        )
780    }
781
782    pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
783        Self::new_step(
784            id,
785            label,
786            StepKind::ToolCall {
787                tool_name: tool_name.to_string(),
788                input,
789                agent_id: None,
790            },
791        )
792    }
793
794    pub fn condition(
795        id: &str,
796        label: &str,
797        expression: &str,
798        if_true: StepKind,
799        if_false: Option<StepKind>,
800    ) -> Self {
801        Self::new_step(
802            id,
803            label,
804            StepKind::Condition {
805                expression: expression.to_string(),
806                if_true: Box::new(if_true),
807                if_false: if_false.map(Box::new),
808            },
809        )
810    }
811
812    pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
813        Self::new_step(
814            id,
815            label,
816            StepKind::Checkpoint {
817                message: message.to_string(),
818            },
819        )
820    }
821
822    pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
823        Self::new_step(
824            id,
825            label,
826            StepKind::WaitForInput {
827                message: message.to_string(),
828                schema: None,
829            },
830        )
831    }
832
833    pub fn with_body(mut self, body: serde_json::Value) -> Self {
834        if let StepKind::ApiCall {
835            body: ref mut b, ..
836        } = self.kind
837        {
838            *b = Some(body);
839        }
840        self
841    }
842
843    pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
844        self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
845        self
846    }
847
848    pub fn parallel(mut self) -> Self {
849        self.execution = StepExecution::Parallel;
850        self
851    }
852
853    pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
854        self.requires = requires;
855        self
856    }
857
858    pub fn with_cwd(mut self, cwd: &str) -> Self {
859        if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
860            *c = Some(cwd.to_string());
861        }
862        self
863    }
864
865    pub fn with_timeout(mut self, secs: u64) -> Self {
866        if let StepKind::Script {
867            timeout_secs: ref mut t,
868            ..
869        } = self.kind
870        {
871            *t = Some(secs);
872        }
873        self
874    }
875
876    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
877        if let StepKind::Script { env: ref mut e, .. } = self.kind {
878            *e = Some(env);
879        }
880        self
881    }
882
883    pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
884        self.input = Some(input);
885        self
886    }
887
888    pub fn with_skip_if(mut self, expression: &str) -> Self {
889        self.skip_if = Some(expression.to_string());
890        self
891    }
892}
893
894// ============================================================================
895// Step Kind — what the step does
896// ============================================================================
897
898#[derive(Debug, Clone, Serialize, Deserialize)]
899#[serde(tag = "type", rename_all = "snake_case")]
900pub enum StepKind {
901    /// HTTP API call
902    ApiCall {
903        method: String,
904        url: String,
905        #[serde(skip_serializing_if = "Option::is_none")]
906        body: Option<serde_json::Value>,
907        #[serde(skip_serializing_if = "Option::is_none")]
908        headers: Option<HashMap<String, String>>,
909    },
910
911    /// Shell script / command
912    Script {
913        command: String,
914        #[serde(default)]
915        args: Vec<String>,
916        #[serde(default, skip_serializing_if = "Option::is_none")]
917        cwd: Option<String>,
918        #[serde(default, skip_serializing_if = "Option::is_none")]
919        env: Option<HashMap<String, String>>,
920        #[serde(default, skip_serializing_if = "Option::is_none")]
921        timeout_secs: Option<u64>,
922        #[serde(default, skip_serializing_if = "Option::is_none")]
923        output_format: Option<ScriptOutputFormat>,
924        #[serde(default, skip_serializing_if = "Option::is_none")]
925        shell: Option<ShellType>,
926    },
927
928    /// Delegate to a Distri agent (sub-agent run)
929    AgentRun {
930        agent_id: String,
931        prompt: String,
932        #[serde(default)]
933        tools: Vec<String>,
934        /// Skills to load for this agent step
935        #[serde(default)]
936        skills: Vec<String>,
937        /// Override model for this step
938        #[serde(default, skip_serializing_if = "Option::is_none")]
939        model: Option<String>,
940        /// Limit agent loop iterations
941        #[serde(default, skip_serializing_if = "Option::is_none")]
942        max_iterations: Option<u32>,
943    },
944
945    /// Single tool invocation — not a full agent loop
946    ToolCall {
947        /// Tool name (must be registered)
948        tool_name: String,
949        /// Tool input parameters
950        input: serde_json::Value,
951        /// Agent context to execute in (for tools needing agent-scoped permissions)
952        #[serde(default, skip_serializing_if = "Option::is_none")]
953        agent_id: Option<String>,
954    },
955
956    /// Conditional branch — evaluates expression against context
957    Condition {
958        expression: String,
959        if_true: Box<StepKind>,
960        #[serde(skip_serializing_if = "Option::is_none")]
961        if_false: Option<Box<StepKind>>,
962    },
963
964    /// No-op / marker step (for documentation or manual checkpoints)
965    Checkpoint { message: String },
966
967    /// Pause execution and wait for external/human input before continuing.
968    /// The workflow saves state and stops. A resume call provides the input
969    /// as the step result and continues from here.
970    WaitForInput {
971        /// Message to display to the human (what input is needed)
972        message: String,
973        /// Optional JSON Schema describing the expected input shape
974        #[serde(default, skip_serializing_if = "Option::is_none")]
975        schema: Option<serde_json::Value>,
976    },
977
978    /// Emit a channel reply (text + optional buttons). Rendered by the
979    /// gateway per channel. `text` / button fields support the standard
980    /// `{input.x}` / `{steps.id.x}` interpolation; `button_template`
981    /// fields additionally support `{item.x}` per `buttons_from`
982    /// element.
983    Reply {
984        text: String,
985        /// Static buttons (rows: outer = top-to-bottom).
986        #[serde(default, skip_serializing_if = "Vec::is_empty")]
987        buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
988        /// Context path resolving to an array; one button per element.
989        #[serde(default, skip_serializing_if = "Option::is_none")]
990        buttons_from: Option<String>,
991        /// Template applied per `buttons_from` element.
992        #[serde(default, skip_serializing_if = "Option::is_none")]
993        button_template:
994            Option<distri_types::channel_commands::ReplyButtonSpec>,
995    },
996}
997
998// ============================================================================
999// Step Requirement — what a step needs to run
1000// ============================================================================
1001
1002/// A capability required to execute a step.
1003/// Uses namespaced skill identifiers:
1004/// - `native:shell`, `native:browser`, `native:network` — built-in
1005/// - `{provider}:{service}` — connections (e.g., `google:drive`, `slack:chat`)
1006#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1007pub struct StepRequirement {
1008    /// Namespaced skill identifier.
1009    pub skill: String,
1010    /// Required permissions/scopes within the skill.
1011    #[serde(default)]
1012    pub permissions: Vec<String>,
1013    /// Optional extra constraints.
1014    #[serde(default, skip_serializing_if = "Option::is_none")]
1015    pub config: Option<serde_json::Value>,
1016}
1017
1018impl StepRequirement {
1019    /// Create a native skill requirement (prefixed with "native:").
1020    pub fn native(skill: &str) -> Self {
1021        Self {
1022            skill: format!("native:{}", skill),
1023            permissions: vec![],
1024            config: None,
1025        }
1026    }
1027
1028    /// Create a connection requirement (e.g., "google:drive").
1029    pub fn connection(provider: &str, service: &str) -> Self {
1030        Self {
1031            skill: format!("{}:{}", provider, service),
1032            permissions: vec![],
1033            config: None,
1034        }
1035    }
1036
1037    pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1038        self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1039        self
1040    }
1041
1042    /// Get the namespace (part before ':').
1043    pub fn namespace(&self) -> Option<&str> {
1044        self.skill.split(':').next()
1045    }
1046
1047    /// Get the skill name (part after ':').
1048    pub fn skill_name(&self) -> Option<&str> {
1049        self.skill.split(':').nth(1)
1050    }
1051
1052    /// Check if this is a native skill.
1053    pub fn is_native(&self) -> bool {
1054        self.skill.starts_with("native:")
1055    }
1056
1057    /// Validate the requirement. Returns error message if invalid.
1058    pub fn validate(&self) -> Result<(), String> {
1059        if !self.skill.contains(':') {
1060            return Err(format!(
1061                "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1062                self.skill
1063            ));
1064        }
1065
1066        if self.is_native() {
1067            let known = ["shell", "browser", "network", "agent", "tool"];
1068            if let Some(name) = self.skill_name() {
1069                if !known.contains(&name) {
1070                    return Err(format!(
1071                        "Unknown native skill '{}'. Known: {:?}",
1072                        name, known
1073                    ));
1074                }
1075            }
1076        }
1077
1078        Ok(())
1079    }
1080}
1081
1082// ============================================================================
1083// Checkpoint Strategy
1084// ============================================================================
1085
1086/// How workflow state is checkpointed between steps.
1087#[derive(Debug, Clone, Serialize, Deserialize)]
1088#[serde(tag = "type", rename_all = "snake_case")]
1089pub enum CheckpointStrategy {
1090    /// Redis-based, thread+task scoped, auto-TTL.
1091    Internal {
1092        #[serde(default, skip_serializing_if = "Option::is_none")]
1093        ttl_secs: Option<u64>,
1094    },
1095    /// Client-registered tool handles persistence.
1096    /// Tool must support actions: save, load, list.
1097    External { tool_name: String },
1098}
1099
1100impl Default for CheckpointStrategy {
1101    fn default() -> Self {
1102        CheckpointStrategy::Internal { ttl_secs: None }
1103    }
1104}
1105
1106/// Metadata about a checkpoint snapshot.
1107#[derive(Debug, Clone, Serialize, Deserialize)]
1108pub struct CheckpointMeta {
1109    pub checkpoint_id: String,
1110    pub workflow_id: String,
1111    pub step_id: String,
1112    pub created_at: DateTime<Utc>,
1113}
1114
1115// ============================================================================
1116// Enums
1117// ============================================================================
1118
1119// `TaskStatus` is the single status model for both workflow runs and
1120// steps — the old `WorkflowStatus` and `StepStatus` enums are gone.
// Two engine-level nuances with no dedicated `TaskStatus` variant are
// encoded as follows:
1123//   - a *blocked* step (unmet `StepRequirement`) → `TaskStatus::Failed`
1124//     with the reason in `WorkflowStepRun.error` ("Missing skills: …");
1125//   - a *skipped* step (`skip_if`, or unreachable from the entry point)
1126//     → `TaskStatus::Canceled` with a run note.
1127
1128#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
1129#[serde(rename_all = "snake_case")]
1130pub enum StepExecution {
1131    /// Must wait for previous step to complete.
1132    #[default]
1133    Sequential,
1134    /// Can run in parallel with other parallel steps at the same level.
1135    Parallel,
1136}
1137
1138#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1139#[serde(rename_all = "snake_case")]
1140pub enum ScriptOutputFormat {
1141    Text,
1142    Json,
1143    Stream,
1144}
1145
1146#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1147#[serde(rename_all = "snake_case")]
1148pub enum ShellType {
1149    Bash,
1150    Sh,
1151    Zsh,
1152}
1153
1154// ============================================================================
1155// Step Result
1156// ============================================================================
1157
1158#[derive(Debug, Clone, Serialize, Deserialize)]
1159pub struct StepResult {
1160    pub status: TaskStatus,
1161    pub result: Option<serde_json::Value>,
1162    pub error: Option<String>,
1163    /// Updates to merge into workflow context for subsequent steps.
1164    #[serde(skip_serializing_if = "Option::is_none")]
1165    pub context_updates: Option<serde_json::Value>,
1166}
1167
1168impl StepResult {
1169    pub fn done(result: serde_json::Value) -> Self {
1170        Self {
1171            status: TaskStatus::Completed,
1172            result: Some(result),
1173            error: None,
1174            context_updates: None,
1175        }
1176    }
1177
1178    pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1179        Self {
1180            status: TaskStatus::Completed,
1181            result: Some(result),
1182            error: None,
1183            context_updates: Some(updates),
1184        }
1185    }
1186
1187    pub fn failed(error: &str) -> Self {
1188        Self {
1189            status: TaskStatus::Failed,
1190            result: None,
1191            error: Some(error.to_string()),
1192            context_updates: None,
1193        }
1194    }
1195
1196    pub fn skipped() -> Self {
1197        Self {
1198            status: TaskStatus::Canceled,
1199            result: None,
1200            error: None,
1201            context_updates: None,
1202        }
1203    }
1204}
1205
1206// ============================================================================
1207// Workflow Note
1208// ============================================================================
1209
1210#[derive(Debug, Clone, Serialize, Deserialize)]
1211pub struct WorkflowNote {
1212    pub step_id: String,
1213    pub message: String,
1214    pub at: DateTime<Utc>,
1215}
1216
1217// ============================================================================
1218// Workflow Run Summary (returned at end of execution)
1219// ============================================================================
1220
1221/// Snapshot of one finished step in a `WorkflowRunSummary`.
1222///
1223/// Surfaces `distri_types::TaskStatus` at the boundary — engine-internal
1224/// `StepStatus` distinctions (Blocked / Skipped) translate via
1225/// `StepStatus → TaskStatus`. The original phase is still recoverable
1226/// from the `error` field for Blocked steps.
1227#[derive(Debug, Clone, Serialize, Deserialize)]
1228pub struct WorkflowStepSummary {
1229    pub id: String,
1230    pub label: String,
1231    pub status: TaskStatus,
1232    #[serde(default, skip_serializing_if = "Option::is_none")]
1233    pub result: Option<serde_json::Value>,
1234    #[serde(default, skip_serializing_if = "Option::is_none")]
1235    pub error: Option<String>,
1236}
1237
1238/// Final summary of a workflow run — id, terminal status, and one row
1239/// per step. Returned to callers (e.g. the WorkflowAgent invoke result)
1240/// instead of an ad-hoc JSON shape.
1241#[derive(Debug, Clone, Serialize, Deserialize)]
1242pub struct WorkflowRunSummary {
1243    pub workflow_id: String,
1244    pub status: TaskStatus,
1245    pub steps: Vec<WorkflowStepSummary>,
1246}
1247
1248impl WorkflowRunSummary {
1249    /// Build a summary from a finished `WorkflowRun` and its terminal
1250    /// `WorkflowStatus`. Translates statuses to `TaskStatus` at the
1251    /// boundary so consumers don't need to know about the engine's
1252    /// internal enums.
1253    pub fn from_run(run: &WorkflowRun, status: TaskStatus) -> Self {
1254        let steps = run
1255            .steps()
1256            .iter()
1257            .zip(run.step_runs.iter())
1258            .map(|(step, sr)| WorkflowStepSummary {
1259                id: step.id.clone(),
1260                label: step.label.clone(),
1261                status: sr.status.clone(),
1262                result: sr.result.clone(),
1263                error: sr.error.clone(),
1264            })
1265            .collect();
1266        Self {
1267            workflow_id: run.id().to_string(),
1268            status,
1269            steps,
1270        }
1271    }
1272}