1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use pulsedb::SubstrateProvider;
13use tracing::Instrument;
14
15use pulsehive_core::agent::{AgentOutcome, ExperienceExtractor, LlmAgentConfig};
16use pulsehive_core::approval::{ApprovalHandler, ApprovalResult, PendingAction};
17use pulsehive_core::event::{EventEmitter, HiveEvent};
18use pulsehive_core::lens::Lens;
19use pulsehive_core::llm::{LlmConfig, LlmProvider, Message, ToolCall, ToolDefinition};
20use pulsehive_core::tool::{Tool, ToolContext, ToolResult};
21
22use crate::hivemind::Task;
23
/// Iteration cap callers can pass as [`LoopContext::max_iterations`].
pub const DEFAULT_MAX_ITERATIONS: usize = 25;
26
/// Runtime dependencies and limits handed to [`run_agentic_loop`] for a single agent run.
pub struct LoopContext<'a> {
    /// Identifier attached to emitted events, tracing fields, and tool contexts.
    pub agent_id: String,
    /// Task being worked on; supplies the user-message description and the collective id.
    pub task: &'a Task,
    /// LLM backend whose `chat` method is called once per loop iteration.
    pub provider: Arc<dyn LlmProvider>,
    /// Substrate read during perception, shared with tools, and written to when
    /// experiences are recorded.
    pub substrate: Arc<dyn SubstrateProvider>,
    /// Consulted before running any tool whose `requires_approval()` returns true.
    pub approval_handler: &'a dyn ApprovalHandler,
    /// Sink for the `HiveEvent`s emitted throughout the run.
    pub event_emitter: EventEmitter,
    /// Upper bound on think/act iterations before the loop gives up.
    pub max_iterations: usize,
    /// Optional embedder used while recording experiences that carry no embedding yet.
    pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
}
39
40pub async fn run_agentic_loop(config: LlmAgentConfig, ctx: LoopContext<'_>) -> AgentOutcome {
45 let LlmAgentConfig {
46 system_prompt,
47 tools,
48 lens,
49 llm_config,
50 experience_extractor,
51 refresh_every_n_tool_calls,
52 } = config;
53
54 let tool_map: HashMap<&str, &dyn Tool> = tools
56 .iter()
57 .map(|t| (t.name(), t.as_ref() as &dyn Tool))
58 .collect();
59 let tool_defs: Vec<ToolDefinition> = tools
60 .iter()
61 .map(|t| ToolDefinition::from_tool(t.as_ref()))
62 .collect();
63
64 let context_messages = perceive(
66 ctx.substrate.as_ref(),
67 &lens,
68 ctx.task,
69 &ctx.event_emitter,
70 &ctx.agent_id,
71 )
72 .instrument(tracing::info_span!("perceive", agent_id = %ctx.agent_id))
73 .await;
74
75 let mut messages: Vec<Message> = Vec::new();
77 messages.push(Message::system(&system_prompt));
78 messages.extend(context_messages);
79 messages.push(Message::user(&ctx.task.description));
80
81 let outcome = think_act_loop(
83 &ctx.agent_id,
84 &mut messages,
85 &tool_map,
86 &tool_defs,
87 &llm_config,
88 &ctx,
89 &lens,
90 refresh_every_n_tool_calls,
91 )
92 .await;
93
94 record(&messages, &outcome, &ctx, experience_extractor.as_deref())
96 .instrument(tracing::info_span!("record", agent_id = %ctx.agent_id))
97 .await;
98
99 outcome
100}
101
102#[allow(clippy::too_many_arguments)]
108async fn think_act_loop(
109 agent_id: &str,
110 messages: &mut Vec<Message>,
111 tool_map: &HashMap<&str, &dyn Tool>,
112 tool_defs: &[ToolDefinition],
113 llm_config: &LlmConfig,
114 ctx: &LoopContext<'_>,
115 lens: &Lens,
116 refresh_every: Option<usize>,
117) -> AgentOutcome {
118 let mut tool_calls_since_refresh: usize = 0;
119
120 for iteration in 1..=ctx.max_iterations {
121 let think_span = tracing::info_span!(
122 "think",
123 agent_id = %agent_id,
124 iteration,
125 model = %llm_config.model,
126 message_count = messages.len(),
127 );
128
129 ctx.event_emitter.emit(HiveEvent::LlmCallStarted {
131 agent_id: agent_id.to_string(),
132 model: llm_config.model.clone(),
133 message_count: messages.len(),
134 });
135
136 let start = Instant::now();
137 let response = ctx
138 .provider
139 .chat(messages.clone(), tool_defs.to_vec(), llm_config)
140 .instrument(think_span)
141 .await;
142 let duration_ms = start.elapsed().as_millis() as u64;
143
144 ctx.event_emitter.emit(HiveEvent::LlmCallCompleted {
145 agent_id: agent_id.to_string(),
146 model: llm_config.model.clone(),
147 duration_ms,
148 });
149
150 let response = match response {
151 Ok(r) => r,
152 Err(e) => {
153 tracing::error!(agent_id = %agent_id, error = %e, "LLM call failed");
154 return AgentOutcome::Error {
155 error: e.to_string(),
156 };
157 }
158 };
159
160 if response.tool_calls.is_empty() {
162 let content = response.content.unwrap_or_default();
163 tracing::debug!(agent_id = %agent_id, "Final response received");
164 messages.push(Message::assistant(&content));
165 return AgentOutcome::Complete { response: content };
166 }
167
168 tracing::debug!(
169 agent_id = %agent_id,
170 tool_count = response.tool_calls.len(),
171 "Tool calls received"
172 );
173
174 messages.push(Message::assistant_with_tool_calls(
175 response.tool_calls.clone(),
176 ));
177
178 for tool_call in &response.tool_calls {
179 let result = execute_tool_call(
180 agent_id,
181 tool_call,
182 tool_map,
183 &ctx.substrate,
184 ctx.approval_handler,
185 &ctx.event_emitter,
186 &ctx.task.collective_id,
187 )
188 .instrument(tracing::info_span!("act", agent_id = %agent_id, tool = %tool_call.name))
189 .await;
190
191 messages.push(Message::tool_result(&tool_call.id, result.to_content()));
192 tool_calls_since_refresh += 1;
193 }
194
195 if let Some(interval) = refresh_every {
197 if tool_calls_since_refresh >= interval {
198 tracing::info!(
199 agent_id = %agent_id,
200 tool_calls = tool_calls_since_refresh,
201 "Mid-task substrate refresh"
202 );
203 let refreshed = perceive(
204 ctx.substrate.as_ref(),
205 lens,
206 ctx.task,
207 &ctx.event_emitter,
208 agent_id,
209 )
210 .instrument(tracing::info_span!("perceive", agent_id = %agent_id, refresh = true))
211 .await;
212 messages.extend(refreshed);
213 tool_calls_since_refresh = 0;
214 }
215 }
216 }
217
218 tracing::warn!(agent_id = %agent_id, max = ctx.max_iterations, "Max iterations reached");
219 AgentOutcome::MaxIterationsReached
220}
221
222async fn execute_tool_call(
224 agent_id: &str,
225 tool_call: &ToolCall,
226 tool_map: &HashMap<&str, &dyn Tool>,
227 substrate: &Arc<dyn SubstrateProvider>,
228 approval_handler: &dyn ApprovalHandler,
229 event_emitter: &EventEmitter,
230 collective_id: &pulsedb::CollectiveId,
231) -> ToolResult {
232 let Some(&tool) = tool_map.get(tool_call.name.as_str()) else {
233 tracing::warn!(agent_id = %agent_id, tool = %tool_call.name, "Tool not found");
234 return ToolResult::error(format!("Tool '{}' not found", tool_call.name));
235 };
236
237 if tool.requires_approval() {
239 event_emitter.emit(HiveEvent::ToolApprovalRequested {
240 agent_id: agent_id.to_string(),
241 tool_name: tool_call.name.clone(),
242 description: format!("Execute {} with {:?}", tool_call.name, tool_call.arguments),
243 });
244
245 let action = PendingAction {
246 agent_id: agent_id.to_string(),
247 tool_name: tool_call.name.clone(),
248 params: tool_call.arguments.clone(),
249 description: format!("Execute {} tool", tool_call.name),
250 };
251
252 match approval_handler.request_approval(&action).await {
253 Ok(ApprovalResult::Approved) => {} Ok(ApprovalResult::Denied { reason }) => {
255 return ToolResult::error(format!("Tool execution denied: {reason}"));
256 }
257 Ok(ApprovalResult::Modified { new_params }) => {
258 return execute_tool_inner(
260 agent_id,
261 &tool_call.name,
262 new_params,
263 tool,
264 substrate,
265 event_emitter,
266 collective_id,
267 )
268 .await;
269 }
270 Err(e) => {
271 return ToolResult::error(format!("Approval handler error: {e}"));
272 }
273 }
274 }
275
276 execute_tool_inner(
277 agent_id,
278 &tool_call.name,
279 tool_call.arguments.clone(),
280 tool,
281 substrate,
282 event_emitter,
283 collective_id,
284 )
285 .await
286}
287
288async fn execute_tool_inner(
290 agent_id: &str,
291 tool_name: &str,
292 params: serde_json::Value,
293 tool: &dyn Tool,
294 substrate: &Arc<dyn SubstrateProvider>,
295 event_emitter: &EventEmitter,
296 collective_id: &pulsedb::CollectiveId,
297) -> ToolResult {
298 event_emitter.emit(HiveEvent::ToolCallStarted {
299 agent_id: agent_id.to_string(),
300 tool_name: tool_name.to_string(),
301 });
302
303 let start = Instant::now();
304 let context = ToolContext {
305 agent_id: agent_id.to_string(),
306 collective_id: *collective_id,
307 substrate: Arc::clone(substrate),
308 event_emitter: event_emitter.clone(),
309 };
310
311 let result = match tool
312 .execute(params, &context)
313 .instrument(tracing::debug_span!("tool_execute", tool = %tool_name))
314 .await
315 {
316 Ok(result) => result,
317 Err(e) => {
318 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
319 ToolResult::error(e.to_string())
320 }
321 };
322
323 let duration_ms = start.elapsed().as_millis() as u64;
324 tracing::debug!(tool = %tool_name, duration_ms, "Tool completed");
325 event_emitter.emit(HiveEvent::ToolCallCompleted {
326 agent_id: agent_id.to_string(),
327 tool_name: tool_name.to_string(),
328 duration_ms,
329 });
330
331 result
332}
333
334async fn perceive(
338 substrate: &dyn SubstrateProvider,
339 lens: &Lens,
340 task: &Task,
341 event_emitter: &EventEmitter,
342 agent_id: &str,
343) -> Vec<Message> {
344 use crate::perception;
345 use pulsehive_core::context::ContextBudget;
346
347 let budget = ContextBudget::from_lens(lens);
348 let messages = match perception::assemble_context(substrate, lens, task.collective_id, &budget)
349 .await
350 {
351 Ok(msgs) => msgs,
352 Err(e) => {
353 tracing::warn!(agent_id = %agent_id, error = %e, "Perception failed, continuing without context");
354 vec![]
355 }
356 };
357
358 let experience_count = if messages.is_empty() { 0 } else { 1 }; event_emitter.emit(HiveEvent::SubstratePerceived {
360 agent_id: agent_id.to_string(),
361 experience_count,
362 insight_count: 0,
363 });
364
365 messages
366}
367
368async fn record(
372 conversation: &[Message],
373 outcome: &AgentOutcome,
374 ctx: &LoopContext<'_>,
375 extractor: Option<&dyn ExperienceExtractor>,
376) {
377 use crate::experience::DefaultExperienceExtractor;
378 use pulsehive_core::agent::ExtractionContext;
379
380 let extraction_ctx = ExtractionContext {
381 agent_id: ctx.agent_id.clone(),
382 collective_id: ctx.task.collective_id,
383 task_description: ctx.task.description.clone(),
384 };
385
386 let default_extractor = DefaultExperienceExtractor;
387 let extractor: &dyn ExperienceExtractor = extractor.unwrap_or(&default_extractor);
388
389 let experiences = extractor
390 .extract(conversation, outcome, &extraction_ctx)
391 .await;
392
393 let count = experiences.len();
394 for mut exp in experiences {
395 if let Some(provider) = &ctx.embedding_provider {
397 if exp.embedding.is_none() {
398 let start = std::time::Instant::now();
399 match provider.embed(&exp.content).await {
400 Ok(embedding) => {
401 let duration_ms = start.elapsed().as_millis() as u64;
402 let dimensions = embedding.len();
403 exp.embedding = Some(embedding);
404 ctx.event_emitter.emit(HiveEvent::EmbeddingComputed {
405 agent_id: ctx.agent_id.clone(),
406 dimensions,
407 duration_ms,
408 });
409 }
410 Err(e) => {
411 tracing::warn!(
412 agent_id = %ctx.agent_id,
413 error = %e,
414 "Failed to compute embedding, storing without"
415 );
416 }
417 }
418 }
419 }
420
421 match ctx.substrate.store_experience(exp).await {
422 Ok(id) => {
423 ctx.event_emitter.emit(HiveEvent::ExperienceRecorded {
424 experience_id: id,
425 agent_id: ctx.agent_id.clone(),
426 });
427 }
428 Err(e) => {
429 tracing::warn!(
430 agent_id = %ctx.agent_id,
431 error = %e,
432 "Failed to store experience"
433 );
434 }
435 }
436 }
437
438 if count > 0 {
439 tracing::debug!(agent_id = %ctx.agent_id, count = count, "Recorded experiences");
440 }
441}
442
#[cfg(test)]
mod tests {
    //! Tests for the agentic loop, driven by a scripted LLM and an on-disk substrate
    //! opened in a temporary directory.

