car_workflow/types.rs
1//! Workflow definition types — fully serializable for use through all bindings.
2
3use std::collections::HashMap;
4
5use car_ir::{ActionProposal, Precondition};
6use car_multi::AgentSpec;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10/// Top-level workflow definition: a named graph of stages with conditional edges.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Workflow {
13 /// Unique identifier.
14 pub id: String,
15 /// Human-readable name.
16 pub name: String,
17 /// Stage ID of the entry point.
18 pub start: String,
19 /// The overall objective. When set, it is pinned into workflow state as
20 /// `goal` at run start and re-anchored into every agent coordination's task
21 /// ("Overall goal: … / Current step: …") so a long multi-stage run can't
22 /// drift from the original objective. A structural defense against goal drift.
23 #[serde(default, skip_serializing_if = "Option::is_none")]
24 pub goal: Option<String>,
25 /// All stages in this workflow.
26 pub stages: Vec<Stage>,
27 /// Directed edges between stages (may have conditions).
28 pub edges: Vec<Edge>,
29 /// Maximum total stage executions before aborting (loop guard).
30 #[serde(default = "default_max_iterations")]
31 pub max_iterations: u32,
32 /// Opaque metadata (owner, version, tags, etc.).
33 #[serde(default)]
34 pub metadata: HashMap<String, Value>,
35}
36
/// Serde default for [`Workflow::max_iterations`] when the field is omitted.
fn default_max_iterations() -> u32 {
    const DEFAULT_MAX_ITERATIONS: u32 = 100;
    DEFAULT_MAX_ITERATIONS
}
40
41/// A named step in the workflow.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct Stage {
44 /// Unique stage identifier (referenced by edges).
45 pub id: String,
46 /// Human-readable name.
47 pub name: String,
48 /// What this stage does.
49 pub step: StageStep,
50 /// Optional compensation to run if a later stage fails (saga pattern).
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub compensation: Option<CompensationHandler>,
53 /// Optional timeout for this stage in milliseconds.
54 #[serde(default, skip_serializing_if = "Option::is_none")]
55 pub timeout_ms: Option<u64>,
56 /// Opaque metadata.
57 #[serde(default)]
58 pub metadata: HashMap<String, Value>,
59}
60
61/// What a stage actually does.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(tag = "type", rename_all = "snake_case")]
64pub enum StageStep {
65 /// Run a car-multi agent coordination pattern.
66 Pattern(PatternStep),
67 /// Execute a car-engine action proposal directly.
68 Proposal(ProposalStep),
69 /// Run a nested sub-workflow.
70 SubWorkflow(SubWorkflowStep),
71 /// Pause the workflow and wait for human input (human-in-the-loop gate).
72 ///
73 /// The engine snapshots the run, returns [`crate::WorkflowStatus::Paused`]
74 /// with a [`crate::PausedWorkflow`] checkpoint, and does not execute this
75 /// stage's body. The run continues only when
76 /// [`crate::WorkflowEngine::resume`] is called with the human's response.
77 Approval(ApprovalStep),
78 /// Repeat an inner step until a state predicate holds or a cap is reached.
79 ///
80 /// This makes the blog-style "loop until done" idiom a first-class,
81 /// verifiable construct rather than something hand-wired from a back-edge,
82 /// a counter stage, and a conditional edge. Bounded by `max_iterations`.
83 LoopUntil(LoopUntilStep),
84 /// Fan an inner step out over a list of items resolved **at runtime** from
85 /// workflow state (e.g. produced by an earlier stage), not hardcoded in the
86 /// manifest. This is the declarative analogue of `parallel(items.map(...))`
87 /// where the item count is unknown until the run reaches this stage.
88 ForEach(ForEachStep),
89}
90
91/// Repeat `body` until `until` holds (AND of preconditions over workflow state)
92/// or `max_iterations` is reached — whichever comes first.
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct LoopUntilStep {
95 /// The step run each iteration. Its produced state (proposal `state_changes`,
96 /// and `stage.<loop_id>.answer`/`.iteration`) is visible to `until`.
97 pub body: Box<StageStep>,
98 /// Loop stops once ALL of these pass (AND) against the state after a body
99 /// iteration. Empty `until` runs exactly `max_iterations` times.
100 #[serde(default)]
101 pub until: Vec<Precondition>,
102 /// Hard cap on body iterations. Must be >= 1 (enforced by
103 /// [`crate::verify_workflow`]); the loop guard against unbounded repetition.
104 pub max_iterations: u32,
105}
106
107/// Run `body` once per element of the array at state key `items_from`.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ForEachStep {
110 /// Workflow-state key holding a JSON array. Each element becomes one item.
111 /// A missing key or non-array value yields zero iterations (a no-op).
112 pub items_from: String,
113 /// The step to run per item. Before each run, the placeholders `{{item}}`
114 /// (the element rendered as a string) and `{{index}}` (0-based position) are
115 /// substituted into every string *value* in the body — pattern tasks, agent
116 /// prompts, proposal parameter values — so the body can be parameterized by
117 /// the item. (JSON object *keys* are not templated; `{{item}}` is expanded
118 /// before `{{index}}`, so item text containing `{{index}}` is itself
119 /// expanded.)
120 ///
121 /// Per-item results land in workflow state under
122 /// `foreach.<id>.<index>.{item,answer}`, and the body's own state deltas
123 /// (e.g. a proposal's `state_changes`) under
124 /// `foreach.<id>.<index>.state.<key>` — namespaced per item so concurrent
125 /// bodies never clobber one another. (Body deltas are deliberately NOT
126 /// merged under their bare keys, unlike `LoopUntil`, which runs serially.)
127 pub body: Box<StageStep>,
128 /// Max bodies to run concurrently. `0` or `1` means sequential. Higher fans
129 /// out via the runtime's bounded concurrency.
130 #[serde(default)]
131 pub max_concurrent: usize,
132}
133
134/// Run one of the car-multi coordination patterns.
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct PatternStep {
137 /// Which pattern to run.
138 pub pattern: PatternKind,
139 /// Task description passed to the pattern.
140 pub task: String,
141 /// Agents involved. Interpretation depends on pattern kind.
142 pub agents: Vec<AgentSpec>,
143 /// Pattern-specific configuration.
144 /// - supervisor: `{"max_rounds": 3, "supervisor_index": 0}`
145 /// - map_reduce: `{"max_concurrent": 5, "items": ["a", "b", "c"]}`
146 /// - fleet: `{"timeout_secs": 60}`
147 #[serde(default)]
148 pub config: HashMap<String, Value>,
149}
150
151/// All supported car-multi coordination patterns.
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
153#[serde(rename_all = "snake_case")]
154pub enum PatternKind {
155 SwarmParallel,
156 SwarmSequential,
157 SwarmDebate,
158 Pipeline,
159 Supervisor,
160 Delegator,
161 MapReduce,
162 Vote,
163 Fleet,
164 /// Fresh-context verification: a reviewer agent checks prior work against
165 /// acceptance criteria with no author context. A structural defense against
166 /// self-preferential bias and premature "done". `agents[0]` is the reviewer;
167 /// `config.criteria` is a string array; `config.review_key` is the state key
168 /// holding the work to review (required — there is no default; a missing or
169 /// empty value fails the review closed). The verdict is exposed as the typed
170 /// `stage.<id>.review_passed` (bool) for edge branching.
171 AdversarialReview,
172 /// Rank competitors by single-elimination pairwise judging. All agents but
173 /// the last are competitors; the last (or `config.judge_index`) is the judge.
174 Tournament,
175}
176
177/// Execute a car-engine action proposal.
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct ProposalStep {
180 pub proposal: ActionProposal,
181}
182
183/// Run a nested workflow.
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct SubWorkflowStep {
186 pub workflow: Box<Workflow>,
187}
188
189/// Pause for human-in-the-loop approval or input.
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct ApprovalStep {
192 /// Message shown to the human when the run pauses.
193 pub prompt: String,
194 /// Structured fields the human is asked to fill in (empty = free-form approval).
195 #[serde(default)]
196 pub fields: Vec<ApprovalField>,
197 /// State key under which the human's response object is written on resume.
198 ///
199 /// The response is also mirrored to `stage.<id>.answer` for edge conditions.
200 /// Must be non-empty (enforced by [`crate::verify_workflow`]).
201 pub output_key: String,
202}
203
204/// One field in an [`ApprovalStep`] form.
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct ApprovalField {
207 /// Machine name (key in the response object).
208 pub name: String,
209 /// Human-readable label.
210 #[serde(default)]
211 pub label: String,
212 /// Input kind: `text`, `textarea`, `options`, `bool`, or `number`.
213 #[serde(default = "default_field_type")]
214 pub field_type: String,
215 /// Allowed values when `field_type` is `options`.
216 #[serde(default, skip_serializing_if = "Vec::is_empty")]
217 pub options: Vec<String>,
218 /// Whether the human must supply this field.
219 #[serde(default)]
220 pub required: bool,
221}
222
/// Serde default for [`ApprovalField::field_type`] when the field is omitted.
fn default_field_type() -> String {
    String::from("text")
}
226
227/// Directed edge between two stages, optionally conditional.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct Edge {
230 /// Source stage ID.
231 pub from: String,
232 /// Target stage ID.
233 pub to: String,
234 /// Conditions that must ALL be satisfied (AND) for this edge to be taken.
235 /// Evaluated against the workflow state after `from` completes.
236 /// Empty = unconditional.
237 #[serde(default)]
238 pub conditions: Vec<Precondition>,
239 /// Human-readable label (e.g., "on success", "if approved").
240 #[serde(default)]
241 pub label: String,
242}
243
244/// What to run when compensating a stage on workflow failure (saga pattern).
245#[derive(Debug, Clone, Serialize, Deserialize)]
246#[serde(tag = "type", rename_all = "snake_case")]
247pub enum CompensationHandler {
248 /// Run a proposal to undo side effects.
249 Proposal(ProposalStep),
250 /// Execute a named stage from this workflow.
251 StageRef { stage_id: String },
252}
253
254impl Workflow {
255 /// Look up a stage by ID.
256 pub fn stage(&self, id: &str) -> Option<&Stage> {
257 self.stages.iter().find(|s| s.id == id)
258 }
259
260 /// Get all outgoing edges from a stage.
261 pub fn outgoing_edges(&self, stage_id: &str) -> Vec<&Edge> {
262 self.edges.iter().filter(|e| e.from == stage_id).collect()
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture helper: a stage whose step is a proposal with no actions.
    fn proposal_stage(stage_id: &str, stage_name: &str, proposal_id: &str) -> Stage {
        let proposal = ActionProposal {
            id: proposal_id.into(),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        };

        Stage {
            id: stage_id.into(),
            name: stage_name.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: HashMap::new(),
        }
    }

    /// Fixture: two proposal stages joined by one unconditional edge.
    fn simple_workflow() -> Workflow {
        let stages = vec![
            proposal_stage("stage-a", "Stage A", "p1"),
            proposal_stage("stage-b", "Stage B", "p2"),
        ];
        let edges = vec![Edge {
            from: "stage-a".into(),
            to: "stage-b".into(),
            conditions: vec![],
            label: "always".into(),
        }];

        Workflow {
            id: "test-wf".into(),
            name: "Test Workflow".into(),
            start: "stage-a".into(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }

    /// Serializing to JSON and parsing back keeps the workflow's shape.
    #[test]
    fn serde_roundtrip() {
        let original = simple_workflow();
        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: Workflow = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "test-wf");
        assert_eq!(decoded.stages.len(), 2);
        assert_eq!(decoded.edges.len(), 1);
        assert_eq!(decoded.start, "stage-a");
    }

    /// `stage` finds known IDs and reports unknown ones as `None`.
    #[test]
    fn stage_lookup() {
        let workflow = simple_workflow();

        assert!(workflow.stage("stage-a").is_some());
        assert!(workflow.stage("nonexistent").is_none());
    }

    /// `outgoing_edges` returns only the edges leaving the given stage.
    #[test]
    fn outgoing_edges() {
        let workflow = simple_workflow();

        assert_eq!(workflow.outgoing_edges("stage-a").len(), 1);
        assert_eq!(workflow.outgoing_edges("stage-b").len(), 0);
    }
}
345}