1use std::collections::VecDeque;
33use std::path::PathBuf;
34use std::sync::Arc;
35
36use anyhow::Result;
37use chrono::Utc;
38use sha2::{Digest, Sha256};
39use tokio::sync::RwLock;
40
41use brainwires_core::{
42 ChatOptions, ChatResponse, ContentBlock, ContentSource, Message, MessageContent, Provider,
43 Role, Task, ToolContext, ToolResult, ToolUse, estimate_tokens_from_size,
44};
45use brainwires_tool_system::{PreHookDecision, wrap_with_content_source};
46
47use crate::agent_hooks::{ConversationView, IterationContext, IterationDecision, ToolDecision};
48use crate::communication::AgentMessage;
49use crate::context::AgentContext;
50use crate::execution_graph::{ExecutionGraph, RunTelemetry, ToolCallRecord};
51use crate::file_locks::LockType;
52use crate::validation_loop::{ValidationConfig, format_validation_feedback, run_validation};
53
/// Tool names whose results are classified as external content.
///
/// NOTE(review): the consumer of this list is outside this view; presumably
/// results from these tools are tagged via `wrap_with_content_source`.
const EXTERNAL_CONTENT_TOOLS: &[&str] = &[
    "fetch_url", "web_fetch", "web_search",
    "context_recall", "semantic_search",
];
63
/// Default number of recent tool names kept by loop detection.
const DEFAULT_LOOP_DETECTION_WINDOW: usize = 5;
/// Default iteration cap used by `TaskAgentConfig::default`.
const DEFAULT_MAX_ITERATIONS: u32 = 100;

/// Settings for loop detection over the agent's recent tool calls.
#[derive(Debug, Clone)]
pub struct LoopDetectionConfig {
    /// How many of the most recent tool names are retained for comparison.
    pub window_size: usize,
    /// Master switch for loop detection.
    pub enabled: bool,
}

impl Default for LoopDetectionConfig {
    /// Enabled, with a window of [`DEFAULT_LOOP_DETECTION_WINDOW`] entries.
    fn default() -> Self {
        let window_size = DEFAULT_LOOP_DETECTION_WINDOW;
        LoopDetectionConfig {
            enabled: true,
            window_size,
        }
    }
}
84
/// Lifecycle state of a task agent.
///
/// Changes made through the agent's `set_status` are also broadcast as a
/// `StatusUpdate` carrying this type's `Display` rendering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskAgentStatus {
    /// Freshly constructed; execution has not started.
    Idle,
    /// Running; carries the task description.
    Working(String),
    /// NOTE(review): presumably set while blocked on a file lock; carries the path.
    WaitingForLock(String),
    /// Paused; carries the reason.
    Paused(String),
    /// Replanning; carries a progress note such as `"attempt 2/3"`.
    Replanning(String),
    /// Finished successfully; carries the summary text.
    Completed(String),
    /// Finished unsuccessfully; carries the error text.
    Failed(String),
}

