1mod builder;
5mod context;
6pub(crate) mod context_manager;
7pub mod error;
8#[cfg(feature = "experiments")]
9mod experiment_cmd;
10pub(super) mod feedback_detector;
11mod graph_commands;
12mod index;
13mod learning;
14pub(crate) mod learning_engine;
15mod log_commands;
16#[cfg(feature = "lsp-context")]
17mod lsp_commands;
18mod mcp;
19mod message_queue;
20mod persistence;
21#[cfg(feature = "scheduler")]
22mod scheduler_commands;
23mod skill_management;
24pub mod slash_commands;
25pub(crate) mod tool_execution;
26pub(crate) mod tool_orchestrator;
27mod trust_commands;
28mod utils;
29
30use std::collections::VecDeque;
31use std::path::PathBuf;
32use std::time::Instant;
33
34use std::sync::Arc;
35
36use tokio::sync::{Notify, mpsc, watch};
37use tokio_util::sync::CancellationToken;
38use zeph_llm::any::AnyProvider;
39use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
40use zeph_llm::stt::SpeechToText;
41
42use crate::metrics::MetricsSnapshot;
43use std::collections::HashMap;
44use zeph_memory::TokenCounter;
45use zeph_memory::semantic::SemanticMemory;
46use zeph_skills::loader::Skill;
47use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
48use zeph_skills::prompt::format_skills_prompt;
49use zeph_skills::registry::SkillRegistry;
50use zeph_skills::watcher::SkillEvent;
51use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
52
53use crate::channel::Channel;
54use crate::config::Config;
55use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
56use crate::config_watcher::ConfigEvent;
57use crate::context::{
58 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
59};
60use crate::cost::CostTracker;
61use crate::instructions::{InstructionBlock, InstructionEvent, InstructionReloadState};
62use crate::sanitizer::ContentSanitizer;
63use crate::sanitizer::quarantine::QuarantinedSummarizer;
64use crate::vault::Secret;
65
66use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, QueuedMessage, detect_image_mime};
67
/// Window size used by doom-loop (repeated tool call) detection.
/// NOTE(review): the exact matching semantics are defined at the use site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Marker prefixes used to tag injected context segments in the message
// history so they can later be recognized (e.g. for pruning/deduplication).
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
#[cfg(feature = "lsp-context")]
// Intentionally an open prefix: LSP notes append their own kind and closer.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing fence appended by [`format_tool_output`].
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
83
84fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
85 use std::fmt::Write;
86 let mut out = String::new();
87 let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
88 let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
89 let _ = writeln!(out);
90 for (i, task) in graph.tasks.iter().enumerate() {
91 let deps = if task.depends_on.is_empty() {
92 String::new()
93 } else {
94 let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
95 format!(" (after: {})", ids.join(", "))
96 };
97 let agent = task.agent_hint.as_deref().unwrap_or("-");
98 let _ = writeln!(out, " {}. [{}] {}{}", i + 1, agent, task.title, deps);
99 }
100 out
101}
102
103pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
104 use std::fmt::Write;
105 let capacity = "[tool output: ".len()
106 + tool_name.len()
107 + "]\n```\n".len()
108 + body.len()
109 + TOOL_OUTPUT_SUFFIX.len();
110 let mut buf = String::with_capacity(capacity);
111 let _ = write!(
112 buf,
113 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
114 );
115 buf
116}
117
/// Semantic-memory configuration and per-conversation bookkeeping.
pub(super) struct MemoryState {
    /// Backing semantic memory store; `None` disables memory features.
    pub(super) memory: Option<Arc<SemanticMemory>>,
    /// Active conversation id, once a conversation has been established.
    pub(super) conversation_id: Option<zeph_memory::ConversationId>,
    /// History size limit (defaults to 50 — see the constructor).
    pub(super) history_limit: u32,
    /// Maximum number of recall results injected into context (default 5).
    pub(super) recall_limit: usize,
    /// Message count that triggers summarization (default 50).
    pub(super) summarization_threshold: usize,
    /// Minimum score for cross-session recall hits (default 0.35).
    pub(super) cross_session_score_threshold: f32,
    /// Whether assistant replies are saved to memory automatically.
    pub(super) autosave_assistant: bool,
    /// Minimum reply length required for autosave (default 20).
    pub(super) autosave_min_length: usize,
    /// Tool-call cutoff (default 6). NOTE(review): semantics at use site.
    pub(super) tool_call_cutoff: usize,
    /// Messages accumulated since the last summarization.
    pub(super) unsummarized_count: usize,
    /// Document-retrieval (RAG) configuration.
    pub(super) document_config: crate::config::DocumentConfig,
    /// Knowledge-graph configuration.
    pub(super) graph_config: crate::config::GraphConfig,
}
132
/// Skill registry, matching configuration, and hot-reload plumbing.
pub(super) struct SkillState {
    /// Shared skill registry (also handed to reload/watch machinery).
    pub(super) registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
    /// Filesystem paths skills are loaded from.
    pub(super) skill_paths: Vec<PathBuf>,
    /// Directory for agent-managed skills, if configured.
    pub(super) managed_dir: Option<PathBuf>,
    /// Skill trust configuration.
    pub(super) trust_config: crate::config::TrustConfig,
    /// Skill matcher backend; `None` disables matching.
    pub(super) matcher: Option<SkillMatcherBackend>,
    /// Upper bound on simultaneously active skills.
    pub(super) max_active_skills: usize,
    /// Score-gap threshold for disambiguation (default 0.05).
    pub(super) disambiguation_threshold: f32,
    /// Embedding model name used for skill matching.
    pub(super) embedding_model: String,
    /// Receiver for skill hot-reload events, when watching is enabled.
    pub(super) skill_reload_rx: Option<mpsc::Receiver<SkillEvent>>,
    /// Names of currently active skills.
    pub(super) active_skill_names: Vec<String>,
    /// Last skills section rendered into the system prompt.
    pub(super) last_skills_prompt: String,
    /// How skills are surfaced in the prompt.
    pub(super) prompt_mode: SkillPromptMode,
    /// Custom secrets available to skills, keyed by name.
    pub(super) available_custom_secrets: HashMap<String, Secret>,
    /// Weight of cosine similarity in hybrid scoring (default 0.7).
    pub(super) cosine_weight: f32,
    /// Whether hybrid (cosine + BM25) search is enabled.
    pub(super) hybrid_search: bool,
    /// BM25 index for hybrid search, when built.
    pub(super) bm25_index: Option<zeph_skills::bm25::Bm25Index>,
}
152
/// MCP (Model Context Protocol) tool and server state.
pub(super) struct McpState {
    /// Known MCP tools.
    pub(super) tools: Vec<zeph_mcp::McpTool>,
    /// MCP tool registry, when configured.
    pub(super) registry: Option<zeph_mcp::McpToolRegistry>,
    /// Shared MCP server manager, when configured.
    pub(super) manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
    /// Commands MCP servers are allowed to run.
    pub(super) allowed_commands: Vec<String>,
    /// Cap on dynamically registered tools (default 10).
    pub(super) max_dynamic: usize,
    /// Tool list shared with other components, when present.
    pub(super) shared_tools: Option<std::sync::Arc<std::sync::RwLock<Vec<zeph_mcp::McpTool>>>>,
}
162
/// Code-index retrieval state, including a TTL-cached repository map.
pub(super) struct IndexState {
    /// Code retriever backed by the index; `None` disables code context.
    pub(super) retriever: Option<std::sync::Arc<zeph_index::retriever::CodeRetriever>>,
    /// Token budget for the repository map.
    pub(super) repo_map_tokens: usize,
    /// Cached repo map plus the instant it was built.
    pub(super) cached_repo_map: Option<(String, std::time::Instant)>,
    /// How long a cached repo map stays valid (default 300 s).
    pub(super) repo_map_ttl: std::time::Duration,
}
169
/// Runtime settings collected from configuration.
pub(super) struct RuntimeConfig {
    /// Security-related settings.
    pub(super) security: SecurityConfig,
    /// Operation timeouts.
    pub(super) timeouts: TimeoutConfig,
    /// Display/config name of the active model.
    pub(super) model_name: String,
    /// Tool permission policy.
    pub(super) permission_policy: zeph_tools::PermissionPolicy,
    /// Whether credentials are redacted (default true).
    pub(super) redact_credentials: bool,
}
177
/// Content-security machinery: sanitization, quarantine, exfiltration guard.
pub(super) struct SecurityState {
    /// Sanitizer applied to untrusted content.
    pub(super) sanitizer: ContentSanitizer,
    /// Summarizer for quarantined content, when configured.
    pub(super) quarantine_summarizer: Option<QuarantinedSummarizer>,
    /// Guard against data exfiltration attempts.
    pub(super) exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard,
    /// URLs flagged by the guard so far.
    pub(super) flagged_urls: std::collections::HashSet<String>,
}
185
/// Debugging and diagnostics helpers.
pub(super) struct DebugState {
    /// Conversation/state dumper, when enabled.
    pub(super) debug_dumper: Option<crate::debug_dump::DebugDumper>,
    /// Output format for debug dumps.
    pub(super) dump_format: crate::debug_dump::DumpFormat,
    /// Tool anomaly detector, when enabled.
    pub(super) anomaly_detector: Option<zeph_tools::AnomalyDetector>,
    /// Logging configuration.
    pub(super) logging_config: crate::config::LoggingConfig,
}
193
/// The core agent: owns the LLM provider, the I/O channel, the tool
/// executor, the message history, and all subsystem state (memory, skills,
/// MCP, indexing, orchestration, security, experiments).
pub struct Agent<C: Channel> {
    /// Primary LLM provider.
    provider: AnyProvider,
    /// Channel the agent receives input from and sends replies to.
    channel: C,
    /// Type-erased tool executor shared with sub-agents.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history; seeded with the system prompt at construction.
    messages: Vec<Message>,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback_detector: feedback_detector::FeedbackDetector,
    /// LLM-judge feedback detector, when configured.
    pub(super) judge_detector: Option<feedback_detector::JudgeDetector>,
    /// Separate provider used by the judge, when configured.
    pub(super) judge_provider: Option<AnyProvider>,
    /// Path of the loaded config file, when known.
    config_path: Option<PathBuf>,
    /// Receiver for config hot-reload events.
    config_reload_rx: Option<mpsc::Receiver<ConfigEvent>>,
    /// Shutdown flag watched by long-running loops.
    shutdown: watch::Receiver<bool>,
    /// Publisher for metrics snapshots, when metrics are enabled.
    metrics_tx: Option<watch::Sender<MetricsSnapshot>>,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    /// Notify used to cancel in-flight work.
    cancel_signal: Arc<Notify>,
    cancel_token: CancellationToken,
    /// Agent start time (uptime reference).
    start_time: Instant,
    /// Messages queued while the agent is busy.
    message_queue: VecDeque<QueuedMessage>,
    /// Dedicated provider for summarization, when configured.
    summary_provider: Option<AnyProvider>,
    /// Runtime-swappable provider override, when configured.
    provider_override: Option<Arc<std::sync::RwLock<Option<AnyProvider>>>>,
    /// Signals when provider warmup completes, when configured.
    warmup_ready: Option<watch::Receiver<bool>>,
    /// Token/cost accounting, when enabled.
    cost_tracker: Option<CostTracker>,
    /// Estimated token count of the cached prompt prefix.
    cached_prompt_tokens: u64,
    /// Snapshot of the execution environment gathered at construction.
    env_context: EnvironmentContext,
    pub(crate) token_counter: Arc<TokenCounter>,
    /// Speech-to-text backend for audio input, when configured.
    stt: Option<Box<dyn SpeechToText>>,
    /// Receiver for update notifications, when configured.
    update_notify_rx: Option<mpsc::Receiver<String>>,
    /// Receiver for externally injected tasks, when configured.
    custom_task_rx: Option<mpsc::Receiver<String>>,
    /// Sub-agent lifecycle manager; `None` disables sub-agents.
    pub(crate) subagent_manager: Option<crate::subagent::SubAgentManager>,
    pub(crate) subagent_config: crate::config::SubAgentConfig,
    pub(crate) orchestration_config: crate::config::OrchestrationConfig,
    #[cfg(feature = "experiments")]
    pub(super) experiment_config: crate::config::ExperimentConfig,
    /// Response cache shared with other components, when enabled.
    pub(super) response_cache: Option<std::sync::Arc<zeph_memory::ResponseCache>>,
    /// Tool-use id of the spawning parent, when running as a sub-agent.
    /// NOTE(review): exact semantics defined where it is set.
    pub(crate) parent_tool_use_id: Option<String>,
    pub(super) debug_state: DebugState,
    /// Instruction blocks injected into the system prompt.
    pub(super) instruction_blocks: Vec<InstructionBlock>,
    pub(super) instruction_reload_rx: Option<mpsc::Receiver<InstructionEvent>>,
    pub(super) instruction_reload_state: Option<InstructionReloadState>,
    pub(super) security: SecurityState,
    /// Image parts queued for the next outgoing message.
    pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
    /// Task graph awaiting `/plan confirm`.
    pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
    /// Cancellation token of the currently executing plan, if any.
    plan_cancel_token: Option<CancellationToken>,

