// zeph_core/agent/mod.rs
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

mod acp_commands;
mod agent_access_impl;
pub(crate) mod agent_supervisor;
mod autodream;
mod builder;
mod command_context_impls;
pub(crate) mod compaction_strategy;
pub(super) mod compression_feedback;
mod context;
mod context_impls;
pub(crate) mod context_manager;
mod corrections;
pub mod error;
mod experiment_cmd;
pub(super) mod feedback_detector;
pub(crate) mod focus;
mod index;
mod learning;
pub(crate) mod learning_engine;
mod log_commands;
mod loop_event;
mod lsp_commands;
mod magic_docs;
mod mcp;
mod message_queue;
mod microcompact;
mod model_commands;
mod persistence;
#[cfg(feature = "scheduler")]
mod plan;
mod policy_commands;
mod provider_cmd;
#[cfg(feature = "self-check")]
mod quality_hook;
pub(crate) mod rate_limiter;
#[cfg(feature = "scheduler")]
mod scheduler_commands;
#[cfg(feature = "scheduler")]
mod scheduler_loop;
pub mod session_config;
mod session_digest;
pub(crate) mod sidequest;
mod skill_management;
pub mod slash_commands;
pub mod speculative;
pub(crate) mod state;
pub(crate) mod task_injection;
pub(crate) mod tool_execution;
pub(crate) mod tool_orchestrator;
mod trust_commands;
pub mod turn;
mod utils;
pub(crate) mod vigil;

use std::collections::{HashMap, VecDeque};
use std::fmt::Write as _;
use std::sync::Arc;

use parking_lot::RwLock;

use tokio::sync::{mpsc, watch};
use tokio_util::sync::CancellationToken;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
use zeph_memory::TokenCounter;
use zeph_memory::semantic::SemanticMemory;
use zeph_skills::loader::Skill;
use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
use zeph_skills::prompt::format_skills_prompt;
use zeph_skills::registry::SkillRegistry;
use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};

use crate::channel::Channel;
use crate::config::Config;
use crate::context::{ContextBudget, build_system_prompt};
use zeph_common::text::estimate_tokens;

use loop_event::LoopEvent;
use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
use state::CompressionState;
use state::{
    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
    RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
};
/// Window (in recent tool iterations) used by the doom-loop detector to spot
/// repeated identical behaviour — assumes a 3-turn repeat threshold; TODO(review):
/// confirm against the detector implementation.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// The following prefixes tag injected context messages so they can later be
// recognised, refreshed, or stripped from the message history.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Prepended to a scheduled task's body when it is injected as a user turn.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Marks a digest restored from a previous session for context injection.
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Prefix used for LSP context messages (`Role::System`) injected into message history.
/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
/// prefix to clear stale notes before each fresh injection.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing fence appended after every formatted tool-output block.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Render a tool's output as a fenced block: a `[tool output: <name>]` header
/// followed by the body wrapped in triple backticks.
///
/// The buffer is pre-sized to the exact final length, so the function performs a
/// single allocation. Building with `push_str` (instead of `write!`) is infallible,
/// which removes both the previously-discarded `fmt::Result` and a redundant local
/// `use std::fmt::Write;` (the trait is already imported at module level).
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";
    let capacity = HEADER_OPEN.len()
        + tool_name.len()
        + HEADER_CLOSE.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    buf.push_str(HEADER_OPEN);
    buf.push_str(tool_name);
    buf.push_str(HEADER_CLOSE);
    buf.push_str(body);
    buf.push_str(TOOL_OUTPUT_SUFFIX);
    buf
}
121
/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
/// tool orchestration, and multi-channel I/O.
///
/// The agent maintains conversation history, manages LLM provider state, coordinates tool
/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
///
/// # Architecture
///
/// - **Message state**: Conversation history with system prompt, message queue, and metadata
/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
/// - **Skill state**: Registry, matching engine, and self-learning evolution
/// - **Context manager**: Token budgeting, context assembly, and summarization
/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
/// - **MCP client**: Multi-server support for Model Context Protocol
/// - **Index state**: AST-based code indexing and semantic retrieval
/// - **Security**: Sanitization, exfiltration detection, adversarial probes
/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
///
/// # Channel Contract
///
/// The agent requires a [`Channel`] implementation for user interaction:
/// - Sends agent responses via `channel.send(message)`
/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
/// - Supports structured events: tool invocations, tool output, streaming updates
///
/// # Lifecycle
///
/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
/// 2. Run main loop with [`Self::run`]
/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
pub struct Agent<C: Channel> {
    /// Primary LLM provider used for chat and structured (typed) calls.
    provider: AnyProvider,
    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
    /// Falls back to `provider.clone()` when no dedicated entry exists.
    /// **Never replaced** by `/provider switch`.
    embedding_provider: AnyProvider,
    /// User-facing I/O channel (CLI, Telegram, TUI, …).
    channel: C,
    /// Type-erased tool executor, shared (`Arc`) with subsystems that run tools.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history, queue of pending inputs, and persistence bookkeeping.
    pub(super) msg: MessageState,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    pub(super) compression: CompressionState,
    /// Background-task supervision and warmup signalling (see `run`/`shutdown`).
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    /// Sub-agent manager and topology advisor live here.
    pub(super) orchestration: OrchestrationState,
    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
    pub(super) focus: focus::FocusState,
    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
    pub(super) sidequest: sidequest::SidequestState,
    /// Tool filtering, dependency tracking, and iteration bookkeeping.
    pub(super) tool_state: ToolState,
    /// MARCH self-check pipeline, built at startup and rebuilt on provider swap.
    #[cfg(feature = "self-check")]
    pub(super) quality: Option<std::sync::Arc<crate::quality::SelfCheckPipeline>>,
    /// Proactive world-knowledge explorer (#3320).
    ///
    /// `Some` when `config.skills.proactive_exploration.enabled = true`.
    pub(super) proactive_explorer:
        Option<std::sync::Arc<zeph_skills::proactive::ProactiveExplorer>>,
    /// Experience compression spectrum promotion engine (#3305).
    ///
    /// `Some` when `config.memory.compression_spectrum.enabled = true`.
    pub(super) promotion_engine:
        Option<std::sync::Arc<zeph_memory::compression::promotion::PromotionEngine>>,
}
202
/// Control flow signal returned by [`Agent::apply_dispatch_result`].
///
/// Tells the agent main loop what to do after a slash-command dispatch attempt.
enum DispatchFlow {
    /// The command requested exit; the agent loop should `break`.
    Break,
    /// The command was handled; the agent loop should `continue`.
    Continue,
    /// The command was not recognised; the agent loop should fall through
    /// and treat the input as an ordinary user message.
    Fallthrough,
}
212
213impl<C: Channel> Agent<C> {
214    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
215    ///
216    /// # Arguments
217    ///
218    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
219    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
220    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
221    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
222    ///   skills are matched by exact name only
223    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
224    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
225    ///
226    /// # Initialization
227    ///
228    /// The constructor:
229    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
230    /// 2. Builds the system prompt from registered skills
231    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
232    /// 4. Returns a ready-to-run agent
233    ///
234    /// # Panics
235    ///
236    /// Panics if `max_active_skills` is 0.
237    #[must_use]
238    pub fn new(
239        provider: AnyProvider,
240        channel: C,
241        registry: SkillRegistry,
242        matcher: Option<SkillMatcherBackend>,
243        max_active_skills: usize,
244        tool_executor: impl ToolExecutor + 'static,
245    ) -> Self {
246        let registry = Arc::new(RwLock::new(registry));
247        let embedding_provider = provider.clone();
248        Self::new_with_registry_arc(
249            provider,
250            embedding_provider,
251            channel,
252            registry,
253            matcher,
254            max_active_skills,
255            tool_executor,
256        )
257    }
258
    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
    ///
    /// # Panics
    ///
    /// Debug builds assert that `max_active_skills > 0`. (The registry lock is a
    /// `parking_lot::RwLock`, which does not poison, so no poisoning panic can occur.)
    #[must_use]
    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize every skill currently in the registry so the initial
        // system prompt can describe them. The read guard is dropped at the
        // end of this block, before the registry Arc is moved into SkillState.
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust scores or health stats exist yet at construction time, so the
        // prompt is formatted with empty maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Seed provider-state accounting with the system prompt's estimated token cost.
        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // The system prompt is always message 0; later code relies on this
            // (e.g. `skip(1)` when counting user turns).
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            memory_state: MemoryState::default(),
            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            debug_state: DebugState::default(),
            runtime: RuntimeConfig::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            instructions: InstructionState::default(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            #[cfg(feature = "self-check")]
            quality: None,
            proactive_explorer: None,
            promotion_engine: None,
        }
    }
