1use super::combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage};
47use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome};
48use super::workflow_budget::{BudgetSnapshot, WorkflowBudget};
49use crate::agent::AgentEvent;
50use crate::store::SessionStore;
51use serde::{Deserialize, Serialize};
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::sync::Arc;
54use tokio::sync::broadcast;
55
56const DEFAULT_WORKFLOW_EVENT_CAPACITY: usize = 256;
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum WorkflowEvent {
69 PhaseStart {
71 name: String,
72 index: usize,
73 step_count: usize,
74 },
75 PhaseEnd {
77 name: String,
78 index: usize,
79 succeeded: usize,
80 failed: usize,
81 },
82 Log {
84 level: String,
85 message: String,
86 fields: serde_json::Value,
87 },
88 BudgetExhausted { resource: String, reason: String },
92}
93
94pub struct WorkflowBuilder {
97 executor: Arc<dyn AgentExecutor>,
98 store: Option<Arc<dyn SessionStore>>,
99 step_events: Option<broadcast::Sender<AgentEvent>>,
100 root_id: Option<String>,
101 budget: Option<Arc<WorkflowBudget>>,
102}
103
104impl WorkflowBuilder {
105 pub fn new(executor: Arc<dyn AgentExecutor>) -> Self {
108 Self {
109 executor,
110 store: None,
111 step_events: None,
112 root_id: None,
113 budget: None,
114 }
115 }
116
117 pub fn with_budget(mut self, budget: Arc<WorkflowBudget>) -> Self {
125 self.budget = Some(budget);
126 self
127 }
128
129 pub fn with_store(mut self, store: Arc<dyn SessionStore>) -> Self {
132 self.store = Some(store);
133 self
134 }
135
136 pub fn with_step_events(mut self, step_events: broadcast::Sender<AgentEvent>) -> Self {
139 self.step_events = Some(step_events);
140 self
141 }
142
143 pub fn with_root_id(mut self, root_id: impl Into<String>) -> Self {
147 self.root_id = Some(root_id.into());
148 self
149 }
150
151 pub fn build(self) -> Workflow {
153 let (events, _) = broadcast::channel(DEFAULT_WORKFLOW_EVENT_CAPACITY);
154 let root_id = self
155 .root_id
156 .unwrap_or_else(|| format!("wf-{}", uuid::Uuid::new_v4()));
157 Workflow {
158 executor: self.executor,
159 store: self.store,
160 events,
161 step_events: self.step_events,
162 root_id,
163 phase_seq: Arc::new(AtomicUsize::new(0)),
164 budget: self.budget,
165 }
166 }
167}
168
169#[derive(Clone)]
175pub struct Workflow {
176 executor: Arc<dyn AgentExecutor>,
177 store: Option<Arc<dyn SessionStore>>,
178 events: broadcast::Sender<WorkflowEvent>,
179 step_events: Option<broadcast::Sender<AgentEvent>>,
180 root_id: String,
181 phase_seq: Arc<AtomicUsize>,
182 budget: Option<Arc<WorkflowBudget>>,
183}
184
185impl Workflow {
186 pub fn builder(executor: Arc<dyn AgentExecutor>) -> WorkflowBuilder {
188 WorkflowBuilder::new(executor)
189 }
190
191 pub fn root_id(&self) -> &str {
193 &self.root_id
194 }
195
196 pub fn budget_snapshot(&self) -> Option<BudgetSnapshot> {
198 self.budget.as_ref().map(|b| b.snapshot())
199 }
200
201 pub fn subscribe(&self) -> broadcast::Receiver<WorkflowEvent> {
203 self.events.subscribe()
204 }
205
206 pub async fn agent(&self, spec: AgentStepSpec) -> StepOutcome {
210 let task_id = spec.task_id.clone();
211 let agent = spec.agent.clone();
212 execute_steps_parallel(
213 Arc::clone(&self.executor),
214 vec![spec],
215 self.step_events.clone(),
216 )
217 .await
218 .into_iter()
219 .next()
220 .unwrap_or_else(|| StepOutcome::failed(task_id, agent, "step produced no outcome"))
221 }
222
223 pub async fn parallel(&self, specs: Vec<AgentStepSpec>) -> Vec<StepOutcome> {
227 execute_steps_parallel(Arc::clone(&self.executor), specs, self.step_events.clone()).await
228 }
229
230 pub async fn phase(&self, name: &str, specs: Vec<AgentStepSpec>) -> Vec<StepOutcome> {
238 let index = self.phase_seq.fetch_add(1, Ordering::SeqCst);
239 let _ = self.events.send(WorkflowEvent::PhaseStart {
240 name: name.to_string(),
241 index,
242 step_count: specs.len(),
243 });
244
245 let out = match &self.store {
246 Some(store) => {
247 let workflow_id = format!("{}/{index}:{name}", self.root_id);
248 execute_steps_parallel_resumable(
249 Arc::clone(&self.executor),
250 specs,
251 &workflow_id,
252 Arc::clone(store),
253 self.step_events.clone(),
254 )
255 .await
256 }
257 None => self.parallel(specs).await,
258 };
259
260 let failed = out.iter().filter(|o| !o.success).count();
261 let _ = self.events.send(WorkflowEvent::PhaseEnd {
262 name: name.to_string(),
263 index,
264 succeeded: out.len() - failed,
265 failed,
266 });
267
268 if let Some(budget) = &self.budget {
272 if budget.is_exhausted() {
273 let snap = budget.snapshot();
274 let _ = self.events.send(WorkflowEvent::BudgetExhausted {
275 resource: "workflow_tokens".to_string(),
276 reason: format!(
277 "workflow token budget exhausted ({} / {} tokens)",
278 snap.consumed_tokens,
279 snap.limit_tokens.unwrap_or(0)
280 ),
281 });
282 }
283 }
284 out
285 }
286
287 pub async fn pipeline<I>(
291 &self,
292 items: Vec<I>,
293 stages: Vec<PipelineStage<I>>,
294 ) -> Vec<Option<StepOutcome>>
295 where
296 I: Send + 'static,
297 {
298 execute_pipeline(
299 Arc::clone(&self.executor),
300 items,
301 stages,
302 self.step_events.clone(),
303 )
304 .await
305 }
306
307 pub fn log(&self, level: &str, message: &str, fields: serde_json::Value) {
311 let _ = self.events.send(WorkflowEvent::Log {
312 level: level.to_string(),
313 message: message.to_string(),
314 fields,
315 });
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322 use crate::orchestration::WorkflowCheckpoint;
323 use crate::store::MemorySessionStore;
324 use async_trait::async_trait;
325 use std::collections::HashMap;
326
327 struct EchoExecutor {
329 ran: Arc<tokio::sync::Mutex<Vec<String>>>,
330 }
331
332 impl EchoExecutor {
333 fn new() -> Self {
334 Self {
335 ran: Arc::new(tokio::sync::Mutex::new(Vec::new())),
336 }
337 }
338 }
339
340 #[async_trait]
341 impl AgentExecutor for EchoExecutor {
342 async fn execute_step(
343 &self,
344 spec: AgentStepSpec,
345 _event_tx: Option<broadcast::Sender<AgentEvent>>,
346 ) -> StepOutcome {
347 self.ran.lock().await.push(spec.task_id.clone());
348 StepOutcome {
349 task_id: spec.task_id.clone(),
350 session_id: format!("task-run-{}", spec.task_id),
351 agent: spec.agent.clone(),
352 output: spec.prompt.clone(),
353 success: spec.agent != "fail",
354 structured: None,
355 }
356 }
357 fn concurrency_hint(&self) -> usize {
358 4
359 }
360 }
361
362 fn spec(id: &str, agent: &str, prompt: &str) -> AgentStepSpec {
363 AgentStepSpec::new(id, agent, "d", prompt)
364 }
365
366 #[tokio::test]
367 async fn agent_runs_a_single_step() {
368 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
369 let out = wf.agent(spec("a", "explore", "hello")).await;
370 assert_eq!(out.task_id, "a");
371 assert_eq!(out.output, "hello");
372 assert!(out.success);
373 }
374
375 #[tokio::test]
376 async fn parallel_preserves_order_and_isolates_failure() {
377 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
378 let out = wf
379 .parallel(vec![
380 spec("a", "explore", "pa"),
381 spec("b", "fail", "pb"),
382 spec("c", "review", "pc"),
383 ])
384 .await;
385 assert_eq!(
386 out.iter().map(|o| o.task_id.as_str()).collect::<Vec<_>>(),
387 vec!["a", "b", "c"]
388 );
389 assert!(out[0].success);
390 assert!(
391 !out[1].success,
392 "the failing branch surfaces as success=false"
393 );
394 assert!(out[2].success);
395 }
396
397 #[tokio::test]
398 async fn phase_emits_start_and_end_milestones() {
399 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
400 let mut rx = wf.subscribe();
401 let out = wf
402 .phase(
403 "review",
404 vec![spec("a", "review", "p"), spec("b", "fail", "p")],
405 )
406 .await;
407 assert_eq!(out.len(), 2);
408
409 let start = rx.recv().await.unwrap();
410 assert_eq!(
411 start,
412 WorkflowEvent::PhaseStart {
413 name: "review".to_string(),
414 index: 0,
415 step_count: 2,
416 }
417 );
418 let end = rx.recv().await.unwrap();
419 assert_eq!(
420 end,
421 WorkflowEvent::PhaseEnd {
422 name: "review".to_string(),
423 index: 0,
424 succeeded: 1,
425 failed: 1,
426 }
427 );
428 }
429
430 #[tokio::test]
431 async fn phases_get_monotonic_indices() {
432 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
433 let mut rx = wf.subscribe();
434 wf.phase("one", vec![spec("a", "explore", "p")]).await;
435 wf.phase("two", vec![spec("b", "explore", "p")]).await;
436
437 let indices: Vec<usize> = {
438 let mut seen = Vec::new();
439 while let Ok(ev) = rx.try_recv() {
440 if let WorkflowEvent::PhaseStart { index, .. } = ev {
441 seen.push(index);
442 }
443 }
444 seen
445 };
446 assert_eq!(indices, vec![0, 1], "phase indices increment per call");
447 }
448
449 #[tokio::test]
450 async fn phase_with_store_resumes_from_checkpoint() {
451 let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
452 let exec = Arc::new(EchoExecutor::new());
453 let ran = Arc::clone(&exec.ran);
454 let wf = Workflow::builder(exec)
455 .with_store(Arc::clone(&store))
456 .with_root_id("root")
457 .build();
458
459 let mut done = HashMap::new();
462 done.insert(
463 "a".to_string(),
464 StepOutcome {
465 task_id: "a".into(),
466 session_id: "task-run-a".into(),
467 agent: "explore".into(),
468 output: "cached-a".into(),
469 success: true,
470 structured: None,
471 },
472 );
473 store
474 .save_workflow_checkpoint(
475 "root/0:implement",
476 &WorkflowCheckpoint::from_completed("root/0:implement", &done, 1),
477 )
478 .await
479 .unwrap();
480
481 let out = wf
482 .phase(
483 "implement",
484 vec![spec("a", "explore", "pa"), spec("b", "review", "pb")],
485 )
486 .await;
487
488 assert_eq!(
489 *ran.lock().await,
490 vec!["b".to_string()],
491 "only the not-yet-completed step actually runs"
492 );
493 assert_eq!(
494 out[0].output, "cached-a",
495 "completed step reuses its cached outcome"
496 );
497 assert!(out.iter().all(|o| o.success));
498 }
499
500 #[tokio::test]
501 async fn pipeline_chains_stages_per_item() {
502 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
503 let stages: Vec<PipelineStage<&str>> = vec![
504 Arc::new(|_prev: Option<&StepOutcome>, item: &&str| {
505 Some(AgentStepSpec::new("s1", "explore", "d", *item))
506 }),
507 Arc::new(|prev: Option<&StepOutcome>, _item: &&str| {
508 let prior = prev.map(|o| o.output.clone()).unwrap_or_default();
509 Some(AgentStepSpec::new(
510 "s2",
511 "review",
512 "d",
513 format!("review of: {prior}"),
514 ))
515 }),
516 ];
517 let out = wf.pipeline(vec!["alpha", "beta"], stages).await;
518 assert_eq!(out.len(), 2);
519 assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha");
520 assert_eq!(out[1].as_ref().unwrap().output, "review of: beta");
521 }
522
523 #[tokio::test]
524 async fn log_emits_a_log_event() {
525 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
526 let mut rx = wf.subscribe();
527 wf.log("info", "hello", serde_json::json!({ "k": 1 }));
528 let ev = rx.recv().await.unwrap();
529 assert_eq!(
530 ev,
531 WorkflowEvent::Log {
532 level: "info".to_string(),
533 message: "hello".to_string(),
534 fields: serde_json::json!({ "k": 1 }),
535 }
536 );
537 }
538
539 #[tokio::test]
540 async fn phase_emits_budget_exhausted_when_capped() {
541 use crate::budget::BudgetGuard;
542 use crate::llm::TokenUsage;
543
544 let budget = Arc::new(WorkflowBudget::new(Some(10)));
545 budget
548 .record_after_llm(
549 "s",
550 &TokenUsage {
551 total_tokens: 12,
552 ..Default::default()
553 },
554 )
555 .await;
556
557 let wf = Workflow::builder(Arc::new(EchoExecutor::new()))
558 .with_budget(Arc::clone(&budget))
559 .build();
560 let mut rx = wf.subscribe();
561 wf.phase("p", vec![spec("a", "explore", "p")]).await;
562
563 let mut saw_exhausted = false;
564 while let Ok(ev) = rx.try_recv() {
565 if let WorkflowEvent::BudgetExhausted { resource, .. } = ev {
566 assert_eq!(resource, "workflow_tokens");
567 saw_exhausted = true;
568 }
569 }
570 assert!(
571 saw_exhausted,
572 "phase boundary emits BudgetExhausted once capped"
573 );
574 assert_eq!(wf.budget_snapshot().unwrap().consumed_tokens, 12);
575 }
576
577 #[tokio::test]
578 async fn no_budget_means_no_snapshot_and_no_exhausted_event() {
579 let wf = Workflow::builder(Arc::new(EchoExecutor::new())).build();
580 let mut rx = wf.subscribe();
581 wf.phase("p", vec![spec("a", "explore", "p")]).await;
582 assert!(wf.budget_snapshot().is_none());
583 while let Ok(ev) = rx.try_recv() {
584 assert!(
585 !matches!(ev, WorkflowEvent::BudgetExhausted { .. }),
586 "no budget → never a BudgetExhausted event"
587 );
588 }
589 }
590
591 #[tokio::test]
592 async fn agent_returns_failed_outcome_when_executor_yields_nothing() {
593 struct Empty;
596 #[async_trait]
597 impl AgentExecutor for Empty {
598 async fn execute_step(
599 &self,
600 spec: AgentStepSpec,
601 _tx: Option<broadcast::Sender<AgentEvent>>,
602 ) -> StepOutcome {
603 StepOutcome::failed(spec.task_id, spec.agent, "boom")
604 }
605 }
606 let wf = Workflow::builder(Arc::new(Empty)).build();
607 let out = wf.agent(spec("x", "explore", "p")).await;
608 assert_eq!(out.task_id, "x");
609 assert!(!out.success);
610 }
611}