car-workflow 0.32.1

Declarative multi-stage workflow orchestration for Common Agent Runtime
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
//! Workflow definition types — fully serializable for use through all bindings.

use std::collections::HashMap;

use car_ir::{ActionProposal, Precondition};
use car_multi::AgentSpec;
use serde::{Deserialize, Serialize};
use serde_json::Value;

/// Top-level workflow definition: a named graph of stages with conditional edges.
///
/// Serialization: `goal`, `max_iterations`, and `metadata` may be absent from
/// the input (they fall back to `None`, the value of `default_max_iterations`,
/// and an empty map respectively); every other field is required. `goal` is
/// left out of the output entirely when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    /// Unique identifier.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Stage ID of the entry point.
    pub start: String,
    /// The overall objective. When set, it is pinned into workflow state as
    /// `goal` at run start and re-anchored into every agent coordination's task
    /// ("Overall goal: … / Current step: …") so a long multi-stage run can't
    /// drift from the original objective. A structural defense against goal drift.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub goal: Option<String>,
    /// All stages in this workflow.
    pub stages: Vec<Stage>,
    /// Directed edges between stages (may have conditions).
    pub edges: Vec<Edge>,
    /// Maximum total stage executions before aborting (loop guard).
    ///
    /// Defaults to 100 when the field is missing from the serialized form.
    #[serde(default = "default_max_iterations")]
    pub max_iterations: u32,
    /// Opaque metadata (owner, version, tags, etc.). Defaults to an empty map.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

/// Serde fallback for `Workflow::max_iterations` when the field is absent
/// from the serialized workflow.
fn default_max_iterations() -> u32 {
    const FALLBACK_MAX_ITERATIONS: u32 = 100;
    FALLBACK_MAX_ITERATIONS
}

/// A named step in the workflow.
///
/// Serialization: `id`, `name`, and `step` are required. `compensation` and
/// `timeout_ms` may be absent from the input and are omitted from the output
/// when `None`; `metadata` may be absent and then defaults to an empty map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage {
    /// Unique stage identifier (referenced by edges).
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// What this stage does.
    pub step: StageStep,
    /// Optional compensation to run if a later stage fails (saga pattern).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compensation: Option<CompensationHandler>,
    /// Optional timeout for this stage in milliseconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
    /// Opaque metadata. Defaults to an empty map.
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

/// What a stage actually does.
///
/// Serialized as an internally tagged object: the variant name in snake_case
/// is stored under the `type` key next to the variant's own fields, e.g.
/// `{"type": "deliver", "sinks": [...], "payload_key": "digest"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StageStep {
    /// Run a car-multi agent coordination pattern.
    Pattern(PatternStep),
    /// Execute a car-engine action proposal directly.
    Proposal(ProposalStep),
    /// Run a nested sub-workflow.
    SubWorkflow(SubWorkflowStep),
    /// Pause the workflow and wait for human input (human-in-the-loop gate).
    ///
    /// The engine snapshots the run, returns [`crate::WorkflowStatus::Paused`]
    /// with a [`crate::PausedWorkflow`] checkpoint, and does not execute this
    /// stage's body. The run continues only when
    /// [`crate::WorkflowEngine::resume`] is called with the human's response.
    Approval(ApprovalStep),
    /// Repeat an inner step until a state predicate holds or a cap is reached.
    ///
    /// This makes the blog-style "loop until done" idiom a first-class,
    /// verifiable construct rather than something hand-wired from a back-edge,
    /// a counter stage, and a conditional edge. Bounded by `max_iterations`.
    LoopUntil(LoopUntilStep),
    /// Fan an inner step out over a list of items resolved **at runtime** from
    /// workflow state (e.g. produced by an earlier stage), not hardcoded in the
    /// manifest. This is the declarative analogue of `parallel(items.map(...))`
    /// where the item count is unknown until the run reaches this stage.
    ForEach(ForEachStep),
    /// Drop items already processed in a prior run via a persistent content-hash
    /// seen-set, then hand the unseen subset to a downstream `for_each`. The
    /// backbone of the external-item automation recipe: poll a source on a
    /// schedule and process each item exactly once across runs.
    Dedup(DedupStep),
    /// Fan the workflow's result out to N delivery sinks — the terminal
    /// "publish everywhere" stage. Each sink is a plain
    /// [`car_ir::ActionProposal`] executed through the same proposal path as a
    /// `proposal` step, so e.g. messaging delivery is expressed as *a proposal
    /// invoking the messaging tool*, not a car-messaging dependency. Sinks run
    /// sequentially and are best-effort per sink: a failed sink records its
    /// error and the remaining sinks still run; the stage fails only when ALL
    /// sinks fail.
    Deliver(DeliverStep),
}