339
340    /// Poll all active sub-agents for completed/failed/canceled results.
341    ///
342    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
343    /// for agents that have finished. Each completed agent is removed from the manager.
344    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
345        let Some(mgr) = &mut self.orchestration.subagent_manager else {
346            return vec![];
347        };
348
349        let finished: Vec<String> = mgr
350            .statuses()
351            .into_iter()
352            .filter_map(|(id, status)| {
353                if matches!(
354                    status.state,
355                    zeph_subagent::SubAgentState::Completed
356                        | zeph_subagent::SubAgentState::Failed
357                        | zeph_subagent::SubAgentState::Canceled
358                ) {
359                    Some(id)
360                } else {
361                    None
362                }
363            })
364            .collect();
365
366        let mut results = vec![];
367        for task_id in finished {
368            match mgr.collect(&task_id).await {
369                Ok(result) => results.push((task_id, result)),
370                Err(e) => {
371                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
372                }
373            }
374        }
375        results
376    }
377
378    /// Call the LLM to generate a structured session summary with a configurable timeout.
379    ///
380    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
381    /// any failure, logging a warning — callers must treat `None` as "skip storage".
382    ///
383    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
384    /// (structured call times out and plain-text fallback also times out) this adds up to
385    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
386    async fn call_llm_for_session_summary(
387        &self,
388        chat_messages: &[Message],
389    ) -> Option<zeph_memory::StructuredSummary> {
390        let timeout_dur = std::time::Duration::from_secs(
391            self.memory_state.compaction.shutdown_summary_timeout_secs,
392        );
393        match tokio::time::timeout(
394            timeout_dur,
395            self.provider
396                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
397        )
398        .await
399        {
400            Ok(Ok(s)) => Some(s),
401            Ok(Err(e)) => {
402                tracing::warn!(
403                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
404                );
405                self.plain_text_summary_fallback(chat_messages, timeout_dur)
406                    .await
407            }
408            Err(_) => {
409                tracing::warn!(
410                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
411                    self.memory_state.compaction.shutdown_summary_timeout_secs
412                );
413                self.plain_text_summary_fallback(chat_messages, timeout_dur)
414                    .await
415            }
416        }
417    }
418
419    async fn plain_text_summary_fallback(
420        &self,
421        chat_messages: &[Message],
422        timeout_dur: std::time::Duration,
423    ) -> Option<zeph_memory::StructuredSummary> {
424        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
425            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
426                summary: plain,
427                key_facts: vec![],
428                entities: vec![],
429            }),
430            Ok(Err(e)) => {
431                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
432                None
433            }
434            Err(_) => {
435                tracing::warn!("shutdown summary: plain LLM fallback timed out");
436                None
437            }
438        }
439    }
440
    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
    /// closed while tool execution was in progress). Without this the next session startup strips
    /// those assistant messages and emits orphan warnings.
    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
        use zeph_llm::provider::{MessagePart, Role};

        // Walk messages in reverse: if the last assistant message (ignoring any trailing
        // system messages) has ToolUse parts and is NOT immediately followed by a user
        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
        let msgs = &self.msg.messages;
        // Find last assistant message index.
        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
            // No assistant message at all — nothing can be orphaned.
            return;
        };
        let asst_msg = &msgs[asst_idx];
        // Collect (id, name, input) for every ToolUse part on that assistant message.
        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let MessagePart::ToolUse { id, name, input } = p {
                    Some((id.as_str(), name.as_str(), input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_ids.is_empty() {
            return;
        }

        // Check whether a following user message already pairs all ToolUse ids.
        // (Any user message after the assistant one counts — tool results are
        // always delivered as user-role ToolResult parts.)
        let paired_ids: std::collections::HashSet<&str> = msgs
            .get(asst_idx + 1..)
            .into_iter()
            .flatten()
            .filter(|m| m.role == Role::User)
            .flat_map(|m| m.parts.iter())
            .filter_map(|p| {
                if let MessagePart::ToolResult { tool_use_id, .. } = p {
                    Some(tool_use_id.as_str())
                } else {
                    None
                }
            })
            .collect();

        // Anything not covered by a ToolResult becomes a tombstone request.
        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
            .iter()
            .filter(|(id, _, _)| !paired_ids.contains(*id))
            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
                id: (*id).to_owned(),
                name: (*name).to_owned().into(),
                input: (*input).clone(),
            })
            .collect();

        if unpaired.is_empty() {
            return;
        }

        tracing::info!(
            count = unpaired.len(),
            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
        );
        self.persist_cancelled_tool_results(&unpaired).await;
    }
508
    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
    ///
    /// Guards:
    /// - `shutdown_summary` config must be enabled
    /// - `conversation_id` must be set (memory must be attached)
    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
    /// - at least `shutdown_summary_min_messages` user-turn messages in history
    ///
    /// All errors are logged as warnings and swallowed — shutdown must never fail.
    async fn maybe_store_shutdown_summary(&mut self) {
        if !self.memory_state.compaction.shutdown_summary {
            return;
        }
        let Some(memory) = self.memory_state.persistence.memory.clone() else {
            return;
        };
        let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
            return;
        };

        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
        match memory.has_session_summary(conversation_id).await {
            Ok(true) => {
                tracing::debug!("shutdown summary: session already has a summary, skipping");
                return;
            }
            Ok(false) => {}
            Err(e) => {
                // Fail closed: if existence can't be verified, skip rather than
                // risk writing a duplicate summary.
                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
                return;
            }
        }

        // Count user-turn messages only (skip system prompt at index 0).
        let user_count = self
            .msg
            .messages
            .iter()
            .skip(1)
            .filter(|m| m.role == Role::User)
            .count();
        if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
            tracing::debug!(
                user_count,
                min = self.memory_state.compaction.shutdown_summary_min_messages,
                "shutdown summary: too few user messages, skipping"
            );
            return;
        }

        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
        let _ = self.channel.send_status("Saving session summary...").await;

        // Collect last N messages (skip system prompt at index 0).
        let max = self.memory_state.compaction.shutdown_summary_max_messages;
        if max == 0 {
            tracing::debug!("shutdown summary: max_messages=0, skipping");
            return;
        }
        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
        let slice = if non_system.len() > max {
            &non_system[non_system.len() - max..]
        } else {
            &non_system[..]
        };

        // MessageId(0) is a placeholder — presumably the summarization prompt only
        // uses role/content; TODO(review): confirm `build_summarization_prompt`
        // ignores the id.
        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
            .iter()
            .map(|m| {
                let role = match m.role {
                    Role::User => "user".to_owned(),
                    Role::Assistant => "assistant".to_owned(),
                    Role::System => "system".to_owned(),
                };
                (zeph_memory::MessageId(0), role, m.content.clone())
            })
            .collect();

        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
        let chat_messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];

        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
            // LLM failed or timed out: clear the TUI status line and skip storage.
            let _ = self.channel.send_status("").await;
            return;
        };

        if let Err(e) = memory
            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
            .await
        {
            tracing::warn!("shutdown summary: storage failed: {e:#}");
        } else {
            tracing::info!(
                conversation_id = conversation_id.0,
                "shutdown summary stored"
            );
        }

        // Clear TUI status.
        let _ = self.channel.send_status("").await;
    }
615
    /// Gracefully shut down the agent and persist state.
    ///
    /// Performs the following cleanup:
    ///
    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
    ///    are flushed to memory or disk
    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
    ///    to the vault
    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
    ///
    /// Call this before dropping the agent to ensure no data loss.
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        // CRIT-1: persist Thompson state accumulated during this session.
        self.provider.save_router_state();

        // Persist AdaptOrch Beta-arm table alongside Thompson state.
        if let Some(ref advisor) = self.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Finalize compaction trajectory: push the last open segment into the Vec.
        // This segment would otherwise only be pushed when the next hard compaction fires,
        // which never happens at session end.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log end-of-session metric summaries (filtering savings, compaction
        // trajectory) when there is anything to report.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)] // display-only percentage
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
        // startup strips the orphaned ToolUse and emits warnings.
        self.flush_orphaned_tool_use_on_shutdown().await;

        self.lifecycle.supervisor.abort_all();

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
700
    /// Refresh the sub-agent metrics snapshot for the TUI metrics panel.
    ///
    /// No-op when no sub-agent manager is configured. Builds one
    /// `SubAgentMetrics` entry per known sub-agent and publishes the whole
    /// vector via `update_metrics`. When the agent definition is unavailable,
    /// the name falls back to the first 8 bytes of the id and `max_turns`
    /// falls back to 20.
    fn refresh_subagent_metrics(&mut self) {
        // Nothing to report without a manager.
        let Some(ref mgr) = self.orchestration.subagent_manager else {
            return;
        };
        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
            .statuses()
            .into_iter()
            .map(|(id, s)| {
                let def = mgr.agents_def(&id);
                crate::metrics::SubAgentMetrics {
                    // NOTE(review): byte slicing — assumes ids are ASCII in the
                    // first 8 bytes (a multi-byte char there would panic); confirm id format.
                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
                    id: id.clone(),
                    state: format!("{:?}", s.state).to_lowercase(),
                    turns_used: s.turns_used,
                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
                    background: def.is_some_and(|d| d.permissions.background),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                    permission_mode: def.map_or_else(String::new, |d| {
                        use zeph_subagent::def::PermissionMode;
                        match d.permissions.permission_mode {
                            PermissionMode::Default => String::new(),
                            PermissionMode::AcceptEdits => "accept_edits".into(),
                            PermissionMode::DontAsk => "dont_ask".into(),
                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
                            PermissionMode::Plan => "plan".into(),
                        }
                    }),
                    transcript_dir: mgr
                        .agent_transcript_dir(&id)
                        .map(|p| p.to_string_lossy().into_owned()),
                }
            })
            .collect();
        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
    }
