1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use pulsedb::SubstrateProvider;
13use tracing::Instrument;
14
15use pulsehive_core::agent::{AgentOutcome, ExperienceExtractor, LlmAgentConfig};
16use pulsehive_core::approval::{ApprovalHandler, ApprovalResult, PendingAction};
17use pulsehive_core::event::{EventEmitter, HiveEvent};
18use pulsehive_core::lens::Lens;
19use pulsehive_core::llm::{LlmConfig, LlmProvider, Message, ToolCall, ToolDefinition};
20use pulsehive_core::tool::{Tool, ToolContext, ToolResult};
21
22use crate::hivemind::Task;
23
24pub const DEFAULT_MAX_ITERATIONS: usize = 25;
26
27pub struct LoopContext<'a> {
29 pub agent_id: String,
30 pub task: &'a Task,
31 pub provider: Arc<dyn LlmProvider>,
32 pub substrate: Arc<dyn SubstrateProvider>,
33 pub approval_handler: &'a dyn ApprovalHandler,
34 pub event_emitter: EventEmitter,
35 pub max_iterations: usize,
36 pub embedding_provider: Option<Arc<dyn pulsehive_core::embedding::EmbeddingProvider>>,
38}
39
40pub async fn run_agentic_loop(config: LlmAgentConfig, ctx: LoopContext<'_>) -> AgentOutcome {
45 let LlmAgentConfig {
46 system_prompt,
47 tools,
48 lens,
49 llm_config,
50 experience_extractor,
51 refresh_every_n_tool_calls,
52 } = config;
53
54 let tool_map: HashMap<&str, &dyn Tool> = tools
56 .iter()
57 .map(|t| (t.name(), t.as_ref() as &dyn Tool))
58 .collect();
59 let tool_defs: Vec<ToolDefinition> = tools
60 .iter()
61 .map(|t| ToolDefinition::from_tool(t.as_ref()))
62 .collect();
63
64 let context_messages = perceive(
66 ctx.substrate.as_ref(),
67 &lens,
68 ctx.task,
69 &ctx.event_emitter,
70 &ctx.agent_id,
71 )
72 .instrument(tracing::info_span!("perceive", agent_id = %ctx.agent_id))
73 .await;
74
75 let mut messages: Vec<Message> = Vec::new();
77 messages.push(Message::system(&system_prompt));
78 messages.extend(context_messages);
79 messages.push(Message::user(&ctx.task.description));
80
81 let outcome = think_act_loop(
83 &ctx.agent_id,
84 &mut messages,
85 &tool_map,
86 &tool_defs,
87 &llm_config,
88 &ctx,
89 &lens,
90 refresh_every_n_tool_calls,
91 )
92 .await;
93
94 record(&messages, &outcome, &ctx, experience_extractor.as_deref())
96 .instrument(tracing::info_span!("record", agent_id = %ctx.agent_id))
97 .await;
98
99 outcome
100}
101
102#[allow(clippy::too_many_arguments)]
108async fn think_act_loop(
109 agent_id: &str,
110 messages: &mut Vec<Message>,
111 tool_map: &HashMap<&str, &dyn Tool>,
112 tool_defs: &[ToolDefinition],
113 llm_config: &LlmConfig,
114 ctx: &LoopContext<'_>,
115 lens: &Lens,
116 refresh_every: Option<usize>,
117) -> AgentOutcome {
118 let mut tool_calls_since_refresh: usize = 0;
119
120 for iteration in 1..=ctx.max_iterations {
121 let think_span = tracing::info_span!(
122 "think",
123 agent_id = %agent_id,
124 iteration,
125 model = %llm_config.model,
126 message_count = messages.len(),
127 );
128
129 ctx.event_emitter.emit(HiveEvent::LlmCallStarted {
131 timestamp_ms: pulsehive_core::event::now_ms(),
132 agent_id: agent_id.to_string(),
133 model: llm_config.model.clone(),
134 message_count: messages.len(),
135 });
136
137 let start = Instant::now();
138 let response = ctx
139 .provider
140 .chat(messages.clone(), tool_defs.to_vec(), llm_config)
141 .instrument(think_span)
142 .await;
143 let duration_ms = start.elapsed().as_millis() as u64;
144
145 let (input_tokens, output_tokens) = match &response {
146 Ok(r) => (r.usage.input_tokens, r.usage.output_tokens),
147 Err(_) => (0, 0),
148 };
149 ctx.event_emitter.emit(HiveEvent::LlmCallCompleted {
150 timestamp_ms: pulsehive_core::event::now_ms(),
151 agent_id: agent_id.to_string(),
152 model: llm_config.model.clone(),
153 duration_ms,
154 input_tokens,
155 output_tokens,
156 });
157
158 let response = match response {
159 Ok(r) => r,
160 Err(e) => {
161 tracing::error!(agent_id = %agent_id, error = %e, "LLM call failed");
162 return AgentOutcome::Error {
163 error: e.to_string(),
164 };
165 }
166 };
167
168 if response.tool_calls.is_empty() {
170 let content = response.content.unwrap_or_default();
171 tracing::debug!(agent_id = %agent_id, "Final response received");
172 messages.push(Message::assistant(&content));
173 return AgentOutcome::Complete { response: content };
174 }
175
176 tracing::debug!(
177 agent_id = %agent_id,
178 tool_count = response.tool_calls.len(),
179 "Tool calls received"
180 );
181
182 messages.push(Message::assistant_with_tool_calls(
183 response.tool_calls.clone(),
184 ));
185
186 for tool_call in &response.tool_calls {
187 let result = execute_tool_call(
188 agent_id,
189 tool_call,
190 tool_map,
191 &ctx.substrate,
192 ctx.approval_handler,
193 &ctx.event_emitter,
194 &ctx.task.collective_id,
195 )
196 .instrument(tracing::info_span!("act", agent_id = %agent_id, tool = %tool_call.name))
197 .await;
198
199 messages.push(Message::tool_result(&tool_call.id, result.to_content()));
200 tool_calls_since_refresh += 1;
201 }
202
203 if let Some(interval) = refresh_every {
205 if tool_calls_since_refresh >= interval {
206 tracing::info!(
207 agent_id = %agent_id,
208 tool_calls = tool_calls_since_refresh,
209 "Mid-task substrate refresh"
210 );
211 let refreshed = perceive(
212 ctx.substrate.as_ref(),
213 lens,
214 ctx.task,
215 &ctx.event_emitter,
216 agent_id,
217 )
218 .instrument(tracing::info_span!("perceive", agent_id = %agent_id, refresh = true))
219 .await;
220 messages.extend(refreshed);
221 tool_calls_since_refresh = 0;
222 }
223 }
224 }
225
226 tracing::warn!(agent_id = %agent_id, max = ctx.max_iterations, "Max iterations reached");
227 AgentOutcome::MaxIterationsReached
228}
229
230async fn execute_tool_call(
232 agent_id: &str,
233 tool_call: &ToolCall,
234 tool_map: &HashMap<&str, &dyn Tool>,
235 substrate: &Arc<dyn SubstrateProvider>,
236 approval_handler: &dyn ApprovalHandler,
237 event_emitter: &EventEmitter,
238 collective_id: &pulsedb::CollectiveId,
239) -> ToolResult {
240 let Some(&tool) = tool_map.get(tool_call.name.as_str()) else {
241 tracing::warn!(agent_id = %agent_id, tool = %tool_call.name, "Tool not found");
242 return ToolResult::error(format!("Tool '{}' not found", tool_call.name));
243 };
244
245 if tool.requires_approval() {
247 event_emitter.emit(HiveEvent::ToolApprovalRequested {
248 timestamp_ms: pulsehive_core::event::now_ms(),
249 agent_id: agent_id.to_string(),
250 tool_name: tool_call.name.clone(),
251 description: format!("Execute {} with {:?}", tool_call.name, tool_call.arguments),
252 });
253
254 let action = PendingAction {
255 agent_id: agent_id.to_string(),
256 tool_name: tool_call.name.clone(),
257 params: tool_call.arguments.clone(),
258 description: format!("Execute {} tool", tool_call.name),
259 };
260
261 match approval_handler.request_approval(&action).await {
262 Ok(ApprovalResult::Approved) => {} Ok(ApprovalResult::Denied { reason }) => {
264 return ToolResult::error(format!("Tool execution denied: {reason}"));
265 }
266 Ok(ApprovalResult::Modified { new_params }) => {
267 return execute_tool_inner(
269 agent_id,
270 &tool_call.name,
271 new_params,
272 tool,
273 substrate,
274 event_emitter,
275 collective_id,
276 )
277 .await;
278 }
279 Err(e) => {
280 return ToolResult::error(format!("Approval handler error: {e}"));
281 }
282 }
283 }
284
285 execute_tool_inner(
286 agent_id,
287 &tool_call.name,
288 tool_call.arguments.clone(),
289 tool,
290 substrate,
291 event_emitter,
292 collective_id,
293 )
294 .await
295}
296
297async fn execute_tool_inner(
299 agent_id: &str,
300 tool_name: &str,
301 params: serde_json::Value,
302 tool: &dyn Tool,
303 substrate: &Arc<dyn SubstrateProvider>,
304 event_emitter: &EventEmitter,
305 collective_id: &pulsedb::CollectiveId,
306) -> ToolResult {
307 let params_str = serde_json::to_string(¶ms).unwrap_or_default();
308 event_emitter.emit(HiveEvent::ToolCallStarted {
309 timestamp_ms: pulsehive_core::event::now_ms(),
310 agent_id: agent_id.to_string(),
311 tool_name: tool_name.to_string(),
312 params: params_str,
313 });
314
315 let start = Instant::now();
316 let context = ToolContext {
317 agent_id: agent_id.to_string(),
318 collective_id: *collective_id,
319 substrate: Arc::clone(substrate),
320 event_emitter: event_emitter.clone(),
321 };
322
323 let result = match tool
324 .execute(params, &context)
325 .instrument(tracing::debug_span!("tool_execute", tool = %tool_name))
326 .await
327 {
328 Ok(result) => result,
329 Err(e) => {
330 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
331 ToolResult::error(e.to_string())
332 }
333 };
334
335 let duration_ms = start.elapsed().as_millis() as u64;
336 tracing::debug!(tool = %tool_name, duration_ms, "Tool completed");
337 let result_preview: String = result.to_content().chars().take(200).collect();
338 event_emitter.emit(HiveEvent::ToolCallCompleted {
339 timestamp_ms: pulsehive_core::event::now_ms(),
340 agent_id: agent_id.to_string(),
341 tool_name: tool_name.to_string(),
342 duration_ms,
343 result_preview,
344 });
345
346 result
347}
348
349async fn perceive(
353 substrate: &dyn SubstrateProvider,
354 lens: &Lens,
355 task: &Task,
356 event_emitter: &EventEmitter,
357 agent_id: &str,
358) -> Vec<Message> {
359 use crate::perception;
360 use pulsehive_core::context::ContextBudget;
361
362 let budget = ContextBudget::from_lens(lens);
363 let messages = match perception::assemble_context(substrate, lens, task.collective_id, &budget)
364 .await
365 {
366 Ok(msgs) => msgs,
367 Err(e) => {
368 tracing::warn!(agent_id = %agent_id, error = %e, "Perception failed, continuing without context");
369 vec![]
370 }
371 };
372
373 let experience_count = messages
375 .iter()
376 .filter(|m| matches!(m, pulsehive_core::llm::Message::System { content } if !content.is_empty()))
377 .count()
378 .max(1)
379 .saturating_sub(1); event_emitter.emit(HiveEvent::SubstratePerceived {
381 timestamp_ms: pulsehive_core::event::now_ms(),
382 agent_id: agent_id.to_string(),
383 experience_count,
384 insight_count: 0,
385 });
386
387 messages
388}
389
390async fn record(
394 conversation: &[Message],
395 outcome: &AgentOutcome,
396 ctx: &LoopContext<'_>,
397 extractor: Option<&dyn ExperienceExtractor>,
398) {
399 use crate::experience::DefaultExperienceExtractor;
400 use pulsehive_core::agent::ExtractionContext;
401
402 let extraction_ctx = ExtractionContext {
403 agent_id: ctx.agent_id.clone(),
404 collective_id: ctx.task.collective_id,
405 task_description: ctx.task.description.clone(),
406 };
407
408 let default_extractor = DefaultExperienceExtractor;
409 let extractor: &dyn ExperienceExtractor = extractor.unwrap_or(&default_extractor);
410
411 let experiences = extractor
412 .extract(conversation, outcome, &extraction_ctx)
413 .await;
414
415 let count = experiences.len();
416 for mut exp in experiences {
417 if let Some(provider) = &ctx.embedding_provider {
419 if exp.embedding.is_none() {
420 let start = std::time::Instant::now();
421 match provider.embed(&exp.content).await {
422 Ok(embedding) => {
423 let duration_ms = start.elapsed().as_millis() as u64;
424 let dimensions = embedding.len();
425 exp.embedding = Some(embedding);
426 ctx.event_emitter.emit(HiveEvent::EmbeddingComputed {
427 timestamp_ms: pulsehive_core::event::now_ms(),
428 agent_id: ctx.agent_id.clone(),
429 dimensions,
430 duration_ms,
431 });
432 }
433 Err(e) => {
434 tracing::warn!(
435 agent_id = %ctx.agent_id,
436 error = %e,
437 "Failed to compute embedding, storing without"
438 );
439 }
440 }
441 }
442 }
443
444 let content_preview: String = exp.content.chars().take(200).collect();
446 let experience_type = format!("{:?}", exp.experience_type);
447 let importance = exp.importance;
448
449 match ctx.substrate.store_experience(exp).await {
450 Ok(id) => {
451 ctx.event_emitter.emit(HiveEvent::ExperienceRecorded {
452 timestamp_ms: pulsehive_core::event::now_ms(),
453 experience_id: id,
454 agent_id: ctx.agent_id.clone(),
455 content_preview,
456 experience_type,
457 importance,
458 });
459 }
460 Err(e) => {
461 tracing::warn!(
462 agent_id = %ctx.agent_id,
463 error = %e,
464 "Failed to store experience"
465 );
466 }
467 }
468 }
469
470 if count > 0 {
471 tracing::debug!(agent_id = %ctx.agent_id, count = count, "Recorded experiences");
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use super::*;
478 use async_trait::async_trait;
479 use futures_core::Stream;
480 use pulsedb::CollectiveId;
481 use pulsehive_core::error::{PulseHiveError, Result};
482 use pulsehive_core::llm::{LlmChunk, LlmResponse, TokenUsage};
483 use std::pin::Pin;
484 use std::sync::Mutex;
485
486 struct MockLlm {
490 responses: Mutex<Vec<LlmResponse>>,
491 }
492
493 impl MockLlm {
494 fn new(responses: Vec<LlmResponse>) -> Self {
495 Self {
496 responses: Mutex::new(responses),
497 }
498 }
499
500 fn text_response(content: &str) -> LlmResponse {
501 LlmResponse {
502 content: Some(content.into()),
503 tool_calls: vec![],
504 usage: TokenUsage::default(),
505 }
506 }
507
508 fn tool_call_response(id: &str, name: &str, args: serde_json::Value) -> LlmResponse {
509 LlmResponse {
510 content: None,
511 tool_calls: vec![ToolCall {
512 id: id.into(),
513 name: name.into(),
514 arguments: args,
515 }],
516 usage: TokenUsage::default(),
517 }
518 }
519 }
520
521 #[async_trait]
522 impl LlmProvider for MockLlm {
523 async fn chat(
524 &self,
525 _messages: Vec<Message>,
526 _tools: Vec<ToolDefinition>,
527 _config: &LlmConfig,
528 ) -> Result<LlmResponse> {
529 let mut responses = self.responses.lock().unwrap();
530 if responses.is_empty() {
531 Err(PulseHiveError::llm("No more scripted responses"))
532 } else {
533 Ok(responses.remove(0))
534 }
535 }
536
537 async fn chat_stream(
538 &self,
539 _messages: Vec<Message>,
540 _tools: Vec<ToolDefinition>,
541 _config: &LlmConfig,
542 ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmChunk>> + Send>>> {
543 Err(PulseHiveError::llm("Streaming not used in loop"))
544 }
545 }
546
547 struct EchoTool;
550
551 #[async_trait]
552 impl Tool for EchoTool {
553 fn name(&self) -> &str {
554 "echo"
555 }
556 fn description(&self) -> &str {
557 "Echoes input"
558 }
559 fn parameters(&self) -> serde_json::Value {
560 serde_json::json!({"type": "object", "properties": {"text": {"type": "string"}}})
561 }
562 async fn execute(
563 &self,
564 params: serde_json::Value,
565 _ctx: &ToolContext,
566 ) -> Result<ToolResult> {
567 let text = params["text"].as_str().unwrap_or("no text");
568 Ok(ToolResult::text(format!("Echo: {text}")))
569 }
570 }
571
572 fn test_config(tools: Vec<Arc<dyn Tool>>) -> LlmAgentConfig {
575 LlmAgentConfig {
576 system_prompt: "You are a test agent.".into(),
577 tools,
578 lens: pulsehive_core::lens::Lens::default(),
579 llm_config: LlmConfig::new("mock", "test-model"),
580 experience_extractor: None,
581 refresh_every_n_tool_calls: None,
582 }
583 }
584
585 fn test_task() -> Task {
586 Task {
587 description: "Test task".into(),
588 collective_id: CollectiveId::new(),
589 }
590 }
591
592 fn test_substrate() -> Arc<dyn SubstrateProvider> {
593 let dir = tempfile::tempdir().unwrap();
595 let db =
596 pulsedb::PulseDB::open(dir.path().join("test.db"), pulsedb::Config::default()).unwrap();
597 let dir = Box::leak(Box::new(dir));
599 let _ = dir;
600 Arc::new(pulsedb::PulseDBSubstrate::from_db(db))
601 }
602
603 #[tokio::test]
606 async fn test_text_only_response() {
607 let provider = Arc::new(MockLlm::new(vec![MockLlm::text_response(
608 "The answer is 42.",
609 )]));
610 let config = test_config(vec![]);
611 let task = test_task();
612 let substrate = test_substrate();
613 let emitter = EventEmitter::default();
614 let approval = pulsehive_core::approval::AutoApprove;
615
616 let outcome = run_agentic_loop(
617 config,
618 LoopContext {
619 agent_id: "agent-1".into(),
620 task: &task,
621 provider,
622 substrate,
623 approval_handler: &approval,
624 event_emitter: emitter,
625 max_iterations: DEFAULT_MAX_ITERATIONS,
626 embedding_provider: None,
627 },
628 )
629 .await;
630
631 assert!(
632 matches!(&outcome, AgentOutcome::Complete { response } if response == "The answer is 42.")
633 );
634 }
635
636 #[tokio::test]
637 async fn test_tool_call_then_response() {
638 let provider = Arc::new(MockLlm::new(vec![
639 MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "hello"})),
640 MockLlm::text_response("Echo said: hello"),
641 ]));
642 let config = test_config(vec![Arc::new(EchoTool)]);
643 let task = test_task();
644 let substrate = test_substrate();
645 let emitter = EventEmitter::default();
646 let approval = pulsehive_core::approval::AutoApprove;
647
648 let outcome = run_agentic_loop(
649 config,
650 LoopContext {
651 agent_id: "agent-1".into(),
652 task: &task,
653 provider,
654 substrate,
655 approval_handler: &approval,
656 event_emitter: emitter,
657 max_iterations: DEFAULT_MAX_ITERATIONS,
658 embedding_provider: None,
659 },
660 )
661 .await;
662
663 assert!(
664 matches!(&outcome, AgentOutcome::Complete { response } if response == "Echo said: hello")
665 );
666 }
667
668 #[tokio::test]
669 async fn test_max_iterations_reached() {
670 let responses: Vec<LlmResponse> = (0..5)
672 .map(|i| {
673 MockLlm::tool_call_response(
674 &format!("call_{i}"),
675 "echo",
676 serde_json::json!({"text": "loop"}),
677 )
678 })
679 .collect();
680
681 let provider = Arc::new(MockLlm::new(responses));
682 let config = test_config(vec![Arc::new(EchoTool)]);
683 let task = test_task();
684 let substrate = test_substrate();
685 let emitter = EventEmitter::default();
686 let approval = pulsehive_core::approval::AutoApprove;
687
688 let outcome = run_agentic_loop(
689 config,
690 LoopContext {
691 agent_id: "agent-1".into(),
692 task: &task,
693 provider,
694 substrate,
695 approval_handler: &approval,
696 event_emitter: emitter,
697 max_iterations: 3, embedding_provider: None,
699 },
700 )
701 .await;
702
703 assert!(matches!(outcome, AgentOutcome::MaxIterationsReached));
704 }
705
706 #[tokio::test]
707 async fn test_tool_not_found() {
708 let provider = Arc::new(MockLlm::new(vec![
710 MockLlm::tool_call_response("call_1", "nonexistent_tool", serde_json::json!({})),
711 MockLlm::text_response("I couldn't find that tool."),
712 ]));
713 let config = test_config(vec![]); let task = test_task();
715 let substrate = test_substrate();
716 let emitter = EventEmitter::default();
717 let approval = pulsehive_core::approval::AutoApprove;
718
719 let outcome = run_agentic_loop(
720 config,
721 LoopContext {
722 agent_id: "agent-1".into(),
723 task: &task,
724 provider,
725 substrate,
726 approval_handler: &approval,
727 event_emitter: emitter,
728 max_iterations: DEFAULT_MAX_ITERATIONS,
729 embedding_provider: None,
730 },
731 )
732 .await;
733
734 assert!(matches!(outcome, AgentOutcome::Complete { .. }));
736 }
737
738 #[tokio::test]
739 async fn test_llm_error_returns_error_outcome() {
740 let provider = Arc::new(MockLlm::new(vec![])); let config = test_config(vec![]);
743 let task = test_task();
744 let substrate = test_substrate();
745 let emitter = EventEmitter::default();
746 let approval = pulsehive_core::approval::AutoApprove;
747
748 let outcome = run_agentic_loop(
749 config,
750 LoopContext {
751 agent_id: "agent-1".into(),
752 task: &task,
753 provider,
754 substrate,
755 approval_handler: &approval,
756 event_emitter: emitter,
757 max_iterations: DEFAULT_MAX_ITERATIONS,
758 embedding_provider: None,
759 },
760 )
761 .await;
762
763 assert!(matches!(outcome, AgentOutcome::Error { .. }));
764 }
765
766 #[tokio::test]
767 async fn test_events_emitted_during_loop() {
768 let provider = Arc::new(MockLlm::new(vec![
769 MockLlm::tool_call_response("call_1", "echo", serde_json::json!({"text": "test"})),
770 MockLlm::text_response("Done"),
771 ]));
772 let config = test_config(vec![Arc::new(EchoTool)]);
773 let task = test_task();
774 let substrate = test_substrate();
775 let emitter = EventEmitter::default();
776 let mut rx = emitter.subscribe();
777 let approval = pulsehive_core::approval::AutoApprove;
778
779 let _outcome = run_agentic_loop(
780 config,
781 LoopContext {
782 agent_id: "agent-1".into(),
783 task: &task,
784 provider,
785 substrate,
786 approval_handler: &approval,
787 event_emitter: emitter,
788 max_iterations: DEFAULT_MAX_ITERATIONS,
789 embedding_provider: None,
790 },
791 )
792 .await;
793
794 let mut events = vec![];
796 while let Ok(event) = rx.try_recv() {
797 events.push(event);
798 }
799
800 assert!(events
803 .iter()
804 .any(|e| matches!(e, HiveEvent::SubstratePerceived { .. })));
805 assert!(events
806 .iter()
807 .any(|e| matches!(e, HiveEvent::LlmCallStarted { .. })));
808 assert!(events
809 .iter()
810 .any(|e| matches!(e, HiveEvent::LlmCallCompleted { .. })));
811 assert!(events
812 .iter()
813 .any(|e| matches!(e, HiveEvent::ToolCallStarted { .. })));
814 assert!(events
815 .iter()
816 .any(|e| matches!(e, HiveEvent::ToolCallCompleted { .. })));
817 }
818
819 fn test_config_with_refresh(
822 tools: Vec<Arc<dyn Tool>>,
823 refresh: Option<usize>,
824 ) -> LlmAgentConfig {
825 let mut config = test_config(tools);
826 config.refresh_every_n_tool_calls = refresh;
827 config
828 }
829
830 #[tokio::test]
831 async fn test_refresh_disabled_no_extra_perception() {
832 let provider = Arc::new(MockLlm::new(vec![
834 MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
835 MockLlm::text_response("Done"),
836 ]));
837 let config = test_config_with_refresh(vec![Arc::new(EchoTool)], None);
838 let task = test_task();
839 let substrate = test_substrate();
840 let emitter = EventEmitter::default();
841 let mut rx = emitter.subscribe();
842 let approval = pulsehive_core::approval::AutoApprove;
843
844 let _outcome = run_agentic_loop(
845 config,
846 LoopContext {
847 agent_id: "agent-no-refresh".into(),
848 task: &task,
849 provider,
850 substrate,
851 approval_handler: &approval,
852 event_emitter: emitter,
853 max_iterations: DEFAULT_MAX_ITERATIONS,
854 embedding_provider: None,
855 },
856 )
857 .await;
858
859 let mut events = vec![];
860 while let Ok(e) = rx.try_recv() {
861 events.push(e);
862 }
863
864 let perceive_count = events
865 .iter()
866 .filter(|e| matches!(e, HiveEvent::SubstratePerceived { .. }))
867 .count();
868 assert_eq!(
869 perceive_count, 1,
870 "With refresh=None, should have exactly 1 SubstratePerceived (initial). Got {perceive_count}"
871 );
872 }
873
874 #[tokio::test]
875 async fn test_refresh_every_1_triggers_after_tool_call() {
876 let provider = Arc::new(MockLlm::new(vec![
878 MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
879 MockLlm::tool_call_response("c2", "echo", serde_json::json!({"text": "b"})),
880 MockLlm::text_response("Done"),
881 ]));
882 let config = test_config_with_refresh(vec![Arc::new(EchoTool)], Some(1));
883 let task = test_task();
884 let substrate = test_substrate();
885 let emitter = EventEmitter::default();
886 let mut rx = emitter.subscribe();
887 let approval = pulsehive_core::approval::AutoApprove;
888
889 let _outcome = run_agentic_loop(
890 config,
891 LoopContext {
892 agent_id: "agent-refresh-1".into(),
893 task: &task,
894 provider,
895 substrate,
896 approval_handler: &approval,
897 event_emitter: emitter,
898 max_iterations: DEFAULT_MAX_ITERATIONS,
899 embedding_provider: None,
900 },
901 )
902 .await;
903
904 let mut events = vec![];
905 while let Ok(e) = rx.try_recv() {
906 events.push(e);
907 }
908
909 let perceive_count = events
910 .iter()
911 .filter(|e| matches!(e, HiveEvent::SubstratePerceived { .. }))
912 .count();
913 assert!(
914 perceive_count >= 3,
915 "With refresh=Some(1) and 2 tool calls, should have >= 3 SubstratePerceived. Got {perceive_count}"
916 );
917 }
918
919 #[tokio::test]
920 async fn test_refresh_not_triggered_below_threshold() {
921 let provider = Arc::new(MockLlm::new(vec![
923 MockLlm::tool_call_response("c1", "echo", serde_json::json!({"text": "a"})),
924 MockLlm::text_response("Done"),
925 ]));
926 let config = test_config_with_refresh(vec![Arc::new(EchoTool)], Some(10));
927 let task = test_task();
928 let substrate = test_substrate();
929 let emitter = EventEmitter::default();
930 let mut rx = emitter.subscribe();
931 let approval = pulsehive_core::approval::AutoApprove;
932
933 let _outcome = run_agentic_loop(
934 config,
935 LoopContext {
936 agent_id: "agent-high-threshold".into(),
937 task: &task,
938 provider,
939 substrate,
940 approval_handler: &approval,
941 event_emitter: emitter,
942 max_iterations: DEFAULT_MAX_ITERATIONS,
943 embedding_provider: None,
944 },
945 )
946 .await;
947
948 let mut events = vec![];
949 while let Ok(e) = rx.try_recv() {
950 events.push(e);
951 }
952
953 let perceive_count = events
954 .iter()
955 .filter(|e| matches!(e, HiveEvent::SubstratePerceived { .. }))
956 .count();
957 assert_eq!(
958 perceive_count, 1,
959 "With refresh=Some(10) and 1 tool call, should have exactly 1 SubstratePerceived. Got {perceive_count}"
960 );
961 }
962}