1use std::collections::VecDeque;
43use std::sync::Arc;
44
45use anyhow::Result;
46use async_trait::async_trait;
47
48use tokio::sync::RwLock;
49
50use brainwires_core::{ChatResponse, Message, ToolResult, ToolUse};
51
52use crate::agent_hooks::{
53 AgentLifecycleHooks, ConversationView, IterationContext, IterationDecision, ToolDecision,
54};
55use crate::communication::CommunicationHub;
56use crate::file_locks::{FileLockManager, LockType};
57
/// Final report produced by [`run_agent_loop`] for a single agent run.
#[derive(Debug, Clone)]
pub struct AgentExecutionResult {
    /// Identifier of the agent that was run, as returned by `AgentRuntime::agent_id`.
    pub agent_id: String,
    /// `true` only when the run ended because `AgentRuntime::on_completion`
    /// produced an output; `false` when the loop stopped on the iteration
    /// limit, a hook abort, or loop detection.
    pub success: bool,
    /// Final output text on success, or a description of why the run stopped.
    pub output: String,
    /// Number of loop iterations that were started before the run ended.
    pub iterations: usize,
    /// Name of every tool call the loop processed, in order and with repeats
    /// (calls that a hook overrode or delegated are included).
    pub tools_used: Vec<String>,
}
72
/// Detects an agent that keeps calling the same tool back-to-back.
///
/// The detector remembers the last `window_size` tool names and reports a
/// loop as soon as that window is full and every entry is identical.
struct LoopDetector {
    /// Number of consecutive identical tool names that counts as a loop.
    window_size: usize,
    /// When `false`, [`LoopDetector::record`] never reports a loop.
    enabled: bool,
    /// Most recent tool names, oldest first; never longer than `window_size`.
    recent: VecDeque<String>,
}

impl LoopDetector {
    /// Creates a detector with an empty history.
    ///
    /// A `window_size` of zero yields a detector that never reports a loop.
    fn new(window_size: usize, enabled: bool) -> Self {
        Self {
            window_size,
            enabled,
            recent: VecDeque::with_capacity(window_size),
        }
    }

    /// Records one tool call.
    ///
    /// Returns `Some(tool_name)` when the last `window_size` recorded calls
    /// (including this one) all used `tool_name`, and `None` otherwise.
    fn record(&mut self, tool_name: &str) -> Option<String> {
        // A zero-sized window can never fill up. Returning before buffering
        // keeps `recent` from growing by one entry per call forever.
        if !self.enabled || self.window_size == 0 {
            return None;
        }

        // Evict the oldest entry first so the window never exceeds its size.
        if self.recent.len() == self.window_size {
            self.recent.pop_front();
        }
        self.recent.push_back(tool_name.to_owned());

        let stuck = self.recent.len() == self.window_size
            && self.recent.iter().all(|seen| seen == tool_name);
        stuck.then(|| tool_name.to_owned())
    }
}
106
/// Behaviour an agent must supply so that [`run_agent_loop`] can drive it.
///
/// The loop owns the control flow (iteration limit, hook dispatch, file
/// locking, loop detection); implementors supply the provider call, tool
/// execution and the callbacks that record what happened.
#[async_trait]
pub trait AgentRuntime: Send + Sync {
    /// Identifier used for hub registration, file-lock ownership and the
    /// final [`AgentExecutionResult`].
    fn agent_id(&self) -> &str;

    /// Maximum number of iterations; the loop checks this before starting
    /// each iteration and stops once the count has been reached.
    fn max_iterations(&self) -> usize;

    /// Performs one provider call. The loop invokes this once per iteration
    /// and propagates any error to its caller.
    async fn call_provider(&self) -> Result<ChatResponse>;

    /// Returns the tool calls requested by `response`. An empty list makes
    /// the loop call [`AgentRuntime::on_completion`] instead of running tools.
    fn extract_tool_uses(&self, response: &ChatResponse) -> Vec<ToolUse>;

    /// Reports whether `response` should be treated as a completion. When
    /// `true`, the loop calls [`AgentRuntime::on_completion`] without
    /// looking for tool uses.
    fn is_completion(&self, response: &ChatResponse) -> bool;

    /// Executes a single tool call. An `Err` is converted by the loop into
    /// an error `ToolResult` rather than aborting the run.
    async fn execute_tool(&self, tool_use: &ToolUse) -> Result<ToolResult>;

