// zeph_core/agent/mod.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35pub(crate) mod rate_limiter;
36#[cfg(feature = "scheduler")]
37mod scheduler_commands;
38#[cfg(feature = "scheduler")]
39mod scheduler_loop;
40pub mod session_config;
41mod session_digest;
42pub(crate) mod sidequest;
43mod skill_management;
44pub mod slash_commands;
45pub(crate) mod state;
46pub(crate) mod tool_execution;
47pub(crate) mod tool_orchestrator;
48mod trust_commands;
49pub mod turn;
50mod utils;
51
52use std::collections::{HashMap, VecDeque};
53use std::sync::Arc;
54
55use parking_lot::RwLock;
56
57use tokio::sync::{mpsc, watch};
58use tokio_util::sync::CancellationToken;
59use zeph_llm::any::AnyProvider;
60use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
61use zeph_memory::TokenCounter;
62use zeph_memory::semantic::SemanticMemory;
63use zeph_skills::loader::Skill;
64use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
65use zeph_skills::prompt::format_skills_prompt;
66use zeph_skills::registry::SkillRegistry;
67use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
68
69use crate::channel::Channel;
70use crate::config::Config;
71use crate::context::{ContextBudget, build_system_prompt};
72use zeph_common::text::estimate_tokens;
73
74use loop_event::LoopEvent;
75use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
76use state::CompressionState;
77use state::{
78    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
79    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
80    RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
81};
82
83pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
84pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
85pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
86pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
87pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
88pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
89pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
90pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
91pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
92pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
93/// Prefix used for LSP context messages (`Role::System`) injected into message history.
94/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
95/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
96/// prefix to clear stale notes before each fresh injection.
97pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing code fence appended after every formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Format a tool's output as a header line plus a fenced code block.
///
/// Produces `[tool output: <tool_name>]` on the first line, followed by
/// `body` wrapped in triple-backtick fences. The buffer is sized to the
/// exact final length up front so the single `write!` never reallocates.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    use std::fmt::Write;
    // Exact final length: header + tool name + opening fence + body + closing fence.
    let capacity = "[tool output: ".len()
        + tool_name.len()
        + "]\n```\n".len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    // Writing into a String is infallible; the Result is deliberately ignored.
    let _ = write!(
        buf,
        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
    );
    buf
}
114
115/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
116/// tool orchestration, and multi-channel I/O.
117///
118/// The agent maintains conversation history, manages LLM provider state, coordinates tool
119/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
120/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
121///
122/// # Architecture
123///
124/// - **Message state**: Conversation history with system prompt, message queue, and metadata
125/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
126/// - **Skill state**: Registry, matching engine, and self-learning evolution
127/// - **Context manager**: Token budgeting, context assembly, and summarization
128/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
129/// - **MCP client**: Multi-server support for Model Context Protocol
130/// - **Index state**: AST-based code indexing and semantic retrieval
131/// - **Security**: Sanitization, exfiltration detection, adversarial probes
132/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
133///
134/// # Channel Contract
135///
136/// The agent requires a [`Channel`] implementation for user interaction:
137/// - Sends agent responses via `channel.send(message)`
138/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
139/// - Supports structured events: tool invocations, tool output, streaming updates
140///
141/// # Lifecycle
142///
143/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
144/// 2. Run main loop with [`Self::run`]
145/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
146pub struct Agent<C: Channel> {
147    provider: AnyProvider,
148    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
149    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
150    /// Falls back to `provider.clone()` when no dedicated entry exists.
151    /// **Never replaced** by `/provider switch`.
152    embedding_provider: AnyProvider,
153    channel: C,
154    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
155    pub(super) msg: MessageState,
156    pub(super) memory_state: MemoryState,
157    pub(super) skill_state: SkillState,
158    pub(super) context_manager: context_manager::ContextManager,
159    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
160    pub(super) learning_engine: learning_engine::LearningEngine,
161    pub(super) feedback: FeedbackState,
162    pub(super) runtime: RuntimeConfig,
163    pub(super) mcp: McpState,
164    pub(super) index: IndexState,
165    pub(super) session: SessionState,
166    pub(super) debug_state: DebugState,
167    pub(super) instructions: InstructionState,
168    pub(super) security: SecurityState,
169    pub(super) experiments: ExperimentState,
170    pub(super) compression: CompressionState,
171    pub(super) lifecycle: LifecycleState,
172    pub(super) providers: ProviderState,
173    pub(super) metrics: MetricsState,
174    pub(super) orchestration: OrchestrationState,
175    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
176    pub(super) focus: focus::FocusState,
177    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
178    pub(super) sidequest: sidequest::SidequestState,
179    /// Tool filtering, dependency tracking, and iteration bookkeeping.
180    pub(super) tool_state: ToolState,
181}
182
183impl<C: Channel> Agent<C> {
184    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
185    ///
186    /// # Arguments
187    ///
188    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
189    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
190    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
191    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
192    ///   skills are matched by exact name only
193    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
194    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
195    ///
196    /// # Initialization
197    ///
198    /// The constructor:
199    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
200    /// 2. Builds the system prompt from registered skills
201    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
202    /// 4. Returns a ready-to-run agent
203    ///
204    /// # Panics
205    ///
206    /// Panics if `max_active_skills` is 0.
207    #[must_use]
208    pub fn new(
209        provider: AnyProvider,
210        channel: C,
211        registry: SkillRegistry,
212        matcher: Option<SkillMatcherBackend>,
213        max_active_skills: usize,
214        tool_executor: impl ToolExecutor + 'static,
215    ) -> Self {
216        let registry = Arc::new(RwLock::new(registry));
217        let embedding_provider = provider.clone();
218        Self::new_with_registry_arc(
219            provider,
220            embedding_provider,
221            channel,
222            registry,
223            matcher,
224            max_active_skills,
225            tool_executor,
226        )
227    }
228
229    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
230    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
231    ///
232    /// # Panics
233    ///
234    /// Panics if the registry `RwLock` is poisoned.
235    #[must_use]
236    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
237    pub fn new_with_registry_arc(
238        provider: AnyProvider,
239        embedding_provider: AnyProvider,
240        channel: C,
241        registry: Arc<RwLock<SkillRegistry>>,
242        matcher: Option<SkillMatcherBackend>,
243        max_active_skills: usize,
244        tool_executor: impl ToolExecutor + 'static,
245    ) -> Self {
246        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
247        let all_skills: Vec<Skill> = {
248            let reg = registry.read();
249            reg.all_meta()
250                .iter()
251                .filter_map(|m| reg.get_skill(&m.name).ok())
252                .collect()
253        };
254        let empty_trust = HashMap::new();
255        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
256        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
257        let system_prompt = build_system_prompt(&skills_prompt, None);
258        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
259        tracing::trace!(prompt = %system_prompt, "full system prompt");
260
261        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
262        let token_counter = Arc::new(TokenCounter::new());
263        Self {
264            provider,
265            embedding_provider,
266            channel,
267            tool_executor: Arc::new(tool_executor),
268            msg: MessageState {
269                messages: vec![Message {
270                    role: Role::System,
271                    content: system_prompt,
272                    parts: vec![],
273                    metadata: MessageMetadata::default(),
274                }],
275                message_queue: VecDeque::new(),
276                pending_image_parts: Vec::new(),
277                last_persisted_message_id: None,
278                deferred_db_hide_ids: Vec::new(),
279                deferred_db_summaries: Vec::new(),
280            },
281            memory_state: MemoryState::default(),
282            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
283            context_manager: context_manager::ContextManager::new(),
284            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
285            learning_engine: learning_engine::LearningEngine::new(),
286            feedback: FeedbackState::default(),
287            debug_state: DebugState::default(),
288            runtime: RuntimeConfig::default(),
289            mcp: McpState::default(),
290            index: IndexState::default(),
291            session: SessionState::new(),
292            instructions: InstructionState::default(),
293            security: SecurityState::default(),
294            experiments: ExperimentState::new(),
295            compression: CompressionState::default(),
296            lifecycle: LifecycleState::new(),
297            providers: ProviderState::new(initial_prompt_tokens),
298            metrics: MetricsState::new(token_counter),
299            orchestration: OrchestrationState::default(),
300            focus: focus::FocusState::default(),
301            sidequest: sidequest::SidequestState::default(),
302            tool_state: ToolState::default(),
303        }
304    }
305
306    /// Poll all active sub-agents for completed/failed/canceled results.
307    ///
308    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
309    /// for agents that have finished. Each completed agent is removed from the manager.
310    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
311        let Some(mgr) = &mut self.orchestration.subagent_manager else {
312            return vec![];
313        };
314
315        let finished: Vec<String> = mgr
316            .statuses()
317            .into_iter()
318            .filter_map(|(id, status)| {
319                if matches!(
320                    status.state,
321                    zeph_subagent::SubAgentState::Completed
322                        | zeph_subagent::SubAgentState::Failed
323                        | zeph_subagent::SubAgentState::Canceled
324                ) {
325                    Some(id)
326                } else {
327                    None
328                }
329            })
330            .collect();
331
332        let mut results = vec![];
333        for task_id in finished {
334            match mgr.collect(&task_id).await {
335                Ok(result) => results.push((task_id, result)),
336                Err(e) => {
337                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
338                }
339            }
340        }
341        results
342    }
343
344    /// Call the LLM to generate a structured session summary with a configurable timeout.
345    ///
346    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
347    /// any failure, logging a warning — callers must treat `None` as "skip storage".
348    ///
349    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
350    /// (structured call times out and plain-text fallback also times out) this adds up to
351    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
352    async fn call_llm_for_session_summary(
353        &self,
354        chat_messages: &[Message],
355    ) -> Option<zeph_memory::StructuredSummary> {
356        let timeout_dur = std::time::Duration::from_secs(
357            self.memory_state.compaction.shutdown_summary_timeout_secs,
358        );
359        match tokio::time::timeout(
360            timeout_dur,
361            self.provider
362                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
363        )
364        .await
365        {
366            Ok(Ok(s)) => Some(s),
367            Ok(Err(e)) => {
368                tracing::warn!(
369                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
370                );
371                self.plain_text_summary_fallback(chat_messages, timeout_dur)
372                    .await
373            }
374            Err(_) => {
375                tracing::warn!(
376                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
377                    self.memory_state.compaction.shutdown_summary_timeout_secs
378                );
379                self.plain_text_summary_fallback(chat_messages, timeout_dur)
380                    .await
381            }
382        }
383    }
384
385    async fn plain_text_summary_fallback(
386        &self,
387        chat_messages: &[Message],
388        timeout_dur: std::time::Duration,
389    ) -> Option<zeph_memory::StructuredSummary> {
390        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
391            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
392                summary: plain,
393                key_facts: vec![],
394                entities: vec![],
395            }),
396            Ok(Err(e)) => {
397                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
398                None
399            }
400            Err(_) => {
401                tracing::warn!("shutdown summary: plain LLM fallback timed out");
402                None
403            }
404        }
405    }
406
407    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
408    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
409    /// closed while tool execution was in progress). Without this the next session startup strips
410    /// those assistant messages and emits orphan warnings.
411    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
412        use zeph_llm::provider::{MessagePart, Role};
413
414        // Walk messages in reverse: if the last assistant message (ignoring any trailing
415        // system messages) has ToolUse parts and is NOT immediately followed by a user
416        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
417        let msgs = &self.msg.messages;
418        // Find last assistant message index.
419        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
420            return;
421        };
422        let asst_msg = &msgs[asst_idx];
423        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
424            .parts
425            .iter()
426            .filter_map(|p| {
427                if let MessagePart::ToolUse { id, name, input } = p {
428                    Some((id.as_str(), name.as_str(), input))
429                } else {
430                    None
431                }
432            })
433            .collect();
434        if tool_use_ids.is_empty() {
435            return;
436        }
437
438        // Check whether a following user message already pairs all ToolUse ids.
439        let paired_ids: std::collections::HashSet<&str> = msgs
440            .get(asst_idx + 1..)
441            .into_iter()
442            .flatten()
443            .filter(|m| m.role == Role::User)
444            .flat_map(|m| m.parts.iter())
445            .filter_map(|p| {
446                if let MessagePart::ToolResult { tool_use_id, .. } = p {
447                    Some(tool_use_id.as_str())
448                } else {
449                    None
450                }
451            })
452            .collect();
453
454        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
455            .iter()
456            .filter(|(id, _, _)| !paired_ids.contains(*id))
457            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
458                id: (*id).to_owned(),
459                name: (*name).to_owned().into(),
460                input: (*input).clone(),
461            })
462            .collect();
463
464        if unpaired.is_empty() {
465            return;
466        }
467
468        tracing::info!(
469            count = unpaired.len(),
470            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
471        );
472        self.persist_cancelled_tool_results(&unpaired).await;
473    }
474
475    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
476    ///
477    /// Guards:
478    /// - `shutdown_summary` config must be enabled
479    /// - `conversation_id` must be set (memory must be attached)
480    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
481    /// - at least `shutdown_summary_min_messages` user-turn messages in history
482    ///
483    /// All errors are logged as warnings and swallowed — shutdown must never fail.
484    async fn maybe_store_shutdown_summary(&mut self) {
485        if !self.memory_state.compaction.shutdown_summary {
486            return;
487        }
488        let Some(memory) = self.memory_state.persistence.memory.clone() else {
489            return;
490        };
491        let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
492            return;
493        };
494
495        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
496        match memory.has_session_summary(conversation_id).await {
497            Ok(true) => {
498                tracing::debug!("shutdown summary: session already has a summary, skipping");
499                return;
500            }
501            Ok(false) => {}
502            Err(e) => {
503                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
504                return;
505            }
506        }
507
508        // Count user-turn messages only (skip system prompt at index 0).
509        let user_count = self
510            .msg
511            .messages
512            .iter()
513            .skip(1)
514            .filter(|m| m.role == Role::User)
515            .count();
516        if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
517            tracing::debug!(
518                user_count,
519                min = self.memory_state.compaction.shutdown_summary_min_messages,
520                "shutdown summary: too few user messages, skipping"
521            );
522            return;
523        }
524
525        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
526        let _ = self.channel.send_status("Saving session summary...").await;
527
528        // Collect last N messages (skip system prompt at index 0).
529        let max = self.memory_state.compaction.shutdown_summary_max_messages;
530        if max == 0 {
531            tracing::debug!("shutdown summary: max_messages=0, skipping");
532            return;
533        }
534        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
535        let slice = if non_system.len() > max {
536            &non_system[non_system.len() - max..]
537        } else {
538            &non_system[..]
539        };
540
541        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
542            .iter()
543            .map(|m| {
544                let role = match m.role {
545                    Role::User => "user".to_owned(),
546                    Role::Assistant => "assistant".to_owned(),
547                    Role::System => "system".to_owned(),
548                };
549                (zeph_memory::MessageId(0), role, m.content.clone())
550            })
551            .collect();
552
553        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
554        let chat_messages = vec![Message {
555            role: Role::User,
556            content: prompt,
557            parts: vec![],
558            metadata: MessageMetadata::default(),
559        }];
560
561        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
562            let _ = self.channel.send_status("").await;
563            return;
564        };
565
566        if let Err(e) = memory
567            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
568            .await
569        {
570            tracing::warn!("shutdown summary: storage failed: {e:#}");
571        } else {
572            tracing::info!(
573                conversation_id = conversation_id.0,
574                "shutdown summary stored"
575            );
576        }
577
578        // Clear TUI status.
579        let _ = self.channel.send_status("").await;
580    }
581
582    /// Gracefully shut down the agent and persist state.
583    ///
584    /// Performs the following cleanup:
585    ///
586    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
587    ///    are flushed to memory or disk
588    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
589    ///    to the vault
590    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
591    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
592    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
593    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
594    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
595    ///
596    /// Call this before dropping the agent to ensure no data loss.
597    pub async fn shutdown(&mut self) {
598        self.channel.send("Shutting down...").await.ok();
599
600        // CRIT-1: persist Thompson state accumulated during this session.
601        self.provider.save_router_state();
602
603        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
604            mgr.shutdown_all();
605        }
606
607        if let Some(ref manager) = self.mcp.manager {
608            manager.shutdown_all_shared().await;
609        }
610
611        // Finalize compaction trajectory: push the last open segment into the Vec.
612        // This segment would otherwise only be pushed when the next hard compaction fires,
613        // which never happens at session end.
614        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
615            self.update_metrics(|m| {
616                m.compaction_turns_after_hard.push(turns);
617            });
618            self.context_manager.turns_since_last_hard_compaction = None;
619        }
620
621        if let Some(ref tx) = self.metrics.metrics_tx {
622            let m = tx.borrow();
623            if m.filter_applications > 0 {
624                #[allow(clippy::cast_precision_loss)]
625                let pct = if m.filter_raw_tokens > 0 {
626                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
627                } else {
628                    0.0
629                };
630                tracing::info!(
631                    raw_tokens = m.filter_raw_tokens,
632                    saved_tokens = m.filter_saved_tokens,
633                    applications = m.filter_applications,
634                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
635                    m.filter_saved_tokens,
636                );
637            }
638            if m.compaction_hard_count > 0 {
639                tracing::info!(
640                    hard_compactions = m.compaction_hard_count,
641                    turns_after_hard = ?m.compaction_turns_after_hard,
642                    "hard compaction trajectory"
643                );
644            }
645        }
646
647        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
648        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
649        // startup strips the orphaned ToolUse and emits warnings.
650        self.flush_orphaned_tool_use_on_shutdown().await;
651
652        self.lifecycle.supervisor.abort_all();
653
654        self.maybe_store_shutdown_summary().await;
655        self.maybe_store_session_digest().await;
656
657        tracing::info!("agent shutdown complete");
658    }
659
660    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if channel I/O or LLM communication fails.
665    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
666    fn refresh_subagent_metrics(&mut self) {
667        let Some(ref mgr) = self.orchestration.subagent_manager else {
668            return;
669        };
670        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
671            .statuses()
672            .into_iter()
673            .map(|(id, s)| {
674                let def = mgr.agents_def(&id);
675                crate::metrics::SubAgentMetrics {
676                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
677                    id: id.clone(),
678                    state: format!("{:?}", s.state).to_lowercase(),
679                    turns_used: s.turns_used,
680                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
681                    background: def.is_some_and(|d| d.permissions.background),
682                    elapsed_secs: s.started_at.elapsed().as_secs(),
683                    permission_mode: def.map_or_else(String::new, |d| {
684                        use zeph_subagent::def::PermissionMode;
685                        match d.permissions.permission_mode {
686                            PermissionMode::Default => String::new(),
687                            PermissionMode::AcceptEdits => "accept_edits".into(),
688                            PermissionMode::DontAsk => "dont_ask".into(),
689                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
690                            PermissionMode::Plan => "plan".into(),
691                        }
692                    }),
693                    transcript_dir: mgr
694                        .agent_transcript_dir(&id)
695                        .map(|p| p.to_string_lossy().into_owned()),
696                }
697            })
698            .collect();
699        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
700    }
701
702    /// Non-blocking poll: notify the user when background sub-agents complete.
703    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
704        let completed = self.poll_subagents().await;
705        for (task_id, result) in completed {
706            let notice = if result.is_empty() {
707                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
708            } else {
709                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
710            };
711            if let Err(e) = self.channel.send(&notice).await {
712                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
713            }
714        }
715        Ok(())
716    }
717
    /// Run the agent main loop.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Wait (once) on the warmup watch channel before accepting input; a false
        // value after `changed()` means warmup ended without succeeding.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;

        loop {
            // Per-iteration housekeeping before picking up the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Prefer queued messages over waiting for a new loop event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                // Reload/notification events loop back via `continue`; only Message
                // and ScheduledTask fall through to produce a user turn.
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // Registry dispatch: build the command registry, construct CommandContext, dispatch.
            // The registry is built inline (all handlers are ZSTs) to avoid borrow-checker
            // conflicts when constructing CommandContext from Agent<C> fields.
            // Build context first so that borrows outlive the registry (drop order matters).
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            match registry_handled {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if registry_handled.is_none() {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand},
                    experiment::ExperimentCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).
            match agent_result {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    // Post-dispatch learning hook: trigger generate_improved_skill for
                    // `/skill reject` and `/feedback` commands. These calls require &mut self
                    // and cannot be placed inside the Send future in agent_access_impl.rs.
                    self.maybe_trigger_post_command_learning(trimmed).await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Legacy builtin dispatch: Some(true) exits, Some(false) ends this iteration,
            // None means the input is a plain user message.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1005
1006    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1007    fn apply_provider_override(&mut self) {
1008        if let Some(ref slot) = self.providers.provider_override
1009            && let Some(new_provider) = slot.write().take()
1010        {
1011            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1012            self.provider = new_provider;
1013        }
1014    }
1015
    /// Poll all event sources and return the next [`LoopEvent`].
    ///
    /// Returns `None` when the inbound channel closes (graceful shutdown).
    ///
    /// # Errors
    ///
    /// Propagates channel receive errors.
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        // NOTE: tokio::select! without `biased;` polls branches in random order, so no
        // event source can starve the others. `recv_optional` presumably stays pending
        // when the receiver is absent so its branch never fires — confirm against the
        // helper's definition.
        let event = tokio::select! {
            // Inbound channel: `None` (closed channel) maps to graceful shutdown.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1057
1058    async fn resolve_message(
1059        &self,
1060        msg: crate::channel::ChannelMessage,
1061    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1062        use crate::channel::{Attachment, AttachmentKind};
1063        use zeph_llm::provider::{ImageData, MessagePart};
1064
1065        let text_base = msg.text.clone();
1066
1067        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1068            .attachments
1069            .into_iter()
1070            .partition(|a| a.kind == AttachmentKind::Audio);
1071
1072        tracing::debug!(
1073            audio = audio_attachments.len(),
1074            has_stt = self.providers.stt.is_some(),
1075            "resolve_message attachments"
1076        );
1077
1078        let text = if !audio_attachments.is_empty()
1079            && let Some(stt) = self.providers.stt.as_ref()
1080        {
1081            let mut transcribed_parts = Vec::new();
1082            for attachment in &audio_attachments {
1083                if attachment.data.len() > MAX_AUDIO_BYTES {
1084                    tracing::warn!(
1085                        size = attachment.data.len(),
1086                        max = MAX_AUDIO_BYTES,
1087                        "audio attachment exceeds size limit, skipping"
1088                    );
1089                    continue;
1090                }
1091                match stt
1092                    .transcribe(&attachment.data, attachment.filename.as_deref())
1093                    .await
1094                {
1095                    Ok(result) => {
1096                        tracing::info!(
1097                            len = result.text.len(),
1098                            language = ?result.language,
1099                            "audio transcribed"
1100                        );
1101                        transcribed_parts.push(result.text);
1102                    }
1103                    Err(e) => {
1104                        tracing::error!(error = %e, "audio transcription failed");
1105                    }
1106                }
1107            }
1108            if transcribed_parts.is_empty() {
1109                text_base
1110            } else {
1111                let transcribed = transcribed_parts.join("\n");
1112                if text_base.is_empty() {
1113                    transcribed
1114                } else {
1115                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1116                }
1117            }
1118        } else {
1119            if !audio_attachments.is_empty() {
1120                tracing::warn!(
1121                    count = audio_attachments.len(),
1122                    "audio attachments received but no STT provider configured, dropping"
1123                );
1124            }
1125            text_base
1126        };
1127
1128        let mut image_parts = Vec::new();
1129        for attachment in image_attachments {
1130            if attachment.data.len() > MAX_IMAGE_BYTES {
1131                tracing::warn!(
1132                    size = attachment.data.len(),
1133                    max = MAX_IMAGE_BYTES,
1134                    "image attachment exceeds size limit, skipping"
1135                );
1136                continue;
1137            }
1138            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1139            image_parts.push(MessagePart::Image(Box::new(ImageData {
1140                data: attachment.data,
1141                mime_type,
1142            })));
1143        }
1144
1145        (text, image_parts)
1146    }
1147
1148    /// Create a new [`Turn`] for the given input and advance the turn counter.
1149    ///
1150    /// Clears per-turn state that must not carry over between turns:
1151    /// - per-turn `CancellationToken` (new token for each turn)
1152    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1153    ///   `process_user_message_inner` after security checks)
1154    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1155        let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1156        self.debug_state.iteration_counter += 1;
1157        self.lifecycle.cancel_token = CancellationToken::new();
1158        self.security.user_provided_urls.write().clear();
1159        turn::Turn::new(id, input)
1160    }
1161
1162    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1163    ///
1164    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1165    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1166    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1167    /// is populated from it here so the rest of the pipeline is unchanged.
1168    fn end_turn(&mut self, turn: turn::Turn) {
1169        self.metrics.pending_timings = turn.metrics.timings;
1170        self.flush_turn_timings();
1171    }
1172
1173    #[cfg_attr(
1174        feature = "profiling",
1175        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1176    )]
1177    async fn process_user_message(
1178        &mut self,
1179        text: String,
1180        image_parts: Vec<zeph_llm::provider::MessagePart>,
1181    ) -> Result<(), error::AgentError> {
1182        let input = turn::TurnInput::new(text, image_parts);
1183        let mut t = self.begin_turn(input);
1184
1185        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1186        tracing::Span::current().record("turn_id", t.id().0);
1187        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1188        self.debug_state
1189            .start_iteration_span(turn_idx, t.input.text.trim());
1190
1191        let result = self.process_user_message_inner(&mut t).await;
1192
1193        // Close iteration span regardless of outcome (partial trace preserved on error).
1194        let span_status = if result.is_ok() {
1195            crate::debug_dump::trace::SpanStatus::Ok
1196        } else {
1197            crate::debug_dump::trace::SpanStatus::Error {
1198                message: "iteration failed".to_owned(),
1199            }
1200        };
1201        self.debug_state.end_iteration_span(turn_idx, span_status);
1202
1203        self.end_turn(t);
1204        result
1205    }
1206
    /// Core per-turn pipeline: background-task reaping, cancel-bridge wiring, slash
    /// dispatch, security pre-screen, context assembly, persistence, LLM response
    /// processing, and timing harvest. Called once per turn from
    /// `process_user_message`; `end_turn` runs afterwards regardless of the result.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        // Reap completed background tasks from the previous turn. The summarization signal
        // is applied here — between turns — so `unsummarized_count` is always reset on the
        // foreground without shared mutable state across tasks (S1 fix from critic review).
        let bg_signal = self.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        {
            // Publish a supervisor metrics snapshot for the UI/telemetry.
            let snap = self.lifecycle.supervisor.metrics_snapshot();
            self.update_metrics(|m| {
                m.bg_inflight = snap.inflight as u64;
                m.bg_dropped = snap.total_dropped();
                m.bg_completed = snap.total_completed();
                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
            });
        }

        // Intentional ordering: reap() runs before abort_class() so completed tasks are
        // accounted in the metrics snapshot above. The TUI may show a stale enrichment
        // inflight count for one cycle when abort fires, but this self-corrects next turn.
        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }

        // Wire the per-turn cancellation token into the cancel bridge.
        // The bridge translates the `cancel_signal` (Notify) into a CancellationToken cancel.
        // Abort the previous bridge before spawning to prevent unbounded accumulation (#2737).
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // Slash commands short-circuit the turn entirely.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // `true` means the input was blocked (guardrail/classifier); end the turn quietly.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let user_msg = self.build_user_message(&text, image_parts);

        // Extract URLs from user input and add to user_provided_urls for grounding checks.
        // URL set was cleared in begin_turn; re-populate for this turn.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
        // Derived from the raw input text before context assembly to avoid timing dependencies.
        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        // Image parts intentionally excluded — base64 payloads too large for message history.
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
        // handle_native_tool_calls respectively via metrics.pending_timings.
        tracing::debug!("turn timing: process_response start");
        if let Err(e) = self.process_response().await {
            // Detach any in-flight learning tasks before mutating message state.
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        } else {
            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
            // leak into the next turn's context.
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            // MagicDocs: spawn background doc updates if any are due (#2702).
            self.maybe_update_magic_docs();
        }
        tracing::debug!("turn timing: process_response done");

        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
        // by the tool execution chain) into turn.metrics so end_turn can flush them.
        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
        // we harvest those values into Turn before end_turn overwrites pending_timings.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1336
    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    /// Layered user-input security pre-screen: an optional LLM guardrail first,
    /// then (behind the `classifiers` feature) an ML injection classifier.
    /// Send failures on notification messages are intentionally ignored — blocking
    /// must not depend on the channel being healthy.
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Warn mode: notify but continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // Fail-closed vs fail-open is decided by the guardrail's own config.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        // ML classifier: lightweight injection detection on user input boundary.
        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
        // direct user chat. Disabled by default to prevent false positives on benign messages.
        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    // Soft signal: log only, do not block.
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        // Publish classifier metrics even when scanning is disabled or the verdict is clean.
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1408
    /// Per-turn context maintenance pipeline, run before each LLM call.
    ///
    /// `text` is the raw user input (used to rebuild the system prompt);
    /// `trimmed` is the whitespace-trimmed form used for correction detection
    /// and context preparation. The steps run in a deliberate order — the
    /// compaction-guard reset must precede SideQuest eviction, and proactive
    /// compression must precede reactive compaction. Failures in compression,
    /// compaction, and preparation are logged and swallowed: this pipeline is
    /// best-effort and must not abort the turn.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        // Reset per-message pruning cache at the start of each turn (#2298).
        self.mcp.pruning_cache.reset();

        // Extract before rebuild_system_prompt so the value is not tainted
        // by the secrets-bearing system prompt (ConversationId is just an i64).
        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        // Learning pipeline: record user corrections, advance the engine's
        // turn counter, then run analysis and graph-count synchronization.
        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
        {
            self.focus.tick();

            // SideQuest eviction: runs every N user turns when enabled.
            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
        self.maybe_time_based_microcompact();

        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // Boxed to keep this future's size bounded (prepare_context is large).
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1481
1482    fn build_user_message(
1483        &mut self,
1484        text: &str,
1485        image_parts: Vec<zeph_llm::provider::MessagePart>,
1486    ) -> Message {
1487        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1488        all_image_parts.extend(image_parts);
1489
1490        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1491            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1492                text: text.to_owned(),
1493            }];
1494            parts.extend(all_image_parts);
1495            Message::from_parts(Role::User, parts)
1496        } else {
1497            if !all_image_parts.is_empty() {
1498                tracing::warn!(
1499                    count = all_image_parts.len(),
1500                    "image attachments dropped: provider does not support vision"
1501                );
1502            }
1503            Message {
1504                role: Role::User,
1505                content: text.to_owned(),
1506                parts: vec![],
1507                metadata: MessageMetadata::default(),
1508            }
1509        }
1510    }
1511
    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
    /// channel. Returns a human-readable status string suitable for sending to the user.
    ///
    /// Polls on a fixed 500ms interval. Returns `None` only if the sub-agent
    /// manager becomes absent (the `?` in the loop). If the task id vanishes
    /// from the status list, a generic "completed (no status available)"
    /// message is returned instead.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Bridge secret requests from sub-agent to channel.confirm().
            // Fetch the pending request first, then release the borrow before
            // calling channel.confirm() (which requires &mut self).
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
                // (SEC-P1-02), so it is safe to embed in the prompt string.
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A failed confirm (channel error) is treated as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets are valid for a fixed 5-minute TTL.
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Non-terminal: stream a progress line as "turn N/M"; M
                    // falls back to 20 when the definition cannot be resolved.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1587