    use super::*;
    use async_trait::async_trait;
    use futures_core::Stream;
    use pulsedb::CollectiveId;
    use pulsehive_core::error::{PulseHiveError, Result};
    use pulsehive_core::llm::{LlmChunk, LlmResponse, TokenUsage};
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::sync::Mutex;

    /// LLM provider that replays a fixed script of responses, one per `chat` call.
    struct MockLlm {
        /// Remaining scripted responses, consumed front to back.
        responses: Mutex<VecDeque<LlmResponse>>,
    }

    impl MockLlm {
        /// Creates a mock that returns `responses` in order.
        fn new(responses: Vec<LlmResponse>) -> Self {
            Self {
                responses: Mutex::new(responses.into()),
            }
        }

        /// Builds a plain text response with no tool calls.
        fn text_response(content: &str) -> LlmResponse {
            LlmResponse {
                content: Some(content.into()),
                tool_calls: vec![],
                usage: TokenUsage::default(),
            }
        }

        /// Builds a response that requests exactly one tool call.
        fn tool_call_response(id: &str, name: &str, args: serde_json::Value) -> LlmResponse {
            LlmResponse {
                content: None,
                tool_calls: vec![ToolCall {
                    id: id.into(),
                    name: name.into(),
                    arguments: args,
                }],
                usage: TokenUsage::default(),
            }
        }
    }