    /// Returns the `(path, lock type)` the loop must acquire before running
    /// `tool_use`, or `None` when the tool needs no file lock.
    fn get_lock_requirement(&self, tool_use: &ToolUse) -> Option<(String, LockType)>;

    /// Called by the loop for a response that contains tool uses, before any
    /// of those tools are processed.
    async fn on_provider_response(&self, response: &ChatResponse);

    /// Called with the result of every processed tool call, including results
    /// supplied by a hook override or a delegation.
    async fn on_tool_result(&self, tool_use: &ToolUse, result: &ToolResult);

    /// Called when a response looks final. `Ok(Some(output))` ends the run
    /// successfully with that output, `Ok(None)` makes the loop start another
    /// iteration, and an `Err` is propagated to the loop's caller.
    async fn on_completion(&self, response: &ChatResponse) -> Result<Option<String>>;

    /// Produces the output text for a run that stopped on the iteration limit.
    async fn on_iteration_limit(&self, iterations: usize) -> String;

    /// Optional lifecycle hooks; the default of `None` disables all hook calls.
    fn lifecycle_hooks(&self) -> Option<&dyn AgentLifecycleHooks> {
        None
    }

    /// Optional token budget. When set (and hooks plus a conversation are
    /// available) the loop reports context pressure once the estimated token
    /// count exceeds it. Defaults to `None`.
    fn context_budget_tokens(&self) -> Option<u64> {
        None
    }