742
743    /// Non-blocking poll: notify the user when background sub-agents complete.
744    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
745        let completed = self.poll_subagents().await;
746        for (task_id, result) in completed {
747            let notice = if result.is_empty() {
748                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
749            } else {
750                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
751            };
752            if let Err(e) = self.channel.send(&notice).await {
753                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
754            }
755        }
756        Ok(())
757    }
758
    /// Run the agent main loop.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Block once at startup until model warmup signals readiness (best-effort:
        // a failed warmup only logs a warning and the loop proceeds).
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Restore the last-used provider preference before any user interaction (#3308).
        self.restore_channel_provider().await;

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping before selecting the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Prefer queued messages; otherwise block on the next loop event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // Registry dispatch: build the command registry, construct CommandContext, dispatch.
            // The registry is built inline (all handlers are ZSTs) to avoid borrow-checker
            // conflicts when constructing CommandContext from Agent<C> fields.
            // Build context first so that borrows outlive the registry (drop order matters).
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // Remember whether the session/debug registry handled the command; the
            // agent-command registry below only runs when it did not.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).
            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
            // inside apply_dispatch_result when with_learning = true.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Legacy built-in commands not yet migrated to either registry.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command: treat as a regular user message (one agent turn).
            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1049
1050    /// Dispatch a slash-command registry result and flush the channel.
1051    ///
1052    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1053    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1054    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1055    async fn apply_dispatch_result(
1056        &mut self,
1057        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1058        command: &str,
1059        with_learning: bool,
1060    ) -> DispatchFlow {
1061        match result {
1062            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1063                let _ = self.channel.flush_chunks().await;
1064                DispatchFlow::Break
1065            }
1066            Some(Ok(
1067                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1068            )) => {
1069                let _ = self.channel.flush_chunks().await;
1070                DispatchFlow::Continue
1071            }
1072            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1073                let _ = self.channel.send(&msg).await;
1074                let _ = self.channel.flush_chunks().await;
1075                if with_learning {
1076                    self.maybe_trigger_post_command_learning(command).await;
1077                }
1078                DispatchFlow::Continue
1079            }
1080            Some(Err(e)) => {
1081                let _ = self.channel.send(&e.to_string()).await;
1082                let _ = self.channel.flush_chunks().await;
1083                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1084                DispatchFlow::Continue
1085            }
1086            None => DispatchFlow::Fallthrough,
1087        }
1088    }
1089
1090    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1091    fn apply_provider_override(&mut self) {
1092        if let Some(ref slot) = self.providers.provider_override
1093            && let Some(new_provider) = slot.write().take()
1094        {
1095            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1096            self.provider = new_provider;
1097        }
1098    }
1099
    /// Poll all event sources and return the next [`LoopEvent`].
    ///
    /// Returns `None` when the inbound channel closes (graceful shutdown).
    ///
    /// # Errors
    ///
    /// Propagates channel receive errors.
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            // Inbound user message; a `None` here means the channel closed.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            // External shutdown request (signal / watch).
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            // Hot-reload notifications (skills / instructions / config).
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // /loop ticker: resolves on the interval tick while a user loop is
            // active and not cancelled; otherwise pends forever so this arm
            // never fires.
            () = async {
                if let Some(ref mut ls) = self.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // Re-check user_loop after the tick — /loop stop may have fired between the
                // interval firing and this arm executing. Returning Ok(None) causes the caller
                // to `continue` without injecting a stale or empty prompt.
                let Some(ls) = self.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            // Filesystem watcher events.
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1165
1166    async fn resolve_message(
1167        &self,
1168        msg: crate::channel::ChannelMessage,
1169    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1170        use crate::channel::{Attachment, AttachmentKind};
1171        use zeph_llm::provider::{ImageData, MessagePart};
1172
1173        let text_base = msg.text.clone();
1174
1175        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1176            .attachments
1177            .into_iter()
1178            .partition(|a| a.kind == AttachmentKind::Audio);
1179
1180        tracing::debug!(
1181            audio = audio_attachments.len(),
1182            has_stt = self.providers.stt.is_some(),
1183            "resolve_message attachments"
1184        );
1185
1186        let text = if !audio_attachments.is_empty()
1187            && let Some(stt) = self.providers.stt.as_ref()
1188        {
1189            let mut transcribed_parts = Vec::new();
1190            for attachment in &audio_attachments {
1191                if attachment.data.len() > MAX_AUDIO_BYTES {
1192                    tracing::warn!(
1193                        size = attachment.data.len(),
1194                        max = MAX_AUDIO_BYTES,
1195                        "audio attachment exceeds size limit, skipping"
1196                    );
1197                    continue;
1198                }
1199                match stt
1200                    .transcribe(&attachment.data, attachment.filename.as_deref())
1201                    .await
1202                {
1203                    Ok(result) => {
1204                        tracing::info!(
1205                            len = result.text.len(),
1206                            language = ?result.language,
1207                            "audio transcribed"
1208                        );
1209                        transcribed_parts.push(result.text);
1210                    }
1211                    Err(e) => {
1212                        tracing::error!(error = %e, "audio transcription failed");
1213                    }
1214                }
1215            }
1216            if transcribed_parts.is_empty() {
1217                text_base
1218            } else {
1219                let transcribed = transcribed_parts.join("\n");
1220                if text_base.is_empty() {
1221                    transcribed
1222                } else {
1223                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1224                }
1225            }
1226        } else {
1227            if !audio_attachments.is_empty() {
1228                tracing::warn!(
1229                    count = audio_attachments.len(),
1230                    "audio attachments received but no STT provider configured, dropping"
1231                );
1232            }
1233            text_base
1234        };
1235
1236        let mut image_parts = Vec::new();
1237        for attachment in image_attachments {
1238            if attachment.data.len() > MAX_IMAGE_BYTES {
1239                tracing::warn!(
1240                    size = attachment.data.len(),
1241                    max = MAX_IMAGE_BYTES,
1242                    "image attachment exceeds size limit, skipping"
1243                );
1244                continue;
1245            }
1246            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1247            image_parts.push(MessagePart::Image(Box::new(ImageData {
1248                data: attachment.data,
1249                mime_type,
1250            })));
1251        }
1252
1253        (text, image_parts)
1254    }
1255
1256    /// Create a new [`Turn`] for the given input and advance the turn counter.
1257    ///
1258    /// Clears per-turn state that must not carry over between turns:
1259    /// - per-turn `CancellationToken` (new token for each turn)
1260    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1261    ///   `process_user_message_inner` after security checks)
1262    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1263        let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1264        self.debug_state.iteration_counter += 1;
1265        self.lifecycle.cancel_token = CancellationToken::new();
1266        self.security.user_provided_urls.write().clear();
1267        // Reset per-turn LLM request counter for the notification gate.
1268        self.lifecycle.turn_llm_requests = 0;
1269        turn::Turn::new(id, input)
1270    }
1271
    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
    ///
    /// Must be called exactly once per turn, after `process_user_message_inner` returns
    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
    /// is populated from it here so the rest of the pipeline is unchanged.
    fn end_turn(&mut self, turn: turn::Turn) {
        // Copy the turn's timings into the shared metrics slot, then flush.
        self.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
        // Clear per-turn intent (FR-008): must not persist across turns.
        self.session.current_turn_intent = None;
    }
