Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35#[cfg(feature = "self-check")]
36mod quality_hook;
37pub(crate) mod rate_limiter;
38#[cfg(feature = "scheduler")]
39mod scheduler_commands;
40#[cfg(feature = "scheduler")]
41mod scheduler_loop;
42pub mod session_config;
43mod session_digest;
44pub(crate) mod sidequest;
45mod skill_management;
46pub mod slash_commands;
47pub mod speculative;
48pub(crate) mod state;
49pub(crate) mod task_injection;
50pub(crate) mod tool_execution;
51pub(crate) mod tool_orchestrator;
52mod trust_commands;
53pub mod turn;
54mod utils;
55pub(crate) mod vigil;
56
57use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use parking_lot::RwLock;
61
62use tokio::sync::{mpsc, watch};
63use tokio_util::sync::CancellationToken;
64use zeph_llm::any::AnyProvider;
65use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
66use zeph_memory::TokenCounter;
67use zeph_memory::semantic::SemanticMemory;
68use zeph_skills::loader::Skill;
69use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
70use zeph_skills::prompt::format_skills_prompt;
71use zeph_skills::registry::SkillRegistry;
72use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
73
74use crate::channel::Channel;
75use crate::config::Config;
76use crate::context::{ContextBudget, build_system_prompt};
77use zeph_common::text::estimate_tokens;
78
79use loop_event::LoopEvent;
80use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
81use state::CompressionState;
82use state::{
83    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
84    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
85    RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
86};
87
88pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
89pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
90pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
91pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
92pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
93pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
94pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
95pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
96pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
97pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
98/// Prefix used for LSP context messages (`Role::System`) injected into message history.
99/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
100/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
101/// prefix to clear stale notes before each fresh injection.
102pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
103pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
104
105pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
106    use std::fmt::Write;
107    let capacity = "[tool output: ".len()
108        + tool_name.len()
109        + "]\n```\n".len()
110        + body.len()
111        + TOOL_OUTPUT_SUFFIX.len();
112    let mut buf = String::with_capacity(capacity);
113    let _ = write!(
114        buf,
115        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
116    );
117    buf
118}
119
120/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
121/// tool orchestration, and multi-channel I/O.
122///
123/// The agent maintains conversation history, manages LLM provider state, coordinates tool
124/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
125/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
126///
127/// # Architecture
128///
129/// - **Message state**: Conversation history with system prompt, message queue, and metadata
130/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
131/// - **Skill state**: Registry, matching engine, and self-learning evolution
132/// - **Context manager**: Token budgeting, context assembly, and summarization
133/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
134/// - **MCP client**: Multi-server support for Model Context Protocol
135/// - **Index state**: AST-based code indexing and semantic retrieval
136/// - **Security**: Sanitization, exfiltration detection, adversarial probes
137/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
138///
139/// # Channel Contract
140///
141/// The agent requires a [`Channel`] implementation for user interaction:
142/// - Sends agent responses via `channel.send(message)`
143/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
144/// - Supports structured events: tool invocations, tool output, streaming updates
145///
146/// # Lifecycle
147///
148/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
149/// 2. Run main loop with [`Self::run`]
150/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
151pub struct Agent<C: Channel> {
152    provider: AnyProvider,
153    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
154    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
155    /// Falls back to `provider.clone()` when no dedicated entry exists.
156    /// **Never replaced** by `/provider switch`.
157    embedding_provider: AnyProvider,
158    channel: C,
159    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
160    pub(super) msg: MessageState,
161    pub(super) memory_state: MemoryState,
162    pub(super) skill_state: SkillState,
163    pub(super) context_manager: context_manager::ContextManager,
164    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
165    pub(super) learning_engine: learning_engine::LearningEngine,
166    pub(super) feedback: FeedbackState,
167    pub(super) runtime: RuntimeConfig,
168    pub(super) mcp: McpState,
169    pub(super) index: IndexState,
170    pub(super) session: SessionState,
171    pub(super) debug_state: DebugState,
172    pub(super) instructions: InstructionState,
173    pub(super) security: SecurityState,
174    pub(super) experiments: ExperimentState,
175    pub(super) compression: CompressionState,
176    pub(super) lifecycle: LifecycleState,
177    pub(super) providers: ProviderState,
178    pub(super) metrics: MetricsState,
179    pub(super) orchestration: OrchestrationState,
180    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
181    pub(super) focus: focus::FocusState,
182    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
183    pub(super) sidequest: sidequest::SidequestState,
184    /// Tool filtering, dependency tracking, and iteration bookkeeping.
185    pub(super) tool_state: ToolState,
186    /// MARCH self-check pipeline, built at startup and rebuilt on provider swap.
187    #[cfg(feature = "self-check")]
188    pub(super) quality: Option<std::sync::Arc<crate::quality::SelfCheckPipeline>>,
189}
190
191/// Control flow signal returned by [`Agent::apply_dispatch_result`].
192enum DispatchFlow {
193    /// The command requested exit; the agent loop should `break`.
194    Break,
195    /// The command was handled; the agent loop should `continue`.
196    Continue,
197    /// The command was not recognised; the agent loop should fall through.
198    Fallthrough,
199}
200
201impl<C: Channel> Agent<C> {
202    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
203    ///
204    /// # Arguments
205    ///
206    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
207    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
208    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
209    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
210    ///   skills are matched by exact name only
211    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
212    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
213    ///
214    /// # Initialization
215    ///
216    /// The constructor:
217    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
218    /// 2. Builds the system prompt from registered skills
219    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
220    /// 4. Returns a ready-to-run agent
221    ///
222    /// # Panics
223    ///
224    /// Panics if `max_active_skills` is 0.
225    #[must_use]
226    pub fn new(
227        provider: AnyProvider,
228        channel: C,
229        registry: SkillRegistry,
230        matcher: Option<SkillMatcherBackend>,
231        max_active_skills: usize,
232        tool_executor: impl ToolExecutor + 'static,
233    ) -> Self {
234        let registry = Arc::new(RwLock::new(registry));
235        let embedding_provider = provider.clone();
236        Self::new_with_registry_arc(
237            provider,
238            embedding_provider,
239            channel,
240            registry,
241            matcher,
242            max_active_skills,
243            tool_executor,
244        )
245    }
246
247    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
248    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
249    ///
250    /// # Panics
251    ///
252    /// Panics if the registry `RwLock` is poisoned.
253    #[must_use]
254    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
255    pub fn new_with_registry_arc(
256        provider: AnyProvider,
257        embedding_provider: AnyProvider,
258        channel: C,
259        registry: Arc<RwLock<SkillRegistry>>,
260        matcher: Option<SkillMatcherBackend>,
261        max_active_skills: usize,
262        tool_executor: impl ToolExecutor + 'static,
263    ) -> Self {
264        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
265        let all_skills: Vec<Skill> = {
266            let reg = registry.read();
267            reg.all_meta()
268                .iter()
269                .filter_map(|m| reg.get_skill(&m.name).ok())
270                .collect()
271        };
272        let empty_trust = HashMap::new();
273        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
274        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
275        let system_prompt = build_system_prompt(&skills_prompt, None);
276        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
277        tracing::trace!(prompt = %system_prompt, "full system prompt");
278
279        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
280        let token_counter = Arc::new(TokenCounter::new());
281        Self {
282            provider,
283            embedding_provider,
284            channel,
285            tool_executor: Arc::new(tool_executor),
286            msg: MessageState {
287                messages: vec![Message {
288                    role: Role::System,
289                    content: system_prompt,
290                    parts: vec![],
291                    metadata: MessageMetadata::default(),
292                }],
293                message_queue: VecDeque::new(),
294                pending_image_parts: Vec::new(),
295                last_persisted_message_id: None,
296                deferred_db_hide_ids: Vec::new(),
297                deferred_db_summaries: Vec::new(),
298            },
299            memory_state: MemoryState::default(),
300            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
301            context_manager: context_manager::ContextManager::new(),
302            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
303            learning_engine: learning_engine::LearningEngine::new(),
304            feedback: FeedbackState::default(),
305            debug_state: DebugState::default(),
306            runtime: RuntimeConfig::default(),
307            mcp: McpState::default(),
308            index: IndexState::default(),
309            session: SessionState::new(),
310            instructions: InstructionState::default(),
311            security: SecurityState::default(),
312            experiments: ExperimentState::new(),
313            compression: CompressionState::default(),
314            lifecycle: LifecycleState::new(),
315            providers: ProviderState::new(initial_prompt_tokens),
316            metrics: MetricsState::new(token_counter),
317            orchestration: OrchestrationState::default(),
318            focus: focus::FocusState::default(),
319            sidequest: sidequest::SidequestState::default(),
320            tool_state: ToolState::default(),
321            #[cfg(feature = "self-check")]
322            quality: None,
323        }
324    }
325
326    /// Poll all active sub-agents for completed/failed/canceled results.
327    ///
328    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
329    /// for agents that have finished. Each completed agent is removed from the manager.
330    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
331        let Some(mgr) = &mut self.orchestration.subagent_manager else {
332            return vec![];
333        };
334
335        let finished: Vec<String> = mgr
336            .statuses()
337            .into_iter()
338            .filter_map(|(id, status)| {
339                if matches!(
340                    status.state,
341                    zeph_subagent::SubAgentState::Completed
342                        | zeph_subagent::SubAgentState::Failed
343                        | zeph_subagent::SubAgentState::Canceled
344                ) {
345                    Some(id)
346                } else {
347                    None
348                }
349            })
350            .collect();
351
352        let mut results = vec![];
353        for task_id in finished {
354            match mgr.collect(&task_id).await {
355                Ok(result) => results.push((task_id, result)),
356                Err(e) => {
357                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
358                }
359            }
360        }
361        results
362    }
363
364    /// Call the LLM to generate a structured session summary with a configurable timeout.
365    ///
366    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
367    /// any failure, logging a warning — callers must treat `None` as "skip storage".
368    ///
369    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
370    /// (structured call times out and plain-text fallback also times out) this adds up to
371    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
372    async fn call_llm_for_session_summary(
373        &self,
374        chat_messages: &[Message],
375    ) -> Option<zeph_memory::StructuredSummary> {
376        let timeout_dur = std::time::Duration::from_secs(
377            self.memory_state.compaction.shutdown_summary_timeout_secs,
378        );
379        match tokio::time::timeout(
380            timeout_dur,
381            self.provider
382                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
383        )
384        .await
385        {
386            Ok(Ok(s)) => Some(s),
387            Ok(Err(e)) => {
388                tracing::warn!(
389                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
390                );
391                self.plain_text_summary_fallback(chat_messages, timeout_dur)
392                    .await
393            }
394            Err(_) => {
395                tracing::warn!(
396                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
397                    self.memory_state.compaction.shutdown_summary_timeout_secs
398                );
399                self.plain_text_summary_fallback(chat_messages, timeout_dur)
400                    .await
401            }
402        }
403    }
404
405    async fn plain_text_summary_fallback(
406        &self,
407        chat_messages: &[Message],
408        timeout_dur: std::time::Duration,
409    ) -> Option<zeph_memory::StructuredSummary> {
410        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
411            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
412                summary: plain,
413                key_facts: vec![],
414                entities: vec![],
415            }),
416            Ok(Err(e)) => {
417                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
418                None
419            }
420            Err(_) => {
421                tracing::warn!("shutdown summary: plain LLM fallback timed out");
422                None
423            }
424        }
425    }
426
427    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
428    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
429    /// closed while tool execution was in progress). Without this the next session startup strips
430    /// those assistant messages and emits orphan warnings.
431    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
432        use zeph_llm::provider::{MessagePart, Role};
433
434        // Walk messages in reverse: if the last assistant message (ignoring any trailing
435        // system messages) has ToolUse parts and is NOT immediately followed by a user
436        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
437        let msgs = &self.msg.messages;
438        // Find last assistant message index.
439        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
440            return;
441        };
442        let asst_msg = &msgs[asst_idx];
443        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
444            .parts
445            .iter()
446            .filter_map(|p| {
447                if let MessagePart::ToolUse { id, name, input } = p {
448                    Some((id.as_str(), name.as_str(), input))
449                } else {
450                    None
451                }
452            })
453            .collect();
454        if tool_use_ids.is_empty() {
455            return;
456        }
457
458        // Check whether a following user message already pairs all ToolUse ids.
459        let paired_ids: std::collections::HashSet<&str> = msgs
460            .get(asst_idx + 1..)
461            .into_iter()
462            .flatten()
463            .filter(|m| m.role == Role::User)
464            .flat_map(|m| m.parts.iter())
465            .filter_map(|p| {
466                if let MessagePart::ToolResult { tool_use_id, .. } = p {
467                    Some(tool_use_id.as_str())
468                } else {
469                    None
470                }
471            })
472            .collect();
473
474        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
475            .iter()
476            .filter(|(id, _, _)| !paired_ids.contains(*id))
477            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
478                id: (*id).to_owned(),
479                name: (*name).to_owned().into(),
480                input: (*input).clone(),
481            })
482            .collect();
483
484        if unpaired.is_empty() {
485            return;
486        }
487
488        tracing::info!(
489            count = unpaired.len(),
490            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
491        );
492        self.persist_cancelled_tool_results(&unpaired).await;
493    }
494
495    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
496    ///
497    /// Guards:
498    /// - `shutdown_summary` config must be enabled
499    /// - `conversation_id` must be set (memory must be attached)
500    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
501    /// - at least `shutdown_summary_min_messages` user-turn messages in history
502    ///
503    /// All errors are logged as warnings and swallowed — shutdown must never fail.
504    async fn maybe_store_shutdown_summary(&mut self) {
505        if !self.memory_state.compaction.shutdown_summary {
506            return;
507        }
508        let Some(memory) = self.memory_state.persistence.memory.clone() else {
509            return;
510        };
511        let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
512            return;
513        };
514
515        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
516        match memory.has_session_summary(conversation_id).await {
517            Ok(true) => {
518                tracing::debug!("shutdown summary: session already has a summary, skipping");
519                return;
520            }
521            Ok(false) => {}
522            Err(e) => {
523                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
524                return;
525            }
526        }
527
528        // Count user-turn messages only (skip system prompt at index 0).
529        let user_count = self
530            .msg
531            .messages
532            .iter()
533            .skip(1)
534            .filter(|m| m.role == Role::User)
535            .count();
536        if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
537            tracing::debug!(
538                user_count,
539                min = self.memory_state.compaction.shutdown_summary_min_messages,
540                "shutdown summary: too few user messages, skipping"
541            );
542            return;
543        }
544
545        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
546        let _ = self.channel.send_status("Saving session summary...").await;
547
548        // Collect last N messages (skip system prompt at index 0).
549        let max = self.memory_state.compaction.shutdown_summary_max_messages;
550        if max == 0 {
551            tracing::debug!("shutdown summary: max_messages=0, skipping");
552            return;
553        }
554        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
555        let slice = if non_system.len() > max {
556            &non_system[non_system.len() - max..]
557        } else {
558            &non_system[..]
559        };
560
561        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
562            .iter()
563            .map(|m| {
564                let role = match m.role {
565                    Role::User => "user".to_owned(),
566                    Role::Assistant => "assistant".to_owned(),
567                    Role::System => "system".to_owned(),
568                };
569                (zeph_memory::MessageId(0), role, m.content.clone())
570            })
571            .collect();
572
573        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
574        let chat_messages = vec![Message {
575            role: Role::User,
576            content: prompt,
577            parts: vec![],
578            metadata: MessageMetadata::default(),
579        }];
580
581        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
582            let _ = self.channel.send_status("").await;
583            return;
584        };
585
586        if let Err(e) = memory
587            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
588            .await
589        {
590            tracing::warn!("shutdown summary: storage failed: {e:#}");
591        } else {
592            tracing::info!(
593                conversation_id = conversation_id.0,
594                "shutdown summary stored"
595            );
596        }
597
598        // Clear TUI status.
599        let _ = self.channel.send_status("").await;
600    }
601
602    /// Gracefully shut down the agent and persist state.
603    ///
604    /// Performs the following cleanup:
605    ///
606    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
607    ///    are flushed to memory or disk
608    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
609    ///    to the vault
610    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
611    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
612    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
613    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
614    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
615    ///
616    /// Call this before dropping the agent to ensure no data loss.
617    pub async fn shutdown(&mut self) {
618        let _ = self.channel.send_status("Shutting down...").await;
619
620        // CRIT-1: persist Thompson state accumulated during this session.
621        self.provider.save_router_state();
622
623        // Persist AdaptOrch Beta-arm table alongside Thompson state.
624        if let Some(ref advisor) = self.orchestration.topology_advisor
625            && let Err(e) = advisor.save()
626        {
627            tracing::warn!(error = %e, "adaptorch: failed to persist state");
628        }
629
630        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
631            mgr.shutdown_all();
632        }
633
634        if let Some(ref manager) = self.mcp.manager {
635            manager.shutdown_all_shared().await;
636        }
637
638        // Finalize compaction trajectory: push the last open segment into the Vec.
639        // This segment would otherwise only be pushed when the next hard compaction fires,
640        // which never happens at session end.
641        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
642            self.update_metrics(|m| {
643                m.compaction_turns_after_hard.push(turns);
644            });
645            self.context_manager.turns_since_last_hard_compaction = None;
646        }
647
648        if let Some(ref tx) = self.metrics.metrics_tx {
649            let m = tx.borrow();
650            if m.filter_applications > 0 {
651                #[allow(clippy::cast_precision_loss)]
652                let pct = if m.filter_raw_tokens > 0 {
653                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
654                } else {
655                    0.0
656                };
657                tracing::info!(
658                    raw_tokens = m.filter_raw_tokens,
659                    saved_tokens = m.filter_saved_tokens,
660                    applications = m.filter_applications,
661                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
662                    m.filter_saved_tokens,
663                );
664            }
665            if m.compaction_hard_count > 0 {
666                tracing::info!(
667                    hard_compactions = m.compaction_hard_count,
668                    turns_after_hard = ?m.compaction_turns_after_hard,
669                    "hard compaction trajectory"
670                );
671            }
672        }
673
674        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
675        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
676        // startup strips the orphaned ToolUse and emits warnings.
677        self.flush_orphaned_tool_use_on_shutdown().await;
678
679        self.lifecycle.supervisor.abort_all();
680
681        self.maybe_store_shutdown_summary().await;
682        self.maybe_store_session_digest().await;
683
684        tracing::info!("agent shutdown complete");
685    }
686
687    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if channel I/O or LLM communication fails.
692    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
693    fn refresh_subagent_metrics(&mut self) {
694        let Some(ref mgr) = self.orchestration.subagent_manager else {
695            return;
696        };
697        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
698            .statuses()
699            .into_iter()
700            .map(|(id, s)| {
701                let def = mgr.agents_def(&id);
702                crate::metrics::SubAgentMetrics {
703                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
704                    id: id.clone(),
705                    state: format!("{:?}", s.state).to_lowercase(),
706                    turns_used: s.turns_used,
707                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
708                    background: def.is_some_and(|d| d.permissions.background),
709                    elapsed_secs: s.started_at.elapsed().as_secs(),
710                    permission_mode: def.map_or_else(String::new, |d| {
711                        use zeph_subagent::def::PermissionMode;
712                        match d.permissions.permission_mode {
713                            PermissionMode::Default => String::new(),
714                            PermissionMode::AcceptEdits => "accept_edits".into(),
715                            PermissionMode::DontAsk => "dont_ask".into(),
716                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
717                            PermissionMode::Plan => "plan".into(),
718                        }
719                    }),
720                    transcript_dir: mgr
721                        .agent_transcript_dir(&id)
722                        .map(|p| p.to_string_lossy().into_owned()),
723                }
724            })
725            .collect();
726        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
727    }
728
729    /// Non-blocking poll: notify the user when background sub-agents complete.
730    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
731        let completed = self.poll_subagents().await;
732        for (task_id, result) in completed {
733            let notice = if result.is_empty() {
734                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
735            } else {
736                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
737            };
738            if let Err(e) = self.channel.send(&notice).await {
739                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
740            }
741        }
742        Ok(())
743    }
744
745    /// Run the agent main loop.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
750    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
751    pub async fn run(&mut self) -> Result<(), error::AgentError>
752    where
753        C: 'static,
754    {
755        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
756            && !*rx.borrow()
757        {
758            let _ = rx.changed().await;
759            if !*rx.borrow() {
760                tracing::warn!("model warmup did not complete successfully");
761            }
762        }
763
764        // Load the session digest once at session start for context injection.
765        self.load_and_cache_session_digest().await;
766        self.maybe_send_resume_recap().await;
767
768        loop {
769            self.apply_provider_override();
770            self.check_tool_refresh().await;
771            self.process_pending_elicitations().await;
772            self.refresh_subagent_metrics();
773            self.notify_completed_subagents().await?;
774            self.drain_channel();
775
776            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
777                self.notify_queue_count().await;
778                if queued.raw_attachments.is_empty() {
779                    (queued.text, queued.image_parts)
780                } else {
781                    let msg = crate::channel::ChannelMessage {
782                        text: queued.text,
783                        attachments: queued.raw_attachments,
784                    };
785                    self.resolve_message(msg).await
786                }
787            } else {
788                match self.next_event().await? {
789                    None | Some(LoopEvent::Shutdown) => break,
790                    Some(LoopEvent::SkillReload) => {
791                        self.reload_skills().await;
792                        continue;
793                    }
794                    Some(LoopEvent::InstructionReload) => {
795                        self.reload_instructions();
796                        continue;
797                    }
798                    Some(LoopEvent::ConfigReload) => {
799                        self.reload_config();
800                        continue;
801                    }
802                    Some(LoopEvent::UpdateNotification(msg)) => {
803                        if let Err(e) = self.channel.send(&msg).await {
804                            tracing::warn!("failed to send update notification: {e}");
805                        }
806                        continue;
807                    }
808                    Some(LoopEvent::ExperimentCompleted(msg)) => {
809                        self.experiments.cancel = None;
810                        if let Err(e) = self.channel.send(&msg).await {
811                            tracing::warn!("failed to send experiment completion: {e}");
812                        }
813                        continue;
814                    }
815                    Some(LoopEvent::ScheduledTask(prompt)) => {
816                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
817                        let msg = crate::channel::ChannelMessage {
818                            text,
819                            attachments: Vec::new(),
820                        };
821                        self.drain_channel();
822                        self.resolve_message(msg).await
823                    }
824                    Some(LoopEvent::TaskInjected(injection)) => {
825                        if let Some(ref mut ls) = self.lifecycle.user_loop {
826                            ls.iteration += 1;
827                            tracing::info!(iteration = ls.iteration, "loop: tick");
828                        }
829                        let msg = crate::channel::ChannelMessage {
830                            text: injection.prompt,
831                            attachments: Vec::new(),
832                        };
833                        self.drain_channel();
834                        self.resolve_message(msg).await
835                    }
836                    Some(LoopEvent::FileChanged(event)) => {
837                        self.handle_file_changed(event).await;
838                        continue;
839                    }
840                    Some(LoopEvent::Message(msg)) => {
841                        self.drain_channel();
842                        self.resolve_message(msg).await
843                    }
844                }
845            };
846
847            let trimmed = text.trim();
848
849            // M3: extract flagged URLs from all slash commands before any registry dispatch,
850            // so `/skill install <url>` and similar commands populate user_provided_urls.
851            if trimmed.starts_with('/') {
852                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
853                if !slash_urls.is_empty() {
854                    self.security.user_provided_urls.write().extend(slash_urls);
855                }
856            }
857
858            // Registry dispatch: build the command registry, construct CommandContext, dispatch.
859            // The registry is built inline (all handlers are ZSTs) to avoid borrow-checker
860            // conflicts when constructing CommandContext from Agent<C> fields.
861            // Build context first so that borrows outlive the registry (drop order matters).
862            let session_impl = command_context_impls::SessionAccessImpl {
863                supports_exit: self.channel.supports_exit(),
864            };
865            let mut messages_impl = command_context_impls::MessageAccessImpl {
866                msg: &mut self.msg,
867                tool_state: &mut self.tool_state,
868                providers: &mut self.providers,
869                metrics: &self.metrics,
870                security: &mut self.security,
871                tool_orchestrator: &mut self.tool_orchestrator,
872            };
873            // sink_adapter declared before reg so it is dropped after reg (LIFO).
874            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
875            // null_agent must be declared before reg so it lives longer (LIFO drop order).
876            let mut null_agent = zeph_commands::NullAgent;
877            let registry_handled = {
878                use zeph_commands::CommandRegistry;
879                use zeph_commands::handlers::debug::{
880                    DebugDumpCommand, DumpFormatCommand, LogCommand,
881                };
882                use zeph_commands::handlers::help::HelpCommand;
883                use zeph_commands::handlers::session::{
884                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
885                };
886
887                let mut reg = CommandRegistry::new();
888                reg.register(ExitCommand);
889                reg.register(QuitCommand);
890                reg.register(ClearCommand);
891                reg.register(ResetCommand);
892                reg.register(ClearQueueCommand);
893                reg.register(LogCommand);
894                reg.register(DebugDumpCommand);
895                reg.register(DumpFormatCommand);
896                reg.register(HelpCommand);
897                #[cfg(test)]
898                reg.register(test_stubs::TestErrorCommand);
899
900                let mut ctx = zeph_commands::CommandContext {
901                    sink: &mut sink_adapter,
902                    debug: &mut self.debug_state,
903                    messages: &mut messages_impl,
904                    session: &session_impl,
905                    agent: &mut null_agent,
906                };
907                reg.dispatch(&mut ctx, trimmed).await
908            };
909            let session_reg_missed = registry_handled.is_none();
910            match self
911                .apply_dispatch_result(registry_handled, trimmed, false)
912                .await
913            {
914                DispatchFlow::Break => break,
915                DispatchFlow::Continue => continue,
916                DispatchFlow::Fallthrough => {
917                    // Not handled by the session/debug registry; try agent-command registry.
918                }
919            }
920
921            // Agent-command registry: handlers access Agent<C> directly.
922            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
923            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
924            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
925            // sentinels here will outlive ctx (ctx is declared later, inside the block).
926            let mut agent_null_debug = command_context_impls::NullDebugAccess;
927            let mut agent_null_messages = command_context_impls::NullMessageAccess;
928            let agent_null_session = command_context_impls::NullSessionAccess;
929            let mut agent_null_sink = zeph_commands::NullSink;
930            let agent_result: Option<
931                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
932            > = if session_reg_missed {
933                use zeph_commands::CommandRegistry;
934                use zeph_commands::handlers::{
935                    agent_cmd::AgentCommand,
936                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
937                    experiment::ExperimentCommand,
938                    loop_cmd::LoopCommand,
939                    lsp::LspCommand,
940                    mcp::McpCommand,
941                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
942                    misc::{CacheStatsCommand, ImageCommand},
943                    model::{ModelCommand, ProviderCommand},
944                    plan::PlanCommand,
945                    plugins::PluginsCommand,
946                    policy::PolicyCommand,
947                    scheduler::SchedulerCommand,
948                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
949                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
950                };
951
952                let mut agent_reg = CommandRegistry::new();
953                agent_reg.register(MemoryCommand);
954                agent_reg.register(GraphCommand);
955                agent_reg.register(GuidelinesCommand);
956                agent_reg.register(ModelCommand);
957                agent_reg.register(ProviderCommand);
958                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
959                agent_reg.register(SkillCommand);
960                agent_reg.register(SkillsCommand);
961                agent_reg.register(FeedbackCommand);
962                agent_reg.register(McpCommand);
963                agent_reg.register(PolicyCommand);
964                agent_reg.register(SchedulerCommand);
965                agent_reg.register(LspCommand);
966                // Phase 4 migrations (Send-safe commands):
967                agent_reg.register(CacheStatsCommand);
968                agent_reg.register(ImageCommand);
969                agent_reg.register(StatusCommand);
970                agent_reg.register(GuardrailCommand);
971                agent_reg.register(FocusCommand);
972                agent_reg.register(SideQuestCommand);
973                agent_reg.register(AgentCommand);
974                // Phase 5 migrations (Send-compatible):
975                agent_reg.register(CompactCommand);
976                agent_reg.register(NewConversationCommand);
977                agent_reg.register(RecapCommand);
978                agent_reg.register(ExperimentCommand);
979                agent_reg.register(PlanCommand);
980                agent_reg.register(LoopCommand);
981                agent_reg.register(PluginsCommand);
982
983                let mut ctx = zeph_commands::CommandContext {
984                    sink: &mut agent_null_sink,
985                    debug: &mut agent_null_debug,
986                    messages: &mut agent_null_messages,
987                    session: &agent_null_session,
988                    agent: self,
989                };
990                // self is reborrowed; ctx drops at end of this block.
991                agent_reg.dispatch(&mut ctx, trimmed).await
992            } else {
993                None
994            };
995            // self.channel is available again here (ctx borrow dropped above).
996            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
997            // inside apply_dispatch_result when with_learning = true.
998            match self
999                .apply_dispatch_result(agent_result, trimmed, true)
1000                .await
1001            {
1002                DispatchFlow::Break => break,
1003                DispatchFlow::Continue => continue,
1004                DispatchFlow::Fallthrough => {
1005                    // Not handled by agent registry; fall through to existing dispatch.
1006                }
1007            }
1008
1009            match self.handle_builtin_command(trimmed) {
1010                Some(true) => break,
1011                Some(false) => continue,
1012                None => {}
1013            }
1014
1015            self.process_user_message(text, image_parts).await?;
1016        }
1017
1018        // autoDream: run background memory consolidation if conditions are met (#2697).
1019        // Runs with a timeout — partial state is acceptable for MVP.
1020        self.maybe_autodream().await;
1021
1022        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1023        if let Some(ref mut tc) = self.debug_state.trace_collector {
1024            tc.finish();
1025        }
1026
1027        Ok(())
1028    }
1029
1030    /// Dispatch a slash-command registry result and flush the channel.
1031    ///
1032    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1033    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1034    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1035    async fn apply_dispatch_result(
1036        &mut self,
1037        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1038        command: &str,
1039        with_learning: bool,
1040    ) -> DispatchFlow {
1041        match result {
1042            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1043                let _ = self.channel.flush_chunks().await;
1044                DispatchFlow::Break
1045            }
1046            Some(Ok(
1047                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1048            )) => {
1049                let _ = self.channel.flush_chunks().await;
1050                DispatchFlow::Continue
1051            }
1052            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1053                let _ = self.channel.send(&msg).await;
1054                let _ = self.channel.flush_chunks().await;
1055                if with_learning {
1056                    self.maybe_trigger_post_command_learning(command).await;
1057                }
1058                DispatchFlow::Continue
1059            }
1060            Some(Err(e)) => {
1061                let _ = self.channel.send(&e.to_string()).await;
1062                let _ = self.channel.flush_chunks().await;
1063                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1064                DispatchFlow::Continue
1065            }
1066            None => DispatchFlow::Fallthrough,
1067        }
1068    }
1069
1070    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1071    fn apply_provider_override(&mut self) {
1072        if let Some(ref slot) = self.providers.provider_override
1073            && let Some(new_provider) = slot.write().take()
1074        {
1075            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1076            self.provider = new_provider;
1077        }
1078    }
1079
1080    /// Poll all event sources and return the next [`LoopEvent`].
1081    ///
1082    /// Returns `None` when the inbound channel closes (graceful shutdown).
1083    ///
1084    /// # Errors
1085    ///
1086    /// Propagates channel receive errors.
1087    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1088        let event = tokio::select! {
1089            result = self.channel.recv() => {
1090                return Ok(result?.map(LoopEvent::Message));
1091            }
1092            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1093                tracing::info!("shutting down");
1094                LoopEvent::Shutdown
1095            }
1096            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
1097                LoopEvent::SkillReload
1098            }
1099            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
1100                LoopEvent::InstructionReload
1101            }
1102            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
1103                LoopEvent::ConfigReload
1104            }
1105            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
1106                LoopEvent::UpdateNotification(msg)
1107            }
1108            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
1109                LoopEvent::ExperimentCompleted(msg)
1110            }
1111            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
1112                tracing::info!("scheduler: injecting custom task as agent turn");
1113                LoopEvent::ScheduledTask(prompt)
1114            }
1115            () = async {
1116                if let Some(ref mut ls) = self.lifecycle.user_loop {
1117                    if ls.cancel_tx.is_cancelled() {
1118                        std::future::pending::<()>().await;
1119                    } else {
1120                        ls.interval.tick().await;
1121                    }
1122                } else {
1123                    std::future::pending::<()>().await;
1124                }
1125            } => {
1126                // Re-check user_loop after the tick — /loop stop may have fired between the
1127                // interval firing and this arm executing. Returning Ok(None) causes the caller
1128                // to `continue` without injecting a stale or empty prompt.
1129                let Some(ls) = self.lifecycle.user_loop.as_ref() else {
1130                    return Ok(None);
1131                };
1132                if ls.cancel_tx.is_cancelled() {
1133                    self.lifecycle.user_loop = None;
1134                    return Ok(None);
1135                }
1136                let prompt = ls.prompt.clone();
1137                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1138            }
1139            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
1140                LoopEvent::FileChanged(event)
1141            }
1142        };
1143        Ok(Some(event))
1144    }
1145
1146    async fn resolve_message(
1147        &self,
1148        msg: crate::channel::ChannelMessage,
1149    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1150        use crate::channel::{Attachment, AttachmentKind};
1151        use zeph_llm::provider::{ImageData, MessagePart};
1152
1153        let text_base = msg.text.clone();
1154
1155        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1156            .attachments
1157            .into_iter()
1158            .partition(|a| a.kind == AttachmentKind::Audio);
1159
1160        tracing::debug!(
1161            audio = audio_attachments.len(),
1162            has_stt = self.providers.stt.is_some(),
1163            "resolve_message attachments"
1164        );
1165
1166        let text = if !audio_attachments.is_empty()
1167            && let Some(stt) = self.providers.stt.as_ref()
1168        {
1169            let mut transcribed_parts = Vec::new();
1170            for attachment in &audio_attachments {
1171                if attachment.data.len() > MAX_AUDIO_BYTES {
1172                    tracing::warn!(
1173                        size = attachment.data.len(),
1174                        max = MAX_AUDIO_BYTES,
1175                        "audio attachment exceeds size limit, skipping"
1176                    );
1177                    continue;
1178                }
1179                match stt
1180                    .transcribe(&attachment.data, attachment.filename.as_deref())
1181                    .await
1182                {
1183                    Ok(result) => {
1184                        tracing::info!(
1185                            len = result.text.len(),
1186                            language = ?result.language,
1187                            "audio transcribed"
1188                        );
1189                        transcribed_parts.push(result.text);
1190                    }
1191                    Err(e) => {
1192                        tracing::error!(error = %e, "audio transcription failed");
1193                    }
1194                }
1195            }
1196            if transcribed_parts.is_empty() {
1197                text_base
1198            } else {
1199                let transcribed = transcribed_parts.join("\n");
1200                if text_base.is_empty() {
1201                    transcribed
1202                } else {
1203                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1204                }
1205            }
1206        } else {
1207            if !audio_attachments.is_empty() {
1208                tracing::warn!(
1209                    count = audio_attachments.len(),
1210                    "audio attachments received but no STT provider configured, dropping"
1211                );
1212            }
1213            text_base
1214        };
1215
1216        let mut image_parts = Vec::new();
1217        for attachment in image_attachments {
1218            if attachment.data.len() > MAX_IMAGE_BYTES {
1219                tracing::warn!(
1220                    size = attachment.data.len(),
1221                    max = MAX_IMAGE_BYTES,
1222                    "image attachment exceeds size limit, skipping"
1223                );
1224                continue;
1225            }
1226            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1227            image_parts.push(MessagePart::Image(Box::new(ImageData {
1228                data: attachment.data,
1229                mime_type,
1230            })));
1231        }
1232
1233        (text, image_parts)
1234    }
1235
1236    /// Create a new [`Turn`] for the given input and advance the turn counter.
1237    ///
1238    /// Clears per-turn state that must not carry over between turns:
1239    /// - per-turn `CancellationToken` (new token for each turn)
1240    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1241    ///   `process_user_message_inner` after security checks)
1242    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1243        let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1244        self.debug_state.iteration_counter += 1;
1245        self.lifecycle.cancel_token = CancellationToken::new();
1246        self.security.user_provided_urls.write().clear();
1247        turn::Turn::new(id, input)
1248    }
1249
1250    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1251    ///
1252    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1253    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1254    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1255    /// is populated from it here so the rest of the pipeline is unchanged.
1256    fn end_turn(&mut self, turn: turn::Turn) {
1257        self.metrics.pending_timings = turn.metrics.timings;
1258        self.flush_turn_timings();
1259        // Clear per-turn intent (FR-008): must not persist across turns.
1260        self.session.current_turn_intent = None;
1261    }
1262
1263    #[cfg_attr(
1264        feature = "profiling",
1265        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1266    )]
1267    async fn process_user_message(
1268        &mut self,
1269        text: String,
1270        image_parts: Vec<zeph_llm::provider::MessagePart>,
1271    ) -> Result<(), error::AgentError> {
1272        let input = turn::TurnInput::new(text, image_parts);
1273        let mut t = self.begin_turn(input);
1274
1275        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1276        tracing::Span::current().record("turn_id", t.id().0);
1277        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1278        self.debug_state
1279            .start_iteration_span(turn_idx, t.input.text.trim());
1280
1281        let result = self.process_user_message_inner(&mut t).await;
1282
1283        // Close iteration span regardless of outcome (partial trace preserved on error).
1284        let span_status = if result.is_ok() {
1285            crate::debug_dump::trace::SpanStatus::Ok
1286        } else {
1287            crate::debug_dump::trace::SpanStatus::Error {
1288                message: "iteration failed".to_owned(),
1289            }
1290        };
1291        self.debug_state.end_iteration_span(turn_idx, span_status);
1292
1293        self.end_turn(t);
1294        result
1295    }
1296
1297    async fn process_user_message_inner(
1298        &mut self,
1299        turn: &mut turn::Turn,
1300    ) -> Result<(), error::AgentError> {
1301        // Reap completed background tasks from the previous turn. The summarization signal
1302        // is applied here — between turns — so `unsummarized_count` is always reset on the
1303        // foreground without shared mutable state across tasks (S1 fix from critic review).
1304        let bg_signal = self.lifecycle.supervisor.reap();
1305        if bg_signal.did_summarize {
1306            self.memory_state.persistence.unsummarized_count = 0;
1307            tracing::debug!("background summarization completed; unsummarized_count reset");
1308        }
1309        {
1310            let snap = self.lifecycle.supervisor.metrics_snapshot();
1311            self.update_metrics(|m| {
1312                m.bg_inflight = snap.inflight as u64;
1313                m.bg_dropped = snap.total_dropped();
1314                m.bg_completed = snap.total_completed();
1315                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1316                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1317            });
1318        }
1319
1320        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1321        // accounted in the metrics snapshot above. The TUI may show a stale enrichment
1322        // inflight count for one cycle when abort fires, but this self-corrects next turn.
1323        if self.runtime.supervisor_config.abort_enrichment_on_turn {
1324            self.lifecycle
1325                .supervisor
1326                .abort_class(agent_supervisor::TaskClass::Enrichment);
1327        }
1328
1329        // Wire the per-turn cancellation token into the cancel bridge.
1330        // The bridge translates the `cancel_signal` (Notify) into a CancellationToken cancel.
1331        // Abort the previous bridge before spawning to prevent unbounded accumulation (#2737).
1332        let signal = Arc::clone(&self.lifecycle.cancel_signal);
1333        let token = turn.cancel_token.clone();
1334        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1335        self.lifecycle.cancel_token = turn.cancel_token.clone();
1336        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
1337            prev.abort();
1338        }
1339        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
1340            signal.notified().await;
1341            token.cancel();
1342        }));
1343
1344        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1345        let text = turn.input.text.clone();
1346        let trimmed_owned = text.trim().to_owned();
1347        let trimmed = trimmed_owned.as_str();
1348
1349        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1350        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1351        if self.security.vigil.is_some() {
1352            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1353            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1354        }
1355
1356        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1357            return result;
1358        }
1359
1360        self.check_pending_rollbacks().await;
1361
1362        if self.pre_process_security(trimmed).await? {
1363            return Ok(());
1364        }
1365
1366        let t_ctx = std::time::Instant::now();
1367        tracing::debug!("turn timing: prepare_context start");
1368        self.advance_context_lifecycle(&text, trimmed).await;
1369        turn.metrics_mut().timings.prepare_context_ms =
1370            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1371        tracing::debug!(
1372            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1373            "turn timing: prepare_context done"
1374        );
1375
1376        let image_parts = std::mem::take(&mut turn.input.image_parts);
1377        let user_msg = self.build_user_message(&text, image_parts);
1378
1379        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1380        // URL set was cleared in begin_turn; re-populate for this turn.
1381        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1382        if !urls.is_empty() {
1383            self.security.user_provided_urls.write().extend(urls);
1384        }
1385
1386        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1387        // Derived from the raw input text before context assembly to avoid timing dependencies.
1388        self.memory_state.extraction.goal_text = Some(text.clone());
1389
1390        let t_persist = std::time::Instant::now();
1391        tracing::debug!("turn timing: persist_message(user) start");
1392        // Image parts intentionally excluded — base64 payloads too large for message history.
1393        self.persist_message(Role::User, &text, &[], false).await;
1394        turn.metrics_mut().timings.persist_message_ms =
1395            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1396        tracing::debug!(
1397            ms = turn.metrics_snapshot().timings.persist_message_ms,
1398            "turn timing: persist_message(user) done"
1399        );
1400        self.push_message(user_msg);
1401
1402        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1403        // handle_native_tool_calls respectively via metrics.pending_timings.
1404        tracing::debug!("turn timing: process_response start");
1405        if let Err(e) = self.process_response().await {
1406            // Detach any in-flight learning tasks before mutating message state.
1407            self.learning_engine.learning_tasks.detach_all();
1408            tracing::error!("Response processing failed: {e:#}");
1409            let user_msg = format!("Error: {e:#}");
1410            self.channel.send(&user_msg).await?;
1411            self.msg.messages.pop();
1412            self.recompute_prompt_tokens();
1413            self.channel.flush_chunks().await?;
1414        } else {
1415            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1416            // leak into the next turn's context.
1417            self.learning_engine.learning_tasks.detach_all();
1418            self.truncate_old_tool_results();
1419            // MagicDocs: spawn background doc updates if any are due (#2702).
1420            self.maybe_update_magic_docs();
1421        }
1422        tracing::debug!("turn timing: process_response done");
1423
1424        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1425        #[cfg(feature = "self-check")]
1426        if let Some(pipeline) = self.quality.clone() {
1427            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1428        }
1429        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1430        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1431        // When self-check appends a flag_marker chunk, this single call covers both
1432        // the main response and the marker, preventing the double response_end of #3243.
1433        let _ = self.channel.flush_chunks().await;
1434
1435        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1436        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1437        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1438        // we harvest those values into Turn before end_turn overwrites pending_timings.
1439        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
1440        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;
1441
1442        Ok(())
1443    }
1444
1445    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1446    #[cfg_attr(
1447        feature = "profiling",
1448        tracing::instrument(name = "agent.security_prescreen", skip_all)
1449    )]
1450    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1451        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1452        if let Some(ref guardrail) = self.security.guardrail {
1453            use zeph_sanitizer::guardrail::GuardrailVerdict;
1454            let verdict = guardrail.check(trimmed).await;
1455            match &verdict {
1456                GuardrailVerdict::Flagged { reason, .. } => {
1457                    tracing::warn!(
1458                        reason = %reason,
1459                        should_block = verdict.should_block(),
1460                        "guardrail flagged user input"
1461                    );
1462                    if verdict.should_block() {
1463                        let msg = format!("[guardrail] Input blocked: {reason}");
1464                        let _ = self.channel.send(&msg).await;
1465                        let _ = self.channel.flush_chunks().await;
1466                        return Ok(true);
1467                    }
1468                    // Warn mode: notify but continue.
1469                    let _ = self
1470                        .channel
1471                        .send(&format!("[guardrail] Warning: {reason}"))
1472                        .await;
1473                }
1474                GuardrailVerdict::Error { error } => {
1475                    if guardrail.error_should_block() {
1476                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1477                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1478                        let _ = self.channel.send(msg).await;
1479                        let _ = self.channel.flush_chunks().await;
1480                        return Ok(true);
1481                    }
1482                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1483                }
1484                GuardrailVerdict::Safe => {}
1485            }
1486        }
1487
1488        // ML classifier: lightweight injection detection on user input boundary.
1489        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1490        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1491        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1492        // direct user chat. Disabled by default to prevent false positives on benign messages.
1493        #[cfg(feature = "classifiers")]
1494        if self.security.sanitizer.scan_user_input() {
1495            match self.security.sanitizer.classify_injection(trimmed).await {
1496                zeph_sanitizer::InjectionVerdict::Blocked => {
1497                    self.push_classifier_metrics();
1498                    let _ = self
1499                        .channel
1500                        .send("[security] Input blocked: injection detected by classifier.")
1501                        .await;
1502                    let _ = self.channel.flush_chunks().await;
1503                    return Ok(true);
1504                }
1505                zeph_sanitizer::InjectionVerdict::Suspicious => {
1506                    tracing::warn!("injection_classifier soft_signal on user input");
1507                }
1508                zeph_sanitizer::InjectionVerdict::Clean => {}
1509            }
1510        }
1511        #[cfg(feature = "classifiers")]
1512        self.push_classifier_metrics();
1513
1514        Ok(false)
1515    }
1516
1517    #[cfg_attr(
1518        feature = "profiling",
1519        tracing::instrument(name = "agent.prepare_context", skip_all)
1520    )]
1521    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1522        // Reset per-message pruning cache at the start of each turn (#2298).
1523        self.mcp.pruning_cache.reset();
1524
1525        // Extract before rebuild_system_prompt so the value is not tainted
1526        // by the secrets-bearing system prompt (ConversationId is just an i64).
1527        let conv_id = self.memory_state.persistence.conversation_id;
1528        self.rebuild_system_prompt(text).await;
1529
1530        self.detect_and_record_corrections(trimmed, conv_id).await;
1531        self.learning_engine.tick();
1532        self.analyze_and_learn().await;
1533        self.sync_graph_counts().await;
1534
1535        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1536        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1537        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1538        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1539        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1540
1541        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1542        {
1543            self.focus.tick();
1544
1545            // SideQuest eviction: runs every N user turns when enabled.
1546            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1547            let sidequest_should_fire = self.sidequest.tick();
1548            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1549                self.maybe_sidequest_eviction();
1550            }
1551        }
1552
1553        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
1554        if let Some(warning) = self.cache_expiry_warning() {
1555            tracing::info!(warning, "cache expiry warning");
1556            let _ = self.channel.send_status(&warning).await;
1557        }
1558
1559        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
1560        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
1561        self.maybe_time_based_microcompact();
1562
1563        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
1564        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
1565        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
1566        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
1567        self.maybe_apply_deferred_summaries();
1568        self.flush_deferred_summaries().await;
1569
1570        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
1571        if let Err(e) = self.maybe_proactive_compress().await {
1572            tracing::warn!("proactive compression failed: {e:#}");
1573        }
1574
1575        if let Err(e) = self.maybe_compact().await {
1576            tracing::warn!("context compaction failed: {e:#}");
1577        }
1578
1579        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
1580            tracing::warn!("context preparation failed: {e:#}");
1581        }
1582
1583        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
1584        self.provider
1585            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);
1586
1587        self.learning_engine.reset_reflection();
1588    }
1589
1590    fn build_user_message(
1591        &mut self,
1592        text: &str,
1593        image_parts: Vec<zeph_llm::provider::MessagePart>,
1594    ) -> Message {
1595        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1596        all_image_parts.extend(image_parts);
1597
1598        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1599            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1600                text: text.to_owned(),
1601            }];
1602            parts.extend(all_image_parts);
1603            Message::from_parts(Role::User, parts)
1604        } else {
1605            if !all_image_parts.is_empty() {
1606                tracing::warn!(
1607                    count = all_image_parts.len(),
1608                    "image attachments dropped: provider does not support vision"
1609                );
1610            }
1611            Message {
1612                role: Role::User,
1613                content: text.to_owned(),
1614                parts: vec![],
1615                metadata: MessageMetadata::default(),
1616            }
1617        }
1618    }
1619
1620    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
1621    /// channel. Returns a human-readable status string suitable for sending to the user.
1622    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1623        use zeph_subagent::SubAgentState;
1624        let result = loop {
1625            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1626
1627            // Bridge secret requests from sub-agent to channel.confirm().
1628            // Fetch the pending request first, then release the borrow before
1629            // calling channel.confirm() (which requires &mut self).
1630            #[allow(clippy::redundant_closure_for_method_calls)]
1631            let pending = self
1632                .orchestration
1633                .subagent_manager
1634                .as_mut()
1635                .and_then(|m| m.try_recv_secret_request());
1636            if let Some((req_task_id, req)) = pending {
1637                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
1638                // (SEC-P1-02), so it is safe to embed in the prompt string.
1639                let confirm_prompt = format!(
1640                    "Sub-agent requests secret '{}'. Allow?",
1641                    crate::text::truncate_to_chars(&req.secret_key, 100)
1642                );
1643                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1644                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1645                    if approved {
1646                        let ttl = std::time::Duration::from_mins(5);
1647                        let key = req.secret_key.clone();
1648                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1649                            let _ = mgr.deliver_secret(&req_task_id, key);
1650                        }
1651                    } else {
1652                        let _ = mgr.deny_secret(&req_task_id);
1653                    }
1654                }
1655            }
1656
1657            let mgr = self.orchestration.subagent_manager.as_ref()?;
1658            let statuses = mgr.statuses();
1659            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1660                break format!("{label} completed (no status available).");
1661            };
1662            match status.state {
1663                SubAgentState::Completed => {
1664                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1665                    break format!("{label} completed: {msg}");
1666                }
1667                SubAgentState::Failed => {
1668                    let msg = status
1669                        .last_message
1670                        .clone()
1671                        .unwrap_or_else(|| "unknown error".into());
1672                    break format!("{label} failed: {msg}");
1673                }
1674                SubAgentState::Canceled => {
1675                    break format!("{label} was cancelled.");
1676                }
1677                _ => {
1678                    let _ = self
1679                        .channel
1680                        .send_status(&format!(
1681                            "{label}: turn {}/{}",
1682                            status.turns_used,
1683                            self.orchestration
1684                                .subagent_manager
1685                                .as_ref()
1686                                .and_then(|m| m.agents_def(task_id))
1687                                .map_or(20, |d| d.permissions.max_turns)
1688                        ))
1689                        .await;
1690                }
1691            }
1692        };
1693        Some(result)
1694    }
1695
1696    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1697    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1698    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1699        let mgr = self.orchestration.subagent_manager.as_mut()?;
1700        let full_ids: Vec<String> = mgr
1701            .statuses()
1702            .into_iter()
1703            .map(|(tid, _)| tid)
1704            .filter(|tid| tid.starts_with(prefix))
1705            .collect();
1706        Some(match full_ids.as_slice() {
1707            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1708            [fid] => Ok(fid.clone()),
1709            _ => Err(format!(
1710                "Ambiguous id prefix '{prefix}': matches {} agents",
1711                full_ids.len()
1712            )),
1713        })
1714    }
1715
1716    fn handle_agent_list(&self) -> Option<String> {
1717        use std::fmt::Write as _;
1718        let mgr = self.orchestration.subagent_manager.as_ref()?;
1719        let defs = mgr.definitions();
1720        if defs.is_empty() {
1721            return Some("No sub-agent definitions found.".into());
1722        }
1723        let mut out = String::from("Available sub-agents:\n");
1724        for d in defs {
1725            let memory_label = match d.memory {
1726                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
1727                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
1728                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
1729                None => "",
1730            };
1731            if let Some(ref src) = d.source {
1732                let _ = writeln!(
1733                    out,
1734                    "  {}{} — {} ({})",
1735                    d.name, memory_label, d.description, src
1736                );
1737            } else {
1738                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
1739            }
1740        }
1741        Some(out)
1742    }
1743
1744    fn handle_agent_status(&self) -> Option<String> {
1745        use std::fmt::Write as _;
1746        let mgr = self.orchestration.subagent_manager.as_ref()?;
1747        let statuses = mgr.statuses();
1748        if statuses.is_empty() {
1749            return Some("No active sub-agents.".into());
1750        }
1751        let mut out = String::from("Active sub-agents:\n");
1752        for (id, s) in &statuses {
1753            let state = format!("{:?}", s.state).to_lowercase();
1754            let elapsed = s.started_at.elapsed().as_secs();
1755            let _ = writeln!(
1756                out,
1757                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
1758                short = &id[..8.min(id.len())],
1759                t = s.turns_used,
1760                msg = s.last_message.as_deref().unwrap_or(""),
1761            );
1762            // Show memory directory path for agents with memory enabled.
1763            if let Some(def) = mgr.agents_def(id)
1764                && let Some(scope) = def.memory
1765                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
1766            {
1767                let _ = writeln!(out, "       memory: {}", dir.display());
1768            }
1769        }
1770        Some(out)
1771    }
1772
1773    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1774        let full_id = match self.resolve_agent_id_prefix(id)? {
1775            Ok(fid) => fid,
1776            Err(msg) => return Some(msg),
1777        };
1778        let mgr = self.orchestration.subagent_manager.as_mut()?;
1779        if let Some((tid, req)) = mgr.try_recv_secret_request()
1780            && tid == full_id
1781        {
1782            let key = req.secret_key.clone();
1783            let ttl = std::time::Duration::from_mins(5);
1784            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1785                return Some(format!("Approve failed: {e}"));
1786            }
1787            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1788                return Some(format!("Secret delivery failed: {e}"));
1789            }
1790            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1791        }
1792        Some(format!(
1793            "No pending secret request for sub-agent '{full_id}'."
1794        ))
1795    }
1796
1797    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1798        let full_id = match self.resolve_agent_id_prefix(id)? {
1799            Ok(fid) => fid,
1800            Err(msg) => return Some(msg),
1801        };
1802        let mgr = self.orchestration.subagent_manager.as_mut()?;
1803        match mgr.deny_secret(&full_id) {
1804            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1805            Err(e) => Some(format!("Deny failed: {e}")),
1806        }
1807    }
1808
1809    #[allow(clippy::too_many_lines)]
1810    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
1811        use zeph_subagent::AgentCommand;
1812
1813        match cmd {
1814            AgentCommand::List => self.handle_agent_list(),
1815            AgentCommand::Background { name, prompt } => {
1816                let provider = self.provider.clone();
1817                let tool_executor = Arc::clone(&self.tool_executor);
1818                let skills = self.filtered_skills_for(&name);
1819                let cfg = self.orchestration.subagent_config.clone();
1820                let spawn_ctx = self.build_spawn_context(&cfg);
1821                let mgr = self.orchestration.subagent_manager.as_mut()?;
1822                match mgr.spawn(
1823                    &name,
1824                    &prompt,
1825                    provider,
1826                    tool_executor,
1827                    skills,
1828                    &cfg,
1829                    spawn_ctx,
1830                ) {
1831                    Ok(id) => Some(format!(
1832                        "Sub-agent '{name}' started in background (id: {short})",
1833                        short = &id[..8.min(id.len())]
1834                    )),
1835                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1836                }
1837            }
1838            AgentCommand::Spawn { name, prompt }
1839            | AgentCommand::Mention {
1840                agent: name,
1841                prompt,
1842            } => {
1843                // Foreground spawn: launch and await completion, streaming status to user.
1844                let provider = self.provider.clone();
1845                let tool_executor = Arc::clone(&self.tool_executor);
1846                let skills = self.filtered_skills_for(&name);
1847                let cfg = self.orchestration.subagent_config.clone();
1848                let spawn_ctx = self.build_spawn_context(&cfg);
1849                let mgr = self.orchestration.subagent_manager.as_mut()?;
1850                let task_id = match mgr.spawn(
1851                    &name,
1852                    &prompt,
1853                    provider,
1854                    tool_executor,
1855                    skills,
1856                    &cfg,
1857                    spawn_ctx,
1858                ) {
1859                    Ok(id) => id,
1860                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1861                };
1862                let short = task_id[..8.min(task_id.len())].to_owned();
1863                let _ = self
1864                    .channel
1865                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1866                    .await;
1867                let label = format!("Sub-agent '{name}'");
1868                self.poll_subagent_until_done(&task_id, &label).await
1869            }
1870            AgentCommand::Status => self.handle_agent_status(),
1871            AgentCommand::Cancel { id } => {
1872                let mgr = self.orchestration.subagent_manager.as_mut()?;
1873                // Accept prefix match on task_id.
1874                let ids: Vec<String> = mgr
1875                    .statuses()
1876                    .into_iter()
1877                    .map(|(task_id, _)| task_id)
1878                    .filter(|task_id| task_id.starts_with(&id))
1879                    .collect();
1880                match ids.as_slice() {
1881                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
1882                    [full_id] => {
1883                        let full_id = full_id.clone();
1884                        match mgr.cancel(&full_id) {
1885                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1886                            Err(e) => Some(format!("Cancel failed: {e}")),
1887                        }
1888                    }
1889                    _ => Some(format!(
1890                        "Ambiguous id prefix '{id}': matches {} agents",
1891                        ids.len()
1892                    )),
1893                }
1894            }
1895            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1896            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1897            AgentCommand::Resume { id, prompt } => {
1898                let cfg = self.orchestration.subagent_config.clone();
1899                // Resolve definition name from transcript meta before spawning so we can
1900                // look up skills by definition name rather than the UUID prefix (S1 fix).
1901                let def_name = {
1902                    let mgr = self.orchestration.subagent_manager.as_ref()?;
1903                    match mgr.def_name_for_resume(&id, &cfg) {
1904                        Ok(name) => name,
1905                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1906                    }
1907                };
1908                let skills = self.filtered_skills_for(&def_name);
1909                let provider = self.provider.clone();
1910                let tool_executor = Arc::clone(&self.tool_executor);
1911                let mgr = self.orchestration.subagent_manager.as_mut()?;
1912                let (task_id, _) =
1913                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1914                        Ok(pair) => pair,
1915                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1916                    };
1917                let short = task_id[..8.min(task_id.len())].to_owned();
1918                let _ = self
1919                    .channel
1920                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1921                    .await;
1922                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1923                    .await
1924            }
1925        }
1926    }
1927
1928    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1929        let mgr = self.orchestration.subagent_manager.as_ref()?;
1930        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1931        let reg = self.skill_state.registry.read();
1932        match zeph_subagent::filter_skills(&reg, &def.skills) {
1933            Ok(skills) => {
1934                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1935                if bodies.is_empty() {
1936                    None
1937                } else {
1938                    Some(bodies)
1939                }
1940            }
1941            Err(e) => {
1942                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1943                None
1944            }
1945        }
1946    }
1947
1948    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
1949    fn build_spawn_context(
1950        &self,
1951        cfg: &zeph_config::SubAgentConfig,
1952    ) -> zeph_subagent::SpawnContext {
1953        zeph_subagent::SpawnContext {
1954            parent_messages: self.extract_parent_messages(cfg),
1955            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1956            parent_provider_name: {
1957                let name = &self.runtime.active_provider_name;
1958                if name.is_empty() {
1959                    None
1960                } else {
1961                    Some(name.clone())
1962                }
1963            },
1964            spawn_depth: self.runtime.spawn_depth,
1965            mcp_tool_names: self.extract_mcp_tool_names(),
1966        }
1967    }
1968
1969    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
1970    ///
1971    /// Filters system messages, takes last `context_window_turns * 2` messages,
1972    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
1973    fn extract_parent_messages(
1974        &self,
1975        config: &zeph_config::SubAgentConfig,
1976    ) -> Vec<zeph_llm::provider::Message> {
1977        use zeph_llm::provider::Role;
1978        if config.context_window_turns == 0 {
1979            return Vec::new();
1980        }
1981        let non_system: Vec<_> = self
1982            .msg
1983            .messages
1984            .iter()
1985            .filter(|m| m.role != Role::System)
1986            .cloned()
1987            .collect();
1988        let take_count = config.context_window_turns * 2;
1989        let start = non_system.len().saturating_sub(take_count);
1990        let mut msgs = non_system[start..].to_vec();
1991
1992        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
1993        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
1994        let mut total_chars: usize = 0;
1995        let mut keep = msgs.len();
1996        for (i, m) in msgs.iter().enumerate() {
1997            total_chars += m.content.len();
1998            if total_chars > max_chars {
1999                keep = i;
2000                break;
2001            }
2002        }
2003        if keep < msgs.len() {
2004            tracing::info!(
2005                kept = keep,
2006                requested = config.context_window_turns * 2,
2007                "[subagent] truncated parent history from {} to {} turns due to token budget",
2008                config.context_window_turns * 2,
2009                keep
2010            );
2011            msgs.truncate(keep);
2012        }
2013        msgs
2014    }
2015
2016    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2017    fn extract_mcp_tool_names(&self) -> Vec<String> {
2018        self.tool_executor
2019            .tool_definitions_erased()
2020            .into_iter()
2021            .filter(|t| t.id.starts_with("mcp_"))
2022            .map(|t| t.id.to_string())
2023            .collect()
2024    }
2025
2026    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2027    ///
2028    /// Must be called from a blocking context (uses synchronous FS I/O).
2029    fn classify_source_kind(
2030        skill_dir: &std::path::Path,
2031        managed_dir: Option<&std::path::PathBuf>,
2032        bundled_names: &std::collections::HashSet<String>,
2033    ) -> zeph_memory::store::SourceKind {
2034        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2035            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2036            let has_marker = skill_dir.join(".bundled").exists();
2037            if has_marker && bundled_names.contains(skill_name) {
2038                zeph_memory::store::SourceKind::Bundled
2039            } else {
2040                if has_marker {
2041                    tracing::warn!(
2042                        skill = %skill_name,
2043                        "skill has .bundled marker but is not in the bundled skill \
2044                         allowlist — classifying as Hub"
2045                    );
2046                }
2047                zeph_memory::store::SourceKind::Hub
2048            }
2049        } else {
2050            zeph_memory::store::SourceKind::Local
2051        }
2052    }
2053
2054    /// Update trust DB records for all reloaded skills.
2055    async fn update_trust_for_reloaded_skills(
2056        &mut self,
2057        all_meta: &[zeph_skills::loader::SkillMeta],
2058    ) {
2059        // Clone Arc before any .await so no &self fields are held across suspension points.
2060        let memory = self.memory_state.persistence.memory.clone();
2061        let Some(memory) = memory else {
2062            return;
2063        };
2064        let trust_cfg = self.skill_state.trust_config.clone();
2065        let managed_dir = self.skill_state.managed_dir.clone();
2066        let bundled_names: std::collections::HashSet<String> =
2067            zeph_skills::bundled_skill_names().into_iter().collect();
2068        for meta in all_meta {
2069            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2070            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2071            let skill_dir = meta.skill_dir.clone();
2072            let managed_dir_ref = managed_dir.clone();
2073            let bundled_names_ref = bundled_names.clone();
2074            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2075                tokio::task::spawn_blocking(move || {
2076                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2077                    let source_kind = Self::classify_source_kind(
2078                        &skill_dir,
2079                        managed_dir_ref.as_ref(),
2080                        &bundled_names_ref,
2081                    );
2082                    Some((hash, source_kind))
2083                })
2084                .await
2085                .unwrap_or(None);
2086
2087            let Some((current_hash, source_kind)) = fs_result else {
2088                tracing::warn!("failed to compute hash for '{}'", meta.name);
2089                continue;
2090            };
2091            let initial_level = match source_kind {
2092                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2093                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2094                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2095                    &trust_cfg.local_level
2096                }
2097            };
2098            let existing = memory
2099                .sqlite()
2100                .load_skill_trust(&meta.name)
2101                .await
2102                .ok()
2103                .flatten();
2104            let trust_level_str = if let Some(ref row) = existing {
2105                if row.blake3_hash != current_hash {
2106                    trust_cfg.hash_mismatch_level.to_string()
2107                } else if row.source_kind != source_kind {
2108                    // source_kind changed (e.g., hub → bundled on upgrade).
2109                    // Never override an explicit operator block. For active trust levels,
2110                    // adopt the source-kind initial level when it grants more trust.
2111                    let stored = row
2112                        .trust_level
2113                        .parse::<zeph_tools::SkillTrustLevel>()
2114                        .unwrap_or_else(|_| {
2115                            tracing::warn!(
2116                                skill = %meta.name,
2117                                raw = %row.trust_level,
2118                                "unrecognised trust_level in DB, treating as quarantined"
2119                            );
2120                            zeph_tools::SkillTrustLevel::Quarantined
2121                        });
2122                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2123                        row.trust_level.clone()
2124                    } else {
2125                        initial_level.to_string()
2126                    }
2127                } else {
2128                    row.trust_level.clone()
2129                }
2130            } else {
2131                initial_level.to_string()
2132            };
2133            let source_path = meta.skill_dir.to_str();
2134            if let Err(e) = memory
2135                .sqlite()
2136                .upsert_skill_trust(
2137                    &meta.name,
2138                    &trust_level_str,
2139                    source_kind,
2140                    None,
2141                    source_path,
2142                    &current_hash,
2143                )
2144                .await
2145            {
2146                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2147            }
2148        }
2149    }
2150
2151    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2152    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2153        let provider = self.embedding_provider.clone();
2154        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
2155        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2156            let owned = text.to_owned();
2157            let p = provider.clone();
2158            Box::pin(async move {
2159                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2160                    result
2161                } else {
2162                    tracing::warn!(
2163                        timeout_secs = embed_timeout.as_secs(),
2164                        "skill matcher: embedding timed out"
2165                    );
2166                    Err(zeph_llm::LlmError::Timeout)
2167                }
2168            })
2169        };
2170
2171        let needs_inmemory_rebuild = !self
2172            .skill_state
2173            .matcher
2174            .as_ref()
2175            .is_some_and(SkillMatcherBackend::is_qdrant);
2176
2177        if needs_inmemory_rebuild {
2178            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
2179                .await
2180                .map(SkillMatcherBackend::InMemory);
2181        } else if let Some(ref mut backend) = self.skill_state.matcher {
2182            let _ = self.channel.send_status("syncing skill index...").await;
2183            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
2184                .session
2185                .status_tx
2186                .clone()
2187                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
2188                    Box::new(move |completed, total| {
2189                        let msg = format!("Syncing skills: {completed}/{total}");
2190                        let _ = tx.send(msg);
2191                    })
2192                });
2193            if let Err(e) = backend
2194                .sync(
2195                    all_meta,
2196                    &self.skill_state.embedding_model,
2197                    embed_fn,
2198                    on_progress,
2199                )
2200                .await
2201            {
2202                tracing::warn!("failed to sync skill embeddings: {e:#}");
2203            }
2204        }
2205
2206        if self.skill_state.hybrid_search {
2207            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2208            let _ = self.channel.send_status("rebuilding search index...").await;
2209            self.skill_state.rebuild_bm25(&descs);
2210        }
2211    }
2212
2213    #[cfg_attr(
2214        feature = "profiling",
2215        tracing::instrument(name = "skill.hot_reload", skip_all)
2216    )]
2217    async fn reload_skills(&mut self) {
2218        let old_fp = self.skill_state.fingerprint();
2219        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
2220            let plugin_dirs = supplier();
2221            let mut paths = self.skill_state.skill_paths.clone();
2222            for dir in plugin_dirs {
2223                if !paths.contains(&dir) {
2224                    paths.push(dir);
2225                }
2226            }
2227            paths
2228        } else {
2229            self.skill_state.skill_paths.clone()
2230        };
2231        self.skill_state.registry.write().reload(&reload_paths);
2232        if self.skill_state.fingerprint() == old_fp {
2233            return;
2234        }
2235        let _ = self.channel.send_status("reloading skills...").await;
2236
2237        let all_meta = self
2238            .skill_state
2239            .registry
2240            .read()
2241            .all_meta()
2242            .into_iter()
2243            .cloned()
2244            .collect::<Vec<_>>();
2245
2246        self.update_trust_for_reloaded_skills(&all_meta).await;
2247
2248        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2249        self.rebuild_skill_matcher(&all_meta_refs).await;
2250
2251        let all_skills: Vec<Skill> = {
2252            let reg = self.skill_state.registry.read();
2253            reg.all_meta()
2254                .iter()
2255                .filter_map(|m| reg.get_skill(&m.name).ok())
2256                .collect()
2257        };
2258        let trust_map = self.build_skill_trust_map().await;
2259        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2260        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2261        self.skill_state
2262            .last_skills_prompt
2263            .clone_from(&skills_prompt);
2264        let system_prompt = build_system_prompt(&skills_prompt, None);
2265        if let Some(msg) = self.msg.messages.first_mut() {
2266            msg.content = system_prompt;
2267        }
2268
2269        let _ = self.channel.send_status("").await;
2270        tracing::info!(
2271            "reloaded {} skill(s)",
2272            self.skill_state.registry.read().all_meta().len()
2273        );
2274    }
2275
2276    fn reload_instructions(&mut self) {
2277        // Drain any additional queued events before reloading to avoid redundant reloads.
2278        if let Some(ref mut rx) = self.instructions.reload_rx {
2279            while rx.try_recv().is_ok() {}
2280        }
2281        let Some(ref state) = self.instructions.reload_state else {
2282            return;
2283        };
2284        let new_blocks = crate::instructions::load_instructions(
2285            &state.base_dir,
2286            &state.provider_kinds,
2287            &state.explicit_files,
2288            state.auto_detect,
2289        );
2290        let old_sources: std::collections::HashSet<_> =
2291            self.instructions.blocks.iter().map(|b| &b.source).collect();
2292        let new_sources: std::collections::HashSet<_> =
2293            new_blocks.iter().map(|b| &b.source).collect();
2294        for added in new_sources.difference(&old_sources) {
2295            tracing::info!(path = %added.display(), "instruction file added");
2296        }
2297        for removed in old_sources.difference(&new_sources) {
2298            tracing::info!(path = %removed.display(), "instruction file removed");
2299        }
2300        tracing::info!(
2301            old_count = self.instructions.blocks.len(),
2302            new_count = new_blocks.len(),
2303            "reloaded instruction files"
2304        );
2305        self.instructions.blocks = new_blocks;
2306    }
2307
2308    fn reload_config(&mut self) {
2309        let Some(path) = self.lifecycle.config_path.clone() else {
2310            return;
2311        };
2312        let Some(config) = self.load_config_with_overlay(&path) else {
2313            return;
2314        };
2315        let budget_tokens = resolve_context_budget(&config, &self.provider);
2316        self.runtime.security = config.security;
2317        self.runtime.timeouts = config.timeouts;
2318        self.runtime.redact_credentials = config.memory.redact_credentials;
2319        self.memory_state.persistence.history_limit = config.memory.history_limit;
2320        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
2321        self.memory_state.compaction.summarization_threshold =
2322            config.memory.summarization_threshold;
2323        self.skill_state.max_active_skills = config.skills.max_active_skills;
2324        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2325        self.skill_state.min_injection_score = config.skills.min_injection_score;
2326        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2327        self.skill_state.hybrid_search = config.skills.hybrid_search;
2328        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
2329        self.skill_state.confusability_threshold =
2330            config.skills.confusability_threshold.clamp(0.0, 1.0);
2331        config
2332            .skills
2333            .generation_provider
2334            .as_str()
2335            .clone_into(&mut self.skill_state.generation_provider_name);
2336        self.skill_state.generation_output_dir =
2337            config.skills.generation_output_dir.as_deref().map(|p| {
2338                if let Some(stripped) = p.strip_prefix("~/") {
2339                    dirs::home_dir()
2340                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2341                } else {
2342                    std::path::PathBuf::from(p)
2343                }
2344            });
2345
2346        self.context_manager.budget = Some(
2347            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2348        );
2349
2350        {
2351            let graph_cfg = &config.memory.graph;
2352            if graph_cfg.rpe.enabled {
2353                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2354                if self.memory_state.extraction.rpe_router.is_none() {
2355                    self.memory_state.extraction.rpe_router =
2356                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2357                            graph_cfg.rpe.threshold,
2358                            graph_cfg.rpe.max_skip_turns,
2359                        )));
2360                }
2361            } else {
2362                self.memory_state.extraction.rpe_router = None;
2363            }
2364            self.memory_state.extraction.graph_config = graph_cfg.clone();
2365        }
2366        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2367        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2368        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2369        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2370        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2371        self.context_manager.compression = config.memory.compression.clone();
2372        self.context_manager.routing = config.memory.store_routing.clone();
2373        // Resolve routing_classifier_provider from the provider pool (#2484).
2374        self.context_manager.store_routing_provider = if config
2375            .memory
2376            .store_routing
2377            .routing_classifier_provider
2378            .is_empty()
2379        {
2380            None
2381        } else {
2382            let resolved = self.resolve_background_provider(
2383                &config.memory.store_routing.routing_classifier_provider,
2384            );
2385            Some(std::sync::Arc::new(resolved))
2386        };
2387        self.memory_state.persistence.cross_session_score_threshold =
2388            config.memory.cross_session_score_threshold;
2389
2390        self.index.repo_map_tokens = config.index.repo_map_tokens;
2391        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2392
2393        tracing::info!("config reloaded");
2394    }
2395
2396    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
2397    ///
2398    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
2399    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2400        let mut config = match Config::load(path) {
2401            Ok(c) => c,
2402            Err(e) => {
2403                tracing::warn!("config reload failed: {e:#}");
2404                return None;
2405            }
2406        };
2407
2408        // Re-apply plugin overlays. On error, keep previous runtime state intact.
2409        let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2410            None
2411        } else {
2412            match zeph_plugins::apply_plugin_config_overlays(
2413                &mut config,
2414                &self.lifecycle.plugins_dir,
2415            ) {
2416                Ok(o) => Some(o),
2417                Err(e) => {
2418                    tracing::warn!(
2419                        "plugin overlay merge failed during reload: {e:#}; \
2420                         keeping previous runtime state"
2421                    );
2422                    return None;
2423                }
2424            }
2425        };
2426
2427        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
2428        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
2429        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
2430        if let Some(ref overlay) = new_overlay {
2431            self.warn_on_shell_overlay_divergence(overlay, &config);
2432        }
2433        Some(config)
2434    }
2435
2436    /// React to shell policy divergence detected on hot-reload.
2437    ///
2438    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
2439    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
2440    /// — emit a warn + status banner when it changes.
2441    fn warn_on_shell_overlay_divergence(
2442        &self,
2443        new_overlay: &zeph_plugins::ResolvedOverlay,
2444        config: &Config,
2445    ) {
2446        let new_blocked: Vec<String> = {
2447            let mut v = config.tools.shell.blocked_commands.clone();
2448            v.sort();
2449            v
2450        };
2451        let new_allowed: Vec<String> = {
2452            let mut v = config.tools.shell.allowed_commands.clone();
2453            v.sort();
2454            v
2455        };
2456
2457        let startup = &self.lifecycle.startup_shell_overlay;
2458        let blocked_changed = new_blocked != startup.blocked;
2459        let allowed_changed = new_allowed != startup.allowed;
2460
2461        // blocked_commands IS rebuilt live — emit info-level confirmation only.
2462        if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2463            h.rebuild(&config.tools.shell);
2464            tracing::info!(
2465                blocked_count = h.snapshot_blocked().len(),
2466                "shell blocked_commands rebuilt from hot-reload"
2467            );
2468        }
2469
2470        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
2471        // executor construction time. Warn loudly so the user restarts.
2472        //
2473        // Note: when base `allowed_commands` is empty (the default), the overlay's
2474        // intersection semantics keep it empty, so this branch is silently unreachable
2475        // for users who do not set a non-empty base list.
2476        if allowed_changed {
2477            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2478                 for sandbox path recomputation (blocked_commands was rebuilt live)";
2479            tracing::warn!("{msg}");
2480            if let Some(ref tx) = self.session.status_tx {
2481                let _ = tx.send(msg.to_owned());
2482            }
2483        }
2484
2485        let _ = new_overlay;
2486    }
2487
2488    /// Run `SideQuest` tool output eviction pass (#1885).
2489    ///
2490    /// PERF-1 fix: two-phase non-blocking design.
2491    ///
2492    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2493    /// validate and apply it immediately.
2494    ///
2495    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2496    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2497    /// next turn, so the current agent turn is never blocked by the LLM call.
2498    #[allow(clippy::too_many_lines)]
2499    fn maybe_sidequest_eviction(&mut self) {
2500        use zeph_llm::provider::{Message, MessageMetadata, Role};
2501
2502        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2503        // strategy — the two systems share the same pool of evictable tool outputs and can
2504        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2505        if self.sidequest.config.enabled {
2506            use crate::config::PruningStrategy;
2507            if !matches!(
2508                self.context_manager.compression.pruning_strategy,
2509                PruningStrategy::Reactive
2510            ) {
2511                tracing::warn!(
2512                    strategy = ?self.context_manager.compression.pruning_strategy,
2513                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2514                     consider disabling sidequest.enabled to avoid redundant eviction"
2515                );
2516            }
2517        }
2518
2519        // Guard: do not evict while a focus session is active.
2520        if self.focus.is_active() {
2521            tracing::debug!("sidequest: skipping — focus session active");
2522            // Drop any pending result — cursors may be stale relative to focus truncation.
2523            self.compression.pending_sidequest_result = None;
2524            return;
2525        }
2526
2527        // Phase 1: apply pending result from last turn's background LLM call.
2528        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2529            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2530            use futures::FutureExt as _;
2531            match handle.now_or_never() {
2532                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2533                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2534                    let freed = self.sidequest.apply_eviction(
2535                        &mut self.msg.messages,
2536                        &evicted_indices,
2537                        &self.metrics.token_counter,
2538                    );
2539                    if freed > 0 {
2540                        self.recompute_prompt_tokens();
2541                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2542                        // cooldown=0: eviction does not impose post-compaction cooldown.
2543                        self.context_manager.compaction =
2544                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2545                                cooldown: 0,
2546                            };
2547                        tracing::info!(
2548                            freed_tokens = freed,
2549                            evicted_cursors = evicted_indices.len(),
2550                            pass = self.sidequest.passes_run,
2551                            "sidequest eviction complete"
2552                        );
2553                        if let Some(ref d) = self.debug_state.debug_dumper {
2554                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2555                        }
2556                        if let Some(ref tx) = self.session.status_tx {
2557                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2558                        }
2559                    } else {
2560                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2561                        if let Some(ref tx) = self.session.status_tx {
2562                            let _ = tx.send(String::new());
2563                        }
2564                    }
2565                }
2566                Some(Ok(None | Some(_))) => {
2567                    tracing::debug!("sidequest: pending result: no cursors to evict");
2568                    if let Some(ref tx) = self.session.status_tx {
2569                        let _ = tx.send(String::new());
2570                    }
2571                }
2572                Some(Err(e)) => {
2573                    tracing::debug!("sidequest: background task panicked: {e}");
2574                    if let Some(ref tx) = self.session.status_tx {
2575                        let _ = tx.send(String::new());
2576                    }
2577                }
2578                None => {
2579                    // Task still running — re-store and wait another turn.
2580                    // We already took it; we'd need to re-spawn, but instead just drop and
2581                    // schedule fresh below to keep the cursor list current.
2582                    tracing::debug!(
2583                        "sidequest: background LLM task not yet complete, rescheduling"
2584                    );
2585                }
2586            }
2587        }
2588
2589        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2590        self.sidequest
2591            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2592
2593        if self.sidequest.tool_output_cursors.is_empty() {
2594            tracing::debug!("sidequest: no eligible cursors");
2595            return;
2596        }
2597
2598        let prompt = self.sidequest.build_eviction_prompt();
2599        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2600        let n_cursors = self.sidequest.tool_output_cursors.len();
2601        // Clone the provider so the spawn closure owns it without borrowing self.
2602        let provider = self.summary_or_primary_provider().clone();
2603
2604        // Spawn background task: the LLM call runs without blocking the agent loop.
2605        let handle = tokio::spawn(async move {
2606            let msgs = [Message {
2607                role: Role::User,
2608                content: prompt,
2609                parts: vec![],
2610                metadata: MessageMetadata::default(),
2611            }];
2612            let response =
2613                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2614                    .await
2615                {
2616                    Ok(Ok(r)) => r,
2617                    Ok(Err(e)) => {
2618                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2619                        return None;
2620                    }
2621                    Err(_) => {
2622                        tracing::debug!("sidequest bg: LLM call timed out");
2623                        return None;
2624                    }
2625                };
2626
2627            let start = response.find('{')?;
2628            let end = response.rfind('}')?;
2629            if start > end {
2630                return None;
2631            }
2632            let json_slice = &response[start..=end];
2633            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2634            let mut valid: Vec<usize> = parsed
2635                .del_cursors
2636                .into_iter()
2637                .filter(|&c| c < n_cursors)
2638                .collect();
2639            valid.sort_unstable();
2640            valid.dedup();
2641            #[allow(
2642                clippy::cast_precision_loss,
2643                clippy::cast_possible_truncation,
2644                clippy::cast_sign_loss
2645            )]
2646            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2647            valid.truncate(max_evict);
2648            Some(valid)
2649        });
2650
2651        self.compression.pending_sidequest_result = Some(handle);
2652        tracing::debug!("sidequest: background LLM eviction task spawned");
2653        if let Some(ref tx) = self.session.status_tx {
2654            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2655        }
2656    }
2657
2658    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2659    ///
2660    /// Called after each tool batch completes. The check is a single syscall and has
2661    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2662    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2663    pub(crate) async fn check_cwd_changed(&mut self) {
2664        let current = match std::env::current_dir() {
2665            Ok(p) => p,
2666            Err(e) => {
2667                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2668                return;
2669            }
2670        };
2671        if current == self.lifecycle.last_known_cwd {
2672            return;
2673        }
2674        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2675        self.session.env_context.working_dir = current.display().to_string();
2676
2677        tracing::info!(
2678            old = %old_cwd.display(),
2679            new = %current.display(),
2680            "working directory changed"
2681        );
2682
2683        let _ = self
2684            .channel
2685            .send_status("Working directory changed\u{2026}")
2686            .await;
2687
2688        let hooks = self.session.hooks_config.cwd_changed.clone();
2689        if !hooks.is_empty() {
2690            let mut env = std::collections::HashMap::new();
2691            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2692            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2693            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2694                tracing::warn!(error = %e, "CwdChanged hook failed");
2695            }
2696        }
2697
2698        let _ = self.channel.send_status("").await;
2699    }
2700
2701    /// Handle a `FileChangedEvent` from the file watcher.
2702    pub(crate) async fn handle_file_changed(
2703        &mut self,
2704        event: crate::file_watcher::FileChangedEvent,
2705    ) {
2706        tracing::info!(path = %event.path.display(), "file changed");
2707
2708        let _ = self
2709            .channel
2710            .send_status("Running file-change hook\u{2026}")
2711            .await;
2712
2713        let hooks = self.session.hooks_config.file_changed_hooks.clone();
2714        if !hooks.is_empty() {
2715            let mut env = std::collections::HashMap::new();
2716            env.insert(
2717                "ZEPH_CHANGED_PATH".to_owned(),
2718                event.path.display().to_string(),
2719            );
2720            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2721                tracing::warn!(error = %e, "FileChanged hook failed");
2722            }
2723        }
2724
2725        let _ = self.channel.send_status("").await;
2726    }
2727}
2728pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2729    while !*rx.borrow_and_update() {
2730        if rx.changed().await.is_err() {
2731            std::future::pending::<()>().await;
2732        }
2733    }
2734}
2735
2736pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2737    match rx {
2738        Some(inner) => {
2739            if let Some(v) = inner.recv().await {
2740                Some(v)
2741            } else {
2742                *rx = None;
2743                std::future::pending().await
2744            }
2745        }
2746        None => std::future::pending().await,
2747    }
2748}
2749
2750/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
2751///
2752/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
2753/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
2754/// context window; if still 0, fall back to 128 000 tokens.
2755pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2756    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2757        if let Some(ctx_size) = provider.context_window() {
2758            tracing::info!(
2759                model_context = ctx_size,
2760                "auto-configured context budget on reload"
2761            );
2762            ctx_size
2763        } else {
2764            0
2765        }
2766    } else {
2767        config.memory.context_budget_tokens
2768    };
2769    if tokens == 0 {
2770        tracing::warn!(
2771            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2772        );
2773        128_000
2774    } else {
2775        tokens
2776    }
2777}
2778
2779#[cfg(test)]
2780mod tests;
2781
2782#[cfg(test)]
2783pub(crate) use tests::agent_tests;
2784
2785#[cfg(test)]
2786mod test_stubs {
2787    use std::pin::Pin;
2788
2789    use zeph_commands::{
2790        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
2791    };
2792
2793    /// Stub slash command registered only in `#[cfg(test)]` builds.
2794    ///
2795    /// Triggers the `Some(Err(CommandError))` arm in the session/debug registry
2796    /// dispatch block so the non-fatal error path can be tested without production
2797    /// command validation logic.
2798    pub(super) struct TestErrorCommand;
2799
2800    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
2801        fn name(&self) -> &'static str {
2802            "/test-error"
2803        }
2804
2805        fn description(&self) -> &'static str {
2806            "Test stub: always returns CommandError"
2807        }
2808
2809        fn category(&self) -> SlashCategory {
2810            SlashCategory::Session
2811        }
2812
2813        fn handle<'a>(
2814            &'a self,
2815            _ctx: &'a mut CommandContext<'_>,
2816            _args: &'a str,
2817        ) -> Pin<
2818            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
2819        > {
2820            Box::pin(async { Err(CommandError::new("boom")) })
2821        }
2822    }
2823}