1mod builder;
5mod context;
6pub(crate) mod context_manager;
7pub mod error;
8#[cfg(feature = "experiments")]
9mod experiment_cmd;
10pub(super) mod feedback_detector;
11mod graph_commands;
12#[cfg(feature = "index")]
13mod index;
14mod learning;
15pub(crate) mod learning_engine;
16mod log_commands;
17#[cfg(feature = "lsp-context")]
18mod lsp_commands;
19mod mcp;
20mod message_queue;
21mod persistence;
22mod skill_management;
23pub mod slash_commands;
24mod tool_execution;
25pub(crate) mod tool_orchestrator;
26mod trust_commands;
27mod utils;
28
29use std::collections::VecDeque;
30use std::path::PathBuf;
31use std::time::Instant;
32
33use std::sync::Arc;
34
35use tokio::sync::{Notify, mpsc, watch};
36use tokio_util::sync::CancellationToken;
37use zeph_llm::any::AnyProvider;
38use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
39use zeph_llm::stt::SpeechToText;
40
41use crate::metrics::MetricsSnapshot;
42use std::collections::HashMap;
43use zeph_memory::TokenCounter;
44use zeph_memory::semantic::SemanticMemory;
45use zeph_skills::loader::Skill;
46use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
47use zeph_skills::prompt::format_skills_prompt;
48use zeph_skills::registry::SkillRegistry;
49use zeph_skills::watcher::SkillEvent;
50use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
51
52use crate::channel::Channel;
53use crate::config::Config;
54use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
55use crate::config_watcher::ConfigEvent;
56use crate::context::{
57 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
58};
59use crate::cost::CostTracker;
60use crate::instructions::{InstructionBlock, InstructionEvent, InstructionReloadState};
61use crate::sanitizer::ContentSanitizer;
62use crate::sanitizer::quarantine::QuarantinedSummarizer;
63use crate::vault::Secret;
64
65use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, QueuedMessage, detect_image_mime};
66
/// Window size used by doom-loop detection.
/// NOTE(review): the detector is outside this section; presumably this is the
/// number of recent turns/outputs compared for repetition — confirm.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;

// Marker prefixes for context blocks the agent injects into the conversation.
// Their consumers live outside this section; presumably they let injected
// context be recognized (and replaced or stripped) on later turns — confirm.

/// Marker prefix for injected document-RAG results.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
/// Marker prefix for injected semantic-recall results.
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Marker prefix for injected code context.
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
/// Marker prefix for injected conversation summaries.
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Marker prefix for injected cross-session context.
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
/// Marker prefix for injected past corrections.
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Marker prefix for injected knowledge-graph facts.
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Marker prefix for LSP notes. It is intentionally unterminated, presumably
/// because the note kind follows the prefix — confirm.
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing code fence appended by `format_tool_output`.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
82
83fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
84 use std::fmt::Write;
85 let mut out = String::new();
86 let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
87 let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
88 let _ = writeln!(out);
89 for (i, task) in graph.tasks.iter().enumerate() {
90 let deps = if task.depends_on.is_empty() {
91 String::new()
92 } else {
93 let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
94 format!(" (after: {})", ids.join(", "))
95 };
96 let agent = task.agent_hint.as_deref().unwrap_or("-");
97 let _ = writeln!(out, " {}. [{}] {}{}", i + 1, agent, task.title, deps);
98 }
99 out
100}
101
102pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
103 use std::fmt::Write;
104 let capacity = "[tool output: ".len()
105 + tool_name.len()
106 + "]\n```\n".len()
107 + body.len()
108 + TOOL_OUTPUT_SUFFIX.len();
109 let mut buf = String::with_capacity(capacity);
110 let _ = write!(
111 buf,
112 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
113 );
114 buf
115}
116
/// Semantic-memory handle plus the settings for history, recall and
/// summarization.
///
/// `Agent::new_with_registry_arc` sets `memory` and `conversation_id` to `None`
/// and fills the numeric fields with fixed defaults (noted per field).
pub(super) struct MemoryState {
    /// Backing semantic memory store; `None` after construction.
    pub(super) memory: Option<Arc<SemanticMemory>>,
    /// Current conversation id in the memory store; `None` after construction.
    pub(super) conversation_id: Option<zeph_memory::ConversationId>,
    /// Constructor default: 50. Presumably the max history messages loaded — confirm.
    pub(super) history_limit: u32,
    /// Constructor default: 5. Presumably the max recalled items per query — confirm.
    pub(super) recall_limit: usize,
    /// Constructor default: 50. Presumably the message count that triggers summarization — confirm.
    pub(super) summarization_threshold: usize,
    /// Constructor default: 0.35. Presumably the minimum score for cross-session recall — confirm.
    pub(super) cross_session_score_threshold: f32,
    /// Constructor default: `false`. Presumably whether assistant replies are persisted — confirm.
    pub(super) autosave_assistant: bool,
    /// Constructor default: 20. Presumably the minimum reply length for autosave — confirm.
    pub(super) autosave_min_length: usize,
    /// Constructor default: 6. Semantics defined outside this section — confirm.
    pub(super) tool_call_cutoff: usize,
    /// Starts at 0. Presumably messages not yet covered by a summary — confirm.
    pub(super) unsummarized_count: usize,
    /// Document (RAG) settings; `DocumentConfig::default()` after construction.
    pub(super) document_config: crate::config::DocumentConfig,
    /// Knowledge-graph settings; `GraphConfig::default()` after construction.
    pub(super) graph_config: crate::config::GraphConfig,
}
131
/// Skill registry, matching configuration and per-turn skill bookkeeping.
pub(super) struct SkillState {
    /// Shared, lock-protected skill registry (can be shared with other owners via the `Arc`).
    pub(super) registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
    /// Skill directories; empty after construction.
    pub(super) skill_paths: Vec<PathBuf>,
    /// Optional managed skill directory; `None` after construction.
    pub(super) managed_dir: Option<PathBuf>,
    /// Trust settings for skills; `TrustConfig::default()` after construction.
    pub(super) trust_config: crate::config::TrustConfig,
    /// Skill matcher backend, if one was supplied to the constructor.
    pub(super) matcher: Option<SkillMatcherBackend>,
    /// Upper bound on simultaneously active skills (constructor argument).
    pub(super) max_active_skills: usize,
    /// Constructor default: 0.05. Presumably the score gap that triggers disambiguation — confirm.
    pub(super) disambiguation_threshold: f32,
    /// Embedding model name; empty after construction.
    pub(super) embedding_model: String,
    /// Skill change events; `run` calls `reload_skills` for each one.
    pub(super) skill_reload_rx: Option<mpsc::Receiver<SkillEvent>>,
    /// Names of the currently active skills; empty after construction.
    pub(super) active_skill_names: Vec<String>,
    /// Most recent skills prompt; initialized to the one used for the initial system prompt.
    pub(super) last_skills_prompt: String,
    /// How skills are rendered into the prompt; `SkillPromptMode::Auto` after construction.
    pub(super) prompt_mode: SkillPromptMode,
    /// Custom secrets available to skills, keyed by name; empty after construction.
    pub(super) available_custom_secrets: HashMap<String, Secret>,
    /// Constructor default: 0.7. Presumably the cosine share in hybrid scoring — confirm.
    pub(super) cosine_weight: f32,
    /// Whether hybrid (presumably cosine + BM25) search is enabled; `false` after construction.
    pub(super) hybrid_search: bool,
    /// BM25 index over skills; `None` after construction.
    pub(super) bm25_index: Option<zeph_skills::bm25::Bm25Index>,
}
151
/// MCP (Model Context Protocol) tools, registry and server manager.
pub(super) struct McpState {
    /// Known MCP tools; empty after construction.
    pub(super) tools: Vec<zeph_mcp::McpTool>,
    /// MCP tool registry; `None` after construction.
    pub(super) registry: Option<zeph_mcp::McpToolRegistry>,
    /// Shared MCP manager; `Agent::shutdown` calls `shutdown_all_shared` on it.
    pub(super) manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
    /// Empty after construction. Presumably commands permitted for MCP servers — confirm.
    pub(super) allowed_commands: Vec<String>,
    /// Constructor default: 10. Presumably a cap on dynamically added servers/tools — confirm.
    pub(super) max_dynamic: usize,
    /// Tool list shared with other components behind a lock; `None` after construction.
    pub(super) shared_tools: Option<std::sync::Arc<std::sync::RwLock<Vec<zeph_mcp::McpTool>>>>,
}
161
/// Code-index state, compiled only with the `index` feature.
#[cfg(feature = "index")]
pub(super) struct IndexState {
    /// Code retriever; `None` after construction.
    pub(super) retriever: Option<std::sync::Arc<zeph_index::retriever::CodeRetriever>>,
    /// Token budget for the repo map; 0 after construction.
    pub(super) repo_map_tokens: usize,
    /// Cached repo-map text with the instant it was stored.
    /// Presumably compared against `repo_map_ttl` — confirm.
    pub(super) cached_repo_map: Option<(String, std::time::Instant)>,
    /// Repo-map cache lifetime; 300 seconds after construction.
    pub(super) repo_map_ttl: std::time::Duration,
}
169
/// Runtime settings that may change while the agent is running.
pub(super) struct RuntimeConfig {
    /// Security settings; `SecurityConfig::default()` after construction.
    pub(super) security: SecurityConfig,
    /// Timeout settings; `TimeoutConfig::default()` after construction.
    pub(super) timeouts: TimeoutConfig,
    /// Current model name; updated by `Agent::set_model`, empty after construction.
    pub(super) model_name: String,
    /// Tool permission policy; `PermissionPolicy::default()` after construction.
    pub(super) permission_policy: zeph_tools::PermissionPolicy,
    /// Whether credentials are redacted; `true` after construction.
    pub(super) redact_credentials: bool,
}
177
/// The conversational agent.
///
/// It owns the LLM provider, the I/O channel, the message history and all
/// per-subsystem state, and its main loop is [`Agent::run`].
///
/// Construct it with [`Agent::new`] or [`Agent::new_with_registry_arc`]. Many
/// optional fields start as `None` there and are presumably populated later
/// (e.g. by the builder module) — confirm.
pub struct Agent<C: Channel> {
    /// Active LLM provider; `run` replaces it when `provider_override` holds a value.
    provider: AnyProvider,
    /// Transport for receiving user messages and sending replies.
    channel: C,
    /// Tool executor, type-erased behind `ErasedToolExecutor`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history; the constructor seeds it with one system-prompt message.
    messages: Vec<Message>,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    /// Feedback detector; constructed with threshold argument 0.6.
    pub(super) feedback_detector: feedback_detector::FeedbackDetector,
    /// Optional LLM-based judge detector; `None` after construction.
    pub(super) judge_detector: Option<feedback_detector::JudgeDetector>,
    /// Provider used by the judge; `None` after construction.
    pub(super) judge_provider: Option<AnyProvider>,
    /// Config file path; `None` after construction.
    config_path: Option<PathBuf>,
    pub(super) logging_config: crate::config::LoggingConfig,
    /// Config change events; `run` calls `reload_config` for each one.
    config_reload_rx: Option<mpsc::Receiver<ConfigEvent>>,
    /// Shutdown signal awaited by `run` through `shutdown_signal`.
    shutdown: watch::Receiver<bool>,
    /// Metrics publisher; `shutdown` reads tool-output filtering totals from it.
    metrics_tx: Option<watch::Sender<MetricsSnapshot>>,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    #[cfg(feature = "index")]
    pub(super) index: IndexState,
    /// Cancellation notifier; fresh `Notify` after construction.
    cancel_signal: Arc<Notify>,
    /// Cancellation token; fresh token after construction.
    cancel_token: CancellationToken,
    /// Instant captured when the agent was constructed.
    start_time: Instant,
    /// Messages waiting to be handled; `run` pops from the front before reading the channel.
    message_queue: VecDeque<QueuedMessage>,
    /// Optional dedicated summarization provider; `None` after construction.
    summary_provider: Option<AnyProvider>,
    /// Slot for a replacement provider; `run` takes its contents at the top of each iteration.
    provider_override: Option<Arc<std::sync::RwLock<Option<AnyProvider>>>>,
    /// When set, `run` waits once for it to become `true` before entering its loop.
    warmup_ready: Option<watch::Receiver<bool>>,
    cost_tracker: Option<CostTracker>,
    /// Prompt-token estimate; initialized to the system-prompt byte length / 4.
    cached_prompt_tokens: u64,
    /// Environment context; gathered with an empty argument at construction.
    env_context: EnvironmentContext,
    pub(crate) token_counter: Arc<TokenCounter>,
    /// Optional speech-to-text backend.
    stt: Option<Box<dyn SpeechToText>>,
    /// Update notices; `run` forwards each one to the channel verbatim.
    update_notify_rx: Option<mpsc::Receiver<String>>,
    /// Scheduled prompts; `run` turns each into a turn prefixed with `[Scheduled task]`.
    custom_task_rx: Option<mpsc::Receiver<String>>,
    /// Sub-agent manager; `run` polls it for metrics and finished results.
    pub(crate) subagent_manager: Option<crate::subagent::SubAgentManager>,
    pub(crate) subagent_config: crate::config::SubAgentConfig,
    /// Orchestration settings; `enabled` gates all `/plan` commands.
    pub(crate) orchestration_config: crate::config::OrchestrationConfig,
    #[cfg(feature = "experiments")]
    pub(super) experiment_config: crate::config::ExperimentConfig,
    pub(super) response_cache: Option<std::sync::Arc<zeph_memory::ResponseCache>>,
    pub(crate) parent_tool_use_id: Option<String>,
    pub(super) anomaly_detector: Option<zeph_tools::AnomalyDetector>,
    pub(super) instruction_blocks: Vec<InstructionBlock>,
    /// Instruction change events; `run` calls `reload_instructions` for each one.
    pub(super) instruction_reload_rx: Option<mpsc::Receiver<InstructionEvent>>,
    pub(super) instruction_reload_state: Option<InstructionReloadState>,
    /// Content sanitizer built from `ContentIsolationConfig::default()` at construction.
    pub(super) sanitizer: ContentSanitizer,
    pub(super) quarantine_summarizer: Option<QuarantinedSummarizer>,
    pub(super) exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard,
    /// URLs flagged so far; empty after construction.
    pub(super) flagged_urls: std::collections::HashSet<String>,
    /// Image parts waiting to be attached; presumably to the next user message — confirm.
    pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
    /// Plan produced by `/plan <goal>` and kept for confirm/status/resume/retry/cancel.
    pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
    pub(super) debug_dumper: Option<crate::debug_dump::DebugDumper>,
    pub(super) dump_format: crate::debug_dump::DumpFormat,
    #[cfg(feature = "lsp-context")]
    pub(super) lsp_hooks: Option<crate::lsp_hooks::LspHookRunner>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_cancel: Option<tokio_util::sync::CancellationToken>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_baseline: crate::experiments::ConfigSnapshot,
    /// Experiment notices. Deliberately not feature-gated: the constructor always
    /// creates this channel. `run` forwards each message to the user.
    pub(super) experiment_notify_rx: Option<tokio::sync::mpsc::Receiver<String>>,
    /// Sender half paired with `experiment_notify_rx`.
    #[cfg(feature = "experiments")]
    pub(super) experiment_notify_tx: tokio::sync::mpsc::Sender<String>,
}
273
274impl<C: Channel> Agent<C> {
275 #[must_use]
276 #[allow(clippy::too_many_lines)]
277 pub fn new(
278 provider: AnyProvider,
279 channel: C,
280 registry: SkillRegistry,
281 matcher: Option<SkillMatcherBackend>,
282 max_active_skills: usize,
283 tool_executor: impl ToolExecutor + 'static,
284 ) -> Self {
285 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
286 Self::new_with_registry_arc(
287 provider,
288 channel,
289 registry,
290 matcher,
291 max_active_skills,
292 tool_executor,
293 )
294 }
295
    /// Construct an agent that shares an existing skill registry.
    ///
    /// The constructor does the following:
    /// - renders a skills prompt from every skill in `registry`, using empty trust
    ///   and health maps;
    /// - builds the initial system prompt from it and seeds the message history
    ///   with that single system message;
    /// - fills the remaining state with fixed defaults.
    ///
    /// Optional subsystems (memory, MCP registry/manager, metrics, sub-agents,
    /// caches, ...) start out as `None`.
    ///
    /// # Panics
    ///
    /// Panics if the registry's `RwLock` is poisoned.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        // Load every registered skill while holding a read lock; the guard is
        // released at the end of this block. Skills whose `get_skill` call
        // returns an error are silently skipped.
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // The initial skills prompt is rendered with empty trust and health maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Coarse estimate of ~4 bytes per token for the initial prompt.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // Placeholder shutdown channel: `_tx` is dropped when this constructor
        // returns, so nothing can ever send `true` through it.
        // NOTE(review): presumably a real shutdown receiver is installed later — confirm.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // The experiment notification channel is created in both feature
        // configurations, because `experiment_notify_rx` is not feature-gated.
        // Without `experiments` the sender is discarded here.
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        Self {
            provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // History starts with exactly one message: the system prompt.
            messages: vec![Message {
                role: Role::System,
                content: system_prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }],
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.05,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                // Remember the prompt used for the initial system message.
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback_detector: feedback_detector::FeedbackDetector::new(0.6),
            judge_detector: None,
            judge_provider: None,
            config_path: None,
            logging_config: crate::config::LoggingConfig::default(),
            config_reload_rx: None,
            shutdown: rx,
            metrics_tx: None,
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                shared_tools: None,
            },
            #[cfg(feature = "index")]
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            cancel_signal: Arc::new(Notify::new()),
            cancel_token: CancellationToken::new(),
            start_time: Instant::now(),
            message_queue: VecDeque::new(),
            summary_provider: None,
            provider_override: None,
            warmup_ready: None,
            cost_tracker: None,
            cached_prompt_tokens: initial_prompt_tokens,
            env_context: EnvironmentContext::gather(""),
            token_counter,
            stt: None,
            update_notify_rx: None,
            custom_task_rx: None,
            subagent_manager: None,
            subagent_config: crate::config::SubAgentConfig::default(),
            orchestration_config: crate::config::OrchestrationConfig::default(),
            #[cfg(feature = "experiments")]
            experiment_config: crate::config::ExperimentConfig::default(),
            #[cfg(feature = "experiments")]
            experiment_baseline: crate::experiments::ConfigSnapshot::default(),
            experiment_notify_rx: Some(exp_notify_rx),
            #[cfg(feature = "experiments")]
            experiment_notify_tx: exp_notify_tx,
            response_cache: None,
            parent_tool_use_id: None,
            anomaly_detector: None,
            instruction_blocks: Vec::new(),
            instruction_reload_rx: None,
            instruction_reload_state: None,
            sanitizer: ContentSanitizer::new(&crate::sanitizer::ContentIsolationConfig::default()),
            quarantine_summarizer: None,
            exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard::new(
                crate::sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
            ),
            flagged_urls: std::collections::HashSet::new(),
            pending_image_parts: Vec::new(),
            pending_graph: None,
            debug_dumper: None,
            dump_format: crate::debug_dump::DumpFormat::default(),
            #[cfg(feature = "lsp-context")]
            lsp_hooks: None,
            #[cfg(feature = "experiments")]
            experiment_cancel: None,
        }
    }