1284
1285    #[cfg_attr(
1286        feature = "profiling",
1287        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1288    )]
1289    async fn process_user_message(
1290        &mut self,
1291        text: String,
1292        image_parts: Vec<zeph_llm::provider::MessagePart>,
1293    ) -> Result<(), error::AgentError> {
1294        let input = turn::TurnInput::new(text, image_parts);
1295        let mut t = self.begin_turn(input);
1296
1297        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1298        tracing::Span::current().record("turn_id", t.id().0);
1299        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1300        self.debug_state
1301            .start_iteration_span(turn_idx, t.input.text.trim());
1302
1303        let result = self.process_user_message_inner(&mut t).await;
1304
1305        // Close iteration span regardless of outcome (partial trace preserved on error).
1306        let span_status = if result.is_ok() {
1307            crate::debug_dump::trace::SpanStatus::Ok
1308        } else {
1309            crate::debug_dump::trace::SpanStatus::Error {
1310                message: "iteration failed".to_owned(),
1311            }
1312        };
1313        self.debug_state.end_iteration_span(turn_idx, span_status);
1314
1315        self.end_turn(t);
1316        result
1317    }
1318
1319    async fn process_user_message_inner(
1320        &mut self,
1321        turn: &mut turn::Turn,
1322    ) -> Result<(), error::AgentError> {
1323        self.reap_background_tasks_and_update_metrics();
1324
1325        // Drain any background shell completions that arrived since the last turn.
1326        // They are buffered in `pending_background_completions` and merged with the
1327        // real user message into a single user-role block below (N1 invariant).
1328        self.drain_background_completions();
1329
1330        // Wire the per-turn cancellation token into the cancel bridge.
1331        // The bridge translates the `cancel_signal` (Notify) into a CancellationToken cancel.
1332        // Abort the previous bridge before spawning to prevent unbounded accumulation (#2737).
1333        let signal = Arc::clone(&self.lifecycle.cancel_signal);
1334        let token = turn.cancel_token.clone();
1335        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1336        self.lifecycle.cancel_token = turn.cancel_token.clone();
1337        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
1338            prev.abort();
1339        }
1340        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
1341            signal.notified().await;
1342            token.cancel();
1343        }));
1344
1345        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1346        let text = turn.input.text.clone();
1347        let trimmed_owned = text.trim().to_owned();
1348        let trimmed = trimmed_owned.as_str();
1349
1350        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1351        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1352        if self.security.vigil.is_some() {
1353            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1354            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1355        }
1356
1357        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1358            return result;
1359        }
1360
1361        self.check_pending_rollbacks().await;
1362
1363        if self.pre_process_security(trimmed).await? {
1364            return Ok(());
1365        }
1366
1367        let t_ctx = std::time::Instant::now();
1368        tracing::debug!("turn timing: prepare_context start");
1369        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1370        turn.metrics_mut().timings.prepare_context_ms =
1371            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1372        tracing::debug!(
1373            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1374            "turn timing: prepare_context done"
1375        );
1376
1377        let image_parts = std::mem::take(&mut turn.input.image_parts);
1378        // Prepend any background completion blocks to the user text. All completions and the
1379        // user message MUST be merged into a single user-role block to satisfy the strict
1380        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1381        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1382        let user_msg = self.build_user_message(&merged_text, image_parts);
1383
1384        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1385        // URL set was cleared in begin_turn; re-populate for this turn.
1386        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1387        if !urls.is_empty() {
1388            self.security.user_provided_urls.write().extend(urls);
1389        }
1390
1391        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1392        // Derived from the raw input text before context assembly to avoid timing dependencies.
1393        self.memory_state.extraction.goal_text = Some(text.clone());
1394
1395        let t_persist = std::time::Instant::now();
1396        tracing::debug!("turn timing: persist_message(user) start");
1397        // Image parts intentionally excluded — base64 payloads too large for message history.
1398        self.persist_message(Role::User, &text, &[], false).await;
1399        turn.metrics_mut().timings.persist_message_ms =
1400            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1401        tracing::debug!(
1402            ms = turn.metrics_snapshot().timings.persist_message_ms,
1403            "turn timing: persist_message(user) done"
1404        );
1405        self.push_message(user_msg);
1406
1407        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1408        // handle_native_tool_calls respectively via metrics.pending_timings.
1409        tracing::debug!("turn timing: process_response start");
1410        let turn_had_error = if let Err(e) = self.process_response().await {
1411            // Detach any in-flight learning tasks before mutating message state.
1412            self.learning_engine.learning_tasks.detach_all();
1413            tracing::error!("Response processing failed: {e:#}");
1414
1415            // Record provider failure timestamp so the next turn can skip
1416            // expensive context preparation while providers are known-down.
1417            if e.is_no_providers() {
1418                self.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1419                let backoff_secs = self.runtime.timeouts.no_providers_backoff_secs;
1420                tracing::warn!(
1421                    backoff_secs,
1422                    "no providers available; backing off before next turn"
1423                );
1424                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1425            }
1426
1427            let user_msg = format!("Error: {e:#}");
1428            self.channel.send(&user_msg).await?;
1429            self.msg.messages.pop();
1430            self.recompute_prompt_tokens();
1431            self.channel.flush_chunks().await?;
1432            true
1433        } else {
1434            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1435            // leak into the next turn's context.
1436            self.learning_engine.learning_tasks.detach_all();
1437            self.truncate_old_tool_results();
1438            // MagicDocs: spawn background doc updates if any are due (#2702).
1439            self.maybe_update_magic_docs();
1440            // Compression spectrum: fire-and-forget promotion scan (#3305).
1441            self.maybe_spawn_promotion_scan();
1442            false
1443        };
1444        tracing::debug!("turn timing: process_response done");
1445
1446        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1447        #[cfg(feature = "self-check")]
1448        if let Some(pipeline) = self.quality.clone() {
1449            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1450        }
1451        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1452        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1453        // When self-check appends a flag_marker chunk, this single call covers both
1454        // the main response and the marker, preventing the double response_end of #3243.
1455        let _ = self.channel.flush_chunks().await;
1456
1457        self.maybe_fire_completion_notification(turn, turn_had_error);
1458
1459        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1460        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1461        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1462        // we harvest those values into Turn before end_turn overwrites pending_timings.
1463        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
1464        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;
1465
1466        Ok(())
1467    }
1468
1469    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1470    ///
1471    /// Called at the top of each turn, before any user message processing.
1472    fn reap_background_tasks_and_update_metrics(&mut self) {
1473        let bg_signal = self.lifecycle.supervisor.reap();
1474        if bg_signal.did_summarize {
1475            self.memory_state.persistence.unsummarized_count = 0;
1476            tracing::debug!("background summarization completed; unsummarized_count reset");
1477        }
1478        let snap = self.lifecycle.supervisor.metrics_snapshot();
1479        self.update_metrics(|m| {
1480            m.bg_inflight = snap.inflight as u64;
1481            m.bg_dropped = snap.total_dropped();
1482            m.bg_completed = snap.total_completed();
1483            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1484            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1485        });
1486        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1487        // accounted in the snapshot above.
1488        if self.runtime.supervisor_config.abort_enrichment_on_turn {
1489            self.lifecycle
1490                .supervisor
1491                .abort_class(agent_supervisor::TaskClass::Enrichment);
1492        }
1493    }
1494
1495    /// Fire completion notifications and `turn_complete` hooks after each turn.
1496    ///
1497    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1498    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1499    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1500    /// env vars carry no raw assistant output.
1501    ///
1502    /// Gating:
1503    /// - When a `Notifier` is configured, both the notifier and hooks share its
1504    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1505    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1506    ///   notifier path is simply skipped).
1507    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1508        let snap = turn.metrics_snapshot().timings.clone();
1509        let duration_ms = snap
1510            .prepare_context_ms
1511            .saturating_add(snap.llm_chat_ms)
1512            .saturating_add(snap.tool_exec_ms);
1513        let summary = crate::notifications::TurnSummary {
1514            duration_ms,
1515            preview: self.last_assistant_preview(160),
1516            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1517            tool_calls: 0,
1518            llm_requests: self.lifecycle.turn_llm_requests,
1519            exit_status: if is_error {
1520                crate::notifications::TurnExitStatus::Error
1521            } else {
1522                crate::notifications::TurnExitStatus::Success
1523            },
1524        };
1525
1526        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1527        let gate_ok = self
1528            .lifecycle
1529            .notifier
1530            .as_ref()
1531            .is_none_or(|n| n.should_fire(&summary));
1532
1533        // 1) Existing notifier path — unchanged semantics.
1534        if let Some(ref notifier) = self.lifecycle.notifier
1535            && gate_ok
1536        {
1537            notifier.fire(&summary);
1538        }
1539
1540        // 2) turn_complete hooks — fire-and-forget, matching the Notifier::fire pattern.
1541        // MCP dispatch is omitted: turn_complete hooks are expected to be Command-type
1542        // (shell scripts for desktop notifications, status-bar updates, etc.). McpTool
1543        // hooks in this context would require a Send + 'static MCP handle; defer that
1544        // extension if needed via a follow-up.
1545        let hooks = self.session.hooks_config.turn_complete.clone();
1546        if !hooks.is_empty() && gate_ok {
1547            let mut env = std::collections::HashMap::new();
1548            env.insert(
1549                "ZEPH_TURN_DURATION_MS".to_owned(),
1550                summary.duration_ms.to_string(),
1551            );
1552            env.insert(
1553                "ZEPH_TURN_STATUS".to_owned(),
1554                if is_error { "error" } else { "success" }.to_owned(),
1555            );
1556            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1557            env.insert(
1558                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1559                summary.llm_requests.to_string(),
1560            );
1561            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1562            let _accepted = self.lifecycle.supervisor.spawn(
1563                agent_supervisor::TaskClass::Telemetry,
1564                "turn-complete-hooks",
1565                async move {
1566                    // Explicitly 'static so the future satisfies tokio::spawn's bound.
1567                    // None holds no actual reference; the annotation is vacuously satisfied.
1568                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1569                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1570                        tracing::warn!(error = %e, "turn_complete hook failed");
1571                    }
1572                },
1573            );
1574        }
1575    }
1576
    /// LLM-guardrail and ML-classifier pre-screening of raw user input.
    ///
    /// Returns `Ok(true)` if the input was blocked (a block notice has already been
    /// sent and flushed on the channel), in which case the caller should return
    /// `Ok(())` immediately; `Ok(false)` when processing may continue.
    ///
    /// # Errors
    /// No path in the current body produces `Err`; the fallible signature leaves
    /// room for future screening stages.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        // Best-effort notification: send/flush errors are ignored
                        // because the input is blocked either way.
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Warn mode: notify but continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // Fail-closed vs fail-open is the guardrail's own configuration.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        // ML classifier: lightweight injection detection on user input boundary.
        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
        // direct user chat. Disabled by default to prevent false positives on benign messages.
        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    // Push classifier metrics for the blocked input before notifying.
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    // Soft signal: log only, never block.
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        // Non-blocked path also pushes metrics (the Blocked arm returned above).
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1648
1649    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1650    ///
1651    /// Skips context preparation entirely when providers failed on the previous turn and the
1652    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1653    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1654    /// are rate-limited or unavailable (#3357).
1655    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1656        let backoff_secs = self.runtime.timeouts.no_providers_backoff_secs;
1657        let prep_timeout_secs = self.runtime.timeouts.context_prep_timeout_secs;
1658
1659        // Skip expensive memory recall / embedding when providers are known-down.
1660        let providers_recently_failed = self
1661            .lifecycle
1662            .last_no_providers_at
1663            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1664
1665        if providers_recently_failed {
1666            tracing::warn!(
1667                backoff_secs,
1668                "skipping context preparation: providers were unavailable on last turn"
1669            );
1670            return;
1671        }
1672
1673        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1674        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1675        {
1676            Ok(()) => {}
1677            Err(_elapsed) => {
1678                tracing::warn!(
1679                    timeout_secs = prep_timeout_secs,
1680                    "context preparation timed out; proceeding with degraded context"
1681                );
1682            }
1683        }
1684    }
1685
    /// Per-turn context pipeline: system prompt rebuild, learning ticks, compaction
    /// stages, and context recall/assembly.
    ///
    /// Stage ordering is load-bearing (see inline comments): the compaction guard is
    /// reset before Focus/SideQuest tick, deferred summaries flush before proactive
    /// compression, and proactive compression runs before reactive compaction.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        // Reset per-message pruning cache at the start of each turn (#2298).
        self.mcp.pruning_cache.reset();

        // Extract before rebuild_system_prompt so the value is not tainted
        // by the secrets-bearing system prompt (ConversationId is just an i64).
        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        // Learning pipeline: correction detection feeds the engine before analysis.
        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
        {
            self.focus.tick();

            // SideQuest eviction: runs every N user turns when enabled.
            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
        // gated on graph + experience config, and only when both stores are attached.
        {
            let cfg = &self.memory_state.extraction.graph_config.experience;
            if cfg.enabled
                && cfg.evolution_sweep_enabled
                && cfg.evolution_sweep_interval > 0
                && self
                    .sidequest
                    .turn_counter
                    .checked_rem(cfg.evolution_sweep_interval as u64)
                    == Some(0)
                && let Some(memory) = self.memory_state.persistence.memory.as_ref()
                && let (Some(exp), Some(graph)) =
                    (memory.experience.as_ref(), memory.graph_store.as_ref())
            {
                // Clone the Arcs so the spawned task owns its handles.
                let exp = std::sync::Arc::clone(exp);
                let graph = std::sync::Arc::clone(graph);
                let threshold = cfg.confidence_prune_threshold;
                let turn = self.sidequest.turn_counter;
                // Telemetry-class task: may be rejected at capacity (logged below).
                let accepted = self.lifecycle.supervisor.spawn(
                    agent_supervisor::TaskClass::Telemetry,
                    "experience-sweep",
                    async move {
                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
                            Ok(stats) => tracing::info!(
                                turn,
                                self_loops = stats.pruned_self_loops,
                                low_confidence = stats.pruned_low_confidence,
                                "evolution sweep complete",
                            ),
                            Err(e) => tracing::warn!(
                                turn,
                                error = %e,
                                "evolution sweep failed",
                            ),
                        }
                    },
                );
                if !accepted {
                    tracing::warn!(
                        turn = self.sidequest.turn_counter,
                        "experience-sweep dropped (telemetry class at capacity)",
                    );
                }
            }
        }

        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
        self.maybe_time_based_microcompact();

        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // NOTE(review): Box::pin presumably bounds this async fn's future size — confirm.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1806