/// Fan results out to N delivery sinks (see [`StageStep::Deliver`]).
///
/// Timeouts: there is no per-sink timeout field — bound an individual sink
/// via `Action.timeout_ms` on the actions inside its proposal;
/// [`Stage::timeout_ms`] bounds the whole fan-out. A stage timeout that
/// cancels the fan-out mid-way still reports the sinks that already fired in
/// the failed stage's [`crate::StageOutput::Deliver`] output.
///
/// Serialization: `sinks` is required; `payload_key` may be absent from the
/// input and is omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliverStep {
    /// The delivery sinks, executed sequentially in order. Each is a plain
    /// action proposal — the same shape as [`ProposalStep`]'s and the
    /// automation recipe's `deliver` — so any tool the runtime can execute is
    /// a valid sink. Must be non-empty (enforced by
    /// [`crate::verify_workflow`]); the type itself accepts an empty list.
    pub sinks: Vec<ActionProposal>,
    /// Optional workflow-state key whose value seeds a `{{payload}}` template
    /// substitution in every string *value* of each sink proposal (action
    /// parameters, context, …) — mirroring how `for_each` templates
    /// `{{item}}`. A string value is substituted raw; any other JSON value is
    /// rendered as compact JSON; a missing key substitutes the empty string.
    /// `None` runs the sinks verbatim.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload_key: Option<String>,
}

/// Filter a candidate item array down to those not seen in any prior run, using
/// a persistent content-hash seen-set (see [`crate::dedup`]).
///
/// The engine reads the array at `items_from`, computes a stable content hash of
/// each item, drops those whose hash is already recorded in the `store`
/// namespace, records the survivors' hashes, and writes the survivors to `into`.
/// It also exposes `stage.<id>.unseen_count` and `stage.<id>.total_count` for
/// edge conditions (e.g. skip delivery when nothing is new).
///
/// Serialization: `items_from`, `into`, and `store` are required.
/// `hash_fields` and `ttl_secs` may be absent from the input and are omitted
/// from the output when empty / `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupStep {
    /// Workflow-state key holding the candidate item array. A missing key or
    /// non-array value yields zero unseen items (a no-op).
    pub items_from: String,
    /// Workflow-state key to write the unseen subset (a JSON array) to, for a
    /// downstream `for_each` to fan out over.
    pub into: String,
    /// Persistent seen-set namespace. Items are deduplicated against, and
    /// committed to, the store of this name — shared across runs of the
    /// automation, isolated from other stores. Sanitized to a safe filename.
    pub store: String,
    /// Optional identity key: top-level object fields that define an item's
    /// identity. When set, only these fields contribute to the content hash, so
    /// unrelated changes to an item don't make it look new. Empty hashes the
    /// whole item canonically.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hash_fields: Vec<String>,
    /// Optional time-to-live for seen-set entries, in seconds. When set, a hash
    /// first seen more than this long ago is evicted before the dedup check, so
    /// the store stays bounded to a recent window and an item that aged out of
    /// the source (then reappears) is reprocessed. `None` keeps every hash
    /// forever — exact-once for all time, at the cost of an unbounded file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ttl_secs: Option<u64>,
}

