Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod builder;
5mod context;
6pub(crate) mod context_manager;
7pub mod error;
8#[cfg(feature = "experiments")]
9mod experiment_cmd;
10pub(super) mod feedback_detector;
11mod graph_commands;
12mod index;
13mod learning;
14pub(crate) mod learning_engine;
15mod log_commands;
16#[cfg(feature = "lsp-context")]
17mod lsp_commands;
18mod mcp;
19mod message_queue;
20mod persistence;
21#[cfg(feature = "scheduler")]
22mod scheduler_commands;
23mod skill_management;
24pub mod slash_commands;
25pub(crate) mod tool_execution;
26pub(crate) mod tool_orchestrator;
27mod trust_commands;
28mod utils;
29
30use std::collections::VecDeque;
31use std::path::PathBuf;
32use std::time::Instant;
33
34use std::sync::Arc;
35
36use tokio::sync::{Notify, mpsc, watch};
37use tokio_util::sync::CancellationToken;
38use zeph_llm::any::AnyProvider;
39use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
40use zeph_llm::stt::SpeechToText;
41
42use crate::metrics::MetricsSnapshot;
43use std::collections::HashMap;
44use zeph_memory::TokenCounter;
45use zeph_memory::semantic::SemanticMemory;
46use zeph_skills::loader::Skill;
47use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
48use zeph_skills::prompt::format_skills_prompt;
49use zeph_skills::registry::SkillRegistry;
50use zeph_skills::watcher::SkillEvent;
51use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
52
53use crate::channel::Channel;
54use crate::config::Config;
55use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
56use crate::config_watcher::ConfigEvent;
57use crate::context::{
58    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
59};
60use crate::cost::CostTracker;
61use crate::instructions::{InstructionBlock, InstructionEvent, InstructionReloadState};
62use crate::sanitizer::ContentSanitizer;
63use crate::sanitizer::quarantine::QuarantinedSummarizer;
64use crate::vault::Secret;
65
66use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, QueuedMessage, detect_image_mime};
67
/// Window size for doom-loop detection.
/// NOTE(review): presumably the number of recent turns compared — confirm at the use site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;

// Literal headers that label injected context sections. What goes under each header is
// decided where the constant is used, not here.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";

/// Prefix used for LSP context messages (`Role::System`) injected into message history.
/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
/// prefix to clear stale notes before each fresh injection.
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";