1807    fn build_user_message(
1808        &mut self,
1809        text: &str,
1810        image_parts: Vec<zeph_llm::provider::MessagePart>,
1811    ) -> Message {
1812        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1813        all_image_parts.extend(image_parts);
1814
1815        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1816            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1817                text: text.to_owned(),
1818            }];
1819            parts.extend(all_image_parts);
1820            Message::from_parts(Role::User, parts)
1821        } else {
1822            if !all_image_parts.is_empty() {
1823                tracing::warn!(
1824                    count = all_image_parts.len(),
1825                    "image attachments dropped: provider does not support vision"
1826                );
1827            }
1828            Message {
1829                role: Role::User,
1830                content: text.to_owned(),
1831                parts: vec![],
1832                metadata: MessageMetadata::default(),
1833            }
1834        }
1835    }
1836
1837    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
1838    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
1839    /// on overflow the oldest entry is evicted and a placeholder is inserted.
1840    fn drain_background_completions(&mut self) {
1841        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
1842
1843        let Some(ref mut rx) = self.lifecycle.background_completion_rx else {
1844            return;
1845        };
1846        // Non-blocking drain: collect all completions that are already ready.
1847        while let Ok(completion) = rx.try_recv() {
1848            if self.lifecycle.pending_background_completions.len()
1849                >= BACKGROUND_COMPLETION_BUFFER_CAP
1850            {
1851                tracing::warn!(
1852                    run_id = %completion.run_id,
1853                    "background completion buffer full; dropping run result"
1854                );
1855                // Buffer is full: drop the oldest queued completion and push a sentinel
1856                // for the new (incoming) run so the LLM is informed its result was lost.
1857                self.lifecycle.pending_background_completions.pop_front();
1858                self.lifecycle.pending_background_completions.push_back(
1859                    zeph_tools::BackgroundCompletion {
1860                        run_id: completion.run_id,
1861                        exit_code: -1,
1862                        success: false,
1863                        elapsed_ms: 0,
1864                        command: completion.command,
1865                        output: format!(
1866                            "[background result for run {} dropped: buffer overflow]",
1867                            completion.run_id
1868                        ),
1869                    },
1870                );
1871            } else {
1872                self.lifecycle
1873                    .pending_background_completions
1874                    .push_back(completion);
1875            }
1876        }
1877    }
1878
1879    /// Format and drain `pending_background_completions` into a prefix string, then
1880    /// return the final merged text (prefix + user message). When there are no pending
1881    /// completions the original text is returned unchanged.
1882    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
1883        if self.lifecycle.pending_background_completions.is_empty() {
1884            return user_text.to_owned();
1885        }
1886        let mut parts = String::new();
1887        for completion in self.lifecycle.pending_background_completions.drain(..) {
1888            let _ = write!(
1889                parts,
1890                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
1891                completion.run_id,
1892                completion.exit_code,
1893                completion.success,
1894                completion.elapsed_ms,
1895                completion.command,
1896                completion.output,
1897            );
1898        }
1899        parts.push_str(user_text);
1900        parts
1901    }
1902
    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
    /// channel. Returns a human-readable status string suitable for sending to the user.
    ///
    /// Returns `None` only when the sub-agent manager is detached mid-poll. Polls on
    /// a fixed 500 ms cadence; non-terminal states emit a progress status line.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Bridge secret requests from sub-agent to channel.confirm().
            // Fetch the pending request first, then release the borrow before
            // calling channel.confirm() (which requires &mut self).
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
                // (SEC-P1-02), so it is safe to embed in the prompt string.
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // Channel failure counts as a denial (unwrap_or(false)).
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // 5-minute approval TTL for the delivered secret.
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            // `?` here: a detached manager aborts the poll with None.
            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // Task vanished from the status list: report completion without detail.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: report progress as "turn N/max"; max falls back
                    // to 20 when the agent definition is unavailable.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1978