    #[cfg(feature = "lsp-context")]
    pub(super) lsp_hooks: Option<crate::lsp_hooks::LspHookRunner>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_cancel: Option<tokio_util::sync::CancellationToken>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_baseline: crate::experiments::ConfigSnapshot,
    /// Receiver for experiment notifications (present even without the
    /// `experiments` feature; the sender side is feature-gated).
    pub(super) experiment_notify_rx: Option<tokio::sync::mpsc::Receiver<String>>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_notify_tx: tokio::sync::mpsc::Sender<String>,
    /// Whether server-side context compaction is currently active.
    pub(super) server_compaction_active: bool,
}
286
287impl<C: Channel> Agent<C> {
288 #[must_use]
289 #[allow(clippy::too_many_lines)]
290 pub fn new(
291 provider: AnyProvider,
292 channel: C,
293 registry: SkillRegistry,
294 matcher: Option<SkillMatcherBackend>,
295 max_active_skills: usize,
296 tool_executor: impl ToolExecutor + 'static,
297 ) -> Self {
298 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
299 Self::new_with_registry_arc(
300 provider,
301 channel,
302 registry,
303 matcher,
304 max_active_skills,
305 tool_executor,
306 )
307 }
308
    /// Creates an agent from an already-shared skill registry.
    ///
    /// Builds the initial system prompt from every skill currently in the
    /// registry and seeds the message history with it. All other fields start
    /// at defaults; builder/config layers are expected to overwrite them.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        // Materialize full skill definitions for the initial prompt; skills
        // that fail to load are silently skipped.
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust/health data yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough token estimate: ~4 bytes per token.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // Placeholder shutdown receiver: the sender is dropped immediately,
        // so this only ever reports the initial `false`.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // The experiment notification receiver exists in both builds; only
        // the sender is kept (and only under the `experiments` feature).
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        Self {
            provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // History starts with the system prompt as its only message.
            messages: vec![Message {
                role: Role::System,
                content: system_prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }],
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.05,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback_detector: feedback_detector::FeedbackDetector::new(0.6),
            judge_detector: None,
            judge_provider: None,
            config_path: None,
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                anomaly_detector: None,
                logging_config: crate::config::LoggingConfig::default(),
            },
            config_reload_rx: None,
            shutdown: rx,
            metrics_tx: None,
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                shared_tools: None,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            cancel_signal: Arc::new(Notify::new()),
            cancel_token: CancellationToken::new(),
            start_time: Instant::now(),
            message_queue: VecDeque::new(),
            summary_provider: None,
            provider_override: None,
            warmup_ready: None,
            cost_tracker: None,
            cached_prompt_tokens: initial_prompt_tokens,
            env_context: EnvironmentContext::gather(""),
            token_counter,
            stt: None,
            update_notify_rx: None,
            custom_task_rx: None,
            subagent_manager: None,
            subagent_config: crate::config::SubAgentConfig::default(),
            orchestration_config: crate::config::OrchestrationConfig::default(),
            #[cfg(feature = "experiments")]
            experiment_config: crate::config::ExperimentConfig::default(),
            #[cfg(feature = "experiments")]
            experiment_baseline: crate::experiments::ConfigSnapshot::default(),
            experiment_notify_rx: Some(exp_notify_rx),
            #[cfg(feature = "experiments")]
            experiment_notify_tx: exp_notify_tx,
            response_cache: None,
            parent_tool_use_id: None,
            instruction_blocks: Vec::new(),
            instruction_reload_rx: None,
            instruction_reload_state: None,
            security: SecurityState {
                sanitizer: ContentSanitizer::new(
                    &crate::sanitizer::ContentIsolationConfig::default(),
                ),
                quarantine_summarizer: None,
                exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard::new(
                    crate::sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
            },
            pending_image_parts: Vec::new(),
            pending_graph: None,
            plan_cancel_token: None,

            #[cfg(feature = "lsp-context")]
            lsp_hooks: None,
            #[cfg(feature = "experiments")]
            experiment_cancel: None,
            server_compaction_active: false,
        }
    }
478
479 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
484 let Some(mgr) = &mut self.subagent_manager else {
485 return vec![];
486 };
487
488 let finished: Vec<String> = mgr
489 .statuses()
490 .into_iter()
491 .filter_map(|(id, status)| {
492 if matches!(
493 status.state,
494 crate::subagent::SubAgentState::Completed
495 | crate::subagent::SubAgentState::Failed
496 | crate::subagent::SubAgentState::Canceled
497 ) {
498 Some(id)
499 } else {
500 None
501 }
502 })
503 .collect();
504
505 let mut results = vec![];
506 for task_id in finished {
507 match mgr.collect(&task_id).await {
508 Ok(result) => results.push((task_id, result)),
509 Err(e) => {
510 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
511 }
512 }
513 }
514 results
515 }
516
    /// Dispatches a parsed `/plan` sub-command to its handler.
    ///
    /// All sub-commands are rejected up front when orchestration is disabled
    /// in configuration.
    ///
    /// # Errors
    /// Propagates channel send errors and handler failures.
    async fn handle_plan_command(
        &mut self,
        cmd: crate::orchestration::PlanCommand,
    ) -> Result<(), error::AgentError> {
        use crate::orchestration::PlanCommand;

        if !self.config_for_orchestration().enabled {
            self.channel
                .send(
                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
                )
                .await?;
            return Ok(());
        }

        match cmd {
            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
            PlanCommand::Confirm => self.handle_plan_confirm().await,
            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
            PlanCommand::List => self.handle_plan_list().await,
            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
        }
    }
542
    /// Returns the orchestration section of the agent's configuration.
    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration_config
    }
