Skip to main content

a3s_code_core/orchestration/
workflow.rs

1//! A typed, host-driven workflow facade over the [`AgentExecutor`] seam.
2//!
3//! [`Workflow`] is a thin orchestrator with **zero new execution machinery**:
4//! each verb resolves to exactly one existing combinator call (see
5//! [`executor`](super::executor) and [`combinators`](super::combinators)). The
6//! genuinely new behavior is observability — workflow-level milestone events on
7//! a dedicated [`WorkflowEvent`] broadcast — and a resume boundary at each
8//! [`phase`](Workflow::phase).
9//!
10//! ## Control flow lives in the host
11//!
12//! Unlike the model-driven `task`/`parallel_task` tools (the LLM decides to fan
13//! out) this is *programmable*: the caller `await`s a verb, inspects the
14//! returned [`StepOutcome`]s, and decides what runs next with ordinary Rust
15//! `if`/`for`/`while`/`?`/`match`. The host language IS the interpreter, so
16//! there is no embedded scripting engine to sandbox and the grammar stays
17//! minimal (4 verbs + `log`) — anything richer (retry, map/reduce, timeouts) is
18//! a host concern.
19//!
20//! ```no_run
21//! # use std::sync::Arc;
22//! # use a3s_code_core::orchestration::{Workflow, AgentExecutor, AgentStepSpec};
23//! # async fn run(executor: Arc<dyn AgentExecutor>) {
24//! let wf = Workflow::builder(executor).build();
25//!
26//! // One step.
27//! let plan = wf
28//!     .agent(AgentStepSpec::new("plan", "plan", "plan the work", "Plan the refactor"))
29//!     .await;
30//!
31//! // Fan a *variable* number of steps out of the prior result — this is the
32//! // "dynamic" part: the shape is computed at runtime, not declared up front.
33//! if plan.success {
34//!     let specs: Vec<AgentStepSpec> = plan
35//!         .output
36//!         .lines()
37//!         .enumerate()
38//!         .map(|(i, line)| AgentStepSpec::new(format!("impl-{i}"), "general", "impl", line))
39//!         .collect();
40//!     let results = wf.phase("implement", specs).await; // resumable barrier
41//!     wf.log("info", &format!("implemented {} steps", results.len()), serde_json::Value::Null);
42//! }
43//! # }
44//! ```
45
46use super::combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage};
47use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome};
48use super::workflow_budget::{BudgetSnapshot, WorkflowBudget};
49use crate::agent::AgentEvent;
50use crate::store::SessionStore;
51use serde::{Deserialize, Serialize};
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::sync::Arc;
54use tokio::sync::broadcast;
55
56/// Default capacity of the [`WorkflowEvent`] broadcast channel. Slow
57/// subscribers lag (and drop) rather than block the workflow — milestones are
58/// best-effort; durable audit belongs on the trace/AHP path.
59const DEFAULT_WORKFLOW_EVENT_CAPACITY: usize = 256;
60
61/// Workflow-level milestone stream, distinct from the per-step
62/// [`AgentEvent`] stream the combinators already emit.
63///
64/// Bridged onto whatever consumes `AgentEvent` is *not* automatic — a host that
65/// wants these in its existing UI subscribes via [`Workflow::subscribe`].
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum WorkflowEvent {
69    /// A named [`phase`](Workflow::phase) began.
70    PhaseStart {
71        name: String,
72        index: usize,
73        step_count: usize,
74    },
75    /// A named [`phase`](Workflow::phase) finished.
76    PhaseEnd {
77        name: String,
78        index: usize,
79        succeeded: usize,
80        failed: usize,
81    },
82    /// A structured log line emitted by [`Workflow::log`].
83    Log {
84        level: String,
85        message: String,
86        fields: serde_json::Value,
87    },
88    /// The shared workflow budget reached its cap; subsequent child LLM calls
89    /// will be denied. Emitted at the phase boundary where exhaustion is first
90    /// observed. The fan-out in flight is **not** force-killed.
91    BudgetExhausted { resource: String, reason: String },
92}
93
94/// Builder for a [`Workflow`]. The executor is mandatory (it is the seam to the
95/// host's placement / scheduling); everything else is optional.
96pub struct WorkflowBuilder {
97    executor: Arc<dyn AgentExecutor>,
98    store: Option<Arc<dyn SessionStore>>,
99    step_events: Option<broadcast::Sender<AgentEvent>>,
100    root_id: Option<String>,
101    budget: Option<Arc<WorkflowBudget>>,
102}
103
104impl WorkflowBuilder {
105    /// Start a builder around `executor` (the default in-box executor is
106    /// [`AgentSession::agent_executor`](crate::AgentSession::agent_executor)).
107    pub fn new(executor: Arc<dyn AgentExecutor>) -> Self {
108        Self {
109            executor,
110            store: None,
111            step_events: None,
112            root_id: None,
113            budget: None,
114        }
115    }
116
117    /// Attach the shared budget ledger this workflow reports on.
118    ///
119    /// The SAME `Arc<WorkflowBudget>` must also be installed as the executor's
120    /// child `budget_guard` (the in-box wiring is done by
121    /// [`AgentSession::workflow`](crate::AgentSession::workflow)); the workflow
122    /// only *reads* it for [`budget_snapshot`](Workflow::budget_snapshot) and to
123    /// emit [`WorkflowEvent::BudgetExhausted`].
124    pub fn with_budget(mut self, budget: Arc<WorkflowBudget>) -> Self {
125        self.budget = Some(budget);
126        self
127    }
128
129    /// Persist progress so each [`phase`](Workflow::phase) becomes a resume
130    /// boundary. Without a store, phases run as plain (non-resumable) barriers.
131    pub fn with_store(mut self, store: Arc<dyn SessionStore>) -> Self {
132        self.store = Some(store);
133        self
134    }
135
136    /// Thread the per-step [`AgentEvent`] sender into every combinator call so
137    /// existing subagent/AHP/trace listeners observe child-run lifecycle events.
138    pub fn with_step_events(mut self, step_events: broadcast::Sender<AgentEvent>) -> Self {
139        self.step_events = Some(step_events);
140        self
141    }
142
143    /// Set the stable root id used to derive per-phase checkpoint keys. Supply a
144    /// deterministic id (e.g. session-derived) for resume to work across runs;
145    /// otherwise a random id is used and resume is effectively disabled.
146    pub fn with_root_id(mut self, root_id: impl Into<String>) -> Self {
147        self.root_id = Some(root_id.into());
148        self
149    }
150
151    /// Finalize the [`Workflow`], creating its event channel.
152    pub fn build(self) -> Workflow {
153        let (events, _) = broadcast::channel(DEFAULT_WORKFLOW_EVENT_CAPACITY);
154        let root_id = self
155            .root_id
156            .unwrap_or_else(|| format!("wf-{}", uuid::Uuid::new_v4()));
157        Workflow {
158            executor: self.executor,
159            store: self.store,
160            events,
161            step_events: self.step_events,
162            root_id,
163            phase_seq: Arc::new(AtomicUsize::new(0)),
164            budget: self.budget,
165        }
166    }
167}
168
169/// A cheaply-clonable (all-`Arc`) typed orchestration facade.
170///
171/// Every verb delegates to exactly one existing combinator; [`Workflow`] owns no
172/// scheduling or LLM logic. It is `Clone` so a phase can itself spawn
173/// sub-workflows that share the same event bus, store, and phase numbering.
174#[derive(Clone)]
175pub struct Workflow {
176    executor: Arc<dyn AgentExecutor>,
177    store: Option<Arc<dyn SessionStore>>,
178    events: broadcast::Sender<WorkflowEvent>,
179    step_events: Option<broadcast::Sender<AgentEvent>>,
180    root_id: String,
181    phase_seq: Arc<AtomicUsize>,
182    budget: Option<Arc<WorkflowBudget>>,
183}
184
185impl Workflow {
186    /// Begin building a workflow around `executor`.
187    pub fn builder(executor: Arc<dyn AgentExecutor>) -> WorkflowBuilder {
188        WorkflowBuilder::new(executor)
189    }
190
191    /// The stable root id phase checkpoint keys are derived from.
192    pub fn root_id(&self) -> &str {
193        &self.root_id
194    }
195
196    /// A snapshot of the shared budget ledger, if a budget is attached.
197    pub fn budget_snapshot(&self) -> Option<BudgetSnapshot> {
198        self.budget.as_ref().map(|b| b.snapshot())
199    }
200
201    /// Subscribe to this workflow's [`WorkflowEvent`] milestones.
202    pub fn subscribe(&self) -> broadcast::Receiver<WorkflowEvent> {
203        self.events.subscribe()
204    }
205
206    /// Run a single step. A one-element barrier — reuses the panic isolation and
207    /// placement of [`parallel`](Self::parallel) so one and many steps share a
208    /// single code path.
209    pub async fn agent(&self, spec: AgentStepSpec) -> StepOutcome {
210        let task_id = spec.task_id.clone();
211        let agent = spec.agent.clone();
212        execute_steps_parallel(
213            Arc::clone(&self.executor),
214            vec![spec],
215            self.step_events.clone(),
216        )
217        .await
218        .into_iter()
219        .next()
220        .unwrap_or_else(|| StepOutcome::failed(task_id, agent, "step produced no outcome"))
221    }
222
223    /// Barrier fan-out. Bounded by the executor's
224    /// [`concurrency_hint`](AgentExecutor::concurrency_hint); input order is
225    /// preserved and a panicked branch becomes a failed [`StepOutcome`].
226    pub async fn parallel(&self, specs: Vec<AgentStepSpec>) -> Vec<StepOutcome> {
227        execute_steps_parallel(Arc::clone(&self.executor), specs, self.step_events.clone()).await
228    }
229
230    /// A *named* barrier and a resume boundary.
231    ///
232    /// Emits [`WorkflowEvent::PhaseStart`]/[`WorkflowEvent::PhaseEnd`] around the
233    /// inner combinator. When a store is configured the phase runs through
234    /// [`execute_steps_parallel_resumable`] keyed by a deterministic
235    /// `"{root_id}/{index}:{name}"` id, so an interrupted run skips already
236    /// completed steps; otherwise it is a plain [`parallel`](Self::parallel).
237    pub async fn phase(&self, name: &str, specs: Vec<AgentStepSpec>) -> Vec<StepOutcome> {
238        let index = self.phase_seq.fetch_add(1, Ordering::SeqCst);
239        let _ = self.events.send(WorkflowEvent::PhaseStart {
240            name: name.to_string(),
241            index,
242            step_count: specs.len(),
243        });
244
245        let out = match &self.store {
246            Some(store) => {
247                let workflow_id = format!("{}/{index}:{name}", self.root_id);
248                execute_steps_parallel_resumable(
249                    Arc::clone(&self.executor),
250                    specs,
251                    &workflow_id,
252                    Arc::clone(store),
253                    self.step_events.clone(),
254                )
255                .await
256            }
257            None => self.parallel(specs).await,
258        };
259
260        let failed = out.iter().filter(|o| !o.success).count();
261        let _ = self.events.send(WorkflowEvent::PhaseEnd {
262            name: name.to_string(),
263            index,
264            succeeded: out.len() - failed,
265            failed,
266        });
267
268        // Surface a budget cap once it is reached so the host can stop issuing
269        // further phases. Enforcement itself lives in the child loops (via the
270        // shared budget guard); this is observation only.
271        if let Some(budget) = &self.budget {
272            if budget.is_exhausted() {
273                let snap = budget.snapshot();
274                let _ = self.events.send(WorkflowEvent::BudgetExhausted {
275                    resource: "workflow_tokens".to_string(),
276                    reason: format!(
277                        "workflow token budget exhausted ({} / {} tokens)",
278                        snap.consumed_tokens,
279                        snap.limit_tokens.unwrap_or(0)
280                    ),
281                });
282            }
283        }
284        out
285    }
286
287    /// Per-item staged chains with **no barrier between stages** — delegates
288    /// straight to [`execute_pipeline`]. Item A may be in stage 3 while item B is
289    /// still in stage 1.
290    pub async fn pipeline<I>(
291        &self,
292        items: Vec<I>,
293        stages: Vec<PipelineStage<I>>,
294    ) -> Vec<Option<StepOutcome>>
295    where
296        I: Send + 'static,
297    {
298        execute_pipeline(
299            Arc::clone(&self.executor),
300            items,
301            stages,
302            self.step_events.clone(),
303        )
304        .await
305    }
306
307    /// Emit a best-effort structured log line on the [`WorkflowEvent`] stream.
308    /// Synchronous and non-failing: a full channel drops the line rather than
309    /// blocking the workflow.
310    pub fn log(&self, level: &str, message: &str, fields: serde_json::Value) {
311        let _ = self.events.send(WorkflowEvent::Log {
312            level: level.to_string(),
313            message: message.to_string(),
314            fields,
315        });
316    }
317}
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322    use crate::orchestration::WorkflowCheckpoint;
323    use crate::store::MemorySessionStore;
324    use async_trait::async_trait;
325    use std::collections::HashMap;
326
327    /// Echoes the prompt into the output; records which task ids it ran.
328    struct EchoExecutor {
329        ran: Arc<tokio::sync::Mutex<Vec<String>>>,
330    }
331
332    impl EchoExecutor {
333        fn new() -> Self {
334            Self {
335                ran: Arc::new(tokio::sync::Mutex::new(Vec::new())),
336            }
337        }
338    }
339
340    #[async_trait]
341    impl AgentExecutor for EchoExecutor {
342        async fn execute_step(
343            &self,
344            spec: AgentStepSpec,
345            _event_tx: Option<broadcast::Sender<AgentEvent>>,
346        ) -> StepOutcome {
347            self.ran.lock().await.push(spec.task_id.clone());
348            StepOutcome {
349                task_id: spec.task_id.clone(),
350                session_id: format!("task-run-{}", spec.task_id),
351                agent: spec.agent.clone(),
352                output: spec.prompt.clone(),
353                success: spec.agent != "fail",
354                structured: None,
355            }
356        }
357        fn concurrency_hint(&self) -> usize {
358            4
359        }
360    }
361
362    fn spec(id: &str, agent: &str, prompt: &str) -> AgentStepSpec {
363        AgentStepSpec::new(id, agent, "d", prompt)
364    }
365
366    #[tokio::test]
367    async fn agent_runs_a_single_step() {
368        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
369        let out = wf.agent(spec("a", "explore", "hello")).await;
370        assert_eq!(out.task_id, "a");
371        assert_eq!(out.output, "hello");
372        assert!(out.success);
373    }
374
375    #[tokio::test]
376    async fn parallel_preserves_order_and_isolates_failure() {
377        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
378        let out = wf
379            .parallel(vec![
380                spec("a", "explore", "pa"),
381                spec("b", "fail", "pb"),
382                spec("c", "review", "pc"),
383            ])
384            .await;
385        assert_eq!(
386            out.iter().map(|o| o.task_id.as_str()).collect::<Vec<_>>(),
387            vec!["a", "b", "c"]
388        );
389        assert!(out[0].success);
390        assert!(
391            !out[1].success,
392            "the failing branch surfaces as success=false"
393        );
394        assert!(out[2].success);
395    }
396
397    #[tokio::test]
398    async fn phase_emits_start_and_end_milestones() {
399        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
400        let mut rx = wf.subscribe();
401        let out = wf
402            .phase(
403                "review",
404                vec![spec("a", "review", "p"), spec("b", "fail", "p")],
405            )
406            .await;
407        assert_eq!(out.len(), 2);
408
409        let start = rx.recv().await.unwrap();
410        assert_eq!(
411            start,
412            WorkflowEvent::PhaseStart {
413                name: "review".to_string(),
414                index: 0,
415                step_count: 2,
416            }
417        );
418        let end = rx.recv().await.unwrap();
419        assert_eq!(
420            end,
421            WorkflowEvent::PhaseEnd {
422                name: "review".to_string(),
423                index: 0,
424                succeeded: 1,
425                failed: 1,
426            }
427        );
428    }
429
430    #[tokio::test]
431    async fn phases_get_monotonic_indices() {
432        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
433        let mut rx = wf.subscribe();
434        wf.phase("one", vec![spec("a", "explore", "p")]).await;
435        wf.phase("two", vec![spec("b", "explore", "p")]).await;
436
437        let indices: Vec<usize> = {
438            let mut seen = Vec::new();
439            while let Ok(ev) = rx.try_recv() {
440                if let WorkflowEvent::PhaseStart { index, .. } = ev {
441                    seen.push(index);
442                }
443            }
444            seen
445        };
446        assert_eq!(indices, vec![0, 1], "phase indices increment per call");
447    }
448
449    #[tokio::test]
450    async fn phase_with_store_resumes_from_checkpoint() {
451        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
452        let exec = Arc::new(EchoExecutor::new());
453        let ran = Arc::clone(&exec.ran);
454        let wf = Workflow::builder(exec)
455            .with_store(Arc::clone(&store))
456            .with_root_id("root")
457            .build();
458
459        // Pre-seed the checkpoint for phase 0 ("implement"): step "a" is already
460        // done. The deterministic id must match what phase() derives.
461        let mut done = HashMap::new();
462        done.insert(
463            "a".to_string(),
464            StepOutcome {
465                task_id: "a".into(),
466                session_id: "task-run-a".into(),
467                agent: "explore".into(),
468                output: "cached-a".into(),
469                success: true,
470                structured: None,
471            },
472        );
473        store
474            .save_workflow_checkpoint(
475                "root/0:implement",
476                &WorkflowCheckpoint::from_completed("root/0:implement", &done, 1),
477            )
478            .await
479            .unwrap();
480
481        let out = wf
482            .phase(
483                "implement",
484                vec![spec("a", "explore", "pa"), spec("b", "review", "pb")],
485            )
486            .await;
487
488        assert_eq!(
489            *ran.lock().await,
490            vec!["b".to_string()],
491            "only the not-yet-completed step actually runs"
492        );
493        assert_eq!(
494            out[0].output, "cached-a",
495            "completed step reuses its cached outcome"
496        );
497        assert!(out.iter().all(|o| o.success));
498    }
499
500    #[tokio::test]
501    async fn pipeline_chains_stages_per_item() {
502        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
503        let stages: Vec<PipelineStage<&str>> = vec![
504            Arc::new(|_prev: Option<&StepOutcome>, item: &&str| {
505                Some(AgentStepSpec::new("s1", "explore", "d", *item))
506            }),
507            Arc::new(|prev: Option<&StepOutcome>, _item: &&str| {
508                let prior = prev.map(|o| o.output.clone()).unwrap_or_default();
509                Some(AgentStepSpec::new(
510                    "s2",
511                    "review",
512                    "d",
513                    format!("review of: {prior}"),
514                ))
515            }),
516        ];
517        let out = wf.pipeline(vec!["alpha", "beta"], stages).await;
518        assert_eq!(out.len(), 2);
519        assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha");
520        assert_eq!(out[1].as_ref().unwrap().output, "review of: beta");
521    }
522
523    #[tokio::test]
524    async fn log_emits_a_log_event() {
525        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
526        let mut rx = wf.subscribe();
527        wf.log("info", "hello", serde_json::json!({ "k": 1 }));
528        let ev = rx.recv().await.unwrap();
529        assert_eq!(
530            ev,
531            WorkflowEvent::Log {
532                level: "info".to_string(),
533                message: "hello".to_string(),
534                fields: serde_json::json!({ "k": 1 }),
535            }
536        );
537    }
538
539    #[tokio::test]
540    async fn phase_emits_budget_exhausted_when_capped() {
541        use crate::budget::BudgetGuard;
542        use crate::llm::TokenUsage;
543
544        let budget = Arc::new(WorkflowBudget::new(Some(10)));
545        // Spend past the cap before the phase runs (simulates child loops having
546        // already recorded usage into the shared ledger).
547        budget
548            .record_after_llm(
549                "s",
550                &TokenUsage {
551                    total_tokens: 12,
552                    ..Default::default()
553                },
554            )
555            .await;
556
557        let wf = Workflow::builder(Arc::new(EchoExecutor::new()))
558            .with_budget(Arc::clone(&budget))
559            .build();
560        let mut rx = wf.subscribe();
561        wf.phase("p", vec![spec("a", "explore", "p")]).await;
562
563        let mut saw_exhausted = false;
564        while let Ok(ev) = rx.try_recv() {
565            if let WorkflowEvent::BudgetExhausted { resource, .. } = ev {
566                assert_eq!(resource, "workflow_tokens");
567                saw_exhausted = true;
568            }
569        }
570        assert!(
571            saw_exhausted,
572            "phase boundary emits BudgetExhausted once capped"
573        );
574        assert_eq!(wf.budget_snapshot().unwrap().consumed_tokens, 12);
575    }
576
577    #[tokio::test]
578    async fn no_budget_means_no_snapshot_and_no_exhausted_event() {
579        let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
580        let mut rx = wf.subscribe();
581        wf.phase("p", vec![spec("a", "explore", "p")]).await;
582        assert!(wf.budget_snapshot().is_none());
583        while let Ok(ev) = rx.try_recv() {
584            assert!(
585                !matches!(ev, WorkflowEvent::BudgetExhausted { .. }),
586                "no budget → never a BudgetExhausted event"
587            );
588        }
589    }
590
591    #[tokio::test]
592    async fn agent_returns_failed_outcome_when_executor_yields_nothing() {
593        // A pathological executor whose fan-out yields no result still gives the
594        // caller an addressable failed outcome rather than panicking.
595        struct Empty;
596        #[async_trait]
597        impl AgentExecutor for Empty {
598            async fn execute_step(
599                &self,
600                spec: AgentStepSpec,
601                _tx: Option<broadcast::Sender<AgentEvent>>,
602            ) -> StepOutcome {
603                StepOutcome::failed(spec.task_id, spec.agent, "boom")
604            }
605        }
606        let wf = Workflow::builder(Arc::new(Empty)).build();
607        let out = wf.agent(spec("x", "explore", "p")).await;
608        assert_eq!(out.task_id, "x");
609        assert!(!out.success);
610    }
611}