1979    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1980    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1981    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1982        let mgr = self.orchestration.subagent_manager.as_mut()?;
1983        let full_ids: Vec<String> = mgr
1984            .statuses()
1985            .into_iter()
1986            .map(|(tid, _)| tid)
1987            .filter(|tid| tid.starts_with(prefix))
1988            .collect();
1989        Some(match full_ids.as_slice() {
1990            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1991            [fid] => Ok(fid.clone()),
1992            _ => Err(format!(
1993                "Ambiguous id prefix '{prefix}': matches {} agents",
1994                full_ids.len()
1995            )),
1996        })
1997    }
1998
1999    fn handle_agent_list(&self) -> Option<String> {
2000        use std::fmt::Write as _;
2001        let mgr = self.orchestration.subagent_manager.as_ref()?;
2002        let defs = mgr.definitions();
2003        if defs.is_empty() {
2004            return Some("No sub-agent definitions found.".into());
2005        }
2006        let mut out = String::from("Available sub-agents:\n");
2007        for d in defs {
2008            let memory_label = match d.memory {
2009                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2010                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2011                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2012                None => "",
2013            };
2014            if let Some(ref src) = d.source {
2015                let _ = writeln!(
2016                    out,
2017                    "  {}{} — {} ({})",
2018                    d.name, memory_label, d.description, src
2019                );
2020            } else {
2021                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2022            }
2023        }
2024        Some(out)
2025    }
2026
2027    fn handle_agent_status(&self) -> Option<String> {
2028        use std::fmt::Write as _;
2029        let mgr = self.orchestration.subagent_manager.as_ref()?;
2030        let statuses = mgr.statuses();
2031        if statuses.is_empty() {
2032            return Some("No active sub-agents.".into());
2033        }
2034        let mut out = String::from("Active sub-agents:\n");
2035        for (id, s) in &statuses {
2036            let state = format!("{:?}", s.state).to_lowercase();
2037            let elapsed = s.started_at.elapsed().as_secs();
2038            let _ = writeln!(
2039                out,
2040                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2041                short = &id[..8.min(id.len())],
2042                t = s.turns_used,
2043                msg = s.last_message.as_deref().unwrap_or(""),
2044            );
2045            // Show memory directory path for agents with memory enabled.
2046            if let Some(def) = mgr.agents_def(id)
2047                && let Some(scope) = def.memory
2048                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2049            {
2050                let _ = writeln!(out, "       memory: {}", dir.display());
2051            }
2052        }
2053        Some(out)
2054    }
2055
    /// Approve a pending secret request for the sub-agent matching the `id`
    /// prefix: grant a 5-minute TTL and deliver the secret.
    ///
    /// Returns `None` when no sub-agent manager is configured; otherwise a
    /// user-facing status message (success or the specific failure).
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        // NOTE(review): try_recv_secret_request looks like it pops a request
        // off a queue; when the popped request belongs to a DIFFERENT agent
        // than `full_id` it is not re-queued here — confirm the manager
        // handles that, otherwise the other agent's request is silently lost.
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            // NOTE(review): Duration::from_mins is nightly-only
            // (`duration_constructors`) — confirm the toolchain pin, or
            // prefer from_secs(5 * 60) for stable builds.
            let ttl = std::time::Duration::from_mins(5);
            // Approve first; only deliver once the approval is recorded.
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
2079
2080    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2081        let full_id = match self.resolve_agent_id_prefix(id)? {
2082            Ok(fid) => fid,
2083            Err(msg) => return Some(msg),
2084        };
2085        let mgr = self.orchestration.subagent_manager.as_mut()?;
2086        match mgr.deny_secret(&full_id) {
2087            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2088            Err(e) => Some(format!("Deny failed: {e}")),
2089        }
2090    }
2091
    /// Dispatch a parsed `AgentCommand` to the corresponding sub-agent action.
    ///
    /// Returns `None` when no sub-agent manager is configured; otherwise a
    /// user-facing response string. `Background` fires and returns immediately;
    /// `Spawn`/`Mention`/`Resume` announce the launch on the channel and poll
    /// until the sub-agent completes.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Gather spawn inputs first — these borrow &self and must be
                // done before the mutable borrow of the manager below.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    // Only the first 8 chars of the task id are shown to the user.
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                // Foreground spawn: launch and await completion, streaming status to user.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                // Send failures are deliberately ignored — the spawn already happened.
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Accept prefix match on task_id.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    // More than one match: refuse rather than guess.
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve definition name from transcript meta before spawning so we can
                // look up skills by definition name rather than the UUID prefix (S1 fix).
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
2210
2211    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2212        let mgr = self.orchestration.subagent_manager.as_ref()?;
2213        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2214        let reg = self.skill_state.registry.read();
2215        match zeph_subagent::filter_skills(&reg, &def.skills) {
2216            Ok(skills) => {
2217                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2218                if bodies.is_empty() {
2219                    None
2220                } else {
2221                    Some(bodies)
2222                }
2223            }
2224            Err(e) => {
2225                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2226                None
2227            }
2228        }
2229    }
2230
2231    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2232    fn build_spawn_context(
2233        &self,
2234        cfg: &zeph_config::SubAgentConfig,
2235    ) -> zeph_subagent::SpawnContext {
2236        zeph_subagent::SpawnContext {
2237            parent_messages: self.extract_parent_messages(cfg),
2238            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
2239            parent_provider_name: {
2240                let name = &self.runtime.active_provider_name;
2241                if name.is_empty() {
2242                    None
2243                } else {
2244                    Some(name.clone())
2245                }
2246            },
2247            spawn_depth: self.runtime.spawn_depth,
2248            mcp_tool_names: self.extract_mcp_tool_names(),
2249        }
2250    }
2251
2252    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2253    ///
2254    /// Filters system messages, takes last `context_window_turns * 2` messages,
2255    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
2256    fn extract_parent_messages(
2257        &self,
2258        config: &zeph_config::SubAgentConfig,
2259    ) -> Vec<zeph_llm::provider::Message> {
2260        use zeph_llm::provider::Role;
2261        if config.context_window_turns == 0 {
2262            return Vec::new();
2263        }
2264        let non_system: Vec<_> = self
2265            .msg
2266            .messages
2267            .iter()
2268            .filter(|m| m.role != Role::System)
2269            .cloned()
2270            .collect();
2271        let take_count = config.context_window_turns * 2;
2272        let start = non_system.len().saturating_sub(take_count);
2273        let mut msgs = non_system[start..].to_vec();
2274
2275        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
2276        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
2277        let mut total_chars: usize = 0;
2278        let mut keep = msgs.len();
2279        for (i, m) in msgs.iter().enumerate() {
2280            total_chars += m.content.len();
2281            if total_chars > max_chars {
2282                keep = i;
2283                break;
2284            }
2285        }
2286        if keep < msgs.len() {
2287            tracing::info!(
2288                kept = keep,
2289                requested = config.context_window_turns * 2,
2290                "[subagent] truncated parent history from {} to {} turns due to token budget",
2291                config.context_window_turns * 2,
2292                keep
2293            );
2294            msgs.truncate(keep);
2295        }
2296        msgs
2297    }
2298
2299    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2300    fn extract_mcp_tool_names(&self) -> Vec<String> {
2301        self.tool_executor
2302            .tool_definitions_erased()
2303            .into_iter()
2304            .filter(|t| t.id.starts_with("mcp_"))
2305            .map(|t| t.id.to_string())
2306            .collect()
2307    }
2308
2309    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2310    ///
2311    /// Must be called from a blocking context (uses synchronous FS I/O).
2312    fn classify_source_kind(
2313        skill_dir: &std::path::Path,
2314        managed_dir: Option<&std::path::PathBuf>,
2315        bundled_names: &std::collections::HashSet<String>,
2316    ) -> zeph_memory::store::SourceKind {
2317        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2318            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2319            let has_marker = skill_dir.join(".bundled").exists();
2320            if has_marker && bundled_names.contains(skill_name) {
2321                zeph_memory::store::SourceKind::Bundled
2322            } else {
2323                if has_marker {
2324                    tracing::warn!(
2325                        skill = %skill_name,
2326                        "skill has .bundled marker but is not in the bundled skill \
2327                         allowlist — classifying as Hub"
2328                    );
2329                }
2330                zeph_memory::store::SourceKind::Hub
2331            }
2332        } else {
2333            zeph_memory::store::SourceKind::Local
2334        }
2335    }
2336
    /// Update trust DB records for all reloaded skills.
    ///
    /// For each skill: computes its content hash and source kind off the async
    /// runtime (both are blocking FS calls), then upserts a trust row in SQLite.
    /// Existing levels are preserved except when the hash changed (the
    /// configured hash-mismatch level is applied) or the source kind changed
    /// (the source-kind initial level may be adopted when it grants more trust,
    /// but an operator block is never overridden).
    ///
    /// No-op when persistent memory is not configured.
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        // Clone Arc before any .await so no &self fields are held across suspension points.
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        let bundled_names: std::collections::HashSet<String> =
            zeph_skills::bundled_skill_names().into_iter().collect();
        for meta in all_meta {
            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
            let skill_dir = meta.skill_dir.clone();
            let managed_dir_ref = managed_dir.clone();
            let bundled_names_ref = bundled_names.clone();
            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
                tokio::task::spawn_blocking(move || {
                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
                    let source_kind = Self::classify_source_kind(
                        &skill_dir,
                        managed_dir_ref.as_ref(),
                        &bundled_names_ref,
                    );
                    Some((hash, source_kind))
                })
                .await
                // A JoinError (panicked/cancelled task) is treated like a hash failure.
                .unwrap_or(None);

            let Some((current_hash, source_kind)) = fs_result else {
                tracing::warn!("failed to compute hash for '{}'", meta.name);
                continue;
            };
            // Baseline trust level implied by where the skill came from.
            let initial_level = match source_kind {
                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
                    &trust_cfg.local_level
                }
            };
            // DB read errors are swallowed (treated as "no existing record").
            let existing = memory
                .sqlite()
                .load_skill_trust(&meta.name)
                .await
                .ok()
                .flatten();
            let trust_level_str = if let Some(ref row) = existing {
                if row.blake3_hash != current_hash {
                    // On-disk content changed since last record — apply the
                    // configured hash-mismatch level.
                    trust_cfg.hash_mismatch_level.to_string()
                } else if row.source_kind != source_kind {
                    // source_kind changed (e.g., hub → bundled on upgrade).
                    // Never override an explicit operator block. For active trust levels,
                    // adopt the source-kind initial level when it grants more trust.
                    let stored = row
                        .trust_level
                        .parse::<zeph_tools::SkillTrustLevel>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                skill = %meta.name,
                                raw = %row.trust_level,
                                "unrecognised trust_level in DB, treating as quarantined"
                            );
                            zeph_tools::SkillTrustLevel::Quarantined
                        });
                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
                        row.trust_level.clone()
                    } else {
                        initial_level.to_string()
                    }
                } else {
                    // Hash and source unchanged — keep the stored level verbatim.
                    row.trust_level.clone()
                }
            } else {
                initial_level.to_string()
            };
            let source_path = meta.skill_dir.to_str();
            if let Err(e) = memory
                .sqlite()
                .upsert_skill_trust(
                    &meta.name,
                    &trust_level_str,
                    source_kind,
                    None,
                    source_path,
                    &current_hash,
                )
                .await
            {
                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
            }
        }
    }