/// Repeat `body` until `until` holds (AND of preconditions over workflow state)
/// or `max_iterations` is reached — whichever comes first.
///
/// Serialization: `body` and `max_iterations` are required (there is no
/// default cap); `until` may be absent and then defaults to an empty list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopUntilStep {
    /// The step run each iteration. Its produced state (proposal `state_changes`,
    /// and `stage.<loop_id>.answer`/`.iteration`) is visible to `until`.
    pub body: Box<StageStep>,
    /// Loop stops once ALL of these pass (AND) against the state after a body
    /// iteration. Empty `until` runs exactly `max_iterations` times.
    #[serde(default)]
    pub until: Vec<Precondition>,
    /// Hard cap on body iterations. Must be >= 1 (enforced by
    /// [`crate::verify_workflow`]); the loop guard against unbounded repetition.
    pub max_iterations: u32,
}

/// Run `body` once per element of the array at state key `items_from`.
///
/// Serialization: `items_from` and `body` are required; `max_concurrent` may
/// be absent and then defaults to `0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForEachStep {
    /// Workflow-state key holding a JSON array. Each element becomes one item.
    /// A missing key or non-array value yields zero iterations (a no-op).
    pub items_from: String,
    /// The step to run per item. Before each run, the placeholders `{{item}}`
    /// (the element rendered as a string) and `{{index}}` (0-based position) are
    /// substituted into every string *value* in the body — pattern tasks, agent
    /// prompts, proposal parameter values — so the body can be parameterized by
    /// the item. (JSON object *keys* are not templated; `{{item}}` is expanded
    /// before `{{index}}`, so item text containing `{{index}}` is itself
    /// expanded.)
    ///
    /// Per-item results land in workflow state under
    /// `foreach.<id>.<index>.{item,answer}`, and the body's own state deltas
    /// (e.g. a proposal's `state_changes`) under
    /// `foreach.<id>.<index>.state.<key>` — namespaced per item so concurrent
    /// bodies never clobber one another. (Body deltas are deliberately NOT
    /// merged under their bare keys, unlike `LoopUntil`, which runs serially.)
    pub body: Box<StageStep>,
    /// Max bodies to run concurrently. `0` or `1` means sequential. Higher fans
    /// out via the runtime's bounded concurrency. Omitting the field gives `0`,
    /// i.e. sequential execution.
    #[serde(default)]
    pub max_concurrent: usize,
}

/// Run one of the car-multi coordination patterns.
///
/// Serialization: `pattern`, `task`, and `agents` are required; `config` may
/// be absent and then defaults to an empty map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternStep {
    /// Which pattern to run.
    pub pattern: PatternKind,
    /// Task description passed to the pattern.
    pub task: String,
    /// Agents involved. Interpretation depends on pattern kind.
    pub agents: Vec<AgentSpec>,
    /// Pattern-specific configuration.
    /// - supervisor: `{"max_rounds": 3, "supervisor_index": 0}`
    /// - map_reduce: `{"max_concurrent": 5, "items": ["a", "b", "c"]}`
    /// - fleet: `{"timeout_secs": 60}`
    #[serde(default)]
    pub config: HashMap<String, Value>,
}

/// All supported car-multi coordination patterns.
///
/// Each variant serializes as its snake_case name (noted per variant below).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PatternKind {
    /// Serialized as `swarm_parallel`.
    SwarmParallel,
    /// Serialized as `swarm_sequential`.
    SwarmSequential,
    /// Serialized as `swarm_debate`.
    SwarmDebate,
    /// Serialized as `pipeline`.
    Pipeline,
    /// Serialized as `supervisor`.
    Supervisor,
    /// Serialized as `delegator`.
    Delegator,
    /// Serialized as `map_reduce`.
    MapReduce,
    /// Serialized as `vote`.
    Vote,
    /// Serialized as `fleet`.
    Fleet,
    /// Fresh-context verification: a reviewer agent checks prior work against
    /// acceptance criteria with no author context. A structural defense against
    /// self-preferential bias and premature "done". `agents[0]` is the reviewer;
    /// `config.criteria` is a string array; `config.review_key` is the state key
    /// holding the work to review (required — there is no default; a missing or
    /// empty value fails the review closed). The verdict is exposed as the typed
    /// `stage.<id>.review_passed` (bool) for edge branching.
    ///
    /// Serialized as `adversarial_review`.
    AdversarialReview,
    /// Rank competitors by single-elimination pairwise judging. All agents but
    /// the last are competitors; the last (or `config.judge_index`) is the judge.
    ///
    /// Serialized as `tournament`.
    Tournament,
}

