Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod builder;
5#[cfg(feature = "compression-guidelines")]
6pub(super) mod compression_feedback;
7mod context;
8pub(crate) mod context_manager;
9pub mod error;
10#[cfg(feature = "experiments")]
11mod experiment_cmd;
12pub(super) mod feedback_detector;
13mod graph_commands;
14mod index;
15mod learning;
16pub(crate) mod learning_engine;
17mod log_commands;
18#[cfg(feature = "lsp-context")]
19mod lsp_commands;
20mod mcp;
21mod message_queue;
22mod persistence;
23pub(crate) mod rate_limiter;
24#[cfg(feature = "scheduler")]
25mod scheduler_commands;
26mod skill_management;
27pub mod slash_commands;
28pub(crate) mod tool_execution;
29pub(crate) mod tool_orchestrator;
30mod trust_commands;
31mod utils;
32
33use std::collections::VecDeque;
34use std::path::PathBuf;
35use std::time::Instant;
36
37use std::sync::Arc;
38
39use tokio::sync::{Notify, mpsc, watch};
40use tokio_util::sync::CancellationToken;
41use zeph_llm::any::AnyProvider;
42use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
43use zeph_llm::stt::SpeechToText;
44
45use crate::metrics::MetricsSnapshot;
46use std::collections::HashMap;
47use zeph_memory::TokenCounter;
48use zeph_memory::semantic::SemanticMemory;
49use zeph_skills::loader::Skill;
50use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
51use zeph_skills::prompt::format_skills_prompt;
52use zeph_skills::registry::SkillRegistry;
53use zeph_skills::watcher::SkillEvent;
54use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
55
56use crate::channel::Channel;
57use crate::config::Config;
58use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
59use crate::config_watcher::ConfigEvent;
60use crate::context::{
61    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
62};
63use crate::cost::CostTracker;
64use crate::instructions::{InstructionBlock, InstructionEvent, InstructionReloadState};
65use crate::sanitizer::ContentSanitizer;
66use crate::sanitizer::quarantine::QuarantinedSummarizer;
67use crate::vault::Secret;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, QueuedMessage, detect_image_mime};
70
71pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
72pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
73pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
74pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
75pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
76pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
77pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
78pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
79/// Prefix used for LSP context messages (`Role::System`) injected into message history.
80/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
81/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
82/// prefix to clear stale notes before each fresh injection.
83#[cfg(feature = "lsp-context")]
84pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
85pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
86
87fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
88    use std::fmt::Write;
89    let mut out = String::new();
90    let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
91    let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
92    let _ = writeln!(out);
93    for (i, task) in graph.tasks.iter().enumerate() {
94        let deps = if task.depends_on.is_empty() {
95            String::new()
96        } else {
97            let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
98            format!(" (after: {})", ids.join(", "))
99        };
100        let agent = task.agent_hint.as_deref().unwrap_or("-");
101        let _ = writeln!(out, "  {}. [{}] {}{}", i + 1, agent, task.title, deps);
102    }
103    out
104}
105
106pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
107    use std::fmt::Write;
108    let capacity = "[tool output: ".len()
109        + tool_name.len()
110        + "]\n```\n".len()
111        + body.len()
112        + TOOL_OUTPUT_SUFFIX.len();
113    let mut buf = String::with_capacity(capacity);
114    let _ = write!(
115        buf,
116        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
117    );
118    buf
119}
120
121pub(super) struct MemoryState {
122    pub(super) memory: Option<Arc<SemanticMemory>>,
123    pub(super) conversation_id: Option<zeph_memory::ConversationId>,
124    pub(super) history_limit: u32,
125    pub(super) recall_limit: usize,
126    pub(super) summarization_threshold: usize,
127    pub(super) cross_session_score_threshold: f32,
128    pub(super) autosave_assistant: bool,
129    pub(super) autosave_min_length: usize,
130    pub(super) tool_call_cutoff: usize,
131    pub(super) unsummarized_count: usize,
132    pub(super) document_config: crate::config::DocumentConfig,
133    pub(super) graph_config: crate::config::GraphConfig,
134    pub(super) compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig,
135}
136
137pub(super) struct SkillState {
138    pub(super) registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
139    pub(super) skill_paths: Vec<PathBuf>,
140    pub(super) managed_dir: Option<PathBuf>,
141    pub(super) trust_config: crate::config::TrustConfig,
142    pub(super) matcher: Option<SkillMatcherBackend>,
143    pub(super) max_active_skills: usize,
144    pub(super) disambiguation_threshold: f32,
145    pub(super) embedding_model: String,
146    pub(super) skill_reload_rx: Option<mpsc::Receiver<SkillEvent>>,
147    pub(super) active_skill_names: Vec<String>,
148    pub(super) last_skills_prompt: String,
149    pub(super) prompt_mode: SkillPromptMode,
150    /// Custom secrets available at runtime: key=hyphenated name, value=secret.
151    pub(super) available_custom_secrets: HashMap<String, Secret>,
152    pub(super) cosine_weight: f32,
153    pub(super) hybrid_search: bool,
154    pub(super) bm25_index: Option<zeph_skills::bm25::Bm25Index>,
155}
156
157pub(super) struct McpState {
158    pub(super) tools: Vec<zeph_mcp::McpTool>,
159    pub(super) registry: Option<zeph_mcp::McpToolRegistry>,
160    pub(super) manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
161    pub(super) allowed_commands: Vec<String>,
162    pub(super) max_dynamic: usize,
163    /// Shared with `McpToolExecutor` so native `tool_use` sees the current tool list.
164    pub(super) shared_tools: Option<std::sync::Arc<std::sync::RwLock<Vec<zeph_mcp::McpTool>>>>,
165    /// Receives full flattened tool list after any `tools/list_changed` notification.
166    pub(super) tool_rx: Option<tokio::sync::watch::Receiver<Vec<zeph_mcp::McpTool>>>,
167}
168
169pub(super) struct IndexState {
170    pub(super) retriever: Option<std::sync::Arc<zeph_index::retriever::CodeRetriever>>,
171    pub(super) repo_map_tokens: usize,
172    pub(super) cached_repo_map: Option<(String, std::time::Instant)>,
173    pub(super) repo_map_ttl: std::time::Duration,
174}
175
176pub(super) struct RuntimeConfig {
177    pub(super) security: SecurityConfig,
178    pub(super) timeouts: TimeoutConfig,
179    pub(super) model_name: String,
180    pub(super) permission_policy: zeph_tools::PermissionPolicy,
181    pub(super) redact_credentials: bool,
182}
183
184/// Groups security-related subsystems (sanitizer, quarantine, exfiltration guard).
185pub(super) struct SecurityState {
186    pub(super) sanitizer: ContentSanitizer,
187    pub(super) quarantine_summarizer: Option<QuarantinedSummarizer>,
188    pub(super) exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard,
189    pub(super) flagged_urls: std::collections::HashSet<String>,
190    pub(super) pii_filter: crate::sanitizer::pii::PiiFilter,
191    pub(super) memory_validator: crate::sanitizer::memory_validation::MemoryWriteValidator,
192}
193
194/// Groups debug/diagnostics subsystems (dumper, trace collector, anomaly detector, logging config).
195pub(super) struct DebugState {
196    pub(super) debug_dumper: Option<crate::debug_dump::DebugDumper>,
197    pub(super) dump_format: crate::debug_dump::DumpFormat,
198    pub(super) trace_collector: Option<crate::debug_dump::trace::TracingCollector>,
199    /// Monotonically increasing counter for `process_user_message` calls.
200    /// Used to key spans in `trace_collector.active_iterations`.
201    pub(super) iteration_counter: usize,
202    pub(super) anomaly_detector: Option<zeph_tools::AnomalyDetector>,
203    pub(super) logging_config: crate::config::LoggingConfig,
204    /// Base dump directory — stored so `/dump-format trace` can create a `TracingCollector` (CR-04).
205    pub(super) dump_dir: Option<PathBuf>,
206    /// Service name for `TracingCollector` created via runtime format switch (CR-04).
207    pub(super) trace_service_name: String,
208    /// Whether to redact in `TracingCollector` created via runtime format switch (CR-04).
209    pub(super) trace_redact: bool,
210    /// Span ID of the currently executing iteration — used by LLM/tool span wiring (CR-01).
211    /// Set to `Some` at the start of `process_user_message`, cleared at end.
212    pub(super) current_iteration_span_id: Option<[u8; 8]>,
213}
214
215/// Groups agent lifecycle state: shutdown signaling, timing, and I/O notification channels.
216pub(super) struct LifecycleState {
217    pub(super) shutdown: watch::Receiver<bool>,
218    pub(super) start_time: Instant,
219    pub(super) cancel_signal: Arc<Notify>,
220    pub(super) cancel_token: CancellationToken,
221    pub(super) config_path: Option<PathBuf>,
222    pub(super) config_reload_rx: Option<mpsc::Receiver<ConfigEvent>>,
223    pub(super) warmup_ready: Option<watch::Receiver<bool>>,
224    pub(super) update_notify_rx: Option<mpsc::Receiver<String>>,
225    pub(super) custom_task_rx: Option<mpsc::Receiver<String>>,
226}
227
228/// Groups provider-related state: alternate providers, runtime switching, and compaction flags.
229pub(super) struct ProviderState {
230    pub(super) summary_provider: Option<AnyProvider>,
231    /// Shared slot for runtime model switching; set by external caller (e.g. ACP).
232    pub(super) provider_override: Option<Arc<std::sync::RwLock<Option<AnyProvider>>>>,
233    pub(super) judge_provider: Option<AnyProvider>,
234    pub(super) cached_prompt_tokens: u64,
235    /// Whether the active provider has server-side compaction enabled (Claude compact-2026-01-12).
236    /// When true, client-side compaction is skipped.
237    pub(super) server_compaction_active: bool,
238    pub(super) stt: Option<Box<dyn SpeechToText>>,
239}
240
241/// Groups metrics and cost tracking state.
242pub(super) struct MetricsState {
243    pub(super) metrics_tx: Option<watch::Sender<MetricsSnapshot>>,
244    pub(super) cost_tracker: Option<CostTracker>,
245    pub(super) token_counter: Arc<TokenCounter>,
246    /// Set to `true` when Claude extended context (`enable_extended_context = true`) is active.
247    /// Read from config at build time, not derived from provider internals.
248    pub(super) extended_context: bool,
249}
250
251/// Groups task orchestration and subagent state.
252pub(super) struct OrchestrationState {
253    /// Graph waiting for `/plan confirm` before execution starts.
254    pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
255    /// Cancellation token for the currently executing plan. `None` when no plan is running.
256    /// Created fresh in `handle_plan_confirm()`, cancelled in `handle_plan_cancel()`.
257    ///
258    /// # Known limitation
259    ///
260    /// Token plumbing is ready; the delivery path requires the agent message loop to be
261    /// restructured so `/plan cancel` can be received while `run_scheduler_loop` holds
262    /// `&mut self`. See follow-up issue #1603 (SEC-M34-002).
263    pub(super) plan_cancel_token: Option<CancellationToken>,
264    /// Manages spawned sub-agents.
265    pub(super) subagent_manager: Option<crate::subagent::SubAgentManager>,
266    pub(super) subagent_config: crate::config::SubAgentConfig,
267    pub(super) orchestration_config: crate::config::OrchestrationConfig,
268}
269
270pub struct Agent<C: Channel> {
271    provider: AnyProvider,
272    channel: C,
273    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
274    messages: Vec<Message>,
275    pub(super) memory_state: MemoryState,
276    pub(super) skill_state: SkillState,
277    pub(super) context_manager: context_manager::ContextManager,
278    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
279    pub(super) learning_engine: learning_engine::LearningEngine,
280    pub(super) feedback_detector: feedback_detector::FeedbackDetector,
281    pub(super) judge_detector: Option<feedback_detector::JudgeDetector>,
282    pub(super) runtime: RuntimeConfig,
283    pub(super) mcp: McpState,
284    pub(super) index: IndexState,
285    message_queue: VecDeque<QueuedMessage>,
286    env_context: EnvironmentContext,
287    pub(super) response_cache: Option<std::sync::Arc<zeph_memory::ResponseCache>>,
288    /// Parent tool call ID when this agent runs as a subagent inside another agent session.
289    /// Propagated into every `LoopbackEvent::ToolStart` / `ToolOutput` so the IDE can build
290    /// a subagent hierarchy.
291    pub(crate) parent_tool_use_id: Option<String>,
292    pub(super) debug_state: DebugState,
293    /// Instruction blocks loaded at startup from provider-specific and explicit files.
294    pub(super) instruction_blocks: Vec<InstructionBlock>,
295    pub(super) instruction_reload_rx: Option<mpsc::Receiver<InstructionEvent>>,
296    pub(super) instruction_reload_state: Option<InstructionReloadState>,
297    pub(super) security: SecurityState,
298    /// Image parts staged by `/image` commands, attached to the next user message.
299    pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
300    #[cfg(feature = "experiments")]
301    pub(super) experiment_config: crate::config::ExperimentConfig,
302    /// Cancellation token for a running experiment session. `Some` means an experiment is active.
303    #[cfg(feature = "experiments")]
304    pub(super) experiment_cancel: Option<tokio_util::sync::CancellationToken>,
305    /// Pre-built config snapshot used as the experiment baseline (agent path).
306    /// Set via `with_experiment_baseline()`; defaults to `ConfigSnapshot::default()`.
307    #[cfg(feature = "experiments")]
308    pub(super) experiment_baseline: crate::experiments::ConfigSnapshot,
309    /// Receives completion/error messages from the background experiment engine task.
310    /// When a message arrives in the agent loop, it is forwarded to the channel and
311    /// `experiment_cancel` is cleared. Always present so the select! branch compiles
312    /// unconditionally; only ever receives messages when the `experiments` feature is enabled.
313    pub(super) experiment_notify_rx: Option<tokio::sync::mpsc::Receiver<String>>,
314    /// Sender end paired with `experiment_notify_rx`. Cloned into the background task.
315    /// Feature-gated because it is only used in `experiment_cmd.rs`.
316    #[cfg(feature = "experiments")]
317    pub(super) experiment_notify_tx: tokio::sync::mpsc::Sender<String>,
318    /// LSP context injection hooks. Fires after native tool execution, injects
319    /// diagnostics/hover notes as `Role::System` messages before the next LLM call.
320    #[cfg(feature = "lsp-context")]
321    pub(super) lsp_hooks: Option<crate::lsp_hooks::LspHookRunner>,
322    // --- New sub-structs ---
323    pub(super) lifecycle: LifecycleState,
324    pub(super) providers: ProviderState,
325    pub(super) metrics: MetricsState,
326    pub(super) orchestration: OrchestrationState,
327    pub(super) rate_limiter: rate_limiter::ToolRateLimiter,
328}
329
330impl<C: Channel> Agent<C> {
331    #[must_use]
332    pub fn new(
333        provider: AnyProvider,
334        channel: C,
335        registry: SkillRegistry,
336        matcher: Option<SkillMatcherBackend>,
337        max_active_skills: usize,
338        tool_executor: impl ToolExecutor + 'static,
339    ) -> Self {
340        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
341        Self::new_with_registry_arc(
342            provider,
343            channel,
344            registry,
345            matcher,
346            max_active_skills,
347            tool_executor,
348        )
349    }
350
351    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
352    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
353    ///
354    /// # Panics
355    ///
356    /// Panics if the registry `RwLock` is poisoned.
357    #[must_use]
358    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
359    pub fn new_with_registry_arc(
360        provider: AnyProvider,
361        channel: C,
362        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
363        matcher: Option<SkillMatcherBackend>,
364        max_active_skills: usize,
365        tool_executor: impl ToolExecutor + 'static,
366    ) -> Self {
367        let all_skills: Vec<Skill> = {
368            let reg = registry.read().expect("registry read lock poisoned");
369            reg.all_meta()
370                .iter()
371                .filter_map(|m| reg.get_skill(&m.name).ok())
372                .collect()
373        };
374        let empty_trust = HashMap::new();
375        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
376        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
377        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
378        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
379        tracing::trace!(prompt = %system_prompt, "full system prompt");
380
381        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
382        let (_tx, rx) = watch::channel(false);
383        let token_counter = Arc::new(TokenCounter::new());
384        // Always create the receiver side of the experiment notification channel so the
385        // select! branch in the agent loop compiles unconditionally. The sender is only
386        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
387        #[cfg(feature = "experiments")]
388        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
389        #[cfg(not(feature = "experiments"))]
390        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
391        Self {
392            provider,
393            channel,
394            tool_executor: Arc::new(tool_executor),
395            messages: vec![Message {
396                role: Role::System,
397                content: system_prompt,
398                parts: vec![],
399                metadata: MessageMetadata::default(),
400            }],
401            memory_state: MemoryState {
402                memory: None,
403                conversation_id: None,
404                history_limit: 50,
405                recall_limit: 5,
406                summarization_threshold: 50,
407                cross_session_score_threshold: 0.35,
408                autosave_assistant: false,
409                autosave_min_length: 20,
410                tool_call_cutoff: 6,
411                unsummarized_count: 0,
412                document_config: crate::config::DocumentConfig::default(),
413                graph_config: crate::config::GraphConfig::default(),
414                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
415            },
416            skill_state: SkillState {
417                registry,
418                skill_paths: Vec::new(),
419                managed_dir: None,
420                trust_config: crate::config::TrustConfig::default(),
421                matcher,
422                max_active_skills,
423                disambiguation_threshold: 0.05,
424                embedding_model: String::new(),
425                skill_reload_rx: None,
426                active_skill_names: Vec::new(),
427                last_skills_prompt: skills_prompt,
428                prompt_mode: SkillPromptMode::Auto,
429                available_custom_secrets: HashMap::new(),
430                cosine_weight: 0.7,
431                hybrid_search: false,
432                bm25_index: None,
433            },
434            context_manager: context_manager::ContextManager::new(),
435            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
436            learning_engine: learning_engine::LearningEngine::new(),
437            feedback_detector: feedback_detector::FeedbackDetector::new(0.6),
438            judge_detector: None,
439            debug_state: DebugState {
440                debug_dumper: None,
441                dump_format: crate::debug_dump::DumpFormat::default(),
442                trace_collector: None,
443                iteration_counter: 0,
444                anomaly_detector: None,
445                logging_config: crate::config::LoggingConfig::default(),
446                dump_dir: None,
447                trace_service_name: String::new(),
448                trace_redact: true,
449                current_iteration_span_id: None,
450            },
451            runtime: RuntimeConfig {
452                security: SecurityConfig::default(),
453                timeouts: TimeoutConfig::default(),
454                model_name: String::new(),
455                permission_policy: zeph_tools::PermissionPolicy::default(),
456                redact_credentials: true,
457            },
458            mcp: McpState {
459                tools: Vec::new(),
460                registry: None,
461                manager: None,
462                allowed_commands: Vec::new(),
463                max_dynamic: 10,
464                shared_tools: None,
465                tool_rx: None,
466            },
467            index: IndexState {
468                retriever: None,
469                repo_map_tokens: 0,
470                cached_repo_map: None,
471                repo_map_ttl: std::time::Duration::from_secs(300),
472            },
473            message_queue: VecDeque::new(),
474            env_context: EnvironmentContext::gather(""),
475            response_cache: None,
476            parent_tool_use_id: None,
477            instruction_blocks: Vec::new(),
478            instruction_reload_rx: None,
479            instruction_reload_state: None,
480            security: SecurityState {
481                sanitizer: ContentSanitizer::new(
482                    &crate::sanitizer::ContentIsolationConfig::default(),
483                ),
484                quarantine_summarizer: None,
485                exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard::new(
486                    crate::sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
487                ),
488                flagged_urls: std::collections::HashSet::new(),
489                pii_filter: crate::sanitizer::pii::PiiFilter::new(
490                    crate::sanitizer::pii::PiiFilterConfig::default(),
491                ),
492                memory_validator: crate::sanitizer::memory_validation::MemoryWriteValidator::new(
493                    crate::sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
494                ),
495            },
496            pending_image_parts: Vec::new(),
497            #[cfg(feature = "experiments")]
498            experiment_config: crate::config::ExperimentConfig::default(),
499            #[cfg(feature = "experiments")]
500            experiment_baseline: crate::experiments::ConfigSnapshot::default(),
501            experiment_notify_rx: Some(exp_notify_rx),
502            #[cfg(feature = "experiments")]
503            experiment_notify_tx: exp_notify_tx,
504            #[cfg(feature = "lsp-context")]
505            lsp_hooks: None,
506            #[cfg(feature = "experiments")]
507            experiment_cancel: None,
508            // --- New sub-structs ---
509            lifecycle: LifecycleState {
510                shutdown: rx,
511                start_time: Instant::now(),
512                cancel_signal: Arc::new(Notify::new()),
513                cancel_token: CancellationToken::new(),
514                config_path: None,
515                config_reload_rx: None,
516                warmup_ready: None,
517                update_notify_rx: None,
518                custom_task_rx: None,
519            },
520            providers: ProviderState {
521                summary_provider: None,
522                provider_override: None,
523                judge_provider: None,
524                cached_prompt_tokens: initial_prompt_tokens,
525                server_compaction_active: false,
526                stt: None,
527            },
528            metrics: MetricsState {
529                metrics_tx: None,
530                cost_tracker: None,
531                token_counter,
532                extended_context: false,
533            },
534            orchestration: OrchestrationState {
535                pending_graph: None,
536                plan_cancel_token: None,
537                subagent_manager: None,
538                subagent_config: crate::config::SubAgentConfig::default(),
539                orchestration_config: crate::config::OrchestrationConfig::default(),
540            },
541            rate_limiter: rate_limiter::ToolRateLimiter::new(
542                crate::agent::rate_limiter::RateLimitConfig::default(),
543            ),
544        }
545    }
546
547    /// Poll all active sub-agents for completed/failed/canceled results.
548    ///
549    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
550    /// for agents that have finished. Each completed agent is removed from the manager.
551    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
552        let Some(mgr) = &mut self.orchestration.subagent_manager else {
553            return vec![];
554        };
555
556        let finished: Vec<String> = mgr
557            .statuses()
558            .into_iter()
559            .filter_map(|(id, status)| {
560                if matches!(
561                    status.state,
562                    crate::subagent::SubAgentState::Completed
563                        | crate::subagent::SubAgentState::Failed
564                        | crate::subagent::SubAgentState::Canceled
565                ) {
566                    Some(id)
567                } else {
568                    None
569                }
570            })
571            .collect();
572
573        let mut results = vec![];
574        for task_id in finished {
575            match mgr.collect(&task_id).await {
576                Ok(result) => results.push((task_id, result)),
577                Err(e) => {
578                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
579                }
580            }
581        }
582        results
583    }
584
585    async fn handle_plan_command(
586        &mut self,
587        cmd: crate::orchestration::PlanCommand,
588    ) -> Result<(), error::AgentError> {
589        use crate::orchestration::PlanCommand;
590
591        if !self.config_for_orchestration().enabled {
592            self.channel
593                .send(
594                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
595                )
596                .await?;
597            return Ok(());
598        }
599
600        match cmd {
601            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
602            PlanCommand::Confirm => self.handle_plan_confirm().await,
603            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
604            PlanCommand::List => self.handle_plan_list().await,
605            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
606            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
607            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
608        }
609    }
610
611    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
612        &self.orchestration.orchestration_config
613    }
614
615    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
616        use crate::orchestration::{LlmPlanner, Planner};
617
618        if self.orchestration.pending_graph.is_some() {
619            self.channel
620                .send(
621                    "A plan is already pending confirmation. \
622                     Use /plan confirm to execute it or /plan cancel to discard.",
623                )
624                .await?;
625            return Ok(());
626        }
627
628        self.channel.send("Planning task decomposition...").await?;
629
630        let available_agents = self
631            .orchestration
632            .subagent_manager
633            .as_ref()
634            .map(|m| m.definitions().to_vec())
635            .unwrap_or_default();
636
637        let confirm_before_execute = self
638            .orchestration
639            .orchestration_config
640            .confirm_before_execute;
641        let graph = LlmPlanner::new(
642            self.provider.clone(),
643            &self.orchestration.orchestration_config,
644        )
645        .plan(goal, &available_agents)
646        .await
647        .map_err(|e| error::AgentError::Other(e.to_string()))?;
648
649        let task_count = graph.tasks.len() as u64;
650        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
651        self.update_metrics(|m| {
652            m.orchestration.plans_total += 1;
653            m.orchestration.tasks_total += task_count;
654            m.orchestration_graph = Some(snapshot);
655        });
656
657        if confirm_before_execute {
658            let summary = format_plan_summary(&graph);
659            self.channel.send(&summary).await?;
660            self.channel
661                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
662                .await?;
663            self.orchestration.pending_graph = Some(graph);
664        } else {
665            // confirm_before_execute = false: display and proceed (Phase 5 will run scheduler).
666            // TODO(#1241): wire DagScheduler tick updates for Running task state
667            let summary = format_plan_summary(&graph);
668            self.channel.send(&summary).await?;
669            self.channel
670                .send("Plan ready. Full execution will be available in a future phase.")
671                .await?;
672            // IC1: graph was shown but never confirmed; clear snapshot so it doesn't linger.
673            let now = std::time::Instant::now();
674            self.update_metrics(|m| {
675                if let Some(ref mut s) = m.orchestration_graph {
676                    "completed".clone_into(&mut s.status);
677                    s.completed_at = Some(now);
678                }
679            });
680        }
681
682        Ok(())
683    }
684
685    /// Validate that the pending plan graph can be executed.
686    ///
687    /// Sends an appropriate error message and restores the graph to `pending_graph` when
688    /// validation fails. Returns `Ok(graph)` on success, `Err(())` when validation failed
689    /// and the caller should return early.
690    async fn validate_pending_graph(
691        &mut self,
692        graph: crate::orchestration::TaskGraph,
693    ) -> Result<crate::orchestration::TaskGraph, ()> {
694        use crate::orchestration::GraphStatus;
695
696        if self.orchestration.subagent_manager.is_none() {
697            let _ = self
698                .channel
699                .send(
700                    "No sub-agents configured. Add sub-agent definitions to config \
701                     to enable plan execution.",
702                )
703                .await;
704            self.orchestration.pending_graph = Some(graph);
705            return Err(());
706        }
707
708        // REV-2: pre-validate before moving graph into the constructor so we can
709        // restore it to pending_graph on failure.
710        if graph.tasks.is_empty() {
711            let _ = self.channel.send("Plan has no tasks.").await;
712            self.orchestration.pending_graph = Some(graph);
713            return Err(());
714        }
715
716        // resume_from() rejects Completed and Canceled — guard those here too.
717        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
718            let _ = self
719                .channel
720                .send(&format!(
721                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
722                    graph.status
723                ))
724                .await;
725            self.orchestration.pending_graph = Some(graph);
726            return Err(());
727        }
728
729        Ok(graph)
730    }
731
732    /// Build a [`DagScheduler`] from the graph, reserving sub-agent slots.
733    ///
734    /// Returns `(scheduler, reserved)` on success or an `AgentError` on failure.
735    /// Callers must call `mgr.release_reservation(reserved)` when done.
736    fn build_dag_scheduler(
737        &mut self,
738        graph: crate::orchestration::TaskGraph,
739    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
740        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};
741
742        let available_agents = self
743            .orchestration
744            .subagent_manager
745            .as_ref()
746            .map(|m| m.definitions().to_vec())
747            .unwrap_or_default();
748
749        // Warn when max_concurrent is too low to support the configured parallelism.
750        // This is the main cause of DagScheduler deadlocks (#1619): a planning-phase
751        // sub-agent occupies the only slot while orchestration tasks are waiting.
752        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
753        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
754        if max_concurrent < max_parallel + 1 {
755            tracing::warn!(
756                max_concurrent,
757                max_parallel,
758                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
759                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
760                max_parallel + 1
761            );
762        }
763
764        // Reserve slots equal to max_parallel so the scheduler is guaranteed capacity
765        // even if a planning-phase sub-agent is occupying a slot (#1619).
766        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
767        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
768            mgr.reserve_slots(reserved);
769        }
770
771        // Use resume_from() for graphs that are no longer in Created status
772        // (e.g., after /plan retry which calls reset_for_retry and sets status=Running).
773        let scheduler = if graph.status == GraphStatus::Created {
774            DagScheduler::new(
775                graph,
776                &self.orchestration.orchestration_config,
777                Box::new(RuleBasedRouter),
778                available_agents,
779            )
780        } else {
781            DagScheduler::resume_from(
782                graph,
783                &self.orchestration.orchestration_config,
784                Box::new(RuleBasedRouter),
785                available_agents,
786            )
787        }
788        .map_err(|e| {
789            // Release reservation before propagating error.
790            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
791                mgr.release_reservation(reserved);
792            }
793            error::AgentError::Other(e.to_string())
794        })?;
795
796        Ok((scheduler, reserved))
797    }
798
799    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
800        let Some(graph) = self.orchestration.pending_graph.take() else {
801            self.channel
802                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
803                .await?;
804            return Ok(());
805        };
806
807        // validate_pending_graph sends the error message and restores the graph on failure.
808        let Ok(graph) = self.validate_pending_graph(graph).await else {
809            return Ok(());
810        };
811
812        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;
813
814        let task_count = scheduler.graph().tasks.len();
815        self.channel
816            .send(&format!(
817                "Confirmed. Executing plan ({task_count} tasks)..."
818            ))
819            .await?;
820
821        let plan_token = CancellationToken::new();
822        self.orchestration.plan_cancel_token = Some(plan_token.clone());
823
824        // Use match instead of ? so plan_cancel_token is always cleared (CRIT-07).
825        let scheduler_result = self
826            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
827            .await;
828        self.orchestration.plan_cancel_token = None;
829
830        // Always release the reservation, regardless of scheduler outcome.
831        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
832            mgr.release_reservation(reserved);
833        }
834
835        let final_status = scheduler_result?;
836        let completed_graph = scheduler.into_graph();
837
838        // Final TUI snapshot update.
839        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
840        self.update_metrics(|m| {
841            m.orchestration_graph = Some(snapshot);
842        });
843
844        let result_label = self
845            .finalize_plan_execution(completed_graph, final_status)
846            .await?;
847
848        let now = std::time::Instant::now();
849        self.update_metrics(|m| {
850            if let Some(ref mut s) = m.orchestration_graph {
851                result_label.clone_into(&mut s.status);
852                s.completed_at = Some(now);
853            }
854        });
855        Ok(())
856    }
857
858    /// Cancel all agents referenced in `cancel_actions`.
859    ///
860    /// Returns `Some(status)` if a `Done` action is encountered, `None` otherwise.
861    fn cancel_agents_from_actions(
862        &mut self,
863        cancel_actions: Vec<crate::orchestration::SchedulerAction>,
864    ) -> Option<crate::orchestration::GraphStatus> {
865        use crate::orchestration::SchedulerAction;
866        for action in cancel_actions {
867            match action {
868                SchedulerAction::Cancel { agent_handle_id } => {
869                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
870                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
871                            tracing::trace!(error = %e, "cancel: agent already gone");
872                        });
873                    }
874                }
875                SchedulerAction::Done { status } => return Some(status),
876                SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
877            }
878        }
879        None
880    }
881
882    /// Handle a `SchedulerAction::Spawn` — attempt to spawn a sub-agent for the given task.
883    ///
884    /// Returns `(spawn_success, concurrency_fail, done_status)`.
885    /// `done_status` is `Some` when spawn failure forces the scheduler to emit a `Done` action.
886    async fn handle_scheduler_spawn_action(
887        &mut self,
888        scheduler: &mut crate::orchestration::DagScheduler,
889        task_id: crate::orchestration::TaskId,
890        agent_def_name: String,
891        prompt: String,
892        spawn_counter: &mut usize,
893        task_count: usize,
894    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
895        let task_title = scheduler
896            .graph()
897            .tasks
898            .get(task_id.index())
899            .map_or("unknown", |t| t.title.as_str());
900
901        let provider = self.provider.clone();
902        let tool_executor = Arc::clone(&self.tool_executor);
903        let skills = self.filtered_skills_for(&agent_def_name);
904        let cfg = self.orchestration.subagent_config.clone();
905        let event_tx = scheduler.event_sender();
906
907        let mgr = self
908            .orchestration
909            .subagent_manager
910            .as_mut()
911            .expect("subagent_manager checked above");
912
913        match mgr.spawn_for_task(
914            &agent_def_name,
915            &prompt,
916            provider,
917            tool_executor,
918            skills,
919            &cfg,
920            task_id,
921            event_tx,
922        ) {
923            Ok(handle_id) => {
924                *spawn_counter += 1;
925                let _ = self
926                    .channel
927                    .send_status(&format!(
928                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
929                    ))
930                    .await;
931                scheduler.record_spawn(task_id, handle_id, agent_def_name);
932                (true, false, None)
933            }
934            Err(e) => {
935                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
936                let concurrency_fail =
937                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
938                let extra = scheduler.record_spawn_failure(task_id, &e);
939                let done_status = self.cancel_agents_from_actions(extra);
940                (false, concurrency_fail, done_status)
941            }
942        }
943    }
944
945    /// Execute a `RunInline` scheduler action: run the task synchronously in the current agent.
946    ///
947    /// Sends a status update, registers the spawn with the scheduler, runs the inline tool
948    /// loop (or cancels on token fire), and posts the completion event back to the scheduler.
949    async fn handle_run_inline_action(
950        &mut self,
951        scheduler: &mut crate::orchestration::DagScheduler,
952        task_id: crate::orchestration::TaskId,
953        prompt: String,
954        spawn_counter: usize,
955        task_count: usize,
956        cancel_token: &CancellationToken,
957    ) {
958        let task_title = scheduler
959            .graph()
960            .tasks
961            .get(task_id.index())
962            .map_or("unknown", |t| t.title.as_str());
963        let _ = self
964            .channel
965            .send_status(&format!(
966                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
967            ))
968            .await;
969
970        // record_spawn before the inline call so the completion event is always
971        // buffered before any timeout check fires in the next tick().
972        let handle_id = format!("__inline_{task_id}__");
973        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());
974
975        let event_tx = scheduler.event_sender();
976        let max_iter = self.tool_orchestrator.max_iterations;
977        let outcome = tokio::select! {
978            result = self.run_inline_tool_loop(&prompt, max_iter) => {
979                match result {
980                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
981                        output,
982                        artifacts: vec![],
983                    },
984                    Err(e) => crate::orchestration::TaskOutcome::Failed {
985                        error: e.to_string(),
986                    },
987                }
988            }
989            () = cancel_token.cancelled() => {
990                // TODO: use TaskOutcome::Canceled when the variant is added (#1603)
991                crate::orchestration::TaskOutcome::Failed {
992                    error: "canceled".to_string(),
993                }
994            }
995        };
996        let event = crate::orchestration::TaskEvent {
997            task_id,
998            agent_handle_id: handle_id,
999            outcome,
1000        };
1001        if let Err(e) = event_tx.send(event).await {
1002            tracing::warn!(%task_id, error = %e, "inline task event send failed");
1003        }
1004    }
1005
1006    // too_many_lines: sequential scheduler event loop with 4 tokio::select! branches
1007    // (cancel token, scheduler tick, channel recv with /plan cancel + channel-close paths,
1008    // shutdown signal) — each branch requires distinct cancel/fail/ignore semantics that
1009    // cannot be split without introducing shared mutable state across async boundaries.
1010    #[allow(clippy::too_many_lines)]
1011    /// Drive the [`DagScheduler`] tick loop until it emits `SchedulerAction::Done`.
1012    ///
1013    /// Each iteration yields at `wait_event()`, during which `channel.recv()` is polled
1014    /// concurrently via `tokio::select!`. If the user sends `/plan cancel`, all running
1015    /// sub-agent tasks are aborted and the loop exits with [`GraphStatus::Canceled`].
1016    /// If the channel is closed (`Ok(None)`), all running sub-agent tasks are aborted
1017    /// and the loop exits with [`GraphStatus::Failed`].
1018    /// Other messages received during execution are queued in `message_queue` and
1019    /// processed after the plan completes.
1020    ///
1021    /// # Known limitations
1022    ///
1023    /// `RunInline` tasks block the tick loop for their entire duration — `/plan cancel`
1024    /// cannot interrupt an in-progress inline LLM call and will only be delivered on the
1025    /// next iteration after the call completes.
1026    async fn run_scheduler_loop(
1027        &mut self,
1028        scheduler: &mut crate::orchestration::DagScheduler,
1029        task_count: usize,
1030        cancel_token: CancellationToken,
1031    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
1032        use crate::orchestration::SchedulerAction;
1033
1034        // Sequential spawn counter for human-readable "task N/M" progress messages.
1035        // task_id.index() reflects array position and can be non-contiguous for
1036        // parallel plans (e.g. 0, 2, 4), so we use a local counter instead.
1037        let mut spawn_counter: usize = 0;
1038
1039        // Tracks (handle_id, secret_key) pairs denied this plan execution to prevent
1040        // re-prompting the user when a sub-agent re-requests the same secret after denial.
1041        let mut denied_secrets: std::collections::HashSet<(String, String)> =
1042            std::collections::HashSet::new();
1043
1044        let final_status = 'tick: loop {
1045            let actions = scheduler.tick();
1046
1047            // Track batch-level spawn outcomes for record_batch_backoff() below.
1048            let mut any_spawn_success = false;
1049            let mut any_concurrency_failure = false;
1050
1051            for action in actions {
1052                match action {
1053                    SchedulerAction::Spawn {
1054                        task_id,
1055                        agent_def_name,
1056                        prompt,
1057                    } => {
1058                        let (success, fail, done) = self
1059                            .handle_scheduler_spawn_action(
1060                                scheduler,
1061                                task_id,
1062                                agent_def_name,
1063                                prompt,
1064                                &mut spawn_counter,
1065                                task_count,
1066                            )
1067                            .await;
1068                        any_spawn_success |= success;
1069                        any_concurrency_failure |= fail;
1070                        if let Some(s) = done {
1071                            break 'tick s;
1072                        }
1073                    }
1074                    SchedulerAction::Cancel { agent_handle_id } => {
1075                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1076                            // benign race: agent may have already finished
1077                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1078                                tracing::trace!(error = %e, "cancel: agent already gone");
1079                            });
1080                        }
1081                    }
1082                    // Inline execution: the LLM call blocks this tick loop for its
1083                    // duration. This is intentionally sequential and only expected in
1084                    // single-agent setups where no sub-agents are configured.
1085                    // Known limitation: if a RunInline action appears before Spawn actions
1086                    // in the same batch (mixed routing), those Spawn actions are delayed
1087                    // until the inline call completes. Refactor to tokio::spawn if mixed
1088                    // batches become common.
1089                    // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop so
1090                    // that /plan cancel can interrupt a long-running inline LLM call instead
1091                    // of waiting for the current iteration to complete.
1092                    SchedulerAction::RunInline { task_id, prompt } => {
1093                        spawn_counter += 1;
1094                        self.handle_run_inline_action(
1095                            scheduler,
1096                            task_id,
1097                            prompt,
1098                            spawn_counter,
1099                            task_count,
1100                            &cancel_token,
1101                        )
1102                        .await;
1103                    }
1104                    SchedulerAction::Done { status } => {
1105                        break 'tick status;
1106                    }
1107                }
1108            }
1109
1110            // Update batch-level backoff counter after processing all Spawn actions.
1111            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);
1112
1113            // Drain all pending secret requests this tick (MED-2 fix).
1114            self.process_pending_secret_requests(&mut denied_secrets)
1115                .await;
1116
1117            // Update TUI with current graph state.
1118            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
1119            self.update_metrics(|m| {
1120                m.orchestration_graph = Some(snapshot);
1121            });
1122
1123            // NOTE(Telegram): Telegram's recv() is not fully cancel-safe — a message
1124            // consumed from the internal mpsc but not yet returned can be lost if the
1125            // select! cancels the future during the /start send().await path. For
1126            // non-command messages the race window is negligible. Acceptable for MVP.
1127            //
1128            // NOTE(RunInline): tasks in the RunInline arm above block this tick loop
1129            // synchronously (no await between loop iteration start and wait_event).
1130            // /plan cancel cannot interrupt an inline LLM call mid-execution; it is
1131            // delivered on the next tick after the inline call completes.
1132            // TODO(post-MVP): wire CancellationToken into run_inline_tool_loop.
1133            tokio::select! {
1134                // biased: token cancellation takes priority over new events and input.
1135                biased;
1136                () = cancel_token.cancelled() => {
1137                    let cancel_actions = scheduler.cancel_all();
1138                    for action in cancel_actions {
1139                        match action {
1140                            SchedulerAction::Cancel { agent_handle_id } => {
1141                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1142                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1143                                        tracing::trace!(
1144                                            error = %e,
1145                                            "cancel during plan cancellation: agent already gone"
1146                                        );
1147                                    });
1148                                }
1149                            }
1150                            SchedulerAction::Done { status } => {
1151                                break 'tick status;
1152                            }
1153                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1154                        }
1155                    }
1156                    // Defensive fallback: cancel_all always emits Done, but guard against
1157                    // future changes.
1158                    break 'tick crate::orchestration::GraphStatus::Canceled;
1159                }
1160                () = scheduler.wait_event() => {}
1161                result = self.channel.recv() => {
1162                    if let Ok(Some(msg)) = result {
1163                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
1164                            let _ = self.channel.send_status("Canceling plan...").await;
1165                            let cancel_actions = scheduler.cancel_all();
1166                            for ca in cancel_actions {
1167                                match ca {
1168                                    SchedulerAction::Cancel { agent_handle_id } => {
1169                                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1170                                            // benign race: agent may have already finished
1171                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1172                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
1173                                            });
1174                                        }
1175                                    }
1176                                    SchedulerAction::Done { status } => {
1177                                        break 'tick status;
1178                                    }
1179                                    SchedulerAction::Spawn { .. }
1180                                    | SchedulerAction::RunInline { .. } => {}
1181                                }
1182                            }
1183                            // Defensive fallback: cancel_all always emits Done, but guard
1184                            // against future changes.
1185                            break 'tick crate::orchestration::GraphStatus::Canceled;
1186                        }
1187                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
1188                    } else {
1189                        // Channel closed — cancel running sub-agents and exit with Failed.
1190                        // This is an error condition (not a user-initiated cancel), so
1191                        // Done actions from cancel_all() are intentionally ignored.
1192                        let cancel_actions = scheduler.cancel_all();
1193                        let n = cancel_actions
1194                            .iter()
1195                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1196                            .count();
1197                        tracing::warn!(sub_agents = n, "scheduler channel closed, canceling running sub-agents");
1198                        for action in cancel_actions {
1199                            match action {
1200                                SchedulerAction::Cancel { agent_handle_id } => {
1201                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1202                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1203                                            tracing::trace!(
1204                                                error = %e,
1205                                                "cancel on channel close: agent already gone"
1206                                            );
1207                                        });
1208                                    }
1209                                }
1210                                // Intentionally ignore Done here — channel close is not a user cancel.
1211                                SchedulerAction::Done { .. }
1212                                | SchedulerAction::Spawn { .. }
1213                                | SchedulerAction::RunInline { .. } => {}
1214                            }
1215                        }
1216                        break 'tick crate::orchestration::GraphStatus::Failed;
1217                    }
1218                }
1219                // Shutdown signal received — cancel running sub-agents and exit cleanly.
1220                () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1221                    let cancel_actions = scheduler.cancel_all();
1222                    let n = cancel_actions
1223                        .iter()
1224                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1225                        .count();
1226                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
1227                    for action in cancel_actions {
1228                        match action {
1229                            SchedulerAction::Cancel { agent_handle_id } => {
1230                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1231                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1232                                        tracing::trace!(
1233                                            error = %e,
1234                                            "cancel on shutdown: agent already gone"
1235                                        );
1236                                    });
1237                                }
1238                            }
1239                            SchedulerAction::Done { status } => {
1240                                break 'tick status;
1241                            }
1242                            SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1243                        }
1244                    }
1245                    // Defensive fallback: cancel_all always emits Done, but guard against
1246                    // future changes.
1247                    break 'tick crate::orchestration::GraphStatus::Canceled;
1248                }
1249            }
1250        };
1251
1252        // Final drain: if the loop exited via Done on the first tick, secret
1253        // requests buffered before completion would otherwise be silently dropped.
1254        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
1255            .await;
1256
1257        Ok(final_status)
1258    }
1259
1260    /// Run a tool-aware LLM loop for an inline scheduled task.
1261    ///
1262    /// Unlike [`process_response_native_tools`], this is intentionally stripped of all
1263    /// interactive-session machinery (channel sends, doom-loop detection, summarization,
1264    /// learning engine, sanitizer, metrics). Inline tasks are short-lived orchestration
1265    /// sub-tasks that run synchronously inside the scheduler tick loop.
1266    async fn run_inline_tool_loop(
1267        &self,
1268        prompt: &str,
1269        max_iterations: usize,
1270    ) -> Result<String, zeph_llm::LlmError> {
1271        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
1272        use zeph_tools::executor::ToolCall;
1273
1274        let tool_defs: Vec<ToolDefinition> = self
1275            .tool_executor
1276            .tool_definitions_erased()
1277            .iter()
1278            .map(tool_execution::tool_def_to_definition)
1279            .collect();
1280
1281        tracing::debug!(
1282            prompt_len = prompt.len(),
1283            max_iterations,
1284            tool_count = tool_defs.len(),
1285            "inline tool loop: starting"
1286        );
1287
1288        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
1289        let mut last_text = String::new();
1290
1291        for iteration in 0..max_iterations {
1292            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;
1293
1294            match response {
1295                ChatResponse::Text(text) => {
1296                    tracing::debug!(iteration, "inline tool loop: text response, returning");
1297                    return Ok(text);
1298                }
1299                ChatResponse::ToolUse {
1300                    text, tool_calls, ..
1301                } => {
1302                    tracing::debug!(
1303                        iteration,
1304                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
1305                        "inline tool loop: tool use"
1306                    );
1307
1308                    if let Some(ref t) = text {
1309                        last_text.clone_from(t);
1310                    }
1311
1312                    // Build assistant message with optional leading text + tool use parts.
1313                    let mut parts: Vec<MessagePart> = Vec::new();
1314                    if let Some(ref t) = text
1315                        && !t.is_empty()
1316                    {
1317                        parts.push(MessagePart::Text { text: t.clone() });
1318                    }
1319                    for tc in &tool_calls {
1320                        parts.push(MessagePart::ToolUse {
1321                            id: tc.id.clone(),
1322                            name: tc.name.clone(),
1323                            input: tc.input.clone(),
1324                        });
1325                    }
1326                    messages.push(Message::from_parts(Role::Assistant, parts));
1327
1328                    // Execute each tool call and collect results.
1329                    let mut result_parts: Vec<MessagePart> = Vec::new();
1330                    for tc in &tool_calls {
1331                        let call = ToolCall {
1332                            tool_id: tc.name.clone(),
1333                            params: match &tc.input {
1334                                serde_json::Value::Object(map) => map.clone(),
1335                                _ => serde_json::Map::new(),
1336                            },
1337                        };
1338                        let output = match self.tool_executor.execute_tool_call_erased(&call).await
1339                        {
1340                            Ok(Some(out)) => out.summary,
1341                            Ok(None) => "(no output)".to_owned(),
1342                            Err(e) => format!("[error] {e}"),
1343                        };
1344                        let is_error = output.starts_with("[error]");
1345                        result_parts.push(MessagePart::ToolResult {
1346                            tool_use_id: tc.id.clone(),
1347                            content: output,
1348                            is_error,
1349                        });
1350                    }
1351                    messages.push(Message::from_parts(Role::User, result_parts));
1352                }
1353            }
1354        }
1355
1356        tracing::debug!(
1357            max_iterations,
1358            last_text_empty = last_text.is_empty(),
1359            "inline tool loop: iteration limit reached"
1360        );
1361        Ok(last_text)
1362    }
1363
1364    /// Bridge pending secret requests from sub-agents to the user (non-blocking, time-bounded).
1365    ///
1366    /// SEC-P1-02: explicit user confirmation is required before granting any secret to a
1367    /// sub-agent. Denial is the default on timeout or channel error.
1368    ///
1369    /// `denied` tracks `(handle_id, secret_key)` pairs already denied this plan execution.
1370    /// Re-requests for a denied pair are auto-denied without prompting the user.
1371    async fn process_pending_secret_requests(
1372        &mut self,
1373        denied: &mut std::collections::HashSet<(String, String)>,
1374    ) {
1375        loop {
1376            let pending = self
1377                .orchestration
1378                .subagent_manager
1379                .as_mut()
1380                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1381            let Some((req_handle_id, req)) = pending else {
1382                break;
1383            };
1384            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1385            if denied.contains(&deny_key) {
1386                tracing::debug!(
1387                    handle_id = %req_handle_id,
1388                    secret_key = %req.secret_key,
1389                    "skipping duplicate secret prompt for already-denied key"
1390                );
1391                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1392                    let _ = mgr.deny_secret(&req_handle_id);
1393                }
1394                continue;
1395            }
1396            let prompt = format!(
1397                "Sub-agent requests secret '{}'. Allow?{}",
1398                crate::text::truncate_to_chars(&req.secret_key, 100),
1399                req.reason
1400                    .as_deref()
1401                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1402                    .unwrap_or_default()
1403            );
1404            // CRIT-1 fix: use select! to avoid blocking the tick loop forever.
1405            let approved = tokio::select! {
1406                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1407                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1408                    let _ = self.channel.send("Secret request timed out.").await;
1409                    false
1410                }
1411            };
1412            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1413                if approved {
1414                    let ttl = std::time::Duration::from_secs(300);
1415                    let key = req.secret_key.clone();
1416                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1417                        let _ = mgr.deliver_secret(&req_handle_id, key);
1418                    }
1419                } else {
1420                    denied.insert(deny_key);
1421                    let _ = mgr.deny_secret(&req_handle_id);
1422                }
1423            }
1424        }
1425    }
1426
1427    /// Aggregate results or report failure after the tick loop completes.
1428    async fn finalize_plan_execution(
1429        &mut self,
1430        completed_graph: crate::orchestration::TaskGraph,
1431        final_status: crate::orchestration::GraphStatus,
1432    ) -> Result<&'static str, error::AgentError> {
1433        use std::fmt::Write;
1434
1435        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};
1436
1437        let result_label = match final_status {
1438            GraphStatus::Completed => {
1439                // Update task completion counters.
1440                let completed_count = completed_graph
1441                    .tasks
1442                    .iter()
1443                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1444                    .count() as u64;
1445                self.update_metrics(|m| m.orchestration.tasks_completed += completed_count);
1446
1447                let aggregator = LlmAggregator::new(
1448                    self.provider.clone(),
1449                    &self.orchestration.orchestration_config,
1450                );
1451                match aggregator.aggregate(&completed_graph).await {
1452                    Ok(synthesis) => {
1453                        self.channel.send(&synthesis).await?;
1454                    }
1455                    Err(e) => {
1456                        tracing::error!(error = %e, "aggregation failed");
1457                        self.channel
1458                            .send(
1459                                "Plan completed but aggregation failed. \
1460                                 Check individual task results.",
1461                            )
1462                            .await?;
1463                    }
1464                }
1465                "completed"
1466            }
1467            GraphStatus::Failed => {
1468                let failed_tasks: Vec<_> = completed_graph
1469                    .tasks
1470                    .iter()
1471                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1472                    .collect();
1473                self.update_metrics(|m| {
1474                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
1475                });
1476                let mut msg = format!(
1477                    "Plan failed. {}/{} tasks failed:\n",
1478                    failed_tasks.len(),
1479                    completed_graph.tasks.len()
1480                );
1481                for t in &failed_tasks {
1482                    // SEC-M34-002: truncate raw task output before displaying to user.
1483                    let err: std::borrow::Cow<str> =
1484                        t.result.as_ref().map_or("unknown error".into(), |r| {
1485                            if r.output.len() > 500 {
1486                                r.output.chars().take(500).collect::<String>().into()
1487                            } else {
1488                                r.output.as_str().into()
1489                            }
1490                        });
1491                    let _ = writeln!(msg, "  - {}: {err}", t.title);
1492                }
1493                msg.push_str("\nUse `/plan retry` to retry failed tasks.");
1494                self.channel.send(&msg).await?;
1495                // Store graph back so /plan retry and /plan resume work.
1496                self.orchestration.pending_graph = Some(completed_graph);
1497                "failed"
1498            }
1499            GraphStatus::Paused => {
1500                self.channel
1501                    .send(
1502                        "Plan paused due to a task failure (ask strategy). \
1503                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
1504                    )
1505                    .await?;
1506                self.orchestration.pending_graph = Some(completed_graph);
1507                "paused"
1508            }
1509            GraphStatus::Canceled => {
1510                let done_count = completed_graph
1511                    .tasks
1512                    .iter()
1513                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
1514                    .count();
1515                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
1516                let total = completed_graph.tasks.len();
1517                self.channel
1518                    .send(&format!(
1519                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
1520                    ))
1521                    .await?;
1522                // Do NOT store graph back into pending_graph — canceled plans are not
1523                // retryable via /plan retry.
1524                "canceled"
1525            }
1526            other => {
1527                tracing::warn!(%other, "unexpected graph status after Done");
1528                self.channel
1529                    .send(&format!("Plan ended with status: {other}"))
1530                    .await?;
1531                "unknown"
1532            }
1533        };
1534        Ok(result_label)
1535    }
1536
1537    async fn handle_plan_status(
1538        &mut self,
1539        _graph_id: Option<&str>,
1540    ) -> Result<(), error::AgentError> {
1541        use crate::orchestration::GraphStatus;
1542        let Some(ref graph) = self.orchestration.pending_graph else {
1543            self.channel.send("No active plan.").await?;
1544            return Ok(());
1545        };
1546        let msg = match graph.status {
1547            GraphStatus::Created => {
1548                "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1549            }
1550            GraphStatus::Running => "Plan is currently running.",
1551            GraphStatus::Paused => {
1552                "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1553            }
1554            GraphStatus::Failed => {
1555                "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1556            }
1557            GraphStatus::Completed => "Plan completed successfully.",
1558            GraphStatus::Canceled => "Plan was canceled.",
1559        };
1560        self.channel.send(msg).await?;
1561        Ok(())
1562    }
1563
1564    async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1565        if let Some(ref graph) = self.orchestration.pending_graph {
1566            let summary = format_plan_summary(graph);
1567            let status_label = match graph.status {
1568                crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1569                crate::orchestration::GraphStatus::Running => "running",
1570                crate::orchestration::GraphStatus::Paused => "paused",
1571                crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1572                _ => "unknown",
1573            };
1574            self.channel
1575                .send(&format!("{summary}\nStatus: {status_label}"))
1576                .await?;
1577        } else {
1578            self.channel.send("No recent plans.").await?;
1579        }
1580        Ok(())
1581    }
1582
1583    async fn handle_plan_cancel(
1584        &mut self,
1585        _graph_id: Option<&str>,
1586    ) -> Result<(), error::AgentError> {
1587        if let Some(token) = self.orchestration.plan_cancel_token.take() {
1588            // In-flight plan: signal cancellation. The scheduler loop will pick this up
1589            // in the next tokio::select! iteration at wait_event().
1590            // NOTE: Due to &mut self being held by run_scheduler_loop, this branch is only
1591            // reachable if the channel has a concurrent reader (e.g. Telegram, TUI events).
1592            // CLI and synchronous channels cannot deliver this while the loop is active
1593            // (see #1603, SEC-M34-002).
1594            token.cancel();
1595            self.channel.send("Canceling plan execution...").await?;
1596        } else if self.orchestration.pending_graph.take().is_some() {
1597            let now = std::time::Instant::now();
1598            self.update_metrics(|m| {
1599                if let Some(ref mut s) = m.orchestration_graph {
1600                    "canceled".clone_into(&mut s.status);
1601                    s.completed_at = Some(now);
1602                }
1603            });
1604            self.channel.send("Plan canceled.").await?;
1605        } else {
1606            self.channel.send("No active plan to cancel.").await?;
1607        }
1608        Ok(())
1609    }
1610
1611    /// Resume a paused graph (Ask failure strategy triggered a pause).
1612    ///
1613    /// Looks for a pending graph in `Paused` status. If `graph_id` is provided
1614    /// it must match the active graph's id (SEC-P5-03).
1615    async fn handle_plan_resume(
1616        &mut self,
1617        graph_id: Option<&str>,
1618    ) -> Result<(), error::AgentError> {
1619        use crate::orchestration::GraphStatus;
1620
1621        let Some(ref graph) = self.orchestration.pending_graph else {
1622            self.channel
1623                .send("No paused plan to resume. Use `/plan status` to check the current state.")
1624                .await?;
1625            return Ok(());
1626        };
1627
1628        // SEC-P5-03: if a graph_id was provided, reject if it doesn't match.
1629        if let Some(id) = graph_id
1630            && graph.id.to_string() != id
1631        {
1632            self.channel
1633                .send(&format!(
1634                    "Graph id '{id}' does not match the active plan ({}). \
1635                     Use `/plan status` to see the active plan id.",
1636                    graph.id
1637                ))
1638                .await?;
1639            return Ok(());
1640        }
1641
1642        if graph.status != GraphStatus::Paused {
1643            self.channel
1644                .send(&format!(
1645                    "The active plan is in '{}' status and cannot be resumed. \
1646                     Only Paused plans can be resumed.",
1647                    graph.status
1648                ))
1649                .await?;
1650            return Ok(());
1651        }
1652
1653        let graph = self.orchestration.pending_graph.take().unwrap();
1654
1655        tracing::info!(
1656            graph_id = %graph.id,
1657            "resuming paused graph"
1658        );
1659
1660        self.channel
1661            .send(&format!(
1662                "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1663                graph.goal
1664            ))
1665            .await?;
1666
1667        // Store resumed graph back as pending. resume_from() will set status=Running in confirm.
1668        self.orchestration.pending_graph = Some(graph);
1669        Ok(())
1670    }
1671
1672    /// Retry failed tasks in a graph.
1673    ///
1674    /// Resets all `Failed` tasks to `Ready` and all `Skipped` dependents back
1675    /// to `Pending`, then re-stores the graph as pending for re-execution.
1676    /// If `graph_id` is provided it must match the active graph's id (SEC-P5-04).
1677    async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1678        use crate::orchestration::{GraphStatus, dag};
1679
1680        let Some(ref graph) = self.orchestration.pending_graph else {
1681            self.channel
1682                .send("No active plan to retry. Use `/plan status` to check the current state.")
1683                .await?;
1684            return Ok(());
1685        };
1686
1687        // SEC-P5-04: if a graph_id was provided, reject if it doesn't match.
1688        if let Some(id) = graph_id
1689            && graph.id.to_string() != id
1690        {
1691            self.channel
1692                .send(&format!(
1693                    "Graph id '{id}' does not match the active plan ({}). \
1694                     Use `/plan status` to see the active plan id.",
1695                    graph.id
1696                ))
1697                .await?;
1698            return Ok(());
1699        }
1700
1701        if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1702            self.channel
1703                .send(&format!(
1704                    "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1705                    graph.status
1706                ))
1707                .await?;
1708            return Ok(());
1709        }
1710
1711        let mut graph = self.orchestration.pending_graph.take().unwrap();
1712
1713        // IC3: count before reset so the message reflects actual failed tasks, not Ready count.
1714        let failed_count = graph
1715            .tasks
1716            .iter()
1717            .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1718            .count();
1719
1720        dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1721
1722        // HIGH-1 fix: reset_for_retry only resets Failed/Canceled tasks. Any tasks that were
1723        // in Running state at pause time are left as Running with stale assigned_agent handles
1724        // (those sub-agents are long dead). Reset them to Ready so resume_from() does not try
1725        // to wait for their events.
1726        for task in &mut graph.tasks {
1727            if task.status == crate::orchestration::TaskStatus::Running {
1728                task.status = crate::orchestration::TaskStatus::Ready;
1729                task.assigned_agent = None;
1730            }
1731        }
1732
1733        tracing::info!(
1734            graph_id = %graph.id,
1735            failed_count,
1736            "retrying failed tasks in graph"
1737        );
1738
1739        self.channel
1740            .send(&format!(
1741                "Retrying {failed_count} failed task(s) in plan: {}\n\
1742                 Use `/plan confirm` to execute.",
1743                graph.goal
1744            ))
1745            .await?;
1746
1747        // Store retried graph back for re-execution via /plan confirm.
1748        self.orchestration.pending_graph = Some(graph);
1749        Ok(())
1750    }
1751
1752    pub async fn shutdown(&mut self) {
1753        self.channel.send("Shutting down...").await.ok();
1754
1755        // CRIT-1: persist Thompson state accumulated during this session.
1756        self.provider.save_router_state();
1757
1758        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
1759            mgr.shutdown_all();
1760        }
1761
1762        if let Some(ref manager) = self.mcp.manager {
1763            manager.shutdown_all_shared().await;
1764        }
1765
1766        // Finalize compaction trajectory: push the last open segment into the Vec.
1767        // This segment would otherwise only be pushed when the next hard compaction fires,
1768        // which never happens at session end.
1769        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
1770            self.update_metrics(|m| {
1771                m.compaction_turns_after_hard.push(turns);
1772            });
1773            self.context_manager.turns_since_last_hard_compaction = None;
1774        }
1775
1776        if let Some(ref tx) = self.metrics.metrics_tx {
1777            let m = tx.borrow();
1778            if m.filter_applications > 0 {
1779                #[allow(clippy::cast_precision_loss)]
1780                let pct = if m.filter_raw_tokens > 0 {
1781                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
1782                } else {
1783                    0.0
1784                };
1785                tracing::info!(
1786                    raw_tokens = m.filter_raw_tokens,
1787                    saved_tokens = m.filter_saved_tokens,
1788                    applications = m.filter_applications,
1789                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
1790                    m.filter_saved_tokens,
1791                );
1792            }
1793            if m.compaction_hard_count > 0 {
1794                tracing::info!(
1795                    hard_compactions = m.compaction_hard_count,
1796                    turns_after_hard = ?m.compaction_turns_after_hard,
1797                    "hard compaction trajectory"
1798                );
1799            }
1800        }
1801        tracing::info!("agent shutdown complete");
1802    }
1803
1804    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
1805    ///
1806    /// # Errors
1807    ///
1808    /// Returns an error if channel I/O or LLM communication fails.
1809    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
1810    fn refresh_subagent_metrics(&mut self) {
1811        let Some(ref mgr) = self.orchestration.subagent_manager else {
1812            return;
1813        };
1814        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
1815            .statuses()
1816            .into_iter()
1817            .map(|(id, s)| {
1818                let def = mgr.agents_def(&id);
1819                crate::metrics::SubAgentMetrics {
1820                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
1821                    id: id.clone(),
1822                    state: format!("{:?}", s.state).to_lowercase(),
1823                    turns_used: s.turns_used,
1824                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
1825                    background: def.is_some_and(|d| d.permissions.background),
1826                    elapsed_secs: s.started_at.elapsed().as_secs(),
1827                    permission_mode: def.map_or_else(String::new, |d| {
1828                        use crate::subagent::def::PermissionMode;
1829                        match d.permissions.permission_mode {
1830                            PermissionMode::Default => String::new(),
1831                            PermissionMode::AcceptEdits => "accept_edits".into(),
1832                            PermissionMode::DontAsk => "dont_ask".into(),
1833                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
1834                            PermissionMode::Plan => "plan".into(),
1835                        }
1836                    }),
1837                }
1838            })
1839            .collect();
1840        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
1841    }
1842
1843    /// Non-blocking poll: notify the user when background sub-agents complete.
1844    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
1845        let completed = self.poll_subagents().await;
1846        for (task_id, result) in completed {
1847            let notice = if result.is_empty() {
1848                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
1849            } else {
1850                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
1851            };
1852            if let Err(e) = self.channel.send(&notice).await {
1853                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
1854            }
1855        }
1856        Ok(())
1857    }
1858
1859    /// Run the agent main loop.
1860    ///
1861    /// # Errors
1862    ///
1863    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
1864    pub async fn run(&mut self) -> Result<(), error::AgentError> {
1865        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
1866            && !*rx.borrow()
1867        {
1868            let _ = rx.changed().await;
1869            if !*rx.borrow() {
1870                tracing::warn!("model warmup did not complete successfully");
1871            }
1872        }
1873
1874        loop {
1875            // Apply any pending provider override (from ACP set_session_config_option).
1876            if let Some(ref slot) = self.providers.provider_override
1877                && let Some(new_provider) = slot
1878                    .write()
1879                    .unwrap_or_else(std::sync::PoisonError::into_inner)
1880                    .take()
1881            {
1882                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1883                self.provider = new_provider;
1884            }
1885
1886            // Poll for MCP tool list updates from tools/list_changed notifications.
1887            self.check_tool_refresh().await;
1888
1889            // Refresh sub-agent status in metrics before polling.
1890            self.refresh_subagent_metrics();
1891
1892            // Non-blocking poll: notify user when background sub-agents complete.
1893            self.notify_completed_subagents().await?;
1894
1895            self.drain_channel();
1896
1897            let (text, image_parts) = if let Some(queued) = self.message_queue.pop_front() {
1898                self.notify_queue_count().await;
1899                if queued.raw_attachments.is_empty() {
1900                    (queued.text, queued.image_parts)
1901                } else {
1902                    let msg = crate::channel::ChannelMessage {
1903                        text: queued.text,
1904                        attachments: queued.raw_attachments,
1905                    };
1906                    self.resolve_message(msg).await
1907                }
1908            } else {
1909                let incoming = tokio::select! {
1910                    result = self.channel.recv() => result?,
1911                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1912                        tracing::info!("shutting down");
1913                        break;
1914                    }
1915                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
1916                        self.reload_skills().await;
1917                        continue;
1918                    }
1919                    Some(_) = recv_optional(&mut self.instruction_reload_rx) => {
1920                        self.reload_instructions();
1921                        continue;
1922                    }
1923                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
1924                        self.reload_config();
1925                        continue;
1926                    }
1927                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
1928                        if let Err(e) = self.channel.send(&msg).await {
1929                            tracing::warn!("failed to send update notification: {e}");
1930                        }
1931                        continue;
1932                    }
1933                    Some(msg) = recv_optional(&mut self.experiment_notify_rx) => {
1934                        // Experiment engine completed (ok or err). Clear the cancel token so
1935                        // status reports idle and new experiments can be started.
1936                        #[cfg(feature = "experiments")]
1937                        { self.experiment_cancel = None; }
1938                        if let Err(e) = self.channel.send(&msg).await {
1939                            tracing::warn!("failed to send experiment completion: {e}");
1940                        }
1941                        continue;
1942                    }
1943                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
1944                        tracing::info!("scheduler: injecting custom task as agent turn");
1945                        let text = format!("[Scheduled task] {prompt}");
1946                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
1947                    }
1948                };
1949                let Some(msg) = incoming else { break };
1950                self.drain_channel();
1951                self.resolve_message(msg).await
1952            };
1953
1954            let trimmed = text.trim();
1955
1956            match self.handle_builtin_command(trimmed).await? {
1957                Some(true) => break,
1958                Some(false) => continue,
1959                None => {}
1960            }
1961
1962            self.process_user_message(text, image_parts).await?;
1963        }
1964
1965        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1966        if let Some(ref mut tc) = self.debug_state.trace_collector {
1967            tc.finish();
1968        }
1969
1970        Ok(())
1971    }
1972
1973    /// Handle built-in slash commands that short-circuit the main `run` loop.
1974    ///
1975    /// Returns `Some(true)` to break the loop (exit), `Some(false)` to continue to the next
1976    /// iteration, or `None` if the command was not recognized (caller should call
1977    /// `process_user_message`).
1978    async fn handle_builtin_command(
1979        &mut self,
1980        trimmed: &str,
1981    ) -> Result<Option<bool>, error::AgentError> {
1982        if trimmed == "/clear-queue" {
1983            let n = self.clear_queue();
1984            self.notify_queue_count().await;
1985            self.channel
1986                .send(&format!("Cleared {n} queued messages."))
1987                .await?;
1988            let _ = self.channel.flush_chunks().await;
1989            return Ok(Some(false));
1990        }
1991
1992        if trimmed == "/compact" {
1993            if self.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
1994                match self.compact_context().await {
1995                    Ok(()) => {
1996                        let _ = self.channel.send("Context compacted successfully.").await;
1997                    }
1998                    Err(e) => {
1999                        let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
2000                    }
2001                }
2002            } else {
2003                let _ = self.channel.send("Nothing to compact.").await;
2004            }
2005            let _ = self.channel.flush_chunks().await;
2006            return Ok(Some(false));
2007        }
2008
2009        if trimmed == "/clear" {
2010            self.clear_history();
2011            let _ = self.channel.flush_chunks().await;
2012            return Ok(Some(false));
2013        }
2014
2015        if trimmed == "/model" || trimmed.starts_with("/model ") {
2016            self.handle_model_command(trimmed).await;
2017            let _ = self.channel.flush_chunks().await;
2018            return Ok(Some(false));
2019        }
2020
2021        if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
2022            self.handle_debug_dump_command(trimmed).await;
2023            let _ = self.channel.flush_chunks().await;
2024            return Ok(Some(false));
2025        }
2026
2027        if trimmed.starts_with("/dump-format") {
2028            self.handle_dump_format_command(trimmed).await;
2029            let _ = self.channel.flush_chunks().await;
2030            return Ok(Some(false));
2031        }
2032
2033        if trimmed == "/exit" || trimmed == "/quit" {
2034            if self.channel.supports_exit() {
2035                return Ok(Some(true));
2036            }
2037            let _ = self
2038                .channel
2039                .send("/exit is not supported in this channel.")
2040                .await;
2041            return Ok(Some(false));
2042        }
2043
2044        Ok(None)
2045    }
2046
2047    /// Switch the active provider to one serving `model_id`.
2048    ///
2049    /// Looks up the model in the provider's remote model list (or cache).
2050    ///
2051    /// # Errors
2052    ///
2053    /// Returns `Err` if the model is not found.
2054    pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
2055        if model_id.is_empty() {
2056            return Err("model id must not be empty".to_string());
2057        }
2058        if model_id.len() > 256 {
2059            return Err("model id exceeds maximum length of 256 characters".to_string());
2060        }
2061        if !model_id
2062            .chars()
2063            .all(|c| c.is_ascii() && !c.is_ascii_control())
2064        {
2065            return Err("model id must contain only printable ASCII characters".to_string());
2066        }
2067        self.runtime.model_name = model_id.to_string();
2068        tracing::info!(model = model_id, "set_model called");
2069        Ok(())
2070    }
2071
2072    async fn handle_model_refresh(&mut self) {
2073        // Invalidate all model cache files in the cache directory.
2074        if let Some(cache_dir) = dirs::cache_dir() {
2075            let models_dir = cache_dir.join("zeph").join("models");
2076            if let Ok(entries) = std::fs::read_dir(&models_dir) {
2077                for entry in entries.flatten() {
2078                    let path = entry.path();
2079                    if path.extension().and_then(|e| e.to_str()) == Some("json") {
2080                        let _ = std::fs::remove_file(&path);
2081                    }
2082                }
2083            }
2084        }
2085        match self.provider.list_models_remote().await {
2086            Ok(models) => {
2087                let _ = self
2088                    .channel
2089                    .send(&format!("Fetched {} models.", models.len()))
2090                    .await;
2091            }
2092            Err(e) => {
2093                let _ = self
2094                    .channel
2095                    .send(&format!("Error fetching models: {e}"))
2096                    .await;
2097            }
2098        }
2099    }
2100
2101    async fn handle_model_list(&mut self) {
2102        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2103        let cached = if cache.is_stale() {
2104            None
2105        } else {
2106            cache.load().unwrap_or(None)
2107        };
2108        let models = if let Some(m) = cached {
2109            m
2110        } else {
2111            match self.provider.list_models_remote().await {
2112                Ok(m) => m,
2113                Err(e) => {
2114                    let _ = self
2115                        .channel
2116                        .send(&format!("Error fetching models: {e}"))
2117                        .await;
2118                    return;
2119                }
2120            }
2121        };
2122        if models.is_empty() {
2123            let _ = self.channel.send("No models available.").await;
2124            return;
2125        }
2126        let mut lines = vec!["Available models:".to_string()];
2127        for (i, m) in models.iter().enumerate() {
2128            lines.push(format!("  {}. {} ({})", i + 1, m.display_name, m.id));
2129        }
2130        let _ = self.channel.send(&lines.join("\n")).await;
2131    }
2132
2133    async fn handle_model_switch(&mut self, model_id: &str) {
2134        // Validate model_id against the known model list before switching.
2135        // Try disk cache first; fall back to a remote fetch if the cache is stale.
2136        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2137        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
2138        {
2139            match self.provider.list_models_remote().await {
2140                Ok(m) if !m.is_empty() => Some(m),
2141                _ => None,
2142            }
2143        } else {
2144            cache.load().unwrap_or(None)
2145        };
2146        if let Some(models) = known_models {
2147            if !models.iter().any(|m| m.id == model_id) {
2148                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
2149                for m in &models {
2150                    lines.push(format!("  • {} ({})", m.display_name, m.id));
2151                }
2152                let _ = self.channel.send(&lines.join("\n")).await;
2153                return;
2154            }
2155        } else {
2156            let _ = self
2157                .channel
2158                .send(
2159                    "Model list unavailable, switching anyway — verify your model name is correct.",
2160                )
2161                .await;
2162        }
2163        match self.set_model(model_id) {
2164            Ok(()) => {
2165                let _ = self
2166                    .channel
2167                    .send(&format!("Switched to model: {model_id}"))
2168                    .await;
2169            }
2170            Err(e) => {
2171                let _ = self.channel.send(&format!("Error: {e}")).await;
2172            }
2173        }
2174    }
2175
2176    /// Handle `/model`, `/model <id>`, and `/model refresh` commands.
2177    async fn handle_model_command(&mut self, trimmed: &str) {
2178        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
2179        if arg == "refresh" {
2180            self.handle_model_refresh().await;
2181        } else if arg.is_empty() {
2182            self.handle_model_list().await;
2183        } else {
2184            self.handle_model_switch(arg).await;
2185        }
2186    }
2187
2188    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
2189    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
2190        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
2191        if arg.is_empty() {
2192            match &self.debug_state.debug_dumper {
2193                Some(d) => {
2194                    let _ = self
2195                        .channel
2196                        .send(&format!("Debug dump active: {}", d.dir().display()))
2197                        .await;
2198                }
2199                None => {
2200                    let _ = self
2201                        .channel
2202                        .send(
2203                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
2204                             or start with `--debug-dump [dir]`.",
2205                        )
2206                        .await;
2207                }
2208            }
2209            return;
2210        }
2211        let dir = std::path::PathBuf::from(arg);
2212        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
2213            Ok(dumper) => {
2214                let path = dumper.dir().display().to_string();
2215                self.debug_state.debug_dumper = Some(dumper);
2216                let _ = self
2217                    .channel
2218                    .send(&format!("Debug dump enabled: {path}"))
2219                    .await;
2220            }
2221            Err(e) => {
2222                let _ = self
2223                    .channel
2224                    .send(&format!("Failed to enable debug dump: {e}"))
2225                    .await;
2226            }
2227        }
2228    }
2229
2230    /// Handle `/dump-format <json|raw|trace>` command — switch debug dump format at runtime.
2231    async fn handle_dump_format_command(&mut self, trimmed: &str) {
2232        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
2233        if arg.is_empty() {
2234            let _ = self
2235                .channel
2236                .send(&format!(
2237                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
2238                    self.debug_state.dump_format
2239                ))
2240                .await;
2241            return;
2242        }
2243        let new_format = match arg {
2244            "json" => crate::debug_dump::DumpFormat::Json,
2245            "raw" => crate::debug_dump::DumpFormat::Raw,
2246            "trace" => crate::debug_dump::DumpFormat::Trace,
2247            other => {
2248                let _ = self
2249                    .channel
2250                    .send(&format!(
2251                        "Unknown format '{other}'. Valid values: json, raw, trace."
2252                    ))
2253                    .await;
2254                return;
2255            }
2256        };
2257        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
2258        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
2259
2260        // CR-04: when switching TO trace, create a fresh TracingCollector.
2261        if now_trace
2262            && !was_trace
2263            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
2264        {
2265            let service_name = self.debug_state.trace_service_name.clone();
2266            let redact = self.debug_state.trace_redact;
2267            match crate::debug_dump::trace::TracingCollector::new(
2268                dump_dir.as_path(),
2269                &service_name,
2270                redact,
2271                None,
2272            ) {
2273                Ok(collector) => {
2274                    self.debug_state.trace_collector = Some(collector);
2275                }
2276                Err(e) => {
2277                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
2278                }
2279            }
2280        }
2281        // CR-04: when switching AWAY from trace, flush and drop the collector.
2282        if was_trace
2283            && !now_trace
2284            && let Some(mut tc) = self.debug_state.trace_collector.take()
2285        {
2286            tc.finish();
2287        }
2288
2289        self.debug_state.dump_format = new_format;
2290        let _ = self
2291            .channel
2292            .send(&format!("Debug dump format set to: {arg}"))
2293            .await;
2294    }
2295
2296    async fn resolve_message(
2297        &self,
2298        msg: crate::channel::ChannelMessage,
2299    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2300        use crate::channel::{Attachment, AttachmentKind};
2301        use zeph_llm::provider::{ImageData, MessagePart};
2302
2303        let text_base = msg.text.clone();
2304
2305        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2306            .attachments
2307            .into_iter()
2308            .partition(|a| a.kind == AttachmentKind::Audio);
2309
2310        tracing::debug!(
2311            audio = audio_attachments.len(),
2312            has_stt = self.providers.stt.is_some(),
2313            "resolve_message attachments"
2314        );
2315
2316        let text = if !audio_attachments.is_empty()
2317            && let Some(stt) = self.providers.stt.as_ref()
2318        {
2319            let mut transcribed_parts = Vec::new();
2320            for attachment in &audio_attachments {
2321                if attachment.data.len() > MAX_AUDIO_BYTES {
2322                    tracing::warn!(
2323                        size = attachment.data.len(),
2324                        max = MAX_AUDIO_BYTES,
2325                        "audio attachment exceeds size limit, skipping"
2326                    );
2327                    continue;
2328                }
2329                match stt
2330                    .transcribe(&attachment.data, attachment.filename.as_deref())
2331                    .await
2332                {
2333                    Ok(result) => {
2334                        tracing::info!(
2335                            len = result.text.len(),
2336                            language = ?result.language,
2337                            "audio transcribed"
2338                        );
2339                        transcribed_parts.push(result.text);
2340                    }
2341                    Err(e) => {
2342                        tracing::error!(error = %e, "audio transcription failed");
2343                    }
2344                }
2345            }
2346            if transcribed_parts.is_empty() {
2347                text_base
2348            } else {
2349                let transcribed = transcribed_parts.join("\n");
2350                if text_base.is_empty() {
2351                    transcribed
2352                } else {
2353                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2354                }
2355            }
2356        } else {
2357            if !audio_attachments.is_empty() {
2358                tracing::warn!(
2359                    count = audio_attachments.len(),
2360                    "audio attachments received but no STT provider configured, dropping"
2361                );
2362            }
2363            text_base
2364        };
2365
2366        let mut image_parts = Vec::new();
2367        for attachment in image_attachments {
2368            if attachment.data.len() > MAX_IMAGE_BYTES {
2369                tracing::warn!(
2370                    size = attachment.data.len(),
2371                    max = MAX_IMAGE_BYTES,
2372                    "image attachment exceeds size limit, skipping"
2373                );
2374                continue;
2375            }
2376            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2377            image_parts.push(MessagePart::Image(Box::new(ImageData {
2378                data: attachment.data,
2379                mime_type,
2380            })));
2381        }
2382
2383        (text, image_parts)
2384    }
2385
2386    /// Dispatch slash commands. Returns `Some(Ok(()))` when handled,
2387    /// `Some(Err(_))` on I/O error, `None` to fall through to LLM processing.
2388    async fn dispatch_slash_command(
2389        &mut self,
2390        trimmed: &str,
2391    ) -> Option<Result<(), error::AgentError>> {
2392        macro_rules! handled {
2393            ($expr:expr) => {{
2394                if let Err(e) = $expr {
2395                    return Some(Err(e));
2396                }
2397                let _ = self.channel.flush_chunks().await;
2398                return Some(Ok(()));
2399            }};
2400        }
2401
2402        if trimmed == "/help" {
2403            handled!(self.handle_help_command().await);
2404        }
2405
2406        if trimmed == "/status" {
2407            handled!(self.handle_status_command().await);
2408        }
2409
2410        if trimmed == "/skills" {
2411            handled!(self.handle_skills_command().await);
2412        }
2413
2414        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
2415            let rest = trimmed
2416                .strip_prefix("/skill")
2417                .unwrap_or("")
2418                .trim()
2419                .to_owned();
2420            handled!(self.handle_skill_command(&rest).await);
2421        }
2422
2423        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
2424            let rest = trimmed
2425                .strip_prefix("/feedback")
2426                .unwrap_or("")
2427                .trim()
2428                .to_owned();
2429            handled!(self.handle_feedback(&rest).await);
2430        }
2431
2432        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
2433            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
2434            handled!(self.handle_mcp_command(&args).await);
2435        }
2436
2437        if trimmed == "/image" || trimmed.starts_with("/image ") {
2438            let path = trimmed
2439                .strip_prefix("/image")
2440                .unwrap_or("")
2441                .trim()
2442                .to_owned();
2443            if path.is_empty() {
2444                handled!(
2445                    self.channel
2446                        .send("Usage: /image <path>")
2447                        .await
2448                        .map_err(Into::into)
2449                );
2450            }
2451            handled!(self.handle_image_command(&path).await);
2452        }
2453
2454        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
2455            return Some(self.dispatch_plan_command(trimmed).await);
2456        }
2457
2458        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
2459            handled!(self.handle_graph_command(trimmed).await);
2460        }
2461
2462        #[cfg(feature = "scheduler")]
2463        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
2464            handled!(self.handle_scheduler_command(trimmed).await);
2465        }
2466
2467        #[cfg(feature = "experiments")]
2468        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
2469            handled!(self.handle_experiment_command(trimmed).await);
2470        }
2471
2472        #[cfg(feature = "lsp-context")]
2473        if trimmed == "/lsp" {
2474            handled!(self.handle_lsp_status_command().await);
2475        }
2476
2477        if trimmed == "/log" {
2478            handled!(self.handle_log_command().await);
2479        }
2480
2481        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
2482            return self.dispatch_agent_command(trimmed).await;
2483        }
2484
2485        None
2486    }
2487
2488    async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
2489        match crate::orchestration::PlanCommand::parse(trimmed) {
2490            Ok(cmd) => {
2491                self.handle_plan_command(cmd).await?;
2492            }
2493            Err(e) => {
2494                self.channel
2495                    .send(&e.to_string())
2496                    .await
2497                    .map_err(error::AgentError::from)?;
2498            }
2499        }
2500        let _ = self.channel.flush_chunks().await;
2501        Ok(())
2502    }
2503
2504    async fn dispatch_agent_command(
2505        &mut self,
2506        trimmed: &str,
2507    ) -> Option<Result<(), error::AgentError>> {
2508        let known: Vec<String> = self
2509            .orchestration
2510            .subagent_manager
2511            .as_ref()
2512            .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
2513            .unwrap_or_default();
2514        match crate::subagent::AgentCommand::parse(trimmed, &known) {
2515            Ok(cmd) => {
2516                if let Some(msg) = self.handle_agent_command(cmd).await
2517                    && let Err(e) = self.channel.send(&msg).await
2518                {
2519                    return Some(Err(e.into()));
2520                }
2521                let _ = self.channel.flush_chunks().await;
2522                Some(Ok(()))
2523            }
2524            Err(e) if trimmed.starts_with('@') => {
2525                // Unknown @token — fall through to normal LLM processing
2526                tracing::debug!("@mention not matched as agent: {e}");
2527                None
2528            }
2529            Err(e) => {
2530                if let Err(send_err) = self.channel.send(&e.to_string()).await {
2531                    return Some(Err(send_err.into()));
2532                }
2533                let _ = self.channel.flush_chunks().await;
2534                Some(Ok(()))
2535            }
2536        }
2537    }
2538
2539    /// Spawn a background task to evaluate the user message with the LLM judge and store the
2540    /// correction result. Non-blocking: the task runs independently of the response pipeline.
2541    ///
2542    /// # Notes
2543    ///
2544    /// TODO(I3): `JoinHandle`s are not tracked — outstanding tasks may be aborted on runtime
2545    /// shutdown before `store_user_correction` completes. Acceptable for MVP.
2546    fn spawn_judge_correction_check(
2547        &mut self,
2548        trimmed: &str,
2549        conv_id: Option<zeph_memory::ConversationId>,
2550    ) {
2551        let judge_provider = self
2552            .providers
2553            .judge_provider
2554            .clone()
2555            .unwrap_or_else(|| self.provider.clone());
2556        let assistant_snippet = self.last_assistant_response();
2557        let user_msg_owned = trimmed.to_owned();
2558        let memory_arc = self.memory_state.memory.clone();
2559        let skill_name = self
2560            .skill_state
2561            .active_skill_names
2562            .first()
2563            .cloned()
2564            .unwrap_or_default();
2565        let conv_id_bg = conv_id;
2566        let confidence_threshold = self
2567            .learning_engine
2568            .config
2569            .as_ref()
2570            .map_or(0.6, |c| c.correction_confidence_threshold);
2571
2572        tokio::spawn(async move {
2573            match feedback_detector::JudgeDetector::evaluate(
2574                &judge_provider,
2575                &user_msg_owned,
2576                &assistant_snippet,
2577                confidence_threshold,
2578            )
2579            .await
2580            {
2581                Ok(verdict) => {
2582                    if let Some(signal) = verdict.into_signal(&user_msg_owned) {
2583                        // Self-corrections (user corrects their own statement) must not
2584                        // penalize skills. The judge path has no record_skill_outcomes()
2585                        // call today, but this guard mirrors the regex path to make the
2586                        // intent explicit and prevent future regressions if parity is added.
2587                        let is_self_correction =
2588                            signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
2589                        tracing::info!(
2590                            kind = signal.kind.as_str(),
2591                            confidence = signal.confidence,
2592                            source = "judge",
2593                            is_self_correction,
2594                            "correction signal detected"
2595                        );
2596                        if let Some(memory) = memory_arc {
2597                            let correction_text = context::truncate_chars(&user_msg_owned, 500);
2598                            match memory
2599                                .sqlite()
2600                                .store_user_correction(
2601                                    conv_id_bg.map(|c| c.0),
2602                                    &assistant_snippet,
2603                                    &correction_text,
2604                                    if skill_name.is_empty() {
2605                                        None
2606                                    } else {
2607                                        Some(skill_name.as_str())
2608                                    },
2609                                    signal.kind.as_str(),
2610                                )
2611                                .await
2612                            {
2613                                Ok(correction_id) => {
2614                                    if let Err(e) = memory
2615                                        .store_correction_embedding(correction_id, &correction_text)
2616                                        .await
2617                                    {
2618                                        tracing::warn!(
2619                                            "failed to store correction embedding: {e:#}"
2620                                        );
2621                                    }
2622                                }
2623                                Err(e) => {
2624                                    tracing::warn!("failed to store judge correction: {e:#}");
2625                                }
2626                            }
2627                        }
2628                    }
2629                }
2630                Err(e) => {
2631                    tracing::warn!("judge detector failed: {e:#}");
2632                }
2633            }
2634        });
2635    }
2636
2637    /// Detect implicit corrections in the user's message and record them in the learning engine.
2638    ///
2639    /// Uses regex-based `FeedbackDetector` first. If a `JudgeDetector` is configured and the
2640    /// regex result is borderline, the LLM judge runs in a background task (non-blocking).
2641    async fn detect_and_record_corrections(
2642        &mut self,
2643        trimmed: &str,
2644        conv_id: Option<zeph_memory::ConversationId>,
2645    ) {
2646        let correction_detection_enabled = self
2647            .learning_engine
2648            .config
2649            .as_ref()
2650            .is_none_or(|c| c.correction_detection);
2651        if !self.is_learning_enabled() || !correction_detection_enabled {
2652            return;
2653        }
2654
2655        let previous_user_messages: Vec<&str> = self
2656            .messages
2657            .iter()
2658            .filter(|m| m.role == Role::User)
2659            .map(|m| m.content.as_str())
2660            .collect();
2661        let regex_signal = self
2662            .feedback_detector
2663            .detect(trimmed, &previous_user_messages);
2664
2665        // Judge mode: invoke LLM in background if regex is borderline or missed.
2666        //
2667        // The judge call is decoupled from the response pipeline — it records the
2668        // correction asynchronously via tokio::spawn and returns None immediately
2669        // so the user response is not blocked.
2670        //
2671        // TODO(I3): JoinHandles are not tracked — outstanding tasks may be aborted
2672        // on runtime shutdown before store_user_correction completes. This is
2673        // acceptable for the learning subsystem at MVP. Future: collect handles in
2674        // Agent and drain on graceful shutdown.
2675        // Check rate limit synchronously before deciding to spawn.
2676        // The judge_detector is &mut self so check_rate_limit() can update call_times.
2677        let judge_should_run = self
2678            .judge_detector
2679            .as_ref()
2680            .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
2681            && self
2682                .judge_detector
2683                .as_mut()
2684                .is_some_and(feedback_detector::JudgeDetector::check_rate_limit);
2685
2686        let signal = if judge_should_run {
2687            self.spawn_judge_correction_check(trimmed, conv_id);
2688            // Judge runs in background — return None so the response pipeline continues.
2689            None
2690        } else {
2691            regex_signal
2692        };
2693
2694        let Some(signal) = signal else { return };
2695        tracing::info!(
2696            kind = signal.kind.as_str(),
2697            confidence = signal.confidence,
2698            source = "regex",
2699            "implicit correction detected"
2700        );
2701        // REV-PH2-002 + SEC-PH2-002: cap feedback_text to 500 chars (UTF-8 safe)
2702        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
2703        // Self-corrections (user corrects their own statement) must not penalize skills —
2704        // the agent did nothing wrong. Store for analytics but skip skill outcome recording.
2705        if signal.kind != feedback_detector::CorrectionKind::SelfCorrection {
2706            self.record_skill_outcomes(
2707                "user_rejection",
2708                Some(&feedback_text),
2709                Some(signal.kind.as_str()),
2710            )
2711            .await;
2712        }
2713        if let Some(memory) = &self.memory_state.memory {
2714            // Use `trimmed` (raw user input, untainted by secrets) instead of
2715            // `feedback_text` (derived from previous_user_messages → self.messages)
2716            // to avoid the CodeQL cleartext-logging taint path.
2717            let correction_text = context::truncate_chars(trimmed, 500);
2718            match memory
2719                .sqlite()
2720                .store_user_correction(
2721                    conv_id.map(|c| c.0),
2722                    "",
2723                    &correction_text,
2724                    self.skill_state
2725                        .active_skill_names
2726                        .first()
2727                        .map(String::as_str),
2728                    signal.kind.as_str(),
2729                )
2730                .await
2731            {
2732                Ok(correction_id) => {
2733                    if let Err(e) = memory
2734                        .store_correction_embedding(correction_id, &correction_text)
2735                        .await
2736                    {
2737                        tracing::warn!("failed to store correction embedding: {e:#}");
2738                    }
2739                }
2740                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
2741            }
2742        }
2743    }
2744
2745    async fn process_user_message(
2746        &mut self,
2747        text: String,
2748        image_parts: Vec<zeph_llm::provider::MessagePart>,
2749    ) -> Result<(), error::AgentError> {
2750        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
2751        let iteration_index = self.debug_state.iteration_counter;
2752        self.debug_state.iteration_counter += 1;
2753        if let Some(ref mut tc) = self.debug_state.trace_collector {
2754            tc.begin_iteration(iteration_index, text.trim());
2755            // CR-01: store the span ID so LLM/tool execution can attach child spans.
2756            self.debug_state.current_iteration_span_id =
2757                tc.current_iteration_span_id(iteration_index);
2758        }
2759
2760        let result = self
2761            .process_user_message_inner(text, image_parts, iteration_index)
2762            .await;
2763
2764        // Close iteration span regardless of outcome (partial trace preserved on error).
2765        if let Some(ref mut tc) = self.debug_state.trace_collector {
2766            let status = if result.is_ok() {
2767                crate::debug_dump::trace::SpanStatus::Ok
2768            } else {
2769                crate::debug_dump::trace::SpanStatus::Error {
2770                    message: "iteration failed".to_owned(),
2771                }
2772            };
2773            tc.end_iteration(iteration_index, status);
2774        }
2775        self.debug_state.current_iteration_span_id = None;
2776
2777        result
2778    }
2779
2780    async fn process_user_message_inner(
2781        &mut self,
2782        text: String,
2783        image_parts: Vec<zeph_llm::provider::MessagePart>,
2784        iteration_index: usize,
2785    ) -> Result<(), error::AgentError> {
2786        let _ = iteration_index; // Used indirectly via debug_state.current_iteration_span_id.
2787        self.lifecycle.cancel_token = CancellationToken::new();
2788        let signal = Arc::clone(&self.lifecycle.cancel_signal);
2789        let token = self.lifecycle.cancel_token.clone();
2790        tokio::spawn(async move {
2791            signal.notified().await;
2792            token.cancel();
2793        });
2794        let trimmed = text.trim();
2795
2796        if let Some(result) = self.dispatch_slash_command(trimmed).await {
2797            return result;
2798        }
2799
2800        self.check_pending_rollbacks().await;
2801        // Extract before rebuild_system_prompt so the value is not tainted
2802        // by the secrets-bearing system prompt (ConversationId is just an i64).
2803        let conv_id = self.memory_state.conversation_id;
2804        self.rebuild_system_prompt(&text).await;
2805
2806        self.detect_and_record_corrections(trimmed, conv_id).await;
2807
2808        // Reset per-turn compaction guard at the start of context management phase.
2809        self.context_manager.compacted_this_turn = false;
2810
2811        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2812        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2813        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2814        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2815        self.maybe_apply_deferred_summaries();
2816
2817        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2818        if let Err(e) = self.maybe_proactive_compress().await {
2819            tracing::warn!("proactive compression failed: {e:#}");
2820        }
2821
2822        if let Err(e) = self.maybe_compact().await {
2823            tracing::warn!("context compaction failed: {e:#}");
2824        }
2825
2826        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2827            tracing::warn!("context preparation failed: {e:#}");
2828        }
2829
2830        self.learning_engine.reset_reflection();
2831
2832        let mut all_image_parts = std::mem::take(&mut self.pending_image_parts);
2833        all_image_parts.extend(image_parts);
2834        let image_parts = all_image_parts;
2835
2836        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
2837            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
2838            parts.extend(image_parts);
2839            Message::from_parts(Role::User, parts)
2840        } else {
2841            if !image_parts.is_empty() {
2842                tracing::warn!(
2843                    count = image_parts.len(),
2844                    "image attachments dropped: provider does not support vision"
2845                );
2846            }
2847            Message {
2848                role: Role::User,
2849                content: text.clone(),
2850                parts: vec![],
2851                metadata: MessageMetadata::default(),
2852            }
2853        };
2854        // Image parts intentionally excluded — base64 payloads too large for message history.
2855        self.persist_message(Role::User, &text, &[], false).await;
2856        self.push_message(user_msg);
2857
2858        if let Err(e) = self.process_response().await {
2859            tracing::error!("Response processing failed: {e:#}");
2860            let user_msg = format!("Error: {e:#}");
2861            self.channel.send(&user_msg).await?;
2862            self.messages.pop();
2863            self.recompute_prompt_tokens();
2864            self.channel.flush_chunks().await?;
2865        }
2866
2867        Ok(())
2868    }
2869
2870    async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
2871        use std::path::Component;
2872        use zeph_llm::provider::{ImageData, MessagePart};
2873
2874        // Reject paths that traverse outside the current directory.
2875        let has_parent_dir = std::path::Path::new(path)
2876            .components()
2877            .any(|c| c == Component::ParentDir);
2878        if has_parent_dir {
2879            self.channel
2880                .send("Invalid image path: path traversal not allowed")
2881                .await?;
2882            let _ = self.channel.flush_chunks().await;
2883            return Ok(());
2884        }
2885
2886        let data = match std::fs::read(path) {
2887            Ok(d) => d,
2888            Err(e) => {
2889                self.channel
2890                    .send(&format!("Cannot read image {path}: {e}"))
2891                    .await?;
2892                let _ = self.channel.flush_chunks().await;
2893                return Ok(());
2894            }
2895        };
2896        if data.len() > MAX_IMAGE_BYTES {
2897            self.channel
2898                .send(&format!(
2899                    "Image {path} exceeds size limit ({} MB), skipping",
2900                    MAX_IMAGE_BYTES / 1024 / 1024
2901                ))
2902                .await?;
2903            let _ = self.channel.flush_chunks().await;
2904            return Ok(());
2905        }
2906        let mime_type = detect_image_mime(Some(path)).to_string();
2907        self.pending_image_parts
2908            .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
2909        self.channel
2910            .send(&format!("Image loaded: {path}. Send your message."))
2911            .await?;
2912        let _ = self.channel.flush_chunks().await;
2913        Ok(())
2914    }
2915
2916    async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
2917        use std::fmt::Write;
2918
2919        let mut out = String::from("Slash commands:\n\n");
2920
2921        let categories = [
2922            slash_commands::SlashCategory::Info,
2923            slash_commands::SlashCategory::Session,
2924            slash_commands::SlashCategory::Model,
2925            slash_commands::SlashCategory::Memory,
2926            slash_commands::SlashCategory::Tools,
2927            slash_commands::SlashCategory::Planning,
2928            slash_commands::SlashCategory::Debug,
2929            slash_commands::SlashCategory::Advanced,
2930        ];
2931
2932        for cat in &categories {
2933            let entries: Vec<_> = slash_commands::COMMANDS
2934                .iter()
2935                .filter(|c| &c.category == cat)
2936                .collect();
2937            if entries.is_empty() {
2938                continue;
2939            }
2940            let _ = writeln!(out, "{}:", cat.as_str());
2941            for cmd in entries {
2942                if cmd.args.is_empty() {
2943                    let _ = write!(out, "  {}", cmd.name);
2944                } else {
2945                    let _ = write!(out, "  {} {}", cmd.name, cmd.args);
2946                }
2947                let _ = write!(out, "  — {}", cmd.description);
2948                if let Some(feat) = cmd.feature_gate {
2949                    let _ = write!(out, " [requires: {feat}]");
2950                }
2951                let _ = writeln!(out);
2952            }
2953            let _ = writeln!(out);
2954        }
2955
2956        self.channel.send(out.trim_end()).await?;
2957        Ok(())
2958    }
2959
2960    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
2961        use std::fmt::Write;
2962
2963        let uptime = self.lifecycle.start_time.elapsed().as_secs();
2964        let msg_count = self
2965            .messages
2966            .iter()
2967            .filter(|m| m.role == Role::User)
2968            .count();
2969
2970        let (api_calls, prompt_tokens, completion_tokens, cost_cents, mcp_servers) =
2971            if let Some(ref tx) = self.metrics.metrics_tx {
2972                let m = tx.borrow();
2973                (
2974                    m.api_calls,
2975                    m.prompt_tokens,
2976                    m.completion_tokens,
2977                    m.cost_spent_cents,
2978                    m.mcp_server_count,
2979                )
2980            } else {
2981                (0, 0, 0, 0.0, 0)
2982            };
2983
2984        let skill_count = self
2985            .skill_state
2986            .registry
2987            .read()
2988            .map(|r| r.all_meta().len())
2989            .unwrap_or(0);
2990
2991        let mut out = String::from("Session status:\n\n");
2992        let _ = writeln!(out, "Provider:  {}", self.provider.name());
2993        let _ = writeln!(out, "Model:     {}", self.runtime.model_name);
2994        let _ = writeln!(out, "Uptime:    {uptime}s");
2995        let _ = writeln!(out, "Turns:     {msg_count}");
2996        let _ = writeln!(out, "API calls: {api_calls}");
2997        let _ = writeln!(
2998            out,
2999            "Tokens:    {prompt_tokens} prompt / {completion_tokens} completion"
3000        );
3001        let _ = writeln!(out, "Skills:    {skill_count}");
3002        let _ = writeln!(out, "MCP:       {mcp_servers} server(s)");
3003        if cost_cents > 0.0 {
3004            let _ = writeln!(out, "Cost:      ${:.4}", cost_cents / 100.0);
3005        }
3006
3007        self.channel.send(out.trim_end()).await?;
3008        Ok(())
3009    }
3010
3011    async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
3012        use std::fmt::Write;
3013
3014        let mut output = String::from("Available skills:\n\n");
3015
3016        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
3017            .skill_state
3018            .registry
3019            .read()
3020            .expect("registry read lock")
3021            .all_meta()
3022            .into_iter()
3023            .cloned()
3024            .collect();
3025
3026        for meta in &all_meta {
3027            let trust_info = if let Some(memory) = &self.memory_state.memory {
3028                memory
3029                    .sqlite()
3030                    .load_skill_trust(&meta.name)
3031                    .await
3032                    .ok()
3033                    .flatten()
3034                    .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
3035            } else {
3036                String::new()
3037            };
3038            let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
3039        }
3040
3041        if let Some(memory) = &self.memory_state.memory {
3042            match memory.sqlite().load_skill_usage().await {
3043                Ok(usage) if !usage.is_empty() => {
3044                    output.push_str("\nUsage statistics:\n\n");
3045                    for row in &usage {
3046                        let _ = writeln!(
3047                            output,
3048                            "- {}: {} invocations (last: {})",
3049                            row.skill_name, row.invocation_count, row.last_used_at,
3050                        );
3051                    }
3052                }
3053                Ok(_) => {}
3054                Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
3055            }
3056        }
3057
3058        self.channel.send(&output).await?;
3059        Ok(())
3060    }
3061
3062    async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
3063        let Some((name, rest)) = input.split_once(' ') else {
3064            self.channel
3065                .send("Usage: /feedback <skill_name> <message>")
3066                .await?;
3067            return Ok(());
3068        };
3069        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
3070
3071        if feedback.is_empty() {
3072            self.channel
3073                .send("Usage: /feedback <skill_name> <message>")
3074                .await?;
3075            return Ok(());
3076        }
3077
3078        let Some(memory) = &self.memory_state.memory else {
3079            self.channel.send("Memory not available.").await?;
3080            return Ok(());
3081        };
3082
3083        let outcome_type = if self.feedback_detector.detect(feedback, &[]).is_some() {
3084            "user_rejection"
3085        } else {
3086            "user_approval"
3087        };
3088
3089        memory
3090            .sqlite()
3091            .record_skill_outcome(
3092                skill_name,
3093                None,
3094                self.memory_state.conversation_id,
3095                outcome_type,
3096                None,
3097                Some(feedback),
3098            )
3099            .await?;
3100
3101        if self.is_learning_enabled() && outcome_type == "user_rejection" {
3102            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
3103                .await
3104                .ok();
3105        }
3106
3107        self.channel
3108            .send(&format!("Feedback recorded for \"{skill_name}\"."))
3109            .await?;
3110        Ok(())
3111    }
3112
3113    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
3114    /// channel. Returns a human-readable status string suitable for sending to the user.
3115    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
3116        use crate::subagent::SubAgentState;
3117        let result = loop {
3118            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3119
3120            // Bridge secret requests from sub-agent to channel.confirm().
3121            // Fetch the pending request first, then release the borrow before
3122            // calling channel.confirm() (which requires &mut self).
3123            #[allow(clippy::redundant_closure_for_method_calls)]
3124            let pending = self
3125                .orchestration
3126                .subagent_manager
3127                .as_mut()
3128                .and_then(|m| m.try_recv_secret_request());
3129            if let Some((req_task_id, req)) = pending {
3130                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
3131                // (SEC-P1-02), so it is safe to embed in the prompt string.
3132                let confirm_prompt = format!(
3133                    "Sub-agent requests secret '{}'. Allow?",
3134                    crate::text::truncate_to_chars(&req.secret_key, 100)
3135                );
3136                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
3137                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
3138                    if approved {
3139                        let ttl = std::time::Duration::from_secs(300);
3140                        let key = req.secret_key.clone();
3141                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
3142                            let _ = mgr.deliver_secret(&req_task_id, key);
3143                        }
3144                    } else {
3145                        let _ = mgr.deny_secret(&req_task_id);
3146                    }
3147                }
3148            }
3149
3150            let mgr = self.orchestration.subagent_manager.as_ref()?;
3151            let statuses = mgr.statuses();
3152            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
3153                break format!("{label} completed (no status available).");
3154            };
3155            match status.state {
3156                SubAgentState::Completed => {
3157                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
3158                    break format!("{label} completed: {msg}");
3159                }
3160                SubAgentState::Failed => {
3161                    let msg = status
3162                        .last_message
3163                        .clone()
3164                        .unwrap_or_else(|| "unknown error".into());
3165                    break format!("{label} failed: {msg}");
3166                }
3167                SubAgentState::Canceled => {
3168                    break format!("{label} was cancelled.");
3169                }
3170                _ => {
3171                    let _ = self
3172                        .channel
3173                        .send_status(&format!(
3174                            "{label}: turn {}/{}",
3175                            status.turns_used,
3176                            self.orchestration
3177                                .subagent_manager
3178                                .as_ref()
3179                                .and_then(|m| m.agents_def(task_id))
3180                                .map_or(20, |d| d.permissions.max_turns)
3181                        ))
3182                        .await;
3183                }
3184            }
3185        };
3186        Some(result)
3187    }
3188
3189    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
3190    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
3191    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
3192        let mgr = self.orchestration.subagent_manager.as_mut()?;
3193        let full_ids: Vec<String> = mgr
3194            .statuses()
3195            .into_iter()
3196            .map(|(tid, _)| tid)
3197            .filter(|tid| tid.starts_with(prefix))
3198            .collect();
3199        Some(match full_ids.as_slice() {
3200            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
3201            [fid] => Ok(fid.clone()),
3202            _ => Err(format!(
3203                "Ambiguous id prefix '{prefix}': matches {} agents",
3204                full_ids.len()
3205            )),
3206        })
3207    }
3208
3209    fn handle_agent_list(&self) -> Option<String> {
3210        use std::fmt::Write as _;
3211        let mgr = self.orchestration.subagent_manager.as_ref()?;
3212        let defs = mgr.definitions();
3213        if defs.is_empty() {
3214            return Some("No sub-agent definitions found.".into());
3215        }
3216        let mut out = String::from("Available sub-agents:\n");
3217        for d in defs {
3218            let memory_label = match d.memory {
3219                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
3220                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
3221                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
3222                None => "",
3223            };
3224            if let Some(ref src) = d.source {
3225                let _ = writeln!(
3226                    out,
3227                    "  {}{} — {} ({})",
3228                    d.name, memory_label, d.description, src
3229                );
3230            } else {
3231                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
3232            }
3233        }
3234        Some(out)
3235    }
3236
3237    fn handle_agent_status(&self) -> Option<String> {
3238        use std::fmt::Write as _;
3239        let mgr = self.orchestration.subagent_manager.as_ref()?;
3240        let statuses = mgr.statuses();
3241        if statuses.is_empty() {
3242            return Some("No active sub-agents.".into());
3243        }
3244        let mut out = String::from("Active sub-agents:\n");
3245        for (id, s) in &statuses {
3246            let state = format!("{:?}", s.state).to_lowercase();
3247            let elapsed = s.started_at.elapsed().as_secs();
3248            let _ = writeln!(
3249                out,
3250                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
3251                short = &id[..8.min(id.len())],
3252                t = s.turns_used,
3253                msg = s.last_message.as_deref().unwrap_or(""),
3254            );
3255            // Show memory directory path for agents with memory enabled.
3256            if let Some(def) = mgr.agents_def(id)
3257                && let Some(scope) = def.memory
3258                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
3259            {
3260                let _ = writeln!(out, "       memory: {}", dir.display());
3261            }
3262        }
3263        Some(out)
3264    }
3265
3266    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
3267        let full_id = match self.resolve_agent_id_prefix(id)? {
3268            Ok(fid) => fid,
3269            Err(msg) => return Some(msg),
3270        };
3271        let mgr = self.orchestration.subagent_manager.as_mut()?;
3272        if let Some((tid, req)) = mgr.try_recv_secret_request()
3273            && tid == full_id
3274        {
3275            let key = req.secret_key.clone();
3276            let ttl = std::time::Duration::from_secs(300);
3277            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
3278                return Some(format!("Approve failed: {e}"));
3279            }
3280            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
3281                return Some(format!("Secret delivery failed: {e}"));
3282            }
3283            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
3284        }
3285        Some(format!(
3286            "No pending secret request for sub-agent '{full_id}'."
3287        ))
3288    }
3289
3290    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
3291        let full_id = match self.resolve_agent_id_prefix(id)? {
3292            Ok(fid) => fid,
3293            Err(msg) => return Some(msg),
3294        };
3295        let mgr = self.orchestration.subagent_manager.as_mut()?;
3296        match mgr.deny_secret(&full_id) {
3297            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
3298            Err(e) => Some(format!("Deny failed: {e}")),
3299        }
3300    }
3301
3302    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
3303        use crate::subagent::AgentCommand;
3304
3305        match cmd {
3306            AgentCommand::List => self.handle_agent_list(),
3307            AgentCommand::Background { name, prompt } => {
3308                let provider = self.provider.clone();
3309                let tool_executor = Arc::clone(&self.tool_executor);
3310                let skills = self.filtered_skills_for(&name);
3311                let mgr = self.orchestration.subagent_manager.as_mut()?;
3312                let cfg = self.orchestration.subagent_config.clone();
3313                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
3314                    Ok(id) => Some(format!(
3315                        "Sub-agent '{name}' started in background (id: {short})",
3316                        short = &id[..8.min(id.len())]
3317                    )),
3318                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
3319                }
3320            }
3321            AgentCommand::Spawn { name, prompt }
3322            | AgentCommand::Mention {
3323                agent: name,
3324                prompt,
3325            } => {
3326                // Foreground spawn: launch and await completion, streaming status to user.
3327                let provider = self.provider.clone();
3328                let tool_executor = Arc::clone(&self.tool_executor);
3329                let skills = self.filtered_skills_for(&name);
3330                let mgr = self.orchestration.subagent_manager.as_mut()?;
3331                let cfg = self.orchestration.subagent_config.clone();
3332                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
3333                {
3334                    Ok(id) => id,
3335                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
3336                };
3337                let short = task_id[..8.min(task_id.len())].to_owned();
3338                let _ = self
3339                    .channel
3340                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
3341                    .await;
3342                let label = format!("Sub-agent '{name}'");
3343                self.poll_subagent_until_done(&task_id, &label).await
3344            }
3345            AgentCommand::Status => self.handle_agent_status(),
3346            AgentCommand::Cancel { id } => {
3347                let mgr = self.orchestration.subagent_manager.as_mut()?;
3348                // Accept prefix match on task_id.
3349                let ids: Vec<String> = mgr
3350                    .statuses()
3351                    .into_iter()
3352                    .map(|(task_id, _)| task_id)
3353                    .filter(|task_id| task_id.starts_with(&id))
3354                    .collect();
3355                match ids.as_slice() {
3356                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
3357                    [full_id] => {
3358                        let full_id = full_id.clone();
3359                        match mgr.cancel(&full_id) {
3360                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
3361                            Err(e) => Some(format!("Cancel failed: {e}")),
3362                        }
3363                    }
3364                    _ => Some(format!(
3365                        "Ambiguous id prefix '{id}': matches {} agents",
3366                        ids.len()
3367                    )),
3368                }
3369            }
3370            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
3371            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
3372            AgentCommand::Resume { id, prompt } => {
3373                let cfg = self.orchestration.subagent_config.clone();
3374                // Resolve definition name from transcript meta before spawning so we can
3375                // look up skills by definition name rather than the UUID prefix (S1 fix).
3376                let def_name = {
3377                    let mgr = self.orchestration.subagent_manager.as_ref()?;
3378                    match mgr.def_name_for_resume(&id, &cfg) {
3379                        Ok(name) => name,
3380                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
3381                    }
3382                };
3383                let skills = self.filtered_skills_for(&def_name);
3384                let provider = self.provider.clone();
3385                let tool_executor = Arc::clone(&self.tool_executor);
3386                let mgr = self.orchestration.subagent_manager.as_mut()?;
3387                let (task_id, _) =
3388                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
3389                        Ok(pair) => pair,
3390                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
3391                    };
3392                let short = task_id[..8.min(task_id.len())].to_owned();
3393                let _ = self
3394                    .channel
3395                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
3396                    .await;
3397                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
3398                    .await
3399            }
3400        }
3401    }
3402
3403    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
3404        let mgr = self.orchestration.subagent_manager.as_ref()?;
3405        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
3406        let reg = self
3407            .skill_state
3408            .registry
3409            .read()
3410            .expect("registry read lock");
3411        match crate::subagent::filter_skills(&reg, &def.skills) {
3412            Ok(skills) => {
3413                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
3414                if bodies.is_empty() {
3415                    None
3416                } else {
3417                    Some(bodies)
3418                }
3419            }
3420            Err(e) => {
3421                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
3422                None
3423            }
3424        }
3425    }
3426
3427    /// Update trust DB records for all reloaded skills.
3428    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
3429        let Some(ref memory) = self.memory_state.memory else {
3430            return;
3431        };
3432        let trust_cfg = self.skill_state.trust_config.clone();
3433        let managed_dir = self.skill_state.managed_dir.clone();
3434        for meta in all_meta {
3435            let source_kind = if managed_dir
3436                .as_ref()
3437                .is_some_and(|d| meta.skill_dir.starts_with(d))
3438            {
3439                zeph_memory::sqlite::SourceKind::Hub
3440            } else {
3441                zeph_memory::sqlite::SourceKind::Local
3442            };
3443            let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
3444                &trust_cfg.default_level
3445            } else {
3446                &trust_cfg.local_level
3447            };
3448            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
3449                Ok(current_hash) => {
3450                    let existing = memory
3451                        .sqlite()
3452                        .load_skill_trust(&meta.name)
3453                        .await
3454                        .ok()
3455                        .flatten();
3456                    let trust_level_str = if let Some(ref row) = existing {
3457                        if row.blake3_hash == current_hash {
3458                            row.trust_level.clone()
3459                        } else {
3460                            trust_cfg.hash_mismatch_level.to_string()
3461                        }
3462                    } else {
3463                        initial_level.to_string()
3464                    };
3465                    let source_path = meta.skill_dir.to_str();
3466                    if let Err(e) = memory
3467                        .sqlite()
3468                        .upsert_skill_trust(
3469                            &meta.name,
3470                            &trust_level_str,
3471                            source_kind,
3472                            None,
3473                            source_path,
3474                            &current_hash,
3475                        )
3476                        .await
3477                    {
3478                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
3479                    }
3480                }
3481                Err(e) => {
3482                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
3483                }
3484            }
3485        }
3486    }
3487
3488    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
3489    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
3490        let provider = self.provider.clone();
3491        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
3492            let owned = text.to_owned();
3493            let p = provider.clone();
3494            Box::pin(async move { p.embed(&owned).await })
3495        };
3496
3497        let needs_inmemory_rebuild = !self
3498            .skill_state
3499            .matcher
3500            .as_ref()
3501            .is_some_and(SkillMatcherBackend::is_qdrant);
3502
3503        if needs_inmemory_rebuild {
3504            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
3505                .await
3506                .map(SkillMatcherBackend::InMemory);
3507        } else if let Some(ref mut backend) = self.skill_state.matcher {
3508            let _ = self.channel.send_status("syncing skill index...").await;
3509            if let Err(e) = backend
3510                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
3511                .await
3512            {
3513                tracing::warn!("failed to sync skill embeddings: {e:#}");
3514            }
3515        }
3516
3517        if self.skill_state.hybrid_search {
3518            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
3519            let _ = self.channel.send_status("rebuilding search index...").await;
3520            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
3521        }
3522    }
3523
3524    async fn reload_skills(&mut self) {
3525        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
3526        if new_registry.fingerprint()
3527            == self
3528                .skill_state
3529                .registry
3530                .read()
3531                .expect("registry read lock")
3532                .fingerprint()
3533        {
3534            return;
3535        }
3536        let _ = self.channel.send_status("reloading skills...").await;
3537        *self
3538            .skill_state
3539            .registry
3540            .write()
3541            .expect("registry write lock") = new_registry;
3542
3543        let all_meta = self
3544            .skill_state
3545            .registry
3546            .read()
3547            .expect("registry read lock")
3548            .all_meta()
3549            .into_iter()
3550            .cloned()
3551            .collect::<Vec<_>>();
3552
3553        self.update_trust_for_reloaded_skills(&all_meta).await;
3554
3555        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3556        self.rebuild_skill_matcher(&all_meta_refs).await;
3557
3558        let all_skills: Vec<Skill> = {
3559            let reg = self
3560                .skill_state
3561                .registry
3562                .read()
3563                .expect("registry read lock");
3564            reg.all_meta()
3565                .iter()
3566                .filter_map(|m| reg.get_skill(&m.name).ok())
3567                .collect()
3568        };
3569        let trust_map = self.build_skill_trust_map().await;
3570        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3571        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
3572        self.skill_state
3573            .last_skills_prompt
3574            .clone_from(&skills_prompt);
3575        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
3576        if let Some(msg) = self.messages.first_mut() {
3577            msg.content = system_prompt;
3578        }
3579
3580        let _ = self.channel.send_status("").await;
3581        tracing::info!(
3582            "reloaded {} skill(s)",
3583            self.skill_state
3584                .registry
3585                .read()
3586                .expect("registry read lock")
3587                .all_meta()
3588                .len()
3589        );
3590    }
3591
3592    fn reload_instructions(&mut self) {
3593        // Drain any additional queued events before reloading to avoid redundant reloads.
3594        if let Some(ref mut rx) = self.instruction_reload_rx {
3595            while rx.try_recv().is_ok() {}
3596        }
3597        let Some(ref state) = self.instruction_reload_state else {
3598            return;
3599        };
3600        let new_blocks = crate::instructions::load_instructions(
3601            &state.base_dir,
3602            &state.provider_kinds,
3603            &state.explicit_files,
3604            state.auto_detect,
3605        );
3606        let old_sources: std::collections::HashSet<_> =
3607            self.instruction_blocks.iter().map(|b| &b.source).collect();
3608        let new_sources: std::collections::HashSet<_> =
3609            new_blocks.iter().map(|b| &b.source).collect();
3610        for added in new_sources.difference(&old_sources) {
3611            tracing::info!(path = %added.display(), "instruction file added");
3612        }
3613        for removed in old_sources.difference(&new_sources) {
3614            tracing::info!(path = %removed.display(), "instruction file removed");
3615        }
3616        tracing::info!(
3617            old_count = self.instruction_blocks.len(),
3618            new_count = new_blocks.len(),
3619            "reloaded instruction files"
3620        );
3621        self.instruction_blocks = new_blocks;
3622    }
3623
3624    fn reload_config(&mut self) {
3625        let Some(ref path) = self.lifecycle.config_path else {
3626            return;
3627        };
3628        let config = match Config::load(path) {
3629            Ok(c) => c,
3630            Err(e) => {
3631                tracing::warn!("config reload failed: {e:#}");
3632                return;
3633            }
3634        };
3635
3636        self.runtime.security = config.security;
3637        self.runtime.timeouts = config.timeouts;
3638        self.runtime.redact_credentials = config.memory.redact_credentials;
3639        self.memory_state.history_limit = config.memory.history_limit;
3640        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
3641        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
3642        self.skill_state.max_active_skills = config.skills.max_active_skills;
3643        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
3644        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3645        self.skill_state.hybrid_search = config.skills.hybrid_search;
3646
3647        if config.memory.context_budget_tokens > 0 {
3648            self.context_manager.budget = Some(
3649                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
3650                    .with_graph_enabled(config.memory.graph.enabled),
3651            );
3652        } else {
3653            self.context_manager.budget = None;
3654        }
3655
3656        {
3657            self.memory_state.graph_config = config.memory.graph.clone();
3658        }
3659        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3660        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3661        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3662        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
3663        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3664        self.context_manager.compression = config.memory.compression.clone();
3665        self.context_manager.routing = config.memory.routing.clone();
3666        self.memory_state.cross_session_score_threshold =
3667            config.memory.cross_session_score_threshold;
3668
3669        self.index.repo_map_tokens = config.index.repo_map_tokens;
3670        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3671
3672        tracing::info!("config reloaded");
3673    }
3674}
3675pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3676    while !*rx.borrow_and_update() {
3677        if rx.changed().await.is_err() {
3678            std::future::pending::<()>().await;
3679        }
3680    }
3681}
3682
3683pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3684    match rx {
3685        Some(inner) => {
3686            if let Some(v) = inner.recv().await {
3687                Some(v)
3688            } else {
3689                *rx = None;
3690                std::future::pending().await
3691            }
3692        }
3693        None => std::future::pending().await,
3694    }
3695}
3696
3697#[cfg(test)]
3698mod tests;
3699
3700#[cfg(test)]
3701pub(crate) use tests::agent_tests;