2433
    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
    ///
    /// A non-Qdrant (or missing) matcher backend is rebuilt from scratch; an
    /// existing Qdrant backend is synced incrementally with progress reported
    /// on the status channel. When hybrid search is enabled, the BM25 index is
    /// also rebuilt from the skill descriptions.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Embedding closure shared by both paths; each call is bounded by the
        // configured timeout so a stalled provider cannot hang the rebuild.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // Full rebuild unless the current backend is specifically Qdrant.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            // SkillMatcher::new failure leaves the matcher as None.
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Forward per-skill sync progress to the UI when a status channel exists.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            // Sync failure is non-fatal: the previous embeddings stay in use.
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2495
    /// Hot-reload skills from disk and refresh all dependent state.
    ///
    /// Reloads the registry from the configured skill paths (plus any
    /// plugin-supplied dirs). When the registry fingerprint actually changed,
    /// updates trust records, rebuilds the skill matcher / BM25 index, and
    /// rewrites the skills section of the system prompt in place.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.skill_state.fingerprint();
        // Merge plugin-supplied skill dirs into the configured paths (deduped).
        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.skill_state.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.skill_state.skill_paths.clone()
        };
        self.skill_state.registry.write().reload(&reload_paths);
        // Unchanged fingerprint → nothing to refresh; skip the expensive work.
        if self.skill_state.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        // Snapshot metadata out of the registry lock before any .await.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Load full skill bodies inside a short-lived read lock.
        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // The prompt is rebuilt with an empty health map on hot-reload.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        // Overwrite the leading message (the system prompt slot) in place.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2558