546
547 async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
548 use crate::orchestration::{LlmPlanner, Planner};
549
550 if self.pending_graph.is_some() {
551 self.channel
552 .send(
553 "A plan is already pending confirmation. \
554 Use /plan confirm to execute it or /plan cancel to discard.",
555 )
556 .await?;
557 return Ok(());
558 }
559
560 self.channel.send("Planning task decomposition...").await?;
561
562 let available_agents = self
563 .subagent_manager
564 .as_ref()
565 .map(|m| m.definitions().to_vec())
566 .unwrap_or_default();
567
568 let confirm_before_execute = self.orchestration_config.confirm_before_execute;
569 let graph = LlmPlanner::new(self.provider.clone(), &self.orchestration_config)
570 .plan(goal, &available_agents)
571 .await
572 .map_err(|e| error::AgentError::Other(e.to_string()))?;
573
574 let task_count = graph.tasks.len() as u64;
575 let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
576 self.update_metrics(|m| {
577 m.orchestration.plans_total += 1;
578 m.orchestration.tasks_total += task_count;
579 m.orchestration_graph = Some(snapshot);
580 });
581
582 if confirm_before_execute {
583 let summary = format_plan_summary(&graph);
584 self.channel.send(&summary).await?;
585 self.channel
586 .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
587 .await?;
588 self.pending_graph = Some(graph);
589 } else {
590 let summary = format_plan_summary(&graph);
593 self.channel.send(&summary).await?;
594 self.channel
595 .send("Plan ready. Full execution will be available in a future phase.")
596 .await?;
597 let now = std::time::Instant::now();
599 self.update_metrics(|m| {
600 if let Some(ref mut s) = m.orchestration_graph {
601 "completed".clone_into(&mut s.status);
602 s.completed_at = Some(now);
603 }
604 });
605 }
606
607 Ok(())
608 }
609
    /// Executes the pending task graph after the user runs `/plan confirm`.
    ///
    /// Validates that sub-agents are configured and the graph is executable,
    /// reserves scheduler slots, runs the scheduler loop, then publishes the
    /// final graph snapshot and outcome to metrics.
    ///
    /// # Errors
    /// Propagates channel send failures, scheduler construction errors, and
    /// scheduler-loop failures.
    #[allow(clippy::too_many_lines)]
    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};

        let Some(graph) = self.pending_graph.take() else {
            self.channel
                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
                .await?;
            return Ok(());
        };

        // Each validation failure below puts the graph back into
        // `pending_graph` so the user can fix the problem and re-confirm.
        if self.subagent_manager.is_none() {
            self.channel
                .send(
                    "No sub-agents configured. Add sub-agent definitions to config \
                    to enable plan execution.",
                )
                .await?;
            self.pending_graph = Some(graph);
            return Ok(());
        }

        if graph.tasks.is_empty() {
            self.channel.send("Plan has no tasks.").await?;
            self.pending_graph = Some(graph);
            return Ok(());
        }
        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
            self.channel
                .send(&format!(
                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
                    graph.status
                ))
                .await?;
            self.pending_graph = Some(graph);
            return Ok(());
        }

        let available_agents = self
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        // Warn when the sub-agent pool cannot hold `max_parallel` plan tasks
        // plus at least one other sub-agent.
        let max_concurrent = self.subagent_config.max_concurrent;
        let max_parallel = self.orchestration_config.max_parallel as usize;
        if max_concurrent < max_parallel + 1 {
            tracing::warn!(
                max_concurrent,
                max_parallel,
                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
                planning-phase sub-agents; recommend setting max_concurrent >= {}",
                max_parallel + 1
            );
        }

        // Reserve execution slots for this plan, always leaving at least one
        // unreserved (saturating_sub keeps `reserved` sane when
        // max_concurrent is 0).
        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
        if let Some(mgr) = self.subagent_manager.as_mut() {
            mgr.reserve_slots(reserved);
        }

        // Fresh graphs start a new schedule; partially executed ones resume.
        let mut scheduler = if graph.status == GraphStatus::Created {
            DagScheduler::new(
                graph,
                &self.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        } else {
            DagScheduler::resume_from(
                graph,
                &self.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        }
        .map_err(|e| {
            // Scheduler construction failed: give reserved slots back before
            // bailing out.
            if let Some(mgr) = self.subagent_manager.as_mut() {
                mgr.release_reservation(reserved);
            }
            error::AgentError::Other(e.to_string())
        })?;

        let task_count = scheduler.graph().tasks.len();
        self.channel
            .send(&format!(
                "Confirmed. Executing plan ({task_count} tasks)..."
            ))
            .await?;

        // This token lets a concurrent `/plan cancel` abort the running plan.
        let plan_token = CancellationToken::new();
        self.plan_cancel_token = Some(plan_token.clone());

        let scheduler_result = self
            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
            .await;
        self.plan_cancel_token = None;

        // Always release the reservation, success or failure.
        if let Some(mgr) = self.subagent_manager.as_mut() {
            mgr.release_reservation(reserved);
        }

        let final_status = match scheduler_result {
            Ok(s) => s,
            Err(e) => return Err(e),
        };

        let completed_graph = scheduler.into_graph();

        // Publish the final graph snapshot before finalization...
        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
        self.update_metrics(|m| {
            m.orchestration_graph = Some(snapshot);
        });

        let result_label = self
            .finalize_plan_execution(completed_graph, final_status)
            .await?;

        // ...then stamp the outcome label and completion time onto it.
        let now = std::time::Instant::now();
        self.update_metrics(|m| {
            if let Some(ref mut s) = m.orchestration_graph {
                result_label.clone_into(&mut s.status);
                s.completed_at = Some(now);
            }
        });
        Ok(())
    }
752
    /// Drives the DAG scheduler to completion.
    ///
    /// Each loop iteration ticks the scheduler, applies the resulting actions
    /// (spawn a sub-agent, cancel one, run a task inline on this agent, or
    /// finish), then waits — via a biased `select!` — for plan cancellation,
    /// a scheduler event, channel input (`/plan cancel` or messages to
    /// queue), or shutdown. Terminal states exit via the labeled `'tick`
    /// loop.
    ///
    /// # Errors
    /// Currently only returns `Ok`; the `Result` return keeps room for
    /// channel/scheduler failures to propagate.
    #[allow(clippy::too_many_lines)]
    async fn run_scheduler_loop(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_count: usize,
        cancel_token: CancellationToken,
    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
        use crate::orchestration::SchedulerAction;

        // Counts started tasks (spawned or inline) for progress messages.
        let mut spawn_counter: usize = 0;

        // Tracks secret requests already denied, so they are not re-asked.
        let mut denied_secrets: std::collections::HashSet<(String, String)> =
            std::collections::HashSet::new();

        let final_status = 'tick: loop {
            let actions = scheduler.tick();

            // Fed into the scheduler's backoff logic after the batch.
            let mut any_spawn_success = false;
            let mut any_concurrency_failure = false;

            for action in actions {
                match action {
                    SchedulerAction::Spawn {
                        task_id,
                        agent_def_name,
                        prompt,
                    } => {
                        let task_title = scheduler
                            .graph()
                            .tasks
                            .get(task_id.index())
                            .map_or("unknown", |t| t.title.as_str());

                        // Clone everything the sub-agent needs up front; the
                        // manager borrow below must not overlap these reads.
                        let provider = self.provider.clone();
                        let tool_executor = Arc::clone(&self.tool_executor);
                        let skills = self.filtered_skills_for(&agent_def_name);
                        let cfg = self.subagent_config.clone();
                        let event_tx = scheduler.event_sender();

                        let mgr = self
                            .subagent_manager
                            .as_mut()
                            .expect("subagent_manager checked above");
                        match mgr.spawn_for_task(
                            &agent_def_name,
                            &prompt,
                            provider,
                            tool_executor,
                            skills,
                            &cfg,
                            task_id,
                            event_tx,
                        ) {
                            Ok(handle_id) => {
                                spawn_counter += 1;
                                // Best-effort status update; failures ignored.
                                let _ = self
                                    .channel
                                    .send_status(&format!(
                                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
                                    ))
                                    .await;
                                scheduler.record_spawn(task_id, handle_id, agent_def_name);
                                any_spawn_success = true;
                            }
                            Err(e) => {
                                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
                                if matches!(
                                    e,
                                    crate::subagent::SubAgentError::ConcurrencyLimit { .. }
                                ) {
                                    any_concurrency_failure = true;
                                }
                                // The scheduler may react to the failure with
                                // follow-up actions (cancellations or Done).
                                let extra = scheduler.record_spawn_failure(task_id, &e);
                                for a in extra {
                                    match a {
                                        SchedulerAction::Cancel { agent_handle_id } => {
                                            if let Some(m) = self.subagent_manager.as_mut() {
                                                let _ =
                                                    m.cancel(&agent_handle_id).inspect_err(|err| {
                                                        tracing::trace!(
                                                            error = %err,
                                                            "cancel after spawn failure: agent already gone"
                                                        );
                                                    });
                                            }
                                        }
                                        SchedulerAction::Done { status } => {
                                            break 'tick status;
                                        }
                                        SchedulerAction::Spawn { .. }
                                        | SchedulerAction::RunInline { .. } => {}
                                    }
                                }
                            }
                        }
                    }
                    SchedulerAction::Cancel { agent_handle_id } => {
                        if let Some(mgr) = self.subagent_manager.as_mut() {
                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                tracing::trace!(error = %e, "cancel: agent already gone");
                            });
                        }
                    }
                    SchedulerAction::RunInline { task_id, prompt } => {
                        // Inline tasks execute on this agent instead of a
                        // spawned sub-agent; a synthetic handle id keeps the
                        // scheduler's bookkeeping uniform.
                        spawn_counter += 1;
                        let task_title = scheduler
                            .graph()
                            .tasks
                            .get(task_id.index())
                            .map_or("unknown", |t| t.title.as_str());
                        let _ = self
                            .channel
                            .send_status(&format!(
                                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
                            ))
                            .await;

                        let handle_id = format!("__inline_{task_id}__");
                        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());

                        let event_tx = scheduler.event_sender();
                        let max_iter = self.tool_orchestrator.max_iterations;
                        // Race the inline run against plan cancellation.
                        let outcome = tokio::select! {
                            result = self.run_inline_tool_loop(&prompt, max_iter) => {
                                match result {
                                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
                                        output,
                                        artifacts: vec![],
                                    },
                                    Err(e) => crate::orchestration::TaskOutcome::Failed {
                                        error: e.to_string(),
                                    },
                                }
                            }
                            () = cancel_token.cancelled() => {
                                crate::orchestration::TaskOutcome::Failed {
                                    error: "canceled".to_string(),
                                }
                            }
                        };
                        let event = crate::orchestration::TaskEvent {
                            task_id,
                            agent_handle_id: handle_id,
                            outcome,
                        };
                        if let Err(e) = event_tx.send(event).await {
                            tracing::warn!(
                                %task_id,
                                error = %e,
                                "inline task event send failed"
                            );
                        }
                    }
                    SchedulerAction::Done { status } => {
                        break 'tick status;
                    }
                }
            }

            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);

            self.process_pending_secret_requests(&mut denied_secrets)
                .await;

            // Publish a fresh graph snapshot every iteration.
            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
            self.update_metrics(|m| {
                m.orchestration_graph = Some(snapshot);
            });

            // `biased` gives cancellation priority over scheduler events and
            // channel input.
            tokio::select! {
                biased;
                () = cancel_token.cancelled() => {
                    // Cancel all running sub-agents, then exit as Canceled
                    // unless the scheduler reports a final status itself.
                    let cancel_actions = scheduler.cancel_all();
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel during plan cancellation: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
                () = scheduler.wait_event() => {}
                result = self.channel.recv() => {
                    if let Ok(Some(msg)) = result {
                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
                            let _ = self.channel.send_status("Canceling plan...").await;
                            let cancel_actions = scheduler.cancel_all();
                            for ca in cancel_actions {
                                match ca {
                                    SchedulerAction::Cancel { agent_handle_id } => {
                                        if let Some(mgr) = self.subagent_manager.as_mut() {
                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
                                            });
                                        }
                                    }
                                    SchedulerAction::Done { status } => {
                                        break 'tick status;
                                    }
                                    SchedulerAction::Spawn { .. }
                                    | SchedulerAction::RunInline { .. } => {}
                                }
                            }
                            break 'tick crate::orchestration::GraphStatus::Canceled;
                        }
                        // Any other message is queued for after the plan.
                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
                    } else {
                        // Channel closed: cancel everything and bail out.
                        let cancel_actions = scheduler.cancel_all();
                        let n = cancel_actions
                            .iter()
                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                            .count();
                        tracing::warn!(sub_agents = n, "scheduler channel closed, canceling running sub-agents");
                        for action in cancel_actions {
                            match action {
                                SchedulerAction::Cancel { agent_handle_id } => {
                                    if let Some(mgr) = self.subagent_manager.as_mut() {
                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                            tracing::trace!(
                                                error = %e,
                                                "cancel on channel close: agent already gone"
                                            );
                                        });
                                    }
                                }
                                SchedulerAction::Done { status } => {
                                    break 'tick status;
                                }
                                SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
                            }
                        }
                        break 'tick crate::orchestration::GraphStatus::Canceled;
                    }
                }
                () = shutdown_signal(&mut self.shutdown) => {
                    // Graceful shutdown: cancel running sub-agents first.
                    let cancel_actions = scheduler.cancel_all();
                    let n = cancel_actions
                        .iter()
                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                        .count();
                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel on shutdown: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
            }
        };

        // Final drain; a fresh (empty) denied-set is passed here.
        // NOTE(review): this forgets earlier denials for the final pass —
        // confirm that is intentional.
        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
            .await;

        Ok(final_status)
    }
1106
    /// Runs a minimal self-contained tool-calling loop for an inline task.
    ///
    /// Uses a fresh message history containing only `prompt` (no system
    /// prompt, no agent conversation state) and alternates provider calls
    /// with tool executions for up to `max_iterations` rounds. Returns the
    /// first plain-text response, or the most recent interim text when the
    /// iteration limit is hit (possibly empty).
    ///
    /// # Errors
    /// Propagates provider errors from `chat_with_tools`. Tool execution
    /// errors are NOT raised — they are fed back to the model as error-tool
    /// results.
    async fn run_inline_tool_loop(
        &self,
        prompt: &str,
        max_iterations: usize,
    ) -> Result<String, zeph_llm::LlmError> {
        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
        use zeph_tools::executor::ToolCall;

        let tool_defs: Vec<ToolDefinition> = self
            .tool_executor
            .tool_definitions_erased()
            .iter()
            .map(tool_execution::tool_def_to_definition)
            .collect();

        tracing::debug!(
            prompt_len = prompt.len(),
            max_iterations,
            tool_count = tool_defs.len(),
            "inline tool loop: starting"
        );

        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
        // Last text seen alongside tool calls; returned if the loop times out.
        let mut last_text = String::new();

        for iteration in 0..max_iterations {
            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;

            match response {
                ChatResponse::Text(text) => {
                    // Plain text means the model is done.
                    tracing::debug!(iteration, "inline tool loop: text response, returning");
                    return Ok(text);
                }
                ChatResponse::ToolUse {
                    text, tool_calls, ..
                } => {
                    tracing::debug!(
                        iteration,
                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
                        "inline tool loop: tool use"
                    );

                    if let Some(ref t) = text {
                        last_text.clone_from(t);
                    }

                    // Echo the assistant turn back into history: optional
                    // text part first, then one ToolUse part per call.
                    let mut parts: Vec<MessagePart> = Vec::new();
                    if let Some(ref t) = text
                        && !t.is_empty()
                    {
                        parts.push(MessagePart::Text { text: t.clone() });
                    }
                    for tc in &tool_calls {
                        parts.push(MessagePart::ToolUse {
                            id: tc.id.clone(),
                            name: tc.name.clone(),
                            input: tc.input.clone(),
                        });
                    }
                    messages.push(Message::from_parts(Role::Assistant, parts));

                    // Execute each call and collect results as a user turn.
                    let mut result_parts: Vec<MessagePart> = Vec::new();
                    for tc in &tool_calls {
                        let call = ToolCall {
                            tool_id: tc.name.clone(),
                            // Non-object inputs are coerced to an empty map.
                            params: match &tc.input {
                                serde_json::Value::Object(map) => map.clone(),
                                _ => serde_json::Map::new(),
                            },
                        };
                        let output = match self.tool_executor.execute_tool_call_erased(&call).await
                        {
                            Ok(Some(out)) => out.summary,
                            Ok(None) => "(no output)".to_owned(),
                            Err(e) => format!("[error] {e}"),
                        };
                        let is_error = output.starts_with("[error]");
                        result_parts.push(MessagePart::ToolResult {
                            tool_use_id: tc.id.clone(),
                            content: output,
                            is_error,
                        });
                    }
                    messages.push(Message::from_parts(Role::User, result_parts));
                }
            }
        }

        tracing::debug!(
            max_iterations,
            last_text_empty = last_text.is_empty(),
            "inline tool loop: iteration limit reached"
        );
        Ok(last_text)
    }
