Skip to main content

distri_workflow/
types.rs

1//! Core types for the workflow engine.
2//!
3//! Two layers:
4//!
5//! - **Definition** (`WorkflowDefinition`, `WorkflowStep`): the static
6//!   template — what this workflow IS. Stored alongside the agent
7//!   config; never mutated by execution.
8//! - **Run** (`WorkflowRun`, `WorkflowStepRun`): runtime state for one
9//!   execution — status, current step pointer, shared context, per-step
10//!   result/error/timestamps. Built from a definition via
11//!   `WorkflowRun::new(definition)` and then mutated by the engine.
12//!
13//! Status uses the canonical `distri_types::TaskStatus` everywhere.
14//! Workflow-specific concepts that don't have a 1:1 TaskStatus value
15//! map as follows:
16//!
17//! | concept | TaskStatus | extra signal |
18//! |---|---|---|
19//! | step waiting for input / workflow paused | InputRequired | — |
20//! | step skipped (skip_if / entry-point) | Canceled | note appended |
21//! | step blocked (missing requirement) | Failed | error explains |
22//!
23//! Phase 2b will replace `WorkflowStateStore` with the cloud
24//! `TaskStore` + a `workflow_step_executions` sidecar so runs flow
25//! through the canonical task tree.
26
27use chrono::{DateTime, Utc};
28pub use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
32// ============================================================================
33// Workflow Definition (template)
34// ============================================================================
35
/// A workflow is a DAG of steps. The definition is the *template* — no
/// runtime state lives here. Use `WorkflowRun::new(definition)` to start
/// an execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
    /// Identifier of this workflow. `WorkflowDefinition::new` generates a
    /// UUID v4; `with_id` overrides it.
    pub id: String,
    /// The steps of the DAG. Edges are expressed by each step's
    /// `depends_on` list of step IDs.
    pub steps: Vec<WorkflowStep>,
    /// JSON Schema describing required inputs for this workflow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<serde_json::Value>,
    /// How workflow state is checkpointed between steps.
    #[serde(default)]
    pub checkpoint: CheckpointStrategy,
    /// Named entry points for multi-entry workflows.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub entry_points: Vec<EntryPoint>,
}
53
/// Built-in channel commands a workflow may not shadow. Mirrors
/// `distri-gateway` `ChannelCommand::parse`.
///
/// Every entry is lowercase and includes the leading `/`.
/// `WorkflowDefinition::validate_channel_surface` lowercases each declared
/// slash name/alias and rejects it when it matches an entry here.
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    "/start",
    "/stop",
    "/disconnect",
    "/reset",
    "/new",
    "/newsession",
    "/newthread",
    "/status",
    "/debug",
    "/verbose",
    "/help",
    "/switch",
    "/workspace",
    "/context",
    "/ctx",
];
73
74impl WorkflowDefinition {
75    pub fn new(steps: Vec<WorkflowStep>) -> Self {
76        Self {
77            id: uuid::Uuid::new_v4().to_string(),
78            steps,
79            input_schema: None,
80            checkpoint: CheckpointStrategy::default(),
81            entry_points: vec![],
82        }
83    }
84
85    pub fn with_id(mut self, id: &str) -> Self {
86        self.id = id.to_string();
87        self
88    }
89
90    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91        self.checkpoint = strategy;
92        self
93    }
94
95    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96        self.entry_points = entry_points;
97        self
98    }
99
100    /// Get an entry point by ID.
101    pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102        self.entry_points.iter().find(|ep| ep.id == id)
103    }
104
105    /// Find all step IDs reachable from the given step (inclusive) by following
106    /// depends_on forward. Used by entry-point logic to mark unreachable steps
107    /// as skipped at run start.
108    pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109        use std::collections::{HashSet, VecDeque};
110
111        let mut reachable = HashSet::new();
112        let mut queue = VecDeque::new();
113        queue.push_back(start_step_id.to_string());
114
115        while let Some(current) = queue.pop_front() {
116            if !reachable.insert(current.clone()) {
117                continue;
118            }
119            for step in &self.steps {
120                if step.depends_on.contains(&current) && !reachable.contains(&step.id) {
121                    queue.push_back(step.id.clone());
122                }
123            }
124        }
125
126        reachable
127    }
128
129    /// Validate the channel-command surface declared by entry-point
130    /// triggers. Returns a precise error string on the first problem.
131    pub fn validate_channel_surface(&self) -> Result<(), String> {
132        use distri_types::WorkflowTrigger;
133        use std::collections::HashSet;
134
135        let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
136        let mut slash_names: HashSet<String> = HashSet::new();
137        let mut callback_ids: HashSet<String> = HashSet::new();
138        let mut message_count = 0usize;
139
140        for ep in &self.entry_points {
141            if !step_ids.contains(ep.starts_at.as_str()) {
142                return Err(format!(
143                    "entry point '{}' starts_at unknown step '{}'",
144                    ep.id, ep.starts_at
145                ));
146            }
147            for trigger in &ep.triggers {
148                match trigger {
149                    WorkflowTrigger::Slash { name, aliases, .. } => {
150                        for n in std::iter::once(name).chain(aliases.iter()) {
151                            let lower = n.to_lowercase();
152                            if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
153                                return Err(format!(
154                                    "slash command '{n}' shadows a built-in command"
155                                ));
156                            }
157                            if !slash_names.insert(lower.clone()) {
158                                return Err(format!(
159                                    "entry point '{}': slash command '{}' is already declared",
160                                    ep.id, n
161                                ));
162                            }
163                        }
164                    }
165                    WorkflowTrigger::Callback { id, .. } => {
166                        if !callback_ids.insert(id.clone()) {
167                            return Err(format!(
168                                "entry point '{}': callback id '{}' is already declared",
169                                ep.id, id
170                            ));
171                        }
172                    }
173                    WorkflowTrigger::Message {} => message_count += 1,
174                    // Manual / Schedule / Webhook / Event / Tool don't
175                    // contribute to channel-command surface validation.
176                    _ => {}
177                }
178            }
179        }
180        if message_count > 1 {
181            return Err(format!(
182                "workflow declares {message_count} message catch-all entry \
183                 points; at most one is allowed"
184            ));
185        }
186        for step in &self.steps {
187            if let StepKind::Reply {
188                buttons_from,
189                button_template,
190                ..
191            } = &step.kind
192            {
193                if button_template.is_some() != buttons_from.is_some() {
194                    return Err(format!(
195                        "reply step '{}': button_template and buttons_from \
196                         must be set together",
197                        step.id
198                    ));
199                }
200            }
201        }
202        Ok(())
203    }
204
    /// Detect circular dependencies in the workflow DAG.
    /// Returns `Err` with the cycle description if found.
    ///
    /// Also returns `Err` when a step's `depends_on` names a step that does
    /// not exist; that check runs only after the cycle scan succeeded.
    pub fn detect_cycles(&self) -> Result<(), String> {
        use std::collections::{HashMap, HashSet};

        let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
        // Adjacency list: step id -> ids it depends on.
        let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
        for step in &self.steps {
            adj.insert(
                step.id.as_str(),
                step.depends_on.iter().map(|s| s.as_str()).collect(),
            );
        }

        // `visited`: nodes whose DFS has started (possibly finished).
        // `in_stack`: nodes on the current DFS path only.
        let mut visited = HashSet::new();
        let mut in_stack = HashSet::new();

        // Recursive depth-first search along `depends_on` edges. `path`
        // mirrors `in_stack` in visit order so a cycle can be reported.
        fn dfs<'a>(
            node: &'a str,
            adj: &HashMap<&'a str, Vec<&'a str>>,
            visited: &mut HashSet<&'a str>,
            in_stack: &mut HashSet<&'a str>,
            path: &mut Vec<&'a str>,
        ) -> Result<(), String> {
            visited.insert(node);
            in_stack.insert(node);
            path.push(node);

            // Dependencies on unknown steps have no adjacency entry and are
            // simply leaves here; they are reported by the caller afterwards.
            if let Some(deps) = adj.get(node) {
                for &dep in deps {
                    if !visited.contains(dep) {
                        dfs(dep, adj, visited, in_stack, path)?;
                    } else if in_stack.contains(dep) {
                        // `dep` is in `in_stack`, and every in-stack node is
                        // also on `path`, so the lookup cannot fail.
                        let cycle_start = path.iter().position(|&n| n == dep).unwrap();
                        let cycle: Vec<&str> = path[cycle_start..].to_vec();
                        return Err(format!(
                            "Circular dependency detected: {} → {}",
                            cycle.join(" → "),
                            dep
                        ));
                    }
                }
            }

            // Done with this node: it is no longer on the active path.
            in_stack.remove(node);
            path.pop();
            Ok(())
        }

        let mut path = Vec::new();
        // Start a DFS from every step not already covered by an earlier one.
        for step in &self.steps {
            if !visited.contains(step.id.as_str()) {
                dfs(
                    step.id.as_str(),
                    &adj,
                    &mut visited,
                    &mut in_stack,
                    &mut path,
                )?;
            }
        }

        // Also check for references to non-existent steps
        for step in &self.steps {
            for dep in &step.depends_on {
                if !step_ids.contains(dep.as_str()) {
                    return Err(format!(
                        "Step '{}' depends on '{}' which does not exist",
                        step.id, dep
                    ));
                }
            }
        }

        Ok(())
    }
