Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7#[cfg(feature = "compression-guidelines")]
8pub(super) mod compression_feedback;
9mod context;
10pub(crate) mod context_manager;
11pub mod error;
12#[cfg(feature = "experiments")]
13mod experiment_cmd;
14pub(super) mod feedback_detector;
15pub(crate) mod focus;
16mod graph_commands;
17#[cfg(feature = "compression-guidelines")]
18mod guidelines_commands;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23#[cfg(feature = "lsp-context")]
24mod lsp_commands;
25mod mcp;
26mod memory_commands;
27mod message_queue;
28mod persistence;
29#[cfg(feature = "policy-enforcer")]
30mod policy_commands;
31mod provider_cmd;
32pub(crate) mod rate_limiter;
33#[cfg(feature = "scheduler")]
34mod scheduler_commands;
35pub mod session_config;
36mod session_digest;
37pub(crate) mod sidequest;
38mod skill_management;
39pub mod slash_commands;
40pub(crate) mod state;
41pub(crate) mod tool_execution;
42pub(crate) mod tool_orchestrator;
43mod trust_commands;
44mod utils;
45
46use std::collections::{HashMap, HashSet, VecDeque};
47use std::sync::Arc;
48use std::time::Instant;
49
50use tokio::sync::{Notify, mpsc, watch};
51use tokio_util::sync::CancellationToken;
52use zeph_llm::any::AnyProvider;
53use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
54use zeph_memory::TokenCounter;
55use zeph_memory::semantic::SemanticMemory;
56use zeph_skills::loader::Skill;
57use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
58use zeph_skills::prompt::format_skills_prompt;
59use zeph_skills::registry::SkillRegistry;
60use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
61
62use crate::channel::Channel;
63use crate::config::Config;
64use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
65use crate::context::{
66    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
67};
68use zeph_sanitizer::ContentSanitizer;
69
70use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
71#[cfg(feature = "context-compression")]
72use state::CompressionState;
73use state::{
74    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
75    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
76    RuntimeConfig, SecurityState, SessionState, SkillState,
77};
78
/// Window size used by doom-loop detection.
/// NOTE(review): the exact unit (messages vs. tool-call iterations) is defined
/// by the consumer, which is not visible in this chunk — confirm at usage site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Marker prefixes for context injected into the message history. Each kind of
// injected content carries a distinct leading tag so it can later be recognized
// without touching ordinary user/assistant text (see `LSP_NOTE_PREFIX` below
// for the one documented instance of this pattern). The literal values are part
// of runtime behavior — do not edit them.
/// Header line for document-RAG context blocks.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
/// Tag for semantic-memory recall snippets.
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Tag for code-index context snippets.
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
/// Tag for conversation-summary blocks.
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Tag for context recalled from other sessions.
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
/// Tag for previously recorded user corrections.
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Tag for facts pulled from the knowledge graph.
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Instruction prepended to a scheduled task's text when it is injected as a turn.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Tag for the digest carried over from a previous session.
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Prefix used for LSP context messages (`Role::System`) injected into message history.
/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
/// prefix to clear stale notes before each fresh injection.
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing code fence appended by `format_tool_output` after a tool's body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
96
97fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
98    use std::fmt::Write;
99    let mut out = String::new();
100    let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
101    let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
102    let _ = writeln!(out);
103    for (i, task) in graph.tasks.iter().enumerate() {
104        let deps = if task.depends_on.is_empty() {
105            String::new()
106        } else {
107            let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
108            format!(" (after: {})", ids.join(", "))
109        };
110        let agent = task.agent_hint.as_deref().unwrap_or("-");
111        let _ = writeln!(out, "  {}. [{}] {}{}", i + 1, agent, task.title, deps);
112    }
113    out
114}
115
116/// Concatenate completed task outputs into a single string truncated to `max_tokens * 4` chars.
117///
118/// Logs a warning when truncation occurs (C3). Returns an empty string when no completed
119/// tasks have results.
120fn collect_and_truncate_task_outputs(
121    graph: &crate::orchestration::TaskGraph,
122    max_tokens: u32,
123) -> String {
124    use crate::orchestration::TaskStatus;
125
126    let char_budget = max_tokens as usize * 4;
127    let mut raw = String::new();
128    for task in &graph.tasks {
129        if task.status == TaskStatus::Completed
130            && let Some(ref result) = task.result
131        {
132            if !raw.is_empty() {
133                raw.push('\n');
134            }
135            raw.push_str(&result.output);
136        }
137    }
138    if raw.len() > char_budget {
139        tracing::warn!(
140            original_len = raw.len(),
141            truncated_to = char_budget,
142            "whole-plan verify: output truncated to verify_max_tokens * 4 chars"
143        );
144        raw.chars().take(char_budget).collect()
145    } else {
146        raw
147    }
148}
149
150pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
151    use std::fmt::Write;
152    let capacity = "[tool output: ".len()
153        + tool_name.len()
154        + "]\n```\n".len()
155        + body.len()
156        + TOOL_OUTPUT_SUFFIX.len();
157    let mut buf = String::with_capacity(capacity);
158    let _ = write!(
159        buf,
160        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
161    );
162    buf
163}
164
/// Top-level agent: owns the LLM provider, the user-facing channel, the tool
/// executor, and all per-session state, grouped into focused sub-structs
/// (messages, memory, skills, security, orchestration, …).
pub struct Agent<C: Channel> {
    /// Primary LLM provider. Also the fallback for planning and embeddings
    /// when no dedicated provider is configured (see `handle_plan_goal`).
    provider: AnyProvider,
    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
    /// Falls back to `provider.clone()` when no dedicated entry exists.
    /// **Never replaced** by `/provider switch`.
    embedding_provider: AnyProvider,
    /// User-facing communication channel (`send` is used throughout for
    /// plan summaries, status, and error text).
    channel: C,
    /// Type-erased tool executor shared across the agent.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Message history, queued inbound messages, and pending image parts.
    pub(super) msg: MessageState,
    /// Semantic/conversation memory configuration and handles.
    pub(super) memory_state: MemoryState,
    /// Skill registry, matcher, and skill-prompt configuration.
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    /// Feedback detection/judging state.
    pub(super) feedback: FeedbackState,
    /// Runtime configuration: security, timeouts, rate limiting, model names.
    pub(super) runtime: RuntimeConfig,
    /// MCP server/tool discovery and pruning state.
    pub(super) mcp: McpState,
    /// Code-index retriever and repo-map cache.
    pub(super) index: IndexState,
    /// Per-session environment/context handles.
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    /// Instruction blocks plus hot-reload channels.
    pub(super) instructions: InstructionState,
    /// Sanitizers, PII filtering, exfiltration guard, response verification.
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    #[cfg(feature = "context-compression")]
    pub(super) compression: CompressionState,
    /// Shutdown/cancellation signals and lifecycle watchers.
    pub(super) lifecycle: LifecycleState,
    /// Auxiliary providers (summary, judge, probe, …) and token accounting.
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    /// Task-planning state: planner providers, pending graph, sub-agents, plan cache.
    pub(super) orchestration: OrchestrationState,
    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
    pub(super) focus: focus::FocusState,
    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
    pub(super) sidequest: sidequest::SidequestState,
    /// Dynamic tool schema filter: pre-computed tool embeddings for per-turn filtering (#2020).
    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
    /// Cached filtered tool IDs for the current user turn. Set by `compute_filtered_tool_ids()`
    /// in `rebuild_system_prompt()`, consumed by the native tool loop on iteration 0.
    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
    /// Tool dependency graph for sequential tool availability (issue #2024).
    /// Built once from config, applied per-turn after tool schema filtering.
    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
    /// Always-on tool IDs, mirrored from the tool schema filter for dependency gate bypass.
    pub(super) dependency_always_on: HashSet<String>,
    /// Tool IDs that completed successfully in the current session.
    /// Grows monotonically per session; cleared on `/clear`.
    /// NOTE: bounded by session length, typically < 1000 entries.
    pub(super) completed_tool_ids: HashSet<String>,
    /// DB row ID of the most recently persisted message. Set by `persist_message`;
    /// consumed by `push_message` call sites to populate `metadata.db_id` on in-memory messages.
    pub(super) last_persisted_message_id: Option<i64>,
    /// DB message IDs pending hide after deferred tool pair summarization.
    pub(super) deferred_db_hide_ids: Vec<i64>,
    /// Summary texts pending insertion after deferred tool pair summarization.
    pub(super) deferred_db_summaries: Vec<String>,
    /// Runtime middleware layers for LLM calls and tool dispatch (#2286).
    ///
    /// Default: empty vec (zero-cost — loops never iterate).
    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
}
225
226impl<C: Channel> Agent<C> {
227    #[must_use]
228    pub fn new(
229        provider: AnyProvider,
230        channel: C,
231        registry: SkillRegistry,
232        matcher: Option<SkillMatcherBackend>,
233        max_active_skills: usize,
234        tool_executor: impl ToolExecutor + 'static,
235    ) -> Self {
236        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
237        Self::new_with_registry_arc(
238            provider,
239            channel,
240            registry,
241            matcher,
242            max_active_skills,
243            tool_executor,
244        )
245    }
246
    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
    ///
    /// # Panics
    ///
    /// Panics if the registry `RwLock` is poisoned.
    #[must_use]
    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize full Skill values for every registered skill; entries that
        // fail to load are silently skipped (`.ok()` inside filter_map).
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // Fresh agent: no trust or health data yet, so the initial skills
        // prompt is built with empty maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough token estimate: ~4 bytes per token heuristic.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // Always create the receiver side of the experiment notification channel so the
        // select! branch in the agent loop compiles unconditionally. The sender is only
        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        // Embeddings initially go through the main provider; bootstrap may
        // later resolve a dedicated one (see field docs on `embedding_provider`).
        let embedding_provider = provider.clone();
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // History starts with exactly one System message: the initial prompt.
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
            },
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
                shutdown_summary: true,
                shutdown_summary_min_messages: 4,
                shutdown_summary_max_messages: 20,
                shutdown_summary_timeout_secs: 10,
                structured_summaries: false,
                last_recall_confidence: None,
                digest_config: crate::config::DigestConfig::default(),
                cached_session_digest: None,
                context_strategy: crate::config::ContextStrategy::default(),
                crossover_turn_threshold: 20,
                rpe_router: None,
                goal_text: None,
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.20,
                min_injection_score: 0.20,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
                two_stage_matching: false,
                confusability_threshold: 0.0,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState {
                detector: feedback_detector::FeedbackDetector::new(0.6),
                judge: None,
                llm_classifier: None,
            },
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                reasoning_model_warning: true,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                active_provider_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
                rate_limiter: rate_limiter::ToolRateLimiter::new(
                    rate_limiter::RateLimitConfig::default(),
                ),
                semantic_cache_enabled: false,
                semantic_cache_threshold: 0.95,
                semantic_cache_max_candidates: 10,
                dependency_config: zeph_tools::DependencyConfig::default(),
                #[cfg(feature = "policy-enforcer")]
                adversarial_policy_info: None,
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                elicitation_rx: None,
                shared_tools: None,
                tool_rx: None,
                server_outcomes: Vec::new(),
                pruning_cache: zeph_mcp::PruningCache::new(),
                pruning_provider: None,
                pruning_enabled: false,
                pruning_params: zeph_mcp::PruningParams::default(),
                semantic_index: None,
                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
                discovery_params: zeph_mcp::DiscoveryParams::default(),
                discovery_provider: None,
                elicitation_warn_sensitive_fields: true,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                // Repo map cache TTL: 5 minutes.
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            session: SessionState {
                env_context: EnvironmentContext::gather(""),
                response_cache: None,
                parent_tool_use_id: None,
                status_tx: None,
                #[cfg(feature = "lsp-context")]
                lsp_hooks: None,
                #[cfg(feature = "policy-enforcer")]
                policy_config: None,
                hooks_config: state::HooksConfigSnapshot::default(),
            },
            instructions: InstructionState {
                blocks: Vec::new(),
                reload_rx: None,
                reload_state: None,
            },
            security: SecurityState {
                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
                quarantine_summarizer: None,
                is_acp_session: false,
                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
                    std::collections::HashSet::new(),
                )),
                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
                    zeph_sanitizer::pii::PiiFilterConfig::default(),
                ),
                #[cfg(feature = "classifiers")]
                pii_ner_backend: None,
                #[cfg(feature = "classifiers")]
                pii_ner_timeout_ms: 5000,
                #[cfg(feature = "classifiers")]
                pii_ner_max_chars: 8192,
                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
                #[cfg(feature = "guardrail")]
                guardrail: None,
                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
                    zeph_config::ResponseVerificationConfig::default(),
                ),
                causal_analyzer: None,
            },
            experiments: ExperimentState {
                #[cfg(feature = "experiments")]
                config: crate::config::ExperimentConfig::default(),
                #[cfg(feature = "experiments")]
                cancel: None,
                #[cfg(feature = "experiments")]
                baseline: crate::experiments::ConfigSnapshot::default(),
                #[cfg(feature = "experiments")]
                eval_provider: None,
                // Receiver always present (see channel creation above); sender
                // only stored when the experiments feature is compiled in.
                notify_rx: Some(exp_notify_rx),
                #[cfg(feature = "experiments")]
                notify_tx: exp_notify_tx,
            },
            #[cfg(feature = "context-compression")]
            compression: CompressionState {
                current_task_goal: None,
                task_goal_user_msg_hash: None,
                pending_task_goal: None,
                pending_sidequest_result: None,
                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
                pending_subgoal: None,
                subgoal_user_msg_hash: None,
            },
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
                last_known_cwd: std::env::current_dir().unwrap_or_default(),
                file_changed_rx: None,
                file_watcher: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                probe_provider: None,
                #[cfg(feature = "context-compression")]
                compress_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
                provider_pool: Vec::new(),
                provider_config_snapshot: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
                classifier_metrics: None,
            },
            orchestration: OrchestrationState {
                planner_provider: None,
                verify_provider: None,
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
                plan_cache: None,
                pending_goal_embedding: None,
            },
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_schema_filter: None,
            cached_filtered_tool_ids: None,
            dependency_graph: None,
            dependency_always_on: HashSet::new(),
            completed_tool_ids: HashSet::new(),
            last_persisted_message_id: None,
            deferred_db_hide_ids: Vec::new(),
            deferred_db_summaries: Vec::new(),
            runtime_layers: Vec::new(),
        }
    }
543
544    /// Poll all active sub-agents for completed/failed/canceled results.
545    ///
546    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
547    /// for agents that have finished. Each completed agent is removed from the manager.
548    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
549        let Some(mgr) = &mut self.orchestration.subagent_manager else {
550            return vec![];
551        };
552
553        let finished: Vec<String> = mgr
554            .statuses()
555            .into_iter()
556            .filter_map(|(id, status)| {
557                if matches!(
558                    status.state,
559                    crate::subagent::SubAgentState::Completed
560                        | crate::subagent::SubAgentState::Failed
561                        | crate::subagent::SubAgentState::Canceled
562                ) {
563                    Some(id)
564                } else {
565                    None
566                }
567            })
568            .collect();
569
570        let mut results = vec![];
571        for task_id in finished {
572            match mgr.collect(&task_id).await {
573                Ok(result) => results.push((task_id, result)),
574                Err(e) => {
575                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
576                }
577            }
578        }
579        results
580    }
581
582    async fn handle_plan_command(
583        &mut self,
584        cmd: crate::orchestration::PlanCommand,
585    ) -> Result<(), error::AgentError> {
586        use crate::orchestration::PlanCommand;
587
588        if !self.config_for_orchestration().enabled {
589            self.channel
590                .send(
591                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
592                )
593                .await?;
594            return Ok(());
595        }
596
597        match cmd {
598            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
599            PlanCommand::Confirm => self.handle_plan_confirm().await,
600            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
601            PlanCommand::List => self.handle_plan_list().await,
602            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
603            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
604            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
605        }
606    }
607
    /// Convenience accessor for the orchestration config stored in [`OrchestrationState`].
    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration.orchestration_config
    }
