Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7#[cfg(feature = "compression-guidelines")]
8pub(super) mod compression_feedback;
9mod context;
10pub(crate) mod context_manager;
11pub mod error;
12#[cfg(feature = "experiments")]
13mod experiment_cmd;
14pub(super) mod feedback_detector;
15pub(crate) mod focus;
16mod graph_commands;
17#[cfg(feature = "compression-guidelines")]
18mod guidelines_commands;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23#[cfg(feature = "lsp-context")]
24mod lsp_commands;
25mod mcp;
26mod memory_commands;
27mod message_queue;
28mod persistence;
29#[cfg(feature = "policy-enforcer")]
30mod policy_commands;
31mod provider_cmd;
32pub(crate) mod rate_limiter;
33#[cfg(feature = "scheduler")]
34mod scheduler_commands;
35pub mod session_config;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70#[cfg(feature = "context-compression")]
71use state::CompressionState;
72use state::{
73    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
74    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
75    RuntimeConfig, SecurityState, SessionState, SkillState,
76};
77
78pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
79pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
80pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
81pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
82pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
83pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
84pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
85pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
86pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
87/// Prefix used for LSP context messages (`Role::System`) injected into message history.
88/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
89/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
90/// prefix to clear stale notes before each fresh injection.
91#[cfg(feature = "lsp-context")]
92pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
93pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
94
95fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
96    use std::fmt::Write;
97    let mut out = String::new();
98    let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
99    let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
100    let _ = writeln!(out);
101    for (i, task) in graph.tasks.iter().enumerate() {
102        let deps = if task.depends_on.is_empty() {
103            String::new()
104        } else {
105            let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
106            format!(" (after: {})", ids.join(", "))
107        };
108        let agent = task.agent_hint.as_deref().unwrap_or("-");
109        let _ = writeln!(out, "  {}. [{}] {}{}", i + 1, agent, task.title, deps);
110    }
111    out
112}
113
114pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
115    use std::fmt::Write;
116    let capacity = "[tool output: ".len()
117        + tool_name.len()
118        + "]\n```\n".len()
119        + body.len()
120        + TOOL_OUTPUT_SUFFIX.len();
121    let mut buf = String::with_capacity(capacity);
122    let _ = write!(
123        buf,
124        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
125    );
126    buf
127}
128
129pub struct Agent<C: Channel> {
130    provider: AnyProvider,
131    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
132    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
133    /// Falls back to `provider.clone()` when no dedicated entry exists.
134    /// **Never replaced** by `/provider switch`.
135    embedding_provider: AnyProvider,
136    channel: C,
137    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
138    pub(super) msg: MessageState,
139    pub(super) memory_state: MemoryState,
140    pub(super) skill_state: SkillState,
141    pub(super) context_manager: context_manager::ContextManager,
142    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
143    pub(super) learning_engine: learning_engine::LearningEngine,
144    pub(super) feedback: FeedbackState,
145    pub(super) runtime: RuntimeConfig,
146    pub(super) mcp: McpState,
147    pub(super) index: IndexState,
148    pub(super) session: SessionState,
149    pub(super) debug_state: DebugState,
150    pub(super) instructions: InstructionState,
151    pub(super) security: SecurityState,
152    pub(super) experiments: ExperimentState,
153    #[cfg(feature = "context-compression")]
154    pub(super) compression: CompressionState,
155    pub(super) lifecycle: LifecycleState,
156    pub(super) providers: ProviderState,
157    pub(super) metrics: MetricsState,
158    pub(super) orchestration: OrchestrationState,
159    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
160    pub(super) focus: focus::FocusState,
161    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
162    pub(super) sidequest: sidequest::SidequestState,
163    /// Dynamic tool schema filter: pre-computed tool embeddings for per-turn filtering (#2020).
164    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
165    /// Cached filtered tool IDs for the current user turn. Set by `compute_filtered_tool_ids()`
166    /// in `rebuild_system_prompt()`, consumed by the native tool loop on iteration 0.
167    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
168    /// Tool dependency graph for sequential tool availability (issue #2024).
169    /// Built once from config, applied per-turn after tool schema filtering.
170    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
171    /// Always-on tool IDs, mirrored from the tool schema filter for dependency gate bypass.
172    pub(super) dependency_always_on: HashSet<String>,
173    /// Tool IDs that completed successfully in the current session.
174    /// Grows monotonically per session; cleared on `/clear`.
175    /// NOTE: bounded by session length, typically < 1000 entries.
176    pub(super) completed_tool_ids: HashSet<String>,
177    /// DB row ID of the most recently persisted message. Set by `persist_message`;
178    /// consumed by `push_message` call sites to populate `metadata.db_id` on in-memory messages.
179    pub(super) last_persisted_message_id: Option<i64>,
180    /// DB message IDs pending hide after deferred tool pair summarization.
181    pub(super) deferred_db_hide_ids: Vec<i64>,
182    /// Summary texts pending insertion after deferred tool pair summarization.
183    pub(super) deferred_db_summaries: Vec<String>,
184}
185
186impl<C: Channel> Agent<C> {
187    #[must_use]
188    pub fn new(
189        provider: AnyProvider,
190        channel: C,
191        registry: SkillRegistry,
192        matcher: Option<SkillMatcherBackend>,
193        max_active_skills: usize,
194        tool_executor: impl ToolExecutor + 'static,
195    ) -> Self {
196        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
197        Self::new_with_registry_arc(
198            provider,
199            channel,
200            registry,
201            matcher,
202            max_active_skills,
203            tool_executor,
204        )
205    }
206
207    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
208    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
209    ///
210    /// # Panics
211    ///
212    /// Panics if the registry `RwLock` is poisoned.
213    #[must_use]
214    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
215    pub fn new_with_registry_arc(
216        provider: AnyProvider,
217        channel: C,
218        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
219        matcher: Option<SkillMatcherBackend>,
220        max_active_skills: usize,
221        tool_executor: impl ToolExecutor + 'static,
222    ) -> Self {
223        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
224        let all_skills: Vec<Skill> = {
225            let reg = registry.read().expect("registry read lock poisoned");
226            reg.all_meta()
227                .iter()
228                .filter_map(|m| reg.get_skill(&m.name).ok())
229                .collect()
230        };
231        let empty_trust = HashMap::new();
232        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
233        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
234        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
235        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
236        tracing::trace!(prompt = %system_prompt, "full system prompt");
237
238        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
239        let (_tx, rx) = watch::channel(false);
240        let token_counter = Arc::new(TokenCounter::new());
241        // Always create the receiver side of the experiment notification channel so the
242        // select! branch in the agent loop compiles unconditionally. The sender is only
243        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
244        #[cfg(feature = "experiments")]
245        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
246        #[cfg(not(feature = "experiments"))]
247        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
248        let embedding_provider = provider.clone();
249        Self {
250            provider,
251            embedding_provider,
252            channel,
253            tool_executor: Arc::new(tool_executor),
254            msg: MessageState {
255                messages: vec![Message {
256                    role: Role::System,
257                    content: system_prompt,
258                    parts: vec![],
259                    metadata: MessageMetadata::default(),
260                }],
261                message_queue: VecDeque::new(),
262                pending_image_parts: Vec::new(),
263            },
264            memory_state: MemoryState {
265                memory: None,
266                conversation_id: None,
267                history_limit: 50,
268                recall_limit: 5,
269                summarization_threshold: 50,
270                cross_session_score_threshold: 0.35,
271                autosave_assistant: false,
272                autosave_min_length: 20,
273                tool_call_cutoff: 6,
274                unsummarized_count: 0,
275                document_config: crate::config::DocumentConfig::default(),
276                graph_config: crate::config::GraphConfig::default(),
277                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
278                shutdown_summary: true,
279                shutdown_summary_min_messages: 4,
280                shutdown_summary_max_messages: 20,
281                shutdown_summary_timeout_secs: 10,
282                structured_summaries: false,
283            },
284            skill_state: SkillState {
285                registry,
286                skill_paths: Vec::new(),
287                managed_dir: None,
288                trust_config: crate::config::TrustConfig::default(),
289                matcher,
290                max_active_skills,
291                disambiguation_threshold: 0.05,
292                embedding_model: String::new(),
293                skill_reload_rx: None,
294                active_skill_names: Vec::new(),
295                last_skills_prompt: skills_prompt,
296                prompt_mode: SkillPromptMode::Auto,
297                available_custom_secrets: HashMap::new(),
298                cosine_weight: 0.7,
299                hybrid_search: false,
300                bm25_index: None,
301            },
302            context_manager: context_manager::ContextManager::new(),
303            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
304            learning_engine: learning_engine::LearningEngine::new(),
305            feedback: FeedbackState {
306                detector: feedback_detector::FeedbackDetector::new(0.6),
307                judge: None,
308                llm_classifier: None,
309            },
310            debug_state: DebugState {
311                debug_dumper: None,
312                dump_format: crate::debug_dump::DumpFormat::default(),
313                trace_collector: None,
314                iteration_counter: 0,
315                anomaly_detector: None,
316                logging_config: crate::config::LoggingConfig::default(),
317                dump_dir: None,
318                trace_service_name: String::new(),
319                trace_redact: true,
320                current_iteration_span_id: None,
321            },
322            runtime: RuntimeConfig {
323                security: SecurityConfig::default(),
324                timeouts: TimeoutConfig::default(),
325                model_name: String::new(),
326                active_provider_name: String::new(),
327                permission_policy: zeph_tools::PermissionPolicy::default(),
328                redact_credentials: true,
329                rate_limiter: rate_limiter::ToolRateLimiter::new(
330                    rate_limiter::RateLimitConfig::default(),
331                ),
332                semantic_cache_enabled: false,
333                semantic_cache_threshold: 0.95,
334                semantic_cache_max_candidates: 10,
335                dependency_config: zeph_tools::DependencyConfig::default(),
336            },
337            mcp: McpState {
338                tools: Vec::new(),
339                registry: None,
340                manager: None,
341                allowed_commands: Vec::new(),
342                max_dynamic: 10,
343                shared_tools: None,
344                tool_rx: None,
345            },
346            index: IndexState {
347                retriever: None,
348                repo_map_tokens: 0,
349                cached_repo_map: None,
350                repo_map_ttl: std::time::Duration::from_secs(300),
351            },
352            session: SessionState {
353                env_context: EnvironmentContext::gather(""),
354                response_cache: None,
355                parent_tool_use_id: None,
356                status_tx: None,
357                #[cfg(feature = "lsp-context")]
358                lsp_hooks: None,
359                #[cfg(feature = "policy-enforcer")]
360                policy_config: None,
361            },
362            instructions: InstructionState {
363                blocks: Vec::new(),
364                reload_rx: None,
365                reload_state: None,
366            },
367            security: SecurityState {
368                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
369                quarantine_summarizer: None,
370                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
371                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
372                ),
373                flagged_urls: std::collections::HashSet::new(),
374                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
375                    std::collections::HashSet::new(),
376                )),
377                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
378                    zeph_sanitizer::pii::PiiFilterConfig::default(),
379                ),
380                #[cfg(feature = "classifiers")]
381                pii_ner_backend: None,
382                #[cfg(feature = "classifiers")]
383                pii_ner_timeout_ms: 5000,
384                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
385                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
386                ),
387                #[cfg(feature = "guardrail")]
388                guardrail: None,
389                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
390                    zeph_config::ResponseVerificationConfig::default(),
391                ),
392            },
393            experiments: ExperimentState {
394                #[cfg(feature = "experiments")]
395                config: crate::config::ExperimentConfig::default(),
396                #[cfg(feature = "experiments")]
397                cancel: None,
398                #[cfg(feature = "experiments")]
399                baseline: crate::experiments::ConfigSnapshot::default(),
400                #[cfg(feature = "experiments")]
401                eval_provider: None,
402                notify_rx: Some(exp_notify_rx),
403                #[cfg(feature = "experiments")]
404                notify_tx: exp_notify_tx,
405            },
406            #[cfg(feature = "context-compression")]
407            compression: CompressionState {
408                current_task_goal: None,
409                task_goal_user_msg_hash: None,
410                pending_task_goal: None,
411                pending_sidequest_result: None,
412                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
413                pending_subgoal: None,
414                subgoal_user_msg_hash: None,
415            },
416            lifecycle: LifecycleState {
417                shutdown: rx,
418                start_time: Instant::now(),
419                cancel_signal: Arc::new(Notify::new()),
420                cancel_token: CancellationToken::new(),
421                config_path: None,
422                config_reload_rx: None,
423                warmup_ready: None,
424                update_notify_rx: None,
425                custom_task_rx: None,
426            },
427            providers: ProviderState {
428                summary_provider: None,
429                provider_override: None,
430                judge_provider: None,
431                probe_provider: None,
432                cached_prompt_tokens: initial_prompt_tokens,
433                server_compaction_active: false,
434                stt: None,
435                provider_pool: Vec::new(),
436                provider_config_snapshot: None,
437            },
438            metrics: MetricsState {
439                metrics_tx: None,
440                cost_tracker: None,
441                token_counter,
442                extended_context: false,
443            },
444            orchestration: OrchestrationState {
445                planner_provider: None,
446                pending_graph: None,
447                plan_cancel_token: None,
448                subagent_manager: None,
449                subagent_config: crate::config::SubAgentConfig::default(),
450                orchestration_config: crate::config::OrchestrationConfig::default(),
451                plan_cache: None,
452                pending_goal_embedding: None,
453            },
454            focus: focus::FocusState::default(),
455            sidequest: sidequest::SidequestState::default(),
456            tool_schema_filter: None,
457            cached_filtered_tool_ids: None,
458            dependency_graph: None,
459            dependency_always_on: HashSet::new(),
460            completed_tool_ids: HashSet::new(),
461            last_persisted_message_id: None,
462            deferred_db_hide_ids: Vec::new(),
463            deferred_db_summaries: Vec::new(),
464        }
465    }
466
467    /// Poll all active sub-agents for completed/failed/canceled results.
468    ///
469    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
470    /// for agents that have finished. Each completed agent is removed from the manager.
471    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
472        let Some(mgr) = &mut self.orchestration.subagent_manager else {
473            return vec![];
474        };
475
476        let finished: Vec<String> = mgr
477            .statuses()
478            .into_iter()
479            .filter_map(|(id, status)| {
480                if matches!(
481                    status.state,
482                    crate::subagent::SubAgentState::Completed
483                        | crate::subagent::SubAgentState::Failed
484                        | crate::subagent::SubAgentState::Canceled
485                ) {
486                    Some(id)
487                } else {
488                    None
489                }
490            })
491            .collect();
492
493        let mut results = vec![];
494        for task_id in finished {
495            match mgr.collect(&task_id).await {
496                Ok(result) => results.push((task_id, result)),
497                Err(e) => {
498                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
499                }
500            }
501        }
502        results
503    }
504
505    async fn handle_plan_command(
506        &mut self,
507        cmd: crate::orchestration::PlanCommand,
508    ) -> Result<(), error::AgentError> {
509        use crate::orchestration::PlanCommand;
510
511        if !self.config_for_orchestration().enabled {
512            self.channel
513                .send(
514                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
515                )
516                .await?;
517            return Ok(());
518        }
519
520        match cmd {
521            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
522            PlanCommand::Confirm => self.handle_plan_confirm().await,
523            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
524            PlanCommand::List => self.handle_plan_list().await,
525            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
526            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
527            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
528        }
529    }
530
531    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
532        &self.orchestration.orchestration_config
533    }
534
535    /// Lazily initialize `OrchestrationState::plan_cache` on the first `/plan` call.
536    ///
537    /// No-op when the cache is already initialized, disabled in config, or memory is unavailable.
538    async fn init_plan_cache_if_needed(&mut self) {
539        let plan_cache_config = self.orchestration.orchestration_config.plan_cache.clone();
540        if !plan_cache_config.enabled || self.orchestration.plan_cache.is_some() {
541            return;
542        }
543        if let Some(ref memory) = self.memory_state.memory {
544            let pool = memory.sqlite().pool().clone();
545            let embed_model = self.skill_state.embedding_model.clone();
546            match crate::orchestration::PlanCache::new(pool, plan_cache_config, &embed_model).await
547            {
548                Ok(cache) => self.orchestration.plan_cache = Some(cache),
549                Err(e) => {
550                    tracing::warn!(error = %e, "plan cache: init failed, proceeding without cache");
551                }
552            }
553        } else {
554            tracing::warn!("plan cache: memory not configured, proceeding without cache");
555        }
556    }
557
558    /// Compute a normalized goal embedding for plan cache lookups (best-effort).
559    ///
560    /// Returns `None` when the cache is disabled, the provider does not support embeddings,
561    /// or the embedding call fails.
562    async fn goal_embedding_for_cache(&self, goal: &str) -> Option<Vec<f32>> {
563        use crate::orchestration::normalize_goal;
564
565        self.orchestration.plan_cache.as_ref()?;
566        let normalized = normalize_goal(goal);
567        match self.embedding_provider.embed(&normalized).await {
568            Ok(emb) => Some(emb),
569            Err(zeph_llm::LlmError::EmbedUnsupported { .. }) => {
570                tracing::debug!(
571                    "plan cache: provider does not support embeddings, skipping cache lookup"
572                );
573                None
574            }
575            Err(e) => {
576                tracing::warn!(error = %e, "plan cache: goal embedding failed, skipping cache");
577                None
578            }
579        }
580    }
581
582    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
583        use crate::orchestration::{LlmPlanner, plan_with_cache};
584
585        if self.orchestration.pending_graph.is_some() {
586            self.channel
587                .send(
588                    "A plan is already pending confirmation. \
589                     Use /plan confirm to execute it or /plan cancel to discard.",
590                )
591                .await?;
592            return Ok(());
593        }
594
595        self.channel.send("Planning task decomposition...").await?;
596
597        let available_agents = self
598            .orchestration
599            .subagent_manager
600            .as_ref()
601            .map(|m| m.definitions().to_vec())
602            .unwrap_or_default();
603
604        let confirm_before_execute = self
605            .orchestration
606            .orchestration_config
607            .confirm_before_execute;
608
609        self.init_plan_cache_if_needed().await;
610        let goal_embedding = self.goal_embedding_for_cache(goal).await;
611
612        tracing::debug!(
613            cache_enabled = self.orchestration.orchestration_config.plan_cache.enabled,
614            has_embedding = goal_embedding.is_some(),
615            "plan cache state for goal"
616        );
617
618        let planner_provider = self
619            .orchestration
620            .planner_provider
621            .as_ref()
622            .unwrap_or(&self.provider)
623            .clone();
624        let planner = LlmPlanner::new(planner_provider, &self.orchestration.orchestration_config);
625        let embed_model = self.skill_state.embedding_model.clone();
626        let (graph, planner_usage) = plan_with_cache(
627            &planner,
628            self.orchestration.plan_cache.as_ref(),
629            &self.provider,
630            goal_embedding.as_deref(),
631            &embed_model,
632            goal,
633            &available_agents,
634            self.orchestration.orchestration_config.max_tasks,
635        )
636        .await
637        .map_err(|e| error::AgentError::Other(e.to_string()))?;
638
639        // Store embedding for cache_plan() after execution completes.
640        self.orchestration.pending_goal_embedding = goal_embedding;
641
642        let task_count = graph.tasks.len() as u64;
643        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
644        let (planner_prompt, planner_completion) = planner_usage.unwrap_or((0, 0));
645        self.update_metrics(|m| {
646            m.api_calls += 1;
647            m.prompt_tokens += planner_prompt;
648            m.completion_tokens += planner_completion;
649            m.total_tokens = m.prompt_tokens + m.completion_tokens;
650            m.orchestration.plans_total += 1;
651            m.orchestration.tasks_total += task_count;
652            m.orchestration_graph = Some(snapshot);
653        });
654        self.record_cost(planner_prompt, planner_completion);
655        self.record_cache_usage();
656
657        if confirm_before_execute {
658            let summary = format_plan_summary(&graph);
659            self.channel.send(&summary).await?;
660            self.channel
661                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
662                .await?;
663            self.orchestration.pending_graph = Some(graph);
664        } else {
665            // confirm_before_execute = false: display and proceed (Phase 5 will run scheduler).
666            // TODO(#1241): wire DagScheduler tick updates for Running task state
667            let summary = format_plan_summary(&graph);
668            self.channel.send(&summary).await?;
669            self.channel
670                .send("Plan ready. Full execution will be available in a future phase.")
671                .await?;
672            // IC1: graph was shown but never confirmed; clear snapshot so it doesn't linger.
673            let now = std::time::Instant::now();
674            self.update_metrics(|m| {
675                if let Some(ref mut s) = m.orchestration_graph {
676                    "completed".clone_into(&mut s.status);
677                    s.completed_at = Some(now);
678                }
679            });
680            // pending_goal_embedding intentionally not cleared — overwritten on next /plan goal.
681        }
682
683        Ok(())
684    }
685
686    /// Validate that the pending plan graph can be executed.
687    ///
688    /// Sends an appropriate error message and restores the graph to `pending_graph` when
689    /// validation fails. Returns `Ok(graph)` on success, `Err(())` when validation failed
690    /// and the caller should return early.
691    async fn validate_pending_graph(
692        &mut self,
693        graph: crate::orchestration::TaskGraph,
694    ) -> Result<crate::orchestration::TaskGraph, ()> {
695        use crate::orchestration::GraphStatus;
696
697        if self.orchestration.subagent_manager.is_none() {
698            let _ = self
699                .channel
700                .send(
701                    "No sub-agents configured. Add sub-agent definitions to config \
702                     to enable plan execution.",
703                )
704                .await;
705            self.orchestration.pending_graph = Some(graph);
706            return Err(());
707        }
708
709        // REV-2: pre-validate before moving graph into the constructor so we can
710        // restore it to pending_graph on failure.
711        if graph.tasks.is_empty() {
712            let _ = self.channel.send("Plan has no tasks.").await;
713            self.orchestration.pending_graph = Some(graph);
714            return Err(());
715        }
716
717        // resume_from() rejects Completed and Canceled — guard those here too.
718        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
719            let _ = self
720                .channel
721                .send(&format!(
722                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
723                    graph.status
724                ))
725                .await;
726            self.orchestration.pending_graph = Some(graph);
727            return Err(());
728        }
729
730        Ok(graph)
731    }
732
733    /// Build a [`DagScheduler`] from the graph, reserving sub-agent slots.
734    ///
735    /// Returns `(scheduler, reserved)` on success or an `AgentError` on failure.
736    /// Callers must call `mgr.release_reservation(reserved)` when done.
737    fn build_dag_scheduler(
738        &mut self,
739        graph: crate::orchestration::TaskGraph,
740    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
741        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};
742
743        let available_agents = self
744            .orchestration
745            .subagent_manager
746            .as_ref()
747            .map(|m| m.definitions().to_vec())
748            .unwrap_or_default();
749
750        // Warn when max_concurrent is too low to support the configured parallelism.
751        // This is the main cause of DagScheduler deadlocks (#1619): a planning-phase
752        // sub-agent occupies the only slot while orchestration tasks are waiting.
753        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
754        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
755        if max_concurrent < max_parallel + 1 {
756            tracing::warn!(
757                max_concurrent,
758                max_parallel,
759                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
760                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
761                max_parallel + 1
762            );
763        }
764
765        // Reserve slots equal to max_parallel so the scheduler is guaranteed capacity
766        // even if a planning-phase sub-agent is occupying a slot (#1619).
767        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
768        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
769            mgr.reserve_slots(reserved);
770        }
771
772        // Use resume_from() for graphs that are no longer in Created status
773        // (e.g., after /plan retry which calls reset_for_retry and sets status=Running).
774        let scheduler = if graph.status == GraphStatus::Created {
775            DagScheduler::new(
776                graph,
777                &self.orchestration.orchestration_config,
778                Box::new(RuleBasedRouter),
779                available_agents,
780            )
781        } else {
782            DagScheduler::resume_from(
783                graph,
784                &self.orchestration.orchestration_config,
785                Box::new(RuleBasedRouter),
786                available_agents,
787            )
788        }
789        .map_err(|e| {
790            // Release reservation before propagating error.
791            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
792                mgr.release_reservation(reserved);
793            }
794            error::AgentError::Other(e.to_string())
795        })?;
796
797        // Validate verify_provider name against the known provider pool (#2238).
798        let provider_names: Vec<&str> = self
799            .providers
800            .provider_pool
801            .iter()
802            .filter_map(|e| e.name.as_deref())
803            .collect();
804        scheduler
805            .validate_verify_config(&provider_names)
806            .map_err(|e| {
807                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
808                    mgr.release_reservation(reserved);
809                }
810                error::AgentError::Other(e.to_string())
811            })?;
812
813        Ok((scheduler, reserved))
814    }
815
816    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
817        let Some(graph) = self.orchestration.pending_graph.take() else {
818            self.channel
819                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
820                .await?;
821            return Ok(());
822        };
823
824        // validate_pending_graph sends the error message and restores the graph on failure.
825        let Ok(graph) = self.validate_pending_graph(graph).await else {
826            return Ok(());
827        };
828
829        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;
830
831        let task_count = scheduler.graph().tasks.len();
832        self.channel
833            .send(&format!(
834                "Confirmed. Executing plan ({task_count} tasks)..."
835            ))
836            .await?;
837
838        let plan_token = CancellationToken::new();
839        self.orchestration.plan_cancel_token = Some(plan_token.clone());
840
841        // Use match instead of ? so plan_cancel_token is always cleared (CRIT-07).
842        let scheduler_result = self
843            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
844            .await;
845        self.orchestration.plan_cancel_token = None;
846
847        // Always release the reservation, regardless of scheduler outcome.
848        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
849            mgr.release_reservation(reserved);
850        }
851
852        let final_status = scheduler_result?;
853        let completed_graph = scheduler.into_graph();
854
855        // Final TUI snapshot update.
856        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
857        self.update_metrics(|m| {
858            m.orchestration_graph = Some(snapshot);
859        });
860
861        let result_label = self
862            .finalize_plan_execution(completed_graph, final_status)
863            .await?;
864
865        let now = std::time::Instant::now();
866        self.update_metrics(|m| {
867            if let Some(ref mut s) = m.orchestration_graph {
868                result_label.clone_into(&mut s.status);
869                s.completed_at = Some(now);
870            }
871        });
872        Ok(())
873    }
874
875    /// Cancel all agents referenced in `cancel_actions`.
876    ///
877    /// Returns `Some(status)` if a `Done` action is encountered, `None` otherwise.
878    fn cancel_agents_from_actions(
879        &mut self,
880        cancel_actions: Vec<crate::orchestration::SchedulerAction>,
881    ) -> Option<crate::orchestration::GraphStatus> {
882        use crate::orchestration::SchedulerAction;
883        for action in cancel_actions {
884            match action {
885                SchedulerAction::Cancel { agent_handle_id } => {
886                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
887                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
888                            tracing::trace!(error = %e, "cancel: agent already gone");
889                        });
890                    }
891                }
892                SchedulerAction::Done { status } => return Some(status),
893                SchedulerAction::Spawn { .. }
894                | SchedulerAction::RunInline { .. }
895                | SchedulerAction::Verify { .. } => {}
896            }
897        }
898        None
899    }
900
901    /// Handle a `SchedulerAction::Spawn` — attempt to spawn a sub-agent for the given task.
902    ///
903    /// Returns `(spawn_success, concurrency_fail, done_status)`.
904    /// `done_status` is `Some` when spawn failure forces the scheduler to emit a `Done` action.
905    async fn handle_scheduler_spawn_action(
906        &mut self,
907        scheduler: &mut crate::orchestration::DagScheduler,
908        task_id: crate::orchestration::TaskId,
909        agent_def_name: String,
910        prompt: String,
911        spawn_counter: &mut usize,
912        task_count: usize,
913    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
914        let task_title = scheduler
915            .graph()
916            .tasks
917            .get(task_id.index())
918            .map_or("unknown", |t| t.title.as_str());
919
920        let provider = self.provider.clone();
921        let tool_executor = Arc::clone(&self.tool_executor);
922        let skills = self.filtered_skills_for(&agent_def_name);
923        let cfg = self.orchestration.subagent_config.clone();
924        let event_tx = scheduler.event_sender();
925
926        let mgr = self
927            .orchestration
928            .subagent_manager
929            .as_mut()
930            .expect("subagent_manager checked above");
931
932        let on_done = {
933            use crate::orchestration::{TaskEvent, TaskOutcome};
934            move |handle_id: String, result: Result<String, crate::subagent::SubAgentError>| {
935                let outcome = match &result {
936                    Ok(output) => TaskOutcome::Completed {
937                        output: output.clone(),
938                        artifacts: vec![],
939                    },
940                    Err(e) => TaskOutcome::Failed {
941                        error: e.to_string(),
942                    },
943                };
944                let tx = event_tx;
945                tokio::spawn(async move {
946                    if let Err(e) = tx
947                        .send(TaskEvent {
948                            task_id,
949                            agent_handle_id: handle_id,
950                            outcome,
951                        })
952                        .await
953                    {
954                        tracing::warn!(
955                            error = %e,
956                            "failed to send TaskEvent: scheduler may have been dropped"
957                        );
958                    }
959                });
960            }
961        };
962
963        match mgr.spawn_for_task(
964            &agent_def_name,
965            &prompt,
966            provider,
967            tool_executor,
968            skills,
969            &cfg,
970            on_done,
971        ) {
972            Ok(handle_id) => {
973                *spawn_counter += 1;
974                let _ = self
975                    .channel
976                    .send_status(&format!(
977                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
978                    ))
979                    .await;
980                scheduler.record_spawn(task_id, handle_id, agent_def_name);
981                (true, false, None)
982            }
983            Err(e) => {
984                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
985                let concurrency_fail =
986                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
987                let extra = scheduler.record_spawn_failure(task_id, &e);
988                let done_status = self.cancel_agents_from_actions(extra);
989                (false, concurrency_fail, done_status)
990            }
991        }
992    }
993
994    /// Execute a `RunInline` scheduler action: run the task synchronously in the current agent.
995    ///
996    /// Sends a status update, registers the spawn with the scheduler, runs the inline tool
997    /// loop (or cancels on token fire), and posts the completion event back to the scheduler.
998    async fn handle_run_inline_action(
999        &mut self,
1000        scheduler: &mut crate::orchestration::DagScheduler,
1001        task_id: crate::orchestration::TaskId,
1002        prompt: String,
1003        spawn_counter: usize,
1004        task_count: usize,
1005        cancel_token: &CancellationToken,
1006    ) {
1007        let task_title = scheduler
1008            .graph()
1009            .tasks
1010            .get(task_id.index())
1011            .map_or("unknown", |t| t.title.as_str());
1012        let _ = self
1013            .channel
1014            .send_status(&format!(
1015                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
1016            ))
1017            .await;
1018
1019        // record_spawn before the inline call so the completion event is always
1020        // buffered before any timeout check fires in the next tick().
1021        let handle_id = format!("__inline_{task_id}__");
1022        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());
1023
1024        let event_tx = scheduler.event_sender();
1025        let max_iter = self.tool_orchestrator.max_iterations;
1026        let outcome = tokio::select! {
1027            result = self.run_inline_tool_loop(&prompt, max_iter) => {
1028                match result {
1029                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
1030                        output,
1031                        artifacts: vec![],
1032                    },
1033                    Err(e) => crate::orchestration::TaskOutcome::Failed {
1034                        error: e.to_string(),
1035                    },
1036                }
1037            }
1038            () = cancel_token.cancelled() => {
1039                // TODO: use TaskOutcome::Canceled when the variant is added (#1603)
1040                crate::orchestration::TaskOutcome::Failed {
1041                    error: "canceled".to_string(),
1042                }
1043            }
1044        };
1045        let event = crate::orchestration::TaskEvent {
1046            task_id,
1047            agent_handle_id: handle_id,
1048            outcome,
1049        };
1050        if let Err(e) = event_tx.send(event).await {
1051            tracing::warn!(%task_id, error = %e, "inline task event send failed");
1052        }
1053    }
1054
1055    // too_many_lines: sequential scheduler event loop with 4 tokio::select! branches
1056    // (cancel token, scheduler tick, channel recv with /plan cancel + channel-close paths,
1057    // shutdown signal) — each branch requires distinct cancel/fail/ignore semantics that
1058    // cannot be split without introducing shared mutable state across async boundaries.
1059    #[allow(clippy::too_many_lines)]
1060    /// Drive the [`DagScheduler`] tick loop until it emits `SchedulerAction::Done`.
1061    ///
1062    /// Each iteration yields at `wait_event()`, during which `channel.recv()` is polled
1063    /// concurrently via `tokio::select!`. If the user sends `/plan cancel`, all running
1064    /// sub-agent tasks are aborted and the loop exits with [`GraphStatus::Canceled`].
1065    /// If the channel is closed (`Ok(None)`), all running sub-agent tasks are aborted
1066    /// and the loop exits with [`GraphStatus::Failed`].
1067    /// Other messages received during execution are queued in `message_queue` and
1068    /// processed after the plan completes.
1069    ///
1070    /// # Known limitations
1071    ///
1072    /// `RunInline` tasks block the tick loop for their entire duration — `/plan cancel`
1073    /// cannot interrupt an in-progress inline LLM call and will only be delivered on the
1074    /// next iteration after the call completes.
1075    async fn run_scheduler_loop(
1076        &mut self,
1077        scheduler: &mut crate::orchestration::DagScheduler,
1078        task_count: usize,
1079        cancel_token: CancellationToken,
1080    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
1081        use crate::orchestration::SchedulerAction;
1082
1083        // Sequential spawn counter for human-readable "task N/M" progress messages.
1084        // task_id.index() reflects array position and can be non-contiguous for
1085        // parallel plans (e.g. 0, 2, 4), so we use a local counter instead.
1086        let mut spawn_counter: usize = 0;
1087
1088        // Tracks (handle_id, secret_key) pairs denied this plan execution to prevent
1089        // re-prompting the user when a sub-agent re-requests the same secret after denial.
1090        let mut denied_secrets: std::collections::HashSet<(String, String)> =
1091            std::collections::HashSet::new();
1092
1093        let final_status = 'tick: loop {
1094            let actions = scheduler.tick();
1095
1096            // Track batch-level spawn outcomes for record_batch_backoff() below.
1097            let mut any_spawn_success = false;
1098            let mut any_concurrency_failure = false;
1099
1100            for action in actions {
1101                match action {
1102                    SchedulerAction::Spawn {
1103                        task_id,
1104                        agent_def_name,
1105                        prompt,
1106                    } => {
1107                        let (success, fail, done) = self
1108                            .handle_scheduler_spawn_action(
1109                                scheduler,
1110                                task_id,
1111                                agent_def_name,
1112                                prompt,
1113                                &mut spawn_counter,
1114                                task_count,
1115                            )
1116                            .await;
1117                        any_spawn_success |= success;
1118                        any_concurrency_failure |= fail;
1119                        if let Some(s) = done {
1120                            break 'tick s;
1121                        }
1122                    }
1123                    SchedulerAction::Cancel { agent_handle_id } => {
1124                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1125                            // benign race: agent may have already finished
1126                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1127                                tracing::trace!(error = %e, "cancel: agent already gone");
1128                            });
1129                        }
1130                    }
1131                    // Inline execution: the LLM call blocks this tick loop for its
1132                    // duration. This is intentionally sequential and only expected in
1133                    // single-agent setups where no sub-agents are configured.
1134                    // Known limitation: if a RunInline action appears before Spawn actions
1135                    // in the same batch (mixed routing), those Spawn actions are delayed
1136                    // until the inline call completes. Refactor to tokio::spawn if mixed
1137                    // batches become common.
1138                    // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop so
1139                    // that /plan cancel can interrupt a long-running inline LLM call instead
1140                    // of waiting for the current iteration to complete.
1141                    SchedulerAction::RunInline { task_id, prompt } => {
1142                        spawn_counter += 1;
1143                        self.handle_run_inline_action(
1144                            scheduler,
1145                            task_id,
1146                            prompt,
1147                            spawn_counter,
1148                            task_count,
1149                            &cancel_token,
1150                        )
1151                        .await;
1152                    }
1153                    SchedulerAction::Done { status } => {
1154                        break 'tick status;
1155                    }
1156                    SchedulerAction::Verify { .. } => {
1157                        // Verification is fire-and-forget from the scheduler's perspective.
1158                        // The core agent does not drive PlanVerifier directly — that is handled
1159                        // by the scheduler's inject_tasks path. No action needed here.
1160                    }
1161                }
1162            }
1163
1164            // Update batch-level backoff counter after processing all Spawn actions.
1165            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);
1166
1167            // Drain all pending secret requests this tick (MED-2 fix).
1168            self.process_pending_secret_requests(&mut denied_secrets)
1169                .await;
1170
1171            // Update TUI with current graph state.
1172            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
1173            self.update_metrics(|m| {
1174                m.orchestration_graph = Some(snapshot);
1175            });
1176
1177            // NOTE(Telegram): Telegram's recv() is not fully cancel-safe — a message
1178            // consumed from the internal mpsc but not yet returned can be lost if the
1179            // select! cancels the future during the /start send().await path. For
1180            // non-command messages the race window is negligible. Acceptable for MVP.
1181            //
1182            // NOTE(RunInline): tasks in the RunInline arm above block this tick loop
1183            // synchronously (no await between loop iteration start and wait_event).
1184            // /plan cancel cannot interrupt an inline LLM call mid-execution; it is
1185            // delivered on the next tick after the inline call completes.
1186            // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop.
1187            tokio::select! {
1188                // biased: token cancellation takes priority over new events and input.
1189                biased;
1190                () = cancel_token.cancelled() => {
1191                    let cancel_actions = scheduler.cancel_all();
1192                    for action in cancel_actions {
1193                        match action {
1194                            SchedulerAction::Cancel { agent_handle_id } => {
1195                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1196                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1197                                        tracing::trace!(
1198                                            error = %e,
1199                                            "cancel during plan cancellation: agent already gone"
1200                                        );
1201                                    });
1202                                }
1203                            }
1204                            SchedulerAction::Done { status } => {
1205                                break 'tick status;
1206                            }
1207                            SchedulerAction::Spawn { .. }
1208                            | SchedulerAction::RunInline { .. }
1209                            | SchedulerAction::Verify { .. } => {}
1210                        }
1211                    }
1212                    // Defensive fallback: cancel_all always emits Done, but guard against
1213                    // future changes.
1214                    break 'tick crate::orchestration::GraphStatus::Canceled;
1215                }
1216                () = scheduler.wait_event() => {}
1217                result = self.channel.recv() => {
1218                    if let Ok(Some(msg)) = result {
1219                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
1220                            let _ = self.channel.send_status("Canceling plan...").await;
1221                            let cancel_actions = scheduler.cancel_all();
1222                            for ca in cancel_actions {
1223                                match ca {
1224                                    SchedulerAction::Cancel { agent_handle_id } => {
1225                                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1226                                            // benign race: agent may have already finished
1227                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1228                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
1229                                            });
1230                                        }
1231                                    }
1232                                    SchedulerAction::Done { status } => {
1233                                        break 'tick status;
1234                                    }
1235                                    SchedulerAction::Spawn { .. }
1236                                    | SchedulerAction::RunInline { .. }
1237                                    | SchedulerAction::Verify { .. } => {}
1238                                }
1239                            }
1240                            // Defensive fallback: cancel_all always emits Done, but guard
1241                            // against future changes.
1242                            break 'tick crate::orchestration::GraphStatus::Canceled;
1243                        }
1244                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
1245                    } else {
1246                        // Channel closed. Drain buffered completion events BEFORE canceling
1247                        // so that tasks which completed between the last tick and the
1248                        // channel-close are recorded as Completed, not Canceled.
1249                        // cancel_all() empties self.running first, causing process_event()
1250                        // to silently discard any late completions — drain must come first.
1251                        let drain_actions = scheduler.tick();
1252                        let mut natural_done: Option<crate::orchestration::GraphStatus> = None;
1253                        for action in drain_actions {
1254                            match action {
1255                                SchedulerAction::Cancel { agent_handle_id } => {
1256                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1257                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1258                                            tracing::trace!(
1259                                                error = %e,
1260                                                "cancel during drain on channel close: agent already gone"
1261                                            );
1262                                        });
1263                                    }
1264                                }
1265                                SchedulerAction::Done { status } => {
1266                                    natural_done = Some(status);
1267                                }
1268                                // Ignore Spawn/RunInline/Verify — we are shutting down.
1269                                SchedulerAction::Spawn { .. }
1270                                | SchedulerAction::RunInline { .. }
1271                                | SchedulerAction::Verify { .. } => {}
1272                            }
1273                        }
1274
1275                        // If the plan completed naturally during the drain tick, honor that.
1276                        if let Some(status) = natural_done {
1277                            break 'tick status;
1278                        }
1279
1280                        // Cancel remaining running tasks after the drain.
1281                        let cancel_actions = scheduler.cancel_all();
1282                        let n = cancel_actions
1283                            .iter()
1284                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1285                            .count();
1286                        // Use supports_exit() to distinguish termination semantics:
1287                        // - CLI/TUI (supports_exit=true): stdin EOF or TUI close → Canceled
1288                        // - Telegram/Discord/Slack (supports_exit=false): infra failure → Failed
1289                        //   so the user can /plan retry after reconnecting.
1290                        let shutdown_status = if self.channel.supports_exit() {
1291                            crate::orchestration::GraphStatus::Canceled
1292                        } else {
1293                            crate::orchestration::GraphStatus::Failed
1294                        };
1295                        tracing::warn!(
1296                            sub_agents = n,
1297                            supports_exit = self.channel.supports_exit(),
1298                            status = ?shutdown_status,
1299                            "scheduler channel closed, canceling running sub-agents"
1300                        );
1301                        for action in cancel_actions {
1302                            match action {
1303                                SchedulerAction::Cancel { agent_handle_id } => {
1304                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1305                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1306                                            tracing::trace!(
1307                                                error = %e,
1308                                                "cancel on channel close: agent already gone"
1309                                            );
1310                                        });
1311                                    }
1312                                }
1313                                // Intentionally ignore Done here — we use shutdown_status above.
1314                                SchedulerAction::Done { .. }
1315                                | SchedulerAction::Spawn { .. }
1316                                | SchedulerAction::RunInline { .. }
1317                                | SchedulerAction::Verify { .. } => {}
1318                            }
1319                        }
1320                        break 'tick shutdown_status;
1321                    }
1322                }
1323                // Shutdown signal received — cancel running sub-agents and exit cleanly.
1324                () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1325                    let cancel_actions = scheduler.cancel_all();
1326                    let n = cancel_actions
1327                        .iter()
1328                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1329                        .count();
1330                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
1331                    for action in cancel_actions {
1332                        match action {
1333                            SchedulerAction::Cancel { agent_handle_id } => {
1334                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1335                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1336                                        tracing::trace!(
1337                                            error = %e,
1338                                            "cancel on shutdown: agent already gone"
1339                                        );
1340                                    });
1341                                }
1342                            }
1343                            SchedulerAction::Done { status } => {
1344                                break 'tick status;
1345                            }
1346                            SchedulerAction::Spawn { .. }
1347                            | SchedulerAction::RunInline { .. }
1348                            | SchedulerAction::Verify { .. } => {}
1349                        }
1350                    }
1351                    // Defensive fallback: cancel_all always emits Done, but guard against
1352                    // future changes.
1353                    break 'tick crate::orchestration::GraphStatus::Canceled;
1354                }
1355            }
1356        };
1357
1358        // Final drain: if the loop exited via Done on the first tick, secret
1359        // requests buffered before completion would otherwise be silently dropped.
1360        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
1361            .await;
1362
1363        Ok(final_status)
1364    }
1365
1366    /// Run a tool-aware LLM loop for an inline scheduled task.
1367    ///
1368    /// Unlike [`process_response_native_tools`], this is intentionally stripped of all
1369    /// interactive-session machinery (channel sends, doom-loop detection, summarization,
1370    /// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
1371    /// sub-tasks that run synchronously inside the scheduler tick loop.
1372    async fn run_inline_tool_loop(
1373        &self,
1374        prompt: &str,
1375        max_iterations: usize,
1376    ) -> Result<String, zeph_llm::LlmError> {
1377        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
1378        use zeph_tools::executor::ToolCall;
1379
1380        // CRIT-01 / TAFC isolation: inline tool loops run as subagent orchestration tasks
1381        // (scheduler, planner, aggregator) and intentionally bypass TAFC augmentation.
1382        // They use their own private message history and never surface TAFC think fields
1383        // to the interactive session, so no stripping is needed here (CRIT-02 compliance).
1384        let tool_defs: Vec<ToolDefinition> = self
1385            .tool_executor
1386            .tool_definitions_erased()
1387            .iter()
1388            .map(tool_execution::tool_def_to_definition)
1389            .collect();
1390
1391        tracing::debug!(
1392            prompt_len = prompt.len(),
1393            max_iterations,
1394            tool_count = tool_defs.len(),
1395            "inline tool loop: starting"
1396        );
1397
1398        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
1399        let mut last_text = String::new();
1400
1401        for iteration in 0..max_iterations {
1402            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;
1403
1404            match response {
1405                ChatResponse::Text(text) => {
1406                    tracing::debug!(iteration, "inline tool loop: text response, returning");
1407                    return Ok(text);
1408                }
1409                ChatResponse::ToolUse {
1410                    text, tool_calls, ..
1411                } => {
1412                    tracing::debug!(
1413                        iteration,
1414                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
1415                        "inline tool loop: tool use"
1416                    );
1417
1418                    if let Some(ref t) = text {
1419                        last_text.clone_from(t);
1420                    }
1421
1422                    // Build assistant message with optional leading text + tool use parts.
1423                    let mut parts: Vec<MessagePart> = Vec::new();
1424                    if let Some(ref t) = text
1425                        && !t.is_empty()
1426                    {
1427                        parts.push(MessagePart::Text { text: t.clone() });
1428                    }
1429                    for tc in &tool_calls {
1430                        parts.push(MessagePart::ToolUse {
1431                            id: tc.id.clone(),
1432                            name: tc.name.clone(),
1433                            input: tc.input.clone(),
1434                        });
1435                    }
1436                    messages.push(Message::from_parts(Role::Assistant, parts));
1437
1438                    // Execute each tool call and collect results.
1439                    let mut result_parts: Vec<MessagePart> = Vec::new();
1440                    for tc in &tool_calls {
1441                        let call = ToolCall {
1442                            tool_id: tc.name.clone(),
1443                            params: match &tc.input {
1444                                serde_json::Value::Object(map) => map.clone(),
1445                                _ => serde_json::Map::new(),
1446                            },
1447                        };
1448                        let output = match self.tool_executor.execute_tool_call_erased(&call).await
1449                        {
1450                            Ok(Some(out)) => out.summary,
1451                            Ok(None) => "(no output)".to_owned(),
1452                            Err(e) => format!("[error] {e}"),
1453                        };
1454                        let is_error = output.starts_with("[error]");
1455                        result_parts.push(MessagePart::ToolResult {
1456                            tool_use_id: tc.id.clone(),
1457                            content: output,
1458                            is_error,
1459                        });
1460                    }
1461                    messages.push(Message::from_parts(Role::User, result_parts));
1462                }
1463            }
1464        }
1465
1466        tracing::debug!(
1467            max_iterations,
1468            last_text_empty = last_text.is_empty(),
1469            "inline tool loop: iteration limit reached"
1470        );
1471        Ok(last_text)
1472    }
1473
1474    /// Bridge pending secret requests from sub-agents to the user (non-blocking, time-bounded).
1475    ///
1476    /// SEC-P1-02: explicit user confirmation is required before granting any secret to a
1477    /// sub-agent. Denial is the default on timeout or channel error.
1478    ///
1479    /// `denied` tracks `(handle_id, secret_key)` pairs already denied this plan execution.
1480    /// Re-requests for a denied pair are auto-denied without prompting the user.
1481    async fn process_pending_secret_requests(
1482        &mut self,
1483        denied: &mut std::collections::HashSet<(String, String)>,
1484    ) {
1485        loop {
1486            let pending = self
1487                .orchestration
1488                .subagent_manager
1489                .as_mut()
1490                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1491            let Some((req_handle_id, req)) = pending else {
1492                break;
1493            };
1494            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1495            if denied.contains(&deny_key) {
1496                tracing::debug!(
1497                    handle_id = %req_handle_id,
1498                    secret_key = %req.secret_key,
1499                    "skipping duplicate secret prompt for already-denied key"
1500                );
1501                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1502                    let _ = mgr.deny_secret(&req_handle_id);
1503                }
1504                continue;
1505            }
1506            let prompt = format!(
1507                "Sub-agent requests secret '{}'. Allow?{}",
1508                crate::text::truncate_to_chars(&req.secret_key, 100),
1509                req.reason
1510                    .as_deref()
1511                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1512                    .unwrap_or_default()
1513            );
1514            // CRIT-1 fix: use select! to avoid blocking the tick loop forever.
1515            let approved = tokio::select! {
1516                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1517                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1518                    let _ = self.channel.send("Secret request timed out.").await;
1519                    false
1520                }
1521            };
1522            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1523                if approved {
1524                    let ttl = std::time::Duration::from_secs(300);
1525                    let key = req.secret_key.clone();
1526                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1527                        let _ = mgr.deliver_secret(&req_handle_id, key);
1528                    }
1529                } else {
1530                    denied.insert(deny_key);
1531                    let _ = mgr.deny_secret(&req_handle_id);
1532                }
1533            }
1534        }
1535    }
1536
1537    /// Aggregate results or report failure after the tick loop completes.
1538    #[allow(clippy::too_many_lines)]
1539    async fn finalize_plan_execution(
1540        &mut self,
1541        completed_graph: crate::orchestration::TaskGraph,
1542        final_status: crate::orchestration::GraphStatus,
1543    ) -> Result<&'static str, error::AgentError> {
1544        use std::fmt::Write;
1545
1546        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};
1547
1548        let result_label = match final_status {
1549            GraphStatus::Completed => {
1550                // Update task completion counters.
1551                let completed_count = completed_graph
1552                    .tasks
1553                    .iter()
1554                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1555                    .count() as u64;
1556                let skipped_count = completed_graph
1557                    .tasks
1558                    .iter()
1559                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
1560                    .count() as u64;
1561                self.update_metrics(|m| {
1562                    m.orchestration.tasks_completed += completed_count;
1563                    m.orchestration.tasks_skipped += skipped_count;
1564                });
1565
1566                let aggregator = LlmAggregator::new(
1567                    self.provider.clone(),
1568                    &self.orchestration.orchestration_config,
1569                );
1570                match aggregator.aggregate(&completed_graph).await {
1571                    Ok((synthesis, aggregator_usage)) => {
1572                        let (aggr_prompt, aggr_completion) = aggregator_usage.unwrap_or((0, 0));
1573                        self.update_metrics(|m| {
1574                            m.api_calls += 1;
1575                            m.prompt_tokens += aggr_prompt;
1576                            m.completion_tokens += aggr_completion;
1577                            m.total_tokens = m.prompt_tokens + m.completion_tokens;
1578                        });
1579                        self.record_cost(aggr_prompt, aggr_completion);
1580                        self.record_cache_usage();
1581                        self.channel.send(&synthesis).await?;
1582                    }
1583                    Err(e) => {
1584                        tracing::error!(error = %e, "aggregation failed");
1585                        self.channel
1586                            .send(
1587                                "Plan completed but aggregation failed. \
1588                                 Check individual task results.",
1589                            )
1590                            .await?;
1591                    }
1592                }
1593
1594                // Cache the completed plan template (best-effort, never blocks execution).
1595                if let Some(ref cache) = self.orchestration.plan_cache
1596                    && let Some(embedding) = self.orchestration.pending_goal_embedding.take()
1597                {
1598                    let embed_model = self.skill_state.embedding_model.clone();
1599                    if let Err(e) = cache
1600                        .cache_plan(&completed_graph, &embedding, &embed_model)
1601                        .await
1602                    {
1603                        tracing::warn!(error = %e, "plan cache: failed to cache completed plan");
1604                    }
1605                }
1606
1607                "completed"
1608            }
1609            GraphStatus::Failed => {
1610                let failed_tasks: Vec<_> = completed_graph
1611                    .tasks
1612                    .iter()
1613                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1614                    .collect();
1615                let cancelled_tasks: Vec<_> = completed_graph
1616                    .tasks
1617                    .iter()
1618                    .filter(|t| t.status == crate::orchestration::TaskStatus::Canceled)
1619                    .collect();
1620                let completed_count = completed_graph
1621                    .tasks
1622                    .iter()
1623                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1624                    .count() as u64;
1625                let skipped_count = completed_graph
1626                    .tasks
1627                    .iter()
1628                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
1629                    .count() as u64;
1630                self.update_metrics(|m| {
1631                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
1632                    m.orchestration.tasks_completed += completed_count;
1633                    m.orchestration.tasks_skipped += skipped_count;
1634                });
1635                let total = completed_graph.tasks.len();
1636                let msg = if failed_tasks.is_empty() && !cancelled_tasks.is_empty() {
1637                    // Pure scheduler deadlock: no tasks actually failed, some were canceled.
1638                    format!(
1639                        "Plan canceled. {}/{} tasks did not run.\n\
1640                         Use `/plan retry` to retry or check logs for details.",
1641                        cancelled_tasks.len(),
1642                        total
1643                    )
1644                } else if failed_tasks.is_empty() && cancelled_tasks.is_empty() {
1645                    // Should not occur through normal scheduler paths; make it visible.
1646                    tracing::warn!(
1647                        "plan finished with GraphStatus::Failed but no failed or canceled tasks"
1648                    );
1649                    "Plan failed. No task errors recorded; check logs for details.".to_string()
1650                } else {
1651                    let mut m = if cancelled_tasks.is_empty() {
1652                        format!(
1653                            "Plan failed. {}/{} tasks failed:\n",
1654                            failed_tasks.len(),
1655                            total
1656                        )
1657                    } else {
1658                        format!(
1659                            "Plan failed. {}/{} tasks failed, {} canceled:\n",
1660                            failed_tasks.len(),
1661                            total,
1662                            cancelled_tasks.len()
1663                        )
1664                    };
1665                    for t in &failed_tasks {
1666                        // SEC-M34-002: truncate raw task output before displaying to user.
1667                        let err: std::borrow::Cow<str> =
1668                            t.result.as_ref().map_or("unknown error".into(), |r| {
1669                                if r.output.len() > 500 {
1670                                    r.output.chars().take(500).collect::<String>().into()
1671                                } else {
1672                                    r.output.as_str().into()
1673                                }
1674                            });
1675                        let _ = writeln!(m, "  - {}: {err}", t.title);
1676                    }
1677                    m.push_str("\nUse `/plan retry` to retry failed tasks.");
1678                    m
1679                };
1680                self.channel.send(&msg).await?;
1681                // Store graph back so /plan retry and /plan resume work.
1682                // pending_goal_embedding is retained: retry/resume goes through handle_plan_confirm
1683                // -> finalize_plan_execution again, reusing the same embedding. A new /plan goal
1684                // cannot be issued while pending_graph is Some, so the embedding cannot go stale.
1685                self.orchestration.pending_graph = Some(completed_graph);
1686                "failed"
1687            }
1688            GraphStatus::Paused => {
1689                self.channel
1690                    .send(
1691                        "Plan paused due to a task failure (ask strategy). \
1692                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
1693                    )
1694                    .await?;
1695                // Same retention rationale as Failed: embedding reused on resume/retry.
1696                self.orchestration.pending_graph = Some(completed_graph);
1697                "paused"
1698            }
1699            GraphStatus::Canceled => {
1700                let done_count = completed_graph
1701                    .tasks
1702                    .iter()
1703                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1704                    .count();
1705                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
1706                let total = completed_graph.tasks.len();
1707                self.channel
1708                    .send(&format!(
1709                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
1710                    ))
1711                    .await?;
1712                // Do NOT store graph back into pending_graph — canceled plans are not
1713                // retryable via /plan retry.
1714                self.orchestration.pending_goal_embedding.take();
1715                "canceled"
1716            }
1717            other => {
1718                tracing::warn!(%other, "unexpected graph status after Done");
1719                self.channel
1720                    .send(&format!("Plan ended with status: {other}"))
1721                    .await?;
1722                self.orchestration.pending_goal_embedding.take();
1723                "unknown"
1724            }
1725        };
1726        Ok(result_label)
1727    }
1728
1729    async fn handle_plan_status(
1730        &mut self,
1731        _graph_id: Option<&str>,
1732    ) -> Result<(), error::AgentError> {
1733        use crate::orchestration::GraphStatus;
1734        let Some(ref graph) = self.orchestration.pending_graph else {
1735            self.channel.send("No active plan.").await?;
1736            return Ok(());
1737        };
1738        let msg = match graph.status {
1739            GraphStatus::Created => {
1740                "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1741            }
1742            GraphStatus::Running => "Plan is currently running.",
1743            GraphStatus::Paused => {
1744                "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1745            }
1746            GraphStatus::Failed => {
1747                "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1748            }
1749            GraphStatus::Completed => "Plan completed successfully.",
1750            GraphStatus::Canceled => "Plan was canceled.",
1751        };
1752        self.channel.send(msg).await?;
1753        Ok(())
1754    }
1755
1756    async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1757        if let Some(ref graph) = self.orchestration.pending_graph {
1758            let summary = format_plan_summary(graph);
1759            let status_label = match graph.status {
1760                crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1761                crate::orchestration::GraphStatus::Running => "running",
1762                crate::orchestration::GraphStatus::Paused => "paused",
1763                crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1764                _ => "unknown",
1765            };
1766            self.channel
1767                .send(&format!("{summary}\nStatus: {status_label}"))
1768                .await?;
1769        } else {
1770            self.channel.send("No recent plans.").await?;
1771        }
1772        Ok(())
1773    }
1774
1775    async fn handle_plan_cancel(
1776        &mut self,
1777        _graph_id: Option<&str>,
1778    ) -> Result<(), error::AgentError> {
1779        if let Some(token) = self.orchestration.plan_cancel_token.take() {
1780            // In-flight plan: signal cancellation. The scheduler loop will pick this up
1781            // in the next tokio::select! iteration at wait_event().
1782            // NOTE: Due to &mut self being held by run_scheduler_loop, this branch is only
1783            // reachable if the channel has a concurrent reader (e.g. Telegram, TUI events).
1784            // CLI and synchronous channels cannot deliver this while the loop is active
1785            // (see #1603, SEC-M34-002).
1786            token.cancel();
1787            self.channel.send("Canceling plan execution...").await?;
1788        } else if self.orchestration.pending_graph.take().is_some() {
1789            let now = std::time::Instant::now();
1790            self.update_metrics(|m| {
1791                if let Some(ref mut s) = m.orchestration_graph {
1792                    "canceled".clone_into(&mut s.status);
1793                    s.completed_at = Some(now);
1794                }
1795            });
1796            self.orchestration.pending_goal_embedding = None;
1797            self.channel.send("Plan canceled.").await?;
1798        } else {
1799            self.channel.send("No active plan to cancel.").await?;
1800        }
1801        Ok(())
1802    }
1803
1804    /// Resume a paused graph (Ask failure strategy triggered a pause).
1805    ///
1806    /// Looks for a pending graph in `Paused` status. If `graph_id` is provided
1807    /// it must match the active graph's id (SEC-P5-03).
1808    async fn handle_plan_resume(
1809        &mut self,
1810        graph_id: Option<&str>,
1811    ) -> Result<(), error::AgentError> {
1812        use crate::orchestration::GraphStatus;
1813
1814        let Some(ref graph) = self.orchestration.pending_graph else {
1815            self.channel
1816                .send("No paused plan to resume. Use `/plan status` to check the current state.")
1817                .await?;
1818            return Ok(());
1819        };
1820
1821        // SEC-P5-03: if a graph_id was provided, reject if it doesn't match.
1822        if let Some(id) = graph_id
1823            && graph.id.to_string() != id
1824        {
1825            self.channel
1826                .send(&format!(
1827                    "Graph id '{id}' does not match the active plan ({}). \
1828                     Use `/plan status` to see the active plan id.",
1829                    graph.id
1830                ))
1831                .await?;
1832            return Ok(());
1833        }
1834
1835        if graph.status != GraphStatus::Paused {
1836            self.channel
1837                .send(&format!(
1838                    "The active plan is in '{}' status and cannot be resumed. \
1839                     Only Paused plans can be resumed.",
1840                    graph.status
1841                ))
1842                .await?;
1843            return Ok(());
1844        }
1845
1846        let graph = self.orchestration.pending_graph.take().unwrap();
1847
1848        tracing::info!(
1849            graph_id = %graph.id,
1850            "resuming paused graph"
1851        );
1852
1853        self.channel
1854            .send(&format!(
1855                "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1856                graph.goal
1857            ))
1858            .await?;
1859
1860        // Store resumed graph back as pending. resume_from() will set status=Running in confirm.
1861        self.orchestration.pending_graph = Some(graph);
1862        Ok(())
1863    }
1864
1865    /// Retry failed tasks in a graph.
1866    ///
1867    /// Resets all `Failed` tasks to `Ready` and all `Skipped` dependents back
1868    /// to `Pending`, then re-stores the graph as pending for re-execution.
1869    /// If `graph_id` is provided it must match the active graph's id (SEC-P5-04).
1870    async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1871        use crate::orchestration::{GraphStatus, dag};
1872
1873        let Some(ref graph) = self.orchestration.pending_graph else {
1874            self.channel
1875                .send("No active plan to retry. Use `/plan status` to check the current state.")
1876                .await?;
1877            return Ok(());
1878        };
1879
1880        // SEC-P5-04: if a graph_id was provided, reject if it doesn't match.
1881        if let Some(id) = graph_id
1882            && graph.id.to_string() != id
1883        {
1884            self.channel
1885                .send(&format!(
1886                    "Graph id '{id}' does not match the active plan ({}). \
1887                     Use `/plan status` to see the active plan id.",
1888                    graph.id
1889                ))
1890                .await?;
1891            return Ok(());
1892        }
1893
1894        if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1895            self.channel
1896                .send(&format!(
1897                    "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1898                    graph.status
1899                ))
1900                .await?;
1901            return Ok(());
1902        }
1903
1904        let mut graph = self.orchestration.pending_graph.take().unwrap();
1905
1906        // IC3: count before reset so the message reflects actual failed tasks, not Ready count.
1907        let failed_count = graph
1908            .tasks
1909            .iter()
1910            .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1911            .count();
1912
1913        dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1914
1915        // HIGH-1 fix: reset_for_retry only resets Failed/Canceled tasks. Any tasks that were
1916        // in Running state at pause time are left as Running with stale assigned_agent handles
1917        // (those sub-agents are long dead). Reset them to Ready so resume_from() does not try
1918        // to wait for their events.
1919        for task in &mut graph.tasks {
1920            if task.status == crate::orchestration::TaskStatus::Running {
1921                task.status = crate::orchestration::TaskStatus::Ready;
1922                task.assigned_agent = None;
1923            }
1924        }
1925
1926        tracing::info!(
1927            graph_id = %graph.id,
1928            failed_count,
1929            "retrying failed tasks in graph"
1930        );
1931
1932        self.channel
1933            .send(&format!(
1934                "Retrying {failed_count} failed task(s) in plan: {}\n\
1935                 Use `/plan confirm` to execute.",
1936                graph.goal
1937            ))
1938            .await?;
1939
1940        // Store retried graph back for re-execution via /plan confirm.
1941        self.orchestration.pending_graph = Some(graph);
1942        Ok(())
1943    }
1944
1945    /// Call the LLM to generate a structured session summary with a configurable timeout.
1946    ///
1947    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
1948    /// any failure, logging a warning — callers must treat `None` as "skip storage".
1949    ///
1950    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
1951    /// (structured call times out and plain-text fallback also times out) this adds up to
1952    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
1953    async fn call_llm_for_session_summary(
1954        &self,
1955        chat_messages: &[Message],
1956    ) -> Option<zeph_memory::StructuredSummary> {
1957        let timeout_dur =
1958            std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
1959        match tokio::time::timeout(
1960            timeout_dur,
1961            self.provider
1962                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
1963        )
1964        .await
1965        {
1966            Ok(Ok(s)) => Some(s),
1967            Ok(Err(e)) => {
1968                tracing::warn!(
1969                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
1970                );
1971                self.plain_text_summary_fallback(chat_messages, timeout_dur)
1972                    .await
1973            }
1974            Err(_) => {
1975                tracing::warn!(
1976                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
1977                    self.memory_state.shutdown_summary_timeout_secs
1978                );
1979                self.plain_text_summary_fallback(chat_messages, timeout_dur)
1980                    .await
1981            }
1982        }
1983    }
1984
1985    async fn plain_text_summary_fallback(
1986        &self,
1987        chat_messages: &[Message],
1988        timeout_dur: std::time::Duration,
1989    ) -> Option<zeph_memory::StructuredSummary> {
1990        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
1991            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
1992                summary: plain,
1993                key_facts: vec![],
1994                entities: vec![],
1995            }),
1996            Ok(Err(e)) => {
1997                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
1998                None
1999            }
2000            Err(_) => {
2001                tracing::warn!("shutdown summary: plain LLM fallback timed out");
2002                None
2003            }
2004        }
2005    }
2006
2007    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
2008    ///
2009    /// Guards:
2010    /// - `shutdown_summary` config must be enabled
2011    /// - `conversation_id` must be set (memory must be attached)
2012    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
2013    /// - at least `shutdown_summary_min_messages` user-turn messages in history
2014    ///
2015    /// All errors are logged as warnings and swallowed — shutdown must never fail.
2016    async fn maybe_store_shutdown_summary(&mut self) {
2017        if !self.memory_state.shutdown_summary {
2018            return;
2019        }
2020        let Some(memory) = self.memory_state.memory.clone() else {
2021            return;
2022        };
2023        let Some(conversation_id) = self.memory_state.conversation_id else {
2024            return;
2025        };
2026
2027        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
2028        match memory.has_session_summary(conversation_id).await {
2029            Ok(true) => {
2030                tracing::debug!("shutdown summary: session already has a summary, skipping");
2031                return;
2032            }
2033            Ok(false) => {}
2034            Err(e) => {
2035                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
2036                return;
2037            }
2038        }
2039
2040        // Count user-turn messages only (skip system prompt at index 0).
2041        let user_count = self
2042            .msg
2043            .messages
2044            .iter()
2045            .skip(1)
2046            .filter(|m| m.role == Role::User)
2047            .count();
2048        if user_count < self.memory_state.shutdown_summary_min_messages {
2049            tracing::debug!(
2050                user_count,
2051                min = self.memory_state.shutdown_summary_min_messages,
2052                "shutdown summary: too few user messages, skipping"
2053            );
2054            return;
2055        }
2056
2057        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
2058        let _ = self.channel.send_status("Saving session summary...").await;
2059
2060        // Collect last N messages (skip system prompt at index 0).
2061        let max = self.memory_state.shutdown_summary_max_messages;
2062        if max == 0 {
2063            tracing::debug!("shutdown summary: max_messages=0, skipping");
2064            return;
2065        }
2066        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
2067        let slice = if non_system.len() > max {
2068            &non_system[non_system.len() - max..]
2069        } else {
2070            &non_system[..]
2071        };
2072
2073        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
2074            .iter()
2075            .map(|m| {
2076                let role = match m.role {
2077                    Role::User => "user".to_owned(),
2078                    Role::Assistant => "assistant".to_owned(),
2079                    Role::System => "system".to_owned(),
2080                };
2081                (zeph_memory::MessageId(0), role, m.content.clone())
2082            })
2083            .collect();
2084
2085        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
2086        let chat_messages = vec![Message {
2087            role: Role::User,
2088            content: prompt,
2089            parts: vec![],
2090            metadata: MessageMetadata::default(),
2091        }];
2092
2093        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
2094            let _ = self.channel.send_status("").await;
2095            return;
2096        };
2097
2098        if let Err(e) = memory
2099            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
2100            .await
2101        {
2102            tracing::warn!("shutdown summary: storage failed: {e:#}");
2103        } else {
2104            tracing::info!(
2105                conversation_id = conversation_id.0,
2106                "shutdown summary stored"
2107            );
2108        }
2109
2110        // Clear TUI status.
2111        let _ = self.channel.send_status("").await;
2112    }
2113
2114    pub async fn shutdown(&mut self) {
2115        self.channel.send("Shutting down...").await.ok();
2116
2117        // CRIT-1: persist Thompson state accumulated during this session.
2118        self.provider.save_router_state();
2119
2120        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
2121            mgr.shutdown_all();
2122        }
2123
2124        if let Some(ref manager) = self.mcp.manager {
2125            manager.shutdown_all_shared().await;
2126        }
2127
2128        // Finalize compaction trajectory: push the last open segment into the Vec.
2129        // This segment would otherwise only be pushed when the next hard compaction fires,
2130        // which never happens at session end.
2131        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
2132            self.update_metrics(|m| {
2133                m.compaction_turns_after_hard.push(turns);
2134            });
2135            self.context_manager.turns_since_last_hard_compaction = None;
2136        }
2137
2138        if let Some(ref tx) = self.metrics.metrics_tx {
2139            let m = tx.borrow();
2140            if m.filter_applications > 0 {
2141                #[allow(clippy::cast_precision_loss)]
2142                let pct = if m.filter_raw_tokens > 0 {
2143                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
2144                } else {
2145                    0.0
2146                };
2147                tracing::info!(
2148                    raw_tokens = m.filter_raw_tokens,
2149                    saved_tokens = m.filter_saved_tokens,
2150                    applications = m.filter_applications,
2151                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
2152                    m.filter_saved_tokens,
2153                );
2154            }
2155            if m.compaction_hard_count > 0 {
2156                tracing::info!(
2157                    hard_compactions = m.compaction_hard_count,
2158                    turns_after_hard = ?m.compaction_turns_after_hard,
2159                    "hard compaction trajectory"
2160                );
2161            }
2162        }
2163
2164        self.maybe_store_shutdown_summary().await;
2165
2166        tracing::info!("agent shutdown complete");
2167    }
2168
2169    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
2170    ///
2171    /// # Errors
2172    ///
2173    /// Returns an error if channel I/O or LLM communication fails.
2174    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
2175    fn refresh_subagent_metrics(&mut self) {
2176        let Some(ref mgr) = self.orchestration.subagent_manager else {
2177            return;
2178        };
2179        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
2180            .statuses()
2181            .into_iter()
2182            .map(|(id, s)| {
2183                let def = mgr.agents_def(&id);
2184                crate::metrics::SubAgentMetrics {
2185                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
2186                    id: id.clone(),
2187                    state: format!("{:?}", s.state).to_lowercase(),
2188                    turns_used: s.turns_used,
2189                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
2190                    background: def.is_some_and(|d| d.permissions.background),
2191                    elapsed_secs: s.started_at.elapsed().as_secs(),
2192                    permission_mode: def.map_or_else(String::new, |d| {
2193                        use crate::subagent::def::PermissionMode;
2194                        match d.permissions.permission_mode {
2195                            PermissionMode::Default => String::new(),
2196                            PermissionMode::AcceptEdits => "accept_edits".into(),
2197                            PermissionMode::DontAsk => "dont_ask".into(),
2198                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
2199                            PermissionMode::Plan => "plan".into(),
2200                        }
2201                    }),
2202                }
2203            })
2204            .collect();
2205        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
2206    }
2207
2208    /// Non-blocking poll: notify the user when background sub-agents complete.
2209    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
2210        let completed = self.poll_subagents().await;
2211        for (task_id, result) in completed {
2212            let notice = if result.is_empty() {
2213                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
2214            } else {
2215                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
2216            };
2217            if let Err(e) = self.channel.send(&notice).await {
2218                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
2219            }
2220        }
2221        Ok(())
2222    }
2223
2224    /// Run the agent main loop.
2225    ///
2226    /// # Errors
2227    ///
2228    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
2229    pub async fn run(&mut self) -> Result<(), error::AgentError> {
2230        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
2231            && !*rx.borrow()
2232        {
2233            let _ = rx.changed().await;
2234            if !*rx.borrow() {
2235                tracing::warn!("model warmup did not complete successfully");
2236            }
2237        }
2238
2239        loop {
2240            // Apply any pending provider override (from ACP set_session_config_option).
2241            if let Some(ref slot) = self.providers.provider_override
2242                && let Some(new_provider) = slot
2243                    .write()
2244                    .unwrap_or_else(std::sync::PoisonError::into_inner)
2245                    .take()
2246            {
2247                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
2248                self.provider = new_provider;
2249            }
2250
2251            // Poll for MCP tool list updates from tools/list_changed notifications.
2252            self.check_tool_refresh().await;
2253
2254            // Refresh sub-agent status in metrics before polling.
2255            self.refresh_subagent_metrics();
2256
2257            // Non-blocking poll: notify user when background sub-agents complete.
2258            self.notify_completed_subagents().await?;
2259
2260            self.drain_channel();
2261
2262            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
2263                self.notify_queue_count().await;
2264                if queued.raw_attachments.is_empty() {
2265                    (queued.text, queued.image_parts)
2266                } else {
2267                    let msg = crate::channel::ChannelMessage {
2268                        text: queued.text,
2269                        attachments: queued.raw_attachments,
2270                    };
2271                    self.resolve_message(msg).await
2272                }
2273            } else {
2274                let incoming = tokio::select! {
2275                    result = self.channel.recv() => result?,
2276                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
2277                        tracing::info!("shutting down");
2278                        break;
2279                    }
2280                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
2281                        self.reload_skills().await;
2282                        continue;
2283                    }
2284                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
2285                        self.reload_instructions();
2286                        continue;
2287                    }
2288                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
2289                        self.reload_config();
2290                        continue;
2291                    }
2292                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
2293                        if let Err(e) = self.channel.send(&msg).await {
2294                            tracing::warn!("failed to send update notification: {e}");
2295                        }
2296                        continue;
2297                    }
2298                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
2299                        // Experiment engine completed (ok or err). Clear the cancel token so
2300                        // status reports idle and new experiments can be started.
2301                        #[cfg(feature = "experiments")]
2302                        { self.experiments.cancel = None; }
2303                        if let Err(e) = self.channel.send(&msg).await {
2304                            tracing::warn!("failed to send experiment completion: {e}");
2305                        }
2306                        continue;
2307                    }
2308                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
2309                        tracing::info!("scheduler: injecting custom task as agent turn");
2310                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
2311                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
2312                    }
2313                };
2314                let Some(msg) = incoming else { break };
2315                self.drain_channel();
2316                self.resolve_message(msg).await
2317            };
2318
2319            let trimmed = text.trim();
2320
2321            match self.handle_builtin_command(trimmed).await? {
2322                Some(true) => break,
2323                Some(false) => continue,
2324                None => {}
2325            }
2326
2327            self.process_user_message(text, image_parts).await?;
2328        }
2329
2330        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
2331        if let Some(ref mut tc) = self.debug_state.trace_collector {
2332            tc.finish();
2333        }
2334
2335        Ok(())
2336    }
2337
2338    /// Handle built-in slash commands that short-circuit the main `run` loop.
2339    ///
2340    /// Returns `Some(true)` to break the loop (exit), `Some(false)` to continue to the next
2341    /// iteration, or `None` if the command was not recognized (caller should call
2342    /// `process_user_message`).
2343    async fn handle_builtin_command(
2344        &mut self,
2345        trimmed: &str,
2346    ) -> Result<Option<bool>, error::AgentError> {
2347        if trimmed == "/clear-queue" {
2348            let n = self.clear_queue();
2349            self.notify_queue_count().await;
2350            self.channel
2351                .send(&format!("Cleared {n} queued messages."))
2352                .await?;
2353            let _ = self.channel.flush_chunks().await;
2354            return Ok(Some(false));
2355        }
2356
2357        if trimmed == "/compact" {
2358            if self.msg.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
2359                match self.compact_context().await {
2360                    Ok(
2361                        context::CompactionOutcome::Compacted
2362                        | context::CompactionOutcome::NoChange,
2363                    ) => {
2364                        let _ = self.channel.send("Context compacted successfully.").await;
2365                    }
2366                    Ok(context::CompactionOutcome::ProbeRejected) => {
2367                        let _ = self
2368                            .channel
2369                            .send(
2370                                "Compaction rejected: summary quality below threshold. \
2371                                 Original context preserved.",
2372                            )
2373                            .await;
2374                    }
2375                    Err(e) => {
2376                        let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
2377                    }
2378                }
2379            } else {
2380                let _ = self.channel.send("Nothing to compact.").await;
2381            }
2382            let _ = self.channel.flush_chunks().await;
2383            return Ok(Some(false));
2384        }
2385
2386        if trimmed == "/clear" {
2387            self.clear_history();
2388            self.tool_orchestrator.clear_cache();
2389            if let Ok(mut urls) = self.security.user_provided_urls.write() {
2390                urls.clear();
2391            }
2392            let _ = self.channel.flush_chunks().await;
2393            return Ok(Some(false));
2394        }
2395
2396        if trimmed == "/cache-stats" {
2397            let stats = self.tool_orchestrator.cache_stats();
2398            self.channel.send(&stats).await?;
2399            let _ = self.channel.flush_chunks().await;
2400            return Ok(Some(false));
2401        }
2402
2403        if trimmed == "/model" || trimmed.starts_with("/model ") {
2404            self.handle_model_command(trimmed).await;
2405            let _ = self.channel.flush_chunks().await;
2406            return Ok(Some(false));
2407        }
2408
2409        if trimmed == "/provider" || trimmed.starts_with("/provider ") {
2410            self.handle_provider_command(trimmed).await;
2411            let _ = self.channel.flush_chunks().await;
2412            return Ok(Some(false));
2413        }
2414
2415        if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
2416            self.handle_debug_dump_command(trimmed).await;
2417            let _ = self.channel.flush_chunks().await;
2418            return Ok(Some(false));
2419        }
2420
2421        if trimmed.starts_with("/dump-format") {
2422            self.handle_dump_format_command(trimmed).await;
2423            let _ = self.channel.flush_chunks().await;
2424            return Ok(Some(false));
2425        }
2426
2427        if trimmed == "/exit" || trimmed == "/quit" {
2428            if self.channel.supports_exit() {
2429                return Ok(Some(true));
2430            }
2431            let _ = self
2432                .channel
2433                .send("/exit is not supported in this channel.")
2434                .await;
2435            return Ok(Some(false));
2436        }
2437
2438        Ok(None)
2439    }
2440
2441    /// Switch the active provider to one serving `model_id`.
2442    ///
2443    /// Looks up the model in the provider's remote model list (or cache).
2444    ///
2445    /// # Errors
2446    ///
2447    /// Returns `Err` if the model is not found.
2448    pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
2449        if model_id.is_empty() {
2450            return Err("model id must not be empty".to_string());
2451        }
2452        if model_id.len() > 256 {
2453            return Err("model id exceeds maximum length of 256 characters".to_string());
2454        }
2455        if !model_id
2456            .chars()
2457            .all(|c| c.is_ascii() && !c.is_ascii_control())
2458        {
2459            return Err("model id must contain only printable ASCII characters".to_string());
2460        }
2461        self.runtime.model_name = model_id.to_string();
2462        tracing::info!(model = model_id, "set_model called");
2463        Ok(())
2464    }
2465
2466    async fn handle_model_refresh(&mut self) {
2467        // Invalidate all model cache files in the cache directory.
2468        if let Some(cache_dir) = dirs::cache_dir() {
2469            let models_dir = cache_dir.join("zeph").join("models");
2470            if let Ok(entries) = std::fs::read_dir(&models_dir) {
2471                for entry in entries.flatten() {
2472                    let path = entry.path();
2473                    if path.extension().and_then(|e| e.to_str()) == Some("json") {
2474                        let _ = std::fs::remove_file(&path);
2475                    }
2476                }
2477            }
2478        }
2479        match self.provider.list_models_remote().await {
2480            Ok(models) => {
2481                let _ = self
2482                    .channel
2483                    .send(&format!("Fetched {} models.", models.len()))
2484                    .await;
2485            }
2486            Err(e) => {
2487                let _ = self
2488                    .channel
2489                    .send(&format!("Error fetching models: {e}"))
2490                    .await;
2491            }
2492        }
2493    }
2494
2495    async fn handle_model_list(&mut self) {
2496        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2497        let cached = if cache.is_stale() {
2498            None
2499        } else {
2500            cache.load().unwrap_or(None)
2501        };
2502        let models = if let Some(m) = cached {
2503            m
2504        } else {
2505            match self.provider.list_models_remote().await {
2506                Ok(m) => m,
2507                Err(e) => {
2508                    let _ = self
2509                        .channel
2510                        .send(&format!("Error fetching models: {e}"))
2511                        .await;
2512                    return;
2513                }
2514            }
2515        };
2516        if models.is_empty() {
2517            let _ = self.channel.send("No models available.").await;
2518            return;
2519        }
2520        let mut lines = vec!["Available models:".to_string()];
2521        for (i, m) in models.iter().enumerate() {
2522            lines.push(format!("  {}. {} ({})", i + 1, m.display_name, m.id));
2523        }
2524        let _ = self.channel.send(&lines.join("\n")).await;
2525    }
2526
2527    async fn handle_model_switch(&mut self, model_id: &str) {
2528        // Validate model_id against the known model list before switching.
2529        // Try disk cache first; fall back to a remote fetch if the cache is stale.
2530        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2531        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
2532        {
2533            match self.provider.list_models_remote().await {
2534                Ok(m) if !m.is_empty() => Some(m),
2535                _ => None,
2536            }
2537        } else {
2538            cache.load().unwrap_or(None)
2539        };
2540        if let Some(models) = known_models {
2541            if !models.iter().any(|m| m.id == model_id) {
2542                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
2543                for m in &models {
2544                    lines.push(format!("  • {} ({})", m.display_name, m.id));
2545                }
2546                let _ = self.channel.send(&lines.join("\n")).await;
2547                return;
2548            }
2549        } else {
2550            let _ = self
2551                .channel
2552                .send(
2553                    "Model list unavailable, switching anyway — verify your model name is correct.",
2554                )
2555                .await;
2556        }
2557        match self.set_model(model_id) {
2558            Ok(()) => {
2559                let _ = self
2560                    .channel
2561                    .send(&format!("Switched to model: {model_id}"))
2562                    .await;
2563            }
2564            Err(e) => {
2565                let _ = self.channel.send(&format!("Error: {e}")).await;
2566            }
2567        }
2568    }
2569
2570    /// Handle `/model`, `/model <id>`, and `/model refresh` commands.
2571    async fn handle_model_command(&mut self, trimmed: &str) {
2572        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
2573        if arg == "refresh" {
2574            self.handle_model_refresh().await;
2575        } else if arg.is_empty() {
2576            self.handle_model_list().await;
2577        } else {
2578            self.handle_model_switch(arg).await;
2579        }
2580    }
2581
2582    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
2583    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
2584        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
2585        if arg.is_empty() {
2586            match &self.debug_state.debug_dumper {
2587                Some(d) => {
2588                    let _ = self
2589                        .channel
2590                        .send(&format!("Debug dump active: {}", d.dir().display()))
2591                        .await;
2592                }
2593                None => {
2594                    let _ = self
2595                        .channel
2596                        .send(
2597                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
2598                             or start with `--debug-dump [dir]`.",
2599                        )
2600                        .await;
2601                }
2602            }
2603            return;
2604        }
2605        let dir = std::path::PathBuf::from(arg);
2606        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
2607            Ok(dumper) => {
2608                let path = dumper.dir().display().to_string();
2609                self.debug_state.debug_dumper = Some(dumper);
2610                let _ = self
2611                    .channel
2612                    .send(&format!("Debug dump enabled: {path}"))
2613                    .await;
2614            }
2615            Err(e) => {
2616                let _ = self
2617                    .channel
2618                    .send(&format!("Failed to enable debug dump: {e}"))
2619                    .await;
2620            }
2621        }
2622    }
2623
2624    /// Handle `/dump-format <json|raw|trace>` command — switch debug dump format at runtime.
2625    async fn handle_dump_format_command(&mut self, trimmed: &str) {
2626        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
2627        if arg.is_empty() {
2628            let _ = self
2629                .channel
2630                .send(&format!(
2631                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
2632                    self.debug_state.dump_format
2633                ))
2634                .await;
2635            return;
2636        }
2637        let new_format = match arg {
2638            "json" => crate::debug_dump::DumpFormat::Json,
2639            "raw" => crate::debug_dump::DumpFormat::Raw,
2640            "trace" => crate::debug_dump::DumpFormat::Trace,
2641            other => {
2642                let _ = self
2643                    .channel
2644                    .send(&format!(
2645                        "Unknown format '{other}'. Valid values: json, raw, trace."
2646                    ))
2647                    .await;
2648                return;
2649            }
2650        };
2651        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
2652        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
2653
2654        // CR-04: when switching TO trace, create a fresh TracingCollector.
2655        if now_trace
2656            && !was_trace
2657            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
2658        {
2659            let service_name = self.debug_state.trace_service_name.clone();
2660            let redact = self.debug_state.trace_redact;
2661            match crate::debug_dump::trace::TracingCollector::new(
2662                dump_dir.as_path(),
2663                &service_name,
2664                redact,
2665                None,
2666            ) {
2667                Ok(collector) => {
2668                    self.debug_state.trace_collector = Some(collector);
2669                }
2670                Err(e) => {
2671                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
2672                }
2673            }
2674        }
2675        // CR-04: when switching AWAY from trace, flush and drop the collector.
2676        if was_trace
2677            && !now_trace
2678            && let Some(mut tc) = self.debug_state.trace_collector.take()
2679        {
2680            tc.finish();
2681        }
2682
2683        self.debug_state.dump_format = new_format;
2684        let _ = self
2685            .channel
2686            .send(&format!("Debug dump format set to: {arg}"))
2687            .await;
2688    }
2689
2690    async fn resolve_message(
2691        &self,
2692        msg: crate::channel::ChannelMessage,
2693    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2694        use crate::channel::{Attachment, AttachmentKind};
2695        use zeph_llm::provider::{ImageData, MessagePart};
2696
2697        let text_base = msg.text.clone();
2698
2699        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2700            .attachments
2701            .into_iter()
2702            .partition(|a| a.kind == AttachmentKind::Audio);
2703
2704        tracing::debug!(
2705            audio = audio_attachments.len(),
2706            has_stt = self.providers.stt.is_some(),
2707            "resolve_message attachments"
2708        );
2709
2710        let text = if !audio_attachments.is_empty()
2711            && let Some(stt) = self.providers.stt.as_ref()
2712        {
2713            let mut transcribed_parts = Vec::new();
2714            for attachment in &audio_attachments {
2715                if attachment.data.len() > MAX_AUDIO_BYTES {
2716                    tracing::warn!(
2717                        size = attachment.data.len(),
2718                        max = MAX_AUDIO_BYTES,
2719                        "audio attachment exceeds size limit, skipping"
2720                    );
2721                    continue;
2722                }
2723                match stt
2724                    .transcribe(&attachment.data, attachment.filename.as_deref())
2725                    .await
2726                {
2727                    Ok(result) => {
2728                        tracing::info!(
2729                            len = result.text.len(),
2730                            language = ?result.language,
2731                            "audio transcribed"
2732                        );
2733                        transcribed_parts.push(result.text);
2734                    }
2735                    Err(e) => {
2736                        tracing::error!(error = %e, "audio transcription failed");
2737                    }
2738                }
2739            }
2740            if transcribed_parts.is_empty() {
2741                text_base
2742            } else {
2743                let transcribed = transcribed_parts.join("\n");
2744                if text_base.is_empty() {
2745                    transcribed
2746                } else {
2747                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2748                }
2749            }
2750        } else {
2751            if !audio_attachments.is_empty() {
2752                tracing::warn!(
2753                    count = audio_attachments.len(),
2754                    "audio attachments received but no STT provider configured, dropping"
2755                );
2756            }
2757            text_base
2758        };
2759
2760        let mut image_parts = Vec::new();
2761        for attachment in image_attachments {
2762            if attachment.data.len() > MAX_IMAGE_BYTES {
2763                tracing::warn!(
2764                    size = attachment.data.len(),
2765                    max = MAX_IMAGE_BYTES,
2766                    "image attachment exceeds size limit, skipping"
2767                );
2768                continue;
2769            }
2770            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2771            image_parts.push(MessagePart::Image(Box::new(ImageData {
2772                data: attachment.data,
2773                mime_type,
2774            })));
2775        }
2776
2777        (text, image_parts)
2778    }
2779
2780    /// Dispatch slash commands. Returns `Some(Ok(()))` when handled,
2781    /// `Some(Err(_))` on I/O error, `None` to fall through to LLM processing.
2782    #[allow(clippy::too_many_lines)]
2783    async fn dispatch_slash_command(
2784        &mut self,
2785        trimmed: &str,
2786    ) -> Option<Result<(), error::AgentError>> {
2787        macro_rules! handled {
2788            ($expr:expr) => {{
2789                if let Err(e) = $expr {
2790                    return Some(Err(e));
2791                }
2792                let _ = self.channel.flush_chunks().await;
2793                return Some(Ok(()));
2794            }};
2795        }
2796
2797        // Slash command arguments may contain user-provided URLs (e.g. `/browse https://...`).
2798        // Extract them here so UrlGroundingVerifier allows follow-up fetch calls.
2799        let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
2800        if !slash_urls.is_empty()
2801            && let Ok(mut set) = self.security.user_provided_urls.write()
2802        {
2803            set.extend(slash_urls);
2804        }
2805
2806        if trimmed == "/help" {
2807            handled!(self.handle_help_command().await);
2808        }
2809
2810        if trimmed == "/status" {
2811            handled!(self.handle_status_command().await);
2812        }
2813
2814        #[cfg(feature = "guardrail")]
2815        if trimmed == "/guardrail" {
2816            handled!(self.handle_guardrail_command().await);
2817        }
2818
2819        if trimmed == "/skills" {
2820            handled!(self.handle_skills_command().await);
2821        }
2822
2823        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
2824            let rest = trimmed
2825                .strip_prefix("/skill")
2826                .unwrap_or("")
2827                .trim()
2828                .to_owned();
2829            handled!(self.handle_skill_command(&rest).await);
2830        }
2831
2832        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
2833            let rest = trimmed
2834                .strip_prefix("/feedback")
2835                .unwrap_or("")
2836                .trim()
2837                .to_owned();
2838            handled!(self.handle_feedback(&rest).await);
2839        }
2840
2841        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
2842            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
2843            handled!(self.handle_mcp_command(&args).await);
2844        }
2845
2846        if trimmed == "/image" || trimmed.starts_with("/image ") {
2847            let path = trimmed
2848                .strip_prefix("/image")
2849                .unwrap_or("")
2850                .trim()
2851                .to_owned();
2852            if path.is_empty() {
2853                handled!(
2854                    self.channel
2855                        .send("Usage: /image <path>")
2856                        .await
2857                        .map_err(Into::into)
2858                );
2859            }
2860            handled!(self.handle_image_command(&path).await);
2861        }
2862
2863        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
2864            return Some(self.dispatch_plan_command(trimmed).await);
2865        }
2866
2867        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
2868            handled!(self.handle_graph_command(trimmed).await);
2869        }
2870
2871        if trimmed == "/memory" || trimmed.starts_with("/memory ") {
2872            handled!(self.handle_memory_command(trimmed).await);
2873        }
2874
2875        #[cfg(feature = "compression-guidelines")]
2876        if trimmed == "/guidelines" {
2877            handled!(self.handle_guidelines_command().await);
2878        }
2879
2880        #[cfg(feature = "scheduler")]
2881        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
2882            handled!(self.handle_scheduler_command(trimmed).await);
2883        }
2884
2885        #[cfg(feature = "experiments")]
2886        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
2887            handled!(self.handle_experiment_command(trimmed).await);
2888        }
2889
2890        #[cfg(feature = "lsp-context")]
2891        if trimmed == "/lsp" {
2892            handled!(self.handle_lsp_status_command().await);
2893        }
2894
2895        #[cfg(feature = "policy-enforcer")]
2896        if trimmed == "/policy" || trimmed.starts_with("/policy ") {
2897            let args = trimmed
2898                .strip_prefix("/policy")
2899                .unwrap_or("")
2900                .trim()
2901                .to_owned();
2902            handled!(self.handle_policy_command(&args).await);
2903        }
2904
2905        if trimmed == "/log" {
2906            handled!(self.handle_log_command().await);
2907        }
2908
2909        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
2910            return self.dispatch_agent_command(trimmed).await;
2911        }
2912
2913        #[cfg(feature = "context-compression")]
2914        if trimmed == "/focus" {
2915            handled!(self.handle_focus_status_command().await);
2916        }
2917
2918        #[cfg(feature = "context-compression")]
2919        if trimmed == "/sidequest" {
2920            handled!(self.handle_sidequest_status_command().await);
2921        }
2922
2923        None
2924    }
2925
2926    async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
2927        match crate::orchestration::PlanCommand::parse(trimmed) {
2928            Ok(cmd) => {
2929                self.handle_plan_command(cmd).await?;
2930            }
2931            Err(e) => {
2932                self.channel
2933                    .send(&e.to_string())
2934                    .await
2935                    .map_err(error::AgentError::from)?;
2936            }
2937        }
2938        let _ = self.channel.flush_chunks().await;
2939        Ok(())
2940    }
2941
2942    async fn dispatch_agent_command(
2943        &mut self,
2944        trimmed: &str,
2945    ) -> Option<Result<(), error::AgentError>> {
2946        let known: Vec<String> = self
2947            .orchestration
2948            .subagent_manager
2949            .as_ref()
2950            .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
2951            .unwrap_or_default();
2952        match crate::subagent::AgentCommand::parse(trimmed, &known) {
2953            Ok(cmd) => {
2954                if let Some(msg) = self.handle_agent_command(cmd).await
2955                    && let Err(e) = self.channel.send(&msg).await
2956                {
2957                    return Some(Err(e.into()));
2958                }
2959                let _ = self.channel.flush_chunks().await;
2960                Some(Ok(()))
2961            }
2962            Err(e) if trimmed.starts_with('@') => {
2963                // Unknown @token — fall through to normal LLM processing
2964                tracing::debug!("@mention not matched as agent: {e}");
2965                None
2966            }
2967            Err(e) => {
2968                if let Err(send_err) = self.channel.send(&e.to_string()).await {
2969                    return Some(Err(send_err.into()));
2970                }
2971                let _ = self.channel.flush_chunks().await;
2972                Some(Ok(()))
2973            }
2974        }
2975    }
2976
2977    /// Spawn a background task to evaluate the user message with the LLM judge (or `LlmClassifier`)
2978    /// and store the correction result. Non-blocking: the task runs independently of the response
2979    /// pipeline.
2980    ///
2981    /// # Notes
2982    ///
2983    /// TODO(I3): `JoinHandle`s are not tracked — outstanding tasks may be aborted on runtime
2984    /// shutdown before `store_user_correction` completes. Acceptable for MVP.
2985    fn spawn_judge_correction_check(
2986        &mut self,
2987        trimmed: &str,
2988        conv_id: Option<zeph_memory::ConversationId>,
2989    ) {
2990        let assistant_snippet = self.last_assistant_response();
2991        let user_msg_owned = trimmed.to_owned();
2992        let memory_arc = self.memory_state.memory.clone();
2993        let skill_name = self
2994            .skill_state
2995            .active_skill_names
2996            .first()
2997            .cloned()
2998            .unwrap_or_default();
2999        let conv_id_bg = conv_id;
3000        let confidence_threshold = self
3001            .learning_engine
3002            .config
3003            .as_ref()
3004            .map_or(0.6, |c| c.correction_confidence_threshold);
3005
3006        if let Some(llm_classifier) = self.feedback.llm_classifier.clone() {
3007            // DetectorMode::Model: clone the classifier (cheap — it holds Arc<AnyProvider>).
3008            let user_msg = user_msg_owned.clone();
3009            let assistant = assistant_snippet.clone();
3010            let memory_arc2 = memory_arc.clone();
3011            let skill_name2 = skill_name.clone();
3012            tokio::spawn(async move {
3013                match llm_classifier
3014                    .classify_feedback(&user_msg, &assistant, confidence_threshold)
3015                    .await
3016                {
3017                    Ok(verdict) => {
3018                        if let Some(signal) = feedback_verdict_into_signal(&verdict, &user_msg) {
3019                            let is_self_correction =
3020                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
3021                            tracing::info!(
3022                                kind = signal.kind.as_str(),
3023                                confidence = signal.confidence,
3024                                source = "llm-classifier",
3025                                is_self_correction,
3026                                "correction signal detected"
3027                            );
3028                            store_correction_in_memory(
3029                                memory_arc2,
3030                                conv_id_bg,
3031                                &assistant,
3032                                &user_msg,
3033                                skill_name2,
3034                                signal.kind.as_str(),
3035                            )
3036                            .await;
3037                        }
3038                    }
3039                    Err(e) => {
3040                        tracing::warn!("llm-classifier failed: {e:#}");
3041                    }
3042                }
3043            });
3044        } else {
3045            // DetectorMode::Judge (legacy path).
3046            let judge_provider = self
3047                .providers
3048                .judge_provider
3049                .clone()
3050                .unwrap_or_else(|| self.provider.clone());
3051            let user_msg = user_msg_owned.clone();
3052            let assistant = assistant_snippet.clone();
3053            tokio::spawn(async move {
3054                match feedback_detector::JudgeDetector::evaluate(
3055                    &judge_provider,
3056                    &user_msg,
3057                    &assistant,
3058                    confidence_threshold,
3059                )
3060                .await
3061                {
3062                    Ok(verdict) => {
3063                        if let Some(signal) = verdict.into_signal(&user_msg) {
3064                            // Self-corrections (user corrects their own statement) must not
3065                            // penalize skills. The judge path has no record_skill_outcomes()
3066                            // call today, but this guard mirrors the regex path to make the
3067                            // intent explicit and prevent future regressions if parity is added.
3068                            let is_self_correction =
3069                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
3070                            tracing::info!(
3071                                kind = signal.kind.as_str(),
3072                                confidence = signal.confidence,
3073                                source = "judge",
3074                                is_self_correction,
3075                                "correction signal detected"
3076                            );
3077                            store_correction_in_memory(
3078                                memory_arc,
3079                                conv_id_bg,
3080                                &assistant,
3081                                &user_msg,
3082                                skill_name,
3083                                signal.kind.as_str(),
3084                            )
3085                            .await;
3086                        }
3087                    }
3088                    Err(e) => {
3089                        tracing::warn!("judge detector failed: {e:#}");
3090                    }
3091                }
3092            });
3093        }
3094    }
3095
3096    /// Detect implicit corrections in the user's message and record them in the learning engine.
3097    ///
3098    /// Uses regex-based `FeedbackDetector` first. If a `JudgeDetector` is configured and the
3099    /// regex result is borderline, the LLM judge runs in a background task (non-blocking).
3100    /// When `DetectorMode::Model` and an `LlmClassifier` is attached, the LLM classifier is
3101    /// used instead of `JudgeDetector`, sharing the same adaptive thresholds and rate limiter.
3102    #[allow(clippy::too_many_lines)]
3103    async fn detect_and_record_corrections(
3104        &mut self,
3105        trimmed: &str,
3106        conv_id: Option<zeph_memory::ConversationId>,
3107    ) {
3108        let correction_detection_enabled = self
3109            .learning_engine
3110            .config
3111            .as_ref()
3112            .is_none_or(|c| c.correction_detection);
3113        if !correction_detection_enabled {
3114            return;
3115        }
3116
3117        let previous_user_messages: Vec<&str> = self
3118            .msg
3119            .messages
3120            .iter()
3121            .filter(|m| m.role == Role::User)
3122            .map(|m| m.content.as_str())
3123            .collect();
3124
3125        let regex_signal = self
3126            .feedback
3127            .detector
3128            .detect(trimmed, &previous_user_messages);
3129
3130        // Judge/Model mode: invoke LLM in background if regex is borderline or missed.
3131        //
3132        // The LLM call is decoupled from the response pipeline — it records the
3133        // correction asynchronously via tokio::spawn and returns None immediately
3134        // so the user response is not blocked.
3135        //
3136        // TODO(I3): JoinHandles are not tracked — outstanding tasks may be aborted
3137        // on runtime shutdown before store_user_correction completes. This is
3138        // acceptable for the learning subsystem at MVP. Future: collect handles in
3139        // Agent and drain on graceful shutdown.
3140        // Check rate limit synchronously before deciding to spawn.
3141        // The feedback.judge is &mut self so check_rate_limit() can update call_times.
3142        //
3143        // DetectorMode::Model reuses the judge's adaptive thresholds + rate limiter.
3144        // If llm_classifier is present but judge is None, create a temporary JudgeDetector
3145        // for threshold/rate-limit checking only (not for actual LLM calls).
3146        let judge_should_run = if self.feedback.llm_classifier.is_some() {
3147            // Model mode: use judge thresholds + rate limiter for gating.
3148            let adaptive_low = self
3149                .learning_engine
3150                .config
3151                .as_ref()
3152                .map_or(0.5, |c| c.judge_adaptive_low);
3153            let adaptive_high = self
3154                .learning_engine
3155                .config
3156                .as_ref()
3157                .map_or(0.8, |c| c.judge_adaptive_high);
3158            let should_invoke = self
3159                .feedback
3160                .judge
3161                .get_or_insert_with(|| {
3162                    feedback_detector::JudgeDetector::new(adaptive_low, adaptive_high)
3163                })
3164                .should_invoke(regex_signal.as_ref());
3165            should_invoke
3166                && self
3167                    .feedback
3168                    .judge
3169                    .as_mut()
3170                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
3171        } else {
3172            // Judge mode (or regex-only when neither judge nor llm_classifier is set).
3173            self.feedback
3174                .judge
3175                .as_ref()
3176                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
3177                && self
3178                    .feedback
3179                    .judge
3180                    .as_mut() // lgtm[rust/cleartext-logging]
3181                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
3182        };
3183
3184        let (signal, signal_source) = if judge_should_run {
3185            self.spawn_judge_correction_check(trimmed, conv_id);
3186            // Judge runs in background — return None so the response pipeline continues.
3187            (None, "judge")
3188        } else {
3189            (regex_signal, "regex")
3190        };
3191
3192        let Some(signal) = signal else { return };
3193        tracing::info!(
3194            kind = signal.kind.as_str(),
3195            confidence = signal.confidence,
3196            source = signal_source,
3197            "implicit correction detected"
3198        );
3199        // REV-PH2-002 + SEC-PH2-002: cap feedback_text to 500 chars (UTF-8 safe)
3200        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
3201        // Self-corrections (user corrects their own statement) must not penalize skills —
3202        // the agent did nothing wrong. Store for analytics but skip skill outcome recording.
3203        if self.is_learning_enabled()
3204            && signal.kind != feedback_detector::CorrectionKind::SelfCorrection
3205        {
3206            self.record_skill_outcomes(
3207                "user_rejection",
3208                Some(&feedback_text),
3209                Some(signal.kind.as_str()),
3210            )
3211            .await;
3212        }
3213        if let Some(memory) = &self.memory_state.memory {
3214            // Use `trimmed` (raw user input, untainted by secrets) instead of
3215            // `feedback_text` (derived from previous_user_messages → self.msg.messages)
3216            // to avoid the CodeQL cleartext-logging taint path.
3217            let correction_text = context::truncate_chars(trimmed, 500);
3218            match memory
3219                .sqlite()
3220                .store_user_correction(
3221                    conv_id.map(|c| c.0),
3222                    "",
3223                    &correction_text,
3224                    self.skill_state
3225                        .active_skill_names
3226                        .first()
3227                        .map(String::as_str),
3228                    signal.kind.as_str(),
3229                )
3230                .await
3231            {
3232                Ok(correction_id) => {
3233                    if let Err(e) = memory
3234                        .store_correction_embedding(correction_id, &correction_text)
3235                        .await
3236                    {
3237                        tracing::warn!("failed to store correction embedding: {e:#}");
3238                    }
3239                }
3240                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
3241            }
3242        }
3243    }
3244
3245    async fn process_user_message(
3246        &mut self,
3247        text: String,
3248        image_parts: Vec<zeph_llm::provider::MessagePart>,
3249    ) -> Result<(), error::AgentError> {
3250        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
3251        let iteration_index = self.debug_state.iteration_counter;
3252        self.debug_state.iteration_counter += 1;
3253        if let Some(ref mut tc) = self.debug_state.trace_collector {
3254            tc.begin_iteration(iteration_index, text.trim());
3255            // CR-01: store the span ID so LLM/tool execution can attach child spans.
3256            self.debug_state.current_iteration_span_id =
3257                tc.current_iteration_span_id(iteration_index);
3258        }
3259
3260        let result = self
3261            .process_user_message_inner(text, image_parts, iteration_index)
3262            .await;
3263
3264        // Close iteration span regardless of outcome (partial trace preserved on error).
3265        if let Some(ref mut tc) = self.debug_state.trace_collector {
3266            let status = if result.is_ok() {
3267                crate::debug_dump::trace::SpanStatus::Ok
3268            } else {
3269                crate::debug_dump::trace::SpanStatus::Error {
3270                    message: "iteration failed".to_owned(),
3271                }
3272            };
3273            tc.end_iteration(iteration_index, status);
3274        }
3275        self.debug_state.current_iteration_span_id = None;
3276
3277        result
3278    }
3279
3280    #[allow(clippy::too_many_lines)]
3281    async fn process_user_message_inner(
3282        &mut self,
3283        text: String,
3284        image_parts: Vec<zeph_llm::provider::MessagePart>,
3285        iteration_index: usize,
3286    ) -> Result<(), error::AgentError> {
3287        let _ = iteration_index; // Used indirectly via debug_state.current_iteration_span_id.
3288        self.lifecycle.cancel_token = CancellationToken::new();
3289        let signal = Arc::clone(&self.lifecycle.cancel_signal);
3290        let token = self.lifecycle.cancel_token.clone();
3291        tokio::spawn(async move {
3292            signal.notified().await;
3293            token.cancel();
3294        });
3295        let trimmed = text.trim();
3296
3297        if let Some(result) = self.dispatch_slash_command(trimmed).await {
3298            return result;
3299        }
3300
3301        self.check_pending_rollbacks().await;
3302
3303        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
3304        #[cfg(feature = "guardrail")]
3305        if let Some(ref guardrail) = self.security.guardrail {
3306            use zeph_sanitizer::guardrail::GuardrailVerdict;
3307            let verdict = guardrail.check(trimmed).await;
3308            match &verdict {
3309                GuardrailVerdict::Flagged { reason, .. } => {
3310                    tracing::warn!(
3311                        reason = %reason,
3312                        should_block = verdict.should_block(),
3313                        "guardrail flagged user input"
3314                    );
3315                    if verdict.should_block() {
3316                        let msg = format!("[guardrail] Input blocked: {reason}");
3317                        let _ = self.channel.send(&msg).await;
3318                        let _ = self.channel.flush_chunks().await;
3319                        return Ok(());
3320                    }
3321                    // Warn mode: notify but continue.
3322                    let _ = self
3323                        .channel
3324                        .send(&format!("[guardrail] Warning: {reason}"))
3325                        .await;
3326                }
3327                GuardrailVerdict::Error { error } => {
3328                    if guardrail.error_should_block() {
3329                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
3330                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
3331                        let _ = self.channel.send(msg).await;
3332                        let _ = self.channel.flush_chunks().await;
3333                        return Ok(());
3334                    }
3335                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
3336                }
3337                GuardrailVerdict::Safe => {}
3338            }
3339        }
3340
3341        // ML classifier: lightweight injection detection on user input boundary.
3342        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
3343        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
3344        #[cfg(feature = "classifiers")]
3345        if self.security.sanitizer.classify_injection(trimmed).await {
3346            let _ = self
3347                .channel
3348                .send("[security] Input blocked: injection detected by classifier.")
3349                .await;
3350            let _ = self.channel.flush_chunks().await;
3351            return Ok(());
3352        }
3353
3354        // Extract before rebuild_system_prompt so the value is not tainted
3355        // by the secrets-bearing system prompt (ConversationId is just an i64).
3356        let conv_id = self.memory_state.conversation_id;
3357        self.rebuild_system_prompt(&text).await;
3358
3359        self.detect_and_record_corrections(trimmed, conv_id).await;
3360        self.learning_engine.tick();
3361        self.analyze_and_learn().await;
3362        self.sync_graph_counts().await;
3363
3364        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
3365        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
3366        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
3367        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
3368        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
3369
3370        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
3371        #[cfg(feature = "context-compression")]
3372        {
3373            self.focus.tick();
3374
3375            // SideQuest eviction: runs every N user turns when enabled.
3376            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
3377            let sidequest_should_fire = self.sidequest.tick();
3378            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
3379                self.maybe_sidequest_eviction();
3380            }
3381        }
3382
3383        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
3384        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
3385        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
3386        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
3387        self.maybe_apply_deferred_summaries();
3388        self.flush_deferred_summaries().await;
3389
3390        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
3391        if let Err(e) = self.maybe_proactive_compress().await {
3392            tracing::warn!("proactive compression failed: {e:#}");
3393        }
3394
3395        if let Err(e) = self.maybe_compact().await {
3396            tracing::warn!("context compaction failed: {e:#}");
3397        }
3398
3399        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
3400            tracing::warn!("context preparation failed: {e:#}");
3401        }
3402
3403        self.learning_engine.reset_reflection();
3404
3405        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
3406        all_image_parts.extend(image_parts);
3407        let image_parts = all_image_parts;
3408
3409        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
3410            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
3411            parts.extend(image_parts);
3412            Message::from_parts(Role::User, parts)
3413        } else {
3414            if !image_parts.is_empty() {
3415                tracing::warn!(
3416                    count = image_parts.len(),
3417                    "image attachments dropped: provider does not support vision"
3418                );
3419            }
3420            Message {
3421                role: Role::User,
3422                content: text.clone(),
3423                parts: vec![],
3424                metadata: MessageMetadata::default(),
3425            }
3426        };
3427        // Extract URLs from user input and add to user_provided_urls for grounding checks.
3428        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
3429        if !urls.is_empty()
3430            && let Ok(mut set) = self.security.user_provided_urls.write()
3431        {
3432            set.extend(urls);
3433        }
3434
3435        // Image parts intentionally excluded — base64 payloads too large for message history.
3436        self.persist_message(Role::User, &text, &[], false).await;
3437        self.push_message(user_msg);
3438
3439        if let Err(e) = self.process_response().await {
3440            tracing::error!("Response processing failed: {e:#}");
3441            let user_msg = format!("Error: {e:#}");
3442            self.channel.send(&user_msg).await?;
3443            self.msg.messages.pop();
3444            self.recompute_prompt_tokens();
3445            self.channel.flush_chunks().await?;
3446        }
3447
3448        Ok(())
3449    }
3450
3451    async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
3452        use std::path::Component;
3453        use zeph_llm::provider::{ImageData, MessagePart};
3454
3455        // Reject paths that traverse outside the current directory.
3456        let has_parent_dir = std::path::Path::new(path)
3457            .components()
3458            .any(|c| c == Component::ParentDir);
3459        if has_parent_dir {
3460            self.channel
3461                .send("Invalid image path: path traversal not allowed")
3462                .await?;
3463            let _ = self.channel.flush_chunks().await;
3464            return Ok(());
3465        }
3466
3467        let data = match std::fs::read(path) {
3468            Ok(d) => d,
3469            Err(e) => {
3470                self.channel
3471                    .send(&format!("Cannot read image {path}: {e}"))
3472                    .await?;
3473                let _ = self.channel.flush_chunks().await;
3474                return Ok(());
3475            }
3476        };
3477        if data.len() > MAX_IMAGE_BYTES {
3478            self.channel
3479                .send(&format!(
3480                    "Image {path} exceeds size limit ({} MB), skipping",
3481                    MAX_IMAGE_BYTES / 1024 / 1024
3482                ))
3483                .await?;
3484            let _ = self.channel.flush_chunks().await;
3485            return Ok(());
3486        }
3487        let mime_type = detect_image_mime(Some(path)).to_string();
3488        self.msg
3489            .pending_image_parts
3490            .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
3491        self.channel
3492            .send(&format!("Image loaded: {path}. Send your message."))
3493            .await?;
3494        let _ = self.channel.flush_chunks().await;
3495        Ok(())
3496    }
3497
3498    async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
3499        use std::fmt::Write;
3500
3501        let mut out = String::from("Slash commands:\n\n");
3502
3503        let categories = [
3504            slash_commands::SlashCategory::Info,
3505            slash_commands::SlashCategory::Session,
3506            slash_commands::SlashCategory::Model,
3507            slash_commands::SlashCategory::Memory,
3508            slash_commands::SlashCategory::Tools,
3509            slash_commands::SlashCategory::Planning,
3510            slash_commands::SlashCategory::Debug,
3511            slash_commands::SlashCategory::Advanced,
3512        ];
3513
3514        for cat in &categories {
3515            let entries: Vec<_> = slash_commands::COMMANDS
3516                .iter()
3517                .filter(|c| &c.category == cat)
3518                .collect();
3519            if entries.is_empty() {
3520                continue;
3521            }
3522            let _ = writeln!(out, "{}:", cat.as_str());
3523            for cmd in entries {
3524                if cmd.args.is_empty() {
3525                    let _ = write!(out, "  {}", cmd.name);
3526                } else {
3527                    let _ = write!(out, "  {} {}", cmd.name, cmd.args);
3528                }
3529                let _ = write!(out, "  — {}", cmd.description);
3530                if let Some(feat) = cmd.feature_gate {
3531                    let _ = write!(out, " [requires: {feat}]");
3532                }
3533                let _ = writeln!(out);
3534            }
3535            let _ = writeln!(out);
3536        }
3537
3538        self.channel.send(out.trim_end()).await?;
3539        Ok(())
3540    }
3541
3542    #[allow(clippy::too_many_lines)]
3543    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
3544        use std::fmt::Write;
3545
3546        let uptime = self.lifecycle.start_time.elapsed().as_secs();
3547        let msg_count = self
3548            .msg
3549            .messages
3550            .iter()
3551            .filter(|m| m.role == Role::User)
3552            .count();
3553
3554        let (
3555            api_calls,
3556            prompt_tokens,
3557            completion_tokens,
3558            cost_cents,
3559            mcp_servers,
3560            orch_plans,
3561            orch_tasks,
3562            orch_completed,
3563            orch_failed,
3564            orch_skipped,
3565        ) = if let Some(ref tx) = self.metrics.metrics_tx {
3566            let m = tx.borrow();
3567            (
3568                m.api_calls,
3569                m.prompt_tokens,
3570                m.completion_tokens,
3571                m.cost_spent_cents,
3572                m.mcp_server_count,
3573                m.orchestration.plans_total,
3574                m.orchestration.tasks_total,
3575                m.orchestration.tasks_completed,
3576                m.orchestration.tasks_failed,
3577                m.orchestration.tasks_skipped,
3578            )
3579        } else {
3580            (0, 0, 0, 0.0, 0, 0, 0, 0, 0, 0)
3581        };
3582
3583        let skill_count = self
3584            .skill_state
3585            .registry
3586            .read()
3587            .map(|r| r.all_meta().len())
3588            .unwrap_or(0);
3589
3590        let mut out = String::from("Session status:\n\n");
3591        let _ = writeln!(out, "Provider:  {}", self.provider.name());
3592        let _ = writeln!(out, "Model:     {}", self.runtime.model_name);
3593        let _ = writeln!(out, "Uptime:    {uptime}s");
3594        let _ = writeln!(out, "Turns:     {msg_count}");
3595        let _ = writeln!(out, "API calls: {api_calls}");
3596        let _ = writeln!(
3597            out,
3598            "Tokens:    {prompt_tokens} prompt / {completion_tokens} completion"
3599        );
3600        let _ = writeln!(out, "Skills:    {skill_count}");
3601        let _ = writeln!(out, "MCP:       {mcp_servers} server(s)");
3602        if let Some(ref tf) = self.tool_schema_filter {
3603            let _ = writeln!(
3604                out,
3605                "Filter:    enabled (top_k={}, always_on={}, {} embeddings)",
3606                tf.top_k(),
3607                tf.always_on_count(),
3608                tf.embedding_count(),
3609            );
3610        }
3611        if cost_cents > 0.0 {
3612            let _ = writeln!(out, "Cost:      ${:.4}", cost_cents / 100.0);
3613        }
3614        if orch_plans > 0 {
3615            let _ = writeln!(out);
3616            let _ = writeln!(out, "Orchestration:");
3617            let _ = writeln!(out, "  Plans:     {orch_plans}");
3618            let _ = writeln!(out, "  Tasks:     {orch_completed}/{orch_tasks} completed");
3619            if orch_failed > 0 {
3620                let _ = writeln!(out, "  Failed:    {orch_failed}");
3621            }
3622            if orch_skipped > 0 {
3623                let _ = writeln!(out, "  Skipped:   {orch_skipped}");
3624            }
3625        }
3626
3627        // Subgoal display (#2022): show active subgoal when a subgoal strategy is active.
3628        #[cfg(feature = "context-compression")]
3629        {
3630            use crate::config::PruningStrategy;
3631            if matches!(
3632                self.context_manager.compression.pruning_strategy,
3633                PruningStrategy::Subgoal | PruningStrategy::SubgoalMig
3634            ) {
3635                let _ = writeln!(out);
3636                let _ = writeln!(
3637                    out,
3638                    "Pruning:   {}",
3639                    match self.context_manager.compression.pruning_strategy {
3640                        PruningStrategy::SubgoalMig => "subgoal_mig",
3641                        _ => "subgoal",
3642                    }
3643                );
3644                let subgoal_count = self.compression.subgoal_registry.subgoals.len();
3645                let _ = writeln!(out, "Subgoals:  {subgoal_count} tracked");
3646                if let Some(active) = self.compression.subgoal_registry.active_subgoal() {
3647                    let _ = writeln!(out, "Active:    \"{}\"", active.description);
3648                } else {
3649                    let _ = writeln!(out, "Active:    (none yet)");
3650                }
3651            }
3652        }
3653
3654        // Graph memory status: show recall mode when graph memory is enabled.
3655        let gc = &self.memory_state.graph_config;
3656        if gc.enabled {
3657            let _ = writeln!(out);
3658            if gc.spreading_activation.enabled {
3659                let _ = writeln!(
3660                    out,
3661                    "Graph recall: spreading activation (lambda={:.2}, hops={})",
3662                    gc.spreading_activation.decay_lambda, gc.spreading_activation.max_hops,
3663                );
3664            } else {
3665                let _ = writeln!(out, "Graph recall: BFS (hops={})", gc.max_hops,);
3666            }
3667        }
3668
3669        self.channel.send(out.trim_end()).await?;
3670        Ok(())
3671    }
3672
3673    #[cfg(feature = "guardrail")]
3674    async fn handle_guardrail_command(&mut self) -> Result<(), error::AgentError> {
3675        use std::fmt::Write;
3676
3677        let mut out = String::new();
3678        if let Some(ref guardrail) = self.security.guardrail {
3679            let stats = guardrail.stats();
3680            let _ = writeln!(out, "Guardrail: enabled");
3681            let _ = writeln!(out, "Action:    {:?}", guardrail.action());
3682            let _ = writeln!(out, "Fail strategy: {:?}", guardrail.fail_strategy());
3683            let _ = writeln!(out, "Timeout:   {}ms", guardrail.timeout_ms());
3684            let _ = writeln!(
3685                out,
3686                "Tool scan: {}",
3687                if guardrail.scan_tool_output() {
3688                    "enabled"
3689                } else {
3690                    "disabled"
3691                }
3692            );
3693            let _ = writeln!(out, "\nStats:");
3694            let _ = writeln!(out, "  Total checks:  {}", stats.total_checks);
3695            let _ = writeln!(out, "  Flagged:       {}", stats.flagged_count);
3696            let _ = writeln!(out, "  Errors:        {}", stats.error_count);
3697            let _ = writeln!(out, "  Avg latency:   {}ms", stats.avg_latency_ms());
3698        } else {
3699            out.push_str("Guardrail: disabled\n");
3700            out.push_str(
3701                "Enable with: --guardrail flag or [security.guardrail] enabled = true in config",
3702            );
3703        }
3704
3705        self.channel.send(out.trim_end()).await?;
3706        Ok(())
3707    }
3708
3709    async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
3710        use std::fmt::Write;
3711
3712        let mut output = String::from("Available skills:\n\n");
3713
3714        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
3715            .skill_state
3716            .registry
3717            .read()
3718            .expect("registry read lock")
3719            .all_meta()
3720            .into_iter()
3721            .cloned()
3722            .collect();
3723
3724        for meta in &all_meta {
3725            let trust_info = if let Some(memory) = &self.memory_state.memory {
3726                memory
3727                    .sqlite()
3728                    .load_skill_trust(&meta.name)
3729                    .await
3730                    .ok()
3731                    .flatten()
3732                    .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
3733            } else {
3734                String::new()
3735            };
3736            let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
3737        }
3738
3739        if let Some(memory) = &self.memory_state.memory {
3740            match memory.sqlite().load_skill_usage().await {
3741                Ok(usage) if !usage.is_empty() => {
3742                    output.push_str("\nUsage statistics:\n\n");
3743                    for row in &usage {
3744                        let _ = writeln!(
3745                            output,
3746                            "- {}: {} invocations (last: {})",
3747                            row.skill_name, row.invocation_count, row.last_used_at,
3748                        );
3749                    }
3750                }
3751                Ok(_) => {}
3752                Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
3753            }
3754        }
3755
3756        self.channel.send(&output).await?;
3757        Ok(())
3758    }
3759
3760    async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
3761        let Some((name, rest)) = input.split_once(' ') else {
3762            self.channel
3763                .send("Usage: /feedback <skill_name> <message>")
3764                .await?;
3765            return Ok(());
3766        };
3767        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
3768
3769        if feedback.is_empty() {
3770            self.channel
3771                .send("Usage: /feedback <skill_name> <message>")
3772                .await?;
3773            return Ok(());
3774        }
3775
3776        let Some(memory) = &self.memory_state.memory else {
3777            self.channel.send("Memory not available.").await?;
3778            return Ok(());
3779        };
3780
3781        let outcome_type = if self.feedback.detector.detect(feedback, &[]).is_some() {
3782            "user_rejection"
3783        } else {
3784            "user_approval"
3785        };
3786
3787        memory
3788            .sqlite()
3789            .record_skill_outcome(
3790                skill_name,
3791                None,
3792                self.memory_state.conversation_id,
3793                outcome_type,
3794                None,
3795                Some(feedback),
3796            )
3797            .await?;
3798
3799        if self.is_learning_enabled() && outcome_type == "user_rejection" {
3800            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
3801                .await
3802                .ok();
3803        }
3804
3805        self.channel
3806            .send(&format!("Feedback recorded for \"{skill_name}\"."))
3807            .await?;
3808        Ok(())
3809    }
3810
3811    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
3812    /// channel. Returns a human-readable status string suitable for sending to the user.
3813    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
3814        use crate::subagent::SubAgentState;
3815        let result = loop {
3816            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3817
3818            // Bridge secret requests from sub-agent to channel.confirm().
3819            // Fetch the pending request first, then release the borrow before
3820            // calling channel.confirm() (which requires &mut self).
3821            #[allow(clippy::redundant_closure_for_method_calls)]
3822            let pending = self
3823                .orchestration
3824                .subagent_manager
3825                .as_mut()
3826                .and_then(|m| m.try_recv_secret_request());
3827            if let Some((req_task_id, req)) = pending {
3828                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
3829                // (SEC-P1-02), so it is safe to embed in the prompt string.
3830                let confirm_prompt = format!(
3831                    "Sub-agent requests secret '{}'. Allow?",
3832                    crate::text::truncate_to_chars(&req.secret_key, 100)
3833                );
3834                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
3835                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
3836                    if approved {
3837                        let ttl = std::time::Duration::from_secs(300);
3838                        let key = req.secret_key.clone();
3839                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
3840                            let _ = mgr.deliver_secret(&req_task_id, key);
3841                        }
3842                    } else {
3843                        let _ = mgr.deny_secret(&req_task_id);
3844                    }
3845                }
3846            }
3847
3848            let mgr = self.orchestration.subagent_manager.as_ref()?;
3849            let statuses = mgr.statuses();
3850            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
3851                break format!("{label} completed (no status available).");
3852            };
3853            match status.state {
3854                SubAgentState::Completed => {
3855                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
3856                    break format!("{label} completed: {msg}");
3857                }
3858                SubAgentState::Failed => {
3859                    let msg = status
3860                        .last_message
3861                        .clone()
3862                        .unwrap_or_else(|| "unknown error".into());
3863                    break format!("{label} failed: {msg}");
3864                }
3865                SubAgentState::Canceled => {
3866                    break format!("{label} was cancelled.");
3867                }
3868                _ => {
3869                    let _ = self
3870                        .channel
3871                        .send_status(&format!(
3872                            "{label}: turn {}/{}",
3873                            status.turns_used,
3874                            self.orchestration
3875                                .subagent_manager
3876                                .as_ref()
3877                                .and_then(|m| m.agents_def(task_id))
3878                                .map_or(20, |d| d.permissions.max_turns)
3879                        ))
3880                        .await;
3881                }
3882            }
3883        };
3884        Some(result)
3885    }
3886
3887    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
3888    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
3889    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
3890        let mgr = self.orchestration.subagent_manager.as_mut()?;
3891        let full_ids: Vec<String> = mgr
3892            .statuses()
3893            .into_iter()
3894            .map(|(tid, _)| tid)
3895            .filter(|tid| tid.starts_with(prefix))
3896            .collect();
3897        Some(match full_ids.as_slice() {
3898            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
3899            [fid] => Ok(fid.clone()),
3900            _ => Err(format!(
3901                "Ambiguous id prefix '{prefix}': matches {} agents",
3902                full_ids.len()
3903            )),
3904        })
3905    }
3906
3907    fn handle_agent_list(&self) -> Option<String> {
3908        use std::fmt::Write as _;
3909        let mgr = self.orchestration.subagent_manager.as_ref()?;
3910        let defs = mgr.definitions();
3911        if defs.is_empty() {
3912            return Some("No sub-agent definitions found.".into());
3913        }
3914        let mut out = String::from("Available sub-agents:\n");
3915        for d in defs {
3916            let memory_label = match d.memory {
3917                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
3918                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
3919                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
3920                None => "",
3921            };
3922            if let Some(ref src) = d.source {
3923                let _ = writeln!(
3924                    out,
3925                    "  {}{} — {} ({})",
3926                    d.name, memory_label, d.description, src
3927                );
3928            } else {
3929                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
3930            }
3931        }
3932        Some(out)
3933    }
3934
3935    fn handle_agent_status(&self) -> Option<String> {
3936        use std::fmt::Write as _;
3937        let mgr = self.orchestration.subagent_manager.as_ref()?;
3938        let statuses = mgr.statuses();
3939        if statuses.is_empty() {
3940            return Some("No active sub-agents.".into());
3941        }
3942        let mut out = String::from("Active sub-agents:\n");
3943        for (id, s) in &statuses {
3944            let state = format!("{:?}", s.state).to_lowercase();
3945            let elapsed = s.started_at.elapsed().as_secs();
3946            let _ = writeln!(
3947                out,
3948                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
3949                short = &id[..8.min(id.len())],
3950                t = s.turns_used,
3951                msg = s.last_message.as_deref().unwrap_or(""),
3952            );
3953            // Show memory directory path for agents with memory enabled.
3954            if let Some(def) = mgr.agents_def(id)
3955                && let Some(scope) = def.memory
3956                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
3957            {
3958                let _ = writeln!(out, "       memory: {}", dir.display());
3959            }
3960        }
3961        Some(out)
3962    }
3963
3964    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
3965        let full_id = match self.resolve_agent_id_prefix(id)? {
3966            Ok(fid) => fid,
3967            Err(msg) => return Some(msg),
3968        };
3969        let mgr = self.orchestration.subagent_manager.as_mut()?;
3970        if let Some((tid, req)) = mgr.try_recv_secret_request()
3971            && tid == full_id
3972        {
3973            let key = req.secret_key.clone();
3974            let ttl = std::time::Duration::from_secs(300);
3975            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
3976                return Some(format!("Approve failed: {e}"));
3977            }
3978            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
3979                return Some(format!("Secret delivery failed: {e}"));
3980            }
3981            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
3982        }
3983        Some(format!(
3984            "No pending secret request for sub-agent '{full_id}'."
3985        ))
3986    }
3987
3988    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
3989        let full_id = match self.resolve_agent_id_prefix(id)? {
3990            Ok(fid) => fid,
3991            Err(msg) => return Some(msg),
3992        };
3993        let mgr = self.orchestration.subagent_manager.as_mut()?;
3994        match mgr.deny_secret(&full_id) {
3995            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
3996            Err(e) => Some(format!("Deny failed: {e}")),
3997        }
3998    }
3999
4000    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
4001        use crate::subagent::AgentCommand;
4002
4003        match cmd {
4004            AgentCommand::List => self.handle_agent_list(),
4005            AgentCommand::Background { name, prompt } => {
4006                let provider = self.provider.clone();
4007                let tool_executor = Arc::clone(&self.tool_executor);
4008                let skills = self.filtered_skills_for(&name);
4009                let mgr = self.orchestration.subagent_manager.as_mut()?;
4010                let cfg = self.orchestration.subagent_config.clone();
4011                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
4012                    Ok(id) => Some(format!(
4013                        "Sub-agent '{name}' started in background (id: {short})",
4014                        short = &id[..8.min(id.len())]
4015                    )),
4016                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
4017                }
4018            }
4019            AgentCommand::Spawn { name, prompt }
4020            | AgentCommand::Mention {
4021                agent: name,
4022                prompt,
4023            } => {
4024                // Foreground spawn: launch and await completion, streaming status to user.
4025                let provider = self.provider.clone();
4026                let tool_executor = Arc::clone(&self.tool_executor);
4027                let skills = self.filtered_skills_for(&name);
4028                let mgr = self.orchestration.subagent_manager.as_mut()?;
4029                let cfg = self.orchestration.subagent_config.clone();
4030                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
4031                {
4032                    Ok(id) => id,
4033                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
4034                };
4035                let short = task_id[..8.min(task_id.len())].to_owned();
4036                let _ = self
4037                    .channel
4038                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
4039                    .await;
4040                let label = format!("Sub-agent '{name}'");
4041                self.poll_subagent_until_done(&task_id, &label).await
4042            }
4043            AgentCommand::Status => self.handle_agent_status(),
4044            AgentCommand::Cancel { id } => {
4045                let mgr = self.orchestration.subagent_manager.as_mut()?;
4046                // Accept prefix match on task_id.
4047                let ids: Vec<String> = mgr
4048                    .statuses()
4049                    .into_iter()
4050                    .map(|(task_id, _)| task_id)
4051                    .filter(|task_id| task_id.starts_with(&id))
4052                    .collect();
4053                match ids.as_slice() {
4054                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
4055                    [full_id] => {
4056                        let full_id = full_id.clone();
4057                        match mgr.cancel(&full_id) {
4058                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
4059                            Err(e) => Some(format!("Cancel failed: {e}")),
4060                        }
4061                    }
4062                    _ => Some(format!(
4063                        "Ambiguous id prefix '{id}': matches {} agents",
4064                        ids.len()
4065                    )),
4066                }
4067            }
4068            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
4069            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
4070            AgentCommand::Resume { id, prompt } => {
4071                let cfg = self.orchestration.subagent_config.clone();
4072                // Resolve definition name from transcript meta before spawning so we can
4073                // look up skills by definition name rather than the UUID prefix (S1 fix).
4074                let def_name = {
4075                    let mgr = self.orchestration.subagent_manager.as_ref()?;
4076                    match mgr.def_name_for_resume(&id, &cfg) {
4077                        Ok(name) => name,
4078                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
4079                    }
4080                };
4081                let skills = self.filtered_skills_for(&def_name);
4082                let provider = self.provider.clone();
4083                let tool_executor = Arc::clone(&self.tool_executor);
4084                let mgr = self.orchestration.subagent_manager.as_mut()?;
4085                let (task_id, _) =
4086                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
4087                        Ok(pair) => pair,
4088                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
4089                    };
4090                let short = task_id[..8.min(task_id.len())].to_owned();
4091                let _ = self
4092                    .channel
4093                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
4094                    .await;
4095                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
4096                    .await
4097            }
4098        }
4099    }
4100
4101    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
4102        let mgr = self.orchestration.subagent_manager.as_ref()?;
4103        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
4104        let reg = self
4105            .skill_state
4106            .registry
4107            .read()
4108            .expect("registry read lock");
4109        match crate::subagent::filter_skills(&reg, &def.skills) {
4110            Ok(skills) => {
4111                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
4112                if bodies.is_empty() {
4113                    None
4114                } else {
4115                    Some(bodies)
4116                }
4117            }
4118            Err(e) => {
4119                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
4120                None
4121            }
4122        }
4123    }
4124
4125    /// Update trust DB records for all reloaded skills.
4126    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
4127        let Some(ref memory) = self.memory_state.memory else {
4128            return;
4129        };
4130        let trust_cfg = self.skill_state.trust_config.clone();
4131        let managed_dir = self.skill_state.managed_dir.clone();
4132        for meta in all_meta {
4133            let source_kind = if managed_dir
4134                .as_ref()
4135                .is_some_and(|d| meta.skill_dir.starts_with(d))
4136            {
4137                zeph_memory::sqlite::SourceKind::Hub
4138            } else {
4139                zeph_memory::sqlite::SourceKind::Local
4140            };
4141            let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
4142                &trust_cfg.default_level
4143            } else {
4144                &trust_cfg.local_level
4145            };
4146            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
4147                Ok(current_hash) => {
4148                    let existing = memory
4149                        .sqlite()
4150                        .load_skill_trust(&meta.name)
4151                        .await
4152                        .ok()
4153                        .flatten();
4154                    let trust_level_str = if let Some(ref row) = existing {
4155                        if row.blake3_hash == current_hash {
4156                            row.trust_level.clone()
4157                        } else {
4158                            trust_cfg.hash_mismatch_level.to_string()
4159                        }
4160                    } else {
4161                        initial_level.to_string()
4162                    };
4163                    let source_path = meta.skill_dir.to_str();
4164                    if let Err(e) = memory
4165                        .sqlite()
4166                        .upsert_skill_trust(
4167                            &meta.name,
4168                            &trust_level_str,
4169                            source_kind,
4170                            None,
4171                            source_path,
4172                            &current_hash,
4173                        )
4174                        .await
4175                    {
4176                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
4177                    }
4178                }
4179                Err(e) => {
4180                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
4181                }
4182            }
4183        }
4184    }
4185
4186    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
4187    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
4188        let provider = self.embedding_provider.clone();
4189        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
4190            let owned = text.to_owned();
4191            let p = provider.clone();
4192            Box::pin(async move { p.embed(&owned).await })
4193        };
4194
4195        let needs_inmemory_rebuild = !self
4196            .skill_state
4197            .matcher
4198            .as_ref()
4199            .is_some_and(SkillMatcherBackend::is_qdrant);
4200
4201        if needs_inmemory_rebuild {
4202            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
4203                .await
4204                .map(SkillMatcherBackend::InMemory);
4205        } else if let Some(ref mut backend) = self.skill_state.matcher {
4206            let _ = self.channel.send_status("syncing skill index...").await;
4207            if let Err(e) = backend
4208                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
4209                .await
4210            {
4211                tracing::warn!("failed to sync skill embeddings: {e:#}");
4212            }
4213        }
4214
4215        if self.skill_state.hybrid_search {
4216            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
4217            let _ = self.channel.send_status("rebuilding search index...").await;
4218            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
4219        }
4220    }
4221
4222    async fn reload_skills(&mut self) {
4223        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
4224        if new_registry.fingerprint()
4225            == self
4226                .skill_state
4227                .registry
4228                .read()
4229                .expect("registry read lock")
4230                .fingerprint()
4231        {
4232            return;
4233        }
4234        let _ = self.channel.send_status("reloading skills...").await;
4235        *self
4236            .skill_state
4237            .registry
4238            .write()
4239            .expect("registry write lock") = new_registry;
4240
4241        let all_meta = self
4242            .skill_state
4243            .registry
4244            .read()
4245            .expect("registry read lock")
4246            .all_meta()
4247            .into_iter()
4248            .cloned()
4249            .collect::<Vec<_>>();
4250
4251        self.update_trust_for_reloaded_skills(&all_meta).await;
4252
4253        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
4254        self.rebuild_skill_matcher(&all_meta_refs).await;
4255
4256        let all_skills: Vec<Skill> = {
4257            let reg = self
4258                .skill_state
4259                .registry
4260                .read()
4261                .expect("registry read lock");
4262            reg.all_meta()
4263                .iter()
4264                .filter_map(|m| reg.get_skill(&m.name).ok())
4265                .collect()
4266        };
4267        let trust_map = self.build_skill_trust_map().await;
4268        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
4269        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
4270        self.skill_state
4271            .last_skills_prompt
4272            .clone_from(&skills_prompt);
4273        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
4274        if let Some(msg) = self.msg.messages.first_mut() {
4275            msg.content = system_prompt;
4276        }
4277
4278        let _ = self.channel.send_status("").await;
4279        tracing::info!(
4280            "reloaded {} skill(s)",
4281            self.skill_state
4282                .registry
4283                .read()
4284                .expect("registry read lock")
4285                .all_meta()
4286                .len()
4287        );
4288    }
4289
4290    fn reload_instructions(&mut self) {
4291        // Drain any additional queued events before reloading to avoid redundant reloads.
4292        if let Some(ref mut rx) = self.instructions.reload_rx {
4293            while rx.try_recv().is_ok() {}
4294        }
4295        let Some(ref state) = self.instructions.reload_state else {
4296            return;
4297        };
4298        let new_blocks = crate::instructions::load_instructions(
4299            &state.base_dir,
4300            &state.provider_kinds,
4301            &state.explicit_files,
4302            state.auto_detect,
4303        );
4304        let old_sources: std::collections::HashSet<_> =
4305            self.instructions.blocks.iter().map(|b| &b.source).collect();
4306        let new_sources: std::collections::HashSet<_> =
4307            new_blocks.iter().map(|b| &b.source).collect();
4308        for added in new_sources.difference(&old_sources) {
4309            tracing::info!(path = %added.display(), "instruction file added");
4310        }
4311        for removed in old_sources.difference(&new_sources) {
4312            tracing::info!(path = %removed.display(), "instruction file removed");
4313        }
4314        tracing::info!(
4315            old_count = self.instructions.blocks.len(),
4316            new_count = new_blocks.len(),
4317            "reloaded instruction files"
4318        );
4319        self.instructions.blocks = new_blocks;
4320    }
4321
4322    fn reload_config(&mut self) {
4323        let Some(ref path) = self.lifecycle.config_path else {
4324            return;
4325        };
4326        let config = match Config::load(path) {
4327            Ok(c) => c,
4328            Err(e) => {
4329                tracing::warn!("config reload failed: {e:#}");
4330                return;
4331            }
4332        };
4333
4334        self.runtime.security = config.security;
4335        self.runtime.timeouts = config.timeouts;
4336        self.runtime.redact_credentials = config.memory.redact_credentials;
4337        self.memory_state.history_limit = config.memory.history_limit;
4338        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
4339        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
4340        self.skill_state.max_active_skills = config.skills.max_active_skills;
4341        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
4342        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
4343        self.skill_state.hybrid_search = config.skills.hybrid_search;
4344
4345        if config.memory.context_budget_tokens > 0 {
4346            self.context_manager.budget = Some(
4347                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
4348                    .with_graph_enabled(config.memory.graph.enabled),
4349            );
4350        } else {
4351            self.context_manager.budget = None;
4352        }
4353
4354        {
4355            self.memory_state.graph_config = config.memory.graph.clone();
4356        }
4357        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
4358        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
4359        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
4360        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
4361        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
4362        self.context_manager.compression = config.memory.compression.clone();
4363        self.context_manager.routing = config.memory.routing.clone();
4364        self.memory_state.cross_session_score_threshold =
4365            config.memory.cross_session_score_threshold;
4366
4367        self.index.repo_map_tokens = config.index.repo_map_tokens;
4368        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
4369
4370        tracing::info!("config reloaded");
4371    }
4372
4373    /// `/focus` slash command: display Focus Agent status.
4374    #[cfg(feature = "context-compression")]
4375    async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
4376        use std::fmt::Write;
4377        let mut out = String::from("Focus Agent status\n\n");
4378        let _ = writeln!(out, "Enabled:          {}", self.focus.config.enabled);
4379        let _ = writeln!(out, "Active session:   {}", self.focus.is_active());
4380        if let Some(ref scope) = self.focus.active_scope {
4381            let _ = writeln!(out, "Active scope:     {scope}");
4382        }
4383        let _ = writeln!(
4384            out,
4385            "Knowledge blocks: {}",
4386            self.focus.knowledge_blocks.len()
4387        );
4388        let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
4389        self.channel.send(&out).await?;
4390        Ok(())
4391    }
4392
4393    /// `/sidequest` slash command: display `SideQuest` eviction stats.
4394    #[cfg(feature = "context-compression")]
4395    async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
4396        use std::fmt::Write;
4397        let mut out = String::from("SideQuest status\n\n");
4398        let _ = writeln!(out, "Enabled:        {}", self.sidequest.config.enabled);
4399        let _ = writeln!(
4400            out,
4401            "Interval turns: {}",
4402            self.sidequest.config.interval_turns
4403        );
4404        let _ = writeln!(out, "Turn counter:   {}", self.sidequest.turn_counter);
4405        let _ = writeln!(out, "Passes run:     {}", self.sidequest.passes_run);
4406        let _ = writeln!(
4407            out,
4408            "Total evicted:  {} tool outputs",
4409            self.sidequest.total_evicted
4410        );
4411        self.channel.send(&out).await?;
4412        Ok(())
4413    }
4414
4415    /// Run `SideQuest` tool output eviction pass (#1885).
4416    ///
4417    /// PERF-1 fix: two-phase non-blocking design.
4418    ///
4419    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
4420    /// validate and apply it immediately.
4421    ///
4422    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
4423    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
4424    /// next turn, so the current agent turn is never blocked by the LLM call.
4425    #[cfg(feature = "context-compression")]
4426    #[allow(clippy::too_many_lines)]
4427    fn maybe_sidequest_eviction(&mut self) {
4428        use zeph_llm::provider::{Message, MessageMetadata, Role};
4429
4430        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
4431        // strategy — the two systems share the same pool of evictable tool outputs and can
4432        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
4433        if self.sidequest.config.enabled {
4434            use crate::config::PruningStrategy;
4435            if !matches!(
4436                self.context_manager.compression.pruning_strategy,
4437                PruningStrategy::Reactive
4438            ) {
4439                tracing::warn!(
4440                    strategy = ?self.context_manager.compression.pruning_strategy,
4441                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
4442                     consider disabling sidequest.enabled to avoid redundant eviction"
4443                );
4444            }
4445        }
4446
4447        // Guard: do not evict while a focus session is active.
4448        if self.focus.is_active() {
4449            tracing::debug!("sidequest: skipping — focus session active");
4450            // Drop any pending result — cursors may be stale relative to focus truncation.
4451            self.compression.pending_sidequest_result = None;
4452            return;
4453        }
4454
4455        // Phase 1: apply pending result from last turn's background LLM call.
4456        if let Some(handle) = self.compression.pending_sidequest_result.take() {
4457            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
4458            use futures::FutureExt as _;
4459            match handle.now_or_never() {
4460                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
4461                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
4462                    let freed = self.sidequest.apply_eviction(
4463                        &mut self.msg.messages,
4464                        &evicted_indices,
4465                        &self.metrics.token_counter,
4466                    );
4467                    if freed > 0 {
4468                        self.recompute_prompt_tokens();
4469                        // C1 fix: prevent maybe_compact() from firing in the same turn.
4470                        // cooldown=0: eviction does not impose post-compaction cooldown.
4471                        self.context_manager.compaction =
4472                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
4473                                cooldown: 0,
4474                            };
4475                        tracing::info!(
4476                            freed_tokens = freed,
4477                            evicted_cursors = evicted_indices.len(),
4478                            pass = self.sidequest.passes_run,
4479                            "sidequest eviction complete"
4480                        );
4481                        if let Some(ref d) = self.debug_state.debug_dumper {
4482                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
4483                        }
4484                        if let Some(ref tx) = self.session.status_tx {
4485                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
4486                        }
4487                    } else {
4488                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
4489                        if let Some(ref tx) = self.session.status_tx {
4490                            let _ = tx.send(String::new());
4491                        }
4492                    }
4493                }
4494                Some(Ok(None | Some(_))) => {
4495                    tracing::debug!("sidequest: pending result: no cursors to evict");
4496                    if let Some(ref tx) = self.session.status_tx {
4497                        let _ = tx.send(String::new());
4498                    }
4499                }
4500                Some(Err(e)) => {
4501                    tracing::debug!("sidequest: background task panicked: {e}");
4502                    if let Some(ref tx) = self.session.status_tx {
4503                        let _ = tx.send(String::new());
4504                    }
4505                }
4506                None => {
4507                    // Task still running — re-store and wait another turn.
4508                    // We already took it; we'd need to re-spawn, but instead just drop and
4509                    // schedule fresh below to keep the cursor list current.
4510                    tracing::debug!(
4511                        "sidequest: background LLM task not yet complete, rescheduling"
4512                    );
4513                }
4514            }
4515        }
4516
4517        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
4518        self.sidequest
4519            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
4520
4521        if self.sidequest.tool_output_cursors.is_empty() {
4522            tracing::debug!("sidequest: no eligible cursors");
4523            return;
4524        }
4525
4526        let prompt = self.sidequest.build_eviction_prompt();
4527        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
4528        let n_cursors = self.sidequest.tool_output_cursors.len();
4529        // Clone the provider so the spawn closure owns it without borrowing self.
4530        let provider = self.summary_or_primary_provider().clone();
4531
4532        // Spawn background task: the LLM call runs without blocking the agent loop.
4533        let handle = tokio::spawn(async move {
4534            let msgs = [Message {
4535                role: Role::User,
4536                content: prompt,
4537                parts: vec![],
4538                metadata: MessageMetadata::default(),
4539            }];
4540            let response =
4541                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
4542                    .await
4543                {
4544                    Ok(Ok(r)) => r,
4545                    Ok(Err(e)) => {
4546                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
4547                        return None;
4548                    }
4549                    Err(_) => {
4550                        tracing::debug!("sidequest bg: LLM call timed out");
4551                        return None;
4552                    }
4553                };
4554
4555            let start = response.find('{')?;
4556            let end = response.rfind('}')?;
4557            if start > end {
4558                return None;
4559            }
4560            let json_slice = &response[start..=end];
4561            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
4562            let mut valid: Vec<usize> = parsed
4563                .del_cursors
4564                .into_iter()
4565                .filter(|&c| c < n_cursors)
4566                .collect();
4567            valid.sort_unstable();
4568            valid.dedup();
4569            #[allow(
4570                clippy::cast_precision_loss,
4571                clippy::cast_possible_truncation,
4572                clippy::cast_sign_loss
4573            )]
4574            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
4575            valid.truncate(max_evict);
4576            Some(valid)
4577        });
4578
4579        self.compression.pending_sidequest_result = Some(handle);
4580        tracing::debug!("sidequest: background LLM eviction task spawned");
4581        if let Some(ref tx) = self.session.status_tx {
4582            let _ = tx.send("SideQuest: scoring tool outputs...".into());
4583        }
4584    }
4585}
4586pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
4587    while !*rx.borrow_and_update() {
4588        if rx.changed().await.is_err() {
4589            std::future::pending::<()>().await;
4590        }
4591    }
4592}
4593
4594/// Convert a `FeedbackVerdict` (from `LlmClassifier`) into a `CorrectionSignal`.
4595///
4596/// Mirrors `JudgeVerdict::into_signal` to keep both code paths symmetric.
4597fn feedback_verdict_into_signal(
4598    verdict: &zeph_llm::classifier::llm::FeedbackVerdict,
4599    user_message: &str,
4600) -> Option<feedback_detector::CorrectionSignal> {
4601    if !verdict.is_correction {
4602        return None;
4603    }
4604    let confidence = verdict.confidence.clamp(0.0, 1.0);
4605    let kind_raw = verdict.kind.trim().to_lowercase().replace(' ', "_");
4606    let kind = match kind_raw.as_str() {
4607        "explicit_rejection" => feedback_detector::CorrectionKind::ExplicitRejection,
4608        "alternative_request" => feedback_detector::CorrectionKind::AlternativeRequest,
4609        "repetition" => feedback_detector::CorrectionKind::Repetition,
4610        "self_correction" => feedback_detector::CorrectionKind::SelfCorrection,
4611        other => {
4612            tracing::warn!(
4613                kind = other,
4614                "llm-classifier returned unknown correction kind, discarding"
4615            );
4616            return None;
4617        }
4618    };
4619    Some(feedback_detector::CorrectionSignal {
4620        confidence,
4621        kind,
4622        feedback_text: user_message.to_owned(),
4623    })
4624}
4625
4626/// Store a correction record in memory (shared by judge and llm-classifier paths).
4627async fn store_correction_in_memory(
4628    memory: Option<std::sync::Arc<zeph_memory::semantic::SemanticMemory>>,
4629    conv_id: Option<zeph_memory::ConversationId>,
4630    assistant_snippet: &str,
4631    user_msg: &str,
4632    skill_name: String,
4633    kind_str: &str,
4634) {
4635    let Some(mem) = memory else { return };
4636    let correction_text = context::truncate_chars(user_msg, 500);
4637    match mem
4638        .sqlite()
4639        .store_user_correction(
4640            conv_id.map(|c| c.0),
4641            assistant_snippet,
4642            &correction_text,
4643            if skill_name.is_empty() {
4644                None
4645            } else {
4646                Some(skill_name.as_str())
4647            },
4648            kind_str,
4649        )
4650        .await
4651    {
4652        Ok(correction_id) => {
4653            if let Err(e) = mem
4654                .store_correction_embedding(correction_id, &correction_text)
4655                .await
4656            {
4657                tracing::warn!("failed to store correction embedding: {e:#}");
4658            }
4659        }
4660        Err(e) => {
4661            tracing::warn!("failed to store judge correction: {e:#}");
4662        }
4663    }
4664}
4665
4666pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4667    match rx {
4668        Some(inner) => {
4669            if let Some(v) = inner.recv().await {
4670                Some(v)
4671            } else {
4672                *rx = None;
4673                std::future::pending().await
4674            }
4675        }
4676        None => std::future::pending().await,
4677    }
4678}
4679
4680#[cfg(test)]
4681mod tests;
4682
4683#[cfg(test)]
4684pub(crate) use tests::agent_tests;