// pulsehive_runtime/workflow.rs

//! Workflow execution engine — Sequential, Parallel, Loop agent orchestration.
//!
//! This module provides `dispatch_agent()`, the central routing function that handles
//! all `AgentKind` variants. LLM agents are dispatched to the agentic loop; workflow
//! agents are dispatched to their respective executors.
//!
//! ## Architecture
//!
//! `WorkflowContext` carries all shared resources as owned/Arc types (no lifetimes).
//! This is critical for the Parallel executor which needs to `tokio::spawn` child tasks
//! (requiring `'static`). When dispatching to the agentic loop, a temporary
//! `LoopContext` is created with borrows scoped to that call.
14use std::collections::HashMap;
15use std::sync::Arc;
16
17use pulsedb::SubstrateProvider;
18use tracing::Instrument;
19
20use pulsehive_core::agent::{
21    AgentDefinition, AgentKind, AgentKindTag, AgentOutcome, LlmAgentConfig,
22};
23use pulsehive_core::approval::ApprovalHandler;
24use pulsehive_core::event::{EventBus, HiveEvent};
25use pulsehive_core::llm::LlmProvider;
26
27use crate::agentic_loop::{self, LoopContext, DEFAULT_MAX_ITERATIONS};
28use crate::hivemind::Task;
29
/// Owned context for workflow execution.
///
/// All fields are owned or `Arc`-wrapped, making this `Clone`-able for cheap sharing
/// across parallel child tasks. Cloning bumps reference counts — no deep copies.
/// (The Parallel executor clones one per spawned child; see `run_parallel`.)
#[derive(Clone)]
pub(crate) struct WorkflowContext {
    /// The task being executed (description + collective ID).
    pub task: Task,
    /// Named LLM providers registered with HiveMind.
    /// Looked up by `LlmAgentConfig.llm_config.provider` in `run_llm_agent`.
    pub llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
    /// Shared substrate for experience storage and retrieval.
    pub substrate: Arc<dyn SubstrateProvider>,
    /// Handler for tool approval requests.
    pub approval_handler: Arc<dyn ApprovalHandler>,
    /// Event broadcaster for lifecycle and observability events.
    pub event_emitter: EventBus,
    /// Optional embedding provider for computing embeddings before storage.
    pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
}
49
50/// Dispatch an agent to the appropriate executor based on its kind.
51///
52/// This is the central routing function for all agent types:
53/// - `Llm` → agentic loop (Perceive→Think→Act→Record)
54/// - `Sequential` / `Parallel` / `Loop` → workflow executors
55///
56/// Each call emits `AgentStarted` and `AgentCompleted` events, enabling
57/// observability at every nesting level.
58///
59/// Uses `Box::pin` internally for recursive dispatch (Sequential/Parallel/Loop
60/// call back into `dispatch_agent`, creating recursive futures that need heap allocation).
61pub(crate) fn dispatch_agent(
62    agent: AgentDefinition,
63    ctx: &WorkflowContext,
64) -> std::pin::Pin<Box<dyn std::future::Future<Output = AgentOutcome> + Send + '_>> {
65    let agent_name = agent.name.clone();
66    let kind_tag = agent_kind_tag(&agent.kind);
67    let span = tracing::info_span!("dispatch_agent", agent_name = %agent_name, kind = ?kind_tag);
68    Box::pin(
69        async move {
70            let agent_id = uuid::Uuid::now_v7().to_string();
71
72            // Emit lifecycle start event
73            ctx.event_emitter.emit(HiveEvent::AgentStarted {
74                agent_id: agent_id.clone(),
75                name: agent.name.clone(),
76                kind: agent_kind_tag(&agent.kind),
77            });
78
79            let outcome = match agent.kind {
80                AgentKind::Llm(config) => run_llm_agent(&agent_id, *config, ctx).await,
81                AgentKind::Sequential(children) => run_sequential(children, ctx).await,
82                AgentKind::Parallel(children) => run_parallel(children, ctx).await,
83                AgentKind::Loop {
84                    agent,
85                    max_iterations,
86                } => run_loop(*agent, max_iterations, ctx).await,
87            };
88
89            // Emit lifecycle completion event
90            ctx.event_emitter.emit(HiveEvent::AgentCompleted {
91                agent_id,
92                outcome: outcome.clone(),
93            });
94
95            outcome
96        }
97        .instrument(span),
98    ) // Box::pin
99}
100
101/// Execute child agents sequentially — each starts after the previous completes.
102///
103/// Children share the substrate and collective, so each child's Perceive phase
104/// naturally finds experiences recorded by all previous children. This is the
105/// "shared consciousness" model — no explicit data passing between agents.
106///
107/// Returns the last child's outcome. Stops early on error or `MaxIterationsReached`.
108/// Empty children list returns `Complete` with empty response.
109async fn run_sequential(children: Vec<AgentDefinition>, ctx: &WorkflowContext) -> AgentOutcome {
110    tracing::info!(child_count = children.len(), "Sequential workflow started");
111
112    if children.is_empty() {
113        return AgentOutcome::Complete {
114            response: String::new(),
115        };
116    }
117
118    let mut last_response = String::new();
119    for (i, child) in children.into_iter().enumerate() {
120        tracing::info!(child_index = i, child_name = %child.name, "Sequential: running child");
121        let outcome = dispatch_agent(child, ctx).await;
122        match &outcome {
123            AgentOutcome::Complete { response } => {
124                last_response = response.clone();
125            }
126            AgentOutcome::Error { .. } | AgentOutcome::MaxIterationsReached => {
127                return outcome;
128            }
129        }
130    }
131    AgentOutcome::Complete {
132        response: last_response,
133    }
134}
135
136/// Execute child agents in parallel — all spawned as concurrent Tokio tasks.
137///
138/// Children share the substrate (via `Arc`) and can perceive each other's
139/// experiences as they're written. Each child gets a cloned `WorkflowContext`
140/// (cheap: just Arc reference count bumps).
141///
142/// Returns combined responses on success. If any child errors, reports all
143/// errors but still waits for all children to complete (no early cancellation).
144async fn run_parallel(children: Vec<AgentDefinition>, ctx: &WorkflowContext) -> AgentOutcome {
145    tracing::info!(child_count = children.len(), "Parallel workflow started");
146
147    if children.is_empty() {
148        return AgentOutcome::Complete {
149            response: String::new(),
150        };
151    }
152
153    let child_count = children.len();
154    tracing::info!(child_count, "Parallel: spawning children");
155
156    let mut join_set = tokio::task::JoinSet::new();
157    for child in children {
158        let child_ctx = ctx.clone();
159        join_set.spawn(async move { dispatch_agent(child, &child_ctx).await });
160    }
161
162    let mut responses = Vec::new();
163    let mut errors = Vec::new();
164    while let Some(result) = join_set.join_next().await {
165        match result {
166            Ok(AgentOutcome::Complete { response }) => {
167                responses.push(response);
168            }
169            Ok(outcome) => {
170                errors.push(format!("{outcome:?}"));
171            }
172            Err(join_err) => {
173                errors.push(format!("Task panic: {join_err}"));
174            }
175        }
176    }
177
178    if !errors.is_empty() {
179        AgentOutcome::Error {
180            error: errors.join("; "),
181        }
182    } else {
183        AgentOutcome::Complete {
184            response: responses.join("\n"),
185        }
186    }
187}
188
189/// Completion signal: if a child agent's response contains this string,
190/// the loop terminates early. This is a convention — the LLM is instructed
191/// to include `[LOOP_DONE]` when it's satisfied with the result.
192const LOOP_DONE_SIGNAL: &str = "[LOOP_DONE]";
193
194/// Execute a child agent repeatedly up to `max_iterations` times.
195///
196/// Each iteration clones the child definition (cheap: Arc bumps for tools)
197/// and dispatches it. The loop terminates early if:
198/// - The child's response contains `[LOOP_DONE]`
199/// - The child returns an error
200///
201/// Each iteration perceives cumulative experiences from all prior iterations
202/// via the shared substrate.
203async fn run_loop(
204    child: AgentDefinition,
205    max_iterations: usize,
206    ctx: &WorkflowContext,
207) -> AgentOutcome {
208    tracing::info!(max_iterations, "Loop workflow started");
209
210    if max_iterations == 0 {
211        tracing::warn!("Loop with max_iterations=0, returning immediately");
212        return AgentOutcome::Complete {
213            response: String::new(),
214        };
215    }
216
217    let mut last_outcome = AgentOutcome::MaxIterationsReached;
218    for i in 0..max_iterations {
219        tracing::info!(
220            iteration = i + 1,
221            max = max_iterations,
222            "Loop: starting iteration"
223        );
224        let outcome = dispatch_agent(child.clone(), ctx).await;
225
226        match &outcome {
227            AgentOutcome::Complete { response } if response.contains(LOOP_DONE_SIGNAL) => {
228                tracing::info!(iteration = i + 1, "Loop: completion signal received");
229                last_outcome = outcome;
230                break;
231            }
232            AgentOutcome::Error { .. } => {
233                tracing::warn!(iteration = i + 1, "Loop: child errored, stopping");
234                return outcome;
235            }
236            _ => {
237                last_outcome = outcome;
238            }
239        }
240    }
241    last_outcome
242}
243
244/// Execute an LLM agent through the agentic loop.
245///
246/// Resolves the named LLM provider from the context, creates a scoped
247/// [`LoopContext`] with borrowed fields, and delegates to `run_agentic_loop`.
248async fn run_llm_agent(
249    agent_id: &str,
250    config: LlmAgentConfig,
251    ctx: &WorkflowContext,
252) -> AgentOutcome {
253    // Resolve the LLM provider by name
254    let provider_name = &config.llm_config.provider;
255    let provider = match ctx.llm_providers.get(provider_name) {
256        Some(p) => p.clone(),
257        None => {
258            return AgentOutcome::Error {
259                error: format!(
260                    "LLM provider '{}' not registered. Available: {:?}",
261                    provider_name,
262                    ctx.llm_providers.keys().collect::<Vec<_>>()
263                ),
264            };
265        }
266    };
267
268    // Create a scoped LoopContext — borrows from WorkflowContext are local to this call
269    agentic_loop::run_agentic_loop(
270        config,
271        LoopContext {
272            agent_id: agent_id.to_string(),
273            task: &ctx.task,
274            provider,
275            substrate: Arc::clone(&ctx.substrate),
276            approval_handler: ctx.approval_handler.as_ref(),
277            event_emitter: ctx.event_emitter.clone(),
278            max_iterations: DEFAULT_MAX_ITERATIONS,
279            embedding_provider: ctx.embedding_provider.clone(),
280        },
281    )
282    .await
283}
284
/// Extract a compact kind tag from an agent kind (for event reporting).
///
/// Mirrors each `AgentKind` variant while discarding its payload, so events
/// carry only the variant identity. The match is exhaustive: adding a new
/// `AgentKind` variant forces a compile error here.
fn agent_kind_tag(kind: &AgentKind) -> AgentKindTag {
    match kind {
        AgentKind::Llm(_) => AgentKindTag::Llm,
        AgentKind::Sequential(_) => AgentKindTag::Sequential,
        AgentKind::Parallel(_) => AgentKindTag::Parallel,
        AgentKind::Loop { .. } => AgentKindTag::Loop,
    }
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use pulsehive_core::lens::Lens;
    use pulsehive_core::llm::*;

    // ── Mock LLM ─────────────────────────────────────────────────────

    /// Scripted LLM provider: serves queued responses front-to-back and
    /// errors once the script is exhausted. Shared across all children in
    /// a workflow, so which child receives which response is first-come.
    struct MockLlm {
        responses: Mutex<Vec<LlmResponse>>,
    }

    impl MockLlm {
        fn new(responses: Vec<LlmResponse>) -> Self {
            Self {
                responses: Mutex::new(responses),
            }
        }

        /// Build a plain-text response with no tool calls.
        fn text_response(content: &str) -> LlmResponse {
            LlmResponse {
                content: Some(content.into()),
                tool_calls: vec![],
                usage: TokenUsage::default(),
            }
        }
    }

    #[async_trait]
    impl LlmProvider for MockLlm {
        async fn chat(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> pulsehive_core::error::Result<LlmResponse> {
            // Pop the next scripted response; an empty script simulates an
            // LLM failure (used by the error-path tests).
            let mut responses = self.responses.lock().unwrap();
            if responses.is_empty() {
                Err(pulsehive_core::error::PulseHiveError::llm(
                    "No more scripted responses",
                ))
            } else {
                Ok(responses.remove(0))
            }
        }

        async fn chat_stream(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> pulsehive_core::error::Result<
            std::pin::Pin<
                Box<
                    dyn futures_core::Stream<Item = pulsehive_core::error::Result<LlmChunk>> + Send,
                >,
            >,
        > {
            Err(pulsehive_core::error::PulseHiveError::llm(
                "Streaming not used in tests",
            ))
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    /// Open a fresh on-disk substrate in a temp directory.
    fn test_substrate() -> Arc<dyn SubstrateProvider> {
        let dir = tempfile::tempdir().unwrap();
        let db =
            pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
        // Intentionally leak the tempdir guard so the directory (and the DB
        // file inside it) outlives this function for the test's duration.
        Box::leak(Box::new(dir));
        Arc::new(pulsedb::PulseDBSubstrate::from_db(db))
    }

    /// Build a full `WorkflowContext` with the given mock registered as "mock".
    async fn test_workflow_ctx(provider: MockLlm) -> WorkflowContext {
        let substrate = test_substrate();
        let collective_id = substrate
            .get_or_create_collective("test-workflow")
            .await
            .unwrap();

        let mut providers: HashMap<String, Arc<dyn LlmProvider>> = HashMap::new();
        providers.insert("mock".into(), Arc::new(provider));

        WorkflowContext {
            task: Task::with_collective("Test task", collective_id),
            llm_providers: providers,
            substrate,
            approval_handler: Arc::new(pulsehive_core::approval::AutoApprove),
            event_emitter: EventBus::default(),
            embedding_provider: None,
        }
    }

    /// Minimal LLM agent definition wired to the "mock" provider.
    fn llm_agent_def(name: &str) -> AgentDefinition {
        AgentDefinition {
            name: name.into(),
            kind: AgentKind::Llm(Box::new(LlmAgentConfig {
                system_prompt: "You are a test agent.".into(),
                tools: vec![],
                lens: Lens::default(),
                llm_config: LlmConfig::new("mock", "test-model"),
                experience_extractor: None,
                refresh_every_n_tool_calls: None,
            })),
        }
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_dispatch_llm_agent_completes() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Hello from dispatch!")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = llm_agent_def("test-agent");
        let outcome = dispatch_agent(agent, &ctx).await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Hello from dispatch!"),
            "Expected Complete, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_dispatch_llm_agent_emits_events() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Done")]);
        let ctx = test_workflow_ctx(provider).await;
        // Subscribe before dispatch so no event is missed.
        let mut rx = ctx.event_emitter.subscribe();

        let agent = llm_agent_def("evented-agent");
        let _outcome = dispatch_agent(agent, &ctx).await;

        // Collect all events
        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // First event should be AgentStarted
        assert!(
            matches!(&events[0], HiveEvent::AgentStarted { name, kind, .. }
                if name == "evented-agent" && *kind == AgentKindTag::Llm),
            "Expected AgentStarted, got: {:?}",
            events.first()
        );

        // Last event should be AgentCompleted
        assert!(
            matches!(
                events.last(),
                Some(HiveEvent::AgentCompleted {
                    outcome: AgentOutcome::Complete { .. },
                    ..
                })
            ),
            "Expected AgentCompleted, got: {:?}",
            events.last()
        );
    }

    #[tokio::test]
    async fn test_dispatch_missing_provider_returns_error() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        // Agent uses a provider name that doesn't exist
        let agent = AgentDefinition {
            name: "bad-provider".into(),
            kind: AgentKind::Llm(Box::new(LlmAgentConfig {
                system_prompt: "test".into(),
                tools: vec![],
                lens: Lens::default(),
                llm_config: LlmConfig::new("nonexistent", "model"),
                experience_extractor: None,
                refresh_every_n_tool_calls: None,
            })),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { error } if error.contains("nonexistent")),
            "Expected provider error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_empty_children() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "seq".into(),
            kind: AgentKind::Sequential(vec![]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Empty Sequential should return Complete with empty response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_two_children_in_order() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("First done"),
            MockLlm::text_response("Second done"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "pipeline".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("step-1"), llm_agent_def("step-2")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Second done"),
            "Sequential should return last child's response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_error_stops_execution() {
        // Only one response — first child gets it, second would fail if reached
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "failing-seq".into(),
            kind: AgentKind::Sequential(vec![
                llm_agent_def("will-error"),
                llm_agent_def("should-not-run"),
            ]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // First child errors (no LLM responses), second never runs
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Sequential should stop on first error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_workflow_context_is_clone() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;
        let _cloned = ctx.clone(); // Compile-time proof that Clone works
    }

    #[tokio::test]
    async fn test_parallel_empty_children() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par".into(),
            kind: AgentKind::Parallel(vec![]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Empty Parallel should return Complete with empty response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_parallel_two_children_both_complete() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Alpha result"),
            MockLlm::text_response("Beta result"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("alpha"), llm_agent_def("beta")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        match &outcome {
            AgentOutcome::Complete { response } => {
                // Both responses should appear (order may vary due to concurrency)
                assert!(
                    response.contains("result"),
                    "Should contain child responses, got: {response}"
                );
            }
            other => panic!("Expected Complete, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_parallel_one_error_reports_all() {
        // Only one response — one child succeeds, other errors
        let provider = MockLlm::new(vec![MockLlm::text_response("I succeeded")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par-err".into(),
            kind: AgentKind::Parallel(vec![
                llm_agent_def("will-succeed"),
                llm_agent_def("will-error"),
            ]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Parallel with one error should return Error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_zero_iterations() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-0".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("child")),
                max_iterations: 0,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Loop with 0 iterations should return Complete empty, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_runs_n_times() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Iteration 1"),
            MockLlm::text_response("Iteration 2"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-2".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 2,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // After 2 iterations without [LOOP_DONE], returns MaxIterationsReached
        // But since both completed successfully, last_outcome = last Complete
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Iteration 2"),
            "Loop should return last iteration's response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_early_exit_on_done_signal() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Still working..."),
            MockLlm::text_response("All done [LOOP_DONE]"),
            MockLlm::text_response("Should not reach this"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-done".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 5,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.contains("[LOOP_DONE]")),
            "Loop should exit on LOOP_DONE signal, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_error_stops() {
        // One response, then error (no more responses)
        let provider = MockLlm::new(vec![MockLlm::text_response("First iteration ok")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-err".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 5,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Loop should stop on error, got: {outcome:?}"
        );
    }

    // ── Ticket #46: Sequential event ordering ────────────────────────

    #[tokio::test]
    async fn test_sequential_events_ordered() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("A done"),
            MockLlm::text_response("B done"),
        ]);
        let ctx = test_workflow_ctx(provider).await;
        let mut rx = ctx.event_emitter.subscribe();

        let agent = AgentDefinition {
            name: "seq-events".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("child-a"), llm_agent_def("child-b")]),
        };

        let _outcome = dispatch_agent(agent, &ctx).await;

        // Collect events
        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // Find AgentStarted events for the LLM children (not the Sequential wrapper)
        let started_names: Vec<&str> = events
            .iter()
            .filter_map(|e| match e {
                HiveEvent::AgentStarted {
                    name,
                    kind: AgentKindTag::Llm,
                    ..
                } => Some(name.as_str()),
                _ => None,
            })
            .collect();

        // child-a should start before child-b (sequential ordering)
        assert_eq!(
            started_names,
            vec!["child-a", "child-b"],
            "Sequential children should start in order"
        );
    }

    // ── Ticket #47: Parallel event verification ──────────────────────

    #[tokio::test]
    async fn test_parallel_events_for_all_children() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Alpha"),
            MockLlm::text_response("Beta"),
        ]);
        let ctx = test_workflow_ctx(provider).await;
        let mut rx = ctx.event_emitter.subscribe();

        let agent = AgentDefinition {
            name: "par-events".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("alpha"), llm_agent_def("beta")]),
        };

        let _outcome = dispatch_agent(agent, &ctx).await;

        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // Both children should have AgentStarted events
        let started_names: Vec<&str> = events
            .iter()
            .filter_map(|e| match e {
                HiveEvent::AgentStarted {
                    name,
                    kind: AgentKindTag::Llm,
                    ..
                } => Some(name.as_str()),
                _ => None,
            })
            .collect();

        assert!(
            started_names.contains(&"alpha"),
            "alpha should have AgentStarted"
        );
        assert!(
            started_names.contains(&"beta"),
            "beta should have AgentStarted"
        );

        // Both should have AgentCompleted events
        let completed_count = events
            .iter()
            .filter(|e| {
                matches!(
                    e,
                    HiveEvent::AgentCompleted {
                        outcome: AgentOutcome::Complete { .. },
                        ..
                    }
                )
            })
            .count();
        assert!(
            completed_count >= 2,
            "Both children should complete, got {completed_count}"
        );
    }

    // ── Ticket #48: Loop additional tests ────────────────────────────

    #[tokio::test]
    async fn test_loop_single_iteration() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Only once")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-1".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 1,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Only once"),
            "Loop max=1 should run exactly once, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_all_iterations_complete_returns_last() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Iter 1"),
            MockLlm::text_response("Iter 2"),
            MockLlm::text_response("Iter 3"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-3".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 3,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // All 3 completed without [LOOP_DONE], returns last Complete
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Iter 3"),
            "Loop should return last iteration's response, got: {outcome:?}"
        );
    }

    // ── Ticket #50: Edge cases ───────────────────────────────────────

    #[tokio::test]
    async fn test_single_child_sequential() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Solo")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "single-seq".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("only-child")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Solo"),
            "Single-child Sequential should work like running the child directly, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_single_child_parallel() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Solo parallel")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "single-par".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("only-child")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Solo parallel"),
            "Single-child Parallel should work, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_deep_nesting_no_stack_overflow() {
        // 5 levels deep: Sequential(Sequential(Sequential(Sequential(Sequential(Llm)))))
        let provider = MockLlm::new(vec![MockLlm::text_response("Deep!")]);
        let ctx = test_workflow_ctx(provider).await;

        let mut agent = llm_agent_def("leaf");
        for i in 0..5 {
            agent = AgentDefinition {
                name: format!("level-{i}"),
                kind: AgentKind::Sequential(vec![agent]),
            };
        }

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Deep!"),
            "5-level nesting should work without stack overflow, got: {outcome:?}"
        );
    }
}