457
458 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
463 let Some(mgr) = &mut self.subagent_manager else {
464 return vec![];
465 };
466
467 let finished: Vec<String> = mgr
468 .statuses()
469 .into_iter()
470 .filter_map(|(id, status)| {
471 if matches!(
472 status.state,
473 crate::subagent::SubAgentState::Completed
474 | crate::subagent::SubAgentState::Failed
475 | crate::subagent::SubAgentState::Canceled
476 ) {
477 Some(id)
478 } else {
479 None
480 }
481 })
482 .collect();
483
484 let mut results = vec![];
485 for task_id in finished {
486 match mgr.collect(&task_id).await {
487 Ok(result) => results.push((task_id, result)),
488 Err(e) => {
489 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
490 }
491 }
492 }
493 results
494 }
495
496 async fn handle_plan_command(
497 &mut self,
498 cmd: crate::orchestration::PlanCommand,
499 ) -> Result<(), error::AgentError> {
500 use crate::orchestration::PlanCommand;
501
502 if !self.config_for_orchestration().enabled {
503 self.channel
504 .send(
505 "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
506 )
507 .await?;
508 return Ok(());
509 }
510
511 match cmd {
512 PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
513 PlanCommand::Confirm => self.handle_plan_confirm().await,
514 PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
515 PlanCommand::List => self.handle_plan_list().await,
516 PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
517 PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
518 PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
519 }
520 }
521
    /// Borrow the orchestration section of the agent's configuration.
    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration_config
    }
