Skip to main content

pulsehive_runtime/
agentic_loop.rs

1//! Agentic loop engine — the Perceive→Think→Act→Record cycle.
2//!
3//! This module implements the core execution loop for LLM agents. Each agent
4//! runs through: perceive substrate → think via LLM → act on tool calls → record experiences.
5//!
6//! The loop is driven by [`run_agentic_loop`], called from `HiveMind::deploy()`.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use pulsedb::SubstrateProvider;
13use tracing::Instrument;
14
15use pulsehive_core::agent::{AgentOutcome, ExperienceExtractor, LlmAgentConfig};
16use pulsehive_core::approval::{ApprovalHandler, ApprovalResult, PendingAction};
17use pulsehive_core::event::{EventEmitter, HiveEvent};
18use pulsehive_core::lens::Lens;
19use pulsehive_core::llm::{LlmConfig, LlmProvider, Message, ToolCall, ToolDefinition};
20use pulsehive_core::tool::{Tool, ToolContext, ToolResult};
21
22use crate::hivemind::Task;
23
/// Default upper bound on Think→Act iterations for the agentic loop.
pub const DEFAULT_MAX_ITERATIONS: usize = 25;
26
27/// Runtime context for the agentic loop, grouping shared resources.
28pub struct LoopContext<'a> {
29    pub agent_id: String,
30    pub task: &'a Task,
31    pub provider: Arc<dyn LlmProvider>,
32    pub substrate: Arc<dyn SubstrateProvider>,
33    pub approval_handler: &'a dyn ApprovalHandler,
34    pub event_emitter: EventEmitter,
35    pub max_iterations: usize,
36    /// Optional embedding provider for computing embeddings before storage.
37    pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
38}
39
40/// Run the agentic loop for a single LLM agent.
41///
42/// Executes the Perceive→Think→Act→Record cycle until the LLM produces
43/// a final response (no tool calls) or `max_iterations` is reached.
44pub async fn run_agentic_loop(config: LlmAgentConfig, ctx: LoopContext<'_>) -> AgentOutcome {
45    let LlmAgentConfig {
46        system_prompt,
47        tools,
48        lens,
49        llm_config,
50        experience_extractor,
51        refresh_every_n_tool_calls,
52    } = config;
53
54    // Build tool lookup map and definitions for LLM
55    let tool_map: HashMap<&str, &dyn Tool> = tools
56        .iter()
57        .map(|t| (t.name(), t.as_ref() as &dyn Tool))
58        .collect();
59    let tool_defs: Vec<ToolDefinition> = tools
60        .iter()
61        .map(|t| ToolDefinition::from_tool(t.as_ref()))
62        .collect();
63
64    // 1. PERCEIVE — query substrate through lens
65    let context_messages = perceive(
66        ctx.substrate.as_ref(),
67        &lens,
68        ctx.task,
69        &ctx.event_emitter,
70        &ctx.agent_id,
71    )
72    .instrument(tracing::info_span!("perceive", agent_id = %ctx.agent_id))
73    .await;
74
75    // 2. Build initial conversation
76    let mut messages: Vec<Message> = Vec::new();
77    messages.push(Message::system(&system_prompt));
78    messages.extend(context_messages);
79    messages.push(Message::user(&ctx.task.description));
80
81    // 3. THINK → ACT loop (with optional mid-task refresh)
82    let outcome = think_act_loop(
83        &ctx.agent_id,
84        &mut messages,
85        &tool_map,
86        &tool_defs,
87        &llm_config,
88        &ctx,
89        &lens,
90        refresh_every_n_tool_calls,
91    )
92    .await;
93
94    // 4. RECORD — extract experiences and store in substrate
95    record(&messages, &outcome, &ctx, experience_extractor.as_deref())
96        .instrument(tracing::info_span!("record", agent_id = %ctx.agent_id))
97        .await;
98
99    outcome
100}
101
102/// The core Think→Act loop. Returns when LLM produces a final response or max iterations hit.
103///
104/// When `refresh_every` is `Some(n)`, re-runs the Perceive phase every `n` tool calls,
105/// appending fresh context to the conversation. This enables parallel agents to perceive
106/// each other's experiences mid-task.
107#[allow(clippy::too_many_arguments)]
108async fn think_act_loop(
109    agent_id: &str,
110    messages: &mut Vec<Message>,
111    tool_map: &HashMap<&str, &dyn Tool>,
112    tool_defs: &[ToolDefinition],
113    llm_config: &LlmConfig,
114    ctx: &LoopContext<'_>,
115    lens: &Lens,
116    refresh_every: Option<usize>,
117) -> AgentOutcome {
118    let mut tool_calls_since_refresh: usize = 0;
119
120    for iteration in 1..=ctx.max_iterations {
121        let think_span = tracing::info_span!(
122            "think",
123            agent_id = %agent_id,
124            iteration,
125            model = %llm_config.model,
126            message_count = messages.len(),
127        );
128
129        // ── THINK: call LLM ──────────────────────────────────────────
130        ctx.event_emitter.emit(HiveEvent::LlmCallStarted {
131            agent_id: agent_id.to_string(),
132            model: llm_config.model.clone(),
133            message_count: messages.len(),
134        });
135
136        let start = Instant::now();
137        let response = ctx
138            .provider
139            .chat(messages.clone(), tool_defs.to_vec(), llm_config)
140            .instrument(think_span)
141            .await;
142        let duration_ms = start.elapsed().as_millis() as u64;
143
144        ctx.event_emitter.emit(HiveEvent::LlmCallCompleted {
145            agent_id: agent_id.to_string(),
146            model: llm_config.model.clone(),
147            duration_ms,
148        });
149
150        let response = match response {
151            Ok(r) => r,
152            Err(e) => {
153                tracing::error!(agent_id = %agent_id, error = %e, "LLM call failed");
154                return AgentOutcome::Error {
155                    error: e.to_string(),
156                };
157            }
158        };
159
160        // ── ACT: handle response ─────────────────────────────────────
161        if response.tool_calls.is_empty() {
162            let content = response.content.unwrap_or_default();
163            tracing::debug!(agent_id = %agent_id, "Final response received");
164            messages.push(Message::assistant(&content));
165            return AgentOutcome::Complete { response: content };
166        }
167
168        tracing::debug!(
169            agent_id = %agent_id,
170            tool_count = response.tool_calls.len(),
171            "Tool calls received"
172        );
173
174        messages.push(Message::assistant_with_tool_calls(
175            response.tool_calls.clone(),
176        ));
177
178        for tool_call in &response.tool_calls {
179            let result = execute_tool_call(
180                agent_id,
181                tool_call,
182                tool_map,
183                &ctx.substrate,
184                ctx.approval_handler,
185                &ctx.event_emitter,
186                &ctx.task.collective_id,
187            )
188            .instrument(tracing::info_span!("act", agent_id = %agent_id, tool = %tool_call.name))
189            .await;
190
191            messages.push(Message::tool_result(&tool_call.id, result.to_content()));
192            tool_calls_since_refresh += 1;
193        }
194
195        // ── MID-TASK REFRESH: re-perceive substrate if threshold reached ──
196        if let Some(interval) = refresh_every {
197            if tool_calls_since_refresh >= interval {
198                tracing::info!(
199                    agent_id = %agent_id,
200                    tool_calls = tool_calls_since_refresh,
201                    "Mid-task substrate refresh"
202                );
203                let refreshed = perceive(
204                    ctx.substrate.as_ref(),
205                    lens,
206                    ctx.task,
207                    &ctx.event_emitter,
208                    agent_id,
209                )
210                .instrument(tracing::info_span!("perceive", agent_id = %agent_id, refresh = true))
211                .await;
212                messages.extend(refreshed);
213                tool_calls_since_refresh = 0;
214            }
215        }
216    }
217
218    tracing::warn!(agent_id = %agent_id, max = ctx.max_iterations, "Max iterations reached");
219    AgentOutcome::MaxIterationsReached
220}
221
222/// Execute a single tool call with approval check.
223async fn execute_tool_call(
224    agent_id: &str,
225    tool_call: &ToolCall,
226    tool_map: &HashMap<&str, &dyn Tool>,
227    substrate: &Arc<dyn SubstrateProvider>,
228    approval_handler: &dyn ApprovalHandler,
229    event_emitter: &EventEmitter,
230    collective_id: &pulsedb::CollectiveId,
231) -> ToolResult {
232    let Some(&tool) = tool_map.get(tool_call.name.as_str()) else {
233        tracing::warn!(agent_id = %agent_id, tool = %tool_call.name, "Tool not found");
234        return ToolResult::error(format!("Tool '{}' not found", tool_call.name));
235    };
236
237    // Check approval if required
238    if tool.requires_approval() {
239        event_emitter.emit(HiveEvent::ToolApprovalRequested {
240            agent_id: agent_id.to_string(),
241            tool_name: tool_call.name.clone(),
242            description: format!("Execute {} with {:?}", tool_call.name, tool_call.arguments),
243        });
244
245        let action = PendingAction {
246            agent_id: agent_id.to_string(),
247            tool_name: tool_call.name.clone(),
248            params: tool_call.arguments.clone(),
249            description: format!("Execute {} tool", tool_call.name),
250        };
251
252        match approval_handler.request_approval(&action).await {
253            Ok(ApprovalResult::Approved) => {} // proceed
254            Ok(ApprovalResult::Denied { reason }) => {
255                return ToolResult::error(format!("Tool execution denied: {reason}"));
256            }
257            Ok(ApprovalResult::Modified { new_params }) => {
258                // Execute with modified params
259                return execute_tool_inner(
260                    agent_id,
261                    &tool_call.name,
262                    new_params,
263                    tool,
264                    substrate,
265                    event_emitter,
266                    collective_id,
267                )
268                .await;
269            }
270            Err(e) => {
271                return ToolResult::error(format!("Approval handler error: {e}"));
272            }
273        }
274    }
275
276    execute_tool_inner(
277        agent_id,
278        &tool_call.name,
279        tool_call.arguments.clone(),
280        tool,
281        substrate,
282        event_emitter,
283        collective_id,
284    )
285    .await
286}
287
288/// Execute a tool and emit events.
289async fn execute_tool_inner(
290    agent_id: &str,
291    tool_name: &str,
292    params: serde_json::Value,
293    tool: &dyn Tool,
294    substrate: &Arc<dyn SubstrateProvider>,
295    event_emitter: &EventEmitter,
296    collective_id: &pulsedb::CollectiveId,
297) -> ToolResult {
298    event_emitter.emit(HiveEvent::ToolCallStarted {
299        agent_id: agent_id.to_string(),
300        tool_name: tool_name.to_string(),
301    });
302
303    let start = Instant::now();
304    let context = ToolContext {
305        agent_id: agent_id.to_string(),
306        collective_id: *collective_id,
307        substrate: Arc::clone(substrate),
308        event_emitter: event_emitter.clone(),
309    };
310
311    let result = match tool
312        .execute(params, &context)
313        .instrument(tracing::debug_span!("tool_execute", tool = %tool_name))
314        .await
315    {
316        Ok(result) => result,
317        Err(e) => {
318            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
319            ToolResult::error(e.to_string())
320        }
321    };
322
323    let duration_ms = start.elapsed().as_millis() as u64;
324    tracing::debug!(tool = %tool_name, duration_ms, "Tool completed");
325    event_emitter.emit(HiveEvent::ToolCallCompleted {
326        agent_id: agent_id.to_string(),
327        tool_name: tool_name.to_string(),
328        duration_ms,
329    });
330
331    result
332}
333
334// ── Perceive Phase ───────────────────────────────────────────────────
335
336/// Query the substrate through the agent's lens and assemble budget-aware context.
337async fn perceive(
338    substrate: &dyn SubstrateProvider,
339    lens: &Lens,
340    task: &Task,
341    event_emitter: &EventEmitter,
342    agent_id: &str,
343) -> Vec<Message> {
344    use crate::perception;
345    use pulsehive_core::context::ContextBudget;
346
347    let budget = ContextBudget::from_lens(lens);
348    let messages = match perception::assemble_context(substrate, lens, task.collective_id, &budget)
349        .await
350    {
351        Ok(msgs) => msgs,
352        Err(e) => {
353            tracing::warn!(agent_id = %agent_id, error = %e, "Perception failed, continuing without context");
354            vec![]
355        }
356    };
357
358    let experience_count = if messages.is_empty() { 0 } else { 1 }; // At least 1 context message
359    event_emitter.emit(HiveEvent::SubstratePerceived {
360        agent_id: agent_id.to_string(),
361        experience_count,
362        insight_count: 0,
363    });
364
365    messages
366}
367
368// ── Record Phase ─────────────────────────────────────────────────────
369
370/// Extract experiences from the conversation and store in substrate.
371async fn record(
372    conversation: &[Message],
373    outcome: &AgentOutcome,
374    ctx: &LoopContext<'_>,
375    extractor: Option<&dyn ExperienceExtractor>,
376) {
377    use crate::experience::DefaultExperienceExtractor;
378    use pulsehive_core::agent::ExtractionContext;
379
380    let extraction_ctx = ExtractionContext {
381        agent_id: ctx.agent_id.clone(),
382        collective_id: ctx.task.collective_id,
383        task_description: ctx.task.description.clone(),
384    };
385
386    let default_extractor = DefaultExperienceExtractor;
387    let extractor: &dyn ExperienceExtractor = extractor.unwrap_or(&default_extractor);
388
389    let experiences = extractor
390        .extract(conversation, outcome, &extraction_ctx)
391        .await;
392
393    let count = experiences.len();
394    for mut exp in experiences {
395        // Compute embedding via provider if available and not already set
396        if let Some(provider) = &ctx.embedding_provider {
397            if exp.embedding.is_none() {
398                let start = std::time::Instant::now();
399                match provider.embed(&exp.content).await {
400                    Ok(embedding) => {
401                        let duration_ms = start.elapsed().as_millis() as u64;
402                        let dimensions = embedding.len();
403                        exp.embedding = Some(embedding);
404                        ctx.event_emitter.emit(HiveEvent::EmbeddingComputed {
405                            agent_id: ctx.agent_id.clone(),
406                            dimensions,
407                            duration_ms,
408                        });
409                    }
410                    Err(e) => {
411                        tracing::warn!(
412                            agent_id = %ctx.agent_id,
413                            error = %e,
414                            "Failed to compute embedding, storing without"
415                        );
416                    }
417                }
418            }
419        }
420
421        match ctx.substrate.store_experience(exp).await {
422            Ok(id) => {
423                ctx.event_emitter.emit(HiveEvent::ExperienceRecorded {
424                    experience_id: id,
425                    agent_id: ctx.agent_id.clone(),
426                });
427            }
428            Err(e) => {
429                tracing::warn!(
430                    agent_id = %ctx.agent_id,
431                    error = %e,
432                    "Failed to store experience"
433                );
434            }
435        }
436    }
437
438    if count > 0 {
439        tracing::debug!(agent_id = %ctx.agent_id, count = count, "Recorded experiences");
440    }
441}
442
#[cfg(test)]
mod tests {
    //! Tests for the agentic loop, driven by a scripted mock LLM and an echo tool
    //! against a real PulseDB substrate in a temporary directory.