    #[async_trait]
    impl LlmProvider for MockLlm {
        /// Pops the next scripted response; errors once the script is exhausted.
        async fn chat(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<LlmResponse> {
            let next = self.responses.lock().unwrap().pop_front();
            next.ok_or_else(|| PulseHiveError::llm("No more scripted responses"))
        }

        /// Streaming is not exercised by the loop, so this always errors.
        async fn chat_stream(
            &self,
            _messages: Vec<Message>,
            _tools: Vec<ToolDefinition>,
            _config: &LlmConfig,
        ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmChunk>> + Send>>> {
            Err(PulseHiveError::llm("Streaming not used in loop"))
        }
    }

    /// Tool that echoes back its `text` parameter.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn name(&self) -> &str {
            "echo"
        }
        fn description(&self) -> &str {
            "Echoes input"
        }
        fn parameters(&self) -> serde_json::Value {
            serde_json::json!({"type": "object", "properties": {"text": {"type": "string"}}})
        }
        async fn execute(
            &self,
            params: serde_json::Value,
            _ctx: &ToolContext,
        ) -> Result<ToolResult> {
            let text = params["text"].as_str().unwrap_or("no text");
            Ok(ToolResult::text(format!("Echo: {text}")))
        }
    }

    /// Agent config with the given tools, no custom extractor, and refresh disabled.
    fn test_config(tools: Vec<Arc<dyn Tool>>) -> LlmAgentConfig {
        LlmAgentConfig {
            system_prompt: "You are a test agent.".into(),
            tools,
            lens: pulsehive_core::lens::Lens::default(),
            llm_config: LlmConfig::new("mock", "test-model"),
            experience_extractor: None,
            refresh_every_n_tool_calls: None,
        }
    }

