1use crate::context::{ContextProvider, ContextQuery, ContextResult};
13use crate::hitl::ConfirmationProvider;
14use crate::hooks::{
15 ErrorType, GenerateEndEvent, GenerateStartEvent, HookEvent, HookExecutor, HookResult,
16 OnErrorEvent, PostResponseEvent, PostToolUseEvent, PrePromptEvent, PreToolUseEvent,
17 TokenUsageInfo, ToolCallInfo, ToolResultData,
18};
19use crate::llm::{LlmClient, LlmResponse, Message, TokenUsage, ToolDefinition};
20use crate::permissions::{PermissionChecker, PermissionDecision};
21use crate::planning::{AgentGoal, ExecutionPlan, TaskStatus};
22use crate::prompts::SystemPromptSlots;
23use crate::queue::SessionCommand;
24use crate::session_lane_queue::SessionLaneQueue;
25use crate::tool_search::ToolIndex;
26use crate::tools::{ToolContext, ToolExecutor, ToolStreamEvent};
27use anyhow::{Context, Result};
28use async_trait::async_trait;
29use futures::future::join_all;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::{mpsc, RwLock};
35
/// Default value for [`AgentConfig::max_tool_rounds`], applied by `AgentConfig::default()`.
const MAX_TOOL_ROUNDS: usize = 50;
38
/// Configuration for an [`AgentLoop`].
///
/// `Debug` is implemented by hand because several fields are trait objects.
#[derive(Clone)]
pub struct AgentConfig {
    /// Slots from which the system prompt is built (`prompt_slots.build()`).
    pub prompt_slots: SystemPromptSlots,
    /// Tool definitions offered to the LLM.
    pub tools: Vec<ToolDefinition>,
    /// Upper bound on tool rounds; defaults to `MAX_TOOL_ROUNDS`.
    pub max_tool_rounds: usize,
    /// Optional security provider.
    /// NOTE(review): not used in the part of the file reviewed here — confirm semantics.
    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
    /// Optional permission checker (presumably consulted before tool execution — TODO confirm).
    pub permission_checker: Option<Arc<dyn PermissionChecker>>,
    /// Optional human-in-the-loop confirmation provider.
    pub confirmation_manager: Option<Arc<dyn ConfirmationProvider>>,
    /// Context providers queried concurrently for each prompt and notified when a turn completes.
    pub context_providers: Vec<Arc<dyn ContextProvider>>,
    /// When `true`, `execute_with_session` runs the planning path instead of the plain loop.
    pub planning_enabled: bool,
    /// Goal-tracking switch (off by default). NOTE(review): usage not visible here.
    pub goal_tracking: bool,
    /// Optional hook executor that receives lifecycle events (tool use, generation, errors, ...).
    pub hook_engine: Option<Arc<dyn HookExecutor>>,
    /// Skill registry handed to queued tool commands, which enforce the skills'
    /// allowed-tools restrictions. Defaults to a registry with the built-in skills.
    pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
    /// Parse-retry budget (default 2). NOTE(review): usage not visible here.
    pub max_parse_retries: u32,
    /// Per-tool execution timeout in milliseconds; `None` means no timeout is applied.
    pub tool_timeout_ms: Option<u64>,
    /// Circuit-breaker threshold (default 3). NOTE(review): usage not visible here.
    pub circuit_breaker_threshold: u32,
    /// Automatic context compaction switch (off by default).
    pub auto_compact: bool,
    /// Compaction threshold (default 0.80) — presumably a fraction of
    /// `max_context_tokens`; verify against the compaction code.
    pub auto_compact_threshold: f32,
    /// Context window size in tokens (default 200 000).
    pub max_context_tokens: usize,
    /// Optional LLM client carried in the config.
    /// NOTE(review): `AgentLoop::new` takes its own client; confirm who reads this one.
    pub llm_client: Option<Arc<dyn LlmClient>>,
    /// Optional agent memory store.
    pub memory: Option<Arc<crate::memory::AgentMemory>>,
    /// Continuation switch (on by default). NOTE(review): usage not visible here.
    pub continuation_enabled: bool,
    /// Maximum number of continuation turns (default 3).
    pub max_continuation_turns: u32,
    /// Optional tool index; when set, `call_llm` only sends the tools whose names
    /// match an index search on the latest user message.
    pub tool_index: Option<ToolIndex>,
}
115
116impl std::fmt::Debug for AgentConfig {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("AgentConfig")
119 .field("prompt_slots", &self.prompt_slots)
120 .field("tools", &self.tools)
121 .field("max_tool_rounds", &self.max_tool_rounds)
122 .field("security_provider", &self.security_provider.is_some())
123 .field("permission_checker", &self.permission_checker.is_some())
124 .field("confirmation_manager", &self.confirmation_manager.is_some())
125 .field("context_providers", &self.context_providers.len())
126 .field("planning_enabled", &self.planning_enabled)
127 .field("goal_tracking", &self.goal_tracking)
128 .field("hook_engine", &self.hook_engine.is_some())
129 .field(
130 "skill_registry",
131 &self.skill_registry.as_ref().map(|r| r.len()),
132 )
133 .field("max_parse_retries", &self.max_parse_retries)
134 .field("tool_timeout_ms", &self.tool_timeout_ms)
135 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
136 .field("auto_compact", &self.auto_compact)
137 .field("auto_compact_threshold", &self.auto_compact_threshold)
138 .field("max_context_tokens", &self.max_context_tokens)
139 .field("continuation_enabled", &self.continuation_enabled)
140 .field("max_continuation_turns", &self.max_continuation_turns)
141 .field("memory", &self.memory.is_some())
142 .field("tool_index", &self.tool_index.as_ref().map(|i| i.len()))
143 .finish()
144 }
145}
146
147impl Default for AgentConfig {
148 fn default() -> Self {
149 Self {
150 prompt_slots: SystemPromptSlots::default(),
151 tools: Vec::new(), max_tool_rounds: MAX_TOOL_ROUNDS,
153 security_provider: None,
154 permission_checker: None,
155 confirmation_manager: None,
156 context_providers: Vec::new(),
157 planning_enabled: false,
158 goal_tracking: false,
159 hook_engine: None,
160 skill_registry: Some(Arc::new(crate::skills::SkillRegistry::with_builtins())),
161 max_parse_retries: 2,
162 tool_timeout_ms: None,
163 circuit_breaker_threshold: 3,
164 auto_compact: false,
165 auto_compact_threshold: 0.80,
166 max_context_tokens: 200_000,
167 llm_client: None,
168 memory: None,
169 continuation_enabled: true,
170 max_continuation_turns: 3,
171 tool_index: None,
172 }
173 }
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
182#[serde(tag = "type")]
183#[non_exhaustive]
184pub enum AgentEvent {
185 #[serde(rename = "agent_start")]
187 Start { prompt: String },
188
189 #[serde(rename = "turn_start")]
191 TurnStart { turn: usize },
192
193 #[serde(rename = "text_delta")]
195 TextDelta { text: String },
196
197 #[serde(rename = "tool_start")]
199 ToolStart { id: String, name: String },
200
201 #[serde(rename = "tool_input_delta")]
203 ToolInputDelta { delta: String },
204
205 #[serde(rename = "tool_end")]
207 ToolEnd {
208 id: String,
209 name: String,
210 output: String,
211 exit_code: i32,
212 #[serde(skip_serializing_if = "Option::is_none")]
213 metadata: Option<serde_json::Value>,
214 },
215
216 #[serde(rename = "tool_output_delta")]
218 ToolOutputDelta {
219 id: String,
220 name: String,
221 delta: String,
222 },
223
224 #[serde(rename = "turn_end")]
226 TurnEnd { turn: usize, usage: TokenUsage },
227
228 #[serde(rename = "agent_end")]
230 End {
231 text: String,
232 usage: TokenUsage,
233 #[serde(skip_serializing_if = "Option::is_none")]
234 meta: Option<crate::llm::LlmResponseMeta>,
235 },
236
237 #[serde(rename = "error")]
239 Error { message: String },
240
241 #[serde(rename = "confirmation_required")]
243 ConfirmationRequired {
244 tool_id: String,
245 tool_name: String,
246 args: serde_json::Value,
247 timeout_ms: u64,
248 },
249
250 #[serde(rename = "confirmation_received")]
252 ConfirmationReceived {
253 tool_id: String,
254 approved: bool,
255 reason: Option<String>,
256 },
257
258 #[serde(rename = "confirmation_timeout")]
260 ConfirmationTimeout {
261 tool_id: String,
262 action_taken: String, },
264
265 #[serde(rename = "external_task_pending")]
267 ExternalTaskPending {
268 task_id: String,
269 session_id: String,
270 lane: crate::hitl::SessionLane,
271 command_type: String,
272 payload: serde_json::Value,
273 timeout_ms: u64,
274 },
275
276 #[serde(rename = "external_task_completed")]
278 ExternalTaskCompleted {
279 task_id: String,
280 session_id: String,
281 success: bool,
282 },
283
284 #[serde(rename = "permission_denied")]
286 PermissionDenied {
287 tool_id: String,
288 tool_name: String,
289 args: serde_json::Value,
290 reason: String,
291 },
292
293 #[serde(rename = "context_resolving")]
295 ContextResolving { providers: Vec<String> },
296
297 #[serde(rename = "context_resolved")]
299 ContextResolved {
300 total_items: usize,
301 total_tokens: usize,
302 },
303
304 #[serde(rename = "command_dead_lettered")]
309 CommandDeadLettered {
310 command_id: String,
311 command_type: String,
312 lane: String,
313 error: String,
314 attempts: u32,
315 },
316
317 #[serde(rename = "command_retry")]
319 CommandRetry {
320 command_id: String,
321 command_type: String,
322 lane: String,
323 attempt: u32,
324 delay_ms: u64,
325 },
326
327 #[serde(rename = "queue_alert")]
329 QueueAlert {
330 level: String,
331 alert_type: String,
332 message: String,
333 },
334
335 #[serde(rename = "task_updated")]
340 TaskUpdated {
341 session_id: String,
342 tasks: Vec<crate::planning::Task>,
343 },
344
345 #[serde(rename = "memory_stored")]
350 MemoryStored {
351 memory_id: String,
352 memory_type: String,
353 importance: f32,
354 tags: Vec<String>,
355 },
356
357 #[serde(rename = "memory_recalled")]
359 MemoryRecalled {
360 memory_id: String,
361 content: String,
362 relevance: f32,
363 },
364
365 #[serde(rename = "memories_searched")]
367 MemoriesSearched {
368 query: Option<String>,
369 tags: Vec<String>,
370 result_count: usize,
371 },
372
373 #[serde(rename = "memory_cleared")]
375 MemoryCleared {
376 tier: String, count: u64,
378 },
379
380 #[serde(rename = "subagent_start")]
385 SubagentStart {
386 task_id: String,
388 session_id: String,
390 parent_session_id: String,
392 agent: String,
394 description: String,
396 },
397
398 #[serde(rename = "subagent_progress")]
400 SubagentProgress {
401 task_id: String,
403 session_id: String,
405 status: String,
407 metadata: serde_json::Value,
409 },
410
411 #[serde(rename = "subagent_end")]
413 SubagentEnd {
414 task_id: String,
416 session_id: String,
418 agent: String,
420 output: String,
422 success: bool,
424 },
425
426 #[serde(rename = "planning_start")]
431 PlanningStart { prompt: String },
432
433 #[serde(rename = "planning_end")]
435 PlanningEnd {
436 plan: ExecutionPlan,
437 estimated_steps: usize,
438 },
439
440 #[serde(rename = "step_start")]
442 StepStart {
443 step_id: String,
444 description: String,
445 step_number: usize,
446 total_steps: usize,
447 },
448
449 #[serde(rename = "step_end")]
451 StepEnd {
452 step_id: String,
453 status: TaskStatus,
454 step_number: usize,
455 total_steps: usize,
456 },
457
458 #[serde(rename = "goal_extracted")]
460 GoalExtracted { goal: AgentGoal },
461
462 #[serde(rename = "goal_progress")]
464 GoalProgress {
465 goal: String,
466 progress: f32,
467 completed_steps: usize,
468 total_steps: usize,
469 },
470
471 #[serde(rename = "goal_achieved")]
473 GoalAchieved {
474 goal: String,
475 total_steps: usize,
476 duration_ms: i64,
477 },
478
479 #[serde(rename = "context_compacted")]
484 ContextCompacted {
485 session_id: String,
486 before_messages: usize,
487 after_messages: usize,
488 percent_before: f32,
489 },
490
491 #[serde(rename = "persistence_failed")]
496 PersistenceFailed {
497 session_id: String,
498 operation: String,
499 error: String,
500 },
501
502 #[serde(rename = "btw_answer")]
510 BtwAnswer {
511 question: String,
512 answer: String,
513 usage: TokenUsage,
514 },
515}
516
/// Outcome of an agent run.
#[derive(Debug, Clone)]
pub struct AgentResult {
    /// Response text.
    pub text: String,
    /// Messages associated with the run (presumably the full conversation — TODO confirm).
    pub messages: Vec<Message>,
    /// Token usage for the run.
    pub usage: TokenUsage,
    /// Number of tool calls made.
    pub tool_calls_count: usize,
}
525
/// A single tool invocation packaged as a [`SessionCommand`] so it can be
/// submitted to a session lane queue.
pub struct ToolCommand {
    /// Executor that actually runs the tool.
    tool_executor: Arc<ToolExecutor>,
    /// Name of the tool to run; also reported as the command type.
    tool_name: String,
    /// Arguments passed to the tool; also reported as the command payload.
    tool_args: Value,
    /// Context the tool runs in.
    tool_context: ToolContext,
    /// When present, instruction skills in this registry may restrict which tools are allowed.
    skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
}
540
541impl ToolCommand {
542 pub fn new(
544 tool_executor: Arc<ToolExecutor>,
545 tool_name: String,
546 tool_args: Value,
547 tool_context: ToolContext,
548 skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
549 ) -> Self {
550 Self {
551 tool_executor,
552 tool_name,
553 tool_args,
554 tool_context,
555 skill_registry,
556 }
557 }
558}
559
560#[async_trait]
561impl SessionCommand for ToolCommand {
562 async fn execute(&self) -> Result<Value> {
563 if let Some(registry) = &self.skill_registry {
565 let instruction_skills = registry.by_kind(crate::skills::SkillKind::Instruction);
566
567 let has_restrictions = instruction_skills.iter().any(|s| s.allowed_tools.is_some());
569
570 if has_restrictions {
571 let mut allowed = false;
572
573 for skill in &instruction_skills {
574 if skill.is_tool_allowed(&self.tool_name) {
575 allowed = true;
576 break;
577 }
578 }
579
580 if !allowed {
581 return Err(anyhow::anyhow!(
582 "Tool '{}' is not allowed by any active skill. Active skills restrict tools to their allowed-tools lists.",
583 self.tool_name
584 ));
585 }
586 }
587 }
588
589 let result = self
591 .tool_executor
592 .execute_with_context(&self.tool_name, &self.tool_args, &self.tool_context)
593 .await?;
594 Ok(serde_json::json!({
595 "output": result.output,
596 "exit_code": result.exit_code,
597 "metadata": result.metadata,
598 }))
599 }
600
601 fn command_type(&self) -> &str {
602 &self.tool_name
603 }
604
605 fn payload(&self) -> Value {
606 self.tool_args.clone()
607 }
608}
609
/// Drives LLM calls and tool execution for an agent.
#[derive(Clone)]
pub struct AgentLoop {
    /// Client used for LLM completions (streaming or one-shot).
    llm_client: Arc<dyn LlmClient>,
    /// Executor used to run tools.
    tool_executor: Arc<ToolExecutor>,
    /// Base tool context; cloned when a per-call context is needed.
    tool_context: ToolContext,
    /// Loop configuration.
    config: AgentConfig,
    /// Optional shared tool metrics, set through `with_tool_metrics`.
    tool_metrics: Option<Arc<RwLock<crate::telemetry::ToolMetrics>>>,
    /// Optional lane queue, set through `with_queue`; when present, tool calls
    /// are submitted through it before falling back to direct execution.
    command_queue: Option<Arc<SessionLaneQueue>>,
}
626
627impl AgentLoop {
628 pub fn new(
629 llm_client: Arc<dyn LlmClient>,
630 tool_executor: Arc<ToolExecutor>,
631 tool_context: ToolContext,
632 config: AgentConfig,
633 ) -> Self {
634 Self {
635 llm_client,
636 tool_executor,
637 tool_context,
638 config,
639 tool_metrics: None,
640 command_queue: None,
641 }
642 }
643
644 pub fn with_tool_metrics(
646 mut self,
647 metrics: Arc<RwLock<crate::telemetry::ToolMetrics>>,
648 ) -> Self {
649 self.tool_metrics = Some(metrics);
650 self
651 }
652
653 pub fn with_queue(mut self, queue: Arc<SessionLaneQueue>) -> Self {
658 self.command_queue = Some(queue);
659 self
660 }
661
662 async fn execute_tool_timed(
668 &self,
669 name: &str,
670 args: &serde_json::Value,
671 ctx: &ToolContext,
672 ) -> anyhow::Result<crate::tools::ToolResult> {
673 let fut = self.tool_executor.execute_with_context(name, args, ctx);
674 if let Some(timeout_ms) = self.config.tool_timeout_ms {
675 match tokio::time::timeout(Duration::from_millis(timeout_ms), fut).await {
676 Ok(result) => result,
677 Err(_) => Err(anyhow::anyhow!(
678 "Tool '{}' timed out after {}ms",
679 name,
680 timeout_ms
681 )),
682 }
683 } else {
684 fut.await
685 }
686 }
687
688 fn tool_result_to_tuple(
690 result: anyhow::Result<crate::tools::ToolResult>,
691 ) -> (
692 String,
693 i32,
694 bool,
695 Option<serde_json::Value>,
696 Vec<crate::llm::Attachment>,
697 ) {
698 match result {
699 Ok(r) => (
700 r.output,
701 r.exit_code,
702 r.exit_code != 0,
703 r.metadata,
704 r.images,
705 ),
706 Err(e) => {
707 let msg = e.to_string();
708 let hint = if Self::is_transient_error(&msg) {
710 " [transient — you may retry this tool call]"
711 } else {
712 " [permanent — do not retry without changing the arguments]"
713 };
714 (
715 format!("Tool execution error: {}{}", msg, hint),
716 1,
717 true,
718 None,
719 Vec::new(),
720 )
721 }
722 }
723 }
724
/// Builds a "## Project Context" hint for the system prompt by looking for
/// well-known project files directly inside `workspace`.
///
/// .NET is checked first (any entry ending in `.csproj` or `.sln`); otherwise
/// the first matching marker in the table below wins. Returns an empty string
/// when nothing matches or the directory cannot be read.
///
/// Performs blocking filesystem calls.
fn detect_project_hint(workspace: &std::path::Path) -> String {
    /// A marker file and the hint it produces.
    struct Marker {
        file: &'static str,
        lang: &'static str,
        tip: &'static str,
    }

    // Ordered by priority: the first marker found decides the hint.
    const MARKERS: &[Marker] = &[
        Marker {
            file: "Cargo.toml",
            lang: "Rust",
            tip: "Use `cargo build`, `cargo test`, `cargo clippy`, and `cargo fmt`. \
                  Prefer `anyhow` / `thiserror` for error handling. \
                  Follow the Microsoft Rust Guidelines (no panics in library code, \
                  async-first with Tokio).",
        },
        Marker {
            file: "package.json",
            lang: "Node.js / TypeScript",
            tip: "Check `package.json` for the package manager (npm/yarn/pnpm/bun) \
                  and available scripts. Prefer TypeScript with strict mode. \
                  Use ESM imports unless the project is CommonJS.",
        },
        Marker {
            file: "pyproject.toml",
            lang: "Python",
            tip: "Use the package manager declared in `pyproject.toml` \
                  (uv, poetry, hatch, etc.). Prefer type hints and async/await for I/O.",
        },
        Marker {
            file: "setup.py",
            lang: "Python",
            tip: "Legacy Python project. Prefer type hints and async/await for I/O.",
        },
        Marker {
            file: "requirements.txt",
            lang: "Python",
            tip: "Python project with pip-style dependencies. \
                  Prefer type hints and async/await for I/O.",
        },
        Marker {
            file: "go.mod",
            lang: "Go",
            tip: "Use `go build ./...` and `go test ./...`. \
                  Follow standard Go project layout. Use `gofmt` for formatting.",
        },
        Marker {
            file: "pom.xml",
            lang: "Java / Maven",
            tip: "Use `mvn compile`, `mvn test`, `mvn package`. \
                  Follow standard Maven project structure.",
        },
        Marker {
            file: "build.gradle",
            lang: "Java / Gradle",
            tip: "Use `./gradlew build` and `./gradlew test`. \
                  Follow standard Gradle project structure.",
        },
        Marker {
            file: "build.gradle.kts",
            lang: "Kotlin / Gradle",
            tip: "Use `./gradlew build` and `./gradlew test`. \
                  Prefer Kotlin coroutines for async work.",
        },
        Marker {
            file: "CMakeLists.txt",
            lang: "C / C++",
            tip: "Use `cmake -B build && cmake --build build`. \
                  Check for `compile_commands.json` for IDE tooling.",
        },
        Marker {
            file: "Makefile",
            lang: "C / C++ (or generic)",
            tip: "Use `make` or `make <target>`. \
                  Check available targets with `make help` or by reading the Makefile.",
        },
    ];

    // .NET projects have no fixed file name, so scan the directory entries.
    // (`Path::exists` does not expand globs, so a `join("*.csproj")` probe would
    // only ever match a file literally named `*.csproj`, which this scan covers too.)
    let is_dotnet = std::fs::read_dir(workspace)
        .map(|entries| {
            entries.flatten().any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.ends_with(".csproj") || name.ends_with(".sln")
            })
        })
        .unwrap_or(false);

    if is_dotnet {
        return "## Project Context\n\nThis is a **C# / .NET** project. \
                Use `dotnet build`, `dotnet test`, and `dotnet run`. \
                Follow C# coding conventions and async/await patterns."
            .to_string();
    }

    MARKERS
        .iter()
        .find(|marker| workspace.join(marker.file).exists())
        .map(|marker| {
            format!(
                "## Project Context\n\nThis is a **{}** project. {}",
                marker.lang, marker.tip
            )
        })
        .unwrap_or_default()
}
838
/// Returns `true` when an error message looks transient, i.e. it contains
/// (case-insensitively) one of the known timeout / connection / throttling
/// fragments listed below.
fn is_transient_error(msg: &str) -> bool {
    const TRANSIENT_FRAGMENTS: &[&str] = &[
        "timeout",
        "timed out",
        "connection refused",
        "connection reset",
        "broken pipe",
        "temporarily unavailable",
        "resource temporarily unavailable",
        "os error 11",
        "os error 35",
        "rate limit",
        "too many requests",
        "service unavailable",
        "network unreachable",
    ];

    let haystack = msg.to_lowercase();
    TRANSIENT_FRAGMENTS
        .iter()
        .any(|&fragment| haystack.contains(fragment))
}
857
858 fn is_parallel_safe_write(name: &str, _args: &serde_json::Value) -> bool {
861 matches!(
862 name,
863 "write_file" | "edit_file" | "create_file" | "append_to_file" | "replace_in_file"
864 )
865 }
866
867 fn extract_write_path(args: &serde_json::Value) -> Option<String> {
869 args.get("path")
872 .and_then(|v| v.as_str())
873 .map(|s| s.to_string())
874 }
875
876 async fn execute_tool_queued_or_direct(
878 &self,
879 name: &str,
880 args: &serde_json::Value,
881 ctx: &ToolContext,
882 ) -> anyhow::Result<crate::tools::ToolResult> {
883 if let Some(ref queue) = self.command_queue {
884 let command = ToolCommand::new(
885 Arc::clone(&self.tool_executor),
886 name.to_string(),
887 args.clone(),
888 ctx.clone(),
889 self.config.skill_registry.clone(),
890 );
891 let rx = queue.submit_by_tool(name, Box::new(command)).await;
892 match rx.await {
893 Ok(Ok(value)) => {
894 let output = value["output"]
895 .as_str()
896 .ok_or_else(|| {
897 anyhow::anyhow!(
898 "Queue result missing 'output' field for tool '{}'",
899 name
900 )
901 })?
902 .to_string();
903 let exit_code = value["exit_code"].as_i64().unwrap_or(0) as i32;
904 return Ok(crate::tools::ToolResult {
905 name: name.to_string(),
906 output,
907 exit_code,
908 metadata: None,
909 images: Vec::new(),
910 });
911 }
912 Ok(Err(e)) => {
913 tracing::warn!(
914 "Queue execution failed for tool '{}', falling back to direct: {}",
915 name,
916 e
917 );
918 }
919 Err(_) => {
920 tracing::warn!(
921 "Queue channel closed for tool '{}', falling back to direct",
922 name
923 );
924 }
925 }
926 }
927 self.execute_tool_timed(name, args, ctx).await
928 }
929
930 async fn call_llm(
941 &self,
942 messages: &[Message],
943 system: Option<&str>,
944 event_tx: &Option<mpsc::Sender<AgentEvent>>,
945 cancel_token: &tokio_util::sync::CancellationToken,
946 ) -> anyhow::Result<LlmResponse> {
947 let tools = if let Some(ref index) = self.config.tool_index {
949 let query = messages
950 .iter()
951 .rev()
952 .find(|m| m.role == "user")
953 .and_then(|m| {
954 m.content.iter().find_map(|b| match b {
955 crate::llm::ContentBlock::Text { text } => Some(text.as_str()),
956 _ => None,
957 })
958 })
959 .unwrap_or("");
960 let matches = index.search(query, index.len());
961 let matched_names: std::collections::HashSet<&str> =
962 matches.iter().map(|m| m.name.as_str()).collect();
963 self.config
964 .tools
965 .iter()
966 .filter(|t| matched_names.contains(t.name.as_str()))
967 .cloned()
968 .collect::<Vec<_>>()
969 } else {
970 self.config.tools.clone()
971 };
972
973 if event_tx.is_some() {
974 let mut stream_rx = self
975 .llm_client
976 .complete_streaming(messages, system, &tools)
977 .await
978 .context("LLM streaming call failed")?;
979
980 let mut final_response: Option<LlmResponse> = None;
981 loop {
982 tokio::select! {
983 _ = cancel_token.cancelled() => {
984 tracing::info!("🛑 LLM streaming cancelled by CancellationToken");
985 anyhow::bail!("Operation cancelled by user");
986 }
987 event = stream_rx.recv() => {
988 match event {
989 Some(crate::llm::StreamEvent::TextDelta(text)) => {
990 if let Some(tx) = event_tx {
991 tx.send(AgentEvent::TextDelta { text }).await.ok();
992 }
993 }
994 Some(crate::llm::StreamEvent::ToolUseStart { id, name }) => {
995 if let Some(tx) = event_tx {
996 tx.send(AgentEvent::ToolStart { id, name }).await.ok();
997 }
998 }
999 Some(crate::llm::StreamEvent::ToolUseInputDelta(delta)) => {
1000 if let Some(tx) = event_tx {
1001 tx.send(AgentEvent::ToolInputDelta { delta }).await.ok();
1002 }
1003 }
1004 Some(crate::llm::StreamEvent::Done(resp)) => {
1005 final_response = Some(resp);
1006 break;
1007 }
1008 None => break,
1009 }
1010 }
1011 }
1012 }
1013 final_response.context("Stream ended without final response")
1014 } else {
1015 self.llm_client
1016 .complete(messages, system, &tools)
1017 .await
1018 .context("LLM call failed")
1019 }
1020 }
1021
1022 fn streaming_tool_context(
1031 &self,
1032 event_tx: &Option<mpsc::Sender<AgentEvent>>,
1033 tool_id: &str,
1034 tool_name: &str,
1035 ) -> ToolContext {
1036 let mut ctx = self.tool_context.clone();
1037 if let Some(agent_tx) = event_tx {
1038 let (tool_tx, mut tool_rx) = mpsc::channel::<ToolStreamEvent>(64);
1039 ctx.event_tx = Some(tool_tx);
1040
1041 let agent_tx = agent_tx.clone();
1042 let tool_id = tool_id.to_string();
1043 let tool_name = tool_name.to_string();
1044 tokio::spawn(async move {
1045 while let Some(event) = tool_rx.recv().await {
1046 match event {
1047 ToolStreamEvent::OutputDelta(delta) => {
1048 agent_tx
1049 .send(AgentEvent::ToolOutputDelta {
1050 id: tool_id.clone(),
1051 name: tool_name.clone(),
1052 delta,
1053 })
1054 .await
1055 .ok();
1056 }
1057 }
1058 }
1059 });
1060 }
1061 ctx
1062 }
1063
1064 async fn resolve_context(&self, prompt: &str, session_id: Option<&str>) -> Vec<ContextResult> {
1068 if self.config.context_providers.is_empty() {
1069 return Vec::new();
1070 }
1071
1072 let query = ContextQuery::new(prompt).with_session_id(session_id.unwrap_or(""));
1073
1074 let futures = self
1075 .config
1076 .context_providers
1077 .iter()
1078 .map(|p| p.query(&query));
1079 let outcomes = join_all(futures).await;
1080
1081 outcomes
1082 .into_iter()
1083 .enumerate()
1084 .filter_map(|(i, r)| match r {
1085 Ok(result) if !result.is_empty() => Some(result),
1086 Ok(_) => None,
1087 Err(e) => {
1088 tracing::warn!(
1089 "Context provider '{}' failed: {}",
1090 self.config.context_providers[i].name(),
1091 e
1092 );
1093 None
1094 }
1095 })
1096 .collect()
1097 }
1098
/// Heuristic: does a response look like it stopped before doing the work?
///
/// Returns `true` when the trimmed text
/// * is empty, or
/// * is a single line shorter than 80 bytes that ends with `:`, `...`, `…` or `,`, or
/// * contains (case-insensitively) one of the "about to act" phrases below.
fn looks_incomplete(text: &str) -> bool {
    const INTENT_PHRASES: &[&str] = &[
        "i'll ",
        "i will ",
        "let me ",
        "i need to ",
        "i should ",
        "next, i",
        "first, i",
        "now i",
        "i'll start",
        "i'll begin",
        "i'll now",
        "let's start",
        "let's begin",
        "to do this",
        "i'm going to",
    ];

    let trimmed = text.trim();
    if trimmed.is_empty() {
        return true;
    }

    let short_single_line = trimmed.len() < 80 && !trimmed.contains('\n');
    let dangling_end = trimmed.ends_with(':')
        || trimmed.ends_with("...")
        || trimmed.ends_with('…')
        || trimmed.ends_with(',');
    if short_single_line && dangling_end {
        return true;
    }

    let lowered = trimmed.to_lowercase();
    INTENT_PHRASES
        .iter()
        .any(|&phrase| lowered.contains(phrase))
}
1147
1148 fn system_prompt(&self) -> String {
1150 self.config.prompt_slots.build()
1151 }
1152
1153 fn build_augmented_system_prompt(&self, context_results: &[ContextResult]) -> Option<String> {
1155 let base = self.system_prompt();
1156
1157 let live_tools = self.tool_executor.definitions();
1159 let mcp_tools: Vec<&ToolDefinition> = live_tools
1160 .iter()
1161 .filter(|t| t.name.starts_with("mcp__"))
1162 .collect();
1163
1164 let mcp_section = if mcp_tools.is_empty() {
1165 String::new()
1166 } else {
1167 let mut lines = vec![
1168 "## MCP Tools".to_string(),
1169 String::new(),
1170 "The following MCP (Model Context Protocol) tools are available. Use them when the task requires external capabilities beyond the built-in tools:".to_string(),
1171 String::new(),
1172 ];
1173 for tool in &mcp_tools {
1174 let display = format!("- `{}` — {}", tool.name, tool.description);
1175 lines.push(display);
1176 }
1177 lines.join("\n")
1178 };
1179
1180 let parts: Vec<&str> = [base.as_str(), mcp_section.as_str()]
1181 .iter()
1182 .filter(|s| !s.is_empty())
1183 .copied()
1184 .collect();
1185
1186 let project_hint = if self.config.prompt_slots.guidelines.is_none() {
1189 Self::detect_project_hint(&self.tool_context.workspace)
1190 } else {
1191 String::new()
1192 };
1193
1194 if context_results.is_empty() {
1195 if project_hint.is_empty() {
1196 return Some(parts.join("\n\n"));
1197 }
1198 return Some(format!("{}\n\n{}", parts.join("\n\n"), project_hint));
1199 }
1200
1201 let context_xml: String = context_results
1203 .iter()
1204 .map(|r| r.to_xml())
1205 .collect::<Vec<_>>()
1206 .join("\n\n");
1207
1208 if project_hint.is_empty() {
1209 Some(format!("{}\n\n{}", parts.join("\n\n"), context_xml))
1210 } else {
1211 Some(format!(
1212 "{}\n\n{}\n\n{}",
1213 parts.join("\n\n"),
1214 project_hint,
1215 context_xml
1216 ))
1217 }
1218 }
1219
1220 async fn notify_turn_complete(&self, session_id: &str, prompt: &str, response: &str) {
1222 let futures = self
1223 .config
1224 .context_providers
1225 .iter()
1226 .map(|p| p.on_turn_complete(session_id, prompt, response));
1227 let outcomes = join_all(futures).await;
1228
1229 for (i, result) in outcomes.into_iter().enumerate() {
1230 if let Err(e) = result {
1231 tracing::warn!(
1232 "Context provider '{}' on_turn_complete failed: {}",
1233 self.config.context_providers[i].name(),
1234 e
1235 );
1236 }
1237 }
1238 }
1239
1240 async fn fire_pre_tool_use(
1243 &self,
1244 session_id: &str,
1245 tool_name: &str,
1246 args: &serde_json::Value,
1247 ) -> Option<HookResult> {
1248 if let Some(he) = &self.config.hook_engine {
1249 let event = HookEvent::PreToolUse(PreToolUseEvent {
1250 session_id: session_id.to_string(),
1251 tool: tool_name.to_string(),
1252 args: args.clone(),
1253 working_directory: self.tool_context.workspace.to_string_lossy().to_string(),
1254 recent_tools: Vec::new(),
1255 });
1256 let result = he.fire(&event).await;
1257 if result.is_block() {
1258 return Some(result);
1259 }
1260 }
1261 None
1262 }
1263
1264 async fn fire_post_tool_use(
1266 &self,
1267 session_id: &str,
1268 tool_name: &str,
1269 args: &serde_json::Value,
1270 output: &str,
1271 success: bool,
1272 duration_ms: u64,
1273 ) {
1274 if let Some(he) = &self.config.hook_engine {
1275 let event = HookEvent::PostToolUse(PostToolUseEvent {
1276 session_id: session_id.to_string(),
1277 tool: tool_name.to_string(),
1278 args: args.clone(),
1279 result: ToolResultData {
1280 success,
1281 output: output.to_string(),
1282 exit_code: if success { Some(0) } else { Some(1) },
1283 duration_ms,
1284 },
1285 });
1286 let he = Arc::clone(he);
1287 tokio::spawn(async move {
1288 let _ = he.fire(&event).await;
1289 });
1290 }
1291 }
1292
1293 async fn fire_generate_start(
1295 &self,
1296 session_id: &str,
1297 prompt: &str,
1298 system_prompt: &Option<String>,
1299 ) {
1300 if let Some(he) = &self.config.hook_engine {
1301 let event = HookEvent::GenerateStart(GenerateStartEvent {
1302 session_id: session_id.to_string(),
1303 prompt: prompt.to_string(),
1304 system_prompt: system_prompt.clone(),
1305 model_provider: String::new(),
1306 model_name: String::new(),
1307 available_tools: self.config.tools.iter().map(|t| t.name.clone()).collect(),
1308 });
1309 let _ = he.fire(&event).await;
1310 }
1311 }
1312
1313 async fn fire_generate_end(
1315 &self,
1316 session_id: &str,
1317 prompt: &str,
1318 response: &LlmResponse,
1319 duration_ms: u64,
1320 ) {
1321 if let Some(he) = &self.config.hook_engine {
1322 let tool_calls: Vec<ToolCallInfo> = response
1323 .tool_calls()
1324 .iter()
1325 .map(|tc| ToolCallInfo {
1326 name: tc.name.clone(),
1327 args: tc.args.clone(),
1328 })
1329 .collect();
1330
1331 let event = HookEvent::GenerateEnd(GenerateEndEvent {
1332 session_id: session_id.to_string(),
1333 prompt: prompt.to_string(),
1334 response_text: response.text().to_string(),
1335 tool_calls,
1336 usage: TokenUsageInfo {
1337 prompt_tokens: response.usage.prompt_tokens as i32,
1338 completion_tokens: response.usage.completion_tokens as i32,
1339 total_tokens: response.usage.total_tokens as i32,
1340 },
1341 duration_ms,
1342 });
1343 let _ = he.fire(&event).await;
1344 }
1345 }
1346
1347 async fn fire_pre_prompt(
1350 &self,
1351 session_id: &str,
1352 prompt: &str,
1353 system_prompt: &Option<String>,
1354 message_count: usize,
1355 ) -> Option<String> {
1356 if let Some(he) = &self.config.hook_engine {
1357 let event = HookEvent::PrePrompt(PrePromptEvent {
1358 session_id: session_id.to_string(),
1359 prompt: prompt.to_string(),
1360 system_prompt: system_prompt.clone(),
1361 message_count,
1362 });
1363 let result = he.fire(&event).await;
1364 if let HookResult::Continue(Some(modified)) = result {
1365 if let Some(new_prompt) = modified.get("prompt").and_then(|v| v.as_str()) {
1367 return Some(new_prompt.to_string());
1368 }
1369 }
1370 }
1371 None
1372 }
1373
1374 async fn fire_post_response(
1376 &self,
1377 session_id: &str,
1378 response_text: &str,
1379 tool_calls_count: usize,
1380 usage: &TokenUsage,
1381 duration_ms: u64,
1382 ) {
1383 if let Some(he) = &self.config.hook_engine {
1384 let event = HookEvent::PostResponse(PostResponseEvent {
1385 session_id: session_id.to_string(),
1386 response_text: response_text.to_string(),
1387 tool_calls_count,
1388 usage: TokenUsageInfo {
1389 prompt_tokens: usage.prompt_tokens as i32,
1390 completion_tokens: usage.completion_tokens as i32,
1391 total_tokens: usage.total_tokens as i32,
1392 },
1393 duration_ms,
1394 });
1395 let he = Arc::clone(he);
1396 tokio::spawn(async move {
1397 let _ = he.fire(&event).await;
1398 });
1399 }
1400 }
1401
1402 async fn fire_on_error(
1404 &self,
1405 session_id: &str,
1406 error_type: ErrorType,
1407 error_message: &str,
1408 context: serde_json::Value,
1409 ) {
1410 if let Some(he) = &self.config.hook_engine {
1411 let event = HookEvent::OnError(OnErrorEvent {
1412 session_id: session_id.to_string(),
1413 error_type,
1414 error_message: error_message.to_string(),
1415 context,
1416 });
1417 let he = Arc::clone(he);
1418 tokio::spawn(async move {
1419 let _ = he.fire(&event).await;
1420 });
1421 }
1422 }
1423
1424 pub async fn execute(
1430 &self,
1431 history: &[Message],
1432 prompt: &str,
1433 event_tx: Option<mpsc::Sender<AgentEvent>>,
1434 ) -> Result<AgentResult> {
1435 self.execute_with_session(history, prompt, None, event_tx, None)
1436 .await
1437 }
1438
1439 pub async fn execute_from_messages(
1445 &self,
1446 messages: Vec<Message>,
1447 session_id: Option<&str>,
1448 event_tx: Option<mpsc::Sender<AgentEvent>>,
1449 cancel_token: Option<&tokio_util::sync::CancellationToken>,
1450 ) -> Result<AgentResult> {
1451 let default_token = tokio_util::sync::CancellationToken::new();
1452 let token = cancel_token.unwrap_or(&default_token);
1453 tracing::info!(
1454 a3s.session.id = session_id.unwrap_or("none"),
1455 a3s.agent.max_turns = self.config.max_tool_rounds,
1456 "a3s.agent.execute_from_messages started"
1457 );
1458
1459 let effective_prompt = messages
1463 .iter()
1464 .rev()
1465 .find(|m| m.role == "user")
1466 .map(|m| m.text())
1467 .unwrap_or_default();
1468
1469 let result = self
1470 .execute_loop_inner(
1471 &messages,
1472 "",
1473 &effective_prompt,
1474 session_id,
1475 event_tx,
1476 token,
1477 )
1478 .await;
1479
1480 match &result {
1481 Ok(r) => tracing::info!(
1482 a3s.agent.tool_calls_count = r.tool_calls_count,
1483 a3s.llm.total_tokens = r.usage.total_tokens,
1484 "a3s.agent.execute_from_messages completed"
1485 ),
1486 Err(e) => tracing::warn!(
1487 error = %e,
1488 "a3s.agent.execute_from_messages failed"
1489 ),
1490 }
1491
1492 result
1493 }
1494
1495 pub async fn execute_with_session(
1500 &self,
1501 history: &[Message],
1502 prompt: &str,
1503 session_id: Option<&str>,
1504 event_tx: Option<mpsc::Sender<AgentEvent>>,
1505 cancel_token: Option<&tokio_util::sync::CancellationToken>,
1506 ) -> Result<AgentResult> {
1507 let default_token = tokio_util::sync::CancellationToken::new();
1508 let token = cancel_token.unwrap_or(&default_token);
1509 tracing::info!(
1510 a3s.session.id = session_id.unwrap_or("none"),
1511 a3s.agent.max_turns = self.config.max_tool_rounds,
1512 "a3s.agent.execute started"
1513 );
1514
1515 let result = if self.config.planning_enabled {
1517 self.execute_with_planning(history, prompt, event_tx).await
1518 } else {
1519 self.execute_loop(history, prompt, session_id, event_tx, token)
1520 .await
1521 };
1522
1523 match &result {
1524 Ok(r) => {
1525 tracing::info!(
1526 a3s.agent.tool_calls_count = r.tool_calls_count,
1527 a3s.llm.total_tokens = r.usage.total_tokens,
1528 "a3s.agent.execute completed"
1529 );
1530 self.fire_post_response(
1532 session_id.unwrap_or(""),
1533 &r.text,
1534 r.tool_calls_count,
1535 &r.usage,
1536 0, )
1538 .await;
1539 }
1540 Err(e) => {
1541 tracing::warn!(
1542 error = %e,
1543 "a3s.agent.execute failed"
1544 );
1545 self.fire_on_error(
1547 session_id.unwrap_or(""),
1548 ErrorType::Other,
1549 &e.to_string(),
1550 serde_json::json!({"phase": "execute"}),
1551 )
1552 .await;
1553 }
1554 }
1555
1556 result
1557 }
1558
1559 async fn execute_loop(
1565 &self,
1566 history: &[Message],
1567 prompt: &str,
1568 session_id: Option<&str>,
1569 event_tx: Option<mpsc::Sender<AgentEvent>>,
1570 cancel_token: &tokio_util::sync::CancellationToken,
1571 ) -> Result<AgentResult> {
1572 self.execute_loop_inner(history, prompt, prompt, session_id, event_tx, cancel_token)
1575 .await
1576 }
1577
1578 async fn execute_loop_inner(
1583 &self,
1584 history: &[Message],
1585 msg_prompt: &str,
1586 effective_prompt: &str,
1587 session_id: Option<&str>,
1588 event_tx: Option<mpsc::Sender<AgentEvent>>,
1589 cancel_token: &tokio_util::sync::CancellationToken,
1590 ) -> Result<AgentResult> {
1591 let mut messages = history.to_vec();
1592 let mut total_usage = TokenUsage::default();
1593 let mut tool_calls_count = 0;
1594 let mut turn = 0;
1595 let mut parse_error_count: u32 = 0;
1597 let mut continuation_count: u32 = 0;
1599
1600 if let Some(tx) = &event_tx {
1602 tx.send(AgentEvent::Start {
1603 prompt: effective_prompt.to_string(),
1604 })
1605 .await
1606 .ok();
1607 }
1608
1609 let _queue_forward_handle =
1611 if let (Some(ref queue), Some(ref tx)) = (&self.command_queue, &event_tx) {
1612 let mut rx = queue.subscribe();
1613 let tx = tx.clone();
1614 Some(tokio::spawn(async move {
1615 while let Ok(event) = rx.recv().await {
1616 if tx.send(event).await.is_err() {
1617 break;
1618 }
1619 }
1620 }))
1621 } else {
1622 None
1623 };
1624
1625 let built_system_prompt = Some(self.system_prompt());
1627 let hooked_prompt = if let Some(modified) = self
1628 .fire_pre_prompt(
1629 session_id.unwrap_or(""),
1630 effective_prompt,
1631 &built_system_prompt,
1632 messages.len(),
1633 )
1634 .await
1635 {
1636 modified
1637 } else {
1638 effective_prompt.to_string()
1639 };
1640 let effective_prompt = hooked_prompt.as_str();
1641
1642 if let Some(ref sp) = self.config.security_provider {
1644 sp.taint_input(effective_prompt);
1645 }
1646
1647 let system_with_memory = if let Some(ref memory) = self.config.memory {
1649 match memory.recall_similar(effective_prompt, 5).await {
1650 Ok(items) if !items.is_empty() => {
1651 if let Some(tx) = &event_tx {
1652 for item in &items {
1653 tx.send(AgentEvent::MemoryRecalled {
1654 memory_id: item.id.clone(),
1655 content: item.content.clone(),
1656 relevance: item.relevance_score(),
1657 })
1658 .await
1659 .ok();
1660 }
1661 tx.send(AgentEvent::MemoriesSearched {
1662 query: Some(effective_prompt.to_string()),
1663 tags: Vec::new(),
1664 result_count: items.len(),
1665 })
1666 .await
1667 .ok();
1668 }
1669 let memory_context = items
1670 .iter()
1671 .map(|i| format!("- {}", i.content))
1672 .collect::<Vec<_>>()
1673 .join(
1674 "
1675",
1676 );
1677 let base = self.system_prompt();
1678 Some(format!(
1679 "{}
1680
1681## Relevant past experience
1682{}",
1683 base, memory_context
1684 ))
1685 }
1686 _ => Some(self.system_prompt()),
1687 }
1688 } else {
1689 Some(self.system_prompt())
1690 };
1691
1692 let augmented_system = if !self.config.context_providers.is_empty() {
1694 if let Some(tx) = &event_tx {
1696 let provider_names: Vec<String> = self
1697 .config
1698 .context_providers
1699 .iter()
1700 .map(|p| p.name().to_string())
1701 .collect();
1702 tx.send(AgentEvent::ContextResolving {
1703 providers: provider_names,
1704 })
1705 .await
1706 .ok();
1707 }
1708
1709 tracing::info!(
1710 a3s.context.providers = self.config.context_providers.len() as i64,
1711 "Context resolution started"
1712 );
1713 let context_results = self.resolve_context(effective_prompt, session_id).await;
1714
1715 if let Some(tx) = &event_tx {
1717 let total_items: usize = context_results.iter().map(|r| r.items.len()).sum();
1718 let total_tokens: usize = context_results.iter().map(|r| r.total_tokens).sum();
1719
1720 tracing::info!(
1721 context_items = total_items,
1722 context_tokens = total_tokens,
1723 "Context resolution completed"
1724 );
1725
1726 tx.send(AgentEvent::ContextResolved {
1727 total_items,
1728 total_tokens,
1729 })
1730 .await
1731 .ok();
1732 }
1733
1734 self.build_augmented_system_prompt(&context_results)
1735 } else {
1736 Some(self.system_prompt())
1737 };
1738
1739 let base_prompt = self.system_prompt();
1741 let augmented_system = match (augmented_system, system_with_memory) {
1742 (Some(ctx), Some(mem)) if ctx != mem => Some(ctx.replacen(&base_prompt, &mem, 1)),
1743 (Some(ctx), _) => Some(ctx),
1744 (None, mem) => mem,
1745 };
1746
1747 if !msg_prompt.is_empty() {
1749 messages.push(Message::user(msg_prompt));
1750 }
1751
1752 loop {
1753 turn += 1;
1754
1755 if turn > self.config.max_tool_rounds {
1756 let error = format!("Max tool rounds ({}) exceeded", self.config.max_tool_rounds);
1757 if let Some(tx) = &event_tx {
1758 tx.send(AgentEvent::Error {
1759 message: error.clone(),
1760 })
1761 .await
1762 .ok();
1763 }
1764 anyhow::bail!(error);
1765 }
1766
1767 if let Some(tx) = &event_tx {
1769 tx.send(AgentEvent::TurnStart { turn }).await.ok();
1770 }
1771
1772 tracing::info!(
1773 turn = turn,
1774 max_turns = self.config.max_tool_rounds,
1775 "Agent turn started"
1776 );
1777
1778 tracing::info!(
1780 a3s.llm.streaming = event_tx.is_some(),
1781 "LLM completion started"
1782 );
1783
1784 self.fire_generate_start(
1786 session_id.unwrap_or(""),
1787 effective_prompt,
1788 &augmented_system,
1789 )
1790 .await;
1791
1792 let llm_start = std::time::Instant::now();
1793 let response = {
1797 let threshold = self.config.circuit_breaker_threshold.max(1);
1798 let mut attempt = 0u32;
1799 loop {
1800 attempt += 1;
1801 let result = self
1802 .call_llm(
1803 &messages,
1804 augmented_system.as_deref(),
1805 &event_tx,
1806 cancel_token,
1807 )
1808 .await;
1809 match result {
1810 Ok(r) => {
1811 break r;
1812 }
1813 Err(e) if cancel_token.is_cancelled() => {
1815 anyhow::bail!(e);
1816 }
1817 Err(e) if attempt < threshold && (event_tx.is_none() || attempt == 1) => {
1819 tracing::warn!(
1820 turn = turn,
1821 attempt = attempt,
1822 threshold = threshold,
1823 error = %e,
1824 "LLM call failed, will retry"
1825 );
1826 tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
1827 }
1828 Err(e) => {
1830 let msg = if attempt > 1 {
1831 format!(
1832 "LLM circuit breaker triggered: failed after {} attempt(s): {}",
1833 attempt, e
1834 )
1835 } else {
1836 format!("LLM call failed: {}", e)
1837 };
1838 tracing::error!(turn = turn, attempt = attempt, "{}", msg);
1839 self.fire_on_error(
1841 session_id.unwrap_or(""),
1842 ErrorType::LlmFailure,
1843 &msg,
1844 serde_json::json!({"turn": turn, "attempt": attempt}),
1845 )
1846 .await;
1847 if let Some(tx) = &event_tx {
1848 tx.send(AgentEvent::Error {
1849 message: msg.clone(),
1850 })
1851 .await
1852 .ok();
1853 }
1854 anyhow::bail!(msg);
1855 }
1856 }
1857 }
1858 };
1859
1860 total_usage.prompt_tokens += response.usage.prompt_tokens;
1862 total_usage.completion_tokens += response.usage.completion_tokens;
1863 total_usage.total_tokens += response.usage.total_tokens;
1864 let llm_duration = llm_start.elapsed();
1866 tracing::info!(
1867 turn = turn,
1868 streaming = event_tx.is_some(),
1869 prompt_tokens = response.usage.prompt_tokens,
1870 completion_tokens = response.usage.completion_tokens,
1871 total_tokens = response.usage.total_tokens,
1872 stop_reason = response.stop_reason.as_deref().unwrap_or("unknown"),
1873 duration_ms = llm_duration.as_millis() as u64,
1874 "LLM completion finished"
1875 );
1876
1877 self.fire_generate_end(
1879 session_id.unwrap_or(""),
1880 effective_prompt,
1881 &response,
1882 llm_duration.as_millis() as u64,
1883 )
1884 .await;
1885
1886 crate::telemetry::record_llm_usage(
1888 response.usage.prompt_tokens,
1889 response.usage.completion_tokens,
1890 response.usage.total_tokens,
1891 response.stop_reason.as_deref(),
1892 );
1893 tracing::info!(
1895 turn = turn,
1896 a3s.llm.total_tokens = response.usage.total_tokens,
1897 "Turn token usage"
1898 );
1899
1900 messages.push(response.message.clone());
1902
1903 let tool_calls = response.tool_calls();
1905
1906 if let Some(tx) = &event_tx {
1908 tx.send(AgentEvent::TurnEnd {
1909 turn,
1910 usage: response.usage.clone(),
1911 })
1912 .await
1913 .ok();
1914 }
1915
1916 if self.config.auto_compact {
1918 let used = response.usage.prompt_tokens;
1919 let max = self.config.max_context_tokens;
1920 let threshold = self.config.auto_compact_threshold;
1921
1922 if crate::session::compaction::should_auto_compact(used, max, threshold) {
1923 let before_len = messages.len();
1924 let percent_before = used as f32 / max as f32;
1925
1926 tracing::info!(
1927 used_tokens = used,
1928 max_tokens = max,
1929 percent = percent_before,
1930 threshold = threshold,
1931 "Auto-compact triggered"
1932 );
1933
1934 if let Some(pruned) = crate::session::compaction::prune_tool_outputs(&messages)
1936 {
1937 messages = pruned;
1938 tracing::info!("Tool output pruning applied");
1939 }
1940
1941 if let Ok(Some(compacted)) = crate::session::compaction::compact_messages(
1943 session_id.unwrap_or(""),
1944 &messages,
1945 &self.llm_client,
1946 )
1947 .await
1948 {
1949 messages = compacted;
1950 }
1951
1952 if let Some(tx) = &event_tx {
1954 tx.send(AgentEvent::ContextCompacted {
1955 session_id: session_id.unwrap_or("").to_string(),
1956 before_messages: before_len,
1957 after_messages: messages.len(),
1958 percent_before,
1959 })
1960 .await
1961 .ok();
1962 }
1963 }
1964 }
1965
1966 if tool_calls.is_empty() {
1967 let final_text = response.text();
1970
1971 if self.config.continuation_enabled
1972 && continuation_count < self.config.max_continuation_turns
1973 && turn < self.config.max_tool_rounds && Self::looks_incomplete(&final_text)
1975 {
1976 continuation_count += 1;
1977 tracing::info!(
1978 turn = turn,
1979 continuation = continuation_count,
1980 max_continuation = self.config.max_continuation_turns,
1981 "Injecting continuation message — response looks incomplete"
1982 );
1983 messages.push(Message::user(crate::prompts::CONTINUATION));
1985 continue;
1986 }
1987
1988 let final_text = if let Some(ref sp) = self.config.security_provider {
1990 sp.sanitize_output(&final_text)
1991 } else {
1992 final_text
1993 };
1994
1995 tracing::info!(
1997 tool_calls_count = tool_calls_count,
1998 total_prompt_tokens = total_usage.prompt_tokens,
1999 total_completion_tokens = total_usage.completion_tokens,
2000 total_tokens = total_usage.total_tokens,
2001 turns = turn,
2002 "Agent execution completed"
2003 );
2004
2005 if let Some(tx) = &event_tx {
2006 tx.send(AgentEvent::End {
2007 text: final_text.clone(),
2008 usage: total_usage.clone(),
2009 meta: response.meta.clone(),
2010 })
2011 .await
2012 .ok();
2013 }
2014
2015 if let Some(sid) = session_id {
2017 self.notify_turn_complete(sid, effective_prompt, &final_text)
2018 .await;
2019 }
2020
2021 return Ok(AgentResult {
2022 text: final_text,
2023 messages,
2024 usage: total_usage,
2025 tool_calls_count,
2026 });
2027 }
2028
2029 let tool_calls = if self.config.hook_engine.is_none()
2033 && self.config.confirmation_manager.is_none()
2034 && tool_calls.len() > 1
2035 && tool_calls
2036 .iter()
2037 .all(|tc| Self::is_parallel_safe_write(&tc.name, &tc.args))
2038 && {
2039 let paths: Vec<_> = tool_calls
2041 .iter()
2042 .filter_map(|tc| Self::extract_write_path(&tc.args))
2043 .collect();
2044 paths.len() == tool_calls.len()
2045 && paths.iter().collect::<std::collections::HashSet<_>>().len()
2046 == paths.len()
2047 } {
2048 tracing::info!(
2049 count = tool_calls.len(),
2050 "Parallel write batch: executing {} independent file writes concurrently",
2051 tool_calls.len()
2052 );
2053
2054 let futures: Vec<_> = tool_calls
2055 .iter()
2056 .map(|tc| {
2057 let ctx = self.tool_context.clone();
2058 let executor = Arc::clone(&self.tool_executor);
2059 let name = tc.name.clone();
2060 let args = tc.args.clone();
2061 async move { executor.execute_with_context(&name, &args, &ctx).await }
2062 })
2063 .collect();
2064
2065 let results = join_all(futures).await;
2066
2067 for (tc, result) in tool_calls.iter().zip(results) {
2069 tool_calls_count += 1;
2070 let (output, exit_code, is_error, metadata, images) =
2071 Self::tool_result_to_tuple(result);
2072
2073 let output = if let Some(ref sp) = self.config.security_provider {
2074 sp.sanitize_output(&output)
2075 } else {
2076 output
2077 };
2078
2079 if let Some(tx) = &event_tx {
2080 tx.send(AgentEvent::ToolEnd {
2081 id: tc.id.clone(),
2082 name: tc.name.clone(),
2083 output: output.clone(),
2084 exit_code,
2085 metadata,
2086 })
2087 .await
2088 .ok();
2089 }
2090
2091 if images.is_empty() {
2092 messages.push(Message::tool_result(&tc.id, &output, is_error));
2093 } else {
2094 messages.push(Message::tool_result_with_images(
2095 &tc.id, &output, &images, is_error,
2096 ));
2097 }
2098 }
2099
2100 continue;
2102 } else {
2103 tool_calls
2104 };
2105
2106 for tool_call in tool_calls {
2107 tool_calls_count += 1;
2108
2109 let tool_start = std::time::Instant::now();
2110
2111 tracing::info!(
2112 tool_name = tool_call.name.as_str(),
2113 tool_id = tool_call.id.as_str(),
2114 "Tool execution started"
2115 );
2116
2117 if let Some(parse_error) =
2123 tool_call.args.get("__parse_error").and_then(|v| v.as_str())
2124 {
2125 parse_error_count += 1;
2126 let error_msg = format!("Error: {}", parse_error);
2127 tracing::warn!(
2128 tool = tool_call.name.as_str(),
2129 parse_error_count = parse_error_count,
2130 max_parse_retries = self.config.max_parse_retries,
2131 "Malformed tool arguments from LLM"
2132 );
2133
2134 if let Some(tx) = &event_tx {
2135 tx.send(AgentEvent::ToolEnd {
2136 id: tool_call.id.clone(),
2137 name: tool_call.name.clone(),
2138 output: error_msg.clone(),
2139 exit_code: 1,
2140 metadata: None,
2141 })
2142 .await
2143 .ok();
2144 }
2145
2146 messages.push(Message::tool_result(&tool_call.id, &error_msg, true));
2147
2148 if parse_error_count > self.config.max_parse_retries {
2149 let msg = format!(
2150 "LLM produced malformed tool arguments {} time(s) in a row \
2151 (max_parse_retries={}); giving up",
2152 parse_error_count, self.config.max_parse_retries
2153 );
2154 tracing::error!("{}", msg);
2155 if let Some(tx) = &event_tx {
2156 tx.send(AgentEvent::Error {
2157 message: msg.clone(),
2158 })
2159 .await
2160 .ok();
2161 }
2162 anyhow::bail!(msg);
2163 }
2164 continue;
2165 }
2166
2167 parse_error_count = 0;
2169
2170 if let Some(ref registry) = self.config.skill_registry {
2172 let instruction_skills =
2173 registry.by_kind(crate::skills::SkillKind::Instruction);
2174 let has_restrictions =
2175 instruction_skills.iter().any(|s| s.allowed_tools.is_some());
2176 if has_restrictions {
2177 let allowed = instruction_skills
2178 .iter()
2179 .any(|s| s.is_tool_allowed(&tool_call.name));
2180 if !allowed {
2181 let msg = format!(
2182 "Tool '{}' is not allowed by any active skill.",
2183 tool_call.name
2184 );
2185 tracing::info!(
2186 tool_name = tool_call.name.as_str(),
2187 "Tool blocked by skill registry"
2188 );
2189 if let Some(tx) = &event_tx {
2190 tx.send(AgentEvent::PermissionDenied {
2191 tool_id: tool_call.id.clone(),
2192 tool_name: tool_call.name.clone(),
2193 args: tool_call.args.clone(),
2194 reason: msg.clone(),
2195 })
2196 .await
2197 .ok();
2198 }
2199 messages.push(Message::tool_result(&tool_call.id, &msg, true));
2200 continue;
2201 }
2202 }
2203 }
2204
2205 if let Some(HookResult::Block(reason)) = self
2207 .fire_pre_tool_use(session_id.unwrap_or(""), &tool_call.name, &tool_call.args)
2208 .await
2209 {
2210 let msg = format!("Tool '{}' blocked by hook: {}", tool_call.name, reason);
2211 tracing::info!(
2212 tool_name = tool_call.name.as_str(),
2213 "Tool blocked by PreToolUse hook"
2214 );
2215
2216 if let Some(tx) = &event_tx {
2217 tx.send(AgentEvent::PermissionDenied {
2218 tool_id: tool_call.id.clone(),
2219 tool_name: tool_call.name.clone(),
2220 args: tool_call.args.clone(),
2221 reason: reason.clone(),
2222 })
2223 .await
2224 .ok();
2225 }
2226
2227 messages.push(Message::tool_result(&tool_call.id, &msg, true));
2228 continue;
2229 }
2230
2231 let permission_decision = if let Some(checker) = &self.config.permission_checker {
2233 checker.check(&tool_call.name, &tool_call.args)
2234 } else {
2235 PermissionDecision::Ask
2237 };
2238
2239 let (output, exit_code, is_error, metadata, images) = match permission_decision {
2240 PermissionDecision::Deny => {
2241 tracing::info!(
2242 tool_name = tool_call.name.as_str(),
2243 permission = "deny",
2244 "Tool permission denied"
2245 );
2246 let denial_msg = format!(
2248 "Permission denied: Tool '{}' is blocked by permission policy.",
2249 tool_call.name
2250 );
2251
2252 if let Some(tx) = &event_tx {
2254 tx.send(AgentEvent::PermissionDenied {
2255 tool_id: tool_call.id.clone(),
2256 tool_name: tool_call.name.clone(),
2257 args: tool_call.args.clone(),
2258 reason: "Blocked by deny rule in permission policy".to_string(),
2259 })
2260 .await
2261 .ok();
2262 }
2263
2264 (denial_msg, 1, true, None, Vec::new())
2265 }
2266 PermissionDecision::Allow => {
2267 tracing::info!(
2268 tool_name = tool_call.name.as_str(),
2269 permission = "allow",
2270 "Tool permission: allow"
2271 );
2272 let stream_ctx =
2274 self.streaming_tool_context(&event_tx, &tool_call.id, &tool_call.name);
2275 let result = self
2276 .execute_tool_queued_or_direct(
2277 &tool_call.name,
2278 &tool_call.args,
2279 &stream_ctx,
2280 )
2281 .await;
2282
2283 Self::tool_result_to_tuple(result)
2284 }
2285 PermissionDecision::Ask => {
2286 tracing::info!(
2287 tool_name = tool_call.name.as_str(),
2288 permission = "ask",
2289 "Tool permission: ask"
2290 );
2291 if let Some(cm) = &self.config.confirmation_manager {
2293 if !cm.requires_confirmation(&tool_call.name).await {
2295 let stream_ctx = self.streaming_tool_context(
2296 &event_tx,
2297 &tool_call.id,
2298 &tool_call.name,
2299 );
2300 let result = self
2301 .execute_tool_queued_or_direct(
2302 &tool_call.name,
2303 &tool_call.args,
2304 &stream_ctx,
2305 )
2306 .await;
2307
2308 let (output, exit_code, is_error, metadata, images) =
2309 Self::tool_result_to_tuple(result);
2310
2311 if images.is_empty() {
2313 messages.push(Message::tool_result(
2314 &tool_call.id,
2315 &output,
2316 is_error,
2317 ));
2318 } else {
2319 messages.push(Message::tool_result_with_images(
2320 &tool_call.id,
2321 &output,
2322 &images,
2323 is_error,
2324 ));
2325 }
2326
2327 let tool_duration = tool_start.elapsed();
2329 crate::telemetry::record_tool_result(exit_code, tool_duration);
2330
2331 if let Some(tx) = &event_tx {
2333 tx.send(AgentEvent::ToolEnd {
2334 id: tool_call.id.clone(),
2335 name: tool_call.name.clone(),
2336 output: output.clone(),
2337 exit_code,
2338 metadata,
2339 })
2340 .await
2341 .ok();
2342 }
2343
2344 self.fire_post_tool_use(
2346 session_id.unwrap_or(""),
2347 &tool_call.name,
2348 &tool_call.args,
2349 &output,
2350 exit_code == 0,
2351 tool_duration.as_millis() as u64,
2352 )
2353 .await;
2354
2355 continue; }
2357
2358 let policy = cm.policy().await;
2360 let timeout_ms = policy.default_timeout_ms;
2361 let timeout_action = policy.timeout_action;
2362
2363 let rx = cm
2365 .request_confirmation(
2366 &tool_call.id,
2367 &tool_call.name,
2368 &tool_call.args,
2369 )
2370 .await;
2371
2372 if let Some(tx) = &event_tx {
2376 tx.send(AgentEvent::ConfirmationRequired {
2377 tool_id: tool_call.id.clone(),
2378 tool_name: tool_call.name.clone(),
2379 args: tool_call.args.clone(),
2380 timeout_ms,
2381 })
2382 .await
2383 .ok();
2384 }
2385
2386 let confirmation_result =
2388 tokio::time::timeout(Duration::from_millis(timeout_ms), rx).await;
2389
2390 match confirmation_result {
2391 Ok(Ok(response)) => {
2392 if let Some(tx) = &event_tx {
2394 tx.send(AgentEvent::ConfirmationReceived {
2395 tool_id: tool_call.id.clone(),
2396 approved: response.approved,
2397 reason: response.reason.clone(),
2398 })
2399 .await
2400 .ok();
2401 }
2402 if response.approved {
2403 let stream_ctx = self.streaming_tool_context(
2404 &event_tx,
2405 &tool_call.id,
2406 &tool_call.name,
2407 );
2408 let result = self
2409 .execute_tool_queued_or_direct(
2410 &tool_call.name,
2411 &tool_call.args,
2412 &stream_ctx,
2413 )
2414 .await;
2415
2416 Self::tool_result_to_tuple(result)
2417 } else {
2418 let rejection_msg = format!(
2419 "Tool '{}' execution was REJECTED by the user. Reason: {}. \
2420 DO NOT retry this tool call unless the user explicitly asks you to.",
2421 tool_call.name,
2422 response.reason.unwrap_or_else(|| "No reason provided".to_string())
2423 );
2424 (rejection_msg, 1, true, None, Vec::new())
2425 }
2426 }
2427 Ok(Err(_)) => {
2428 if let Some(tx) = &event_tx {
2430 tx.send(AgentEvent::ConfirmationTimeout {
2431 tool_id: tool_call.id.clone(),
2432 action_taken: "rejected".to_string(),
2433 })
2434 .await
2435 .ok();
2436 }
2437 let msg = format!(
2438 "Tool '{}' confirmation failed: confirmation channel closed",
2439 tool_call.name
2440 );
2441 (msg, 1, true, None, Vec::new())
2442 }
2443 Err(_) => {
2444 cm.check_timeouts().await;
2445
2446 if let Some(tx) = &event_tx {
2448 tx.send(AgentEvent::ConfirmationTimeout {
2449 tool_id: tool_call.id.clone(),
2450 action_taken: match timeout_action {
2451 crate::hitl::TimeoutAction::Reject => {
2452 "rejected".to_string()
2453 }
2454 crate::hitl::TimeoutAction::AutoApprove => {
2455 "auto_approved".to_string()
2456 }
2457 },
2458 })
2459 .await
2460 .ok();
2461 }
2462
2463 match timeout_action {
2464 crate::hitl::TimeoutAction::Reject => {
2465 let msg = format!(
2466 "Tool '{}' execution was REJECTED: user confirmation timed out after {}ms. \
2467 DO NOT retry this tool call — the user did not approve it. \
2468 Inform the user that the operation requires their approval and ask them to try again.",
2469 tool_call.name, timeout_ms
2470 );
2471 (msg, 1, true, None, Vec::new())
2472 }
2473 crate::hitl::TimeoutAction::AutoApprove => {
2474 let stream_ctx = self.streaming_tool_context(
2475 &event_tx,
2476 &tool_call.id,
2477 &tool_call.name,
2478 );
2479 let result = self
2480 .execute_tool_queued_or_direct(
2481 &tool_call.name,
2482 &tool_call.args,
2483 &stream_ctx,
2484 )
2485 .await;
2486
2487 Self::tool_result_to_tuple(result)
2488 }
2489 }
2490 }
2491 }
2492 } else {
2493 let msg = format!(
2495 "Tool '{}' requires confirmation but no HITL confirmation manager is configured. \
2496 Configure a confirmation policy to enable tool execution.",
2497 tool_call.name
2498 );
2499 tracing::warn!(
2500 tool_name = tool_call.name.as_str(),
2501 "Tool requires confirmation but no HITL manager configured"
2502 );
2503 (msg, 1, true, None, Vec::new())
2504 }
2505 }
2506 };
2507
2508 let tool_duration = tool_start.elapsed();
2509 crate::telemetry::record_tool_result(exit_code, tool_duration);
2510
2511 let output = if let Some(ref sp) = self.config.security_provider {
2513 sp.sanitize_output(&output)
2514 } else {
2515 output
2516 };
2517
2518 self.fire_post_tool_use(
2520 session_id.unwrap_or(""),
2521 &tool_call.name,
2522 &tool_call.args,
2523 &output,
2524 exit_code == 0,
2525 tool_duration.as_millis() as u64,
2526 )
2527 .await;
2528
2529 if let Some(ref memory) = self.config.memory {
2531 let tools_used = [tool_call.name.clone()];
2532 let remember_result = if exit_code == 0 {
2533 memory
2534 .remember_success(effective_prompt, &tools_used, &output)
2535 .await
2536 } else {
2537 memory
2538 .remember_failure(effective_prompt, &output, &tools_used)
2539 .await
2540 };
2541 match remember_result {
2542 Ok(()) => {
2543 if let Some(tx) = &event_tx {
2544 let item_type = if exit_code == 0 { "success" } else { "failure" };
2545 tx.send(AgentEvent::MemoryStored {
2546 memory_id: uuid::Uuid::new_v4().to_string(),
2547 memory_type: item_type.to_string(),
2548 importance: if exit_code == 0 { 0.8 } else { 0.9 },
2549 tags: vec![item_type.to_string(), tool_call.name.clone()],
2550 })
2551 .await
2552 .ok();
2553 }
2554 }
2555 Err(e) => {
2556 tracing::warn!("Failed to store memory after tool execution: {}", e);
2557 }
2558 }
2559 }
2560
2561 if let Some(tx) = &event_tx {
2563 tx.send(AgentEvent::ToolEnd {
2564 id: tool_call.id.clone(),
2565 name: tool_call.name.clone(),
2566 output: output.clone(),
2567 exit_code,
2568 metadata,
2569 })
2570 .await
2571 .ok();
2572 }
2573
2574 if images.is_empty() {
2576 messages.push(Message::tool_result(&tool_call.id, &output, is_error));
2577 } else {
2578 messages.push(Message::tool_result_with_images(
2579 &tool_call.id,
2580 &output,
2581 &images,
2582 is_error,
2583 ));
2584 }
2585 }
2586 }
2587 }
2588
2589 pub async fn execute_streaming(
2591 &self,
2592 history: &[Message],
2593 prompt: &str,
2594 ) -> Result<(
2595 mpsc::Receiver<AgentEvent>,
2596 tokio::task::JoinHandle<Result<AgentResult>>,
2597 tokio_util::sync::CancellationToken,
2598 )> {
2599 let (tx, rx) = mpsc::channel(100);
2600 let cancel_token = tokio_util::sync::CancellationToken::new();
2601
2602 let llm_client = self.llm_client.clone();
2603 let tool_executor = self.tool_executor.clone();
2604 let tool_context = self.tool_context.clone();
2605 let config = self.config.clone();
2606 let tool_metrics = self.tool_metrics.clone();
2607 let command_queue = self.command_queue.clone();
2608 let history = history.to_vec();
2609 let prompt = prompt.to_string();
2610 let token_clone = cancel_token.clone();
2611
2612 let handle = tokio::spawn(async move {
2613 let mut agent = AgentLoop::new(llm_client, tool_executor, tool_context, config);
2614 if let Some(metrics) = tool_metrics {
2615 agent = agent.with_tool_metrics(metrics);
2616 }
2617 if let Some(queue) = command_queue {
2618 agent = agent.with_queue(queue);
2619 }
2620 agent
2621 .execute_with_session(&history, &prompt, None, Some(tx), Some(&token_clone))
2622 .await
2623 });
2624
2625 Ok((rx, handle, cancel_token))
2626 }
2627
2628 pub async fn plan(&self, prompt: &str, _context: Option<&str>) -> Result<ExecutionPlan> {
2633 use crate::planning::LlmPlanner;
2634
2635 match LlmPlanner::create_plan(&self.llm_client, prompt).await {
2636 Ok(plan) => Ok(plan),
2637 Err(e) => {
2638 tracing::warn!("LLM plan creation failed, using fallback: {}", e);
2639 Ok(LlmPlanner::fallback_plan(prompt))
2640 }
2641 }
2642 }
2643
2644 pub async fn execute_with_planning(
2646 &self,
2647 history: &[Message],
2648 prompt: &str,
2649 event_tx: Option<mpsc::Sender<AgentEvent>>,
2650 ) -> Result<AgentResult> {
2651 if let Some(tx) = &event_tx {
2653 tx.send(AgentEvent::PlanningStart {
2654 prompt: prompt.to_string(),
2655 })
2656 .await
2657 .ok();
2658 }
2659
2660 let goal = if self.config.goal_tracking {
2662 let g = self.extract_goal(prompt).await?;
2663 if let Some(tx) = &event_tx {
2664 tx.send(AgentEvent::GoalExtracted { goal: g.clone() })
2665 .await
2666 .ok();
2667 }
2668 Some(g)
2669 } else {
2670 None
2671 };
2672
2673 let plan = self.plan(prompt, None).await?;
2675
2676 if let Some(tx) = &event_tx {
2678 tx.send(AgentEvent::PlanningEnd {
2679 estimated_steps: plan.steps.len(),
2680 plan: plan.clone(),
2681 })
2682 .await
2683 .ok();
2684 }
2685
2686 let plan_start = std::time::Instant::now();
2687
2688 let result = self.execute_plan(history, &plan, event_tx.clone()).await?;
2690
2691 if self.config.goal_tracking {
2693 if let Some(ref g) = goal {
2694 let achieved = self.check_goal_achievement(g, &result.text).await?;
2695 if achieved {
2696 if let Some(tx) = &event_tx {
2697 tx.send(AgentEvent::GoalAchieved {
2698 goal: g.description.clone(),
2699 total_steps: result.messages.len(),
2700 duration_ms: plan_start.elapsed().as_millis() as i64,
2701 })
2702 .await
2703 .ok();
2704 }
2705 }
2706 }
2707 }
2708
2709 Ok(result)
2710 }
2711
2712 async fn execute_plan(
2719 &self,
2720 history: &[Message],
2721 plan: &ExecutionPlan,
2722 event_tx: Option<mpsc::Sender<AgentEvent>>,
2723 ) -> Result<AgentResult> {
2724 let mut plan = plan.clone();
2725 let mut current_history = history.to_vec();
2726 let mut total_usage = TokenUsage::default();
2727 let mut tool_calls_count = 0;
2728 let total_steps = plan.steps.len();
2729
2730 let steps_text = plan
2732 .steps
2733 .iter()
2734 .enumerate()
2735 .map(|(i, step)| format!("{}. {}", i + 1, step.content))
2736 .collect::<Vec<_>>()
2737 .join("\n");
2738 current_history.push(Message::user(&crate::prompts::render(
2739 crate::prompts::PLAN_EXECUTE_GOAL,
2740 &[("goal", &plan.goal), ("steps", &steps_text)],
2741 )));
2742
2743 loop {
2744 let ready: Vec<String> = plan
2745 .get_ready_steps()
2746 .iter()
2747 .map(|s| s.id.clone())
2748 .collect();
2749
2750 if ready.is_empty() {
2751 if plan.has_deadlock() {
2753 tracing::warn!(
2754 "Plan deadlock detected: {} pending steps with unresolvable dependencies",
2755 plan.pending_count()
2756 );
2757 }
2758 break;
2759 }
2760
2761 if ready.len() == 1 {
2762 let step_id = &ready[0];
2764 let step = plan
2765 .steps
2766 .iter()
2767 .find(|s| s.id == *step_id)
2768 .ok_or_else(|| anyhow::anyhow!("step '{}' not found in plan", step_id))?
2769 .clone();
2770 let step_number = plan
2771 .steps
2772 .iter()
2773 .position(|s| s.id == *step_id)
2774 .unwrap_or(0)
2775 + 1;
2776
2777 if let Some(tx) = &event_tx {
2779 tx.send(AgentEvent::StepStart {
2780 step_id: step.id.clone(),
2781 description: step.content.clone(),
2782 step_number,
2783 total_steps,
2784 })
2785 .await
2786 .ok();
2787 }
2788
2789 plan.mark_status(&step.id, TaskStatus::InProgress);
2790
2791 let step_prompt = crate::prompts::render(
2792 crate::prompts::PLAN_EXECUTE_STEP,
2793 &[
2794 ("step_num", &step_number.to_string()),
2795 ("description", &step.content),
2796 ],
2797 );
2798
2799 match self
2800 .execute_loop(
2801 ¤t_history,
2802 &step_prompt,
2803 None,
2804 event_tx.clone(),
2805 &tokio_util::sync::CancellationToken::new(),
2806 )
2807 .await
2808 {
2809 Ok(result) => {
2810 current_history = result.messages.clone();
2811 total_usage.prompt_tokens += result.usage.prompt_tokens;
2812 total_usage.completion_tokens += result.usage.completion_tokens;
2813 total_usage.total_tokens += result.usage.total_tokens;
2814 tool_calls_count += result.tool_calls_count;
2815 plan.mark_status(&step.id, TaskStatus::Completed);
2816
2817 if let Some(tx) = &event_tx {
2818 tx.send(AgentEvent::StepEnd {
2819 step_id: step.id.clone(),
2820 status: TaskStatus::Completed,
2821 step_number,
2822 total_steps,
2823 })
2824 .await
2825 .ok();
2826 }
2827 }
2828 Err(e) => {
2829 tracing::error!("Plan step '{}' failed: {}", step.id, e);
2830 plan.mark_status(&step.id, TaskStatus::Failed);
2831
2832 if let Some(tx) = &event_tx {
2833 tx.send(AgentEvent::StepEnd {
2834 step_id: step.id.clone(),
2835 status: TaskStatus::Failed,
2836 step_number,
2837 total_steps,
2838 })
2839 .await
2840 .ok();
2841 }
2842 }
2843 }
2844 } else {
2845 let ready_steps: Vec<_> = ready
2852 .iter()
2853 .filter_map(|id| {
2854 let step = plan.steps.iter().find(|s| s.id == *id)?.clone();
2855 let step_number =
2856 plan.steps.iter().position(|s| s.id == *id).unwrap_or(0) + 1;
2857 Some((step, step_number))
2858 })
2859 .collect();
2860
2861 for (step, step_number) in &ready_steps {
2863 plan.mark_status(&step.id, TaskStatus::InProgress);
2864 if let Some(tx) = &event_tx {
2865 tx.send(AgentEvent::StepStart {
2866 step_id: step.id.clone(),
2867 description: step.content.clone(),
2868 step_number: *step_number,
2869 total_steps,
2870 })
2871 .await
2872 .ok();
2873 }
2874 }
2875
2876 let mut join_set = tokio::task::JoinSet::new();
2878 for (step, step_number) in &ready_steps {
2879 let base_history = current_history.clone();
2880 let agent_clone = self.clone();
2881 let tx = event_tx.clone();
2882 let step_clone = step.clone();
2883 let sn = *step_number;
2884
2885 join_set.spawn(async move {
2886 let prompt = crate::prompts::render(
2887 crate::prompts::PLAN_EXECUTE_STEP,
2888 &[
2889 ("step_num", &sn.to_string()),
2890 ("description", &step_clone.content),
2891 ],
2892 );
2893 let result = agent_clone
2894 .execute_loop(
2895 &base_history,
2896 &prompt,
2897 None,
2898 tx,
2899 &tokio_util::sync::CancellationToken::new(),
2900 )
2901 .await;
2902 (step_clone.id, sn, result)
2903 });
2904 }
2905
2906 let mut parallel_summaries = Vec::new();
2908 while let Some(join_result) = join_set.join_next().await {
2909 match join_result {
2910 Ok((step_id, step_number, step_result)) => match step_result {
2911 Ok(result) => {
2912 total_usage.prompt_tokens += result.usage.prompt_tokens;
2913 total_usage.completion_tokens += result.usage.completion_tokens;
2914 total_usage.total_tokens += result.usage.total_tokens;
2915 tool_calls_count += result.tool_calls_count;
2916 plan.mark_status(&step_id, TaskStatus::Completed);
2917
2918 parallel_summaries.push(format!(
2920 "- Step {} ({}): {}",
2921 step_number, step_id, result.text
2922 ));
2923
2924 if let Some(tx) = &event_tx {
2925 tx.send(AgentEvent::StepEnd {
2926 step_id,
2927 status: TaskStatus::Completed,
2928 step_number,
2929 total_steps,
2930 })
2931 .await
2932 .ok();
2933 }
2934 }
2935 Err(e) => {
2936 tracing::error!("Plan step '{}' failed: {}", step_id, e);
2937 plan.mark_status(&step_id, TaskStatus::Failed);
2938
2939 if let Some(tx) = &event_tx {
2940 tx.send(AgentEvent::StepEnd {
2941 step_id,
2942 status: TaskStatus::Failed,
2943 step_number,
2944 total_steps,
2945 })
2946 .await
2947 .ok();
2948 }
2949 }
2950 },
2951 Err(e) => {
2952 tracing::error!("JoinSet task panicked: {}", e);
2953 }
2954 }
2955 }
2956
2957 if !parallel_summaries.is_empty() {
2959 parallel_summaries.sort(); let results_text = parallel_summaries.join("\n");
2961 current_history.push(Message::user(&crate::prompts::render(
2962 crate::prompts::PLAN_PARALLEL_RESULTS,
2963 &[("results", &results_text)],
2964 )));
2965 }
2966 }
2967
2968 if self.config.goal_tracking {
2970 let completed = plan
2971 .steps
2972 .iter()
2973 .filter(|s| s.status == TaskStatus::Completed)
2974 .count();
2975 if let Some(tx) = &event_tx {
2976 tx.send(AgentEvent::GoalProgress {
2977 goal: plan.goal.clone(),
2978 progress: plan.progress(),
2979 completed_steps: completed,
2980 total_steps,
2981 })
2982 .await
2983 .ok();
2984 }
2985 }
2986 }
2987
2988 let final_text = current_history
2990 .last()
2991 .map(|m| {
2992 m.content
2993 .iter()
2994 .filter_map(|block| {
2995 if let crate::llm::ContentBlock::Text { text } = block {
2996 Some(text.as_str())
2997 } else {
2998 None
2999 }
3000 })
3001 .collect::<Vec<_>>()
3002 .join("\n")
3003 })
3004 .unwrap_or_default();
3005
3006 Ok(AgentResult {
3007 text: final_text,
3008 messages: current_history,
3009 usage: total_usage,
3010 tool_calls_count,
3011 })
3012 }
3013
3014 pub async fn extract_goal(&self, prompt: &str) -> Result<AgentGoal> {
3019 use crate::planning::LlmPlanner;
3020
3021 match LlmPlanner::extract_goal(&self.llm_client, prompt).await {
3022 Ok(goal) => Ok(goal),
3023 Err(e) => {
3024 tracing::warn!("LLM goal extraction failed, using fallback: {}", e);
3025 Ok(LlmPlanner::fallback_goal(prompt))
3026 }
3027 }
3028 }
3029
3030 pub async fn check_goal_achievement(
3035 &self,
3036 goal: &AgentGoal,
3037 current_state: &str,
3038 ) -> Result<bool> {
3039 use crate::planning::LlmPlanner;
3040
3041 match LlmPlanner::check_achievement(&self.llm_client, goal, current_state).await {
3042 Ok(result) => Ok(result.achieved),
3043 Err(e) => {
3044 tracing::warn!("LLM achievement check failed, using fallback: {}", e);
3045 let result = LlmPlanner::fallback_check_achievement(goal, current_state);
3046 Ok(result.achieved)
3047 }
3048 }
3049 }
3050}
3051
3052#[cfg(test)]
3053mod tests {
3054 use super::*;
3055 use crate::llm::{ContentBlock, StreamEvent};
3056 use crate::permissions::PermissionPolicy;
3057 use crate::tools::ToolExecutor;
3058 use std::path::PathBuf;
3059 use std::sync::atomic::{AtomicUsize, Ordering};
3060
3061 fn test_tool_context() -> ToolContext {
3063 ToolContext::new(PathBuf::from("/tmp"))
3064 }
3065
/// Checks the values produced by `AgentConfig::default()`: empty prompt slots, tools
/// and context providers, the built-in tool-round limit, no permission checker, and a
/// populated skill registry.
#[test]
fn test_agent_config_default() {
    let defaults = AgentConfig::default();

    assert!(defaults.prompt_slots.is_empty());
    assert!(defaults.tools.is_empty());
    assert_eq!(defaults.max_tool_rounds, MAX_TOOL_ROUNDS);
    assert!(defaults.permission_checker.is_none());
    assert!(defaults.context_providers.is_empty());

    // The default registry must exist and already carry the built-in skills.
    let skills = defaults
        .skill_registry
        .expect("skill_registry must be Some by default");
    assert!(skills.len() >= 7, "expected at least 7 built-in skills");
    assert!(skills.get("code-search").is_some());
    assert!(skills.get("find-bugs").is_some());
}
3082
3083 pub(crate) struct MockLlmClient {
3089 responses: std::sync::Mutex<Vec<LlmResponse>>,
3091 pub(crate) call_count: AtomicUsize,
3093 }
3094
3095 impl MockLlmClient {
3096 pub(crate) fn new(responses: Vec<LlmResponse>) -> Self {
3097 Self {
3098 responses: std::sync::Mutex::new(responses),
3099 call_count: AtomicUsize::new(0),
3100 }
3101 }
3102
3103 pub(crate) fn text_response(text: &str) -> LlmResponse {
3105 LlmResponse {
3106 message: Message {
3107 role: "assistant".to_string(),
3108 content: vec![ContentBlock::Text {
3109 text: text.to_string(),
3110 }],
3111 reasoning_content: None,
3112 },
3113 usage: TokenUsage {
3114 prompt_tokens: 10,
3115 completion_tokens: 5,
3116 total_tokens: 15,
3117 cache_read_tokens: None,
3118 cache_write_tokens: None,
3119 },
3120 stop_reason: Some("end_turn".to_string()),
3121 meta: None,
3122 }
3123 }
3124
3125 pub(crate) fn tool_call_response(
3127 tool_id: &str,
3128 tool_name: &str,
3129 args: serde_json::Value,
3130 ) -> LlmResponse {
3131 LlmResponse {
3132 message: Message {
3133 role: "assistant".to_string(),
3134 content: vec![ContentBlock::ToolUse {
3135 id: tool_id.to_string(),
3136 name: tool_name.to_string(),
3137 input: args,
3138 }],
3139 reasoning_content: None,
3140 },
3141 usage: TokenUsage {
3142 prompt_tokens: 10,
3143 completion_tokens: 5,
3144 total_tokens: 15,
3145 cache_read_tokens: None,
3146 cache_write_tokens: None,
3147 },
3148 stop_reason: Some("tool_use".to_string()),
3149 meta: None,
3150 }
3151 }
3152 }
3153
3154 #[async_trait::async_trait]
3155 impl LlmClient for MockLlmClient {
3156 async fn complete(
3157 &self,
3158 _messages: &[Message],
3159 _system: Option<&str>,
3160 _tools: &[ToolDefinition],
3161 ) -> Result<LlmResponse> {
3162 self.call_count.fetch_add(1, Ordering::SeqCst);
3163 let mut responses = self.responses.lock().unwrap();
3164 if responses.is_empty() {
3165 anyhow::bail!("No more mock responses available");
3166 }
3167 Ok(responses.remove(0))
3168 }
3169
3170 async fn complete_streaming(
3171 &self,
3172 _messages: &[Message],
3173 _system: Option<&str>,
3174 _tools: &[ToolDefinition],
3175 ) -> Result<mpsc::Receiver<StreamEvent>> {
3176 self.call_count.fetch_add(1, Ordering::SeqCst);
3177 let mut responses = self.responses.lock().unwrap();
3178 if responses.is_empty() {
3179 anyhow::bail!("No more mock responses available");
3180 }
3181 let response = responses.remove(0);
3182
3183 let (tx, rx) = mpsc::channel(10);
3184 tokio::spawn(async move {
3185 for block in &response.message.content {
3187 if let ContentBlock::Text { text } = block {
3188 tx.send(StreamEvent::TextDelta(text.clone())).await.ok();
3189 }
3190 }
3191 tx.send(StreamEvent::Done(response)).await.ok();
3192 });
3193
3194 Ok(rx)
3195 }
3196 }
3197
3198 #[tokio::test]
3203 async fn test_agent_simple_response() {
3204 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
3205 "Hello, I'm an AI assistant.",
3206 )]));
3207
3208 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3209 let config = AgentConfig::default();
3210
3211 let agent = AgentLoop::new(
3212 mock_client.clone(),
3213 tool_executor,
3214 test_tool_context(),
3215 config,
3216 );
3217 let result = agent.execute(&[], "Hello", None).await.unwrap();
3218
3219 assert_eq!(result.text, "Hello, I'm an AI assistant.");
3220 assert_eq!(result.tool_calls_count, 0);
3221 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 1);
3222 }
3223
3224 #[tokio::test]
3225 async fn test_agent_with_tool_call() {
3226 let mock_client = Arc::new(MockLlmClient::new(vec![
3227 MockLlmClient::tool_call_response(
3229 "tool-1",
3230 "bash",
3231 serde_json::json!({"command": "echo hello"}),
3232 ),
3233 MockLlmClient::text_response("The command output was: hello"),
3235 ]));
3236
3237 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3238 let config = AgentConfig::default();
3239
3240 let agent = AgentLoop::new(
3241 mock_client.clone(),
3242 tool_executor,
3243 test_tool_context(),
3244 config,
3245 );
3246 let result = agent.execute(&[], "Run echo hello", None).await.unwrap();
3247
3248 assert_eq!(result.text, "The command output was: hello");
3249 assert_eq!(result.tool_calls_count, 1);
3250 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 2);
3251 }
3252
3253 #[tokio::test]
3254 async fn test_agent_permission_deny() {
3255 let mock_client = Arc::new(MockLlmClient::new(vec![
3256 MockLlmClient::tool_call_response(
3258 "tool-1",
3259 "bash",
3260 serde_json::json!({"command": "rm -rf /tmp/test"}),
3261 ),
3262 MockLlmClient::text_response(
3264 "I cannot execute that command due to permission restrictions.",
3265 ),
3266 ]));
3267
3268 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3269
3270 let permission_policy = PermissionPolicy::new().deny("bash(rm:*)");
3272
3273 let config = AgentConfig {
3274 permission_checker: Some(Arc::new(permission_policy)),
3275 ..Default::default()
3276 };
3277
3278 let (tx, mut rx) = mpsc::channel(100);
3279 let agent = AgentLoop::new(
3280 mock_client.clone(),
3281 tool_executor,
3282 test_tool_context(),
3283 config,
3284 );
3285 let result = agent.execute(&[], "Delete files", Some(tx)).await.unwrap();
3286
3287 let mut found_permission_denied = false;
3289 while let Ok(event) = rx.try_recv() {
3290 if let AgentEvent::PermissionDenied { tool_name, .. } = event {
3291 assert_eq!(tool_name, "bash");
3292 found_permission_denied = true;
3293 }
3294 }
3295 assert!(
3296 found_permission_denied,
3297 "Should have received PermissionDenied event"
3298 );
3299
3300 assert_eq!(result.tool_calls_count, 1);
3301 }
3302
3303 #[tokio::test]
3304 async fn test_agent_permission_allow() {
3305 let mock_client = Arc::new(MockLlmClient::new(vec![
3306 MockLlmClient::tool_call_response(
3308 "tool-1",
3309 "bash",
3310 serde_json::json!({"command": "echo hello"}),
3311 ),
3312 MockLlmClient::text_response("Done!"),
3314 ]));
3315
3316 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3317
3318 let permission_policy = PermissionPolicy::new()
3320 .allow("bash(echo:*)")
3321 .deny("bash(rm:*)");
3322
3323 let config = AgentConfig {
3324 permission_checker: Some(Arc::new(permission_policy)),
3325 ..Default::default()
3326 };
3327
3328 let agent = AgentLoop::new(
3329 mock_client.clone(),
3330 tool_executor,
3331 test_tool_context(),
3332 config,
3333 );
3334 let result = agent.execute(&[], "Echo hello", None).await.unwrap();
3335
3336 assert_eq!(result.text, "Done!");
3337 assert_eq!(result.tool_calls_count, 1);
3338 }
3339
3340 #[tokio::test]
3341 async fn test_agent_streaming_events() {
3342 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
3343 "Hello!",
3344 )]));
3345
3346 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3347 let config = AgentConfig::default();
3348
3349 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3350 let (mut rx, handle, _cancel_token) = agent.execute_streaming(&[], "Hi").await.unwrap();
3351
3352 let mut events = Vec::new();
3354 while let Some(event) = rx.recv().await {
3355 events.push(event);
3356 }
3357
3358 let result = handle.await.unwrap().unwrap();
3359 assert_eq!(result.text, "Hello!");
3360
3361 assert!(events.iter().any(|e| matches!(e, AgentEvent::Start { .. })));
3363 assert!(events.iter().any(|e| matches!(e, AgentEvent::End { .. })));
3364 }
3365
3366 #[tokio::test]
3367 async fn test_agent_max_tool_rounds() {
3368 let responses: Vec<LlmResponse> = (0..100)
3370 .map(|i| {
3371 MockLlmClient::tool_call_response(
3372 &format!("tool-{}", i),
3373 "bash",
3374 serde_json::json!({"command": "echo loop"}),
3375 )
3376 })
3377 .collect();
3378
3379 let mock_client = Arc::new(MockLlmClient::new(responses));
3380 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3381
3382 let config = AgentConfig {
3383 max_tool_rounds: 3,
3384 ..Default::default()
3385 };
3386
3387 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3388 let result = agent.execute(&[], "Loop forever", None).await;
3389
3390 assert!(result.is_err());
3392 assert!(result.unwrap_err().to_string().contains("Max tool rounds"));
3393 }
3394
3395 #[tokio::test]
3396 async fn test_agent_no_permission_policy_defaults_to_ask() {
3397 let mock_client = Arc::new(MockLlmClient::new(vec![
3400 MockLlmClient::tool_call_response(
3401 "tool-1",
3402 "bash",
3403 serde_json::json!({"command": "rm -rf /tmp/test"}),
3404 ),
3405 MockLlmClient::text_response("Denied!"),
3406 ]));
3407
3408 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3409 let config = AgentConfig {
3410 permission_checker: None, ..Default::default()
3413 };
3414
3415 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3416 let result = agent.execute(&[], "Delete", None).await.unwrap();
3417
3418 assert_eq!(result.text, "Denied!");
3420 assert_eq!(result.tool_calls_count, 1);
3421 }
3422
3423 #[tokio::test]
3424 async fn test_agent_permission_ask_without_cm_denies() {
3425 let mock_client = Arc::new(MockLlmClient::new(vec![
3428 MockLlmClient::tool_call_response(
3429 "tool-1",
3430 "bash",
3431 serde_json::json!({"command": "echo test"}),
3432 ),
3433 MockLlmClient::text_response("Denied!"),
3434 ]));
3435
3436 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3437
3438 let permission_policy = PermissionPolicy::new(); let config = AgentConfig {
3442 permission_checker: Some(Arc::new(permission_policy)),
3443 ..Default::default()
3445 };
3446
3447 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3448 let result = agent.execute(&[], "Echo", None).await.unwrap();
3449
3450 assert_eq!(result.text, "Denied!");
3452 assert!(result.tool_calls_count >= 1);
3454 }
3455
3456 #[tokio::test]
3461 async fn test_agent_hitl_approved() {
3462 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3463 use tokio::sync::broadcast;
3464
3465 let mock_client = Arc::new(MockLlmClient::new(vec![
3466 MockLlmClient::tool_call_response(
3467 "tool-1",
3468 "bash",
3469 serde_json::json!({"command": "echo hello"}),
3470 ),
3471 MockLlmClient::text_response("Command executed!"),
3472 ]));
3473
3474 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3475
3476 let (event_tx, _event_rx) = broadcast::channel(100);
3478 let hitl_policy = ConfirmationPolicy {
3479 enabled: true,
3480 ..Default::default()
3481 };
3482 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3483
3484 let permission_policy = PermissionPolicy::new(); let config = AgentConfig {
3488 permission_checker: Some(Arc::new(permission_policy)),
3489 confirmation_manager: Some(confirmation_manager.clone()),
3490 ..Default::default()
3491 };
3492
3493 let cm_clone = confirmation_manager.clone();
3495 tokio::spawn(async move {
3496 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3498 cm_clone.confirm("tool-1", true, None).await.ok();
3500 });
3501
3502 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3503 let result = agent.execute(&[], "Run echo", None).await.unwrap();
3504
3505 assert_eq!(result.text, "Command executed!");
3506 assert_eq!(result.tool_calls_count, 1);
3507 }
3508
3509 #[tokio::test]
3510 async fn test_agent_hitl_rejected() {
3511 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3512 use tokio::sync::broadcast;
3513
3514 let mock_client = Arc::new(MockLlmClient::new(vec![
3515 MockLlmClient::tool_call_response(
3516 "tool-1",
3517 "bash",
3518 serde_json::json!({"command": "rm -rf /"}),
3519 ),
3520 MockLlmClient::text_response("Understood, I won't do that."),
3521 ]));
3522
3523 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3524
3525 let (event_tx, _event_rx) = broadcast::channel(100);
3527 let hitl_policy = ConfirmationPolicy {
3528 enabled: true,
3529 ..Default::default()
3530 };
3531 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3532
3533 let permission_policy = PermissionPolicy::new();
3535
3536 let config = AgentConfig {
3537 permission_checker: Some(Arc::new(permission_policy)),
3538 confirmation_manager: Some(confirmation_manager.clone()),
3539 ..Default::default()
3540 };
3541
3542 let cm_clone = confirmation_manager.clone();
3544 tokio::spawn(async move {
3545 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3546 cm_clone
3547 .confirm("tool-1", false, Some("Too dangerous".to_string()))
3548 .await
3549 .ok();
3550 });
3551
3552 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3553 let result = agent.execute(&[], "Delete everything", None).await.unwrap();
3554
3555 assert_eq!(result.text, "Understood, I won't do that.");
3557 }
3558
3559 #[tokio::test]
3560 async fn test_agent_hitl_timeout_reject() {
3561 use crate::hitl::{ConfirmationManager, ConfirmationPolicy, TimeoutAction};
3562 use tokio::sync::broadcast;
3563
3564 let mock_client = Arc::new(MockLlmClient::new(vec![
3565 MockLlmClient::tool_call_response(
3566 "tool-1",
3567 "bash",
3568 serde_json::json!({"command": "echo test"}),
3569 ),
3570 MockLlmClient::text_response("Timed out, I understand."),
3571 ]));
3572
3573 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3574
3575 let (event_tx, _event_rx) = broadcast::channel(100);
3577 let hitl_policy = ConfirmationPolicy {
3578 enabled: true,
3579 default_timeout_ms: 50, timeout_action: TimeoutAction::Reject,
3581 ..Default::default()
3582 };
3583 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3584
3585 let permission_policy = PermissionPolicy::new();
3586
3587 let config = AgentConfig {
3588 permission_checker: Some(Arc::new(permission_policy)),
3589 confirmation_manager: Some(confirmation_manager),
3590 ..Default::default()
3591 };
3592
3593 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3595 let result = agent.execute(&[], "Echo", None).await.unwrap();
3596
3597 assert_eq!(result.text, "Timed out, I understand.");
3599 }
3600
3601 #[tokio::test]
3602 async fn test_agent_hitl_timeout_auto_approve() {
3603 use crate::hitl::{ConfirmationManager, ConfirmationPolicy, TimeoutAction};
3604 use tokio::sync::broadcast;
3605
3606 let mock_client = Arc::new(MockLlmClient::new(vec![
3607 MockLlmClient::tool_call_response(
3608 "tool-1",
3609 "bash",
3610 serde_json::json!({"command": "echo hello"}),
3611 ),
3612 MockLlmClient::text_response("Auto-approved and executed!"),
3613 ]));
3614
3615 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3616
3617 let (event_tx, _event_rx) = broadcast::channel(100);
3619 let hitl_policy = ConfirmationPolicy {
3620 enabled: true,
3621 default_timeout_ms: 50, timeout_action: TimeoutAction::AutoApprove,
3623 ..Default::default()
3624 };
3625 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3626
3627 let permission_policy = PermissionPolicy::new();
3628
3629 let config = AgentConfig {
3630 permission_checker: Some(Arc::new(permission_policy)),
3631 confirmation_manager: Some(confirmation_manager),
3632 ..Default::default()
3633 };
3634
3635 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3637 let result = agent.execute(&[], "Echo", None).await.unwrap();
3638
3639 assert_eq!(result.text, "Auto-approved and executed!");
3641 assert_eq!(result.tool_calls_count, 1);
3642 }
3643
3644 #[tokio::test]
3645 async fn test_agent_hitl_confirmation_events() {
3646 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3647 use tokio::sync::broadcast;
3648
3649 let mock_client = Arc::new(MockLlmClient::new(vec![
3650 MockLlmClient::tool_call_response(
3651 "tool-1",
3652 "bash",
3653 serde_json::json!({"command": "echo test"}),
3654 ),
3655 MockLlmClient::text_response("Done!"),
3656 ]));
3657
3658 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3659
3660 let (event_tx, mut event_rx) = broadcast::channel(100);
3662 let hitl_policy = ConfirmationPolicy {
3663 enabled: true,
3664 default_timeout_ms: 5000, ..Default::default()
3666 };
3667 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3668
3669 let permission_policy = PermissionPolicy::new();
3670
3671 let config = AgentConfig {
3672 permission_checker: Some(Arc::new(permission_policy)),
3673 confirmation_manager: Some(confirmation_manager.clone()),
3674 ..Default::default()
3675 };
3676
3677 let cm_clone = confirmation_manager.clone();
3679 let event_handle = tokio::spawn(async move {
3680 let mut events = Vec::new();
3681 while let Ok(event) = event_rx.recv().await {
3683 events.push(event.clone());
3684 if let AgentEvent::ConfirmationRequired { tool_id, .. } = event {
3685 cm_clone.confirm(&tool_id, true, None).await.ok();
3687 if let Ok(recv_event) = event_rx.recv().await {
3689 events.push(recv_event);
3690 }
3691 break;
3692 }
3693 }
3694 events
3695 });
3696
3697 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3698 let _result = agent.execute(&[], "Echo", None).await.unwrap();
3699
3700 let events = event_handle.await.unwrap();
3702 assert!(
3703 events
3704 .iter()
3705 .any(|e| matches!(e, AgentEvent::ConfirmationRequired { .. })),
3706 "Should have ConfirmationRequired event"
3707 );
3708 assert!(
3709 events
3710 .iter()
3711 .any(|e| matches!(e, AgentEvent::ConfirmationReceived { approved: true, .. })),
3712 "Should have ConfirmationReceived event with approved=true"
3713 );
3714 }
3715
3716 #[tokio::test]
3717 async fn test_agent_hitl_disabled_auto_executes() {
3718 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3720 use tokio::sync::broadcast;
3721
3722 let mock_client = Arc::new(MockLlmClient::new(vec![
3723 MockLlmClient::tool_call_response(
3724 "tool-1",
3725 "bash",
3726 serde_json::json!({"command": "echo auto"}),
3727 ),
3728 MockLlmClient::text_response("Auto executed!"),
3729 ]));
3730
3731 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3732
3733 let (event_tx, _event_rx) = broadcast::channel(100);
3735 let hitl_policy = ConfirmationPolicy {
3736 enabled: false, ..Default::default()
3738 };
3739 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3740
3741 let permission_policy = PermissionPolicy::new(); let config = AgentConfig {
3744 permission_checker: Some(Arc::new(permission_policy)),
3745 confirmation_manager: Some(confirmation_manager),
3746 ..Default::default()
3747 };
3748
3749 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3750 let result = agent.execute(&[], "Echo", None).await.unwrap();
3751
3752 assert_eq!(result.text, "Auto executed!");
3754 assert_eq!(result.tool_calls_count, 1);
3755 }
3756
3757 #[tokio::test]
3758 async fn test_agent_hitl_with_permission_deny_skips_hitl() {
3759 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3761 use tokio::sync::broadcast;
3762
3763 let mock_client = Arc::new(MockLlmClient::new(vec![
3764 MockLlmClient::tool_call_response(
3765 "tool-1",
3766 "bash",
3767 serde_json::json!({"command": "rm -rf /"}),
3768 ),
3769 MockLlmClient::text_response("Blocked by permission."),
3770 ]));
3771
3772 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3773
3774 let (event_tx, mut event_rx) = broadcast::channel(100);
3776 let hitl_policy = ConfirmationPolicy {
3777 enabled: true,
3778 ..Default::default()
3779 };
3780 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3781
3782 let permission_policy = PermissionPolicy::new().deny("bash(rm:*)");
3784
3785 let config = AgentConfig {
3786 permission_checker: Some(Arc::new(permission_policy)),
3787 confirmation_manager: Some(confirmation_manager),
3788 ..Default::default()
3789 };
3790
3791 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3792 let result = agent.execute(&[], "Delete", None).await.unwrap();
3793
3794 assert_eq!(result.text, "Blocked by permission.");
3796
3797 let mut found_confirmation = false;
3799 while let Ok(event) = event_rx.try_recv() {
3800 if matches!(event, AgentEvent::ConfirmationRequired { .. }) {
3801 found_confirmation = true;
3802 }
3803 }
3804 assert!(
3805 !found_confirmation,
3806 "HITL should not be triggered when permission is Deny"
3807 );
3808 }
3809
3810 #[tokio::test]
3811 async fn test_agent_hitl_with_permission_allow_skips_hitl() {
3812 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3815 use tokio::sync::broadcast;
3816
3817 let mock_client = Arc::new(MockLlmClient::new(vec![
3818 MockLlmClient::tool_call_response(
3819 "tool-1",
3820 "bash",
3821 serde_json::json!({"command": "echo hello"}),
3822 ),
3823 MockLlmClient::text_response("Allowed!"),
3824 ]));
3825
3826 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3827
3828 let (event_tx, mut event_rx) = broadcast::channel(100);
3830 let hitl_policy = ConfirmationPolicy {
3831 enabled: true,
3832 ..Default::default()
3833 };
3834 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3835
3836 let permission_policy = PermissionPolicy::new().allow("bash(echo:*)");
3838
3839 let config = AgentConfig {
3840 permission_checker: Some(Arc::new(permission_policy)),
3841 confirmation_manager: Some(confirmation_manager.clone()),
3842 ..Default::default()
3843 };
3844
3845 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3846 let result = agent.execute(&[], "Echo", None).await.unwrap();
3847
3848 assert_eq!(result.text, "Allowed!");
3850
3851 let mut found_confirmation = false;
3853 while let Ok(event) = event_rx.try_recv() {
3854 if matches!(event, AgentEvent::ConfirmationRequired { .. }) {
3855 found_confirmation = true;
3856 }
3857 }
3858 assert!(
3859 !found_confirmation,
3860 "Permission Allow should skip HITL confirmation"
3861 );
3862 }
3863
3864 #[tokio::test]
3865 async fn test_agent_hitl_multiple_tool_calls() {
3866 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3868 use tokio::sync::broadcast;
3869
3870 let mock_client = Arc::new(MockLlmClient::new(vec![
3871 LlmResponse {
3873 message: Message {
3874 role: "assistant".to_string(),
3875 content: vec![
3876 ContentBlock::ToolUse {
3877 id: "tool-1".to_string(),
3878 name: "bash".to_string(),
3879 input: serde_json::json!({"command": "echo first"}),
3880 },
3881 ContentBlock::ToolUse {
3882 id: "tool-2".to_string(),
3883 name: "bash".to_string(),
3884 input: serde_json::json!({"command": "echo second"}),
3885 },
3886 ],
3887 reasoning_content: None,
3888 },
3889 usage: TokenUsage {
3890 prompt_tokens: 10,
3891 completion_tokens: 5,
3892 total_tokens: 15,
3893 cache_read_tokens: None,
3894 cache_write_tokens: None,
3895 },
3896 stop_reason: Some("tool_use".to_string()),
3897 meta: None,
3898 },
3899 MockLlmClient::text_response("Both executed!"),
3900 ]));
3901
3902 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3903
3904 let (event_tx, _event_rx) = broadcast::channel(100);
3906 let hitl_policy = ConfirmationPolicy {
3907 enabled: true,
3908 default_timeout_ms: 5000,
3909 ..Default::default()
3910 };
3911 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3912
3913 let permission_policy = PermissionPolicy::new(); let config = AgentConfig {
3916 permission_checker: Some(Arc::new(permission_policy)),
3917 confirmation_manager: Some(confirmation_manager.clone()),
3918 ..Default::default()
3919 };
3920
3921 let cm_clone = confirmation_manager.clone();
3923 tokio::spawn(async move {
3924 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
3925 cm_clone.confirm("tool-1", true, None).await.ok();
3926 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
3927 cm_clone.confirm("tool-2", true, None).await.ok();
3928 });
3929
3930 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
3931 let result = agent.execute(&[], "Run both", None).await.unwrap();
3932
3933 assert_eq!(result.text, "Both executed!");
3934 assert_eq!(result.tool_calls_count, 2);
3935 }
3936
3937 #[tokio::test]
3938 async fn test_agent_hitl_partial_approval() {
3939 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
3941 use tokio::sync::broadcast;
3942
3943 let mock_client = Arc::new(MockLlmClient::new(vec![
3944 LlmResponse {
3946 message: Message {
3947 role: "assistant".to_string(),
3948 content: vec![
3949 ContentBlock::ToolUse {
3950 id: "tool-1".to_string(),
3951 name: "bash".to_string(),
3952 input: serde_json::json!({"command": "echo safe"}),
3953 },
3954 ContentBlock::ToolUse {
3955 id: "tool-2".to_string(),
3956 name: "bash".to_string(),
3957 input: serde_json::json!({"command": "rm -rf /"}),
3958 },
3959 ],
3960 reasoning_content: None,
3961 },
3962 usage: TokenUsage {
3963 prompt_tokens: 10,
3964 completion_tokens: 5,
3965 total_tokens: 15,
3966 cache_read_tokens: None,
3967 cache_write_tokens: None,
3968 },
3969 stop_reason: Some("tool_use".to_string()),
3970 meta: None,
3971 },
3972 MockLlmClient::text_response("First worked, second rejected."),
3973 ]));
3974
3975 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
3976
3977 let (event_tx, _event_rx) = broadcast::channel(100);
3978 let hitl_policy = ConfirmationPolicy {
3979 enabled: true,
3980 default_timeout_ms: 5000,
3981 ..Default::default()
3982 };
3983 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
3984
3985 let permission_policy = PermissionPolicy::new();
3986
3987 let config = AgentConfig {
3988 permission_checker: Some(Arc::new(permission_policy)),
3989 confirmation_manager: Some(confirmation_manager.clone()),
3990 ..Default::default()
3991 };
3992
3993 let cm_clone = confirmation_manager.clone();
3995 tokio::spawn(async move {
3996 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
3997 cm_clone.confirm("tool-1", true, None).await.ok();
3998 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
3999 cm_clone
4000 .confirm("tool-2", false, Some("Dangerous".to_string()))
4001 .await
4002 .ok();
4003 });
4004
4005 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4006 let result = agent.execute(&[], "Run both", None).await.unwrap();
4007
4008 assert_eq!(result.text, "First worked, second rejected.");
4009 assert_eq!(result.tool_calls_count, 2);
4010 }
4011
4012 #[tokio::test]
4013 async fn test_agent_hitl_yolo_mode_auto_approves() {
4014 use crate::hitl::{ConfirmationManager, ConfirmationPolicy, SessionLane};
4016 use tokio::sync::broadcast;
4017
4018 let mock_client = Arc::new(MockLlmClient::new(vec![
4019 MockLlmClient::tool_call_response(
4020 "tool-1",
4021 "read", serde_json::json!({"path": "/tmp/test.txt"}),
4023 ),
4024 MockLlmClient::text_response("File read!"),
4025 ]));
4026
4027 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4028
4029 let (event_tx, mut event_rx) = broadcast::channel(100);
4031 let mut yolo_lanes = std::collections::HashSet::new();
4032 yolo_lanes.insert(SessionLane::Query);
4033 let hitl_policy = ConfirmationPolicy {
4034 enabled: true,
4035 yolo_lanes, ..Default::default()
4037 };
4038 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
4039
4040 let permission_policy = PermissionPolicy::new();
4041
4042 let config = AgentConfig {
4043 permission_checker: Some(Arc::new(permission_policy)),
4044 confirmation_manager: Some(confirmation_manager),
4045 ..Default::default()
4046 };
4047
4048 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4049 let result = agent.execute(&[], "Read file", None).await.unwrap();
4050
4051 assert_eq!(result.text, "File read!");
4053
4054 let mut found_confirmation = false;
4056 while let Ok(event) = event_rx.try_recv() {
4057 if matches!(event, AgentEvent::ConfirmationRequired { .. }) {
4058 found_confirmation = true;
4059 }
4060 }
4061 assert!(
4062 !found_confirmation,
4063 "YOLO mode should not trigger confirmation"
4064 );
4065 }
4066
4067 #[tokio::test]
4068 async fn test_agent_config_with_all_options() {
4069 use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
4070 use tokio::sync::broadcast;
4071
4072 let (event_tx, _) = broadcast::channel(100);
4073 let hitl_policy = ConfirmationPolicy::default();
4074 let confirmation_manager = Arc::new(ConfirmationManager::new(hitl_policy, event_tx));
4075
4076 let permission_policy = PermissionPolicy::new().allow("bash(*)");
4077
4078 let config = AgentConfig {
4079 prompt_slots: SystemPromptSlots {
4080 extra: Some("Test system prompt".to_string()),
4081 ..Default::default()
4082 },
4083 tools: vec![],
4084 max_tool_rounds: 10,
4085 permission_checker: Some(Arc::new(permission_policy)),
4086 confirmation_manager: Some(confirmation_manager),
4087 context_providers: vec![],
4088 planning_enabled: false,
4089 goal_tracking: false,
4090 hook_engine: None,
4091 skill_registry: None,
4092 ..AgentConfig::default()
4093 };
4094
4095 assert!(config.prompt_slots.build().contains("Test system prompt"));
4096 assert_eq!(config.max_tool_rounds, 10);
4097 assert!(config.permission_checker.is_some());
4098 assert!(config.confirmation_manager.is_some());
4099 assert!(config.context_providers.is_empty());
4100
4101 let debug_str = format!("{:?}", config);
4103 assert!(debug_str.contains("AgentConfig"));
4104 assert!(debug_str.contains("permission_checker: true"));
4105 assert!(debug_str.contains("confirmation_manager: true"));
4106 assert!(debug_str.contains("context_providers: 0"));
4107 }
4108
4109 use crate::context::{ContextItem, ContextType};
4114
4115 struct MockContextProvider {
4117 name: String,
4118 items: Vec<ContextItem>,
4119 on_turn_calls: std::sync::Arc<tokio::sync::RwLock<Vec<(String, String, String)>>>,
4120 }
4121
4122 impl MockContextProvider {
4123 fn new(name: &str) -> Self {
4124 Self {
4125 name: name.to_string(),
4126 items: Vec::new(),
4127 on_turn_calls: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
4128 }
4129 }
4130
4131 fn with_items(mut self, items: Vec<ContextItem>) -> Self {
4132 self.items = items;
4133 self
4134 }
4135 }
4136
4137 #[async_trait::async_trait]
4138 impl ContextProvider for MockContextProvider {
4139 fn name(&self) -> &str {
4140 &self.name
4141 }
4142
4143 async fn query(&self, _query: &ContextQuery) -> anyhow::Result<ContextResult> {
4144 let mut result = ContextResult::new(&self.name);
4145 for item in &self.items {
4146 result.add_item(item.clone());
4147 }
4148 Ok(result)
4149 }
4150
4151 async fn on_turn_complete(
4152 &self,
4153 session_id: &str,
4154 prompt: &str,
4155 response: &str,
4156 ) -> anyhow::Result<()> {
4157 let mut calls = self.on_turn_calls.write().await;
4158 calls.push((
4159 session_id.to_string(),
4160 prompt.to_string(),
4161 response.to_string(),
4162 ));
4163 Ok(())
4164 }
4165 }
4166
4167 #[tokio::test]
4168 async fn test_agent_with_context_provider() {
4169 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4170 "Response using context",
4171 )]));
4172
4173 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4174
4175 let provider =
4176 MockContextProvider::new("test-provider").with_items(vec![ContextItem::new(
4177 "ctx-1",
4178 ContextType::Resource,
4179 "Relevant context here",
4180 )
4181 .with_source("test://docs/example")]);
4182
4183 let config = AgentConfig {
4184 prompt_slots: SystemPromptSlots {
4185 extra: Some("You are helpful.".to_string()),
4186 ..Default::default()
4187 },
4188 context_providers: vec![Arc::new(provider)],
4189 ..Default::default()
4190 };
4191
4192 let agent = AgentLoop::new(
4193 mock_client.clone(),
4194 tool_executor,
4195 test_tool_context(),
4196 config,
4197 );
4198 let result = agent.execute(&[], "What is X?", None).await.unwrap();
4199
4200 assert_eq!(result.text, "Response using context");
4201 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 1);
4202 }
4203
4204 #[tokio::test]
4205 async fn test_agent_context_provider_events() {
4206 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4207 "Answer",
4208 )]));
4209
4210 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4211
4212 let provider =
4213 MockContextProvider::new("event-provider").with_items(vec![ContextItem::new(
4214 "item-1",
4215 ContextType::Memory,
4216 "Memory content",
4217 )
4218 .with_token_count(50)]);
4219
4220 let config = AgentConfig {
4221 context_providers: vec![Arc::new(provider)],
4222 ..Default::default()
4223 };
4224
4225 let (tx, mut rx) = mpsc::channel(100);
4226 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4227 let _result = agent.execute(&[], "Test prompt", Some(tx)).await.unwrap();
4228
4229 let mut events = Vec::new();
4231 while let Ok(event) = rx.try_recv() {
4232 events.push(event);
4233 }
4234
4235 assert!(
4237 events
4238 .iter()
4239 .any(|e| matches!(e, AgentEvent::ContextResolving { .. })),
4240 "Should have ContextResolving event"
4241 );
4242 assert!(
4243 events
4244 .iter()
4245 .any(|e| matches!(e, AgentEvent::ContextResolved { .. })),
4246 "Should have ContextResolved event"
4247 );
4248
4249 for event in &events {
4251 if let AgentEvent::ContextResolved {
4252 total_items,
4253 total_tokens,
4254 } = event
4255 {
4256 assert_eq!(*total_items, 1);
4257 assert_eq!(*total_tokens, 50);
4258 }
4259 }
4260 }
4261
4262 #[tokio::test]
4263 async fn test_agent_multiple_context_providers() {
4264 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4265 "Combined response",
4266 )]));
4267
4268 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4269
4270 let provider1 = MockContextProvider::new("provider-1").with_items(vec![ContextItem::new(
4271 "p1-1",
4272 ContextType::Resource,
4273 "Resource from P1",
4274 )
4275 .with_token_count(100)]);
4276
4277 let provider2 = MockContextProvider::new("provider-2").with_items(vec![
4278 ContextItem::new("p2-1", ContextType::Memory, "Memory from P2").with_token_count(50),
4279 ContextItem::new("p2-2", ContextType::Skill, "Skill from P2").with_token_count(75),
4280 ]);
4281
4282 let config = AgentConfig {
4283 prompt_slots: SystemPromptSlots {
4284 extra: Some("Base system prompt.".to_string()),
4285 ..Default::default()
4286 },
4287 context_providers: vec![Arc::new(provider1), Arc::new(provider2)],
4288 ..Default::default()
4289 };
4290
4291 let (tx, mut rx) = mpsc::channel(100);
4292 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4293 let result = agent.execute(&[], "Query", Some(tx)).await.unwrap();
4294
4295 assert_eq!(result.text, "Combined response");
4296
4297 while let Ok(event) = rx.try_recv() {
4299 if let AgentEvent::ContextResolved {
4300 total_items,
4301 total_tokens,
4302 } = event
4303 {
4304 assert_eq!(total_items, 3); assert_eq!(total_tokens, 225); }
4307 }
4308 }
4309
4310 #[tokio::test]
4311 async fn test_agent_no_context_providers() {
4312 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4313 "No context",
4314 )]));
4315
4316 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4317
4318 let config = AgentConfig::default();
4320
4321 let (tx, mut rx) = mpsc::channel(100);
4322 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4323 let result = agent.execute(&[], "Simple prompt", Some(tx)).await.unwrap();
4324
4325 assert_eq!(result.text, "No context");
4326
4327 let mut events = Vec::new();
4329 while let Ok(event) = rx.try_recv() {
4330 events.push(event);
4331 }
4332
4333 assert!(
4334 !events
4335 .iter()
4336 .any(|e| matches!(e, AgentEvent::ContextResolving { .. })),
4337 "Should NOT have ContextResolving event"
4338 );
4339 }
4340
4341 #[tokio::test]
4342 async fn test_agent_context_on_turn_complete() {
4343 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4344 "Final response",
4345 )]));
4346
4347 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4348
4349 let provider = Arc::new(MockContextProvider::new("memory-provider"));
4350 let on_turn_calls = provider.on_turn_calls.clone();
4351
4352 let config = AgentConfig {
4353 context_providers: vec![provider],
4354 ..Default::default()
4355 };
4356
4357 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4358
4359 let result = agent
4361 .execute_with_session(&[], "User prompt", Some("sess-123"), None, None)
4362 .await
4363 .unwrap();
4364
4365 assert_eq!(result.text, "Final response");
4366
4367 let calls = on_turn_calls.read().await;
4369 assert_eq!(calls.len(), 1);
4370 assert_eq!(calls[0].0, "sess-123");
4371 assert_eq!(calls[0].1, "User prompt");
4372 assert_eq!(calls[0].2, "Final response");
4373 }
4374
4375 #[tokio::test]
4376 async fn test_agent_context_on_turn_complete_no_session() {
4377 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4378 "Response",
4379 )]));
4380
4381 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4382
4383 let provider = Arc::new(MockContextProvider::new("memory-provider"));
4384 let on_turn_calls = provider.on_turn_calls.clone();
4385
4386 let config = AgentConfig {
4387 context_providers: vec![provider],
4388 ..Default::default()
4389 };
4390
4391 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4392
4393 let _result = agent.execute(&[], "Prompt", None).await.unwrap();
4395
4396 let calls = on_turn_calls.read().await;
4398 assert!(calls.is_empty());
4399 }
4400
4401 #[tokio::test]
4402 async fn test_agent_build_augmented_system_prompt() {
4403 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response("OK")]));
4404
4405 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4406
4407 let provider = MockContextProvider::new("test").with_items(vec![ContextItem::new(
4408 "doc-1",
4409 ContextType::Resource,
4410 "Auth uses JWT tokens.",
4411 )
4412 .with_source("viking://docs/auth")]);
4413
4414 let config = AgentConfig {
4415 prompt_slots: SystemPromptSlots {
4416 extra: Some("You are helpful.".to_string()),
4417 ..Default::default()
4418 },
4419 context_providers: vec![Arc::new(provider)],
4420 ..Default::default()
4421 };
4422
4423 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
4424
4425 let context_results = agent.resolve_context("test", None).await;
4427 let augmented = agent.build_augmented_system_prompt(&context_results);
4428
4429 let augmented_str = augmented.unwrap();
4430 assert!(augmented_str.contains("You are helpful."));
4431 assert!(augmented_str.contains("<context source=\"viking://docs/auth\" type=\"Resource\">"));
4432 assert!(augmented_str.contains("Auth uses JWT tokens."));
4433 }
4434
4435 async fn collect_events(mut rx: mpsc::Receiver<AgentEvent>) -> Vec<AgentEvent> {
4441 let mut events = Vec::new();
4442 while let Ok(event) = rx.try_recv() {
4443 events.push(event);
4444 }
4445 while let Some(event) = rx.recv().await {
4447 events.push(event);
4448 }
4449 events
4450 }
4451
4452 #[tokio::test]
4453 async fn test_agent_multi_turn_tool_chain() {
4454 let mock_client = Arc::new(MockLlmClient::new(vec![
4456 MockLlmClient::tool_call_response(
4458 "t1",
4459 "bash",
4460 serde_json::json!({"command": "echo step1"}),
4461 ),
4462 MockLlmClient::tool_call_response(
4464 "t2",
4465 "bash",
4466 serde_json::json!({"command": "echo step2"}),
4467 ),
4468 MockLlmClient::text_response("Completed both steps: step1 then step2"),
4470 ]));
4471
4472 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4473 let config = AgentConfig::default();
4474
4475 let agent = AgentLoop::new(
4476 mock_client.clone(),
4477 tool_executor,
4478 test_tool_context(),
4479 config,
4480 );
4481 let result = agent.execute(&[], "Run two steps", None).await.unwrap();
4482
4483 assert_eq!(result.text, "Completed both steps: step1 then step2");
4484 assert_eq!(result.tool_calls_count, 2);
4485 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 3);
4486
4487 assert_eq!(result.messages[0].role, "user");
4489 assert_eq!(result.messages[1].role, "assistant"); assert_eq!(result.messages[2].role, "user"); assert_eq!(result.messages[3].role, "assistant"); assert_eq!(result.messages[4].role, "user"); assert_eq!(result.messages[5].role, "assistant"); assert_eq!(result.messages.len(), 6);
4495 }
4496
4497 #[tokio::test]
4498 async fn test_agent_conversation_history_preserved() {
4499 let existing_history = vec![
4501 Message::user("What is Rust?"),
4502 Message {
4503 role: "assistant".to_string(),
4504 content: vec![ContentBlock::Text {
4505 text: "Rust is a systems programming language.".to_string(),
4506 }],
4507 reasoning_content: None,
4508 },
4509 ];
4510
4511 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4512 "Rust was created by Graydon Hoare at Mozilla.",
4513 )]));
4514
4515 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4516 let agent = AgentLoop::new(
4517 mock_client.clone(),
4518 tool_executor,
4519 test_tool_context(),
4520 AgentConfig::default(),
4521 );
4522
4523 let result = agent
4524 .execute(&existing_history, "Who created it?", None)
4525 .await
4526 .unwrap();
4527
4528 assert_eq!(result.messages.len(), 4);
4530 assert_eq!(result.messages[0].text(), "What is Rust?");
4531 assert_eq!(
4532 result.messages[1].text(),
4533 "Rust is a systems programming language."
4534 );
4535 assert_eq!(result.messages[2].text(), "Who created it?");
4536 assert_eq!(
4537 result.messages[3].text(),
4538 "Rust was created by Graydon Hoare at Mozilla."
4539 );
4540 }
4541
4542 #[tokio::test]
4543 async fn test_agent_event_stream_completeness() {
4544 let mock_client = Arc::new(MockLlmClient::new(vec![
4546 MockLlmClient::tool_call_response(
4547 "t1",
4548 "bash",
4549 serde_json::json!({"command": "echo hi"}),
4550 ),
4551 MockLlmClient::text_response("Done"),
4552 ]));
4553
4554 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4555 let agent = AgentLoop::new(
4556 mock_client,
4557 tool_executor,
4558 test_tool_context(),
4559 AgentConfig::default(),
4560 );
4561
4562 let (tx, rx) = mpsc::channel(100);
4563 let result = agent.execute(&[], "Say hi", Some(tx)).await.unwrap();
4564 assert_eq!(result.text, "Done");
4565
4566 let events = collect_events(rx).await;
4567
4568 let event_types: Vec<&str> = events
4570 .iter()
4571 .map(|e| match e {
4572 AgentEvent::Start { .. } => "Start",
4573 AgentEvent::TurnStart { .. } => "TurnStart",
4574 AgentEvent::TurnEnd { .. } => "TurnEnd",
4575 AgentEvent::ToolEnd { .. } => "ToolEnd",
4576 AgentEvent::End { .. } => "End",
4577 _ => "Other",
4578 })
4579 .collect();
4580
4581 assert_eq!(event_types.first(), Some(&"Start"));
4583 assert_eq!(event_types.last(), Some(&"End"));
4584
4585 let turn_starts = event_types.iter().filter(|&&t| t == "TurnStart").count();
4587 assert_eq!(turn_starts, 2);
4588
4589 let tool_ends = event_types.iter().filter(|&&t| t == "ToolEnd").count();
4591 assert_eq!(tool_ends, 1);
4592 }
4593
4594 #[tokio::test]
4595 async fn test_agent_multiple_tools_single_turn() {
4596 let mock_client = Arc::new(MockLlmClient::new(vec![
4598 LlmResponse {
4599 message: Message {
4600 role: "assistant".to_string(),
4601 content: vec![
4602 ContentBlock::ToolUse {
4603 id: "t1".to_string(),
4604 name: "bash".to_string(),
4605 input: serde_json::json!({"command": "echo first"}),
4606 },
4607 ContentBlock::ToolUse {
4608 id: "t2".to_string(),
4609 name: "bash".to_string(),
4610 input: serde_json::json!({"command": "echo second"}),
4611 },
4612 ],
4613 reasoning_content: None,
4614 },
4615 usage: TokenUsage {
4616 prompt_tokens: 10,
4617 completion_tokens: 5,
4618 total_tokens: 15,
4619 cache_read_tokens: None,
4620 cache_write_tokens: None,
4621 },
4622 stop_reason: Some("tool_use".to_string()),
4623 meta: None,
4624 },
4625 MockLlmClient::text_response("Both commands ran"),
4626 ]));
4627
4628 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4629 let agent = AgentLoop::new(
4630 mock_client.clone(),
4631 tool_executor,
4632 test_tool_context(),
4633 AgentConfig::default(),
4634 );
4635
4636 let result = agent.execute(&[], "Run both", None).await.unwrap();
4637
4638 assert_eq!(result.text, "Both commands ran");
4639 assert_eq!(result.tool_calls_count, 2);
4640 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 2); assert_eq!(result.messages[0].role, "user");
4644 assert_eq!(result.messages[1].role, "assistant");
4645 assert_eq!(result.messages[2].role, "user"); assert_eq!(result.messages[3].role, "user"); assert_eq!(result.messages[4].role, "assistant");
4648 }
4649
4650 #[tokio::test]
4651 async fn test_agent_token_usage_accumulation() {
4652 let mock_client = Arc::new(MockLlmClient::new(vec![
4654 MockLlmClient::tool_call_response(
4655 "t1",
4656 "bash",
4657 serde_json::json!({"command": "echo x"}),
4658 ),
4659 MockLlmClient::text_response("Done"),
4660 ]));
4661
4662 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4663 let agent = AgentLoop::new(
4664 mock_client,
4665 tool_executor,
4666 test_tool_context(),
4667 AgentConfig::default(),
4668 );
4669
4670 let result = agent.execute(&[], "test", None).await.unwrap();
4671
4672 assert_eq!(result.usage.prompt_tokens, 20);
4675 assert_eq!(result.usage.completion_tokens, 10);
4676 assert_eq!(result.usage.total_tokens, 30);
4677 }
4678
4679 #[tokio::test]
4680 async fn test_agent_system_prompt_passed() {
4681 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4683 "I am a coding assistant.",
4684 )]));
4685
4686 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4687 let config = AgentConfig {
4688 prompt_slots: SystemPromptSlots {
4689 extra: Some("You are a coding assistant.".to_string()),
4690 ..Default::default()
4691 },
4692 ..Default::default()
4693 };
4694
4695 let agent = AgentLoop::new(
4696 mock_client.clone(),
4697 tool_executor,
4698 test_tool_context(),
4699 config,
4700 );
4701 let result = agent.execute(&[], "What are you?", None).await.unwrap();
4702
4703 assert_eq!(result.text, "I am a coding assistant.");
4704 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 1);
4705 }
4706
4707 #[tokio::test]
4708 async fn test_agent_max_rounds_with_persistent_tool_calls() {
4709 let mut responses = Vec::new();
4711 for i in 0..15 {
4712 responses.push(MockLlmClient::tool_call_response(
4713 &format!("t{}", i),
4714 "bash",
4715 serde_json::json!({"command": format!("echo round{}", i)}),
4716 ));
4717 }
4718
4719 let mock_client = Arc::new(MockLlmClient::new(responses));
4720 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4721 let config = AgentConfig {
4722 max_tool_rounds: 5,
4723 ..Default::default()
4724 };
4725
4726 let agent = AgentLoop::new(
4727 mock_client.clone(),
4728 tool_executor,
4729 test_tool_context(),
4730 config,
4731 );
4732 let result = agent.execute(&[], "Loop forever", None).await;
4733
4734 assert!(result.is_err());
4735 let err = result.unwrap_err().to_string();
4736 assert!(err.contains("Max tool rounds (5) exceeded"));
4737 }
4738
4739 #[tokio::test]
4740 async fn test_agent_end_event_contains_final_text() {
4741 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
4742 "Final answer here",
4743 )]));
4744
4745 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
4746 let agent = AgentLoop::new(
4747 mock_client,
4748 tool_executor,
4749 test_tool_context(),
4750 AgentConfig::default(),
4751 );
4752
4753 let (tx, rx) = mpsc::channel(100);
4754 agent.execute(&[], "test", Some(tx)).await.unwrap();
4755
4756 let events = collect_events(rx).await;
4757 let end_event = events.iter().find(|e| matches!(e, AgentEvent::End { .. }));
4758 assert!(end_event.is_some());
4759
4760 if let AgentEvent::End { text, usage, .. } = end_event.unwrap() {
4761 assert_eq!(text, "Final answer here");
4762 assert_eq!(usage.total_tokens, 15);
4763 }
4764 }
4765}
4766
4767#[cfg(test)]
4768mod extra_agent_tests {
4769 use super::*;
4770 use crate::agent::tests::MockLlmClient;
4771 use crate::queue::SessionQueueConfig;
4772 use crate::tools::ToolExecutor;
4773 use std::path::PathBuf;
4774 use std::sync::atomic::{AtomicUsize, Ordering};
4775
4776 fn test_tool_context() -> ToolContext {
4777 ToolContext::new(PathBuf::from("/tmp"))
4778 }
4779
4780 #[test]
4785 fn test_agent_config_debug() {
4786 let config = AgentConfig {
4787 prompt_slots: SystemPromptSlots {
4788 extra: Some("You are helpful".to_string()),
4789 ..Default::default()
4790 },
4791 tools: vec![],
4792 max_tool_rounds: 10,
4793 permission_checker: None,
4794 confirmation_manager: None,
4795 context_providers: vec![],
4796 planning_enabled: true,
4797 goal_tracking: false,
4798 hook_engine: None,
4799 skill_registry: None,
4800 ..AgentConfig::default()
4801 };
4802 let debug = format!("{:?}", config);
4803 assert!(debug.contains("AgentConfig"));
4804 assert!(debug.contains("planning_enabled"));
4805 }
4806
4807 #[test]
4808 fn test_agent_config_default_values() {
4809 let config = AgentConfig::default();
4810 assert_eq!(config.max_tool_rounds, MAX_TOOL_ROUNDS);
4811 assert!(!config.planning_enabled);
4812 assert!(!config.goal_tracking);
4813 assert!(config.context_providers.is_empty());
4814 }
4815
4816 #[test]
4821 fn test_agent_event_serialize_start() {
4822 let event = AgentEvent::Start {
4823 prompt: "Hello".to_string(),
4824 };
4825 let json = serde_json::to_string(&event).unwrap();
4826 assert!(json.contains("agent_start"));
4827 assert!(json.contains("Hello"));
4828 }
4829
4830 #[test]
4831 fn test_agent_event_serialize_text_delta() {
4832 let event = AgentEvent::TextDelta {
4833 text: "chunk".to_string(),
4834 };
4835 let json = serde_json::to_string(&event).unwrap();
4836 assert!(json.contains("text_delta"));
4837 }
4838
4839 #[test]
4840 fn test_agent_event_serialize_tool_start() {
4841 let event = AgentEvent::ToolStart {
4842 id: "t1".to_string(),
4843 name: "bash".to_string(),
4844 };
4845 let json = serde_json::to_string(&event).unwrap();
4846 assert!(json.contains("tool_start"));
4847 assert!(json.contains("bash"));
4848 }
4849
4850 #[test]
4851 fn test_agent_event_serialize_tool_end() {
4852 let event = AgentEvent::ToolEnd {
4853 id: "t1".to_string(),
4854 name: "bash".to_string(),
4855 output: "hello".to_string(),
4856 exit_code: 0,
4857 metadata: None,
4858 };
4859 let json = serde_json::to_string(&event).unwrap();
4860 assert!(json.contains("tool_end"));
4861 }
4862
4863 #[test]
4864 fn test_agent_event_tool_end_has_metadata_field() {
4865 let event = AgentEvent::ToolEnd {
4866 id: "t1".to_string(),
4867 name: "write".to_string(),
4868 output: "Wrote 5 bytes".to_string(),
4869 exit_code: 0,
4870 metadata: Some(
4871 serde_json::json!({ "before": "old", "after": "new", "file_path": "f.txt" }),
4872 ),
4873 };
4874 let json = serde_json::to_string(&event).unwrap();
4875 assert!(json.contains("\"before\""));
4876 }
4877
4878 #[test]
4879 fn test_agent_event_serialize_error() {
4880 let event = AgentEvent::Error {
4881 message: "oops".to_string(),
4882 };
4883 let json = serde_json::to_string(&event).unwrap();
4884 assert!(json.contains("error"));
4885 assert!(json.contains("oops"));
4886 }
4887
4888 #[test]
4889 fn test_agent_event_serialize_confirmation_required() {
4890 let event = AgentEvent::ConfirmationRequired {
4891 tool_id: "t1".to_string(),
4892 tool_name: "bash".to_string(),
4893 args: serde_json::json!({"cmd": "rm"}),
4894 timeout_ms: 30000,
4895 };
4896 let json = serde_json::to_string(&event).unwrap();
4897 assert!(json.contains("confirmation_required"));
4898 }
4899
4900 #[test]
4901 fn test_agent_event_serialize_confirmation_received() {
4902 let event = AgentEvent::ConfirmationReceived {
4903 tool_id: "t1".to_string(),
4904 approved: true,
4905 reason: Some("safe".to_string()),
4906 };
4907 let json = serde_json::to_string(&event).unwrap();
4908 assert!(json.contains("confirmation_received"));
4909 }
4910
4911 #[test]
4912 fn test_agent_event_serialize_confirmation_timeout() {
4913 let event = AgentEvent::ConfirmationTimeout {
4914 tool_id: "t1".to_string(),
4915 action_taken: "rejected".to_string(),
4916 };
4917 let json = serde_json::to_string(&event).unwrap();
4918 assert!(json.contains("confirmation_timeout"));
4919 }
4920
4921 #[test]
4922 fn test_agent_event_serialize_external_task_pending() {
4923 let event = AgentEvent::ExternalTaskPending {
4924 task_id: "task-1".to_string(),
4925 session_id: "sess-1".to_string(),
4926 lane: crate::hitl::SessionLane::Execute,
4927 command_type: "bash".to_string(),
4928 payload: serde_json::json!({}),
4929 timeout_ms: 60000,
4930 };
4931 let json = serde_json::to_string(&event).unwrap();
4932 assert!(json.contains("external_task_pending"));
4933 }
4934
4935 #[test]
4936 fn test_agent_event_serialize_external_task_completed() {
4937 let event = AgentEvent::ExternalTaskCompleted {
4938 task_id: "task-1".to_string(),
4939 session_id: "sess-1".to_string(),
4940 success: false,
4941 };
4942 let json = serde_json::to_string(&event).unwrap();
4943 assert!(json.contains("external_task_completed"));
4944 }
4945
4946 #[test]
4947 fn test_agent_event_serialize_permission_denied() {
4948 let event = AgentEvent::PermissionDenied {
4949 tool_id: "t1".to_string(),
4950 tool_name: "bash".to_string(),
4951 args: serde_json::json!({}),
4952 reason: "denied".to_string(),
4953 };
4954 let json = serde_json::to_string(&event).unwrap();
4955 assert!(json.contains("permission_denied"));
4956 }
4957
4958 #[test]
4959 fn test_agent_event_serialize_context_compacted() {
4960 let event = AgentEvent::ContextCompacted {
4961 session_id: "sess-1".to_string(),
4962 before_messages: 100,
4963 after_messages: 20,
4964 percent_before: 0.85,
4965 };
4966 let json = serde_json::to_string(&event).unwrap();
4967 assert!(json.contains("context_compacted"));
4968 }
4969
4970 #[test]
4971 fn test_agent_event_serialize_turn_start() {
4972 let event = AgentEvent::TurnStart { turn: 3 };
4973 let json = serde_json::to_string(&event).unwrap();
4974 assert!(json.contains("turn_start"));
4975 }
4976
4977 #[test]
4978 fn test_agent_event_serialize_turn_end() {
4979 let event = AgentEvent::TurnEnd {
4980 turn: 3,
4981 usage: TokenUsage::default(),
4982 };
4983 let json = serde_json::to_string(&event).unwrap();
4984 assert!(json.contains("turn_end"));
4985 }
4986
4987 #[test]
4988 fn test_agent_event_serialize_end() {
4989 let event = AgentEvent::End {
4990 text: "Done".to_string(),
4991 usage: TokenUsage {
4992 prompt_tokens: 100,
4993 completion_tokens: 50,
4994 total_tokens: 150,
4995 cache_read_tokens: None,
4996 cache_write_tokens: None,
4997 },
4998 meta: None,
4999 };
5000 let json = serde_json::to_string(&event).unwrap();
5001 assert!(json.contains("agent_end"));
5002 }
5003
5004 #[test]
5009 fn test_agent_result_fields() {
5010 let result = AgentResult {
5011 text: "output".to_string(),
5012 messages: vec![Message::user("hello")],
5013 usage: TokenUsage::default(),
5014 tool_calls_count: 3,
5015 };
5016 assert_eq!(result.text, "output");
5017 assert_eq!(result.messages.len(), 1);
5018 assert_eq!(result.tool_calls_count, 3);
5019 }
5020
5021 #[test]
5026 fn test_agent_event_serialize_context_resolving() {
5027 let event = AgentEvent::ContextResolving {
5028 providers: vec!["provider1".to_string(), "provider2".to_string()],
5029 };
5030 let json = serde_json::to_string(&event).unwrap();
5031 assert!(json.contains("context_resolving"));
5032 assert!(json.contains("provider1"));
5033 }
5034
5035 #[test]
5036 fn test_agent_event_serialize_context_resolved() {
5037 let event = AgentEvent::ContextResolved {
5038 total_items: 5,
5039 total_tokens: 1000,
5040 };
5041 let json = serde_json::to_string(&event).unwrap();
5042 assert!(json.contains("context_resolved"));
5043 assert!(json.contains("1000"));
5044 }
5045
5046 #[test]
5047 fn test_agent_event_serialize_command_dead_lettered() {
5048 let event = AgentEvent::CommandDeadLettered {
5049 command_id: "cmd-1".to_string(),
5050 command_type: "bash".to_string(),
5051 lane: "execute".to_string(),
5052 error: "timeout".to_string(),
5053 attempts: 3,
5054 };
5055 let json = serde_json::to_string(&event).unwrap();
5056 assert!(json.contains("command_dead_lettered"));
5057 assert!(json.contains("cmd-1"));
5058 }
5059
5060 #[test]
5061 fn test_agent_event_serialize_command_retry() {
5062 let event = AgentEvent::CommandRetry {
5063 command_id: "cmd-2".to_string(),
5064 command_type: "read".to_string(),
5065 lane: "query".to_string(),
5066 attempt: 2,
5067 delay_ms: 1000,
5068 };
5069 let json = serde_json::to_string(&event).unwrap();
5070 assert!(json.contains("command_retry"));
5071 assert!(json.contains("cmd-2"));
5072 }
5073
/// Serializing `AgentEvent::QueueAlert` must yield JSON that contains the
/// `queue_alert` marker and the alert level.
#[test]
fn test_agent_event_serialize_queue_alert() {
    let alert = AgentEvent::QueueAlert {
        level: String::from("warning"),
        alert_type: String::from("depth"),
        message: String::from("Queue depth exceeded"),
    };

    let serialized = serde_json::to_string(&alert).unwrap();

    assert!(serialized.contains("queue_alert"));
    assert!(serialized.contains("warning"));
}
5085
/// Serializing `AgentEvent::TaskUpdated` (with an empty task list) must yield
/// JSON that contains the `task_updated` marker and the session id.
#[test]
fn test_agent_event_serialize_task_updated() {
    let updated = AgentEvent::TaskUpdated {
        session_id: String::from("sess-1"),
        tasks: vec![],
    };

    let serialized = serde_json::to_string(&updated).unwrap();

    assert!(serialized.contains("task_updated"));
    assert!(serialized.contains("sess-1"));
}
5096
/// Serializing `AgentEvent::MemoryStored` must yield JSON that contains the
/// `memory_stored` marker and the memory id.
#[test]
fn test_agent_event_serialize_memory_stored() {
    let stored = AgentEvent::MemoryStored {
        memory_id: String::from("mem-1"),
        memory_type: String::from("conversation"),
        importance: 0.8,
        tags: vec![String::from("important")],
    };

    let serialized = serde_json::to_string(&stored).unwrap();

    assert!(serialized.contains("memory_stored"));
    assert!(serialized.contains("mem-1"));
}
5109
/// Serializing `AgentEvent::MemoryRecalled` must yield JSON that contains the
/// `memory_recalled` marker and the memory id.
#[test]
fn test_agent_event_serialize_memory_recalled() {
    let recalled = AgentEvent::MemoryRecalled {
        memory_id: String::from("mem-2"),
        content: String::from("Previous conversation"),
        relevance: 0.9,
    };

    let serialized = serde_json::to_string(&recalled).unwrap();

    assert!(serialized.contains("memory_recalled"));
    assert!(serialized.contains("mem-2"));
}
5121
/// Serializing `AgentEvent::MemoriesSearched` must yield JSON that contains
/// the `memories_searched` marker and the query text.
#[test]
fn test_agent_event_serialize_memories_searched() {
    let searched = AgentEvent::MemoriesSearched {
        query: Some(String::from("search term")),
        tags: vec![String::from("tag1")],
        result_count: 5,
    };

    let serialized = serde_json::to_string(&searched).unwrap();

    assert!(serialized.contains("memories_searched"));
    assert!(serialized.contains("search term"));
}
5133
/// Serializing `AgentEvent::MemoryCleared` must yield JSON that contains the
/// `memory_cleared` marker and the tier name.
#[test]
fn test_agent_event_serialize_memory_cleared() {
    let cleared = AgentEvent::MemoryCleared {
        tier: String::from("short_term"),
        count: 10,
    };

    let serialized = serde_json::to_string(&cleared).unwrap();

    assert!(serialized.contains("memory_cleared"));
    assert!(serialized.contains("short_term"));
}
5144
/// Serializing `AgentEvent::SubagentStart` must yield JSON that contains the
/// `subagent_start` marker and the agent name.
#[test]
fn test_agent_event_serialize_subagent_start() {
    let started = AgentEvent::SubagentStart {
        task_id: String::from("task-1"),
        session_id: String::from("child-sess"),
        parent_session_id: String::from("parent-sess"),
        agent: String::from("explore"),
        description: String::from("Explore codebase"),
    };

    let serialized = serde_json::to_string(&started).unwrap();

    assert!(serialized.contains("subagent_start"));
    assert!(serialized.contains("explore"));
}
5158
/// Serializing `AgentEvent::SubagentProgress` must yield JSON that contains
/// the `subagent_progress` marker and the status string.
#[test]
fn test_agent_event_serialize_subagent_progress() {
    let metadata = serde_json::json!({"progress": 50});
    let progress = AgentEvent::SubagentProgress {
        task_id: String::from("task-1"),
        session_id: String::from("child-sess"),
        status: String::from("processing"),
        metadata,
    };

    let serialized = serde_json::to_string(&progress).unwrap();

    assert!(serialized.contains("subagent_progress"));
    assert!(serialized.contains("processing"));
}
5171
/// Serializing `AgentEvent::SubagentEnd` must yield JSON that contains the
/// `subagent_end` marker and the subagent output.
#[test]
fn test_agent_event_serialize_subagent_end() {
    let ended = AgentEvent::SubagentEnd {
        task_id: String::from("task-1"),
        session_id: String::from("child-sess"),
        agent: String::from("explore"),
        output: String::from("Found 10 files"),
        success: true,
    };

    let serialized = serde_json::to_string(&ended).unwrap();

    assert!(serialized.contains("subagent_end"));
    assert!(serialized.contains("Found 10 files"));
}
5185
/// Serializing `AgentEvent::PlanningStart` must yield JSON that contains the
/// `planning_start` marker and the prompt text.
#[test]
fn test_agent_event_serialize_planning_start() {
    let prompt = String::from("Build a web app");
    let serialized = serde_json::to_string(&AgentEvent::PlanningStart { prompt }).unwrap();

    assert!(serialized.contains("planning_start"));
    assert!(serialized.contains("Build a web app"));
}
5195
/// Serializing `AgentEvent::PlanningEnd` must yield JSON that contains the
/// `planning_end` marker and the `estimated_steps` key.
#[test]
fn test_agent_event_serialize_planning_end() {
    use crate::planning::{Complexity, ExecutionPlan};

    let planning_end = AgentEvent::PlanningEnd {
        plan: ExecutionPlan::new(String::from("Test goal"), Complexity::Simple),
        estimated_steps: 3,
    };

    let serialized = serde_json::to_string(&planning_end).unwrap();

    assert!(serialized.contains("planning_end"));
    assert!(serialized.contains("estimated_steps"));
}
5208
/// Serializing `AgentEvent::StepStart` must yield JSON that contains the
/// `step_start` marker and the step description.
#[test]
fn test_agent_event_serialize_step_start() {
    let step_start = AgentEvent::StepStart {
        step_id: String::from("step-1"),
        description: String::from("Initialize project"),
        step_number: 1,
        total_steps: 5,
    };

    let serialized = serde_json::to_string(&step_start).unwrap();

    assert!(serialized.contains("step_start"));
    assert!(serialized.contains("Initialize project"));
}
5221
/// Serializing `AgentEvent::StepEnd` must yield JSON that contains the
/// `step_end` marker and the step id.
#[test]
fn test_agent_event_serialize_step_end() {
    let step_end = AgentEvent::StepEnd {
        step_id: String::from("step-1"),
        status: TaskStatus::Completed,
        step_number: 1,
        total_steps: 5,
    };

    let serialized = serde_json::to_string(&step_end).unwrap();

    assert!(serialized.contains("step_end"));
    assert!(serialized.contains("step-1"));
}
5234
/// Serializing `AgentEvent::GoalExtracted` must yield JSON that contains the
/// `goal_extracted` marker. The goal payload itself is not inspected.
#[test]
fn test_agent_event_serialize_goal_extracted() {
    use crate::planning::AgentGoal;

    let extracted = AgentEvent::GoalExtracted {
        goal: AgentGoal::new(String::from("Complete the task")),
    };

    let serialized = serde_json::to_string(&extracted).unwrap();

    assert!(serialized.contains("goal_extracted"));
}
5243
/// Serializing `AgentEvent::GoalProgress` must yield JSON that contains the
/// `goal_progress` marker and the progress value `0.5`.
#[test]
fn test_agent_event_serialize_goal_progress() {
    let goal_progress = AgentEvent::GoalProgress {
        goal: String::from("Build app"),
        progress: 0.5,
        completed_steps: 2,
        total_steps: 4,
    };

    let serialized = serde_json::to_string(&goal_progress).unwrap();

    assert!(serialized.contains("goal_progress"));
    assert!(serialized.contains("0.5"));
}
5256
/// Serializing `AgentEvent::GoalAchieved` must yield JSON that contains the
/// `goal_achieved` marker and the duration value.
#[test]
fn test_agent_event_serialize_goal_achieved() {
    let achieved = AgentEvent::GoalAchieved {
        goal: String::from("Build app"),
        total_steps: 4,
        duration_ms: 5000,
    };

    let serialized = serde_json::to_string(&achieved).unwrap();

    assert!(serialized.contains("goal_achieved"));
    assert!(serialized.contains("5000"));
}
5268
5269 #[tokio::test]
5270 async fn test_extract_goal_with_json_response() {
5271 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5273 r#"{"description": "Build web app", "success_criteria": ["App runs on port 3000", "Has login page"]}"#,
5274 )]));
5275 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5276 let agent = AgentLoop::new(
5277 mock_client,
5278 tool_executor,
5279 test_tool_context(),
5280 AgentConfig::default(),
5281 );
5282
5283 let goal = agent.extract_goal("Build a web app").await.unwrap();
5284 assert_eq!(goal.description, "Build web app");
5285 assert_eq!(goal.success_criteria.len(), 2);
5286 assert_eq!(goal.success_criteria[0], "App runs on port 3000");
5287 }
5288
5289 #[tokio::test]
5290 async fn test_extract_goal_fallback_on_non_json() {
5291 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5293 "Some non-JSON response",
5294 )]));
5295 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5296 let agent = AgentLoop::new(
5297 mock_client,
5298 tool_executor,
5299 test_tool_context(),
5300 AgentConfig::default(),
5301 );
5302
5303 let goal = agent.extract_goal("Do something").await.unwrap();
5304 assert_eq!(goal.description, "Do something");
5306 assert_eq!(goal.success_criteria.len(), 2);
5308 }
5309
5310 #[tokio::test]
5311 async fn test_check_goal_achievement_json_yes() {
5312 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5313 r#"{"achieved": true, "progress": 1.0, "remaining_criteria": []}"#,
5314 )]));
5315 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5316 let agent = AgentLoop::new(
5317 mock_client,
5318 tool_executor,
5319 test_tool_context(),
5320 AgentConfig::default(),
5321 );
5322
5323 let goal = crate::planning::AgentGoal::new("Test goal".to_string());
5324 let achieved = agent
5325 .check_goal_achievement(&goal, "All done")
5326 .await
5327 .unwrap();
5328 assert!(achieved);
5329 }
5330
5331 #[tokio::test]
5332 async fn test_check_goal_achievement_fallback_not_done() {
5333 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5335 "invalid json",
5336 )]));
5337 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5338 let agent = AgentLoop::new(
5339 mock_client,
5340 tool_executor,
5341 test_tool_context(),
5342 AgentConfig::default(),
5343 );
5344
5345 let goal = crate::planning::AgentGoal::new("Test goal".to_string());
5346 let achieved = agent
5348 .check_goal_achievement(&goal, "still working")
5349 .await
5350 .unwrap();
5351 assert!(!achieved);
5352 }
5353
/// With no context results, the augmented system prompt must still contain
/// the text supplied through the `extra` prompt slot.
#[test]
fn test_build_augmented_system_prompt_empty_context() {
    let llm = Arc::new(MockLlmClient::new(vec![]));
    let executor = Arc::new(ToolExecutor::new("/tmp".to_string()));

    let prompt_slots = SystemPromptSlots {
        extra: Some("Base prompt".to_string()),
        ..Default::default()
    };
    let config = AgentConfig {
        prompt_slots,
        ..Default::default()
    };
    let agent = AgentLoop::new(llm, executor, test_tool_context(), config);

    let prompt = agent.build_augmented_system_prompt(&[]).unwrap();
    assert!(prompt.contains("Base prompt"));
}
5374
/// With a default configuration and no context results, a system prompt must
/// still be produced and must contain the text "Core Behaviour".
#[test]
fn test_build_augmented_system_prompt_no_custom_slots() {
    let llm = Arc::new(MockLlmClient::new(vec![]));
    let executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
    let agent = AgentLoop::new(llm, executor, test_tool_context(), AgentConfig::default());

    let prompt = agent.build_augmented_system_prompt(&[]);

    assert!(prompt.is_some());
    let text = prompt.unwrap();
    assert!(text.contains("Core Behaviour"));
}
5391
/// With a default configuration and one context result, the produced system
/// prompt must contain a `<context` marker and the context item's content.
#[test]
fn test_build_augmented_system_prompt_with_context_no_base() {
    use crate::context::{ContextItem, ContextResult, ContextType};

    let llm = Arc::new(MockLlmClient::new(vec![]));
    let executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
    let agent = AgentLoop::new(llm, executor, test_tool_context(), AgentConfig::default());

    let item = ContextItem::new("id1", ContextType::Resource, "Content");
    let resolved = ContextResult {
        provider: "test".to_string(),
        items: vec![item],
        total_tokens: 10,
        truncated: false,
    };
    let context = vec![resolved];

    let prompt = agent.build_augmented_system_prompt(&context);

    assert!(prompt.is_some());
    let rendered = prompt.unwrap();
    assert!(rendered.contains("<context"));
    assert!(rendered.contains("Content"));
}
5418
/// Cloning an `AgentResult` must preserve its text and tool-call count.
#[test]
fn test_agent_result_clone() {
    let original = AgentResult {
        text: String::from("output"),
        messages: vec![Message::user("hello")],
        usage: TokenUsage::default(),
        tool_calls_count: 3,
    };

    let copy = original.clone();

    assert_eq!(copy.text, original.text);
    assert_eq!(copy.tool_calls_count, original.tool_calls_count);
}
5435
/// The `Debug` rendering of an `AgentResult` must mention the type name and
/// the result text.
#[test]
fn test_agent_result_debug() {
    let sample = AgentResult {
        text: String::from("output"),
        messages: vec![Message::user("hello")],
        usage: TokenUsage::default(),
        tool_calls_count: 3,
    };

    let rendered = format!("{:?}", sample);

    assert!(rendered.contains("AgentResult"));
    assert!(rendered.contains("output"));
}
5448
5449 #[tokio::test]
5458 async fn test_tool_command_command_type() {
5459 let executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5460 let cmd = ToolCommand {
5461 tool_executor: executor,
5462 tool_name: "read".to_string(),
5463 tool_args: serde_json::json!({"file": "test.rs"}),
5464 skill_registry: None,
5465 tool_context: test_tool_context(),
5466 };
5467 assert_eq!(cmd.command_type(), "read");
5468 }
5469
5470 #[tokio::test]
5471 async fn test_tool_command_payload() {
5472 let executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5473 let args = serde_json::json!({"file": "test.rs", "offset": 10});
5474 let cmd = ToolCommand {
5475 tool_executor: executor,
5476 tool_name: "read".to_string(),
5477 tool_args: args.clone(),
5478 skill_registry: None,
5479 tool_context: test_tool_context(),
5480 };
5481 assert_eq!(cmd.payload(), args);
5482 }
5483
5484 #[tokio::test(flavor = "multi_thread")]
5489 async fn test_agent_loop_with_queue() {
5490 use tokio::sync::broadcast;
5491
5492 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5493 "Hello",
5494 )]));
5495 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5496 let config = AgentConfig::default();
5497
5498 let (event_tx, _) = broadcast::channel(100);
5499 let queue = SessionLaneQueue::new("test-session", SessionQueueConfig::default(), event_tx)
5500 .await
5501 .unwrap();
5502
5503 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config)
5504 .with_queue(Arc::new(queue));
5505
5506 assert!(agent.command_queue.is_some());
5507 }
5508
5509 #[tokio::test]
5510 async fn test_agent_loop_without_queue() {
5511 let mock_client = Arc::new(MockLlmClient::new(vec![MockLlmClient::text_response(
5512 "Hello",
5513 )]));
5514 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5515 let config = AgentConfig::default();
5516
5517 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
5518
5519 assert!(agent.command_queue.is_none());
5520 }
5521
5522 #[tokio::test]
5527 async fn test_execute_plan_parallel_independent() {
5528 use crate::planning::{Complexity, ExecutionPlan, Task};
5529
5530 let mock_client = Arc::new(MockLlmClient::new(vec![
5533 MockLlmClient::text_response("Step 1 done"),
5534 MockLlmClient::text_response("Step 2 done"),
5535 MockLlmClient::text_response("Step 3 done"),
5536 ]));
5537
5538 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5539 let config = AgentConfig::default();
5540 let agent = AgentLoop::new(
5541 mock_client.clone(),
5542 tool_executor,
5543 test_tool_context(),
5544 config,
5545 );
5546
5547 let mut plan = ExecutionPlan::new("Test parallel", Complexity::Simple);
5548 plan.add_step(Task::new("s1", "First step"));
5549 plan.add_step(Task::new("s2", "Second step"));
5550 plan.add_step(Task::new("s3", "Third step"));
5551
5552 let (tx, mut rx) = mpsc::channel(100);
5553 let result = agent.execute_plan(&[], &plan, Some(tx)).await.unwrap();
5554
5555 assert_eq!(result.usage.total_tokens, 45);
5557
5558 let mut step_starts = Vec::new();
5560 let mut step_ends = Vec::new();
5561 rx.close();
5562 while let Some(event) = rx.recv().await {
5563 match event {
5564 AgentEvent::StepStart { step_id, .. } => step_starts.push(step_id),
5565 AgentEvent::StepEnd {
5566 step_id, status, ..
5567 } => {
5568 assert_eq!(status, TaskStatus::Completed);
5569 step_ends.push(step_id);
5570 }
5571 _ => {}
5572 }
5573 }
5574 assert_eq!(step_starts.len(), 3);
5575 assert_eq!(step_ends.len(), 3);
5576 }
5577
5578 #[tokio::test]
5579 async fn test_execute_plan_respects_dependencies() {
5580 use crate::planning::{Complexity, ExecutionPlan, Task};
5581
5582 let mock_client = Arc::new(MockLlmClient::new(vec![
5585 MockLlmClient::text_response("Step 1 done"),
5586 MockLlmClient::text_response("Step 2 done"),
5587 MockLlmClient::text_response("Step 3 done"),
5588 ]));
5589
5590 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5591 let config = AgentConfig::default();
5592 let agent = AgentLoop::new(
5593 mock_client.clone(),
5594 tool_executor,
5595 test_tool_context(),
5596 config,
5597 );
5598
5599 let mut plan = ExecutionPlan::new("Test deps", Complexity::Medium);
5600 plan.add_step(Task::new("s1", "Independent A"));
5601 plan.add_step(Task::new("s2", "Independent B"));
5602 plan.add_step(
5603 Task::new("s3", "Depends on A+B")
5604 .with_dependencies(vec!["s1".to_string(), "s2".to_string()]),
5605 );
5606
5607 let (tx, mut rx) = mpsc::channel(100);
5608 let result = agent.execute_plan(&[], &plan, Some(tx)).await.unwrap();
5609
5610 assert_eq!(result.usage.total_tokens, 45);
5612
5613 let mut events = Vec::new();
5615 rx.close();
5616 while let Some(event) = rx.recv().await {
5617 match &event {
5618 AgentEvent::StepStart { step_id, .. } => {
5619 events.push(format!("start:{}", step_id));
5620 }
5621 AgentEvent::StepEnd { step_id, .. } => {
5622 events.push(format!("end:{}", step_id));
5623 }
5624 _ => {}
5625 }
5626 }
5627
5628 let s1_end = events.iter().position(|e| e == "end:s1").unwrap();
5630 let s2_end = events.iter().position(|e| e == "end:s2").unwrap();
5631 let s3_start = events.iter().position(|e| e == "start:s3").unwrap();
5632 assert!(
5633 s3_start > s1_end,
5634 "s3 started before s1 ended: {:?}",
5635 events
5636 );
5637 assert!(
5638 s3_start > s2_end,
5639 "s3 started before s2 ended: {:?}",
5640 events
5641 );
5642
5643 assert!(result.text.contains("Step 3 done") || !result.text.is_empty());
5645 }
5646
5647 #[tokio::test]
5648 async fn test_execute_plan_handles_step_failure() {
5649 use crate::planning::{Complexity, ExecutionPlan, Task};
5650
5651 let mock_client = Arc::new(MockLlmClient::new(vec![
5661 MockLlmClient::text_response("s1 done"),
5663 MockLlmClient::text_response("s3 done"),
5664 ]));
5667
5668 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5669 let config = AgentConfig::default();
5670 let agent = AgentLoop::new(
5671 mock_client.clone(),
5672 tool_executor,
5673 test_tool_context(),
5674 config,
5675 );
5676
5677 let mut plan = ExecutionPlan::new("Test failure", Complexity::Medium);
5678 plan.add_step(Task::new("s1", "Independent step"));
5679 plan.add_step(Task::new("s2", "Depends on s1").with_dependencies(vec!["s1".to_string()]));
5680 plan.add_step(Task::new("s3", "Another independent"));
5681 plan.add_step(Task::new("s4", "Depends on s2").with_dependencies(vec!["s2".to_string()]));
5682
5683 let (tx, mut rx) = mpsc::channel(100);
5684 let _result = agent.execute_plan(&[], &plan, Some(tx)).await.unwrap();
5685
5686 let mut completed_steps = Vec::new();
5689 let mut failed_steps = Vec::new();
5690 rx.close();
5691 while let Some(event) = rx.recv().await {
5692 if let AgentEvent::StepEnd {
5693 step_id, status, ..
5694 } = event
5695 {
5696 match status {
5697 TaskStatus::Completed => completed_steps.push(step_id),
5698 TaskStatus::Failed => failed_steps.push(step_id),
5699 _ => {}
5700 }
5701 }
5702 }
5703
5704 assert!(
5705 completed_steps.contains(&"s1".to_string()),
5706 "s1 should complete"
5707 );
5708 assert!(
5709 completed_steps.contains(&"s3".to_string()),
5710 "s3 should complete"
5711 );
5712 assert!(failed_steps.contains(&"s2".to_string()), "s2 should fail");
5713 assert!(
5715 !completed_steps.contains(&"s4".to_string()),
5716 "s4 should not complete"
5717 );
5718 assert!(
5719 !failed_steps.contains(&"s4".to_string()),
5720 "s4 should not fail (never started)"
5721 );
5722 }
5723
/// Pins the default resilience settings of `AgentConfig`: two parse retries,
/// no tool timeout, and a circuit-breaker threshold of three.
#[test]
fn test_agent_config_resilience_defaults() {
    let defaults = AgentConfig::default();

    assert_eq!(defaults.max_parse_retries, 2);
    assert_eq!(defaults.tool_timeout_ms, None);
    assert_eq!(defaults.circuit_breaker_threshold, 3);
}
5735
5736 #[tokio::test]
5738 async fn test_parse_error_recovery_bails_after_threshold() {
5739 let mock_client = Arc::new(MockLlmClient::new(vec![
5741 MockLlmClient::tool_call_response(
5742 "c1",
5743 "bash",
5744 serde_json::json!({"__parse_error": "unexpected token at position 5"}),
5745 ),
5746 MockLlmClient::tool_call_response(
5747 "c2",
5748 "bash",
5749 serde_json::json!({"__parse_error": "missing closing brace"}),
5750 ),
5751 MockLlmClient::tool_call_response(
5752 "c3",
5753 "bash",
5754 serde_json::json!({"__parse_error": "still broken"}),
5755 ),
5756 MockLlmClient::text_response("Done"), ]));
5758
5759 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5760 let config = AgentConfig {
5761 max_parse_retries: 2,
5762 ..AgentConfig::default()
5763 };
5764 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
5765 let result = agent.execute(&[], "Do something", None).await;
5766 assert!(result.is_err(), "should bail after parse error threshold");
5767 let err = result.unwrap_err().to_string();
5768 assert!(
5769 err.contains("malformed tool arguments"),
5770 "error should mention malformed tool arguments, got: {}",
5771 err
5772 );
5773 }
5774
5775 #[tokio::test]
5777 async fn test_parse_error_counter_resets_on_success() {
5778 let mock_client = Arc::new(MockLlmClient::new(vec![
5782 MockLlmClient::tool_call_response(
5783 "c1",
5784 "bash",
5785 serde_json::json!({"__parse_error": "bad args"}),
5786 ),
5787 MockLlmClient::tool_call_response(
5788 "c2",
5789 "bash",
5790 serde_json::json!({"__parse_error": "bad args again"}),
5791 ),
5792 MockLlmClient::tool_call_response(
5794 "c3",
5795 "bash",
5796 serde_json::json!({"command": "echo ok"}),
5797 ),
5798 MockLlmClient::text_response("All done"),
5799 ]));
5800
5801 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5802 let config = AgentConfig {
5803 max_parse_retries: 2,
5804 ..AgentConfig::default()
5805 };
5806 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
5807 let result = agent.execute(&[], "Do something", None).await;
5808 assert!(
5809 result.is_ok(),
5810 "should not bail — counter reset after successful tool, got: {:?}",
5811 result.err()
5812 );
5813 assert_eq!(result.unwrap().text, "All done");
5814 }
5815
5816 #[tokio::test]
5818 async fn test_tool_timeout_produces_error_result() {
5819 let mock_client = Arc::new(MockLlmClient::new(vec![
5820 MockLlmClient::tool_call_response(
5821 "t1",
5822 "bash",
5823 serde_json::json!({"command": "sleep 10"}),
5824 ),
5825 MockLlmClient::text_response("The command timed out."),
5826 ]));
5827
5828 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5829 let config = AgentConfig {
5830 tool_timeout_ms: Some(50),
5832 ..AgentConfig::default()
5833 };
5834 let agent = AgentLoop::new(
5835 mock_client.clone(),
5836 tool_executor,
5837 test_tool_context(),
5838 config,
5839 );
5840 let result = agent.execute(&[], "Run sleep", None).await;
5841 assert!(
5842 result.is_ok(),
5843 "session should continue after tool timeout: {:?}",
5844 result.err()
5845 );
5846 assert_eq!(result.unwrap().text, "The command timed out.");
5847 assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 2);
5849 }
5850
5851 #[tokio::test]
5853 async fn test_tool_within_timeout_succeeds() {
5854 let mock_client = Arc::new(MockLlmClient::new(vec![
5855 MockLlmClient::tool_call_response(
5856 "t1",
5857 "bash",
5858 serde_json::json!({"command": "echo fast"}),
5859 ),
5860 MockLlmClient::text_response("Command succeeded."),
5861 ]));
5862
5863 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5864 let config = AgentConfig {
5865 tool_timeout_ms: Some(5_000), ..AgentConfig::default()
5867 };
5868 let agent = AgentLoop::new(mock_client, tool_executor, test_tool_context(), config);
5869 let result = agent.execute(&[], "Run something fast", None).await;
5870 assert!(
5871 result.is_ok(),
5872 "fast tool should succeed: {:?}",
5873 result.err()
5874 );
5875 assert_eq!(result.unwrap().text, "Command succeeded.");
5876 }
5877
5878 #[tokio::test]
5880 async fn test_circuit_breaker_retries_non_streaming() {
5881 let mock_client = Arc::new(MockLlmClient::new(vec![]));
5884
5885 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5886 let config = AgentConfig {
5887 circuit_breaker_threshold: 2,
5888 ..AgentConfig::default()
5889 };
5890 let agent = AgentLoop::new(
5891 mock_client.clone(),
5892 tool_executor,
5893 test_tool_context(),
5894 config,
5895 );
5896 let result = agent.execute(&[], "Hello", None).await;
5897 assert!(result.is_err(), "should fail when LLM always errors");
5898 let err = result.unwrap_err().to_string();
5899 assert!(
5900 err.contains("circuit breaker"),
5901 "error should mention circuit breaker, got: {}",
5902 err
5903 );
5904 assert_eq!(
5905 mock_client.call_count.load(Ordering::SeqCst),
5906 2,
5907 "should make exactly threshold=2 LLM calls"
5908 );
5909 }
5910
5911 #[tokio::test]
5913 async fn test_circuit_breaker_threshold_one_no_retry() {
5914 let mock_client = Arc::new(MockLlmClient::new(vec![]));
5915
5916 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5917 let config = AgentConfig {
5918 circuit_breaker_threshold: 1,
5919 ..AgentConfig::default()
5920 };
5921 let agent = AgentLoop::new(
5922 mock_client.clone(),
5923 tool_executor,
5924 test_tool_context(),
5925 config,
5926 );
5927 let result = agent.execute(&[], "Hello", None).await;
5928 assert!(result.is_err());
5929 assert_eq!(
5930 mock_client.call_count.load(Ordering::SeqCst),
5931 1,
5932 "with threshold=1 exactly one attempt should be made"
5933 );
5934 }
5935
5936 #[tokio::test]
5938 async fn test_circuit_breaker_succeeds_if_llm_recovers() {
5939 struct FailOnceThenSucceed {
5941 inner: MockLlmClient,
5942 failed_once: std::sync::atomic::AtomicBool,
5943 call_count: AtomicUsize,
5944 }
5945
5946 #[async_trait::async_trait]
5947 impl LlmClient for FailOnceThenSucceed {
5948 async fn complete(
5949 &self,
5950 messages: &[Message],
5951 system: Option<&str>,
5952 tools: &[ToolDefinition],
5953 ) -> Result<LlmResponse> {
5954 self.call_count.fetch_add(1, Ordering::SeqCst);
5955 let already_failed = self
5956 .failed_once
5957 .swap(true, std::sync::atomic::Ordering::SeqCst);
5958 if !already_failed {
5959 anyhow::bail!("transient network error");
5960 }
5961 self.inner.complete(messages, system, tools).await
5962 }
5963
5964 async fn complete_streaming(
5965 &self,
5966 messages: &[Message],
5967 system: Option<&str>,
5968 tools: &[ToolDefinition],
5969 ) -> Result<tokio::sync::mpsc::Receiver<crate::llm::StreamEvent>> {
5970 self.inner.complete_streaming(messages, system, tools).await
5971 }
5972 }
5973
5974 let mock = Arc::new(FailOnceThenSucceed {
5975 inner: MockLlmClient::new(vec![MockLlmClient::text_response("Recovered!")]),
5976 failed_once: std::sync::atomic::AtomicBool::new(false),
5977 call_count: AtomicUsize::new(0),
5978 });
5979
5980 let tool_executor = Arc::new(ToolExecutor::new("/tmp".to_string()));
5981 let config = AgentConfig {
5982 circuit_breaker_threshold: 3,
5983 ..AgentConfig::default()
5984 };
5985 let agent = AgentLoop::new(mock.clone(), tool_executor, test_tool_context(), config);
5986 let result = agent.execute(&[], "Hello", None).await;
5987 assert!(
5988 result.is_ok(),
5989 "should succeed when LLM recovers within threshold: {:?}",
5990 result.err()
5991 );
5992 assert_eq!(result.unwrap().text, "Recovered!");
5993 assert_eq!(
5994 mock.call_count.load(Ordering::SeqCst),
5995 2,
5996 "should have made exactly 2 calls (1 fail + 1 success)"
5997 );
5998 }
5999
/// Empty and whitespace-only text must be classified as incomplete.
#[test]
fn test_looks_incomplete_empty() {
    let empty_is_incomplete = AgentLoop::looks_incomplete("");
    let blank_is_incomplete = AgentLoop::looks_incomplete(" ");

    assert!(empty_is_incomplete);
    assert!(blank_is_incomplete);
}
6007
/// Text ending in a colon must be classified as incomplete.
#[test]
fn test_looks_incomplete_trailing_colon() {
    let announcing_check = AgentLoop::looks_incomplete("Let me check the file:");
    let announcing_steps = AgentLoop::looks_incomplete("Next steps:");

    assert!(announcing_check);
    assert!(announcing_steps);
}
6013
/// Text ending in an ellipsis, written either as three dots or as the single
/// `…` character, must be classified as incomplete.
#[test]
fn test_looks_incomplete_ellipsis() {
    let ascii_dots = AgentLoop::looks_incomplete("Working on it...");
    let unicode_ellipsis = AgentLoop::looks_incomplete("Processing…");

    assert!(ascii_dots);
    assert!(unicode_ellipsis);
}
6019
/// Sentences that only announce an upcoming action ("I'll …", "Let me …",
/// "I will …", "I need to …") must be classified as incomplete.
#[test]
fn test_looks_incomplete_intent_phrases() {
    let ill_start = AgentLoop::looks_incomplete("I'll start by reading the file.");
    assert!(ill_start);

    let let_me = AgentLoop::looks_incomplete("Let me check the configuration.");
    assert!(let_me);

    let i_will = AgentLoop::looks_incomplete("I will now run the tests.");
    assert!(i_will);

    let i_need_to = AgentLoop::looks_incomplete("I need to update the Cargo.toml.");
    assert!(i_need_to);
}
6033
/// Finished-sounding answers, including very short ones, must not be
/// classified as incomplete.
#[test]
fn test_looks_complete_final_answer() {
    let summary = "The tests pass. All changes have been applied successfully.";
    assert!(!AgentLoop::looks_incomplete(summary));

    let report = "Done. I've updated the three files and verified the build succeeds.";
    assert!(!AgentLoop::looks_incomplete(report));

    assert!(!AgentLoop::looks_incomplete("42"));
    assert!(!AgentLoop::looks_incomplete("Yes."));
}
6046
/// A multi-line summary whose first line ends in a colon but whose last line
/// is a plain statement must not be classified as incomplete.
#[test]
fn test_looks_incomplete_multiline_complete() {
    let summary = "Here is the summary:\n\n- Fixed the bug in agent.rs\n- All tests pass\n- Build succeeds";

    let flagged = AgentLoop::looks_incomplete(summary);

    assert!(!flagged);
}
6052}