Skip to main content

pulsehive_runtime/
agentic_loop.rs

//! Agentic loop engine — the Perceive→Think→Act→Record cycle.
//!
//! This module implements the core execution loop for LLM agents. Each agent
//! perceives the substrate, thinks via the LLM, acts on any tool calls the LLM
//! requests, and finally records the resulting experiences back into the substrate.
//!
//! The loop is driven by [`run_agentic_loop`], called from `HiveMind::deploy()`.

8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use pulsedb::SubstrateProvider;
13use tracing::Instrument;
14
15use pulsehive_core::agent::{AgentOutcome, ExperienceExtractor, LlmAgentConfig};
16use pulsehive_core::approval::{ApprovalHandler, ApprovalResult, PendingAction};
17use pulsehive_core::event::{EventEmitter, HiveEvent};
18use pulsehive_core::lens::Lens;
19use pulsehive_core::llm::{LlmConfig, LlmProvider, Message, ToolCall, ToolDefinition};
20use pulsehive_core::tool::{Tool, ToolContext, ToolResult};
21
22use crate::hivemind::Task;
23
/// Default cap on Think→Act iterations for one agent run.
///
/// Each iteration performs exactly one LLM call, so this also bounds the number
/// of LLM round-trips before the loop gives up with `MaxIterationsReached`.
pub const DEFAULT_MAX_ITERATIONS: usize = 25;
26
27/// Runtime context for the agentic loop, grouping shared resources.
28pub struct LoopContext<'a> {
29    pub agent_id: String,
30    pub task: &'a Task,
31    pub provider: Arc<dyn LlmProvider>,
32    pub substrate: Arc<dyn SubstrateProvider>,
33    pub approval_handler: &'a dyn ApprovalHandler,
34    pub event_emitter: EventEmitter,
35    pub max_iterations: usize,
36    /// Optional embedding provider for computing embeddings before storage.
37    pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
38}
39
40/// Run the agentic loop for a single LLM agent.
41///
42/// Executes the Perceive→Think→Act→Record cycle until the LLM produces
43/// a final response (no tool calls) or `max_iterations` is reached.
44pub async fn run_agentic_loop(config: LlmAgentConfig, ctx: LoopContext<'_>) -> AgentOutcome {
45    let LlmAgentConfig {
46        system_prompt,
47        tools,
48        lens,
49        llm_config,
50        experience_extractor,
51        refresh_every_n_tool_calls,
52    } = config;
53
54    // Build tool lookup map and definitions for LLM
55    let tool_map: HashMap<&str, &dyn Tool> = tools
56        .iter()
57        .map(|t| (t.name(), t.as_ref() as &dyn Tool))
58        .collect();
59    let tool_defs: Vec<ToolDefinition> = tools
60        .iter()
61        .map(|t| ToolDefinition::from_tool(t.as_ref()))
62        .collect();
63
64    // 1. PERCEIVE — query substrate through lens
65    let context_messages = perceive(
66        ctx.substrate.as_ref(),
67        &lens,
68        ctx.task,
69        &ctx.event_emitter,
70        &ctx.agent_id,
71    )
72    .instrument(tracing::info_span!("perceive", agent_id = %ctx.agent_id))
73    .await;
74
75    // 2. Build initial conversation
76    let mut messages: Vec<Message> = Vec::new();
77    messages.push(Message::system(&system_prompt));
78    messages.extend(context_messages);
79    messages.push(Message::user(&ctx.task.description));
80
81    // 3. THINK → ACT loop (with optional mid-task refresh)
82    let outcome = think_act_loop(
83        &ctx.agent_id,
84        &mut messages,
85        &tool_map,
86        &tool_defs,
87        &llm_config,
88        &ctx,
89        &lens,
90        refresh_every_n_tool_calls,
91    )
92    .await;
93
94    // 4. RECORD — extract experiences and store in substrate
95    record(&messages, &outcome, &ctx, experience_extractor.as_deref())
96        .instrument(tracing::info_span!("record", agent_id = %ctx.agent_id))
97        .await;
98
99    outcome
100}
101
102/// The core Think→Act loop. Returns when LLM produces a final response or max iterations hit.
103///
104/// When `refresh_every` is `Some(n)`, re-runs the Perceive phase every `n` tool calls,
105/// appending fresh context to the conversation. This enables parallel agents to perceive
106/// each other's experiences mid-task.
107#[allow(clippy::too_many_arguments)]
108async fn think_act_loop(
109    agent_id: &str,
110    messages: &mut Vec<Message>,
111    tool_map: &HashMap<&str, &dyn Tool>,
112    tool_defs: &[ToolDefinition],
113    llm_config: &LlmConfig,
114    ctx: &LoopContext<'_>,
115    lens: &Lens,
116    refresh_every: Option<usize>,
117) -> AgentOutcome {
118    let mut tool_calls_since_refresh: usize = 0;
119
120    for iteration in 1..=ctx.max_iterations {
121        let think_span = tracing::info_span!(
122            "think",
123            agent_id = %agent_id,
124            iteration,
125            model = %llm_config.model,
126            message_count = messages.len(),
127        );
128
129        // ── THINK: call LLM ──────────────────────────────────────────
130        ctx.event_emitter.emit(HiveEvent::LlmCallStarted {
131            timestamp_ms: pulsehive_core::event::now_ms(),
132            agent_id: agent_id.to_string(),
133            model: llm_config.model.clone(),
134            message_count: messages.len(),
135        });
136
137        let start = Instant::now();
138        let response = ctx
139            .provider
140            .chat(messages.clone(), tool_defs.to_vec(), llm_config)
141            .instrument(think_span)
142            .await;
143        let duration_ms = start.elapsed().as_millis() as u64;
144
145        let (input_tokens, output_tokens) = match &response {
146            Ok(r) => (r.usage.input_tokens, r.usage.output_tokens),
147            Err(_) => (0, 0),
148        };
149        ctx.event_emitter.emit(HiveEvent::LlmCallCompleted {
150            timestamp_ms: pulsehive_core::event::now_ms(),
151            agent_id: agent_id.to_string(),
152            model: llm_config.model.clone(),
153            duration_ms,
154            input_tokens,
155            output_tokens,
156        });
157
158        let response = match response {
159            Ok(r) => r,
160            Err(e) => {
161                tracing::error!(agent_id = %agent_id, error = %e, "LLM call failed");
162                return AgentOutcome::Error {
163                    error: e.to_string(),
164                };
165            }
166        };
167
168        // ── ACT: handle response ─────────────────────────────────────
169        if response.tool_calls.is_empty() {
170            let content = response.content.unwrap_or_default();
171            tracing::debug!(agent_id = %agent_id, "Final response received");
172            messages.push(Message::assistant(&content));
173            return AgentOutcome::Complete { response: content };
174        }
175
176        tracing::debug!(
177            agent_id = %agent_id,
178            tool_count = response.tool_calls.len(),
179            "Tool calls received"
180        );
181
182        messages.push(Message::assistant_with_tool_calls(
183            response.tool_calls.clone(),
184        ));
185
186        for tool_call in &response.tool_calls {
187            let result = execute_tool_call(
188                agent_id,
189                tool_call,
190                tool_map,
191                &ctx.substrate,
192                ctx.approval_handler,
193                &ctx.event_emitter,
194                &ctx.task.collective_id,
195            )
196            .instrument(tracing::info_span!("act", agent_id = %agent_id, tool = %tool_call.name))
197            .await;
198
199            messages.push(Message::tool_result(&tool_call.id, result.to_content()));
200            tool_calls_since_refresh += 1;
201        }
202
203        // ── MID-TASK REFRESH: re-perceive substrate if threshold reached ──
204        if let Some(interval) = refresh_every {
205            if tool_calls_since_refresh >= interval {
206                tracing::info!(
207                    agent_id = %agent_id,
208                    tool_calls = tool_calls_since_refresh,
209                    "Mid-task substrate refresh"
210                );
211                let refreshed = perceive(
212                    ctx.substrate.as_ref(),
213                    lens,
214                    ctx.task,
215                    &ctx.event_emitter,
216                    agent_id,
217                )
218                .instrument(tracing::info_span!("perceive", agent_id = %agent_id, refresh = true))
219                .await;
220                messages.extend(refreshed);
221                tool_calls_since_refresh = 0;
222            }
223        }
224    }
225
226    tracing::warn!(agent_id = %agent_id, max = ctx.max_iterations, "Max iterations reached");
227    AgentOutcome::MaxIterationsReached
228}
229
230/// Execute a single tool call with approval check.
231async fn execute_tool_call(
232    agent_id: &str,
233    tool_call: &ToolCall,
234    tool_map: &HashMap<&str, &dyn Tool>,
235    substrate: &Arc<dyn SubstrateProvider>,
236    approval_handler: &dyn ApprovalHandler,
237    event_emitter: &EventEmitter,
238    collective_id: &pulsedb::CollectiveId,
239) -> ToolResult {
240    let Some(&tool) = tool_map.get(tool_call.name.as_str()) else {
241        tracing::warn!(agent_id = %agent_id, tool = %tool_call.name, "Tool not found");
242        return ToolResult::error(format!("Tool '{}' not found", tool_call.name));
243    };
244
245    // Check approval if required
246    if tool.requires_approval() {
247        event_emitter.emit(HiveEvent::ToolApprovalRequested {
248            timestamp_ms: pulsehive_core::event::now_ms(),
249            agent_id: agent_id.to_string(),
250            tool_name: tool_call.name.clone(),
251            description: format!("Execute {} with {:?}", tool_call.name, tool_call.arguments),
252        });
253
254        let action = PendingAction {
255            agent_id: agent_id.to_string(),
256            tool_name: tool_call.name.clone(),
257            params: tool_call.arguments.clone(),
258            description: format!("Execute {} tool", tool_call.name),
259        };
260
261        match approval_handler.request_approval(&action).await {
262            Ok(ApprovalResult::Approved) => {} // proceed
263            Ok(ApprovalResult::Denied { reason }) => {
264                return ToolResult::error(format!("Tool execution denied: {reason}"));
265            }
266            Ok(ApprovalResult::Modified { new_params }) => {
267                // Execute with modified params
268                return execute_tool_inner(
269                    agent_id,
270                    &tool_call.name,
271                    new_params,
272                    tool,
273                    substrate,
274                    event_emitter,
275                    collective_id,
276                )
277                .await;
278            }
279            Err(e) => {
280                return ToolResult::error(format!("Approval handler error: {e}"));
281            }
282        }
283    }
284
285    execute_tool_inner(
286        agent_id,
287        &tool_call.name,
288        tool_call.arguments.clone(),
289        tool,
290        substrate,
291        event_emitter,
292        collective_id,
293    )
294    .await
295}
296
297/// Execute a tool and emit events.
298async fn execute_tool_inner(
299    agent_id: &str,
300    tool_name: &str,
301    params: serde_json::Value,
302    tool: &dyn Tool,
303    substrate: &Arc<dyn SubstrateProvider>,
304    event_emitter: &EventEmitter,
305    collective_id: &pulsedb::CollectiveId,
306) -> ToolResult {
307    let params_str = serde_json::to_string(&params).unwrap_or_default();
308    event_emitter.emit(HiveEvent::ToolCallStarted {
309        timestamp_ms: pulsehive_core::event::now_ms(),
310        agent_id: agent_id.to_string(),
311        tool_name: tool_name.to_string(),
312        params: params_str,
313    });
314
315    let start = Instant::now();
316    let context = ToolContext {
317        agent_id: agent_id.to_string(),
318        collective_id: *collective_id,
319        substrate: Arc::clone(substrate),
320        event_emitter: event_emitter.clone(),
321    };
322
323    let result = match tool
324        .execute(params, &context)
325        .instrument(tracing::debug_span!("tool_execute", tool = %tool_name))
326        .await
327    {
328        Ok(result) => result,
329        Err(e) => {
330            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
331            ToolResult::error(e.to_string())
332        }
333    };
334
335    let duration_ms = start.elapsed().as_millis() as u64;
336    tracing::debug!(tool = %tool_name, duration_ms, "Tool completed");
337    let result_preview: String = result.to_content().chars().take(200).collect();
338    event_emitter.emit(HiveEvent::ToolCallCompleted {
339        timestamp_ms: pulsehive_core::event::now_ms(),
340        agent_id: agent_id.to_string(),
341        tool_name: tool_name.to_string(),
342        duration_ms,
343        result_preview,
344    });
345
346    result
347}
348
349// ── Perceive Phase ───────────────────────────────────────────────────
350
351/// Query the substrate through the agent's lens and assemble budget-aware context.
352async fn perceive(
353    substrate: &dyn SubstrateProvider,
354    lens: &Lens,
355    task: &Task,
356    event_emitter: &EventEmitter,
357    agent_id: &str,
358) -> Vec<Message> {
359    use crate::perception;
360    use pulsehive_core::context::ContextBudget;
361
362    let budget = ContextBudget::from_lens(lens);
363    let messages = match perception::assemble_context(substrate, lens, task.collective_id, &budget)
364        .await
365    {
366        Ok(msgs) => msgs,
367        Err(e) => {
368            tracing::warn!(agent_id = %agent_id, error = %e, "Perception failed, continuing without context");
369            vec![]
370        }
371    };
372
373    // Count context messages that contain experience data (non-system, non-empty)
374    let experience_count = messages
375        .iter()
376        .filter(|m| matches!(m, pulsehive_core::llm::Message::System { content } if !content.is_empty()))
377        .count()
378        .max(1)
379        .saturating_sub(1); // Subtract the system message itself
380    event_emitter.emit(HiveEvent::SubstratePerceived {
381        timestamp_ms: pulsehive_core::event::now_ms(),
382        agent_id: agent_id.to_string(),
383        experience_count,
384        insight_count: 0,
385    });
386
387    messages
388}
389
390// ── Record Phase ─────────────────────────────────────────────────────
391
392/// Extract experiences from the conversation and store in substrate.
393async fn record(
394    conversation: &[Message],
395    outcome: &AgentOutcome,
396    ctx: &LoopContext<'_>,
397    extractor: Option<&dyn ExperienceExtractor>,
398) {
399    use crate::experience::DefaultExperienceExtractor;
400    use pulsehive_core::agent::ExtractionContext;
401
402    let extraction_ctx = ExtractionContext {
403        agent_id: ctx.agent_id.clone(),
404        collective_id: ctx.task.collective_id,
405        task_description: ctx.task.description.clone(),
406    };
407
408    let default_extractor = DefaultExperienceExtractor;
409    let extractor: &dyn ExperienceExtractor = extractor.unwrap_or(&default_extractor);
410
411    let experiences = extractor
412        .extract(conversation, outcome, &extraction_ctx)
413        .await;
414
415    let count = experiences.len();
416    for mut exp in experiences {
417        // Compute embedding via provider if available and not already set
418        if let Some(provider) = &ctx.embedding_provider {
419            if exp.embedding.is_none() {
420                let start = std::time::Instant::now();
421                match provider.embed(&exp.content).await {
422                    Ok(embedding) => {
423                        let duration_ms = start.elapsed().as_millis() as u64;
424                        let dimensions = embedding.len();
425                        exp.embedding = Some(embedding);
426                        ctx.event_emitter.emit(HiveEvent::EmbeddingComputed {
427                            timestamp_ms: pulsehive_core::event::now_ms(),
428                            agent_id: ctx.agent_id.clone(),
429                            dimensions,
430                            duration_ms,
431                        });
432                    }
433                    Err(e) => {
434                        tracing::warn!(
435                            agent_id = %ctx.agent_id,
436                            error = %e,
437                            "Failed to compute embedding, storing without"
438                        );
439                    }
440                }
441            }
442        }
443
444        // Capture metadata before move into store_experience
445        let content_preview: String = exp.content.chars().take(200).collect();
446        let experience_type = format!("{:?}", exp.experience_type);
447        let importance = exp.importance;
448
449        match ctx.substrate.store_experience(exp).await {
450            Ok(id) => {
451                ctx.event_emitter.emit(HiveEvent::ExperienceRecorded {
452                    timestamp_ms: pulsehive_core::event::now_ms(),
453                    experience_id: id,
454                    agent_id: ctx.agent_id.clone(),
455                    content_preview,
456                    experience_type,
457                    importance,
458                });
459            }
460            Err(e) => {
461                tracing::warn!(
462                    agent_id = %ctx.agent_id,
463                    error = %e,
464                    "Failed to store experience"
465                );
466            }
467        }
468    }
469
470    if count > 0 {
471        tracing::debug!(agent_id = %ctx.agent_id, count = count, "Recorded experiences");
472    }
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use futures_core::Stream;
    use pulsedb::CollectiveId;
    use pulsehive_core::error::{PulseHiveError, Result};
    use pulsehive_core::llm::{LlmChunk, LlmResponse, TokenUsage};
    use std::pin::Pin;
    use std::sync::Mutex;

    // ── Mock LLM Provider ────────────────────────────────────────────

    /// Mock LLM that returns scripted responses in order and errors once the
    /// script is exhausted.
    struct MockLlm {
        responses: Mutex<Vec<LlmResponse>>,
    }

    impl MockLlm {
        fn new(responses: Vec<LlmResponse>) -> Self {
            Self {
                responses: Mutex::new(responses),
            }
        }

        fn text_response(content: &str) -> LlmResponse {
            LlmResponse {
                content: Some(content.into()),
                tool_calls: vec![],
                usage: TokenUsage::default(),
            }
        }

        fn tool_call_response(id: &str, name: &str, args: serde_json::Value) -> LlmResponse {
            LlmResponse {
                content: None,
                tool_calls: vec![ToolCall {
                    id: id.into(),
                    name: name.into(),
                    arguments: args,
                }],
                usage: TokenUsage::default(),
            }
        }
    }

    #[async_trait]
    impl LlmProvider for MockLlm {
        async fn chat(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<LlmResponse> {
            let mut responses = self.responses.lock().unwrap();
            if responses.is_empty() {
                Err(PulseHiveError::llm("No more scripted responses"))
            } else {
                Ok(responses.remove(0))
            }
        }

        async fn chat_stream(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmChunk>> + Send>>> {
            Err(PulseHiveError::llm("Streaming not used in loop"))
        }
    }

    // ── Mock Tool ────────────────────────────────────────────────────

    /// Tool that replies with `Echo: <text>`.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn name(&self) -> &str {
            "echo"
        }
        fn description(&self) -> &str {
            "Echoes input"
        }
        fn parameters(&self) -> serde_json::Value {
            serde_json::json!({"type": "object", "properties": {"text": {"type": "string"}}})
        }
        async fn execute(
            &self,
            params: serde_json::Value,
            _ctx: &ToolContext,
        ) -> Result<ToolResult> {
            let text = params["text"].as_str().unwrap_or("no text");
            Ok(ToolResult::text(format!("Echo: {text}")))
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    fn test_config(tools: Vec<Arc<dyn Tool>>) -> LlmAgentConfig {
        LlmAgentConfig {
            system_prompt: "You are a test agent.".into(),
            tools,
            lens: pulsehive_core::lens::Lens::default(),
            llm_config: LlmConfig::new("mock", "test-model"),
            experience_extractor: None,
            refresh_every_n_tool_calls: None,
        }
    }

    fn test_config_with_refresh(
        tools: Vec<Arc<dyn Tool>>,
        refresh: Option<usize>,
    ) -> LlmAgentConfig {
        let mut config = test_config(tools);
        config.refresh_every_n_tool_calls = refresh;
        config
    }

    fn test_task() -> Task {
        Task {
            description: "Test task".into(),
            collective_id: CollectiveId::new(),
        }
    }

    /// Open a fresh PulseDB inside `dir` and wrap it as a substrate.
    fn test_substrate(dir: &tempfile::TempDir) -> Arc<dyn SubstrateProvider> {
        let db =
            pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
        Arc::new(pulsedb::PulseDBSubstrate::from_db(db))
    }

    /// Run the agentic loop against a scripted `MockLlm`, a fresh
    /// temp-dir-backed substrate and auto-approval. Returns the outcome plus
    /// every event emitted during the run.
    async fn run_scripted(
        responses: Vec<LlmResponse>,
        config: LlmAgentConfig,
        agent_id: &str,
        max_iterations: usize,
    ) -> (AgentOutcome, Vec<HiveEvent>) {
        // Declared first so it is dropped last: the database directory must
        // outlive the run and the substrate it backs.
        let dir = tempfile::tempdir().unwrap();
        let task = test_task();
        let emitter = EventEmitter::default();
        let mut rx = emitter.subscribe();
        let approval = pulsehive_core::approval::AutoApprove;

        let outcome = run_agentic_loop(
            config,
            LoopContext {
                agent_id: agent_id.into(),
                task: &task,
                provider: Arc::new(MockLlm::new(responses)),
                substrate: test_substrate(&dir),
                approval_handler: &approval,
                event_emitter: emitter,
                max_iterations,
                embedding_provider: None,
            },
        )
        .await;

        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }
        (outcome, events)
    }

    /// Number of events satisfying `pred`.
    fn count_events(events: &[HiveEvent], pred: impl Fn(&HiveEvent) -> bool) -> usize {
        events.iter().filter(|&e| pred(e)).count()
    }

    /// Number of `SubstratePerceived` events (initial perception + refreshes).
    fn perceive_count(events: &[HiveEvent]) -> usize {
        count_events(events, |e| matches!(e, HiveEvent::SubstratePerceived { .. }))
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_text_only_response() {
        let (outcome, _events) = run_scripted(
            vec![MockLlm::text_response("The answer is 42.")],
            test_config(vec![]),
            "agent-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "The answer is 42.")
        );
    }

    #[tokio::test]
    async fn test_tool_call_then_response() {
        let (outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "hello"})),
                MockLlm::text_response("Echo said: hello"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            "agent-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Echo said: hello")
        );
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::ToolCallCompleted { .. })),
            1
        );
    }

    #[tokio::test]
    async fn test_max_iterations_reached() {
        // LLM always returns tool calls — never gives a final response.
        let responses: Vec<LlmResponse> = (0..5)
            .map(|i| {
                MockLlm::tool_call_response(
                    &format!("call_{i}"),
                    "echo",
                    serde_json::json!({"text": "loop"}),
                )
            })
            .collect();

        let (outcome, events) = run_scripted(
            responses,
            test_config(vec![Arc::new(EchoTool)]),
            "agent-1",
            3, // Only 3 iterations
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::MaxIterationsReached));
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::LlmCallStarted { .. })),
            3,
            "The loop must stop calling the LLM once max_iterations is reached"
        );
    }

    #[tokio::test]
    async fn test_tool_not_found() {
        // LLM calls a tool that doesn't exist, then gives a final response.
        let (outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("call_1", "nonexistent_tool", serde_json::json!({})),
                MockLlm::text_response("I couldn't find that tool."),
            ],
            test_config(vec![]), // No tools registered
            "agent-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        // Should complete (LLM recovered from the tool-not-found error) without
        // ever starting a tool execution.
        assert!(matches!(outcome, AgentOutcome::Complete { .. }));
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::ToolCallStarted { .. })),
            0
        );
    }

    #[tokio::test]
    async fn test_llm_error_returns_error_outcome() {
        // Empty script = the first chat() call errors.
        let (outcome, events) = run_scripted(
            vec![],
            test_config(vec![]),
            "agent-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::Error { .. }));
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::LlmCallStarted { .. })),
            1,
            "An LLM error must end the loop immediately"
        );
    }

    #[tokio::test]
    async fn test_events_emitted_during_loop() {
        let (_outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "test"})),
                MockLlm::text_response("Done"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            "agent-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        // Expected: SubstratePerceived, LlmCallStarted, LlmCallCompleted,
        // ToolCallStarted, ToolCallCompleted, LlmCallStarted, LlmCallCompleted
        assert_eq!(perceive_count(&events), 1);
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::LlmCallStarted { .. })),
            2
        );
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::LlmCallCompleted { .. })),
            2
        );
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::ToolCallStarted { .. })),
            1
        );
        assert_eq!(
            count_events(&events, |e| matches!(e, HiveEvent::ToolCallCompleted { .. })),
            1
        );
    }

    // ── Mid-task refresh tests ───────────────────────────────────────

    #[tokio::test]
    async fn test_refresh_disabled_no_extra_perception() {
        // 1 tool call + final response, refresh=None → only 1 SubstratePerceived
        let (_outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], None),
            "agent-no-refresh",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceived = perceive_count(&events);
        assert_eq!(
            perceived, 1,
            "With refresh=None, should have exactly 1 SubstratePerceived (initial). Got {perceived}"
        );
    }

    #[tokio::test]
    async fn test_refresh_every_1_triggers_after_tool_call() {
        // 2 tool calls + final response, refresh=Some(1) → 1 initial + 2 refresh = 3
        let (_outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::tool_call_response("c2", "echo", serde_json::json!({"text": "b"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(1)),
            "agent-refresh-1",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceived = perceive_count(&events);
        assert_eq!(
            perceived, 3,
            "With refresh=Some(1) and 2 tool calls, should have exactly 3 SubstratePerceived. Got {perceived}"
        );
    }

    #[tokio::test]
    async fn test_refresh_not_triggered_below_threshold() {
        // 1 tool call + final response, refresh=Some(10) → threshold never reached
        let (_outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(10)),
            "agent-high-threshold",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceived = perceive_count(&events);
        assert_eq!(
            perceived, 1,
            "With refresh=Some(10) and 1 tool call, should have exactly 1 SubstratePerceived. Got {perceived}"
        );
    }

    #[tokio::test]
    async fn test_refresh_counter_accumulates_across_iterations() {
        // 3 single-call iterations with refresh=Some(2): the threshold is first
        // reached after the 2nd call, the counter then resets, and the 3rd call
        // alone stays below it → 1 initial + 1 refresh = 2 SubstratePerceived.
        let (_outcome, events) = run_scripted(
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::tool_call_response("c2", "echo", serde_json::json!({"text": "b"})),
                MockLlm::tool_call_response("c3", "echo", serde_json::json!({"text": "c"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(2)),
            "agent-refresh-2",
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceived = perceive_count(&events);
        assert_eq!(
            perceived, 2,
            "With refresh=Some(2) and 3 single-call iterations, should have exactly 2 SubstratePerceived. Got {perceived}"
        );
    }
}