// car_workflow/types.rs

//! Workflow definition types — fully serializable for use through all bindings.
2
3use std::collections::HashMap;
4
5use car_ir::{ActionProposal, Precondition};
6use car_multi::AgentSpec;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
/// Top-level workflow definition: a named graph of stages with conditional edges.
///
/// Derives `Serialize`/`Deserialize`. Fields carrying `#[serde(default)]` may be
/// omitted from the serialized input; all other fields are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    /// Unique identifier.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Stage ID of the entry point. Stored as a plain string; this type does
    /// not itself check that it names one of `stages`.
    pub start: String,
    /// The overall objective. When set, it is pinned into workflow state as
    /// `goal` at run start and re-anchored into every agent coordination's task
    /// ("Overall goal: … / Current step: …") so a long multi-stage run can't
    /// drift from the original objective. A structural defense against goal drift.
    ///
    /// Optional on input (defaults to `None`) and left out of the serialized
    /// output entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub goal: Option<String>,
    /// All stages in this workflow.
    pub stages: Vec<Stage>,
    /// Directed edges between stages (may have conditions).
    pub edges: Vec<Edge>,
    /// Maximum total stage executions before aborting (loop guard).
    /// Falls back to `default_max_iterations()` (100) when absent from input.
    #[serde(default = "default_max_iterations")]
    pub max_iterations: u32,
    /// Opaque metadata (owner, version, tags, etc.).
    /// Defaults to an empty map when absent from input.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}
36
/// Serde fallback for `Workflow::max_iterations` when the field is missing
/// from the serialized input.
fn default_max_iterations() -> u32 {
    // Named so the loop-guard default is self-describing at the point of use.
    const DEFAULT_MAX_ITERATIONS: u32 = 100;
    DEFAULT_MAX_ITERATIONS
}
40
/// A named step in the workflow.
///
/// `id`, `name` and `step` are required on input; the remaining fields are
/// optional and fall back to their serde defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage {
    /// Unique stage identifier (referenced by edges).
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// What this stage does.
    pub step: StageStep,
    /// Optional compensation to run if a later stage fails (saga pattern).
    /// Defaults to `None`; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compensation: Option<CompensationHandler>,
    /// Optional timeout for this stage in milliseconds.
    /// Defaults to `None`; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
    /// Opaque metadata. Defaults to an empty map when absent from input.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}