    use super::*;
    use async_trait::async_trait;
    use futures_core::Stream;
    use pulsedb::CollectiveId;
    use pulsehive_core::error::{PulseHiveError, Result};
    use pulsehive_core::llm::{LlmChunk, LlmResponse, TokenUsage};
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::sync::Mutex;

    // ── Mock LLM Provider ────────────────────────────────────────────

    /// Mock LLM that returns scripted responses in order.
    ///
    /// Once the script is exhausted, every further `chat` call fails.
    struct MockLlm {
        /// Remaining responses; the front of the queue is returned next.
        responses: Mutex<VecDeque<LlmResponse>>,
    }

    impl MockLlm {
        /// Create a mock that replays `responses` front to back.
        fn new(responses: Vec<LlmResponse>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
            }
        }

        /// A final (tool-call-free) response carrying `content`.
        fn text_response(content: &str) -> LlmResponse {
            LlmResponse {
                content: Some(content.into()),
                tool_calls: vec![],
                usage: TokenUsage::default(),
            }
        }

        /// A response requesting a single call to tool `name` with `args`.
        fn tool_call_response(id: &str, name: &str, args: serde_json::Value) -> LlmResponse {
            LlmResponse {
                content: None,
                tool_calls: vec![ToolCall {
                    id: id.into(),
                    name: name.into(),
                    arguments: args,
                }],
                usage: TokenUsage::default(),
            }
        }
    }

    #[async_trait]
    impl LlmProvider for MockLlm {
        async fn chat(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<LlmResponse> {
            // Bind the popped value first so the lock guard is released immediately.
            let next = self
                .responses
                .lock()
                .expect("mock LLM mutex poisoned")
                .pop_front();
            next.ok_or_else(|| PulseHiveError::llm("No more scripted responses"))
        }

        async fn chat_stream(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmChunk>> + Send>>> {
            Err(PulseHiveError::llm("Streaming not used in loop"))
        }
    }

    // ── Mock Tool ────────────────────────────────────────────────────

    /// Tool that echoes its `text` parameter back, prefixed with `Echo: `.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn name(&self) -> &str {
            "echo"
        }
        fn description(&self) -> &str {
            "Echoes input"
        }
        fn parameters(&self) -> serde_json::Value {
            serde_json::json!({"type": "object", "properties": {"text": {"type": "string"}}})
        }
        async fn execute(
            &self,
            params: serde_json::Value,
            _ctx: &ToolContext,
        ) -> Result<ToolResult> {
            let text = params["text"].as_str().unwrap_or("no text");
            Ok(ToolResult::text(format!("Echo: {text}")))
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    /// Agent configuration with the given tools, no custom extractor and no mid-task refresh.
    fn test_config(tools: Vec<Arc<dyn Tool>>) -> LlmAgentConfig {
        LlmAgentConfig {
            system_prompt: "You are a test agent.".into(),
            tools,
            lens: pulsehive_core::lens::Lens::default(),
            llm_config: LlmConfig::new("mock", "test-model"),
            experience_extractor: None,
            refresh_every_n_tool_calls: None,
        }
    }

    /// Like [`test_config`], but with `refresh_every_n_tool_calls` set to `refresh`.
    fn test_config_with_refresh(
        tools: Vec<Arc<dyn Tool>>,
        refresh: Option<usize>,
    ) -> LlmAgentConfig {
        let mut config = test_config(tools);
        config.refresh_every_n_tool_calls = refresh;
        config
    }

    /// A task in a freshly generated collective.
    fn test_task() -> Task {
        Task {
            description: "Test task".into(),
            collective_id: CollectiveId::new(),
        }
    }

    /// Substrate backed by a real PulseDB stored in a temporary directory.
    fn test_substrate() -> Arc<dyn SubstrateProvider> {
        let dir = tempfile::tempdir().unwrap();
        let db =
            pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
        // Deliberately skip the temp dir's destructor so the directory is not
        // removed while the database is still in use.
        std::mem::forget(dir);
        Arc::new(pulsedb::PulseDBSubstrate::from_db(db))
    }

    /// Run the agentic loop against a scripted LLM and return the outcome together
    /// with every event emitted during the run.
    ///
    /// Uses a fresh task, a fresh substrate, auto-approval and no embedding provider.
    async fn run_scripted(
        agent_id: &str,
        responses: Vec<LlmResponse>,
        config: LlmAgentConfig,
        max_iterations: usize,
    ) -> (AgentOutcome, Vec<HiveEvent>) {
        let provider = Arc::new(MockLlm::new(responses));
        let task = test_task();
        let substrate = test_substrate();
        let emitter = EventEmitter::default();
        // Subscribe before the run so no event is missed.
        let mut rx = emitter.subscribe();
        let approval = pulsehive_core::approval::AutoApprove;

        let outcome = run_agentic_loop(
            config,
            LoopContext {
                agent_id: agent_id.into(),
                task: &task,
                provider,
                substrate,
                approval_handler: &approval,
                event_emitter: emitter,
                max_iterations,
                embedding_provider: None,
            },
        )
        .await;

        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        (outcome, events)
    }

    /// Number of `SubstratePerceived` events in `events`.
    fn perceived_count(events: &[HiveEvent]) -> usize {
        events
            .iter()
            .filter(|e| matches!(e, HiveEvent::SubstratePerceived { .. }))
            .count()
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_text_only_response() {
        let (outcome, _events) = run_scripted(
            "agent-1",
            vec![MockLlm::text_response("The answer is 42.")],
            test_config(vec![]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "The answer is 42.")
        );
    }

    #[tokio::test]
    async fn test_tool_call_then_response() {
        let (outcome, _events) = run_scripted(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "hello"})),
                MockLlm::text_response("Echo said: hello"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Echo said: hello")
        );
    }

    #[tokio::test]
    async fn test_max_iterations_reached() {
        // LLM always returns tool calls — never gives a final response
        let responses: Vec<LlmResponse> = (0..5)
            .map(|i| {
                MockLlm::tool_call_response(
                    &format!("call_{i}"),
                    "echo",
                    serde_json::json!({"text": "loop"}),
                )
            })
            .collect();

        let (outcome, _events) = run_scripted(
            "agent-1",
            responses,
            test_config(vec![Arc::new(EchoTool)]),
            3, // Only 3 iterations
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::MaxIterationsReached));
    }

    #[tokio::test]
    async fn test_tool_not_found() {
        // LLM calls a tool that doesn't exist, then gives final response
        let (outcome, _events) = run_scripted(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "nonexistent_tool", serde_json::json!({})),
                MockLlm::text_response("I couldn't find that tool."),
            ],
            test_config(vec![]), // No tools registered
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        // Should complete (LLM recovered from tool-not-found error)
        assert!(matches!(outcome, AgentOutcome::Complete { .. }));
    }

    #[tokio::test]
    async fn test_llm_error_returns_error_outcome() {
        // An empty script makes the very first LLM call fail.
        let (outcome, _events) = run_scripted(
            "agent-1",
            Vec::new(),
            test_config(vec![]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::Error { .. }));
    }

    #[tokio::test]
    async fn test_events_emitted_during_loop() {
        let (_outcome, events) = run_scripted(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "test"})),
                MockLlm::text_response("Done"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        // Should have: SubstratePerceived, LlmCallStarted, LlmCallCompleted,
        // ToolCallStarted, ToolCallCompleted, LlmCallStarted, LlmCallCompleted
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::SubstratePerceived { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::LlmCallStarted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::LlmCallCompleted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::ToolCallStarted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::ToolCallCompleted { .. })));
    }

    // ── Mid-task refresh tests ───────────────────────────────────────

    #[tokio::test]
    async fn test_refresh_disabled_no_extra_perception() {
        // 1 tool call + final response, refresh=None → only 1 SubstratePerceived
        let (_outcome, events) = run_scripted(
            "agent-no-refresh",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], None),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceived_count(&events);
        assert_eq!(
            perceive_count, 1,
            "With refresh=None, should have exactly 1 SubstratePerceived (initial). Got {perceive_count}"
        );
    }

    #[tokio::test]
    async fn test_refresh_every_1_triggers_after_tool_call() {
        // 2 tool calls + final response, refresh=Some(1) → 1 initial + 2 refresh = 3 SubstratePerceived
        let (_outcome, events) = run_scripted(
            "agent-refresh-1",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::tool_call_response("c2", "echo", serde_json::json!({"text": "b"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(1)),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceived_count(&events);
        assert!(
            perceive_count >= 3,
            "With refresh=Some(1) and 2 tool calls, should have >= 3 SubstratePerceived. Got {perceive_count}"
        );
    }

    #[tokio::test]
    async fn test_refresh_not_triggered_below_threshold() {
        // 1 tool call + final response, refresh=Some(10) → only 1 SubstratePerceived (threshold not reached)
        let (_outcome, events) = run_scripted(
            "agent-high-threshold",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(10)),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceived_count(&events);
        assert_eq!(
            perceive_count, 1,
            "With refresh=Some(10) and 1 tool call, should have exactly 1 SubstratePerceived. Got {perceive_count}"
        );
    }
}