1588    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1589    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1590    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1591        let mgr = self.orchestration.subagent_manager.as_mut()?;
1592        let full_ids: Vec<String> = mgr
1593            .statuses()
1594            .into_iter()
1595            .map(|(tid, _)| tid)
1596            .filter(|tid| tid.starts_with(prefix))
1597            .collect();
1598        Some(match full_ids.as_slice() {
1599            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1600            [fid] => Ok(fid.clone()),
1601            _ => Err(format!(
1602                "Ambiguous id prefix '{prefix}': matches {} agents",
1603                full_ids.len()
1604            )),
1605        })
1606    }
1607
1608    fn handle_agent_list(&self) -> Option<String> {
1609        use std::fmt::Write as _;
1610        let mgr = self.orchestration.subagent_manager.as_ref()?;
1611        let defs = mgr.definitions();
1612        if defs.is_empty() {
1613            return Some("No sub-agent definitions found.".into());
1614        }
1615        let mut out = String::from("Available sub-agents:\n");
1616        for d in defs {
1617            let memory_label = match d.memory {
1618                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
1619                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
1620                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
1621                None => "",
1622            };
1623            if let Some(ref src) = d.source {
1624                let _ = writeln!(
1625                    out,
1626                    "  {}{} — {} ({})",
1627                    d.name, memory_label, d.description, src
1628                );
1629            } else {
1630                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
1631            }
1632        }
1633        Some(out)
1634    }
1635
1636    fn handle_agent_status(&self) -> Option<String> {
1637        use std::fmt::Write as _;
1638        let mgr = self.orchestration.subagent_manager.as_ref()?;
1639        let statuses = mgr.statuses();
1640        if statuses.is_empty() {
1641            return Some("No active sub-agents.".into());
1642        }
1643        let mut out = String::from("Active sub-agents:\n");
1644        for (id, s) in &statuses {
1645            let state = format!("{:?}", s.state).to_lowercase();
1646            let elapsed = s.started_at.elapsed().as_secs();
1647            let _ = writeln!(
1648                out,
1649                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
1650                short = &id[..8.min(id.len())],
1651                t = s.turns_used,
1652                msg = s.last_message.as_deref().unwrap_or(""),
1653            );
1654            // Show memory directory path for agents with memory enabled.
1655            if let Some(def) = mgr.agents_def(id)
1656                && let Some(scope) = def.memory
1657                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
1658            {
1659                let _ = writeln!(out, "       memory: {}", dir.display());
1660            }
1661        }
1662        Some(out)
1663    }
1664
1665    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1666        let full_id = match self.resolve_agent_id_prefix(id)? {
1667            Ok(fid) => fid,
1668            Err(msg) => return Some(msg),
1669        };
1670        let mgr = self.orchestration.subagent_manager.as_mut()?;
1671        if let Some((tid, req)) = mgr.try_recv_secret_request()
1672            && tid == full_id
1673        {
1674            let key = req.secret_key.clone();
1675            let ttl = std::time::Duration::from_secs(300);
1676            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1677                return Some(format!("Approve failed: {e}"));
1678            }
1679            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1680                return Some(format!("Secret delivery failed: {e}"));
1681            }
1682            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1683        }
1684        Some(format!(
1685            "No pending secret request for sub-agent '{full_id}'."
1686        ))
1687    }
1688
1689    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1690        let full_id = match self.resolve_agent_id_prefix(id)? {
1691            Ok(fid) => fid,
1692            Err(msg) => return Some(msg),
1693        };
1694        let mgr = self.orchestration.subagent_manager.as_mut()?;
1695        match mgr.deny_secret(&full_id) {
1696            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1697            Err(e) => Some(format!("Deny failed: {e}")),
1698        }
1699    }
1700
    /// Dispatch a parsed `/agent …` command and return a user-facing reply.
    ///
    /// Returns `None` when the sub-agent manager is not configured (every arm
    /// short-circuits via `?`); otherwise always yields a printable message.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            // Fire-and-forget spawn: report the task id and return immediately.
            AgentCommand::Background { name, prompt } => {
                // Gather spawn inputs (which borrow &self) before taking the
                // mutable borrow of the manager below.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                // Foreground spawn: launch and await completion, streaming status to user.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Accept prefix match on task_id.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            // Resume a persisted sub-agent transcript under a fresh task id.
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve definition name from transcript meta before spawning so we can
                // look up skills by definition name rather than the UUID prefix (S1 fix).
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
1819
1820    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1821        let mgr = self.orchestration.subagent_manager.as_ref()?;
1822        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1823        let reg = self.skill_state.registry.read();
1824        match zeph_subagent::filter_skills(&reg, &def.skills) {
1825            Ok(skills) => {
1826                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1827                if bodies.is_empty() {
1828                    None
1829                } else {
1830                    Some(bodies)
1831                }
1832            }
1833            Err(e) => {
1834                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1835                None
1836            }
1837        }
1838    }
1839
1840    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
1841    fn build_spawn_context(
1842        &self,
1843        cfg: &zeph_config::SubAgentConfig,
1844    ) -> zeph_subagent::SpawnContext {
1845        zeph_subagent::SpawnContext {
1846            parent_messages: self.extract_parent_messages(cfg),
1847            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1848            parent_provider_name: {
1849                let name = &self.runtime.active_provider_name;
1850                if name.is_empty() {
1851                    None
1852                } else {
1853                    Some(name.clone())
1854                }
1855            },
1856            spawn_depth: self.runtime.spawn_depth,
1857            mcp_tool_names: self.extract_mcp_tool_names(),
1858        }
1859    }
1860
1861    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
1862    ///
1863    /// Filters system messages, takes last `context_window_turns * 2` messages,
1864    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
1865    fn extract_parent_messages(
1866        &self,
1867        config: &zeph_config::SubAgentConfig,
1868    ) -> Vec<zeph_llm::provider::Message> {
1869        use zeph_llm::provider::Role;
1870        if config.context_window_turns == 0 {
1871            return Vec::new();
1872        }
1873        let non_system: Vec<_> = self
1874            .msg
1875            .messages
1876            .iter()
1877            .filter(|m| m.role != Role::System)
1878            .cloned()
1879            .collect();
1880        let take_count = config.context_window_turns * 2;
1881        let start = non_system.len().saturating_sub(take_count);
1882        let mut msgs = non_system[start..].to_vec();
1883
1884        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
1885        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
1886        let mut total_chars: usize = 0;
1887        let mut keep = msgs.len();
1888        for (i, m) in msgs.iter().enumerate() {
1889            total_chars += m.content.len();
1890            if total_chars > max_chars {
1891                keep = i;
1892                break;
1893            }
1894        }
1895        if keep < msgs.len() {
1896            tracing::info!(
1897                kept = keep,
1898                requested = config.context_window_turns * 2,
1899                "[subagent] truncated parent history from {} to {} turns due to token budget",
1900                config.context_window_turns * 2,
1901                keep
1902            );
1903            msgs.truncate(keep);
1904        }
1905        msgs
1906    }
1907
1908    /// Extract MCP tool names from the tool executor for diagnostic annotation.
1909    fn extract_mcp_tool_names(&self) -> Vec<String> {
1910        self.tool_executor
1911            .tool_definitions_erased()
1912            .into_iter()
1913            .filter(|t| t.id.starts_with("mcp_"))
1914            .map(|t| t.id.to_string())
1915            .collect()
1916    }
1917
    /// Update trust DB records for all reloaded skills.
    ///
    /// For each skill: classifies its source (Hub if under the managed dir,
    /// Local otherwise), computes its content hash, and upserts a trust row.
    /// An existing row keeps its trust level when the hash is unchanged and is
    /// demoted to `hash_mismatch_level` when it differs; new rows get the
    /// source-appropriate initial level. All failures are logged and skipped —
    /// trust bookkeeping must not block a skill reload.
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        // Clone Arc before any .await so no &self fields are held across suspension points.
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            // No persistence layer configured: nothing to record.
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            // Skills inside the managed (hub) directory are Hub-sourced.
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::store::SourceKind::Hub
            } else {
                zeph_memory::store::SourceKind::Local
            };
            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    // Load errors are treated the same as "no existing row".
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            // Content unchanged: carry the recorded level forward.
                            row.trust_level.clone()
                        } else {
                            // Content changed since last recording: demote.
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    // Non-UTF-8 paths are stored as NULL (to_str() -> None).
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }
1983
    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
    ///
    /// An in-memory backend is rebuilt from scratch; a Qdrant backend is
    /// incrementally synced (with progress streamed over the session status
    /// channel when available). When hybrid search is enabled, the BM25 index
    /// over skill descriptions is rebuilt as well. Embedding failures and sync
    /// errors are logged, never propagated.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Embedding closure handed to the matcher; wraps each embed call in a
        // timeout and maps expiry to LlmError::Timeout.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // A missing matcher or a non-Qdrant backend requires a full rebuild;
        // only an existing Qdrant backend supports incremental sync.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Progress callback is only wired when a status channel exists.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2045
    /// Hot-reload the skill registry from disk, then refresh everything
    /// derived from it: trust records, the matcher/BM25 indexes, and the
    /// skills section of the system prompt. No-op when the on-disk
    /// fingerprint matches the current registry.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        if new_registry.fingerprint() == self.skill_state.fingerprint() {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        // Swap the registry under a short write lock before any .await below.
        *self.skill_state.registry.write() = new_registry;

        // Clone the metadata out so no registry read lock is held across awaits.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Materialize full skill bodies inside a scoped read lock.
        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // Health stats are intentionally empty on reload; they repopulate over time.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        // The first message slot holds the system prompt; replace it in place.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2096