/// Execute a car-engine action proposal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProposalStep {
    /// The action proposal this step executes. Required when deserializing.
    pub proposal: ActionProposal,
}

/// Run a nested workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflowStep {
    /// The complete nested workflow definition (heap-allocated). It is embedded
    /// inline here rather than referenced by ID.
    pub workflow: Box<Workflow>,
}

/// Pause for human-in-the-loop approval or input.
///
/// Serialization: `prompt` and `output_key` are required; `fields` may be
/// absent and then defaults to an empty list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalStep {
    /// Message shown to the human when the run pauses.
    pub prompt: String,
    /// Structured fields the human is asked to fill in (empty = free-form approval).
    #[serde(default)]
    pub fields: Vec<ApprovalField>,
    /// State key under which the human's response object is written on resume.
    ///
    /// The response is also mirrored to `stage.<id>.answer` for edge conditions.
    /// Must be non-empty (enforced by [`crate::verify_workflow`]).
    pub output_key: String,
}

/// One field in an [`ApprovalStep`] form.
///
/// Serialization: only `name` is required. `label` defaults to the empty
/// string, `field_type` to `"text"`, `options` to an empty list (and is
/// omitted from the output when empty), and `required` to `false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalField {
    /// Machine name (key in the response object).
    pub name: String,
    /// Human-readable label.
    #[serde(default)]
    pub label: String,
    /// Input kind: `text`, `textarea`, `options`, `bool`, or `number`.
    ///
    /// Stored as a free-form string; this type does not restrict it to the
    /// kinds listed above.
    #[serde(default = "default_field_type")]
    pub field_type: String,
    /// Allowed values when `field_type` is `options`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub options: Vec<String>,
    /// Whether the human must supply this field.
    #[serde(default)]
    pub required: bool,
}

/// Serde fallback for `ApprovalField::field_type` when the field is absent
/// from the serialized form.
fn default_field_type() -> String {
    String::from("text")
}

/// Directed edge between two stages, optionally conditional.
///
/// Serialization: `from` and `to` are required; `conditions` defaults to an
/// empty list (unconditional) and `label` to the empty string when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Source stage ID.
    pub from: String,
    /// Target stage ID.
    pub to: String,
    /// Conditions that must ALL be satisfied (AND) for this edge to be taken.
    /// Evaluated against the workflow state after `from` completes.
    /// Empty = unconditional.
    #[serde(default)]
    pub conditions: Vec<Precondition>,
    /// Human-readable label (e.g., "on success", "if approved").
    #[serde(default)]
    pub label: String,
}

/// What to run when compensating a stage on workflow failure (saga pattern).
///
/// Serialized as an internally tagged object keyed by `type`, with the variant
/// name in snake_case: `{"type": "proposal", "proposal": {...}}` or
/// `{"type": "stage_ref", "stage_id": "..."}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CompensationHandler {
    /// Run a proposal to undo side effects.
    Proposal(ProposalStep),
    /// Execute a named stage from this workflow.
    ///
    /// `stage_id` holds the ID of the stage to execute.
    StageRef { stage_id: String },
}

impl Workflow {
    /// Return the first stage whose `id` equals `id`, or `None` when no stage
    /// in `stages` matches.
    pub fn stage(&self, id: &str) -> Option<&Stage> {
        for candidate in &self.stages {
            if candidate.id == id {
                return Some(candidate);
            }
        }
        None
    }

