Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35pub(crate) mod rate_limiter;
36#[cfg(feature = "scheduler")]
37mod scheduler_commands;
38#[cfg(feature = "scheduler")]
39mod scheduler_loop;
40pub mod session_config;
41mod session_digest;
42pub(crate) mod sidequest;
43mod skill_management;
44pub mod slash_commands;
45pub mod speculative;
46pub(crate) mod state;
47pub(crate) mod tool_execution;
48pub(crate) mod tool_orchestrator;
49mod trust_commands;
50pub mod turn;
51mod utils;
52pub(crate) mod vigil;
53
54use std::collections::{HashMap, VecDeque};
55use std::sync::Arc;
56
57use parking_lot::RwLock;
58
59use tokio::sync::{mpsc, watch};
60use tokio_util::sync::CancellationToken;
61use zeph_llm::any::AnyProvider;
62use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
63use zeph_memory::TokenCounter;
64use zeph_memory::semantic::SemanticMemory;
65use zeph_skills::loader::Skill;
66use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
67use zeph_skills::prompt::format_skills_prompt;
68use zeph_skills::registry::SkillRegistry;
69use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
70
71use crate::channel::Channel;
72use crate::config::Config;
73use crate::context::{ContextBudget, build_system_prompt};
74use zeph_common::text::estimate_tokens;
75
76use loop_event::LoopEvent;
77use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
78use state::CompressionState;
79use state::{
80    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
81    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
82    RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
83};
84
// NOTE(review): presumably the number of recent iterations compared when detecting a
// repeating ("doom loop") pattern — the usage site is outside this section; confirm.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Marker strings for the different kinds of context text. NOTE(review): the names suggest
// each one is prepended to an injected block of that kind so the block can be recognized
// later; the injection sites are not visible in this section — confirm before relying on it.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
// Unlike the markers above this one has no trailing newline: it ends with a space.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Prefix used for LSP context messages (`Role::System`) injected into message history.
/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
/// prefix to clear stale notes before each fresh injection.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
// Closing code fence (newline + three backticks) appended by `format_tool_output`.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
101
102pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
103    use std::fmt::Write;
104    let capacity = "[tool output: ".len()
105        + tool_name.len()
106        + "]\n```\n".len()
107        + body.len()
108        + TOOL_OUTPUT_SUFFIX.len();
109    let mut buf = String::with_capacity(capacity);
110    let _ = write!(
111        buf,
112        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
113    );
114    buf
115}
116
/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
/// tool orchestration, and multi-channel I/O.
///
/// The agent maintains conversation history, manages LLM provider state, coordinates tool
/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
///
/// # Architecture
///
/// - **Message state**: Conversation history with system prompt, message queue, and metadata
/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
/// - **Skill state**: Registry, matching engine, and self-learning evolution
/// - **Context manager**: Token budgeting, context assembly, and summarization
/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
/// - **MCP client**: Multi-server support for Model Context Protocol
/// - **Index state**: AST-based code indexing and semantic retrieval
/// - **Security**: Sanitization, exfiltration detection, adversarial probes
/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
///
/// # Channel Contract
///
/// The agent requires a [`Channel`] implementation for user interaction:
/// - Sends agent responses via `channel.send(message)`
/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
/// - Supports structured events: tool invocations, tool output, streaming updates
///
/// # Lifecycle
///
/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
/// 2. Run main loop with [`Self::run`]
/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
pub struct Agent<C: Channel> {
    /// Primary LLM provider. Session summaries are generated through it and its router
    /// state is saved during [`Self::shutdown`].
    provider: AnyProvider,
    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
    /// Falls back to `provider.clone()` when no dedicated entry exists.
    /// **Never replaced** by `/provider switch`.
    embedding_provider: AnyProvider,
    /// I/O channel used to send messages and status updates to the user.
    channel: C,
    /// Type-erased tool executor, built from the `impl ToolExecutor` handed to the constructor.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history (starting with the system prompt), the queued-message deque,
    /// pending image parts, and deferred DB bookkeeping.
    pub(super) msg: MessageState,
    // Per-subsystem state. Every field below is initialized in `new_with_registry_arc`;
    // the types come from the `state` module or from the submodule named in the path.
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    pub(super) compression: CompressionState,
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    pub(super) orchestration: OrchestrationState,
    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
    pub(super) focus: focus::FocusState,
    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
    pub(super) sidequest: sidequest::SidequestState,
    /// Tool filtering, dependency tracking, and iteration bookkeeping.
    pub(super) tool_state: ToolState,
}
184
185impl<C: Channel> Agent<C> {
186    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
187    ///
188    /// # Arguments
189    ///
190    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
191    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
192    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
193    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
194    ///   skills are matched by exact name only
195    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
196    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
197    ///
198    /// # Initialization
199    ///
200    /// The constructor:
201    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
202    /// 2. Builds the system prompt from registered skills
203    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
204    /// 4. Returns a ready-to-run agent
205    ///
206    /// # Panics
207    ///
208    /// Panics if `max_active_skills` is 0.
209    #[must_use]
210    pub fn new(
211        provider: AnyProvider,
212        channel: C,
213        registry: SkillRegistry,
214        matcher: Option<SkillMatcherBackend>,
215        max_active_skills: usize,
216        tool_executor: impl ToolExecutor + 'static,
217    ) -> Self {
218        let registry = Arc::new(RwLock::new(registry));
219        let embedding_provider = provider.clone();
220        Self::new_with_registry_arc(
221            provider,
222            embedding_provider,
223            channel,
224            registry,
225            matcher,
226            max_active_skills,
227            tool_executor,
228        )
229    }
230
231    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
232    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
233    ///
234    /// # Panics
235    ///
236    /// Panics if the registry `RwLock` is poisoned.
237    #[must_use]
238    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
239    pub fn new_with_registry_arc(
240        provider: AnyProvider,
241        embedding_provider: AnyProvider,
242        channel: C,
243        registry: Arc<RwLock<SkillRegistry>>,
244        matcher: Option<SkillMatcherBackend>,
245        max_active_skills: usize,
246        tool_executor: impl ToolExecutor + 'static,
247    ) -> Self {
248        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
249        let all_skills: Vec<Skill> = {
250            let reg = registry.read();
251            reg.all_meta()
252                .iter()
253                .filter_map(|m| reg.get_skill(&m.name).ok())
254                .collect()
255        };
256        let empty_trust = HashMap::new();
257        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
258        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
259        let system_prompt = build_system_prompt(&skills_prompt, None);
260        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
261        tracing::trace!(prompt = %system_prompt, "full system prompt");
262
263        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
264        let token_counter = Arc::new(TokenCounter::new());
265        Self {
266            provider,
267            embedding_provider,
268            channel,
269            tool_executor: Arc::new(tool_executor),
270            msg: MessageState {
271                messages: vec![Message {
272                    role: Role::System,
273                    content: system_prompt,
274                    parts: vec![],
275                    metadata: MessageMetadata::default(),
276                }],
277                message_queue: VecDeque::new(),
278                pending_image_parts: Vec::new(),
279                last_persisted_message_id: None,
280                deferred_db_hide_ids: Vec::new(),
281                deferred_db_summaries: Vec::new(),
282            },
283            memory_state: MemoryState::default(),
284            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
285            context_manager: context_manager::ContextManager::new(),
286            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
287            learning_engine: learning_engine::LearningEngine::new(),
288            feedback: FeedbackState::default(),
289            debug_state: DebugState::default(),
290            runtime: RuntimeConfig::default(),
291            mcp: McpState::default(),
292            index: IndexState::default(),
293            session: SessionState::new(),
294            instructions: InstructionState::default(),
295            security: SecurityState::default(),
296            experiments: ExperimentState::new(),
297            compression: CompressionState::default(),
298            lifecycle: LifecycleState::new(),
299            providers: ProviderState::new(initial_prompt_tokens),
300            metrics: MetricsState::new(token_counter),
301            orchestration: OrchestrationState::default(),
302            focus: focus::FocusState::default(),
303            sidequest: sidequest::SidequestState::default(),
304            tool_state: ToolState::default(),
305        }
306    }
307
308    /// Poll all active sub-agents for completed/failed/canceled results.
309    ///
310    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
311    /// for agents that have finished. Each completed agent is removed from the manager.
312    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
313        let Some(mgr) = &mut self.orchestration.subagent_manager else {
314            return vec![];
315        };
316
317        let finished: Vec<String> = mgr
318            .statuses()
319            .into_iter()
320            .filter_map(|(id, status)| {
321                if matches!(
322                    status.state,
323                    zeph_subagent::SubAgentState::Completed
324                        | zeph_subagent::SubAgentState::Failed
325                        | zeph_subagent::SubAgentState::Canceled
326                ) {
327                    Some(id)
328                } else {
329                    None
330                }
331            })
332            .collect();
333
334        let mut results = vec![];
335        for task_id in finished {
336            match mgr.collect(&task_id).await {
337                Ok(result) => results.push((task_id, result)),
338                Err(e) => {
339                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
340                }
341            }
342        }
343        results
344    }
345
346    /// Call the LLM to generate a structured session summary with a configurable timeout.
347    ///
348    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
349    /// any failure, logging a warning — callers must treat `None` as "skip storage".
350    ///
351    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
352    /// (structured call times out and plain-text fallback also times out) this adds up to
353    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
354    async fn call_llm_for_session_summary(
355        &self,
356        chat_messages: &[Message],
357    ) -> Option<zeph_memory::StructuredSummary> {
358        let timeout_dur = std::time::Duration::from_secs(
359            self.memory_state.compaction.shutdown_summary_timeout_secs,
360        );
361        match tokio::time::timeout(
362            timeout_dur,
363            self.provider
364                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
365        )
366        .await
367        {
368            Ok(Ok(s)) => Some(s),
369            Ok(Err(e)) => {
370                tracing::warn!(
371                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
372                );
373                self.plain_text_summary_fallback(chat_messages, timeout_dur)
374                    .await
375            }
376            Err(_) => {
377                tracing::warn!(
378                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
379                    self.memory_state.compaction.shutdown_summary_timeout_secs
380                );
381                self.plain_text_summary_fallback(chat_messages, timeout_dur)
382                    .await
383            }
384        }
385    }
386
387    async fn plain_text_summary_fallback(
388        &self,
389        chat_messages: &[Message],
390        timeout_dur: std::time::Duration,
391    ) -> Option<zeph_memory::StructuredSummary> {
392        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
393            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
394                summary: plain,
395                key_facts: vec![],
396                entities: vec![],
397            }),
398            Ok(Err(e)) => {
399                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
400                None
401            }
402            Err(_) => {
403                tracing::warn!("shutdown summary: plain LLM fallback timed out");
404                None
405            }
406        }
407    }
408
409    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
410    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
411    /// closed while tool execution was in progress). Without this the next session startup strips
412    /// those assistant messages and emits orphan warnings.
413    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
414        use zeph_llm::provider::{MessagePart, Role};
415
416        // Walk messages in reverse: if the last assistant message (ignoring any trailing
417        // system messages) has ToolUse parts and is NOT immediately followed by a user
418        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
419        let msgs = &self.msg.messages;
420        // Find last assistant message index.
421        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
422            return;
423        };
424        let asst_msg = &msgs[asst_idx];
425        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
426            .parts
427            .iter()
428            .filter_map(|p| {
429                if let MessagePart::ToolUse { id, name, input } = p {
430                    Some((id.as_str(), name.as_str(), input))
431                } else {
432                    None
433                }
434            })
435            .collect();
436        if tool_use_ids.is_empty() {
437            return;
438        }
439
440        // Check whether a following user message already pairs all ToolUse ids.
441        let paired_ids: std::collections::HashSet<&str> = msgs
442            .get(asst_idx + 1..)
443            .into_iter()
444            .flatten()
445            .filter(|m| m.role == Role::User)
446            .flat_map(|m| m.parts.iter())
447            .filter_map(|p| {
448                if let MessagePart::ToolResult { tool_use_id, .. } = p {
449                    Some(tool_use_id.as_str())
450                } else {
451                    None
452                }
453            })
454            .collect();
455
456        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
457            .iter()
458            .filter(|(id, _, _)| !paired_ids.contains(*id))
459            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
460                id: (*id).to_owned(),
461                name: (*name).to_owned().into(),
462                input: (*input).clone(),
463            })
464            .collect();
465
466        if unpaired.is_empty() {
467            return;
468        }
469
470        tracing::info!(
471            count = unpaired.len(),
472            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
473        );
474        self.persist_cancelled_tool_results(&unpaired).await;
475    }
476
477    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
478    ///
479    /// Guards:
480    /// - `shutdown_summary` config must be enabled
481    /// - `conversation_id` must be set (memory must be attached)
482    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
483    /// - at least `shutdown_summary_min_messages` user-turn messages in history
484    ///
485    /// All errors are logged as warnings and swallowed — shutdown must never fail.
486    async fn maybe_store_shutdown_summary(&mut self) {
487        if !self.memory_state.compaction.shutdown_summary {
488            return;
489        }
490        let Some(memory) = self.memory_state.persistence.memory.clone() else {
491            return;
492        };
493        let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
494            return;
495        };
496
497        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
498        match memory.has_session_summary(conversation_id).await {
499            Ok(true) => {
500                tracing::debug!("shutdown summary: session already has a summary, skipping");
501                return;
502            }
503            Ok(false) => {}
504            Err(e) => {
505                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
506                return;
507            }
508        }
509
510        // Count user-turn messages only (skip system prompt at index 0).
511        let user_count = self
512            .msg
513            .messages
514            .iter()
515            .skip(1)
516            .filter(|m| m.role == Role::User)
517            .count();
518        if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
519            tracing::debug!(
520                user_count,
521                min = self.memory_state.compaction.shutdown_summary_min_messages,
522                "shutdown summary: too few user messages, skipping"
523            );
524            return;
525        }
526
527        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
528        let _ = self.channel.send_status("Saving session summary...").await;
529
530        // Collect last N messages (skip system prompt at index 0).
531        let max = self.memory_state.compaction.shutdown_summary_max_messages;
532        if max == 0 {
533            tracing::debug!("shutdown summary: max_messages=0, skipping");
534            return;
535        }
536        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
537        let slice = if non_system.len() > max {
538            &non_system[non_system.len() - max..]
539        } else {
540            &non_system[..]
541        };
542
543        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
544            .iter()
545            .map(|m| {
546                let role = match m.role {
547                    Role::User => "user".to_owned(),
548                    Role::Assistant => "assistant".to_owned(),
549                    Role::System => "system".to_owned(),
550                };
551                (zeph_memory::MessageId(0), role, m.content.clone())
552            })
553            .collect();
554
555        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
556        let chat_messages = vec![Message {
557            role: Role::User,
558            content: prompt,
559            parts: vec![],
560            metadata: MessageMetadata::default(),
561        }];
562
563        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
564            let _ = self.channel.send_status("").await;
565            return;
566        };
567
568        if let Err(e) = memory
569            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
570            .await
571        {
572            tracing::warn!("shutdown summary: storage failed: {e:#}");
573        } else {
574            tracing::info!(
575                conversation_id = conversation_id.0,
576                "shutdown summary stored"
577            );
578        }
579
580        // Clear TUI status.
581        let _ = self.channel.send_status("").await;
582    }
583
584    /// Gracefully shut down the agent and persist state.
585    ///
586    /// Performs the following cleanup:
587    ///
588    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
589    ///    are flushed to memory or disk
590    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
591    ///    to the vault
592    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
593    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
594    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
595    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
596    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
597    ///
598    /// Call this before dropping the agent to ensure no data loss.
599    pub async fn shutdown(&mut self) {
600        self.channel.send("Shutting down...").await.ok();
601
602        // CRIT-1: persist Thompson state accumulated during this session.
603        self.provider.save_router_state();
604
605        // Persist AdaptOrch Beta-arm table alongside Thompson state.
606        if let Some(ref advisor) = self.orchestration.topology_advisor
607            && let Err(e) = advisor.save()
608        {
609            tracing::warn!(error = %e, "adaptorch: failed to persist state");
610        }
611
612        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
613            mgr.shutdown_all();
614        }
615
616        if let Some(ref manager) = self.mcp.manager {
617            manager.shutdown_all_shared().await;
618        }
619
620        // Finalize compaction trajectory: push the last open segment into the Vec.
621        // This segment would otherwise only be pushed when the next hard compaction fires,
622        // which never happens at session end.
623        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
624            self.update_metrics(|m| {
625                m.compaction_turns_after_hard.push(turns);
626            });
627            self.context_manager.turns_since_last_hard_compaction = None;
628        }
629
630        if let Some(ref tx) = self.metrics.metrics_tx {
631            let m = tx.borrow();
632            if m.filter_applications > 0 {
633                #[allow(clippy::cast_precision_loss)]
634                let pct = if m.filter_raw_tokens > 0 {
635                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
636                } else {
637                    0.0
638                };
639                tracing::info!(
640                    raw_tokens = m.filter_raw_tokens,
641                    saved_tokens = m.filter_saved_tokens,
642                    applications = m.filter_applications,
643                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
644                    m.filter_saved_tokens,
645                );
646            }
647            if m.compaction_hard_count > 0 {
648                tracing::info!(
649                    hard_compactions = m.compaction_hard_count,
650                    turns_after_hard = ?m.compaction_turns_after_hard,
651                    "hard compaction trajectory"
652                );
653            }
654        }
655
656        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
657        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
658        // startup strips the orphaned ToolUse and emits warnings.
659        self.flush_orphaned_tool_use_on_shutdown().await;
660
661        self.lifecycle.supervisor.abort_all();
662
663        self.maybe_store_shutdown_summary().await;
664        self.maybe_store_session_digest().await;
665
666        tracing::info!("agent shutdown complete");
667    }
668
669    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if channel I/O or LLM communication fails.
674    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
675    fn refresh_subagent_metrics(&mut self) {
676        let Some(ref mgr) = self.orchestration.subagent_manager else {
677            return;
678        };
679        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
680            .statuses()
681            .into_iter()
682            .map(|(id, s)| {
683                let def = mgr.agents_def(&id);
684                crate::metrics::SubAgentMetrics {
685                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
686                    id: id.clone(),
687                    state: format!("{:?}", s.state).to_lowercase(),
688                    turns_used: s.turns_used,
689                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
690                    background: def.is_some_and(|d| d.permissions.background),
691                    elapsed_secs: s.started_at.elapsed().as_secs(),
692                    permission_mode: def.map_or_else(String::new, |d| {
693                        use zeph_subagent::def::PermissionMode;
694                        match d.permissions.permission_mode {
695                            PermissionMode::Default => String::new(),
696                            PermissionMode::AcceptEdits => "accept_edits".into(),
697                            PermissionMode::DontAsk => "dont_ask".into(),
698                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
699                            PermissionMode::Plan => "plan".into(),
700                        }
701                    }),
702                    transcript_dir: mgr
703                        .agent_transcript_dir(&id)
704                        .map(|p| p.to_string_lossy().into_owned()),
705                }
706            })
707            .collect();
708        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
709    }
710
711    /// Non-blocking poll: notify the user when background sub-agents complete.
712    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
713        let completed = self.poll_subagents().await;
714        for (task_id, result) in completed {
715            let notice = if result.is_empty() {
716                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
717            } else {
718                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
719            };
720            if let Err(e) = self.channel.send(&notice).await {
721                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
722            }
723        }
724        Ok(())
725    }
726
    /// Run the agent main loop.
    ///
    /// Before looping, this waits for the optional model-warmup signal, then calls
    /// `load_and_cache_session_digest` and `maybe_send_resume_recap`.
    ///
    /// Each loop iteration:
    /// 1. Runs per-iteration housekeeping (provider override, tool refresh, pending
    ///    elicitations, sub-agent metrics and completion notices, channel drain).
    /// 2. Obtains the next input: a queued message if one exists, otherwise the next
    ///    [`LoopEvent`]. Reload/notification events are handled in place and restart the loop.
    /// 3. Offers the trimmed text to the session/debug command registry, then to the
    ///    agent-command registry, then to `handle_builtin_command`.
    /// 4. If nothing handled it, passes the input to `process_user_message`.
    ///
    /// After the loop exits, `maybe_autodream` is called and the trace collector (if present)
    /// is finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    /// Concretely, errors from `notify_completed_subagents`, `next_event`, either command
    /// registry dispatch, and `process_user_message` are propagated to the caller.
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // The warmup receiver is taken out of `lifecycle`, so this wait happens at most once.
        // If it does not already report ready, wait for a single change; an error from
        // `changed()` is ignored and only the final value decides whether to warn.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping, run before any input is taken.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over waiting for a new event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    // No raw attachments: use the queued text and image parts as they are.
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving, so rebuild a channel message.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // A closed inbound channel (`None`) and an explicit shutdown both end the loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        // Best effort: a failed send is logged, not propagated.
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // Reset `experiments.cancel` before relaying the completion message.
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled prompts are prefixed with `SCHEDULED_TASK_PREFIX` and then
                        // resolved like an ordinary message without attachments.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // Registry dispatch: build the command registry, construct CommandContext, dispatch.
            // The registry is built inline (all handlers are ZSTs) to avoid borrow-checker
            // conflicts when constructing CommandContext from Agent<C> fields.
            // Build context first so that borrows outlive the registry (drop order matters).
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            // First registry: session and debug commands. These run against the field-level
            // adapters above, with `NullAgent` standing in for the agent itself.
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // `Some(..)` means the first registry handled the input; `None` means it did not.
            // Flush errors are deliberately ignored on every handled path.
            match registry_handled {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                // A command error ends the run loop with `AgentError::Other`.
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            // Second registry: here the roles are reversed — the context carries null
            // sink/debug/messages/session sentinels and `self` is passed as the agent.
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if registry_handled.is_none() {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).
            match agent_result {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    // Post-dispatch learning hook: trigger generate_improved_skill for
                    // `/skill reject` and `/feedback` commands. These calls require &mut self
                    // and cannot be placed inside the Send future in agent_access_impl.rs.
                    self.maybe_trigger_post_command_learning(trimmed).await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Last command layer: `Some(true)` ends the loop, `Some(false)` skips to the next
            // iteration, and `None` means the input is not a builtin command.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Nothing claimed the input as a command: process it as a user message.
            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1016
1017    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1018    fn apply_provider_override(&mut self) {
1019        if let Some(ref slot) = self.providers.provider_override
1020            && let Some(new_provider) = slot.write().take()
1021        {
1022            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1023            self.provider = new_provider;
1024        }
1025    }
1026
1027    /// Poll all event sources and return the next [`LoopEvent`].
1028    ///
1029    /// Returns `None` when the inbound channel closes (graceful shutdown).
1030    ///
1031    /// # Errors
1032    ///
1033    /// Propagates channel receive errors.
1034    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1035        let event = tokio::select! {
1036            result = self.channel.recv() => {
1037                return Ok(result?.map(LoopEvent::Message));
1038            }
1039            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1040                tracing::info!("shutting down");
1041                LoopEvent::Shutdown
1042            }
1043            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
1044                LoopEvent::SkillReload
1045            }
1046            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
1047                LoopEvent::InstructionReload
1048            }
1049            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
1050                LoopEvent::ConfigReload
1051            }
1052            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
1053                LoopEvent::UpdateNotification(msg)
1054            }
1055            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
1056                LoopEvent::ExperimentCompleted(msg)
1057            }
1058            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
1059                tracing::info!("scheduler: injecting custom task as agent turn");
1060                LoopEvent::ScheduledTask(prompt)
1061            }
1062            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
1063                LoopEvent::FileChanged(event)
1064            }
1065        };
1066        Ok(Some(event))
1067    }
1068
1069    async fn resolve_message(
1070        &self,
1071        msg: crate::channel::ChannelMessage,
1072    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1073        use crate::channel::{Attachment, AttachmentKind};
1074        use zeph_llm::provider::{ImageData, MessagePart};
1075
1076        let text_base = msg.text.clone();
1077
1078        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1079            .attachments
1080            .into_iter()
1081            .partition(|a| a.kind == AttachmentKind::Audio);
1082
1083        tracing::debug!(
1084            audio = audio_attachments.len(),
1085            has_stt = self.providers.stt.is_some(),
1086            "resolve_message attachments"
1087        );
1088
1089        let text = if !audio_attachments.is_empty()
1090            && let Some(stt) = self.providers.stt.as_ref()
1091        {
1092            let mut transcribed_parts = Vec::new();
1093            for attachment in &audio_attachments {
1094                if attachment.data.len() > MAX_AUDIO_BYTES {
1095                    tracing::warn!(
1096                        size = attachment.data.len(),
1097                        max = MAX_AUDIO_BYTES,
1098                        "audio attachment exceeds size limit, skipping"
1099                    );
1100                    continue;
1101                }
1102                match stt
1103                    .transcribe(&attachment.data, attachment.filename.as_deref())
1104                    .await
1105                {
1106                    Ok(result) => {
1107                        tracing::info!(
1108                            len = result.text.len(),
1109                            language = ?result.language,
1110                            "audio transcribed"
1111                        );
1112                        transcribed_parts.push(result.text);
1113                    }
1114                    Err(e) => {
1115                        tracing::error!(error = %e, "audio transcription failed");
1116                    }
1117                }
1118            }
1119            if transcribed_parts.is_empty() {
1120                text_base
1121            } else {
1122                let transcribed = transcribed_parts.join("\n");
1123                if text_base.is_empty() {
1124                    transcribed
1125                } else {
1126                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1127                }
1128            }
1129        } else {
1130            if !audio_attachments.is_empty() {
1131                tracing::warn!(
1132                    count = audio_attachments.len(),
1133                    "audio attachments received but no STT provider configured, dropping"
1134                );
1135            }
1136            text_base
1137        };
1138
1139        let mut image_parts = Vec::new();
1140        for attachment in image_attachments {
1141            if attachment.data.len() > MAX_IMAGE_BYTES {
1142                tracing::warn!(
1143                    size = attachment.data.len(),
1144                    max = MAX_IMAGE_BYTES,
1145                    "image attachment exceeds size limit, skipping"
1146                );
1147                continue;
1148            }
1149            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1150            image_parts.push(MessagePart::Image(Box::new(ImageData {
1151                data: attachment.data,
1152                mime_type,
1153            })));
1154        }
1155
1156        (text, image_parts)
1157    }
1158
1159    /// Create a new [`Turn`] for the given input and advance the turn counter.
1160    ///
1161    /// Clears per-turn state that must not carry over between turns:
1162    /// - per-turn `CancellationToken` (new token for each turn)
1163    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1164    ///   `process_user_message_inner` after security checks)
1165    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1166        let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1167        self.debug_state.iteration_counter += 1;
1168        self.lifecycle.cancel_token = CancellationToken::new();
1169        self.security.user_provided_urls.write().clear();
1170        turn::Turn::new(id, input)
1171    }
1172
1173    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1174    ///
1175    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1176    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1177    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1178    /// is populated from it here so the rest of the pipeline is unchanged.
1179    fn end_turn(&mut self, turn: turn::Turn) {
1180        self.metrics.pending_timings = turn.metrics.timings;
1181        self.flush_turn_timings();
1182        // Clear per-turn intent (FR-008): must not persist across turns.
1183        self.session.current_turn_intent = None;
1184    }
1185
1186    #[cfg_attr(
1187        feature = "profiling",
1188        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1189    )]
1190    async fn process_user_message(
1191        &mut self,
1192        text: String,
1193        image_parts: Vec<zeph_llm::provider::MessagePart>,
1194    ) -> Result<(), error::AgentError> {
1195        let input = turn::TurnInput::new(text, image_parts);
1196        let mut t = self.begin_turn(input);
1197
1198        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1199        tracing::Span::current().record("turn_id", t.id().0);
1200        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1201        self.debug_state
1202            .start_iteration_span(turn_idx, t.input.text.trim());
1203
1204        let result = self.process_user_message_inner(&mut t).await;
1205
1206        // Close iteration span regardless of outcome (partial trace preserved on error).
1207        let span_status = if result.is_ok() {
1208            crate::debug_dump::trace::SpanStatus::Ok
1209        } else {
1210            crate::debug_dump::trace::SpanStatus::Error {
1211                message: "iteration failed".to_owned(),
1212            }
1213        };
1214        self.debug_state.end_iteration_span(turn_idx, span_status);
1215
1216        self.end_turn(t);
1217        result
1218    }
1219
    /// Execute the body of one turn for the input carried by `turn`.
    ///
    /// Steps, in order:
    /// 1. Reap the background supervisor, refresh the background-task metrics, and optionally
    ///    abort enrichment tasks.
    /// 2. Replace the cancel bridge: a spawned task that cancels this turn's token once
    ///    `lifecycle.cancel_signal` is notified.
    /// 3. Record the turn intent when VIGIL is configured.
    /// 4. Return early if `dispatch_slash_command` produces a result, or if
    ///    `pre_process_security` reports the input as blocked.
    /// 5. Advance the context lifecycle, persist and push the user message, and call
    ///    `process_response`.
    /// 6. Copy `llm_chat_ms` / `tool_exec_ms` from `metrics.pending_timings` into the turn.
    ///
    /// # Errors
    ///
    /// Returns the result of `dispatch_slash_command` when it handles the input, any error from
    /// `pre_process_security`, and channel errors raised while reporting a `process_response`
    /// failure to the user. A `process_response` failure itself is sent over the channel and
    /// does not fail the turn.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        // Reap completed background tasks from the previous turn. The summarization signal
        // is applied here — between turns — so `unsummarized_count` is always reset on the
        // foreground without shared mutable state across tasks (S1 fix from critic review).
        let bg_signal = self.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        {
            // Publish the supervisor's counters; `class_inflight[0]` feeds the enrichment
            // gauge and `class_inflight[1]` the telemetry gauge.
            let snap = self.lifecycle.supervisor.metrics_snapshot();
            self.update_metrics(|m| {
                m.bg_inflight = snap.inflight as u64;
                m.bg_dropped = snap.total_dropped();
                m.bg_completed = snap.total_completed();
                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
            });
        }

        // Intentional ordering: reap() runs before abort_class() so completed tasks are
        // accounted in the metrics snapshot above. The TUI may show a stale enrichment
        // inflight count for one cycle when abort fires, but this self-corrects next turn.
        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }

        // Wire the per-turn cancellation token into the cancel bridge.
        // The bridge translates the `cancel_signal` (Notify) into a CancellationToken cancel.
        // Abort the previous bridge before spawning to prevent unbounded accumulation (#2737).
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to at most 1024
        // bytes, cut back to the nearest char boundary so the slice below cannot panic.
        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
        if self.security.vigil.is_some() {
            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
        }

        // A handled slash command ends the turn here with that command's result.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // `true` means the input was blocked; the turn ends successfully without a response.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        // Time the context-preparation phase; a millisecond count that overflows u64 saturates.
        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        // Move the image parts out of the turn (leaving it with an empty list) and build the
        // in-memory user message from the untrimmed text.
        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let user_msg = self.build_user_message(&text, image_parts);

        // Extract URLs from user input and add to user_provided_urls for grounding checks.
        // URL set was cleared in begin_turn; re-populate for this turn.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
        // Derived from the raw input text before context assembly to avoid timing dependencies.
        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        // Image parts intentionally excluded — base64 payloads too large for message history.
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
        // handle_native_tool_calls respectively via metrics.pending_timings.
        tracing::debug!("turn timing: process_response start");
        if let Err(e) = self.process_response().await {
            // Detach any in-flight learning tasks before mutating message state.
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");
            // Report the failure to the user, then drop the last entry of `msg.messages` and
            // recompute the prompt token count. A channel error here does fail the turn, and in
            // that case the pop/recompute/flush below are skipped.
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        } else {
            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
            // leak into the next turn's context.
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            // MagicDocs: spawn background doc updates if any are due (#2702).
            self.maybe_update_magic_docs();
        }
        tracing::debug!("turn timing: process_response done");

        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
        // by the tool execution chain) into turn.metrics so end_turn can flush them.
        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
        // we harvest those values into Turn before end_turn overwrites pending_timings.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1356