2559    fn reload_instructions(&mut self) {
2560        // Drain any additional queued events before reloading to avoid redundant reloads.
2561        if let Some(ref mut rx) = self.instructions.reload_rx {
2562            while rx.try_recv().is_ok() {}
2563        }
2564        let Some(ref state) = self.instructions.reload_state else {
2565            return;
2566        };
2567        let new_blocks = crate::instructions::load_instructions(
2568            &state.base_dir,
2569            &state.provider_kinds,
2570            &state.explicit_files,
2571            state.auto_detect,
2572        );
2573        let old_sources: std::collections::HashSet<_> =
2574            self.instructions.blocks.iter().map(|b| &b.source).collect();
2575        let new_sources: std::collections::HashSet<_> =
2576            new_blocks.iter().map(|b| &b.source).collect();
2577        for added in new_sources.difference(&old_sources) {
2578            tracing::info!(path = %added.display(), "instruction file added");
2579        }
2580        for removed in old_sources.difference(&new_sources) {
2581            tracing::info!(path = %removed.display(), "instruction file removed");
2582        }
2583        tracing::info!(
2584            old_count = self.instructions.blocks.len(),
2585            new_count = new_blocks.len(),
2586            "reloaded instruction files"
2587        );
2588        self.instructions.blocks = new_blocks;
2589    }
2590
    /// Hot-reload runtime configuration from the config file on disk.
    ///
    /// No-op when no config path is recorded or loading/overlay merge fails
    /// (previous runtime state is kept). Otherwise pushes the fresh values
    /// into the runtime, memory, skill, context-manager, and index state.
    fn reload_config(&mut self) {
        let Some(path) = self.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        // Weights/thresholds are clamped defensively to their valid [0, 1] range.
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading "~/" in the generation output dir to the home directory.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // Resolve routing_classifier_provider from the provider pool (#2484).
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2678
2679    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
2680    ///
2681    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
2682    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2683        let mut config = match Config::load(path) {
2684            Ok(c) => c,
2685            Err(e) => {
2686                tracing::warn!("config reload failed: {e:#}");
2687                return None;
2688            }
2689        };
2690
2691        // Re-apply plugin overlays. On error, keep previous runtime state intact.
2692        let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2693            None
2694        } else {
2695            match zeph_plugins::apply_plugin_config_overlays(
2696                &mut config,
2697                &self.lifecycle.plugins_dir,
2698            ) {
2699                Ok(o) => Some(o),
2700                Err(e) => {
2701                    tracing::warn!(
2702                        "plugin overlay merge failed during reload: {e:#}; \
2703                         keeping previous runtime state"
2704                    );
2705                    return None;
2706                }
2707            }
2708        };
2709
2710        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
2711        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
2712        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
2713        if let Some(ref overlay) = new_overlay {
2714            self.warn_on_shell_overlay_divergence(overlay, &config);
2715        }
2716        Some(config)
2717    }
2718
2719    /// React to shell policy divergence detected on hot-reload.
2720    ///
2721    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
2722    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
2723    /// — emit a warn + status banner when it changes.
2724    fn warn_on_shell_overlay_divergence(
2725        &self,
2726        new_overlay: &zeph_plugins::ResolvedOverlay,
2727        config: &Config,
2728    ) {
2729        let new_blocked: Vec<String> = {
2730            let mut v = config.tools.shell.blocked_commands.clone();
2731            v.sort();
2732            v
2733        };
2734        let new_allowed: Vec<String> = {
2735            let mut v = config.tools.shell.allowed_commands.clone();
2736            v.sort();
2737            v
2738        };
2739
2740        let startup = &self.lifecycle.startup_shell_overlay;
2741        let blocked_changed = new_blocked != startup.blocked;
2742        let allowed_changed = new_allowed != startup.allowed;
2743
2744        // blocked_commands IS rebuilt live — emit info-level confirmation only.
2745        if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2746            h.rebuild(&config.tools.shell);
2747            tracing::info!(
2748                blocked_count = h.snapshot_blocked().len(),
2749                "shell blocked_commands rebuilt from hot-reload"
2750            );
2751        }
2752
2753        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
2754        // executor construction time. Warn loudly so the user restarts.
2755        //
2756        // Note: when base `allowed_commands` is empty (the default), the overlay's
2757        // intersection semantics keep it empty, so this branch is silently unreachable
2758        // for users who do not set a non-empty base list.
2759        if allowed_changed {
2760            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2761                 for sandbox path recomputation (blocked_commands was rebuilt live)";
2762            tracing::warn!("{msg}");
2763            if let Some(ref tx) = self.session.status_tx {
2764                let _ = tx.send(msg.to_owned());
2765            }
2766        }
2767
2768        let _ = new_overlay;
2769    }
2770
2771    /// Run `SideQuest` tool output eviction pass (#1885).
2772    ///
2773    /// PERF-1 fix: two-phase non-blocking design.
2774    ///
2775    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2776    /// validate and apply it immediately.
2777    ///
2778    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2779    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2780    /// next turn, so the current agent turn is never blocked by the LLM call.
2781    #[allow(clippy::too_many_lines)]
2782    fn maybe_sidequest_eviction(&mut self) {
2783        use zeph_llm::provider::{Message, MessageMetadata, Role};
2784
2785        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2786        // strategy — the two systems share the same pool of evictable tool outputs and can
2787        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2788        if self.sidequest.config.enabled {
2789            use crate::config::PruningStrategy;
2790            if !matches!(
2791                self.context_manager.compression.pruning_strategy,
2792                PruningStrategy::Reactive
2793            ) {
2794                tracing::warn!(
2795                    strategy = ?self.context_manager.compression.pruning_strategy,
2796                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2797                     consider disabling sidequest.enabled to avoid redundant eviction"
2798                );
2799            }
2800        }
2801
2802        // Guard: do not evict while a focus session is active.
2803        if self.focus.is_active() {
2804            tracing::debug!("sidequest: skipping — focus session active");
2805            // Drop any pending result — cursors may be stale relative to focus truncation.
2806            self.compression.pending_sidequest_result = None;
2807            return;
2808        }
2809
2810        // Phase 1: apply pending result from last turn's background LLM call.
2811        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2812            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2813            use futures::FutureExt as _;
2814            match handle.now_or_never() {
2815                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2816                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2817                    let freed = self.sidequest.apply_eviction(
2818                        &mut self.msg.messages,
2819                        &evicted_indices,
2820                        &self.metrics.token_counter,
2821                    );
2822                    if freed > 0 {
2823                        self.recompute_prompt_tokens();
2824                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2825                        // cooldown=0: eviction does not impose post-compaction cooldown.
2826                        self.context_manager.compaction =
2827                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2828                                cooldown: 0,
2829                            };
2830                        tracing::info!(
2831                            freed_tokens = freed,
2832                            evicted_cursors = evicted_indices.len(),
2833                            pass = self.sidequest.passes_run,
2834                            "sidequest eviction complete"
2835                        );
2836                        if let Some(ref d) = self.debug_state.debug_dumper {
2837                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2838                        }
2839                        if let Some(ref tx) = self.session.status_tx {
2840                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2841                        }
2842                    } else {
2843                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2844                        if let Some(ref tx) = self.session.status_tx {
2845                            let _ = tx.send(String::new());
2846                        }
2847                    }
2848                }
2849                Some(Ok(None | Some(_))) => {
2850                    tracing::debug!("sidequest: pending result: no cursors to evict");
2851                    if let Some(ref tx) = self.session.status_tx {
2852                        let _ = tx.send(String::new());
2853                    }
2854                }
2855                Some(Err(e)) => {
2856                    tracing::debug!("sidequest: background task panicked: {e}");
2857                    if let Some(ref tx) = self.session.status_tx {
2858                        let _ = tx.send(String::new());
2859                    }
2860                }
2861                None => {
2862                    // Task still running — re-store and wait another turn.
2863                    // We already took it; we'd need to re-spawn, but instead just drop and
2864                    // schedule fresh below to keep the cursor list current.
2865                    tracing::debug!(
2866                        "sidequest: background LLM task not yet complete, rescheduling"
2867                    );
2868                }
2869            }
2870        }
2871
2872        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2873        self.sidequest
2874            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2875
2876        if self.sidequest.tool_output_cursors.is_empty() {
2877            tracing::debug!("sidequest: no eligible cursors");
2878            return;
2879        }
2880
2881        let prompt = self.sidequest.build_eviction_prompt();
2882        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2883        let n_cursors = self.sidequest.tool_output_cursors.len();
2884        // Clone the provider so the spawn closure owns it without borrowing self.
2885        let provider = self.summary_or_primary_provider().clone();
2886
2887        // Spawn background task: the LLM call runs without blocking the agent loop.
2888        let handle = tokio::spawn(async move {
2889            let msgs = [Message {
2890                role: Role::User,
2891                content: prompt,
2892                parts: vec![],
2893                metadata: MessageMetadata::default(),
2894            }];
2895            let response =
2896                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2897                    .await
2898                {
2899                    Ok(Ok(r)) => r,
2900                    Ok(Err(e)) => {
2901                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2902                        return None;
2903                    }
2904                    Err(_) => {
2905                        tracing::debug!("sidequest bg: LLM call timed out");
2906                        return None;
2907                    }
2908                };
2909
2910            let start = response.find('{')?;
2911            let end = response.rfind('}')?;
2912            if start > end {
2913                return None;
2914            }
2915            let json_slice = &response[start..=end];
2916            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2917            let mut valid: Vec<usize> = parsed
2918                .del_cursors
2919                .into_iter()
2920                .filter(|&c| c < n_cursors)
2921                .collect();
2922            valid.sort_unstable();
2923            valid.dedup();
2924            #[allow(
2925                clippy::cast_precision_loss,
2926                clippy::cast_possible_truncation,
2927                clippy::cast_sign_loss
2928            )]
2929            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2930            valid.truncate(max_evict);
2931            Some(valid)
2932        });
2933
2934        self.compression.pending_sidequest_result = Some(handle);
2935        tracing::debug!("sidequest: background LLM eviction task spawned");
2936        if let Some(ref tx) = self.session.status_tx {
2937            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2938        }
2939    }
2940
2941    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
2942    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
2943        self.mcp
2944            .manager
2945            .as_ref()
2946            .map(|m| McpManagerDispatch(Arc::clone(m)))
2947    }
2948
2949    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2950    ///
2951    /// Called after each tool batch completes. The check is a single syscall and has
2952    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2953    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2954    pub(crate) async fn check_cwd_changed(&mut self) {
2955        let current = match std::env::current_dir() {
2956            Ok(p) => p,
2957            Err(e) => {
2958                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2959                return;
2960            }
2961        };
2962        if current == self.lifecycle.last_known_cwd {
2963            return;
2964        }
2965        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2966        self.session.env_context.working_dir = current.display().to_string();
2967
2968        tracing::info!(
2969            old = %old_cwd.display(),
2970            new = %current.display(),
2971            "working directory changed"
2972        );
2973
2974        let _ = self
2975            .channel
2976            .send_status("Working directory changed\u{2026}")
2977            .await;
2978
2979        let hooks = self.session.hooks_config.cwd_changed.clone();
2980        if !hooks.is_empty() {
2981            let mut env = std::collections::HashMap::new();
2982            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2983            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2984            let dispatch = self.mcp_dispatch();
2985            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
2986                .as_ref()
2987                .map(|d| d as &dyn zeph_subagent::McpDispatch);
2988            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
2989                tracing::warn!(error = %e, "CwdChanged hook failed");
2990            }
2991        }
2992
2993        let _ = self.channel.send_status("").await;
2994    }
2995
2996    /// Handle a `FileChangedEvent` from the file watcher.
2997    pub(crate) async fn handle_file_changed(
2998        &mut self,
2999        event: crate::file_watcher::FileChangedEvent,
3000    ) {
3001        tracing::info!(path = %event.path.display(), "file changed");
3002
3003        let _ = self
3004            .channel
3005            .send_status("Running file-change hook\u{2026}")
3006            .await;
3007
3008        let hooks = self.session.hooks_config.file_changed_hooks.clone();
3009        if !hooks.is_empty() {
3010            let mut env = std::collections::HashMap::new();
3011            env.insert(
3012                "ZEPH_CHANGED_PATH".to_owned(),
3013                event.path.display().to_string(),
3014            );
3015            let dispatch = self.mcp_dispatch();
3016            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3017                .as_ref()
3018                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3019            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3020                tracing::warn!(error = %e, "FileChanged hook failed");
3021            }
3022        }
3023
3024        let _ = self.channel.send_status("").await;
3025    }
3026
3027    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3028    /// background scan task.
3029    ///
3030    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3031    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3032    ///
3033    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3034    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3035    /// blocking the turn.
3036    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3037        let Some(engine) = self.promotion_engine.clone() else {
3038            return;
3039        };
3040
3041        let Some(memory) = self.memory_state.persistence.memory.clone() else {
3042            return;
3043        };
3044
3045        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3046        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3047        let promotion_window = 200usize;
3048
3049        let accepted = self.lifecycle.supervisor.spawn(
3050            agent_supervisor::TaskClass::Enrichment,
3051            "compression_spectrum.promotion_scan",
3052            async move {
3053                let span = tracing::info_span!("memory.compression.promote.background");
3054                let _enter = span.enter();
3055
3056                let window = match memory.load_promotion_window(promotion_window).await {
3057                    Ok(w) => w,
3058                    Err(e) => {
3059                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3060                        return;
3061                    }
3062                };
3063
3064                if window.is_empty() {
3065                    return;
3066                }
3067
3068                let candidates = match engine.scan(&window).await {
3069                    Ok(c) => c,
3070                    Err(e) => {
3071                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3072                        return;
3073                    }
3074                };
3075
3076                for candidate in &candidates {
3077                    if let Err(e) = engine.promote(candidate).await {
3078                        tracing::warn!(
3079                            signature = %candidate.signature,
3080                            error = %e,
3081                            "promotion scan: promote failed"
3082                        );
3083                    }
3084                }
3085
3086                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3087            },
3088        );
3089
3090        if accepted {
3091            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3092        }
3093    }
3094}
/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
///
/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
/// `zeph-subagent` to `zeph-mcp`.
///
/// Holds a cloned `Arc`, so the manager stays alive for the duration of a hook
/// invocation without borrowing the agent.
struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3100
3101impl zeph_subagent::McpDispatch for McpManagerDispatch {
3102    fn call_tool<'a>(
3103        &'a self,
3104        server: &'a str,
3105        tool: &'a str,
3106        args: serde_json::Value,
3107    ) -> std::pin::Pin<
3108        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3109    > {
3110        Box::pin(async move {
3111            self.0
3112                .call_tool(server, tool, args)
3113                .await
3114                .map(|result| {
3115                    // Extract text content from the MCP response as a JSON value.
3116                    let texts: Vec<serde_json::Value> = result
3117                        .content
3118                        .iter()
3119                        .filter_map(|c| {
3120                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3121                                Some(serde_json::Value::String(t.text.clone()))
3122                            } else {
3123                                None
3124                            }
3125                        })
3126                        .collect();
3127                    serde_json::Value::Array(texts)
3128                })
3129                .map_err(|e| e.to_string())
3130        })
3131    }
3132}
3133
3134pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3135    while !*rx.borrow_and_update() {
3136        if rx.changed().await.is_err() {
3137            std::future::pending::<()>().await;
3138        }
3139    }
3140}
3141
3142pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3143    match rx {
3144        Some(inner) => {
3145            if let Some(v) = inner.recv().await {
3146                Some(v)
3147            } else {
3148                *rx = None;
3149                std::future::pending().await
3150            }
3151        }
3152        None => std::future::pending().await,
3153    }
3154}
3155
3156/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
3157///
3158/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
3159/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
3160/// context window; if still 0, fall back to 128 000 tokens.
3161pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3162    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3163        if let Some(ctx_size) = provider.context_window() {
3164            tracing::info!(
3165                model_context = ctx_size,
3166                "auto-configured context budget on reload"
3167            );
3168            ctx_size
3169        } else {
3170            0
3171        }
3172    } else {
3173        config.memory.context_budget_tokens
3174    };
3175    if tokens == 0 {
3176        tracing::warn!(
3177            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3178        );
3179        128_000
3180    } else {
3181        tokens
3182    }
3183}
3184
3185#[cfg(test)]
3186mod tests;
3187
3188#[cfg(test)]
3189pub(crate) use tests::agent_tests;
3190
#[cfg(test)]
mod test_stubs {
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Stub slash command registered only in `#[cfg(test)]` builds.
    ///
    /// Triggers the `Some(Err(CommandError))` arm in the session/debug registry
    /// dispatch block so the non-fatal error path can be tested without production
    /// command validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        /// Slash name the registry dispatches on.
        fn name(&self) -> &'static str {
            "/test-error"
        }

        /// Human-readable description shown in command listings.
        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        /// Grouped under the Session category.
        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Always fails with `CommandError::new("boom")`, ignoring context and args.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<
            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
        > {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}