1210
    /// Drains all queued secret-access requests from sub-agents, asking
    /// the user to approve or deny each one over the channel.
    ///
    /// `denied` accumulates `(handle_id, secret_key)` pairs the user has
    /// already refused, so repeated requests for the same secret are
    /// auto-denied without prompting again. Approval grants the secret
    /// with a 5-minute TTL and delivers it immediately; a prompt left
    /// unanswered for 120 seconds is treated as a denial.
    async fn process_pending_secret_requests(
        &mut self,
        denied: &mut std::collections::HashSet<(String, String)>,
    ) {
        loop {
            // Non-blocking poll: stop as soon as the queue is empty.
            let pending = self
                .subagent_manager
                .as_mut()
                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
            let Some((req_handle_id, req)) = pending else {
                break;
            };
            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
            if denied.contains(&deny_key) {
                tracing::debug!(
                    handle_id = %req_handle_id,
                    secret_key = %req.secret_key,
                    "skipping duplicate secret prompt for already-denied key"
                );
                if let Some(mgr) = self.subagent_manager.as_mut() {
                    let _ = mgr.deny_secret(&req_handle_id);
                }
                continue;
            }
            // Key and reason are truncated to keep the prompt readable and
            // to bound what a sub-agent can inject into the UI.
            let prompt = format!(
                "Sub-agent requests secret '{}'. Allow?{}",
                crate::text::truncate_to_chars(&req.secret_key, 100),
                req.reason
                    .as_deref()
                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
                    .unwrap_or_default()
            );
            // Fail closed: no answer within 120 seconds means "deny".
            let approved = tokio::select! {
                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
                    let _ = self.channel.send("Secret request timed out.").await;
                    false
                }
            };
            if let Some(mgr) = self.subagent_manager.as_mut() {
                if approved {
                    // The grant expires after 5 minutes.
                    let ttl = std::time::Duration::from_secs(300);
                    let key = req.secret_key.clone();
                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
                        let _ = mgr.deliver_secret(&req_handle_id, key);
                    }
                } else {
                    denied.insert(deny_key);
                    let _ = mgr.deny_secret(&req_handle_id);
                }
            }
        }
    }
1272
    /// Reports the outcome of a finished plan run and updates metrics.
    ///
    /// Depending on `final_status`:
    /// - `Completed`: counts completed tasks, then asks the LLM aggregator
    ///   to synthesize a single answer from all task results.
    /// - `Failed`: lists each failed task (output truncated to 500 chars)
    ///   and keeps the graph in `pending_graph` so `/plan retry` works.
    /// - `Paused`: keeps the graph for `/plan resume` / `/plan retry`.
    /// - `Canceled`: reports how many tasks finished before cancellation.
    /// - anything else: logged as unexpected.
    ///
    /// Returns a short static status label for the caller.
    ///
    /// # Errors
    ///
    /// Propagates channel send failures.
    async fn finalize_plan_execution(
        &mut self,
        completed_graph: crate::orchestration::TaskGraph,
        final_status: crate::orchestration::GraphStatus,
    ) -> Result<&'static str, error::AgentError> {
        use std::fmt::Write;

        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};

        let result_label = match final_status {
            GraphStatus::Completed => {
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                self.update_metrics(|m| m.orchestration.tasks_completed += completed_count);

                // Synthesize a single user-facing answer from all task
                // results; fall back to a notice if aggregation fails.
                let aggregator =
                    LlmAggregator::new(self.provider.clone(), &self.orchestration_config);
                match aggregator.aggregate(&completed_graph).await {
                    Ok(synthesis) => {
                        self.channel.send(&synthesis).await?;
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "aggregation failed");
                        self.channel
                            .send(
                                "Plan completed but aggregation failed. \
                                 Check individual task results.",
                            )
                            .await?;
                    }
                }
                "completed"
            }
            GraphStatus::Failed => {
                let failed_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
                    .collect();
                self.update_metrics(|m| {
                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
                });
                let mut msg = format!(
                    "Plan failed. {}/{} tasks failed:\n",
                    failed_tasks.len(),
                    completed_graph.tasks.len()
                );
                for t in &failed_tasks {
                    // Borrow the task output when short; allocate only
                    // when it must be truncated to 500 chars.
                    let err: std::borrow::Cow<str> =
                        t.result.as_ref().map_or("unknown error".into(), |r| {
                            if r.output.len() > 500 {
                                r.output.chars().take(500).collect::<String>().into()
                            } else {
                                r.output.as_str().into()
                            }
                        });
                    let _ = writeln!(msg, " - {}: {err}", t.title);
                }
                msg.push_str("\nUse `/plan retry` to retry failed tasks.");
                self.channel.send(&msg).await?;
                // Keep the graph around so `/plan retry` can reuse it.
                self.pending_graph = Some(completed_graph);
                "failed"
            }
            GraphStatus::Paused => {
                self.channel
                    .send(
                        "Plan paused due to a task failure (ask strategy). \
                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
                    )
                    .await?;
                self.pending_graph = Some(completed_graph);
                "paused"
            }
            GraphStatus::Canceled => {
                let done_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count();
                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
                let total = completed_graph.tasks.len();
                self.channel
                    .send(&format!(
                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
                    ))
                    .await?;
                "canceled"
            }
            other => {
                // Created / Running should not appear once execution ends.
                tracing::warn!(%other, "unexpected graph status after Done");
                self.channel
                    .send(&format!("Plan ended with status: {other}"))
                    .await?;
                "unknown"
            }
        };
        Ok(result_label)
    }
1380
1381 async fn handle_plan_status(
1382 &mut self,
1383 _graph_id: Option<&str>,
1384 ) -> Result<(), error::AgentError> {
1385 use crate::orchestration::GraphStatus;
1386 let Some(ref graph) = self.pending_graph else {
1387 self.channel.send("No active plan.").await?;
1388 return Ok(());
1389 };
1390 let msg = match graph.status {
1391 GraphStatus::Created => {
1392 "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1393 }
1394 GraphStatus::Running => "Plan is currently running.",
1395 GraphStatus::Paused => {
1396 "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1397 }
1398 GraphStatus::Failed => {
1399 "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1400 }
1401 GraphStatus::Completed => "Plan completed successfully.",
1402 GraphStatus::Canceled => "Plan was canceled.",
1403 };
1404 self.channel.send(msg).await?;
1405 Ok(())
1406 }
1407
1408 async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1409 if let Some(ref graph) = self.pending_graph {
1410 let summary = format_plan_summary(graph);
1411 let status_label = match graph.status {
1412 crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1413 crate::orchestration::GraphStatus::Running => "running",
1414 crate::orchestration::GraphStatus::Paused => "paused",
1415 crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1416 _ => "unknown",
1417 };
1418 self.channel
1419 .send(&format!("{summary}\nStatus: {status_label}"))
1420 .await?;
1421 } else {
1422 self.channel.send("No recent plans.").await?;
1423 }
1424 Ok(())
1425 }
1426
1427 async fn handle_plan_cancel(
1428 &mut self,
1429 _graph_id: Option<&str>,
1430 ) -> Result<(), error::AgentError> {
1431 if let Some(token) = self.plan_cancel_token.take() {
1432 token.cancel();
1439 self.channel.send("Canceling plan execution...").await?;
1440 } else if self.pending_graph.take().is_some() {
1441 let now = std::time::Instant::now();
1442 self.update_metrics(|m| {
1443 if let Some(ref mut s) = m.orchestration_graph {
1444 "canceled".clone_into(&mut s.status);
1445 s.completed_at = Some(now);
1446 }
1447 });
1448 self.channel.send("Plan canceled.").await?;
1449 } else {
1450 self.channel.send("No active plan to cancel.").await?;
1451 }
1452 Ok(())
1453 }
1454
1455 async fn handle_plan_resume(
1460 &mut self,
1461 graph_id: Option<&str>,
1462 ) -> Result<(), error::AgentError> {
1463 use crate::orchestration::GraphStatus;
1464
1465 let Some(ref graph) = self.pending_graph else {
1466 self.channel
1467 .send("No paused plan to resume. Use `/plan status` to check the current state.")
1468 .await?;
1469 return Ok(());
1470 };
1471
1472 if let Some(id) = graph_id
1474 && graph.id.to_string() != id
1475 {
1476 self.channel
1477 .send(&format!(
1478 "Graph id '{id}' does not match the active plan ({}). \
1479 Use `/plan status` to see the active plan id.",
1480 graph.id
1481 ))
1482 .await?;
1483 return Ok(());
1484 }
1485
1486 if graph.status != GraphStatus::Paused {
1487 self.channel
1488 .send(&format!(
1489 "The active plan is in '{}' status and cannot be resumed. \
1490 Only Paused plans can be resumed.",
1491 graph.status
1492 ))
1493 .await?;
1494 return Ok(());
1495 }
1496
1497 let graph = self.pending_graph.take().unwrap();
1498
1499 tracing::info!(
1500 graph_id = %graph.id,
1501 "resuming paused graph"
1502 );
1503
1504 self.channel
1505 .send(&format!(
1506 "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1507 graph.goal
1508 ))
1509 .await?;
1510
1511 self.pending_graph = Some(graph);
1513 Ok(())
1514 }
1515
1516 async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1522 use crate::orchestration::{GraphStatus, dag};
1523
1524 let Some(ref graph) = self.pending_graph else {
1525 self.channel
1526 .send("No active plan to retry. Use `/plan status` to check the current state.")
1527 .await?;
1528 return Ok(());
1529 };
1530
1531 if let Some(id) = graph_id
1533 && graph.id.to_string() != id
1534 {
1535 self.channel
1536 .send(&format!(
1537 "Graph id '{id}' does not match the active plan ({}). \
1538 Use `/plan status` to see the active plan id.",
1539 graph.id
1540 ))
1541 .await?;
1542 return Ok(());
1543 }
1544
1545 if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1546 self.channel
1547 .send(&format!(
1548 "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1549 graph.status
1550 ))
1551 .await?;
1552 return Ok(());
1553 }
1554
1555 let mut graph = self.pending_graph.take().unwrap();
1556
1557 let failed_count = graph
1559 .tasks
1560 .iter()
1561 .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1562 .count();
1563
1564 dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1565
1566 for task in &mut graph.tasks {
1571 if task.status == crate::orchestration::TaskStatus::Running {
1572 task.status = crate::orchestration::TaskStatus::Ready;
1573 task.assigned_agent = None;
1574 }
1575 }
1576
1577 tracing::info!(
1578 graph_id = %graph.id,
1579 failed_count,
1580 "retrying failed tasks in graph"
1581 );
1582
1583 self.channel
1584 .send(&format!(
1585 "Retrying {failed_count} failed task(s) in plan: {}\n\
1586 Use `/plan confirm` to execute.",
1587 graph.goal
1588 ))
1589 .await?;
1590
1591 self.pending_graph = Some(graph);
1593 Ok(())
1594 }
1595
1596 pub async fn shutdown(&mut self) {
1597 self.channel.send("Shutting down...").await.ok();
1598
1599 self.provider.save_router_state();
1601
1602 if let Some(ref mut mgr) = self.subagent_manager {
1603 mgr.shutdown_all();
1604 }
1605
1606 if let Some(ref manager) = self.mcp.manager {
1607 manager.shutdown_all_shared().await;
1608 }
1609
1610 if let Some(ref tx) = self.metrics_tx {
1611 let m = tx.borrow();
1612 if m.filter_applications > 0 {
1613 #[allow(clippy::cast_precision_loss)]
1614 let pct = if m.filter_raw_tokens > 0 {
1615 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
1616 } else {
1617 0.0
1618 };
1619 tracing::info!(
1620 raw_tokens = m.filter_raw_tokens,
1621 saved_tokens = m.filter_saved_tokens,
1622 applications = m.filter_applications,
1623 "tool output filtering saved ~{} tokens ({pct:.0}%)",
1624 m.filter_saved_tokens,
1625 );
1626 }
1627 }
1628 tracing::info!("agent shutdown complete");
1629 }
1630
    /// Main agent event loop.
    ///
    /// Waits for model warmup (when configured), then loops: applying ACP
    /// provider overrides, publishing sub-agent metrics, announcing
    /// completed sub-agents, and dispatching the next message. Hot-reload
    /// events (skills, instructions, config), update/experiment
    /// notifications and scheduled tasks are multiplexed into the same
    /// `select!` as channel input.
    ///
    /// A handful of commands (`/clear-queue`, `/compact`, `/clear`,
    /// `/model`, `/debug-dump`, `/exit`) are handled inline; everything
    /// else goes to `process_user_message`.
    ///
    /// Returns when the channel closes, a shutdown signal arrives, or
    /// `/exit` is received on a channel that supports it.
    ///
    /// # Errors
    ///
    /// Propagates channel receive/send failures as [`error::AgentError`].
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // Block until the warmup watch flips to true (or its sender drops).
        if let Some(mut rx) = self.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        loop {
            // ACP can swap the provider between turns via a shared slot.
            if let Some(ref slot) = self.provider_override
                && let Some(new_provider) = slot
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            // Snapshot live sub-agent state into the metrics channel.
            if let Some(ref mgr) = self.subagent_manager {
                let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(id, s)| {
                        let def = mgr.agents_def(&id);
                        crate::metrics::SubAgentMetrics {
                            // Fall back to a truncated handle id when the
                            // definition is no longer available.
                            name: def.map_or_else(
                                || id[..8.min(id.len())].to_owned(),
                                |d| d.name.clone(),
                            ),
                            id: id.clone(),
                            state: format!("{:?}", s.state).to_lowercase(),
                            turns_used: s.turns_used,
                            max_turns: def.map_or(20, |d| d.permissions.max_turns),
                            background: def.is_some_and(|d| d.permissions.background),
                            elapsed_secs: s.started_at.elapsed().as_secs(),
                            permission_mode: def.map_or_else(String::new, |d| {
                                use crate::subagent::def::PermissionMode;
                                match d.permissions.permission_mode {
                                    PermissionMode::Default => String::new(),
                                    PermissionMode::AcceptEdits => "accept_edits".into(),
                                    PermissionMode::DontAsk => "dont_ask".into(),
                                    PermissionMode::BypassPermissions => {
                                        "bypass_permissions".into()
                                    }
                                    PermissionMode::Plan => "plan".into(),
                                }
                            }),
                        }
                    })
                    .collect();
                self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
            }

            // Announce sub-agents that finished since the last turn.
            let completed = self.poll_subagents().await;
            for (task_id, result) in completed {
                let notice = if result.is_empty() {
                    format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
                } else {
                    format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
                };
                if let Err(e) = self.channel.send(&notice).await {
                    tracing::warn!(error = %e, "failed to send sub-agent completion notice");
                }
            }

            self.drain_channel();

            // Prefer queued messages; otherwise block on the next event.
            let (text, image_parts) = if let Some(queued) = self.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need STT / image resolution.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instruction_reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiment_notify_rx) => {
                        // A completion notification also clears the cancel
                        // handle for the finished experiment.
                        #[cfg(feature = "experiments")]
                        { self.experiment_cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.custom_task_rx) => {
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("[Scheduled task] {prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                };
                // recv() yielding None means the channel has closed.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            if trimmed == "/clear-queue" {
                let n = self.clear_queue();
                self.notify_queue_count().await;
                self.channel
                    .send(&format!("Cleared {n} queued messages."))
                    .await?;
                let _ = self.channel.flush_chunks().await;
                continue;
            }

            if trimmed == "/compact" {
                // Compact only when there is more history than the
                // preserved tail (+1).
                if self.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
                    match self.compact_context().await {
                        Ok(()) => {
                            let _ = self.channel.send("Context compacted successfully.").await;
                        }
                        Err(e) => {
                            let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
                        }
                    }
                } else {
                    let _ = self.channel.send("Nothing to compact.").await;
                }
                let _ = self.channel.flush_chunks().await;
                continue;
            }

            if trimmed == "/clear" {
                self.clear_history();
                let _ = self.channel.flush_chunks().await;
                continue;
            }

            if trimmed == "/model" || trimmed.starts_with("/model ") {
                self.handle_model_command(trimmed).await;
                let _ = self.channel.flush_chunks().await;
                continue;
            }

            if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
                self.handle_debug_dump_command(trimmed).await;
                let _ = self.channel.flush_chunks().await;
                continue;
            }

            if trimmed == "/exit" || trimmed == "/quit" {
                if self.channel.supports_exit() {
                    break;
                }
                let _ = self
                    .channel
                    .send("/exit is not supported in this channel.")
                    .await;
                continue;
            }

            self.process_user_message(text, image_parts).await?;
        }

        Ok(())
    }
