// pulsehive_runtime/workflow.rs
//! Workflow execution engine — Sequential, Parallel, Loop agent orchestration.
//!
//! This module provides `dispatch_agent()`, the central routing function that handles
//! all `AgentKind` variants. LLM agents are dispatched to the agentic loop; workflow
//! agents are dispatched to their respective executors.
//!
//! ## Architecture
//!
//! `WorkflowContext` carries all shared resources as owned/Arc types (no lifetimes).
//! This is critical for the Parallel executor which needs to `tokio::spawn` child tasks
//! (requiring `'static`). When dispatching to the agentic loop, a temporary
//! `LoopContext` is created with borrows scoped to that call.
14use std::collections::HashMap;
15use std::sync::Arc;
16
17use pulsedb::SubstrateProvider;
18use tracing::Instrument;
19
20use pulsehive_core::agent::{
21    AgentDefinition, AgentKind, AgentKindTag, AgentOutcome, LlmAgentConfig,
22};
23use pulsehive_core::approval::ApprovalHandler;
24use pulsehive_core::event::{EventBus, HiveEvent};
25use pulsehive_core::llm::LlmProvider;
26
27use crate::agentic_loop::{self, LoopContext, DEFAULT_MAX_ITERATIONS};
28use crate::hivemind::Task;
29
/// Owned context for workflow execution.
///
/// All fields are owned or `Arc`-wrapped, making this `Clone`-able for cheap sharing
/// across parallel child tasks. Cloning bumps reference counts — no deep copies.
/// The absence of borrowed fields is what lets the Parallel executor hand clones
/// to `tokio::spawn` (which requires `'static` futures).
#[derive(Clone)]
pub(crate) struct WorkflowContext {
    /// The task being executed (description + collective ID).
    pub task: Task,
    /// Named LLM providers registered with HiveMind, keyed by provider name.
    pub llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
    /// Shared substrate for experience storage and retrieval.
    pub substrate: Arc<dyn SubstrateProvider>,
    /// Handler for tool approval requests.
    pub approval_handler: Arc<dyn ApprovalHandler>,
    /// Event broadcaster for lifecycle and observability events.
    pub event_emitter: EventBus,
    /// Optional embedding provider for computing embeddings before storage.
    pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
}
49
50/// Dispatch an agent to the appropriate executor based on its kind.
51///
52/// This is the central routing function for all agent types:
53/// - `Llm` → agentic loop (Perceive→Think→Act→Record)
54/// - `Sequential` / `Parallel` / `Loop` → workflow executors
55///
56/// Each call emits `AgentStarted` and `AgentCompleted` events, enabling
57/// observability at every nesting level.
58///
59/// Uses `Box::pin` internally for recursive dispatch (Sequential/Parallel/Loop
60/// call back into `dispatch_agent`, creating recursive futures that need heap allocation).
61pub(crate) fn dispatch_agent(
62    agent: AgentDefinition,
63    ctx: &WorkflowContext,
64) -> std::pin::Pin<Box<dyn std::future::Future<Output = AgentOutcome> + Send + '_>> {
65    let agent_name = agent.name.clone();
66    let kind_tag = agent_kind_tag(&agent.kind);
67    let span = tracing::info_span!("dispatch_agent", agent_name = %agent_name, kind = ?kind_tag);
68    Box::pin(
69        async move {
70            let agent_id = uuid::Uuid::now_v7().to_string();
71
72            // Emit lifecycle start event
73            ctx.event_emitter.emit(HiveEvent::AgentStarted {
74                timestamp_ms: pulsehive_core::event::now_ms(),
75                agent_id: agent_id.clone(),
76                name: agent.name.clone(),
77                kind: agent_kind_tag(&agent.kind),
78            });
79
80            let outcome = match agent.kind {
81                AgentKind::Llm(config) => run_llm_agent(&agent_id, *config, ctx).await,
82                AgentKind::Sequential(children) => run_sequential(children, ctx).await,
83                AgentKind::Parallel(children) => run_parallel(children, ctx).await,
84                AgentKind::Loop {
85                    agent,
86                    max_iterations,
87                } => run_loop(*agent, max_iterations, ctx).await,
88            };
89
90            // Emit lifecycle completion event
91            ctx.event_emitter.emit(HiveEvent::AgentCompleted {
92                timestamp_ms: pulsehive_core::event::now_ms(),
93                agent_id,
94                outcome: outcome.clone(),
95            });
96
97            outcome
98        }
99        .instrument(span),
100    ) // Box::pin
101}
102
103/// Execute child agents sequentially — each starts after the previous completes.
104///
105/// Children share the substrate and collective, so each child's Perceive phase
106/// naturally finds experiences recorded by all previous children. This is the
107/// "shared consciousness" model — no explicit data passing between agents.
108///
109/// Returns the last child's outcome. Stops early on error or `MaxIterationsReached`.
110/// Empty children list returns `Complete` with empty response.
111async fn run_sequential(children: Vec<AgentDefinition>, ctx: &WorkflowContext) -> AgentOutcome {
112    tracing::info!(child_count = children.len(), "Sequential workflow started");
113
114    if children.is_empty() {
115        return AgentOutcome::Complete {
116            response: String::new(),
117        };
118    }
119
120    let mut last_response = String::new();
121    for (i, child) in children.into_iter().enumerate() {
122        tracing::info!(child_index = i, child_name = %child.name, "Sequential: running child");
123        let outcome = dispatch_agent(child, ctx).await;
124        match &outcome {
125            AgentOutcome::Complete { response } => {
126                last_response = response.clone();
127            }
128            AgentOutcome::Error { .. } | AgentOutcome::MaxIterationsReached => {
129                return outcome;
130            }
131        }
132    }
133    AgentOutcome::Complete {
134        response: last_response,
135    }
136}
137
138/// Execute child agents in parallel — all spawned as concurrent Tokio tasks.
139///
140/// Children share the substrate (via `Arc`) and can perceive each other's
141/// experiences as they're written. Each child gets a cloned `WorkflowContext`
142/// (cheap: just Arc reference count bumps).
143///
144/// Returns combined responses on success. If any child errors, reports all
145/// errors but still waits for all children to complete (no early cancellation).
146async fn run_parallel(children: Vec<AgentDefinition>, ctx: &WorkflowContext) -> AgentOutcome {
147    tracing::info!(child_count = children.len(), "Parallel workflow started");
148
149    if children.is_empty() {
150        return AgentOutcome::Complete {
151            response: String::new(),
152        };
153    }
154
155    let child_count = children.len();
156    tracing::info!(child_count, "Parallel: spawning children");
157
158    let mut join_set = tokio::task::JoinSet::new();
159    for child in children {
160        let child_ctx = ctx.clone();
161        join_set.spawn(async move { dispatch_agent(child, &child_ctx).await });
162    }
163
164    let mut responses = Vec::new();
165    let mut errors = Vec::new();
166    while let Some(result) = join_set.join_next().await {
167        match result {
168            Ok(AgentOutcome::Complete { response }) => {
169                responses.push(response);
170            }
171            Ok(outcome) => {
172                errors.push(format!("{outcome:?}"));
173            }
174            Err(join_err) => {
175                errors.push(format!("Task panic: {join_err}"));
176            }
177        }
178    }
179
180    if !errors.is_empty() {
181        AgentOutcome::Error {
182            error: errors.join("; "),
183        }
184    } else {
185        AgentOutcome::Complete {
186            response: responses.join("\n"),
187        }
188    }
189}
190
/// Completion signal: if a child agent's response contains this string,
/// the loop terminates early. This is a convention — the LLM is instructed
/// to include `[LOOP_DONE]` when it's satisfied with the result.
/// Checked by `run_loop` after every iteration via a substring match.
const LOOP_DONE_SIGNAL: &str = "[LOOP_DONE]";
195
196/// Execute a child agent repeatedly up to `max_iterations` times.
197///
198/// Each iteration clones the child definition (cheap: Arc bumps for tools)
199/// and dispatches it. The loop terminates early if:
200/// - The child's response contains `[LOOP_DONE]`
201/// - The child returns an error
202///
203/// Each iteration perceives cumulative experiences from all prior iterations
204/// via the shared substrate.
205async fn run_loop(
206    child: AgentDefinition,
207    max_iterations: usize,
208    ctx: &WorkflowContext,
209) -> AgentOutcome {
210    tracing::info!(max_iterations, "Loop workflow started");
211
212    if max_iterations == 0 {
213        tracing::warn!("Loop with max_iterations=0, returning immediately");
214        return AgentOutcome::Complete {
215            response: String::new(),
216        };
217    }
218
219    let mut last_outcome = AgentOutcome::MaxIterationsReached;
220    for i in 0..max_iterations {
221        tracing::info!(
222            iteration = i + 1,
223            max = max_iterations,
224            "Loop: starting iteration"
225        );
226        let outcome = dispatch_agent(child.clone(), ctx).await;
227
228        match &outcome {
229            AgentOutcome::Complete { response } if response.contains(LOOP_DONE_SIGNAL) => {
230                tracing::info!(iteration = i + 1, "Loop: completion signal received");
231                last_outcome = outcome;
232                break;
233            }
234            AgentOutcome::Error { .. } => {
235                tracing::warn!(iteration = i + 1, "Loop: child errored, stopping");
236                return outcome;
237            }
238            _ => {
239                last_outcome = outcome;
240            }
241        }
242    }
243    last_outcome
244}
245
246/// Execute an LLM agent through the agentic loop.
247///
248/// Resolves the named LLM provider from the context, creates a scoped
249/// [`LoopContext`] with borrowed fields, and delegates to `run_agentic_loop`.
250async fn run_llm_agent(
251    agent_id: &str,
252    config: LlmAgentConfig,
253    ctx: &WorkflowContext,
254) -> AgentOutcome {
255    // Resolve the LLM provider by name
256    let provider_name = &config.llm_config.provider;
257    let provider = match ctx.llm_providers.get(provider_name) {
258        Some(p) => p.clone(),
259        None => {
260            return AgentOutcome::Error {
261                error: format!(
262                    "LLM provider '{}' not registered. Available: {:?}",
263                    provider_name,
264                    ctx.llm_providers.keys().collect::<Vec<_>>()
265                ),
266            };
267        }
268    };
269
270    // Create a scoped LoopContext — borrows from WorkflowContext are local to this call
271    agentic_loop::run_agentic_loop(
272        config,
273        LoopContext {
274            agent_id: agent_id.to_string(),
275            task: &ctx.task,
276            provider,
277            substrate: Arc::clone(&ctx.substrate),
278            approval_handler: ctx.approval_handler.as_ref(),
279            event_emitter: ctx.event_emitter.clone(),
280            max_iterations: DEFAULT_MAX_ITERATIONS,
281            embedding_provider: ctx.embedding_provider.clone(),
282        },
283    )
284    .await
285}
286
/// Extract a compact kind tag from an agent kind (for event reporting).
///
/// Keeps the full `AgentKind` payload (configs, children) out of events —
/// only the variant discriminant is reported.
fn agent_kind_tag(kind: &AgentKind) -> AgentKindTag {
    match kind {
        AgentKind::Llm(_) => AgentKindTag::Llm,
        AgentKind::Sequential(_) => AgentKindTag::Sequential,
        AgentKind::Parallel(_) => AgentKindTag::Parallel,
        AgentKind::Loop { .. } => AgentKindTag::Loop,
    }
}
296
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    use async_trait::async_trait;
    use pulsehive_core::lens::Lens;
    use pulsehive_core::llm::*;

    // ── Mock LLM ─────────────────────────────────────────────────────

    /// Scripted LLM provider: hands out the queued responses in order and
    /// errors once the script is exhausted.
    struct MockLlm {
        responses: Mutex<Vec<LlmResponse>>,
    }

    impl MockLlm {
        fn new(responses: Vec<LlmResponse>) -> Self {
            Self {
                responses: Mutex::new(responses),
            }
        }

        /// Convenience: a plain-text response with no tool calls.
        fn text_response(content: &str) -> LlmResponse {
            LlmResponse {
                content: Some(content.into()),
                tool_calls: vec![],
                usage: TokenUsage::default(),
            }
        }
    }

    #[async_trait]
    impl LlmProvider for MockLlm {
        async fn chat(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> pulsehive_core::error::Result<LlmResponse> {
            let mut responses = self.responses.lock().unwrap();
            if responses.is_empty() {
                Err(pulsehive_core::error::PulseHiveError::llm(
                    "No more scripted responses",
                ))
            } else {
                Ok(responses.remove(0))
            }
        }

        async fn chat_stream(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> pulsehive_core::error::Result<
            std::pin::Pin<
                Box<
                    dyn futures_core::Stream<Item = pulsehive_core::error::Result<LlmChunk>> + Send,
                >,
            >,
        > {
            Err(pulsehive_core::error::PulseHiveError::llm(
                "Streaming not used in tests",
            ))
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    /// Open a fresh PulseDB-backed substrate in a temp directory.
    fn test_substrate() -> Arc<dyn SubstrateProvider> {
        let dir = tempfile::tempdir().unwrap();
        let db =
            pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
        // Leak the TempDir guard so the directory isn't deleted while the DB is open.
        Box::leak(Box::new(dir));
        Arc::new(pulsedb::PulseDBSubstrate::from_db(db))
    }

    /// Build a WorkflowContext with the given mock registered under the name "mock".
    async fn test_workflow_ctx(provider: MockLlm) -> WorkflowContext {
        let substrate = test_substrate();
        let collective_id = substrate
            .get_or_create_collective("test-workflow")
            .await
            .unwrap();

        let mut providers: HashMap<String, Arc<dyn LlmProvider>> = HashMap::new();
        providers.insert("mock".into(), Arc::new(provider));

        WorkflowContext {
            task: Task::with_collective("Test task", collective_id),
            llm_providers: providers,
            substrate,
            approval_handler: Arc::new(pulsehive_core::approval::AutoApprove),
            event_emitter: EventBus::default(),
            embedding_provider: None,
        }
    }

    /// A bare LLM agent definition wired to the "mock" provider.
    fn llm_agent_def(name: &str) -> AgentDefinition {
        AgentDefinition {
            name: name.into(),
            kind: AgentKind::Llm(Box::new(LlmAgentConfig {
                system_prompt: "You are a test agent.".into(),
                tools: vec![],
                lens: Lens::default(),
                llm_config: LlmConfig::new("mock", "test-model"),
                experience_extractor: None,
                refresh_every_n_tool_calls: None,
            })),
        }
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_dispatch_llm_agent_completes() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Hello from dispatch!")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = llm_agent_def("test-agent");
        let outcome = dispatch_agent(agent, &ctx).await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Hello from dispatch!"),
            "Expected Complete, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_dispatch_llm_agent_emits_events() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Done")]);
        let ctx = test_workflow_ctx(provider).await;
        let mut rx = ctx.event_emitter.subscribe();

        let agent = llm_agent_def("evented-agent");
        let _outcome = dispatch_agent(agent, &ctx).await;

        // Collect all events buffered on the subscription
        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // First event should be AgentStarted
        assert!(
            matches!(&events[0], HiveEvent::AgentStarted { name, kind, .. }
                if name == "evented-agent" && *kind == AgentKindTag::Llm),
            "Expected AgentStarted, got: {:?}",
            events.first()
        );

        // Last event should be AgentCompleted
        assert!(
            matches!(
                events.last(),
                Some(HiveEvent::AgentCompleted {
                    outcome: AgentOutcome::Complete { .. },
                    ..
                })
            ),
            "Expected AgentCompleted, got: {:?}",
            events.last()
        );
    }

    #[tokio::test]
    async fn test_dispatch_missing_provider_returns_error() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        // Agent uses a provider name that doesn't exist
        let agent = AgentDefinition {
            name: "bad-provider".into(),
            kind: AgentKind::Llm(Box::new(LlmAgentConfig {
                system_prompt: "test".into(),
                tools: vec![],
                lens: Lens::default(),
                llm_config: LlmConfig::new("nonexistent", "model"),
                experience_extractor: None,
                refresh_every_n_tool_calls: None,
            })),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { error } if error.contains("nonexistent")),
            "Expected provider error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_empty_children() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "seq".into(),
            kind: AgentKind::Sequential(vec![]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Empty Sequential should return Complete with empty response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_two_children_in_order() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("First done"),
            MockLlm::text_response("Second done"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "pipeline".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("step-1"), llm_agent_def("step-2")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Second done"),
            "Sequential should return last child's response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_error_stops_execution() {
        // No scripted responses — the first child errors immediately, so the
        // second child must never run.
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "failing-seq".into(),
            kind: AgentKind::Sequential(vec![
                llm_agent_def("will-error"),
                llm_agent_def("should-not-run"),
            ]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // First child errors (no LLM responses), second never runs
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Sequential should stop on first error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_workflow_context_is_clone() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;
        let _cloned = ctx.clone(); // Compile-time proof that Clone works
    }

    #[tokio::test]
    async fn test_parallel_empty_children() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par".into(),
            kind: AgentKind::Parallel(vec![]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Empty Parallel should return Complete with empty response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_parallel_two_children_both_complete() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Alpha result"),
            MockLlm::text_response("Beta result"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("alpha"), llm_agent_def("beta")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        match &outcome {
            AgentOutcome::Complete { response } => {
                // Both responses should appear (order may vary due to concurrency)
                assert!(
                    response.contains("result"),
                    "Should contain child responses, got: {response}"
                );
            }
            other => panic!("Expected Complete, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_parallel_one_error_reports_all() {
        // Only one response — one child succeeds, other errors
        let provider = MockLlm::new(vec![MockLlm::text_response("I succeeded")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "par-err".into(),
            kind: AgentKind::Parallel(vec![
                llm_agent_def("will-succeed"),
                llm_agent_def("will-error"),
            ]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Parallel with one error should return Error, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_zero_iterations() {
        let provider = MockLlm::new(vec![]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-0".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("child")),
                max_iterations: 0,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.is_empty()),
            "Loop with 0 iterations should return Complete empty, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_runs_n_times() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Iteration 1"),
            MockLlm::text_response("Iteration 2"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-2".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 2,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // Both iterations complete without [LOOP_DONE]; the loop exhausts its
        // budget and returns the LAST iteration's Complete outcome (the initial
        // MaxIterationsReached placeholder is overwritten each iteration).
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Iteration 2"),
            "Loop should return last iteration's response, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_early_exit_on_done_signal() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Still working..."),
            MockLlm::text_response("All done [LOOP_DONE]"),
            MockLlm::text_response("Should not reach this"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-done".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 5,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response.contains("[LOOP_DONE]")),
            "Loop should exit on LOOP_DONE signal, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_error_stops() {
        // One response, then error (no more responses)
        let provider = MockLlm::new(vec![MockLlm::text_response("First iteration ok")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-err".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 5,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Error { .. }),
            "Loop should stop on error, got: {outcome:?}"
        );
    }

    // ── Ticket #46: Sequential event ordering ────────────────────────

    #[tokio::test]
    async fn test_sequential_events_ordered() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("A done"),
            MockLlm::text_response("B done"),
        ]);
        let ctx = test_workflow_ctx(provider).await;
        let mut rx = ctx.event_emitter.subscribe();

        let agent = AgentDefinition {
            name: "seq-events".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("child-a"), llm_agent_def("child-b")]),
        };

        let _outcome = dispatch_agent(agent, &ctx).await;

        // Collect events
        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // Find AgentStarted events for the LLM children (not the Sequential wrapper)
        let started_names: Vec<&str> = events
            .iter()
            .filter_map(|e| match e {
                HiveEvent::AgentStarted {
                    name,
                    kind: AgentKindTag::Llm,
                    ..
                } => Some(name.as_str()),
                _ => None,
            })
            .collect();

        // child-a should start before child-b (sequential ordering)
        assert_eq!(
            started_names,
            vec!["child-a", "child-b"],
            "Sequential children should start in order"
        );
    }

    // ── Ticket #47: Parallel event verification ──────────────────────

    #[tokio::test]
    async fn test_parallel_events_for_all_children() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Alpha"),
            MockLlm::text_response("Beta"),
        ]);
        let ctx = test_workflow_ctx(provider).await;
        let mut rx = ctx.event_emitter.subscribe();

        let agent = AgentDefinition {
            name: "par-events".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("alpha"), llm_agent_def("beta")]),
        };

        let _outcome = dispatch_agent(agent, &ctx).await;

        let mut events = vec![];
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // Both children should have AgentStarted events
        let started_names: Vec<&str> = events
            .iter()
            .filter_map(|e| match e {
                HiveEvent::AgentStarted {
                    name,
                    kind: AgentKindTag::Llm,
                    ..
                } => Some(name.as_str()),
                _ => None,
            })
            .collect();

        assert!(
            started_names.contains(&"alpha"),
            "alpha should have AgentStarted"
        );
        assert!(
            started_names.contains(&"beta"),
            "beta should have AgentStarted"
        );

        // Both should have AgentCompleted events
        let completed_count = events
            .iter()
            .filter(|e| {
                matches!(
                    e,
                    HiveEvent::AgentCompleted {
                        outcome: AgentOutcome::Complete { .. },
                        ..
                    }
                )
            })
            .count();
        assert!(
            completed_count >= 2,
            "Both children should complete, got {completed_count}"
        );
    }

    // ── Ticket #48: Loop additional tests ────────────────────────────

    #[tokio::test]
    async fn test_loop_single_iteration() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Only once")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-1".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 1,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Only once"),
            "Loop max=1 should run exactly once, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_loop_all_iterations_complete_returns_last() {
        let provider = MockLlm::new(vec![
            MockLlm::text_response("Iter 1"),
            MockLlm::text_response("Iter 2"),
            MockLlm::text_response("Iter 3"),
        ]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "loop-3".into(),
            kind: AgentKind::Loop {
                agent: Box::new(llm_agent_def("worker")),
                max_iterations: 3,
            },
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        // All 3 completed without [LOOP_DONE], returns last Complete
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Iter 3"),
            "Loop should return last iteration's response, got: {outcome:?}"
        );
    }

    // ── Ticket #50: Edge cases ───────────────────────────────────────

    #[tokio::test]
    async fn test_single_child_sequential() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Solo")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "single-seq".into(),
            kind: AgentKind::Sequential(vec![llm_agent_def("only-child")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Solo"),
            "Single-child Sequential should work like running the child directly, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_single_child_parallel() {
        let provider = MockLlm::new(vec![MockLlm::text_response("Solo parallel")]);
        let ctx = test_workflow_ctx(provider).await;

        let agent = AgentDefinition {
            name: "single-par".into(),
            kind: AgentKind::Parallel(vec![llm_agent_def("only-child")]),
        };

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Solo parallel"),
            "Single-child Parallel should work, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_deep_nesting_no_stack_overflow() {
        // 5 levels deep: Sequential(Sequential(Sequential(Sequential(Sequential(Llm)))))
        // Exercises the Box::pin recursion in dispatch_agent.
        let provider = MockLlm::new(vec![MockLlm::text_response("Deep!")]);
        let ctx = test_workflow_ctx(provider).await;

        let mut agent = llm_agent_def("leaf");
        for i in 0..5 {
            agent = AgentDefinition {
                name: format!("level-{i}"),
                kind: AgentKind::Sequential(vec![agent]),
            };
        }

        let outcome = dispatch_agent(agent, &ctx).await;
        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Deep!"),
            "5-level nesting should work without stack overflow, got: {outcome:?}"
        );
    }
}