525
526 async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
527 use crate::orchestration::{LlmPlanner, Planner};
528
529 if self.pending_graph.is_some() {
530 self.channel
531 .send(
532 "A plan is already pending confirmation. \
533 Use /plan confirm to execute it or /plan cancel to discard.",
534 )
535 .await?;
536 return Ok(());
537 }
538
539 self.channel.send("Planning task decomposition...").await?;
540
541 let available_agents = self
542 .subagent_manager
543 .as_ref()
544 .map(|m| m.definitions().to_vec())
545 .unwrap_or_default();
546
547 let confirm_before_execute = self.orchestration_config.confirm_before_execute;
548 let graph = LlmPlanner::new(self.provider.clone(), &self.orchestration_config)
549 .plan(goal, &available_agents)
550 .await
551 .map_err(|e| error::AgentError::Other(e.to_string()))?;
552
553 let task_count = graph.tasks.len() as u64;
554 let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
555 self.update_metrics(|m| {
556 m.orchestration.plans_total += 1;
557 m.orchestration.tasks_total += task_count;
558 m.orchestration_graph = Some(snapshot);
559 });
560
561 if confirm_before_execute {
562 let summary = format_plan_summary(&graph);
563 self.channel.send(&summary).await?;
564 self.channel
565 .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
566 .await?;
567 self.pending_graph = Some(graph);
568 } else {
569 let summary = format_plan_summary(&graph);
572 self.channel.send(&summary).await?;
573 self.channel
574 .send("Plan ready. Full execution will be available in a future phase.")
575 .await?;
576 let now = std::time::Instant::now();
578 self.update_metrics(|m| {
579 if let Some(ref mut s) = m.orchestration_graph {
580 "completed".clone_into(&mut s.status);
581 s.completed_at = Some(now);
582 }
583 });
584 }
585
586 Ok(())
587 }
588
589 async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
590 use crate::orchestration::{Aggregator, LlmAggregator};
591
592 let Some(graph) = self.pending_graph.take() else {
593 self.channel
594 .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
595 .await?;
596 return Ok(());
597 };
598 let summary = format_plan_summary(&graph);
599 self.channel
600 .send(&format!("Confirmed. Executing plan:\n{summary}"))
601 .await?;
602
603 let aggregator = LlmAggregator::new(self.provider.clone(), &self.orchestration_config);
607 let final_status = match aggregator.aggregate(&graph).await {
608 Ok(synthesis) => {
609 self.channel.send(&synthesis).await?;
610 "completed"
611 }
612 Err(e) => {
613 tracing::error!(error = %e, "aggregation failed after plan confirm");
614 self.channel
615 .send("Plan confirmed. Execution and aggregation will run when the scheduler is wired.")
616 .await?;
617 "failed"
618 }
619 };
620 let now = std::time::Instant::now();
623 self.update_metrics(|m| {
624 if let Some(ref mut s) = m.orchestration_graph {
625 final_status.clone_into(&mut s.status);
626 s.completed_at = Some(now);
627 }
628 });
629 Ok(())
630 }
631
632 async fn handle_plan_status(
633 &mut self,
634 _graph_id: Option<&str>,
635 ) -> Result<(), error::AgentError> {
636 if self.pending_graph.is_some() {
637 self.channel
638 .send("A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort.")
639 .await?;
640 } else {
641 self.channel.send("No active plan.").await?;
642 }
643 Ok(())
644 }
645
646 async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
647 self.channel.send("No recent plans.").await?;
648 Ok(())
649 }
650
651 async fn handle_plan_cancel(
652 &mut self,
653 _graph_id: Option<&str>,
654 ) -> Result<(), error::AgentError> {
655 if self.pending_graph.take().is_some() {
656 let now = std::time::Instant::now();
657 self.update_metrics(|m| {
658 if let Some(ref mut s) = m.orchestration_graph {
659 "canceled".clone_into(&mut s.status);
660 s.completed_at = Some(now);
661 }
662 });
663 self.channel.send("Plan canceled.").await?;
664 } else {
665 self.channel.send("No active plan to cancel.").await?;
666 }
667 Ok(())
668 }
669
670 async fn handle_plan_resume(
675 &mut self,
676 graph_id: Option<&str>,
677 ) -> Result<(), error::AgentError> {
678 use crate::orchestration::GraphStatus;
679
680 let Some(ref graph) = self.pending_graph else {
681 self.channel
682 .send("No paused plan to resume. Use `/plan status` to check the current state.")
683 .await?;
684 return Ok(());
685 };
686
687 if let Some(id) = graph_id
689 && graph.id.to_string() != id
690 {
691 self.channel
692 .send(&format!(
693 "Graph id '{id}' does not match the active plan ({}). \
694 Use `/plan status` to see the active plan id.",
695 graph.id
696 ))
697 .await?;
698 return Ok(());
699 }
700
701 if graph.status != GraphStatus::Paused {
702 self.channel
703 .send(&format!(
704 "The active plan is in '{}' status and cannot be resumed. \
705 Only Paused plans can be resumed.",
706 graph.status
707 ))
708 .await?;
709 return Ok(());
710 }
711
712 let graph = self.pending_graph.take().unwrap();
713
714 tracing::info!(
715 graph_id = %graph.id,
716 "resuming paused graph"
717 );
718
719 self.channel
720 .send(&format!(
721 "Resuming plan: {}\nScheduler execution will run in a future integration phase.",
722 graph.goal
723 ))
724 .await?;
725
726 self.pending_graph = Some(graph);
728 Ok(())
729 }
730
731 async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
737 use crate::orchestration::{GraphStatus, dag};
738
739 let Some(ref graph) = self.pending_graph else {
740 self.channel
741 .send("No active plan to retry. Use `/plan status` to check the current state.")
742 .await?;
743 return Ok(());
744 };
745
746 if let Some(id) = graph_id
748 && graph.id.to_string() != id
749 {
750 self.channel
751 .send(&format!(
752 "Graph id '{id}' does not match the active plan ({}). \
753 Use `/plan status` to see the active plan id.",
754 graph.id
755 ))
756 .await?;
757 return Ok(());
758 }
759
760 if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
761 self.channel
762 .send(&format!(
763 "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
764 graph.status
765 ))
766 .await?;
767 return Ok(());
768 }
769
770 let mut graph = self.pending_graph.take().unwrap();
771
772 let failed_count = graph
774 .tasks
775 .iter()
776 .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
777 .count();
778
779 dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
780
781 tracing::info!(
782 graph_id = %graph.id,
783 failed_count,
784 "retrying failed tasks in graph"
785 );
786
787 self.channel
788 .send(&format!(
789 "Retrying {failed_count} failed task(s) in plan: {}\n\
790 Scheduler execution will run in a future integration phase.",
791 graph.goal
792 ))
793 .await?;
794
795 self.pending_graph = Some(graph);
797 Ok(())
798 }
799
800 pub async fn shutdown(&mut self) {
801 self.channel.send("Shutting down...").await.ok();
802
803 self.provider.save_router_state();
805
806 if let Some(ref mut mgr) = self.subagent_manager {
807 mgr.shutdown_all();
808 }
809
810 if let Some(ref manager) = self.mcp.manager {
811 manager.shutdown_all_shared().await;
812 }
813
814 if let Some(ref tx) = self.metrics_tx {
815 let m = tx.borrow();
816 if m.filter_applications > 0 {
817 #[allow(clippy::cast_precision_loss)]
818 let pct = if m.filter_raw_tokens > 0 {
819 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
820 } else {
821 0.0
822 };
823 tracing::info!(
824 raw_tokens = m.filter_raw_tokens,
825 saved_tokens = m.filter_saved_tokens,
826 applications = m.filter_applications,
827 "tool output filtering saved ~{} tokens ({pct:.0}%)",
828 m.filter_saved_tokens,
829 );
830 }
831 }
832 tracing::info!("agent shutdown complete");
833 }
834
835 #[allow(clippy::too_many_lines)]
841 pub async fn run(&mut self) -> anyhow::Result<()> {
842 if let Some(mut rx) = self.warmup_ready.take()
843 && !*rx.borrow()
844 {
845 let _ = rx.changed().await;
846 if !*rx.borrow() {
847 tracing::warn!("model warmup did not complete successfully");
848 }
849 }
850
851 loop {
852 if let Some(ref slot) = self.provider_override
854 && let Some(new_provider) = slot
855 .write()
856 .unwrap_or_else(std::sync::PoisonError::into_inner)
857 .take()
858 {
859 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
860 self.provider = new_provider;
861 }
862
863 if let Some(ref mgr) = self.subagent_manager {
865 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
866 .statuses()
867 .into_iter()
868 .map(|(id, s)| {
869 let def = mgr.agents_def(&id);
870 crate::metrics::SubAgentMetrics {
871 name: def.map_or_else(
872 || id[..8.min(id.len())].to_owned(),
873 |d| d.name.clone(),
874 ),
875 id: id.clone(),
876 state: format!("{:?}", s.state).to_lowercase(),
877 turns_used: s.turns_used,
878 max_turns: def.map_or(20, |d| d.permissions.max_turns),
879 background: def.is_some_and(|d| d.permissions.background),
880 elapsed_secs: s.started_at.elapsed().as_secs(),
881 permission_mode: def.map_or_else(String::new, |d| {
882 use crate::subagent::def::PermissionMode;
883 match d.permissions.permission_mode {
884 PermissionMode::Default => String::new(),
885 PermissionMode::AcceptEdits => "accept_edits".into(),
886 PermissionMode::DontAsk => "dont_ask".into(),
887 PermissionMode::BypassPermissions => {
888 "bypass_permissions".into()
889 }
890 PermissionMode::Plan => "plan".into(),
891 }
892 }),
893 }
894 })
895 .collect();
896 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
897 }
898
899 let completed = self.poll_subagents().await;
901 for (task_id, result) in completed {
902 let notice = if result.is_empty() {
903 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
904 } else {
905 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
906 };
907 if let Err(e) = self.channel.send(¬ice).await {
908 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
909 }
910 }
911
912 self.drain_channel();
913
914 let (text, image_parts) = if let Some(queued) = self.message_queue.pop_front() {
915 self.notify_queue_count().await;
916 if queued.raw_attachments.is_empty() {
917 (queued.text, queued.image_parts)
918 } else {
919 let msg = crate::channel::ChannelMessage {
920 text: queued.text,
921 attachments: queued.raw_attachments,
922 };
923 self.resolve_message(msg).await
924 }
925 } else {
926 let incoming = tokio::select! {
927 result = self.channel.recv() => result?,
928 () = shutdown_signal(&mut self.shutdown) => {
929 tracing::info!("shutting down");
930 break;
931 }
932 Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
933 self.reload_skills().await;
934 continue;
935 }
936 Some(_) = recv_optional(&mut self.instruction_reload_rx) => {
937 self.reload_instructions();
938 continue;
939 }
940 Some(_) = recv_optional(&mut self.config_reload_rx) => {
941 self.reload_config();
942 continue;
943 }
944 Some(msg) = recv_optional(&mut self.update_notify_rx) => {
945 if let Err(e) = self.channel.send(&msg).await {
946 tracing::warn!("failed to send update notification: {e}");
947 }
948 continue;
949 }
950 Some(msg) = recv_optional(&mut self.experiment_notify_rx) => {
951 #[cfg(feature = "experiments")]
954 { self.experiment_cancel = None; }
955 if let Err(e) = self.channel.send(&msg).await {
956 tracing::warn!("failed to send experiment completion: {e}");
957 }
958 continue;
959 }
960 Some(prompt) = recv_optional(&mut self.custom_task_rx) => {
961 tracing::info!("scheduler: injecting custom task as agent turn");
962 let text = format!("[Scheduled task] {prompt}");
963 Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
964 }
965 };
966 let Some(msg) = incoming else { break };
967 self.drain_channel();
968 self.resolve_message(msg).await
969 };
970
971 let trimmed = text.trim();
972
973 if trimmed == "/clear-queue" {
974 let n = self.clear_queue();
975 self.notify_queue_count().await;
976 self.channel
977 .send(&format!("Cleared {n} queued messages."))
978 .await?;
979 let _ = self.channel.flush_chunks().await;
980 continue;
981 }
982
983 if trimmed == "/compact" {
984 if self.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
985 match self.compact_context().await {
986 Ok(()) => {
987 let _ = self.channel.send("Context compacted successfully.").await;
988 }
989 Err(e) => {
990 let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
991 }
992 }
993 } else {
994 let _ = self.channel.send("Nothing to compact.").await;
995 }
996 let _ = self.channel.flush_chunks().await;
997 continue;
998 }
999
1000 if trimmed == "/clear" {
1001 self.clear_history();
1002 let _ = self.channel.flush_chunks().await;
1003 continue;
1004 }
1005
1006 if trimmed == "/model" || trimmed.starts_with("/model ") {
1007 self.handle_model_command(trimmed).await;
1008 let _ = self.channel.flush_chunks().await;
1009 continue;
1010 }
1011
1012 if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
1013 self.handle_debug_dump_command(trimmed).await;
1014 let _ = self.channel.flush_chunks().await;
1015 continue;
1016 }
1017
1018 if trimmed == "/exit" || trimmed == "/quit" {
1019 if self.channel.supports_exit() {
1020 break;
1021 }
1022 let _ = self
1023 .channel
1024 .send("/exit is not supported in this channel.")
1025 .await;
1026 continue;
1027 }
1028
1029 self.process_user_message(text, image_parts).await?;
1030 }
1031
1032 Ok(())
1033 }
1034
1035 pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
1043 if model_id.is_empty() {
1044 return Err("model id must not be empty".to_string());
1045 }
1046 if model_id.len() > 256 {
1047 return Err("model id exceeds maximum length of 256 characters".to_string());
1048 }
1049 if !model_id
1050 .chars()
1051 .all(|c| c.is_ascii() && !c.is_ascii_control())
1052 {
1053 return Err("model id must contain only printable ASCII characters".to_string());
1054 }
1055 self.runtime.model_name = model_id.to_string();
1056 tracing::info!(model = model_id, "set_model called");
1057 Ok(())
1058 }
1059
1060 #[allow(clippy::too_many_lines)]
1062 async fn handle_model_command(&mut self, trimmed: &str) {
1063 let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
1064
1065 if arg == "refresh" {
1066 if let Some(cache_dir) = dirs::cache_dir() {
1068 let models_dir = cache_dir.join("zeph").join("models");
1069 if let Ok(entries) = std::fs::read_dir(&models_dir) {
1070 for entry in entries.flatten() {
1071 let path = entry.path();
1072 if path.extension().and_then(|e| e.to_str()) == Some("json") {
1073 let _ = std::fs::remove_file(&path);
1074 }
1075 }
1076 }
1077 }
1078 match self.provider.list_models_remote().await {
1079 Ok(models) => {
1080 let _ = self
1081 .channel
1082 .send(&format!("Fetched {} models.", models.len()))
1083 .await;
1084 }
1085 Err(e) => {
1086 let _ = self
1087 .channel
1088 .send(&format!("Error fetching models: {e}"))
1089 .await;
1090 }
1091 }
1092 return;
1093 }
1094
1095 if arg.is_empty() {
1096 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
1098 let models = if cache.is_stale() {
1099 None
1100 } else {
1101 cache.load().unwrap_or(None)
1102 };
1103 let models = if let Some(m) = models {
1104 m
1105 } else {
1106 match self.provider.list_models_remote().await {
1107 Ok(m) => m,
1108 Err(e) => {
1109 let _ = self
1110 .channel
1111 .send(&format!("Error fetching models: {e}"))
1112 .await;
1113 return;
1114 }
1115 }
1116 };
1117
1118 if models.is_empty() {
1119 let _ = self.channel.send("No models available.").await;
1120 return;
1121 }
1122 let mut lines = vec!["Available models:".to_string()];
1123 for (i, m) in models.iter().enumerate() {
1124 lines.push(format!(" {}. {} ({})", i + 1, m.display_name, m.id));
1125 }
1126 let _ = self.channel.send(&lines.join("\n")).await;
1127 return;
1128 }
1129
1130 let model_id = arg;
1132
1133 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
1136 let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
1137 {
1138 match self.provider.list_models_remote().await {
1139 Ok(m) if !m.is_empty() => Some(m),
1140 _ => None,
1141 }
1142 } else {
1143 cache.load().unwrap_or(None)
1144 };
1145 if let Some(models) = known_models {
1146 if !models.iter().any(|m| m.id == model_id) {
1147 let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
1148 for m in &models {
1149 lines.push(format!(" • {} ({})", m.display_name, m.id));
1150 }
1151 let _ = self.channel.send(&lines.join("\n")).await;
1152 return;
1153 }
1154 } else {
1155 let _ = self
1156 .channel
1157 .send(
1158 "Model list unavailable, switching anyway — verify your model name is correct.",
1159 )
1160 .await;
1161 }
1162
1163 match self.set_model(model_id) {
1164 Ok(()) => {
1165 let _ = self
1166 .channel
1167 .send(&format!("Switched to model: {model_id}"))
1168 .await;
1169 }
1170 Err(e) => {
1171 let _ = self.channel.send(&format!("Error: {e}")).await;
1172 }
1173 }
1174 }
1175
1176 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1178 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1179 if arg.is_empty() {
1180 match &self.debug_dumper {
1181 Some(d) => {
1182 let _ = self
1183 .channel
1184 .send(&format!("Debug dump active: {}", d.dir().display()))
1185 .await;
1186 }
1187 None => {
1188 let _ = self
1189 .channel
1190 .send(
1191 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1192 or start with `--debug-dump [dir]`.",
1193 )
1194 .await;
1195 }
1196 }
1197 return;
1198 }
1199 let dir = std::path::PathBuf::from(arg);
1200 match crate::debug_dump::DebugDumper::new(&dir, self.dump_format) {
1201 Ok(dumper) => {
1202 let path = dumper.dir().display().to_string();
1203 self.debug_dumper = Some(dumper);
1204 let _ = self
1205 .channel
1206 .send(&format!("Debug dump enabled: {path}"))
1207 .await;
1208 }
1209 Err(e) => {
1210 let _ = self
1211 .channel
1212 .send(&format!("Failed to enable debug dump: {e}"))
1213 .await;
1214 }
1215 }
1216 }
1217
1218 async fn resolve_message(
1219 &self,
1220 msg: crate::channel::ChannelMessage,
1221 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1222 use crate::channel::{Attachment, AttachmentKind};
1223 use zeph_llm::provider::{ImageData, MessagePart};
1224
1225 let text_base = msg.text.clone();
1226
1227 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1228 .attachments
1229 .into_iter()
1230 .partition(|a| a.kind == AttachmentKind::Audio);
1231
1232 tracing::debug!(
1233 audio = audio_attachments.len(),
1234 has_stt = self.stt.is_some(),
1235 "resolve_message attachments"
1236 );
1237
1238 let text = if !audio_attachments.is_empty()
1239 && let Some(stt) = self.stt.as_ref()
1240 {
1241 let mut transcribed_parts = Vec::new();
1242 for attachment in &audio_attachments {
1243 if attachment.data.len() > MAX_AUDIO_BYTES {
1244 tracing::warn!(
1245 size = attachment.data.len(),
1246 max = MAX_AUDIO_BYTES,
1247 "audio attachment exceeds size limit, skipping"
1248 );
1249 continue;
1250 }
1251 match stt
1252 .transcribe(&attachment.data, attachment.filename.as_deref())
1253 .await
1254 {
1255 Ok(result) => {
1256 tracing::info!(
1257 len = result.text.len(),
1258 language = ?result.language,
1259 "audio transcribed"
1260 );
1261 transcribed_parts.push(result.text);
1262 }
1263 Err(e) => {
1264 tracing::error!(error = %e, "audio transcription failed");
1265 }
1266 }
1267 }
1268 if transcribed_parts.is_empty() {
1269 text_base
1270 } else {
1271 let transcribed = transcribed_parts.join("\n");
1272 if text_base.is_empty() {
1273 transcribed
1274 } else {
1275 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1276 }
1277 }
1278 } else {
1279 if !audio_attachments.is_empty() {
1280 tracing::warn!(
1281 count = audio_attachments.len(),
1282 "audio attachments received but no STT provider configured, dropping"
1283 );
1284 }
1285 text_base
1286 };
1287
1288 let mut image_parts = Vec::new();
1289 for attachment in image_attachments {
1290 if attachment.data.len() > MAX_IMAGE_BYTES {
1291 tracing::warn!(
1292 size = attachment.data.len(),
1293 max = MAX_IMAGE_BYTES,
1294 "image attachment exceeds size limit, skipping"
1295 );
1296 continue;
1297 }
1298 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1299 image_parts.push(MessagePart::Image(Box::new(ImageData {
1300 data: attachment.data,
1301 mime_type,
1302 })));
1303 }
1304
1305 (text, image_parts)
1306 }
1307
    /// Handles one user turn.
    ///
    /// The steps are:
    /// - dispatch slash commands;
    /// - run implicit-correction detection;
    /// - run context maintenance;
    /// - push the user message and drive `process_response`.
    ///
    /// `text` is the raw user input; slash commands are matched on its trimmed form.
    /// `image_parts` are this message's image attachments. They are appended after any
    /// images previously staged with `/image`.
    ///
    /// # Errors
    /// Propagates channel send/flush errors and errors returned by the slash-command
    /// handlers. A failure inside `process_response` is reported to the user instead of
    /// being propagated.
    #[allow(clippy::too_many_lines)]
    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        // Each turn gets a fresh cancellation token. A background task cancels it once
        // `cancel_signal` is notified.
        // NOTE(review): the spawned task only finishes when the signal fires, so every call
        // leaves one waiter behind until then. If the signal is delivered with
        // `Notify::notify_one`, a waiter from an earlier turn may consume it and cancel a
        // stale token instead of this one. Confirm how `cancel_signal` is notified.
        self.cancel_token = CancellationToken::new();
        let signal = Arc::clone(&self.cancel_signal);
        let token = self.cancel_token.clone();
        tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        });
        let trimmed = text.trim();

        // Slash commands handled below return before the message is pushed to history.
        if trimmed == "/help" {
            self.handle_help_command().await?;
            return Ok(());
        }

        if trimmed == "/status" {
            self.handle_status_command().await?;
            return Ok(());
        }

        if trimmed == "/skills" {
            self.handle_skills_command().await?;
            return Ok(());
        }

        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed.strip_prefix("/skill").unwrap_or("").trim();
            self.handle_skill_command(rest).await?;
            return Ok(());
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed.strip_prefix("/feedback").unwrap_or("").trim();
            self.handle_feedback(rest).await?;
            return Ok(());
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim();
            self.handle_mcp_command(args).await?;
            return Ok(());
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed.strip_prefix("/image").unwrap_or("").trim();
            if path.is_empty() {
                self.channel.send("Usage: /image <path>").await?;
                return Ok(());
            }
            return self.handle_image_command(path).await;
        }

        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            match crate::orchestration::PlanCommand::parse(trimmed) {
                Ok(cmd) => {
                    self.handle_plan_command(cmd).await?;
                    return Ok(());
                }
                Err(e) => {
                    self.channel.send(&e.to_string()).await?;
                    return Ok(());
                }
            }
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            self.handle_graph_command(trimmed).await?;
            return Ok(());
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            self.handle_experiment_command(trimmed).await?;
            return Ok(());
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            self.handle_lsp_status_command().await?;
            return Ok(());
        }

        if trimmed == "/log" {
            self.handle_log_command().await?;
            return Ok(());
        }

        // `/agent ...` and `@name ...` go to the sub-agent command parser. An `@` mention
        // that does not parse as an agent command falls through and is handled as a
        // normal user message.
        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            let known: Vec<String> = self
                .subagent_manager
                .as_ref()
                .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
                .unwrap_or_default();
            match crate::subagent::AgentCommand::parse(trimmed, &known) {
                Ok(cmd) => {
                    if let Some(msg) = self.handle_agent_command(cmd).await {
                        self.channel.send(&msg).await?;
                    }
                    return Ok(());
                }
                Err(e) if trimmed.starts_with('@') => {
                    tracing::debug!("@mention not matched as agent: {e}");
                }
                Err(e) => {
                    self.channel.send(&e.to_string()).await?;
                    return Ok(());
                }
            }
        }

        self.check_pending_rollbacks().await;
        let conv_id = self.memory_state.conversation_id;
        self.rebuild_system_prompt(&text).await;

        // Implicit correction detection. It is enabled by default when no learning config
        // is present. The regex detector always runs. The LLM judge is used instead when a
        // judge detector exists, `should_invoke` accepts the regex result, and the rate
        // limit allows it. In that case the judge runs in a background task and no signal
        // is acted on synchronously this turn.
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if self.is_learning_enabled() && correction_detection_enabled {
            // Collected before the current message is pushed, so it holds earlier user turns.
            let previous_user_messages: Vec<&str> = self
                .messages
                .iter()
                .filter(|m| m.role == Role::User)
                .map(|m| m.content.as_str())
                .collect();
            let regex_signal = self
                .feedback_detector
                .detect(trimmed, &previous_user_messages);

            let judge_should_run = self
                .judge_detector
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .judge_detector
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit);

            let signal = if judge_should_run {
                // Falls back to the main provider when no dedicated judge provider is set.
                let judge_provider = self
                    .judge_provider
                    .clone()
                    .unwrap_or_else(|| self.provider.clone());
                let assistant_snippet = self.last_assistant_response();
                let user_msg_owned = trimmed.to_owned();
                let memory_arc = self.memory_state.memory.clone();
                let skill_name = self
                    .skill_state
                    .active_skill_names
                    .first()
                    .cloned()
                    .unwrap_or_default();
                let conv_id_bg = conv_id;
                let confidence_threshold = self
                    .learning_engine
                    .config
                    .as_ref()
                    .map_or(0.6, |c| c.correction_confidence_threshold);

                // Fire-and-forget. The verdict is only logged and, if memory is available,
                // stored as a user correction (plus its embedding).
                tokio::spawn(async move {
                    match feedback_detector::JudgeDetector::evaluate(
                        &judge_provider,
                        &user_msg_owned,
                        &assistant_snippet,
                        confidence_threshold,
                    )
                    .await
                    {
                        Ok(verdict) => {
                            if let Some(signal) = verdict.into_signal(&user_msg_owned) {
                                let is_self_correction = signal.kind
                                    == feedback_detector::CorrectionKind::SelfCorrection;
                                tracing::info!(
                                    kind = signal.kind.as_str(),
                                    confidence = signal.confidence,
                                    source = "judge",
                                    is_self_correction,
                                    "correction signal detected"
                                );
                                if let Some(memory) = memory_arc {
                                    let correction_text =
                                        context::truncate_chars(&user_msg_owned, 500);
                                    match memory
                                        .sqlite()
                                        .store_user_correction(
                                            conv_id_bg.map(|c| c.0),
                                            &assistant_snippet,
                                            &correction_text,
                                            if skill_name.is_empty() {
                                                None
                                            } else {
                                                Some(skill_name.as_str())
                                            },
                                            signal.kind.as_str(),
                                        )
                                        .await
                                    {
                                        Ok(correction_id) => {
                                            if let Err(e) = memory
                                                .store_correction_embedding(
                                                    correction_id,
                                                    &correction_text,
                                                )
                                                .await
                                            {
                                                tracing::warn!(
                                                    "failed to store correction embedding: {e:#}"
                                                );
                                            }
                                        }
                                        Err(e) => {
                                            tracing::warn!(
                                                "failed to store judge correction: {e:#}"
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            tracing::warn!("judge detector failed: {e:#}");
                        }
                    }
                });

                None
            } else {
                regex_signal
            };

            if let Some(signal) = signal {
                tracing::info!(
                    kind = signal.kind.as_str(),
                    confidence = signal.confidence,
                    source = "regex",
                    "implicit correction detected"
                );
                let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
                // Self-corrections are stored below but are not recorded as a skill rejection.
                if signal.kind != feedback_detector::CorrectionKind::SelfCorrection {
                    self.record_skill_outcomes(
                        "user_rejection",
                        Some(&feedback_text),
                        Some(signal.kind.as_str()),
                    )
                    .await;
                }
                if let Some(memory) = &self.memory_state.memory {
                    let correction_text = context::truncate_chars(trimmed, 500);
                    // NOTE(review): this path stores an empty assistant snippet, whereas the
                    // judge path passes `last_assistant_response()`. Confirm this is intended.
                    match memory
                        .sqlite()
                        .store_user_correction(
                            conv_id.map(|c| c.0),
                            "",
                            &correction_text,
                            self.skill_state
                                .active_skill_names
                                .first()
                                .map(String::as_str),
                            signal.kind.as_str(),
                        )
                        .await
                    {
                        Ok(correction_id) => {
                            if let Err(e) = memory
                                .store_correction_embedding(correction_id, &correction_text)
                                .await
                            {
                                tracing::warn!("failed to store correction embedding: {e:#}");
                            }
                        }
                        Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
                    }
                }
            }
        }

        // Context maintenance before the LLM call. Each fallible step only logs its failure.
        self.context_manager.compacted_this_turn = false;

        self.maybe_apply_deferred_summaries();

        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.learning_engine.reset_reflection();

        // Images staged earlier via `/image` are sent together with this message's images.
        let mut all_image_parts = std::mem::take(&mut self.pending_image_parts);
        all_image_parts.extend(image_parts);
        let image_parts = all_image_parts;

        // Images are attached only for vision-capable providers; otherwise they are dropped
        // with a warning and a text-only message is built.
        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
            parts.extend(image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !image_parts.is_empty() {
                tracing::warn!(
                    count = image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.clone(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        };
        // Persisted with an empty parts slice. Presumably this means images are not
        // persisted; verify against `persist_message`.
        self.persist_message(Role::User, &text, &[], false).await;
        self.push_message(user_msg);

        if let Err(e) = self.process_response().await {
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // NOTE(review): assumes the last message is still the user message pushed above,
            // i.e. `process_response` pushed nothing before failing. Confirm.
            self.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        }

        Ok(())
    }
1678
1679 async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
1680 use std::path::Component;
1681 use zeph_llm::provider::{ImageData, MessagePart};
1682
1683 let has_parent_dir = std::path::Path::new(path)
1685 .components()
1686 .any(|c| c == Component::ParentDir);
1687 if has_parent_dir {
1688 self.channel
1689 .send("Invalid image path: path traversal not allowed")
1690 .await?;
1691 return Ok(());
1692 }
1693
1694 let data = match std::fs::read(path) {
1695 Ok(d) => d,
1696 Err(e) => {
1697 self.channel
1698 .send(&format!("Cannot read image {path}: {e}"))
1699 .await?;
1700 return Ok(());
1701 }
1702 };
1703 if data.len() > MAX_IMAGE_BYTES {
1704 self.channel
1705 .send(&format!(
1706 "Image {path} exceeds size limit ({} MB), skipping",
1707 MAX_IMAGE_BYTES / 1024 / 1024
1708 ))
1709 .await?;
1710 return Ok(());
1711 }
1712 let mime_type = detect_image_mime(Some(path)).to_string();
1713 self.pending_image_parts
1714 .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
1715 self.channel
1716 .send(&format!("Image loaded: {path}. Send your message."))
1717 .await?;
1718 Ok(())
1719 }
1720
1721 async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
1722 use std::fmt::Write;
1723
1724 let mut out = String::from("Slash commands:\n\n");
1725
1726 let categories = [
1727 slash_commands::SlashCategory::Info,
1728 slash_commands::SlashCategory::Session,
1729 slash_commands::SlashCategory::Model,
1730 slash_commands::SlashCategory::Memory,
1731 slash_commands::SlashCategory::Tools,
1732 slash_commands::SlashCategory::Planning,
1733 slash_commands::SlashCategory::Debug,
1734 slash_commands::SlashCategory::Advanced,
1735 ];
1736
1737 for cat in &categories {
1738 let entries: Vec<_> = slash_commands::COMMANDS
1739 .iter()
1740 .filter(|c| &c.category == cat)
1741 .collect();
1742 if entries.is_empty() {
1743 continue;
1744 }
1745 let _ = writeln!(out, "{}:", cat.as_str());
1746 for cmd in entries {
1747 if cmd.args.is_empty() {
1748 let _ = write!(out, " {}", cmd.name);
1749 } else {
1750 let _ = write!(out, " {} {}", cmd.name, cmd.args);
1751 }
1752 let _ = write!(out, " — {}", cmd.description);
1753 if let Some(feat) = cmd.feature_gate {
1754 let _ = write!(out, " [requires: {feat}]");
1755 }
1756 let _ = writeln!(out);
1757 }
1758 let _ = writeln!(out);
1759 }
1760
1761 self.channel.send(out.trim_end()).await?;
1762 Ok(())
1763 }
1764
1765 async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
1766 use std::fmt::Write;
1767
1768 let uptime = self.start_time.elapsed().as_secs();
1769 let msg_count = self
1770 .messages
1771 .iter()
1772 .filter(|m| m.role == Role::User)
1773 .count();
1774
1775 let (api_calls, prompt_tokens, completion_tokens, cost_cents, mcp_servers) =
1776 if let Some(ref tx) = self.metrics_tx {
1777 let m = tx.borrow();
1778 (
1779 m.api_calls,
1780 m.prompt_tokens,
1781 m.completion_tokens,
1782 m.cost_spent_cents,
1783 m.mcp_server_count,
1784 )
1785 } else {
1786 (0, 0, 0, 0.0, 0)
1787 };
1788
1789 let skill_count = self
1790 .skill_state
1791 .registry
1792 .read()
1793 .map(|r| r.all_meta().len())
1794 .unwrap_or(0);
1795
1796 let mut out = String::from("Session status:\n\n");
1797 let _ = writeln!(out, "Provider: {}", self.provider.name());
1798 let _ = writeln!(out, "Model: {}", self.runtime.model_name);
1799 let _ = writeln!(out, "Uptime: {uptime}s");
1800 let _ = writeln!(out, "Turns: {msg_count}");
1801 let _ = writeln!(out, "API calls: {api_calls}");
1802 let _ = writeln!(
1803 out,
1804 "Tokens: {prompt_tokens} prompt / {completion_tokens} completion"
1805 );
1806 let _ = writeln!(out, "Skills: {skill_count}");
1807 let _ = writeln!(out, "MCP: {mcp_servers} server(s)");
1808 if cost_cents > 0.0 {
1809 let _ = writeln!(out, "Cost: ${:.4}", cost_cents / 100.0);
1810 }
1811
1812 self.channel.send(out.trim_end()).await?;
1813 Ok(())
1814 }
1815
1816 async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
1817 use std::fmt::Write;
1818
1819 let mut output = String::from("Available skills:\n\n");
1820
1821 let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
1822 .skill_state
1823 .registry
1824 .read()
1825 .expect("registry read lock")
1826 .all_meta()
1827 .into_iter()
1828 .cloned()
1829 .collect();
1830
1831 for meta in &all_meta {
1832 let trust_info = if let Some(memory) = &self.memory_state.memory {
1833 memory
1834 .sqlite()
1835 .load_skill_trust(&meta.name)
1836 .await
1837 .ok()
1838 .flatten()
1839 .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
1840 } else {
1841 String::new()
1842 };
1843 let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
1844 }
1845
1846 if let Some(memory) = &self.memory_state.memory {
1847 match memory.sqlite().load_skill_usage().await {
1848 Ok(usage) if !usage.is_empty() => {
1849 output.push_str("\nUsage statistics:\n\n");
1850 for row in &usage {
1851 let _ = writeln!(
1852 output,
1853 "- {}: {} invocations (last: {})",
1854 row.skill_name, row.invocation_count, row.last_used_at,
1855 );
1856 }
1857 }
1858 Ok(_) => {}
1859 Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
1860 }
1861 }
1862
1863 self.channel.send(&output).await?;
1864 Ok(())
1865 }
1866
1867 async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
1868 let Some((name, rest)) = input.split_once(' ') else {
1869 self.channel
1870 .send("Usage: /feedback <skill_name> <message>")
1871 .await?;
1872 return Ok(());
1873 };
1874 let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
1875
1876 if feedback.is_empty() {
1877 self.channel
1878 .send("Usage: /feedback <skill_name> <message>")
1879 .await?;
1880 return Ok(());
1881 }
1882
1883 let Some(memory) = &self.memory_state.memory else {
1884 self.channel.send("Memory not available.").await?;
1885 return Ok(());
1886 };
1887
1888 memory
1889 .sqlite()
1890 .record_skill_outcome(
1891 skill_name,
1892 None,
1893 self.memory_state.conversation_id,
1894 "user_rejection",
1895 Some(feedback),
1896 None,
1897 )
1898 .await?;
1899
1900 if self.is_learning_enabled() {
1901 self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
1902 .await
1903 .ok();
1904 }
1905
1906 self.channel
1907 .send(&format!("Feedback recorded for \"{skill_name}\"."))
1908 .await?;
1909 Ok(())
1910 }
1911
    /// Executes a parsed sub-agent command and returns the reply text for the user.
    ///
    /// Returns `None` when no sub-agent manager is configured. Every
    /// `self.subagent_manager.as_ref()?` / `.as_mut()?` below exits the whole function
    /// with `None`, including the ones inside the polling loops.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::{AgentCommand, SubAgentState};
        use std::fmt::Write as _;

        match cmd {
            // Lists known definitions with their memory scope and, if set, their source.
            AgentCommand::List => {
                let mgr = self.subagent_manager.as_ref()?;
                let defs = mgr.definitions();
                if defs.is_empty() {
                    return Some("No sub-agent definitions found.".into());
                }
                let mut out = String::from("Available sub-agents:\n");
                for d in defs {
                    let memory_label = match d.memory {
                        Some(crate::subagent::MemoryScope::User) => " [memory:user]",
                        Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
                        Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
                        None => "",
                    };
                    if let Some(ref src) = d.source {
                        let _ = writeln!(
                            out,
                            " {}{} — {} ({})",
                            d.name, memory_label, d.description, src
                        );
                    } else {
                        let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
                    }
                }
                Some(out)
            }
            // Spawns the sub-agent and returns immediately without waiting for it.
            AgentCommand::Background { name, prompt } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.subagent_manager.as_mut()?;
                let cfg = self.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Spawns the sub-agent (explicitly or via `@mention`) and blocks this turn,
            // polling every 500 ms until it reaches a terminal state.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.subagent_manager.as_mut()?;
                let cfg = self.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                // NOTE(review): byte-index slicing; this panics if byte 8 is not a char
                // boundary. It assumes task ids are ASCII.
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                // NOTE(review): no timeout. The loop ends only when the task reaches
                // Completed/Failed/Canceled, disappears from `statuses()`, or the manager
                // is gone.
                let result = loop {
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

                    // Services a pending secret request (from any sub-agent) by asking the
                    // user. Approvals use a 300 s TTL. A confirm error is treated as a denial.
                    #[allow(clippy::redundant_closure_for_method_calls)]
                    let pending = self
                        .subagent_manager
                        .as_mut()
                        .and_then(|m| m.try_recv_secret_request());
                    if let Some((req_task_id, req)) = pending {
                        let prompt =
                            format!("Sub-agent requests secret '{}'. Allow?", req.secret_key);
                        let approved = self.channel.confirm(&prompt).await.unwrap_or(false);
                        if let Some(mgr) = self.subagent_manager.as_mut() {
                            if approved {
                                let ttl = std::time::Duration::from_secs(300);
                                let key = req.secret_key.clone();
                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                                    let _ = mgr.deliver_secret(&req_task_id, key);
                                }
                            } else {
                                let _ = mgr.deny_secret(&req_task_id);
                            }
                        }
                    }

                    let mgr = self.subagent_manager.as_ref()?;
                    let statuses = mgr.statuses();
                    let Some((_, status)) = statuses.iter().find(|(id, _)| id == &task_id) else {
                        break "Sub-agent completed (no status available).".to_owned();
                    };
                    match status.state {
                        SubAgentState::Completed => {
                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                            break format!("Sub-agent '{name}' completed: {msg}");
                        }
                        SubAgentState::Failed => {
                            let msg = status
                                .last_message
                                .clone()
                                .unwrap_or_else(|| "unknown error".into());
                            break format!("Sub-agent '{name}' failed: {msg}");
                        }
                        SubAgentState::Canceled => {
                            break format!("Sub-agent '{name}' was cancelled.");
                        }
                        // Still running: report progress. 20 is shown as the max when the
                        // definition cannot be found.
                        _ => {
                            let _ = self
                                .channel
                                .send_status(&format!(
                                    "sub-agent '{name}': turn {}/{}",
                                    status.turns_used,
                                    self.subagent_manager
                                        .as_ref()
                                        .and_then(|m| m.agents_def(&task_id))
                                        .map_or(20, |d| d.permissions.max_turns)
                                ))
                                .await;
                        }
                    }
                };
                Some(result)
            }
            // Lists tracked sub-agents with state, turns, elapsed time and last message,
            // plus the resolved memory directory when the definition has a memory scope.
            AgentCommand::Status => {
                let mgr = self.subagent_manager.as_ref()?;
                let statuses = mgr.statuses();
                if statuses.is_empty() {
                    return Some("No active sub-agents.".into());
                }
                let mut out = String::from("Active sub-agents:\n");
                for (id, s) in &statuses {
                    let state = format!("{:?}", s.state).to_lowercase();
                    let elapsed = s.started_at.elapsed().as_secs();
                    let _ = writeln!(
                        out,
                        " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                        short = &id[..8.min(id.len())],
                        t = s.turns_used,
                        msg = s.last_message.as_deref().unwrap_or(""),
                    );
                    if let Some(def) = mgr.agents_def(id)
                        && let Some(scope) = def.memory
                        && let Ok(dir) =
                            crate::subagent::memory::resolve_memory_dir(scope, &def.name)
                    {
                        let _ = writeln!(out, " memory: {}", dir.display());
                    }
                }
                Some(out)
            }
            // Cancels the single sub-agent whose id starts with `id`; zero or several
            // matches are reported instead.
            AgentCommand::Cancel { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            // Approves the next pending secret request if it belongs to the sub-agent
            // matched by the id prefix (300 s TTL), then delivers the secret.
            AgentCommand::Approve { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let full_ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(tid, _)| tid)
                    .filter(|tid| tid.starts_with(&id))
                    .collect();
                let full_id = match full_ids.as_slice() {
                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
                    [fid] => fid.clone(),
                    _ => {
                        return Some(format!(
                            "Ambiguous id prefix '{id}': matches {} agents",
                            full_ids.len()
                        ));
                    }
                };
                // NOTE(review): `try_recv_secret_request` takes the next pending request. If
                // it belongs to a different sub-agent (`tid != full_id`), it is dropped here
                // without being approved or denied. Confirm whether it should be re-queued or
                // denied.
                if let Some((tid, req)) = mgr.try_recv_secret_request()
                    && tid == full_id
                {
                    let key = req.secret_key.clone();
                    let ttl = std::time::Duration::from_secs(300);
                    if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                        return Some(format!("Approve failed: {e}"));
                    }
                    if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                        return Some(format!("Secret delivery failed: {e}"));
                    }
                    return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
                }
                Some(format!(
                    "No pending secret request for sub-agent '{full_id}'."
                ))
            }
            // Denies the secret request of the sub-agent matched by the id prefix.
            AgentCommand::Deny { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let full_ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(tid, _)| tid)
                    .filter(|tid| tid.starts_with(&id))
                    .collect();
                let full_id = match full_ids.as_slice() {
                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
                    [fid] => fid.clone(),
                    _ => {
                        return Some(format!(
                            "Ambiguous id prefix '{id}': matches {} agents",
                            full_ids.len()
                        ));
                    }
                };
                match mgr.deny_secret(&full_id) {
                    Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
                    Err(e) => Some(format!("Deny failed: {e}")),
                }
            }
            // Resumes a previous sub-agent with a new prompt. It then blocks with the same
            // 500 ms polling and secret handling as `Spawn`.
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.subagent_config.clone();
                // The definition name is resolved under a shared borrow first, so skills
                // can be filtered before the manager is borrowed mutably.
                let def_name = {
                    let mgr = self.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                let result = loop {
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

                    #[allow(clippy::redundant_closure_for_method_calls)]
                    let pending = self
                        .subagent_manager
                        .as_mut()
                        .and_then(|m| m.try_recv_secret_request());
                    if let Some((req_task_id, req)) = pending {
                        let confirm_prompt =
                            format!("Sub-agent requests secret '{}'. Allow?", req.secret_key);
                        let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                        if let Some(mgr) = self.subagent_manager.as_mut() {
                            if approved {
                                let ttl = std::time::Duration::from_secs(300);
                                let key = req.secret_key.clone();
                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                                    let _ = mgr.deliver_secret(&req_task_id, key);
                                }
                            } else {
                                let _ = mgr.deny_secret(&req_task_id);
                            }
                        }
                    }

                    let mgr = self.subagent_manager.as_ref()?;
                    let statuses = mgr.statuses();
                    let Some((_, status)) = statuses.iter().find(|(tid, _)| tid == &task_id) else {
                        break "Sub-agent resume completed (no status available).".to_owned();
                    };
                    match status.state {
                        SubAgentState::Completed => {
                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                            break format!("Resumed sub-agent completed: {msg}");
                        }
                        SubAgentState::Failed => {
                            let msg = status
                                .last_message
                                .clone()
                                .unwrap_or_else(|| "unknown error".into());
                            break format!("Resumed sub-agent failed: {msg}");
                        }
                        SubAgentState::Canceled => {
                            break "Resumed sub-agent was cancelled.".to_owned();
                        }
                        _ => {
                            let _ = self
                                .channel
                                .send_status(&format!(
                                    "resumed sub-agent: turn {}/{}",
                                    status.turns_used,
                                    self.subagent_manager
                                        .as_ref()
                                        .and_then(|m| m.agents_def(&task_id))
                                        .map_or(20, |d| d.permissions.max_turns)
                                ))
                                .await;
                        }
                    }
                };
                Some(result)
            }
        }
    }