611
612    /// Lazily initialize `OrchestrationState::plan_cache` on the first `/plan` call.
613    ///
614    /// No-op when the cache is already initialized, disabled in config, or memory is unavailable.
615    async fn init_plan_cache_if_needed(&mut self) {
616        let plan_cache_config = self.orchestration.orchestration_config.plan_cache.clone();
617        if !plan_cache_config.enabled || self.orchestration.plan_cache.is_some() {
618            return;
619        }
620        if let Some(ref memory) = self.memory_state.memory {
621            let pool = memory.sqlite().pool().clone();
622            let embed_model = self.skill_state.embedding_model.clone();
623            match crate::orchestration::PlanCache::new(pool, plan_cache_config, &embed_model).await
624            {
625                Ok(cache) => self.orchestration.plan_cache = Some(cache),
626                Err(e) => {
627                    tracing::warn!(error = %e, "plan cache: init failed, proceeding without cache");
628                }
629            }
630        } else {
631            tracing::warn!("plan cache: memory not configured, proceeding without cache");
632        }
633    }
634
635    /// Compute a normalized goal embedding for plan cache lookups (best-effort).
636    ///
637    /// Returns `None` when the cache is disabled, the provider does not support embeddings,
638    /// or the embedding call fails.
639    async fn goal_embedding_for_cache(&self, goal: &str) -> Option<Vec<f32>> {
640        use crate::orchestration::normalize_goal;
641
642        self.orchestration.plan_cache.as_ref()?;
643        let normalized = normalize_goal(goal);
644        match self.embedding_provider.embed(&normalized).await {
645            Ok(emb) => Some(emb),
646            Err(zeph_llm::LlmError::EmbedUnsupported { .. }) => {
647                tracing::debug!(
648                    "plan cache: provider does not support embeddings, skipping cache lookup"
649                );
650                None
651            }
652            Err(e) => {
653                tracing::warn!(error = %e, "plan cache: goal embedding failed, skipping cache");
654                None
655            }
656        }
657    }
658
659    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
660        use crate::orchestration::{LlmPlanner, plan_with_cache};
661
662        if self.orchestration.pending_graph.is_some() {
663            self.channel
664                .send(
665                    "A plan is already pending confirmation. \
666                     Use /plan confirm to execute it or /plan cancel to discard.",
667                )
668                .await?;
669            return Ok(());
670        }
671
672        self.channel.send("Planning task decomposition...").await?;
673
674        let available_agents = self
675            .orchestration
676            .subagent_manager
677            .as_ref()
678            .map(|m| m.definitions().to_vec())
679            .unwrap_or_default();
680
681        let confirm_before_execute = self
682            .orchestration
683            .orchestration_config
684            .confirm_before_execute;
685
686        self.init_plan_cache_if_needed().await;
687        let goal_embedding = self.goal_embedding_for_cache(goal).await;
688
689        tracing::debug!(
690            cache_enabled = self.orchestration.orchestration_config.plan_cache.enabled,
691            has_embedding = goal_embedding.is_some(),
692            "plan cache state for goal"
693        );
694
695        let planner_provider = self
696            .orchestration
697            .planner_provider
698            .as_ref()
699            .unwrap_or(&self.provider)
700            .clone();
701        let planner = LlmPlanner::new(planner_provider, &self.orchestration.orchestration_config);
702        let embed_model = self.skill_state.embedding_model.clone();
703        let (graph, planner_usage) = plan_with_cache(
704            &planner,
705            self.orchestration.plan_cache.as_ref(),
706            &self.provider,
707            goal_embedding.as_deref(),
708            &embed_model,
709            goal,
710            &available_agents,
711            self.orchestration.orchestration_config.max_tasks,
712        )
713        .await
714        .map_err(|e| error::AgentError::Other(e.to_string()))?;
715
716        // Store embedding for cache_plan() after execution completes.
717        self.orchestration.pending_goal_embedding = goal_embedding;
718
719        let task_count = graph.tasks.len() as u64;
720        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
721        let (planner_prompt, planner_completion) = planner_usage.unwrap_or((0, 0));
722        self.update_metrics(|m| {
723            m.api_calls += 1;
724            m.prompt_tokens += planner_prompt;
725            m.completion_tokens += planner_completion;
726            m.total_tokens = m.prompt_tokens + m.completion_tokens;
727            m.orchestration.plans_total += 1;
728            m.orchestration.tasks_total += task_count;
729            m.orchestration_graph = Some(snapshot);
730        });
731        self.record_cost(planner_prompt, planner_completion);
732        self.record_cache_usage();
733
734        if confirm_before_execute {
735            let summary = format_plan_summary(&graph);
736            self.channel.send(&summary).await?;
737            self.channel
738                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
739                .await?;
740            self.orchestration.pending_graph = Some(graph);
741        } else {
742            // confirm_before_execute = false: display and proceed (Phase 5 will run scheduler).
743            // TODO(#1241): wire DagScheduler tick updates for Running task state
744            let summary = format_plan_summary(&graph);
745            self.channel.send(&summary).await?;
746            self.channel
747                .send("Plan ready. Full execution will be available in a future phase.")
748                .await?;
749            // IC1: graph was shown but never confirmed; clear snapshot so it doesn't linger.
750            let now = std::time::Instant::now();
751            self.update_metrics(|m| {
752                if let Some(ref mut s) = m.orchestration_graph {
753                    "completed".clone_into(&mut s.status);
754                    s.completed_at = Some(now);
755                }
756            });
757            // pending_goal_embedding intentionally not cleared — overwritten on next /plan goal.
758        }
759
760        Ok(())
761    }
762
763    /// Validate that the pending plan graph can be executed.
764    ///
765    /// Sends an appropriate error message and restores the graph to `pending_graph` when
766    /// validation fails. Returns `Ok(graph)` on success, `Err(())` when validation failed
767    /// and the caller should return early.
768    async fn validate_pending_graph(
769        &mut self,
770        graph: crate::orchestration::TaskGraph,
771    ) -> Result<crate::orchestration::TaskGraph, ()> {
772        use crate::orchestration::GraphStatus;
773
774        if self.orchestration.subagent_manager.is_none() {
775            let _ = self
776                .channel
777                .send(
778                    "No sub-agents configured. Add sub-agent definitions to config \
779                     to enable plan execution.",
780                )
781                .await;
782            self.orchestration.pending_graph = Some(graph);
783            return Err(());
784        }
785
786        // REV-2: pre-validate before moving graph into the constructor so we can
787        // restore it to pending_graph on failure.
788        if graph.tasks.is_empty() {
789            let _ = self.channel.send("Plan has no tasks.").await;
790            self.orchestration.pending_graph = Some(graph);
791            return Err(());
792        }
793
794        // resume_from() rejects Completed and Canceled — guard those here too.
795        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
796            let _ = self
797                .channel
798                .send(&format!(
799                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
800                    graph.status
801                ))
802                .await;
803            self.orchestration.pending_graph = Some(graph);
804            return Err(());
805        }
806
807        Ok(graph)
808    }
809
810    /// Build a [`DagScheduler`] from the graph, reserving sub-agent slots.
811    ///
812    /// Returns `(scheduler, reserved)` on success or an `AgentError` on failure.
813    /// Callers must call `mgr.release_reservation(reserved)` when done.
814    fn build_dag_scheduler(
815        &mut self,
816        graph: crate::orchestration::TaskGraph,
817    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
818        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};
819
820        let available_agents = self
821            .orchestration
822            .subagent_manager
823            .as_ref()
824            .map(|m| m.definitions().to_vec())
825            .unwrap_or_default();
826
827        // Warn when max_concurrent is too low to support the configured parallelism.
828        // This is the main cause of DagScheduler deadlocks (#1619): a planning-phase
829        // sub-agent occupies the only slot while orchestration tasks are waiting.
830        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
831        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
832        if max_concurrent < max_parallel + 1 {
833            tracing::warn!(
834                max_concurrent,
835                max_parallel,
836                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
837                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
838                max_parallel + 1
839            );
840        }
841
842        // Reserve slots equal to max_parallel so the scheduler is guaranteed capacity
843        // even if a planning-phase sub-agent is occupying a slot (#1619).
844        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
845        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
846            mgr.reserve_slots(reserved);
847        }
848
849        // Use resume_from() for graphs that are no longer in Created status
850        // (e.g., after /plan retry which calls reset_for_retry and sets status=Running).
851        let scheduler = if graph.status == GraphStatus::Created {
852            DagScheduler::new(
853                graph,
854                &self.orchestration.orchestration_config,
855                Box::new(RuleBasedRouter),
856                available_agents,
857            )
858        } else {
859            DagScheduler::resume_from(
860                graph,
861                &self.orchestration.orchestration_config,
862                Box::new(RuleBasedRouter),
863                available_agents,
864            )
865        }
866        .map_err(|e| {
867            // Release reservation before propagating error.
868            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
869                mgr.release_reservation(reserved);
870            }
871            error::AgentError::Other(e.to_string())
872        })?;
873
874        // Validate verify_provider name against the known provider pool (#2238).
875        let provider_names: Vec<&str> = self
876            .providers
877            .provider_pool
878            .iter()
879            .filter_map(|e| e.name.as_deref())
880            .collect();
881        scheduler
882            .validate_verify_config(&provider_names)
883            .map_err(|e| {
884                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
885                    mgr.release_reservation(reserved);
886                }
887                error::AgentError::Other(e.to_string())
888            })?;
889
890        Ok((scheduler, reserved))
891    }
892
893    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
894        let Some(graph) = self.orchestration.pending_graph.take() else {
895            self.channel
896                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
897                .await?;
898            return Ok(());
899        };
900
901        // validate_pending_graph sends the error message and restores the graph on failure.
902        let Ok(graph) = self.validate_pending_graph(graph).await else {
903            return Ok(());
904        };
905
906        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;
907
908        let task_count = scheduler.graph().tasks.len();
909        self.channel
910            .send(&format!(
911                "Confirmed. Executing plan ({task_count} tasks)..."
912            ))
913            .await?;
914
915        let plan_token = CancellationToken::new();
916        self.orchestration.plan_cancel_token = Some(plan_token.clone());
917
918        // Use match instead of ? so plan_cancel_token is always cleared (CRIT-07).
919        let scheduler_result = self
920            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
921            .await;
922        self.orchestration.plan_cancel_token = None;
923
924        // Always release the reservation, regardless of scheduler outcome.
925        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
926            mgr.release_reservation(reserved);
927        }
928
929        let final_status = scheduler_result?;
930
931        // Whole-plan verification: after all tasks complete, verify the full plan output
932        // against the original goal and trigger a single replan cycle if gaps are found.
933        // Only runs on Completed graphs — Failed/Canceled graphs skip verification.
934        // Fail-open: any error logs a warn and proceeds to aggregation unchanged.
935        let extra_task_outputs = self
936            .run_whole_plan_verify(&mut scheduler, final_status)
937            .await;
938
939        let mut completed_graph = scheduler.into_graph();
940
941        // Merge partial DAG outputs (from whole-plan replan) into the original graph so the
942        // Aggregator sees both original and gap-filling task results (C2).
943        // IDs are already offset by the original graph size (set in run_whole_plan_verify).
944        if let Some(extra_tasks) = extra_task_outputs {
945            completed_graph.tasks.extend(extra_tasks);
946        }
947
948        // Final TUI snapshot update.
949        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
950        self.update_metrics(|m| {
951            m.orchestration_graph = Some(snapshot);
952        });
953
954        let result_label = self
955            .finalize_plan_execution(completed_graph, final_status)
956            .await?;
957
958        let now = std::time::Instant::now();
959        self.update_metrics(|m| {
960            if let Some(ref mut s) = m.orchestration_graph {
961                result_label.clone_into(&mut s.status);
962                s.completed_at = Some(now);
963            }
964        });
965        Ok(())
966    }
967
968    /// Run whole-plan verification after `DagScheduler` reaches `Done{Completed}`.
969    ///
970    /// Returns completed `TaskNode`s from the partial replan DAG when a replan cycle
971    /// was executed. Returns `None` when verification is disabled, not applicable, or
972    /// the plan passes the threshold. Returns `None` on any error (fail-open).
973    ///
974    /// The returned tasks must be merged into the original graph by the caller (C2)
975    /// so the `Aggregator` sees both original and gap-filling task outputs.
976    async fn run_whole_plan_verify(
977        &mut self,
978        scheduler: &mut crate::orchestration::DagScheduler,
979        final_status: crate::orchestration::GraphStatus,
980    ) -> Option<Vec<crate::orchestration::TaskNode>> {
981        use crate::orchestration::{GraphStatus, PlanVerifier};
982
983        if final_status != GraphStatus::Completed
984            || !self.orchestration.orchestration_config.verify_completeness
985            || scheduler.max_replans_remaining() == 0
986        {
987            return None;
988        }
989
990        let threshold = scheduler.completeness_threshold();
991        let max_tokens = self.orchestration.orchestration_config.verify_max_tokens;
992        let max_tasks = self.orchestration.orchestration_config.max_tasks;
993        let goal = scheduler.graph().goal.clone();
994        let truncated_output = collect_and_truncate_task_outputs(scheduler.graph(), max_tokens);
995
996        if truncated_output.is_empty() {
997            return None;
998        }
999
1000        let verify_provider = self
1001            .orchestration
1002            .verify_provider
1003            .as_ref()
1004            .unwrap_or(&self.provider)
1005            .clone();
1006        let mut verifier =
1007            PlanVerifier::new(verify_provider, max_tokens, self.security.sanitizer.clone());
1008        let result = verifier.verify_plan(&goal, &truncated_output).await;
1009
1010        tracing::debug!(
1011            complete = result.complete,
1012            confidence = result.confidence,
1013            gaps = result.gaps.len(),
1014            threshold,
1015            "whole-plan verification result"
1016        );
1017
1018        let should_replan =
1019            !result.complete && result.confidence < f64::from(threshold) && !result.gaps.is_empty();
1020
1021        if !should_replan {
1022            return None;
1023        }
1024
1025        scheduler.record_whole_plan_replan();
1026
1027        let next_id = u32::try_from(scheduler.graph().tasks.len()).unwrap_or(u32::MAX);
1028        let gap_tasks = match verifier
1029            .replan_from_plan(&goal, &result.gaps, next_id, max_tasks)
1030            .await
1031        {
1032            Ok(tasks) => tasks,
1033            Err(e) => {
1034                tracing::warn!(error = %e, "whole-plan replan_from_plan failed (fail-open)");
1035                return None;
1036            }
1037        };
1038
1039        if gap_tasks.is_empty() {
1040            return None;
1041        }
1042
1043        self.execute_partial_replan_dag(gap_tasks, &goal).await
1044    }
1045
1046    /// Build and run a partial DAG from gap tasks generated by whole-plan verification.
1047    ///
1048    /// Uses `max_replans=0` and `verify_completeness=false` to prevent recursive replan
1049    /// loops (C1 / INV-2). Returns completed task nodes on success, `None` on any error.
1050    async fn execute_partial_replan_dag(
1051        &mut self,
1052        gap_tasks: Vec<crate::orchestration::TaskNode>,
1053        goal: &str,
1054    ) -> Option<Vec<crate::orchestration::TaskNode>> {
1055        use crate::orchestration::{DagScheduler, RuleBasedRouter, TaskStatus};
1056
1057        let mut partial_graph = crate::orchestration::TaskGraph::new(goal);
1058        partial_graph.tasks = gap_tasks;
1059
1060        let mut partial_config = self.orchestration.orchestration_config.clone();
1061        // INV-2: prevent recursive whole-plan replan loops in the partial DAG.
1062        partial_config.max_replans = 0;
1063        partial_config.verify_completeness = false;
1064
1065        let available_agents = self
1066            .orchestration
1067            .subagent_manager
1068            .as_ref()
1069            .map(|m| m.definitions().to_vec())
1070            .unwrap_or_default();
1071
1072        let mut partial_scheduler = match DagScheduler::new(
1073            partial_graph,
1074            &partial_config,
1075            Box::new(RuleBasedRouter),
1076            available_agents,
1077        ) {
1078            Ok(s) => s,
1079            Err(e) => {
1080                tracing::warn!(
1081                    error = %e,
1082                    "whole-plan replan: failed to create partial DagScheduler (fail-open)"
1083                );
1084                return None;
1085            }
1086        };
1087
1088        let partial_task_count = partial_scheduler.graph().tasks.len();
1089        let cancel_token = CancellationToken::new();
1090        if let Err(e) = self
1091            .run_scheduler_loop(&mut partial_scheduler, partial_task_count, cancel_token)
1092            .await
1093        {
1094            tracing::warn!(
1095                error = %e,
1096                "whole-plan replan: partial DAG run failed (fail-open)"
1097            );
1098        }
1099
1100        let completed: Vec<_> = partial_scheduler
1101            .into_graph()
1102            .tasks
1103            .into_iter()
1104            .filter(|t| t.status == TaskStatus::Completed)
1105            .collect();
1106
1107        if completed.is_empty() {
1108            None
1109        } else {
1110            Some(completed)
1111        }
1112    }
1113
1114    /// Cancel all agents referenced in `cancel_actions`.
1115    ///
1116    /// Returns `Some(status)` if a `Done` action is encountered, `None` otherwise.
1117    fn cancel_agents_from_actions(
1118        &mut self,
1119        cancel_actions: Vec<crate::orchestration::SchedulerAction>,
1120    ) -> Option<crate::orchestration::GraphStatus> {
1121        use crate::orchestration::SchedulerAction;
1122        for action in cancel_actions {
1123            match action {
1124                SchedulerAction::Cancel { agent_handle_id } => {
1125                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1126                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1127                            tracing::trace!(error = %e, "cancel: agent already gone");
1128                        });
1129                    }
1130                }
1131                SchedulerAction::Done { status } => return Some(status),
1132                SchedulerAction::Spawn { .. }
1133                | SchedulerAction::RunInline { .. }
1134                | SchedulerAction::Verify { .. } => {}
1135            }
1136        }
1137        None
1138    }
1139
    /// Handle a `SchedulerAction::Spawn` — attempt to spawn a sub-agent for the given task.
    ///
    /// Returns `(spawn_success, concurrency_fail, done_status)`.
    /// `done_status` is `Some` when spawn failure forces the scheduler to emit a `Done` action.
    async fn handle_scheduler_spawn_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        agent_def_name: String,
        prompt: String,
        spawn_counter: &mut usize,
        task_count: usize,
    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
        // Title is used only for the human-readable progress message below;
        // "unknown" is a safe fallback for an out-of-range task_id.
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());

        // Clone everything the sub-agent needs up front so the later
        // `subagent_manager.as_mut()` borrow does not conflict with other
        // borrows of `self`.
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(&agent_def_name);
        let cfg = self.orchestration.subagent_config.clone();
        let event_tx = scheduler.event_sender();

        let mgr = self
            .orchestration
            .subagent_manager
            .as_mut()
            .expect("subagent_manager checked above");

        // Completion callback: translate the sub-agent's result into a
        // TaskEvent and post it back on the scheduler's event channel.
        let on_done = {
            use crate::orchestration::{TaskEvent, TaskOutcome};
            move |handle_id: String, result: Result<String, crate::subagent::SubAgentError>| {
                let outcome = match &result {
                    Ok(output) => TaskOutcome::Completed {
                        output: output.clone(),
                        artifacts: vec![],
                    },
                    Err(e) => TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                };
                // The callback itself is synchronous; the (async) channel send
                // is deferred to a spawned task.
                let tx = event_tx;
                tokio::spawn(async move {
                    if let Err(e) = tx
                        .send(TaskEvent {
                            task_id,
                            agent_handle_id: handle_id,
                            outcome,
                        })
                        .await
                    {
                        tracing::warn!(
                            error = %e,
                            "failed to send TaskEvent: scheduler may have been dropped"
                        );
                    }
                });
            }
        };

        match mgr.spawn_for_task(
            &agent_def_name,
            &prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            on_done,
        ) {
            Ok(handle_id) => {
                // Progress message uses a sequential counter (not task_id) so
                // users see contiguous "task N/M" numbering.
                *spawn_counter += 1;
                let _ = self
                    .channel
                    .send_status(&format!(
                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
                    ))
                    .await;
                scheduler.record_spawn(task_id, handle_id, agent_def_name);
                (true, false, None)
            }
            Err(e) => {
                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
                // ConcurrencyLimit failures are flagged separately so the caller
                // can apply batch-level backoff.
                let concurrency_fail =
                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
                // record_spawn_failure may emit follow-up actions (cancels or Done).
                let extra = scheduler.record_spawn_failure(task_id, &e);
                let done_status = self.cancel_agents_from_actions(extra);
                (false, concurrency_fail, done_status)
            }
        }
    }