1830
1831 pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
1839 if model_id.is_empty() {
1840 return Err("model id must not be empty".to_string());
1841 }
1842 if model_id.len() > 256 {
1843 return Err("model id exceeds maximum length of 256 characters".to_string());
1844 }
1845 if !model_id
1846 .chars()
1847 .all(|c| c.is_ascii() && !c.is_ascii_control())
1848 {
1849 return Err("model id must contain only printable ASCII characters".to_string());
1850 }
1851 self.runtime.model_name = model_id.to_string();
1852 tracing::info!(model = model_id, "set_model called");
1853 Ok(())
1854 }
1855
    /// Handles `/model` and its subcommands.
    ///
    /// - `/model` — lists available models, preferring a fresh on-disk
    ///   cache over a network fetch.
    /// - `/model refresh` — clears the cached model lists and re-fetches
    ///   from the provider.
    /// - `/model <id>` — switches to the given model, validating it
    ///   against the known model list when one is available (otherwise
    ///   switches optimistically with a warning).
    ///
    /// All failures are reported over the channel; this method never
    /// returns an error.
    #[allow(clippy::too_many_lines)]
    async fn handle_model_command(&mut self, trimmed: &str) {
        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);

        if arg == "refresh" {
            // Wipe the on-disk model cache (JSON files) so the fetch
            // below starts from a clean slate.
            if let Some(cache_dir) = dirs::cache_dir() {
                let models_dir = cache_dir.join("zeph").join("models");
                if let Ok(entries) = std::fs::read_dir(&models_dir) {
                    for entry in entries.flatten() {
                        let path = entry.path();
                        if path.extension().and_then(|e| e.to_str()) == Some("json") {
                            let _ = std::fs::remove_file(&path);
                        }
                    }
                }
            }
            match self.provider.list_models_remote().await {
                Ok(models) => {
                    let _ = self
                        .channel
                        .send(&format!("Fetched {} models.", models.len()))
                        .await;
                }
                Err(e) => {
                    let _ = self
                        .channel
                        .send(&format!("Error fetching models: {e}"))
                        .await;
                }
            }
            return;
        }

        if arg.is_empty() {
            // Bare `/model`: list models. Use the cache when fresh, fall
            // back to a remote fetch otherwise.
            let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
            let models = if cache.is_stale() {
                None
            } else {
                cache.load().unwrap_or(None)
            };
            let models = if let Some(m) = models {
                m
            } else {
                match self.provider.list_models_remote().await {
                    Ok(m) => m,
                    Err(e) => {
                        let _ = self
                            .channel
                            .send(&format!("Error fetching models: {e}"))
                            .await;
                        return;
                    }
                }
            };

            if models.is_empty() {
                let _ = self.channel.send("No models available.").await;
                return;
            }
            let mut lines = vec!["Available models:".to_string()];
            for (i, m) in models.iter().enumerate() {
                lines.push(format!(" {}. {} ({})", i + 1, m.display_name, m.id));
            }
            let _ = self.channel.send(&lines.join("\n")).await;
            return;
        }

        // `/model <id>`: treat the remaining argument as a model id.
        let model_id = arg;

        // Validate against the known model list when we can get one.
        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
        {
            match self.provider.list_models_remote().await {
                Ok(m) if !m.is_empty() => Some(m),
                _ => None,
            }
        } else {
            cache.load().unwrap_or(None)
        };
        if let Some(models) = known_models {
            if !models.iter().any(|m| m.id == model_id) {
                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
                for m in &models {
                    lines.push(format!(" • {} ({})", m.display_name, m.id));
                }
                let _ = self.channel.send(&lines.join("\n")).await;
                return;
            }
        } else {
            // No list to validate against: switch anyway but warn.
            let _ = self
                .channel
                .send(
                    "Model list unavailable, switching anyway — verify your model name is correct.",
                )
                .await;
        }

        match self.set_model(model_id) {
            Ok(()) => {
                let _ = self
                    .channel
                    .send(&format!("Switched to model: {model_id}"))
                    .await;
            }
            Err(e) => {
                let _ = self.channel.send(&format!("Error: {e}")).await;
            }
        }
    }