281}
282
283// ============================================================================
284// Workflow Run (execution state)
285// ============================================================================
286
287fn default_empty_object() -> serde_json::Value {
288    serde_json::json!({})
289}
290
/// Serde default for `WorkflowRun::created_at` / `updated_at`: the current
/// UTC time at deserialization.
fn default_now() -> DateTime<Utc> {
    Utc::now()
}
294
/// One execution of a `WorkflowDefinition`. Owns the definition plus all
/// runtime state — status, shared context, per-step status / result /
/// error / timestamps. The engine mutates a `WorkflowRun`.
///
/// `step_runs` is parallel to `definition.steps` (same length, same
/// order).
///
/// **Wire shape**: `definition` is flattened, so a `WorkflowRun`
/// serializes as one flat JSON object with the definition fields
/// (`id`, `steps`, `input_schema`, …) alongside the runtime fields
/// (`status`, `current_step`, `context`, `step_runs`, …). This
/// keeps the persisted run row identical to the legacy monolithic
/// shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
    /// The template this run executes; flattened into the run's JSON.
    #[serde(flatten)]
    pub definition: WorkflowDefinition,
    /// Overall status of the run.
    #[serde(default)]
    pub status: TaskStatus,
    /// Current step pointer. Nothing in this file advances it beyond the
    /// initial `0` — presumably maintained by the engine.
    #[serde(default)]
    pub current_step: usize,
    /// Shared context; an empty JSON object when absent from the payload.
    /// Step results are stored under its `steps` key.
    #[serde(default = "default_empty_object")]
    pub context: serde_json::Value,
    /// Log of notes appended via `add_note`.
    #[serde(default)]
    pub notes: Vec<WorkflowNote>,
    /// Per-step runtime state, index-aligned with `definition.steps`.
    #[serde(default)]
    pub step_runs: Vec<WorkflowStepRun>,
    /// Creation time; defaults to "now" when absent from the payload.
    #[serde(default = "default_now")]
    pub created_at: DateTime<Utc>,
    /// Last-modified time; defaults to "now" when absent from the payload.
    #[serde(default = "default_now")]
    pub updated_at: DateTime<Utc>,
}
327
/// Per-step runtime state. Parallel to `WorkflowDefinition.steps[i]`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowStepRun {
    /// ID of the `WorkflowStep` this state belongs to.
    pub step_id: String,
    /// Current status of the step.
    #[serde(default)]
    pub status: TaskStatus,
    /// Result value, once one has been recorded for the step.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Error description, if any was recorded for the step.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Start timestamp, if recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    /// Completion timestamp, if recorded (set by `resume_step`, for one).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
}
343
344impl WorkflowRun {
345    /// Build a fresh run from a definition. All step_runs start `Pending`,
346    /// context is `{}`, status is `Pending`.
347    pub fn new(definition: WorkflowDefinition) -> Self {
348        let step_runs = definition
349            .steps
350            .iter()
351            .map(|s| WorkflowStepRun {
352                step_id: s.id.clone(),
353                ..Default::default()
354            })
355            .collect();
356        Self {
357            definition,
358            status: TaskStatus::Pending,
359            current_step: 0,
360            context: serde_json::json!({}),
361            notes: vec![],
362            step_runs,
363            created_at: Utc::now(),
364            updated_at: Utc::now(),
365        }
366    }
367
368    /// Convenience for callers that want to skip the explicit
369    /// definition struct (mostly tests).
370    pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
371        Self::new(WorkflowDefinition::new(steps))
372    }
373
374    pub fn with_context(mut self, context: serde_json::Value) -> Self {
375        self.context = context;
376        self
377    }
378
379    pub fn with_id(mut self, id: &str) -> Self {
380        self.definition.id = id.to_string();
381        self
382    }
383
384    pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
385        self.definition.checkpoint = strategy;
386        self
387    }
388
389    pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
390        self.definition.entry_points = entry_points;
391        self
392    }
393
394    pub fn id(&self) -> &str {
395        &self.definition.id
396    }
397
398    pub fn steps(&self) -> &[WorkflowStep] {
399        &self.definition.steps
400    }
401
    /// Step template at index `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for the definition's steps.
    pub fn step(&self, idx: usize) -> &WorkflowStep {
        &self.definition.steps[idx]
    }