2253
2254 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2255 let mgr = self.subagent_manager.as_ref()?;
2256 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2257 let reg = self
2258 .skill_state
2259 .registry
2260 .read()
2261 .expect("registry read lock");
2262 match crate::subagent::filter_skills(®, &def.skills) {
2263 Ok(skills) => {
2264 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2265 if bodies.is_empty() {
2266 None
2267 } else {
2268 Some(bodies)
2269 }
2270 }
2271 Err(e) => {
2272 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2273 None
2274 }
2275 }
2276 }
2277
2278 #[allow(clippy::too_many_lines)]
2279 async fn reload_skills(&mut self) {
2280 let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
2281 if new_registry.fingerprint()
2282 == self
2283 .skill_state
2284 .registry
2285 .read()
2286 .expect("registry read lock")
2287 .fingerprint()
2288 {
2289 return;
2290 }
2291 let _ = self.channel.send_status("reloading skills...").await;
2292 *self
2293 .skill_state
2294 .registry
2295 .write()
2296 .expect("registry write lock") = new_registry;
2297
2298 let all_meta = self
2299 .skill_state
2300 .registry
2301 .read()
2302 .expect("registry read lock")
2303 .all_meta()
2304 .into_iter()
2305 .cloned()
2306 .collect::<Vec<_>>();
2307
2308 if let Some(ref memory) = self.memory_state.memory {
2310 let trust_cfg = self.skill_state.trust_config.clone();
2311 let managed_dir = self.skill_state.managed_dir.clone();
2312 for meta in &all_meta {
2313 let source_kind = if managed_dir
2314 .as_ref()
2315 .is_some_and(|d| meta.skill_dir.starts_with(d))
2316 {
2317 zeph_memory::sqlite::SourceKind::Hub
2318 } else {
2319 zeph_memory::sqlite::SourceKind::Local
2320 };
2321 let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
2322 &trust_cfg.default_level
2323 } else {
2324 &trust_cfg.local_level
2325 };
2326 match zeph_skills::compute_skill_hash(&meta.skill_dir) {
2327 Ok(current_hash) => {
2328 let existing = memory
2329 .sqlite()
2330 .load_skill_trust(&meta.name)
2331 .await
2332 .ok()
2333 .flatten();
2334 let trust_level_str = if let Some(ref row) = existing {
2335 if row.blake3_hash == current_hash {
2336 row.trust_level.clone()
2337 } else {
2338 trust_cfg.hash_mismatch_level.to_string()
2339 }
2340 } else {
2341 initial_level.to_string()
2342 };
2343 let source_path = meta.skill_dir.to_str();
2344 if let Err(e) = memory
2345 .sqlite()
2346 .upsert_skill_trust(
2347 &meta.name,
2348 &trust_level_str,
2349 source_kind,
2350 None,
2351 source_path,
2352 ¤t_hash,
2353 )
2354 .await
2355 {
2356 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2357 }
2358 }
2359 Err(e) => {
2360 tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
2361 }
2362 }
2363 }
2364 }
2365
2366 let all_meta = all_meta.iter().collect::<Vec<_>>();
2367 let provider = self.provider.clone();
2368 let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
2369 let owned = text.to_owned();
2370 let p = provider.clone();
2371 Box::pin(async move { p.embed(&owned).await })
2372 };
2373
2374 let needs_inmemory_rebuild = !self
2375 .skill_state
2376 .matcher
2377 .as_ref()
2378 .is_some_and(SkillMatcherBackend::is_qdrant);
2379
2380 if needs_inmemory_rebuild {
2381 self.skill_state.matcher = SkillMatcher::new(&all_meta, embed_fn)
2382 .await
2383 .map(SkillMatcherBackend::InMemory);
2384 } else if let Some(ref mut backend) = self.skill_state.matcher {
2385 let _ = self.channel.send_status("syncing skill index...").await;
2386 if let Err(e) = backend
2387 .sync(&all_meta, &self.skill_state.embedding_model, embed_fn)
2388 .await
2389 {
2390 tracing::warn!("failed to sync skill embeddings: {e:#}");
2391 }
2392 }
2393
2394 if self.skill_state.hybrid_search {
2395 let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2396 let _ = self.channel.send_status("rebuilding search index...").await;
2397 self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
2398 }
2399
2400 let all_skills: Vec<Skill> = {
2401 let reg = self
2402 .skill_state
2403 .registry
2404 .read()
2405 .expect("registry read lock");
2406 reg.all_meta()
2407 .iter()
2408 .filter_map(|m| reg.get_skill(&m.name).ok())
2409 .collect()
2410 };
2411 let trust_map = self.build_skill_trust_map().await;
2412 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2413 let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
2414 self.skill_state
2415 .last_skills_prompt
2416 .clone_from(&skills_prompt);
2417 let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
2418 if let Some(msg) = self.messages.first_mut() {
2419 msg.content = system_prompt;
2420 }
2421
2422 let _ = self.channel.send_status("").await;
2423 tracing::info!(
2424 "reloaded {} skill(s)",
2425 self.skill_state
2426 .registry
2427 .read()
2428 .expect("registry read lock")
2429 .all_meta()
2430 .len()
2431 );
2432 }
2433
2434 fn reload_instructions(&mut self) {
2435 if let Some(ref mut rx) = self.instruction_reload_rx {
2437 while rx.try_recv().is_ok() {}
2438 }
2439 let Some(ref state) = self.instruction_reload_state else {
2440 return;
2441 };
2442 let new_blocks = crate::instructions::load_instructions(
2443 &state.base_dir,
2444 &state.provider_kinds,
2445 &state.explicit_files,
2446 state.auto_detect,
2447 );
2448 let old_sources: std::collections::HashSet<_> =
2449 self.instruction_blocks.iter().map(|b| &b.source).collect();
2450 let new_sources: std::collections::HashSet<_> =
2451 new_blocks.iter().map(|b| &b.source).collect();
2452 for added in new_sources.difference(&old_sources) {
2453 tracing::info!(path = %added.display(), "instruction file added");
2454 }
2455 for removed in old_sources.difference(&new_sources) {
2456 tracing::info!(path = %removed.display(), "instruction file removed");
2457 }
2458 tracing::info!(
2459 old_count = self.instruction_blocks.len(),
2460 new_count = new_blocks.len(),
2461 "reloaded instruction files"
2462 );
2463 self.instruction_blocks = new_blocks;
2464 }
2465
2466 fn reload_config(&mut self) {
2467 let Some(ref path) = self.config_path else {
2468 return;
2469 };
2470 let config = match Config::load(path) {
2471 Ok(c) => c,
2472 Err(e) => {
2473 tracing::warn!("config reload failed: {e:#}");
2474 return;
2475 }
2476 };
2477
2478 self.runtime.security = config.security;
2479 self.runtime.timeouts = config.timeouts;
2480 self.runtime.redact_credentials = config.memory.redact_credentials;
2481 self.memory_state.history_limit = config.memory.history_limit;
2482 self.memory_state.recall_limit = config.memory.semantic.recall_limit;
2483 self.memory_state.summarization_threshold = config.memory.summarization_threshold;
2484 self.skill_state.max_active_skills = config.skills.max_active_skills;
2485 self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2486 self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2487 self.skill_state.hybrid_search = config.skills.hybrid_search;
2488
2489 if config.memory.context_budget_tokens > 0 {
2490 self.context_manager.budget = Some(
2491 ContextBudget::new(config.memory.context_budget_tokens, 0.20)
2492 .with_graph_enabled(config.memory.graph.enabled),
2493 );
2494 } else {
2495 self.context_manager.budget = None;
2496 }
2497
2498 {
2499 self.memory_state.graph_config = config.memory.graph.clone();
2500 }
2501 self.context_manager.compaction_threshold = config.memory.compaction_threshold;
2502 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2503 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2504 self.context_manager.compression = config.memory.compression.clone();
2505 self.context_manager.routing = config.memory.routing.clone();
2506 self.memory_state.cross_session_score_threshold =
2507 config.memory.cross_session_score_threshold;
2508
2509 #[cfg(feature = "index")]
2510 {
2511 self.index.repo_map_ttl =
2512 std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2513 }
2514
2515 tracing::info!("config reloaded");
2516 }
2517}
2518pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2519 while !*rx.borrow_and_update() {
2520 if rx.changed().await.is_err() {
2521 std::future::pending::<()>().await;
2522 }
2523 }
2524}
2525
2526pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2527 match rx {
2528 Some(inner) => {
2529 if let Some(v) = inner.recv().await {
2530 Some(v)
2531 } else {
2532 *rx = None;
2533 std::future::pending().await
2534 }
2535 }
2536 None => std::future::pending().await,
2537 }
2538}
2539
2540#[cfg(test)]
2541pub(super) mod agent_tests {
2542 use super::message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
2543 #[allow(unused_imports)]
2544 pub(crate) use super::{
2545 Agent, CODE_CONTEXT_PREFIX, CROSS_SESSION_PREFIX, DOOM_LOOP_WINDOW, RECALL_PREFIX,
2546 SUMMARY_PREFIX, TOOL_OUTPUT_SUFFIX, format_tool_output, recv_optional, shutdown_signal,
2547 };
2548 pub(crate) use crate::channel::Channel;
2549 use crate::channel::{Attachment, AttachmentKind, ChannelMessage};
2550 pub(crate) use crate::config::{SecurityConfig, TimeoutConfig};
2551 pub(crate) use crate::metrics::MetricsSnapshot;
2552 use std::sync::{Arc, Mutex};
2553 pub(crate) use tokio::sync::{Notify, mpsc, watch};
2554 pub(crate) use zeph_llm::any::AnyProvider;
2555 use zeph_llm::mock::MockProvider;
2556 pub(crate) use zeph_llm::provider::{Message, MessageMetadata, Role};
2557 pub(crate) use zeph_memory::semantic::SemanticMemory;
2558 pub(crate) use zeph_skills::registry::SkillRegistry;
2559 pub(crate) use zeph_skills::watcher::SkillEvent;
2560 pub(crate) use zeph_tools::executor::ToolExecutor;
2561 use zeph_tools::executor::{ToolError, ToolOutput};
2562
2563 pub(crate) fn mock_provider(responses: Vec<String>) -> AnyProvider {
2564 AnyProvider::Mock(MockProvider::with_responses(responses))
2565 }
2566
2567 pub(crate) fn mock_provider_streaming(responses: Vec<String>) -> AnyProvider {
2568 AnyProvider::Mock(MockProvider::with_responses(responses).with_streaming())
2569 }
2570
2571 pub(crate) fn mock_provider_failing() -> AnyProvider {
2572 AnyProvider::Mock(MockProvider::failing())
2573 }
2574
2575 pub(crate) fn mock_provider_with_models(
2576 responses: Vec<String>,
2577 models: Vec<zeph_llm::model_cache::RemoteModelInfo>,
2578 ) -> AnyProvider {
2579 AnyProvider::Mock(MockProvider::with_responses(responses).with_models(models))
2580 }
2581
/// Scripted [`Channel`] for tests.
///
/// Incoming user messages are served from `messages`. Everything the agent emits is
/// recorded in shared `Arc<Mutex<_>>` buffers, so a test can keep a clone of a buffer
/// and inspect it after the channel has been moved into the agent.
pub(crate) struct MockChannel {
    /// Queue of user messages handed out by `recv` / `try_recv`.
    messages: Arc<Mutex<Vec<String>>>,
    /// Scripted answers consumed by `confirm`.
    confirmations: Arc<Mutex<Vec<bool>>>,
    /// Text passed to `send`.
    sent: Arc<Mutex<Vec<String>>>,
    /// Fragments passed to `send_chunk`.
    chunks: Arc<Mutex<Vec<String>>>,
    /// Text passed to `send_status`.
    pub(crate) statuses: Arc<Mutex<Vec<String>>>,
    /// Value reported by `supports_exit`.
    exit_supported: bool,
}
2590
2591 impl MockChannel {
2592 pub(crate) fn new(messages: Vec<String>) -> Self {
2593 Self {
2594 messages: Arc::new(Mutex::new(messages)),
2595 sent: Arc::new(Mutex::new(Vec::new())),
2596 chunks: Arc::new(Mutex::new(Vec::new())),
2597 confirmations: Arc::new(Mutex::new(Vec::new())),
2598 statuses: Arc::new(Mutex::new(Vec::new())),
2599 exit_supported: true,
2600 }
2601 }
2602
2603 pub(crate) fn without_exit_support(mut self) -> Self {
2604 self.exit_supported = false;
2605 self
2606 }
2607
2608 pub(crate) fn with_confirmations(mut self, confirmations: Vec<bool>) -> Self {
2609 self.confirmations = Arc::new(Mutex::new(confirmations));
2610 self
2611 }
2612
2613 pub(crate) fn sent_messages(&self) -> Vec<String> {
2614 self.sent.lock().unwrap().clone()
2615 }
2616
2617 pub(crate) fn sent_chunks(&self) -> Vec<String> {
2618 self.chunks.lock().unwrap().clone()
2619 }
2620 }
2621
2622 impl Channel for MockChannel {
2623 async fn recv(&mut self) -> Result<Option<ChannelMessage>, crate::channel::ChannelError> {
2624 let mut msgs = self.messages.lock().unwrap();
2625 if msgs.is_empty() {
2626 Ok(None)
2627 } else {
2628 Ok(Some(ChannelMessage {
2629 text: msgs.remove(0),
2630 attachments: vec![],
2631 }))
2632 }
2633 }
2634
2635 fn try_recv(&mut self) -> Option<ChannelMessage> {
2636 let mut msgs = self.messages.lock().unwrap();
2637 if msgs.is_empty() {
2638 None
2639 } else {
2640 Some(ChannelMessage {
2641 text: msgs.remove(0),
2642 attachments: vec![],
2643 })
2644 }
2645 }
2646
2647 async fn send(&mut self, text: &str) -> Result<(), crate::channel::ChannelError> {
2648 self.sent.lock().unwrap().push(text.to_string());
2649 Ok(())
2650 }
2651
2652 async fn send_chunk(&mut self, chunk: &str) -> Result<(), crate::channel::ChannelError> {
2653 self.chunks.lock().unwrap().push(chunk.to_string());
2654 Ok(())
2655 }
2656
2657 async fn flush_chunks(&mut self) -> Result<(), crate::channel::ChannelError> {
2658 Ok(())
2659 }
2660
2661 async fn send_status(&mut self, text: &str) -> Result<(), crate::channel::ChannelError> {
2662 self.statuses.lock().unwrap().push(text.to_string());
2663 Ok(())
2664 }
2665
2666 async fn confirm(&mut self, _prompt: &str) -> Result<bool, crate::channel::ChannelError> {
2667 let mut confs = self.confirmations.lock().unwrap();
2668 Ok(if confs.is_empty() {
2669 true
2670 } else {
2671 confs.remove(0)
2672 })
2673 }
2674
2675 fn supports_exit(&self) -> bool {
2676 self.exit_supported
2677 }
2678 }
2679
2680 pub(crate) struct MockToolExecutor {
2681 outputs: Arc<Mutex<Vec<Result<Option<ToolOutput>, ToolError>>>>,
2682 pub(crate) captured_env: Arc<Mutex<Vec<Option<std::collections::HashMap<String, String>>>>>,
2683 }
2684
2685 impl MockToolExecutor {
2686 pub(crate) fn new(outputs: Vec<Result<Option<ToolOutput>, ToolError>>) -> Self {
2687 Self {
2688 outputs: Arc::new(Mutex::new(outputs)),
2689 captured_env: Arc::new(Mutex::new(Vec::new())),
2690 }
2691 }
2692
2693 pub(crate) fn no_tools() -> Self {
2694 Self::new(vec![Ok(None)])
2695 }
2696 }
2697
2698 impl ToolExecutor for MockToolExecutor {
2699 async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
2700 let mut outputs = self.outputs.lock().unwrap();
2701 if outputs.is_empty() {
2702 Ok(None)
2703 } else {
2704 outputs.remove(0)
2705 }
2706 }
2707
2708 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
2709 self.captured_env.lock().unwrap().push(env);
2710 }
2711 }
2712
2713 pub(crate) fn create_test_registry() -> SkillRegistry {
2714 let temp_dir = tempfile::tempdir().unwrap();
2715 let skill_dir = temp_dir.path().join("test-skill");
2716 std::fs::create_dir(&skill_dir).unwrap();
2717 std::fs::write(
2718 skill_dir.join("SKILL.md"),
2719 "---\nname: test-skill\ndescription: A test skill\n---\nTest skill body",
2720 )
2721 .unwrap();
2722 SkillRegistry::load(&[temp_dir.path().to_path_buf()])
2723 }
2724
2725 #[tokio::test]
2726 async fn agent_new_initializes_with_system_prompt() {
2727 let provider = mock_provider(vec![]);
2728 let channel = MockChannel::new(vec![]);
2729 let registry = create_test_registry();
2730 let executor = MockToolExecutor::no_tools();
2731
2732 let agent = Agent::new(provider, channel, registry, None, 5, executor);
2733
2734 assert_eq!(agent.messages.len(), 1);
2735 assert_eq!(agent.messages[0].role, Role::System);
2736 assert!(!agent.messages[0].content.is_empty());
2737 }
2738
2739 #[tokio::test]
2740 async fn agent_with_embedding_model_sets_model() {
2741 let provider = mock_provider(vec![]);
2742 let channel = MockChannel::new(vec![]);
2743 let registry = create_test_registry();
2744 let executor = MockToolExecutor::no_tools();
2745
2746 let agent = Agent::new(provider, channel, registry, None, 5, executor)
2747 .with_embedding_model("test-embed-model".to_string());
2748
2749 assert_eq!(agent.skill_state.embedding_model, "test-embed-model");
2750 }
2751
2752 #[tokio::test]
2753 async fn agent_with_shutdown_sets_receiver() {
2754 let provider = mock_provider(vec![]);
2755 let channel = MockChannel::new(vec![]);
2756 let registry = create_test_registry();
2757 let executor = MockToolExecutor::no_tools();
2758 let (_tx, rx) = watch::channel(false);
2759
2760 let _agent = Agent::new(provider, channel, registry, None, 5, executor).with_shutdown(rx);
2761 }
2762
2763 #[tokio::test]
2764 async fn agent_with_security_sets_config() {
2765 let provider = mock_provider(vec![]);
2766 let channel = MockChannel::new(vec![]);
2767 let registry = create_test_registry();
2768 let executor = MockToolExecutor::no_tools();
2769
2770 let security = SecurityConfig {
2771 redact_secrets: true,
2772 ..Default::default()
2773 };
2774 let timeouts = TimeoutConfig {
2775 llm_seconds: 60,
2776 ..Default::default()
2777 };
2778
2779 let agent = Agent::new(provider, channel, registry, None, 5, executor)
2780 .with_security(security, timeouts);
2781
2782 assert!(agent.runtime.security.redact_secrets);
2783 assert_eq!(agent.runtime.timeouts.llm_seconds, 60);
2784 }
2785
2786 #[tokio::test]
2787 async fn agent_run_handles_empty_channel() {
2788 let provider = mock_provider(vec![]);
2789 let channel = MockChannel::new(vec![]);
2790 let registry = create_test_registry();
2791 let executor = MockToolExecutor::no_tools();
2792
2793 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
2794
2795 let result = agent.run().await;
2796 assert!(result.is_ok());
2797 }
2798
2799 #[tokio::test]
2800 async fn agent_run_processes_user_message() {
2801 let provider = mock_provider(vec!["test response".to_string()]);
2802 let channel = MockChannel::new(vec!["hello".to_string()]);
2803 let registry = create_test_registry();
2804 let executor = MockToolExecutor::no_tools();
2805
2806 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
2807
2808 let result = agent.run().await;
2809 assert!(result.is_ok());
2810 assert_eq!(agent.messages.len(), 3);
2811 assert_eq!(agent.messages[1].role, Role::User);
2812 assert_eq!(agent.messages[1].content, "hello");
2813 assert_eq!(agent.messages[2].role, Role::Assistant);
2814 }
2815
2816 #[tokio::test]
2817 async fn agent_run_handles_shutdown_signal() {
2818 let provider = mock_provider(vec![]);
2819 let (tx, rx) = watch::channel(false);
2820 let channel = MockChannel::new(vec!["should not process".to_string()]);
2821 let registry = create_test_registry();
2822 let executor = MockToolExecutor::no_tools();
2823
2824 let mut agent =
2825 Agent::new(provider, channel, registry, None, 5, executor).with_shutdown(rx);
2826
2827 tx.send(true).unwrap();
2828
2829 let result = agent.run().await;
2830 assert!(result.is_ok());
2831 }
2832
2833 #[tokio::test]
2834 async fn agent_handles_skills_command() {
2835 let provider = mock_provider(vec![]);
2836 let _channel = MockChannel::new(vec!["/skills".to_string()]);
2837 let registry = create_test_registry();
2838 let executor = MockToolExecutor::no_tools();
2839
2840 let agent_channel = MockChannel::new(vec!["/skills".to_string()]);
2841 let sent = agent_channel.sent.clone();
2842
2843 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2844
2845 let result = agent.run().await;
2846 assert!(result.is_ok());
2847
2848 let sent_msgs = sent.lock().unwrap();
2849 assert!(!sent_msgs.is_empty());
2850 assert!(sent_msgs[0].contains("Available skills"));
2851 }
2852
2853 #[tokio::test]
2854 async fn agent_process_response_handles_empty_response() {
2855 let provider = mock_provider(vec!["".to_string()]);
2856 let _channel = MockChannel::new(vec!["test".to_string()]);
2857 let registry = create_test_registry();
2858 let executor = MockToolExecutor::no_tools();
2859
2860 let agent_channel = MockChannel::new(vec!["test".to_string()]);
2861 let sent = agent_channel.sent.clone();
2862
2863 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2864
2865 let result = agent.run().await;
2866 assert!(result.is_ok());
2867
2868 let sent_msgs = sent.lock().unwrap();
2869 assert!(sent_msgs.iter().any(|m| m.contains("empty response")));
2870 }
2871
2872 #[tokio::test]
2873 async fn agent_handles_tool_execution_success() {
2874 let provider = mock_provider(vec!["response with tool".to_string()]);
2875 let _channel = MockChannel::new(vec!["execute tool".to_string()]);
2876 let registry = create_test_registry();
2877 let executor = MockToolExecutor::new(vec![Ok(Some(ToolOutput {
2878 tool_name: "bash".to_string(),
2879 summary: "tool executed successfully".to_string(),
2880 blocks_executed: 1,
2881 filter_stats: None,
2882 diff: None,
2883 streamed: false,
2884 terminal_id: None,
2885 locations: None,
2886 raw_response: None,
2887 }))]);
2888
2889 let agent_channel = MockChannel::new(vec!["execute tool".to_string()]);
2890 let sent = agent_channel.sent.clone();
2891
2892 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2893
2894 let result = agent.run().await;
2895 assert!(result.is_ok());
2896
2897 let sent_msgs = sent.lock().unwrap();
2898 assert!(
2899 sent_msgs
2900 .iter()
2901 .any(|m| m.contains("tool executed successfully"))
2902 );
2903 }
2904
2905 #[tokio::test]
2906 async fn agent_handles_tool_blocked_error() {
2907 let provider = mock_provider(vec!["run blocked command".to_string()]);
2908 let _channel = MockChannel::new(vec!["test".to_string()]);
2909 let registry = create_test_registry();
2910 let executor = MockToolExecutor::new(vec![Err(ToolError::Blocked {
2911 command: "rm -rf /".to_string(),
2912 })]);
2913
2914 let agent_channel = MockChannel::new(vec!["test".to_string()]);
2915 let sent = agent_channel.sent.clone();
2916
2917 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2918
2919 let result = agent.run().await;
2920 assert!(result.is_ok());
2921
2922 let sent_msgs = sent.lock().unwrap();
2923 assert!(
2924 sent_msgs
2925 .iter()
2926 .any(|m| m.contains("blocked by security policy"))
2927 );
2928 }
2929
2930 #[tokio::test]
2931 async fn agent_handles_tool_sandbox_violation() {
2932 let provider = mock_provider(vec!["access forbidden path".to_string()]);
2933 let _channel = MockChannel::new(vec!["test".to_string()]);
2934 let registry = create_test_registry();
2935 let executor = MockToolExecutor::new(vec![Err(ToolError::SandboxViolation {
2936 path: "/etc/passwd".to_string(),
2937 })]);
2938
2939 let agent_channel = MockChannel::new(vec!["test".to_string()]);
2940 let sent = agent_channel.sent.clone();
2941
2942 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2943
2944 let result = agent.run().await;
2945 assert!(result.is_ok());
2946
2947 let sent_msgs = sent.lock().unwrap();
2948 assert!(sent_msgs.iter().any(|m| m.contains("outside the sandbox")));
2949 }
2950
2951 #[tokio::test]
2952 async fn agent_handles_tool_confirmation_approved() {
2953 let provider = mock_provider(vec!["needs confirmation".to_string()]);
2954 let _channel = MockChannel::new(vec!["test".to_string()]);
2955 let registry = create_test_registry();
2956 let executor = MockToolExecutor::new(vec![Err(ToolError::ConfirmationRequired {
2957 command: "dangerous command".to_string(),
2958 })]);
2959
2960 let agent_channel =
2961 MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![true]);
2962 let sent = agent_channel.sent.clone();
2963
2964 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2965
2966 let result = agent.run().await;
2967 assert!(result.is_ok());
2968
2969 let sent_msgs = sent.lock().unwrap();
2970 assert!(!sent_msgs.is_empty());
2971 }
2972
2973 #[tokio::test]
2974 async fn agent_handles_tool_confirmation_denied() {
2975 let provider = mock_provider(vec!["needs confirmation".to_string()]);
2976 let _channel = MockChannel::new(vec!["test".to_string()]);
2977 let registry = create_test_registry();
2978 let executor = MockToolExecutor::new(vec![Err(ToolError::ConfirmationRequired {
2979 command: "dangerous command".to_string(),
2980 })]);
2981
2982 let agent_channel =
2983 MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![false]);
2984 let sent = agent_channel.sent.clone();
2985
2986 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
2987
2988 let result = agent.run().await;
2989 assert!(result.is_ok());
2990
2991 let sent_msgs = sent.lock().unwrap();
2992 assert!(sent_msgs.iter().any(|m| m.contains("Command cancelled")));
2993 }
2994
2995 #[tokio::test]
2996 async fn agent_handles_streaming_response() {
2997 let provider = mock_provider_streaming(vec!["streaming response".to_string()]);
2998 let _channel = MockChannel::new(vec!["test".to_string()]);
2999 let registry = create_test_registry();
3000 let executor = MockToolExecutor::no_tools();
3001
3002 let agent_channel = MockChannel::new(vec!["test".to_string()]);
3003 let chunks = agent_channel.chunks.clone();
3004
3005 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
3006
3007 let result = agent.run().await;
3008 assert!(result.is_ok());
3009
3010 let sent_chunks = chunks.lock().unwrap();
3011 assert!(!sent_chunks.is_empty());
3012 }
3013
3014 #[tokio::test]
3015 async fn agent_maybe_redact_enabled() {
3016 let provider = mock_provider(vec![]);
3017 let channel = MockChannel::new(vec![]);
3018 let registry = create_test_registry();
3019 let executor = MockToolExecutor::no_tools();
3020
3021 let security = SecurityConfig {
3022 redact_secrets: true,
3023 ..Default::default()
3024 };
3025
3026 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3027 .with_security(security, TimeoutConfig::default());
3028
3029 let text = "token: sk-abc123secret";
3030 let redacted = agent.maybe_redact(text);
3031 assert_ne!(AsRef::<str>::as_ref(&redacted), text);
3032 }
3033
3034 #[tokio::test]
3035 async fn agent_maybe_redact_disabled() {
3036 let provider = mock_provider(vec![]);
3037 let channel = MockChannel::new(vec![]);
3038 let registry = create_test_registry();
3039 let executor = MockToolExecutor::no_tools();
3040
3041 let security = SecurityConfig {
3042 redact_secrets: false,
3043 ..Default::default()
3044 };
3045
3046 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3047 .with_security(security, TimeoutConfig::default());
3048
3049 let text = "password=secret123";
3050 let redacted = agent.maybe_redact(text);
3051 assert_eq!(AsRef::<str>::as_ref(&redacted), text);
3052 }
3053
3054 #[tokio::test]
3055 async fn agent_handles_multiple_messages() {
3056 let provider = mock_provider(vec![
3057 "first response".to_string(),
3058 "second response".to_string(),
3059 ]);
3060 let channel = MockChannel::new(vec!["first".to_string(), "second".to_string()]);
3063 let registry = create_test_registry();
3064 let executor = MockToolExecutor::new(vec![Ok(None), Ok(None)]);
3065
3066 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3067
3068 let result = agent.run().await;
3069 assert!(result.is_ok());
3070 assert_eq!(agent.messages.len(), 3);
3071 assert_eq!(agent.messages[1].content, "first\nsecond");
3072 }
3073
3074 #[tokio::test]
3075 async fn agent_handles_tool_output_with_error_marker() {
3076 let provider = mock_provider(vec!["response".to_string(), "retry".to_string()]);
3077 let channel = MockChannel::new(vec!["test".to_string()]);
3078 let registry = create_test_registry();
3079 let executor = MockToolExecutor::new(vec![
3080 Ok(Some(ToolOutput {
3081 tool_name: "bash".to_string(),
3082 summary: "[error] command failed [exit code 1]".to_string(),
3083 blocks_executed: 1,
3084 filter_stats: None,
3085 diff: None,
3086 streamed: false,
3087 terminal_id: None,
3088 locations: None,
3089 raw_response: None,
3090 })),
3091 Ok(None),
3092 ]);
3093
3094 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3095
3096 let result = agent.run().await;
3097 assert!(result.is_ok());
3098 }
3099
3100 #[tokio::test]
3101 async fn agent_handles_empty_tool_output() {
3102 let provider = mock_provider(vec!["response".to_string()]);
3103 let channel = MockChannel::new(vec!["test".to_string()]);
3104 let registry = create_test_registry();
3105 let executor = MockToolExecutor::new(vec![Ok(Some(ToolOutput {
3106 tool_name: "bash".to_string(),
3107 summary: " ".to_string(),
3108 blocks_executed: 1,
3109 filter_stats: None,
3110 diff: None,
3111 streamed: false,
3112 terminal_id: None,
3113 locations: None,
3114 raw_response: None,
3115 }))]);
3116
3117 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3118
3119 let result = agent.run().await;
3120 assert!(result.is_ok());
3121 }
3122
3123 #[tokio::test]
3124 async fn shutdown_signal_helper_returns_on_true() {
3125 let (tx, rx) = watch::channel(false);
3126 let handle = tokio::spawn(async move {
3127 let mut rx_clone = rx;
3128 shutdown_signal(&mut rx_clone).await;
3129 });
3130
3131 tx.send(true).unwrap();
3132 let result = tokio::time::timeout(std::time::Duration::from_millis(100), handle).await;
3133 assert!(result.is_ok());
3134 }
3135
3136 #[tokio::test]
3137 async fn recv_optional_returns_pending_when_no_receiver() {
3138 let result = tokio::time::timeout(
3139 std::time::Duration::from_millis(10),
3140 recv_optional::<SkillEvent>(&mut None),
3141 )
3142 .await;
3143 assert!(result.is_err());
3144 }
3145
3146 #[tokio::test]
3147 async fn recv_optional_receives_from_channel() {
3148 let (tx, rx) = mpsc::channel(1);
3149 tx.send(SkillEvent::Changed).await.unwrap();
3150
3151 let result = recv_optional(&mut Some(rx)).await;
3152 assert!(result.is_some());
3153 }
3154
3155 #[tokio::test]
3156 async fn agent_with_skill_reload_sets_paths() {
3157 let provider = mock_provider(vec![]);
3158 let channel = MockChannel::new(vec![]);
3159 let registry = create_test_registry();
3160 let executor = MockToolExecutor::no_tools();
3161 let (_tx, rx) = mpsc::channel(1);
3162
3163 let paths = vec![std::path::PathBuf::from("/test/path")];
3164 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3165 .with_skill_reload(paths.clone(), rx);
3166
3167 assert_eq!(agent.skill_state.skill_paths, paths);
3168 }
3169
3170 #[tokio::test]
3171 async fn agent_handles_tool_execution_error() {
3172 let provider = mock_provider(vec!["response".to_string()]);
3173 let _channel = MockChannel::new(vec!["test".to_string()]);
3174 let registry = create_test_registry();
3175 let executor = MockToolExecutor::new(vec![Err(ToolError::Timeout { timeout_secs: 30 })]);
3176
3177 let agent_channel = MockChannel::new(vec!["test".to_string()]);
3178 let sent = agent_channel.sent.clone();
3179
3180 let mut agent = Agent::new(provider, agent_channel, registry, None, 5, executor);
3181
3182 let result = agent.run().await;
3183 assert!(result.is_ok());
3184
3185 let sent_msgs = sent.lock().unwrap();
3186 assert!(
3187 sent_msgs
3188 .iter()
3189 .any(|m| m.contains("Tool execution failed"))
3190 );
3191 }
3192
3193 #[tokio::test]
3194 async fn agent_processes_multi_turn_tool_execution() {
3195 let provider = mock_provider(vec![
3196 "first response".to_string(),
3197 "second response".to_string(),
3198 ]);
3199 let channel = MockChannel::new(vec!["start task".to_string()]);
3200 let registry = create_test_registry();
3201 let executor = MockToolExecutor::new(vec![
3202 Ok(Some(ToolOutput {
3203 tool_name: "bash".to_string(),
3204 summary: "step 1 complete".to_string(),
3205 blocks_executed: 1,
3206 filter_stats: None,
3207 diff: None,
3208 streamed: false,
3209 terminal_id: None,
3210 locations: None,
3211 raw_response: None,
3212 })),
3213 Ok(None),
3214 ]);
3215
3216 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3217
3218 let result = agent.run().await;
3219 assert!(result.is_ok());
3220 assert!(agent.messages.len() > 3);
3221 }
3222
3223 #[tokio::test]
3224 async fn agent_respects_max_shell_iterations() {
3225 let mut responses = vec![];
3226 for _ in 0..10 {
3227 responses.push("response".to_string());
3228 }
3229 let provider = mock_provider(responses);
3230 let channel = MockChannel::new(vec!["test".to_string()]);
3231 let registry = create_test_registry();
3232
3233 let mut outputs = vec![];
3234 for _ in 0..10 {
3235 outputs.push(Ok(Some(ToolOutput {
3236 tool_name: "bash".to_string(),
3237 summary: "continuing".to_string(),
3238 blocks_executed: 1,
3239 filter_stats: None,
3240 diff: None,
3241 streamed: false,
3242 terminal_id: None,
3243 locations: None,
3244 raw_response: None,
3245 })));
3246 }
3247 let executor = MockToolExecutor::new(outputs);
3248
3249 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3250
3251 let result = agent.run().await;
3252 assert!(result.is_ok());
3253 let assistant_count = agent
3254 .messages
3255 .iter()
3256 .filter(|m| m.role == Role::Assistant)
3257 .count();
3258 assert!(assistant_count <= 10);
3259 }
3260
/// Smoke test: `SecurityConfig::default()` is constructible and `Debug`-printable.
#[test]
fn security_config_default() {
    let _rendered = format!("{:?}", SecurityConfig::default());
}
3266
/// Smoke test: `TimeoutConfig::default()` is constructible and `Debug`-printable.
#[test]
fn timeout_config_default() {
    let _rendered = format!("{:?}", TimeoutConfig::default());
}
3272
3273 #[tokio::test]
3274 async fn agent_with_metrics_sets_initial_values() {
3275 let provider = mock_provider(vec![]);
3276 let channel = MockChannel::new(vec![]);
3277 let registry = create_test_registry();
3278 let executor = MockToolExecutor::no_tools();
3279 let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());
3280
3281 let _agent = Agent::new(provider, channel, registry, None, 5, executor)
3282 .with_model_name("test-model")
3283 .with_metrics(tx);
3284
3285 let snapshot = rx.borrow().clone();
3286 assert_eq!(snapshot.provider_name, "mock");
3287 assert_eq!(snapshot.model_name, "test-model");
3288 assert_eq!(snapshot.total_skills, 1);
3289 assert!(
3290 snapshot.prompt_tokens > 0,
3291 "initial prompt estimate should be non-zero"
3292 );
3293 assert_eq!(snapshot.total_tokens, snapshot.prompt_tokens);
3294 }
3295
3296 #[tokio::test]
3297 async fn agent_metrics_update_on_llm_call() {
3298 let provider = mock_provider(vec!["response".to_string()]);
3299 let channel = MockChannel::new(vec!["hello".to_string()]);
3300 let registry = create_test_registry();
3301 let executor = MockToolExecutor::no_tools();
3302 let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());
3303
3304 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
3305
3306 agent.run().await.unwrap();
3307
3308 let snapshot = rx.borrow().clone();
3309 assert_eq!(snapshot.api_calls, 1);
3310 assert!(snapshot.total_tokens > 0);
3311 }
3312
3313 #[tokio::test]
3314 async fn agent_metrics_streaming_updates_completion_tokens() {
3315 let provider = mock_provider_streaming(vec!["streaming response".to_string()]);
3316 let channel = MockChannel::new(vec!["test".to_string()]);
3317 let registry = create_test_registry();
3318 let executor = MockToolExecutor::no_tools();
3319 let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());
3320
3321 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
3322
3323 agent.run().await.unwrap();
3324
3325 let snapshot = rx.borrow().clone();
3326 assert!(snapshot.completion_tokens > 0);
3327 assert_eq!(snapshot.api_calls, 1);
3328 }
3329
3330 #[tokio::test]
3331 async fn agent_metrics_persist_increments_count() {
3332 let provider = mock_provider(vec!["response".to_string()]);
3333 let channel = MockChannel::new(vec!["hello".to_string()]);
3334 let registry = create_test_registry();
3335 let executor = MockToolExecutor::no_tools();
3336 let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());
3337
3338 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
3339
3340 agent.run().await.unwrap();
3341
3342 let snapshot = rx.borrow().clone();
3343 assert!(snapshot.sqlite_message_count == 0, "no memory = no persist");
3344 }
3345
3346 #[tokio::test]
3347 async fn agent_metrics_skills_updated_on_prompt_rebuild() {
3348 let provider = mock_provider(vec!["response".to_string()]);
3349 let channel = MockChannel::new(vec!["hello".to_string()]);
3350 let registry = create_test_registry();
3351 let executor = MockToolExecutor::no_tools();
3352 let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());
3353
3354 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
3355
3356 agent.run().await.unwrap();
3357
3358 let snapshot = rx.borrow().clone();
3359 assert_eq!(snapshot.total_skills, 1);
3360 assert!(!snapshot.active_skills.is_empty());
3361 }
3362
/// With no metrics sender attached, `update_metrics` must be a harmless no-op
/// (the test passes as long as nothing panics).
#[test]
fn update_metrics_noop_when_none() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.update_metrics(|m| m.api_calls = 999);
}
3373
/// `update_metrics` applies the caller's mutation and also refreshes uptime, which
/// must still be under two seconds for a freshly built agent.
#[test]
fn update_metrics_sets_uptime_seconds() {
    let (metrics_tx, metrics_rx) = tokio::sync::watch::channel(MetricsSnapshot::default());
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);

    agent.update_metrics(|m| m.api_calls = 1);

    let current = metrics_rx.borrow();
    assert!(current.uptime_seconds < 2);
    assert_eq!(current.api_calls, 1);
}
3390
/// Tool output is recorded as a user-role message prefixed with `[tool output: ...]`;
/// `last_user_query` must skip it and return the earlier genuine user input.
#[test]
fn test_last_user_query_finds_original() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let transcript = [
        (Role::User, "hello"),
        (Role::Assistant, "cmd"),
        (Role::User, "[tool output: bash]\nsome output"),
    ];
    for (role, text) in transcript {
        agent.messages.push(Message {
            role,
            content: text.to_string(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        });
    }

    assert_eq!(agent.last_user_query(), "hello");
}
3420
/// A fresh agent with no pushed messages reports an empty last user query.
#[test]
fn test_last_user_query_empty_messages() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    assert_eq!(agent.last_user_query(), "");
}
3431
3432 #[tokio::test]
3433 async fn test_maybe_summarize_short_output_passthrough() {
3434 let provider = mock_provider(vec![]);
3435 let channel = MockChannel::new(vec![]);
3436 let registry = create_test_registry();
3437 let executor = MockToolExecutor::no_tools();
3438
3439 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3440 .with_tool_summarization(true);
3441
3442 let short = "short output";
3443 let result = agent.maybe_summarize_tool_output(short).await;
3444 assert_eq!(result, short);
3445 }
3446
3447 #[tokio::test]
3448 async fn test_overflow_notice_contains_filename() {
3449 let dir = tempfile::tempdir().unwrap();
3450 let provider = mock_provider(vec![]);
3451 let channel = MockChannel::new(vec![]);
3452 let registry = create_test_registry();
3453 let executor = MockToolExecutor::no_tools();
3454
3455 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3456 .with_tool_summarization(false)
3457 .with_overflow_config(zeph_tools::OverflowConfig {
3458 threshold: 100,
3459 retention_days: 7,
3460 dir: Some(dir.path().to_path_buf()),
3461 });
3462
3463 let long = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
3464 let result = agent.maybe_summarize_tool_output(&long).await;
3465 assert!(result.contains("full output saved to"));
3466 let notice_start = result.find("full output saved to").unwrap();
3468 let notice_part = &result[notice_start..];
3469 assert!(notice_part.contains(".txt"));
3470 assert!(notice_part.contains(std::path::MAIN_SEPARATOR));
3471 assert!(notice_part.contains("bytes"));
3472 }
3473
3474 #[tokio::test]
3475 async fn test_maybe_summarize_long_output_disabled_truncates() {
3476 let provider = mock_provider(vec![]);
3477 let channel = MockChannel::new(vec![]);
3478 let registry = create_test_registry();
3479 let executor = MockToolExecutor::no_tools();
3480
3481 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3482 .with_tool_summarization(false)
3483 .with_overflow_config(zeph_tools::OverflowConfig {
3484 threshold: 1000,
3485 retention_days: 7,
3486 dir: None,
3487 });
3488
3489 let long = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
3492 let result = agent.maybe_summarize_tool_output(&long).await;
3493 assert!(result.contains("truncated"));
3494 }
3495
3496 #[tokio::test]
3497 async fn test_maybe_summarize_long_output_enabled_calls_llm() {
3498 let provider = mock_provider(vec!["summary text".to_string()]);
3499 let channel = MockChannel::new(vec![]);
3500 let registry = create_test_registry();
3501 let executor = MockToolExecutor::no_tools();
3502
3503 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3504 .with_tool_summarization(true)
3505 .with_overflow_config(zeph_tools::OverflowConfig {
3506 threshold: 1000,
3507 retention_days: 7,
3508 dir: None,
3509 });
3510
3511 let long = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
3512 let result = agent.maybe_summarize_tool_output(&long).await;
3513 assert!(result.contains("summary text"));
3514 assert!(result.contains("[tool output summary]"));
3515 assert!(!result.contains("truncated"));
3516 }
3517
3518 #[tokio::test]
3519 async fn test_summarize_tool_output_llm_failure_fallback() {
3520 let provider = mock_provider_failing();
3521 let channel = MockChannel::new(vec![]);
3522 let registry = create_test_registry();
3523 let executor = MockToolExecutor::no_tools();
3524
3525 let agent = Agent::new(provider, channel, registry, None, 5, executor)
3526 .with_tool_summarization(true)
3527 .with_overflow_config(zeph_tools::OverflowConfig {
3528 threshold: 1000,
3529 retention_days: 7,
3530 dir: None,
3531 });
3532
3533 let long = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
3534 let result = agent.maybe_summarize_tool_output(&long).await;
3535 assert!(result.contains("truncated"));
3536 }
3537
/// `with_tool_summarization` stores the given flag on the tool orchestrator, for both values.
#[test]
fn with_tool_summarization_sets_flag() {
    for enabled in [true, false] {
        let agent = Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
        .with_tool_summarization(enabled);
        assert_eq!(
            agent.tool_orchestrator.summarize_tool_output_enabled,
            enabled
        );
    }
}
3558
/// Identical output hashes across the whole window are treated as a doom loop.
#[test]
fn doom_loop_detection_triggers_on_identical_outputs() {
    let history = [42u64; 3];
    let window = &history[history.len() - DOOM_LOOP_WINDOW..];
    assert!(window.windows(2).all(|pair| pair[0] == pair[1]));
}
3567
/// Distinct output hashes within the window must not be flagged as a doom loop.
#[test]
fn doom_loop_detection_no_trigger_on_different_outputs() {
    let history = [1u64, 2, 3];
    let window = &history[history.len() - DOOM_LOOP_WINDOW..];
    assert!(!window.windows(2).all(|pair| pair[0] == pair[1]));
}
3574
/// Formatted tool output has a `[tool output: NAME]` fenced header, the body, and the shared suffix.
#[test]
fn format_tool_output_structure() {
    let rendered = format_tool_output("bash", "hello world");
    assert!(rendered.contains("hello world"));
    assert!(rendered.starts_with("[tool output: bash]\n```\n"));
    assert!(rendered.ends_with(TOOL_OUTPUT_SUFFIX));
}
3582
/// An empty body still yields a complete (empty) fenced block.
#[test]
fn format_tool_output_empty_body() {
    let expected = "[tool output: grep]\n```\n\n```";
    assert_eq!(format_tool_output("grep", ""), expected);
}
3588
3589 #[tokio::test]
3590 async fn cancel_signal_propagates_to_fresh_token() {
3591 use tokio_util::sync::CancellationToken;
3592 let signal = Arc::new(Notify::new());
3593
3594 let token = CancellationToken::new();
3595 let sig = Arc::clone(&signal);
3596 let tok = token.clone();
3597 tokio::spawn(async move {
3598 sig.notified().await;
3599 tok.cancel();
3600 });
3601
3602 tokio::task::yield_now().await;
3604 assert!(!token.is_cancelled());
3605 signal.notify_waiters();
3606 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3607 assert!(token.is_cancelled());
3608 }
3609
3610 #[tokio::test]
3611 async fn cancel_signal_works_across_multiple_messages() {
3612 use tokio_util::sync::CancellationToken;
3613 let signal = Arc::new(Notify::new());
3614
3615 let token1 = CancellationToken::new();
3617 let sig1 = Arc::clone(&signal);
3618 let tok1 = token1.clone();
3619 tokio::spawn(async move {
3620 sig1.notified().await;
3621 tok1.cancel();
3622 });
3623
3624 tokio::task::yield_now().await;
3625 signal.notify_waiters();
3626 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3627 assert!(token1.is_cancelled());
3628
3629 let token2 = CancellationToken::new();
3631 let sig2 = Arc::clone(&signal);
3632 let tok2 = token2.clone();
3633 tokio::spawn(async move {
3634 sig2.notified().await;
3635 tok2.cancel();
3636 });
3637
3638 tokio::task::yield_now().await;
3639 assert!(!token2.is_cancelled());
3640 signal.notify_waiters();
3641 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3642 assert!(token2.is_cancelled());
3643 }
3644
3645 mod resolve_message_tests {
3646 use super::*;
3647 use crate::channel::{Attachment, AttachmentKind, ChannelMessage};
3648 use std::future::Future;
3649 use std::pin::Pin;
3650 use zeph_llm::error::LlmError;
3651 use zeph_llm::stt::{SpeechToText, Transcription};
3652
3653 struct MockStt {
3654 text: Option<String>,
3655 }
3656
3657 impl MockStt {
3658 fn ok(text: &str) -> Self {
3659 Self {
3660 text: Some(text.to_string()),
3661 }
3662 }
3663
3664 fn failing() -> Self {
3665 Self { text: None }
3666 }
3667 }
3668
3669 impl SpeechToText for MockStt {
3670 fn transcribe(
3671 &self,
3672 _audio: &[u8],
3673 _filename: Option<&str>,
3674 ) -> Pin<Box<dyn Future<Output = Result<Transcription, LlmError>> + Send + '_>>
3675 {
3676 let result = match &self.text {
3677 Some(t) => Ok(Transcription {
3678 text: t.clone(),
3679 language: None,
3680 duration_secs: None,
3681 }),
3682 None => Err(LlmError::TranscriptionFailed("mock error".into())),
3683 };
3684 Box::pin(async move { result })
3685 }
3686 }
3687
3688 fn make_agent(stt: Option<Box<dyn SpeechToText>>) -> Agent<MockChannel> {
3689 let provider = mock_provider(vec!["ok".into()]);
3690 let empty: Vec<String> = vec![];
3691 let registry = zeph_skills::registry::SkillRegistry::load(&empty);
3692 let channel = MockChannel::new(vec![]);
3693 let executor = MockToolExecutor::no_tools();
3694 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3695 agent.stt = stt;
3696 agent
3697 }
3698
3699 fn audio_attachment(data: &[u8]) -> Attachment {
3700 Attachment {
3701 kind: AttachmentKind::Audio,
3702 data: data.to_vec(),
3703 filename: Some("test.wav".into()),
3704 }
3705 }
3706
3707 #[tokio::test]
3708 async fn no_audio_attachments_returns_text() {
3709 let agent = make_agent(None);
3710 let msg = ChannelMessage {
3711 text: "hello".into(),
3712 attachments: vec![],
3713 };
3714 assert_eq!(agent.resolve_message(msg).await.0, "hello");
3715 }
3716
3717 #[tokio::test]
3718 async fn audio_without_stt_returns_original_text() {
3719 let agent = make_agent(None);
3720 let msg = ChannelMessage {
3721 text: "hello".into(),
3722 attachments: vec![audio_attachment(b"audio-data")],
3723 };
3724 assert_eq!(agent.resolve_message(msg).await.0, "hello");
3725 }
3726
3727 #[tokio::test]
3728 async fn audio_with_stt_prepends_transcription() {
3729 let agent = make_agent(Some(Box::new(MockStt::ok("transcribed text"))));
3730 let msg = ChannelMessage {
3731 text: "original".into(),
3732 attachments: vec![audio_attachment(b"audio-data")],
3733 };
3734 let (result, _) = agent.resolve_message(msg).await;
3735 assert!(result.contains("[transcribed audio]"));
3736 assert!(result.contains("transcribed text"));
3737 assert!(result.contains("original"));
3738 }
3739
3740 #[tokio::test]
3741 async fn audio_with_stt_no_original_text() {
3742 let agent = make_agent(Some(Box::new(MockStt::ok("transcribed text"))));
3743 let msg = ChannelMessage {
3744 text: String::new(),
3745 attachments: vec![audio_attachment(b"audio-data")],
3746 };
3747 let (result, _) = agent.resolve_message(msg).await;
3748 assert_eq!(result, "transcribed text");
3749 }
3750
3751 #[tokio::test]
3752 async fn all_transcriptions_fail_returns_original() {
3753 let agent = make_agent(Some(Box::new(MockStt::failing())));
3754 let msg = ChannelMessage {
3755 text: "original".into(),
3756 attachments: vec![audio_attachment(b"audio-data")],
3757 };
3758 assert_eq!(agent.resolve_message(msg).await.0, "original");
3759 }
3760
3761 #[tokio::test]
3762 async fn multiple_audio_attachments_joined() {
3763 let agent = make_agent(Some(Box::new(MockStt::ok("chunk"))));
3764 let msg = ChannelMessage {
3765 text: String::new(),
3766 attachments: vec![
3767 audio_attachment(b"a1"),
3768 audio_attachment(b"a2"),
3769 audio_attachment(b"a3"),
3770 ],
3771 };
3772 let (result, _) = agent.resolve_message(msg).await;
3773 assert_eq!(result, "chunk\nchunk\nchunk");
3774 }
3775
3776 #[tokio::test]
3777 async fn oversized_audio_skipped() {
3778 let agent = make_agent(Some(Box::new(MockStt::ok("should not appear"))));
3779 let big = vec![0u8; MAX_AUDIO_BYTES + 1];
3780 let msg = ChannelMessage {
3781 text: "original".into(),
3782 attachments: vec![Attachment {
3783 kind: AttachmentKind::Audio,
3784 data: big,
3785 filename: None,
3786 }],
3787 };
3788 assert_eq!(agent.resolve_message(msg).await.0, "original");
3789 }
3790 }
3791
/// Both `.jpg` and `.jpeg` extensions map to `image/jpeg`.
#[test]
fn detect_image_mime_jpeg() {
    for name in ["photo.jpg", "photo.jpeg"] {
        assert_eq!(detect_image_mime(Some(name)), "image/jpeg");
    }
}
3797
/// A `.gif` extension maps to `image/gif`.
#[test]
fn detect_image_mime_gif() {
    let mime = detect_image_mime(Some("anim.gif"));
    assert_eq!(mime, "image/gif");
}
3802
/// A `.webp` extension maps to `image/webp`.
#[test]
fn detect_image_mime_webp() {
    let mime = detect_image_mime(Some("img.webp"));
    assert_eq!(mime, "image/webp");
}
3807
/// Unrecognised extensions and missing filenames both fall back to `image/png`.
#[test]
fn detect_image_mime_unknown_defaults_png() {
    for filename in [Some("file.bmp"), None] {
        assert_eq!(detect_image_mime(filename), "image/png");
    }
}
3813
3814 #[tokio::test]
3815 async fn resolve_message_extracts_image_attachment() {
3816 let provider = mock_provider(vec![]);
3817 let channel = MockChannel::new(vec![]);
3818 let registry = create_test_registry();
3819 let executor = MockToolExecutor::no_tools();
3820 let agent = Agent::new(provider, channel, registry, None, 5, executor);
3821
3822 let msg = ChannelMessage {
3823 text: "look at this".into(),
3824 attachments: vec![Attachment {
3825 kind: AttachmentKind::Image,
3826 data: vec![0u8; 16],
3827 filename: Some("test.jpg".into()),
3828 }],
3829 };
3830 let (text, parts) = agent.resolve_message(msg).await;
3831 assert_eq!(text, "look at this");
3832 assert_eq!(parts.len(), 1);
3833 match &parts[0] {
3834 zeph_llm::provider::MessagePart::Image(img) => {
3835 assert_eq!(img.mime_type, "image/jpeg");
3836 assert_eq!(img.data.len(), 16);
3837 }
3838 _ => panic!("expected Image part"),
3839 }
3840 }
3841
3842 #[tokio::test]
3843 async fn resolve_message_drops_oversized_image() {
3844 let provider = mock_provider(vec![]);
3845 let channel = MockChannel::new(vec![]);
3846 let registry = create_test_registry();
3847 let executor = MockToolExecutor::no_tools();
3848 let agent = Agent::new(provider, channel, registry, None, 5, executor);
3849
3850 let msg = ChannelMessage {
3851 text: "big image".into(),
3852 attachments: vec![Attachment {
3853 kind: AttachmentKind::Image,
3854 data: vec![0u8; MAX_IMAGE_BYTES + 1],
3855 filename: Some("huge.png".into()),
3856 }],
3857 };
3858 let (text, parts) = agent.resolve_message(msg).await;
3859 assert_eq!(text, "big image");
3860 assert!(parts.is_empty());
3861 }
3862
3863 #[tokio::test]
3864 async fn handle_image_command_rejects_path_traversal() {
3865 let provider = mock_provider(vec![]);
3866 let channel = MockChannel::new(vec![]);
3867 let registry = create_test_registry();
3868 let executor = MockToolExecutor::no_tools();
3869 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3870
3871 let result = agent.handle_image_command("../../etc/passwd").await;
3872 assert!(result.is_ok());
3873 assert!(agent.pending_image_parts.is_empty());
3874 let sent = agent.channel.sent_messages();
3876 assert!(sent.iter().any(|m| m.contains("traversal")));
3877 }
3878
3879 #[tokio::test]
3880 async fn handle_image_command_missing_file_sends_error() {
3881 let provider = mock_provider(vec![]);
3882 let channel = MockChannel::new(vec![]);
3883 let registry = create_test_registry();
3884 let executor = MockToolExecutor::no_tools();
3885 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3886
3887 let result = agent.handle_image_command("/nonexistent/image.png").await;
3888 assert!(result.is_ok());
3889 assert!(agent.pending_image_parts.is_empty());
3890 let sent = agent.channel.sent_messages();
3891 assert!(sent.iter().any(|m| m.contains("Cannot read image")));
3892 }
3893
3894 #[tokio::test]
3895 async fn handle_image_command_loads_valid_file() {
3896 use std::io::Write;
3897 let provider = mock_provider(vec![]);
3898 let channel = MockChannel::new(vec![]);
3899 let registry = create_test_registry();
3900 let executor = MockToolExecutor::no_tools();
3901 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3902
3903 let mut tmp = tempfile::NamedTempFile::with_suffix(".jpg").unwrap();
3905 let data = vec![0xFFu8, 0xD8, 0xFF, 0xE0];
3906 tmp.write_all(&data).unwrap();
3907 let path = tmp.path().to_str().unwrap().to_owned();
3908
3909 let result = agent.handle_image_command(&path).await;
3910 assert!(result.is_ok());
3911 assert_eq!(agent.pending_image_parts.len(), 1);
3912 match &agent.pending_image_parts[0] {
3913 zeph_llm::provider::MessagePart::Image(img) => {
3914 assert_eq!(img.data, data);
3915 assert_eq!(img.mime_type, "image/jpeg");
3916 }
3917 _ => panic!("expected Image part"),
3918 }
3919 let sent = agent.channel.sent_messages();
3920 assert!(sent.iter().any(|m| m.contains("Image loaded")));
3921 }
3922
3923 use crate::subagent::AgentCommand;
3926
3927 fn make_agent_with_manager() -> Agent<MockChannel> {
3928 use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
3929 use crate::subagent::hooks::SubagentHooks;
3930 use crate::subagent::{SubAgentDef, SubAgentManager};
3931
3932 let provider = mock_provider(vec![]);
3933 let channel = MockChannel::new(vec![]);
3934 let registry = create_test_registry();
3935 let executor = MockToolExecutor::no_tools();
3936 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3937
3938 let mut mgr = SubAgentManager::new(4);
3939 mgr.definitions_mut().push(SubAgentDef {
3940 name: "helper".into(),
3941 description: "A helper bot".into(),
3942 model: None,
3943 tools: ToolPolicy::InheritAll,
3944 disallowed_tools: vec![],
3945 permissions: SubAgentPermissions::default(),
3946 skills: SkillFilter::default(),
3947 system_prompt: "You are helpful.".into(),
3948 hooks: SubagentHooks::default(),
3949 memory: None,
3950 source: None,
3951 file_path: None,
3952 });
3953 agent.subagent_manager = Some(mgr);
3954 agent
3955 }
3956
3957 #[tokio::test]
3958 async fn agent_command_no_manager_returns_none() {
3959 let provider = mock_provider(vec![]);
3960 let channel = MockChannel::new(vec![]);
3961 let registry = create_test_registry();
3962 let executor = MockToolExecutor::no_tools();
3963 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
3964 assert!(
3966 agent
3967 .handle_agent_command(AgentCommand::List)
3968 .await
3969 .is_none()
3970 );
3971 }
3972
3973 #[tokio::test]
3974 async fn agent_command_list_returns_definitions() {
3975 let mut agent = make_agent_with_manager();
3976 let resp = agent
3977 .handle_agent_command(AgentCommand::List)
3978 .await
3979 .unwrap();
3980 assert!(resp.contains("helper"));
3981 assert!(resp.contains("A helper bot"));
3982 }
3983
3984 #[tokio::test]
3985 async fn agent_command_spawn_unknown_name_returns_error() {
3986 let mut agent = make_agent_with_manager();
3987 let resp = agent
3988 .handle_agent_command(AgentCommand::Background {
3989 name: "unknown-bot".into(),
3990 prompt: "do something".into(),
3991 })
3992 .await
3993 .unwrap();
3994 assert!(resp.contains("Failed to spawn"));
3995 }
3996
3997 #[tokio::test]
3998 async fn agent_command_spawn_known_name_returns_started() {
3999 let mut agent = make_agent_with_manager();
4000 let resp = agent
4001 .handle_agent_command(AgentCommand::Background {
4002 name: "helper".into(),
4003 prompt: "do some work".into(),
4004 })
4005 .await
4006 .unwrap();
4007 assert!(resp.contains("helper"));
4008 assert!(resp.contains("started"));
4009 }
4010
4011 #[tokio::test]
4012 async fn agent_command_status_no_agents_returns_empty_message() {
4013 let mut agent = make_agent_with_manager();
4014 let resp = agent
4015 .handle_agent_command(AgentCommand::Status)
4016 .await
4017 .unwrap();
4018 assert!(resp.contains("No active sub-agents"));
4019 }
4020
4021 #[tokio::test]
4022 async fn agent_command_cancel_unknown_id_returns_not_found() {
4023 let mut agent = make_agent_with_manager();
4024 let resp = agent
4025 .handle_agent_command(AgentCommand::Cancel {
4026 id: "deadbeef".into(),
4027 })
4028 .await
4029 .unwrap();
4030 assert!(resp.contains("No sub-agent"));
4031 }
4032
4033 #[tokio::test]
4034 async fn agent_command_cancel_valid_id_succeeds() {
4035 let mut agent = make_agent_with_manager();
4036 let spawn_resp = agent
4038 .handle_agent_command(AgentCommand::Background {
4039 name: "helper".into(),
4040 prompt: "cancel this".into(),
4041 })
4042 .await
4043 .unwrap();
4044 let short_id = spawn_resp
4046 .split("id: ")
4047 .nth(1)
4048 .unwrap()
4049 .trim_end_matches(')')
4050 .trim()
4051 .to_string();
4052 let resp = agent
4053 .handle_agent_command(AgentCommand::Cancel { id: short_id })
4054 .await
4055 .unwrap();
4056 assert!(resp.contains("Cancelled"));
4057 }
4058
4059 #[tokio::test]
4060 async fn agent_command_approve_no_pending_request() {
4061 let mut agent = make_agent_with_manager();
4062 let spawn_resp = agent
4064 .handle_agent_command(AgentCommand::Background {
4065 name: "helper".into(),
4066 prompt: "do work".into(),
4067 })
4068 .await
4069 .unwrap();
4070 let short_id = spawn_resp
4071 .split("id: ")
4072 .nth(1)
4073 .unwrap()
4074 .trim_end_matches(')')
4075 .trim()
4076 .to_string();
4077 let resp = agent
4078 .handle_agent_command(AgentCommand::Approve { id: short_id })
4079 .await
4080 .unwrap();
4081 assert!(resp.contains("No pending secret request"));
4082 }
4083
/// `set_model` succeeds and records the new model name in the runtime config.
#[test]
fn set_model_updates_model_name() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let outcome = agent.set_model("claude-opus-4-6");
    assert!(outcome.is_ok());
    assert_eq!(agent.runtime.model_name, "claude-opus-4-6");
}
4095
/// Successive `set_model` calls leave the most recent name in place.
#[test]
fn set_model_overwrites_previous_value() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    for name in ["model-a", "model-b"] {
        agent.set_model(name).unwrap();
    }
    assert_eq!(agent.runtime.model_name, "model-b");
}
4108
4109 #[tokio::test]
4110 async fn model_command_switch_sends_confirmation() {
4111 let provider = mock_provider(vec![]);
4112 let channel = MockChannel::new(vec![]);
4113 let sent = channel.sent.clone();
4114 let registry = create_test_registry();
4115 let executor = MockToolExecutor::no_tools();
4116
4117 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4118 agent.handle_model_command("/model my-new-model").await;
4119 let messages = sent.lock().unwrap();
4120 assert!(
4121 messages.iter().any(|m| m.contains("my-new-model")),
4122 "expected switch confirmation, got: {messages:?}"
4123 );
4124 }
4125
4126 #[tokio::test]
4127 async fn model_command_list_no_cache_fetches_remote() {
4128 let provider = mock_provider(vec![]);
4130 let channel = MockChannel::new(vec![]);
4131 let sent = channel.sent.clone();
4132 let registry = create_test_registry();
4133 let executor = MockToolExecutor::no_tools();
4134
4135 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4136 zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
4138 agent.handle_model_command("/model").await;
4139 let messages = sent.lock().unwrap();
4140 assert!(
4142 messages.iter().any(|m| m.contains("No models")),
4143 "expected empty model list message, got: {messages:?}"
4144 );
4145 }
4146
4147 #[tokio::test]
4148 async fn model_command_refresh_sends_result() {
4149 let provider = mock_provider(vec![]);
4150 let channel = MockChannel::new(vec![]);
4151 let sent = channel.sent.clone();
4152 let registry = create_test_registry();
4153 let executor = MockToolExecutor::no_tools();
4154
4155 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4156 agent.handle_model_command("/model refresh").await;
4157 let messages = sent.lock().unwrap();
4158 assert!(
4159 messages.iter().any(|m| m.contains("Fetched")),
4160 "expected fetch confirmation, got: {messages:?}"
4161 );
4162 }
4163
4164 #[tokio::test]
4165 async fn model_command_valid_model_accepted() {
4166 zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
4168
4169 let models = vec![
4170 zeph_llm::model_cache::RemoteModelInfo {
4171 id: "llama3:8b".to_string(),
4172 display_name: "Llama 3 8B".to_string(),
4173 context_window: Some(8192),
4174 created_at: None,
4175 },
4176 zeph_llm::model_cache::RemoteModelInfo {
4177 id: "qwen3:8b".to_string(),
4178 display_name: "Qwen3 8B".to_string(),
4179 context_window: Some(32768),
4180 created_at: None,
4181 },
4182 ];
4183 let provider = mock_provider_with_models(vec![], models);
4184 let channel = MockChannel::new(vec![]);
4185 let sent = channel.sent.clone();
4186 let registry = create_test_registry();
4187 let executor = MockToolExecutor::no_tools();
4188
4189 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4190 agent.handle_model_command("/model llama3:8b").await;
4191
4192 let messages = sent.lock().unwrap();
4193 assert!(
4194 messages
4195 .iter()
4196 .any(|m| m.contains("Switched to model: llama3:8b")),
4197 "expected switch confirmation, got: {messages:?}"
4198 );
4199 assert!(
4200 !messages.iter().any(|m| m.contains("Unknown model")),
4201 "unexpected rejection for valid model, got: {messages:?}"
4202 );
4203 }
4204
4205 #[tokio::test]
4206 async fn model_command_invalid_model_rejected() {
4207 zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
4209
4210 let models = vec![zeph_llm::model_cache::RemoteModelInfo {
4211 id: "qwen3:8b".to_string(),
4212 display_name: "Qwen3 8B".to_string(),
4213 context_window: None,
4214 created_at: None,
4215 }];
4216 let provider = mock_provider_with_models(vec![], models);
4217 let channel = MockChannel::new(vec![]);
4218 let sent = channel.sent.clone();
4219 let registry = create_test_registry();
4220 let executor = MockToolExecutor::no_tools();
4221
4222 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4223 agent.handle_model_command("/model nonexistent-model").await;
4224
4225 let messages = sent.lock().unwrap();
4226 assert!(
4227 messages
4228 .iter()
4229 .any(|m| m.contains("Unknown model") && m.contains("nonexistent-model")),
4230 "expected rejection with model name, got: {messages:?}"
4231 );
4232 assert!(
4233 messages.iter().any(|m| m.contains("qwen3:8b")),
4234 "expected available models list, got: {messages:?}"
4235 );
4236 assert!(
4237 !messages.iter().any(|m| m.contains("Switched to model")),
4238 "should not switch to invalid model, got: {messages:?}"
4239 );
4240 }
4241
4242 #[tokio::test]
4243 async fn model_command_empty_model_list_warns_and_proceeds() {
4244 zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
4247
4248 let provider = mock_provider(vec![]);
4249 let channel = MockChannel::new(vec![]);
4250 let sent = channel.sent.clone();
4251 let registry = create_test_registry();
4252 let executor = MockToolExecutor::no_tools();
4253
4254 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4255 agent.handle_model_command("/model unknown-model").await;
4256
4257 let messages = sent.lock().unwrap();
4258 assert!(
4259 messages.iter().any(|m| m.contains("unavailable")),
4260 "expected warning about unavailable model list, got: {messages:?}"
4261 );
4262 assert!(
4263 messages
4264 .iter()
4265 .any(|m| m.contains("Switched to model: unknown-model")),
4266 "expected switch to proceed despite missing model list, got: {messages:?}"
4267 );
4268 }
4269
4270 #[tokio::test]
4271 async fn help_command_lists_commands() {
4272 let provider = mock_provider(vec![]);
4273 let channel = MockChannel::new(vec!["/help".to_string()]);
4274 let sent = channel.sent.clone();
4275 let registry = create_test_registry();
4276 let executor = MockToolExecutor::no_tools();
4277
4278 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4279 let result = agent.run().await;
4280 assert!(result.is_ok());
4281
4282 let messages = sent.lock().unwrap();
4283 assert!(!messages.is_empty(), "expected /help output");
4284 let output = messages.join("\n");
4285 assert!(output.contains("/help"), "expected /help in output");
4286 assert!(output.contains("/exit"), "expected /exit in output");
4287 assert!(output.contains("/status"), "expected /status in output");
4288 assert!(output.contains("/skills"), "expected /skills in output");
4289 assert!(output.contains("/model"), "expected /model in output");
4290 }
4291
4292 #[tokio::test]
4293 async fn help_command_does_not_include_unknown_commands() {
4294 let provider = mock_provider(vec![]);
4295 let channel = MockChannel::new(vec!["/help".to_string()]);
4296 let sent = channel.sent.clone();
4297 let registry = create_test_registry();
4298 let executor = MockToolExecutor::no_tools();
4299
4300 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4301 let result = agent.run().await;
4302 assert!(result.is_ok());
4303
4304 let messages = sent.lock().unwrap();
4305 let output = messages.join("\n");
4306 assert!(
4308 !output.contains("/ingest"),
4309 "unexpected /ingest in /help output"
4310 );
4311 }
4312
4313 #[tokio::test]
4314 async fn status_command_includes_provider_and_model() {
4315 let provider = mock_provider(vec![]);
4316 let channel = MockChannel::new(vec!["/status".to_string()]);
4317 let sent = channel.sent.clone();
4318 let registry = create_test_registry();
4319 let executor = MockToolExecutor::no_tools();
4320
4321 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4322 let result = agent.run().await;
4323 assert!(result.is_ok());
4324
4325 let messages = sent.lock().unwrap();
4326 assert!(!messages.is_empty(), "expected /status output");
4327 let output = messages.join("\n");
4328 assert!(output.contains("Provider:"), "expected Provider: field");
4329 assert!(output.contains("Model:"), "expected Model: field");
4330 assert!(output.contains("Uptime:"), "expected Uptime: field");
4331 assert!(output.contains("Tokens:"), "expected Tokens: field");
4332 }
4333
4334 #[tokio::test]
4337 async fn status_command_shows_metrics_in_cli_mode() {
4338 let provider = mock_provider(vec![]);
4339 let channel = MockChannel::new(vec!["/status".to_string()]);
4340 let sent = channel.sent.clone();
4341 let registry = create_test_registry();
4342 let executor = MockToolExecutor::no_tools();
4343
4344 let (tx, _rx) = watch::channel(MetricsSnapshot::default());
4345 let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
4346
4347 agent.update_metrics(|m| {
4349 m.api_calls = 3;
4350 m.prompt_tokens = 100;
4351 m.completion_tokens = 50;
4352 });
4353
4354 let result = agent.run().await;
4355 assert!(result.is_ok());
4356
4357 let messages = sent.lock().unwrap();
4358 let output = messages.join("\n");
4359 assert!(
4360 output.contains("API calls: 3"),
4361 "expected non-zero api_calls in /status output; got: {output}"
4362 );
4363 assert!(
4364 output.contains("100 prompt / 50 completion"),
4365 "expected non-zero tokens in /status output; got: {output}"
4366 );
4367 }
4368
4369 #[tokio::test]
4370 async fn exit_command_breaks_run_loop() {
4371 let provider = mock_provider(vec![]);
4372 let channel = MockChannel::new(vec!["/exit".to_string()]);
4373 let registry = create_test_registry();
4374 let executor = MockToolExecutor::no_tools();
4375
4376 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4377 let result = agent.run().await;
4378 assert!(result.is_ok());
4379 assert_eq!(agent.messages.len(), 1, "expected only system message");
4381 }
4382
4383 #[tokio::test]
4384 async fn quit_command_breaks_run_loop() {
4385 let provider = mock_provider(vec![]);
4386 let channel = MockChannel::new(vec!["/quit".to_string()]);
4387 let registry = create_test_registry();
4388 let executor = MockToolExecutor::no_tools();
4389
4390 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4391 let result = agent.run().await;
4392 assert!(result.is_ok());
4393 assert_eq!(agent.messages.len(), 1, "expected only system message");
4394 }
4395
4396 #[tokio::test]
4397 async fn exit_command_sends_info_and_continues_when_not_supported() {
4398 let provider = mock_provider(vec![]);
4399 let channel = MockChannel::new(vec![
4402 "/exit".to_string(),
4403 ])
4405 .without_exit_support();
4406 let sent = channel.sent.clone();
4407 let registry = create_test_registry();
4408 let executor = MockToolExecutor::no_tools();
4409
4410 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4411 let result = agent.run().await;
4412 assert!(result.is_ok());
4413
4414 let messages = sent.lock().unwrap();
4415 assert!(
4416 messages
4417 .iter()
4418 .any(|m| m.contains("/exit is not supported")),
4419 "expected info message, got: {messages:?}"
4420 );
4421 }
4422
/// The slash-command registry must not list the unimplemented `/ingest` command.
#[test]
fn slash_commands_registry_has_no_ingest() {
    use super::slash_commands::COMMANDS;

    let registers_ingest = COMMANDS.iter().any(|entry| entry.name == "/ingest");
    assert!(
        !registers_ingest,
        "/ingest is not implemented and must not appear in COMMANDS"
    );
}
4431
/// `/graph` and `/plan` must be registered in `COMMANDS` with `feature_gate: None`.
///
/// Presence is asserted explicitly. Without that check the test would pass
/// vacuously if either entry were renamed or dropped from the registry.
#[test]
fn slash_commands_graph_and_plan_have_no_feature_gate() {
    use super::slash_commands::COMMANDS;

    for name in ["/graph", "/plan"] {
        let mut registered = false;
        // Check every entry with this name, so duplicates cannot hide a gated one.
        for cmd in COMMANDS.iter().filter(|c| c.name == name) {
            registered = true;
            assert!(
                cmd.feature_gate.is_none(),
                "{} should have feature_gate: None",
                cmd.name
            );
        }
        assert!(registered, "{name} must be registered in COMMANDS");
    }
}
4445
4446 #[tokio::test]
4449 async fn bare_skill_command_does_not_invoke_llm() {
4450 let provider = mock_provider(vec![]);
4453 let channel = MockChannel::new(vec!["/skill".to_string()]);
4454 let registry = create_test_registry();
4455 let executor = MockToolExecutor::no_tools();
4456 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4457
4458 let result = agent.run().await;
4459 assert!(result.is_ok());
4460
4461 let sent = agent.channel.sent_messages();
4462 assert!(
4464 sent.iter().any(|m| m.contains("Unknown /skill subcommand")),
4465 "bare /skill must send usage; got: {sent:?}"
4466 );
4467 assert!(
4469 agent.messages.iter().all(|m| m.role != Role::Assistant),
4470 "bare /skill must not produce an assistant message; messages: {:?}",
4471 agent.messages
4472 );
4473 }
4474
4475 #[tokio::test]
4476 async fn bare_feedback_command_does_not_invoke_llm() {
4477 let provider = mock_provider(vec![]);
4478 let channel = MockChannel::new(vec!["/feedback".to_string()]);
4479 let registry = create_test_registry();
4480 let executor = MockToolExecutor::no_tools();
4481 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4482
4483 let result = agent.run().await;
4484 assert!(result.is_ok());
4485
4486 let sent = agent.channel.sent_messages();
4487 assert!(
4488 sent.iter().any(|m| m.contains("Usage: /feedback")),
4489 "bare /feedback must send usage; got: {sent:?}"
4490 );
4491 assert!(
4492 agent.messages.iter().all(|m| m.role != Role::Assistant),
4493 "bare /feedback must not produce an assistant message; messages: {:?}",
4494 agent.messages
4495 );
4496 }
4497
4498 #[tokio::test]
4499 async fn bare_image_command_sends_usage() {
4500 let provider = mock_provider(vec![]);
4501 let channel = MockChannel::new(vec!["/image".to_string()]);
4502 let registry = create_test_registry();
4503 let executor = MockToolExecutor::no_tools();
4504 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
4505
4506 let result = agent.run().await;
4507 assert!(result.is_ok());
4508
4509 let sent = agent.channel.sent_messages();
4510 assert!(
4511 sent.iter().any(|m| m.contains("Usage: /image <path>")),
4512 "bare /image must send usage; got: {sent:?}"
4513 );
4514 assert!(
4515 agent.messages.iter().all(|m| m.role != Role::Assistant),
4516 "bare /image must not produce an assistant message; messages: {:?}",
4517 agent.messages
4518 );
4519 }
4520}
4521
#[cfg(test)]
mod compaction_e2e {
    //! End-to-end agent tests: recovery from a context-length error during an LLM
    //! call, and background / foreground sub-agent spawns driven through
    //! `handle_agent_command`.