1971
1972 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1974 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1975 if arg.is_empty() {
1976 match &self.debug_state.debug_dumper {
1977 Some(d) => {
1978 let _ = self
1979 .channel
1980 .send(&format!("Debug dump active: {}", d.dir().display()))
1981 .await;
1982 }
1983 None => {
1984 let _ = self
1985 .channel
1986 .send(
1987 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1988 or start with `--debug-dump [dir]`.",
1989 )
1990 .await;
1991 }
1992 }
1993 return;
1994 }
1995 let dir = std::path::PathBuf::from(arg);
1996 match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1997 Ok(dumper) => {
1998 let path = dumper.dir().display().to_string();
1999 self.debug_state.debug_dumper = Some(dumper);
2000 let _ = self
2001 .channel
2002 .send(&format!("Debug dump enabled: {path}"))
2003 .await;
2004 }
2005 Err(e) => {
2006 let _ = self
2007 .channel
2008 .send(&format!("Failed to enable debug dump: {e}"))
2009 .await;
2010 }
2011 }
2012 }
2013
2014 async fn resolve_message(
2015 &self,
2016 msg: crate::channel::ChannelMessage,
2017 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2018 use crate::channel::{Attachment, AttachmentKind};
2019 use zeph_llm::provider::{ImageData, MessagePart};
2020
2021 let text_base = msg.text.clone();
2022
2023 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2024 .attachments
2025 .into_iter()
2026 .partition(|a| a.kind == AttachmentKind::Audio);
2027
2028 tracing::debug!(
2029 audio = audio_attachments.len(),
2030 has_stt = self.stt.is_some(),
2031 "resolve_message attachments"
2032 );
2033
2034 let text = if !audio_attachments.is_empty()
2035 && let Some(stt) = self.stt.as_ref()
2036 {
2037 let mut transcribed_parts = Vec::new();
2038 for attachment in &audio_attachments {
2039 if attachment.data.len() > MAX_AUDIO_BYTES {
2040 tracing::warn!(
2041 size = attachment.data.len(),
2042 max = MAX_AUDIO_BYTES,
2043 "audio attachment exceeds size limit, skipping"
2044 );
2045 continue;
2046 }
2047 match stt
2048 .transcribe(&attachment.data, attachment.filename.as_deref())
2049 .await
2050 {
2051 Ok(result) => {
2052 tracing::info!(
2053 len = result.text.len(),
2054 language = ?result.language,
2055 "audio transcribed"
2056 );
2057 transcribed_parts.push(result.text);
2058 }
2059 Err(e) => {
2060 tracing::error!(error = %e, "audio transcription failed");
2061 }
2062 }
2063 }
2064 if transcribed_parts.is_empty() {
2065 text_base
2066 } else {
2067 let transcribed = transcribed_parts.join("\n");
2068 if text_base.is_empty() {
2069 transcribed
2070 } else {
2071 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2072 }
2073 }
2074 } else {
2075 if !audio_attachments.is_empty() {
2076 tracing::warn!(
2077 count = audio_attachments.len(),
2078 "audio attachments received but no STT provider configured, dropping"
2079 );
2080 }
2081 text_base
2082 };
2083
2084 let mut image_parts = Vec::new();
2085 for attachment in image_attachments {
2086 if attachment.data.len() > MAX_IMAGE_BYTES {
2087 tracing::warn!(
2088 size = attachment.data.len(),
2089 max = MAX_IMAGE_BYTES,
2090 "image attachment exceeds size limit, skipping"
2091 );
2092 continue;
2093 }
2094 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2095 image_parts.push(MessagePart::Image(Box::new(ImageData {
2096 data: attachment.data,
2097 mime_type,
2098 })));
2099 }
2100
2101 (text, image_parts)
2102 }
2103
2104 #[allow(clippy::too_many_lines)]
2105 async fn process_user_message(
2106 &mut self,
2107 text: String,
2108 image_parts: Vec<zeph_llm::provider::MessagePart>,
2109 ) -> Result<(), error::AgentError> {
2110 self.cancel_token = CancellationToken::new();
2111 let signal = Arc::clone(&self.cancel_signal);
2112 let token = self.cancel_token.clone();
2113 tokio::spawn(async move {
2114 signal.notified().await;
2115 token.cancel();
2116 });
2117 let trimmed = text.trim();
2118
2119 if trimmed == "/help" {
2120 self.handle_help_command().await?;
2121 let _ = self.channel.flush_chunks().await;
2122 return Ok(());
2123 }
2124
2125 if trimmed == "/status" {
2126 self.handle_status_command().await?;
2127 let _ = self.channel.flush_chunks().await;
2128 return Ok(());
2129 }
2130
2131 if trimmed == "/skills" {
2132 self.handle_skills_command().await?;
2133 let _ = self.channel.flush_chunks().await;
2134 return Ok(());
2135 }
2136
2137 if trimmed == "/skill" || trimmed.starts_with("/skill ") {
2138 let rest = trimmed.strip_prefix("/skill").unwrap_or("").trim();
2139 self.handle_skill_command(rest).await?;
2140 let _ = self.channel.flush_chunks().await;
2141 return Ok(());
2142 }
2143
2144 if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
2145 let rest = trimmed.strip_prefix("/feedback").unwrap_or("").trim();
2146 self.handle_feedback(rest).await?;
2147 let _ = self.channel.flush_chunks().await;
2148 return Ok(());
2149 }
2150
2151 if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
2152 let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim();
2153 self.handle_mcp_command(args).await?;
2154 let _ = self.channel.flush_chunks().await;
2155 return Ok(());
2156 }
2157
2158 if trimmed == "/image" || trimmed.starts_with("/image ") {
2159 let path = trimmed.strip_prefix("/image").unwrap_or("").trim();
2160 if path.is_empty() {
2161 self.channel.send("Usage: /image <path>").await?;
2162 let _ = self.channel.flush_chunks().await;
2163 return Ok(());
2164 }
2165 self.handle_image_command(path).await?;
2166 let _ = self.channel.flush_chunks().await;
2167 return Ok(());
2168 }
2169
2170 if trimmed == "/plan" || trimmed.starts_with("/plan ") {
2171 match crate::orchestration::PlanCommand::parse(trimmed) {
2172 Ok(cmd) => {
2173 self.handle_plan_command(cmd).await?;
2174 let _ = self.channel.flush_chunks().await;
2175 return Ok(());
2176 }
2177 Err(e) => {
2178 self.channel.send(&e.to_string()).await?;
2179 let _ = self.channel.flush_chunks().await;
2180 return Ok(());
2181 }
2182 }
2183 }
2184
2185 if trimmed == "/graph" || trimmed.starts_with("/graph ") {
2186 self.handle_graph_command(trimmed).await?;
2187 let _ = self.channel.flush_chunks().await;
2188 return Ok(());
2189 }
2190
2191 #[cfg(feature = "scheduler")]
2192 if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
2193 self.handle_scheduler_command(trimmed).await?;
2194 let _ = self.channel.flush_chunks().await;
2195 return Ok(());
2196 }
2197
2198 #[cfg(feature = "experiments")]
2199 if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
2200 self.handle_experiment_command(trimmed).await?;
2201 let _ = self.channel.flush_chunks().await;
2202 return Ok(());
2203 }
2204
2205 #[cfg(feature = "lsp-context")]
2206 if trimmed == "/lsp" {
2207 self.handle_lsp_status_command().await?;
2208 let _ = self.channel.flush_chunks().await;
2209 return Ok(());
2210 }
2211
2212 if trimmed == "/log" {
2213 self.handle_log_command().await?;
2214 let _ = self.channel.flush_chunks().await;
2215 return Ok(());
2216 }
2217
2218 if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
2219 let known: Vec<String> = self
2220 .subagent_manager
2221 .as_ref()
2222 .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
2223 .unwrap_or_default();
2224 match crate::subagent::AgentCommand::parse(trimmed, &known) {
2225 Ok(cmd) => {
2226 if let Some(msg) = self.handle_agent_command(cmd).await {
2227 self.channel.send(&msg).await?;
2228 }
2229 let _ = self.channel.flush_chunks().await;
2230 return Ok(());
2231 }
2232 Err(e) if trimmed.starts_with('@') => {
2233 tracing::debug!("@mention not matched as agent: {e}");
2235 }
2236 Err(e) => {
2237 self.channel.send(&e.to_string()).await?;
2238 let _ = self.channel.flush_chunks().await;
2239 return Ok(());
2240 }
2241 }
2242 }
2243
2244 self.check_pending_rollbacks().await;
2245 let conv_id = self.memory_state.conversation_id;
2248 self.rebuild_system_prompt(&text).await;
2249
2250 let correction_detection_enabled = self
2251 .learning_engine
2252 .config
2253 .as_ref()
2254 .is_none_or(|c| c.correction_detection);
2255 if self.is_learning_enabled() && correction_detection_enabled {
2256 let previous_user_messages: Vec<&str> = self
2257 .messages
2258 .iter()
2259 .filter(|m| m.role == Role::User)
2260 .map(|m| m.content.as_str())
2261 .collect();
2262 let regex_signal = self
2263 .feedback_detector
2264 .detect(trimmed, &previous_user_messages);
2265
2266 let judge_should_run = self
2279 .judge_detector
2280 .as_ref()
2281 .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
2282 && self
2283 .judge_detector
2284 .as_mut()
2285 .is_some_and(feedback_detector::JudgeDetector::check_rate_limit);
2286
2287 let signal = if judge_should_run {
2288 let judge_provider = self
2289 .judge_provider
2290 .clone()
2291 .unwrap_or_else(|| self.provider.clone());
2292 let assistant_snippet = self.last_assistant_response();
2293 let user_msg_owned = trimmed.to_owned();
2294 let memory_arc = self.memory_state.memory.clone();
2295 let skill_name = self
2296 .skill_state
2297 .active_skill_names
2298 .first()
2299 .cloned()
2300 .unwrap_or_default();
2301 let conv_id_bg = conv_id;
2302 let confidence_threshold = self
2304 .learning_engine
2305 .config
2306 .as_ref()
2307 .map_or(0.6, |c| c.correction_confidence_threshold);
2308
2309 tokio::spawn(async move {
2310 match feedback_detector::JudgeDetector::evaluate(
2311 &judge_provider,
2312 &user_msg_owned,
2313 &assistant_snippet,
2314 confidence_threshold,
2315 )
2316 .await
2317 {
2318 Ok(verdict) => {
2319 if let Some(signal) = verdict.into_signal(&user_msg_owned) {
2320 let is_self_correction = signal.kind
2325 == feedback_detector::CorrectionKind::SelfCorrection;
2326 tracing::info!(
2327 kind = signal.kind.as_str(),
2328 confidence = signal.confidence,
2329 source = "judge",
2330 is_self_correction,
2331 "correction signal detected"
2332 );
2333 if let Some(memory) = memory_arc {
2334 let correction_text =
2335 context::truncate_chars(&user_msg_owned, 500);
2336 match memory
2337 .sqlite()
2338 .store_user_correction(
2339 conv_id_bg.map(|c| c.0),
2340 &assistant_snippet,
2341 &correction_text,
2342 if skill_name.is_empty() {
2343 None
2344 } else {
2345 Some(skill_name.as_str())
2346 },
2347 signal.kind.as_str(),
2348 )
2349 .await
2350 {
2351 Ok(correction_id) => {
2352 if let Err(e) = memory
2353 .store_correction_embedding(
2354 correction_id,
2355 &correction_text,
2356 )
2357 .await
2358 {
2359 tracing::warn!(
2360 "failed to store correction embedding: {e:#}"
2361 );
2362 }
2363 }
2364 Err(e) => {
2365 tracing::warn!(
2366 "failed to store judge correction: {e:#}"
2367 );
2368 }
2369 }
2370 }
2371 }
2372 }
2373 Err(e) => {
2374 tracing::warn!("judge detector failed: {e:#}");
2375 }
2376 }
2377 });
2378
2379 None
2381 } else {
2382 regex_signal
2383 };
2384
2385 if let Some(signal) = signal {
2386 tracing::info!(
2387 kind = signal.kind.as_str(),
2388 confidence = signal.confidence,
2389 source = "regex",
2390 "implicit correction detected"
2391 );
2392 let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
2394 if signal.kind != feedback_detector::CorrectionKind::SelfCorrection {
2397 self.record_skill_outcomes(
2398 "user_rejection",
2399 Some(&feedback_text),
2400 Some(signal.kind.as_str()),
2401 )
2402 .await;
2403 }
2404 if let Some(memory) = &self.memory_state.memory {
2405 let correction_text = context::truncate_chars(trimmed, 500);
2409 match memory
2410 .sqlite()
2411 .store_user_correction(
2412 conv_id.map(|c| c.0),
2413 "",
2414 &correction_text,
2415 self.skill_state
2416 .active_skill_names
2417 .first()
2418 .map(String::as_str),
2419 signal.kind.as_str(),
2420 )
2421 .await
2422 {
2423 Ok(correction_id) => {
2424 if let Err(e) = memory
2425 .store_correction_embedding(correction_id, &correction_text)
2426 .await
2427 {
2428 tracing::warn!("failed to store correction embedding: {e:#}");
2429 }
2430 }
2431 Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
2432 }
2433 }
2434 }
2435 }
2436
2437 self.context_manager.compacted_this_turn = false;
2439
2440 self.maybe_apply_deferred_summaries();
2445
2446 if let Err(e) = self.maybe_proactive_compress().await {
2448 tracing::warn!("proactive compression failed: {e:#}");
2449 }
2450
2451 if let Err(e) = self.maybe_compact().await {
2452 tracing::warn!("context compaction failed: {e:#}");
2453 }
2454
2455 if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2456 tracing::warn!("context preparation failed: {e:#}");
2457 }
2458
2459 self.learning_engine.reset_reflection();
2460
2461 let mut all_image_parts = std::mem::take(&mut self.pending_image_parts);
2462 all_image_parts.extend(image_parts);
2463 let image_parts = all_image_parts;
2464
2465 let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
2466 let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
2467 parts.extend(image_parts);
2468 Message::from_parts(Role::User, parts)
2469 } else {
2470 if !image_parts.is_empty() {
2471 tracing::warn!(
2472 count = image_parts.len(),
2473 "image attachments dropped: provider does not support vision"
2474 );
2475 }
2476 Message {
2477 role: Role::User,
2478 content: text.clone(),
2479 parts: vec![],
2480 metadata: MessageMetadata::default(),
2481 }
2482 };
2483 self.persist_message(Role::User, &text, &[], false).await;
2485 self.push_message(user_msg);
2486
2487 if let Err(e) = self.process_response().await {
2488 tracing::error!("Response processing failed: {e:#}");
2489 let user_msg = format!("Error: {e:#}");
2490 self.channel.send(&user_msg).await?;
2491 self.messages.pop();
2492 self.recompute_prompt_tokens();
2493 self.channel.flush_chunks().await?;
2494 }
2495
2496 Ok(())
2497 }
2498
2499 async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
2500 use std::path::Component;
2501 use zeph_llm::provider::{ImageData, MessagePart};
2502
2503 let has_parent_dir = std::path::Path::new(path)
2505 .components()
2506 .any(|c| c == Component::ParentDir);
2507 if has_parent_dir {
2508 self.channel
2509 .send("Invalid image path: path traversal not allowed")
2510 .await?;
2511 let _ = self.channel.flush_chunks().await;
2512 return Ok(());
2513 }
2514
2515 let data = match std::fs::read(path) {
2516 Ok(d) => d,
2517 Err(e) => {
2518 self.channel
2519 .send(&format!("Cannot read image {path}: {e}"))
2520 .await?;
2521 let _ = self.channel.flush_chunks().await;
2522 return Ok(());
2523 }
2524 };
2525 if data.len() > MAX_IMAGE_BYTES {
2526 self.channel
2527 .send(&format!(
2528 "Image {path} exceeds size limit ({} MB), skipping",
2529 MAX_IMAGE_BYTES / 1024 / 1024
2530 ))
2531 .await?;
2532 let _ = self.channel.flush_chunks().await;
2533 return Ok(());
2534 }
2535 let mime_type = detect_image_mime(Some(path)).to_string();
2536 self.pending_image_parts
2537 .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
2538 self.channel
2539 .send(&format!("Image loaded: {path}. Send your message."))
2540 .await?;
2541 let _ = self.channel.flush_chunks().await;
2542 Ok(())
2543 }
2544
2545 async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
2546 use std::fmt::Write;
2547
2548 let mut out = String::from("Slash commands:\n\n");
2549
2550 let categories = [
2551 slash_commands::SlashCategory::Info,
2552 slash_commands::SlashCategory::Session,
2553 slash_commands::SlashCategory::Model,
2554 slash_commands::SlashCategory::Memory,
2555 slash_commands::SlashCategory::Tools,
2556 slash_commands::SlashCategory::Planning,
2557 slash_commands::SlashCategory::Debug,
2558 slash_commands::SlashCategory::Advanced,
2559 ];
2560
2561 for cat in &categories {
2562 let entries: Vec<_> = slash_commands::COMMANDS
2563 .iter()
2564 .filter(|c| &c.category == cat)
2565 .collect();
2566 if entries.is_empty() {
2567 continue;
2568 }
2569 let _ = writeln!(out, "{}:", cat.as_str());
2570 for cmd in entries {
2571 if cmd.args.is_empty() {
2572 let _ = write!(out, " {}", cmd.name);
2573 } else {
2574 let _ = write!(out, " {} {}", cmd.name, cmd.args);
2575 }
2576 let _ = write!(out, " — {}", cmd.description);
2577 if let Some(feat) = cmd.feature_gate {
2578 let _ = write!(out, " [requires: {feat}]");
2579 }
2580 let _ = writeln!(out);
2581 }
2582 let _ = writeln!(out);
2583 }
2584
2585 self.channel.send(out.trim_end()).await?;
2586 Ok(())
2587 }
2588
2589 async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
2590 use std::fmt::Write;
2591
2592 let uptime = self.start_time.elapsed().as_secs();
2593 let msg_count = self
2594 .messages
2595 .iter()
2596 .filter(|m| m.role == Role::User)
2597 .count();
2598
2599 let (api_calls, prompt_tokens, completion_tokens, cost_cents, mcp_servers) =
2600 if let Some(ref tx) = self.metrics_tx {
2601 let m = tx.borrow();
2602 (
2603 m.api_calls,
2604 m.prompt_tokens,
2605 m.completion_tokens,
2606 m.cost_spent_cents,
2607 m.mcp_server_count,
2608 )
2609 } else {
2610 (0, 0, 0, 0.0, 0)
2611 };
2612
2613 let skill_count = self
2614 .skill_state
2615 .registry
2616 .read()
2617 .map(|r| r.all_meta().len())
2618 .unwrap_or(0);
2619
2620 let mut out = String::from("Session status:\n\n");
2621 let _ = writeln!(out, "Provider: {}", self.provider.name());
2622 let _ = writeln!(out, "Model: {}", self.runtime.model_name);
2623 let _ = writeln!(out, "Uptime: {uptime}s");
2624 let _ = writeln!(out, "Turns: {msg_count}");
2625 let _ = writeln!(out, "API calls: {api_calls}");
2626 let _ = writeln!(
2627 out,
2628 "Tokens: {prompt_tokens} prompt / {completion_tokens} completion"
2629 );
2630 let _ = writeln!(out, "Skills: {skill_count}");
2631 let _ = writeln!(out, "MCP: {mcp_servers} server(s)");
2632 if cost_cents > 0.0 {
2633 let _ = writeln!(out, "Cost: ${:.4}", cost_cents / 100.0);
2634 }
2635
2636 self.channel.send(out.trim_end()).await?;
2637 Ok(())
2638 }
2639
2640 async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
2641 use std::fmt::Write;
2642
2643 let mut output = String::from("Available skills:\n\n");
2644
2645 let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
2646 .skill_state
2647 .registry
2648 .read()
2649 .expect("registry read lock")
2650 .all_meta()
2651 .into_iter()
2652 .cloned()
2653 .collect();
2654
2655 for meta in &all_meta {
2656 let trust_info = if let Some(memory) = &self.memory_state.memory {
2657 memory
2658 .sqlite()
2659 .load_skill_trust(&meta.name)
2660 .await
2661 .ok()
2662 .flatten()
2663 .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
2664 } else {
2665 String::new()
2666 };
2667 let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
2668 }
2669
2670 if let Some(memory) = &self.memory_state.memory {
2671 match memory.sqlite().load_skill_usage().await {
2672 Ok(usage) if !usage.is_empty() => {
2673 output.push_str("\nUsage statistics:\n\n");
2674 for row in &usage {
2675 let _ = writeln!(
2676 output,
2677 "- {}: {} invocations (last: {})",
2678 row.skill_name, row.invocation_count, row.last_used_at,
2679 );
2680 }
2681 }
2682 Ok(_) => {}
2683 Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
2684 }
2685 }
2686
2687 self.channel.send(&output).await?;
2688 Ok(())
2689 }
2690
2691 async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
2692 let Some((name, rest)) = input.split_once(' ') else {
2693 self.channel
2694 .send("Usage: /feedback <skill_name> <message>")
2695 .await?;
2696 return Ok(());
2697 };
2698 let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
2699
2700 if feedback.is_empty() {
2701 self.channel
2702 .send("Usage: /feedback <skill_name> <message>")
2703 .await?;
2704 return Ok(());
2705 }
2706
2707 let Some(memory) = &self.memory_state.memory else {
2708 self.channel.send("Memory not available.").await?;
2709 return Ok(());
2710 };
2711
2712 let outcome_type = if self.feedback_detector.detect(feedback, &[]).is_some() {
2713 "user_rejection"
2714 } else {
2715 "user_approval"
2716 };
2717
2718 memory
2719 .sqlite()
2720 .record_skill_outcome(
2721 skill_name,
2722 None,
2723 self.memory_state.conversation_id,
2724 outcome_type,
2725 None,
2726 Some(feedback),
2727 )
2728 .await?;
2729
2730 if self.is_learning_enabled() && outcome_type == "user_rejection" {
2731 self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
2732 .await
2733 .ok();
2734 }
2735
2736 self.channel
2737 .send(&format!("Feedback recorded for \"{skill_name}\"."))
2738 .await?;
2739 Ok(())
2740 }
2741
    /// Dispatch a parsed sub-agent command (`/agent …` or an `@mention`).
    ///
    /// Returns `None` when no sub-agent manager is configured — every arm
    /// begins with `self.subagent_manager.as_ref()?` / `as_mut()?` — and
    /// otherwise a user-facing result string for the channel.
    ///
    /// The `Spawn`/`Mention` and `Resume` arms block the current turn: they
    /// poll the spawned task every 500 ms until it reaches a terminal state,
    /// servicing any secret-access requests from the sub-agent along the way.
    ///
    /// NOTE(review): the polling + secret-approval loop is duplicated almost
    /// verbatim between Spawn and Resume — candidate for extraction into a
    /// helper; left as-is here.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::{AgentCommand, SubAgentState};
        use std::fmt::Write as _;

        match cmd {
            // List every known sub-agent definition with its memory scope
            // and (when available) its source file.
            AgentCommand::List => {
                let mgr = self.subagent_manager.as_ref()?;
                let defs = mgr.definitions();
                if defs.is_empty() {
                    return Some("No sub-agent definitions found.".into());
                }
                let mut out = String::from("Available sub-agents:\n");
                for d in defs {
                    let memory_label = match d.memory {
                        Some(crate::subagent::MemoryScope::User) => " [memory:user]",
                        Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
                        Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
                        None => "",
                    };
                    if let Some(ref src) = d.source {
                        let _ = writeln!(
                            out,
                            " {}{} — {} ({})",
                            d.name, memory_label, d.description, src
                        );
                    } else {
                        let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
                    }
                }
                Some(out)
            }
            // Fire-and-forget spawn: report the (shortened) task id and
            // return immediately without waiting for completion.
            AgentCommand::Background { name, prompt } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.subagent_manager.as_mut()?;
                let cfg = self.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Blocking spawn: `@mention` is treated identically to an
            // explicit `/agent spawn`. Polls until the task finishes.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.subagent_manager.as_mut()?;
                let cfg = self.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                // First 8 chars of the task id serve as the user-facing handle.
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                // Poll every 500 ms until the task reaches a terminal state.
                let result = loop {
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

                    // Service at most one pending secret request per tick:
                    // ask the user, then approve+deliver (with a 5-minute
                    // TTL) or deny.
                    #[allow(clippy::redundant_closure_for_method_calls)]
                    let pending = self
                        .subagent_manager
                        .as_mut()
                        .and_then(|m| m.try_recv_secret_request());
                    if let Some((req_task_id, req)) = pending {
                        let prompt = format!(
                            "Sub-agent requests secret '{}'. Allow?",
                            crate::text::truncate_to_chars(&req.secret_key, 100)
                        );
                        // A channel error is treated as a denial.
                        let approved = self.channel.confirm(&prompt).await.unwrap_or(false);
                        if let Some(mgr) = self.subagent_manager.as_mut() {
                            if approved {
                                let ttl = std::time::Duration::from_secs(300);
                                let key = req.secret_key.clone();
                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                                    let _ = mgr.deliver_secret(&req_task_id, key);
                                }
                            } else {
                                let _ = mgr.deny_secret(&req_task_id);
                            }
                        }
                    }

                    let mgr = self.subagent_manager.as_ref()?;
                    let statuses = mgr.statuses();
                    // A task that vanished from the status list is treated
                    // as having completed.
                    let Some((_, status)) = statuses.iter().find(|(id, _)| id == &task_id) else {
                        break "Sub-agent completed (no status available).".to_owned();
                    };
                    match status.state {
                        SubAgentState::Completed => {
                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                            break format!("Sub-agent '{name}' completed: {msg}");
                        }
                        SubAgentState::Failed => {
                            let msg = status
                                .last_message
                                .clone()
                                .unwrap_or_else(|| "unknown error".into());
                            break format!("Sub-agent '{name}' failed: {msg}");
                        }
                        SubAgentState::Canceled => {
                            break format!("Sub-agent '{name}' was cancelled.");
                        }
                        // Still running: emit a progress status line. The 20
                        // is a fallback max-turns when the definition is not
                        // resolvable.
                        _ => {
                            let _ = self
                                .channel
                                .send_status(&format!(
                                    "sub-agent '{name}': turn {}/{}",
                                    status.turns_used,
                                    self.subagent_manager
                                        .as_ref()
                                        .and_then(|m| m.agents_def(&task_id))
                                        .map_or(20, |d| d.permissions.max_turns)
                                ))
                                .await;
                        }
                    }
                };
                Some(result)
            }
            // Snapshot of all tracked sub-agent tasks, one line each, plus
            // the resolved memory directory when the definition declares one.
            AgentCommand::Status => {
                let mgr = self.subagent_manager.as_ref()?;
                let statuses = mgr.statuses();
                if statuses.is_empty() {
                    return Some("No active sub-agents.".into());
                }
                let mut out = String::from("Active sub-agents:\n");
                for (id, s) in &statuses {
                    let state = format!("{:?}", s.state).to_lowercase();
                    let elapsed = s.started_at.elapsed().as_secs();
                    let _ = writeln!(
                        out,
                        " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                        short = &id[..8.min(id.len())],
                        t = s.turns_used,
                        msg = s.last_message.as_deref().unwrap_or(""),
                    );
                    if let Some(def) = mgr.agents_def(id)
                        && let Some(scope) = def.memory
                        && let Ok(dir) =
                            crate::subagent::memory::resolve_memory_dir(scope, &def.name)
                    {
                        let _ = writeln!(out, " memory: {}", dir.display());
                    }
                }
                Some(out)
            }
            // Cancel by task-id prefix; ambiguous or unmatched prefixes are
            // reported rather than guessed at.
            AgentCommand::Cancel { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            // Approve a pending secret request for the task matching the
            // given id prefix.
            AgentCommand::Approve { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let full_ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(tid, _)| tid)
                    .filter(|tid| tid.starts_with(&id))
                    .collect();
                let full_id = match full_ids.as_slice() {
                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
                    [fid] => fid.clone(),
                    _ => {
                        return Some(format!(
                            "Ambiguous id prefix '{id}': matches {} agents",
                            full_ids.len()
                        ));
                    }
                };
                // NOTE(review): this pops a single queued request; if the
                // popped request belongs to a *different* task it is silently
                // dropped — confirm that is intended.
                if let Some((tid, req)) = mgr.try_recv_secret_request()
                    && tid == full_id
                {
                    let key = req.secret_key.clone();
                    let ttl = std::time::Duration::from_secs(300);
                    if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                        return Some(format!("Approve failed: {e}"));
                    }
                    if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                        return Some(format!("Secret delivery failed: {e}"));
                    }
                    return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
                }
                Some(format!(
                    "No pending secret request for sub-agent '{full_id}'."
                ))
            }
            // Deny a pending secret request for the task matching the given
            // id prefix.
            AgentCommand::Deny { id } => {
                let mgr = self.subagent_manager.as_mut()?;
                let full_ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(tid, _)| tid)
                    .filter(|tid| tid.starts_with(&id))
                    .collect();
                let full_id = match full_ids.as_slice() {
                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
                    [fid] => fid.clone(),
                    _ => {
                        return Some(format!(
                            "Ambiguous id prefix '{id}': matches {} agents",
                            full_ids.len()
                        ));
                    }
                };
                match mgr.deny_secret(&full_id) {
                    Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
                    Err(e) => Some(format!("Deny failed: {e}")),
                }
            }
            // Resume a previous task under a new task id, then poll exactly
            // like the Spawn arm above.
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.subagent_config.clone();
                // Resolve the definition name first (immutable borrow) so
                // skills can be filtered before the mutable spawn below.
                let def_name = {
                    let mgr = self.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                // Same 500 ms polling + secret-approval loop as Spawn.
                let result = loop {
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;

                    #[allow(clippy::redundant_closure_for_method_calls)]
                    let pending = self
                        .subagent_manager
                        .as_mut()
                        .and_then(|m| m.try_recv_secret_request());
                    if let Some((req_task_id, req)) = pending {
                        let confirm_prompt = format!(
                            "Sub-agent requests secret '{}'. Allow?",
                            crate::text::truncate_to_chars(&req.secret_key, 100)
                        );
                        let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                        if let Some(mgr) = self.subagent_manager.as_mut() {
                            if approved {
                                let ttl = std::time::Duration::from_secs(300);
                                let key = req.secret_key.clone();
                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                                    let _ = mgr.deliver_secret(&req_task_id, key);
                                }
                            } else {
                                let _ = mgr.deny_secret(&req_task_id);
                            }
                        }
                    }

                    let mgr = self.subagent_manager.as_ref()?;
                    let statuses = mgr.statuses();
                    let Some((_, status)) = statuses.iter().find(|(tid, _)| tid == &task_id) else {
                        break "Sub-agent resume completed (no status available).".to_owned();
                    };
                    match status.state {
                        SubAgentState::Completed => {
                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                            break format!("Resumed sub-agent completed: {msg}");
                        }
                        SubAgentState::Failed => {
                            let msg = status
                                .last_message
                                .clone()
                                .unwrap_or_else(|| "unknown error".into());
                            break format!("Resumed sub-agent failed: {msg}");
                        }
                        SubAgentState::Canceled => {
                            break "Resumed sub-agent was cancelled.".to_owned();
                        }
                        _ => {
                            let _ = self
                                .channel
                                .send_status(&format!(
                                    "resumed sub-agent: turn {}/{}",
                                    status.turns_used,
                                    self.subagent_manager
                                        .as_ref()
                                        .and_then(|m| m.agents_def(&task_id))
                                        .map_or(20, |d| d.permissions.max_turns)
                                ))
                                .await;
                        }
                    }
                };
                Some(result)
            }
        }
    }