405
    /// Runtime state of the step at index `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for `step_runs`.
    pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
        &self.step_runs[idx]
    }
409
    /// Mutable runtime state of the step at index `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for `step_runs`.
    pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
        &mut self.step_runs[idx]
    }
413
414    pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
415        self.step_runs.iter().find(|s| s.step_id == step_id)
416    }
417
418    pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
419        self.step_runs.iter_mut().find(|s| s.step_id == step_id)
420    }
421
422    /// Apply an entry point: mark steps not reachable from `starts_at` as
423    /// Skipped, pre-populate their results from `preset_results`, and
424    /// merge those into context so downstream `{steps.X}` references work.
425    pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
426        let ep = self
427            .definition
428            .entry_points
429            .iter()
430            .find(|ep| ep.id == entry_point_id)
431            .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
432            .clone();
433
434        if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
435            return Err(format!(
436                "Entry point '{}' references step '{}' which does not exist",
437                entry_point_id, ep.starts_at
438            ));
439        }
440
441        let reachable = self.definition.reachable_from(&ep.starts_at);
442
443        for (i, step) in self.definition.steps.iter().enumerate() {
444            if !reachable.contains(&step.id) {
445                self.step_runs[i].status = TaskStatus::Canceled;
446                if let Some(result) = ep.preset_results.get(&step.id) {
447                    self.step_runs[i].result = Some(result.clone());
448                }
449            }
450        }
451
452        if let Some(ctx) = self.context.as_object_mut() {
453            let steps = ctx
454                .entry("steps")
455                .or_insert(serde_json::json!({}))
456                .as_object_mut()
457                .expect("steps must be an object");
458            for (step_id, result) in &ep.preset_results {
459                steps.insert(step_id.clone(), result.clone());
460            }
461        }
462
463        Ok(self)
464    }
465
466    /// Initialize the run with validated input. Input is validated
467    /// against `definition.input_schema` if present, then merged into
468    /// `context`. Status flips to Running.
469    pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
470        if let Some(ref schema_value) = self.definition.input_schema {
471            let validator = jsonschema::validator_for(schema_value)
472                .map_err(|e| format!("Invalid input_schema: {e}"))?;
473
474            if !validator.is_valid(&input) {
475                let errors: Vec<String> = validator
476                    .iter_errors(&input)
477                    .map(|e| format!("{}", e))
478                    .collect();
479                return Err(format!("Input validation failed: {}", errors.join("; ")));
480            }
481        }
482
483        if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
484            for (k, v) in inp {
485                ctx.insert(k.clone(), v.clone());
486            }
487            ctx.insert("input".to_string(), input.clone());
488        }
489
490        self.status = TaskStatus::Running;
491        self.updated_at = Utc::now();
492        Ok(self)
493    }
494
495    /// First pending step, if any.
496    pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
497        self.step_runs
498            .iter()
499            .enumerate()
500            .find(|(_, s)| s.status == TaskStatus::Pending)
501            .map(|(i, _)| (i, &self.definition.steps[i]))
502    }
503
504    /// All steps that can run now: pending + all dependencies completed.
505    /// Pure query — does not mutate.
506    pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
507        let mut runnable = vec![];
508        for (i, step) in self.definition.steps.iter().enumerate() {
509            if self.step_runs[i].status != TaskStatus::Pending {
510                continue;
511            }
512            let deps_met = step.depends_on.iter().all(|dep_id| {
513                self.definition
514                    .steps
515                    .iter()
516                    .zip(self.step_runs.iter())
517                    .any(|(s, sr)| {
518                        &s.id == dep_id
519                            && matches!(sr.status, TaskStatus::Completed | TaskStatus::Canceled)
520                    })
521            });
522            if deps_met {
523                runnable.push((i, step));
524            }
525        }
526        runnable
527    }
528
529    pub fn is_complete(&self) -> bool {
530        self.step_runs.iter().all(|s| {
531            matches!(
532                s.status,
533                TaskStatus::Completed | TaskStatus::Canceled | TaskStatus::Failed
534            )
535        })
536    }
537
538    pub fn is_waiting_for_input(&self) -> bool {
539        self.step_runs
540            .iter()
541            .any(|s| s.status == TaskStatus::InputRequired)
542    }
543
544    pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
545        self.step_runs
546            .iter()
547            .enumerate()
548            .find(|(_, s)| s.status == TaskStatus::InputRequired)
549            .map(|(i, _)| (i, &self.definition.steps[i]))
550    }
551
552    /// Resume a paused run by providing input for the waiting step.
553    pub fn resume_step(
554        &mut self,
555        step_id: &str,
556        result: serde_json::Value,
557    ) -> Result<usize, String> {
558        let idx = self
559            .step_runs
560            .iter()
561            .position(|s| s.step_id == step_id && s.status == TaskStatus::InputRequired)
562            .ok_or_else(|| {
563                format!(
564                    "Step '{}' not found or not in waiting_for_input state",
565                    step_id
566                )
567            })?;
568
569        self.step_runs[idx].status = TaskStatus::Completed;
570        self.step_runs[idx].result = Some(result.clone());
571        self.step_runs[idx].completed_at = Some(Utc::now());
572
573        if let Some(ctx) = self.context.as_object_mut() {
574            let steps = ctx
575                .entry("steps")
576                .or_insert(serde_json::json!({}))
577                .as_object_mut()
578                .expect("steps must be an object");
579            steps.insert(step_id.to_string(), result);
580        }
581
582        self.status = TaskStatus::Running;
583        self.updated_at = Utc::now();
584        Ok(idx)
585    }
586
587    /// Stuck: blocked steps, nothing running, no path forward.
588    pub fn is_stuck(&self) -> bool {
589        let has_blocked = self
590            .step_runs
591            .iter()
592            .any(|s| s.status == TaskStatus::Failed);
593        let has_pending = self
594            .step_runs
595            .iter()
596            .any(|s| s.status == TaskStatus::Pending);
597        let has_running = self
598            .step_runs
599            .iter()
600            .any(|s| s.status == TaskStatus::Running);
601
602        if !has_blocked || has_running {
603            return false;
604        }
605
606        if !has_pending {
607            return true;
608        }
609
610        !self
611            .definition
612            .steps
613            .iter()
614            .zip(self.step_runs.iter())
615            .any(|(step, run)| {
616                run.status == TaskStatus::Pending
617                    && step.depends_on.iter().all(|dep_id| {
618                        self.definition
619                            .steps
620                            .iter()
621                            .zip(self.step_runs.iter())
622                            .any(|(s, sr)| {
623                                &s.id == dep_id
624                                    && matches!(
625                                        sr.status,
626                                        TaskStatus::Completed
627                                            | TaskStatus::Pending
628                                            | TaskStatus::Running
629                                    )
630                            })
631                    })
632            })
633    }
634
635    pub fn has_failed(&self) -> bool {
636        self.step_runs
637            .iter()
638            .any(|s| s.status == TaskStatus::Failed)
639    }
640
641    /// Append a note to the run's log.
642    pub fn add_note(&mut self, step_id: &str, message: &str) {
643        self.notes.push(WorkflowNote {
644            step_id: step_id.to_string(),
645            message: message.to_string(),
646            at: Utc::now(),
647        });
648        self.updated_at = Utc::now();
649    }
650
    /// Validate the underlying DAG. Convenience that delegates to the
    /// definition's `detect_cycles`.
    ///
    /// # Errors
    ///
    /// Propagates the definition's error string: a circular dependency, or a
    /// `depends_on` entry naming a step that does not exist.
    pub fn detect_cycles(&self) -> Result<(), String> {
        self.definition.detect_cycles()
    }