1232
1233    /// Execute a `RunInline` scheduler action: run the task synchronously in the current agent.
1234    ///
1235    /// Sends a status update, registers the spawn with the scheduler, runs the inline tool
1236    /// loop (or cancels on token fire), and posts the completion event back to the scheduler.
1237    async fn handle_run_inline_action(
1238        &mut self,
1239        scheduler: &mut crate::orchestration::DagScheduler,
1240        task_id: crate::orchestration::TaskId,
1241        prompt: String,
1242        spawn_counter: usize,
1243        task_count: usize,
1244        cancel_token: &CancellationToken,
1245    ) {
1246        let task_title = scheduler
1247            .graph()
1248            .tasks
1249            .get(task_id.index())
1250            .map_or("unknown", |t| t.title.as_str());
1251        let _ = self
1252            .channel
1253            .send_status(&format!(
1254                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
1255            ))
1256            .await;
1257
1258        // record_spawn before the inline call so the completion event is always
1259        // buffered before any timeout check fires in the next tick().
1260        let handle_id = format!("__inline_{task_id}__");
1261        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());
1262
1263        let event_tx = scheduler.event_sender();
1264        let max_iter = self.tool_orchestrator.max_iterations;
1265        let outcome = tokio::select! {
1266            result = self.run_inline_tool_loop(&prompt, max_iter) => {
1267                match result {
1268                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
1269                        output,
1270                        artifacts: vec![],
1271                    },
1272                    Err(e) => crate::orchestration::TaskOutcome::Failed {
1273                        error: e.to_string(),
1274                    },
1275                }
1276            }
1277            () = cancel_token.cancelled() => {
1278                // TODO: use TaskOutcome::Canceled when the variant is added (#1603)
1279                crate::orchestration::TaskOutcome::Failed {
1280                    error: "canceled".to_string(),
1281                }
1282            }
1283        };
1284        let event = crate::orchestration::TaskEvent {
1285            task_id,
1286            agent_handle_id: handle_id,
1287            outcome,
1288        };
1289        if let Err(e) = event_tx.send(event).await {
1290            tracing::warn!(%task_id, error = %e, "inline task event send failed");
1291        }
1292    }
1293
1294    // too_many_lines: sequential scheduler event loop with 4 tokio::select! branches
1295    // (cancel token, scheduler tick, channel recv with /plan cancel + channel-close paths,
1296    // shutdown signal) — each branch requires distinct cancel/fail/ignore semantics that
1297    // cannot be split without introducing shared mutable state across async boundaries.
1298    #[allow(clippy::too_many_lines)]
1299    /// Drive the [`DagScheduler`] tick loop until it emits `SchedulerAction::Done`.
1300    ///
1301    /// Each iteration yields at `wait_event()`, during which `channel.recv()` is polled
1302    /// concurrently via `tokio::select!`. If the user sends `/plan cancel`, all running
1303    /// sub-agent tasks are aborted and the loop exits with [`GraphStatus::Canceled`].
1304    /// If the channel is closed (`Ok(None)`), all running sub-agent tasks are aborted
1305    /// and the loop exits with [`GraphStatus::Failed`].
1306    /// Other messages received during execution are queued in `message_queue` and
1307    /// processed after the plan completes.
1308    ///
1309    /// # Known limitations
1310    ///
1311    /// `RunInline` tasks block the tick loop for their entire duration — `/plan cancel`
1312    /// cannot interrupt an in-progress inline LLM call and will only be delivered on the
1313    /// next iteration after the call completes.
1314    async fn run_scheduler_loop(
1315        &mut self,
1316        scheduler: &mut crate::orchestration::DagScheduler,
1317        task_count: usize,
1318        cancel_token: CancellationToken,
1319    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
1320        use crate::orchestration::{PlanVerifier, SchedulerAction};
1321
1322        // Sequential spawn counter for human-readable "task N/M" progress messages.
1323        // task_id.index() reflects array position and can be non-contiguous for
1324        // parallel plans (e.g. 0, 2, 4), so we use a local counter instead.
1325        let mut spawn_counter: usize = 0;
1326
1327        // Tracks (handle_id, secret_key) pairs denied this plan execution to prevent
1328        // re-prompting the user when a sub-agent re-requests the same secret after denial.
1329        let mut denied_secrets: std::collections::HashSet<(String, String)> =
1330            std::collections::HashSet::new();
1331
1332        // Lazily initialized per-task verifier. Created once on the first Verify action;
1333        // reused for all subsequent per-task Verify calls in this scheduler run.
1334        let mut plan_verifier: Option<PlanVerifier<AnyProvider>> = None;
1335
1336        let final_status = 'tick: loop {
1337            let actions = scheduler.tick();
1338
1339            // Track batch-level spawn outcomes for record_batch_backoff() below.
1340            let mut any_spawn_success = false;
1341            let mut any_concurrency_failure = false;
1342
1343            for action in actions {
1344                match action {
1345                    SchedulerAction::Spawn {
1346                        task_id,
1347                        agent_def_name,
1348                        prompt,
1349                    } => {
1350                        let (success, fail, done) = self
1351                            .handle_scheduler_spawn_action(
1352                                scheduler,
1353                                task_id,
1354                                agent_def_name,
1355                                prompt,
1356                                &mut spawn_counter,
1357                                task_count,
1358                            )
1359                            .await;
1360                        any_spawn_success |= success;
1361                        any_concurrency_failure |= fail;
1362                        if let Some(s) = done {
1363                            break 'tick s;
1364                        }
1365                    }
1366                    SchedulerAction::Cancel { agent_handle_id } => {
1367                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1368                            // benign race: agent may have already finished
1369                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1370                                tracing::trace!(error = %e, "cancel: agent already gone");
1371                            });
1372                        }
1373                    }
1374                    // Inline execution: the LLM call blocks this tick loop for its
1375                    // duration. This is intentionally sequential and only expected in
1376                    // single-agent setups where no sub-agents are configured.
1377                    // Known limitation: if a RunInline action appears before Spawn actions
1378                    // in the same batch (mixed routing), those Spawn actions are delayed
1379                    // until the inline call completes. Refactor to tokio::spawn if mixed
1380                    // batches become common.
1381                    // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop so
1382                    // that /plan cancel can interrupt a long-running inline LLM call instead
1383                    // of waiting for the current iteration to complete.
1384                    SchedulerAction::RunInline { task_id, prompt } => {
1385                        spawn_counter += 1;
1386                        self.handle_run_inline_action(
1387                            scheduler,
1388                            task_id,
1389                            prompt,
1390                            spawn_counter,
1391                            task_count,
1392                            &cancel_token,
1393                        )
1394                        .await;
1395                    }
1396                    SchedulerAction::Done { status } => {
1397                        break 'tick status;
1398                    }
1399                    SchedulerAction::Verify { task_id, output } => {
1400                        // Per-task verification: evaluate the completed task's output and
1401                        // optionally inject replan tasks if gaps are found below threshold.
1402                        // Fail-open: any error logs a warning and continues without blocking.
1403                        let verify_provider = self
1404                            .orchestration
1405                            .verify_provider
1406                            .as_ref()
1407                            .unwrap_or(&self.provider)
1408                            .clone();
1409                        let max_tokens = self.orchestration.orchestration_config.verify_max_tokens;
1410                        let threshold = self
1411                            .orchestration
1412                            .orchestration_config
1413                            .completeness_threshold;
1414                        let sanitizer = self.security.sanitizer.clone();
1415
1416                        // Initialize the verifier once; reuse across Verify actions.
1417                        let verifier = plan_verifier.get_or_insert_with(|| {
1418                            PlanVerifier::new(verify_provider, max_tokens, sanitizer)
1419                        });
1420
1421                        let task = scheduler.graph().tasks.get(task_id.index()).cloned();
1422
1423                        if let Some(task) = task {
1424                            let result = verifier.verify(&task, &output).await;
1425                            tracing::debug!(
1426                                task_id = %task_id,
1427                                complete = result.complete,
1428                                confidence = result.confidence,
1429                                gaps = result.gaps.len(),
1430                                "per-task verification result"
1431                            );
1432
1433                            let should_replan = !result.complete
1434                                && result.confidence < f64::from(threshold)
1435                                && result.gaps.iter().any(|g| {
1436                                    matches!(
1437                                        g.severity,
1438                                        crate::orchestration::GapSeverity::Critical
1439                                            | crate::orchestration::GapSeverity::Important
1440                                    )
1441                                });
1442
1443                            if should_replan {
1444                                let max_tasks_u32 =
1445                                    self.orchestration.orchestration_config.max_tasks;
1446                                let max_tasks = max_tasks_u32 as usize;
1447                                match verifier
1448                                    .replan(&task, &result.gaps, scheduler.graph(), max_tasks_u32)
1449                                    .await
1450                                {
1451                                    Ok(new_tasks) if !new_tasks.is_empty() => {
1452                                        if let Err(e) =
1453                                            scheduler.inject_tasks(task_id, new_tasks, max_tasks)
1454                                        {
1455                                            tracing::warn!(
1456                                                error = %e,
1457                                                task_id = %task_id,
1458                                                "per-task replan inject_tasks failed (fail-open)"
1459                                            );
1460                                        }
1461                                    }
1462                                    Ok(_) => {}
1463                                    Err(e) => {
1464                                        tracing::warn!(
1465                                            error = %e,
1466                                            task_id = %task_id,
1467                                            "per-task replan failed (fail-open)"
1468                                        );
1469                                    }
1470                                }
1471                            }
1472                        }
1473                    }
1474                }
1475            }
1476
1477            // Update batch-level backoff counter after processing all Spawn actions.
1478            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);
1479
1480            // Drain all pending secret requests this tick (MED-2 fix).
1481            self.process_pending_secret_requests(&mut denied_secrets)
1482                .await;
1483
1484            // Update TUI with current graph state.
1485            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
1486            self.update_metrics(|m| {
1487                m.orchestration_graph = Some(snapshot);
1488            });
1489
1490            // NOTE(Telegram): Telegram's recv() is not fully cancel-safe — a message
1491            // consumed from the internal mpsc but not yet returned can be lost if the
1492            // select! cancels the future during the /start send().await path. For
1493            // non-command messages the race window is negligible. Acceptable for MVP.
1494            //
1495            // NOTE(RunInline): tasks in the RunInline arm above block this tick loop
1496            // synchronously (no await between loop iteration start and wait_event).
1497            // /plan cancel cannot interrupt an inline LLM call mid-execution; it is
1498            // delivered on the next tick after the inline call completes.
1499            // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop.
1500            tokio::select! {
1501                // biased: token cancellation takes priority over new events and input.
1502                biased;
1503                () = cancel_token.cancelled() => {
1504                    let cancel_actions = scheduler.cancel_all();
1505                    for action in cancel_actions {
1506                        match action {
1507                            SchedulerAction::Cancel { agent_handle_id } => {
1508                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1509                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1510                                        tracing::trace!(
1511                                            error = %e,
1512                                            "cancel during plan cancellation: agent already gone"
1513                                        );
1514                                    });
1515                                }
1516                            }
1517                            SchedulerAction::Done { status } => {
1518                                break 'tick status;
1519                            }
1520                            SchedulerAction::Spawn { .. }
1521                            | SchedulerAction::RunInline { .. }
1522                            | SchedulerAction::Verify { .. } => {}
1523                        }
1524                    }
1525                    // Defensive fallback: cancel_all always emits Done, but guard against
1526                    // future changes.
1527                    break 'tick crate::orchestration::GraphStatus::Canceled;
1528                }
1529                () = scheduler.wait_event() => {}
1530                result = self.channel.recv() => {
1531                    if let Ok(Some(msg)) = result {
1532                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
1533                            let _ = self.channel.send_status("Canceling plan...").await;
1534                            let cancel_actions = scheduler.cancel_all();
1535                            for ca in cancel_actions {
1536                                match ca {
1537                                    SchedulerAction::Cancel { agent_handle_id } => {
1538                                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1539                                            // benign race: agent may have already finished
1540                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1541                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
1542                                            });
1543                                        }
1544                                    }
1545                                    SchedulerAction::Done { status } => {
1546                                        break 'tick status;
1547                                    }
1548                                    SchedulerAction::Spawn { .. }
1549                                    | SchedulerAction::RunInline { .. }
1550                                    | SchedulerAction::Verify { .. } => {}
1551                                }
1552                            }
1553                            // Defensive fallback: cancel_all always emits Done, but guard
1554                            // against future changes.
1555                            break 'tick crate::orchestration::GraphStatus::Canceled;
1556                        }
1557                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
1558                    } else {
1559                        // Channel closed. Drain buffered completion events BEFORE canceling
1560                        // so that tasks which completed between the last tick and the
1561                        // channel-close are recorded as Completed, not Canceled.
1562                        // cancel_all() empties self.running first, causing process_event()
1563                        // to silently discard any late completions — drain must come first.
1564                        let drain_actions = scheduler.tick();
1565                        let mut natural_done: Option<crate::orchestration::GraphStatus> = None;
1566                        for action in drain_actions {
1567                            match action {
1568                                SchedulerAction::Cancel { agent_handle_id } => {
1569                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1570                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1571                                            tracing::trace!(
1572                                                error = %e,
1573                                                "cancel during drain on channel close: agent already gone"
1574                                            );
1575                                        });
1576                                    }
1577                                }
1578                                SchedulerAction::Done { status } => {
1579                                    natural_done = Some(status);
1580                                }
1581                                // Ignore Spawn/RunInline/Verify — we are shutting down.
1582                                SchedulerAction::Spawn { .. }
1583                                | SchedulerAction::RunInline { .. }
1584                                | SchedulerAction::Verify { .. } => {}
1585                            }
1586                        }
1587
1588                        // If the plan completed naturally during the drain tick, honor that.
1589                        if let Some(status) = natural_done {
1590                            break 'tick status;
1591                        }
1592
1593                        // Cancel remaining running tasks after the drain.
1594                        let cancel_actions = scheduler.cancel_all();
1595                        let n = cancel_actions
1596                            .iter()
1597                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1598                            .count();
1599                        // Use supports_exit() to distinguish termination semantics:
1600                        // - CLI/TUI (supports_exit=true): stdin EOF or TUI close → Canceled
1601                        // - Telegram/Discord/Slack (supports_exit=false): infra failure → Failed
1602                        //   so the user can /plan retry after reconnecting.
1603                        let shutdown_status = if self.channel.supports_exit() {
1604                            crate::orchestration::GraphStatus::Canceled
1605                        } else {
1606                            crate::orchestration::GraphStatus::Failed
1607                        };
1608                        tracing::warn!(
1609                            sub_agents = n,
1610                            supports_exit = self.channel.supports_exit(),
1611                            status = ?shutdown_status,
1612                            "scheduler channel closed, canceling running sub-agents"
1613                        );
1614                        for action in cancel_actions {
1615                            match action {
1616                                SchedulerAction::Cancel { agent_handle_id } => {
1617                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1618                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1619                                            tracing::trace!(
1620                                                error = %e,
1621                                                "cancel on channel close: agent already gone"
1622                                            );
1623                                        });
1624                                    }
1625                                }
1626                                // Intentionally ignore Done here — we use shutdown_status above.
1627                                SchedulerAction::Done { .. }
1628                                | SchedulerAction::Spawn { .. }
1629                                | SchedulerAction::RunInline { .. }
1630                                | SchedulerAction::Verify { .. } => {}
1631                            }
1632                        }
1633                        break 'tick shutdown_status;
1634                    }
1635                }
1636                // Shutdown signal received — cancel running sub-agents and exit cleanly.
1637                () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1638                    let cancel_actions = scheduler.cancel_all();
1639                    let n = cancel_actions
1640                        .iter()
1641                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1642                        .count();
1643                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
1644                    for action in cancel_actions {
1645                        match action {
1646                            SchedulerAction::Cancel { agent_handle_id } => {
1647                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1648                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1649                                        tracing::trace!(
1650                                            error = %e,
1651                                            "cancel on shutdown: agent already gone"
1652                                        );
1653                                    });
1654                                }
1655                            }
1656                            SchedulerAction::Done { status } => {
1657                                break 'tick status;
1658                            }
1659                            SchedulerAction::Spawn { .. }
1660                            | SchedulerAction::RunInline { .. }
1661                            | SchedulerAction::Verify { .. } => {}
1662                        }
1663                    }
1664                    // Defensive fallback: cancel_all always emits Done, but guard against
1665                    // future changes.
1666                    break 'tick crate::orchestration::GraphStatus::Canceled;
1667                }
1668            }
1669        };
1670
1671        // Final drain: if the loop exited via Done on the first tick, secret
1672        // requests buffered before completion would otherwise be silently dropped.
1673        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
1674            .await;
1675
1676        Ok(final_status)
1677    }
1678
    /// Run a tool-aware LLM loop for an inline scheduled task.
    ///
    /// Unlike [`process_response_native_tools`], this is intentionally stripped of all
    /// interactive-session machinery (channel sends, doom-loop detection, summarization,
    /// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
    /// sub-tasks that run synchronously inside the scheduler tick loop.
    ///
    /// The conversation is private to this call: it is seeded with `prompt` as a single
    /// user message and is never merged into the interactive session history.
    ///
    /// # Arguments
    /// * `prompt` - the task text sent as the initial user message.
    /// * `max_iterations` - upper bound on provider round-trips; each `ToolUse`
    ///   response consumes one iteration.
    ///
    /// # Returns
    /// The first plain-text response from the provider; or, if the iteration limit
    /// is reached while the model keeps requesting tools, the most recent leading
    /// text seen alongside a tool call (possibly the empty string).
    ///
    /// # Errors
    /// Propagates [`zeph_llm::LlmError`] from `chat_with_tools`. Individual tool
    /// execution failures are NOT errors here — they are fed back to the model as
    /// `[error] …` tool results so it can react.
    async fn run_inline_tool_loop(
        &mut self,
        prompt: &str,
        max_iterations: usize,
    ) -> Result<String, zeph_llm::LlmError> {
        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
        use zeph_tools::executor::ToolCall;

        // CRIT-01 / TAFC isolation: inline tool loops run as subagent orchestration tasks
        // (scheduler, planner, aggregator) and intentionally bypass TAFC augmentation.
        // They use their own private message history and never surface TAFC think fields
        // to the interactive session, so no stripping is needed here (CRIT-02 compliance).
        let tool_defs: Vec<ToolDefinition> = self
            .tool_executor
            .tool_definitions_erased()
            .iter()
            .map(tool_execution::tool_def_to_definition)
            .collect();

        tracing::debug!(
            prompt_len = prompt.len(),
            max_iterations,
            tool_count = tool_defs.len(),
            "inline tool loop: starting"
        );

        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
        // Last non-empty leading text from a ToolUse response; fallback return value
        // when the iteration limit is exhausted without a plain Text response.
        let mut last_text = String::new();

        for iteration in 0..max_iterations {
            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;

            match response {
                // Plain text means the model is done — return immediately.
                ChatResponse::Text(text) => {
                    tracing::debug!(iteration, "inline tool loop: text response, returning");
                    return Ok(text);
                }
                ChatResponse::ToolUse {
                    text, tool_calls, ..
                } => {
                    tracing::debug!(
                        iteration,
                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
                        "inline tool loop: tool use"
                    );

                    if let Some(ref t) = text {
                        last_text.clone_from(t);
                    }

                    // Build assistant message with optional leading text + tool use parts.
                    let mut parts: Vec<MessagePart> = Vec::new();
                    if let Some(ref t) = text
                        && !t.is_empty()
                    {
                        parts.push(MessagePart::Text { text: t.clone() });
                    }
                    for tc in &tool_calls {
                        parts.push(MessagePart::ToolUse {
                            id: tc.id.clone(),
                            name: tc.name.clone(),
                            input: tc.input.clone(),
                        });
                    }
                    messages.push(Message::from_parts(Role::Assistant, parts));

                    // Execute each tool call and collect results.
                    // Drain elicitation_rx concurrently to avoid deadlock: MCP tool may
                    // send an elicitation request and block waiting for a response while
                    // execute_tool_call_erased is blocked waiting for the tool to finish.
                    let mut result_parts: Vec<MessagePart> = Vec::new();
                    for tc in &tool_calls {
                        // Non-object inputs degrade to an empty param map rather than erroring.
                        let call = ToolCall {
                            tool_id: tc.name.clone(),
                            params: match &tc.input {
                                serde_json::Value::Object(map) => map.clone(),
                                _ => serde_json::Map::new(),
                            },
                        };
                        let output = loop {
                            // NOTE(review): this select! is re-created on every pass of the
                            // loop, so when the elicitation branch wins, the in-flight
                            // execute_tool_call_erased future is dropped and a fresh call is
                            // started on the next pass. Presumably the erased executor is
                            // cancel-safe / idempotent for an unanswered elicitation — confirm,
                            // otherwise a tool could be invoked more than once per call.
                            tokio::select! {
                                result = self.tool_executor.execute_tool_call_erased(&call) => {
                                    // Tool errors become "[error] …" results fed back to the
                                    // model instead of aborting the loop.
                                    break match result {
                                        Ok(Some(out)) => out.summary,
                                        Ok(None) => "(no output)".to_owned(),
                                        Err(e) => format!("[error] {e}"),
                                    };
                                }
                                // When no elicitation channel exists, park this branch forever
                                // so the select! reduces to awaiting the tool call alone.
                                Some(event) = async {
                                    match self.mcp.elicitation_rx.as_mut() {
                                        Some(rx) => rx.recv().await,
                                        None => std::future::pending().await,
                                    }
                                } => {
                                    self.handle_elicitation_event(event).await;
                                }
                            }
                        };
                        // "[error]" prefix (set above) doubles as the is_error flag for the
                        // provider-visible tool result.
                        let is_error = output.starts_with("[error]");
                        result_parts.push(MessagePart::ToolResult {
                            tool_use_id: tc.id.clone(),
                            content: output,
                            is_error,
                        });
                    }
                    messages.push(Message::from_parts(Role::User, result_parts));
                }
            }
        }

        tracing::debug!(
            max_iterations,
            last_text_empty = last_text.is_empty(),
            "inline tool loop: iteration limit reached"
        );
        Ok(last_text)
    }