    /// Same as [`test_config`] but with an explicit mid-task refresh interval.
    fn test_config_with_refresh(
        tools: Vec<Arc<dyn Tool>>,
        refresh: Option<usize>,
    ) -> LlmAgentConfig {
        let mut config = test_config(tools);
        config.refresh_every_n_tool_calls = refresh;
        config
    }

    /// Task with a fixed description in a fresh collective.
    fn test_task() -> Task {
        Task {
            description: "Test task".into(),
            collective_id: CollectiveId::new(),
        }
    }

    /// Opens a substrate backed by a database file in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the substrate: the caller must keep
    /// it alive while the database is in use, and the directory is cleaned up when
    /// the guard is dropped instead of being leaked for the life of the process.
    fn test_substrate() -> (Arc<dyn SubstrateProvider>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let db =
            pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
        let substrate: Arc<dyn SubstrateProvider> = Arc::new(pulsedb::PulseDBSubstrate::from_db(db));
        (substrate, dir)
    }

    /// Runs the agentic loop against a scripted LLM and returns the outcome together
    /// with every event that was buffered on a subscription taken before the run.
    async fn run_loop(
        agent_id: &str,
        responses: Vec<LlmResponse>,
        config: LlmAgentConfig,
        max_iterations: usize,
    ) -> (AgentOutcome, Vec<HiveEvent>) {
        let provider = Arc::new(MockLlm::new(responses));
        let task = test_task();
        // `_db_dir` is declared before the run so it outlives the substrate handle.
        let (substrate, _db_dir) = test_substrate();
        let emitter = EventEmitter::default();
        let mut rx = emitter.subscribe();
        let approval = pulsehive_core::approval::AutoApprove;

        let outcome = run_agentic_loop(
            config,
            LoopContext {
                agent_id: agent_id.into(),
                task: &task,
                provider,
                substrate,
                approval_handler: &approval,
                event_emitter: emitter,
                max_iterations,
                embedding_provider: None,
            },
        )
        .await;

        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        (outcome, events)
    }