656}
657
658// ============================================================================
659// Entry Point — named starting positions for multi-entry workflows
660// ============================================================================
661
/// A named entry point into a workflow.
/// Allows workflows to be started at different steps depending on context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntryPoint {
    /// Unique identifier for this entry point (e.g., "import_from_docs", "grade_only").
    pub id: String,
    /// Human-readable label.
    pub label: String,
    /// Optional description of when to use this entry point.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// The step ID where execution begins.
    pub starts_at: String,
    /// Pre-populated step results for steps that are skipped.
    /// Maps step_id → result value. `WorkflowRun::apply_entry_point` marks
    /// every step not reachable from `starts_at` as `TaskStatus::Canceled`
    /// (skipped), copies its preset result (if any) onto the step run, and
    /// merges all preset results into the run context under `steps`.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub preset_results: HashMap<String, serde_json::Value>,
    /// Required input fields for this entry point (for UI/validation).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub required_inputs: Vec<String>,
    /// How this entry point is reached. Empty = default `Manual`
    /// only (direct API/UI invocation). Multiple triggers can target
    /// the same entry point — e.g. a slash command and a webhook
    /// both starting the same run.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<distri_types::WorkflowTrigger>,
}
689
690// ============================================================================
691// Workflow Step (template)
692// ============================================================================
693
/// A single step in a workflow. **Template only** — runtime status /
/// result / timestamps live on `WorkflowStepRun`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStep {
    /// Step identifier. Referenced by other steps' `depends_on`, by entry
    /// points' `starts_at`, and by `WorkflowStepRun::step_id`.
    pub id: String,
    /// Label for this step.
    pub label: String,
    /// What the step does.
    pub kind: StepKind,
    /// IDs of steps that must complete before this one can run.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Execution mode for this step.
    #[serde(default)]
    pub execution: StepExecution,
    /// Capabilities required to run this step.
    #[serde(default)]
    pub requires: Vec<StepRequirement>,
    /// Optional explicit input mapping for this step.
    /// Values can reference `{input.X}`, `{steps.step_id.X}`, `{env.X}`.
    /// If omitted, the step receives the full execution context.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input: Option<serde_json::Value>,
    /// Skip this step if the expression evaluates to true against the workflow context.
    /// Expression format: `{input.field_name}` — truthy check (field exists and is not null/false/empty).
    /// Supports negation: `!{input.field_name}` — skip if field is absent/falsy.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub skip_if: Option<String>,
}
721
722impl WorkflowStep {
723    fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
724        Self {
725            id: id.to_string(),
726            label: label.to_string(),
727            kind,
728            depends_on: vec![],
729            execution: StepExecution::Sequential,
730            requires: vec![],
731            input: None,
732            skip_if: None,
733        }
734    }
735
736    pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
737        Self::new_step(
738            id,
739            label,
740            StepKind::ApiCall {
741                method: method.to_string(),
742                url: url.to_string(),
743                body: None,
744                headers: None,
745            },
746        )
747    }
748
749    pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
750        Self::new_step(
751            id,
752            label,
753            StepKind::AgentRun {
754                agent_id: agent_id.to_string(),
755                prompt: prompt.to_string(),
756                tools: vec![],
757                skills: vec![],
758                model: None,
759                max_iterations: None,
760            },
761        )
762    }
763
764    pub fn script(id: &str, label: &str, command: &str) -> Self {
765        Self::new_step(
766            id,
767            label,
768            StepKind::Script {
769                command: command.to_string(),
770                args: vec![],
771                cwd: None,
772                env: None,
773                timeout_secs: None,
774                output_format: None,
775                shell: None,
776            },
777        )
778    }
779
780    pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
781        Self::new_step(
782            id,
783            label,
784            StepKind::ToolCall {
785                tool_name: tool_name.to_string(),
786                input,
787                agent_id: None,
788            },
789        )
790    }
791
792    pub fn condition(
793        id: &str,
794        label: &str,
795        expression: &str,
796        if_true: StepKind,
797        if_false: Option<StepKind>,
798    ) -> Self {
799        Self::new_step(
800            id,
801            label,
802            StepKind::Condition {
803                expression: expression.to_string(),
804                if_true: Box::new(if_true),
805                if_false: if_false.map(Box::new),
806            },
807        )
808    }
809
810    pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
811        Self::new_step(
812            id,
813            label,
814            StepKind::Checkpoint {
815                message: message.to_string(),
816            },
817        )
818    }
819
820    pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
821        Self::new_step(
822            id,
823            label,
824            StepKind::WaitForInput {
825                message: message.to_string(),
826                schema: None,
827            },
828        )
829    }
830
831    pub fn with_body(mut self, body: serde_json::Value) -> Self {
832        if let StepKind::ApiCall {
833            body: ref mut b, ..
834        } = self.kind
835        {
836            *b = Some(body);
837        }
838        self
839    }
840
841    pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
842        self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
843        self
844    }
845
846    pub fn parallel(mut self) -> Self {
847        self.execution = StepExecution::Parallel;
848        self
849    }
850
851    pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
852        self.requires = requires;
853        self
854    }
855
856    pub fn with_cwd(mut self, cwd: &str) -> Self {
857        if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
858            *c = Some(cwd.to_string());
859        }
860        self
861    }
862
863    pub fn with_timeout(mut self, secs: u64) -> Self {
864        if let StepKind::Script {
865            timeout_secs: ref mut t,
866            ..
867        } = self.kind
868        {
869            *t = Some(secs);
870        }
871        self
872    }
873
874    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
875        if let StepKind::Script { env: ref mut e, .. } = self.kind {
876            *e = Some(env);
877        }
878        self
879    }
880
881    pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
882        self.input = Some(input);
883        self
884    }
885
886    pub fn with_skip_if(mut self, expression: &str) -> Self {
887        self.skip_if = Some(expression.to_string());
888        self
889    }
890}
891
892// ============================================================================
893// Step Kind — what the step does
894// ============================================================================
895
896#[derive(Debug, Clone, Serialize, Deserialize)]
897#[serde(tag = "type", rename_all = "snake_case")]
898pub enum StepKind {
899    /// HTTP API call
900    ApiCall {
901        method: String,
902        url: String,
903        #[serde(skip_serializing_if = "Option::is_none")]
904        body: Option<serde_json::Value>,
905        #[serde(skip_serializing_if = "Option::is_none")]
906        headers: Option<HashMap<String, String>>,
907    },
908
909    /// Shell script / command
910    Script {
911        command: String,
912        #[serde(default)]
913        args: Vec<String>,
914        #[serde(default, skip_serializing_if = "Option::is_none")]
915        cwd: Option<String>,
916        #[serde(default, skip_serializing_if = "Option::is_none")]
917        env: Option<HashMap<String, String>>,
918        #[serde(default, skip_serializing_if = "Option::is_none")]
919        timeout_secs: Option<u64>,
920        #[serde(default, skip_serializing_if = "Option::is_none")]
921        output_format: Option<ScriptOutputFormat>,
922        #[serde(default, skip_serializing_if = "Option::is_none")]
923        shell: Option<ShellType>,
924    },
925
926    /// Delegate to a Distri agent (sub-agent run)
927    AgentRun {
928        agent_id: String,
929        prompt: String,
930        #[serde(default)]
931        tools: Vec<String>,
932        /// Skills to load for this agent step
933        #[serde(default)]
934        skills: Vec<String>,
935        /// Override model for this step
936        #[serde(default, skip_serializing_if = "Option::is_none")]
937        model: Option<String>,
938        /// Limit agent loop iterations
939        #[serde(default, skip_serializing_if = "Option::is_none")]
940        max_iterations: Option<u32>,
941    },
942
943    /// Single tool invocation — not a full agent loop
944    ToolCall {
945        /// Tool name (must be registered)
946        tool_name: String,
947        /// Tool input parameters
948        input: serde_json::Value,
949        /// Agent context to execute in (for tools needing agent-scoped permissions)
950        #[serde(default, skip_serializing_if = "Option::is_none")]
951        agent_id: Option<String>,
952    },
953
954    /// Conditional branch — evaluates expression against context
955    Condition {
956        expression: String,
957        if_true: Box<StepKind>,
958        #[serde(skip_serializing_if = "Option::is_none")]
959        if_false: Option<Box<StepKind>>,
960    },
961
962    /// No-op / marker step (for documentation or manual checkpoints)
963    Checkpoint { message: String },
964
965    /// Pause execution and wait for external/human input before continuing.
966    /// The workflow saves state and stops. A resume call provides the input
967    /// as the step result and continues from here.
968    WaitForInput {
969        /// Message to display to the human (what input is needed)
970        message: String,
971        /// Optional JSON Schema describing the expected input shape
972        #[serde(default, skip_serializing_if = "Option::is_none")]
973        schema: Option<serde_json::Value>,
974    },
975
976    /// Emit a channel reply (text + optional buttons). Rendered by the
977    /// gateway per channel. `text` / button fields support the standard
978    /// `{input.x}` / `{steps.id.x}` interpolation; `button_template`
979    /// fields additionally support `{item.x}` per `buttons_from`
980    /// element.
981    Reply {
982        text: String,
983        /// Static buttons (rows: outer = top-to-bottom).
984        #[serde(default, skip_serializing_if = "Vec::is_empty")]
985        buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
986        /// Context path resolving to an array; one button per element.
987        #[serde(default, skip_serializing_if = "Option::is_none")]
988        buttons_from: Option<String>,
989        /// Template applied per `buttons_from` element.
990        #[serde(default, skip_serializing_if = "Option::is_none")]
991        button_template: Option<distri_types::channel_commands::ReplyButtonSpec>,
992    },
993}
994
995// ============================================================================
996// Step Requirement — what a step needs to run
997// ============================================================================
998
999/// A capability required to execute a step.
1000/// Uses namespaced skill identifiers:
1001/// - `native:shell`, `native:browser`, `native:network` — built-in
1002/// - `{provider}:{service}` — connections (e.g., `google:drive`, `slack:chat`)
1003#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1004pub struct StepRequirement {
1005    /// Namespaced skill identifier.
1006    pub skill: String,
1007    /// Required permissions/scopes within the skill.
1008    #[serde(default)]
1009    pub permissions: Vec<String>,
1010    /// Optional extra constraints.
1011    #[serde(default, skip_serializing_if = "Option::is_none")]
1012    pub config: Option<serde_json::Value>,
1013}
1014
1015impl StepRequirement {
1016    /// Create a native skill requirement (prefixed with "native:").
1017    pub fn native(skill: &str) -> Self {
1018        Self {
1019            skill: format!("native:{}", skill),
1020            permissions: vec![],
1021            config: None,
1022        }
1023    }
1024
1025    /// Create a connection requirement (e.g., "google:drive").
1026    pub fn connection(provider: &str, service: &str) -> Self {
1027        Self {
1028            skill: format!("{}:{}", provider, service),
1029            permissions: vec![],
1030            config: None,
1031        }
1032    }
1033
1034    pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1035        self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1036        self
1037    }
1038
1039    /// Get the namespace (part before ':').
1040    pub fn namespace(&self) -> Option<&str> {
1041        self.skill.split(':').next()
1042    }
1043
1044    /// Get the skill name (part after ':').
1045    pub fn skill_name(&self) -> Option<&str> {
1046        self.skill.split(':').nth(1)
1047    }
1048
1049    /// Check if this is a native skill.
1050    pub fn is_native(&self) -> bool {
1051        self.skill.starts_with("native:")
1052    }
1053
1054    /// Validate the requirement. Returns error message if invalid.
1055    pub fn validate(&self) -> Result<(), String> {
1056        if !self.skill.contains(':') {
1057            return Err(format!(
1058                "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1059                self.skill
1060            ));
1061        }
1062
1063        if self.is_native() {
1064            let known = ["shell", "browser", "network", "agent", "tool"];
1065            if let Some(name) = self.skill_name() {
1066                if !known.contains(&name) {
1067                    return Err(format!(
1068                        "Unknown native skill '{}'. Known: {:?}",
1069                        name, known
1070                    ));
1071                }
1072            }
1073        }
1074
1075        Ok(())
1076    }
1077}
1078
1079// ============================================================================
1080// Checkpoint Strategy
1081// ============================================================================
1082
1083/// How workflow state is checkpointed between steps.
1084#[derive(Debug, Clone, Serialize, Deserialize)]
1085#[serde(tag = "type", rename_all = "snake_case")]
1086pub enum CheckpointStrategy {
1087    /// Redis-based, thread+task scoped, auto-TTL.
1088    Internal {
1089        #[serde(default, skip_serializing_if = "Option::is_none")]
1090        ttl_secs: Option<u64>,
1091    },
1092    /// Client-registered tool handles persistence.
1093    /// Tool must support actions: save, load, list.
1094    External { tool_name: String },
1095}
1096
1097impl Default for CheckpointStrategy {
1098    fn default() -> Self {
1099        CheckpointStrategy::Internal { ttl_secs: None }
1100    }
1101}
1102
1103/// Metadata about a checkpoint snapshot.
1104#[derive(Debug, Clone, Serialize, Deserialize)]
1105pub struct CheckpointMeta {
1106    pub checkpoint_id: String,
1107    pub workflow_id: String,
1108    pub step_id: String,
1109    pub created_at: DateTime<Utc>,
1110}
1111
1112// ============================================================================
1113// Enums
1114// ============================================================================
1115
// `TaskStatus` is the single status model for both workflow runs and
// steps — the old `WorkflowStatus` and `StepStatus` enums are gone.
// Two engine-level nuances have no status of their own and are
// expressed through existing fields instead:
//   - a *blocked* step (unmet `StepRequirement`) → `TaskStatus::Failed`
//     with the reason in `WorkflowStepRun.error` ("Missing skills: …");
//   - a *skipped* step (`skip_if`, or unreachable from the entry point)
//     → `TaskStatus::Canceled` with a run note.
1124
1125#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
1126#[serde(rename_all = "snake_case")]
1127pub enum StepExecution {
1128    /// Must wait for previous step to complete.
1129    #[default]
1130    Sequential,
1131    /// Can run in parallel with other parallel steps at the same level.
1132    Parallel,
1133}
1134
1135#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1136#[serde(rename_all = "snake_case")]
1137pub enum ScriptOutputFormat {
1138    Text,
1139    Json,
1140    Stream,
1141}
1142
1143#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1144#[serde(rename_all = "snake_case")]
1145pub enum ShellType {
1146    Bash,
1147    Sh,
1148    Zsh,
1149}
1150
1151// ============================================================================
1152// Step Result
1153// ============================================================================
1154
1155#[derive(Debug, Clone, Serialize, Deserialize)]
1156pub struct StepResult {
1157    pub status: TaskStatus,
1158    pub result: Option<serde_json::Value>,
1159    pub error: Option<String>,
1160    /// Updates to merge into workflow context for subsequent steps.
1161    #[serde(skip_serializing_if = "Option::is_none")]
1162    pub context_updates: Option<serde_json::Value>,
1163}
1164
1165impl StepResult {
1166    pub fn done(result: serde_json::Value) -> Self {
1167        Self {
1168            status: TaskStatus::Completed,
1169            result: Some(result),
1170            error: None,
1171            context_updates: None,
1172        }
1173    }
1174
1175    pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1176        Self {
1177            status: TaskStatus::Completed,
1178            result: Some(result),
1179            error: None,
1180            context_updates: Some(updates),
1181        }
1182    }
1183
1184    pub fn failed(error: &str) -> Self {
1185        Self {
1186            status: TaskStatus::Failed,
1187            result: None,
1188            error: Some(error.to_string()),
1189            context_updates: None,
1190        }
1191    }
1192
1193    pub fn skipped() -> Self {
1194        Self {
1195            status: TaskStatus::Canceled,
1196            result: None,
1197            error: None,
1198            context_updates: None,
1199        }
1200    }
1201}
1202
1203// ============================================================================
1204// Workflow Note
1205// ============================================================================
1206
1207#[derive(Debug, Clone, Serialize, Deserialize)]
1208pub struct WorkflowNote {
1209    pub step_id: String,
1210    pub message: String,
1211    pub at: DateTime<Utc>,
1212}
1213
1214// ============================================================================
1215// Workflow Run Summary (returned at end of execution)
1216// ============================================================================
1217
1218/// Snapshot of one finished step in a `WorkflowRunSummary`.
1219///
1220/// Surfaces `distri_types::TaskStatus` at the boundary — engine-internal
1221/// `StepStatus` distinctions (Blocked / Skipped) translate via
1222/// `StepStatus → TaskStatus`. The original phase is still recoverable
1223/// from the `error` field for Blocked steps.
1224#[derive(Debug, Clone, Serialize, Deserialize)]
1225pub struct WorkflowStepSummary {
1226    pub id: String,
1227    pub label: String,
1228    pub status: TaskStatus,
1229    #[serde(default, skip_serializing_if = "Option::is_none")]
1230    pub result: Option<serde_json::Value>,
1231    #[serde(default, skip_serializing_if = "Option::is_none")]
1232    pub error: Option<String>,
1233}
1234
1235/// Final summary of a workflow run — id, terminal status, and one row
1236/// per step. Returned to callers (e.g. the WorkflowAgent invoke result)
1237/// instead of an ad-hoc JSON shape.
1238#[derive(Debug, Clone, Serialize, Deserialize)]
1239pub struct WorkflowRunSummary {
1240    pub workflow_id: String,
1241    pub status: TaskStatus,
1242    pub steps: Vec<WorkflowStepSummary>,
1243}
1244
1245impl WorkflowRunSummary {
1246    /// Build a summary from a finished `WorkflowRun` and its terminal
1247    /// `WorkflowStatus`. Translates statuses to `TaskStatus` at the
1248    /// boundary so consumers don't need to know about the engine's
1249    /// internal enums.
1250    pub fn from_run(run: &WorkflowRun, status: TaskStatus) -> Self {
1251        let steps = run
1252            .steps()
1253            .iter()
1254            .zip(run.step_runs.iter())
1255            .map(|(step, sr)| WorkflowStepSummary {
1256                id: step.id.clone(),
1257                label: step.label.clone(),
1258                status: sr.status.clone(),
1259                result: sr.result.clone(),
1260                error: sr.error.clone(),
1261            })
1262            .collect();
1263        Self {
1264            workflow_id: run.id().to_string(),
1265            status,
1266            steps,
1267        }
1268    }
1269}