1802
1803    /// Bridge pending secret requests from sub-agents to the user (non-blocking, time-bounded).
1804    ///
1805    /// SEC-P1-02: explicit user confirmation is required before granting any secret to a
1806    /// sub-agent. Denial is the default on timeout or channel error.
1807    ///
1808    /// `denied` tracks `(handle_id, secret_key)` pairs already denied this plan execution.
1809    /// Re-requests for a denied pair are auto-denied without prompting the user.
1810    async fn process_pending_secret_requests(
1811        &mut self,
1812        denied: &mut std::collections::HashSet<(String, String)>,
1813    ) {
1814        loop {
1815            let pending = self
1816                .orchestration
1817                .subagent_manager
1818                .as_mut()
1819                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1820            let Some((req_handle_id, req)) = pending else {
1821                break;
1822            };
1823            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1824            if denied.contains(&deny_key) {
1825                tracing::debug!(
1826                    handle_id = %req_handle_id,
1827                    secret_key = %req.secret_key,
1828                    "skipping duplicate secret prompt for already-denied key"
1829                );
1830                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1831                    let _ = mgr.deny_secret(&req_handle_id);
1832                }
1833                continue;
1834            }
1835            let prompt = format!(
1836                "Sub-agent requests secret '{}'. Allow?{}",
1837                crate::text::truncate_to_chars(&req.secret_key, 100),
1838                req.reason
1839                    .as_deref()
1840                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1841                    .unwrap_or_default()
1842            );
1843            // CRIT-1 fix: use select! to avoid blocking the tick loop forever.
1844            let approved = tokio::select! {
1845                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1846                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1847                    let _ = self.channel.send("Secret request timed out.").await;
1848                    false
1849                }
1850            };
1851            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1852                if approved {
1853                    let ttl = std::time::Duration::from_secs(300);
1854                    let key = req.secret_key.clone();
1855                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1856                        let _ = mgr.deliver_secret(&req_handle_id, key);
1857                    }
1858                } else {
1859                    denied.insert(deny_key);
1860                    let _ = mgr.deny_secret(&req_handle_id);
1861                }
1862            }
1863        }
1864    }
1865
1866    /// Aggregate results or report failure after the tick loop completes.
1867    #[allow(clippy::too_many_lines)]
1868    async fn finalize_plan_execution(
1869        &mut self,
1870        completed_graph: crate::orchestration::TaskGraph,
1871        final_status: crate::orchestration::GraphStatus,
1872    ) -> Result<&'static str, error::AgentError> {
1873        use std::fmt::Write;
1874
1875        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};
1876
1877        let result_label = match final_status {
1878            GraphStatus::Completed => {
1879                // Update task completion counters.
1880                let completed_count = completed_graph
1881                    .tasks
1882                    .iter()
1883                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1884                    .count() as u64;
1885                let skipped_count = completed_graph
1886                    .tasks
1887                    .iter()
1888                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
1889                    .count() as u64;
1890                self.update_metrics(|m| {
1891                    m.orchestration.tasks_completed += completed_count;
1892                    m.orchestration.tasks_skipped += skipped_count;
1893                });
1894
1895                let aggregator = LlmAggregator::new(
1896                    self.provider.clone(),
1897                    &self.orchestration.orchestration_config,
1898                );
1899                match aggregator.aggregate(&completed_graph).await {
1900                    Ok((synthesis, aggregator_usage)) => {
1901                        let (aggr_prompt, aggr_completion) = aggregator_usage.unwrap_or((0, 0));
1902                        self.update_metrics(|m| {
1903                            m.api_calls += 1;
1904                            m.prompt_tokens += aggr_prompt;
1905                            m.completion_tokens += aggr_completion;
1906                            m.total_tokens = m.prompt_tokens + m.completion_tokens;
1907                        });
1908                        self.record_cost(aggr_prompt, aggr_completion);
1909                        self.record_cache_usage();
1910                        self.channel.send(&synthesis).await?;
1911                    }
1912                    Err(e) => {
1913                        tracing::error!(error = %e, "aggregation failed");
1914                        self.channel
1915                            .send(
1916                                "Plan completed but aggregation failed. \
1917                                 Check individual task results.",
1918                            )
1919                            .await?;
1920                    }
1921                }
1922
1923                // Cache the completed plan template (best-effort, never blocks execution).
1924                if let Some(ref cache) = self.orchestration.plan_cache
1925                    && let Some(embedding) = self.orchestration.pending_goal_embedding.take()
1926                {
1927                    let embed_model = self.skill_state.embedding_model.clone();
1928                    if let Err(e) = cache
1929                        .cache_plan(&completed_graph, &embedding, &embed_model)
1930                        .await
1931                    {
1932                        tracing::warn!(error = %e, "plan cache: failed to cache completed plan");
1933                    }
1934                }
1935
1936                "completed"
1937            }
1938            GraphStatus::Failed => {
1939                let failed_tasks: Vec<_> = completed_graph
1940                    .tasks
1941                    .iter()
1942                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1943                    .collect();
1944                let cancelled_tasks: Vec<_> = completed_graph
1945                    .tasks
1946                    .iter()
1947                    .filter(|t| t.status == crate::orchestration::TaskStatus::Canceled)
1948                    .collect();
1949                let completed_count = completed_graph
1950                    .tasks
1951                    .iter()
1952                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1953                    .count() as u64;
1954                let skipped_count = completed_graph
1955                    .tasks
1956                    .iter()
1957                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
1958                    .count() as u64;
1959                self.update_metrics(|m| {
1960                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
1961                    m.orchestration.tasks_completed += completed_count;
1962                    m.orchestration.tasks_skipped += skipped_count;
1963                });
1964                let total = completed_graph.tasks.len();
1965                let msg = if failed_tasks.is_empty() && !cancelled_tasks.is_empty() {
1966                    // Pure scheduler deadlock: no tasks actually failed, some were canceled.
1967                    format!(
1968                        "Plan canceled. {}/{} tasks did not run.\n\
1969                         Use `/plan retry` to retry or check logs for details.",
1970                        cancelled_tasks.len(),
1971                        total
1972                    )
1973                } else if failed_tasks.is_empty() && cancelled_tasks.is_empty() {
1974                    // Should not occur through normal scheduler paths; make it visible.
1975                    tracing::warn!(
1976                        "plan finished with GraphStatus::Failed but no failed or canceled tasks"
1977                    );
1978                    "Plan failed. No task errors recorded; check logs for details.".to_string()
1979                } else {
1980                    let mut m = if cancelled_tasks.is_empty() {
1981                        format!(
1982                            "Plan failed. {}/{} tasks failed:\n",
1983                            failed_tasks.len(),
1984                            total
1985                        )
1986                    } else {
1987                        format!(
1988                            "Plan failed. {}/{} tasks failed, {} canceled:\n",
1989                            failed_tasks.len(),
1990                            total,
1991                            cancelled_tasks.len()
1992                        )
1993                    };
1994                    for t in &failed_tasks {
1995                        // SEC-M34-002: truncate raw task output before displaying to user.
1996                        let err: std::borrow::Cow<str> =
1997                            t.result.as_ref().map_or("unknown error".into(), |r| {
1998                                if r.output.len() > 500 {
1999                                    r.output.chars().take(500).collect::<String>().into()
2000                                } else {
2001                                    r.output.as_str().into()
2002                                }
2003                            });
2004                        let _ = writeln!(m, "  - {}: {err}", t.title);
2005                    }
2006                    m.push_str("\nUse `/plan retry` to retry failed tasks.");
2007                    m
2008                };
2009                self.channel.send(&msg).await?;
2010                // Store graph back so /plan retry and /plan resume work.
2011                // pending_goal_embedding is retained: retry/resume goes through handle_plan_confirm
2012                // -> finalize_plan_execution again, reusing the same embedding. A new /plan goal
2013                // cannot be issued while pending_graph is Some, so the embedding cannot go stale.
2014                self.orchestration.pending_graph = Some(completed_graph);
2015                "failed"
2016            }
2017            GraphStatus::Paused => {
2018                self.channel
2019                    .send(
2020                        "Plan paused due to a task failure (ask strategy). \
2021                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
2022                    )
2023                    .await?;
2024                // Same retention rationale as Failed: embedding reused on resume/retry.
2025                self.orchestration.pending_graph = Some(completed_graph);
2026                "paused"
2027            }
2028            GraphStatus::Canceled => {
2029                let done_count = completed_graph
2030                    .tasks
2031                    .iter()
2032                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
2033                    .count();
2034                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
2035                let total = completed_graph.tasks.len();
2036                self.channel
2037                    .send(&format!(
2038                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
2039                    ))
2040                    .await?;
2041                // Do NOT store graph back into pending_graph — canceled plans are not
2042                // retryable via /plan retry.
2043                self.orchestration.pending_goal_embedding.take();
2044                "canceled"
2045            }
2046            other => {
2047                tracing::warn!(%other, "unexpected graph status after Done");
2048                self.channel
2049                    .send(&format!("Plan ended with status: {other}"))
2050                    .await?;
2051                self.orchestration.pending_goal_embedding.take();
2052                "unknown"
2053            }
2054        };
2055        Ok(result_label)
2056    }
2057
2058    async fn handle_plan_status(
2059        &mut self,
2060        _graph_id: Option<&str>,
2061    ) -> Result<(), error::AgentError> {
2062        use crate::orchestration::GraphStatus;
2063        let Some(ref graph) = self.orchestration.pending_graph else {
2064            self.channel.send("No active plan.").await?;
2065            return Ok(());
2066        };
2067        let msg = match graph.status {
2068            GraphStatus::Created => {
2069                "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
2070            }
2071            GraphStatus::Running => "Plan is currently running.",
2072            GraphStatus::Paused => {
2073                "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
2074            }
2075            GraphStatus::Failed => {
2076                "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
2077            }
2078            GraphStatus::Completed => "Plan completed successfully.",
2079            GraphStatus::Canceled => "Plan was canceled.",
2080        };
2081        self.channel.send(msg).await?;
2082        Ok(())
2083    }
2084
2085    async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
2086        if let Some(ref graph) = self.orchestration.pending_graph {
2087            let summary = format_plan_summary(graph);
2088            let status_label = match graph.status {
2089                crate::orchestration::GraphStatus::Created => "awaiting confirmation",
2090                crate::orchestration::GraphStatus::Running => "running",
2091                crate::orchestration::GraphStatus::Paused => "paused",
2092                crate::orchestration::GraphStatus::Failed => "failed (retryable)",
2093                _ => "unknown",
2094            };
2095            self.channel
2096                .send(&format!("{summary}\nStatus: {status_label}"))
2097                .await?;
2098        } else {
2099            self.channel.send("No recent plans.").await?;
2100        }
2101        Ok(())
2102    }
2103
2104    async fn handle_plan_cancel(
2105        &mut self,
2106        _graph_id: Option<&str>,
2107    ) -> Result<(), error::AgentError> {
2108        if let Some(token) = self.orchestration.plan_cancel_token.take() {
2109            // In-flight plan: signal cancellation. The scheduler loop will pick this up
2110            // in the next tokio::select! iteration at wait_event().
2111            // NOTE: Due to &mut self being held by run_scheduler_loop, this branch is only
2112            // reachable if the channel has a concurrent reader (e.g. Telegram, TUI events).
2113            // CLI and synchronous channels cannot deliver this while the loop is active
2114            // (see #1603, SEC-M34-002).
2115            token.cancel();
2116            self.channel.send("Canceling plan execution...").await?;
2117        } else if self.orchestration.pending_graph.take().is_some() {
2118            let now = std::time::Instant::now();
2119            self.update_metrics(|m| {
2120                if let Some(ref mut s) = m.orchestration_graph {
2121                    "canceled".clone_into(&mut s.status);
2122                    s.completed_at = Some(now);
2123                }
2124            });
2125            self.orchestration.pending_goal_embedding = None;
2126            self.channel.send("Plan canceled.").await?;
2127        } else {
2128            self.channel.send("No active plan to cancel.").await?;
2129        }
2130        Ok(())
2131    }
2132
2133    /// Resume a paused graph (Ask failure strategy triggered a pause).
2134    ///
2135    /// Looks for a pending graph in `Paused` status. If `graph_id` is provided
2136    /// it must match the active graph's id (SEC-P5-03).
2137    async fn handle_plan_resume(
2138        &mut self,
2139        graph_id: Option<&str>,
2140    ) -> Result<(), error::AgentError> {
2141        use crate::orchestration::GraphStatus;
2142
2143        let Some(ref graph) = self.orchestration.pending_graph else {
2144            self.channel
2145                .send("No paused plan to resume. Use `/plan status` to check the current state.")
2146                .await?;
2147            return Ok(());
2148        };
2149
2150        // SEC-P5-03: if a graph_id was provided, reject if it doesn't match.
2151        if let Some(id) = graph_id
2152            && graph.id.to_string() != id
2153        {
2154            self.channel
2155                .send(&format!(
2156                    "Graph id '{id}' does not match the active plan ({}). \
2157                     Use `/plan status` to see the active plan id.",
2158                    graph.id
2159                ))
2160                .await?;
2161            return Ok(());
2162        }
2163
2164        if graph.status != GraphStatus::Paused {
2165            self.channel
2166                .send(&format!(
2167                    "The active plan is in '{}' status and cannot be resumed. \
2168                     Only Paused plans can be resumed.",
2169                    graph.status
2170                ))
2171                .await?;
2172            return Ok(());
2173        }
2174
2175        let graph = self.orchestration.pending_graph.take().unwrap();
2176
2177        tracing::info!(
2178            graph_id = %graph.id,
2179            "resuming paused graph"
2180        );
2181
2182        self.channel
2183            .send(&format!(
2184                "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
2185                graph.goal
2186            ))
2187            .await?;
2188
2189        // Store resumed graph back as pending. resume_from() will set status=Running in confirm.
2190        self.orchestration.pending_graph = Some(graph);
2191        Ok(())
2192    }
2193
2194    /// Retry failed tasks in a graph.
2195    ///
2196    /// Resets all `Failed` tasks to `Ready` and all `Skipped` dependents back
2197    /// to `Pending`, then re-stores the graph as pending for re-execution.
2198    /// If `graph_id` is provided it must match the active graph's id (SEC-P5-04).
2199    async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
2200        use crate::orchestration::{GraphStatus, dag};
2201
2202        let Some(ref graph) = self.orchestration.pending_graph else {
2203            self.channel
2204                .send("No active plan to retry. Use `/plan status` to check the current state.")
2205                .await?;
2206            return Ok(());
2207        };
2208
2209        // SEC-P5-04: if a graph_id was provided, reject if it doesn't match.
2210        if let Some(id) = graph_id
2211            && graph.id.to_string() != id
2212        {
2213            self.channel
2214                .send(&format!(
2215                    "Graph id '{id}' does not match the active plan ({}). \
2216                     Use `/plan status` to see the active plan id.",
2217                    graph.id
2218                ))
2219                .await?;
2220            return Ok(());
2221        }
2222
2223        if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
2224            self.channel
2225                .send(&format!(
2226                    "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
2227                    graph.status
2228                ))
2229                .await?;
2230            return Ok(());
2231        }
2232
2233        let mut graph = self.orchestration.pending_graph.take().unwrap();
2234
2235        // IC3: count before reset so the message reflects actual failed tasks, not Ready count.
2236        let failed_count = graph
2237            .tasks
2238            .iter()
2239            .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
2240            .count();
2241
2242        dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
2243
2244        // HIGH-1 fix: reset_for_retry only resets Failed/Canceled tasks. Any tasks that were
2245        // in Running state at pause time are left as Running with stale assigned_agent handles
2246        // (those sub-agents are long dead). Reset them to Ready so resume_from() does not try
2247        // to wait for their events.
2248        for task in &mut graph.tasks {
2249            if task.status == crate::orchestration::TaskStatus::Running {
2250                task.status = crate::orchestration::TaskStatus::Ready;
2251                task.assigned_agent = None;
2252            }
2253        }
2254
2255        tracing::info!(
2256            graph_id = %graph.id,
2257            failed_count,
2258            "retrying failed tasks in graph"
2259        );
2260
2261        self.channel
2262            .send(&format!(
2263                "Retrying {failed_count} failed task(s) in plan: {}\n\
2264                 Use `/plan confirm` to execute.",
2265                graph.goal
2266            ))
2267            .await?;
2268
2269        // Store retried graph back for re-execution via /plan confirm.
2270        self.orchestration.pending_graph = Some(graph);
2271        Ok(())
2272    }
2273
2274    /// Call the LLM to generate a structured session summary with a configurable timeout.
2275    ///
2276    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
2277    /// any failure, logging a warning — callers must treat `None` as "skip storage".
2278    ///
2279    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
2280    /// (structured call times out and plain-text fallback also times out) this adds up to
2281    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
2282    async fn call_llm_for_session_summary(
2283        &self,
2284        chat_messages: &[Message],
2285    ) -> Option<zeph_memory::StructuredSummary> {
2286        let timeout_dur =
2287            std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
2288        match tokio::time::timeout(
2289            timeout_dur,
2290            self.provider
2291                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
2292        )
2293        .await
2294        {
2295            Ok(Ok(s)) => Some(s),
2296            Ok(Err(e)) => {
2297                tracing::warn!(
2298                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
2299                );
2300                self.plain_text_summary_fallback(chat_messages, timeout_dur)
2301                    .await
2302            }
2303            Err(_) => {
2304                tracing::warn!(
2305                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
2306                    self.memory_state.shutdown_summary_timeout_secs
2307                );
2308                self.plain_text_summary_fallback(chat_messages, timeout_dur)
2309                    .await
2310            }
2311        }
2312    }
2313
2314    async fn plain_text_summary_fallback(
2315        &self,
2316        chat_messages: &[Message],
2317        timeout_dur: std::time::Duration,
2318    ) -> Option<zeph_memory::StructuredSummary> {
2319        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
2320            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
2321                summary: plain,
2322                key_facts: vec![],
2323                entities: vec![],
2324            }),
2325            Ok(Err(e)) => {
2326                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
2327                None
2328            }
2329            Err(_) => {
2330                tracing::warn!("shutdown summary: plain LLM fallback timed out");
2331                None
2332            }
2333        }
2334    }
2335
2336    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
2337    ///
2338    /// Guards:
2339    /// - `shutdown_summary` config must be enabled
2340    /// - `conversation_id` must be set (memory must be attached)
2341    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
2342    /// - at least `shutdown_summary_min_messages` user-turn messages in history
2343    ///
2344    /// All errors are logged as warnings and swallowed — shutdown must never fail.
2345    async fn maybe_store_shutdown_summary(&mut self) {
2346        if !self.memory_state.shutdown_summary {
2347            return;
2348        }
2349        let Some(memory) = self.memory_state.memory.clone() else {
2350            return;
2351        };
2352        let Some(conversation_id) = self.memory_state.conversation_id else {
2353            return;
2354        };
2355
2356        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
2357        match memory.has_session_summary(conversation_id).await {
2358            Ok(true) => {
2359                tracing::debug!("shutdown summary: session already has a summary, skipping");
2360                return;
2361            }
2362            Ok(false) => {}
2363            Err(e) => {
2364                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
2365                return;
2366            }
2367        }
2368
2369        // Count user-turn messages only (skip system prompt at index 0).
2370        let user_count = self
2371            .msg
2372            .messages
2373            .iter()
2374            .skip(1)
2375            .filter(|m| m.role == Role::User)
2376            .count();
2377        if user_count < self.memory_state.shutdown_summary_min_messages {
2378            tracing::debug!(
2379                user_count,
2380                min = self.memory_state.shutdown_summary_min_messages,
2381                "shutdown summary: too few user messages, skipping"
2382            );
2383            return;
2384        }
2385
2386        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
2387        let _ = self.channel.send_status("Saving session summary...").await;
2388
2389        // Collect last N messages (skip system prompt at index 0).
2390        let max = self.memory_state.shutdown_summary_max_messages;
2391        if max == 0 {
2392            tracing::debug!("shutdown summary: max_messages=0, skipping");
2393            return;
2394        }
2395        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
2396        let slice = if non_system.len() > max {
2397            &non_system[non_system.len() - max..]
2398        } else {
2399            &non_system[..]
2400        };
2401
2402        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
2403            .iter()
2404            .map(|m| {
2405                let role = match m.role {
2406                    Role::User => "user".to_owned(),
2407                    Role::Assistant => "assistant".to_owned(),
2408                    Role::System => "system".to_owned(),
2409                };
2410                (zeph_memory::MessageId(0), role, m.content.clone())
2411            })
2412            .collect();
2413
2414        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
2415        let chat_messages = vec![Message {
2416            role: Role::User,
2417            content: prompt,
2418            parts: vec![],
2419            metadata: MessageMetadata::default(),
2420        }];
2421
2422        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
2423            let _ = self.channel.send_status("").await;
2424            return;
2425        };
2426
2427        if let Err(e) = memory
2428            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
2429            .await
2430        {
2431            tracing::warn!("shutdown summary: storage failed: {e:#}");
2432        } else {
2433            tracing::info!(
2434                conversation_id = conversation_id.0,
2435                "shutdown summary stored"
2436            );
2437        }
2438
2439        // Clear TUI status.
2440        let _ = self.channel.send_status("").await;
2441    }
2442
2443    pub async fn shutdown(&mut self) {
2444        self.channel.send("Shutting down...").await.ok();
2445
2446        // CRIT-1: persist Thompson state accumulated during this session.
2447        self.provider.save_router_state();
2448
2449        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
2450            mgr.shutdown_all();
2451        }
2452
2453        if let Some(ref manager) = self.mcp.manager {
2454            manager.shutdown_all_shared().await;
2455        }
2456
2457        // Finalize compaction trajectory: push the last open segment into the Vec.
2458        // This segment would otherwise only be pushed when the next hard compaction fires,
2459        // which never happens at session end.
2460        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
2461            self.update_metrics(|m| {
2462                m.compaction_turns_after_hard.push(turns);
2463            });
2464            self.context_manager.turns_since_last_hard_compaction = None;
2465        }
2466
2467        if let Some(ref tx) = self.metrics.metrics_tx {
2468            let m = tx.borrow();
2469            if m.filter_applications > 0 {
2470                #[allow(clippy::cast_precision_loss)]
2471                let pct = if m.filter_raw_tokens > 0 {
2472                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
2473                } else {
2474                    0.0
2475                };
2476                tracing::info!(
2477                    raw_tokens = m.filter_raw_tokens,
2478                    saved_tokens = m.filter_saved_tokens,
2479                    applications = m.filter_applications,
2480                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
2481                    m.filter_saved_tokens,
2482                );
2483            }
2484            if m.compaction_hard_count > 0 {
2485                tracing::info!(
2486                    hard_compactions = m.compaction_hard_count,
2487                    turns_after_hard = ?m.compaction_turns_after_hard,
2488                    "hard compaction trajectory"
2489                );
2490            }
2491        }
2492
2493        self.maybe_store_shutdown_summary().await;
2494        self.maybe_store_session_digest().await;
2495
2496        tracing::info!("agent shutdown complete");
2497    }
2498
2499    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
2500    ///
2501    /// # Errors
2502    ///
2503    /// Returns an error if channel I/O or LLM communication fails.
2504    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
2505    fn refresh_subagent_metrics(&mut self) {
2506        let Some(ref mgr) = self.orchestration.subagent_manager else {
2507            return;
2508        };
2509        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
2510            .statuses()
2511            .into_iter()
2512            .map(|(id, s)| {
2513                let def = mgr.agents_def(&id);
2514                crate::metrics::SubAgentMetrics {
2515                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
2516                    id: id.clone(),
2517                    state: format!("{:?}", s.state).to_lowercase(),
2518                    turns_used: s.turns_used,
2519                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
2520                    background: def.is_some_and(|d| d.permissions.background),
2521                    elapsed_secs: s.started_at.elapsed().as_secs(),
2522                    permission_mode: def.map_or_else(String::new, |d| {
2523                        use crate::subagent::def::PermissionMode;
2524                        match d.permissions.permission_mode {
2525                            PermissionMode::Default => String::new(),
2526                            PermissionMode::AcceptEdits => "accept_edits".into(),
2527                            PermissionMode::DontAsk => "dont_ask".into(),
2528                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
2529                            PermissionMode::Plan => "plan".into(),
2530                        }
2531                    }),
2532                    transcript_dir: mgr
2533                        .agent_transcript_dir(&id)
2534                        .map(|p| p.to_string_lossy().into_owned()),
2535                }
2536            })
2537            .collect();
2538        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
2539    }
2540
2541    /// Non-blocking poll: notify the user when background sub-agents complete.
2542    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
2543        let completed = self.poll_subagents().await;
2544        for (task_id, result) in completed {
2545            let notice = if result.is_empty() {
2546                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
2547            } else {
2548                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
2549            };
2550            if let Err(e) = self.channel.send(&notice).await {
2551                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
2552            }
2553        }
2554        Ok(())
2555    }
2556
    /// Run the agent main loop.
    ///
    /// Each iteration services housekeeping (provider override, MCP tool refresh,
    /// elicitations, sub-agent metrics/notifications), then obtains the next input —
    /// either a queued message or a `tokio::select!` over the channel, shutdown
    /// signal, reload signals, and scheduler/file-change events — and dispatches it
    /// to built-in command handling or `process_user_message`.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // Wait for model warmup (if a warmup watch channel was wired) before the
        // first turn. A warmup that never succeeds is logged, not fatal.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;

        loop {
            // Apply any pending provider override (from ACP set_session_config_option).
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            // Poll for MCP tool list updates from tools/list_changed notifications.
            self.check_tool_refresh().await;

            // Process any pending MCP elicitation requests from MCP servers.
            self.process_pending_elicitations().await;

            // Refresh sub-agent status in metrics before polling.
            self.refresh_subagent_metrics();

            // Non-blocking poll: notify user when background sub-agents complete.
            self.notify_completed_subagents().await?;

            self.drain_channel();

            // Prefer an already-queued message over blocking on fresh channel input.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Attachments were queued unresolved; route them back through
                    // resolve_message like fresh channel input.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                        // Experiment engine completed (ok or err). Clear the cancel token so
                        // status reports idle and new experiments can be started.
                        #[cfg(feature = "experiments")]
                        { self.experiments.cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        // Scheduler-injected prompt is treated as a normal agent turn,
                        // tagged with the scheduled-task prefix.
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                };
                // None from recv means the channel is exhausted — exit the loop.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            // Built-in slash commands short-circuit the turn:
            // Some(true) = exit loop, Some(false) = next iteration, None = regular message.
            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
2680
2681    /// Handle built-in slash commands that short-circuit the main `run` loop.
2682    ///
2683    /// Returns `Some(true)` to break the loop (exit), `Some(false)` to continue to the next
2684    /// iteration, or `None` if the command was not recognized (caller should call
2685    /// `process_user_message`).
2686    #[allow(clippy::too_many_lines)]
2687    async fn handle_builtin_command(
2688        &mut self,
2689        trimmed: &str,
2690    ) -> Result<Option<bool>, error::AgentError> {
2691        if trimmed == "/clear-queue" {
2692            let n = self.clear_queue();
2693            self.notify_queue_count().await;
2694            self.channel
2695                .send(&format!("Cleared {n} queued messages."))
2696                .await?;
2697            let _ = self.channel.flush_chunks().await;
2698            return Ok(Some(false));
2699        }
2700
2701        if trimmed == "/compact" {
2702            if self.msg.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
2703                match self.compact_context().await {
2704                    Ok(
2705                        context::CompactionOutcome::Compacted
2706                        | context::CompactionOutcome::NoChange,
2707                    ) => {
2708                        let _ = self.channel.send("Context compacted successfully.").await;
2709                    }
2710                    Ok(context::CompactionOutcome::ProbeRejected) => {
2711                        let _ = self
2712                            .channel
2713                            .send(
2714                                "Compaction rejected: summary quality below threshold. \
2715                                 Original context preserved.",
2716                            )
2717                            .await;
2718                    }
2719                    Err(e) => {
2720                        let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
2721                    }
2722                }
2723            } else {
2724                let _ = self.channel.send("Nothing to compact.").await;
2725            }
2726            let _ = self.channel.flush_chunks().await;
2727            return Ok(Some(false));
2728        }
2729
2730        if trimmed == "/new" || trimmed.starts_with("/new ") {
2731            let args = trimmed.strip_prefix("/new").unwrap_or("").trim();
2732            let keep_plan = args.split_whitespace().any(|a| a == "--keep-plan");
2733            let no_digest = args.split_whitespace().any(|a| a == "--no-digest");
2734            match self.reset_conversation(keep_plan, no_digest).await {
2735                Ok((old_id, new_id)) => {
2736                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
2737                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
2738                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
2739                    self.channel
2740                        .send(&format!(
2741                            "New conversation started. Previous: {old} → Current: {new}{keep_note}"
2742                        ))
2743                        .await?;
2744                }
2745                Err(e) => {
2746                    self.channel
2747                        .send(&format!("Failed to start new conversation: {e}"))
2748                        .await?;
2749                }
2750            }
2751            let _ = self.channel.flush_chunks().await;
2752            return Ok(Some(false));
2753        }
2754
2755        if trimmed == "/clear" {
2756            self.clear_history();
2757            self.tool_orchestrator.clear_cache();
2758            if let Ok(mut urls) = self.security.user_provided_urls.write() {
2759                urls.clear();
2760            }
2761            let _ = self.channel.flush_chunks().await;
2762            return Ok(Some(false));
2763        }
2764
2765        if trimmed == "/reset" {
2766            self.clear_history();
2767            self.tool_orchestrator.clear_cache();
2768            if let Ok(mut urls) = self.security.user_provided_urls.write() {
2769                urls.clear();
2770            }
2771            self.channel.send("Conversation history reset.").await?;
2772            let _ = self.channel.flush_chunks().await;
2773            return Ok(Some(false));
2774        }
2775
2776        if trimmed == "/cache-stats" {
2777            let stats = self.tool_orchestrator.cache_stats();
2778            self.channel.send(&stats).await?;
2779            let _ = self.channel.flush_chunks().await;
2780            return Ok(Some(false));
2781        }
2782
2783        if trimmed == "/model" || trimmed.starts_with("/model ") {
2784            self.handle_model_command(trimmed).await;
2785            let _ = self.channel.flush_chunks().await;
2786            return Ok(Some(false));
2787        }
2788
2789        if trimmed == "/provider" || trimmed.starts_with("/provider ") {
2790            self.handle_provider_command(trimmed).await;
2791            let _ = self.channel.flush_chunks().await;
2792            return Ok(Some(false));
2793        }
2794
2795        if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
2796            self.handle_debug_dump_command(trimmed).await;
2797            let _ = self.channel.flush_chunks().await;
2798            return Ok(Some(false));
2799        }
2800
2801        if trimmed.starts_with("/dump-format") {
2802            self.handle_dump_format_command(trimmed).await;
2803            let _ = self.channel.flush_chunks().await;
2804            return Ok(Some(false));
2805        }
2806
2807        if trimmed == "/exit" || trimmed == "/quit" {
2808            if self.channel.supports_exit() {
2809                return Ok(Some(true));
2810            }
2811            let _ = self
2812                .channel
2813                .send("/exit is not supported in this channel.")
2814                .await;
2815            return Ok(Some(false));
2816        }
2817
2818        Ok(None)
2819    }
2820
2821    /// Switch the active provider to one serving `model_id`.
2822    ///
2823    /// Looks up the model in the provider's remote model list (or cache).
2824    ///
2825    /// # Errors
2826    ///
2827    /// Returns `Err` if the model is not found.
2828    pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
2829        if model_id.is_empty() {
2830            return Err("model id must not be empty".to_string());
2831        }
2832        if model_id.len() > 256 {
2833            return Err("model id exceeds maximum length of 256 characters".to_string());
2834        }
2835        if !model_id
2836            .chars()
2837            .all(|c| c.is_ascii() && !c.is_ascii_control())
2838        {
2839            return Err("model id must contain only printable ASCII characters".to_string());
2840        }
2841        self.runtime.model_name = model_id.to_string();
2842        tracing::info!(model = model_id, "set_model called");
2843        Ok(())
2844    }
2845
2846    async fn handle_model_refresh(&mut self) {
2847        // Invalidate all model cache files in the cache directory.
2848        if let Some(cache_dir) = dirs::cache_dir() {
2849            let models_dir = cache_dir.join("zeph").join("models");
2850            if let Ok(entries) = std::fs::read_dir(&models_dir) {
2851                for entry in entries.flatten() {
2852                    let path = entry.path();
2853                    if path.extension().and_then(|e| e.to_str()) == Some("json") {
2854                        let _ = std::fs::remove_file(&path);
2855                    }
2856                }
2857            }
2858        }
2859        match self.provider.list_models_remote().await {
2860            Ok(models) => {
2861                let _ = self
2862                    .channel
2863                    .send(&format!("Fetched {} models.", models.len()))
2864                    .await;
2865            }
2866            Err(e) => {
2867                let _ = self
2868                    .channel
2869                    .send(&format!("Error fetching models: {e}"))
2870                    .await;
2871            }
2872        }
2873    }
2874
2875    async fn handle_model_list(&mut self) {
2876        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2877        let cached = if cache.is_stale() {
2878            None
2879        } else {
2880            cache.load().unwrap_or(None)
2881        };
2882        let models = if let Some(m) = cached {
2883            m
2884        } else {
2885            match self.provider.list_models_remote().await {
2886                Ok(m) => m,
2887                Err(e) => {
2888                    let _ = self
2889                        .channel
2890                        .send(&format!("Error fetching models: {e}"))
2891                        .await;
2892                    return;
2893                }
2894            }
2895        };
2896        if models.is_empty() {
2897            let _ = self.channel.send("No models available.").await;
2898            return;
2899        }
2900        let mut lines = vec!["Available models:".to_string()];
2901        for (i, m) in models.iter().enumerate() {
2902            lines.push(format!("  {}. {} ({})", i + 1, m.display_name, m.id));
2903        }
2904        let _ = self.channel.send(&lines.join("\n")).await;
2905    }
2906
2907    async fn handle_model_switch(&mut self, model_id: &str) {
2908        // Validate model_id against the known model list before switching.
2909        // Try disk cache first; fall back to a remote fetch if the cache is stale.
2910        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2911        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
2912        {
2913            match self.provider.list_models_remote().await {
2914                Ok(m) if !m.is_empty() => Some(m),
2915                _ => None,
2916            }
2917        } else {
2918            cache.load().unwrap_or(None)
2919        };
2920        if let Some(models) = known_models {
2921            if !models.iter().any(|m| m.id == model_id) {
2922                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
2923                for m in &models {
2924                    lines.push(format!("  • {} ({})", m.display_name, m.id));
2925                }
2926                let _ = self.channel.send(&lines.join("\n")).await;
2927                return;
2928            }
2929        } else {
2930            let _ = self
2931                .channel
2932                .send(
2933                    "Model list unavailable, switching anyway — verify your model name is correct.",
2934                )
2935                .await;
2936        }
2937        match self.set_model(model_id) {
2938            Ok(()) => {
2939                let _ = self
2940                    .channel
2941                    .send(&format!("Switched to model: {model_id}"))
2942                    .await;
2943            }
2944            Err(e) => {
2945                let _ = self.channel.send(&format!("Error: {e}")).await;
2946            }
2947        }
2948    }
2949
2950    /// Handle `/model`, `/model <id>`, and `/model refresh` commands.
2951    async fn handle_model_command(&mut self, trimmed: &str) {
2952        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
2953        if arg == "refresh" {
2954            self.handle_model_refresh().await;
2955        } else if arg.is_empty() {
2956            self.handle_model_list().await;
2957        } else {
2958            self.handle_model_switch(arg).await;
2959        }
2960    }
2961
2962    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
2963    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
2964        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
2965        if arg.is_empty() {
2966            match &self.debug_state.debug_dumper {
2967                Some(d) => {
2968                    let _ = self
2969                        .channel
2970                        .send(&format!("Debug dump active: {}", d.dir().display()))
2971                        .await;
2972                }
2973                None => {
2974                    let _ = self
2975                        .channel
2976                        .send(
2977                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
2978                             or start with `--debug-dump [dir]`.",
2979                        )
2980                        .await;
2981                }
2982            }
2983            return;
2984        }
2985        let dir = std::path::PathBuf::from(arg);
2986        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
2987            Ok(dumper) => {
2988                let path = dumper.dir().display().to_string();
2989                self.debug_state.debug_dumper = Some(dumper);
2990                let _ = self
2991                    .channel
2992                    .send(&format!("Debug dump enabled: {path}"))
2993                    .await;
2994            }
2995            Err(e) => {
2996                let _ = self
2997                    .channel
2998                    .send(&format!("Failed to enable debug dump: {e}"))
2999                    .await;
3000            }
3001        }
3002    }
3003
3004    /// Handle `/dump-format <json|raw|trace>` command — switch debug dump format at runtime.
3005    async fn handle_dump_format_command(&mut self, trimmed: &str) {
3006        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
3007        if arg.is_empty() {
3008            let _ = self
3009                .channel
3010                .send(&format!(
3011                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
3012                    self.debug_state.dump_format
3013                ))
3014                .await;
3015            return;
3016        }
3017        let new_format = match arg {
3018            "json" => crate::debug_dump::DumpFormat::Json,
3019            "raw" => crate::debug_dump::DumpFormat::Raw,
3020            "trace" => crate::debug_dump::DumpFormat::Trace,
3021            other => {
3022                let _ = self
3023                    .channel
3024                    .send(&format!(
3025                        "Unknown format '{other}'. Valid values: json, raw, trace."
3026                    ))
3027                    .await;
3028                return;
3029            }
3030        };
3031        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
3032        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
3033
3034        // CR-04: when switching TO trace, create a fresh TracingCollector.
3035        if now_trace
3036            && !was_trace
3037            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
3038        {
3039            let service_name = self.debug_state.trace_service_name.clone();
3040            let redact = self.debug_state.trace_redact;
3041            match crate::debug_dump::trace::TracingCollector::new(
3042                dump_dir.as_path(),
3043                &service_name,
3044                redact,
3045                None,
3046            ) {
3047                Ok(collector) => {
3048                    self.debug_state.trace_collector = Some(collector);
3049                }
3050                Err(e) => {
3051                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
3052                }
3053            }
3054        }
3055        // CR-04: when switching AWAY from trace, flush and drop the collector.
3056        if was_trace
3057            && !now_trace
3058            && let Some(mut tc) = self.debug_state.trace_collector.take()
3059        {
3060            tc.finish();
3061        }
3062
3063        self.debug_state.dump_format = new_format;
3064        let _ = self
3065            .channel
3066            .send(&format!("Debug dump format set to: {arg}"))
3067            .await;
3068    }
3069
    /// Resolve an incoming channel message into LLM-ready text and image parts.
    ///
    /// Audio attachments are transcribed via the configured STT provider (when
    /// present) and prepended to the message text; image attachments become
    /// `MessagePart::Image` values. Oversized attachments are skipped with a
    /// warning rather than failing the whole message.
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        // Split attachments by kind: audio goes through STT; everything else is
        // handled below as an image candidate.
        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.providers.stt.is_some(),
            "resolve_message attachments"
        );

        // Transcribe audio only when an STT provider is configured; otherwise the
        // audio is dropped (with a warning) and the typed text is kept as-is.
        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                // Per-attachment size cap: skip oversized clips instead of erroring.
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        // A single failed transcription is logged and skipped so it
                        // doesn't lose the rest of the message.
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    // Label the transcript so it is distinguishable from the
                    // user's typed text.
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        // Convert image attachments into message parts, skipping oversized ones.
        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            // MIME type is derived from the filename extension by detect_image_mime.
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }
3159
    /// Dispatch slash commands. Returns `Some(Ok(()))` when handled,
    /// `Some(Err(_))` on I/O error, `None` to fall through to LLM processing.
    #[allow(clippy::too_many_lines)]
    async fn dispatch_slash_command(
        &mut self,
        trimmed: &str,
    ) -> Option<Result<(), error::AgentError>> {
        // Runs a handler expression: propagates its error, flushes any buffered
        // channel chunks, and marks the command as handled.
        macro_rules! handled {
            ($expr:expr) => {{
                if let Err(e) = $expr {
                    return Some(Err(e));
                }
                let _ = self.channel.flush_chunks().await;
                return Some(Ok(()));
            }};
        }

        // Slash command arguments may contain user-provided URLs (e.g. `/browse https://...`).
        // Extract them here so UrlGroundingVerifier allows follow-up fetch calls.
        let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !slash_urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(slash_urls);
        }

        if trimmed == "/help" {
            handled!(self.handle_help_command().await);
        }

        if trimmed == "/status" {
            handled!(self.handle_status_command().await);
        }

        #[cfg(feature = "guardrail")]
        if trimmed == "/guardrail" {
            handled!(self.handle_guardrail_command().await);
        }

        // `/skills …` (family subcommands) vs `/skill …` (single-skill command):
        // checked in this order; each strips its own prefix before delegating.
        if trimmed == "/skills" || trimmed.starts_with("/skills ") {
            let subcommand = trimmed.strip_prefix("/skills").unwrap_or("").trim();
            handled!(self.handle_skills_family(subcommand).await);
        }

        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed
                .strip_prefix("/skill")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_skill_command(&rest).await);
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed
                .strip_prefix("/feedback")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_feedback(&rest).await);
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
            handled!(self.handle_mcp_command(&args).await);
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed
                .strip_prefix("/image")
                .unwrap_or("")
                .trim()
                .to_owned();
            // `/image` with no path is a usage error, reported via the channel.
            if path.is_empty() {
                handled!(
                    self.channel
                        .send("Usage: /image <path>")
                        .await
                        .map_err(Into::into)
                );
            }
            handled!(self.handle_image_command(&path).await);
        }

        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            return Some(self.dispatch_plan_command(trimmed).await);
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            handled!(self.handle_graph_command(trimmed).await);
        }

        if trimmed == "/memory" || trimmed.starts_with("/memory ") {
            handled!(self.handle_memory_command(trimmed).await);
        }

        #[cfg(feature = "compression-guidelines")]
        if trimmed == "/guidelines" {
            handled!(self.handle_guidelines_command().await);
        }

        #[cfg(feature = "scheduler")]
        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
            handled!(self.handle_scheduler_command(trimmed).await);
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            handled!(self.handle_experiment_command(trimmed).await);
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            handled!(self.handle_lsp_status_command().await);
        }

        #[cfg(feature = "policy-enforcer")]
        if trimmed == "/policy" || trimmed.starts_with("/policy ") {
            let args = trimmed
                .strip_prefix("/policy")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_policy_command(&args).await);
        }

        if trimmed == "/log" {
            handled!(self.handle_log_command().await);
        }

        // `/agent …` and `@mention` routing; may itself return None to fall
        // through when an @token does not match a known agent.
        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            return self.dispatch_agent_command(trimmed).await;
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/focus" {
            handled!(self.handle_focus_status_command().await);
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/sidequest" {
            handled!(self.handle_sidequest_status_command().await);
        }

        // Not a recognized slash command: fall through to LLM processing.
        None
    }