60
/// What a stage actually does.
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `type` field whose value is the snake_case variant name (`pattern`,
/// `proposal`, `sub_workflow`, `approval`, `loop_until`, `for_each`), and the
/// variant's own fields sit alongside that tag in the same object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StageStep {
    /// Run a car-multi agent coordination pattern.
    Pattern(PatternStep),
    /// Execute a car-engine action proposal directly.
    Proposal(ProposalStep),
    /// Run a nested sub-workflow.
    SubWorkflow(SubWorkflowStep),
    /// Pause the workflow and wait for human input (human-in-the-loop gate).
    ///
    /// The engine snapshots the run, returns [`crate::WorkflowStatus::Paused`]
    /// with a [`crate::PausedWorkflow`] checkpoint, and does not execute this
    /// stage's body. The run continues only when
    /// [`crate::WorkflowEngine::resume`] is called with the human's response.
    Approval(ApprovalStep),
    /// Repeat an inner step until a state predicate holds or a cap is reached.
    ///
    /// This makes the blog-style "loop until done" idiom a first-class,
    /// verifiable construct rather than something hand-wired from a back-edge,
    /// a counter stage, and a conditional edge. Bounded by `max_iterations`.
    LoopUntil(LoopUntilStep),
    /// Fan an inner step out over a list of items resolved **at runtime** from
    /// workflow state (e.g. produced by an earlier stage), not hardcoded in the
    /// manifest. This is the declarative analogue of `parallel(items.map(...))`
    /// where the item count is unknown until the run reaches this stage.
    ForEach(ForEachStep),
}
90
/// Repeat `body` until `until` holds (AND of preconditions over workflow state)
/// or `max_iterations` is reached — whichever comes first.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopUntilStep {
    /// The step run each iteration. Its produced state (proposal `state_changes`,
    /// and `stage.<loop_id>.answer`/`.iteration`) is visible to `until`.
    ///
    /// Boxed because `StageStep` contains `LoopUntilStep`, making the type
    /// recursive; the indirection gives it a finite size.
    pub body: Box<StageStep>,
    /// Loop stops once ALL of these pass (AND) against the state after a body
    /// iteration. Empty `until` runs exactly `max_iterations` times.
    /// Defaults to an empty list when absent from input.
    #[serde(default)]
    pub until: Vec<Precondition>,
    /// Hard cap on body iterations. Must be >= 1 (enforced by
    /// [`crate::verify_workflow`]); the loop guard against unbounded repetition.
    /// Has no serde default, so it must be present in the serialized input.
    pub max_iterations: u32,
}
106
/// Run `body` once per element of the array at state key `items_from`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForEachStep {
    /// Workflow-state key holding a JSON array. Each element becomes one item.
    /// A missing key or non-array value yields zero iterations (a no-op).
    pub items_from: String,
    /// The step to run per item. Before each run, the placeholders `{{item}}`
    /// (the element rendered as a string) and `{{index}}` (0-based position) are
    /// substituted into every string *value* in the body — pattern tasks, agent
    /// prompts, proposal parameter values — so the body can be parameterized by
    /// the item. (JSON object *keys* are not templated; `{{item}}` is expanded
    /// before `{{index}}`, so item text containing `{{index}}` is itself
    /// expanded.)
    ///
    /// Per-item results land in workflow state under
    /// `foreach.<id>.<index>.{item,answer}`, and the body's own state deltas
    /// (e.g. a proposal's `state_changes`) under
    /// `foreach.<id>.<index>.state.<key>` — namespaced per item so concurrent
    /// bodies never clobber one another. (Body deltas are deliberately NOT
    /// merged under their bare keys, unlike `LoopUntil`, which runs serially.)
    ///
    /// Boxed because `StageStep` contains `ForEachStep`, making the type
    /// recursive.
    pub body: Box<StageStep>,
    /// Max bodies to run concurrently. `0` or `1` means sequential. Higher fans
    /// out via the runtime's bounded concurrency.
    /// Defaults to `0` (i.e. sequential) when absent from input.
    #[serde(default)]
    pub max_concurrent: usize,
}
133
/// Run one of the car-multi coordination patterns.
///
/// `pattern`, `task` and `agents` are required on input; `config` is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternStep {
    /// Which pattern to run.
    pub pattern: PatternKind,
    /// Task description passed to the pattern.
    pub task: String,
    /// Agents involved. Interpretation depends on pattern kind.
    pub agents: Vec<AgentSpec>,
    /// Pattern-specific configuration.
    /// - supervisor: `{"max_rounds": 3, "supervisor_index": 0}`
    /// - map_reduce: `{"max_concurrent": 5, "items": ["a", "b", "c"]}`
    /// - fleet: `{"timeout_secs": 60}`
    ///
    /// Untyped (`String` → JSON `Value`), so this struct does not itself check
    /// the keys. Defaults to an empty map when absent from input.
    #[serde(default)]
    pub config: HashMap<String, Value>,
}
150
/// All supported car-multi coordination patterns.
///
/// Serialized as the snake_case variant name, e.g. `swarm_parallel`,
/// `map_reduce`, `adversarial_review`. The enum is `Copy`, so it can be passed
/// by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PatternKind {
    SwarmParallel,
    SwarmSequential,
    SwarmDebate,
    Pipeline,
    Supervisor,
    Delegator,
    MapReduce,
    Vote,
    Fleet,
    /// Fresh-context verification: a reviewer agent checks prior work against
    /// acceptance criteria with no author context. A structural defense against
    /// self-preferential bias and premature "done". `agents[0]` is the reviewer;
    /// `config.criteria` is a string array; `config.review_key` is the state key
    /// holding the work to review (required — there is no default; a missing or
    /// empty value fails the review closed). The verdict is exposed as the typed
    /// `stage.<id>.review_passed` (bool) for edge branching.
    AdversarialReview,
    /// Rank competitors by single-elimination pairwise judging. All agents but
    /// the last are competitors; the last (or `config.judge_index`) is the judge.
    Tournament,
}
176
/// Execute a car-engine action proposal.
///
/// Used both as a stage step (`StageStep::Proposal`) and as a compensation
/// handler (`CompensationHandler::Proposal`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProposalStep {
    /// The action proposal to execute.
    pub proposal: ActionProposal,
}
182
/// Run a nested workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflowStep {
    /// The nested workflow definition, embedded inline. Boxed because
    /// `Workflow` → `Stage` → `StageStep` → `SubWorkflowStep` → `Workflow` is a
    /// recursive type and needs indirection to have a finite size.
    pub workflow: Box<Workflow>,
}
188
/// Pause for human-in-the-loop approval or input.
///
/// `prompt` and `output_key` are required on input; `fields` is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalStep {
    /// Message shown to the human when the run pauses.
    pub prompt: String,
    /// Structured fields the human is asked to fill in (empty = free-form approval).
    /// Defaults to an empty list when absent from input.
    #[serde(default)]
    pub fields: Vec<ApprovalField>,
    /// State key under which the human's response object is written on resume.
    ///
    /// The response is also mirrored to `stage.<id>.answer` for edge conditions.
    /// Must be non-empty (enforced by [`crate::verify_workflow`]).
    pub output_key: String,
}
203
/// One field in an [`ApprovalStep`] form.
///
/// Only `name` is required on input; every other field has a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalField {
    /// Machine name (key in the response object).
    pub name: String,
    /// Human-readable label. Defaults to the empty string when absent.
    #[serde(default)]
    pub label: String,
    /// Input kind: `text`, `textarea`, `options`, `bool`, or `number`.
    ///
    /// Stored as a free-form string, so this type does not itself restrict the
    /// value to the kinds listed above. Falls back to `default_field_type()`
    /// (`"text"`) when absent from input.
    #[serde(default = "default_field_type")]
    pub field_type: String,
    /// Allowed values when `field_type` is `options`.
    /// Defaults to an empty list; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub options: Vec<String>,
    /// Whether the human must supply this field. Defaults to `false`.
    #[serde(default)]
    pub required: bool,
}
222
/// Serde fallback for `ApprovalField::field_type`: a plain text input.
fn default_field_type() -> String {
    String::from("text")
}
226
/// Directed edge between two stages, optionally conditional.
///
/// `from` and `to` are required on input; `conditions` and `label` are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Source stage ID.
    pub from: String,
    /// Target stage ID.
    pub to: String,
    /// Conditions that must ALL be satisfied (AND) for this edge to be taken.
    /// Evaluated against the workflow state after `from` completes.
    /// Empty = unconditional.
    /// Defaults to an empty list (unconditional) when absent from input.
    #[serde(default)]
    pub conditions: Vec<Precondition>,
    /// Human-readable label (e.g., "on success", "if approved").
    /// Defaults to the empty string when absent from input.
    #[serde(default)]
    pub label: String,
}
243
/// What to run when compensating a stage on workflow failure (saga pattern).
///
/// Serialized as an internally tagged enum: a `type` field holds the
/// snake_case variant name (`proposal` or `stage_ref`) alongside the variant's
/// own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CompensationHandler {
    /// Run a proposal to undo side effects.
    Proposal(ProposalStep),
    /// Execute a named stage from this workflow.
    /// `stage_id` is the ID of the stage to run.
    StageRef { stage_id: String },
}
253
254impl Workflow {
255    /// Look up a stage by ID.
256    pub fn stage(&self, id: &str) -> Option<&Stage> {
257        self.stages.iter().find(|s| s.id == id)
258    }
259
260    /// Get all outgoing edges from a stage.
261    pub fn outgoing_edges(&self, stage_id: &str) -> Vec<&Edge> {
262        self.edges.iter().filter(|e| e.from == stage_id).collect()
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stage whose step is a proposal with no actions and an empty
    /// context, stamped with the current UTC time.
    fn proposal_stage(
        stage_id: &'static str,
        stage_name: &'static str,
        proposal_id: &'static str,
    ) -> Stage {
        let proposal = ActionProposal {
            id: proposal_id.into(),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        };

        Stage {
            id: stage_id.into(),
            name: stage_name.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: HashMap::new(),
        }
    }

    /// Two-stage fixture: `stage-a` → `stage-b` joined by one unconditional edge.
    fn simple_workflow() -> Workflow {
        let first = proposal_stage("stage-a", "Stage A", "p1");
        let second = proposal_stage("stage-b", "Stage B", "p2");

        let always = Edge {
            from: "stage-a".into(),
            to: "stage-b".into(),
            conditions: vec![],
            label: "always".into(),
        };

        Workflow {
            id: "test-wf".into(),
            name: "Test Workflow".into(),
            start: "stage-a".into(),
            goal: None,
            stages: vec![first, second],
            edges: vec![always],
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }

    /// A workflow survives a JSON encode/decode cycle with its shape intact.
    #[test]
    fn serde_roundtrip() {
        let original = simple_workflow();
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: Workflow = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "test-wf");
        assert_eq!(decoded.stages.len(), 2);
        assert_eq!(decoded.edges.len(), 1);
        assert_eq!(decoded.start, "stage-a");
    }

    /// `stage` finds a known ID and reports `None` for an unknown one.
    #[test]
    fn stage_lookup() {
        let fixture = simple_workflow();
        let known = fixture.stage("stage-a");
        let unknown = fixture.stage("nonexistent");

        assert!(known.is_some());
        assert!(unknown.is_none());
    }

    /// `outgoing_edges` returns one edge for the source stage and none for the sink.
    #[test]
    fn outgoing_edges() {
        let fixture = simple_workflow();
        let from_a = fixture.outgoing_edges("stage-a");
        let from_b = fixture.outgoing_edges("stage-b");

        assert_eq!(from_a.len(), 1);
        assert_eq!(from_b.len(), 0);
    }
}