/// Closing code fence that [`format_tool_output`] appends after the tool body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
83
84fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
85    use std::fmt::Write;
86    let mut out = String::new();
87    let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
88    let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
89    let _ = writeln!(out);
90    for (i, task) in graph.tasks.iter().enumerate() {
91        let deps = if task.depends_on.is_empty() {
92            String::new()
93        } else {
94            let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
95            format!(" (after: {})", ids.join(", "))
96        };
97        let agent = task.agent_hint.as_deref().unwrap_or("-");
98        let _ = writeln!(out, "  {}. [{}] {}{}", i + 1, agent, task.title, deps);
99    }
100    out
101}
102
103pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
104    use std::fmt::Write;
105    let capacity = "[tool output: ".len()
106        + tool_name.len()
107        + "]\n```\n".len()
108        + body.len()
109        + TOOL_OUTPUT_SUFFIX.len();
110    let mut buf = String::with_capacity(capacity);
111    let _ = write!(
112        buf,
113        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
114    );
115    buf
116}
117
118pub(super) struct MemoryState {
119    pub(super) memory: Option<Arc<SemanticMemory>>,
120    pub(super) conversation_id: Option<zeph_memory::ConversationId>,
121    pub(super) history_limit: u32,
122    pub(super) recall_limit: usize,
123    pub(super) summarization_threshold: usize,
124    pub(super) cross_session_score_threshold: f32,
125    pub(super) autosave_assistant: bool,
126    pub(super) autosave_min_length: usize,
127    pub(super) tool_call_cutoff: usize,
128    pub(super) unsummarized_count: usize,
129    pub(super) document_config: crate::config::DocumentConfig,
130    pub(super) graph_config: crate::config::GraphConfig,
131}
132
133pub(super) struct SkillState {
134    pub(super) registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
135    pub(super) skill_paths: Vec<PathBuf>,
136    pub(super) managed_dir: Option<PathBuf>,
137    pub(super) trust_config: crate::config::TrustConfig,
138    pub(super) matcher: Option<SkillMatcherBackend>,
139    pub(super) max_active_skills: usize,
140    pub(super) disambiguation_threshold: f32,
141    pub(super) embedding_model: String,
142    pub(super) skill_reload_rx: Option<mpsc::Receiver<SkillEvent>>,
143    pub(super) active_skill_names: Vec<String>,
144    pub(super) last_skills_prompt: String,
145    pub(super) prompt_mode: SkillPromptMode,
146    /// Custom secrets available at runtime: key=hyphenated name, value=secret.
147    pub(super) available_custom_secrets: HashMap<String, Secret>,
148    pub(super) cosine_weight: f32,
149    pub(super) hybrid_search: bool,
150    pub(super) bm25_index: Option<zeph_skills::bm25::Bm25Index>,
151}
152
153pub(super) struct McpState {
154    pub(super) tools: Vec<zeph_mcp::McpTool>,
155    pub(super) registry: Option<zeph_mcp::McpToolRegistry>,
156    pub(super) manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
157    pub(super) allowed_commands: Vec<String>,
158    pub(super) max_dynamic: usize,
159    /// Shared with `McpToolExecutor` so native `tool_use` sees the current tool list.
160    pub(super) shared_tools: Option<std::sync::Arc<std::sync::RwLock<Vec<zeph_mcp::McpTool>>>>,
161}
162
163pub(super) struct IndexState {
164    pub(super) retriever: Option<std::sync::Arc<zeph_index::retriever::CodeRetriever>>,
165    pub(super) repo_map_tokens: usize,
166    pub(super) cached_repo_map: Option<(String, std::time::Instant)>,
167    pub(super) repo_map_ttl: std::time::Duration,
168}
169
170pub(super) struct RuntimeConfig {
171    pub(super) security: SecurityConfig,
172    pub(super) timeouts: TimeoutConfig,
173    pub(super) model_name: String,
174    pub(super) permission_policy: zeph_tools::PermissionPolicy,
175    pub(super) redact_credentials: bool,
176}
177
178/// Groups security-related subsystems (sanitizer, quarantine, exfiltration guard).
179pub(super) struct SecurityState {
180    pub(super) sanitizer: ContentSanitizer,
181    pub(super) quarantine_summarizer: Option<QuarantinedSummarizer>,
182    pub(super) exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard,
183    pub(super) flagged_urls: std::collections::HashSet<String>,
184}
185
186/// Groups debug/diagnostics subsystems (dumper, anomaly detector, logging config).
187pub(super) struct DebugState {
188    pub(super) debug_dumper: Option<crate::debug_dump::DebugDumper>,
189    pub(super) dump_format: crate::debug_dump::DumpFormat,
190    pub(super) anomaly_detector: Option<zeph_tools::AnomalyDetector>,
191    pub(super) logging_config: crate::config::LoggingConfig,
192}
193
194pub struct Agent<C: Channel> {
195    provider: AnyProvider,
196    channel: C,
197    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
198    messages: Vec<Message>,
199    pub(super) memory_state: MemoryState,
200    pub(super) skill_state: SkillState,
201    pub(super) context_manager: context_manager::ContextManager,
202    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
203    pub(super) learning_engine: learning_engine::LearningEngine,
204    pub(super) feedback_detector: feedback_detector::FeedbackDetector,
205    pub(super) judge_detector: Option<feedback_detector::JudgeDetector>,
206    pub(super) judge_provider: Option<AnyProvider>,
207    config_path: Option<PathBuf>,
208    config_reload_rx: Option<mpsc::Receiver<ConfigEvent>>,
209    shutdown: watch::Receiver<bool>,
210    metrics_tx: Option<watch::Sender<MetricsSnapshot>>,
211    pub(super) runtime: RuntimeConfig,
212    pub(super) mcp: McpState,
213    pub(super) index: IndexState,
214    cancel_signal: Arc<Notify>,
215    cancel_token: CancellationToken,
216    start_time: Instant,
217    message_queue: VecDeque<QueuedMessage>,
218    summary_provider: Option<AnyProvider>,
219    /// Shared slot for runtime model switching; set by external caller (e.g. ACP).
220    provider_override: Option<Arc<std::sync::RwLock<Option<AnyProvider>>>>,
221    warmup_ready: Option<watch::Receiver<bool>>,
222    cost_tracker: Option<CostTracker>,
223    cached_prompt_tokens: u64,
224    env_context: EnvironmentContext,
225    pub(crate) token_counter: Arc<TokenCounter>,
226    stt: Option<Box<dyn SpeechToText>>,
227    update_notify_rx: Option<mpsc::Receiver<String>>,
228    custom_task_rx: Option<mpsc::Receiver<String>>,
229    /// Manages spawned sub-agents. Wired up during construction but not yet
230    /// dispatched to in the current agent loop iteration; retained for
231    /// forward-compatible multi-agent orchestration.
232    pub(crate) subagent_manager: Option<crate::subagent::SubAgentManager>,
233    pub(crate) subagent_config: crate::config::SubAgentConfig,
234    pub(crate) orchestration_config: crate::config::OrchestrationConfig,
235    #[cfg(feature = "experiments")]
236    pub(super) experiment_config: crate::config::ExperimentConfig,
237    pub(super) response_cache: Option<std::sync::Arc<zeph_memory::ResponseCache>>,
238    /// Parent tool call ID when this agent runs as a subagent inside another agent session.
239    /// Propagated into every `LoopbackEvent::ToolStart` / `ToolOutput` so the IDE can build
240    /// a subagent hierarchy.
241    pub(crate) parent_tool_use_id: Option<String>,
242    pub(super) debug_state: DebugState,
243    /// Instruction blocks loaded at startup from provider-specific and explicit files.
244    pub(super) instruction_blocks: Vec<InstructionBlock>,
245    pub(super) instruction_reload_rx: Option<mpsc::Receiver<InstructionEvent>>,
246    pub(super) instruction_reload_state: Option<InstructionReloadState>,
247    pub(super) security: SecurityState,
248    /// Image parts staged by `/image` commands, attached to the next user message.
249    pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
250    /// Graph waiting for `/plan confirm` before execution starts.
251    pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
252    /// Cancellation token for the currently executing plan. `None` when no plan is running.
253    /// Created fresh in `handle_plan_confirm()`, cancelled in `handle_plan_cancel()`.
254    ///
255    /// # Known limitation
256    ///
257    /// Token plumbing is ready; the delivery path requires the agent message loop to be
258    /// restructured so `/plan cancel` can be received while `run_scheduler_loop` holds
259    /// `&mut self`. See follow-up issue #1603 (SEC-M34-002).
260    plan_cancel_token: Option<CancellationToken>,
261
262    /// LSP context injection hooks. Fires after native tool execution, injects
263    /// diagnostics/hover notes as `Role::System` messages before the next LLM call.
264    #[cfg(feature = "lsp-context")]
265    pub(super) lsp_hooks: Option<crate::lsp_hooks::LspHookRunner>,
266    /// Cancellation token for a running experiment session. `Some` means an experiment is active.
267    #[cfg(feature = "experiments")]
268    pub(super) experiment_cancel: Option<tokio_util::sync::CancellationToken>,
269    /// Pre-built config snapshot used as the experiment baseline (agent path).
270    /// Set via `with_experiment_baseline()`; defaults to `ConfigSnapshot::default()`.
271    #[cfg(feature = "experiments")]
272    pub(super) experiment_baseline: crate::experiments::ConfigSnapshot,
273    /// Receives completion/error messages from the background experiment engine task.
274    /// When a message arrives in the agent loop, it is forwarded to the channel and
275    /// `experiment_cancel` is cleared. Always present so the select! branch compiles
276    /// unconditionally; only ever receives messages when the `experiments` feature is enabled.
277    pub(super) experiment_notify_rx: Option<tokio::sync::mpsc::Receiver<String>>,
278    /// Sender end paired with `experiment_notify_rx`. Cloned into the background task.
279    /// Feature-gated because it is only used in `experiment_cmd.rs`.
280    #[cfg(feature = "experiments")]
281    pub(super) experiment_notify_tx: tokio::sync::mpsc::Sender<String>,
282    /// Whether the active provider has server-side compaction enabled (Claude compact-2026-01-12).
283    /// When true, client-side compaction is skipped.
284    pub(super) server_compaction_active: bool,
285}
286
287impl<C: Channel> Agent<C> {
288    #[must_use]
289    #[allow(clippy::too_many_lines)]
290    pub fn new(
291        provider: AnyProvider,
292        channel: C,
293        registry: SkillRegistry,
294        matcher: Option<SkillMatcherBackend>,
295        max_active_skills: usize,
296        tool_executor: impl ToolExecutor + 'static,
297    ) -> Self {
298        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
299        Self::new_with_registry_arc(
300            provider,
301            channel,
302            registry,
303            matcher,
304            max_active_skills,
305            tool_executor,
306        )
307    }
308
309    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
310    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
311    ///
312    /// # Panics
313    ///
314    /// Panics if the registry `RwLock` is poisoned.
315    #[must_use]
316    #[allow(clippy::too_many_lines)]
317    pub fn new_with_registry_arc(
318        provider: AnyProvider,
319        channel: C,
320        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
321        matcher: Option<SkillMatcherBackend>,
322        max_active_skills: usize,
323        tool_executor: impl ToolExecutor + 'static,
324    ) -> Self {
325        let all_skills: Vec<Skill> = {
326            let reg = registry.read().expect("registry read lock poisoned");
327            reg.all_meta()
328                .iter()
329                .filter_map(|m| reg.get_skill(&m.name).ok())
330                .collect()
331        };
332        let empty_trust = HashMap::new();
333        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
334        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
335        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
336        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
337        tracing::trace!(prompt = %system_prompt, "full system prompt");
338
339        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
340        let (_tx, rx) = watch::channel(false);
341        let token_counter = Arc::new(TokenCounter::new());
342        // Always create the receiver side of the experiment notification channel so the
343        // select! branch in the agent loop compiles unconditionally. The sender is only
344        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
345        #[cfg(feature = "experiments")]
346        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
347        #[cfg(not(feature = "experiments"))]
348        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
349        Self {
350            provider,
351            channel,
352            tool_executor: Arc::new(tool_executor),
353            messages: vec![Message {
354                role: Role::System,
355                content: system_prompt,
356                parts: vec![],
357                metadata: MessageMetadata::default(),
358            }],
359            memory_state: MemoryState {
360                memory: None,
361                conversation_id: None,
362                history_limit: 50,
363                recall_limit: 5,
364                summarization_threshold: 50,
365                cross_session_score_threshold: 0.35,
366                autosave_assistant: false,
367                autosave_min_length: 20,
368                tool_call_cutoff: 6,
369                unsummarized_count: 0,
370                document_config: crate::config::DocumentConfig::default(),
371                graph_config: crate::config::GraphConfig::default(),
372            },
373            skill_state: SkillState {
374                registry,
375                skill_paths: Vec::new(),
376                managed_dir: None,
377                trust_config: crate::config::TrustConfig::default(),
378                matcher,
379                max_active_skills,
380                disambiguation_threshold: 0.05,
381                embedding_model: String::new(),
382                skill_reload_rx: None,
383                active_skill_names: Vec::new(),
384                last_skills_prompt: skills_prompt,
385                prompt_mode: SkillPromptMode::Auto,
386                available_custom_secrets: HashMap::new(),
387                cosine_weight: 0.7,
388                hybrid_search: false,
389                bm25_index: None,
390            },
391            context_manager: context_manager::ContextManager::new(),
392            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
393            learning_engine: learning_engine::LearningEngine::new(),
394            feedback_detector: feedback_detector::FeedbackDetector::new(0.6),
395            judge_detector: None,
396            judge_provider: None,
397            config_path: None,
398            debug_state: DebugState {
399                debug_dumper: None,
400                dump_format: crate::debug_dump::DumpFormat::default(),
401                anomaly_detector: None,
402                logging_config: crate::config::LoggingConfig::default(),
403            },
404            config_reload_rx: None,
405            shutdown: rx,
406            metrics_tx: None,
407            runtime: RuntimeConfig {
408                security: SecurityConfig::default(),
409                timeouts: TimeoutConfig::default(),
410                model_name: String::new(),
411                permission_policy: zeph_tools::PermissionPolicy::default(),
412                redact_credentials: true,
413            },
414            mcp: McpState {
415                tools: Vec::new(),
416                registry: None,
417                manager: None,
418                allowed_commands: Vec::new(),
419                max_dynamic: 10,
420                shared_tools: None,
421            },
422            index: IndexState {
423                retriever: None,
424                repo_map_tokens: 0,
425                cached_repo_map: None,
426                repo_map_ttl: std::time::Duration::from_secs(300),
427            },
428            cancel_signal: Arc::new(Notify::new()),
429            cancel_token: CancellationToken::new(),
430            start_time: Instant::now(),
431            message_queue: VecDeque::new(),
432            summary_provider: None,
433            provider_override: None,
434            warmup_ready: None,
435            cost_tracker: None,
436            cached_prompt_tokens: initial_prompt_tokens,
437            env_context: EnvironmentContext::gather(""),
438            token_counter,
439            stt: None,
440            update_notify_rx: None,
441            custom_task_rx: None,
442            subagent_manager: None,
443            subagent_config: crate::config::SubAgentConfig::default(),
444            orchestration_config: crate::config::OrchestrationConfig::default(),
445            #[cfg(feature = "experiments")]
446            experiment_config: crate::config::ExperimentConfig::default(),
447            #[cfg(feature = "experiments")]
448            experiment_baseline: crate::experiments::ConfigSnapshot::default(),
449            experiment_notify_rx: Some(exp_notify_rx),
450            #[cfg(feature = "experiments")]
451            experiment_notify_tx: exp_notify_tx,
452            response_cache: None,
453            parent_tool_use_id: None,
454            instruction_blocks: Vec::new(),
455            instruction_reload_rx: None,
456            instruction_reload_state: None,
457            security: SecurityState {
458                sanitizer: ContentSanitizer::new(
459                    &crate::sanitizer::ContentIsolationConfig::default(),
460                ),
461                quarantine_summarizer: None,
462                exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard::new(
463                    crate::sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
464                ),
465                flagged_urls: std::collections::HashSet::new(),
466            },
467            pending_image_parts: Vec::new(),
468            pending_graph: None,
469            plan_cancel_token: None,
470
471            #[cfg(feature = "lsp-context")]
472            lsp_hooks: None,
473            #[cfg(feature = "experiments")]
474            experiment_cancel: None,
475            server_compaction_active: false,
476        }
477    }
478
479    /// Poll all active sub-agents for completed/failed/canceled results.
480    ///
481    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
482    /// for agents that have finished. Each completed agent is removed from the manager.
483    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
484        let Some(mgr) = &mut self.subagent_manager else {
485            return vec![];
486        };
487
488        let finished: Vec<String> = mgr
489            .statuses()
490            .into_iter()
491            .filter_map(|(id, status)| {
492                if matches!(
493                    status.state,
494                    crate::subagent::SubAgentState::Completed
495                        | crate::subagent::SubAgentState::Failed
496                        | crate::subagent::SubAgentState::Canceled
497                ) {
498                    Some(id)
499                } else {
500                    None
501                }
502            })
503            .collect();
504
505        let mut results = vec![];
506        for task_id in finished {
507            match mgr.collect(&task_id).await {
508                Ok(result) => results.push((task_id, result)),
509                Err(e) => {
510                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
511                }
512            }
513        }
514        results
515    }
516
517    async fn handle_plan_command(
518        &mut self,
519        cmd: crate::orchestration::PlanCommand,
520    ) -> Result<(), error::AgentError> {
521        use crate::orchestration::PlanCommand;
522
523        if !self.config_for_orchestration().enabled {
524            self.channel
525                .send(
526                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
527                )
528                .await?;
529            return Ok(());
530        }
531
532        match cmd {
533            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
534            PlanCommand::Confirm => self.handle_plan_confirm().await,
535            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
536            PlanCommand::List => self.handle_plan_list().await,
537            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
538            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
539            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
540        }
541    }
542
543    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
544        &self.orchestration_config
545    }
546
547    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
548        use crate::orchestration::{LlmPlanner, Planner};
549
550        if self.pending_graph.is_some() {
551            self.channel
552                .send(
553                    "A plan is already pending confirmation. \
554                     Use /plan confirm to execute it or /plan cancel to discard.",
555                )
556                .await?;
557            return Ok(());
558        }
559
560        self.channel.send("Planning task decomposition...").await?;
561
562        let available_agents = self
563            .subagent_manager
564            .as_ref()
565            .map(|m| m.definitions().to_vec())
566            .unwrap_or_default();
567
568        let confirm_before_execute = self.orchestration_config.confirm_before_execute;
569        let graph = LlmPlanner::new(self.provider.clone(), &self.orchestration_config)
570            .plan(goal, &available_agents)
571            .await
572            .map_err(|e| error::AgentError::Other(e.to_string()))?;
573
574        let task_count = graph.tasks.len() as u64;
575        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
576        self.update_metrics(|m| {
577            m.orchestration.plans_total += 1;
578            m.orchestration.tasks_total += task_count;
579            m.orchestration_graph = Some(snapshot);
580        });
581
582        if confirm_before_execute {
583            let summary = format_plan_summary(&graph);
584            self.channel.send(&summary).await?;
585            self.channel
586                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
587                .await?;
588            self.pending_graph = Some(graph);
589        } else {
590            // confirm_before_execute = false: display and proceed (Phase 5 will run scheduler).
591            // TODO(#1241): wire DagScheduler tick updates for Running task state
592            let summary = format_plan_summary(&graph);
593            self.channel.send(&summary).await?;
594            self.channel
595                .send("Plan ready. Full execution will be available in a future phase.")
596                .await?;
597            // IC1: graph was shown but never confirmed; clear snapshot so it doesn't linger.
598            let now = std::time::Instant::now();
599            self.update_metrics(|m| {
600                if let Some(ref mut s) = m.orchestration_graph {
601                    "completed".clone_into(&mut s.status);
602                    s.completed_at = Some(now);
603                }
604            });
605        }
606
607        Ok(())
608    }
609
610    #[allow(clippy::too_many_lines)]
611    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
612        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};
613
614        let Some(graph) = self.pending_graph.take() else {
615            self.channel
616                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
617                .await?;
618            return Ok(());
619        };
620
621        // When subagent manager is not configured, restore graph and inform the user.
622        if self.subagent_manager.is_none() {
623            self.channel
624                .send(
625                    "No sub-agents configured. Add sub-agent definitions to config \
626                     to enable plan execution.",
627                )
628                .await?;
629            self.pending_graph = Some(graph);
630            return Ok(());
631        }
632
633        // REV-2: pre-validate before moving graph into the constructor so we can
634        // restore it to pending_graph on failure.
635        if graph.tasks.is_empty() {
636            self.channel.send("Plan has no tasks.").await?;
637            self.pending_graph = Some(graph);
638            return Ok(());
639        }
640        // resume_from() rejects Completed and Canceled — guard those here too.
641        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
642            self.channel
643                .send(&format!(
644                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
645                    graph.status
646                ))
647                .await?;
648            self.pending_graph = Some(graph);
649            return Ok(());
650        }
651
652        let available_agents = self
653            .subagent_manager
654            .as_ref()
655            .map(|m| m.definitions().to_vec())
656            .unwrap_or_default();
657
658        // Warn when max_concurrent is too low to support the configured parallelism.
659        // This is the main cause of DagScheduler deadlocks (#1619): a planning-phase
660        // sub-agent occupies the only slot while orchestration tasks are waiting.
661        let max_concurrent = self.subagent_config.max_concurrent;
662        let max_parallel = self.orchestration_config.max_parallel as usize;
663        if max_concurrent < max_parallel + 1 {
664            tracing::warn!(
665                max_concurrent,
666                max_parallel,
667                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
668                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
669                max_parallel + 1
670            );
671        }
672
673        // Reserve slots equal to max_parallel so the scheduler is guaranteed capacity
674        // even if a planning-phase sub-agent is occupying a slot (#1619).
675        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
676        if let Some(mgr) = self.subagent_manager.as_mut() {
677            mgr.reserve_slots(reserved);
678        }
679
680        // Use resume_from() for graphs that are no longer in Created status
681        // (e.g., after /plan retry which calls reset_for_retry and sets status=Running).
682        let mut scheduler = if graph.status == GraphStatus::Created {
683            DagScheduler::new(
684                graph,
685                &self.orchestration_config,
686                Box::new(RuleBasedRouter),
687                available_agents,
688            )
689        } else {
690            DagScheduler::resume_from(
691                graph,
692                &self.orchestration_config,
693                Box::new(RuleBasedRouter),
694                available_agents,
695            )
696        }
697        .map_err(|e| {
698            // Release reservation before propagating error.
699            if let Some(mgr) = self.subagent_manager.as_mut() {
700                mgr.release_reservation(reserved);
701            }
702            error::AgentError::Other(e.to_string())
703        })?;
704
705        let task_count = scheduler.graph().tasks.len();
706        self.channel
707            .send(&format!(
708                "Confirmed. Executing plan ({task_count} tasks)..."
709            ))
710            .await?;
711
712        let plan_token = CancellationToken::new();
713        self.plan_cancel_token = Some(plan_token.clone());
714
715        // Use match instead of ? so plan_cancel_token is always cleared (CRIT-07).
716        let scheduler_result = self
717            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
718            .await;
719        self.plan_cancel_token = None;
720
721        // Always release the reservation, regardless of scheduler outcome.
722        if let Some(mgr) = self.subagent_manager.as_mut() {
723            mgr.release_reservation(reserved);
724        }
725
726        let final_status = match scheduler_result {
727            Ok(s) => s,
728            Err(e) => return Err(e),
729        };
730
731        let completed_graph = scheduler.into_graph();
732
733        // Final TUI snapshot update.
734        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
735        self.update_metrics(|m| {
736            m.orchestration_graph = Some(snapshot);
737        });
738
739        let result_label = self
740            .finalize_plan_execution(completed_graph, final_status)
741            .await?;
742
743        let now = std::time::Instant::now();
744        self.update_metrics(|m| {
745            if let Some(ref mut s) = m.orchestration_graph {
746                result_label.clone_into(&mut s.status);
747                s.completed_at = Some(now);
748            }
749        });
750        Ok(())
751    }
752
753    /// Drive the [`DagScheduler`] tick loop until it emits `SchedulerAction::Done`.
754    ///
755    /// Each iteration yields at `wait_event()`, during which `channel.recv()` is polled
756    /// concurrently via `tokio::select!`. If the user sends `/plan cancel`, all running
757    /// sub-agent tasks are aborted and the loop exits with [`GraphStatus::Canceled`].
758    /// Other messages received during execution are queued in `message_queue` and
759    /// processed after the plan completes.
760    ///
761    /// # Known limitations
762    ///
763    /// `RunInline` tasks block the tick loop for their entire duration — `/plan cancel`
764    /// cannot interrupt an in-progress inline LLM call and will only be delivered on the
765    /// next iteration after the call completes.
766    #[allow(clippy::too_many_lines)]
767    async fn run_scheduler_loop(
768        &mut self,
769        scheduler: &mut crate::orchestration::DagScheduler,
770        task_count: usize,
771        cancel_token: CancellationToken,
772    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
773        use crate::orchestration::SchedulerAction;
774
775        // Sequential spawn counter for human-readable "task N/M" progress messages.
776        // task_id.index() reflects array position and can be non-contiguous for
777        // parallel plans (e.g. 0, 2, 4), so we use a local counter instead.
778        let mut spawn_counter: usize = 0;
779
780        // Tracks (handle_id, secret_key) pairs denied this plan execution to prevent
781        // re-prompting the user when a sub-agent re-requests the same secret after denial.
782        let mut denied_secrets: std::collections::HashSet<(String, String)> =
783            std::collections::HashSet::new();
784
785        let final_status = 'tick: loop {
786            let actions = scheduler.tick();
787
788            // Track batch-level spawn outcomes for record_batch_backoff() below.
789            let mut any_spawn_success = false;
790            let mut any_concurrency_failure = false;
791
792            for action in actions {
793                match action {
794                    SchedulerAction::Spawn {
795                        task_id,
796                        agent_def_name,
797                        prompt,
798                    } => {
799                        let task_title = scheduler
800                            .graph()
801                            .tasks
802                            .get(task_id.index())
803                            .map_or("unknown", |t| t.title.as_str());
804
805                        let provider = self.provider.clone();
806                        let tool_executor = Arc::clone(&self.tool_executor);
807                        let skills = self.filtered_skills_for(&agent_def_name);
808                        let cfg = self.subagent_config.clone();
809                        let event_tx = scheduler.event_sender();
810
811                        let mgr = self
812                            .subagent_manager
813                            .as_mut()
814                            .expect("subagent_manager checked above");
815                        match mgr.spawn_for_task(
816                            &agent_def_name,
817                            &prompt,
818                            provider,
819                            tool_executor,
820                            skills,
821                            &cfg,
822                            task_id,
823                            event_tx,
824                        ) {
825                            Ok(handle_id) => {
826                                spawn_counter += 1;
827                                let _ = self
828                                    .channel
829                                    .send_status(&format!(
830                                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
831                                    ))
832                                    .await;
833                                scheduler.record_spawn(task_id, handle_id, agent_def_name);
834                                any_spawn_success = true;
835                            }
836                            Err(e) => {
837                                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
838                                if matches!(
839                                    e,
840                                    crate::subagent::SubAgentError::ConcurrencyLimit { .. }
841                                ) {
842                                    any_concurrency_failure = true;
843                                }
844                                let extra = scheduler.record_spawn_failure(task_id, &e);
845                                for a in extra {
846                                    match a {
847                                        SchedulerAction::Cancel { agent_handle_id } => {
848                                            if let Some(m) = self.subagent_manager.as_mut() {
849                                                // benign race: agent may have already finished
850                                                let _ =
851                                                    m.cancel(&agent_handle_id).inspect_err(|err| {
852                                                        tracing::trace!(
853                                                            error = %err,
854                                                            "cancel after spawn failure: agent already gone"
855                                                        );
856                                                    });
857                                            }
858                                        }
859                                        SchedulerAction::Done { status } => {
860                                            break 'tick status;
861                                        }
862                                        SchedulerAction::Spawn { .. }
863                                        | SchedulerAction::RunInline { .. } => {}
864                                    }
865                                }
866                            }
867                        }
868                    }
869                    SchedulerAction::Cancel { agent_handle_id } => {
870                        if let Some(mgr) = self.subagent_manager.as_mut() {
871                            // benign race: agent may have already finished
872                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
873                                tracing::trace!(error = %e, "cancel: agent already gone");
874                            });
875                        }
876                    }
877                    // Inline execution: the LLM call blocks this tick loop for its
878                    // duration. This is intentionally sequential and only expected in
879                    // single-agent setups where no sub-agents are configured.
880                    // Known limitation: if a RunInline action appears before Spawn actions
881                    // in the same batch (mixed routing), those Spawn actions are delayed
882                    // until the inline call completes. Refactor to tokio::spawn if mixed
883                    // batches become common.
884                    // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop so
885                    // that /plan cancel can interrupt a long-running inline LLM call instead
886                    // of waiting for the current iteration to complete.
887                    SchedulerAction::RunInline { task_id, prompt } => {
888                        spawn_counter += 1;
889                        let task_title = scheduler
890                            .graph()
891                            .tasks
892                            .get(task_id.index())
893                            .map_or("unknown", |t| t.title.as_str());
894                        let _ = self
895                            .channel
896                            .send_status(&format!(
897                                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
898                            ))
899                            .await;
900
901                        // record_spawn before chat(): the inline call completes before
902                        // wait_event() runs, so the completion event is always buffered
903                        // before any timeout check fires in the next tick().
904                        let handle_id = format!("__inline_{task_id}__");
905                        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());
906
907                        let event_tx = scheduler.event_sender();
908                        let max_iter = self.tool_orchestrator.max_iterations;
909                        let outcome = tokio::select! {
910                            result = self.run_inline_tool_loop(&prompt, max_iter) => {
911                                match result {
912                                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
913                                        output,
914                                        artifacts: vec![],
915                                    },
916                                    Err(e) => crate::orchestration::TaskOutcome::Failed {
917                                        error: e.to_string(),
918                                    },
919                                }
920                            }
921                            () = cancel_token.cancelled() => {
922                                // TODO: use TaskOutcome::Canceled when the variant is added (#1603)
923                                crate::orchestration::TaskOutcome::Failed {
924                                    error: "canceled".to_string(),
925                                }
926                            }
927                        };
928                        let event = crate::orchestration::TaskEvent {
929                            task_id,
930                            agent_handle_id: handle_id,
931                            outcome,
932                        };
933                        if let Err(e) = event_tx.send(event).await {
934                            tracing::warn!(
935                                %task_id,
936                                error = %e,
937                                "inline task event send failed"
938                            );
939                        }
940                    }
941                    SchedulerAction::Done { status } => {
942                        break 'tick status;
943                    }
944                }
945            }
946
947            // Update batch-level backoff counter after processing all Spawn actions.
948            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);
949
950            // Drain all pending secret requests this tick (MED-2 fix).
951            self.process_pending_secret_requests(&mut denied_secrets)
952                .await;
953
954            // Update TUI with current graph state.
955            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
956            self.update_metrics(|m| {
957                m.orchestration_graph = Some(snapshot);
958            });
959
960            // Poll channel.recv() and cancel_token concurrently with the scheduler's event wait
961            // so that /plan cancel can be delivered during plan execution. Without this select!,
962            // run_scheduler_loop holds &mut self for the entire plan, blocking the main
963            // run() loop's channel.recv() call.
964            //
965            // Cancellation paths:
966            //   cancel_token.cancelled() — fired by handle_plan_cancel() from concurrent channels
967            //                              (TUI, Telegram, ACP) that have their own event loops.
968            //   channel.recv("/plan cancel") — for CLI where the main dispatch loop is blocked.
969            //
970            // NOTE(Telegram): Telegram's recv() is not fully cancel-safe — a message
971            // consumed from the internal mpsc but not yet returned can be lost if the
972            // select! cancels the future during the /start send().await path. For
973            // non-command messages the race window is negligible. Acceptable for MVP.
974            //
975            // NOTE(RunInline): tasks in the RunInline arm above block this tick loop
976            // synchronously (no await between loop iteration start and wait_event).
977            // /plan cancel cannot interrupt an inline LLM call mid-execution; it is
978            // delivered on the next tick after the inline call completes.
979            // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop.
980            tokio::select! {
981                // biased: token cancellation takes priority over new events and input.
982                biased;
983                () = cancel_token.cancelled() => {
984                    let cancel_actions = scheduler.cancel_all();
985                    for action in cancel_actions {
986                        match action {
987                            SchedulerAction::Cancel { agent_handle_id } => {
988                                if let Some(mgr) = self.subagent_manager.as_mut() {
989                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
990                                        tracing::trace!(
991                                            error = %e,
992                                            "cancel during plan cancellation: agent already gone"
993                                        );
994                                    });
995                                }
996                            }
997                            SchedulerAction::Done { status } => {
998                                break 'tick status;
999                            }
1000                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1001                        }
1002                    }
1003                    // Defensive fallback: cancel_all always emits Done, but guard against
1004                    // future changes.
1005                    break 'tick crate::orchestration::GraphStatus::Canceled;
1006                }
1007                () = scheduler.wait_event() => {}
1008                result = self.channel.recv() => {
1009                    if let Ok(Some(msg)) = result {
1010                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
1011                            let _ = self.channel.send_status("Canceling plan...").await;
1012                            let cancel_actions = scheduler.cancel_all();
1013                            for ca in cancel_actions {
1014                                match ca {
1015                                    SchedulerAction::Cancel { agent_handle_id } => {
1016                                        if let Some(mgr) = self.subagent_manager.as_mut() {
1017                                            // benign race: agent may have already finished
1018                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1019                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
1020                                            });
1021                                        }
1022                                    }
1023                                    SchedulerAction::Done { status } => {
1024                                        break 'tick status;
1025                                    }
1026                                    SchedulerAction::Spawn { .. }
1027                                    | SchedulerAction::RunInline { .. } => {}
1028                                }
1029                            }
1030                            // Defensive fallback: cancel_all always emits Done, but guard
1031                            // against future changes.
1032                            break 'tick crate::orchestration::GraphStatus::Canceled;
1033                        }
1034                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
1035                    } else {
1036                        // Channel closed — cancel running sub-agents and exit cleanly.
1037                        let cancel_actions = scheduler.cancel_all();
1038                        let n = cancel_actions
1039                            .iter()
1040                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1041                            .count();
1042                        tracing::warn!(sub_agents = n, "scheduler channel closed, canceling running sub-agents");
1043                        for action in cancel_actions {
1044                            match action {
1045                                SchedulerAction::Cancel { agent_handle_id } => {
1046                                    if let Some(mgr) = self.subagent_manager.as_mut() {
1047                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1048                                            tracing::trace!(
1049                                                error = %e,
1050                                                "cancel on channel close: agent already gone"
1051                                            );
1052                                        });
1053                                    }
1054                                }
1055                                SchedulerAction::Done { status } => {
1056                                    break 'tick status;
1057                                }
1058                                SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1059                            }
1060                        }
1061                        // Defensive fallback: cancel_all always emits Done, but guard
1062                        // against future changes.
1063                        break 'tick crate::orchestration::GraphStatus::Canceled;
1064                    }
1065                }
1066                // Shutdown signal received — cancel running sub-agents and exit cleanly.
1067                () = shutdown_signal(&mut self.shutdown) => {
1068                    let cancel_actions = scheduler.cancel_all();
1069                    let n = cancel_actions
1070                        .iter()
1071                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1072                        .count();
1073                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
1074                    for action in cancel_actions {
1075                        match action {
1076                            SchedulerAction::Cancel { agent_handle_id } => {
1077                                if let Some(mgr) = self.subagent_manager.as_mut() {
1078                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1079                                        tracing::trace!(
1080                                            error = %e,
1081                                            "cancel on shutdown: agent already gone"
1082                                        );
1083                                    });
1084                                }
1085                            }
1086                            SchedulerAction::Done { status } => {
1087                                break 'tick status;
1088                            }
1089                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1090                        }
1091                    }
1092                    // Defensive fallback: cancel_all always emits Done, but guard against
1093                    // future changes.
1094                    break 'tick crate::orchestration::GraphStatus::Canceled;
1095                }
1096            }
1097        };
1098
1099        // Final drain: if the loop exited via Done on the first tick, secret
1100        // requests buffered before completion would otherwise be silently dropped.
1101        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
1102            .await;
1103
1104        Ok(final_status)
1105    }
1106
1107    /// Run a tool-aware LLM loop for an inline scheduled task.
1108    ///
1109    /// Unlike [`process_response_native_tools`], this is intentionally stripped of all
1110    /// interactive-session machinery (channel sends, doom-loop detection, summarization,
1111    /// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
1112    /// sub-tasks that run synchronously inside the scheduler tick loop.
1113    async fn run_inline_tool_loop(
1114        &self,
1115        prompt: &str,
1116        max_iterations: usize,
1117    ) -> Result<String, zeph_llm::LlmError> {
1118        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
1119        use zeph_tools::executor::ToolCall;
1120
1121        let tool_defs: Vec<ToolDefinition> = self
1122            .tool_executor
1123            .tool_definitions_erased()
1124            .iter()
1125            .map(tool_execution::tool_def_to_definition)
1126            .collect();
1127
1128        tracing::debug!(
1129            prompt_len = prompt.len(),
1130            max_iterations,
1131            tool_count = tool_defs.len(),
1132            "inline tool loop: starting"
1133        );
1134
1135        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
1136        let mut last_text = String::new();
1137
1138        for iteration in 0..max_iterations {
1139            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;
1140
1141            match response {
1142                ChatResponse::Text(text) => {
1143                    tracing::debug!(iteration, "inline tool loop: text response, returning");
1144                    return Ok(text);
1145                }
1146                ChatResponse::ToolUse {
1147                    text, tool_calls, ..
1148                } => {
1149                    tracing::debug!(
1150                        iteration,
1151                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
1152                        "inline tool loop: tool use"
1153                    );
1154
1155                    if let Some(ref t) = text {
1156                        last_text.clone_from(t);
1157                    }
1158
1159                    // Build assistant message with optional leading text + tool use parts.
1160                    let mut parts: Vec<MessagePart> = Vec::new();
1161                    if let Some(ref t) = text
1162                        && !t.is_empty()
1163                    {
1164                        parts.push(MessagePart::Text { text: t.clone() });
1165                    }
1166                    for tc in &tool_calls {
1167                        parts.push(MessagePart::ToolUse {
1168                            id: tc.id.clone(),
1169                            name: tc.name.clone(),
1170                            input: tc.input.clone(),
1171                        });
1172                    }
1173                    messages.push(Message::from_parts(Role::Assistant, parts));
1174
1175                    // Execute each tool call and collect results.
1176                    let mut result_parts: Vec<MessagePart> = Vec::new();
1177                    for tc in &tool_calls {
1178                        let call = ToolCall {
1179                            tool_id: tc.name.clone(),
1180                            params: match &tc.input {
1181                                serde_json::Value::Object(map) => map.clone(),
1182                                _ => serde_json::Map::new(),
1183                            },
1184                        };
1185                        let output = match self.tool_executor.execute_tool_call_erased(&call).await
1186                        {
1187                            Ok(Some(out)) => out.summary,
1188                            Ok(None) => "(no output)".to_owned(),
1189                            Err(e) => format!("[error] {e}"),
1190                        };
1191                        let is_error = output.starts_with("[error]");
1192                        result_parts.push(MessagePart::ToolResult {
1193                            tool_use_id: tc.id.clone(),
1194                            content: output,
1195                            is_error,
1196                        });
1197                    }
1198                    messages.push(Message::from_parts(Role::User, result_parts));
1199                }
1200            }
1201        }
1202
1203        tracing::debug!(
1204            max_iterations,
1205            last_text_empty = last_text.is_empty(),
1206            "inline tool loop: iteration limit reached"
1207        );
1208        Ok(last_text)
1209    }
1210
1211    /// Bridge pending secret requests from sub-agents to the user (non-blocking, time-bounded).
1212    ///
1213    /// SEC-P1-02: explicit user confirmation is required before granting any secret to a
1214    /// sub-agent. Denial is the default on timeout or channel error.
1215    ///
1216    /// `denied` tracks `(handle_id, secret_key)` pairs already denied this plan execution.
1217    /// Re-requests for a denied pair are auto-denied without prompting the user.
1218    async fn process_pending_secret_requests(
1219        &mut self,
1220        denied: &mut std::collections::HashSet<(String, String)>,
1221    ) {
1222        loop {
1223            let pending = self
1224                .subagent_manager
1225                .as_mut()
1226                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1227            let Some((req_handle_id, req)) = pending else {
1228                break;
1229            };
1230            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1231            if denied.contains(&deny_key) {
1232                tracing::debug!(
1233                    handle_id = %req_handle_id,
1234                    secret_key = %req.secret_key,
1235                    "skipping duplicate secret prompt for already-denied key"
1236                );
1237                if let Some(mgr) = self.subagent_manager.as_mut() {
1238                    let _ = mgr.deny_secret(&req_handle_id);
1239                }
1240                continue;
1241            }
1242            let prompt = format!(
1243                "Sub-agent requests secret '{}'. Allow?{}",
1244                crate::text::truncate_to_chars(&req.secret_key, 100),
1245                req.reason
1246                    .as_deref()
1247                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1248                    .unwrap_or_default()
1249            );
1250            // CRIT-1 fix: use select! to avoid blocking the tick loop forever.
1251            let approved = tokio::select! {
1252                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1253                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1254                    let _ = self.channel.send("Secret request timed out.").await;
1255                    false
1256                }
1257            };
1258            if let Some(mgr) = self.subagent_manager.as_mut() {
1259                if approved {
1260                    let ttl = std::time::Duration::from_secs(300);
1261                    let key = req.secret_key.clone();
1262                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1263                        let _ = mgr.deliver_secret(&req_handle_id, key);
1264                    }
1265                } else {
1266                    denied.insert(deny_key);
1267                    let _ = mgr.deny_secret(&req_handle_id);
1268                }
1269            }
1270        }
1271    }
1272
1273    /// Aggregate results or report failure after the tick loop completes.
1274    async fn finalize_plan_execution(
1275        &mut self,
1276        completed_graph: crate::orchestration::TaskGraph,
1277        final_status: crate::orchestration::GraphStatus,
1278    ) -> Result<&'static str, error::AgentError> {
1279        use std::fmt::Write;
1280
1281        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};
1282
1283        let result_label = match final_status {
1284            GraphStatus::Completed => {
1285                // Update task completion counters.
1286                let completed_count = completed_graph
1287                    .tasks
1288                    .iter()
1289                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1290                    .count() as u64;
1291                self.update_metrics(|m| m.orchestration.tasks_completed += completed_count);
1292
1293                let aggregator =
1294                    LlmAggregator::new(self.provider.clone(), &self.orchestration_config);
1295                match aggregator.aggregate(&completed_graph).await {
1296                    Ok(synthesis) => {
1297                        self.channel.send(&synthesis).await?;
1298                    }
1299                    Err(e) => {
1300                        tracing::error!(error = %e, "aggregation failed");
1301                        self.channel
1302                            .send(
1303                                "Plan completed but aggregation failed. \
1304                                 Check individual task results.",
1305                            )
1306                            .await?;
1307                    }
1308                }
1309                "completed"
1310            }
1311            GraphStatus::Failed => {
1312                let failed_tasks: Vec<_> = completed_graph
1313                    .tasks
1314                    .iter()
1315                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1316                    .collect();
1317                self.update_metrics(|m| {
1318                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
1319                });
1320                let mut msg = format!(
1321                    "Plan failed. {}/{} tasks failed:\n",
1322                    failed_tasks.len(),
1323                    completed_graph.tasks.len()
1324                );
1325                for t in &failed_tasks {
1326                    // SEC-M34-002: truncate raw task output before displaying to user.
1327                    let err: std::borrow::Cow<str> =
1328                        t.result.as_ref().map_or("unknown error".into(), |r| {
1329                            if r.output.len() > 500 {
1330                                r.output.chars().take(500).collect::<String>().into()
1331                            } else {
1332                                r.output.as_str().into()
1333                            }
1334                        });
1335                    let _ = writeln!(msg, "  - {}: {err}", t.title);
1336                }
1337                msg.push_str("\nUse `/plan retry` to retry failed tasks.");
1338                self.channel.send(&msg).await?;
1339                // Store graph back so /plan retry and /plan resume work.
1340                self.pending_graph = Some(completed_graph);
1341                "failed"
1342            }
1343            GraphStatus::Paused => {
1344                self.channel
1345                    .send(
1346                        "Plan paused due to a task failure (ask strategy). \
1347                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
1348                    )
1349                    .await?;
1350                self.pending_graph = Some(completed_graph);
1351                "paused"
1352            }
1353            GraphStatus::Canceled => {
1354                let done_count = completed_graph
1355                    .tasks
1356                    .iter()
1357                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1358                    .count();
1359                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
1360                let total = completed_graph.tasks.len();
1361                self.channel
1362                    .send(&format!(
1363                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
1364                    ))
1365                    .await?;
1366                // Do NOT store graph back into pending_graph — canceled plans are not
1367                // retryable via /plan retry.
1368                "canceled"
1369            }
1370            other => {
1371                tracing::warn!(%other, "unexpected graph status after Done");
1372                self.channel
1373                    .send(&format!("Plan ended with status: {other}"))
1374                    .await?;
1375                "unknown"
1376            }
1377        };
1378        Ok(result_label)
1379    }
1380
1381    async fn handle_plan_status(
1382        &mut self,
1383        _graph_id: Option<&str>,
1384    ) -> Result<(), error::AgentError> {
1385        use crate::orchestration::GraphStatus;
1386        let Some(ref graph) = self.pending_graph else {
1387            self.channel.send("No active plan.").await?;
1388            return Ok(());
1389        };
1390        let msg = match graph.status {
1391            GraphStatus::Created => {
1392                "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1393            }
1394            GraphStatus::Running => "Plan is currently running.",
1395            GraphStatus::Paused => {
1396                "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1397            }
1398            GraphStatus::Failed => {
1399                "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1400            }
1401            GraphStatus::Completed => "Plan completed successfully.",
1402            GraphStatus::Canceled => "Plan was canceled.",
1403        };
1404        self.channel.send(msg).await?;
1405        Ok(())
1406    }
1407
1408    async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1409        if let Some(ref graph) = self.pending_graph {
1410            let summary = format_plan_summary(graph);
1411            let status_label = match graph.status {
1412                crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1413                crate::orchestration::GraphStatus::Running => "running",
1414                crate::orchestration::GraphStatus::Paused => "paused",
1415                crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1416                _ => "unknown",
1417            };
1418            self.channel
1419                .send(&format!("{summary}\nStatus: {status_label}"))
1420                .await?;
1421        } else {
1422            self.channel.send("No recent plans.").await?;
1423        }
1424        Ok(())
1425    }
1426
1427    async fn handle_plan_cancel(
1428        &mut self,
1429        _graph_id: Option<&str>,
1430    ) -> Result<(), error::AgentError> {
1431        if let Some(token) = self.plan_cancel_token.take() {
1432            // In-flight plan: signal cancellation. The scheduler loop will pick this up
1433            // in the next tokio::select! iteration at wait_event().
1434            // NOTE: Due to &mut self being held by run_scheduler_loop, this branch is only
1435            // reachable if the channel has a concurrent reader (e.g. Telegram, TUI events).
1436            // CLI and synchronous channels cannot deliver this while the loop is active
1437            // (see #1603, SEC-M34-002).
1438            token.cancel();
1439            self.channel.send("Canceling plan execution...").await?;
1440        } else if self.pending_graph.take().is_some() {
1441            let now = std::time::Instant::now();
1442            self.update_metrics(|m| {
1443                if let Some(ref mut s) = m.orchestration_graph {
1444                    "canceled".clone_into(&mut s.status);
1445                    s.completed_at = Some(now);
1446                }
1447            });
1448            self.channel.send("Plan canceled.").await?;
1449        } else {
1450            self.channel.send("No active plan to cancel.").await?;
1451        }
1452        Ok(())
1453    }
1454
1455    /// Resume a paused graph (Ask failure strategy triggered a pause).
1456    ///
1457    /// Looks for a pending graph in `Paused` status. If `graph_id` is provided
1458    /// it must match the active graph's id (SEC-P5-03).
1459    async fn handle_plan_resume(
1460        &mut self,
1461        graph_id: Option<&str>,
1462    ) -> Result<(), error::AgentError> {
1463        use crate::orchestration::GraphStatus;
1464
1465        let Some(ref graph) = self.pending_graph else {
1466            self.channel
1467                .send("No paused plan to resume. Use `/plan status` to check the current state.")
1468                .await?;
1469            return Ok(());
1470        };
1471
1472        // SEC-P5-03: if a graph_id was provided, reject if it doesn't match.
1473        if let Some(id) = graph_id
1474            && graph.id.to_string() != id
1475        {
1476            self.channel
1477                .send(&format!(
1478                    "Graph id '{id}' does not match the active plan ({}). \
1479                     Use `/plan status` to see the active plan id.",
1480                    graph.id
1481                ))
1482                .await?;
1483            return Ok(());
1484        }
1485
1486        if graph.status != GraphStatus::Paused {
1487            self.channel
1488                .send(&format!(
1489                    "The active plan is in '{}' status and cannot be resumed. \
1490                     Only Paused plans can be resumed.",
1491                    graph.status
1492                ))
1493                .await?;
1494            return Ok(());
1495        }
1496
1497        let graph = self.pending_graph.take().unwrap();
1498
1499        tracing::info!(
1500            graph_id = %graph.id,
1501            "resuming paused graph"
1502        );
1503
1504        self.channel
1505            .send(&format!(
1506                "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1507                graph.goal
1508            ))
1509            .await?;
1510
1511        // Store resumed graph back as pending. resume_from() will set status=Running in confirm.
1512        self.pending_graph = Some(graph);
1513        Ok(())
1514    }
1515
1516    /// Retry failed tasks in a graph.
1517    ///
1518    /// Resets all `Failed` tasks to `Ready` and all `Skipped` dependents back
1519    /// to `Pending`, then re-stores the graph as pending for re-execution.
1520    /// If `graph_id` is provided it must match the active graph's id (SEC-P5-04).
1521    async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1522        use crate::orchestration::{GraphStatus, dag};
1523
1524        let Some(ref graph) = self.pending_graph else {
1525            self.channel
1526                .send("No active plan to retry. Use `/plan status` to check the current state.")
1527                .await?;
1528            return Ok(());
1529        };
1530
1531        // SEC-P5-04: if a graph_id was provided, reject if it doesn't match.
1532        if let Some(id) = graph_id
1533            && graph.id.to_string() != id
1534        {
1535            self.channel
1536                .send(&format!(
1537                    "Graph id '{id}' does not match the active plan ({}). \
1538                     Use `/plan status` to see the active plan id.",
1539                    graph.id
1540                ))
1541                .await?;
1542            return Ok(());
1543        }
1544
1545        if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1546            self.channel
1547                .send(&format!(
1548                    "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1549                    graph.status
1550                ))
1551                .await?;
1552            return Ok(());
1553        }
1554
1555        let mut graph = self.pending_graph.take().unwrap();
1556
1557        // IC3: count before reset so the message reflects actual failed tasks, not Ready count.
1558        let failed_count = graph
1559            .tasks
1560            .iter()
1561            .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1562            .count();
1563
1564        dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1565
1566        // HIGH-1 fix: reset_for_retry only resets Failed/Canceled tasks. Any tasks that were
1567        // in Running state at pause time are left as Running with stale assigned_agent handles
1568        // (those sub-agents are long dead). Reset them to Ready so resume_from() does not try
1569        // to wait for their events.
1570        for task in &mut graph.tasks {
1571            if task.status == crate::orchestration::TaskStatus::Running {
1572                task.status = crate::orchestration::TaskStatus::Ready;
1573                task.assigned_agent = None;
1574            }
1575        }
1576
1577        tracing::info!(
1578            graph_id = %graph.id,
1579            failed_count,
1580            "retrying failed tasks in graph"
1581        );
1582
1583        self.channel
1584            .send(&format!(
1585                "Retrying {failed_count} failed task(s) in plan: {}\n\
1586                 Use `/plan confirm` to execute.",
1587                graph.goal
1588            ))
1589            .await?;
1590
1591        // Store retried graph back for re-execution via /plan confirm.
1592        self.pending_graph = Some(graph);
1593        Ok(())
1594    }
1595
1596    pub async fn shutdown(&mut self) {
1597        self.channel.send("Shutting down...").await.ok();
1598
1599        // CRIT-1: persist Thompson state accumulated during this session.
1600        self.provider.save_router_state();
1601
1602        if let Some(ref mut mgr) = self.subagent_manager {
1603            mgr.shutdown_all();
1604        }
1605
1606        if let Some(ref manager) = self.mcp.manager {
1607            manager.shutdown_all_shared().await;
1608        }
1609
1610        if let Some(ref tx) = self.metrics_tx {
1611            let m = tx.borrow();
1612            if m.filter_applications > 0 {
1613                #[allow(clippy::cast_precision_loss)]
1614                let pct = if m.filter_raw_tokens > 0 {
1615                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
1616                } else {
1617                    0.0
1618                };
1619                tracing::info!(
1620                    raw_tokens = m.filter_raw_tokens,
1621                    saved_tokens = m.filter_saved_tokens,
1622                    applications = m.filter_applications,
1623                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
1624                    m.filter_saved_tokens,
1625                );
1626            }
1627        }
1628        tracing::info!("agent shutdown complete");
1629    }
1630
1631    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
1632    ///
1633    /// # Errors
1634    ///
1635    /// Returns an error if channel I/O or LLM communication fails.
1636    #[allow(clippy::too_many_lines)]
1637    pub async fn run(&mut self) -> Result<(), error::AgentError> {
1638        if let Some(mut rx) = self.warmup_ready.take()
1639            && !*rx.borrow()
1640        {
1641            let _ = rx.changed().await;
1642            if !*rx.borrow() {
1643                tracing::warn!("model warmup did not complete successfully");
1644            }
1645        }
1646
1647        loop {
1648            // Apply any pending provider override (from ACP set_session_config_option).
1649            if let Some(ref slot) = self.provider_override
1650                && let Some(new_provider) = slot
1651                    .write()
1652                    .unwrap_or_else(std::sync::PoisonError::into_inner)
1653                    .take()
1654            {
1655                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1656                self.provider = new_provider;
1657            }
1658
1659            // Refresh sub-agent status in metrics before polling.
1660            if let Some(ref mgr) = self.subagent_manager {
1661                let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
1662                    .statuses()
1663                    .into_iter()
1664                    .map(|(id, s)| {
1665                        let def = mgr.agents_def(&id);
1666                        crate::metrics::SubAgentMetrics {
1667                            name: def.map_or_else(
1668                                || id[..8.min(id.len())].to_owned(),
1669                                |d| d.name.clone(),
1670                            ),
1671                            id: id.clone(),
1672                            state: format!("{:?}", s.state).to_lowercase(),
1673                            turns_used: s.turns_used,
1674                            max_turns: def.map_or(20, |d| d.permissions.max_turns),
1675                            background: def.is_some_and(|d| d.permissions.background),
1676                            elapsed_secs: s.started_at.elapsed().as_secs(),
1677                            permission_mode: def.map_or_else(String::new, |d| {
1678                                use crate::subagent::def::PermissionMode;
1679                                match d.permissions.permission_mode {
1680                                    PermissionMode::Default => String::new(),
1681                                    PermissionMode::AcceptEdits => "accept_edits".into(),
1682                                    PermissionMode::DontAsk => "dont_ask".into(),
1683                                    PermissionMode::BypassPermissions => {
1684                                        "bypass_permissions".into()
1685                                    }
1686                                    PermissionMode::Plan => "plan".into(),
1687                                }
1688                            }),
1689                        }
1690                    })
1691                    .collect();
1692                self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
1693            }
1694
1695            // Non-blocking poll: notify user when background sub-agents complete.
1696            let completed = self.poll_subagents().await;
1697            for (task_id, result) in completed {
1698                let notice = if result.is_empty() {
1699                    format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
1700                } else {
1701                    format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
1702                };
1703                if let Err(e) = self.channel.send(&notice).await {
1704                    tracing::warn!(error = %e, "failed to send sub-agent completion notice");
1705                }
1706            }
1707
1708            self.drain_channel();
1709
1710            let (text, image_parts) = if let Some(queued) = self.message_queue.pop_front() {
1711                self.notify_queue_count().await;
1712                if queued.raw_attachments.is_empty() {
1713                    (queued.text, queued.image_parts)
1714                } else {
1715                    let msg = crate::channel::ChannelMessage {
1716                        text: queued.text,
1717                        attachments: queued.raw_attachments,
1718                    };
1719                    self.resolve_message(msg).await
1720                }
1721            } else {
1722                let incoming = tokio::select! {
1723                    result = self.channel.recv() => result?,
1724                    () = shutdown_signal(&mut self.shutdown) => {
1725                        tracing::info!("shutting down");
1726                        break;
1727                    }
1728                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
1729                        self.reload_skills().await;
1730                        continue;
1731                    }
1732                    Some(_) = recv_optional(&mut self.instruction_reload_rx) => {
1733                        self.reload_instructions();
1734                        continue;
1735                    }
1736                    Some(_) = recv_optional(&mut self.config_reload_rx) => {
1737                        self.reload_config();
1738                        continue;
1739                    }
1740                    Some(msg) = recv_optional(&mut self.update_notify_rx) => {
1741                        if let Err(e) = self.channel.send(&msg).await {
1742                            tracing::warn!("failed to send update notification: {e}");
1743                        }
1744                        continue;
1745                    }
1746                    Some(msg) = recv_optional(&mut self.experiment_notify_rx) => {
1747                        // Experiment engine completed (ok or err). Clear the cancel token so
1748                        // status reports idle and new experiments can be started.
1749                        #[cfg(feature = "experiments")]
1750                        { self.experiment_cancel = None; }
1751                        if let Err(e) = self.channel.send(&msg).await {
1752                            tracing::warn!("failed to send experiment completion: {e}");
1753                        }
1754                        continue;
1755                    }
1756                    Some(prompt) = recv_optional(&mut self.custom_task_rx) => {
1757                        tracing::info!("scheduler: injecting custom task as agent turn");
1758                        let text = format!("[Scheduled task] {prompt}");
1759                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
1760                    }
1761                };
1762                let Some(msg) = incoming else { break };
1763                self.drain_channel();
1764                self.resolve_message(msg).await
1765            };
1766
1767            let trimmed = text.trim();
1768
1769            if trimmed == "/clear-queue" {
1770                let n = self.clear_queue();
1771                self.notify_queue_count().await;
1772                self.channel
1773                    .send(&format!("Cleared {n} queued messages."))
1774                    .await?;
1775                let _ = self.channel.flush_chunks().await;
1776                continue;
1777            }
1778
1779            if trimmed == "/compact" {
1780                if self.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
1781                    match self.compact_context().await {
1782                        Ok(()) => {
1783                            let _ = self.channel.send("Context compacted successfully.").await;
1784                        }
1785                        Err(e) => {
1786                            let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
1787                        }
1788                    }
1789                } else {
1790                    let _ = self.channel.send("Nothing to compact.").await;
1791                }
1792                let _ = self.channel.flush_chunks().await;
1793                continue;
1794            }
1795
1796            if trimmed == "/clear" {
1797                self.clear_history();
1798                let _ = self.channel.flush_chunks().await;
1799                continue;
1800            }
1801
1802            if trimmed == "/model" || trimmed.starts_with("/model ") {
1803                self.handle_model_command(trimmed).await;
1804                let _ = self.channel.flush_chunks().await;
1805                continue;
1806            }
1807
1808            if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
1809                self.handle_debug_dump_command(trimmed).await;
1810                let _ = self.channel.flush_chunks().await;
1811                continue;
1812            }
1813
1814            if trimmed == "/exit" || trimmed == "/quit" {
1815                if self.channel.supports_exit() {
1816                    break;
1817                }
1818                let _ = self
1819                    .channel
1820                    .send("/exit is not supported in this channel.")
1821                    .await;
1822                continue;
1823            }
1824
1825            self.process_user_message(text, image_parts).await?;
1826        }
1827
1828        Ok(())
1829    }
1830
1831    /// Switch the active provider to one serving `model_id`.
1832    ///
1833    /// Looks up the model in the provider's remote model list (or cache).
1834    ///
1835    /// # Errors
1836    ///
1837    /// Returns `Err` if the model is not found.
1838    pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
1839        if model_id.is_empty() {
1840            return Err("model id must not be empty".to_string());
1841        }
1842        if model_id.len() > 256 {
1843            return Err("model id exceeds maximum length of 256 characters".to_string());
1844        }
1845        if !model_id
1846            .chars()
1847            .all(|c| c.is_ascii() && !c.is_ascii_control())
1848        {
1849            return Err("model id must contain only printable ASCII characters".to_string());
1850        }
1851        self.runtime.model_name = model_id.to_string();
1852        tracing::info!(model = model_id, "set_model called");
1853        Ok(())
1854    }
1855
1856    /// Handle `/model`, `/model <id>`, and `/model refresh` commands.
1857    #[allow(clippy::too_many_lines)]
1858    async fn handle_model_command(&mut self, trimmed: &str) {
1859        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
1860
1861        if arg == "refresh" {
1862            // Invalidate all model cache files in the cache directory.
1863            if let Some(cache_dir) = dirs::cache_dir() {
1864                let models_dir = cache_dir.join("zeph").join("models");
1865                if let Ok(entries) = std::fs::read_dir(&models_dir) {
1866                    for entry in entries.flatten() {
1867                        let path = entry.path();
1868                        if path.extension().and_then(|e| e.to_str()) == Some("json") {
1869                            let _ = std::fs::remove_file(&path);
1870                        }
1871                    }
1872                }
1873            }
1874            match self.provider.list_models_remote().await {
1875                Ok(models) => {
1876                    let _ = self
1877                        .channel
1878                        .send(&format!("Fetched {} models.", models.len()))
1879                        .await;
1880                }
1881                Err(e) => {
1882                    let _ = self
1883                        .channel
1884                        .send(&format!("Error fetching models: {e}"))
1885                        .await;
1886                }
1887            }
1888            return;
1889        }
1890
1891        if arg.is_empty() {
1892            // List models: try cache first, then remote.
1893            let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
1894            let models = if cache.is_stale() {
1895                None
1896            } else {
1897                cache.load().unwrap_or(None)
1898            };
1899            let models = if let Some(m) = models {
1900                m
1901            } else {
1902                match self.provider.list_models_remote().await {
1903                    Ok(m) => m,
1904                    Err(e) => {
1905                        let _ = self
1906                            .channel
1907                            .send(&format!("Error fetching models: {e}"))
1908                            .await;
1909                        return;
1910                    }
1911                }
1912            };
1913
1914            if models.is_empty() {
1915                let _ = self.channel.send("No models available.").await;
1916                return;
1917            }
1918            let mut lines = vec!["Available models:".to_string()];
1919            for (i, m) in models.iter().enumerate() {
1920                lines.push(format!("  {}. {} ({})", i + 1, m.display_name, m.id));
1921            }
1922            let _ = self.channel.send(&lines.join("\n")).await;
1923            return;
1924        }
1925
1926        // `/model <id>` — switch model
1927        let model_id = arg;
1928
1929        // Validate model_id against the known model list before switching.
1930        // Try disk cache first; fall back to a remote fetch if the cache is stale.
1931        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
1932        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
1933        {
1934            match self.provider.list_models_remote().await {
1935                Ok(m) if !m.is_empty() => Some(m),
1936                _ => None,
1937            }
1938        } else {
1939            cache.load().unwrap_or(None)
1940        };
1941        if let Some(models) = known_models {
1942            if !models.iter().any(|m| m.id == model_id) {
1943                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
1944                for m in &models {
1945                    lines.push(format!("  • {} ({})", m.display_name, m.id));
1946                }
1947                let _ = self.channel.send(&lines.join("\n")).await;
1948                return;
1949            }
1950        } else {
1951            let _ = self
1952                .channel
1953                .send(
1954                    "Model list unavailable, switching anyway — verify your model name is correct.",
1955                )
1956                .await;
1957        }
1958
1959        match self.set_model(model_id) {
1960            Ok(()) => {
1961                let _ = self
1962                    .channel
1963                    .send(&format!("Switched to model: {model_id}"))
1964                    .await;
1965            }
1966            Err(e) => {
1967                let _ = self.channel.send(&format!("Error: {e}")).await;
1968            }
1969        }
1970    }
1971
1972    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
1973    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1974        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1975        if arg.is_empty() {
1976            match &self.debug_state.debug_dumper {
1977                Some(d) => {
1978                    let _ = self
1979                        .channel
1980                        .send(&format!("Debug dump active: {}", d.dir().display()))
1981                        .await;
1982                }
1983                None => {
1984                    let _ = self
1985                        .channel
1986                        .send(
1987                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1988                             or start with `--debug-dump [dir]`.",
1989                        )
1990                        .await;
1991                }
1992            }
1993            return;
1994        }
1995        let dir = std::path::PathBuf::from(arg);
1996        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1997            Ok(dumper) => {
1998                let path = dumper.dir().display().to_string();
1999                self.debug_state.debug_dumper = Some(dumper);
2000                let _ = self
2001                    .channel
2002                    .send(&format!("Debug dump enabled: {path}"))
2003                    .await;
2004            }
2005            Err(e) => {
2006                let _ = self
2007                    .channel
2008                    .send(&format!("Failed to enable debug dump: {e}"))
2009                    .await;
2010            }
2011        }
2012    }
2013
2014    async fn resolve_message(
2015        &self,
2016        msg: crate::channel::ChannelMessage,
2017    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2018        use crate::channel::{Attachment, AttachmentKind};
2019        use zeph_llm::provider::{ImageData, MessagePart};
2020
2021        let text_base = msg.text.clone();
2022
2023        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2024            .attachments
2025            .into_iter()
2026            .partition(|a| a.kind == AttachmentKind::Audio);
2027
2028        tracing::debug!(
2029            audio = audio_attachments.len(),
2030            has_stt = self.stt.is_some(),
2031            "resolve_message attachments"
2032        );
2033
2034        let text = if !audio_attachments.is_empty()
2035            && let Some(stt) = self.stt.as_ref()
2036        {
2037            let mut transcribed_parts = Vec::new();
2038            for attachment in &audio_attachments {
2039                if attachment.data.len() > MAX_AUDIO_BYTES {
2040                    tracing::warn!(
2041                        size = attachment.data.len(),
2042                        max = MAX_AUDIO_BYTES,
2043                        "audio attachment exceeds size limit, skipping"
2044                    );
2045                    continue;
2046                }
2047                match stt
2048                    .transcribe(&attachment.data, attachment.filename.as_deref())
2049                    .await
2050                {
2051                    Ok(result) => {
2052                        tracing::info!(
2053                            len = result.text.len(),
2054                            language = ?result.language,
2055                            "audio transcribed"
2056                        );
2057                        transcribed_parts.push(result.text);
2058                    }
2059                    Err(e) => {
2060                        tracing::error!(error = %e, "audio transcription failed");
2061                    }
2062                }
2063            }
2064            if transcribed_parts.is_empty() {
2065                text_base
2066            } else {
2067                let transcribed = transcribed_parts.join("\n");
2068                if text_base.is_empty() {
2069                    transcribed
2070                } else {
2071                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2072                }
2073            }
2074        } else {
2075            if !audio_attachments.is_empty() {
2076                tracing::warn!(
2077                    count = audio_attachments.len(),
2078                    "audio attachments received but no STT provider configured, dropping"
2079                );
2080            }
2081            text_base
2082        };
2083
2084        let mut image_parts = Vec::new();
2085        for attachment in image_attachments {
2086            if attachment.data.len() > MAX_IMAGE_BYTES {
2087                tracing::warn!(
2088                    size = attachment.data.len(),
2089                    max = MAX_IMAGE_BYTES,
2090                    "image attachment exceeds size limit, skipping"
2091                );
2092                continue;
2093            }
2094            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2095            image_parts.push(MessagePart::Image(Box::new(ImageData {
2096                data: attachment.data,
2097                mime_type,
2098            })));
2099        }
2100
2101        (text, image_parts)
2102    }
2103
    /// Handles one user turn: either dispatches a built-in command or runs the full
    /// LLM response pipeline.
    ///
    /// Steps, in order:
    /// 1. Install a fresh cancellation token and spawn a task that cancels it when
    ///    `cancel_signal` is notified.
    /// 2. If the trimmed text is a recognised slash command (`/help`, `/status`, `/skills`,
    ///    `/skill`, `/feedback`, `/mcp`, `/image`, `/plan`, `/graph`, `/log`, `/agent`, plus the
    ///    feature-gated `/scheduler`, `/experiment`, `/lsp`) or a matching `@mention`, run its
    ///    handler, flush the channel and return.
    /// 3. Otherwise check pending rollbacks, rebuild the system prompt, run correction
    ///    detection (regex inline, judge in a detached task), apply context
    ///    compaction/preparation, persist and push the user message, and call
    ///    `process_response`.
    ///
    /// `image_parts` are appended to any images queued in `pending_image_parts`; they are only
    /// attached to the message when the provider reports vision support.
    ///
    /// # Errors
    ///
    /// Propagates errors from command handlers and from channel sends/flushes that use `?`.
    /// A failure of `process_response` itself is reported to the user on the channel instead
    /// of being returned.
    #[allow(clippy::too_many_lines)]
    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        // Fresh token per turn; the spawned task bridges `cancel_signal` to this token.
        // NOTE(review): the JoinHandle is dropped, so the task stays parked until
        // `cancel_signal` is notified — also for turns that return early from a slash
        // command below. Confirm this accumulation is acceptable.
        self.cancel_token = CancellationToken::new();
        let signal = Arc::clone(&self.cancel_signal);
        let token = self.cancel_token.clone();
        tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        });
        let trimmed = text.trim();

        // --- Built-in command dispatch -------------------------------------------------
        // Each branch runs its handler, flushes buffered chunks (flush errors are ignored)
        // and returns without touching the conversation history in this function.
        if trimmed == "/help" {
            self.handle_help_command().await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/status" {
            self.handle_status_command().await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/skills" {
            self.handle_skills_command().await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        // Commands with arguments match either the bare name or the name followed by a
        // space, so e.g. "/skillful" is not mistaken for "/skill".
        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed.strip_prefix("/skill").unwrap_or("").trim();
            self.handle_skill_command(rest).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed.strip_prefix("/feedback").unwrap_or("").trim();
            self.handle_feedback(rest).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim();
            self.handle_mcp_command(args).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed.strip_prefix("/image").unwrap_or("").trim();
            if path.is_empty() {
                self.channel.send("Usage: /image <path>").await?;
                let _ = self.channel.flush_chunks().await;
                return Ok(());
            }
            self.handle_image_command(path).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        // A `/plan` parse failure is shown to the user rather than returned as an error.
        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            match crate::orchestration::PlanCommand::parse(trimmed) {
                Ok(cmd) => {
                    self.handle_plan_command(cmd).await?;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
                Err(e) => {
                    self.channel.send(&e.to_string()).await?;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
            }
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            self.handle_graph_command(trimmed).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        #[cfg(feature = "scheduler")]
        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
            self.handle_scheduler_command(trimmed).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            self.handle_experiment_command(trimmed).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            self.handle_lsp_status_command().await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        if trimmed == "/log" {
            self.handle_log_command().await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        // `/agent ...` and `@name ...` share one parser; the list of known sub-agent names
        // is passed in so the parser can tell a real mention from ordinary text.
        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            let known: Vec<String> = self
                .subagent_manager
                .as_ref()
                .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
                .unwrap_or_default();
            match crate::subagent::AgentCommand::parse(trimmed, &known) {
                Ok(cmd) => {
                    if let Some(msg) = self.handle_agent_command(cmd).await {
                        self.channel.send(&msg).await?;
                    }
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
                Err(e) if trimmed.starts_with('@') => {
                    // Unknown @token — fall through to normal LLM processing
                    tracing::debug!("@mention not matched as agent: {e}");
                }
                Err(e) => {
                    self.channel.send(&e.to_string()).await?;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
            }
        }

        // --- Regular LLM turn ----------------------------------------------------------
        self.check_pending_rollbacks().await;
        // Extract before rebuild_system_prompt so the value is not tainted
        // by the secrets-bearing system prompt (ConversationId is just an i64).
        let conv_id = self.memory_state.conversation_id;
        self.rebuild_system_prompt(&text).await;

        // Correction detection defaults to enabled when no learning config is present
        // (`is_none_or`), but still requires learning itself to be enabled.
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if self.is_learning_enabled() && correction_detection_enabled {
            let previous_user_messages: Vec<&str> = self
                .messages
                .iter()
                .filter(|m| m.role == Role::User)
                .map(|m| m.content.as_str())
                .collect();
            let regex_signal = self
                .feedback_detector
                .detect(trimmed, &previous_user_messages);

            // Judge mode: invoke LLM in background if regex is borderline or missed.
            //
            // The judge call is decoupled from the response pipeline — it records the
            // correction asynchronously via tokio::spawn and returns None immediately
            // so the user response is not blocked.
            //
            // TODO(I3): JoinHandles are not tracked — outstanding tasks may be aborted
            // on runtime shutdown before store_user_correction completes. This is
            // acceptable for the learning subsystem at MVP. Future: collect handles in
            // Agent and drain on graceful shutdown.
            // Check rate limit synchronously before deciding to spawn.
            // The judge_detector is &mut self so check_rate_limit() can update call_times.
            let judge_should_run = self
                .judge_detector
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .judge_detector
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit);

            // When the judge runs, the regex signal is discarded for this turn: `signal`
            // becomes `None` and only the background task may record a correction.
            let signal = if judge_should_run {
                // Everything the detached task needs is cloned/owned here because the
                // task cannot borrow from `self`.
                let judge_provider = self
                    .judge_provider
                    .clone()
                    .unwrap_or_else(|| self.provider.clone());
                let assistant_snippet = self.last_assistant_response();
                let user_msg_owned = trimmed.to_owned();
                let memory_arc = self.memory_state.memory.clone();
                let skill_name = self
                    .skill_state
                    .active_skill_names
                    .first()
                    .cloned()
                    .unwrap_or_default();
                let conv_id_bg = conv_id;
                // Extract only the scalar config values needed by the spawned task.
                let confidence_threshold = self
                    .learning_engine
                    .config
                    .as_ref()
                    .map_or(0.6, |c| c.correction_confidence_threshold);

                tokio::spawn(async move {
                    match feedback_detector::JudgeDetector::evaluate(
                        &judge_provider,
                        &user_msg_owned,
                        &assistant_snippet,
                        confidence_threshold,
                    )
                    .await
                    {
                        Ok(verdict) => {
                            if let Some(signal) = verdict.into_signal(&user_msg_owned) {
                                // Self-corrections (user corrects their own statement) must not
                                // penalize skills. The judge path has no record_skill_outcomes()
                                // call today, but this guard mirrors the regex path to make the
                                // intent explicit and prevent future regressions if parity is added.
                                let is_self_correction = signal.kind
                                    == feedback_detector::CorrectionKind::SelfCorrection;
                                tracing::info!(
                                    kind = signal.kind.as_str(),
                                    confidence = signal.confidence,
                                    source = "judge",
                                    is_self_correction,
                                    "correction signal detected"
                                );
                                if let Some(memory) = memory_arc {
                                    let correction_text =
                                        context::truncate_chars(&user_msg_owned, 500);
                                    match memory
                                        .sqlite()
                                        .store_user_correction(
                                            conv_id_bg.map(|c| c.0),
                                            &assistant_snippet,
                                            &correction_text,
                                            if skill_name.is_empty() {
                                                None
                                            } else {
                                                Some(skill_name.as_str())
                                            },
                                            signal.kind.as_str(),
                                        )
                                        .await
                                    {
                                        Ok(correction_id) => {
                                            if let Err(e) = memory
                                                .store_correction_embedding(
                                                    correction_id,
                                                    &correction_text,
                                                )
                                                .await
                                            {
                                                tracing::warn!(
                                                    "failed to store correction embedding: {e:#}"
                                                );
                                            }
                                        }
                                        Err(e) => {
                                            tracing::warn!(
                                                "failed to store judge correction: {e:#}"
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            tracing::warn!("judge detector failed: {e:#}");
                        }
                    }
                });

                // Judge runs in background — return None so the response pipeline continues.
                None
            } else {
                regex_signal
            };

            // Regex path: handled inline, before the response is generated.
            if let Some(signal) = signal {
                tracing::info!(
                    kind = signal.kind.as_str(),
                    confidence = signal.confidence,
                    source = "regex",
                    "implicit correction detected"
                );
                // REV-PH2-002 + SEC-PH2-002: cap feedback_text to 500 chars (UTF-8 safe)
                let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
                // Self-corrections (user corrects their own statement) must not penalize skills —
                // the agent did nothing wrong. Store for analytics but skip skill outcome recording.
                if signal.kind != feedback_detector::CorrectionKind::SelfCorrection {
                    self.record_skill_outcomes(
                        "user_rejection",
                        Some(&feedback_text),
                        Some(signal.kind.as_str()),
                    )
                    .await;
                }
                if let Some(memory) = &self.memory_state.memory {
                    // Use `trimmed` (raw user input, untainted by secrets) instead of
                    // `feedback_text` (derived from previous_user_messages → self.messages)
                    // to avoid the CodeQL cleartext-logging taint path.
                    let correction_text = context::truncate_chars(trimmed, 500);
                    match memory
                        .sqlite()
                        .store_user_correction(
                            conv_id.map(|c| c.0),
                            "",
                            &correction_text,
                            self.skill_state
                                .active_skill_names
                                .first()
                                .map(String::as_str),
                            signal.kind.as_str(),
                        )
                        .await
                    {
                        Ok(correction_id) => {
                            if let Err(e) = memory
                                .store_correction_embedding(correction_id, &correction_text)
                                .await
                            {
                                tracing::warn!("failed to store correction embedding: {e:#}");
                            }
                        }
                        Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
                    }
                }
            }
        }

        // Reset per-turn compaction guard at the start of context management phase.
        self.context_manager.compacted_this_turn = false;

        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
        self.maybe_apply_deferred_summaries();

        // Context-management failures below are logged and never abort the turn.
        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.learning_engine.reset_reflection();

        // Images queued earlier (e.g. via `/image`) come first, followed by the images
        // attached to this message; the pending queue is emptied by `mem::take`.
        let mut all_image_parts = std::mem::take(&mut self.pending_image_parts);
        all_image_parts.extend(image_parts);
        let image_parts = all_image_parts;

        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
            parts.extend(image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !image_parts.is_empty() {
                tracing::warn!(
                    count = image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.clone(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        };
        // Image parts intentionally excluded — base64 payloads too large for message history.
        self.persist_message(Role::User, &text, &[], false).await;
        self.push_message(user_msg);

        if let Err(e) = self.process_response().await {
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // NOTE(review): presumably this removes the user message pushed above, which
            // assumes `process_response` left nothing after it on failure — confirm. The
            // copy written by `persist_message` is not touched here.
            self.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        }

        Ok(())
    }
2498
2499    async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
2500        use std::path::Component;
2501        use zeph_llm::provider::{ImageData, MessagePart};
2502
2503        // Reject paths that traverse outside the current directory.
2504        let has_parent_dir = std::path::Path::new(path)
2505            .components()
2506            .any(|c| c == Component::ParentDir);
2507        if has_parent_dir {
2508            self.channel
2509                .send("Invalid image path: path traversal not allowed")
2510                .await?;
2511            let _ = self.channel.flush_chunks().await;
2512            return Ok(());
2513        }
2514
2515        let data = match std::fs::read(path) {
2516            Ok(d) => d,
2517            Err(e) => {
2518                self.channel
2519                    .send(&format!("Cannot read image {path}: {e}"))
2520                    .await?;
2521                let _ = self.channel.flush_chunks().await;
2522                return Ok(());
2523            }
2524        };
2525        if data.len() > MAX_IMAGE_BYTES {
2526            self.channel
2527                .send(&format!(
2528                    "Image {path} exceeds size limit ({} MB), skipping",
2529                    MAX_IMAGE_BYTES / 1024 / 1024
2530                ))
2531                .await?;
2532            let _ = self.channel.flush_chunks().await;
2533            return Ok(());
2534        }
2535        let mime_type = detect_image_mime(Some(path)).to_string();
2536        self.pending_image_parts
2537            .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
2538        self.channel
2539            .send(&format!("Image loaded: {path}. Send your message."))
2540            .await?;
2541        let _ = self.channel.flush_chunks().await;
2542        Ok(())
2543    }
2544
2545    async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
2546        use std::fmt::Write;
2547
2548        let mut out = String::from("Slash commands:\n\n");
2549
2550        let categories = [
2551            slash_commands::SlashCategory::Info,
2552            slash_commands::SlashCategory::Session,
2553            slash_commands::SlashCategory::Model,
2554            slash_commands::SlashCategory::Memory,
2555            slash_commands::SlashCategory::Tools,
2556            slash_commands::SlashCategory::Planning,
2557            slash_commands::SlashCategory::Debug,
2558            slash_commands::SlashCategory::Advanced,
2559        ];
2560
2561        for cat in &categories {
2562            let entries: Vec<_> = slash_commands::COMMANDS
2563                .iter()
2564                .filter(|c| &c.category == cat)
2565                .collect();
2566            if entries.is_empty() {
2567                continue;
2568            }
2569            let _ = writeln!(out, "{}:", cat.as_str());
2570            for cmd in entries {
2571                if cmd.args.is_empty() {
2572                    let _ = write!(out, "  {}", cmd.name);
2573                } else {
2574                    let _ = write!(out, "  {} {}", cmd.name, cmd.args);
2575                }
2576                let _ = write!(out, "  — {}", cmd.description);
2577                if let Some(feat) = cmd.feature_gate {
2578                    let _ = write!(out, " [requires: {feat}]");
2579                }
2580                let _ = writeln!(out);
2581            }
2582            let _ = writeln!(out);
2583        }
2584
2585        self.channel.send(out.trim_end()).await?;
2586        Ok(())
2587    }
2588
2589    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
2590        use std::fmt::Write;
2591
2592        let uptime = self.start_time.elapsed().as_secs();
2593        let msg_count = self
2594            .messages
2595            .iter()
2596            .filter(|m| m.role == Role::User)
2597            .count();
2598
2599        let (api_calls, prompt_tokens, completion_tokens, cost_cents, mcp_servers) =
2600            if let Some(ref tx) = self.metrics_tx {
2601                let m = tx.borrow();
2602                (
2603                    m.api_calls,
2604                    m.prompt_tokens,
2605                    m.completion_tokens,
2606                    m.cost_spent_cents,
2607                    m.mcp_server_count,
2608                )
2609            } else {
2610                (0, 0, 0, 0.0, 0)
2611            };
2612
2613        let skill_count = self
2614            .skill_state
2615            .registry
2616            .read()
2617            .map(|r| r.all_meta().len())
2618            .unwrap_or(0);
2619
2620        let mut out = String::from("Session status:\n\n");
2621        let _ = writeln!(out, "Provider:  {}", self.provider.name());
2622        let _ = writeln!(out, "Model:     {}", self.runtime.model_name);
2623        let _ = writeln!(out, "Uptime:    {uptime}s");
2624        let _ = writeln!(out, "Turns:     {msg_count}");
2625        let _ = writeln!(out, "API calls: {api_calls}");
2626        let _ = writeln!(
2627            out,
2628            "Tokens:    {prompt_tokens} prompt / {completion_tokens} completion"
2629        );
2630        let _ = writeln!(out, "Skills:    {skill_count}");
2631        let _ = writeln!(out, "MCP:       {mcp_servers} server(s)");
2632        if cost_cents > 0.0 {
2633            let _ = writeln!(out, "Cost:      ${:.4}", cost_cents / 100.0);
2634        }
2635
2636        self.channel.send(out.trim_end()).await?;
2637        Ok(())
2638    }
2639
2640    async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
2641        use std::fmt::Write;
2642
2643        let mut output = String::from("Available skills:\n\n");
2644
2645        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
2646            .skill_state
2647            .registry
2648            .read()
2649            .expect("registry read lock")
2650            .all_meta()
2651            .into_iter()
2652            .cloned()
2653            .collect();
2654
2655        for meta in &all_meta {
2656            let trust_info = if let Some(memory) = &self.memory_state.memory {
2657                memory
2658                    .sqlite()
2659                    .load_skill_trust(&meta.name)
2660                    .await
2661                    .ok()
2662                    .flatten()
2663                    .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
2664            } else {
2665                String::new()
2666            };
2667            let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
2668        }
2669
2670        if let Some(memory) = &self.memory_state.memory {
2671            match memory.sqlite().load_skill_usage().await {
2672                Ok(usage) if !usage.is_empty() => {
2673                    output.push_str("\nUsage statistics:\n\n");
2674                    for row in &usage {
2675                        let _ = writeln!(
2676                            output,
2677                            "- {}: {} invocations (last: {})",
2678                            row.skill_name, row.invocation_count, row.last_used_at,
2679                        );
2680                    }
2681                }
2682                Ok(_) => {}
2683                Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
2684            }
2685        }
2686
2687        self.channel.send(&output).await?;
2688        Ok(())
2689    }
2690
2691    async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
2692        let Some((name, rest)) = input.split_once(' ') else {
2693            self.channel
2694                .send("Usage: /feedback <skill_name> <message>")
2695                .await?;
2696            return Ok(());
2697        };
2698        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
2699
2700        if feedback.is_empty() {
2701            self.channel
2702                .send("Usage: /feedback <skill_name> <message>")
2703                .await?;
2704            return Ok(());
2705        }
2706
2707        let Some(memory) = &self.memory_state.memory else {
2708            self.channel.send("Memory not available.").await?;
2709            return Ok(());
2710        };
2711
2712        let outcome_type = if self.feedback_detector.detect(feedback, &[]).is_some() {
2713            "user_rejection"
2714        } else {
2715            "user_approval"
2716        };
2717
2718        memory
2719            .sqlite()
2720            .record_skill_outcome(
2721                skill_name,
2722                None,
2723                self.memory_state.conversation_id,
2724                outcome_type,
2725                None,
2726                Some(feedback),
2727            )
2728            .await?;
2729
2730        if self.is_learning_enabled() && outcome_type == "user_rejection" {
2731            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
2732                .await
2733                .ok();
2734        }
2735
2736        self.channel
2737            .send(&format!("Feedback recorded for \"{skill_name}\"."))
2738            .await?;
2739        Ok(())
2740    }
2741
2742    #[allow(clippy::too_many_lines)]
2743    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
2744        use crate::subagent::{AgentCommand, SubAgentState};
2745        use std::fmt::Write as _;
2746
2747        match cmd {
2748            AgentCommand::List => {
2749                let mgr = self.subagent_manager.as_ref()?;
2750                let defs = mgr.definitions();
2751                if defs.is_empty() {
2752                    return Some("No sub-agent definitions found.".into());
2753                }
2754                let mut out = String::from("Available sub-agents:\n");
2755                for d in defs {
2756                    let memory_label = match d.memory {
2757                        Some(crate::subagent::MemoryScope::User) => " [memory:user]",
2758                        Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
2759                        Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
2760                        None => "",
2761                    };
2762                    if let Some(ref src) = d.source {
2763                        let _ = writeln!(
2764                            out,
2765                            "  {}{} — {} ({})",
2766                            d.name, memory_label, d.description, src
2767                        );
2768                    } else {
2769                        let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2770                    }
2771                }
2772                Some(out)
2773            }
2774            AgentCommand::Background { name, prompt } => {
2775                let provider = self.provider.clone();
2776                let tool_executor = Arc::clone(&self.tool_executor);
2777                let skills = self.filtered_skills_for(&name);
2778                let mgr = self.subagent_manager.as_mut()?;
2779                let cfg = self.subagent_config.clone();
2780                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
2781                    Ok(id) => Some(format!(
2782                        "Sub-agent '{name}' started in background (id: {short})",
2783                        short = &id[..8.min(id.len())]
2784                    )),
2785                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2786                }
2787            }
2788            AgentCommand::Spawn { name, prompt }
2789            | AgentCommand::Mention {
2790                agent: name,
2791                prompt,
2792            } => {
2793                // Foreground spawn: launch and await completion, streaming status to user.
2794                let provider = self.provider.clone();
2795                let tool_executor = Arc::clone(&self.tool_executor);
2796                let skills = self.filtered_skills_for(&name);
2797                let mgr = self.subagent_manager.as_mut()?;
2798                let cfg = self.subagent_config.clone();
2799                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
2800                {
2801                    Ok(id) => id,
2802                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2803                };
2804                let short = task_id[..8.min(task_id.len())].to_owned();
2805                let _ = self
2806                    .channel
2807                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2808                    .await;
2809                // Poll until the sub-agent reaches a terminal state.
2810                let result = loop {
2811                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2812
2813                    // Bridge secret requests from sub-agent to channel.confirm().
2814                    // Fetch the pending request first, then release the borrow before
2815                    // calling channel.confirm() (which requires &mut self).
2816                    #[allow(clippy::redundant_closure_for_method_calls)]
2817                    let pending = self
2818                        .subagent_manager
2819                        .as_mut()
2820                        .and_then(|m| m.try_recv_secret_request());
2821                    if let Some((req_task_id, req)) = pending {
2822                        // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2823                        // (SEC-P1-02), so it is safe to embed in the prompt string.
2824                        //
2825                        // confirm() timeout (30s for Telegram) is a UX timeout — how long to
2826                        // wait for operator input. The grant TTL (300s below) is a security
2827                        // bound on how long an approved secret remains usable. Both values are
2828                        // intentionally different: short confirm window, longer grant lifetime.
2829                        let prompt = format!(
2830                            "Sub-agent requests secret '{}'. Allow?",
2831                            crate::text::truncate_to_chars(&req.secret_key, 100)
2832                        );
2833                        let approved = self.channel.confirm(&prompt).await.unwrap_or(false);
2834                        if let Some(mgr) = self.subagent_manager.as_mut() {
2835                            if approved {
2836                                let ttl = std::time::Duration::from_secs(300);
2837                                let key = req.secret_key.clone();
2838                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2839                                    let _ = mgr.deliver_secret(&req_task_id, key);
2840                                }
2841                            } else {
2842                                let _ = mgr.deny_secret(&req_task_id);
2843                            }
2844                        }
2845                    }
2846
2847                    let mgr = self.subagent_manager.as_ref()?;
2848                    let statuses = mgr.statuses();
2849                    let Some((_, status)) = statuses.iter().find(|(id, _)| id == &task_id) else {
2850                        break "Sub-agent completed (no status available).".to_owned();
2851                    };
2852                    match status.state {
2853                        SubAgentState::Completed => {
2854                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2855                            break format!("Sub-agent '{name}' completed: {msg}");
2856                        }
2857                        SubAgentState::Failed => {
2858                            let msg = status
2859                                .last_message
2860                                .clone()
2861                                .unwrap_or_else(|| "unknown error".into());
2862                            break format!("Sub-agent '{name}' failed: {msg}");
2863                        }
2864                        SubAgentState::Canceled => {
2865                            break format!("Sub-agent '{name}' was cancelled.");
2866                        }
2867                        _ => {
2868                            let _ = self
2869                                .channel
2870                                .send_status(&format!(
2871                                    "sub-agent '{name}': turn {}/{}",
2872                                    status.turns_used,
2873                                    self.subagent_manager
2874                                        .as_ref()
2875                                        .and_then(|m| m.agents_def(&task_id))
2876                                        .map_or(20, |d| d.permissions.max_turns)
2877                                ))
2878                                .await;
2879                        }
2880                    }
2881                };
2882                Some(result)
2883            }
2884            AgentCommand::Status => {
2885                let mgr = self.subagent_manager.as_ref()?;
2886                let statuses = mgr.statuses();
2887                if statuses.is_empty() {
2888                    return Some("No active sub-agents.".into());
2889                }
2890                let mut out = String::from("Active sub-agents:\n");
2891                for (id, s) in &statuses {
2892                    let state = format!("{:?}", s.state).to_lowercase();
2893                    let elapsed = s.started_at.elapsed().as_secs();
2894                    let _ = writeln!(
2895                        out,
2896                        "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2897                        short = &id[..8.min(id.len())],
2898                        t = s.turns_used,
2899                        msg = s.last_message.as_deref().unwrap_or(""),
2900                    );
2901                    // Show memory directory path for agents with memory enabled.
2902                    if let Some(def) = mgr.agents_def(id)
2903                        && let Some(scope) = def.memory
2904                        && let Ok(dir) =
2905                            crate::subagent::memory::resolve_memory_dir(scope, &def.name)
2906                    {
2907                        let _ = writeln!(out, "       memory: {}", dir.display());
2908                    }
2909                }
2910                Some(out)
2911            }
2912            AgentCommand::Cancel { id } => {
2913                let mgr = self.subagent_manager.as_mut()?;
2914                // Accept prefix match on task_id.
2915                let ids: Vec<String> = mgr
2916                    .statuses()
2917                    .into_iter()
2918                    .map(|(task_id, _)| task_id)
2919                    .filter(|task_id| task_id.starts_with(&id))
2920                    .collect();
2921                match ids.as_slice() {
2922                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
2923                    [full_id] => {
2924                        let full_id = full_id.clone();
2925                        match mgr.cancel(&full_id) {
2926                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2927                            Err(e) => Some(format!("Cancel failed: {e}")),
2928                        }
2929                    }
2930                    _ => Some(format!(
2931                        "Ambiguous id prefix '{id}': matches {} agents",
2932                        ids.len()
2933                    )),
2934                }
2935            }
2936            AgentCommand::Approve { id } => {
2937                // Look up pending secret request for the given task_id prefix.
2938                let mgr = self.subagent_manager.as_mut()?;
2939                let full_ids: Vec<String> = mgr
2940                    .statuses()
2941                    .into_iter()
2942                    .map(|(tid, _)| tid)
2943                    .filter(|tid| tid.starts_with(&id))
2944                    .collect();
2945                let full_id = match full_ids.as_slice() {
2946                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
2947                    [fid] => fid.clone(),
2948                    _ => {
2949                        return Some(format!(
2950                            "Ambiguous id prefix '{id}': matches {} agents",
2951                            full_ids.len()
2952                        ));
2953                    }
2954                };
2955                if let Some((tid, req)) = mgr.try_recv_secret_request()
2956                    && tid == full_id
2957                {
2958                    let key = req.secret_key.clone();
2959                    let ttl = std::time::Duration::from_secs(300);
2960                    if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2961                        return Some(format!("Approve failed: {e}"));
2962                    }
2963                    if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2964                        return Some(format!("Secret delivery failed: {e}"));
2965                    }
2966                    return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2967                }
2968                Some(format!(
2969                    "No pending secret request for sub-agent '{full_id}'."
2970                ))
2971            }
2972            AgentCommand::Deny { id } => {
2973                let mgr = self.subagent_manager.as_mut()?;
2974                let full_ids: Vec<String> = mgr
2975                    .statuses()
2976                    .into_iter()
2977                    .map(|(tid, _)| tid)
2978                    .filter(|tid| tid.starts_with(&id))
2979                    .collect();
2980                let full_id = match full_ids.as_slice() {
2981                    [] => return Some(format!("No sub-agent with id prefix '{id}'")),
2982                    [fid] => fid.clone(),
2983                    _ => {
2984                        return Some(format!(
2985                            "Ambiguous id prefix '{id}': matches {} agents",
2986                            full_ids.len()
2987                        ));
2988                    }
2989                };
2990                match mgr.deny_secret(&full_id) {
2991                    Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2992                    Err(e) => Some(format!("Deny failed: {e}")),
2993                }
2994            }
2995            AgentCommand::Resume { id, prompt } => {
2996                let cfg = self.subagent_config.clone();
2997                // Resolve definition name from transcript meta before spawning so we can
2998                // look up skills by definition name rather than the UUID prefix (S1 fix).
2999                let def_name = {
3000                    let mgr = self.subagent_manager.as_ref()?;
3001                    match mgr.def_name_for_resume(&id, &cfg) {
3002                        Ok(name) => name,
3003                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
3004                    }
3005                };
3006                let skills = self.filtered_skills_for(&def_name);
3007                let provider = self.provider.clone();
3008                let tool_executor = Arc::clone(&self.tool_executor);
3009                let mgr = self.subagent_manager.as_mut()?;
3010                let (task_id, _) =
3011                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
3012                        Ok(pair) => pair,
3013                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
3014                    };
3015                let short = task_id[..8.min(task_id.len())].to_owned();
3016                let _ = self
3017                    .channel
3018                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
3019                    .await;
3020                // Poll until the sub-agent reaches a terminal state (same as Spawn).
3021                let result = loop {
3022                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3023
3024                    #[allow(clippy::redundant_closure_for_method_calls)]
3025                    let pending = self
3026                        .subagent_manager
3027                        .as_mut()
3028                        .and_then(|m| m.try_recv_secret_request());
3029                    if let Some((req_task_id, req)) = pending {
3030                        let confirm_prompt = format!(
3031                            "Sub-agent requests secret '{}'. Allow?",
3032                            crate::text::truncate_to_chars(&req.secret_key, 100)
3033                        );
3034                        let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
3035                        if let Some(mgr) = self.subagent_manager.as_mut() {
3036                            if approved {
3037                                let ttl = std::time::Duration::from_secs(300);
3038                                let key = req.secret_key.clone();
3039                                if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
3040                                    let _ = mgr.deliver_secret(&req_task_id, key);
3041                                }
3042                            } else {
3043                                let _ = mgr.deny_secret(&req_task_id);
3044                            }
3045                        }
3046                    }
3047
3048                    let mgr = self.subagent_manager.as_ref()?;
3049                    let statuses = mgr.statuses();
3050                    let Some((_, status)) = statuses.iter().find(|(tid, _)| tid == &task_id) else {
3051                        break "Sub-agent resume completed (no status available).".to_owned();
3052                    };
3053                    match status.state {
3054                        SubAgentState::Completed => {
3055                            let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
3056                            break format!("Resumed sub-agent completed: {msg}");
3057                        }
3058                        SubAgentState::Failed => {
3059                            let msg = status
3060                                .last_message
3061                                .clone()
3062                                .unwrap_or_else(|| "unknown error".into());
3063                            break format!("Resumed sub-agent failed: {msg}");
3064                        }
3065                        SubAgentState::Canceled => {
3066                            break "Resumed sub-agent was cancelled.".to_owned();
3067                        }
3068                        _ => {
3069                            let _ = self
3070                                .channel
3071                                .send_status(&format!(
3072                                    "resumed sub-agent: turn {}/{}",
3073                                    status.turns_used,
3074                                    self.subagent_manager
3075                                        .as_ref()
3076                                        .and_then(|m| m.agents_def(&task_id))
3077                                        .map_or(20, |d| d.permissions.max_turns)
3078                                ))
3079                                .await;
3080                        }
3081                    }
3082                };
3083                Some(result)
3084            }
3085        }
3086    }
3087
3088    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
3089        let mgr = self.subagent_manager.as_ref()?;
3090        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
3091        let reg = self
3092            .skill_state
3093            .registry
3094            .read()
3095            .expect("registry read lock");
3096        match crate::subagent::filter_skills(&reg, &def.skills) {
3097            Ok(skills) => {
3098                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
3099                if bodies.is_empty() {
3100                    None
3101                } else {
3102                    Some(bodies)
3103                }
3104            }
3105            Err(e) => {
3106                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
3107                None
3108            }
3109        }
3110    }
3111
3112    #[allow(clippy::too_many_lines)]
3113    async fn reload_skills(&mut self) {
3114        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
3115        if new_registry.fingerprint()
3116            == self
3117                .skill_state
3118                .registry
3119                .read()
3120                .expect("registry read lock")
3121                .fingerprint()
3122        {
3123            return;
3124        }
3125        let _ = self.channel.send_status("reloading skills...").await;
3126        *self
3127            .skill_state
3128            .registry
3129            .write()
3130            .expect("registry write lock") = new_registry;
3131
3132        let all_meta = self
3133            .skill_state
3134            .registry
3135            .read()
3136            .expect("registry read lock")
3137            .all_meta()
3138            .into_iter()
3139            .cloned()
3140            .collect::<Vec<_>>();
3141
3142        // Update trust DB records for reloaded skills.
3143        if let Some(ref memory) = self.memory_state.memory {
3144            let trust_cfg = self.skill_state.trust_config.clone();
3145            let managed_dir = self.skill_state.managed_dir.clone();
3146            for meta in &all_meta {
3147                let source_kind = if managed_dir
3148                    .as_ref()
3149                    .is_some_and(|d| meta.skill_dir.starts_with(d))
3150                {
3151                    zeph_memory::sqlite::SourceKind::Hub
3152                } else {
3153                    zeph_memory::sqlite::SourceKind::Local
3154                };
3155                let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
3156                    &trust_cfg.default_level
3157                } else {
3158                    &trust_cfg.local_level
3159                };
3160                match zeph_skills::compute_skill_hash(&meta.skill_dir) {
3161                    Ok(current_hash) => {
3162                        let existing = memory
3163                            .sqlite()
3164                            .load_skill_trust(&meta.name)
3165                            .await
3166                            .ok()
3167                            .flatten();
3168                        let trust_level_str = if let Some(ref row) = existing {
3169                            if row.blake3_hash == current_hash {
3170                                row.trust_level.clone()
3171                            } else {
3172                                trust_cfg.hash_mismatch_level.to_string()
3173                            }
3174                        } else {
3175                            initial_level.to_string()
3176                        };
3177                        let source_path = meta.skill_dir.to_str();
3178                        if let Err(e) = memory
3179                            .sqlite()
3180                            .upsert_skill_trust(
3181                                &meta.name,
3182                                &trust_level_str,
3183                                source_kind,
3184                                None,
3185                                source_path,
3186                                &current_hash,
3187                            )
3188                            .await
3189                        {
3190                            tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
3191                        }
3192                    }
3193                    Err(e) => {
3194                        tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
3195                    }
3196                }
3197            }
3198        }
3199
3200        let all_meta = all_meta.iter().collect::<Vec<_>>();
3201        let provider = self.provider.clone();
3202        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
3203            let owned = text.to_owned();
3204            let p = provider.clone();
3205            Box::pin(async move { p.embed(&owned).await })
3206        };
3207
3208        let needs_inmemory_rebuild = !self
3209            .skill_state
3210            .matcher
3211            .as_ref()
3212            .is_some_and(SkillMatcherBackend::is_qdrant);
3213
3214        if needs_inmemory_rebuild {
3215            self.skill_state.matcher = SkillMatcher::new(&all_meta, embed_fn)
3216                .await
3217                .map(SkillMatcherBackend::InMemory);
3218        } else if let Some(ref mut backend) = self.skill_state.matcher {
3219            let _ = self.channel.send_status("syncing skill index...").await;
3220            if let Err(e) = backend
3221                .sync(&all_meta, &self.skill_state.embedding_model, embed_fn)
3222                .await
3223            {
3224                tracing::warn!("failed to sync skill embeddings: {e:#}");
3225            }
3226        }
3227
3228        if self.skill_state.hybrid_search {
3229            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
3230            let _ = self.channel.send_status("rebuilding search index...").await;
3231            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
3232        }
3233
3234        let all_skills: Vec<Skill> = {
3235            let reg = self
3236                .skill_state
3237                .registry
3238                .read()
3239                .expect("registry read lock");
3240            reg.all_meta()
3241                .iter()
3242                .filter_map(|m| reg.get_skill(&m.name).ok())
3243                .collect()
3244        };
3245        let trust_map = self.build_skill_trust_map().await;
3246        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3247        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
3248        self.skill_state
3249            .last_skills_prompt
3250            .clone_from(&skills_prompt);
3251        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
3252        if let Some(msg) = self.messages.first_mut() {
3253            msg.content = system_prompt;
3254        }
3255
3256        let _ = self.channel.send_status("").await;
3257        tracing::info!(
3258            "reloaded {} skill(s)",
3259            self.skill_state
3260                .registry
3261                .read()
3262                .expect("registry read lock")
3263                .all_meta()
3264                .len()
3265        );
3266    }
3267
3268    fn reload_instructions(&mut self) {
3269        // Drain any additional queued events before reloading to avoid redundant reloads.
3270        if let Some(ref mut rx) = self.instruction_reload_rx {
3271            while rx.try_recv().is_ok() {}
3272        }
3273        let Some(ref state) = self.instruction_reload_state else {
3274            return;
3275        };
3276        let new_blocks = crate::instructions::load_instructions(
3277            &state.base_dir,
3278            &state.provider_kinds,
3279            &state.explicit_files,
3280            state.auto_detect,
3281        );
3282        let old_sources: std::collections::HashSet<_> =
3283            self.instruction_blocks.iter().map(|b| &b.source).collect();
3284        let new_sources: std::collections::HashSet<_> =
3285            new_blocks.iter().map(|b| &b.source).collect();
3286        for added in new_sources.difference(&old_sources) {
3287            tracing::info!(path = %added.display(), "instruction file added");
3288        }
3289        for removed in old_sources.difference(&new_sources) {
3290            tracing::info!(path = %removed.display(), "instruction file removed");
3291        }
3292        tracing::info!(
3293            old_count = self.instruction_blocks.len(),
3294            new_count = new_blocks.len(),
3295            "reloaded instruction files"
3296        );
3297        self.instruction_blocks = new_blocks;
3298    }
3299
3300    fn reload_config(&mut self) {
3301        let Some(ref path) = self.config_path else {
3302            return;
3303        };
3304        let config = match Config::load(path) {
3305            Ok(c) => c,
3306            Err(e) => {
3307                tracing::warn!("config reload failed: {e:#}");
3308                return;
3309            }
3310        };
3311
3312        self.runtime.security = config.security;
3313        self.runtime.timeouts = config.timeouts;
3314        self.runtime.redact_credentials = config.memory.redact_credentials;
3315        self.memory_state.history_limit = config.memory.history_limit;
3316        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
3317        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
3318        self.skill_state.max_active_skills = config.skills.max_active_skills;
3319        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
3320        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3321        self.skill_state.hybrid_search = config.skills.hybrid_search;
3322
3323        if config.memory.context_budget_tokens > 0 {
3324            self.context_manager.budget = Some(
3325                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
3326                    .with_graph_enabled(config.memory.graph.enabled),
3327            );
3328        } else {
3329            self.context_manager.budget = None;
3330        }
3331
3332        {
3333            self.memory_state.graph_config = config.memory.graph.clone();
3334        }
3335        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3336        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3337        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3338        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
3339        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3340        self.context_manager.compression = config.memory.compression.clone();
3341        self.context_manager.routing = config.memory.routing.clone();
3342        self.memory_state.cross_session_score_threshold =
3343            config.memory.cross_session_score_threshold;
3344
3345        self.index.repo_map_tokens = config.index.repo_map_tokens;
3346        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3347
3348        tracing::info!("config reloaded");
3349    }
3350}
3351pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3352    while !*rx.borrow_and_update() {
3353        if rx.changed().await.is_err() {
3354            std::future::pending::<()>().await;
3355        }
3356    }
3357}
3358
3359pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3360    match rx {
3361        Some(inner) => {
3362            if let Some(v) = inner.recv().await {
3363                Some(v)
3364            } else {
3365                *rx = None;
3366                std::future::pending().await
3367            }
3368        }
3369        None => std::future::pending().await,
3370    }
3371}
3372
3373#[cfg(test)]
3374mod tests;
3375
3376#[cfg(test)]
3377pub(crate) use tests::agent_tests;