    use std::time::Duration;

    use super::agent_tests::*;
    use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
    use crate::subagent::hooks::SubagentHooks;
    use crate::subagent::{AgentCommand, SubAgentDef, SubAgentManager, SubAgentState};
    use zeph_llm::LlmError;
    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    /// Upper bound on how long a background sub-agent may take to reach a terminal state.
    const SUBAGENT_TIMEOUT: Duration = Duration::from_secs(5);
    /// Delay between status polls while waiting for a background sub-agent.
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    /// Builds a sub-agent definition with `ToolPolicy::InheritAll`, no disallowed
    /// tools, default skills/hooks, and no model/memory/source/file overrides.
    /// Only the fields that differ between tests are parameters.
    fn sub_agent_def(
        name: &'static str,
        description: &'static str,
        permissions: SubAgentPermissions,
        system_prompt: &'static str,
    ) -> SubAgentDef {
        SubAgentDef {
            name: name.into(),
            description: description.into(),
            model: None,
            tools: ToolPolicy::InheritAll,
            disallowed_tools: vec![],
            permissions,
            skills: SkillFilter::default(),
            system_prompt: system_prompt.into(),
            hooks: SubagentHooks::default(),
            memory: None,
            source: None,
            file_path: None,
        }
    }

    /// The mock provider is seeded with one `ContextLengthExceeded` error and one
    /// response. With a retry budget of 2, the agent must still return that
    /// response and forward it to the channel.
    #[tokio::test]
    async fn agent_recovers_from_context_length_exceeded_and_produces_response() {
        let provider = AnyProvider::Mock(
            MockProvider::with_responses(vec!["final answer".into()])
                .with_errors(vec![LlmError::ContextLengthExceeded]),
        );
        let channel = MockChannel::new(vec![]);
        let registry = create_test_registry();
        let executor = MockToolExecutor::no_tools();

        let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
            .with_context_budget(200_000, 0.20, 0.80, 4, 0);

        agent.messages.push(Message {
            role: Role::User,
            content: "describe the architecture".into(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        });

        let result = agent.call_llm_with_retry(2).await.unwrap();

        // A single assert_eq covers both "a response exists" and "it is the expected one".
        assert_eq!(
            result.as_deref(),
            Some("final answer"),
            "agent must produce a response after recovering from context length error"
        );

        let sent = agent.channel.sent_messages();
        assert!(
            sent.iter().any(|m| m.contains("final answer")),
            "recovered response must be forwarded to the channel; got: {sent:?}"
        );
    }

    /// A background spawn reports its name and id, runs to `Completed`, and its
    /// output can then be collected by full id.
    #[tokio::test]
    async fn subagent_spawn_text_collect_e2e() {
        let provider = mock_provider(vec!["task completed successfully".into()]);
        let channel = MockChannel::new(vec![]);
        let registry = create_test_registry();
        let executor = MockToolExecutor::no_tools();
        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

        let mut mgr = SubAgentManager::new(4);
        mgr.definitions_mut().push(sub_agent_def(
            "worker",
            "A worker bot",
            SubAgentPermissions {
                max_turns: 1,
                ..SubAgentPermissions::default()
            },
            "You are a worker.",
        ));
        agent.subagent_manager = Some(mgr);

        let spawn_resp = agent
            .handle_agent_command(AgentCommand::Background {
                name: "worker".into(),
                prompt: "do a task".into(),
            })
            .await
            .expect("Background spawn must return Some");
        assert!(
            spawn_resp.contains("worker"),
            "spawn response must mention agent name; got: {spawn_resp}"
        );
        assert!(
            spawn_resp.contains("started"),
            "spawn response must confirm start; got: {spawn_resp}"
        );

        // The spawn reply embeds a short id as "... (id: <short>)".
        let short_id = spawn_resp
            .split("id: ")
            .nth(1)
            .expect("response must contain 'id: '")
            .trim_end_matches(')')
            .trim()
            .to_string();
        assert!(!short_id.is_empty(), "short_id must not be empty");

        // Poll statuses until the matching sub-agent completes; a Failed state
        // aborts immediately, and the overall wait is bounded by SUBAGENT_TIMEOUT.
        let full_id = tokio::time::timeout(SUBAGENT_TIMEOUT, async {
            loop {
                let statuses = agent.subagent_manager.as_ref().unwrap().statuses();
                if let Some((id, status)) = statuses
                    .iter()
                    .find(|(candidate, _)| candidate.starts_with(&short_id))
                {
                    match status.state {
                        SubAgentState::Completed => break id.clone(),
                        SubAgentState::Failed => panic!(
                            "sub-agent reached Failed state unexpectedly: {:?}",
                            status.last_message
                        ),
                        // Not terminal yet: keep polling.
                        _ => {}
                    }
                }
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .expect("sub-agent did not complete within timeout");

        let result = agent
            .subagent_manager
            .as_mut()
            .unwrap()
            .collect(&full_id)
            .await
            .expect("collect must succeed for completed sub-agent");
        assert!(
            result.contains("task completed successfully"),
            "collected result must contain sub-agent output; got: {result:?}"
        );
    }

    /// A foreground spawn whose first reply is `[REQUEST_SECRET: api-key]`, on a
    /// channel that answers the confirmation with `true`, must finish with a
    /// "completed" response naming the sub-agent.
    #[tokio::test]
    async fn foreground_spawn_secret_bridge_approves() {
        let provider = mock_provider(vec![
            "[REQUEST_SECRET: api-key]".into(),
            "done with secret".into(),
        ]);

        let channel = MockChannel::new(vec![]).with_confirmations(vec![true]);

        let registry = create_test_registry();
        let executor = MockToolExecutor::no_tools();
        let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

        let mut mgr = SubAgentManager::new(4);
        mgr.definitions_mut().push(sub_agent_def(
            "vault-bot",
            "A bot that requests secrets",
            SubAgentPermissions {
                max_turns: 2,
                secrets: vec!["api-key".into()],
                ..SubAgentPermissions::default()
            },
            "You need a secret.",
        ));
        agent.subagent_manager = Some(mgr);

        let resp: String = agent
            .handle_agent_command(AgentCommand::Spawn {
                name: "vault-bot".into(),
                prompt: "fetch the api key".into(),
            })
            .await
            .expect("Spawn must return Some");

        assert!(
            resp.contains("vault-bot"),
            "response must mention agent name; got: {resp}"
        );
        assert!(
            resp.contains("completed"),
            "sub-agent must complete successfully; got: {resp}"
        );
    }
}