3087
3088 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
3089 let mgr = self.subagent_manager.as_ref()?;
3090 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
3091 let reg = self
3092 .skill_state
3093 .registry
3094 .read()
3095 .expect("registry read lock");
3096 match crate::subagent::filter_skills(®, &def.skills) {
3097 Ok(skills) => {
3098 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
3099 if bodies.is_empty() {
3100 None
3101 } else {
3102 Some(bodies)
3103 }
3104 }
3105 Err(e) => {
3106 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
3107 None
3108 }
3109 }
3110 }
3111
    /// Reload the skill registry from disk and rebuild everything derived
    /// from it: trust records, the embedding matcher, the BM25 index, the
    /// skills prompt, and the in-flight system prompt.
    ///
    /// No-op when the on-disk registry fingerprint matches the current one.
    #[allow(clippy::too_many_lines)]
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        // Skip the (expensive) rebuild when nothing on disk changed.
        if new_registry.fingerprint()
            == self
                .skill_state
                .registry
                .read()
                .expect("registry read lock")
                .fingerprint()
        {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self
            .skill_state
            .registry
            .write()
            .expect("registry write lock") = new_registry;

        // Clone metadata out of the lock so no guard is held across awaits.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        // Refresh trust records: skills under the managed (hub) dir start at
        // the configured default level, local ones at the local level; a
        // content-hash mismatch against the stored record downgrades to the
        // configured mismatch level.
        if let Some(ref memory) = self.memory_state.memory {
            let trust_cfg = self.skill_state.trust_config.clone();
            let managed_dir = self.skill_state.managed_dir.clone();
            for meta in &all_meta {
                let source_kind = if managed_dir
                    .as_ref()
                    .is_some_and(|d| meta.skill_dir.starts_with(d))
                {
                    zeph_memory::sqlite::SourceKind::Hub
                } else {
                    zeph_memory::sqlite::SourceKind::Local
                };
                let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
                    &trust_cfg.default_level
                } else {
                    &trust_cfg.local_level
                };
                match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                    Ok(current_hash) => {
                        // Lookup errors are treated the same as "no record".
                        let existing = memory
                            .sqlite()
                            .load_skill_trust(&meta.name)
                            .await
                            .ok()
                            .flatten();
                        let trust_level_str = if let Some(ref row) = existing {
                            if row.blake3_hash == current_hash {
                                // Unchanged content keeps its earned level.
                                row.trust_level.clone()
                            } else {
                                trust_cfg.hash_mismatch_level.to_string()
                            }
                        } else {
                            initial_level.to_string()
                        };
                        let source_path = meta.skill_dir.to_str();
                        if let Err(e) = memory
                            .sqlite()
                            .upsert_skill_trust(
                                &meta.name,
                                &trust_level_str,
                                source_kind,
                                None,
                                source_path,
                                &current_hash,
                            )
                            .await
                        {
                            tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                        }
                    }
                    Err(e) => {
                        tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                    }
                }
            }
        }

        let all_meta = all_meta.iter().collect::<Vec<_>>();
        let provider = self.provider.clone();
        // Embedding closure handed to the matcher; clones the provider per
        // call so the returned future is self-contained.
        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move { p.embed(&owned).await })
        };

        // Qdrant-backed matchers are synced incrementally; anything else
        // (including no matcher at all) gets a full in-memory rebuild.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(&all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            if let Err(e) = backend
                .sync(&all_meta, &self.skill_state.embedding_model, embed_fn)
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        // Hybrid search keeps a BM25 index over skill descriptions.
        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
        }

        // Rebuild the skills prompt from full skill bodies and splice it
        // into the live system prompt (first message).
        let all_skills: Vec<Skill> = {
            let reg = self
                .skill_state
                .registry
                .read()
                .expect("registry read lock");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        // NOTE(review): this rebuilds the system prompt without environment
        // context or instruction blocks (all-None/false args) — confirm that
        // losing those on skill reload is intended.
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        if let Some(msg) = self.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line, then log the final count.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state
                .registry
                .read()
                .expect("registry read lock")
                .all_meta()
                .len()
        );
    }