    /// Number of `SubstratePerceived` events in `events`.
    fn perceive_count(events: &[HiveEvent]) -> usize {
        events
            .iter()
            .filter(|e| matches!(e, HiveEvent::SubstratePerceived { .. }))
            .count()
    }

    #[tokio::test]
    async fn test_text_only_response() {
        let (outcome, _events) = run_loop(
            "agent-1",
            vec![MockLlm::text_response("The answer is 42.")],
            test_config(vec![]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "The answer is 42.")
        );
    }

    #[tokio::test]
    async fn test_tool_call_then_response() {
        let (outcome, _events) = run_loop(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "hello"})),
                MockLlm::text_response("Echo said: hello"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(
            matches!(&outcome, AgentOutcome::Complete { response } if response == "Echo said: hello")
        );
    }

    #[tokio::test]
    async fn test_max_iterations_reached() {
        // More scripted tool calls than allowed iterations, so the cap is what stops the loop.
        let responses: Vec<LlmResponse> = (0..5)
            .map(|i| {
                MockLlm::tool_call_response(
                    &format!("call_{i}"),
                    "echo",
                    serde_json::json!({"text": "loop"}),
                )
            })
            .collect();

        let (outcome, _events) = run_loop(
            "agent-1",
            responses,
            test_config(vec![Arc::new(EchoTool)]),
            3,
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::MaxIterationsReached));
    }

    #[tokio::test]
    async fn test_tool_not_found() {
        // No tools are registered; the loop must survive the unknown tool call.
        let (outcome, _events) = run_loop(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "nonexistent_tool", serde_json::json!({})),
                MockLlm::text_response("I couldn't find that tool."),
            ],
            test_config(vec![]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::Complete { .. }));
    }

    #[tokio::test]
    async fn test_llm_error_returns_error_outcome() {
        // An empty script makes the very first chat call fail.
        let (outcome, _events) = run_loop(
            "agent-1",
            vec![],
            test_config(vec![]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(matches!(outcome, AgentOutcome::Error { .. }));
    }

    #[tokio::test]
    async fn test_events_emitted_during_loop() {
        let (_outcome, events) = run_loop(
            "agent-1",
            vec![
                MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "test"})),
                MockLlm::text_response("Done"),
            ],
            test_config(vec![Arc::new(EchoTool)]),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::SubstratePerceived { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::LlmCallStarted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::LlmCallCompleted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::ToolCallStarted { .. })));
        assert!(events
            .iter()
            .any(|e| matches!(e, HiveEvent::ToolCallCompleted { .. })));
    }

    #[tokio::test]
    async fn test_refresh_disabled_no_extra_perception() {
        let (_outcome, events) = run_loop(
            "agent-no-refresh",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], None),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceive_count(&events);
        assert_eq!(
            perceive_count, 1,
            "With refresh=None, should have exactly 1 SubstratePerceived (initial). Got {perceive_count}"
        );
    }

    #[tokio::test]
    async fn test_refresh_every_1_triggers_after_tool_call() {
        let (_outcome, events) = run_loop(
            "agent-refresh-1",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::tool_call_response("c2", "echo", serde_json::json!({"text": "b"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(1)),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceive_count(&events);
        assert!(
            perceive_count >= 3,
            "With refresh=Some(1) and 2 tool calls, should have >= 3 SubstratePerceived. Got {perceive_count}"
        );
    }

    #[tokio::test]
    async fn test_refresh_not_triggered_below_threshold() {
        let (_outcome, events) = run_loop(
            "agent-high-threshold",
            vec![
                MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
                MockLlm::text_response("Done"),
            ],
            test_config_with_refresh(vec![Arc::new(EchoTool)], Some(10)),
            DEFAULT_MAX_ITERATIONS,
        )
        .await;

        let perceive_count = perceive_count(&events);
        assert_eq!(
            perceive_count, 1,
            "With refresh=Some(10) and 1 tool call, should have exactly 1 SubstratePerceived. Got {perceive_count}"
        );
    }
}