3306
3307    async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
3308        match crate::orchestration::PlanCommand::parse(trimmed) {
3309            Ok(cmd) => {
3310                self.handle_plan_command(cmd).await?;
3311            }
3312            Err(e) => {
3313                self.channel
3314                    .send(&e.to_string())
3315                    .await
3316                    .map_err(error::AgentError::from)?;
3317            }
3318        }
3319        let _ = self.channel.flush_chunks().await;
3320        Ok(())
3321    }
3322
3323    async fn dispatch_agent_command(
3324        &mut self,
3325        trimmed: &str,
3326    ) -> Option<Result<(), error::AgentError>> {
3327        let known: Vec<String> = self
3328            .orchestration
3329            .subagent_manager
3330            .as_ref()
3331            .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
3332            .unwrap_or_default();
3333        match crate::subagent::AgentCommand::parse(trimmed, &known) {
3334            Ok(cmd) => {
3335                if let Some(msg) = self.handle_agent_command(cmd).await
3336                    && let Err(e) = self.channel.send(&msg).await
3337                {
3338                    return Some(Err(e.into()));
3339                }
3340                let _ = self.channel.flush_chunks().await;
3341                Some(Ok(()))
3342            }
3343            Err(e) if trimmed.starts_with('@') => {
3344                // Unknown @token — fall through to normal LLM processing
3345                tracing::debug!("@mention not matched as agent: {e}");
3346                None
3347            }
3348            Err(e) => {
3349                if let Err(send_err) = self.channel.send(&e.to_string()).await {
3350                    return Some(Err(send_err.into()));
3351                }
3352                let _ = self.channel.flush_chunks().await;
3353                Some(Ok(()))
3354            }
3355        }
3356    }
3357
    /// Spawn a background task to evaluate the user message with the LLM judge (or `LlmClassifier`)
    /// and store the correction result. Non-blocking: the task runs independently of the response
    /// pipeline.
    ///
    /// Which path runs depends on configuration: when an `LlmClassifier` is attached
    /// (`DetectorMode::Model`) it is used; otherwise the legacy `JudgeDetector` path runs,
    /// using the dedicated judge provider or falling back to the main provider.
    ///
    /// # Notes
    ///
    /// TODO(I3): `JoinHandle`s are not tracked — outstanding tasks may be aborted on runtime
    /// shutdown before `store_user_correction` completes. Acceptable for MVP.
    #[allow(clippy::too_many_lines)]
    fn spawn_judge_correction_check(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Snapshot everything the background task needs as owned values so the
        // spawned future is 'static and does not borrow `self`.
        let assistant_snippet = self.last_assistant_response();
        let user_msg_owned = trimmed.to_owned();
        let memory_arc = self.memory_state.memory.clone();
        // Attribute the correction to the first active skill, if any.
        let skill_name = self
            .skill_state
            .active_skill_names
            .first()
            .cloned()
            .unwrap_or_default();
        let conv_id_bg = conv_id;
        // Default confidence threshold is 0.6 when no learning config is present.
        let confidence_threshold = self
            .learning_engine
            .config
            .as_ref()
            .map_or(0.6, |c| c.correction_confidence_threshold);

        if let Some(llm_classifier) = self.feedback.llm_classifier.clone() {
            // DetectorMode::Model: clone the classifier (cheap — it holds Arc<AnyProvider>).
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            let memory_arc2 = memory_arc.clone();
            let skill_name2 = skill_name.clone();
            // Clone metrics handles for use inside the spawned task.
            let classifier_metrics_bg = self.metrics.classifier_metrics.clone();
            let metrics_tx_bg = self.metrics.metrics_tx.clone();
            tokio::spawn(async move {
                match llm_classifier
                    .classify_feedback(&user_msg, &assistant, confidence_threshold)
                    .await
                {
                    Ok(verdict) => {
                        // Push classifier snapshot after feedback classification.
                        if let (Some(ref cm), Some(ref tx)) = (classifier_metrics_bg, metrics_tx_bg)
                        {
                            let snap = cm.snapshot();
                            tx.send_modify(|ms| ms.classifier = snap);
                        }
                        if let Some(signal) = feedback_verdict_into_signal(&verdict, &user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "llm-classifier",
                                is_self_correction,
                                "correction signal detected"
                            );
                            // Persist the detected correction for later learning.
                            store_correction_in_memory(
                                memory_arc2,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name2,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        // Classification is best-effort; failures are only logged.
                        tracing::warn!("llm-classifier failed: {e:#}");
                    }
                }
            });
        } else {
            // DetectorMode::Judge (legacy path).
            let judge_provider = self
                .providers
                .judge_provider
                .clone()
                .unwrap_or_else(|| self.provider.clone());
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            tokio::spawn(async move {
                match feedback_detector::JudgeDetector::evaluate(
                    &judge_provider,
                    &user_msg,
                    &assistant,
                    confidence_threshold,
                )
                .await
                {
                    Ok(verdict) => {
                        if let Some(signal) = verdict.into_signal(&user_msg) {
                            // Self-corrections (user corrects their own statement) must not
                            // penalize skills. The judge path has no record_skill_outcomes()
                            // call today, but this guard mirrors the regex path to make the
                            // intent explicit and prevent future regressions if parity is added.
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "judge",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        // Judge evaluation is best-effort; failures are only logged.
                        tracing::warn!("judge detector failed: {e:#}");
                    }
                }
            });
        }
    }
3486
    /// Detect implicit corrections in the user's message and record them in the learning engine.
    ///
    /// Uses regex-based `FeedbackDetector` first. If a `JudgeDetector` is configured and the
    /// regex result is borderline, the LLM judge runs in a background task (non-blocking).
    /// When `DetectorMode::Model` and an `LlmClassifier` is attached, the LLM classifier is
    /// used instead of `JudgeDetector`, sharing the same adaptive thresholds and rate limiter.
    ///
    /// # Arguments
    /// * `trimmed` — the user's message, already whitespace-trimmed by the caller.
    /// * `conv_id` — conversation to attach a stored correction to (forwarded to
    ///   `store_user_correction`); `None` when no conversation is active.
    #[allow(clippy::too_many_lines)]
    async fn detect_and_record_corrections(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Opt-out gate: detection is enabled by default (absent config counts as enabled).
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if !correction_detection_enabled {
            return;
        }

        // Prior user turns are passed to the detector as cross-message context.
        let previous_user_messages: Vec<&str> = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .map(|m| m.content.as_str())
            .collect();

        let regex_signal = self
            .feedback
            .detector
            .detect(trimmed, &previous_user_messages);

        // Judge/Model mode: invoke LLM in background if regex is borderline or missed.
        //
        // The LLM call is decoupled from the response pipeline — it records the
        // correction asynchronously via tokio::spawn and returns None immediately
        // so the user response is not blocked.
        //
        // TODO(I3): JoinHandles are not tracked — outstanding tasks may be aborted
        // on runtime shutdown before store_user_correction completes. This is
        // acceptable for the learning subsystem at MVP. Future: collect handles in
        // Agent and drain on graceful shutdown.
        // Check rate limit synchronously before deciding to spawn.
        // The feedback.judge is &mut self so check_rate_limit() can update call_times.
        //
        // DetectorMode::Model reuses the judge's adaptive thresholds + rate limiter.
        // If llm_classifier is present but judge is None, create a temporary JudgeDetector
        // for threshold/rate-limit checking only (not for actual LLM calls).
        let judge_should_run = if self.feedback.llm_classifier.is_some() {
            // Model mode: use judge thresholds + rate limiter for gating.
            let adaptive_low = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.5, |c| c.judge_adaptive_low);
            let adaptive_high = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.8, |c| c.judge_adaptive_high);
            // Two-step on purpose: get_or_insert_with holds a &mut borrow of judge,
            // so check_rate_limit (also &mut) must run in a separate expression.
            let should_invoke = self
                .feedback
                .judge
                .get_or_insert_with(|| {
                    feedback_detector::JudgeDetector::new(adaptive_low, adaptive_high)
                })
                .should_invoke(regex_signal.as_ref());
            should_invoke
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        } else {
            // Judge mode (or regex-only when neither judge nor llm_classifier is set).
            self.feedback
                .judge
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .feedback
                    .judge
                    .as_mut() // lgtm[rust/cleartext-logging]
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        };

        let (signal, signal_source) = if judge_should_run {
            self.spawn_judge_correction_check(trimmed, conv_id);
            // Judge runs in background — return None so the response pipeline continues.
            (None, "judge")
        } else {
            (regex_signal, "regex")
        };

        // No synchronous signal (regex miss, or judge deferred to background): done.
        let Some(signal) = signal else { return };
        tracing::info!(
            kind = signal.kind.as_str(),
            confidence = signal.confidence,
            source = signal_source,
            "implicit correction detected"
        );
        // REV-PH2-002 + SEC-PH2-002: cap feedback_text to 500 chars (UTF-8 safe)
        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
        // Self-corrections (user corrects their own statement) must not penalize skills —
        // the agent did nothing wrong. Store for analytics but skip skill outcome recording.
        if self.is_learning_enabled()
            && signal.kind != feedback_detector::CorrectionKind::SelfCorrection
        {
            self.record_skill_outcomes(
                "user_rejection",
                Some(&feedback_text),
                Some(signal.kind.as_str()),
            )
            .await;
        }
        if let Some(memory) = &self.memory_state.memory {
            // Use `trimmed` (raw user input, untainted by secrets) instead of
            // `feedback_text` (derived from previous_user_messages → self.msg.messages)
            // to avoid the CodeQL cleartext-logging taint path.
            let correction_text = context::truncate_chars(trimmed, 500);
            match memory
                .sqlite()
                .store_user_correction(
                    conv_id.map(|c| c.0),
                    "",
                    &correction_text,
                    self.skill_state
                        .active_skill_names
                        .first()
                        .map(String::as_str),
                    signal.kind.as_str(),
                )
                .await
            {
                Ok(correction_id) => {
                    // Embedding failure is non-fatal: the row itself is already stored.
                    if let Err(e) = memory
                        .store_correction_embedding(correction_id, &correction_text)
                        .await
                    {
                        tracing::warn!("failed to store correction embedding: {e:#}");
                    }
                }
                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
            }
        }
    }