    /// Collect every edge whose `from` equals `stage_id`, preserving the order
    /// in which the edges appear in `edges`. Returns an empty vector when the
    /// stage has no outgoing edges.
    pub fn outgoing_edges(&self, stage_id: &str) -> Vec<&Edge> {
        let mut outgoing = Vec::new();
        for edge in &self.edges {
            if edge.from == stage_id {
                outgoing.push(edge);
            }
        }
        outgoing
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a proposal with no actions and no context, sourced from "test".
    fn empty_proposal(id: &'static str) -> ActionProposal {
        ActionProposal {
            id: id.into(),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        }
    }

    /// Fixture: a proposal-backed stage with no compensation, timeout, or metadata.
    fn proposal_stage(
        id: &'static str,
        name: &'static str,
        proposal_id: &'static str,
    ) -> Stage {
        Stage {
            id: id.into(),
            name: name.into(),
            step: StageStep::Proposal(ProposalStep {
                proposal: empty_proposal(proposal_id),
            }),
            compensation: None,
            timeout_ms: None,
            metadata: HashMap::new(),
        }
    }

    /// Fixture: two proposal stages joined by one unconditional edge
    /// (`stage-a` -> `stage-b`).
    fn simple_workflow() -> Workflow {
        let stages = vec![
            proposal_stage("stage-a", "Stage A", "p1"),
            proposal_stage("stage-b", "Stage B", "p2"),
        ];
        let edges = vec![Edge {
            from: "stage-a".into(),
            to: "stage-b".into(),
            conditions: vec![],
            label: "always".into(),
        }];

        Workflow {
            id: "test-wf".into(),
            name: "Test Workflow".into(),
            start: "stage-a".into(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }

    /// A workflow survives a JSON encode/decode cycle with its shape intact.
    #[test]
    fn serde_roundtrip() {
        let encoded = serde_json::to_string_pretty(&simple_workflow()).unwrap();
        let decoded: Workflow = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "test-wf");
        assert_eq!(decoded.stages.len(), 2);
        assert_eq!(decoded.edges.len(), 1);
        assert_eq!(decoded.start, "stage-a");
    }

    /// A deliver step uses the tagged form and round-trips, with and without
    /// a payload key.
    #[test]
    fn deliver_serde_roundtrip() {
        let original = StageStep::Deliver(DeliverStep {
            sinks: vec![empty_proposal("sink-slack"), empty_proposal("sink-mail")],
            payload_key: Some("digest".into()),
        });

        // Tagged form: {"type": "deliver", "sinks": [...], "payload_key": "digest"}
        let encoded = serde_json::to_value(&original).unwrap();
        assert_eq!(
            encoded.get("type").and_then(|v| v.as_str()),
            Some("deliver")
        );
        assert_eq!(encoded["sinks"].as_array().unwrap().len(), 2);

        let decoded: StageStep = serde_json::from_value(encoded).unwrap();
        if let StageStep::Deliver(deliver) = &decoded {
            assert_eq!(deliver.sinks.len(), 2);
            assert_eq!(deliver.sinks[0].id, "sink-slack");
            assert_eq!(deliver.payload_key.as_deref(), Some("digest"));
        } else {
            panic!("expected deliver step, got {decoded:?}");
        }

        // payload_key is optional and omitted when None.
        let without_payload = StageStep::Deliver(DeliverStep {
            sinks: vec![],
            payload_key: None,
        });
        let without_payload_json = serde_json::to_value(&without_payload).unwrap();
        assert!(without_payload_json.get("payload_key").is_none());

        let reparsed: StageStep = serde_json::from_value(without_payload_json).unwrap();
        let payload_still_absent = matches!(
            reparsed,
            StageStep::Deliver(DeliverStep {
                payload_key: None,
                ..
            })
        );
        assert!(payload_still_absent);
    }

    /// Known stage IDs resolve; unknown ones do not.
    #[test]
    fn stage_lookup() {
        let workflow = simple_workflow();
        assert!(workflow.stage("stage-a").is_some());
        assert!(workflow.stage("nonexistent").is_none());
    }

    /// Only the source stage of the single edge reports an outgoing edge.
    #[test]
    fn outgoing_edges() {
        let workflow = simple_workflow();
        assert_eq!(workflow.outgoing_edges("stage-a").len(), 1);
        assert_eq!(workflow.outgoing_edges("stage-b").len(), 0);
    }
}