impl std::fmt::Display for TaskAgentStatus {
    /// Renders `Idle`, or `<Label>: <detail>` for variants that carry text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::Idle => return f.write_str("Idle"),
            Self::Working(desc) => ("Working", desc),
            Self::WaitingForLock(path) => ("Waiting for lock", path),
            Self::Paused(reason) => ("Paused", reason),
            Self::Replanning(reason) => ("Replanning", reason),
            Self::Completed(summary) => ("Completed", summary),
            Self::Failed(error) => ("Failed", error),
        };
        write!(f, "{label}: {detail}")
    }
}
117
/// Coarse classification of why a task agent run failed.
///
/// Reported in `TaskAgentResult::failure_category` (which is `None` on success).
///
/// NOTE: append new variants at the end. Index-based serde formats encode unit
/// variants by declaration position, so reordering would change the wire format.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum FailureCategory {
    /// The loop reached `TaskAgentConfig::max_iterations`.
    IterationLimitExceeded,
    /// Cumulative provider tokens reached `TaskAgentConfig::max_total_tokens`.
    TokenBudgetExceeded,
    /// Estimated cumulative cost reached `TaskAgentConfig::max_cost_usd`.
    CostBudgetExceeded,
    /// Elapsed wall-clock time reached `TaskAgentConfig::timeout_secs`.
    WallClockTimeout,
    /// NOTE(review): presumably set by loop detection; the producer is outside this view.
    LoopDetected,
    /// More replans were detected than `TaskAgentConfig::max_replan_attempts` allows.
    MaxReplanAttemptsExceeded,
    /// NOTE(review): presumably set when a tool touches a path outside
    /// `allowed_files`; the producer is outside this view.
    FileScopeViolation,
    /// NOTE(review): producer outside this view. The completion path shown here
    /// feeds validation failures back into the loop rather than failing the run.
    ValidationFailed,
    /// NOTE(review): producer outside this view.
    ToolExecutionError,
    /// Catch-all; used, for example, when a lifecycle hook aborts the run.
    Unknown,
    /// The pre-execution plan was rejected by `TaskAgentConfig::plan_budget`.
    PlanBudgetExceeded,
}
149
/// Final outcome of a task agent run, returned on both success and failure.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TaskAgentResult {
    /// Identifier of the agent that produced this result.
    pub agent_id: String,
    /// Identifier of the task that was executed.
    pub task_id: String,
    /// `true` only when the task was completed (after any validation gate).
    pub success: bool,
    /// Completion text on success, or the error message on failure.
    pub summary: String,
    /// Number of loop iterations performed.
    pub iterations: u32,
    /// Number of replans detected during the run.
    pub replan_count: u32,
    /// `true` when the run stopped because a token, cost, or plan budget was hit.
    pub budget_exhausted: bool,
    /// Last assistant text captured when stopping early (timeout or budget);
    /// `None` otherwise.
    pub partial_output: Option<String>,
    /// Sum of `usage.total_tokens` over all provider responses.
    pub total_tokens_used: u64,
    /// Estimated cost: total tokens times a flat per-token rate hard-coded in
    /// `execute`, not actual provider pricing.
    pub total_cost_usd: f64,
    /// `true` only when the wall-clock timeout fired.
    pub timed_out: bool,
    /// Failure classification; `None` on success.
    pub failure_category: Option<FailureCategory>,
    /// Per-iteration step and tool-call record of the run.
    pub execution_graph: ExecutionGraph,
    /// Telemetry derived from `execution_graph` when the run ended.
    pub telemetry: RunTelemetry,
    /// Plan accepted during the optional planning phase, if any.
    pub pre_execution_plan: Option<brainwires_core::SerializablePlan>,
}
186
/// Tuning knobs and limits for a task agent run.
///
/// Marked `#[non_exhaustive]`: build it from `TaskAgentConfig::default()` and
/// assign fields, rather than using a struct literal.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TaskAgentConfig {
    /// Iteration cap for the main loop.
    ///
    /// NOTE(review): `execute` increments the counter before checking
    /// `iterations >= max_iterations`, so at most `max_iterations - 1` provider
    /// calls are made. Confirm this is intended.
    pub max_iterations: u32,

    /// System prompt override. When `None`, the default reasoning-agent prompt
    /// for this agent and working directory is used.
    pub system_prompt: Option<String>,

    /// Sampling temperature passed to the provider.
    pub temperature: f32,

    /// Per-response `max_tokens` passed to the provider.
    pub max_tokens: u32,

    /// Validation run before accepting completion. Skipped when `None` or
    /// when `enabled` is false.
    pub validation_config: Option<ValidationConfig>,

    /// Loop-detection settings; `window_size` sizes the recent-tool-name buffer.
    pub loop_detection: Option<LoopDetectionConfig>,

    /// Every N iterations (starting at iteration N + 1), a "GOAL CHECK"
    /// reminder of the original task is appended to the conversation.
    /// `None` or `Some(0)` disables it.
    pub goal_revalidation_interval: Option<u32>,

    /// Maximum number of detected replans before the run fails.
    pub max_replan_attempts: u32,

    /// Token budget. It is checked at the start of each iteration, so a single
    /// response can overshoot it.
    pub max_total_tokens: Option<u64>,

    /// Cost budget in USD, compared against the flat per-token estimate.
    /// Checked at the start of each iteration.
    pub max_cost_usd: Option<f64>,

    /// Wall-clock limit in seconds. It is checked between iterations; an
    /// in-flight provider call is not interrupted.
    pub timeout_secs: Option<u64>,

    /// Optional allow-list of path prefixes for file tools.
    ///
    /// NOTE(review): enforcement lives outside this view.
    /// `is_file_path_allowed` rejects every path when given an empty list.
    pub allowed_files: Option<Vec<PathBuf>>,

    /// When set, a JSON execution plan is requested up front and checked
    /// against this budget. The run fails with `PlanBudgetExceeded` if the
    /// check rejects the plan.
    pub plan_budget: Option<brainwires_core::PlanBudget>,

    /// NOTE(review): consumer outside this view; presumably a cap on
    /// conversation context size in tokens.
    pub context_budget_tokens: Option<u64>,
}
262
263impl Default for TaskAgentConfig {
264 fn default() -> Self {
265 Self {
266 max_iterations: DEFAULT_MAX_ITERATIONS,
267 system_prompt: None,
268 temperature: 0.7,
269 max_tokens: 4096,
270 validation_config: Some(ValidationConfig::default()),
271 loop_detection: Some(LoopDetectionConfig::default()),
272 goal_revalidation_interval: Some(10),
273 max_replan_attempts: 3,
274 max_total_tokens: None,
275 max_cost_usd: None,
276 timeout_secs: None,
277 allowed_files: None,
278 plan_budget: None,
279 context_budget_tokens: None,
280 }
281 }
282}
283
/// Agent that drives a single [`Task`] through a provider/tool loop.
///
/// Mutable run state sits behind `Arc<RwLock<_>>`. Accessors such as
/// `status()` and `conversation_snapshot()` read it through `&self`.
pub struct TaskAgent {
    /// Agent identifier. It is also the id registered with the communication
    /// hub and the owner id used when releasing file locks.
    pub id: String,
    /// The task being executed. `execute` calls `start`, `increment_iteration`,
    /// `complete` and `fail` on it.
    task: Arc<RwLock<Task>>,
    /// Chat provider used for planning and loop iterations.
    provider: Arc<dyn Provider>,
    /// Shared context: communication hub, tool executor, file locks, working set, hooks.
    context: Arc<AgentContext>,
    /// Run configuration.
    config: TaskAgentConfig,
    /// Current status, updated via `set_status`.
    status: Arc<RwLock<TaskAgentStatus>>,
    /// Conversation sent to the provider on every call.
    conversation_history: Arc<RwLock<Vec<Message>>>,
    /// Number of replans detected so far.
    replan_count: Arc<RwLock<u32>>,
}
306
307impl TaskAgent {
308 pub fn new(
313 id: String,
314 task: Task,
315 provider: Arc<dyn Provider>,
316 context: Arc<AgentContext>,
317 config: TaskAgentConfig,
318 ) -> Self {
319 Self {
320 id,
321 task: Arc::new(RwLock::new(task)),
322 provider,
323 context,
324 config,
325 status: Arc::new(RwLock::new(TaskAgentStatus::Idle)),
326 conversation_history: Arc::new(RwLock::new(Vec::new())),
327 replan_count: Arc::new(RwLock::new(0)),
328 }
329 }
330
331 pub fn id(&self) -> &str {
333 &self.id
334 }
335
336 pub async fn status(&self) -> TaskAgentStatus {
338 self.status.read().await.clone()
339 }
340
341 pub async fn task(&self) -> Task {
343 self.task.read().await.clone()
344 }
345
346 pub async fn conversation_snapshot(&self) -> Vec<Message> {
348 self.conversation_history.read().await.clone()
349 }
350
351 pub async fn conversation_len(&self) -> usize {
353 self.conversation_history.read().await.len()
354 }
355
356 pub async fn inject_message(&self, msg: Message) {
358 self.conversation_history.write().await.push(msg);
359 }
360
361 async fn set_status(&self, status: TaskAgentStatus) {
364 *self.status.write().await = status.clone();
365 let _ = self
366 .context
367 .communication_hub
368 .broadcast(
369 self.id.clone(),
370 AgentMessage::StatusUpdate {
371 agent_id: self.id.clone(),
372 status: status.to_string(),
373 details: None,
374 },
375 )
376 .await;
377 }
378
379 fn is_file_operation(tool_name: &str) -> bool {
381 matches!(
382 tool_name,
383 "read_file" | "write_file" | "edit_file" | "append_to_file" | "delete_file"
384 )
385 }
386
387 fn extract_file_path(tool_use: &ToolUse) -> Option<PathBuf> {
389 let path_str = tool_use
390 .input
391 .get("file_path")
392 .or_else(|| tool_use.input.get("path"))
393 .and_then(|v| v.as_str())?;
394 Some(PathBuf::from(path_str))
395 }
396
397 fn is_file_path_allowed(path: &str, allowed: &[PathBuf]) -> bool {
399 if allowed.is_empty() {
400 return false;
401 }
402 let candidate = PathBuf::from(path);
403 allowed.iter().any(|prefix| candidate.starts_with(prefix))
404 }
405
406 fn get_lock_requirement(tool_use: &ToolUse) -> Option<(String, LockType)> {
408 let path = tool_use
409 .input
410 .get("path")
411 .or_else(|| tool_use.input.get("file_path"))
412 .and_then(|v| v.as_str())?;
413
414 let lock_type = match tool_use.name.as_str() {
415 "read_file" | "list_directory" | "search_code" => LockType::Read,
416 "write_file" | "edit_file" | "patch_file" | "delete_file" | "create_directory" => {
417 LockType::Write
418 }
419 _ => return None,
420 };
421 Some((path.to_string(), lock_type))
422 }
423
424 fn extract_tool_uses(message: &Message) -> Vec<ToolUse> {
426 match &message.content {
427 MessageContent::Blocks(blocks) => blocks
428 .iter()
429 .filter_map(|block| match block {
430 ContentBlock::ToolUse { id, name, input } => Some(ToolUse {
431 id: id.clone(),
432 name: name.clone(),
433 input: input.clone(),
434 }),
435 _ => None,
436 })
437 .collect(),
438 _ => vec![],
439 }
440 }
441
442 fn tool_result_message(result: &ToolResult) -> Message {
444 Message {
445 role: Role::User,
446 content: MessageContent::Blocks(vec![ContentBlock::ToolResult {
447 tool_use_id: result.tool_use_id.clone(),
448 content: result.content.clone(),
449 is_error: Some(result.is_error),
450 }]),
451 name: None,
452 metadata: None,
453 }
454 }
455
456 async fn call_provider(&self) -> Result<ChatResponse> {
458 let history = self.conversation_history.read().await.clone();
459 let tools = self.context.tool_executor.available_tools();
460
461 let system_prompt = self.config.system_prompt.clone().unwrap_or_else(|| {
462 crate::system_prompts::reasoning_agent_prompt(&self.id, &self.context.working_directory)
463 });
464
465 let options = ChatOptions {
466 temperature: Some(self.config.temperature),
467 max_tokens: Some(self.config.max_tokens),
468 top_p: None,
469 stop: None,
470 system: Some(system_prompt),
471 };
472
473 self.provider.chat(&history, Some(&tools), &options).await
474 }
475
    /// Runs the optional validation gate and, if it passes, finalises the task
    /// as successful.
    ///
    /// Returns `Ok(None)` when validation ran and reported failure. The
    /// formatted feedback is appended to the conversation as a user message so
    /// the caller's loop can continue.
    ///
    /// Returns `Ok(Some(result))` after it has:
    /// - marked the task complete,
    /// - set the `Completed` status,
    /// - broadcast a successful `TaskResult`,
    /// - unregistered from the communication hub, and
    /// - released all of this agent's file locks.
    ///
    /// An `Err` from `run_validation` itself is only logged, and completion
    /// proceeds as if validation had passed.
    ///
    /// `execution_graph` and `pre_execution_plan` are moved into the result.
    /// `total_tokens_used`, `total_cost_usd` and `replan_count` are copied in
    /// verbatim.
    async fn attempt_validated_completion(
        &self,
        message_text: &str,
        total_tokens_used: u64,
        total_cost_usd: f64,
        replan_count: u32,
        execution_graph: ExecutionGraph,
        pre_execution_plan: Option<brainwires_core::SerializablePlan>,
    ) -> Result<Option<TaskAgentResult>> {
        let task_id = self.task.read().await.id.clone();

        if let Some(ref validation_config) = self.config.validation_config
            && validation_config.enabled
        {
            tracing::info!(
                agent_id = %self.id,
                "running validation before completion"
            );

            // Working-set file paths, as strings, for a copy of the validation settings.
            let working_set_files = {
                let ws = self.context.working_set.read().await;
                ws.file_paths()
                    .iter()
                    .map(|p| p.to_string_lossy().to_string())
                    .collect::<Vec<_>>()
            };

            let mut config_with_ws = validation_config.clone();
            config_with_ws.working_set_files = working_set_files;

            match run_validation(&config_with_ws).await {
                Ok(result) if !result.passed => {
                    tracing::warn!(
                        agent_id = %self.id,
                        issues = result.issues.len(),
                        "validation failed, continuing loop"
                    );
                    // Hand the issues back to the model; the caller keeps looping.
                    let feedback = format_validation_feedback(&result);
                    self.conversation_history
                        .write()
                        .await
                        .push(Message::user(feedback));
                    return Ok(None);
                }
                Ok(_) => {
                    tracing::info!(agent_id = %self.id, "validation passed");
                }
                Err(e) => {
                    tracing::error!(agent_id = %self.id, "validation error: {}", e);
                    // Best-effort gate: failing to run validation does not block
                    // completion; fall through to the success path.
                }
            }
        }

        // Success path: record completion, then notify and tear down.
        self.task.write().await.complete(message_text);
        self.set_status(TaskAgentStatus::Completed(message_text.to_string()))
            .await;

        // Broadcast failures are deliberately ignored.
        let _ = self
            .context
            .communication_hub
            .broadcast(
                self.id.clone(),
                AgentMessage::TaskResult {
                    task_id: task_id.clone(),
                    success: true,
                    result: message_text.to_string(),
                },
            )
            .await;

        let _ = self
            .context
            .communication_hub
            .unregister_agent(&self.id)
            .await;
        self.context
            .file_lock_manager
            .release_all_locks(&self.id)
            .await;

        // The iteration count is read back from the task, not passed in.
        let iterations = self.task.read().await.iterations;
        let run_ended_at = Utc::now();
        let telemetry =
            RunTelemetry::from_graph(&execution_graph, run_ended_at, true, total_cost_usd);

        Ok(Some(TaskAgentResult {
            agent_id: self.id.clone(),
            task_id,
            success: true,
            summary: message_text.to_string(),
            iterations,
            replan_count,
            budget_exhausted: false,
            partial_output: None,
            total_tokens_used,
            total_cost_usd,
            timed_out: false,
            failure_category: None,
            execution_graph,
            telemetry,
            pre_execution_plan,
        }))
    }
586
587 pub async fn execute(&self) -> Result<TaskAgentResult> {
594 let task_id = self.task.read().await.id.clone();
595 let task_description = self.task.read().await.description.clone();
596
597 tracing::info!(
598 agent_id = %self.id,
599 task_id = %task_id,
600 "TaskAgent starting execution"
601 );
602
603 if !self.context.communication_hub.is_registered(&self.id).await {
605 self.context
606 .communication_hub
607 .register_agent(self.id.clone())
608 .await?;
609 }
610
611 self.task.write().await.start();
612 self.set_status(TaskAgentStatus::Working(task_description.clone()))
613 .await;
614
615 self.conversation_history
617 .write()
618 .await
619 .push(Message::user(task_description.clone()));
620
621 let prompt_hash = {
623 let system_prompt = self.config.system_prompt.clone().unwrap_or_else(|| {
624 crate::system_prompts::reasoning_agent_prompt(
625 &self.id,
626 &self.context.working_directory,
627 )
628 });
629 let mut tool_names: Vec<String> = self
630 .context
631 .tool_executor
632 .available_tools()
633 .iter()
634 .map(|t| t.name.clone())
635 .collect();
636 tool_names.sort_unstable();
637 let mut hasher = Sha256::new();
638 hasher.update(system_prompt.as_bytes());
639 for name in &tool_names {
640 hasher.update(name.as_bytes());
641 }
642 hex::encode(hasher.finalize())
643 };
644 let run_started_at = Utc::now();
645 let mut execution_graph = ExecutionGraph::new(prompt_hash, run_started_at);
646
647 let mut pre_execution_plan: Option<brainwires_core::SerializablePlan> = None;
651 if let Some(ref budget) = self.config.plan_budget {
652 let planning_msg = Message::user(format!(
653 "Before beginning work, produce a JSON execution plan for this task.\n\n\
654 Task: {task_description}\n\n\
655 Reply with ONLY a JSON object in this exact format:\n\
656 {{\"steps\":[{{\"description\":\"short description\",\"tool\":\"tool_name\",\"estimated_tokens\":500}},...]}}\n\n\
657 Estimate 200–2000 tokens per step based on expected complexity. \
658 Do not perform any work yet — only plan.",
659 ));
660 let planning_options = brainwires_core::ChatOptions {
661 temperature: Some(0.1),
662 max_tokens: Some(2048),
663 top_p: None,
664 stop: None,
665 system: Some(
666 "You are a planning assistant. Respond only with a valid JSON execution plan."
667 .to_string(),
668 ),
669 };
670 match self
671 .provider
672 .chat(&[planning_msg], None, &planning_options)
673 .await
674 {
675 Ok(response) => {
676 let plan_text = response.message.text().unwrap_or("").to_string();
677 if let Some(plan) = brainwires_core::SerializablePlan::parse_from_text(
678 task_description.clone(),
679 &plan_text,
680 ) {
681 match budget.check(&plan) {
682 Ok(()) => {
683 tracing::info!(
684 agent_id = %self.id,
685 steps = plan.step_count(),
686 estimated_tokens = plan.total_estimated_tokens(),
687 "pre-execution plan accepted"
688 );
689 pre_execution_plan = Some(plan);
690 }
691 Err(reason) => {
692 let error = format!(
693 "Agent {} rejected by plan budget before execution: {}",
694 self.id, reason
695 );
696 tracing::error!(agent_id = %self.id, %error);
697 self.task.write().await.fail(&error);
698 self.set_status(TaskAgentStatus::Failed(error.clone()))
699 .await;
700 let _ = self
701 .context
702 .communication_hub
703 .broadcast(
704 self.id.clone(),
705 AgentMessage::TaskResult {
706 task_id: task_id.clone(),
707 success: false,
708 result: error.clone(),
709 },
710 )
711 .await;
712 let _ = self
713 .context
714 .communication_hub
715 .unregister_agent(&self.id)
716 .await;
717 self.context
718 .file_lock_manager
719 .release_all_locks(&self.id)
720 .await;
721 let run_ended_at = Utc::now();
722 let telemetry = RunTelemetry::from_graph(
723 &execution_graph,
724 run_ended_at,
725 false,
726 0.0,
727 );
728 return Ok(TaskAgentResult {
729 agent_id: self.id.clone(),
730 task_id,
731 success: false,
732 summary: error,
733 iterations: 0,
734 replan_count: 0,
735 budget_exhausted: true,
736 partial_output: None,
737 total_tokens_used: 0,
738 total_cost_usd: 0.0,
739 timed_out: false,
740 failure_category: Some(FailureCategory::PlanBudgetExceeded),
741 execution_graph,
742 telemetry,
743 pre_execution_plan: None,
744 });
745 }
746 }
747 } else {
748 tracing::warn!(
749 agent_id = %self.id,
750 "could not parse pre-execution plan from model response; \
751 proceeding without budget guard"
752 );
753 }
754 }
755 Err(e) => {
756 tracing::warn!(
757 agent_id = %self.id,
758 error = %e,
759 "planning phase provider call failed; proceeding without plan"
760 );
761 }
762 }
763 }
764
765 let mut iterations = 0u32;
766 let mut total_tokens_used: u64 = 0;
767 let mut total_cost_usd: f64 = 0.0;
768 const COST_PER_TOKEN: f64 = 0.000003; let start_time = std::time::Instant::now();
770 let mut recent_tool_names: VecDeque<String> = VecDeque::with_capacity(
771 self.config
772 .loop_detection
773 .as_ref()
774 .map(|c| c.window_size)
775 .unwrap_or(5),
776 );
777 let tool_context = ToolContext {
778 working_directory: self.context.working_directory.clone(),
779 idempotency_registry: Some(brainwires_core::IdempotencyRegistry::new()),
782 ..Default::default()
783 };
784
785 loop {
786 iterations += 1;
787 self.task.write().await.increment_iteration();
788
789 tracing::debug!(
790 agent_id = %self.id,
791 iteration = iterations,
792 max = self.config.max_iterations,
793 "iteration starting"
794 );
795
796 let step_started_at = Utc::now();
797 let step_idx = execution_graph.push_step(iterations, step_started_at);
798
799 if let Some(ref hooks) = self.context.lifecycle_hooks {
801 let conv_len = self.conversation_history.read().await.len();
802 let iter_ctx = self.build_iteration_context(
803 iterations,
804 total_tokens_used,
805 total_cost_usd,
806 &start_time,
807 conv_len,
808 );
809 let mut history = self.conversation_history.write().await;
810 let mut view = ConversationView::new(&mut history);
811 match hooks.on_before_iteration(&iter_ctx, &mut view).await {
812 IterationDecision::Continue => {}
813 IterationDecision::Skip => {
814 drop(history);
815 continue;
816 }
817 IterationDecision::Abort(reason) => {
818 drop(history);
819 let error = format!("Agent {} aborted by hook: {}", self.id, reason);
820 tracing::error!(agent_id = %self.id, %error);
821 self.task.write().await.fail(&error);
822 self.set_status(TaskAgentStatus::Failed(error.clone()))
823 .await;
824 let _ = self
825 .context
826 .communication_hub
827 .broadcast(
828 self.id.clone(),
829 AgentMessage::TaskResult {
830 task_id: task_id.clone(),
831 success: false,
832 result: error.clone(),
833 },
834 )
835 .await;
836 let _ = self
837 .context
838 .communication_hub
839 .unregister_agent(&self.id)
840 .await;
841 self.context
842 .file_lock_manager
843 .release_all_locks(&self.id)
844 .await;
845 let run_ended_at = Utc::now();
846 let telemetry = RunTelemetry::from_graph(
847 &execution_graph,
848 run_ended_at,
849 false,
850 total_cost_usd,
851 );
852 return Ok(TaskAgentResult {
853 agent_id: self.id.clone(),
854 task_id,
855 success: false,
856 summary: error,
857 iterations,
858 replan_count: *self.replan_count.read().await,
859 budget_exhausted: false,
860 partial_output: None,
861 total_tokens_used,
862 total_cost_usd,
863 timed_out: false,
864 failure_category: Some(FailureCategory::Unknown),
865 execution_graph: execution_graph.clone(),
866 telemetry,
867 pre_execution_plan: pre_execution_plan.clone(),
868 });
869 }
870 }
871 }
872
873 if iterations >= self.config.max_iterations {
875 let error = format!(
876 "Agent {} exceeded maximum iterations ({})",
877 self.id, self.config.max_iterations
878 );
879 tracing::error!(agent_id = %self.id, %error);
880
881 self.task.write().await.fail(&error);
882 self.set_status(TaskAgentStatus::Failed(error.clone()))
883 .await;
884
885 let _ = self
886 .context
887 .communication_hub
888 .broadcast(
889 self.id.clone(),
890 AgentMessage::TaskResult {
891 task_id: task_id.clone(),
892 success: false,
893 result: error.clone(),
894 },
895 )
896 .await;
897
898 let _ = self
899 .context
900 .communication_hub
901 .unregister_agent(&self.id)
902 .await;
903 self.context
904 .file_lock_manager
905 .release_all_locks(&self.id)
906 .await;
907
908 let run_ended_at = Utc::now();
909 let telemetry =
910 RunTelemetry::from_graph(&execution_graph, run_ended_at, false, total_cost_usd);
911 return Ok(TaskAgentResult {
912 agent_id: self.id.clone(),
913 task_id,
914 success: false,
915 summary: error,
916 iterations,
917 replan_count: *self.replan_count.read().await,
918 budget_exhausted: false,
919 partial_output: None,
920 total_tokens_used,
921 total_cost_usd,
922 timed_out: false,
923 failure_category: Some(FailureCategory::IterationLimitExceeded),
924 execution_graph: execution_graph.clone(),
925 telemetry,
926 pre_execution_plan: pre_execution_plan.clone(),
927 });
928 }
929
930 if let Some(envelope) = self
932 .context
933 .communication_hub
934 .try_receive_message(&self.id)
935 .await
936 && let AgentMessage::HelpResponse {
937 request_id,
938 response,
939 } = envelope.message
940 {
941 self.conversation_history
942 .write()
943 .await
944 .push(Message::user(format!(
945 "Response to help request {}: {}",
946 request_id, response
947 )));
948 }
949
950 if let Some(secs) = self.config.timeout_secs
952 && start_time.elapsed().as_secs() >= secs
953 {
954 let elapsed = start_time.elapsed().as_secs();
955 let partial = self.last_assistant_text().await;
956 let error = format!(
957 "Agent {} timed out after {}s (limit: {}s)",
958 self.id, elapsed, secs
959 );
960 tracing::error!(agent_id = %self.id, %error);
961 self.task.write().await.fail(&error);
962 self.set_status(TaskAgentStatus::Failed(error.clone()))
963 .await;
964 let _ = self
965 .context
966 .communication_hub
967 .broadcast(
968 self.id.clone(),
969 AgentMessage::TaskResult {
970 task_id: task_id.clone(),
971 success: false,
972 result: error.clone(),
973 },
974 )
975 .await;
976 let _ = self
977 .context
978 .communication_hub
979 .unregister_agent(&self.id)
980 .await;
981 self.context
982 .file_lock_manager
983 .release_all_locks(&self.id)
984 .await;
985 let run_ended_at = Utc::now();
986 let telemetry =
987 RunTelemetry::from_graph(&execution_graph, run_ended_at, false, total_cost_usd);
988 return Ok(TaskAgentResult {
989 agent_id: self.id.clone(),
990 task_id,
991 success: false,
992 summary: error,
993 iterations,
994 replan_count: *self.replan_count.read().await,
995 budget_exhausted: false,
996 partial_output: partial,
997 total_tokens_used,
998 total_cost_usd,
999 timed_out: true,
1000 failure_category: Some(FailureCategory::WallClockTimeout),
1001 execution_graph: execution_graph.clone(),
1002 telemetry,
1003 pre_execution_plan: pre_execution_plan.clone(),
1004 });
1005 }
1006
1007 if let Some(max) = self.config.max_total_tokens
1009 && total_tokens_used >= max
1010 {
1011 let partial = self.last_assistant_text().await;
1012 let error = format!(
1013 "Agent {} exceeded token budget ({}/{} tokens)",
1014 self.id, total_tokens_used, max
1015 );
1016 tracing::error!(agent_id = %self.id, %error);
1017 self.task.write().await.fail(&error);
1018 self.set_status(TaskAgentStatus::Failed(error.clone()))
1019 .await;
1020 let _ = self
1021 .context
1022 .communication_hub
1023 .broadcast(
1024 self.id.clone(),
1025 AgentMessage::TaskResult {
1026 task_id: task_id.clone(),
1027 success: false,
1028 result: error.clone(),
1029 },
1030 )
1031 .await;
1032 let _ = self
1033 .context
1034 .communication_hub
1035 .unregister_agent(&self.id)
1036 .await;
1037 self.context
1038 .file_lock_manager
1039 .release_all_locks(&self.id)
1040 .await;
1041 let run_ended_at = Utc::now();
1042 let telemetry =
1043 RunTelemetry::from_graph(&execution_graph, run_ended_at, false, total_cost_usd);
1044 return Ok(TaskAgentResult {
1045 agent_id: self.id.clone(),
1046 task_id,
1047 success: false,
1048 summary: error,
1049 iterations,
1050 replan_count: *self.replan_count.read().await,
1051 budget_exhausted: true,
1052 partial_output: partial,
1053 total_tokens_used,
1054 total_cost_usd,
1055 timed_out: false,
1056 failure_category: Some(FailureCategory::TokenBudgetExceeded),
1057 execution_graph: execution_graph.clone(),
1058 telemetry,
1059 pre_execution_plan: pre_execution_plan.clone(),
1060 });
1061 }
1062
1063 if let Some(max) = self.config.max_cost_usd
1065 && total_cost_usd >= max
1066 {
1067 let partial = self.last_assistant_text().await;
1068 let error = format!(
1069 "Agent {} exceeded cost budget (${:.6}/{:.6} USD)",
1070 self.id, total_cost_usd, max
1071 );
1072 tracing::error!(agent_id = %self.id, %error);
1073 self.task.write().await.fail(&error);
1074 self.set_status(TaskAgentStatus::Failed(error.clone()))
1075 .await;
1076 let _ = self
1077 .context
1078 .communication_hub
1079 .broadcast(
1080 self.id.clone(),
1081 AgentMessage::TaskResult {
1082 task_id: task_id.clone(),
1083 success: false,
1084 result: error.clone(),
1085 },
1086 )
1087 .await;
1088 let _ = self
1089 .context
1090 .communication_hub
1091 .unregister_agent(&self.id)
1092 .await;
1093 self.context
1094 .file_lock_manager
1095 .release_all_locks(&self.id)
1096 .await;
1097 let run_ended_at = Utc::now();
1098 let telemetry =
1099 RunTelemetry::from_graph(&execution_graph, run_ended_at, false, total_cost_usd);
1100 return Ok(TaskAgentResult {
1101 agent_id: self.id.clone(),
1102 task_id,
1103 success: false,
1104 summary: error,
1105 iterations,
1106 replan_count: *self.replan_count.read().await,
1107 budget_exhausted: true,
1108 partial_output: partial,
1109 total_tokens_used,
1110 total_cost_usd,
1111 timed_out: false,
1112 failure_category: Some(FailureCategory::CostBudgetExceeded),
1113 execution_graph: execution_graph.clone(),
1114 telemetry,
1115 pre_execution_plan: pre_execution_plan.clone(),
1116 });
1117 }
1118
1119 if let Some(interval) = self.config.goal_revalidation_interval
1121 && interval > 0
1122 && iterations > 1
1123 && (iterations - 1).is_multiple_of(interval)
1124 {
1125 self.conversation_history
1126 .write()
1127 .await
1128 .push(Message::user(format!(
1129 "GOAL CHECK (iteration {}): Your original task was:\n\n\"{}\"\n\n\
1130 Confirm you are still on track. Correct course if you have drifted.",
1131 iterations, task_description
1132 )));
1133 }
1134
1135 if let Some(ref hooks) = self.context.lifecycle_hooks {
1137 let conv_len = self.conversation_history.read().await.len();
1138 let iter_ctx = self.build_iteration_context(
1139 iterations,
1140 total_tokens_used,
1141 total_cost_usd,
1142 &start_time,
1143 conv_len,
1144 );
1145 let mut history = self.conversation_history.write().await;
1146 let mut view = ConversationView::new(&mut history);
1147 hooks.on_before_provider_call(&iter_ctx, &mut view).await;
1148 }
1149
1150 let response = self.call_provider().await?;
1152
1153 total_tokens_used += response.usage.total_tokens as u64;
1155 total_cost_usd += response.usage.total_tokens as f64 * COST_PER_TOKEN;
1156
1157 if let Some(ref hooks) = self.context.lifecycle_hooks {
1159 let conv_len = self.conversation_history.read().await.len();
1160 let iter_ctx = self.build_iteration_context(
1161 iterations,
1162 total_tokens_used,
1163 total_cost_usd,
1164 &start_time,
1165 conv_len,
1166 );
1167 hooks.on_after_provider_call(&iter_ctx, &response).await;
1168 }
1169
1170 execution_graph.finalize_step(
1172 step_idx,
1173 Utc::now(),
1174 response.usage.prompt_tokens,
1175 response.usage.completion_tokens,
1176 response.finish_reason.clone(),
1177 );
1178
1179 {
1181 let text = response.message.text().unwrap_or("").to_lowercase();
1182 if text.contains("replan") || text.contains("replanning") {
1183 let mut count = self.replan_count.write().await;
1184 *count += 1;
1185 let c = *count;
1186 drop(count);
1187 self.set_status(TaskAgentStatus::Replanning(format!(
1188 "attempt {}/{}",
1189 c, self.config.max_replan_attempts
1190 )))
1191 .await;
1192 if c > self.config.max_replan_attempts {
1193 let error = format!(
1194 "Agent {} exceeded max replan attempts ({}/{})",
1195 self.id, c, self.config.max_replan_attempts
1196 );
1197 tracing::error!(agent_id = %self.id, %error);
1198 self.task.write().await.fail(&error);
1199 self.set_status(TaskAgentStatus::Failed(error.clone()))
1200 .await;
1201 let _ = self
1202 .context
1203 .communication_hub
1204 .broadcast(
1205 self.id.clone(),
1206 AgentMessage::TaskResult {
1207 task_id: task_id.clone(),
1208 success: false,
1209 result: error.clone(),
1210 },
1211 )
1212 .await;
1213 let _ = self
1214 .context
1215 .communication_hub
1216 .unregister_agent(&self.id)
1217 .await;
1218 self.context
1219 .file_lock_manager
1220 .release_all_locks(&self.id)
1221 .await;
1222 let run_ended_at = Utc::now();
1223 let telemetry = RunTelemetry::from_graph(
1224 &execution_graph,
1225 run_ended_at,
1226 false,
1227 total_cost_usd,
1228 );
1229 return Ok(TaskAgentResult {
1230 agent_id: self.id.clone(),
1231 task_id,
1232 success: false,
1233 summary: error,
1234 iterations,
1235 replan_count: c,
1236 budget_exhausted: false,
1237 partial_output: None,
1238 total_tokens_used,
1239 total_cost_usd,
1240 timed_out: false,
1241 failure_category: Some(FailureCategory::MaxReplanAttemptsExceeded),
1242 execution_graph: execution_graph.clone(),
1243 telemetry,
1244 pre_execution_plan: pre_execution_plan.clone(),
1245 });
1246 }
1247 }
1248 }
1249
1250 let is_done = response
1251 .finish_reason
1252 .as_deref()
1253 .is_some_and(|r| r == "end_turn" || r == "stop");
1254
1255 if is_done {
1257 let text = response
1258 .message
1259 .text()
1260 .unwrap_or("Task completed")
1261 .to_string();
1262
1263 if let Some(ref hooks) = self.context.lifecycle_hooks {
1265 let conv_len = self.conversation_history.read().await.len();
1266 let iter_ctx = self.build_iteration_context(
1267 iterations,
1268 total_tokens_used,
1269 total_cost_usd,
1270 &start_time,
1271 conv_len,
1272 );
1273 if !hooks.on_before_completion(&iter_ctx, &text).await {
1274 continue; }
1276 }
1277
1278 if let Some(result) = self
1279 .attempt_validated_completion(
1280 &text,
1281 total_tokens_used,
1282 total_cost_usd,
1283 *self.replan_count.read().await,
1284 execution_graph.clone(),
1285 pre_execution_plan.clone(),
1286 )
1287 .await?
1288 {
1289 if let Some(ref hooks) = self.context.lifecycle_hooks {
1291 let conv_len = self.conversation_history.read().await.len();
1292 let iter_ctx = self.build_iteration_context(
1293 iterations,
1294 total_tokens_used,
1295 total_cost_usd,
1296 &start_time,
1297 conv_len,
1298 );
1299 hooks.on_after_completion(&iter_ctx, &result).await;
1300 }
1301 return Ok(result);
1302 }
1303 continue; }
1305
1306 let tool_uses = Self::extract_tool_uses(&response.message);
1308
1309 if tool_uses.is_empty() {
1310 let text = response
1312 .message
1313 .text()
1314 .unwrap_or("Task completed")
1315 .to_string();
1316
1317 if let Some(ref hooks) = self.context.lifecycle_hooks {
1319 let conv_len = self.conversation_history.read().await.len();
1320 let iter_ctx = self.build_iteration_context(
1321 iterations,
1322 total_tokens_used,
1323 total_cost_usd,
1324 &start_time,
1325 conv_len,
1326 );
1327 if !hooks.on_before_completion(&iter_ctx, &text).await {
1328 continue; }
1330 }
1331
1332 if let Some(result) = self
1333 .attempt_validated_completion(
1334 &text,
1335 total_tokens_used,
1336 total_cost_usd,
1337 *self.replan_count.read().await,
1338 execution_graph.clone(),
1339 pre_execution_plan.clone(),
1340 )
1341 .await?
1342 {
1343 if let Some(ref hooks) = self.context.lifecycle_hooks {
1345 let conv_len = self.conversation_history.read().await.len();
1346 let iter_ctx = self.build_iteration_context(
1347 iterations,
1348 total_tokens_used,
1349 total_cost_usd,
1350 &start_time,
1351 conv_len,
1352 );
1353 hooks.on_after_completion(&iter_ctx, &result).await;
1354 }
1355 return Ok(result);
1356 }
1357 continue;
1358 }
1359
1360 self.conversation_history
1362 .write()
1363 .await
1364 .push(response.message.clone());
1365
1366 for tool_use in &tool_uses {
1367 tracing::debug!(
1368 agent_id = %self.id,
1369 tool = %tool_use.name,
1370 "executing tool"
1371 );
1372
1373 if let Some(ref hooks) = self.context.lifecycle_hooks {
1375 let conv_len = self.conversation_history.read().await.len();
1376 let iter_ctx = self.build_iteration_context(
1377 iterations,
1378 total_tokens_used,
1379 total_cost_usd,
1380 &start_time,
1381 conv_len,
1382 );
1383 match hooks.on_before_tool_execution(&iter_ctx, tool_use).await {
1384 ToolDecision::Execute => {} ToolDecision::Override(result) => {
1386 execution_graph.record_tool_call(
1387 step_idx,
1388 ToolCallRecord {
1389 tool_use_id: tool_use.id.clone(),
1390 tool_name: tool_use.name.clone(),
1391 is_error: result.is_error,
1392 executed_at: Utc::now(),
1393 },
1394 );
1395 self.conversation_history
1396 .write()
1397 .await
1398 .push(Self::tool_result_message(&result));
1399 continue;
1400 }
1401 ToolDecision::Delegate(request) => {
1402 match hooks.execute_delegation(&request).await {
1403 Ok(delegation_result) => {
1404 let tool_result = ToolResult::success(
1405 tool_use.id.clone(),
1406 format!(
1407 "Delegated to sub-agent {}: {}",
1408 delegation_result.agent_id, delegation_result.output
1409 ),
1410 );
1411 execution_graph.record_tool_call(
1412 step_idx,
1413 ToolCallRecord {
1414 tool_use_id: tool_use.id.clone(),
1415 tool_name: tool_use.name.clone(),
1416 is_error: !delegation_result.success,
1417 executed_at: Utc::now(),
1418 },
1419 );
1420 self.conversation_history
1421 .write()
1422 .await
1423 .push(Self::tool_result_message(&tool_result));
1424 }
1425 Err(e) => {
1426 let tool_result = ToolResult::error(
1427 tool_use.id.clone(),
1428 format!("Delegation failed: {}", e),
1429 );
1430 execution_graph.record_tool_call(
1431 step_idx,
1432 ToolCallRecord {
1433 tool_use_id: tool_use.id.clone(),
1434 tool_name: tool_use.name.clone(),
1435 is_error: true,
1436 executed_at: Utc::now(),
1437 },
1438 );
1439 self.conversation_history
1440 .write()
1441 .await
1442 .push(Self::tool_result_message(&tool_result));
1443 }
1444 }
1445 continue;
1446 }
1447 }
1448 }
1449
1450 if let Some(ref hook) = self.context.pre_execute_hook {
1452 match hook.before_execute(tool_use, &tool_context).await {
1453 Ok(PreHookDecision::Reject(reason)) => {
1454 tracing::warn!(
1455 agent_id = %self.id,
1456 tool = %tool_use.name,
1457 reason = %reason,
1458 "tool call rejected by pre-execute hook"
1459 );
1460 execution_graph.record_tool_call(
1461 step_idx,
1462 ToolCallRecord {
1463 tool_use_id: tool_use.id.clone(),
1464 tool_name: tool_use.name.clone(),
1465 is_error: true,
1466 executed_at: Utc::now(),
1467 },
1468 );
1469 let rejection = ToolResult::error(tool_use.id.clone(), reason);
1470 self.conversation_history
1471 .write()
1472 .await
1473 .push(Self::tool_result_message(&rejection));
1474 continue;
1475 }
1476 Ok(PreHookDecision::Allow) => {}
1477 Err(e) => {
1478 tracing::error!(
1479 agent_id = %self.id,
1480 "pre-execute hook error: {}",
1481 e
1482 );
1483 }
1484 }
1485 }
1486
1487 let tool_result =
1488 if let Some((path, lock_type)) = Self::get_lock_requirement(tool_use) {
1489 if let Some(ref allowed) = self.config.allowed_files
1491 && !Self::is_file_path_allowed(&path, allowed)
1492 {
1493 tracing::warn!(
1494 agent_id = %self.id,
1495 path = %path,
1496 "file scope violation"
1497 );
1498 let result = ToolResult::error(
1499 tool_use.id.clone(),
1500 format!(
1501 "File scope violation: '{}' is outside allowed paths: {:?}",
1502 path, allowed
1503 ),
1504 );
1505 self.conversation_history
1506 .write()
1507 .await
1508 .push(Self::tool_result_message(&result));
1509 continue;
1510 }
1511
1512 self.set_status(TaskAgentStatus::WaitingForLock(path.clone()))
1513 .await;
1514
1515 match self
1516 .context
1517 .file_lock_manager
1518 .acquire_lock(&self.id, &path, lock_type)
1519 .await
1520 {
1521 Ok(_guard) => {
1522 self.set_status(TaskAgentStatus::Working(format!(
1523 "Executing {}",
1524 tool_use.name
1525 )))
1526 .await;
1527 match self
1528 .context
1529 .tool_executor
1530 .execute(tool_use, &tool_context)
1531 .await
1532 {
1533 Ok(r) => r,
1534 Err(e) => ToolResult::error(
1535 tool_use.id.clone(),
1536 format!("Tool execution failed: {}", e),
1537 ),
1538 }
1539 }
1541 Err(e) => {
1542 tracing::warn!(
1543 agent_id = %self.id,
1544 path = %path,
1545 "failed to acquire lock: {}",
1546 e
1547 );
1548 ToolResult::error(
1549 tool_use.id.clone(),
1550 format!("Could not acquire file lock: {}", e),
1551 )
1552 }
1553 }
1554 } else {
1555 self.set_status(TaskAgentStatus::Working(format!(
1556 "Executing {}",
1557 tool_use.name
1558 )))
1559 .await;
1560 match self
1561 .context
1562 .tool_executor
1563 .execute(tool_use, &tool_context)
1564 .await
1565 {
1566 Ok(r) => r,
1567 Err(e) => ToolResult::error(
1568 tool_use.id.clone(),
1569 format!("Tool execution failed: {}", e),
1570 ),
1571 }
1572 };
1573
1574 execution_graph.record_tool_call(
1576 step_idx,
1577 ToolCallRecord {
1578 tool_use_id: tool_use.id.clone(),
1579 tool_name: tool_use.name.clone(),
1580 is_error: tool_result.is_error,
1581 executed_at: Utc::now(),
1582 },
1583 );
1584
1585 if !tool_result.is_error
1587 && Self::is_file_operation(&tool_use.name)
1588 && let Some(fp) = Self::extract_file_path(tool_use)
1589 {
1590 let tokens = estimate_tokens_from_size(
1591 std::fs::metadata(&fp).ok().map(|m| m.len()).unwrap_or(0),
1592 );
1593 self.context.working_set.write().await.add(fp, tokens);
1594 }
1595
1596 let final_result = if EXTERNAL_CONTENT_TOOLS.contains(&tool_use.name.as_str())
1600 && !tool_result.is_error
1601 {
1602 ToolResult {
1603 tool_use_id: tool_result.tool_use_id.clone(),
1604 content: wrap_with_content_source(
1605 &tool_result.content,
1606 ContentSource::ExternalContent,
1607 ),
1608 is_error: false,
1609 }
1610 } else {
1611 tool_result.clone()
1612 };
1613 self.conversation_history
1614 .write()
1615 .await
1616 .push(Self::tool_result_message(&final_result));
1617
1618 if let Some(ref hooks) = self.context.lifecycle_hooks {
1620 let conv_len = self.conversation_history.read().await.len();
1621 let iter_ctx = self.build_iteration_context(
1622 iterations,
1623 total_tokens_used,
1624 total_cost_usd,
1625 &start_time,
1626 conv_len,
1627 );
1628 let mut history = self.conversation_history.write().await;
1629 let mut view = ConversationView::new(&mut history);
1630 hooks
1631 .on_after_tool_execution(&iter_ctx, tool_use, &final_result, &mut view)
1632 .await;
1633 }
1634 }
1635
1636 if let Some(ref ld) = self.config.loop_detection
1638 && ld.enabled
1639 {
1640 for tool_use in &tool_uses {
1641 if recent_tool_names.len() == ld.window_size {
1642 recent_tool_names.pop_front();
1643 }
1644 recent_tool_names.push_back(tool_use.name.clone());
1645 }
1646 if recent_tool_names.len() == ld.window_size
1647 && recent_tool_names.iter().all(|n| n == &recent_tool_names[0])
1648 {
1649 let stuck = recent_tool_names[0].clone();
1650 let error = format!(
1651 "Loop detected: '{}' called {} times consecutively. Aborting.",
1652 stuck, ld.window_size
1653 );
1654 tracing::error!(agent_id = %self.id, %error);
1655 self.conversation_history
1656 .write()
1657 .await
1658 .push(Message::user(format!(
1659 "SYSTEM: {error} Stop calling '{stuck}' and summarise progress."
1660 )));
1661 self.task.write().await.fail(&error);
1662 self.set_status(TaskAgentStatus::Failed(error.clone()))
1663 .await;
1664 let _ = self
1665 .context
1666 .communication_hub
1667 .broadcast(
1668 self.id.clone(),
1669 AgentMessage::TaskResult {
1670 task_id: task_id.clone(),
1671 success: false,
1672 result: error.clone(),
1673 },
1674 )
1675 .await;
1676 let _ = self
1677 .context
1678 .communication_hub
1679 .unregister_agent(&self.id)
1680 .await;
1681 self.context
1682 .file_lock_manager
1683 .release_all_locks(&self.id)
1684 .await;
1685 let run_ended_at = Utc::now();
1686 let telemetry = RunTelemetry::from_graph(
1687 &execution_graph,
1688 run_ended_at,
1689 false,
1690 total_cost_usd,
1691 );
1692 return Ok(TaskAgentResult {
1693 agent_id: self.id.clone(),
1694 task_id,
1695 success: false,
1696 summary: error,
1697 iterations,
1698 replan_count: *self.replan_count.read().await,
1699 budget_exhausted: false,
1700 partial_output: None,
1701 total_tokens_used,
1702 total_cost_usd,
1703 timed_out: false,
1704 failure_category: Some(FailureCategory::LoopDetected),
1705 execution_graph: execution_graph.clone(),
1706 telemetry,
1707 pre_execution_plan: pre_execution_plan.clone(),
1708 });
1709 }
1710 }
1711
1712 if let Some(ref hooks) = self.context.lifecycle_hooks {
1714 let conv_len = self.conversation_history.read().await.len();
1715 let iter_ctx = self.build_iteration_context(
1716 iterations,
1717 total_tokens_used,
1718 total_cost_usd,
1719 &start_time,
1720 conv_len,
1721 );
1722
1723 if let Some(budget) = self.config.context_budget_tokens {
1725 let mut history = self.conversation_history.write().await;
1726 let mut view = ConversationView::new(&mut history);
1727 let est_tokens = view.estimated_tokens();
1728 if est_tokens > budget {
1729 hooks
1730 .on_context_pressure(&iter_ctx, &mut view, est_tokens, budget)
1731 .await;
1732 }
1733 }
1734
1735 let mut history = self.conversation_history.write().await;
1737 let mut view = ConversationView::new(&mut history);
1738 hooks.on_after_iteration(&iter_ctx, &mut view).await;
1739 }
1740 }
1741 }
1742
1743 fn build_iteration_context<'a>(
1745 &'a self,
1746 iteration: u32,
1747 total_tokens_used: u64,
1748 total_cost_usd: f64,
1749 start_time: &std::time::Instant,
1750 conversation_len: usize,
1751 ) -> IterationContext<'a> {
1752 IterationContext {
1753 agent_id: &self.id,
1754 iteration,
1755 max_iterations: self.config.max_iterations,
1756 total_tokens_used,
1757 total_cost_usd,
1758 elapsed: start_time.elapsed(),
1759 conversation_len,
1760 }
1761 }
1762
1763 async fn last_assistant_text(&self) -> Option<String> {
1765 self.conversation_history
1766 .read()
1767 .await
1768 .iter()
1769 .rev()
1770 .find(|m| m.role == Role::Assistant)
1771 .and_then(|m| m.text())
1772 .map(|t| t.to_string())
1773 }
1774}
1775
1776pub fn spawn_task_agent(agent: Arc<TaskAgent>) -> tokio::task::JoinHandle<Result<TaskAgentResult>> {
1781 tokio::spawn(async move { agent.execute().await })
1782}
1783
1784#[cfg(test)]
1785mod tests {
1786 use super::*;
1787 use crate::communication::CommunicationHub;
1788 use crate::context::AgentContext;
1789 use crate::file_locks::FileLockManager;
1790 use async_trait::async_trait;
1791 use brainwires_core::{
1792 ChatResponse, StreamChunk, Tool, ToolContext, ToolResult, ToolUse, Usage,
1793 };
1794 use brainwires_tool_system::ToolExecutor;
1795 use futures::stream::BoxStream;
1796
1797 struct MockProvider {
1800 responses: std::sync::Mutex<Vec<ChatResponse>>,
1801 }
1802
1803 impl MockProvider {
1804 fn single(text: &str) -> Self {
1805 Self {
1806 responses: std::sync::Mutex::new(vec![ChatResponse {
1807 message: Message::assistant(text),
1808 finish_reason: Some("stop".to_string()),
1809 usage: Usage::default(),
1810 }]),
1811 }
1812 }
1813 }
1814
1815 #[async_trait]
1816 impl Provider for MockProvider {
1817 fn name(&self) -> &str {
1818 "mock"
1819 }
1820
1821 async fn chat(
1822 &self,
1823 _messages: &[Message],
1824 _tools: Option<&[Tool]>,
1825 _options: &ChatOptions,
1826 ) -> Result<ChatResponse> {
1827 let mut guard = self.responses.lock().unwrap();
1828 if guard.is_empty() {
1829 anyhow::bail!("no more mock responses")
1830 }
1831 Ok(guard.remove(0))
1832 }
1833
1834 fn stream_chat<'a>(
1835 &'a self,
1836 _messages: &'a [Message],
1837 _tools: Option<&'a [Tool]>,
1838 _options: &'a ChatOptions,
1839 ) -> BoxStream<'a, Result<StreamChunk>> {
1840 Box::pin(futures::stream::empty())
1841 }
1842 }
1843
1844 struct NoOpExecutor;
1847
1848 #[async_trait]
1849 impl ToolExecutor for NoOpExecutor {
1850 async fn execute(&self, tool_use: &ToolUse, _ctx: &ToolContext) -> Result<ToolResult> {
1851 Ok(ToolResult::success(tool_use.id.clone(), "ok".to_string()))
1852 }
1853
1854 fn available_tools(&self) -> Vec<Tool> {
1855 vec![]
1856 }
1857 }
1858
1859 fn make_context() -> Arc<AgentContext> {
1860 Arc::new(AgentContext::new(
1861 "/tmp",
1862 Arc::new(NoOpExecutor),
1863 Arc::new(CommunicationHub::new()),
1864 Arc::new(FileLockManager::new()),
1865 ))
1866 }
1867
1868 #[tokio::test]
1871 async fn test_creation() {
1872 let task = Task::new("t-1", "Do something");
1873 let agent = TaskAgent::new(
1874 "agent-1".to_string(),
1875 task,
1876 Arc::new(MockProvider::single("done")),
1877 make_context(),
1878 TaskAgentConfig::default(),
1879 );
1880 assert_eq!(agent.id(), "agent-1");
1881 assert_eq!(agent.status().await, TaskAgentStatus::Idle);
1882 }
1883
1884 #[tokio::test]
1885 async fn test_execution_completes() {
1886 let task = Task::new("t-1", "Simple task");
1887 let agent = Arc::new(TaskAgent::new(
1888 "agent-1".to_string(),
1889 task,
1890 Arc::new(MockProvider::single("Task completed successfully")),
1891 make_context(),
1892 TaskAgentConfig {
1893 validation_config: None,
1894 ..Default::default()
1895 },
1896 ));
1897
1898 let result = agent.execute().await.unwrap();
1899 assert!(result.success);
1900 assert_eq!(result.agent_id, "agent-1");
1901 assert_eq!(result.task_id, "t-1");
1902 assert_eq!(result.iterations, 1);
1903 }
1904
1905 #[tokio::test]
1906 async fn test_spawn_task_agent() {
1907 let task = Task::new("t-1", "Background task");
1908 let agent = Arc::new(TaskAgent::new(
1909 "agent-1".to_string(),
1910 task,
1911 Arc::new(MockProvider::single("done")),
1912 make_context(),
1913 TaskAgentConfig {
1914 validation_config: None,
1915 ..Default::default()
1916 },
1917 ));
1918
1919 let handle = spawn_task_agent(agent);
1920 let result = handle.await.unwrap().unwrap();
1921 assert!(result.success);
1922 }
1923
1924 #[tokio::test]
1925 async fn test_status_display() {
1926 assert_eq!(TaskAgentStatus::Idle.to_string(), "Idle");
1927 assert_eq!(
1928 TaskAgentStatus::Working("reading".to_string()).to_string(),
1929 "Working: reading"
1930 );
1931 assert_eq!(
1932 TaskAgentStatus::Failed("oops".to_string()).to_string(),
1933 "Failed: oops"
1934 );
1935 }
1936
1937 #[tokio::test]
1938 async fn test_result_has_execution_graph() {
1939 let task = Task::new("t-1", "Simple task");
1940 let agent = Arc::new(TaskAgent::new(
1941 "agent-1".to_string(),
1942 task,
1943 Arc::new(MockProvider::single("done")),
1944 make_context(),
1945 TaskAgentConfig {
1946 validation_config: None,
1947 ..Default::default()
1948 },
1949 ));
1950
1951 let result = agent.execute().await.unwrap();
1952 assert!(result.success);
1953 assert_eq!(result.execution_graph.steps.len(), 1);
1955 assert_eq!(result.execution_graph.steps[0].iteration, 1);
1956 assert!(!result.execution_graph.prompt_hash.is_empty());
1958 assert_eq!(result.telemetry.total_iterations, 1);
1960 assert!(result.telemetry.success);
1961 assert_eq!(
1962 result.telemetry.prompt_hash,
1963 result.execution_graph.prompt_hash
1964 );
1965 }
1966
1967 #[tokio::test]
1968 async fn test_pre_execute_hook_reject() {
1969 use brainwires_tool_system::{PreHookDecision, ToolPreHook};
1970
1971 struct RejectAll;
1972 #[async_trait]
1973 impl ToolPreHook for RejectAll {
1974 async fn before_execute(
1975 &self,
1976 tool_use: &ToolUse,
1977 _ctx: &ToolContext,
1978 ) -> anyhow::Result<PreHookDecision> {
1979 Ok(PreHookDecision::Reject(format!(
1980 "rejected: {}",
1981 tool_use.name
1982 )))
1983 }
1984 }
1985
1986 struct ToolThenStop;
1988 #[async_trait]
1989 impl Provider for ToolThenStop {
1990 fn name(&self) -> &str {
1991 "tool-then-stop"
1992 }
1993 async fn chat(
1994 &self,
1995 messages: &[Message],
1996 _tools: Option<&[Tool]>,
1997 _options: &ChatOptions,
1998 ) -> Result<ChatResponse> {
1999 let has_tool_result = messages.iter().any(|m| {
2001 matches!(&m.content, MessageContent::Blocks(b) if b.iter().any(|cb| matches!(cb, ContentBlock::ToolResult { .. })))
2002 });
2003 if has_tool_result {
2004 return Ok(ChatResponse {
2005 message: Message::assistant("done after hook rejection"),
2006 finish_reason: Some("stop".to_string()),
2007 usage: Usage::default(),
2008 });
2009 }
2010 Ok(ChatResponse {
2011 message: Message {
2012 role: Role::Assistant,
2013 content: MessageContent::Blocks(vec![ContentBlock::ToolUse {
2014 id: "tu-1".to_string(),
2015 name: "bash".to_string(),
2016 input: serde_json::json!({"command": "echo hi"}),
2017 }]),
2018 name: None,
2019 metadata: None,
2020 },
2021 finish_reason: None,
2022 usage: Usage::default(),
2023 })
2024 }
2025 fn stream_chat<'a>(
2026 &'a self,
2027 _messages: &'a [Message],
2028 _tools: Option<&'a [Tool]>,
2029 _options: &'a ChatOptions,
2030 ) -> futures::stream::BoxStream<'a, Result<brainwires_core::StreamChunk>> {
2031 Box::pin(futures::stream::empty())
2032 }
2033 }
2034
2035 let ctx = Arc::new(
2036 AgentContext::new(
2037 "/tmp",
2038 Arc::new(NoOpExecutor),
2039 Arc::new(CommunicationHub::new()),
2040 Arc::new(FileLockManager::new()),
2041 )
2042 .with_pre_execute_hook(Arc::new(RejectAll)),
2043 );
2044
2045 let task = Task::new("t-hook", "test hook rejection");
2046 let agent = Arc::new(TaskAgent::new(
2047 "agent-hook".to_string(),
2048 task,
2049 Arc::new(ToolThenStop),
2050 ctx,
2051 TaskAgentConfig {
2052 validation_config: None,
2053 ..Default::default()
2054 },
2055 ));
2056
2057 let result = agent.execute().await.unwrap();
2058 assert!(result.success);
2059 let rejected: Vec<_> = result
2061 .execution_graph
2062 .steps
2063 .iter()
2064 .flat_map(|s| s.tool_calls.iter())
2065 .filter(|tc| tc.is_error)
2066 .collect();
2067 assert_eq!(rejected.len(), 1);
2068 assert_eq!(rejected[0].tool_name, "bash");
2069 assert!(
2071 result
2072 .execution_graph
2073 .tool_sequence
2074 .contains(&"bash".to_string())
2075 );
2076 }
2077}