3635
3636    async fn process_user_message(
3637        &mut self,
3638        text: String,
3639        image_parts: Vec<zeph_llm::provider::MessagePart>,
3640    ) -> Result<(), error::AgentError> {
3641        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
3642        let iteration_index = self.debug_state.iteration_counter;
3643        self.debug_state.iteration_counter += 1;
3644        if let Some(ref mut tc) = self.debug_state.trace_collector {
3645            tc.begin_iteration(iteration_index, text.trim());
3646            // CR-01: store the span ID so LLM/tool execution can attach child spans.
3647            self.debug_state.current_iteration_span_id =
3648                tc.current_iteration_span_id(iteration_index);
3649        }
3650
3651        let result = self
3652            .process_user_message_inner(text, image_parts, iteration_index)
3653            .await;
3654
3655        // Close iteration span regardless of outcome (partial trace preserved on error).
3656        if let Some(ref mut tc) = self.debug_state.trace_collector {
3657            let status = if result.is_ok() {
3658                crate::debug_dump::trace::SpanStatus::Ok
3659            } else {
3660                crate::debug_dump::trace::SpanStatus::Error {
3661                    message: "iteration failed".to_owned(),
3662                }
3663            };
3664            tc.end_iteration(iteration_index, status);
3665        }
3666        self.debug_state.current_iteration_span_id = None;
3667
3668        result
3669    }
3670
    /// Core per-turn pipeline for one user message: slash-command dispatch,
    /// security screening (guardrail / injection classifier), correction
    /// detection, context maintenance (compaction, pruning, summaries),
    /// message assembly, and response generation.
    ///
    /// Errors from `process_response` are reported to the channel and the
    /// failed user message is rolled back from history; only channel failures
    /// propagate as `Err` from that branch.
    #[allow(clippy::too_many_lines)]
    async fn process_user_message_inner(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
        iteration_index: usize,
    ) -> Result<(), error::AgentError> {
        let _ = iteration_index; // Used indirectly via debug_state.current_iteration_span_id.
        // Bridge the shared Notify-based cancel signal into this turn's fresh token.
        self.lifecycle.cancel_token = CancellationToken::new();
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = self.lifecycle.cancel_token.clone();
        tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        });
        let trimmed = text.trim();

        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
        #[cfg(feature = "guardrail")]
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    // Warn mode: notify but continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        // ML classifier: lightweight injection detection on user input boundary.
        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
        // direct user chat. Disabled by default to prevent false positives on benign messages.
        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        // Reset per-message pruning cache at the start of each turn (#2298).
        self.mcp.pruning_cache.reset();

        // Extract before rebuild_system_prompt so the value is not tainted
        // by the secrets-bearing system prompt (ConversationId is just an i64).
        let conv_id = self.memory_state.conversation_id;
        self.rebuild_system_prompt(&text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
        #[cfg(feature = "context-compression")]
        {
            self.focus.tick();

            // SideQuest eviction: runs every N user turns when enabled.
            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
        self.provider
            .set_memory_confidence(self.memory_state.last_recall_confidence);

        self.learning_engine.reset_reflection();

        // Merge images queued by /image with any attached to this message.
        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
        all_image_parts.extend(image_parts);
        let image_parts = all_image_parts;

        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
            parts.extend(image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !image_parts.is_empty() {
                tracing::warn!(
                    count = image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.clone(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        };
        // Extract URLs from user input and add to user_provided_urls for grounding checks.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(urls);
        }

        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
        // Derived from the raw input text before context assembly to avoid timing dependencies.
        self.memory_state.goal_text = Some(text.clone());

        // Image parts intentionally excluded — base64 payloads too large for message history.
        self.persist_message(Role::User, &text, &[], false).await;
        self.push_message(user_msg);

        if let Err(e) = self.process_response().await {
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // Roll back the failed turn so history stays consistent with the provider.
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        }

        Ok(())
    }
3865
3866    async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
3867        use std::path::Component;
3868        use zeph_llm::provider::{ImageData, MessagePart};
3869
3870        // Reject paths that traverse outside the current directory.
3871        let has_parent_dir = std::path::Path::new(path)
3872            .components()
3873            .any(|c| c == Component::ParentDir);
3874        if has_parent_dir {
3875            self.channel
3876                .send("Invalid image path: path traversal not allowed")
3877                .await?;
3878            let _ = self.channel.flush_chunks().await;
3879            return Ok(());
3880        }
3881
3882        let data = match std::fs::read(path) {
3883            Ok(d) => d,
3884            Err(e) => {
3885                self.channel
3886                    .send(&format!("Cannot read image {path}: {e}"))
3887                    .await?;
3888                let _ = self.channel.flush_chunks().await;
3889                return Ok(());
3890            }
3891        };
3892        if data.len() > MAX_IMAGE_BYTES {
3893            self.channel
3894                .send(&format!(
3895                    "Image {path} exceeds size limit ({} MB), skipping",
3896                    MAX_IMAGE_BYTES / 1024 / 1024
3897                ))
3898                .await?;
3899            let _ = self.channel.flush_chunks().await;
3900            return Ok(());
3901        }
3902        let mime_type = detect_image_mime(Some(path)).to_string();
3903        self.msg
3904            .pending_image_parts
3905            .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
3906        self.channel
3907            .send(&format!("Image loaded: {path}. Send your message."))
3908            .await?;
3909        let _ = self.channel.flush_chunks().await;
3910        Ok(())
3911    }
3912
3913    async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
3914        use std::fmt::Write;
3915
3916        let mut out = String::from("Slash commands:\n\n");
3917
3918        let categories = [
3919            slash_commands::SlashCategory::Info,
3920            slash_commands::SlashCategory::Session,
3921            slash_commands::SlashCategory::Model,
3922            slash_commands::SlashCategory::Memory,
3923            slash_commands::SlashCategory::Tools,
3924            slash_commands::SlashCategory::Planning,
3925            slash_commands::SlashCategory::Debug,
3926            slash_commands::SlashCategory::Advanced,
3927        ];
3928
3929        for cat in &categories {
3930            let entries: Vec<_> = slash_commands::COMMANDS
3931                .iter()
3932                .filter(|c| &c.category == cat)
3933                .collect();
3934            if entries.is_empty() {
3935                continue;
3936            }
3937            let _ = writeln!(out, "{}:", cat.as_str());
3938            for cmd in entries {
3939                if cmd.args.is_empty() {
3940                    let _ = write!(out, "  {}", cmd.name);
3941                } else {
3942                    let _ = write!(out, "  {} {}", cmd.name, cmd.args);
3943                }
3944                let _ = write!(out, "  — {}", cmd.description);
3945                if let Some(feat) = cmd.feature_gate {
3946                    let _ = write!(out, " [requires: {feat}]");
3947                }
3948                let _ = writeln!(out);
3949            }
3950            let _ = writeln!(out);
3951        }
3952
3953        self.channel.send(out.trim_end()).await?;
3954        Ok(())
3955    }
3956
    /// Render the `/status` report: provider/model info, uptime, token and cost
    /// metrics, skills, MCP servers, orchestration progress, pruning state, and
    /// graph-memory recall mode.
    #[allow(clippy::too_many_lines)]
    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;

        let uptime = self.lifecycle.start_time.elapsed().as_secs();
        // One "turn" = one user message in history.
        let msg_count = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .count();

        // Snapshot all metrics in a single borrow of the watch channel;
        // zeroed fallback when the metrics channel is not attached.
        let (
            api_calls,
            prompt_tokens,
            completion_tokens,
            cost_cents,
            mcp_servers,
            orch_plans,
            orch_tasks,
            orch_completed,
            orch_failed,
            orch_skipped,
        ) = if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            (
                m.api_calls,
                m.prompt_tokens,
                m.completion_tokens,
                m.cost_spent_cents,
                m.mcp_server_count,
                m.orchestration.plans_total,
                m.orchestration.tasks_total,
                m.orchestration.tasks_completed,
                m.orchestration.tasks_failed,
                m.orchestration.tasks_skipped,
            )
        } else {
            (0, 0, 0, 0.0, 0, 0, 0, 0, 0, 0)
        };

        // Poisoned registry lock degrades to a zero count rather than panicking.
        let skill_count = self
            .skill_state
            .registry
            .read()
            .map(|r| r.all_meta().len())
            .unwrap_or(0);

        let mut out = String::from("Session status:\n\n");
        let _ = writeln!(out, "Provider:  {}", self.provider.name());
        let _ = writeln!(out, "Model:     {}", self.runtime.model_name);
        let _ = writeln!(out, "Uptime:    {uptime}s");
        let _ = writeln!(out, "Turns:     {msg_count}");
        let _ = writeln!(out, "API calls: {api_calls}");
        let _ = writeln!(
            out,
            "Tokens:    {prompt_tokens} prompt / {completion_tokens} completion"
        );
        let _ = writeln!(out, "Skills:    {skill_count}");
        let _ = writeln!(out, "MCP:       {mcp_servers} server(s)");
        if let Some(ref tf) = self.tool_schema_filter {
            let _ = writeln!(
                out,
                "Filter:    enabled (top_k={}, always_on={}, {} embeddings)",
                tf.top_k(),
                tf.always_on_count(),
                tf.embedding_count(),
            );
        }
        #[cfg(feature = "policy-enforcer")]
        if let Some(ref adv) = self.runtime.adversarial_policy_info {
            let provider_display = if adv.provider.is_empty() {
                "default"
            } else {
                adv.provider.as_str()
            };
            let _ = writeln!(
                out,
                "Adv gate:  enabled (provider={}, policies={}, fail_open={})",
                provider_display, adv.policy_count, adv.fail_open
            );
        }
        // Cost line shown only when spend has been recorded (cents → dollars).
        if cost_cents > 0.0 {
            let _ = writeln!(out, "Cost:      ${:.4}", cost_cents / 100.0);
        }
        if orch_plans > 0 {
            let _ = writeln!(out);
            let _ = writeln!(out, "Orchestration:");
            let _ = writeln!(out, "  Plans:     {orch_plans}");
            let _ = writeln!(out, "  Tasks:     {orch_completed}/{orch_tasks} completed");
            if orch_failed > 0 {
                let _ = writeln!(out, "  Failed:    {orch_failed}");
            }
            if orch_skipped > 0 {
                let _ = writeln!(out, "  Skipped:   {orch_skipped}");
            }
        }

        // Subgoal display (#2022): show active subgoal when a subgoal strategy is active.
        #[cfg(feature = "context-compression")]
        {
            use crate::config::PruningStrategy;
            if matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Subgoal | PruningStrategy::SubgoalMig
            ) {
                let _ = writeln!(out);
                let _ = writeln!(
                    out,
                    "Pruning:   {}",
                    match self.context_manager.compression.pruning_strategy {
                        PruningStrategy::SubgoalMig => "subgoal_mig",
                        _ => "subgoal",
                    }
                );
                let subgoal_count = self.compression.subgoal_registry.subgoals.len();
                let _ = writeln!(out, "Subgoals:  {subgoal_count} tracked");
                if let Some(active) = self.compression.subgoal_registry.active_subgoal() {
                    let _ = writeln!(out, "Active:    \"{}\"", active.description);
                } else {
                    let _ = writeln!(out, "Active:    (none yet)");
                }
            }
        }

        // Graph memory status: show recall mode when graph memory is enabled.
        let gc = &self.memory_state.graph_config;
        if gc.enabled {
            let _ = writeln!(out);
            if gc.spreading_activation.enabled {
                let _ = writeln!(
                    out,
                    "Graph recall: spreading activation (lambda={:.2}, hops={})",
                    gc.spreading_activation.decay_lambda, gc.spreading_activation.max_hops,
                );
            } else {
                let _ = writeln!(out, "Graph recall: BFS (hops={})", gc.max_hops,);
            }
        }

        self.channel.send(out.trim_end()).await?;
        Ok(())
    }