3267
3268 fn reload_instructions(&mut self) {
3269 if let Some(ref mut rx) = self.instruction_reload_rx {
3271 while rx.try_recv().is_ok() {}
3272 }
3273 let Some(ref state) = self.instruction_reload_state else {
3274 return;
3275 };
3276 let new_blocks = crate::instructions::load_instructions(
3277 &state.base_dir,
3278 &state.provider_kinds,
3279 &state.explicit_files,
3280 state.auto_detect,
3281 );
3282 let old_sources: std::collections::HashSet<_> =
3283 self.instruction_blocks.iter().map(|b| &b.source).collect();
3284 let new_sources: std::collections::HashSet<_> =
3285 new_blocks.iter().map(|b| &b.source).collect();
3286 for added in new_sources.difference(&old_sources) {
3287 tracing::info!(path = %added.display(), "instruction file added");
3288 }
3289 for removed in old_sources.difference(&new_sources) {
3290 tracing::info!(path = %removed.display(), "instruction file removed");
3291 }
3292 tracing::info!(
3293 old_count = self.instruction_blocks.len(),
3294 new_count = new_blocks.len(),
3295 "reloaded instruction files"
3296 );
3297 self.instruction_blocks = new_blocks;
3298 }
3299
3300 fn reload_config(&mut self) {
3301 let Some(ref path) = self.config_path else {
3302 return;
3303 };
3304 let config = match Config::load(path) {
3305 Ok(c) => c,
3306 Err(e) => {
3307 tracing::warn!("config reload failed: {e:#}");
3308 return;
3309 }
3310 };
3311
3312 self.runtime.security = config.security;
3313 self.runtime.timeouts = config.timeouts;
3314 self.runtime.redact_credentials = config.memory.redact_credentials;
3315 self.memory_state.history_limit = config.memory.history_limit;
3316 self.memory_state.recall_limit = config.memory.semantic.recall_limit;
3317 self.memory_state.summarization_threshold = config.memory.summarization_threshold;
3318 self.skill_state.max_active_skills = config.skills.max_active_skills;
3319 self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
3320 self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3321 self.skill_state.hybrid_search = config.skills.hybrid_search;
3322
3323 if config.memory.context_budget_tokens > 0 {
3324 self.context_manager.budget = Some(
3325 ContextBudget::new(config.memory.context_budget_tokens, 0.20)
3326 .with_graph_enabled(config.memory.graph.enabled),
3327 );
3328 } else {
3329 self.context_manager.budget = None;
3330 }
3331
3332 {
3333 self.memory_state.graph_config = config.memory.graph.clone();
3334 }
3335 self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3336 self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3337 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3338 self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
3339 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3340 self.context_manager.compression = config.memory.compression.clone();
3341 self.context_manager.routing = config.memory.routing.clone();
3342 self.memory_state.cross_session_score_threshold =
3343 config.memory.cross_session_score_threshold;
3344
3345 self.index.repo_map_tokens = config.index.repo_map_tokens;
3346 self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3347
3348 tracing::info!("config reloaded");
3349 }
3350}
3351pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3352 while !*rx.borrow_and_update() {
3353 if rx.changed().await.is_err() {
3354 std::future::pending::<()>().await;
3355 }
3356 }
3357}
3358
3359pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3360 match rx {
3361 Some(inner) => {
3362 if let Some(v) = inner.recv().await {
3363 Some(v)
3364 } else {
3365 *rx = None;
3366 std::future::pending().await
3367 }
3368 }
3369 None => std::future::pending().await,
3370 }
3371}
3372
3373#[cfg(test)]
3374mod tests;
3375
3376#[cfg(test)]
3377pub(crate) use tests::agent_tests;