Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod heuristic_promotion;
28mod index;
29mod learning;
30pub(crate) mod learning_engine;
31mod log_commands;
32mod loop_event;
33mod lsp_commands;
34mod magic_docs;
35mod mcp;
36pub(crate) mod memcot;
37mod message_queue;
38mod microcompact;
39mod model_commands;
40mod persistence;
41#[cfg(feature = "scheduler")]
42mod plan;
43mod policy_commands;
44mod provider_cmd;
45mod quality_hook;
46pub(crate) mod rate_limiter;
47#[cfg(feature = "scheduler")]
48mod scheduler_commands;
49#[cfg(feature = "scheduler")]
50mod scheduler_loop;
51mod scope_commands;
52pub mod session_config;
53mod session_digest;
54pub mod shadow_sentinel;
55pub(crate) mod sidequest;
56mod skill_management;
57pub mod slash_commands;
58pub mod speculative;
59pub(crate) mod state;
60pub(crate) mod task_injection;
61pub(crate) mod tool_execution;
62pub(crate) mod tool_orchestrator;
63mod trace_extraction;
64pub mod trajectory;
65mod trajectory_commands;
66mod trust_commands;
67pub mod turn;
68mod utils;
69pub(crate) mod vigil;
70
71use std::collections::{HashMap, VecDeque};
72use std::fmt::Write as _;
73use std::sync::Arc;
74
75use parking_lot::RwLock;
76
77use tokio::sync::{mpsc, watch};
78use tokio_util::sync::CancellationToken;
79use zeph_llm::any::AnyProvider;
80use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
81use zeph_memory::TokenCounter;
82use zeph_memory::semantic::SemanticMemory;
83use zeph_skills::loader::Skill;
84use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
85use zeph_skills::prompt::format_skills_prompt;
86use zeph_skills::registry::SkillRegistry;
87use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
88
89use crate::channel::Channel;
90use crate::config::Config;
91use crate::context::{ContextBudget, build_system_prompt};
92use zeph_common::text::estimate_tokens;
93
94use loop_event::LoopEvent;
95use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
96use state::MessageState;
97
98pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
99// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
100// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
101// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
102pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
103pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
104pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
105
106pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
107    use std::fmt::Write;
108    let capacity = "[tool output: ".len()
109        + tool_name.len()
110        + "]\n```\n".len()
111        + body.len()
112        + TOOL_OUTPUT_SUFFIX.len();
113    let mut buf = String::with_capacity(capacity);
114    let _ = write!(
115        buf,
116        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
117    );
118    buf
119}
120
121/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
122/// tool orchestration, and multi-channel I/O.
123///
124/// The agent maintains conversation history, manages LLM provider state, coordinates tool
125/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
126/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
127///
128/// # Architecture
129///
130/// - **Message state**: Conversation history with system prompt, message queue, and metadata
131/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
132/// - **Skill state**: Registry, matching engine, and self-learning evolution
133/// - **Context manager**: Token budgeting, context assembly, and summarization
134/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
135/// - **MCP client**: Multi-server support for Model Context Protocol
136/// - **Index state**: AST-based code indexing and semantic retrieval
137/// - **Security**: Sanitization, exfiltration detection, adversarial probes
138/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
139///
140/// # Channel Contract
141///
142/// The agent requires a [`Channel`] implementation for user interaction:
143/// - Sends agent responses via `channel.send(message)`
144/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
145/// - Supports structured events: tool invocations, tool output, streaming updates
146///
147/// # Lifecycle
148///
149/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
150/// 2. Run main loop with [`Self::run`]
151/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
152///
153pub struct Agent<C: Channel> {
154    // --- I/O & primary providers (kept inline) ---
155    provider: AnyProvider,
156    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
157    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
158    /// Falls back to `provider.clone()` when no dedicated entry exists.
159    /// **Never replaced** by `/provider switch`.
160    embedding_provider: AnyProvider,
161    channel: C,
162    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
163
164    // --- Conversation core (kept inline) ---
165    pub(super) msg: MessageState,
166    pub(super) context_manager: context_manager::ContextManager,
167    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
168
169    // --- Aggregated background services ---
170    pub(super) services: state::Services,
171
172    // --- Aggregated runtime / lifecycle / telemetry ---
173    pub(super) runtime: state::AgentRuntime,
174}
175
176/// Control flow signal returned by [`Agent::apply_dispatch_result`].
177enum DispatchFlow {
178    /// The command requested exit; the agent loop should `break`.
179    Break,
180    /// The command was handled; the agent loop should `continue`.
181    Continue,
182    /// The command was not recognised; the agent loop should fall through.
183    Fallthrough,
184}
185
186impl<C: Channel> Agent<C> {
187    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
188    ///
189    /// # Arguments
190    ///
191    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
192    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
193    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
194    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
195    ///   skills are matched by exact name only
196    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
197    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
198    ///
199    /// # Initialization
200    ///
201    /// The constructor:
202    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
203    /// 2. Builds the system prompt from registered skills
204    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
205    /// 4. Returns a ready-to-run agent
206    ///
207    /// # Panics
208    ///
209    /// Panics if `max_active_skills` is 0.
210    #[must_use]
211    pub fn new(
212        provider: AnyProvider,
213        channel: C,
214        registry: SkillRegistry,
215        matcher: Option<SkillMatcherBackend>,
216        max_active_skills: usize,
217        tool_executor: impl ToolExecutor + 'static,
218    ) -> Self {
219        let registry = Arc::new(RwLock::new(registry));
220        let embedding_provider = provider.clone();
221        Self::new_with_registry_arc(
222            provider,
223            embedding_provider,
224            channel,
225            registry,
226            matcher,
227            max_active_skills,
228            tool_executor,
229        )
230    }
231
232    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
233    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
234    ///
235    /// # Panics
236    ///
237    /// Panics if the registry `RwLock` is poisoned.
238    #[must_use]
239    pub fn new_with_registry_arc(
240        provider: AnyProvider,
241        embedding_provider: AnyProvider,
242        channel: C,
243        registry: Arc<RwLock<SkillRegistry>>,
244        matcher: Option<SkillMatcherBackend>,
245        max_active_skills: usize,
246        tool_executor: impl ToolExecutor + 'static,
247    ) -> Self {
248        use state::{
249            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
250            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
251            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
252            SessionState, SkillState, ToolState,
253        };
254
255        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
256        let all_skills: Vec<Skill> = {
257            let reg = registry.read();
258            reg.all_meta()
259                .iter()
260                .filter_map(|m| reg.skill(&m.name).ok())
261                .collect()
262        };
263        let empty_trust = HashMap::new();
264        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
265        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
266        let system_prompt = build_system_prompt(&skills_prompt, None);
267        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
268        tracing::trace!(prompt = %system_prompt, "full system prompt");
269
270        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
271        let token_counter = Arc::new(TokenCounter::new());
272
273        let services = Services {
274            memory: MemoryState::default(),
275            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
276            learning_engine: learning_engine::LearningEngine::new(),
277            feedback: FeedbackState::default(),
278            mcp: McpState::default(),
279            index: IndexState::default(),
280            session: SessionState::new(),
281            security: SecurityState::default(),
282            experiments: ExperimentState::new(),
283            compression: CompressionState::default(),
284            orchestration: OrchestrationState::default(),
285            focus: focus::FocusState::default(),
286            sidequest: sidequest::SidequestState::default(),
287            tool_state: ToolState::default(),
288            goal_accounting: None,
289            quality: None,
290            proactive_explorer: None,
291            promotion_engine: None,
292            taco_compressor: None,
293            speculation_engine: None,
294            autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
295            autonomous_registry: crate::goal::AutonomousRegistry::new(),
296        };
297
298        let runtime = AgentRuntime {
299            config: RuntimeConfig::default(),
300            lifecycle: LifecycleState::new(),
301            providers: ProviderState::new(initial_prompt_tokens),
302            metrics: MetricsState::new(token_counter),
303            debug: DebugState::default(),
304            instructions: InstructionState::default(),
305        };
306
307        Self {
308            provider,
309            embedding_provider,
310            channel,
311            tool_executor: Arc::new(tool_executor),
312            msg: MessageState {
313                messages: vec![Message {
314                    role: Role::System,
315                    content: system_prompt,
316                    parts: vec![],
317                    metadata: MessageMetadata::default(),
318                }],
319                message_queue: VecDeque::new(),
320                pending_image_parts: Vec::new(),
321                last_persisted_message_id: None,
322                deferred_db_hide_ids: Vec::new(),
323                deferred_db_summaries: Vec::new(),
324            },
325            context_manager: context_manager::ContextManager::new(),
326            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
327            services,
328            runtime,
329        }
330    }
331
332    /// Consume the agent and return the inner channel.
333    ///
334    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
335    /// read captured responses from a headless channel such as `BenchmarkChannel`).
336    ///
337    /// # Examples
338    ///
339    /// ```no_run
340    /// # use zeph_core::agent::Agent;
341    /// // After agent.run().await completes, consume the agent to retrieve the channel.
342    /// // let channel: MyChannel = agent.into_channel();
343    /// ```
344    #[must_use]
345    pub fn into_channel(self) -> C {
346        self.channel
347    }
348
349    /// Poll all active sub-agents for completed/failed/canceled results.
350    ///
351    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
352    /// for agents that have finished. Each completed agent is removed from the manager.
353    #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
354    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
355        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
356            return vec![];
357        };
358
359        let finished: Vec<String> = mgr
360            .statuses()
361            .into_iter()
362            .filter_map(|(id, status)| {
363                if matches!(
364                    status.state,
365                    zeph_subagent::SubAgentState::Completed
366                        | zeph_subagent::SubAgentState::Failed
367                        | zeph_subagent::SubAgentState::Canceled
368                ) {
369                    Some(id)
370                } else {
371                    None
372                }
373            })
374            .collect();
375
376        let mut results = vec![];
377        for task_id in finished {
378            match mgr.collect(&task_id).await {
379                Ok(result) => results.push((task_id, result)),
380                Err(e) => {
381                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
382                }
383            }
384        }
385        results
386    }
387
388    /// Call the LLM to generate a structured session summary with a configurable timeout.
389    ///
390    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
391    /// any failure, logging a warning — callers must treat `None` as "skip storage".
392    ///
393    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
394    /// (structured call times out and plain-text fallback also times out) this adds up to
395    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
396    async fn call_llm_for_session_summary(
397        &self,
398        chat_messages: &[Message],
399    ) -> Option<zeph_memory::StructuredSummary> {
400        let provider = self.resolve_background_provider(
401            &self.services.memory.compaction.shutdown_summary_provider,
402        );
403        let timeout_dur = std::time::Duration::from_secs(
404            self.services
405                .memory
406                .compaction
407                .shutdown_summary_timeout_secs,
408        );
409        match tokio::time::timeout(
410            timeout_dur,
411            provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
412        )
413        .await
414        {
415            Ok(Ok(s)) => Some(s),
416            Ok(Err(e)) => {
417                tracing::warn!(
418                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
419                );
420                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
421                    .await
422            }
423            Err(_) => {
424                tracing::warn!(
425                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
426                    self.services
427                        .memory
428                        .compaction
429                        .shutdown_summary_timeout_secs
430                );
431                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
432                    .await
433            }
434        }
435    }
436
437    async fn plain_text_summary_fallback(
438        &self,
439        provider: &zeph_llm::any::AnyProvider,
440        chat_messages: &[Message],
441        timeout_dur: std::time::Duration,
442    ) -> Option<zeph_memory::StructuredSummary> {
443        match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
444            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
445                summary: plain,
446                key_facts: vec![],
447                entities: vec![],
448            }),
449            Ok(Err(e)) => {
450                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
451                None
452            }
453            Err(_) => {
454                tracing::warn!("shutdown summary: plain LLM fallback timed out");
455                None
456            }
457        }
458    }
459
460    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
461    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
462    /// closed while tool execution was in progress). Without this the next session startup strips
463    /// those assistant messages and emits orphan warnings.
464    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
465        use zeph_llm::provider::{MessagePart, Role};
466
467        // Walk messages in reverse: if the last assistant message (ignoring any trailing
468        // system messages) has ToolUse parts and is NOT immediately followed by a user
469        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
470        let msgs = &self.msg.messages;
471        // Find last assistant message index.
472        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
473            return;
474        };
475        let asst_msg = &msgs[asst_idx];
476        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
477            .parts
478            .iter()
479            .filter_map(|p| {
480                if let MessagePart::ToolUse { id, name, input } = p {
481                    Some((id.as_str(), name.as_str(), input))
482                } else {
483                    None
484                }
485            })
486            .collect();
487        if tool_use_ids.is_empty() {
488            return;
489        }
490
491        // Check whether a following user message already pairs all ToolUse ids.
492        let paired_ids: std::collections::HashSet<&str> = msgs
493            .get(asst_idx + 1..)
494            .into_iter()
495            .flatten()
496            .filter(|m| m.role == Role::User)
497            .flat_map(|m| m.parts.iter())
498            .filter_map(|p| {
499                if let MessagePart::ToolResult { tool_use_id, .. } = p {
500                    Some(tool_use_id.as_str())
501                } else {
502                    None
503                }
504            })
505            .collect();
506
507        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
508            .iter()
509            .filter(|(id, _, _)| !paired_ids.contains(*id))
510            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
511                id: (*id).to_owned(),
512                name: (*name).to_owned().into(),
513                input: (*input).clone(),
514            })
515            .collect();
516
517        if unpaired.is_empty() {
518            return;
519        }
520
521        tracing::info!(
522            count = unpaired.len(),
523            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
524        );
525        self.persist_cancelled_tool_results(&unpaired).await;
526    }
527
528    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
529    ///
530    /// Guards:
531    /// - `shutdown_summary` config must be enabled
532    /// - `conversation_id` must be set (memory must be attached)
533    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
534    /// - at least `shutdown_summary_min_messages` user-turn messages in history
535    ///
536    /// All errors are logged as warnings and swallowed — shutdown must never fail.
537    async fn maybe_store_shutdown_summary(&mut self) {
538        if !self.services.memory.compaction.shutdown_summary {
539            return;
540        }
541        let Some(memory) = self.services.memory.persistence.memory.clone() else {
542            return;
543        };
544        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
545            return;
546        };
547
548        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
549        match memory.has_session_summary(conversation_id).await {
550            Ok(true) => {
551                tracing::debug!("shutdown summary: session already has a summary, skipping");
552                return;
553            }
554            Ok(false) => {}
555            Err(e) => {
556                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
557                return;
558            }
559        }
560
561        // Count user-turn messages only (skip system prompt at index 0).
562        let user_count = self
563            .msg
564            .messages
565            .iter()
566            .skip(1)
567            .filter(|m| m.role == Role::User)
568            .count();
569        if user_count
570            < self
571                .services
572                .memory
573                .compaction
574                .shutdown_summary_min_messages
575        {
576            tracing::debug!(
577                user_count,
578                min = self
579                    .services
580                    .memory
581                    .compaction
582                    .shutdown_summary_min_messages,
583                "shutdown summary: too few user messages, skipping"
584            );
585            return;
586        }
587
588        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
589        let _ = self.channel.send_status("Saving session summary...").await;
590
591        // Collect last N messages (skip system prompt at index 0).
592        let max = self
593            .services
594            .memory
595            .compaction
596            .shutdown_summary_max_messages;
597        if max == 0 {
598            tracing::debug!("shutdown summary: max_messages=0, skipping");
599            return;
600        }
601        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
602        let slice = if non_system.len() > max {
603            &non_system[non_system.len() - max..]
604        } else {
605            &non_system[..]
606        };
607
608        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
609            .iter()
610            .map(|m| {
611                let role = match m.role {
612                    Role::User => "user".to_owned(),
613                    Role::Assistant => "assistant".to_owned(),
614                    Role::System => "system".to_owned(),
615                };
616                (zeph_memory::MessageId(0), role, m.content.clone())
617            })
618            .collect();
619
620        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
621        let chat_messages = vec![Message {
622            role: Role::User,
623            content: prompt,
624            parts: vec![],
625            metadata: MessageMetadata::default(),
626        }];
627
628        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
629            let _ = self.channel.send_status("").await;
630            return;
631        };
632
633        if let Err(e) = memory
634            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
635            .await
636        {
637            tracing::warn!("shutdown summary: storage failed: {e:#}");
638        } else {
639            tracing::info!(
640                conversation_id = conversation_id.0,
641                "shutdown summary stored"
642            );
643        }
644
645        // Clear TUI status.
646        let _ = self.channel.send_status("").await;
647    }
648
649    /// Gracefully shut down the agent and persist state.
650    ///
651    /// Performs the following cleanup:
652    ///
653    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
654    ///    are flushed to memory or disk
655    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
656    ///    to the vault
657    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
658    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
659    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
660    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
661    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
662    ///
663    /// Call this before dropping the agent to ensure no data loss.
664    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
665    pub async fn shutdown(&mut self) {
666        let _ = self.channel.send_status("Shutting down...").await;
667
668        // CRIT-1: persist Thompson state accumulated during this session.
669        self.provider.save_router_state().await;
670
671        // Persist AdaptOrch Beta-arm table alongside Thompson state.
672        if let Some(ref advisor) = self.services.orchestration.topology_advisor
673            && let Err(e) = advisor.save()
674        {
675            tracing::warn!(error = %e, "adaptorch: failed to persist state");
676        }
677
678        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
679            mgr.shutdown_all();
680        }
681
682        if let Some(ref manager) = self.services.mcp.manager {
683            manager.shutdown_all_shared().await;
684        }
685
686        // Finalize compaction trajectory: push the last open segment into the Vec.
687        // This segment would otherwise only be pushed when the next hard compaction fires,
688        // which never happens at session end.
689        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
690            self.update_metrics(|m| {
691                m.compaction_turns_after_hard.push(turns);
692            });
693            self.context_manager
694                .set_turns_since_last_hard_compaction(None);
695        }
696
697        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
698            let m = tx.borrow();
699            if m.filter_applications > 0 {
700                #[allow(clippy::cast_precision_loss)]
701                let pct = if m.filter_raw_tokens > 0 {
702                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
703                } else {
704                    0.0
705                };
706                tracing::info!(
707                    raw_tokens = m.filter_raw_tokens,
708                    saved_tokens = m.filter_saved_tokens,
709                    applications = m.filter_applications,
710                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
711                    m.filter_saved_tokens,
712                );
713            }
714            if m.compaction_hard_count > 0 {
715                tracing::info!(
716                    hard_compactions = m.compaction_hard_count,
717                    turns_after_hard = ?m.compaction_turns_after_hard,
718                    "hard compaction trajectory"
719                );
720            }
721        }
722
723        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
724        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
725        // startup strips the orphaned ToolUse and emits warnings.
726        self.flush_orphaned_tool_use_on_shutdown().await;
727
728        // Signal the experiment CancellationToken first so the task can clean up gracefully,
729        // then abort the handle to guarantee it does not outlive the agent regardless.
730        if let Some(ref token) = self.services.experiments.cancel {
731            token.cancel();
732        }
733        if let Some(h) = self.services.experiments.handle.take() {
734            h.abort();
735        }
736
737        // Signal cooperative cancellation to the graph-extraction background task before the
738        // hard abort below. This lets the task exit at a clean checkpoint (e.g. after the
739        // community-refresh select arm fires) rather than being cut mid-write.
740        if let Some(memory) = self.services.memory.persistence.memory.as_ref() {
741            memory.cancel_graph_extraction();
742        }
743
744        // Forcibly abort in-flight Enrichment and Telemetry tasks tracked by the supervisor.
745        self.runtime.lifecycle.supervisor.abort_all();
746
747        // Abort background task handles not tracked by BackgroundSupervisor.
748        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
749        if let Some(h) = self.services.compression.pending_task_goal.take() {
750            h.abort();
751        }
752        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
753            h.abort();
754        }
755        if let Some(h) = self.services.compression.pending_subgoal.take() {
756            h.abort();
757        }
758
759        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
760        self.services.learning_engine.learning_tasks.abort_all();
761
762        // Await the AutoSkill trace extraction task so it is not silently dropped.
763        // Bounded to avoid hanging shutdown when the LLM call inside the task stalls.
764        if let Some(h) = self.services.learning_engine.trace_extraction_handle.take() {
765            let deadline = std::time::Duration::from_mins(2);
766            match tokio::time::timeout(deadline, h).await {
767                Ok(Ok(())) => {}
768                Ok(Err(e)) => tracing::warn!("trace_extraction: task panicked at shutdown: {e}"),
769                Err(_) => tracing::warn!(
770                    "trace_extraction: timed out at shutdown ({}s), aborting",
771                    deadline.as_secs()
772                ),
773            }
774        }
775
776        // Abort the heuristic promotion loop (periodic task; abort is safe because
777        // promotion_already_evaluated ensures idempotent retry on next startup).
778        if let Some(h) = self
779            .services
780            .learning_engine
781            .heuristic_promotion_handle
782            .take()
783        {
784            h.abort();
785        }
786
787        // Drain pending shadow sentinel DB writes before final teardown.
788        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
789            sentinel.drain_pending().await;
790        }
791
792        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
793        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
794        // observe cancellation at their next .await point. Without yielding here the summary
795        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
796        for _ in 0..4 {
797            tokio::task::yield_now().await;
798        }
799
800        self.maybe_store_shutdown_summary().await;
801        self.maybe_store_session_digest().await;
802
803        tracing::info!("agent shutdown complete");
804    }
805
806    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
807    ///
808    /// # Errors
809    ///
810    /// Returns an error if channel I/O or LLM communication fails.
811    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
812    fn refresh_subagent_metrics(&mut self) {
813        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
814            return;
815        };
816        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
817            .statuses()
818            .into_iter()
819            .map(|(id, s)| {
820                let def = mgr.agents_def(&id);
821                crate::metrics::SubAgentMetrics {
822                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
823                    id: id.clone(),
824                    state: format!("{:?}", s.state).to_lowercase(),
825                    turns_used: s.turns_used,
826                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
827                    background: def.is_some_and(|d| d.permissions.background),
828                    elapsed_secs: s.started_at.elapsed().as_secs(),
829                    permission_mode: def.map_or_else(String::new, |d| {
830                        use zeph_subagent::def::PermissionMode;
831                        match d.permissions.permission_mode {
832                            PermissionMode::AcceptEdits => "accept_edits".into(),
833                            PermissionMode::DontAsk => "dont_ask".into(),
834                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
835                            PermissionMode::Plan => "plan".into(),
836                            _ => String::new(),
837                        }
838                    }),
839                    transcript_dir: mgr
840                        .agent_transcript_dir(&id)
841                        .map(|p| p.to_string_lossy().into_owned()),
842                }
843            })
844            .collect();
845        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
846    }
847
848    /// Non-blocking poll: notify the user when background sub-agents complete.
849    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
850        let completed = self.poll_subagents().await;
851        for (task_id, result) in completed {
852            let notice = if result.is_empty() {
853                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
854            } else {
855                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
856            };
857            if let Err(e) = self.channel.send(&notice).await {
858                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
859            }
860        }
861        Ok(())
862    }
863
864    /// Run the agent main loop.
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
869    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
870    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
871    pub async fn run(&mut self) -> Result<(), error::AgentError>
872    where
873        C: 'static,
874    {
875        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
876            && !*rx.borrow()
877        {
878            let _ = rx.changed().await;
879            if !*rx.borrow() {
880                tracing::warn!("model warmup did not complete successfully");
881            }
882        }
883
884        // Restore the last-used provider preference before any user interaction (#3308).
885        self.restore_channel_provider().await;
886
887        // Load the session digest once at session start for context injection.
888        self.load_and_cache_session_digest().await;
889        self.maybe_send_resume_recap().await;
890
891        // AutoSkill A6: start periodic heuristic promotion task at session startup so it runs
892        // even when the main loop exits early due to an error (spec 061). The function guards
893        // against double-spawn via a heuristic_promotion_handle.is_some() check.
894        self.maybe_start_heuristic_promotion();
895
896        loop {
897            self.apply_provider_override();
898            self.check_tool_refresh().await;
899            self.process_pending_elicitations().await;
900            self.refresh_subagent_metrics();
901            self.notify_completed_subagents().await?;
902            self.drain_channel();
903
904            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
905                self.notify_queue_count().await;
906                if queued.raw_attachments.is_empty() {
907                    (queued.text, queued.image_parts)
908                } else {
909                    let msg = crate::channel::ChannelMessage {
910                        text: queued.text,
911                        attachments: queued.raw_attachments,
912                        is_guest_context: false,
913                        is_from_bot: false,
914                    };
915                    self.resolve_message(msg).await
916                }
917            } else {
918                match self.next_event().await? {
919                    None | Some(LoopEvent::Shutdown) => break,
920                    Some(LoopEvent::SkillReload) => {
921                        self.reload_skills().await;
922                        continue;
923                    }
924                    Some(LoopEvent::InstructionReload) => {
925                        self.reload_instructions().await;
926                        continue;
927                    }
928                    Some(LoopEvent::ConfigReload) => {
929                        self.reload_config();
930                        continue;
931                    }
932                    Some(LoopEvent::UpdateNotification(msg)) => {
933                        if let Err(e) = self.channel.send(&msg).await {
934                            tracing::warn!("failed to send update notification: {e}");
935                        }
936                        continue;
937                    }
938                    Some(LoopEvent::ExperimentCompleted(msg)) => {
939                        self.services.experiments.cancel = None;
940                        self.services.experiments.handle = None;
941                        if let Err(e) = self.channel.send(&msg).await {
942                            tracing::warn!("failed to send experiment completion: {e}");
943                        }
944                        continue;
945                    }
946                    Some(LoopEvent::ScheduledTask(prompt)) => {
947                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
948                        let msg = crate::channel::ChannelMessage {
949                            text,
950                            attachments: Vec::new(),
951                            is_guest_context: false,
952                            is_from_bot: false,
953                        };
954                        self.drain_channel();
955                        self.resolve_message(msg).await
956                    }
957                    Some(LoopEvent::TaskInjected(injection)) => {
958                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
959                            ls.iteration += 1;
960                            tracing::info!(iteration = ls.iteration, "loop: tick");
961                        }
962                        let msg = crate::channel::ChannelMessage {
963                            text: injection.prompt,
964                            attachments: Vec::new(),
965                            is_guest_context: false,
966                            is_from_bot: false,
967                        };
968                        self.drain_channel();
969                        self.resolve_message(msg).await
970                    }
971                    Some(LoopEvent::FileChanged(event)) => {
972                        self.handle_file_changed(event).await;
973                        continue;
974                    }
975                    Some(LoopEvent::AutonomousTick) => {
976                        if let Err(e) = self.run_autonomous_turn().await {
977                            tracing::warn!(error = %e, "autonomous turn error");
978                        }
979                        continue;
980                    }
981                    Some(LoopEvent::Message(msg)) => {
982                        self.services.session.is_guest_context = msg.is_guest_context;
983                        self.drain_channel();
984                        self.resolve_message(msg).await
985                    }
986                }
987            };
988
989            let trimmed = text.trim();
990
991            // M3: extract flagged URLs from all slash commands before any registry dispatch,
992            // so `/skill install <url>` and similar commands populate user_provided_urls.
993            if trimmed.starts_with('/') {
994                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
995                if !slash_urls.is_empty() {
996                    self.services
997                        .security
998                        .user_provided_urls
999                        .write()
1000                        .extend(slash_urls);
1001                }
1002            }
1003
1004            // Registry dispatch: two-phase command dispatch.
1005            //
1006            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
1007            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
1008            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
1009            //
1010            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
1011            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
1012            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
1013            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
1014            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
1015            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
1016            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
1017            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
1018            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
1019            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
1020            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
1021            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
1022            //
1023            // Drop-order rules enforced here:
1024            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
1025            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
1026            let session_impl = command_context_impls::SessionAccessImpl {
1027                supports_exit: self.channel.supports_exit(),
1028            };
1029            let mut messages_impl = command_context_impls::MessageAccessImpl {
1030                msg: &mut self.msg,
1031                tool_state: &mut self.services.tool_state,
1032                providers: &mut self.runtime.providers,
1033                metrics: &self.runtime.metrics,
1034                security: &mut self.services.security,
1035                tool_orchestrator: &mut self.tool_orchestrator,
1036            };
1037            // sink_adapter declared before reg so it is dropped after reg (LIFO).
1038            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
1039            // null_agent must be declared before reg so it lives longer (LIFO drop order).
1040            let mut null_agent = zeph_commands::NullAgent;
1041            let registry_handled = {
1042                use zeph_commands::CommandRegistry;
1043                use zeph_commands::handlers::debug::{
1044                    DebugDumpCommand, DumpFormatCommand, LogCommand,
1045                };
1046                use zeph_commands::handlers::help::HelpCommand;
1047                use zeph_commands::handlers::session::{
1048                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
1049                };
1050
1051                let mut reg = CommandRegistry::new();
1052                reg.register(ExitCommand);
1053                reg.register(QuitCommand);
1054                reg.register(ClearCommand);
1055                reg.register(ResetCommand);
1056                reg.register(ClearQueueCommand);
1057                reg.register(LogCommand);
1058                reg.register(DebugDumpCommand);
1059                reg.register(DumpFormatCommand);
1060                reg.register(HelpCommand);
1061                #[cfg(test)]
1062                reg.register(test_stubs::TestErrorCommand);
1063
1064                let mut ctx = zeph_commands::CommandContext {
1065                    sink: &mut sink_adapter,
1066                    debug: &mut self.runtime.debug,
1067                    messages: &mut messages_impl,
1068                    session: &session_impl,
1069                    agent: &mut null_agent,
1070                };
1071                reg.dispatch(&mut ctx, trimmed).await
1072            };
1073            let session_reg_missed = registry_handled.is_none();
1074            match self
1075                .apply_dispatch_result(registry_handled, trimmed, false)
1076                .await
1077            {
1078                DispatchFlow::Break => break,
1079                DispatchFlow::Continue => continue,
1080                DispatchFlow::Fallthrough => {
1081                    // Not handled by the session/debug registry; try agent-command registry.
1082                }
1083            }
1084
1085            // Agent-command registry: handlers access Agent<C> directly.
1086            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
1087            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
1088            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
1089            // sentinels here will outlive ctx (ctx is declared later, inside the block).
1090            let mut agent_null_debug = command_context_impls::NullDebugAccess;
1091            let mut agent_null_messages = command_context_impls::NullMessageAccess;
1092            let agent_null_session = command_context_impls::NullSessionAccess;
1093            let mut agent_null_sink = zeph_commands::NullSink;
1094            let agent_result: Option<
1095                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
1096            > = if session_reg_missed {
1097                use zeph_commands::CommandRegistry;
1098                use zeph_commands::handlers::{
1099                    acp::AcpCommand,
1100                    agent_cmd::AgentCommand,
1101                    agents_fleet::AgentsFleetCommand,
1102                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
1103                    experiment::ExperimentCommand,
1104                    goal::GoalCommand,
1105                    loop_cmd::LoopCommand,
1106                    lsp::LspCommand,
1107                    mcp::McpCommand,
1108                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
1109                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
1110                    model::{ModelCommand, ProviderCommand},
1111                    plan::PlanCommand,
1112                    plugins::PluginsCommand,
1113                    policy::PolicyCommand,
1114                    scheduler::SchedulerCommand,
1115                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
1116                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
1117                    trajectory::{ScopeCommand, TrajectoryCommand},
1118                };
1119
1120                let mut agent_reg = CommandRegistry::new();
1121                agent_reg.register(MemoryCommand);
1122                agent_reg.register(GraphCommand);
1123                agent_reg.register(GuidelinesCommand);
1124                agent_reg.register(ModelCommand);
1125                agent_reg.register(ProviderCommand);
1126                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
1127                agent_reg.register(SkillCommand);
1128                agent_reg.register(SkillsCommand);
1129                agent_reg.register(FeedbackCommand);
1130                agent_reg.register(McpCommand);
1131                agent_reg.register(PolicyCommand);
1132                agent_reg.register(SchedulerCommand);
1133                agent_reg.register(LspCommand);
1134                // Phase 4 migrations (Send-safe commands):
1135                agent_reg.register(CacheStatsCommand);
1136                agent_reg.register(ImageCommand);
1137                agent_reg.register(NotifyTestCommand);
1138                agent_reg.register(StatusCommand);
1139                agent_reg.register(GuardrailCommand);
1140                agent_reg.register(FocusCommand);
1141                agent_reg.register(SideQuestCommand);
1142                agent_reg.register(AgentCommand);
1143                agent_reg.register(AgentsFleetCommand);
1144                // Phase 5 migrations (Send-compatible):
1145                agent_reg.register(CompactCommand);
1146                agent_reg.register(NewConversationCommand);
1147                agent_reg.register(RecapCommand);
1148                agent_reg.register(ExperimentCommand);
1149                agent_reg.register(PlanCommand);
1150                agent_reg.register(LoopCommand);
1151                agent_reg.register(PluginsCommand);
1152                agent_reg.register(AcpCommand);
1153                #[cfg(feature = "cocoon")]
1154                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
1155                agent_reg.register(TrajectoryCommand);
1156                agent_reg.register(ScopeCommand);
1157                agent_reg.register(GoalCommand);
1158
1159                let mut ctx = zeph_commands::CommandContext {
1160                    sink: &mut agent_null_sink,
1161                    debug: &mut agent_null_debug,
1162                    messages: &mut agent_null_messages,
1163                    session: &agent_null_session,
1164                    agent: self,
1165                };
1166                // self is reborrowed; ctx drops at end of this block.
1167                agent_reg.dispatch(&mut ctx, trimmed).await
1168            } else {
1169                None
1170            };
1171            // self.channel is available again here (ctx borrow dropped above).
1172
1173            // S1 fix: drain any pending autonomous session start queued by handle_goal.
1174            // handle_goal runs inside Box::pin(async move) and cannot borrow &mut self directly,
1175            // so it writes to pending_start_arc. We consume it here where &mut self is free.
1176            if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
1177                if let Some(cid) = cancelled_id {
1178                    tracing::info!(
1179                        goal_id = cid,
1180                        "autonomous: previous session cancelled for new goal"
1181                    );
1182                }
1183                self.sync_registry_entry();
1184                tracing::info!(goal_id = new_id, "autonomous: session started");
1185            }
1186
1187            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
1188            // inside apply_dispatch_result when with_learning = true.
1189            match self
1190                .apply_dispatch_result(agent_result, trimmed, true)
1191                .await
1192            {
1193                DispatchFlow::Break => break,
1194                DispatchFlow::Continue => continue,
1195                DispatchFlow::Fallthrough => {
1196                    // Not handled by agent registry; fall through to existing dispatch.
1197                }
1198            }
1199
1200            match self.handle_builtin_command(trimmed) {
1201                Some(true) => break,
1202                Some(false) => continue,
1203                None => {}
1204            }
1205
1206            self.process_user_message(text, image_parts).await?;
1207        }
1208
1209        // autoDream: run background memory consolidation if conditions are met (#2697).
1210        // Runs with a timeout — partial state is acceptable for MVP.
1211        self.maybe_autodream().await;
1212
1213        // AutoSkill A1: extract skill candidates from the completed session trace (spec 056).
1214        self.maybe_extract_skills_from_trace().await;
1215
1216        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1217        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
1218            tc.finish();
1219        }
1220
1221        Ok(())
1222    }
1223
1224    /// Dispatch a slash-command registry result and flush the channel.
1225    ///
1226    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1227    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1228    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1229    async fn apply_dispatch_result(
1230        &mut self,
1231        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1232        command: &str,
1233        with_learning: bool,
1234    ) -> DispatchFlow {
1235        match result {
1236            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1237                let _ = self.channel.flush_chunks().await;
1238                DispatchFlow::Break
1239            }
1240            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1241                let _ = self.channel.send(&msg).await;
1242                let _ = self.channel.flush_chunks().await;
1243                if with_learning {
1244                    self.maybe_trigger_post_command_learning(command).await;
1245                }
1246                DispatchFlow::Continue
1247            }
1248            Some(Ok(_)) => {
1249                let _ = self.channel.flush_chunks().await;
1250                DispatchFlow::Continue
1251            }
1252            Some(Err(e)) => {
1253                let _ = self.channel.send(&e.to_string()).await;
1254                let _ = self.channel.flush_chunks().await;
1255                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1256                DispatchFlow::Continue
1257            }
1258            None => DispatchFlow::Fallthrough,
1259        }
1260    }
1261
1262    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1263    fn apply_provider_override(&mut self) {
1264        if let Some(ref slot) = self.runtime.providers.provider_override
1265            && let Some(new_provider) = slot.write().take()
1266        {
1267            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1268            self.provider = new_provider;
1269        }
1270    }
1271
1272    /// Poll all event sources and return the next [`LoopEvent`].
1273    ///
1274    /// Returns `None` when the inbound channel closes (graceful shutdown).
1275    ///
1276    /// # Errors
1277    ///
1278    /// Propagates channel receive errors.
1279    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
1280    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1281        let event = tokio::select! {
1282            result = self.channel.recv() => {
1283                return Ok(result?.map(LoopEvent::Message));
1284            }
1285            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1286                tracing::info!("shutting down");
1287                LoopEvent::Shutdown
1288            }
1289            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1290                LoopEvent::SkillReload
1291            }
1292            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1293                LoopEvent::InstructionReload
1294            }
1295            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1296                LoopEvent::ConfigReload
1297            }
1298            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1299                LoopEvent::UpdateNotification(msg)
1300            }
1301            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1302                LoopEvent::ExperimentCompleted(msg)
1303            }
1304            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1305                tracing::info!("scheduler: injecting custom task as agent turn");
1306                LoopEvent::ScheduledTask(prompt)
1307            }
1308            () = async {
1309                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1310                    if ls.cancel_tx.is_cancelled() {
1311                        std::future::pending::<()>().await;
1312                    } else {
1313                        ls.interval.tick().await;
1314                    }
1315                } else {
1316                    std::future::pending::<()>().await;
1317                }
1318            } => {
1319                // Re-check user_loop after the tick — /loop stop may have fired between the
1320                // interval firing and this arm executing. Returning Ok(None) causes the caller
1321                // to `continue` without injecting a stale or empty prompt.
1322                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1323                    return Ok(None);
1324                };
1325                if ls.cancel_tx.is_cancelled() {
1326                    self.runtime.lifecycle.user_loop = None;
1327                    return Ok(None);
1328                }
1329                let prompt = ls.prompt.clone();
1330                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1331            }
1332            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1333                LoopEvent::FileChanged(event)
1334            }
1335            // Autonomous goal tick: fires when a running session is active.
1336            () = self.services.autonomous.next_tick(),
1337                if self.services.autonomous.should_tick() => {
1338                LoopEvent::AutonomousTick
1339            }
1340        };
1341        Ok(Some(event))
1342    }
1343
1344    #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1345    async fn resolve_message(
1346        &self,
1347        msg: crate::channel::ChannelMessage,
1348    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1349        use crate::channel::{Attachment, AttachmentKind};
1350        use zeph_llm::provider::{ImageData, MessagePart};
1351
1352        let text_base = msg.text.clone();
1353
1354        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1355            .attachments
1356            .into_iter()
1357            .partition(|a| a.kind == AttachmentKind::Audio);
1358
1359        tracing::debug!(
1360            audio = audio_attachments.len(),
1361            has_stt = self.runtime.providers.stt.is_some(),
1362            "resolve_message attachments"
1363        );
1364
1365        let text = if !audio_attachments.is_empty()
1366            && let Some(stt) = self.runtime.providers.stt.as_ref()
1367        {
1368            let mut transcribed_parts = Vec::new();
1369            for attachment in &audio_attachments {
1370                if attachment.data.len() > MAX_AUDIO_BYTES {
1371                    tracing::warn!(
1372                        size = attachment.data.len(),
1373                        max = MAX_AUDIO_BYTES,
1374                        "audio attachment exceeds size limit, skipping"
1375                    );
1376                    continue;
1377                }
1378                match stt
1379                    .transcribe(&attachment.data, attachment.filename.as_deref())
1380                    .await
1381                {
1382                    Ok(result) => {
1383                        tracing::info!(
1384                            len = result.text.len(),
1385                            language = ?result.language,
1386                            "audio transcribed"
1387                        );
1388                        transcribed_parts.push(result.text);
1389                    }
1390                    Err(e) => {
1391                        tracing::error!(error = %e, "audio transcription failed");
1392                    }
1393                }
1394            }
1395            if transcribed_parts.is_empty() {
1396                text_base
1397            } else {
1398                let transcribed = transcribed_parts.join("\n");
1399                if text_base.is_empty() {
1400                    transcribed
1401                } else {
1402                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1403                }
1404            }
1405        } else {
1406            if !audio_attachments.is_empty() {
1407                tracing::warn!(
1408                    count = audio_attachments.len(),
1409                    "audio attachments received but no STT provider configured, dropping"
1410                );
1411            }
1412            text_base
1413        };
1414
1415        let mut image_parts = Vec::new();
1416        for attachment in image_attachments {
1417            if attachment.data.len() > MAX_IMAGE_BYTES {
1418                tracing::warn!(
1419                    size = attachment.data.len(),
1420                    max = MAX_IMAGE_BYTES,
1421                    "image attachment exceeds size limit, skipping"
1422                );
1423                continue;
1424            }
1425            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1426            image_parts.push(MessagePart::Image(Box::new(ImageData {
1427                data: attachment.data,
1428                mime_type,
1429            })));
1430        }
1431
1432        (text, image_parts)
1433    }
1434
1435    /// Create a new [`Turn`] for the given input and advance the turn counter.
1436    ///
1437    /// Clears per-turn state that must not carry over between turns:
1438    /// - per-turn `CancellationToken` (new token for each turn)
1439    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1440    ///   `process_user_message_inner` after security checks)
1441    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1442        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1443        self.runtime.debug.iteration_counter += 1;
1444        let cancel_token = CancellationToken::new();
1445        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1446        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1447        self.services.security.user_provided_urls.write().clear();
1448        // Reset per-turn LLM request counter for the notification gate.
1449        self.runtime.lifecycle.turn_llm_requests = 0;
1450
1451        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
1452        // Also advance MAGE accumulator (spec 004-16 FR-009) and ingest mapped signals.
1453        {
1454            use zeph_memory::shadow::{AuditSignalType as MageSignal, Severity as MageSev};
1455            let pending: Vec<u8> = {
1456                let mut q = self.services.security.trajectory_signal_queue.lock();
1457                std::mem::take(&mut *q)
1458            };
1459            self.services.security.mage_accumulator.advance_turn();
1460            for code in pending {
1461                self.services
1462                    .security
1463                    .trajectory
1464                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
1465                // Map signal codes to MAGE AuditSignalType + Severity (spec 004-16 FR-002, FR-007).
1466                // Code 1=PolicyDeny, 6=VigilMedium, 7=VigilHigh, 2=ExfiltrationRedaction.
1467                let mage_signal: Option<(MageSignal, MageSev)> = match code {
1468                    1 => Some((MageSignal::PolicyViolation, MageSev::Medium)),
1469                    2 => Some((MageSignal::ToolChainAnomaly, MageSev::Medium)),
1470                    6 => Some((MageSignal::PromptInjectionPattern, MageSev::Medium)),
1471                    7 => Some((MageSignal::PromptInjectionPattern, MageSev::High)),
1472                    _ => None,
1473                };
1474                if let Some((sig, sev)) = mage_signal {
1475                    self.services.security.mage_accumulator.ingest(sig, sev);
1476                }
1477            }
1478        }
1479        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
1480        // F5: write auto-recover audit entry when sentinel hard-resets.
1481        if self.services.security.trajectory.advance_turn()
1482            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1483        {
1484            let entry = zeph_tools::AuditEntry {
1485                timestamp: zeph_tools::chrono_now(),
1486                tool: "<sentinel>".to_owned().into(),
1487                command: String::new(),
1488                result: zeph_tools::AuditResult::Success,
1489                duration_ms: 0,
1490                error_category: Some("trajectory_auto_recover".to_owned()),
1491                error_domain: Some("security".to_owned()),
1492                error_phase: None,
1493                claim_source: None,
1494                mcp_server_id: None,
1495                injection_flagged: false,
1496                embedding_anomalous: false,
1497                cross_boundary_mcp_to_acp: false,
1498                adversarial_policy_decision: None,
1499                exit_code: None,
1500                truncated: false,
1501                caller_id: None,
1502                policy_match: None,
1503                correlation_id: None,
1504                vigil_risk: None,
1505                execution_env: None,
1506                resolved_cwd: None,
1507                scope_at_definition: None,
1508                scope_at_dispatch: None,
1509            };
1510            self.runtime.lifecycle.supervisor.spawn(
1511                crate::agent::agent_supervisor::TaskClass::Telemetry,
1512                "trajectory-auto-recover-audit",
1513                async move { logger.log(&entry).await },
1514            );
1515        }
1516        // Spec 050 Phase 2: reset per-turn probe counter before any tool dispatch.
1517        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1518            sentinel.advance_turn();
1519        }
1520        // Reset per-turn risk chain state so scores don't bleed across turns.
1521        if let Some(ref acc) = self.services.security.risk_chain_accumulator {
1522            acc.reset();
1523        }
1524        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
1525        let risk_level = self.services.security.trajectory.current_risk();
1526        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1527        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
1528        if let Some(alert) = self.services.security.trajectory.poll_alert() {
1529            let msg = format!(
1530                "[trajectory] Risk level: {:?} (score={:.2})",
1531                alert.level, alert.score
1532            );
1533            tracing::warn!(
1534                level = ?alert.level,
1535                score = alert.score,
1536                "trajectory sentinel alert"
1537            );
1538            if let Some(ref tx) = self.services.session.status_tx {
1539                let _ = tx.send(msg);
1540            }
1541        }
1542
1543        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
1544            .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
1545        turn::Turn::new(context, input)
1546    }
1547
1548    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1549    ///
1550    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1551    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1552    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1553    /// is populated from it here so the rest of the pipeline is unchanged.
1554    fn end_turn(&mut self, turn: turn::Turn) {
1555        self.runtime.metrics.pending_timings = turn.metrics.timings;
1556        self.flush_turn_timings();
1557        // Clear per-turn intent (FR-008): must not persist across turns.
1558        self.services.session.current_turn_intent = None;
1559        // Clear guest context flag: each turn is independently classified.
1560        self.services.session.is_guest_context = false;
1561        // Cancel all in-flight speculative handles at turn boundary.
1562        if let Some(ref engine) = self.services.speculation_engine {
1563            let metrics = engine.end_turn();
1564            if metrics.committed > 0 || metrics.cancelled > 0 {
1565                tracing::debug!(
1566                    committed = metrics.committed,
1567                    cancelled = metrics.cancelled,
1568                    wasted_ms = metrics.wasted_ms,
1569                    "speculation: turn boundary metrics"
1570                );
1571            }
1572        }
1573    }
1574
1575    #[tracing::instrument(
1576        name = "core.agent.process_user_message",
1577        skip_all,
1578        level = "debug",
1579        fields(turn_id),
1580        err
1581    )]
1582    async fn process_user_message(
1583        &mut self,
1584        text: String,
1585        image_parts: Vec<zeph_llm::provider::MessagePart>,
1586    ) -> Result<(), error::AgentError> {
1587        let input = turn::TurnInput::new(text, image_parts);
1588        let mut t = self.begin_turn(input);
1589
1590        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1591        tracing::Span::current().record("turn_id", t.id().0);
1592        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1593        self.runtime
1594            .debug
1595            .start_iteration_span(turn_idx, t.input.text.trim());
1596
1597        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1598
1599        // Close iteration span regardless of outcome (partial trace preserved on error).
1600        let span_status = if result.is_ok() {
1601            crate::debug_dump::trace::SpanStatus::Ok
1602        } else {
1603            crate::debug_dump::trace::SpanStatus::Error {
1604                message: "iteration failed".to_owned(),
1605            }
1606        };
1607        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1608
1609        self.end_turn(t);
1610        result
1611    }
1612
1613    #[tracing::instrument(
1614        name = "core.agent.process_user_message_inner",
1615        skip_all,
1616        level = "debug",
1617        err
1618    )]
1619    async fn process_user_message_inner(
1620        &mut self,
1621        turn: &mut turn::Turn,
1622    ) -> Result<(), error::AgentError> {
1623        self.reap_background_tasks_and_update_metrics();
1624
1625        let tokens_before_turn = self
1626            .runtime
1627            .metrics
1628            .metrics_tx
1629            .as_ref()
1630            .map_or(0, |tx| tx.borrow().total_tokens);
1631
1632        // Drain any background shell completions that arrived since the last turn.
1633        // They are buffered in `pending_background_completions` and merged with the
1634        // real user message into a single user-role block below (N1 invariant).
1635        self.drain_background_completions();
1636
1637        self.wire_cancel_bridge(turn.cancel_token());
1638
1639        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1640        let text = turn.input.text.clone();
1641        let trimmed_owned = text.trim().to_owned();
1642        let trimmed = trimmed_owned.as_str();
1643
1644        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1645        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1646        if self.services.security.vigil.is_some() {
1647            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1648            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1649        }
1650
1651        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1652            return result;
1653        }
1654
1655        self.check_pending_rollbacks().await;
1656
1657        if self.pre_process_security(trimmed).await? {
1658            return Ok(());
1659        }
1660
1661        let t_ctx = std::time::Instant::now();
1662        tracing::debug!("turn timing: prepare_context start");
1663        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1664        turn.metrics_mut().timings.prepare_context_ms =
1665            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1666        tracing::debug!(
1667            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1668            "turn timing: prepare_context done"
1669        );
1670        // Emit projected token count so TUI can display it before the LLM call.
1671        let _ = self
1672            .channel
1673            .send_context_estimate(
1674                usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
1675            )
1676            .await;
1677
1678        let image_parts = std::mem::take(&mut turn.input.image_parts);
1679        // Prepend any background completion blocks to the user text. All completions and the
1680        // user message MUST be merged into a single user-role block to satisfy the strict
1681        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1682        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1683        let user_msg = self.build_user_message(&merged_text, image_parts);
1684
1685        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1686        // URL set was cleared in begin_turn; re-populate for this turn.
1687        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1688        if !urls.is_empty() {
1689            self.services
1690                .security
1691                .user_provided_urls
1692                .write()
1693                .extend(urls);
1694        }
1695
1696        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1697        // Derived from the raw input text before context assembly to avoid timing dependencies.
1698        self.services.memory.extraction.goal_text = Some(text.clone());
1699
1700        let t_persist = std::time::Instant::now();
1701        tracing::debug!("turn timing: persist_message(user) start");
1702        // Image parts intentionally excluded — base64 payloads too large for message history.
1703        self.persist_message(Role::User, &text, &[], false).await;
1704        turn.metrics_mut().timings.persist_message_ms =
1705            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1706        tracing::debug!(
1707            ms = turn.metrics_snapshot().timings.persist_message_ms,
1708            "turn timing: persist_message(user) done"
1709        );
1710        self.push_message(user_msg);
1711
1712        // Emit pre-LLM context size so the TUI gauge is non-zero before the provider responds.
1713        let context_estimate = self.runtime.providers.cached_prompt_tokens;
1714        self.update_metrics(|m| m.context_tokens = context_estimate);
1715
1716        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1717        // handle_native_tool_calls respectively via metrics.pending_timings.
1718        tracing::debug!("turn timing: process_response start");
1719        let turn_had_error = if let Err(e) = self.process_response().await {
1720            // Detach any in-flight learning tasks before mutating message state.
1721            self.services.learning_engine.learning_tasks.detach_all();
1722            tracing::error!("Response processing failed: {e:#}");
1723
1724            // Record provider failure timestamp so the next turn can skip
1725            // expensive context preparation while providers are known-down.
1726            if e.is_no_providers() {
1727                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1728                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1729                tracing::warn!(
1730                    backoff_secs,
1731                    "no providers available; backing off before next turn"
1732                );
1733                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1734            }
1735
1736            let user_msg = format!("Error: {e:#}");
1737            self.channel.send(&user_msg).await?;
1738            self.msg.messages.pop();
1739            self.recompute_prompt_tokens();
1740            self.channel.flush_chunks().await?;
1741            true
1742        } else {
1743            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1744            // leak into the next turn's context.
1745            self.services.learning_engine.learning_tasks.detach_all();
1746            self.truncate_old_tool_results();
1747            // MagicDocs: spawn background doc updates if any are due (#2702).
1748            self.maybe_update_magic_docs();
1749            // Compression spectrum: fire-and-forget promotion scan (#3305).
1750            self.maybe_spawn_promotion_scan();
1751            false
1752        };
1753        tracing::debug!("turn timing: process_response done");
1754
1755        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1756        if let Some(pipeline) = self.services.quality.clone() {
1757            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1758        }
1759        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1760        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1761        // When self-check appends a flag_marker chunk, this single call covers both
1762        // the main response and the marker, preventing the double response_end of #3243.
1763        let _ = self.channel.flush_chunks().await;
1764
1765        self.maybe_fire_completion_notification(turn, turn_had_error);
1766
1767        self.flush_goal_accounting(tokens_before_turn);
1768
1769        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1770        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1771        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1772        // we harvest those values into Turn before end_turn overwrites pending_timings.
1773        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1774        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1775
1776        Ok(())
1777    }
1778
1779    /// Wire the per-turn cancellation token into the cancel bridge.
1780    ///
1781    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1782    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1783    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1784    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1785        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1786        let token = turn_token.clone();
1787        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1788        self.runtime.lifecycle.cancel_token = turn_token.clone();
1789        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1790            prev.abort();
1791        }
1792        self.runtime.lifecycle.cancel_bridge_handle =
1793            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1794                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1795                move || async move {
1796                    signal.notified().await;
1797                    token.cancel();
1798                },
1799            ));
1800    }
1801
1802    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1803    ///
1804    /// Called at the top of each turn, before any user message processing.
1805    fn reap_background_tasks_and_update_metrics(&mut self) {
1806        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1807        if bg_signal.did_summarize {
1808            self.services.memory.persistence.unsummarized_count = 0;
1809            tracing::debug!("background summarization completed; unsummarized_count reset");
1810        }
1811        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1812        self.update_metrics(|m| {
1813            m.bg_inflight = snap.inflight as u64;
1814            m.bg_dropped = snap.total_dropped();
1815            m.bg_completed = snap.total_completed();
1816            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1817            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1818        });
1819
1820        // Update shell background run rows for TUI panel.
1821        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1822            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1823                .runtime
1824                .lifecycle
1825                .shell_executor_handle
1826                .as_ref()
1827                .map(|e| e.background_runs_snapshot())
1828                .unwrap_or_default()
1829                .into_iter()
1830                .map(|s| crate::metrics::ShellBackgroundRunRow {
1831                    run_id: truncate_shell_run_id(&s.run_id),
1832                    command: truncate_shell_command(&s.command),
1833                    elapsed_secs: s.elapsed_ms / 1000,
1834                })
1835                .collect();
1836            self.update_metrics(|m| {
1837                m.shell_background_runs = shell_rows;
1838            });
1839        }
1840
1841        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1842        // accounted in the snapshot above.
1843        if self
1844            .runtime
1845            .config
1846            .supervisor_config
1847            .abort_enrichment_on_turn
1848        {
1849            self.runtime
1850                .lifecycle
1851                .supervisor
1852                .abort_class(agent_supervisor::TaskClass::Enrichment);
1853        }
1854    }
1855
1856    /// Fire completion notifications and `turn_complete` hooks after each turn.
1857    ///
1858    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1859    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1860    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1861    /// env vars carry no raw assistant output.
1862    ///
1863    /// Gating:
1864    /// - When a `Notifier` is configured, both the notifier and hooks share its
1865    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1866    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1867    ///   notifier path is simply skipped).
1868    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1869        let snap = turn.metrics_snapshot().timings.clone();
1870        let duration_ms = snap
1871            .prepare_context_ms
1872            .saturating_add(snap.llm_chat_ms)
1873            .saturating_add(snap.tool_exec_ms);
1874        let summary = crate::notifications::TurnSummary {
1875            duration_ms,
1876            preview: self.last_assistant_preview(160),
1877            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1878            tool_calls: 0,
1879            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1880            exit_status: if is_error {
1881                crate::notifications::TurnExitStatus::Error
1882            } else {
1883                crate::notifications::TurnExitStatus::Success
1884            },
1885        };
1886
1887        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1888        let gate_ok = self
1889            .runtime
1890            .lifecycle
1891            .notifier
1892            .as_ref()
1893            .is_none_or(|n| n.should_fire(&summary));
1894
1895        // 1) Existing notifier path — unchanged semantics.
1896        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1897            && gate_ok
1898        {
1899            notifier.fire(&summary);
1900        }
1901
1902        // 2) turn_complete hooks — fire-and-forget via supervisor.
1903        // McpManagerDispatch wraps Arc<McpManager> and is 'static, so it can be moved
1904        // into the async block satisfying tokio::spawn's bound. The &dyn McpDispatch
1905        // borrow is created inside the future from the owned dispatch value.
1906        let hooks = self.services.session.hooks_config.turn_complete.clone();
1907        if !hooks.is_empty() && gate_ok {
1908            let mut env = std::collections::HashMap::new();
1909            env.insert(
1910                "ZEPH_TURN_DURATION_MS".to_owned(),
1911                summary.duration_ms.to_string(),
1912            );
1913            env.insert(
1914                "ZEPH_TURN_STATUS".to_owned(),
1915                if is_error { "error" } else { "success" }.to_owned(),
1916            );
1917            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1918            env.insert(
1919                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1920                summary.llm_requests.to_string(),
1921            );
1922            let dispatch = self.mcp_dispatch();
1923            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1924            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1925                agent_supervisor::TaskClass::Telemetry,
1926                "turn-complete-hooks",
1927                async move {
1928                    let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1929                        .as_ref()
1930                        .map(|d| d as &dyn zeph_subagent::McpDispatch);
1931                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1932                    {
1933                        tracing::warn!(error = %e, "turn_complete hook failed");
1934                    }
1935                },
1936            );
1937        }
1938    }
1939
1940    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1941    /// accounting as a tracked background task.
1942    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1943        let goal_snap = self
1944            .services
1945            .goal_accounting
1946            .as_ref()
1947            .and_then(|a| a.snapshot());
1948        self.update_metrics(|m| m.active_goal = goal_snap);
1949
1950        if let Some(ref accounting) = self.services.goal_accounting {
1951            let tokens_after = self
1952                .runtime
1953                .metrics
1954                .metrics_tx
1955                .as_ref()
1956                .map_or(0, |tx| tx.borrow().total_tokens);
1957            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1958            let mut spawned: Option<
1959                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1960            > = None;
1961            accounting.on_turn_complete(turn_tokens, |fut| {
1962                spawned = Some(fut);
1963            });
1964            if let Some(fut) = spawned {
1965                let _ = self.runtime.lifecycle.supervisor.spawn(
1966                    agent_supervisor::TaskClass::Telemetry,
1967                    "goal-accounting",
1968                    fut,
1969                );
1970            }
1971        }
1972    }
1973
1974    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1975    #[tracing::instrument(
1976        name = "core.agent.pre_process_security",
1977        skip_all,
1978        level = "debug",
1979        err
1980    )]
1981    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1982        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1983        if let Some(ref guardrail) = self.services.security.guardrail {
1984            use zeph_sanitizer::guardrail::GuardrailVerdict;
1985            let verdict = guardrail.check(trimmed).await;
1986            match &verdict {
1987                GuardrailVerdict::Flagged { reason, .. } => {
1988                    tracing::warn!(
1989                        reason = %reason,
1990                        should_block = verdict.should_block(),
1991                        "guardrail flagged user input"
1992                    );
1993                    if verdict.should_block() {
1994                        let msg = format!("[guardrail] Input blocked: {reason}");
1995                        let _ = self.channel.send(&msg).await;
1996                        let _ = self.channel.flush_chunks().await;
1997                        return Ok(true);
1998                    }
1999                    // Warn mode: notify but continue.
2000                    let _ = self
2001                        .channel
2002                        .send(&format!("[guardrail] Warning: {reason}"))
2003                        .await;
2004                }
2005                GuardrailVerdict::Error { error } => {
2006                    if guardrail.error_should_block() {
2007                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
2008                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
2009                        let _ = self.channel.send(msg).await;
2010                        let _ = self.channel.flush_chunks().await;
2011                        return Ok(true);
2012                    }
2013                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
2014                }
2015                GuardrailVerdict::Safe => {}
2016            }
2017        }
2018
2019        // ML classifier: lightweight injection detection on user input boundary.
2020        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
2021        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
2022        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
2023        // direct user chat. Disabled by default to prevent false positives on benign messages.
2024        #[cfg(feature = "classifiers")]
2025        if self.services.security.sanitizer.scan_user_input() {
2026            match self
2027                .services
2028                .security
2029                .sanitizer
2030                .classify_injection(trimmed)
2031                .await
2032            {
2033                zeph_sanitizer::InjectionVerdict::Blocked => {
2034                    self.push_classifier_metrics();
2035                    let _ = self
2036                        .channel
2037                        .send("[security] Input blocked: injection detected by classifier.")
2038                        .await;
2039                    let _ = self.channel.flush_chunks().await;
2040                    return Ok(true);
2041                }
2042                zeph_sanitizer::InjectionVerdict::Suspicious => {
2043                    tracing::warn!("injection_classifier soft_signal on user input");
2044                }
2045                zeph_sanitizer::InjectionVerdict::Clean => {}
2046                _ => {}
2047            }
2048        }
2049        #[cfg(feature = "classifiers")]
2050        self.push_classifier_metrics();
2051
2052        Ok(false)
2053    }
2054
2055    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
2056    ///
2057    /// Skips context preparation entirely when providers failed on the previous turn and the
2058    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
2059    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
2060    /// are rate-limited or unavailable (#3357).
2061    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2062        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2063        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2064
2065        // Skip expensive memory recall / embedding when providers are known-down.
2066        let providers_recently_failed = self
2067            .runtime
2068            .lifecycle
2069            .last_no_providers_at
2070            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2071
2072        if providers_recently_failed {
2073            tracing::warn!(
2074                backoff_secs,
2075                "skipping context preparation: providers were unavailable on last turn"
2076            );
2077            return;
2078        }
2079
2080        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2081        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2082        {
2083            Ok(()) => {}
2084            Err(_elapsed) => {
2085                tracing::warn!(
2086                    timeout_secs = prep_timeout_secs,
2087                    "context preparation timed out; proceeding with degraded context"
2088                );
2089            }
2090        }
2091    }
2092
2093    #[tracing::instrument(
2094        name = "core.agent.advance_context_lifecycle",
2095        skip_all,
2096        level = "debug"
2097    )]
2098    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
2099        // Reset per-message pruning cache at the start of each turn (#2298).
2100        self.services.mcp.pruning_cache.reset();
2101
2102        // Extract before rebuild_system_prompt so the value is not tainted
2103        // by the secrets-bearing system prompt (ConversationId is just an i64).
2104        let conv_id = self.services.memory.persistence.conversation_id;
2105        self.rebuild_system_prompt(text).await;
2106
2107        self.detect_and_record_corrections(trimmed, conv_id).await;
2108        self.services.learning_engine.tick();
2109        self.analyze_and_learn().await;
2110        self.sync_graph_counts().await;
2111
2112        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
2113        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
2114        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
2115        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
2116        self.context_manager
2117            .set_compaction_state(self.context_manager.compaction_state().advance_turn());
2118
2119        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
2120        {
2121            self.services.focus.tick();
2122
2123            // SideQuest eviction: runs every N user turns when enabled.
2124            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
2125            let sidequest_should_fire = self.services.sidequest.tick();
2126            if sidequest_should_fire
2127                && !self
2128                    .context_manager
2129                    .compaction_state()
2130                    .is_compacted_this_turn()
2131            {
2132                self.maybe_sidequest_eviction();
2133            }
2134        }
2135
2136        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
2137        // gated on graph + experience config, and only when both stores are attached.
2138        {
2139            let cfg = &self.services.memory.extraction.graph_config.experience;
2140            if cfg.enabled
2141                && cfg.evolution_sweep_enabled
2142                && cfg.evolution_sweep_interval > 0
2143                && self
2144                    .services
2145                    .sidequest
2146                    .turn_counter
2147                    .checked_rem(cfg.evolution_sweep_interval as u64)
2148                    == Some(0)
2149                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2150                && let (Some(exp), Some(graph)) =
2151                    (memory.experience.as_ref(), memory.graph_store.as_ref())
2152            {
2153                let exp = std::sync::Arc::clone(exp);
2154                let graph = std::sync::Arc::clone(graph);
2155                let threshold = cfg.confidence_prune_threshold;
2156                let turn = self.services.sidequest.turn_counter;
2157                let accepted = self.runtime.lifecycle.supervisor.spawn(
2158                    agent_supervisor::TaskClass::Telemetry,
2159                    "experience-sweep",
2160                    async move {
2161                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
2162                            Ok(stats) => tracing::info!(
2163                                turn,
2164                                self_loops = stats.pruned_self_loops,
2165                                low_confidence = stats.pruned_low_confidence,
2166                                "evolution sweep complete",
2167                            ),
2168                            Err(e) => tracing::warn!(
2169                                turn,
2170                                error = %e,
2171                                "evolution sweep failed",
2172                            ),
2173                        }
2174                    },
2175                );
2176                if !accepted {
2177                    tracing::warn!(
2178                        turn = self.services.sidequest.turn_counter,
2179                        "experience-sweep dropped (telemetry class at capacity)",
2180                    );
2181                }
2182            }
2183        }
2184
2185        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
2186        if let Some(warning) = self.cache_expiry_warning() {
2187            tracing::info!(warning, "cache expiry warning");
2188            let _ = self.channel.send_status(&warning).await;
2189        }
2190
2191        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
2192        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
2193        self.maybe_time_based_microcompact();
2194
2195        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2196        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2197        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2198        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2199        self.maybe_apply_deferred_summaries();
2200        self.flush_deferred_summaries().await;
2201
2202        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2203        if let Err(e) = self.maybe_proactive_compress().await {
2204            tracing::warn!("proactive compression failed: {e:#}");
2205        }
2206
2207        if let Err(e) = self.maybe_compact().await {
2208            tracing::warn!("context compaction failed: {e:#}");
2209        }
2210
2211        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2212            tracing::warn!("context preparation failed: {e:#}");
2213        }
2214
2215        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
2216        self.provider
2217            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2218
2219        self.services.learning_engine.reset_reflection();
2220    }
2221
2222    fn build_user_message(
2223        &mut self,
2224        text: &str,
2225        image_parts: Vec<zeph_llm::provider::MessagePart>,
2226    ) -> Message {
2227        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2228        all_image_parts.extend(image_parts);
2229
2230        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2231            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2232                text: text.to_owned(),
2233            }];
2234            parts.extend(all_image_parts);
2235            Message::from_parts(Role::User, parts)
2236        } else {
2237            if !all_image_parts.is_empty() {
2238                tracing::warn!(
2239                    count = all_image_parts.len(),
2240                    "image attachments dropped: provider does not support vision"
2241                );
2242            }
2243            Message {
2244                role: Role::User,
2245                content: text.to_owned(),
2246                parts: vec![],
2247                metadata: MessageMetadata::default(),
2248            }
2249        }
2250    }
2251
2252    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2253    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2254    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2255    fn drain_background_completions(&mut self) {
2256        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2257
2258        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2259            return;
2260        };
2261        // Non-blocking drain: collect all completions that are already ready.
2262        while let Ok(completion) = rx.try_recv() {
2263            if self.runtime.lifecycle.pending_background_completions.len()
2264                >= BACKGROUND_COMPLETION_BUFFER_CAP
2265            {
2266                tracing::warn!(
2267                    run_id = %completion.run_id,
2268                    "background completion buffer full; dropping run result"
2269                );
2270                // Buffer is full: drop the oldest queued completion and push a sentinel
2271                // for the new (incoming) run so the LLM is informed its result was lost.
2272                self.runtime
2273                    .lifecycle
2274                    .pending_background_completions
2275                    .pop_front();
2276                self.runtime
2277                    .lifecycle
2278                    .pending_background_completions
2279                    .push_back(zeph_tools::BackgroundCompletion {
2280                        run_id: completion.run_id,
2281                        exit_code: -1,
2282                        success: false,
2283                        elapsed_ms: 0,
2284                        command: completion.command,
2285                        output: format!(
2286                            "[background result for run {} dropped: buffer overflow]",
2287                            completion.run_id
2288                        ),
2289                    });
2290            } else {
2291                self.runtime
2292                    .lifecycle
2293                    .pending_background_completions
2294                    .push_back(completion);
2295            }
2296        }
2297    }
2298
2299    /// Format and drain `pending_background_completions` into a prefix string, then
2300    /// return the final merged text (prefix + user message). When there are no pending
2301    /// completions the original text is returned unchanged.
2302    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2303        if self
2304            .runtime
2305            .lifecycle
2306            .pending_background_completions
2307            .is_empty()
2308        {
2309            return user_text.to_owned();
2310        }
2311        let mut parts = String::new();
2312        for completion in self
2313            .runtime
2314            .lifecycle
2315            .pending_background_completions
2316            .drain(..)
2317        {
2318            let _ = write!(
2319                parts,
2320                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2321                completion.run_id,
2322                completion.exit_code,
2323                completion.success,
2324                completion.elapsed_ms,
2325                completion.command,
2326                completion.output,
2327            );
2328        }
2329        parts.push_str(user_text);
2330        parts
2331    }
2332
2333    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
2334    /// channel. Returns a human-readable status string and success flag suitable for
2335    /// sending to the user and emitting lifecycle events.
2336    async fn poll_subagent_until_done(
2337        &mut self,
2338        task_id: &str,
2339        label: &str,
2340    ) -> Option<(String, bool)> {
2341        use zeph_subagent::SubAgentState;
2342        let result = loop {
2343            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2344
2345            // Bridge secret requests from sub-agent to channel.confirm().
2346            // Fetch the pending request first, then release the borrow before
2347            // calling channel.confirm() (which requires &mut self).
2348            #[allow(clippy::redundant_closure_for_method_calls)]
2349            let pending = self
2350                .services
2351                .orchestration
2352                .subagent_manager
2353                .as_mut()
2354                .and_then(|m| m.try_recv_secret_request());
2355            if let Some((req_task_id, req)) = pending {
2356                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2357                // (SEC-P1-02), so it is safe to embed in the prompt string.
2358                let confirm_prompt = format!(
2359                    "Sub-agent requests secret '{}'. Allow?",
2360                    crate::text::truncate_to_chars(&req.secret_key, 100)
2361                );
2362                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2363                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2364                    if approved {
2365                        let ttl = std::time::Duration::from_mins(5);
2366                        let key = req.secret_key.clone();
2367                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2368                            let _ = mgr.deliver_secret(&req_task_id, key);
2369                        }
2370                    } else {
2371                        let _ = mgr.deny_secret(&req_task_id);
2372                    }
2373                }
2374            }
2375
2376            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2377            let statuses = mgr.statuses();
2378            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2379                break (format!("{label} completed (no status available)."), true);
2380            };
2381            match status.state {
2382                SubAgentState::Completed => {
2383                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2384                    break (format!("{label} completed: {msg}"), true);
2385                }
2386                SubAgentState::Failed => {
2387                    let msg = status
2388                        .last_message
2389                        .clone()
2390                        .unwrap_or_else(|| "unknown error".into());
2391                    break (format!("{label} failed: {msg}"), false);
2392                }
2393                SubAgentState::Canceled => {
2394                    break (format!("{label} was cancelled."), false);
2395                }
2396                _ => {
2397                    let _ = self
2398                        .channel
2399                        .send_status(&format!(
2400                            "{label}: turn {}/{}",
2401                            status.turns_used,
2402                            self.services
2403                                .orchestration
2404                                .subagent_manager
2405                                .as_ref()
2406                                .and_then(|m| m.agents_def(task_id))
2407                                .map_or(20, |d| d.permissions.max_turns)
2408                        ))
2409                        .await;
2410                }
2411            }
2412        };
2413        Some(result)
2414    }
2415
2416    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2417    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2418    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2419        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2420        let full_ids: Vec<String> = mgr
2421            .statuses()
2422            .into_iter()
2423            .map(|(tid, _)| tid)
2424            .filter(|tid| tid.starts_with(prefix))
2425            .collect();
2426        Some(match full_ids.as_slice() {
2427            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2428            [fid] => Ok(fid.clone()),
2429            _ => Err(format!(
2430                "Ambiguous id prefix '{prefix}': matches {} agents",
2431                full_ids.len()
2432            )),
2433        })
2434    }
2435
2436    fn handle_agent_list(&self) -> Option<String> {
2437        use std::fmt::Write as _;
2438        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2439        let defs = mgr.definitions();
2440        if defs.is_empty() {
2441            return Some("No sub-agent definitions found.".into());
2442        }
2443        let mut out = String::from("Available sub-agents:\n");
2444        for d in defs {
2445            let memory_label = match d.memory {
2446                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2447                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2448                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2449                Some(_) => " [memory:unknown]",
2450                None => "",
2451            };
2452            if let Some(ref src) = d.source {
2453                let _ = writeln!(
2454                    out,
2455                    "  {}{} — {} ({})",
2456                    d.name, memory_label, d.description, src
2457                );
2458            } else {
2459                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2460            }
2461        }
2462        Some(out)
2463    }
2464
2465    fn handle_agent_status(&self) -> Option<String> {
2466        use std::fmt::Write as _;
2467        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2468        let statuses = mgr.statuses();
2469        if statuses.is_empty() {
2470            return Some("No active sub-agents.".into());
2471        }
2472        let mut out = String::from("Active sub-agents:\n");
2473        for (id, s) in &statuses {
2474            let state = format!("{:?}", s.state).to_lowercase();
2475            let elapsed = s.started_at.elapsed().as_secs();
2476            let _ = writeln!(
2477                out,
2478                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2479                short = &id[..8.min(id.len())],
2480                t = s.turns_used,
2481                msg = s.last_message.as_deref().unwrap_or(""),
2482            );
2483            // Show memory directory path for agents with memory enabled.
2484            if let Some(def) = mgr.agents_def(id)
2485                && let Some(scope) = def.memory
2486                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2487            {
2488                let _ = writeln!(out, "       memory: {}", dir.display());
2489            }
2490        }
2491        Some(out)
2492    }
2493
2494    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2495        let full_id = match self.resolve_agent_id_prefix(id)? {
2496            Ok(fid) => fid,
2497            Err(msg) => return Some(msg),
2498        };
2499        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2500        if let Some((tid, req)) = mgr.try_recv_secret_request()
2501            && tid == full_id
2502        {
2503            let key = req.secret_key.clone();
2504            let ttl = std::time::Duration::from_mins(5);
2505            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2506                return Some(format!("Approve failed: {e}"));
2507            }
2508            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2509                return Some(format!("Secret delivery failed: {e}"));
2510            }
2511            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2512        }
2513        Some(format!(
2514            "No pending secret request for sub-agent '{full_id}'."
2515        ))
2516    }
2517
2518    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2519        let full_id = match self.resolve_agent_id_prefix(id)? {
2520            Ok(fid) => fid,
2521            Err(msg) => return Some(msg),
2522        };
2523        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2524        match mgr.deny_secret(&full_id) {
2525            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2526            Err(e) => Some(format!("Deny failed: {e}")),
2527        }
2528    }
2529
2530    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2531        use zeph_subagent::AgentCommand;
2532
2533        match cmd {
2534            AgentCommand::List => self.handle_agent_list(),
2535            AgentCommand::Background { name, prompt } => {
2536                self.handle_agent_background(&name, &prompt)
2537            }
2538            AgentCommand::Spawn { name, prompt }
2539            | AgentCommand::Mention {
2540                agent: name,
2541                prompt,
2542            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2543            AgentCommand::Status => self.handle_agent_status(),
2544            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2545            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2546            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2547            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2548            _ => None,
2549        }
2550    }
2551
2552    /// Return the sub-agent definitions section formatted for the `/agents` fleet view.
2553    ///
2554    /// Produces a "Sub-agents:" header followed by one line per definition.
2555    /// Returns an empty string when no sub-agent manager is configured.
2556    pub(crate) fn handle_agents_definitions_list(&self) -> String {
2557        use std::fmt::Write as _;
2558
2559        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2560            return String::new();
2561        };
2562        let defs = mgr.definitions();
2563        if defs.is_empty() {
2564            return String::new();
2565        }
2566        let mut out = String::from("Sub-agents:\n");
2567        for d in defs {
2568            let memory_label = match d.memory {
2569                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2570                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2571                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2572                Some(_) => " [memory:unknown]",
2573                None => "",
2574            };
2575            if let Some(ref src) = d.source {
2576                let _ = writeln!(
2577                    out,
2578                    "  {}{} — {} ({})",
2579                    d.name, memory_label, d.description, src
2580                );
2581            } else {
2582                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2583            }
2584        }
2585        out
2586    }
2587
2588    /// Execute an `/agents` CRUD subcommand and return a formatted string.
2589    ///
2590    /// Handles `show`, `create`, `edit`, `delete` (the `list` case is handled by
2591    /// [`handle_agents_definitions_list`] and never reaches this method).
2592    pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2593        use zeph_subagent::AgentsCommand;
2594
2595        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2596            return "Sub-agent manager is not available.".to_owned();
2597        };
2598
2599        match cmd {
2600            AgentsCommand::List => self.handle_agents_definitions_list(),
2601            AgentsCommand::Show { name } => {
2602                match mgr.definitions().iter().find(|d| d.name == name) {
2603                    Some(d) => format!(
2604                        "Agent: {}\nDescription: {}\nSource: {}\n",
2605                        d.name,
2606                        d.description,
2607                        d.source.as_deref().unwrap_or("unknown"),
2608                    ),
2609                    None => format!("No sub-agent definition named '{name}'."),
2610                }
2611            }
2612            AgentsCommand::Create { name } => {
2613                format!(
2614                    "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2615                     See the sub-agent documentation for the required frontmatter."
2616                )
2617            }
2618            AgentsCommand::Edit { name } => {
2619                format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2620            }
2621            AgentsCommand::Delete { name } => {
2622                format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2623            }
2624            _ => "Unknown agents command.".to_owned(),
2625        }
2626    }
2627
2628    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2629        let provider = self.provider.clone();
2630        let tool_executor = Arc::clone(&self.tool_executor);
2631        let skills = self.filtered_skills_for(name);
2632        let cfg = self.services.orchestration.subagent_config.clone();
2633        let spawn_ctx = self.build_spawn_context(&cfg);
2634        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2635        match mgr.spawn(
2636            name,
2637            prompt,
2638            provider,
2639            tool_executor,
2640            skills,
2641            &cfg,
2642            spawn_ctx,
2643        ) {
2644            Ok(id) => Some(format!(
2645                "Sub-agent '{name}' started in background (id: {short})",
2646                short = &id[..8.min(id.len())]
2647            )),
2648            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2649        }
2650    }
2651
2652    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2653        let provider = self.provider.clone();
2654        let tool_executor = Arc::clone(&self.tool_executor);
2655        let skills = self.filtered_skills_for(name);
2656        let cfg = self.services.orchestration.subagent_config.clone();
2657        let spawn_ctx = self.build_spawn_context(&cfg);
2658        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2659        let task_id = match mgr.spawn(
2660            name,
2661            prompt,
2662            provider,
2663            tool_executor,
2664            skills,
2665            &cfg,
2666            spawn_ctx,
2667        ) {
2668            Ok(id) => id,
2669            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2670        };
2671        let short = task_id[..8.min(task_id.len())].to_owned();
2672        let _ = self
2673            .channel
2674            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2675            .await;
2676        let _ = self
2677            .channel
2678            .notify_foreground_subagent_started(&task_id, name)
2679            .await;
2680        let label = format!("Sub-agent '{name}'");
2681        let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2682            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2683            let _ = self
2684                .channel
2685                .notify_foreground_subagent_completed(&task_id, name, false)
2686                .await;
2687            return None;
2688        };
2689        let _ = self
2690            .channel
2691            .notify_foreground_subagent_completed(&task_id, name, success)
2692            .await;
2693        Some(result)
2694    }
2695
2696    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2697        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2698        // Accept prefix match on task_id.
2699        let ids: Vec<String> = mgr
2700            .statuses()
2701            .into_iter()
2702            .map(|(task_id, _)| task_id)
2703            .filter(|task_id| task_id.starts_with(id))
2704            .collect();
2705        match ids.as_slice() {
2706            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2707            [full_id] => {
2708                let full_id = full_id.clone();
2709                match mgr.cancel(&full_id) {
2710                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2711                    Err(e) => Some(format!("Cancel failed: {e}")),
2712                }
2713            }
2714            _ => Some(format!(
2715                "Ambiguous id prefix '{id}': matches {} agents",
2716                ids.len()
2717            )),
2718        }
2719    }
2720
2721    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2722        let cfg = self.services.orchestration.subagent_config.clone();
2723        // Resolve definition name from transcript meta before spawning so we can
2724        // look up skills by definition name rather than the UUID prefix (S1 fix).
2725        let def_name = {
2726            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2727            match mgr.def_name_for_resume(id, &cfg) {
2728                Ok(name) => name,
2729                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2730            }
2731        };
2732        let skills = self.filtered_skills_for(&def_name);
2733        let provider = self.provider.clone();
2734        let tool_executor = Arc::clone(&self.tool_executor);
2735        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2736        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2737            Ok(pair) => pair,
2738            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2739        };
2740        let short = task_id[..8.min(task_id.len())].to_owned();
2741        let _ = self
2742            .channel
2743            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2744            .await;
2745        let _ = self
2746            .channel
2747            .notify_foreground_subagent_started(&task_id, &def_name)
2748            .await;
2749        let Some((result, success)) = self
2750            .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2751            .await
2752        else {
2753            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2754            let _ = self
2755                .channel
2756                .notify_foreground_subagent_completed(&task_id, &def_name, false)
2757                .await;
2758            return None;
2759        };
2760        let _ = self
2761            .channel
2762            .notify_foreground_subagent_completed(&task_id, &def_name, success)
2763            .await;
2764        Some(result)
2765    }
2766
2767    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2768        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2769        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2770        let reg = self.services.skill.registry.read();
2771        match zeph_subagent::filter_skills(&reg, &def.skills) {
2772            Ok(skills) => {
2773                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2774                if bodies.is_empty() {
2775                    None
2776                } else {
2777                    Some(bodies)
2778                }
2779            }
2780            Err(e) => {
2781                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2782                None
2783            }
2784        }
2785    }
2786
2787    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2788    fn build_spawn_context(
2789        &self,
2790        cfg: &zeph_config::SubAgentConfig,
2791    ) -> zeph_subagent::SpawnContext {
2792        zeph_subagent::SpawnContext {
2793            parent_messages: self.extract_parent_messages(cfg),
2794            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2795            parent_provider_name: {
2796                let name = &self.runtime.config.active_provider_name;
2797                if name.is_empty() {
2798                    None
2799                } else {
2800                    Some(name.clone())
2801                }
2802            },
2803            spawn_depth: self.runtime.config.spawn_depth,
2804            mcp_tool_names: self.extract_mcp_tool_names(),
2805            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2806            seed_trajectory_score: {
2807                let child = self.services.security.trajectory.spawn_child();
2808                let score = child.score_now();
2809                if score > 0.0 { Some(score) } else { None }
2810            },
2811            content_isolation: self.runtime.config.security.content_isolation.clone(),
2812            orchestrator_name: Some("zeph".to_owned()),
2813            orchestrator_role: Some("orchestrator".to_owned()),
2814            session_mcp_servers: Vec::new(),
2815        }
2816    }
2817
2818    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2819    ///
2820    /// Filters system messages, applies `context_window_turns` and `max_parent_messages` caps,
2821    /// applies a 25% context window cap using a 4-chars-per-token heuristic, prunes orphaned
2822    /// `ToolUse`/`ToolResult` pairs at the slice boundary, and optionally sanitizes text parts
2823    /// through the IPI pipeline according to `parent_context_policy`.
2824    fn extract_parent_messages(
2825        &self,
2826        config: &zeph_config::SubAgentConfig,
2827    ) -> Vec<zeph_llm::provider::Message> {
2828        use zeph_config::ParentContextPolicy;
2829        use zeph_llm::provider::Role;
2830
2831        if config.parent_context_policy == ParentContextPolicy::None
2832            || config.context_window_turns == 0
2833        {
2834            return Vec::new();
2835        }
2836
2837        let non_system: Vec<_> = self
2838            .msg
2839            .messages
2840            .iter()
2841            .filter(|m| m.role != Role::System)
2842            .cloned()
2843            .collect();
2844
2845        let take_count = config
2846            .context_window_turns
2847            .saturating_mul(2)
2848            .min(config.max_parent_messages);
2849        let start = non_system.len().saturating_sub(take_count);
2850        let mut msgs = non_system[start..].to_vec();
2851
2852        // Cap at 25% of model context window and prune orphaned tool pairs.
2853        let max_chars = 128_000usize / 4;
2854        let requested = msgs.len();
2855        trim_parent_messages(&mut msgs, max_chars);
2856        if msgs.len() < requested {
2857            tracing::info!(
2858                kept = msgs.len(),
2859                requested,
2860                "[subagent] truncated parent history due to token budget or orphan pruning"
2861            );
2862        }
2863
2864        if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2865            use zeph_sanitizer::{ContentSource, ContentSourceKind};
2866            let source =
2867                ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2868            msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2869        }
2870
2871        msgs
2872    }
2873
2874    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2875    fn extract_mcp_tool_names(&self) -> Vec<String> {
2876        self.tool_executor
2877            .tool_definitions_erased()
2878            .into_iter()
2879            .filter(|t| t.id.starts_with("mcp_"))
2880            .map(|t| t.id.to_string())
2881            .collect()
2882    }
2883
2884    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2885    ///
2886    /// Must be called from a blocking context (uses synchronous FS I/O).
2887    fn classify_source_kind(
2888        skill_dir: &std::path::Path,
2889        managed_dir: Option<&std::path::PathBuf>,
2890        bundled_names: &std::collections::HashSet<String>,
2891    ) -> zeph_memory::store::SourceKind {
2892        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2893            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2894            let has_marker = skill_dir.join(".bundled").exists();
2895            if has_marker && bundled_names.contains(skill_name) {
2896                zeph_memory::store::SourceKind::Bundled
2897            } else {
2898                if has_marker {
2899                    tracing::warn!(
2900                        skill = %skill_name,
2901                        "skill has .bundled marker but is not in the bundled skill \
2902                         allowlist — classifying as Hub"
2903                    );
2904                }
2905                zeph_memory::store::SourceKind::Hub
2906            }
2907        } else {
2908            zeph_memory::store::SourceKind::Local
2909        }
2910    }
2911
2912    /// Update trust DB records for all reloaded skills.
2913    async fn update_trust_for_reloaded_skills(
2914        &mut self,
2915        all_meta: &[zeph_skills::loader::SkillMeta],
2916    ) {
2917        // Clone Arc before any .await so no &self fields are held across suspension points.
2918        let memory = self.services.memory.persistence.memory.clone();
2919        let Some(memory) = memory else {
2920            return;
2921        };
2922        let trust_cfg = self.services.skill.trust_config.clone();
2923        let managed_dir = self.services.skill.managed_dir.clone();
2924        let bundled_names: std::collections::HashSet<String> =
2925            zeph_skills::bundled_skill_names().into_iter().collect();
2926        for meta in all_meta {
2927            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2928            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2929            let skill_dir = meta.skill_dir.clone();
2930            let managed_dir_ref = managed_dir.clone();
2931            let bundled_names_ref = bundled_names.clone();
2932            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2933                tokio::task::spawn_blocking(move || {
2934                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2935                    let source_kind = Self::classify_source_kind(
2936                        &skill_dir,
2937                        managed_dir_ref.as_ref(),
2938                        &bundled_names_ref,
2939                    );
2940                    Some((hash, source_kind))
2941                })
2942                .await
2943                .unwrap_or(None);
2944
2945            let Some((current_hash, source_kind)) = fs_result else {
2946                tracing::warn!("failed to compute hash for '{}'", meta.name);
2947                continue;
2948            };
2949            let initial_level = match source_kind {
2950                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2951                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2952                    &trust_cfg.local_level
2953                }
2954                _ => &trust_cfg.default_level,
2955            };
2956            let existing = memory
2957                .sqlite()
2958                .load_skill_trust(&meta.name)
2959                .await
2960                .ok()
2961                .flatten();
2962            let trust_level = if let Some(ref row) = existing {
2963                if row.blake3_hash != current_hash {
2964                    trust_cfg.hash_mismatch_level
2965                } else if row.source_kind != source_kind {
2966                    // source_kind changed (e.g., hub → bundled on upgrade).
2967                    // Never override an explicit operator block. For active trust levels,
2968                    // adopt the source-kind initial level when it grants more trust.
2969                    let stored = row.trust_level;
2970                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2971                        stored
2972                    } else {
2973                        *initial_level
2974                    }
2975                } else {
2976                    row.trust_level
2977                }
2978            } else {
2979                *initial_level
2980            };
2981            let source_path = meta.skill_dir.to_str();
2982            if let Err(e) = memory
2983                .sqlite()
2984                .upsert_skill_trust(
2985                    &meta.name,
2986                    trust_level,
2987                    source_kind,
2988                    None,
2989                    source_path,
2990                    &current_hash,
2991                )
2992                .await
2993            {
2994                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2995            }
2996        }
2997    }
2998
2999    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
3000    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
3001        let provider = self.embedding_provider.clone();
3002        let embed_timeout =
3003            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
3004        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
3005            let owned = text.to_owned();
3006            let p = provider.clone();
3007            Box::pin(async move {
3008                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
3009                    result
3010                } else {
3011                    tracing::warn!(
3012                        timeout_secs = embed_timeout.as_secs(),
3013                        "skill matcher: embedding timed out"
3014                    );
3015                    Err(zeph_llm::LlmError::Timeout)
3016                }
3017            })
3018        };
3019
3020        let needs_inmemory_rebuild = !self
3021            .services
3022            .skill
3023            .matcher
3024            .as_ref()
3025            .is_some_and(SkillMatcherBackend::is_qdrant);
3026
3027        if needs_inmemory_rebuild {
3028            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
3029                .await
3030                .map(SkillMatcherBackend::InMemory);
3031        } else if let Some(ref mut backend) = self.services.skill.matcher {
3032            let _ = self.channel.send_status("syncing skill index...").await;
3033            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
3034                self.services.session.status_tx.clone().map(
3035                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
3036                        Box::new(move |completed, total| {
3037                            let msg = format!("Syncing skills: {completed}/{total}");
3038                            let _ = tx.send(msg);
3039                        })
3040                    },
3041                );
3042            if let Err(e) = backend
3043                .sync(
3044                    all_meta,
3045                    &self.services.skill.embedding_model,
3046                    embed_fn,
3047                    on_progress,
3048                )
3049                .await
3050            {
3051                tracing::warn!("failed to sync skill embeddings: {e:#}");
3052            }
3053        }
3054
3055        if self.services.skill.hybrid_search {
3056            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
3057            let _ = self.channel.send_status("rebuilding search index...").await;
3058            self.services.skill.rebuild_bm25(&descs);
3059        }
3060    }
3061
3062    #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
3063    async fn reload_skills(&mut self) {
3064        let old_fp = self.services.skill.fingerprint();
3065        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
3066            let plugin_dirs = supplier();
3067            let mut paths = self.services.skill.skill_paths.clone();
3068            for dir in plugin_dirs {
3069                if !paths.contains(&dir) {
3070                    paths.push(dir);
3071                }
3072            }
3073            paths
3074        } else {
3075            self.services.skill.skill_paths.clone()
3076        };
3077        self.services.skill.registry.write().reload(&reload_paths);
3078        if self.services.skill.fingerprint() == old_fp {
3079            return;
3080        }
3081        let _ = self.channel.send_status("reloading skills...").await;
3082
3083        let all_meta = self
3084            .services
3085            .skill
3086            .registry
3087            .read()
3088            .all_meta()
3089            .into_iter()
3090            .cloned()
3091            .collect::<Vec<_>>();
3092
3093        self.update_trust_for_reloaded_skills(&all_meta).await;
3094
3095        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3096        self.rebuild_skill_matcher(&all_meta_refs).await;
3097
3098        let all_skills: Vec<Skill> = {
3099            let reg = self.services.skill.registry.read();
3100            reg.all_meta()
3101                .iter()
3102                .filter_map(|m| reg.skill(&m.name).ok())
3103                .collect()
3104        };
3105        let trust_map = self.build_skill_trust_map().await;
3106        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3107        let skills_prompt =
3108            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
3109        self.services
3110            .skill
3111            .last_skills_prompt
3112            .clone_from(&skills_prompt);
3113        let system_prompt = build_system_prompt(&skills_prompt, None);
3114        if let Some(msg) = self.msg.messages.first_mut() {
3115            msg.content = system_prompt;
3116        }
3117
3118        let _ = self.channel.send_status("").await;
3119        tracing::info!(
3120            "reloaded {} skill(s)",
3121            self.services.skill.registry.read().all_meta().len()
3122        );
3123    }
3124
3125    async fn reload_instructions(&mut self) {
3126        // Drain any additional queued events before reloading to avoid redundant reloads.
3127        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3128            while rx.try_recv().is_ok() {}
3129        }
3130        let Some(ref state) = self.runtime.instructions.reload_state else {
3131            return;
3132        };
3133        let base_dir = state.base_dir.clone();
3134        let provider_kinds = state.provider_kinds.clone();
3135        let explicit_files = state.explicit_files.clone();
3136        let auto_detect = state.auto_detect;
3137        let new_blocks = crate::instructions::load_instructions_async(
3138            base_dir,
3139            provider_kinds,
3140            explicit_files,
3141            auto_detect,
3142        )
3143        .await;
3144        let old_sources: std::collections::HashSet<_> = self
3145            .runtime
3146            .instructions
3147            .blocks
3148            .iter()
3149            .map(|b| &b.source)
3150            .collect();
3151        let new_sources: std::collections::HashSet<_> =
3152            new_blocks.iter().map(|b| &b.source).collect();
3153        for added in new_sources.difference(&old_sources) {
3154            tracing::info!(path = %added.display(), "instruction file added");
3155        }
3156        for removed in old_sources.difference(&new_sources) {
3157            tracing::info!(path = %removed.display(), "instruction file removed");
3158        }
3159        tracing::info!(
3160            old_count = self.runtime.instructions.blocks.len(),
3161            new_count = new_blocks.len(),
3162            "reloaded instruction files"
3163        );
3164        self.runtime.instructions.blocks = new_blocks;
3165    }
3166
3167    #[allow(clippy::too_many_lines)]
3168    fn reload_config(&mut self) {
3169        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
3170            return;
3171        };
3172        let Some(config) = self.load_config_with_overlay(&path) else {
3173            return;
3174        };
3175        let budget_tokens = resolve_context_budget(&config, &self.provider);
3176        self.runtime.config.security = config.security;
3177        self.runtime.config.timeouts = config.timeouts;
3178        self.runtime.config.redact_credentials = config.memory.redact_credentials;
3179        self.services.memory.persistence.history_limit = config.memory.history_limit;
3180        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
3181        self.services.memory.compaction.summarization_threshold =
3182            config.memory.summarization_threshold;
3183        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
3184        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
3185        self.services.skill.min_injection_score = config.skills.min_injection_score;
3186        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3187        self.services.skill.hybrid_search = config.skills.hybrid_search;
3188        {
3189            let alpha = config.skills.bm25_alpha;
3190            if !(0.0..=1.0).contains(&alpha) {
3191                tracing::warn!(
3192                    bm25_alpha = alpha,
3193                    "bm25_alpha is outside [0.0, 1.0]; clamping to valid range"
3194                );
3195            }
3196            self.services.skill.bm25_alpha = alpha.clamp(0.0, 1.0);
3197        }
3198        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
3199        self.services.skill.confusability_threshold =
3200            config.skills.confusability_threshold.clamp(0.0, 1.0);
3201        self.services.skill.group_structured = config.skills.group_structured;
3202        self.services.skill.support_similarity_threshold =
3203            config.skills.support_similarity_threshold;
3204        config
3205            .skills
3206            .query_rewrite_provider
3207            .as_str()
3208            .clone_into(&mut self.services.skill.query_rewrite_provider_name);
3209        config
3210            .skills
3211            .generation_provider
3212            .as_str()
3213            .clone_into(&mut self.services.skill.generation_provider_name);
3214        config
3215            .skills
3216            .disambiguate_provider
3217            .as_str()
3218            .clone_into(&mut self.services.skill.disambiguate_provider_name);
3219        self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
3220        self.services.skill.generation_output_dir =
3221            config.skills.generation_output_dir.as_deref().map(|p| {
3222                if let Some(stripped) = p.strip_prefix("~/") {
3223                    dirs::home_dir()
3224                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
3225                } else {
3226                    std::path::PathBuf::from(p)
3227                }
3228            });
3229
3230        self.context_manager.budget = Some(
3231            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
3232        );
3233
3234        {
3235            let graph_cfg = &config.memory.graph;
3236            if graph_cfg.rpe.enabled {
3237                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
3238                if self.services.memory.extraction.rpe_router.is_none() {
3239                    self.services.memory.extraction.rpe_router =
3240                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
3241                            graph_cfg.rpe.threshold,
3242                            graph_cfg.rpe.max_skip_turns,
3243                        )));
3244                }
3245            } else {
3246                self.services.memory.extraction.rpe_router = None;
3247            }
3248            self.services.memory.extraction.graph_config = graph_cfg.clone();
3249        }
3250        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3251        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3252        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3253        self.context_manager
3254            .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
3255        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3256        self.context_manager.compression = config.memory.compression.clone();
3257        self.context_manager.routing = config.memory.store_routing.clone();
3258        // Resolve routing_classifier_provider from the provider pool (#2484).
3259        self.context_manager.store_routing_provider = if config
3260            .memory
3261            .store_routing
3262            .routing_classifier_provider
3263            .is_empty()
3264        {
3265            None
3266        } else {
3267            let resolved = self.resolve_background_provider(
3268                config
3269                    .memory
3270                    .store_routing
3271                    .routing_classifier_provider
3272                    .as_str(),
3273            );
3274            Some(std::sync::Arc::new(resolved))
3275        };
3276        self.services
3277            .memory
3278            .persistence
3279            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3280
3281        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3282        self.services.index.repo_map_ttl =
3283            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3284
3285        self.services
3286            .session
3287            .hooks_config
3288            .cwd_changed
3289            .clone_from(&config.hooks.cwd_changed);
3290        self.services
3291            .session
3292            .hooks_config
3293            .permission_denied
3294            .clone_from(&config.hooks.permission_denied);
3295        self.services
3296            .session
3297            .hooks_config
3298            .turn_complete
3299            .clone_from(&config.hooks.turn_complete);
3300        // file_changed_hooks require watcher restart to take effect — skipped here.
3301
3302        tracing::info!("config reloaded");
3303    }
3304
3305    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
3306    ///
3307    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
3308    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3309        let mut config = match Config::load(path) {
3310            Ok(c) => c,
3311            Err(e) => {
3312                tracing::warn!("config reload failed: {e:#}");
3313                return None;
3314            }
3315        };
3316
3317        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3318        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3319            None
3320        } else {
3321            match zeph_plugins::apply_plugin_config_overlays(
3322                &mut config,
3323                &self.runtime.lifecycle.plugins_dir,
3324            ) {
3325                Ok(o) => Some(o),
3326                Err(e) => {
3327                    tracing::warn!(
3328                        "plugin overlay merge failed during reload: {e:#}; \
3329                         keeping previous runtime state"
3330                    );
3331                    return None;
3332                }
3333            }
3334        };
3335
3336        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3337        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3338        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3339        if let Some(ref overlay) = new_overlay {
3340            self.warn_on_shell_overlay_divergence(overlay, &config);
3341        }
3342        Some(config)
3343    }
3344
3345    /// React to shell policy divergence detected on hot-reload.
3346    ///
3347    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3348    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3349    /// — emit a warn + status banner when it changes.
3350    fn warn_on_shell_overlay_divergence(
3351        &self,
3352        new_overlay: &zeph_plugins::ResolvedOverlay,
3353        config: &Config,
3354    ) {
3355        let new_blocked: Vec<String> = {
3356            let mut v = config.tools.shell.blocked_commands.clone();
3357            v.sort();
3358            v
3359        };
3360        let new_allowed: Vec<String> = {
3361            let mut v = config.tools.shell.allowed_commands.clone();
3362            v.sort();
3363            v
3364        };
3365
3366        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3367        let blocked_changed = new_blocked != startup.blocked;
3368        let allowed_changed = new_allowed != startup.allowed;
3369
3370        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3371        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3372            h.rebuild(&config.tools.shell);
3373            tracing::info!(
3374                blocked_count = h.snapshot_blocked().len(),
3375                "shell blocked_commands rebuilt from hot-reload"
3376            );
3377        }
3378
3379        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3380        // executor construction time. Warn loudly so the user restarts.
3381        //
3382        // Note: when base `allowed_commands` is empty (the default), the overlay's
3383        // intersection semantics keep it empty, so this branch is silently unreachable
3384        // for users who do not set a non-empty base list.
3385        if allowed_changed {
3386            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3387                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3388            tracing::warn!("{msg}");
3389            if let Some(ref tx) = self.services.session.status_tx {
3390                let _ = tx.send(msg.to_owned());
3391            }
3392        }
3393
3394        let _ = new_overlay;
3395    }
3396
3397    /// Run `SideQuest` tool output eviction pass (#1885).
3398    ///
3399    /// PERF-1 fix: two-phase non-blocking design.
3400    ///
3401    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3402    /// validate and apply it immediately.
3403    ///
3404    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3405    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3406    /// next turn, so the current agent turn is never blocked by the LLM call.
3407    fn maybe_sidequest_eviction(&mut self) {
3408        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3409        // strategy — the two systems share the same pool of evictable tool outputs and can
3410        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3411        if self.services.sidequest.config.enabled {
3412            use crate::config::PruningStrategy;
3413            if !matches!(
3414                self.context_manager.compression.pruning_strategy,
3415                PruningStrategy::Reactive
3416            ) {
3417                tracing::warn!(
3418                    strategy = ?self.context_manager.compression.pruning_strategy,
3419                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3420                     consider disabling sidequest.enabled to avoid redundant eviction"
3421                );
3422            }
3423        }
3424
3425        // Guard: do not evict while a focus session is active.
3426        if self.services.focus.is_active() {
3427            tracing::debug!("sidequest: skipping — focus session active");
3428            // Drop any pending result — cursors may be stale relative to focus truncation.
3429            self.services.compression.pending_sidequest_result = None;
3430            return;
3431        }
3432
3433        // Phase 1: apply pending result from last turn's background LLM call.
3434        self.sidequest_apply_pending();
3435
3436        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3437        self.sidequest_schedule_next();
3438    }
3439
3440    fn sidequest_apply_pending(&mut self) {
3441        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3442            return;
3443        };
3444        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3445        // and we reschedule below.
3446        let result = match handle.try_join() {
3447            Ok(result) => result,
3448            Err(_handle) => {
3449                // Task still running — drop it; a fresh one is scheduled below.
3450                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3451                return;
3452            }
3453        };
3454        match result {
3455            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3456                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3457                let freed = self.services.sidequest.apply_eviction(
3458                    &mut self.msg.messages,
3459                    &evicted_indices,
3460                    &self.runtime.metrics.token_counter,
3461                );
3462                if freed > 0 {
3463                    self.recompute_prompt_tokens();
3464                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3465                    // cooldown=0: eviction does not impose post-compaction cooldown.
3466                    self.context_manager.set_compaction_state(
3467                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3468                            cooldown: 0,
3469                        },
3470                    );
3471                    tracing::info!(
3472                        freed_tokens = freed,
3473                        evicted_cursors = evicted_indices.len(),
3474                        pass = self.services.sidequest.passes_run,
3475                        "sidequest eviction complete"
3476                    );
3477                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3478                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3479                    }
3480                    if let Some(ref tx) = self.services.session.status_tx {
3481                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3482                    }
3483                } else {
3484                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3485                    if let Some(ref tx) = self.services.session.status_tx {
3486                        let _ = tx.send(String::new());
3487                    }
3488                }
3489            }
3490            Ok(None | Some(_)) => {
3491                tracing::debug!("sidequest: pending result: no cursors to evict");
3492                if let Some(ref tx) = self.services.session.status_tx {
3493                    let _ = tx.send(String::new());
3494                }
3495            }
3496            Err(e) => {
3497                tracing::debug!("sidequest: background task error: {e}");
3498                if let Some(ref tx) = self.services.session.status_tx {
3499                    let _ = tx.send(String::new());
3500                }
3501            }
3502        }
3503    }
3504
3505    fn sidequest_schedule_next(&mut self) {
3506        use zeph_llm::provider::{Message, MessageMetadata, Role};
3507
3508        self.services
3509            .sidequest
3510            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3511
3512        if self.services.sidequest.tool_output_cursors.is_empty() {
3513            tracing::debug!("sidequest: no eligible cursors");
3514            return;
3515        }
3516
3517        let prompt = self.services.sidequest.build_eviction_prompt();
3518        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3519        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3520        // Clone the provider so the spawn closure owns it without borrowing self.
3521        let provider = self.summary_or_primary_provider().clone();
3522
3523        let eviction_future = async move {
3524            let msgs = [Message {
3525                role: Role::User,
3526                content: prompt,
3527                parts: vec![],
3528                metadata: MessageMetadata::default(),
3529            }];
3530            let response =
3531                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3532                    .await
3533                {
3534                    Ok(Ok(r)) => r,
3535                    Ok(Err(e)) => {
3536                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3537                        return None;
3538                    }
3539                    Err(_) => {
3540                        tracing::debug!("sidequest bg: LLM call timed out");
3541                        return None;
3542                    }
3543                };
3544
3545            let start = response.find('{')?;
3546            let end = response.rfind('}')?;
3547            if start > end {
3548                return None;
3549            }
3550            let json_slice = &response[start..=end];
3551            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3552            let mut valid: Vec<usize> = parsed
3553                .del_cursors
3554                .into_iter()
3555                .filter(|&c| c < n_cursors)
3556                .collect();
3557            valid.sort_unstable();
3558            valid.dedup();
3559            #[allow(
3560                clippy::cast_precision_loss,
3561                clippy::cast_possible_truncation,
3562                clippy::cast_sign_loss
3563            )]
3564            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3565            valid.truncate(max_evict);
3566            Some(valid)
3567        };
3568        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3569            std::sync::Arc::from("agent.sidequest.eviction"),
3570            move || eviction_future,
3571        );
3572        self.services.compression.pending_sidequest_result = Some(handle);
3573        tracing::debug!("sidequest: background LLM eviction task spawned");
3574        if let Some(ref tx) = self.services.session.status_tx {
3575            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3576        }
3577    }
3578
3579    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3580    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3581        self.services
3582            .mcp
3583            .manager
3584            .as_ref()
3585            .map(|m| McpManagerDispatch(Arc::clone(m)))
3586    }
3587
3588    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3589    ///
3590    /// Called after each tool batch completes. The check is a single syscall and has
3591    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3592    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3593    pub(crate) async fn check_cwd_changed(&mut self) {
3594        let current = match std::env::current_dir() {
3595            Ok(p) => p,
3596            Err(e) => {
3597                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3598                return;
3599            }
3600        };
3601        if current == self.runtime.lifecycle.last_known_cwd {
3602            return;
3603        }
3604        let old_cwd =
3605            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3606        self.services.session.env_context.working_dir = current.display().to_string();
3607
3608        tracing::info!(
3609            old = %old_cwd.display(),
3610            new = %current.display(),
3611            "working directory changed"
3612        );
3613
3614        let _ = self
3615            .channel
3616            .send_status("Working directory changed\u{2026}")
3617            .await;
3618
3619        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3620        if hooks.is_empty() {
3621            tracing::debug!("CwdChanged: no hooks configured, skipping");
3622        } else {
3623            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3624            let mut env = std::collections::HashMap::new();
3625            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3626            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3627            let dispatch = self.mcp_dispatch();
3628            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3629                .as_ref()
3630                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3631            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3632                tracing::warn!(error = %e, "CwdChanged hook failed");
3633            } else {
3634                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3635            }
3636        }
3637
3638        let _ = self.channel.send_status("").await;
3639    }
3640
3641    /// Handle a `FileChangedEvent` from the file watcher.
3642    pub(crate) async fn handle_file_changed(
3643        &mut self,
3644        event: crate::file_watcher::FileChangedEvent,
3645    ) {
3646        tracing::info!(path = %event.path.display(), "file changed");
3647
3648        let _ = self
3649            .channel
3650            .send_status("Running file-change hook\u{2026}")
3651            .await;
3652
3653        let hooks = self
3654            .services
3655            .session
3656            .hooks_config
3657            .file_changed_hooks
3658            .clone();
3659        if hooks.is_empty() {
3660            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3661        } else {
3662            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3663            let mut env = std::collections::HashMap::new();
3664            env.insert(
3665                "ZEPH_CHANGED_PATH".to_owned(),
3666                event.path.display().to_string(),
3667            );
3668            let dispatch = self.mcp_dispatch();
3669            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3670                .as_ref()
3671                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3672            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3673                tracing::warn!(error = %e, "FileChanged hook failed");
3674            } else {
3675                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3676            }
3677        }
3678
3679        let _ = self.channel.send_status("").await;
3680    }
3681
3682    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3683    /// background scan task.
3684    ///
3685    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3686    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3687    ///
3688    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3689    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3690    /// blocking the turn.
3691    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3692        let Some(engine) = self.services.promotion_engine.clone() else {
3693            return;
3694        };
3695
3696        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3697            return;
3698        };
3699
3700        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3701        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3702        let promotion_window = 200usize;
3703
3704        let accepted = self.runtime.lifecycle.supervisor.spawn(
3705            agent_supervisor::TaskClass::Enrichment,
3706            "compression_spectrum.promotion_scan",
3707            async move {
3708                let span = tracing::info_span!("memory.compression.promote.background");
3709                let _enter = span.enter();
3710
3711                let window = match memory.load_promotion_window(promotion_window).await {
3712                    Ok(w) => w,
3713                    Err(e) => {
3714                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3715                        return;
3716                    }
3717                };
3718
3719                if window.is_empty() {
3720                    return;
3721                }
3722
3723                let candidates = match engine.scan(&window).await {
3724                    Ok(c) => c,
3725                    Err(e) => {
3726                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3727                        return;
3728                    }
3729                };
3730
3731                for candidate in &candidates {
3732                    if let Err(e) = engine.promote(candidate).await {
3733                        tracing::warn!(
3734                            signature = %candidate.signature,
3735                            error = %e,
3736                            "promotion scan: promote failed"
3737                        );
3738                    }
3739                }
3740
3741                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3742            },
3743        );
3744
3745        if accepted {
3746            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3747        }
3748    }
3749}
3750/// Estimates the JSON payload size of a single [`zeph_llm::provider::Message`] for token-budget
3751/// accounting.
3752///
3753/// When `parts` is empty the message is a legacy text-only message and `content.len()` is used
3754/// directly. Otherwise each part is measured individually so that structured variants (images,
3755/// tool invocations, thinking blocks) are accounted for rather than relying on the already-flat
3756/// `content` string, which may not reflect the actual API payload size.
3757pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3758    use zeph_llm::provider::MessagePart;
3759    if m.parts.is_empty() {
3760        return m.content.len();
3761    }
3762    m.parts
3763        .iter()
3764        .map(|p| match p {
3765            MessagePart::Text { text }
3766            | MessagePart::Recall { text }
3767            | MessagePart::CodeContext { text }
3768            | MessagePart::Summary { text }
3769            | MessagePart::CrossSession { text } => text.len(),
3770            MessagePart::ToolOutput { body, .. } => body.len(),
3771            MessagePart::ToolUse { id, name, input } => {
3772                50 + id.len() + name.len() + input.to_string().len()
3773            }
3774            MessagePart::ToolResult {
3775                tool_use_id,
3776                content,
3777                ..
3778            } => 50 + tool_use_id.len() + content.len(),
3779            MessagePart::Image(img) => img.data.len() * 4 / 3,
3780            MessagePart::ThinkingBlock {
3781                thinking,
3782                signature,
3783            } => 50 + thinking.len() + signature.len(),
3784            MessagePart::RedactedThinkingBlock { data } => data.len(),
3785            MessagePart::Compaction { summary } => summary.len(),
3786            _ => 0,
3787        })
3788        .sum()
3789}
3790
3791/// Applies token-budget truncation and orphaned-tool-pair pruning to a parent message slice.
3792///
3793/// Budget truncation keeps the **most recent** messages that fit within `max_chars`
3794/// (a suffix), so the subagent always receives the freshest context.
3795///
3796/// Two passes are performed after budget truncation:
3797///
3798/// 1. Remove `ToolResult` parts from user messages whose matching `ToolUse` is no longer in the
3799///    slice (truncated away).
3800/// 2. Remove `ToolUse` parts from **interior** assistant messages whose matching `ToolResult`
3801///    was removed in pass 1 or was already absent. The trailing assistant message is exempt —
3802///    its unanswered `ToolUse` calls are not orphaned; the slice just ends before the result.
3803///
3804/// Messages that become fully empty after pruning are removed from `msgs`.
3805///
3806/// `rebuild_content` is called **only** when `retain` actually removed parts — preserving the
3807/// existing `content` field (and any `ThinkingBlock` text embedded there) for unmodified
3808/// messages.
3809pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3810    use zeph_llm::provider::{MessagePart, Role};
3811
3812    // Token-budget cap: keep the most recent messages that fit within max_chars.
3813    // We iterate from the end (newest) and drain from the front once the budget is exceeded,
3814    // so the subagent always receives the most recent context rather than stale early messages.
3815    let mut total_chars = 0usize;
3816    let mut drop_before = 0usize; // index of the first message to keep
3817    for (i, m) in msgs.iter().enumerate().rev() {
3818        total_chars += estimate_parts_size(m);
3819        if total_chars > max_chars {
3820            drop_before = i + 1;
3821            break;
3822        }
3823    }
3824    if drop_before > 0 {
3825        msgs.drain(..drop_before);
3826    }
3827
3828    // Pass 1: collect ToolUse IDs emitted by assistant messages; prune orphaned ToolResult
3829    // parts from user messages that reference a ToolUse no longer present in the slice.
3830    // Use owned Strings to avoid holding immutable borrows across the subsequent mutable loop.
3831    let emitted_tool_ids: std::collections::HashSet<String> = msgs
3832        .iter()
3833        .filter(|m| m.role == Role::Assistant)
3834        .flat_map(|m| m.parts.iter())
3835        .filter_map(|p| {
3836            if let MessagePart::ToolUse { id, .. } = p {
3837                Some(id.clone())
3838            } else {
3839                None
3840            }
3841        })
3842        .collect();
3843
3844    let mut orphans_removed = 0usize;
3845    for m in msgs.iter_mut() {
3846        if m.role != Role::User || m.parts.is_empty() {
3847            continue;
3848        }
3849        let before = m.parts.len();
3850        m.parts.retain(|p| match p {
3851            MessagePart::ToolResult { tool_use_id, .. } => {
3852                emitted_tool_ids.contains(tool_use_id.as_str())
3853            }
3854            _ => true,
3855        });
3856        let dropped = before - m.parts.len();
3857        if dropped > 0 {
3858            orphans_removed += dropped;
3859            if m.parts.is_empty() {
3860                m.content.clear();
3861            } else {
3862                m.rebuild_content();
3863            }
3864        }
3865    }
3866
3867    // Pass 2: collect ToolResult IDs present in user messages after pass 1; prune ToolUse
3868    // parts from assistant messages whose result is confirmed absent.
3869    //
3870    // The trailing assistant message is exempt: it may legitimately contain unanswered
3871    // ToolUse calls (the slice ends before the result arrives). Only interior assistant
3872    // messages — those followed by at least one user message — can have provably orphaned
3873    // ToolUse parts (the conversation moved on without answering them).
3874    let consumed_tool_ids: std::collections::HashSet<String> = msgs
3875        .iter()
3876        .filter(|m| m.role == Role::User)
3877        .flat_map(|m| m.parts.iter())
3878        .filter_map(|p| {
3879            if let MessagePart::ToolResult { tool_use_id, .. } = p {
3880                Some(tool_use_id.clone())
3881            } else {
3882                None
3883            }
3884        })
3885        .collect();
3886
3887    // Index of the last assistant message — exempt from pass 2.
3888    let last_assistant_idx = msgs
3889        .iter()
3890        .enumerate()
3891        .rev()
3892        .find(|(_, m)| m.role == Role::Assistant)
3893        .map(|(i, _)| i);
3894
3895    for (idx, m) in msgs.iter_mut().enumerate() {
3896        if m.role != Role::Assistant || m.parts.is_empty() {
3897            continue;
3898        }
3899        // Skip the trailing assistant message — its unanswered ToolUse calls are not orphaned.
3900        if Some(idx) == last_assistant_idx {
3901            continue;
3902        }
3903        let before = m.parts.len();
3904        m.parts.retain(|p| match p {
3905            MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3906            _ => true,
3907        });
3908        let dropped = before - m.parts.len();
3909        if dropped > 0 {
3910            orphans_removed += dropped;
3911            if m.parts.is_empty() {
3912                m.content.clear();
3913            } else {
3914                m.rebuild_content();
3915            }
3916        }
3917    }
3918
3919    // Remove messages that were emptied by orphan pruning.
3920    msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3921
3922    if orphans_removed > 0 {
3923        tracing::debug!(
3924            orphans = orphans_removed,
3925            "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3926        );
3927    }
3928}
3929
3930/// Sanitize text parts of `msgs` through the IPI pipeline.
3931///
3932/// Only [`MessagePart::Text`] parts are passed through the sanitizer; structured parts
3933/// (`ToolUse`, `ToolResult`, `Recall`, `CodeContext`) are left untouched.  After sanitization
3934/// the message `content` field is rebuilt to stay consistent with the updated parts.
3935fn sanitize_parent_messages(
3936    mut msgs: Vec<zeph_llm::provider::Message>,
3937    sanitizer: &zeph_sanitizer::ContentSanitizer,
3938    source: &zeph_sanitizer::ContentSource,
3939) -> Vec<zeph_llm::provider::Message> {
3940    use zeph_llm::provider::MessagePart;
3941    for msg in &mut msgs {
3942        let mut changed = false;
3943        for part in &mut msg.parts {
3944            if let MessagePart::Text { text } = part {
3945                let clean = sanitizer.sanitize(text, source.clone());
3946                if clean.body != *text {
3947                    *text = clean.body;
3948                    changed = true;
3949                }
3950            }
3951        }
3952        if changed {
3953            msg.rebuild_content();
3954        }
3955    }
3956    msgs
3957}
3958
3959/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3960///
3961/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3962/// `zeph-subagent` to `zeph-mcp`.
3963struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3964
3965impl zeph_subagent::McpDispatch for McpManagerDispatch {
3966    fn call_tool<'a>(
3967        &'a self,
3968        server: &'a str,
3969        tool: &'a str,
3970        args: serde_json::Value,
3971    ) -> std::pin::Pin<
3972        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3973    > {
3974        Box::pin(async move {
3975            self.0
3976                .call_tool(server, tool, args)
3977                .await
3978                .map(|result| {
3979                    // Extract text content from the MCP response as a JSON value.
3980                    let texts: Vec<serde_json::Value> = result
3981                        .content
3982                        .iter()
3983                        .filter_map(|c| {
3984                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3985                                Some(serde_json::Value::String(t.text.clone()))
3986                            } else {
3987                                None
3988                            }
3989                        })
3990                        .collect();
3991                    serde_json::Value::Array(texts)
3992                })
3993                .map_err(|e| e.to_string())
3994        })
3995    }
3996}
3997
3998pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3999    while !*rx.borrow_and_update() {
4000        if rx.changed().await.is_err() {
4001            std::future::pending::<()>().await;
4002        }
4003    }
4004}
4005
4006pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4007    match rx {
4008        Some(inner) => {
4009            if let Some(v) = inner.recv().await {
4010                Some(v)
4011            } else {
4012                *rx = None;
4013                std::future::pending().await
4014            }
4015        }
4016        None => std::future::pending().await,
4017    }
4018}
4019
4020/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
4021///
4022/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
4023/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
4024/// context window; if still 0, fall back to 128 000 tokens.
4025/// Truncate a background run command to at most 80 characters for TUI display.
4026fn truncate_shell_command(cmd: &str) -> String {
4027    if cmd.len() <= 80 {
4028        return cmd.to_owned();
4029    }
4030    let end = cmd.floor_char_boundary(79);
4031    format!("{}…", &cmd[..end])
4032}
4033
4034/// Take the first 8 characters of a run-id hex string for compact TUI display.
4035fn truncate_shell_run_id(id: &str) -> String {
4036    id.chars().take(8).collect()
4037}
4038
4039pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
4040    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
4041        if let Some(ctx_size) = provider.context_window() {
4042            tracing::info!(
4043                model_context = ctx_size,
4044                "auto-configured context budget on reload"
4045            );
4046            ctx_size
4047        } else {
4048            0
4049        }
4050    } else {
4051        config.memory.context_budget_tokens
4052    };
4053    if tokens == 0 {
4054        tracing::warn!(
4055            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
4056        );
4057        128_000
4058    } else {
4059        tokens
4060    }
4061}
4062
4063#[cfg(test)]
4064mod tests;
4065
4066#[cfg(test)]
4067pub(crate) use tests::agent_tests;
4068
4069#[cfg(test)]
4070mod test_stubs {
4071    use std::pin::Pin;
4072
4073    use zeph_commands::{
4074        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
4075    };
4076
4077    /// Stub slash command registered only in `#[cfg(test)]` builds.
4078    ///
4079    /// Triggers the `Some(Err(CommandError))` arm in the session/debug registry
4080    /// dispatch block so the non-fatal error path can be tested without production
4081    /// command validation logic.
4082    pub(super) struct TestErrorCommand;
4083
4084    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
4085        fn name(&self) -> &'static str {
4086            "/test-error"
4087        }
4088
4089        fn description(&self) -> &'static str {
4090            "Test stub: always returns CommandError"
4091        }
4092
4093        fn category(&self) -> SlashCategory {
4094            SlashCategory::Session
4095        }
4096
4097        fn handle<'a>(
4098            &'a self,
4099            _ctx: &'a mut CommandContext<'_>,
4100            _args: &'a str,
4101        ) -> Pin<
4102            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
4103        > {
4104            Box::pin(async { Err(CommandError::new("boom")) })
4105        }
4106    }
4107}