2097    fn reload_instructions(&mut self) {
2098        // Drain any additional queued events before reloading to avoid redundant reloads.
2099        if let Some(ref mut rx) = self.instructions.reload_rx {
2100            while rx.try_recv().is_ok() {}
2101        }
2102        let Some(ref state) = self.instructions.reload_state else {
2103            return;
2104        };
2105        let new_blocks = crate::instructions::load_instructions(
2106            &state.base_dir,
2107            &state.provider_kinds,
2108            &state.explicit_files,
2109            state.auto_detect,
2110        );
2111        let old_sources: std::collections::HashSet<_> =
2112            self.instructions.blocks.iter().map(|b| &b.source).collect();
2113        let new_sources: std::collections::HashSet<_> =
2114            new_blocks.iter().map(|b| &b.source).collect();
2115        for added in new_sources.difference(&old_sources) {
2116            tracing::info!(path = %added.display(), "instruction file added");
2117        }
2118        for removed in old_sources.difference(&new_sources) {
2119            tracing::info!(path = %removed.display(), "instruction file removed");
2120        }
2121        tracing::info!(
2122            old_count = self.instructions.blocks.len(),
2123            new_count = new_blocks.len(),
2124            "reloaded instruction files"
2125        );
2126        self.instructions.blocks = new_blocks;
2127    }
2128
    /// Re-read the config file from `lifecycle.config_path` and apply it in place.
    ///
    /// Used for hot-reload: a failed load leaves the running configuration
    /// untouched, and long-lived state (the RPE router) is preserved rather
    /// than re-created so accumulated state survives a reload.
    fn reload_config(&mut self) {
        // No config path (e.g. embedded/test use): nothing to reload.
        let Some(ref path) = self.lifecycle.config_path else {
            return;
        };
        let config = match Config::load(path) {
            Ok(c) => c,
            Err(e) => {
                // Keep the previous config active on load failure.
                tracing::warn!("config reload failed: {e:#}");
                return;
            }
        };

        let budget_tokens = resolve_context_budget(&config, &self.provider);
        // Runtime/security knobs.
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        // Memory persistence and summarization knobs.
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        // Skill-matching knobs; weight/threshold values are clamped to [0, 1]
        // where the math requires it.
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading `~/` in the output dir to the home directory.
        // NOTE(review): a bare `~` (no trailing slash) is kept literal — confirm intended.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        // Rebuild the context budget with a fixed 20% reserve fraction.
        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                // RPE disabled: drop the router (and its accumulated state).
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        // Compaction / compression / routing tuning.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // Resolve routing_classifier_provider from the provider pool (#2484).
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        // Repo-map index settings.
        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2221
2222    /// Run `SideQuest` tool output eviction pass (#1885).
2223    ///
2224    /// PERF-1 fix: two-phase non-blocking design.
2225    ///
2226    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2227    /// validate and apply it immediately.
2228    ///
2229    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2230    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2231    /// next turn, so the current agent turn is never blocked by the LLM call.
2232    #[allow(clippy::too_many_lines)]
2233    fn maybe_sidequest_eviction(&mut self) {
2234        use zeph_llm::provider::{Message, MessageMetadata, Role};
2235
2236        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2237        // strategy — the two systems share the same pool of evictable tool outputs and can
2238        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2239        if self.sidequest.config.enabled {
2240            use crate::config::PruningStrategy;
2241            if !matches!(
2242                self.context_manager.compression.pruning_strategy,
2243                PruningStrategy::Reactive
2244            ) {
2245                tracing::warn!(
2246                    strategy = ?self.context_manager.compression.pruning_strategy,
2247                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2248                     consider disabling sidequest.enabled to avoid redundant eviction"
2249                );
2250            }
2251        }
2252
2253        // Guard: do not evict while a focus session is active.
2254        if self.focus.is_active() {
2255            tracing::debug!("sidequest: skipping — focus session active");
2256            // Drop any pending result — cursors may be stale relative to focus truncation.
2257            self.compression.pending_sidequest_result = None;
2258            return;
2259        }
2260
2261        // Phase 1: apply pending result from last turn's background LLM call.
2262        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2263            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2264            use futures::FutureExt as _;
2265            match handle.now_or_never() {
2266                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2267                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2268                    let freed = self.sidequest.apply_eviction(
2269                        &mut self.msg.messages,
2270                        &evicted_indices,
2271                        &self.metrics.token_counter,
2272                    );
2273                    if freed > 0 {
2274                        self.recompute_prompt_tokens();
2275                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2276                        // cooldown=0: eviction does not impose post-compaction cooldown.
2277                        self.context_manager.compaction =
2278                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2279                                cooldown: 0,
2280                            };
2281                        tracing::info!(
2282                            freed_tokens = freed,
2283                            evicted_cursors = evicted_indices.len(),
2284                            pass = self.sidequest.passes_run,
2285                            "sidequest eviction complete"
2286                        );
2287                        if let Some(ref d) = self.debug_state.debug_dumper {
2288                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2289                        }
2290                        if let Some(ref tx) = self.session.status_tx {
2291                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2292                        }
2293                    } else {
2294                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2295                        if let Some(ref tx) = self.session.status_tx {
2296                            let _ = tx.send(String::new());
2297                        }
2298                    }
2299                }
2300                Some(Ok(None | Some(_))) => {
2301                    tracing::debug!("sidequest: pending result: no cursors to evict");
2302                    if let Some(ref tx) = self.session.status_tx {
2303                        let _ = tx.send(String::new());
2304                    }
2305                }
2306                Some(Err(e)) => {
2307                    tracing::debug!("sidequest: background task panicked: {e}");
2308                    if let Some(ref tx) = self.session.status_tx {
2309                        let _ = tx.send(String::new());
2310                    }
2311                }
2312                None => {
2313                    // Task still running — re-store and wait another turn.
2314                    // We already took it; we'd need to re-spawn, but instead just drop and
2315                    // schedule fresh below to keep the cursor list current.
2316                    tracing::debug!(
2317                        "sidequest: background LLM task not yet complete, rescheduling"
2318                    );
2319                }
2320            }
2321        }
2322
2323        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2324        self.sidequest
2325            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2326
2327        if self.sidequest.tool_output_cursors.is_empty() {
2328            tracing::debug!("sidequest: no eligible cursors");
2329            return;
2330        }
2331
2332        let prompt = self.sidequest.build_eviction_prompt();
2333        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2334        let n_cursors = self.sidequest.tool_output_cursors.len();
2335        // Clone the provider so the spawn closure owns it without borrowing self.
2336        let provider = self.summary_or_primary_provider().clone();
2337
2338        // Spawn background task: the LLM call runs without blocking the agent loop.
2339        let handle = tokio::spawn(async move {
2340            let msgs = [Message {
2341                role: Role::User,
2342                content: prompt,
2343                parts: vec![],
2344                metadata: MessageMetadata::default(),
2345            }];
2346            let response =
2347                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2348                    .await
2349                {
2350                    Ok(Ok(r)) => r,
2351                    Ok(Err(e)) => {
2352                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2353                        return None;
2354                    }
2355                    Err(_) => {
2356                        tracing::debug!("sidequest bg: LLM call timed out");
2357                        return None;
2358                    }
2359                };
2360
2361            let start = response.find('{')?;
2362            let end = response.rfind('}')?;
2363            if start > end {
2364                return None;
2365            }
2366            let json_slice = &response[start..=end];
2367            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2368            let mut valid: Vec<usize> = parsed
2369                .del_cursors
2370                .into_iter()
2371                .filter(|&c| c < n_cursors)
2372                .collect();
2373            valid.sort_unstable();
2374            valid.dedup();
2375            #[allow(
2376                clippy::cast_precision_loss,
2377                clippy::cast_possible_truncation,
2378                clippy::cast_sign_loss
2379            )]
2380            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2381            valid.truncate(max_evict);
2382            Some(valid)
2383        });
2384
2385        self.compression.pending_sidequest_result = Some(handle);
2386        tracing::debug!("sidequest: background LLM eviction task spawned");
2387        if let Some(ref tx) = self.session.status_tx {
2388            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2389        }
2390    }
2391
2392    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2393    ///
2394    /// Called after each tool batch completes. The check is a single syscall and has
2395    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2396    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2397    pub(crate) async fn check_cwd_changed(&mut self) {
2398        let current = match std::env::current_dir() {
2399            Ok(p) => p,
2400            Err(e) => {
2401                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2402                return;
2403            }
2404        };
2405        if current == self.lifecycle.last_known_cwd {
2406            return;
2407        }
2408        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2409        self.session.env_context.working_dir = current.display().to_string();
2410
2411        tracing::info!(
2412            old = %old_cwd.display(),
2413            new = %current.display(),
2414            "working directory changed"
2415        );
2416
2417        let _ = self
2418            .channel
2419            .send_status("Working directory changed\u{2026}")
2420            .await;
2421
2422        let hooks = self.session.hooks_config.cwd_changed.clone();
2423        if !hooks.is_empty() {
2424            let mut env = std::collections::HashMap::new();
2425            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2426            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2427            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2428                tracing::warn!(error = %e, "CwdChanged hook failed");
2429            }
2430        }
2431
2432        let _ = self.channel.send_status("").await;
2433    }
2434
2435    /// Handle a `FileChangedEvent` from the file watcher.
2436    pub(crate) async fn handle_file_changed(
2437        &mut self,
2438        event: crate::file_watcher::FileChangedEvent,
2439    ) {
2440        tracing::info!(path = %event.path.display(), "file changed");
2441
2442        let _ = self
2443            .channel
2444            .send_status("Running file-change hook\u{2026}")
2445            .await;
2446
2447        let hooks = self.session.hooks_config.file_changed_hooks.clone();
2448        if !hooks.is_empty() {
2449            let mut env = std::collections::HashMap::new();
2450            env.insert(
2451                "ZEPH_CHANGED_PATH".to_owned(),
2452                event.path.display().to_string(),
2453            );
2454            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2455                tracing::warn!(error = %e, "FileChanged hook failed");
2456            }
2457        }
2458
2459        let _ = self.channel.send_status("").await;
2460    }
2461}
2462pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2463    while !*rx.borrow_and_update() {
2464        if rx.changed().await.is_err() {
2465            std::future::pending::<()>().await;
2466        }
2467    }
2468}
2469
2470pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2471    match rx {
2472        Some(inner) => {
2473            if let Some(v) = inner.recv().await {
2474                Some(v)
2475            } else {
2476                *rx = None;
2477                std::future::pending().await
2478            }
2479        }
2480        None => std::future::pending().await,
2481    }
2482}
2483
2484/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
2485///
2486/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
2487/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
2488/// context window; if still 0, fall back to 128 000 tokens.
2489pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2490    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2491        if let Some(ctx_size) = provider.context_window() {
2492            tracing::info!(
2493                model_context = ctx_size,
2494                "auto-configured context budget on reload"
2495            );
2496            ctx_size
2497        } else {
2498            0
2499        }
2500    } else {
2501        config.memory.context_budget_tokens
2502    };
2503    if tokens == 0 {
2504        tracing::warn!(
2505            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2506        );
2507        128_000
2508    } else {
2509        tokens
2510    }
2511}
2512
2513#[cfg(test)]
2514mod tests;
2515
2516#[cfg(test)]
2517pub(crate) use tests::agent_tests;