4100
4101    #[cfg(feature = "guardrail")]
4102    async fn handle_guardrail_command(&mut self) -> Result<(), error::AgentError> {
4103        use std::fmt::Write;
4104
4105        let mut out = String::new();
4106        if let Some(ref guardrail) = self.security.guardrail {
4107            let stats = guardrail.stats();
4108            let _ = writeln!(out, "Guardrail: enabled");
4109            let _ = writeln!(out, "Action:    {:?}", guardrail.action());
4110            let _ = writeln!(out, "Fail strategy: {:?}", guardrail.fail_strategy());
4111            let _ = writeln!(out, "Timeout:   {}ms", guardrail.timeout_ms());
4112            let _ = writeln!(
4113                out,
4114                "Tool scan: {}",
4115                if guardrail.scan_tool_output() {
4116                    "enabled"
4117                } else {
4118                    "disabled"
4119                }
4120            );
4121            let _ = writeln!(out, "\nStats:");
4122            let _ = writeln!(out, "  Total checks:  {}", stats.total_checks);
4123            let _ = writeln!(out, "  Flagged:       {}", stats.flagged_count);
4124            let _ = writeln!(out, "  Errors:        {}", stats.error_count);
4125            let _ = writeln!(out, "  Avg latency:   {}ms", stats.avg_latency_ms());
4126        } else {
4127            out.push_str("Guardrail: disabled\n");
4128            out.push_str(
4129                "Enable with: --guardrail flag or [security.guardrail] enabled = true in config",
4130            );
4131        }
4132
4133        self.channel.send(out.trim_end()).await?;
4134        Ok(())
4135    }
4136
4137    async fn handle_skills_family(&mut self, subcommand: &str) -> Result<(), error::AgentError> {
4138        match subcommand {
4139            "" => self.handle_skills_command().await,
4140            "confusability" => self.handle_skills_confusability_command().await,
4141            other => {
4142                self.channel
4143                    .send(&format!(
4144                        "Unknown /skills subcommand: '{other}'. Available: confusability"
4145                    ))
4146                    .await?;
4147                Ok(())
4148            }
4149        }
4150    }
4151
4152    async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
4153        use std::collections::BTreeMap;
4154        use std::fmt::Write;
4155
4156        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
4157            .skill_state
4158            .registry
4159            .read()
4160            .expect("registry read lock")
4161            .all_meta()
4162            .into_iter()
4163            .cloned()
4164            .collect();
4165
4166        // Collect trust info for all skills.
4167        let mut trust_map: std::collections::HashMap<String, String> =
4168            std::collections::HashMap::new();
4169        for meta in &all_meta {
4170            if let Some(memory) = &self.memory_state.memory {
4171                let info = memory
4172                    .sqlite()
4173                    .load_skill_trust(&meta.name)
4174                    .await
4175                    .ok()
4176                    .flatten()
4177                    .map_or_else(String::new, |r| format!(" [{}]", r.trust_level));
4178                trust_map.insert(meta.name.clone(), info);
4179            }
4180        }
4181
4182        let mut output = String::from("Available skills:\n\n");
4183
4184        // Group by category when any skill has one set.
4185        let has_categories = all_meta.iter().any(|m| m.category.is_some());
4186        if has_categories {
4187            // BTreeMap for stable alphabetical category order.
4188            let mut by_category: BTreeMap<&str, Vec<&zeph_skills::loader::SkillMeta>> =
4189                BTreeMap::new();
4190            for meta in &all_meta {
4191                let cat = meta.category.as_deref().unwrap_or("other");
4192                by_category.entry(cat).or_default().push(meta);
4193            }
4194            for (cat, skills) in &by_category {
4195                let _ = writeln!(output, "[{cat}]");
4196                for meta in skills {
4197                    let trust_info = trust_map.get(&meta.name).map_or("", String::as_str);
4198                    let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
4199                }
4200                output.push('\n');
4201            }
4202        } else {
4203            for meta in &all_meta {
4204                let trust_info = trust_map.get(&meta.name).map_or("", String::as_str);
4205                let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
4206            }
4207        }
4208
4209        if let Some(memory) = &self.memory_state.memory {
4210            match memory.sqlite().load_skill_usage().await {
4211                Ok(usage) if !usage.is_empty() => {
4212                    output.push_str("\nUsage statistics:\n\n");
4213                    for row in &usage {
4214                        let _ = writeln!(
4215                            output,
4216                            "- {}: {} invocations (last: {})",
4217                            row.skill_name, row.invocation_count, row.last_used_at,
4218                        );
4219                    }
4220                }
4221                Ok(_) => {}
4222                Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
4223            }
4224        }
4225
4226        self.channel.send(&output).await?;
4227        Ok(())
4228    }
4229
4230    async fn handle_skills_confusability_command(&mut self) -> Result<(), error::AgentError> {
4231        let threshold = self.skill_state.confusability_threshold;
4232        if threshold <= 0.0 {
4233            self.channel
4234                .send(
4235                    "Confusability monitoring is disabled. \
4236                     Set [skills] confusability_threshold in config (e.g. 0.85) to enable.",
4237                )
4238                .await?;
4239            return Ok(());
4240        }
4241
4242        let Some(matcher) = &self.skill_state.matcher else {
4243            self.channel
4244                .send("Skill matcher not available (no embedding provider configured).")
4245                .await?;
4246            return Ok(());
4247        };
4248
4249        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
4250            .skill_state
4251            .registry
4252            .read()
4253            .expect("registry read lock")
4254            .all_meta()
4255            .into_iter()
4256            .cloned()
4257            .collect();
4258        let refs: Vec<&zeph_skills::loader::SkillMeta> = all_meta.iter().collect();
4259
4260        let report = matcher.confusability_report(&refs, threshold).await;
4261        self.channel.send(&report.to_string()).await?;
4262        Ok(())
4263    }
4264
4265    async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
4266        let Some((name, rest)) = input.split_once(' ') else {
4267            self.channel
4268                .send("Usage: /feedback <skill_name> <message>")
4269                .await?;
4270            return Ok(());
4271        };
4272        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
4273
4274        if feedback.is_empty() {
4275            self.channel
4276                .send("Usage: /feedback <skill_name> <message>")
4277                .await?;
4278            return Ok(());
4279        }
4280
4281        let Some(memory) = &self.memory_state.memory else {
4282            self.channel.send("Memory not available.").await?;
4283            return Ok(());
4284        };
4285
4286        let outcome_type = if self.feedback.detector.detect(feedback, &[]).is_some() {
4287            "user_rejection"
4288        } else {
4289            "user_approval"
4290        };
4291
4292        memory
4293            .sqlite()
4294            .record_skill_outcome(
4295                skill_name,
4296                None,
4297                self.memory_state.conversation_id,
4298                outcome_type,
4299                None,
4300                Some(feedback),
4301            )
4302            .await?;
4303
4304        if self.is_learning_enabled() && outcome_type == "user_rejection" {
4305            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
4306                .await
4307                .ok();
4308        }
4309
4310        self.channel
4311            .send(&format!("Feedback recorded for \"{skill_name}\"."))
4312            .await?;
4313        Ok(())
4314    }
4315
    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
    /// channel. Returns a human-readable status string suitable for sending to the user.
    ///
    /// Polls every 500 ms. Each iteration first services at most one pending
    /// secret request (prompting the user via `channel.confirm`), then checks
    /// the sub-agent's state. Returns `None` only when the sub-agent manager
    /// is absent; otherwise loops until the task is Completed/Failed/Canceled
    /// or disappears from the status list.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use crate::subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Bridge secret requests from sub-agent to channel.confirm().
            // Fetch the pending request first, then release the borrow before
            // calling channel.confirm() (which requires &mut self).
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
                // (SEC-P1-02), so it is safe to embed in the prompt string.
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A channel error (or closed channel) counts as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // The secret grant is time-limited to 5 minutes.
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // If the task vanished from the status list, treat it as finished
            // rather than polling forever.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: surface "turn N/M" progress, defaulting the
                    // max-turns display to 20 when the definition is unavailable.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