    /// Conversation history exposed to hooks. With the default of `None`,
    /// hooks that take a conversation view are skipped and the remaining
    /// hooks see a conversation length of zero.
    fn conversation(&self) -> Option<&RwLock<Vec<Message>>> {
        None
    }
}
199
200#[tracing::instrument(name = "agent.execute", skip_all, fields(agent_id = agent.agent_id()))]
212pub async fn run_agent_loop(
213 agent: &dyn AgentRuntime,
214 hub: &CommunicationHub,
215 lock_manager: &Arc<FileLockManager>,
216) -> Result<AgentExecutionResult> {
217 let agent_id = agent.agent_id().to_string();
218 let mut iterations: usize = 0;
219 let mut tools_used = Vec::new();
220 let mut loop_detector = LoopDetector::new(5, true);
221 let start_time = std::time::Instant::now();
222
223 if !hub.is_registered(&agent_id).await {
225 hub.register_agent(agent_id.clone()).await?;
226 }
227
228 let hooks = agent.lifecycle_hooks();
229
230 loop {
231 if iterations >= agent.max_iterations() {
233 tracing::warn!(agent_id = %agent_id, iterations, "agent hit iteration limit");
234 let output = agent.on_iteration_limit(iterations).await;
235 let _ = hub.unregister_agent(&agent_id).await;
236 lock_manager.release_all_locks(&agent_id).await;
237 return Ok(AgentExecutionResult {
238 agent_id,
239 success: false,
240 output,
241 iterations,
242 tools_used,
243 });
244 }
245
246 iterations += 1;
247
248 if let Some(hooks) = hooks
250 && let Some(conv_lock) = agent.conversation()
251 {
252 let conv_len = conv_lock.read().await.len();
253 let iter_ctx = IterationContext {
254 agent_id: &agent_id,
255 iteration: iterations as u32,
256 max_iterations: agent.max_iterations() as u32,
257 total_tokens_used: 0,
258 total_cost_usd: 0.0,
259 elapsed: start_time.elapsed(),
260 conversation_len: conv_len,
261 };
262 let mut history = conv_lock.write().await;
263 let mut view = ConversationView::new(&mut history);
264 match hooks.on_before_iteration(&iter_ctx, &mut view).await {
265 IterationDecision::Continue => {}
266 IterationDecision::Skip => continue,
267 IterationDecision::Abort(reason) => {
268 let output = format!("Aborted by hook: {}", reason);
269 let _ = hub.unregister_agent(&agent_id).await;
270 lock_manager.release_all_locks(&agent_id).await;
271 return Ok(AgentExecutionResult {
272 agent_id,
273 success: false,
274 output,
275 iterations,
276 tools_used,
277 });
278 }
279 }
280 }
281
282 if let Some(hooks) = hooks
284 && let Some(conv_lock) = agent.conversation()
285 {
286 let conv_len = conv_lock.read().await.len();
287 let iter_ctx = IterationContext {
288 agent_id: &agent_id,
289 iteration: iterations as u32,
290 max_iterations: agent.max_iterations() as u32,
291 total_tokens_used: 0,
292 total_cost_usd: 0.0,
293 elapsed: start_time.elapsed(),
294 conversation_len: conv_len,
295 };
296 let mut history = conv_lock.write().await;
297 let mut view = ConversationView::new(&mut history);
298 hooks.on_before_provider_call(&iter_ctx, &mut view).await;
299 }
300
301 let response = agent.call_provider().await?;
303
304 if let Some(hooks) = hooks {
306 let conv_len = match agent.conversation() {
307 Some(c) => c.read().await.len(),
308 None => 0,
309 };
310 let iter_ctx = IterationContext {
311 agent_id: &agent_id,
312 iteration: iterations as u32,
313 max_iterations: agent.max_iterations() as u32,
314 total_tokens_used: 0,
315 total_cost_usd: 0.0,
316 elapsed: start_time.elapsed(),
317 conversation_len: conv_len,
318 };
319 hooks.on_after_provider_call(&iter_ctx, &response).await;
320 }
321
322 if agent.is_completion(&response) {
324 if let Some(output) = agent.on_completion(&response).await? {
325 let _ = hub.unregister_agent(&agent_id).await;
326 lock_manager.release_all_locks(&agent_id).await;
327 return Ok(AgentExecutionResult {
328 agent_id,
329 success: true,
330 output,
331 iterations,
332 tools_used,
333 });
334 }
335 continue;
337 }
338
339 let tool_use_requests = agent.extract_tool_uses(&response);
341
342 if tool_use_requests.is_empty() {
343 if let Some(output) = agent.on_completion(&response).await? {
345 let _ = hub.unregister_agent(&agent_id).await;
346 lock_manager.release_all_locks(&agent_id).await;
347 return Ok(AgentExecutionResult {
348 agent_id,
349 success: true,
350 output,
351 iterations,
352 tools_used,
353 });
354 }
355 continue;
356 }
357
358 agent.on_provider_response(&response).await;
360
361 for tool_use in &tool_use_requests {
363 if let Some(hooks) = hooks {
365 let conv_len = match agent.conversation() {
366 Some(c) => c.read().await.len(),
367 None => 0,
368 };
369 let iter_ctx = IterationContext {
370 agent_id: &agent_id,
371 iteration: iterations as u32,
372 max_iterations: agent.max_iterations() as u32,
373 total_tokens_used: 0,
374 total_cost_usd: 0.0,
375 elapsed: start_time.elapsed(),
376 conversation_len: conv_len,
377 };
378 match hooks.on_before_tool_execution(&iter_ctx, tool_use).await {
379 ToolDecision::Execute => {} ToolDecision::Override(result) => {
381 agent.on_tool_result(tool_use, &result).await;
382 tools_used.push(tool_use.name.clone());
383 continue;
384 }
385 ToolDecision::Delegate(request) => {
386 match hooks.execute_delegation(&request).await {
387 Ok(delegation_result) => {
388 let result = ToolResult::success(
389 tool_use.id.clone(),
390 format!(
391 "Delegated to sub-agent {}: {}",
392 delegation_result.agent_id, delegation_result.output
393 ),
394 );
395 agent.on_tool_result(tool_use, &result).await;
396 }
397 Err(e) => {
398 let result = ToolResult::error(
399 tool_use.id.clone(),
400 format!("Delegation failed: {}", e),
401 );
402 agent.on_tool_result(tool_use, &result).await;
403 }
404 }
405 tools_used.push(tool_use.name.clone());
406 continue;
407 }
408 }
409 }
410
411 tools_used.push(tool_use.name.clone());
412
413 let tool_result = if let Some((path, lock_type)) = agent.get_lock_requirement(tool_use)
414 {
415 match lock_manager.acquire_lock(&agent_id, &path, lock_type).await {
417 Ok(_guard) => match agent.execute_tool(tool_use).await {
418 Ok(result) => result,
419 Err(e) => ToolResult::error(
420 tool_use.id.clone(),
421 format!("Tool execution failed: {}", e),
422 ),
423 },
424 Err(e) => {
425 tracing::warn!(
426 agent_id = %agent_id,
427 path = %path,
428 "failed to acquire file lock: {}",
429 e
430 );
431 ToolResult::error(
432 tool_use.id.clone(),
433 format!("Failed to acquire lock on {}: {}", path, e),
434 )
435 }
436 }
437 } else {
438 match agent.execute_tool(tool_use).await {
440 Ok(result) => result,
441 Err(e) => ToolResult::error(
442 tool_use.id.clone(),
443 format!("Tool execution failed: {}", e),
444 ),
445 }
446 };
447
448 agent.on_tool_result(tool_use, &tool_result).await;
449
450 if let Some(hooks) = hooks
452 && let Some(conv_lock) = agent.conversation()
453 {
454 let conv_len = conv_lock.read().await.len();
455 let iter_ctx = IterationContext {
456 agent_id: &agent_id,
457 iteration: iterations as u32,
458 max_iterations: agent.max_iterations() as u32,
459 total_tokens_used: 0,
460 total_cost_usd: 0.0,
461 elapsed: start_time.elapsed(),
462 conversation_len: conv_len,
463 };
464 let mut history = conv_lock.write().await;
465 let mut view = ConversationView::new(&mut history);
466 hooks
467 .on_after_tool_execution(&iter_ctx, tool_use, &tool_result, &mut view)
468 .await;
469 }
470 }
471
472 for tool_use in &tool_use_requests {
474 if let Some(stuck) = loop_detector.record(&tool_use.name) {
475 let output = format!(
476 "Loop detected: '{}' called {} times consecutively. Aborting.",
477 stuck, loop_detector.window_size
478 );
479 tracing::error!(agent_id = %agent_id, %output);
480 let _ = hub.unregister_agent(&agent_id).await;
481 lock_manager.release_all_locks(&agent_id).await;
482 return Ok(AgentExecutionResult {
483 agent_id,
484 success: false,
485 output,
486 iterations,
487 tools_used,
488 });
489 }
490 }
491
492 if let Some(hooks) = hooks
494 && let Some(conv_lock) = agent.conversation()
495 {
496 let conv_len = conv_lock.read().await.len();
497 let iter_ctx = IterationContext {
498 agent_id: &agent_id,
499 iteration: iterations as u32,
500 max_iterations: agent.max_iterations() as u32,
501 total_tokens_used: 0,
502 total_cost_usd: 0.0,
503 elapsed: start_time.elapsed(),
504 conversation_len: conv_len,
505 };
506
507 if let Some(budget) = agent.context_budget_tokens() {
509 let mut history = conv_lock.write().await;
510 let mut view = ConversationView::new(&mut history);
511 let est_tokens = view.estimated_tokens();
512 if est_tokens > budget {
513 hooks
514 .on_context_pressure(&iter_ctx, &mut view, est_tokens, budget)
515 .await;
516 }
517 }
518
519 let mut history = conv_lock.write().await;
521 let mut view = ConversationView::new(&mut history);
522 hooks.on_after_iteration(&iter_ctx, &mut view).await;
523 }
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use brainwires_core::{ContentBlock, Message, MessageContent, Role, Usage};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::RwLock;

    /// Builds an assistant reply with the fixed usage figures shared by all test agents.
    fn assistant_reply(content: MessageContent, finish_reason: Option<&str>) -> ChatResponse {
        ChatResponse {
            message: Message {
                role: Role::Assistant,
                content,
                name: None,
                metadata: None,
            },
            usage: Usage::new(10, 20),
            finish_reason: finish_reason.map(String::from),
        }
    }

    /// Builds a reply whose only content block is a single tool call.
    fn tool_call_reply(id: &str, name: &str, input: serde_json::Value) -> ChatResponse {
        let call = ContentBlock::ToolUse {
            id: id.to_string(),
            name: name.to_string(),
            input,
        };
        assistant_reply(MessageContent::Blocks(vec![call]), None)
    }

    /// True when the finish reason is `"end_turn"` or `"stop"`.
    fn ends_turn(response: &ChatResponse) -> bool {
        matches!(
            response.finish_reason.as_deref(),
            Some("end_turn" | "stop")
        )
    }

    /// Collects every tool-use block of a block-structured reply; text replies yield nothing.
    fn tool_uses_in(response: &ChatResponse) -> Vec<ToolUse> {
        let MessageContent::Blocks(blocks) = &response.message.content else {
            return Vec::new();
        };
        blocks
            .iter()
            .filter_map(|block| match block {
                ContentBlock::ToolUse { id, name, input } => Some(ToolUse {
                    id: id.clone(),
                    name: name.clone(),
                    input: input.clone(),
                }),
                _ => None,
            })
            .collect()
    }

    /// Fresh hub and lock manager for one test run.
    fn fresh_runtime() -> (CommunicationHub, Arc<FileLockManager>) {
        (CommunicationHub::new(), Arc::new(FileLockManager::new()))
    }

    /// Agent that never requests tools and reports `end_turn` once
    /// `complete_after` provider calls have already been made.
    struct TestAgent {
        id: String,
        max_iters: usize,
        call_count: AtomicUsize,
        complete_after: usize,
        tool_results: RwLock<Vec<String>>,
    }

    impl TestAgent {
        fn new(id: &str, max_iters: usize, complete_after: usize) -> Self {
            Self {
                id: id.to_string(),
                max_iters,
                call_count: AtomicUsize::new(0),
                complete_after,
                tool_results: RwLock::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl AgentRuntime for TestAgent {
        fn agent_id(&self) -> &str {
            &self.id
        }

        fn max_iterations(&self) -> usize {
            self.max_iters
        }

        async fn call_provider(&self) -> Result<ChatResponse> {
            let call_index = self.call_count.fetch_add(1, Ordering::SeqCst);
            let finish_reason = (call_index >= self.complete_after).then_some("end_turn");
            let text = MessageContent::Text(format!("Response #{}", call_index));
            Ok(assistant_reply(text, finish_reason))
        }

        fn extract_tool_uses(&self, _response: &ChatResponse) -> Vec<ToolUse> {
            Vec::new()
        }

        fn is_completion(&self, response: &ChatResponse) -> bool {
            ends_turn(response)
        }

        async fn execute_tool(&self, tool_use: &ToolUse) -> Result<ToolResult> {
            Ok(ToolResult::success(tool_use.id.clone(), "ok".to_string()))
        }

        fn get_lock_requirement(&self, _tool_use: &ToolUse) -> Option<(String, LockType)> {
            None
        }

        async fn on_provider_response(&self, _response: &ChatResponse) {}

        async fn on_tool_result(&self, _tool_use: &ToolUse, result: &ToolResult) {
            let mut recorded = self.tool_results.write().await;
            recorded.push(result.content.clone());
        }

        async fn on_completion(&self, response: &ChatResponse) -> Result<Option<String>> {
            if !ends_turn(response) {
                return Ok(None);
            }
            let output = match &response.message.content {
                MessageContent::Text(text) => text.clone(),
                _ => "completed".to_string(),
            };
            Ok(Some(output))
        }

        async fn on_iteration_limit(&self, iterations: usize) -> String {
            format!("Hit iteration limit at {}", iterations)
        }
    }

    /// Agent whose first reply requests `read_file` and whose later replies finish the turn.
    struct ToolUsingAgent {
        id: String,
        call_count: AtomicUsize,
    }

    impl ToolUsingAgent {
        fn new(id: &str) -> Self {
            Self {
                id: id.to_string(),
                call_count: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl AgentRuntime for ToolUsingAgent {
        fn agent_id(&self) -> &str {
            &self.id
        }

        fn max_iterations(&self) -> usize {
            10
        }

        async fn call_provider(&self) -> Result<ChatResponse> {
            let reply = match self.call_count.fetch_add(1, Ordering::SeqCst) {
                0 => tool_call_reply(
                    "tool-1",
                    "read_file",
                    serde_json::json!({"path": "/tmp/test.txt"}),
                ),
                _ => assistant_reply(MessageContent::Text("Done!".to_string()), Some("end_turn")),
            };
            Ok(reply)
        }

        fn extract_tool_uses(&self, response: &ChatResponse) -> Vec<ToolUse> {
            tool_uses_in(response)
        }

        fn is_completion(&self, response: &ChatResponse) -> bool {
            ends_turn(response)
        }

        async fn execute_tool(&self, tool_use: &ToolUse) -> Result<ToolResult> {
            Ok(ToolResult::success(
                tool_use.id.clone(),
                "file contents".to_string(),
            ))
        }

        fn get_lock_requirement(&self, tool_use: &ToolUse) -> Option<(String, LockType)> {
            if tool_use.name != "read_file" {
                return None;
            }
            let path = tool_use.input.get("path")?.as_str()?;
            Some((path.to_string(), LockType::Read))
        }

        async fn on_provider_response(&self, _response: &ChatResponse) {}

        async fn on_tool_result(&self, _tool_use: &ToolUse, _result: &ToolResult) {}

        async fn on_completion(&self, _response: &ChatResponse) -> Result<Option<String>> {
            Ok(Some("Done!".to_string()))
        }

        async fn on_iteration_limit(&self, iterations: usize) -> String {
            format!("Limit at {}", iterations)
        }
    }

    #[tokio::test]
    async fn test_agent_completes_successfully() {
        // `complete_after = 2`: the third provider call is the first to report `end_turn`.
        let agent = TestAgent::new("test-1", 10, 2);
        let (hub, locks) = fresh_runtime();

        let result = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        assert!(result.success);
        assert_eq!(result.agent_id, "test-1");
        assert_eq!(result.iterations, 3);
        assert!(result.tools_used.is_empty());
    }

    #[tokio::test]
    async fn test_agent_hits_iteration_limit() {
        // Completion would need 100 prior calls, far beyond the limit of 3.
        let agent = TestAgent::new("test-2", 3, 100);
        let (hub, locks) = fresh_runtime();

        let result = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        assert!(!result.success);
        assert_eq!(result.iterations, 3);
        assert!(result.output.contains("iteration limit"));
    }

    #[tokio::test]
    async fn test_agent_with_tool_use() {
        let agent = ToolUsingAgent::new("test-3");
        let (hub, locks) = fresh_runtime();

        let result = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        assert!(result.success);
        assert_eq!(result.iterations, 2);
        assert_eq!(result.tools_used, vec!["read_file"]);
    }

    #[tokio::test]
    async fn test_agent_unregisters_on_completion() {
        let agent = TestAgent::new("test-4", 10, 0);
        let (hub, locks) = fresh_runtime();

        let _ = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        assert!(!hub.is_registered("test-4").await);
    }

    #[tokio::test]
    async fn test_agent_releases_locks_on_completion() {
        let agent = TestAgent::new("test-5", 10, 0);
        let (hub, locks) = fresh_runtime();

        // Forget the guard so that only the loop's own cleanup could
        // remove the lock from the manager.
        let held = locks
            .acquire_lock("test-5", "/tmp/some_file.txt", LockType::Write)
            .await
            .unwrap();
        std::mem::forget(held);

        let _ = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        let remaining = locks.locks_for_agent("test-5").await;
        assert!(remaining.is_empty());
    }

    /// Agent that requests the same `bash` tool call on every iteration and never completes.
    struct LoopingAgent {
        id: String,
    }

    #[async_trait]
    impl AgentRuntime for LoopingAgent {
        fn agent_id(&self) -> &str {
            &self.id
        }

        fn max_iterations(&self) -> usize {
            100
        }

        async fn call_provider(&self) -> Result<ChatResponse> {
            Ok(tool_call_reply(
                "t",
                "bash",
                serde_json::json!({"command": "ls"}),
            ))
        }

        fn extract_tool_uses(&self, response: &ChatResponse) -> Vec<ToolUse> {
            tool_uses_in(response)
        }

        fn is_completion(&self, _response: &ChatResponse) -> bool {
            false
        }

        async fn execute_tool(&self, tool_use: &ToolUse) -> Result<ToolResult> {
            Ok(ToolResult::success(tool_use.id.clone(), "ok".to_string()))
        }

        fn get_lock_requirement(&self, _tool_use: &ToolUse) -> Option<(String, LockType)> {
            None
        }

        async fn on_provider_response(&self, _response: &ChatResponse) {}

        async fn on_tool_result(&self, _tool_use: &ToolUse, _result: &ToolResult) {}

        async fn on_completion(&self, _response: &ChatResponse) -> Result<Option<String>> {
            Ok(None)
        }

        async fn on_iteration_limit(&self, iterations: usize) -> String {
            format!("Limit at {}", iterations)
        }
    }

    #[tokio::test]
    async fn test_loop_detection_aborts() {
        let agent = LoopingAgent {
            id: "loop-agent".to_string(),
        };
        let (hub, locks) = fresh_runtime();

        let result = run_agent_loop(&agent, &hub, &locks).await.unwrap();

        assert!(!result.success);
        assert!(
            result.output.contains("Loop detected"),
            "got: {}",
            result.output
        );
        // The loop uses a detection window of five identical tool calls.
        assert_eq!(result.tools_used.len(), 5);
    }
}