1357    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1358    #[cfg_attr(
1359        feature = "profiling",
1360        tracing::instrument(name = "agent.security_prescreen", skip_all)
1361    )]
1362    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1363        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1364        if let Some(ref guardrail) = self.security.guardrail {
1365            use zeph_sanitizer::guardrail::GuardrailVerdict;
1366            let verdict = guardrail.check(trimmed).await;
1367            match &verdict {
1368                GuardrailVerdict::Flagged { reason, .. } => {
1369                    tracing::warn!(
1370                        reason = %reason,
1371                        should_block = verdict.should_block(),
1372                        "guardrail flagged user input"
1373                    );
1374                    if verdict.should_block() {
1375                        let msg = format!("[guardrail] Input blocked: {reason}");
1376                        let _ = self.channel.send(&msg).await;
1377                        let _ = self.channel.flush_chunks().await;
1378                        return Ok(true);
1379                    }
1380                    // Warn mode: notify but continue.
1381                    let _ = self
1382                        .channel
1383                        .send(&format!("[guardrail] Warning: {reason}"))
1384                        .await;
1385                }
1386                GuardrailVerdict::Error { error } => {
1387                    if guardrail.error_should_block() {
1388                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1389                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1390                        let _ = self.channel.send(msg).await;
1391                        let _ = self.channel.flush_chunks().await;
1392                        return Ok(true);
1393                    }
1394                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1395                }
1396                GuardrailVerdict::Safe => {}
1397            }
1398        }
1399
1400        // ML classifier: lightweight injection detection on user input boundary.
1401        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1402        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1403        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1404        // direct user chat. Disabled by default to prevent false positives on benign messages.
1405        #[cfg(feature = "classifiers")]
1406        if self.security.sanitizer.scan_user_input() {
1407            match self.security.sanitizer.classify_injection(trimmed).await {
1408                zeph_sanitizer::InjectionVerdict::Blocked => {
1409                    self.push_classifier_metrics();
1410                    let _ = self
1411                        .channel
1412                        .send("[security] Input blocked: injection detected by classifier.")
1413                        .await;
1414                    let _ = self.channel.flush_chunks().await;
1415                    return Ok(true);
1416                }
1417                zeph_sanitizer::InjectionVerdict::Suspicious => {
1418                    tracing::warn!("injection_classifier soft_signal on user input");
1419                }
1420                zeph_sanitizer::InjectionVerdict::Clean => {}
1421            }
1422        }
1423        #[cfg(feature = "classifiers")]
1424        self.push_classifier_metrics();
1425
1426        Ok(false)
1427    }
1428
1429    #[cfg_attr(
1430        feature = "profiling",
1431        tracing::instrument(name = "agent.prepare_context", skip_all)
1432    )]
1433    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1434        // Reset per-message pruning cache at the start of each turn (#2298).
1435        self.mcp.pruning_cache.reset();
1436
1437        // Extract before rebuild_system_prompt so the value is not tainted
1438        // by the secrets-bearing system prompt (ConversationId is just an i64).
1439        let conv_id = self.memory_state.persistence.conversation_id;
1440        self.rebuild_system_prompt(text).await;
1441
1442        self.detect_and_record_corrections(trimmed, conv_id).await;
1443        self.learning_engine.tick();
1444        self.analyze_and_learn().await;
1445        self.sync_graph_counts().await;
1446
1447        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1448        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1449        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1450        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1451        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1452
1453        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1454        {
1455            self.focus.tick();
1456
1457            // SideQuest eviction: runs every N user turns when enabled.
1458            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1459            let sidequest_should_fire = self.sidequest.tick();
1460            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1461                self.maybe_sidequest_eviction();
1462            }
1463        }
1464
1465        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
1466        if let Some(warning) = self.cache_expiry_warning() {
1467            tracing::info!(warning, "cache expiry warning");
1468            let _ = self.channel.send_status(&warning).await;
1469        }
1470
1471        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
1472        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
1473        self.maybe_time_based_microcompact();
1474
1475        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
1476        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
1477        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
1478        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
1479        self.maybe_apply_deferred_summaries();
1480        self.flush_deferred_summaries().await;
1481
1482        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
1483        if let Err(e) = self.maybe_proactive_compress().await {
1484            tracing::warn!("proactive compression failed: {e:#}");
1485        }
1486
1487        if let Err(e) = self.maybe_compact().await {
1488            tracing::warn!("context compaction failed: {e:#}");
1489        }
1490
1491        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
1492            tracing::warn!("context preparation failed: {e:#}");
1493        }
1494
1495        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
1496        self.provider
1497            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);
1498
1499        self.learning_engine.reset_reflection();
1500    }
1501
1502    fn build_user_message(
1503        &mut self,
1504        text: &str,
1505        image_parts: Vec<zeph_llm::provider::MessagePart>,
1506    ) -> Message {
1507        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1508        all_image_parts.extend(image_parts);
1509
1510        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1511            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1512                text: text.to_owned(),
1513            }];
1514            parts.extend(all_image_parts);
1515            Message::from_parts(Role::User, parts)
1516        } else {
1517            if !all_image_parts.is_empty() {
1518                tracing::warn!(
1519                    count = all_image_parts.len(),
1520                    "image attachments dropped: provider does not support vision"
1521                );
1522            }
1523            Message {
1524                role: Role::User,
1525                content: text.to_owned(),
1526                parts: vec![],
1527                metadata: MessageMetadata::default(),
1528            }
1529        }
1530    }
1531
1532    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
1533    /// channel. Returns a human-readable status string suitable for sending to the user.
1534    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1535        use zeph_subagent::SubAgentState;
1536        let result = loop {
1537            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1538
1539            // Bridge secret requests from sub-agent to channel.confirm().
1540            // Fetch the pending request first, then release the borrow before
1541            // calling channel.confirm() (which requires &mut self).
1542            #[allow(clippy::redundant_closure_for_method_calls)]
1543            let pending = self
1544                .orchestration
1545                .subagent_manager
1546                .as_mut()
1547                .and_then(|m| m.try_recv_secret_request());
1548            if let Some((req_task_id, req)) = pending {
1549                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
1550                // (SEC-P1-02), so it is safe to embed in the prompt string.
1551                let confirm_prompt = format!(
1552                    "Sub-agent requests secret '{}'. Allow?",
1553                    crate::text::truncate_to_chars(&req.secret_key, 100)
1554                );
1555                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1556                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1557                    if approved {
1558                        let ttl = std::time::Duration::from_mins(5);
1559                        let key = req.secret_key.clone();
1560                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1561                            let _ = mgr.deliver_secret(&req_task_id, key);
1562                        }
1563                    } else {
1564                        let _ = mgr.deny_secret(&req_task_id);
1565                    }
1566                }
1567            }
1568
1569            let mgr = self.orchestration.subagent_manager.as_ref()?;
1570            let statuses = mgr.statuses();
1571            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1572                break format!("{label} completed (no status available).");
1573            };
1574            match status.state {
1575                SubAgentState::Completed => {
1576                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1577                    break format!("{label} completed: {msg}");
1578                }
1579                SubAgentState::Failed => {
1580                    let msg = status
1581                        .last_message
1582                        .clone()
1583                        .unwrap_or_else(|| "unknown error".into());
1584                    break format!("{label} failed: {msg}");
1585                }
1586                SubAgentState::Canceled => {
1587                    break format!("{label} was cancelled.");
1588                }
1589                _ => {
1590                    let _ = self
1591                        .channel
1592                        .send_status(&format!(
1593                            "{label}: turn {}/{}",
1594                            status.turns_used,
1595                            self.orchestration
1596                                .subagent_manager
1597                                .as_ref()
1598                                .and_then(|m| m.agents_def(task_id))
1599                                .map_or(20, |d| d.permissions.max_turns)
1600                        ))
1601                        .await;
1602                }
1603            }
1604        };
1605        Some(result)
1606    }
1607
1608    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1609    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1610    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1611        let mgr = self.orchestration.subagent_manager.as_mut()?;
1612        let full_ids: Vec<String> = mgr
1613            .statuses()
1614            .into_iter()
1615            .map(|(tid, _)| tid)
1616            .filter(|tid| tid.starts_with(prefix))
1617            .collect();
1618        Some(match full_ids.as_slice() {
1619            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1620            [fid] => Ok(fid.clone()),
1621            _ => Err(format!(
1622                "Ambiguous id prefix '{prefix}': matches {} agents",
1623                full_ids.len()
1624            )),
1625        })
1626    }
1627
1628    fn handle_agent_list(&self) -> Option<String> {
1629        use std::fmt::Write as _;
1630        let mgr = self.orchestration.subagent_manager.as_ref()?;
1631        let defs = mgr.definitions();
1632        if defs.is_empty() {
1633            return Some("No sub-agent definitions found.".into());
1634        }
1635        let mut out = String::from("Available sub-agents:\n");
1636        for d in defs {
1637            let memory_label = match d.memory {
1638                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
1639                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
1640                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
1641                None => "",
1642            };
1643            if let Some(ref src) = d.source {
1644                let _ = writeln!(
1645                    out,
1646                    "  {}{} — {} ({})",
1647                    d.name, memory_label, d.description, src
1648                );
1649            } else {
1650                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
1651            }
1652        }
1653        Some(out)
1654    }
1655
1656    fn handle_agent_status(&self) -> Option<String> {
1657        use std::fmt::Write as _;
1658        let mgr = self.orchestration.subagent_manager.as_ref()?;
1659        let statuses = mgr.statuses();
1660        if statuses.is_empty() {
1661            return Some("No active sub-agents.".into());
1662        }
1663        let mut out = String::from("Active sub-agents:\n");
1664        for (id, s) in &statuses {
1665            let state = format!("{:?}", s.state).to_lowercase();
1666            let elapsed = s.started_at.elapsed().as_secs();
1667            let _ = writeln!(
1668                out,
1669                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
1670                short = &id[..8.min(id.len())],
1671                t = s.turns_used,
1672                msg = s.last_message.as_deref().unwrap_or(""),
1673            );
1674            // Show memory directory path for agents with memory enabled.
1675            if let Some(def) = mgr.agents_def(id)
1676                && let Some(scope) = def.memory
1677                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
1678            {
1679                let _ = writeln!(out, "       memory: {}", dir.display());
1680            }
1681        }
1682        Some(out)
1683    }
1684
1685    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1686        let full_id = match self.resolve_agent_id_prefix(id)? {
1687            Ok(fid) => fid,
1688            Err(msg) => return Some(msg),
1689        };
1690        let mgr = self.orchestration.subagent_manager.as_mut()?;
1691        if let Some((tid, req)) = mgr.try_recv_secret_request()
1692            && tid == full_id
1693        {
1694            let key = req.secret_key.clone();
1695            let ttl = std::time::Duration::from_mins(5);
1696            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1697                return Some(format!("Approve failed: {e}"));
1698            }
1699            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1700                return Some(format!("Secret delivery failed: {e}"));
1701            }
1702            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1703        }
1704        Some(format!(
1705            "No pending secret request for sub-agent '{full_id}'."
1706        ))
1707    }
1708
1709    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1710        let full_id = match self.resolve_agent_id_prefix(id)? {
1711            Ok(fid) => fid,
1712            Err(msg) => return Some(msg),
1713        };
1714        let mgr = self.orchestration.subagent_manager.as_mut()?;
1715        match mgr.deny_secret(&full_id) {
1716            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1717            Err(e) => Some(format!("Deny failed: {e}")),
1718        }
1719    }
1720
1721    #[allow(clippy::too_many_lines)]
1722    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
1723        use zeph_subagent::AgentCommand;
1724
1725        match cmd {
1726            AgentCommand::List => self.handle_agent_list(),
1727            AgentCommand::Background { name, prompt } => {
1728                let provider = self.provider.clone();
1729                let tool_executor = Arc::clone(&self.tool_executor);
1730                let skills = self.filtered_skills_for(&name);
1731                let cfg = self.orchestration.subagent_config.clone();
1732                let spawn_ctx = self.build_spawn_context(&cfg);
1733                let mgr = self.orchestration.subagent_manager.as_mut()?;
1734                match mgr.spawn(
1735                    &name,
1736                    &prompt,
1737                    provider,
1738                    tool_executor,
1739                    skills,
1740                    &cfg,
1741                    spawn_ctx,
1742                ) {
1743                    Ok(id) => Some(format!(
1744                        "Sub-agent '{name}' started in background (id: {short})",
1745                        short = &id[..8.min(id.len())]
1746                    )),
1747                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1748                }
1749            }
1750            AgentCommand::Spawn { name, prompt }
1751            | AgentCommand::Mention {
1752                agent: name,
1753                prompt,
1754            } => {
1755                // Foreground spawn: launch and await completion, streaming status to user.
1756                let provider = self.provider.clone();
1757                let tool_executor = Arc::clone(&self.tool_executor);
1758                let skills = self.filtered_skills_for(&name);
1759                let cfg = self.orchestration.subagent_config.clone();
1760                let spawn_ctx = self.build_spawn_context(&cfg);
1761                let mgr = self.orchestration.subagent_manager.as_mut()?;
1762                let task_id = match mgr.spawn(
1763                    &name,
1764                    &prompt,
1765                    provider,
1766                    tool_executor,
1767                    skills,
1768                    &cfg,
1769                    spawn_ctx,
1770                ) {
1771                    Ok(id) => id,
1772                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1773                };
1774                let short = task_id[..8.min(task_id.len())].to_owned();
1775                let _ = self
1776                    .channel
1777                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1778                    .await;
1779                let label = format!("Sub-agent '{name}'");
1780                self.poll_subagent_until_done(&task_id, &label).await
1781            }
1782            AgentCommand::Status => self.handle_agent_status(),
1783            AgentCommand::Cancel { id } => {
1784                let mgr = self.orchestration.subagent_manager.as_mut()?;
1785                // Accept prefix match on task_id.
1786                let ids: Vec<String> = mgr
1787                    .statuses()
1788                    .into_iter()
1789                    .map(|(task_id, _)| task_id)
1790                    .filter(|task_id| task_id.starts_with(&id))
1791                    .collect();
1792                match ids.as_slice() {
1793                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
1794                    [full_id] => {
1795                        let full_id = full_id.clone();
1796                        match mgr.cancel(&full_id) {
1797                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1798                            Err(e) => Some(format!("Cancel failed: {e}")),
1799                        }
1800                    }
1801                    _ => Some(format!(
1802                        "Ambiguous id prefix '{id}': matches {} agents",
1803                        ids.len()
1804                    )),
1805                }
1806            }
1807            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1808            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1809            AgentCommand::Resume { id, prompt } => {
1810                let cfg = self.orchestration.subagent_config.clone();
1811                // Resolve definition name from transcript meta before spawning so we can
1812                // look up skills by definition name rather than the UUID prefix (S1 fix).
1813                let def_name = {
1814                    let mgr = self.orchestration.subagent_manager.as_ref()?;
1815                    match mgr.def_name_for_resume(&id, &cfg) {
1816                        Ok(name) => name,
1817                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1818                    }
1819                };
1820                let skills = self.filtered_skills_for(&def_name);
1821                let provider = self.provider.clone();
1822                let tool_executor = Arc::clone(&self.tool_executor);
1823                let mgr = self.orchestration.subagent_manager.as_mut()?;
1824                let (task_id, _) =
1825                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1826                        Ok(pair) => pair,
1827                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1828                    };
1829                let short = task_id[..8.min(task_id.len())].to_owned();
1830                let _ = self
1831                    .channel
1832                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1833                    .await;
1834                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1835                    .await
1836            }
1837        }
1838    }
1839
1840    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1841        let mgr = self.orchestration.subagent_manager.as_ref()?;
1842        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1843        let reg = self.skill_state.registry.read();
1844        match zeph_subagent::filter_skills(&reg, &def.skills) {
1845            Ok(skills) => {
1846                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1847                if bodies.is_empty() {
1848                    None
1849                } else {
1850                    Some(bodies)
1851                }
1852            }
1853            Err(e) => {
1854                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1855                None
1856            }
1857        }
1858    }
1859
1860    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
1861    fn build_spawn_context(
1862        &self,
1863        cfg: &zeph_config::SubAgentConfig,
1864    ) -> zeph_subagent::SpawnContext {
1865        zeph_subagent::SpawnContext {
1866            parent_messages: self.extract_parent_messages(cfg),
1867            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1868            parent_provider_name: {
1869                let name = &self.runtime.active_provider_name;
1870                if name.is_empty() {
1871                    None
1872                } else {
1873                    Some(name.clone())
1874                }
1875            },
1876            spawn_depth: self.runtime.spawn_depth,
1877            mcp_tool_names: self.extract_mcp_tool_names(),
1878        }
1879    }
1880
1881    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
1882    ///
1883    /// Filters system messages, takes last `context_window_turns * 2` messages,
1884    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
1885    fn extract_parent_messages(
1886        &self,
1887        config: &zeph_config::SubAgentConfig,
1888    ) -> Vec<zeph_llm::provider::Message> {
1889        use zeph_llm::provider::Role;
1890        if config.context_window_turns == 0 {
1891            return Vec::new();
1892        }
1893        let non_system: Vec<_> = self
1894            .msg
1895            .messages
1896            .iter()
1897            .filter(|m| m.role != Role::System)
1898            .cloned()
1899            .collect();
1900        let take_count = config.context_window_turns * 2;
1901        let start = non_system.len().saturating_sub(take_count);
1902        let mut msgs = non_system[start..].to_vec();
1903
1904        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
1905        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
1906        let mut total_chars: usize = 0;
1907        let mut keep = msgs.len();
1908        for (i, m) in msgs.iter().enumerate() {
1909            total_chars += m.content.len();
1910            if total_chars > max_chars {
1911                keep = i;
1912                break;
1913            }
1914        }
1915        if keep < msgs.len() {
1916            tracing::info!(
1917                kept = keep,
1918                requested = config.context_window_turns * 2,
1919                "[subagent] truncated parent history from {} to {} turns due to token budget",
1920                config.context_window_turns * 2,
1921                keep
1922            );
1923            msgs.truncate(keep);
1924        }
1925        msgs
1926    }
1927
1928    /// Extract MCP tool names from the tool executor for diagnostic annotation.
1929    fn extract_mcp_tool_names(&self) -> Vec<String> {
1930        self.tool_executor
1931            .tool_definitions_erased()
1932            .into_iter()
1933            .filter(|t| t.id.starts_with("mcp_"))
1934            .map(|t| t.id.to_string())
1935            .collect()
1936    }
1937
1938    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
1939    ///
1940    /// Must be called from a blocking context (uses synchronous FS I/O).
1941    fn classify_source_kind(
1942        skill_dir: &std::path::Path,
1943        managed_dir: Option<&std::path::PathBuf>,
1944        bundled_names: &std::collections::HashSet<String>,
1945    ) -> zeph_memory::store::SourceKind {
1946        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
1947            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1948            let has_marker = skill_dir.join(".bundled").exists();
1949            if has_marker && bundled_names.contains(skill_name) {
1950                zeph_memory::store::SourceKind::Bundled
1951            } else {
1952                if has_marker {
1953                    tracing::warn!(
1954                        skill = %skill_name,
1955                        "skill has .bundled marker but is not in the bundled skill \
1956                         allowlist — classifying as Hub"
1957                    );
1958                }
1959                zeph_memory::store::SourceKind::Hub
1960            }
1961        } else {
1962            zeph_memory::store::SourceKind::Local
1963        }
1964    }
1965
1966    /// Update trust DB records for all reloaded skills.
1967    async fn update_trust_for_reloaded_skills(
1968        &mut self,
1969        all_meta: &[zeph_skills::loader::SkillMeta],
1970    ) {
1971        // Clone Arc before any .await so no &self fields are held across suspension points.
1972        let memory = self.memory_state.persistence.memory.clone();
1973        let Some(memory) = memory else {
1974            return;
1975        };
1976        let trust_cfg = self.skill_state.trust_config.clone();
1977        let managed_dir = self.skill_state.managed_dir.clone();
1978        let bundled_names: std::collections::HashSet<String> =
1979            zeph_skills::bundled_skill_names().into_iter().collect();
1980        for meta in all_meta {
1981            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
1982            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
1983            let skill_dir = meta.skill_dir.clone();
1984            let managed_dir_ref = managed_dir.clone();
1985            let bundled_names_ref = bundled_names.clone();
1986            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
1987                tokio::task::spawn_blocking(move || {
1988                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
1989                    let source_kind = Self::classify_source_kind(
1990                        &skill_dir,
1991                        managed_dir_ref.as_ref(),
1992                        &bundled_names_ref,
1993                    );
1994                    Some((hash, source_kind))
1995                })
1996                .await
1997                .unwrap_or(None);
1998
1999            let Some((current_hash, source_kind)) = fs_result else {
2000                tracing::warn!("failed to compute hash for '{}'", meta.name);
2001                continue;
2002            };
2003            let initial_level = match source_kind {
2004                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2005                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2006                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2007                    &trust_cfg.local_level
2008                }
2009            };
2010            let existing = memory
2011                .sqlite()
2012                .load_skill_trust(&meta.name)
2013                .await
2014                .ok()
2015                .flatten();
2016            let trust_level_str = if let Some(ref row) = existing {
2017                if row.blake3_hash != current_hash {
2018                    trust_cfg.hash_mismatch_level.to_string()
2019                } else if row.source_kind != source_kind {
2020                    // source_kind changed (e.g., hub → bundled on upgrade).
2021                    // Never override an explicit operator block. For active trust levels,
2022                    // adopt the source-kind initial level when it grants more trust.
2023                    let stored = row
2024                        .trust_level
2025                        .parse::<zeph_tools::SkillTrustLevel>()
2026                        .unwrap_or_else(|_| {
2027                            tracing::warn!(
2028                                skill = %meta.name,
2029                                raw = %row.trust_level,
2030                                "unrecognised trust_level in DB, treating as quarantined"
2031                            );
2032                            zeph_tools::SkillTrustLevel::Quarantined
2033                        });
2034                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2035                        row.trust_level.clone()
2036                    } else {
2037                        initial_level.to_string()
2038                    }
2039                } else {
2040                    row.trust_level.clone()
2041                }
2042            } else {
2043                initial_level.to_string()
2044            };
2045            let source_path = meta.skill_dir.to_str();
2046            if let Err(e) = memory
2047                .sqlite()
2048                .upsert_skill_trust(
2049                    &meta.name,
2050                    &trust_level_str,
2051                    source_kind,
2052                    None,
2053                    source_path,
2054                    &current_hash,
2055                )
2056                .await
2057            {
2058                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2059            }
2060        }
2061    }
2062
2063    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2064    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2065        let provider = self.embedding_provider.clone();
2066        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
2067        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2068            let owned = text.to_owned();
2069            let p = provider.clone();
2070            Box::pin(async move {
2071                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2072                    result
2073                } else {
2074                    tracing::warn!(
2075                        timeout_secs = embed_timeout.as_secs(),
2076                        "skill matcher: embedding timed out"
2077                    );
2078                    Err(zeph_llm::LlmError::Timeout)
2079                }
2080            })
2081        };
2082
2083        let needs_inmemory_rebuild = !self
2084            .skill_state
2085            .matcher
2086            .as_ref()
2087            .is_some_and(SkillMatcherBackend::is_qdrant);
2088
2089        if needs_inmemory_rebuild {
2090            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
2091                .await
2092                .map(SkillMatcherBackend::InMemory);
2093        } else if let Some(ref mut backend) = self.skill_state.matcher {
2094            let _ = self.channel.send_status("syncing skill index...").await;
2095            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
2096                .session
2097                .status_tx
2098                .clone()
2099                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
2100                    Box::new(move |completed, total| {
2101                        let msg = format!("Syncing skills: {completed}/{total}");
2102                        let _ = tx.send(msg);
2103                    })
2104                });
2105            if let Err(e) = backend
2106                .sync(
2107                    all_meta,
2108                    &self.skill_state.embedding_model,
2109                    embed_fn,
2110                    on_progress,
2111                )
2112                .await
2113            {
2114                tracing::warn!("failed to sync skill embeddings: {e:#}");
2115            }
2116        }
2117
2118        if self.skill_state.hybrid_search {
2119            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2120            let _ = self.channel.send_status("rebuilding search index...").await;
2121            self.skill_state.rebuild_bm25(&descs);
2122        }
2123    }
2124
2125    #[cfg_attr(
2126        feature = "profiling",
2127        tracing::instrument(name = "skill.hot_reload", skip_all)
2128    )]
2129    async fn reload_skills(&mut self) {
2130        let old_fp = self.skill_state.fingerprint();
2131        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
2132            let plugin_dirs = supplier();
2133            let mut paths = self.skill_state.skill_paths.clone();
2134            for dir in plugin_dirs {
2135                if !paths.contains(&dir) {
2136                    paths.push(dir);
2137                }
2138            }
2139            paths
2140        } else {
2141            self.skill_state.skill_paths.clone()
2142        };
2143        self.skill_state.registry.write().reload(&reload_paths);
2144        if self.skill_state.fingerprint() == old_fp {
2145            return;
2146        }
2147        let _ = self.channel.send_status("reloading skills...").await;
2148
2149        let all_meta = self
2150            .skill_state
2151            .registry
2152            .read()
2153            .all_meta()
2154            .into_iter()
2155            .cloned()
2156            .collect::<Vec<_>>();
2157
2158        self.update_trust_for_reloaded_skills(&all_meta).await;
2159
2160        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2161        self.rebuild_skill_matcher(&all_meta_refs).await;
2162
2163        let all_skills: Vec<Skill> = {
2164            let reg = self.skill_state.registry.read();
2165            reg.all_meta()
2166                .iter()
2167                .filter_map(|m| reg.get_skill(&m.name).ok())
2168                .collect()
2169        };
2170        let trust_map = self.build_skill_trust_map().await;
2171        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2172        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2173        self.skill_state
2174            .last_skills_prompt
2175            .clone_from(&skills_prompt);
2176        let system_prompt = build_system_prompt(&skills_prompt, None);
2177        if let Some(msg) = self.msg.messages.first_mut() {
2178            msg.content = system_prompt;
2179        }
2180
2181        let _ = self.channel.send_status("").await;
2182        tracing::info!(
2183            "reloaded {} skill(s)",
2184            self.skill_state.registry.read().all_meta().len()
2185        );
2186    }
2187
2188    fn reload_instructions(&mut self) {
2189        // Drain any additional queued events before reloading to avoid redundant reloads.
2190        if let Some(ref mut rx) = self.instructions.reload_rx {
2191            while rx.try_recv().is_ok() {}
2192        }
2193        let Some(ref state) = self.instructions.reload_state else {
2194            return;
2195        };
2196        let new_blocks = crate::instructions::load_instructions(
2197            &state.base_dir,
2198            &state.provider_kinds,
2199            &state.explicit_files,
2200            state.auto_detect,
2201        );
2202        let old_sources: std::collections::HashSet<_> =
2203            self.instructions.blocks.iter().map(|b| &b.source).collect();
2204        let new_sources: std::collections::HashSet<_> =
2205            new_blocks.iter().map(|b| &b.source).collect();
2206        for added in new_sources.difference(&old_sources) {
2207            tracing::info!(path = %added.display(), "instruction file added");
2208        }
2209        for removed in old_sources.difference(&new_sources) {
2210            tracing::info!(path = %removed.display(), "instruction file removed");
2211        }
2212        tracing::info!(
2213            old_count = self.instructions.blocks.len(),
2214            new_count = new_blocks.len(),
2215            "reloaded instruction files"
2216        );
2217        self.instructions.blocks = new_blocks;
2218    }
2219
2220    fn reload_config(&mut self) {
2221        let Some(path) = self.lifecycle.config_path.clone() else {
2222            return;
2223        };
2224        let Some(config) = self.load_config_with_overlay(&path) else {
2225            return;
2226        };
2227        let budget_tokens = resolve_context_budget(&config, &self.provider);
2228        self.runtime.security = config.security;
2229        self.runtime.timeouts = config.timeouts;
2230        self.runtime.redact_credentials = config.memory.redact_credentials;
2231        self.memory_state.persistence.history_limit = config.memory.history_limit;
2232        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
2233        self.memory_state.compaction.summarization_threshold =
2234            config.memory.summarization_threshold;
2235        self.skill_state.max_active_skills = config.skills.max_active_skills;
2236        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2237        self.skill_state.min_injection_score = config.skills.min_injection_score;
2238        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2239        self.skill_state.hybrid_search = config.skills.hybrid_search;
2240        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
2241        self.skill_state.confusability_threshold =
2242            config.skills.confusability_threshold.clamp(0.0, 1.0);
2243        config
2244            .skills
2245            .generation_provider
2246            .as_str()
2247            .clone_into(&mut self.skill_state.generation_provider_name);
2248        self.skill_state.generation_output_dir =
2249            config.skills.generation_output_dir.as_deref().map(|p| {
2250                if let Some(stripped) = p.strip_prefix("~/") {
2251                    dirs::home_dir()
2252                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2253                } else {
2254                    std::path::PathBuf::from(p)
2255                }
2256            });
2257
2258        self.context_manager.budget = Some(
2259            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2260        );
2261
2262        {
2263            let graph_cfg = &config.memory.graph;
2264            if graph_cfg.rpe.enabled {
2265                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2266                if self.memory_state.extraction.rpe_router.is_none() {
2267                    self.memory_state.extraction.rpe_router =
2268                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2269                            graph_cfg.rpe.threshold,
2270                            graph_cfg.rpe.max_skip_turns,
2271                        )));
2272                }
2273            } else {
2274                self.memory_state.extraction.rpe_router = None;
2275            }
2276            self.memory_state.extraction.graph_config = graph_cfg.clone();
2277        }
2278        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2279        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2280        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2281        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2282        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2283        self.context_manager.compression = config.memory.compression.clone();
2284        self.context_manager.routing = config.memory.store_routing.clone();
2285        // Resolve routing_classifier_provider from the provider pool (#2484).
2286        self.context_manager.store_routing_provider = if config
2287            .memory
2288            .store_routing
2289            .routing_classifier_provider
2290            .is_empty()
2291        {
2292            None
2293        } else {
2294            let resolved = self.resolve_background_provider(
2295                &config.memory.store_routing.routing_classifier_provider,
2296            );
2297            Some(std::sync::Arc::new(resolved))
2298        };
2299        self.memory_state.persistence.cross_session_score_threshold =
2300            config.memory.cross_session_score_threshold;
2301
2302        self.index.repo_map_tokens = config.index.repo_map_tokens;
2303        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2304
2305        tracing::info!("config reloaded");
2306    }
2307
2308    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
2309    ///
2310    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
2311    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2312        let mut config = match Config::load(path) {
2313            Ok(c) => c,
2314            Err(e) => {
2315                tracing::warn!("config reload failed: {e:#}");
2316                return None;
2317            }
2318        };
2319
2320        // Re-apply plugin overlays. On error, keep previous runtime state intact.
2321        let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2322            None
2323        } else {
2324            match zeph_plugins::apply_plugin_config_overlays(
2325                &mut config,
2326                &self.lifecycle.plugins_dir,
2327            ) {
2328                Ok(o) => Some(o),
2329                Err(e) => {
2330                    tracing::warn!(
2331                        "plugin overlay merge failed during reload: {e:#}; \
2332                         keeping previous runtime state"
2333                    );
2334                    return None;
2335                }
2336            }
2337        };
2338
2339        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
2340        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
2341        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
2342        if let Some(ref overlay) = new_overlay {
2343            self.warn_on_shell_overlay_divergence(overlay, &config);
2344        }
2345        Some(config)
2346    }
2347
2348    /// React to shell policy divergence detected on hot-reload.
2349    ///
2350    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
2351    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
2352    /// — emit a warn + status banner when it changes.
2353    fn warn_on_shell_overlay_divergence(
2354        &self,
2355        new_overlay: &zeph_plugins::ResolvedOverlay,
2356        config: &Config,
2357    ) {
2358        let new_blocked: Vec<String> = {
2359            let mut v = config.tools.shell.blocked_commands.clone();
2360            v.sort();
2361            v
2362        };
2363        let new_allowed: Vec<String> = {
2364            let mut v = config.tools.shell.allowed_commands.clone();
2365            v.sort();
2366            v
2367        };
2368
2369        let startup = &self.lifecycle.startup_shell_overlay;
2370        let blocked_changed = new_blocked != startup.blocked;
2371        let allowed_changed = new_allowed != startup.allowed;
2372
2373        // blocked_commands IS rebuilt live — emit info-level confirmation only.
2374        if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2375            h.rebuild(&config.tools.shell);
2376            tracing::info!(
2377                blocked_count = h.snapshot_blocked().len(),
2378                "shell blocked_commands rebuilt from hot-reload"
2379            );
2380        }
2381
2382        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
2383        // executor construction time. Warn loudly so the user restarts.
2384        //
2385        // Note: when base `allowed_commands` is empty (the default), the overlay's
2386        // intersection semantics keep it empty, so this branch is silently unreachable
2387        // for users who do not set a non-empty base list.
2388        if allowed_changed {
2389            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2390                 for sandbox path recomputation (blocked_commands was rebuilt live)";
2391            tracing::warn!("{msg}");
2392            if let Some(ref tx) = self.session.status_tx {
2393                let _ = tx.send(msg.to_owned());
2394            }
2395        }
2396
2397        let _ = new_overlay;
2398    }
2399
2400    /// Run `SideQuest` tool output eviction pass (#1885).
2401    ///
2402    /// PERF-1 fix: two-phase non-blocking design.
2403    ///
2404    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2405    /// validate and apply it immediately.
2406    ///
2407    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2408    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2409    /// next turn, so the current agent turn is never blocked by the LLM call.
2410    #[allow(clippy::too_many_lines)]
2411    fn maybe_sidequest_eviction(&mut self) {
2412        use zeph_llm::provider::{Message, MessageMetadata, Role};
2413
2414        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2415        // strategy — the two systems share the same pool of evictable tool outputs and can
2416        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2417        if self.sidequest.config.enabled {
2418            use crate::config::PruningStrategy;
2419            if !matches!(
2420                self.context_manager.compression.pruning_strategy,
2421                PruningStrategy::Reactive
2422            ) {
2423                tracing::warn!(
2424                    strategy = ?self.context_manager.compression.pruning_strategy,
2425                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2426                     consider disabling sidequest.enabled to avoid redundant eviction"
2427                );
2428            }
2429        }
2430
2431        // Guard: do not evict while a focus session is active.
2432        if self.focus.is_active() {
2433            tracing::debug!("sidequest: skipping — focus session active");
2434            // Drop any pending result — cursors may be stale relative to focus truncation.
2435            self.compression.pending_sidequest_result = None;
2436            return;
2437        }
2438
2439        // Phase 1: apply pending result from last turn's background LLM call.
2440        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2441            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2442            use futures::FutureExt as _;
2443            match handle.now_or_never() {
2444                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2445                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2446                    let freed = self.sidequest.apply_eviction(
2447                        &mut self.msg.messages,
2448                        &evicted_indices,
2449                        &self.metrics.token_counter,
2450                    );
2451                    if freed > 0 {
2452                        self.recompute_prompt_tokens();
2453                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2454                        // cooldown=0: eviction does not impose post-compaction cooldown.
2455                        self.context_manager.compaction =
2456                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2457                                cooldown: 0,
2458                            };
2459                        tracing::info!(
2460                            freed_tokens = freed,
2461                            evicted_cursors = evicted_indices.len(),
2462                            pass = self.sidequest.passes_run,
2463                            "sidequest eviction complete"
2464                        );
2465                        if let Some(ref d) = self.debug_state.debug_dumper {
2466                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2467                        }
2468                        if let Some(ref tx) = self.session.status_tx {
2469                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2470                        }
2471                    } else {
2472                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2473                        if let Some(ref tx) = self.session.status_tx {
2474                            let _ = tx.send(String::new());
2475                        }
2476                    }
2477                }
2478                Some(Ok(None | Some(_))) => {
2479                    tracing::debug!("sidequest: pending result: no cursors to evict");
2480                    if let Some(ref tx) = self.session.status_tx {
2481                        let _ = tx.send(String::new());
2482                    }
2483                }
2484                Some(Err(e)) => {
2485                    tracing::debug!("sidequest: background task panicked: {e}");
2486                    if let Some(ref tx) = self.session.status_tx {
2487                        let _ = tx.send(String::new());
2488                    }
2489                }
2490                None => {
2491                    // Task still running — re-store and wait another turn.
2492                    // We already took it; we'd need to re-spawn, but instead just drop and
2493                    // schedule fresh below to keep the cursor list current.
2494                    tracing::debug!(
2495                        "sidequest: background LLM task not yet complete, rescheduling"
2496                    );
2497                }
2498            }
2499        }
2500
2501        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2502        self.sidequest
2503            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2504
2505        if self.sidequest.tool_output_cursors.is_empty() {
2506            tracing::debug!("sidequest: no eligible cursors");
2507            return;
2508        }
2509
2510        let prompt = self.sidequest.build_eviction_prompt();
2511        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2512        let n_cursors = self.sidequest.tool_output_cursors.len();
2513        // Clone the provider so the spawn closure owns it without borrowing self.
2514        let provider = self.summary_or_primary_provider().clone();
2515
2516        // Spawn background task: the LLM call runs without blocking the agent loop.
2517        let handle = tokio::spawn(async move {
2518            let msgs = [Message {
2519                role: Role::User,
2520                content: prompt,
2521                parts: vec![],
2522                metadata: MessageMetadata::default(),
2523            }];
2524            let response =
2525                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2526                    .await
2527                {
2528                    Ok(Ok(r)) => r,
2529                    Ok(Err(e)) => {
2530                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2531                        return None;
2532                    }
2533                    Err(_) => {
2534                        tracing::debug!("sidequest bg: LLM call timed out");
2535                        return None;
2536                    }
2537                };
2538
2539            let start = response.find('{')?;
2540            let end = response.rfind('}')?;
2541            if start > end {
2542                return None;
2543            }
2544            let json_slice = &response[start..=end];
2545            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2546            let mut valid: Vec<usize> = parsed
2547                .del_cursors
2548                .into_iter()
2549                .filter(|&c| c < n_cursors)
2550                .collect();
2551            valid.sort_unstable();
2552            valid.dedup();
2553            #[allow(
2554                clippy::cast_precision_loss,
2555                clippy::cast_possible_truncation,
2556                clippy::cast_sign_loss
2557            )]
2558            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2559            valid.truncate(max_evict);
2560            Some(valid)
2561        });
2562
2563        self.compression.pending_sidequest_result = Some(handle);
2564        tracing::debug!("sidequest: background LLM eviction task spawned");
2565        if let Some(ref tx) = self.session.status_tx {
2566            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2567        }
2568    }
2569
2570    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2571    ///
2572    /// Called after each tool batch completes. The check is a single syscall and has
2573    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2574    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2575    pub(crate) async fn check_cwd_changed(&mut self) {
2576        let current = match std::env::current_dir() {
2577            Ok(p) => p,
2578            Err(e) => {
2579                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2580                return;
2581            }
2582        };
2583        if current == self.lifecycle.last_known_cwd {
2584            return;
2585        }
2586        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2587        self.session.env_context.working_dir = current.display().to_string();
2588
2589        tracing::info!(
2590            old = %old_cwd.display(),
2591            new = %current.display(),
2592            "working directory changed"
2593        );
2594
2595        let _ = self
2596            .channel
2597            .send_status("Working directory changed\u{2026}")
2598            .await;
2599
2600        let hooks = self.session.hooks_config.cwd_changed.clone();
2601        if !hooks.is_empty() {
2602            let mut env = std::collections::HashMap::new();
2603            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2604            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2605            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2606                tracing::warn!(error = %e, "CwdChanged hook failed");
2607            }
2608        }
2609
2610        let _ = self.channel.send_status("").await;
2611    }
2612
2613    /// Handle a `FileChangedEvent` from the file watcher.
2614    pub(crate) async fn handle_file_changed(
2615        &mut self,
2616        event: crate::file_watcher::FileChangedEvent,
2617    ) {
2618        tracing::info!(path = %event.path.display(), "file changed");
2619
2620        let _ = self
2621            .channel
2622            .send_status("Running file-change hook\u{2026}")
2623            .await;
2624
2625        let hooks = self.session.hooks_config.file_changed_hooks.clone();
2626        if !hooks.is_empty() {
2627            let mut env = std::collections::HashMap::new();
2628            env.insert(
2629                "ZEPH_CHANGED_PATH".to_owned(),
2630                event.path.display().to_string(),
2631            );
2632            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2633                tracing::warn!(error = %e, "FileChanged hook failed");
2634            }
2635        }
2636
2637        let _ = self.channel.send_status("").await;
2638    }
2639}
2640pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2641    while !*rx.borrow_and_update() {
2642        if rx.changed().await.is_err() {
2643            std::future::pending::<()>().await;
2644        }
2645    }
2646}
2647
2648pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2649    match rx {
2650        Some(inner) => {
2651            if let Some(v) = inner.recv().await {
2652                Some(v)
2653            } else {
2654                *rx = None;
2655                std::future::pending().await
2656            }
2657        }
2658        None => std::future::pending().await,
2659    }
2660}
2661
2662/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
2663///
2664/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
2665/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
2666/// context window; if still 0, fall back to 128 000 tokens.
2667pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2668    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2669        if let Some(ctx_size) = provider.context_window() {
2670            tracing::info!(
2671                model_context = ctx_size,
2672                "auto-configured context budget on reload"
2673            );
2674            ctx_size
2675        } else {
2676            0
2677        }
2678    } else {
2679        config.memory.context_budget_tokens
2680    };
2681    if tokens == 0 {
2682        tracing::warn!(
2683            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2684        );
2685        128_000
2686    } else {
2687        tokens
2688    }
2689}
2690
2691#[cfg(test)]
2692mod tests;
2693
2694#[cfg(test)]
2695pub(crate) use tests::agent_tests;