4391
4392    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
4393    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
4394    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
4395        let mgr = self.orchestration.subagent_manager.as_mut()?;
4396        let full_ids: Vec<String> = mgr
4397            .statuses()
4398            .into_iter()
4399            .map(|(tid, _)| tid)
4400            .filter(|tid| tid.starts_with(prefix))
4401            .collect();
4402        Some(match full_ids.as_slice() {
4403            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
4404            [fid] => Ok(fid.clone()),
4405            _ => Err(format!(
4406                "Ambiguous id prefix '{prefix}': matches {} agents",
4407                full_ids.len()
4408            )),
4409        })
4410    }
4411
4412    fn handle_agent_list(&self) -> Option<String> {
4413        use std::fmt::Write as _;
4414        let mgr = self.orchestration.subagent_manager.as_ref()?;
4415        let defs = mgr.definitions();
4416        if defs.is_empty() {
4417            return Some("No sub-agent definitions found.".into());
4418        }
4419        let mut out = String::from("Available sub-agents:\n");
4420        for d in defs {
4421            let memory_label = match d.memory {
4422                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
4423                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
4424                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
4425                None => "",
4426            };
4427            if let Some(ref src) = d.source {
4428                let _ = writeln!(
4429                    out,
4430                    "  {}{} — {} ({})",
4431                    d.name, memory_label, d.description, src
4432                );
4433            } else {
4434                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
4435            }
4436        }
4437        Some(out)
4438    }
4439
4440    fn handle_agent_status(&self) -> Option<String> {
4441        use std::fmt::Write as _;
4442        let mgr = self.orchestration.subagent_manager.as_ref()?;
4443        let statuses = mgr.statuses();
4444        if statuses.is_empty() {
4445            return Some("No active sub-agents.".into());
4446        }
4447        let mut out = String::from("Active sub-agents:\n");
4448        for (id, s) in &statuses {
4449            let state = format!("{:?}", s.state).to_lowercase();
4450            let elapsed = s.started_at.elapsed().as_secs();
4451            let _ = writeln!(
4452                out,
4453                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
4454                short = &id[..8.min(id.len())],
4455                t = s.turns_used,
4456                msg = s.last_message.as_deref().unwrap_or(""),
4457            );
4458            // Show memory directory path for agents with memory enabled.
4459            if let Some(def) = mgr.agents_def(id)
4460                && let Some(scope) = def.memory
4461                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
4462            {
4463                let _ = writeln!(out, "       memory: {}", dir.display());
4464            }
4465        }
4466        Some(out)
4467    }
4468
    /// Approve the pending secret request for the sub-agent matching `id`
    /// (a task-id prefix): grant a 5-minute secret lease and deliver the
    /// secret. Returns `None` when no sub-agent manager is configured.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        // NOTE(review): try_recv_secret_request() pops a single pending request.
        // If the popped request belongs to a *different* sub-agent than
        // `full_id`, it is not re-queued here, so that other agent's request
        // appears to be dropped. Verify whether the manager re-surfaces
        // undelivered requests, or add a re-queue path.
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            // The approval is time-limited: the lease expires after 5 minutes.
            let ttl = std::time::Duration::from_secs(300);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
4492
4493    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
4494        let full_id = match self.resolve_agent_id_prefix(id)? {
4495            Ok(fid) => fid,
4496            Err(msg) => return Some(msg),
4497        };
4498        let mgr = self.orchestration.subagent_manager.as_mut()?;
4499        match mgr.deny_secret(&full_id) {
4500            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
4501            Err(e) => Some(format!("Deny failed: {e}")),
4502        }
4503    }
4504
    /// Execute a parsed `/agent` command and return a user-facing message,
    /// or `None` when no sub-agent manager is configured.
    ///
    /// `Background` spawns and returns immediately; `Spawn`/`Mention` spawn
    /// and then poll to completion, streaming progress to the channel.
    /// `Cancel`/`Approve`/`Deny` accept a task-id prefix.
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Clone provider/executor/skills before taking the mutable
                // manager borrow, then fire-and-forget the spawn.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                // Foreground spawn: launch and await completion, streaming status to user.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Accept prefix match on task_id.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve definition name from transcript meta before spawning so we can
                // look up skills by definition name rather than the UUID prefix (S1 fix).
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
4605
4606    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
4607        let mgr = self.orchestration.subagent_manager.as_ref()?;
4608        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
4609        let reg = self
4610            .skill_state
4611            .registry
4612            .read()
4613            .expect("registry read lock");
4614        match crate::subagent::filter_skills(&reg, &def.skills) {
4615            Ok(skills) => {
4616                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
4617                if bodies.is_empty() {
4618                    None
4619                } else {
4620                    Some(bodies)
4621                }
4622            }
4623            Err(e) => {
4624                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
4625                None
4626            }
4627        }
4628    }
4629
    /// Update trust DB records for all reloaded skills.
    ///
    /// For each skill: classify its source (Hub when its directory is under
    /// the managed dir, Local otherwise), recompute its content hash, and
    /// upsert a trust row. An existing trust level is preserved while the
    /// hash matches; a hash mismatch resets it to the configured
    /// `hash_mismatch_level`. No-op when persistent memory is unavailable;
    /// individual failures are logged, not fatal.
    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
        let Some(ref memory) = self.memory_state.memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            // Skills located under the managed (hub) directory are Hub-sourced.
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::store::SourceKind::Hub
            } else {
                zeph_memory::store::SourceKind::Local
            };
            // First-seen skills start at the configured default for their source.
            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    // Load errors are treated like "no existing record".
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    // Keep the stored level while content is unchanged; a changed
                    // hash means the skill was edited, so its trust is reset.
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            row.trust_level.clone()
                        } else {
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    // Non-UTF-8 paths yield None and are stored without a source path.
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }
4690
    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
    ///
    /// A Qdrant-backed matcher is incrementally synced; any other state
    /// (in-memory backend, or no matcher at all) triggers a full rebuild.
    /// When hybrid search is enabled, the BM25 index over skill descriptions
    /// is rebuilt as well.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        // Each embed call clones the provider and owns its text, so the
        // returned boxed future is self-contained ('static-friendly).
        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move { p.embed(&owned).await })
        };

        // Only a Qdrant backend supports incremental sync.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            // A construction failure leaves the matcher unset (None).
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Sync failures are logged; the stale index remains usable.
            if let Err(e) = backend
                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
        }
    }
4726
    /// Reload skills from disk when the on-disk registry has changed.
    ///
    /// Compares registry fingerprints to skip no-op reloads, then: swaps in
    /// the new registry, refreshes trust records, rebuilds the matcher and
    /// BM25 index, regenerates the skills prompt, and rewrites the first
    /// (system) message in-place.
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        // Fingerprint equality means nothing changed on disk — avoid churn.
        if new_registry.fingerprint()
            == self
                .skill_state
                .registry
                .read()
                .expect("registry read lock")
                .fingerprint()
        {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self
            .skill_state
            .registry
            .write()
            .expect("registry write lock") = new_registry;

        // Snapshot metadata so the awaits below run without holding the lock.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Materialize full skill bodies for the prompt; skills that fail to
        // load are silently skipped.
        let all_skills: Vec<Skill> = {
            let reg = self
                .skill_state
                .registry
                .read()
                .expect("registry read lock");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        // NOTE(review): the system prompt is rebuilt without instructions or
        // environment context (None, None, false) — confirm instruction
        // blocks are re-applied elsewhere after a skill reload.
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state
                .registry
                .read()
                .expect("registry read lock")
                .all_meta()
                .len()
        );
    }
4794
4795    fn reload_instructions(&mut self) {
4796        // Drain any additional queued events before reloading to avoid redundant reloads.
4797        if let Some(ref mut rx) = self.instructions.reload_rx {
4798            while rx.try_recv().is_ok() {}
4799        }
4800        let Some(ref state) = self.instructions.reload_state else {
4801            return;
4802        };
4803        let new_blocks = crate::instructions::load_instructions(
4804            &state.base_dir,
4805            &state.provider_kinds,
4806            &state.explicit_files,
4807            state.auto_detect,
4808        );
4809        let old_sources: std::collections::HashSet<_> =
4810            self.instructions.blocks.iter().map(|b| &b.source).collect();
4811        let new_sources: std::collections::HashSet<_> =
4812            new_blocks.iter().map(|b| &b.source).collect();
4813        for added in new_sources.difference(&old_sources) {
4814            tracing::info!(path = %added.display(), "instruction file added");
4815        }
4816        for removed in old_sources.difference(&new_sources) {
4817            tracing::info!(path = %removed.display(), "instruction file removed");
4818        }
4819        tracing::info!(
4820            old_count = self.instructions.blocks.len(),
4821            new_count = new_blocks.len(),
4822            "reloaded instruction files"
4823        );
4824        self.instructions.blocks = new_blocks;
4825    }
4826
    /// Hot-reload runtime configuration from the stored config path,
    /// applying the subset of settings that can change without a restart.
    ///
    /// Silently returns when no config path is set; on a load failure the
    /// error is logged and the previous configuration is kept.
    fn reload_config(&mut self) {
        let Some(ref path) = self.lifecycle.config_path else {
            return;
        };
        let config = match Config::load(path) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!("config reload failed: {e:#}");
                return;
            }
        };

        // Security, timeout, and memory sizing knobs.
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.history_limit = config.memory.history_limit;
        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
        // Skill-matching knobs; weight/threshold values are clamped to [0, 1].
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);

        // A non-positive token budget disables context budgeting entirely.
        if config.memory.context_budget_tokens > 0 {
            self.context_manager.budget = Some(
                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
                    .with_graph_enabled(config.memory.graph.enabled),
            );
        } else {
            self.context_manager.budget = None;
        }

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
                if self.memory_state.rpe_router.is_none() {
                    self.memory_state.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                // Disabling RPE discards router state.
                self.memory_state.rpe_router = None;
            }
            self.memory_state.graph_config = graph_cfg.clone();
        }
        // Compaction / pruning / compression policy.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // Resolve routing_classifier_provider from the provider pool (#2484).
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
4908
4909    /// `/focus` slash command: display Focus Agent status.
4910    #[cfg(feature = "context-compression")]
4911    async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
4912        use std::fmt::Write;
4913        let mut out = String::from("Focus Agent status\n\n");
4914        let _ = writeln!(out, "Enabled:          {}", self.focus.config.enabled);
4915        let _ = writeln!(out, "Active session:   {}", self.focus.is_active());
4916        if let Some(ref scope) = self.focus.active_scope {
4917            let _ = writeln!(out, "Active scope:     {scope}");
4918        }
4919        let _ = writeln!(
4920            out,
4921            "Knowledge blocks: {}",
4922            self.focus.knowledge_blocks.len()
4923        );
4924        let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
4925        self.channel.send(&out).await?;
4926        Ok(())
4927    }
4928
4929    /// `/sidequest` slash command: display `SideQuest` eviction stats.
4930    #[cfg(feature = "context-compression")]
4931    async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
4932        use std::fmt::Write;
4933        let mut out = String::from("SideQuest status\n\n");
4934        let _ = writeln!(out, "Enabled:        {}", self.sidequest.config.enabled);
4935        let _ = writeln!(
4936            out,
4937            "Interval turns: {}",
4938            self.sidequest.config.interval_turns
4939        );
4940        let _ = writeln!(out, "Turn counter:   {}", self.sidequest.turn_counter);
4941        let _ = writeln!(out, "Passes run:     {}", self.sidequest.passes_run);
4942        let _ = writeln!(
4943            out,
4944            "Total evicted:  {} tool outputs",
4945            self.sidequest.total_evicted
4946        );
4947        self.channel.send(&out).await?;
4948        Ok(())
4949    }
4950
    /// Run `SideQuest` tool output eviction pass (#1885).
    ///
    /// PERF-1 fix: two-phase non-blocking design.
    ///
    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
    /// validate and apply it immediately.
    ///
    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
    /// next turn, so the current agent turn is never blocked by the LLM call.
    ///
    /// Never awaits: the only interaction with the background task is `now_or_never`,
    /// so this function is safe to call synchronously on every turn.
    #[cfg(feature = "context-compression")]
    #[allow(clippy::too_many_lines)]
    fn maybe_sidequest_eviction(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
        // strategy — the two systems share the same pool of evictable tool outputs and can
        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
        // (Deliberately a warning only — the operator decides, we do not override config.)
        if self.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                     consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        // Guard: do not evict while a focus session is active.
        if self.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active");
            // Drop any pending result — cursors may be stale relative to focus truncation.
            self.compression.pending_sidequest_result = None;
            return;
        }

        // Phase 1: apply pending result from last turn's background LLM call.
        if let Some(handle) = self.compression.pending_sidequest_result.take() {
            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
            use futures::FutureExt as _;
            match handle.now_or_never() {
                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
                    // Snapshot cursors before apply_eviction mutates message state,
                    // so the debug dump reflects what the LLM actually scored.
                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
                    let freed = self.sidequest.apply_eviction(
                        &mut self.msg.messages,
                        &evicted_indices,
                        &self.metrics.token_counter,
                    );
                    if freed > 0 {
                        self.recompute_prompt_tokens();
                        // C1 fix: prevent maybe_compact() from firing in the same turn.
                        // cooldown=0: eviction does not impose post-compaction cooldown.
                        self.context_manager.compaction =
                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
                                cooldown: 0,
                            };
                        tracing::info!(
                            freed_tokens = freed,
                            evicted_cursors = evicted_indices.len(),
                            pass = self.sidequest.passes_run,
                            "sidequest eviction complete"
                        );
                        if let Some(ref d) = self.debug_state.debug_dumper {
                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                        }
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                        }
                    } else {
                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(String::new());
                        }
                    }
                }
                Some(Ok(None | Some(_))) => {
                    // Task finished but produced nothing evictable (None, or an
                    // empty index list caught by the guard on the arm above).
                    tracing::debug!("sidequest: pending result: no cursors to evict");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                Some(Err(e)) => {
                    tracing::debug!("sidequest: background task panicked: {e}");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                None => {
                    // Task still running — re-store and wait another turn.
                    // We already took it; we'd need to re-spawn, but instead just drop and
                    // schedule fresh below to keep the cursor list current.
                    // (Dropping the JoinHandle detaches the in-flight task; its
                    // result is simply discarded.)
                    tracing::debug!(
                        "sidequest: background LLM task not yet complete, rescheduling"
                    );
                }
            }
        }

        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
        self.sidequest
            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);

        if self.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        let prompt = self.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
        let n_cursors = self.sidequest.tool_output_cursors.len();
        // Clone the provider so the spawn closure owns it without borrowing self.
        let provider = self.summary_or_primary_provider().clone();

        // Spawn background task: the LLM call runs without blocking the agent loop.
        let handle = tokio::spawn(async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            // Hard 5-second cap: a slow provider must not delay eviction indefinitely.
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Extract the outermost {...} span from the response; bail if the
            // braces are missing or inverted (no JSON object present).
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Validate: keep only in-range cursor indices, sorted and deduped —
            // the LLM may hallucinate out-of-range or duplicate values.
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            // Cap evictions at max_eviction_ratio of the cursor pool (rounded up).
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        });

        self.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
5121
5122    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
5123    ///
5124    /// Called after each tool batch completes. The check is a single syscall and has
5125    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
5126    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
5127    pub(crate) async fn check_cwd_changed(&mut self) {
5128        let current = match std::env::current_dir() {
5129            Ok(p) => p,
5130            Err(e) => {
5131                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
5132                return;
5133            }
5134        };
5135        if current == self.lifecycle.last_known_cwd {
5136            return;
5137        }
5138        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
5139        self.session.env_context.working_dir = current.display().to_string();
5140
5141        tracing::info!(
5142            old = %old_cwd.display(),
5143            new = %current.display(),
5144            "working directory changed"
5145        );
5146
5147        let _ = self
5148            .channel
5149            .send_status("Working directory changed\u{2026}")
5150            .await;
5151
5152        let hooks = self.session.hooks_config.cwd_changed.clone();
5153        if !hooks.is_empty() {
5154            let mut env = std::collections::HashMap::new();
5155            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
5156            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
5157            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
5158                tracing::warn!(error = %e, "CwdChanged hook failed");
5159            }
5160        }
5161
5162        let _ = self.channel.send_status("").await;
5163    }
5164
5165    /// Handle a `FileChangedEvent` from the file watcher.
5166    pub(crate) async fn handle_file_changed(
5167        &mut self,
5168        event: crate::file_watcher::FileChangedEvent,
5169    ) {
5170        tracing::info!(path = %event.path.display(), "file changed");
5171
5172        let _ = self
5173            .channel
5174            .send_status("Running file-change hook\u{2026}")
5175            .await;
5176
5177        let hooks = self.session.hooks_config.file_changed_hooks.clone();
5178        if !hooks.is_empty() {
5179            let mut env = std::collections::HashMap::new();
5180            env.insert(
5181                "ZEPH_CHANGED_PATH".to_owned(),
5182                event.path.display().to_string(),
5183            );
5184            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
5185                tracing::warn!(error = %e, "FileChanged hook failed");
5186            }
5187        }
5188
5189        let _ = self.channel.send_status("").await;
5190    }
5191}
5192pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
5193    while !*rx.borrow_and_update() {
5194        if rx.changed().await.is_err() {
5195            std::future::pending::<()>().await;
5196        }
5197    }
5198}
5199
5200/// Convert a `FeedbackVerdict` (from `LlmClassifier`) into a `CorrectionSignal`.
5201///
5202/// Mirrors `JudgeVerdict::into_signal` to keep both code paths symmetric.
5203fn feedback_verdict_into_signal(
5204    verdict: &zeph_llm::classifier::llm::FeedbackVerdict,
5205    user_message: &str,
5206) -> Option<feedback_detector::CorrectionSignal> {
5207    if !verdict.is_correction {
5208        return None;
5209    }
5210    let confidence = verdict.confidence.clamp(0.0, 1.0);
5211    let kind_raw = verdict.kind.trim().to_lowercase().replace(' ', "_");
5212    let kind = match kind_raw.as_str() {
5213        "explicit_rejection" => feedback_detector::CorrectionKind::ExplicitRejection,
5214        "alternative_request" => feedback_detector::CorrectionKind::AlternativeRequest,
5215        "repetition" => feedback_detector::CorrectionKind::Repetition,
5216        "self_correction" => feedback_detector::CorrectionKind::SelfCorrection,
5217        other => {
5218            tracing::warn!(
5219                kind = other,
5220                "llm-classifier returned unknown correction kind, discarding"
5221            );
5222            return None;
5223        }
5224    };
5225    Some(feedback_detector::CorrectionSignal {
5226        confidence,
5227        kind,
5228        feedback_text: user_message.to_owned(),
5229    })
5230}
5231
5232/// Store a correction record in memory (shared by judge and llm-classifier paths).
5233async fn store_correction_in_memory(
5234    memory: Option<std::sync::Arc<zeph_memory::semantic::SemanticMemory>>,
5235    conv_id: Option<zeph_memory::ConversationId>,
5236    assistant_snippet: &str,
5237    user_msg: &str,
5238    skill_name: String,
5239    kind_str: &str,
5240) {
5241    let Some(mem) = memory else { return };
5242    let correction_text = context::truncate_chars(user_msg, 500);
5243    match mem
5244        .sqlite()
5245        .store_user_correction(
5246            conv_id.map(|c| c.0),
5247            assistant_snippet,
5248            &correction_text,
5249            if skill_name.is_empty() {
5250                None
5251            } else {
5252                Some(skill_name.as_str())
5253            },
5254            kind_str,
5255        )
5256        .await
5257    {
5258        Ok(correction_id) => {
5259            if let Err(e) = mem
5260                .store_correction_embedding(correction_id, &correction_text)
5261                .await
5262            {
5263                tracing::warn!("failed to store correction embedding: {e:#}");
5264            }
5265        }
5266        Err(e) => {
5267            tracing::warn!("failed to store judge correction: {e:#}");
5268        }
5269    }
5270}
5271
5272pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
5273    match rx {
5274        Some(inner) => {
5275            if let Some(v) = inner.recv().await {
5276                Some(v)
5277            } else {
5278                *rx = None;
5279                std::future::pending().await
5280            }
5281        }
5282        None => std::future::pending().await,
5283    }
5284}
5285
5286#[cfg(test)]
5287mod tests;
5288
5289#[cfg(test)]
5290pub(crate) use tests::agent_tests;