Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod heuristic_promotion;
28mod index;
29mod learning;
30pub(crate) mod learning_engine;
31mod log_commands;
32mod loop_event;
33mod lsp_commands;
34mod magic_docs;
35mod mcp;
36pub(crate) mod memcot;
37mod message_queue;
38mod microcompact;
39mod model_commands;
40mod persistence;
41#[cfg(feature = "scheduler")]
42mod plan;
43mod policy_commands;
44mod provider_cmd;
45mod quality_hook;
46pub(crate) mod rate_limiter;
47#[cfg(feature = "scheduler")]
48mod scheduler_commands;
49#[cfg(feature = "scheduler")]
50mod scheduler_loop;
51mod scope_commands;
52pub mod session_config;
53mod session_digest;
54pub mod shadow_sentinel;
55pub(crate) mod sidequest;
56mod skill_management;
57pub mod slash_commands;
58pub mod speculative;
59pub(crate) mod state;
60pub(crate) mod task_injection;
61pub(crate) mod tool_execution;
62pub(crate) mod tool_orchestrator;
63mod trace_extraction;
64pub mod trajectory;
65mod trajectory_commands;
66mod trust_commands;
67pub mod turn;
68mod utils;
69pub(crate) mod vigil;
70
71use std::collections::{HashMap, VecDeque};
72use std::fmt::Write as _;
73use std::sync::Arc;
74
75use parking_lot::RwLock;
76
77use tokio::sync::{mpsc, watch};
78use tokio_util::sync::CancellationToken;
79use zeph_llm::any::AnyProvider;
80use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
81use zeph_memory::TokenCounter;
82use zeph_memory::semantic::SemanticMemory;
83use zeph_skills::loader::Skill;
84use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
85use zeph_skills::prompt::format_skills_prompt;
86use zeph_skills::registry::SkillRegistry;
87use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
88
89use tracing::Instrument as _;
90
91use crate::channel::Channel;
92use crate::config::Config;
93use crate::context::{ContextBudget, build_system_prompt};
94use zeph_common::text::estimate_tokens;
95
96use loop_event::LoopEvent;
97use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
98use state::MessageState;
99
/// Window size (3) for doom-loop detection.
// NOTE(review): the consumer of this constant is outside this view — presumably the number of
// most-recent turns/outputs that are compared; confirm against the call site.
100pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
101// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
102// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
103// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
104pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Prefix text for scheduled-task prompts; note the trailing space after the colon.
// NOTE(review): call sites are outside this view — presumably the task description is appended
// directly after this prefix; confirm before changing the wording or the trailing space.
105pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence (newline followed by three backticks) that terminates a formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Render a tool's output as a labelled fenced block.
///
/// The result consists of, in order: the header `[tool output: <tool_name>]`, a newline, an
/// opening three-backtick fence, a newline, `body` verbatim, and finally
/// [`TOOL_OUTPUT_SUFFIX`]. Neither `tool_name` nor `body` is escaped or trimmed.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    // `concat` sums the piece lengths before allocating, so the output is built without
    // intermediate reallocation.
    [
        "[tool output: ",
        tool_name,
        "]\n```\n",
        body,
        TOOL_OUTPUT_SUFFIX,
    ]
    .concat()
}
122
123/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
124/// tool orchestration, and multi-channel I/O.
125///
126/// The agent maintains conversation history, manages LLM provider state, coordinates tool
127/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
128/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
129///
130/// # Architecture
131///
132/// - **Message state**: Conversation history with system prompt, message queue, and metadata
133/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
134/// - **Skill state**: Registry, matching engine, and self-learning evolution
135/// - **Context manager**: Token budgeting, context assembly, and summarization
136/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
137/// - **MCP client**: Multi-server support for Model Context Protocol
138/// - **Index state**: AST-based code indexing and semantic retrieval
139/// - **Security**: Sanitization, exfiltration detection, adversarial probes
140/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
141///
142/// # Channel Contract
143///
144/// The agent requires a [`Channel`] implementation for user interaction:
145/// - Sends agent responses via `channel.send(message)`
146/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
147/// - Supports structured events: tool invocations, tool output, streaming updates
148///
149/// # Lifecycle
150///
151/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
152/// 2. Run main loop with [`Self::run`]
153/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
154///
155pub struct Agent<C: Channel> {
156    // --- I/O & primary providers (kept inline) ---
    /// Primary LLM provider. [`Self::new`] clones it to serve as the initial embedding
    /// provider, and [`Self::shutdown`] saves its router state.
157    provider: AnyProvider,
158    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
159    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
160    /// Falls back to `provider.clone()` when no dedicated entry exists.
161    /// **Never replaced** by `/provider switch`.
162    embedding_provider: AnyProvider,
    /// User-facing I/O channel; handed back to the caller by [`Self::into_channel`].
163    channel: C,
    /// Tool executor supplied at construction, stored type-erased behind an `Arc`.
164    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
165
166    // --- Conversation core (kept inline) ---
    /// Conversation state. Construction seeds `messages` with a single system-prompt message
    /// and leaves the queue and deferred-write lists empty.
167    pub(super) msg: MessageState,
    /// Context manager; [`Self::shutdown`] reads and resets its
    /// turns-since-last-hard-compaction counter.
168    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestrator, created with `ToolOrchestrator::new()` at construction.
169    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
170
171    // --- Aggregated background services ---
    /// Bundle of subsystem state (memory, skills, learning, MCP, index, session, security,
    /// experiments, compression, orchestration, …) — see `state::Services`.
172    pub(super) services: state::Services,
173
174    // --- Aggregated runtime / lifecycle / telemetry ---
    /// Bundle of runtime state: config, lifecycle, provider bookkeeping, metrics, debug,
    /// instructions, and ephemeral plugins — see `state::AgentRuntime`.
175    pub(super) runtime: state::AgentRuntime,
176}
177
178/// Control flow signal returned by [`Agent::apply_dispatch_result`].
///
/// Private to this module. Each variant names the action the agent loop should take after a
/// command dispatch attempt.
// NOTE(review): `Agent::apply_dispatch_result` and the loop that consumes this enum are outside
// this view; the per-variant descriptions below are the original author's.
179enum DispatchFlow {
180    /// The command requested exit; the agent loop should `break`.
181    Break,
182    /// The command was handled; the agent loop should `continue`.
183    Continue,
184    /// The command was not recognised; the agent loop should fall through.
185    Fallthrough,
186}
187
188impl<C: Channel> Agent<C> {
189    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
190    ///
191    /// # Arguments
192    ///
193    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
194    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
195    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
196    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
197    ///   skills are matched by exact name only
198    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
199    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
200    ///
201    /// # Initialization
202    ///
203    /// The constructor:
204    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
205    /// 2. Builds the system prompt from registered skills
206    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
207    /// 4. Returns a ready-to-run agent
208    ///
209    /// # Panics
210    ///
211    /// Panics if `max_active_skills` is 0.
212    #[must_use]
213    pub fn new(
214        provider: AnyProvider,
215        channel: C,
216        registry: SkillRegistry,
217        matcher: Option<SkillMatcherBackend>,
218        max_active_skills: usize,
219        tool_executor: impl ToolExecutor + 'static,
220    ) -> Self {
221        let registry = Arc::new(RwLock::new(registry));
222        let embedding_provider = provider.clone();
223        Self::new_with_registry_arc(
224            provider,
225            embedding_provider,
226            channel,
227            registry,
228            matcher,
229            max_active_skills,
230            tool_executor,
231        )
232    }
233
234    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
235    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
236    ///
237    /// # Panics
238    ///
239    /// Panics if the registry `RwLock` is poisoned.
240    #[must_use]
241    pub fn new_with_registry_arc(
242        provider: AnyProvider,
243        embedding_provider: AnyProvider,
244        channel: C,
245        registry: Arc<RwLock<SkillRegistry>>,
246        matcher: Option<SkillMatcherBackend>,
247        max_active_skills: usize,
248        tool_executor: impl ToolExecutor + 'static,
249    ) -> Self {
250        use state::{
251            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
252            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
253            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
254            SessionState, SkillState, ToolState,
255        };
256
257        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
258        let all_skills: Vec<Skill> = {
259            let reg = registry.read();
260            reg.all_meta()
261                .iter()
262                .filter_map(|m| reg.skill(&m.name).ok())
263                .collect()
264        };
265        let empty_trust = HashMap::new();
266        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
267        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
268        let system_prompt = build_system_prompt(&skills_prompt, None);
269        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
270        tracing::trace!(prompt = %system_prompt, "full system prompt");
271
272        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
273        let token_counter = Arc::new(TokenCounter::new());
274
275        let services = Services {
276            memory: MemoryState::default(),
277            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
278            learning_engine: learning_engine::LearningEngine::new(),
279            feedback: FeedbackState::default(),
280            mcp: McpState::default(),
281            index: IndexState::default(),
282            session: SessionState::new(),
283            security: SecurityState::default(),
284            experiments: ExperimentState::new(),
285            compression: CompressionState::default(),
286            orchestration: OrchestrationState::default(),
287            focus: focus::FocusState::default(),
288            sidequest: sidequest::SidequestState::default(),
289            tool_state: ToolState::default(),
290            goal_accounting: None,
291            quality: None,
292            proactive_explorer: None,
293            promotion_engine: None,
294            taco_compressor: None,
295            speculation_engine: None,
296            autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
297            autonomous_registry: crate::goal::AutonomousRegistry::new(),
298        };
299
300        let runtime = AgentRuntime {
301            config: RuntimeConfig::default(),
302            lifecycle: LifecycleState::new(),
303            providers: ProviderState::new(initial_prompt_tokens),
304            metrics: MetricsState::new(token_counter),
305            debug: DebugState::default(),
306            instructions: InstructionState::default(),
307            ephemeral_plugins: Vec::new(),
308        };
309
310        Self {
311            provider,
312            embedding_provider,
313            channel,
314            tool_executor: Arc::new(tool_executor),
315            msg: MessageState {
316                messages: vec![Message {
317                    role: Role::System,
318                    content: system_prompt,
319                    parts: vec![],
320                    metadata: MessageMetadata::default(),
321                }],
322                message_queue: VecDeque::new(),
323                pending_image_parts: Vec::new(),
324                last_persisted_message_id: None,
325                deferred_db_hide_ids: Vec::new(),
326                deferred_db_summaries: Vec::new(),
327            },
328            context_manager: context_manager::ContextManager::new(),
329            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
330            services,
331            runtime,
332        }
333    }
334
335    /// Consume the agent and return the inner channel.
336    ///
337    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
338    /// read captured responses from a headless channel such as `BenchmarkChannel`).
339    ///
340    /// # Examples
341    ///
342    /// ```no_run
343    /// # use zeph_core::agent::Agent;
344    /// // After agent.run().await completes, consume the agent to retrieve the channel.
345    /// // let channel: MyChannel = agent.into_channel();
346    /// ```
347    #[must_use]
348    pub fn into_channel(self) -> C {
349        self.channel
350    }
351
352    /// Poll all active sub-agents for completed/failed/canceled results.
353    ///
354    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
355    /// for agents that have finished. Each completed agent is removed from the manager.
356    #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
357    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
358        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
359            return vec![];
360        };
361
362        let finished: Vec<String> = mgr
363            .statuses()
364            .into_iter()
365            .filter_map(|(id, status)| {
366                if matches!(
367                    status.state,
368                    zeph_subagent::SubAgentState::Completed
369                        | zeph_subagent::SubAgentState::Failed
370                        | zeph_subagent::SubAgentState::Canceled
371                ) {
372                    Some(id)
373                } else {
374                    None
375                }
376            })
377            .collect();
378
379        let mut results = vec![];
380        for task_id in finished {
381            match mgr.collect(&task_id).await {
382                Ok(result) => results.push((task_id, result)),
383                Err(e) => {
384                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
385                }
386            }
387        }
388        results
389    }
390
391    /// Call the LLM to generate a structured session summary with a configurable timeout.
392    ///
393    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
394    /// any failure, logging a warning — callers must treat `None` as "skip storage".
395    ///
396    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
397    /// (structured call times out and plain-text fallback also times out) this adds up to
398    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
399    async fn call_llm_for_session_summary(
400        &self,
401        chat_messages: &[Message],
402    ) -> Option<zeph_memory::StructuredSummary> {
403        let provider = self.resolve_background_provider(
404            &self.services.memory.compaction.shutdown_summary_provider,
405        );
406        let timeout_dur = std::time::Duration::from_secs(
407            self.services
408                .memory
409                .compaction
410                .shutdown_summary_timeout_secs,
411        );
412        match tokio::time::timeout(
413            timeout_dur,
414            provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
415        )
416        .await
417        {
418            Ok(Ok(s)) => Some(s),
419            Ok(Err(e)) => {
420                tracing::warn!(
421                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
422                );
423                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
424                    .await
425            }
426            Err(_) => {
427                tracing::warn!(
428                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
429                    self.services
430                        .memory
431                        .compaction
432                        .shutdown_summary_timeout_secs
433                );
434                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
435                    .await
436            }
437        }
438    }
439
440    async fn plain_text_summary_fallback(
441        &self,
442        provider: &zeph_llm::any::AnyProvider,
443        chat_messages: &[Message],
444        timeout_dur: std::time::Duration,
445    ) -> Option<zeph_memory::StructuredSummary> {
446        match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
447            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
448                summary: plain,
449                key_facts: vec![],
450                entities: vec![],
451            }),
452            Ok(Err(e)) => {
453                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
454                None
455            }
456            Err(_) => {
457                tracing::warn!("shutdown summary: plain LLM fallback timed out");
458                None
459            }
460        }
461    }
462
463    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
464    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
465    /// closed while tool execution was in progress). Without this the next session startup strips
466    /// those assistant messages and emits orphan warnings.
467    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
468        use zeph_llm::provider::{MessagePart, Role};
469
470        // Walk messages in reverse: if the last assistant message (ignoring any trailing
471        // system messages) has ToolUse parts and is NOT immediately followed by a user
472        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
473        let msgs = &self.msg.messages;
474        // Find last assistant message index.
475        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
476            return;
477        };
478        let asst_msg = &msgs[asst_idx];
479        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
480            .parts
481            .iter()
482            .filter_map(|p| {
483                if let MessagePart::ToolUse { id, name, input } = p {
484                    Some((id.as_str(), name.as_str(), input))
485                } else {
486                    None
487                }
488            })
489            .collect();
490        if tool_use_ids.is_empty() {
491            return;
492        }
493
494        // Check whether a following user message already pairs all ToolUse ids.
495        let paired_ids: std::collections::HashSet<&str> = msgs
496            .get(asst_idx + 1..)
497            .into_iter()
498            .flatten()
499            .filter(|m| m.role == Role::User)
500            .flat_map(|m| m.parts.iter())
501            .filter_map(|p| {
502                if let MessagePart::ToolResult { tool_use_id, .. } = p {
503                    Some(tool_use_id.as_str())
504                } else {
505                    None
506                }
507            })
508            .collect();
509
510        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
511            .iter()
512            .filter(|(id, _, _)| !paired_ids.contains(*id))
513            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
514                id: (*id).to_owned(),
515                name: (*name).to_owned().into(),
516                input: (*input).clone(),
517            })
518            .collect();
519
520        if unpaired.is_empty() {
521            return;
522        }
523
524        tracing::info!(
525            count = unpaired.len(),
526            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
527        );
528        self.persist_cancelled_tool_results(&unpaired).await;
529    }
530
531    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
532    ///
533    /// Guards:
534    /// - `shutdown_summary` config must be enabled
535    /// - `conversation_id` must be set (memory must be attached)
536    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
537    /// - at least `shutdown_summary_min_messages` user-turn messages in history
538    ///
539    /// All errors are logged as warnings and swallowed — shutdown must never fail.
540    async fn maybe_store_shutdown_summary(&mut self) {
541        if !self.services.memory.compaction.shutdown_summary {
542            return;
543        }
544        let Some(memory) = self.services.memory.persistence.memory.clone() else {
545            return;
546        };
547        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
548            return;
549        };
550
551        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
552        match memory.has_session_summary(conversation_id).await {
553            Ok(true) => {
554                tracing::debug!("shutdown summary: session already has a summary, skipping");
555                return;
556            }
557            Ok(false) => {}
558            Err(e) => {
559                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
560                return;
561            }
562        }
563
564        // Count user-turn messages only (skip system prompt at index 0).
565        let user_count = self
566            .msg
567            .messages
568            .iter()
569            .skip(1)
570            .filter(|m| m.role == Role::User)
571            .count();
572        if user_count
573            < self
574                .services
575                .memory
576                .compaction
577                .shutdown_summary_min_messages
578        {
579            tracing::debug!(
580                user_count,
581                min = self
582                    .services
583                    .memory
584                    .compaction
585                    .shutdown_summary_min_messages,
586                "shutdown summary: too few user messages, skipping"
587            );
588            return;
589        }
590
591        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
592        let _ = self.channel.send_status("Saving session summary...").await;
593
594        // Collect last N messages (skip system prompt at index 0).
595        let max = self
596            .services
597            .memory
598            .compaction
599            .shutdown_summary_max_messages;
600        if max == 0 {
601            tracing::debug!("shutdown summary: max_messages=0, skipping");
602            return;
603        }
604        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
605        let slice = if non_system.len() > max {
606            &non_system[non_system.len() - max..]
607        } else {
608            &non_system[..]
609        };
610
611        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
612            .iter()
613            .map(|m| {
614                let role = match m.role {
615                    Role::Assistant => "assistant".to_owned(),
616                    Role::System => "system".to_owned(),
617                    Role::User | _ => "user".to_owned(),
618                };
619                (zeph_memory::MessageId(0), role, m.content.clone())
620            })
621            .collect();
622
623        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
624        let chat_messages = vec![Message {
625            role: Role::User,
626            content: prompt,
627            parts: vec![],
628            metadata: MessageMetadata::default(),
629        }];
630
631        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
632            let _ = self.channel.send_status("").await;
633            return;
634        };
635
636        if let Err(e) = memory
637            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
638            .await
639        {
640            tracing::warn!("shutdown summary: storage failed: {e:#}");
641        } else {
642            tracing::info!(
643                conversation_id = conversation_id.0,
644                "shutdown summary stored"
645            );
646        }
647
648        // Clear TUI status.
649        let _ = self.channel.send_status("").await;
650    }
651
652    /// Gracefully shut down the agent and persist state.
653    ///
654    /// Performs the following cleanup:
655    ///
656    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
657    ///    are flushed to memory or disk
658    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
659    ///    to the vault
660    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
661    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
662    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
663    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
664    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
665    ///
666    /// Call this before dropping the agent to ensure no data loss.
667    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
668    pub async fn shutdown(&mut self) {
669        let _ = self.channel.send_status("Shutting down...").await;
670
671        // CRIT-1: persist Thompson state accumulated during this session.
672        self.provider.save_router_state().await;
673
674        // Persist AdaptOrch Beta-arm table alongside Thompson state.
675        if let Some(ref advisor) = self.services.orchestration.topology_advisor
676            && let Err(e) = advisor.save()
677        {
678            tracing::warn!(error = %e, "adaptorch: failed to persist state");
679        }
680
681        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
682            mgr.shutdown_all();
683        }
684
685        if let Some(ref manager) = self.services.mcp.manager {
686            manager.shutdown_all_shared().await;
687        }
688
689        // Finalize compaction trajectory: push the last open segment into the Vec.
690        // This segment would otherwise only be pushed when the next hard compaction fires,
691        // which never happens at session end.
692        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
693            self.update_metrics(|m| {
694                m.compaction_turns_after_hard.push(turns);
695            });
696            self.context_manager
697                .set_turns_since_last_hard_compaction(None);
698        }
699
700        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
701            let m = tx.borrow();
702            if m.filter_applications > 0 {
703                #[allow(clippy::cast_precision_loss)]
704                let pct = if m.filter_raw_tokens > 0 {
705                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
706                } else {
707                    0.0
708                };
709                tracing::info!(
710                    raw_tokens = m.filter_raw_tokens,
711                    saved_tokens = m.filter_saved_tokens,
712                    applications = m.filter_applications,
713                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
714                    m.filter_saved_tokens,
715                );
716            }
717            if m.compaction_hard_count > 0 {
718                tracing::info!(
719                    hard_compactions = m.compaction_hard_count,
720                    turns_after_hard = ?m.compaction_turns_after_hard,
721                    "hard compaction trajectory"
722                );
723            }
724        }
725
726        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
727        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
728        // startup strips the orphaned ToolUse and emits warnings.
729        self.flush_orphaned_tool_use_on_shutdown().await;
730
731        // Signal the experiment CancellationToken first so the task can clean up gracefully,
732        // then abort the handle to guarantee it does not outlive the agent regardless.
733        if let Some(ref token) = self.services.experiments.cancel {
734            token.cancel();
735        }
736        if let Some(h) = self.services.experiments.handle.take() {
737            h.abort();
738        }
739
740        // Signal cooperative cancellation to the graph-extraction background task before the
741        // hard abort below. This lets the task exit at a clean checkpoint (e.g. after the
742        // community-refresh select arm fires) rather than being cut mid-write.
743        if let Some(memory) = self.services.memory.persistence.memory.as_ref() {
744            memory.cancel_graph_extraction();
745        }
746
747        // Forcibly abort in-flight Enrichment and Telemetry tasks tracked by the supervisor.
748        self.runtime.lifecycle.supervisor.abort_all();
749
750        // Abort background task handles not tracked by BackgroundSupervisor.
751        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
752        if let Some(h) = self.services.compression.pending_task_goal.take() {
753            h.abort();
754        }
755        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
756            h.abort();
757        }
758        if let Some(h) = self.services.compression.pending_subgoal.take() {
759            h.abort();
760        }
761
762        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
763        self.services.learning_engine.learning_tasks.abort_all();
764
765        // Await the AutoSkill trace extraction task so it is not silently dropped.
766        // Bounded to avoid hanging shutdown when the LLM call inside the task stalls.
767        if let Some(h) = self.services.learning_engine.trace_extraction_handle.take() {
768            let deadline = std::time::Duration::from_mins(2);
769            match tokio::time::timeout(deadline, h).await {
770                Ok(Ok(())) => {}
771                Ok(Err(e)) => tracing::warn!("trace_extraction: task panicked at shutdown: {e}"),
772                Err(_) => tracing::warn!(
773                    "trace_extraction: timed out at shutdown ({}s), aborting",
774                    deadline.as_secs()
775                ),
776            }
777        }
778
779        // Abort the heuristic promotion loop (periodic task; abort is safe because
780        // promotion_already_evaluated ensures idempotent retry on next startup).
781        if let Some(h) = self
782            .services
783            .learning_engine
784            .heuristic_promotion_handle
785            .take()
786        {
787            h.abort();
788        }
789
790        // Drain pending shadow sentinel DB writes before final teardown.
791        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
792            sentinel.drain_pending().await;
793        }
794
795        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
796        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
797        // observe cancellation at their next .await point. Without yielding here the summary
798        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
799        for _ in 0..4 {
800            tokio::task::yield_now().await;
801        }
802
803        self.maybe_store_shutdown_summary().await;
804        self.maybe_store_session_digest().await;
805
806        tracing::info!("agent shutdown complete");
807    }
808
809    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if channel I/O or LLM communication fails.
814    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
815    fn refresh_subagent_metrics(&mut self) {
816        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
817            return;
818        };
819        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
820            .statuses()
821            .into_iter()
822            .map(|(id, s)| {
823                let def = mgr.agents_def(&id);
824                crate::metrics::SubAgentMetrics {
825                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
826                    id: id.clone(),
827                    state: format!("{:?}", s.state).to_lowercase(),
828                    turns_used: s.turns_used,
829                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
830                    background: def.is_some_and(|d| d.permissions.background),
831                    elapsed_secs: s.started_at.elapsed().as_secs(),
832                    permission_mode: def.map_or_else(String::new, |d| {
833                        use zeph_subagent::def::PermissionMode;
834                        match d.permissions.permission_mode {
835                            PermissionMode::AcceptEdits => "accept_edits".into(),
836                            PermissionMode::DontAsk => "dont_ask".into(),
837                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
838                            PermissionMode::Plan => "plan".into(),
839                            _ => String::new(),
840                        }
841                    }),
842                    transcript_dir: mgr
843                        .agent_transcript_dir(&id)
844                        .map(|p| p.to_string_lossy().into_owned()),
845                }
846            })
847            .collect();
848        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
849    }
850
851    /// Non-blocking poll: notify the user when background sub-agents complete.
852    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
853        let completed = self.poll_subagents().await;
854        for (task_id, result) in completed {
855            let notice = if result.is_empty() {
856                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
857            } else {
858                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
859            };
860            if let Err(e) = self.channel.send(&notice).await {
861                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
862            }
863        }
864        Ok(())
865    }
866
    /// Run the agent main loop.
    ///
    /// Startup: optionally waits for model warmup, restores the channel's provider
    /// preference, loads the session digest, sends a resume recap if applicable, and starts
    /// the heuristic promotion task.
    ///
    /// Each loop iteration performs housekeeping (provider override, tool refresh,
    /// elicitations, sub-agent metrics and notices, channel drain), then obtains input either
    /// from the internal message queue or from [`Self::next_event`]. Non-message events are
    /// handled inline and restart the iteration. Input text is offered to the session/debug
    /// command registry, then to the agent command registry, then to the builtin command
    /// handler; only if none of them handles it is it processed as a user message.
    ///
    /// After the loop ends, autoDream consolidation and skill extraction are attempted and
    /// the trace collector is finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    /// Errors are propagated with `?` from `notify_completed_subagents`, `next_event`, and
    /// `process_user_message`; on such an early return the post-loop steps are skipped.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // If a warmup receiver is present and does not yet report ready, wait for a single
        // change notification. The receiver is taken, so this happens at most once. A value
        // that is still `false` afterwards is only logged; the loop starts regardless.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Restore the last-used provider preference before any user interaction (#3308).
        self.restore_channel_provider().await;

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        // AutoSkill A6: start periodic heuristic promotion task at session startup so it runs
        // even when the main loop exits early due to an error (spec 061). The function guards
        // against double-spawn via a heuristic_promotion_handle.is_some() check.
        self.maybe_start_heuristic_promotion();

        loop {
            // Per-iteration housekeeping, run before any input is consumed.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over waiting for a new event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving, so rebuild a channel message and
                    // run it through the same resolution path as live messages.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // A `None` event or an explicit shutdown ends the main loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions().await;
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment has finished: clear its cancel token and handle.
                        self.services.experiments.cancel = None;
                        self.services.experiments.handle = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled prompts are tagged with SCHEDULED_TASK_PREFIX and then
                        // flow through the same path as a user message.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count and log the user-loop iteration when a loop is configured.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::AutonomousTick) => {
                        // A failed autonomous turn is logged and does not end the loop.
                        if let Err(e) = self.run_autonomous_turn().await {
                            tracing::warn!(error = %e, "autonomous turn error");
                        }
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Record the guest flag of the incoming message on the session.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Registry dispatch: two-phase command dispatch.
            //
            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
            //
            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
            //
            // Drop-order rules enforced here:
            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
            //
            // `trusted` is passed to both registry dispatches and also used as the session's
            // `supports_exit` flag.
            let trusted = self.channel.supports_exit();
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: trusted,
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed, trusted).await
            };
            // Remember whether phase 1 produced no result before the value is consumed below;
            // only then is the agent-command registry consulted.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    agents_fleet::AgentsFleetCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(AgentsFleetCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed, trusted).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).

            // S1 fix: drain any pending autonomous session start queued by handle_goal.
            // handle_goal runs inside Box::pin(async move) and cannot borrow &mut self directly,
            // so it writes to pending_start_arc. We consume it here where &mut self is free.
            if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
                if let Some(cid) = cancelled_id {
                    tracing::info!(
                        goal_id = cid,
                        "autonomous: previous session cancelled for new goal"
                    );
                }
                self.sync_registry_entry();
                tracing::info!(goal_id = new_id, "autonomous: session started");
            }

            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
            // inside apply_dispatch_result when with_learning = true.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Builtin commands: `Some(true)` ends the loop, `Some(false)` restarts the
            // iteration, and `None` lets the input continue as a normal user message.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // No command claimed the input: hand the untrimmed text and images to the
            // regular message pipeline.
            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // AutoSkill A1: extract skill candidates from the completed session trace (spec 056).
        self.maybe_extract_skills_from_trace().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1227
1228    /// Dispatch a slash-command registry result and flush the channel.
1229    ///
1230    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1231    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1232    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1233    async fn apply_dispatch_result(
1234        &mut self,
1235        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1236        command: &str,
1237        with_learning: bool,
1238    ) -> DispatchFlow {
1239        match result {
1240            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1241                let _ = self.channel.flush_chunks().await;
1242                DispatchFlow::Break
1243            }
1244            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1245                let _ = self.channel.send(&msg).await;
1246                let _ = self.channel.flush_chunks().await;
1247                if with_learning {
1248                    self.maybe_trigger_post_command_learning(command).await;
1249                }
1250                DispatchFlow::Continue
1251            }
1252            Some(Ok(_)) => {
1253                let _ = self.channel.flush_chunks().await;
1254                DispatchFlow::Continue
1255            }
1256            Some(Err(e)) => {
1257                let _ = self.channel.send(&e.to_string()).await;
1258                let _ = self.channel.flush_chunks().await;
1259                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1260                DispatchFlow::Continue
1261            }
1262            None => DispatchFlow::Fallthrough,
1263        }
1264    }
1265
1266    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1267    fn apply_provider_override(&mut self) {
1268        if let Some(ref slot) = self.runtime.providers.provider_override
1269            && let Some(new_provider) = slot.write().take()
1270        {
1271            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1272            self.provider = new_provider;
1273        }
1274    }
1275
1276    /// Poll all event sources and return the next [`LoopEvent`].
1277    ///
1278    /// Returns `None` when the inbound channel closes (graceful shutdown).
1279    ///
1280    /// # Errors
1281    ///
1282    /// Propagates channel receive errors.
1283    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
1284    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1285        let event = tokio::select! {
1286            result = self.channel.recv() => {
1287                return Ok(result?.map(LoopEvent::Message));
1288            }
1289            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1290                tracing::info!("shutting down");
1291                LoopEvent::Shutdown
1292            }
1293            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1294                LoopEvent::SkillReload
1295            }
1296            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1297                LoopEvent::InstructionReload
1298            }
1299            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1300                LoopEvent::ConfigReload
1301            }
1302            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1303                LoopEvent::UpdateNotification(msg)
1304            }
1305            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1306                LoopEvent::ExperimentCompleted(msg)
1307            }
1308            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1309                tracing::info!("scheduler: injecting custom task as agent turn");
1310                LoopEvent::ScheduledTask(prompt)
1311            }
1312            () = async {
1313                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1314                    if ls.cancel_tx.is_cancelled() {
1315                        std::future::pending::<()>().await;
1316                    } else {
1317                        ls.interval.tick().await;
1318                    }
1319                } else {
1320                    std::future::pending::<()>().await;
1321                }
1322            } => {
1323                // Re-check user_loop after the tick — /loop stop may have fired between the
1324                // interval firing and this arm executing. Returning Ok(None) causes the caller
1325                // to `continue` without injecting a stale or empty prompt.
1326                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1327                    return Ok(None);
1328                };
1329                if ls.cancel_tx.is_cancelled() {
1330                    self.runtime.lifecycle.user_loop = None;
1331                    return Ok(None);
1332                }
1333                let prompt = ls.prompt.clone();
1334                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1335            }
1336            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1337                LoopEvent::FileChanged(event)
1338            }
1339            // Autonomous goal tick: fires when a running session is active.
1340            () = self.services.autonomous.next_tick(),
1341                if self.services.autonomous.should_tick() => {
1342                LoopEvent::AutonomousTick
1343            }
1344        };
1345        Ok(Some(event))
1346    }
1347
1348    #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1349    async fn resolve_message(
1350        &self,
1351        msg: crate::channel::ChannelMessage,
1352    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1353        use crate::channel::{Attachment, AttachmentKind};
1354        use zeph_llm::provider::{ImageData, MessagePart};
1355
1356        let text_base = msg.text.clone();
1357
1358        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1359            .attachments
1360            .into_iter()
1361            .partition(|a| a.kind == AttachmentKind::Audio);
1362
1363        tracing::debug!(
1364            audio = audio_attachments.len(),
1365            has_stt = self.runtime.providers.stt.is_some(),
1366            "resolve_message attachments"
1367        );
1368
1369        let text = if !audio_attachments.is_empty()
1370            && let Some(stt) = self.runtime.providers.stt.as_ref()
1371        {
1372            let mut transcribed_parts = Vec::new();
1373            for attachment in &audio_attachments {
1374                if attachment.data.len() > MAX_AUDIO_BYTES {
1375                    tracing::warn!(
1376                        size = attachment.data.len(),
1377                        max = MAX_AUDIO_BYTES,
1378                        "audio attachment exceeds size limit, skipping"
1379                    );
1380                    continue;
1381                }
1382                match stt
1383                    .transcribe(&attachment.data, attachment.filename.as_deref())
1384                    .await
1385                {
1386                    Ok(result) => {
1387                        tracing::info!(
1388                            len = result.text.len(),
1389                            language = ?result.language,
1390                            "audio transcribed"
1391                        );
1392                        transcribed_parts.push(result.text);
1393                    }
1394                    Err(e) => {
1395                        tracing::error!(error = %e, "audio transcription failed");
1396                    }
1397                }
1398            }
1399            if transcribed_parts.is_empty() {
1400                text_base
1401            } else {
1402                let transcribed = transcribed_parts.join("\n");
1403                if text_base.is_empty() {
1404                    transcribed
1405                } else {
1406                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1407                }
1408            }
1409        } else {
1410            if !audio_attachments.is_empty() {
1411                tracing::warn!(
1412                    count = audio_attachments.len(),
1413                    "audio attachments received but no STT provider configured, dropping"
1414                );
1415            }
1416            text_base
1417        };
1418
1419        let mut image_parts = Vec::new();
1420        for attachment in image_attachments {
1421            if attachment.data.len() > MAX_IMAGE_BYTES {
1422                tracing::warn!(
1423                    size = attachment.data.len(),
1424                    max = MAX_IMAGE_BYTES,
1425                    "image attachment exceeds size limit, skipping"
1426                );
1427                continue;
1428            }
1429            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1430            image_parts.push(MessagePart::Image(Box::new(ImageData {
1431                data: attachment.data,
1432                mime_type,
1433            })));
1434        }
1435
1436        (text, image_parts)
1437    }
1438
    /// Create a new [`Turn`] for the given input and advance the turn counter.
    ///
    /// The turn id is the value of `iteration_counter` *before* it is incremented.
    ///
    /// Clears per-turn state that must not carry over between turns:
    /// - per-turn `CancellationToken` (new token for each turn)
    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
    ///   `process_user_message_inner` after security checks)
    /// - per-turn LLM request counter (`turn_llm_requests`)
    ///
    /// Security bookkeeping performed here, in this order:
    /// 1. drain the queued trajectory signal codes, record each one on the trajectory
    ///    sentinel and feed the mapped subset into the MAGE accumulator;
    /// 2. advance the trajectory sentinel; when `advance_turn()` returns `true` and an
    ///    audit logger is configured, an audit entry is logged from a supervised task;
    /// 3. advance the shadow sentinel and reset the risk-chain accumulator, when present;
    /// 4. publish the current risk level to `trajectory_risk_slot` and forward any
    ///    pending alert to the log and to `status_tx`.
    ///
    /// Returns the freshly built [`turn::Turn`] carrying the new id, the new cancellation
    /// token, the configured timeouts and the channel tool allowlist.
    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
        self.runtime.debug.iteration_counter += 1;
        let cancel_token = CancellationToken::new();
        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
        self.runtime.lifecycle.cancel_token = cancel_token.clone();
        self.services.security.user_provided_urls.write().clear();
        // Reset per-turn LLM request counter for the notification gate.
        self.runtime.lifecycle.turn_llm_requests = 0;

        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
        // Also advance MAGE accumulator (spec 004-16 FR-009) and ingest mapped signals.
        {
            use zeph_memory::shadow::{AuditSignalType as MageSignal, Severity as MageSev};
            // Take the whole queue inside a narrow scope so the lock guard is dropped
            // before any of the signals are processed.
            let pending: Vec<u8> = {
                let mut q = self.services.security.trajectory_signal_queue.lock();
                std::mem::take(&mut *q)
            };
            // The accumulator is advanced first, so the signals ingested below are
            // ingested after the turn advance.
            self.services.security.mage_accumulator.advance_turn();
            for code in pending {
                // Every code is recorded on the trajectory sentinel, mapped or not.
                self.services
                    .security
                    .trajectory
                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
                // Map signal codes to MAGE AuditSignalType + Severity (spec 004-16 FR-002, FR-007).
                // Code 1=PolicyDeny, 6=VigilMedium, 7=VigilHigh, 2=ExfiltrationRedaction.
                // Codes without a mapping are not forwarded to the MAGE accumulator.
                let mage_signal: Option<(MageSignal, MageSev)> = match code {
                    1 => Some((MageSignal::PolicyViolation, MageSev::Medium)),
                    2 => Some((MageSignal::ToolChainAnomaly, MageSev::Medium)),
                    6 => Some((MageSignal::PromptInjectionPattern, MageSev::Medium)),
                    7 => Some((MageSignal::PromptInjectionPattern, MageSev::High)),
                    _ => None,
                };
                if let Some((sig, sev)) = mage_signal {
                    self.services.security.mage_accumulator.ingest(sig, sev);
                }
            }
        }
        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
        // F5: write auto-recover audit entry when sentinel hard-resets.
        if self.services.security.trajectory.advance_turn()
            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
        {
            // Synthetic audit record: no real tool ran, so the tool name is a marker and
            // every optional field is left unset.
            let entry = zeph_tools::AuditEntry {
                timestamp: zeph_tools::chrono_now(),
                tool: "<sentinel>".to_owned().into(),
                command: String::new(),
                result: zeph_tools::AuditResult::Success,
                duration_ms: 0,
                error_category: Some("trajectory_auto_recover".to_owned()),
                error_domain: Some("security".to_owned()),
                error_phase: None,
                claim_source: None,
                mcp_server_id: None,
                injection_flagged: false,
                embedding_anomalous: false,
                cross_boundary_mcp_to_acp: false,
                adversarial_policy_decision: None,
                exit_code: None,
                truncated: false,
                caller_id: None,
                skill_name: None,
                policy_match: None,
                correlation_id: None,
                vigil_risk: None,
                execution_env: None,
                resolved_cwd: None,
                scope_at_definition: None,
                scope_at_dispatch: None,
            };
            // The log write is handed to the supervisor as a telemetry-class task; its
            // result is not awaited here.
            self.runtime.lifecycle.supervisor.spawn(
                crate::agent::agent_supervisor::TaskClass::Telemetry,
                "trajectory-auto-recover-audit",
                async move { logger.log(&entry).await },
            );
        }
        // Spec 050 Phase 2: reset per-turn probe counter before any tool dispatch.
        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
            sentinel.advance_turn();
        }
        // Reset per-turn risk chain state so scores don't bleed across turns.
        if let Some(ref acc) = self.services.security.risk_chain_accumulator {
            acc.reset();
        }
        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
        let risk_level = self.services.security.trajectory.current_risk();
        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
        if let Some(alert) = self.services.security.trajectory.poll_alert() {
            let msg = format!(
                "[trajectory] Risk level: {:?} (score={:.2})",
                alert.level, alert.score
            );
            tracing::warn!(
                level = ?alert.level,
                score = alert.score,
                "trajectory sentinel alert"
            );
            // Best effort: a failed status send is deliberately ignored.
            if let Some(ref tx) = self.services.session.status_tx {
                let _ = tx.send(msg);
            }
        }

        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
            .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
        turn::Turn::new(context, input)
    }
1552
1553    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1554    ///
1555    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1556    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1557    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1558    /// is populated from it here so the rest of the pipeline is unchanged.
1559    fn end_turn(&mut self, turn: turn::Turn) {
1560        self.runtime.metrics.pending_timings = turn.metrics.timings;
1561        self.flush_turn_timings();
1562        // Clear per-turn intent (FR-008): must not persist across turns.
1563        self.services.session.current_turn_intent = None;
1564        // Clear guest context flag: each turn is independently classified.
1565        self.services.session.is_guest_context = false;
1566        // Cancel all in-flight speculative handles at turn boundary.
1567        if let Some(ref engine) = self.services.speculation_engine {
1568            let metrics = engine.end_turn();
1569            if metrics.committed > 0 || metrics.cancelled > 0 {
1570                tracing::debug!(
1571                    committed = metrics.committed,
1572                    cancelled = metrics.cancelled,
1573                    wasted_ms = metrics.wasted_ms,
1574                    "speculation: turn boundary metrics"
1575                );
1576            }
1577        }
1578    }
1579
1580    #[tracing::instrument(
1581        name = "core.agent.process_user_message",
1582        skip_all,
1583        level = "debug",
1584        fields(turn_id),
1585        err
1586    )]
1587    async fn process_user_message(
1588        &mut self,
1589        text: String,
1590        image_parts: Vec<zeph_llm::provider::MessagePart>,
1591    ) -> Result<(), error::AgentError> {
1592        let input = turn::TurnInput::new(text, image_parts);
1593        let mut t = self.begin_turn(input);
1594
1595        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1596        tracing::Span::current().record("turn_id", t.id().0);
1597        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1598        self.runtime
1599            .debug
1600            .start_iteration_span(turn_idx, t.input.text.trim());
1601
1602        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1603
1604        // Close iteration span regardless of outcome (partial trace preserved on error).
1605        let span_status = if result.is_ok() {
1606            crate::debug_dump::trace::SpanStatus::Ok
1607        } else {
1608            crate::debug_dump::trace::SpanStatus::Error {
1609                message: "iteration failed".to_owned(),
1610            }
1611        };
1612        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1613
1614        self.end_turn(t);
1615        result
1616    }
1617
1618    #[tracing::instrument(
1619        name = "core.agent.process_user_message_inner",
1620        skip_all,
1621        level = "debug",
1622        err
1623    )]
1624    async fn process_user_message_inner(
1625        &mut self,
1626        turn: &mut turn::Turn,
1627    ) -> Result<(), error::AgentError> {
1628        self.reap_background_tasks_and_update_metrics();
1629
1630        let tokens_before_turn = self
1631            .runtime
1632            .metrics
1633            .metrics_tx
1634            .as_ref()
1635            .map_or(0, |tx| tx.borrow().total_tokens);
1636
1637        // Drain any background shell completions that arrived since the last turn.
1638        // They are buffered in `pending_background_completions` and merged with the
1639        // real user message into a single user-role block below (N1 invariant).
1640        self.drain_background_completions();
1641
1642        self.wire_cancel_bridge(turn.cancel_token());
1643
1644        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1645        let text = turn.input.text.clone();
1646        let trimmed_owned = text.trim().to_owned();
1647        let trimmed = trimmed_owned.as_str();
1648
1649        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1650        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1651        if self.services.security.vigil.is_some() {
1652            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1653            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1654        }
1655
1656        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1657            return result;
1658        }
1659
1660        self.check_pending_rollbacks().await;
1661
1662        if self.pre_process_security(trimmed).await? {
1663            return Ok(());
1664        }
1665
1666        let t_ctx = std::time::Instant::now();
1667        tracing::debug!("turn timing: prepare_context start");
1668        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1669        turn.metrics_mut().timings.prepare_context_ms =
1670            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1671        tracing::debug!(
1672            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1673            "turn timing: prepare_context done"
1674        );
1675        // Emit projected token count so TUI can display it before the LLM call.
1676        let _ = self
1677            .channel
1678            .send_context_estimate(
1679                usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
1680            )
1681            .await;
1682
1683        let image_parts = std::mem::take(&mut turn.input.image_parts);
1684        // Prepend any background completion blocks to the user text. All completions and the
1685        // user message MUST be merged into a single user-role block to satisfy the strict
1686        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1687        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1688        let user_msg = self.build_user_message(&merged_text, image_parts);
1689
1690        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1691        // URL set was cleared in begin_turn; re-populate for this turn.
1692        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1693        if !urls.is_empty() {
1694            self.services
1695                .security
1696                .user_provided_urls
1697                .write()
1698                .extend(urls);
1699        }
1700
1701        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1702        // Derived from the raw input text before context assembly to avoid timing dependencies.
1703        self.services.memory.extraction.goal_text = Some(text.clone());
1704
1705        let t_persist = std::time::Instant::now();
1706        tracing::debug!("turn timing: persist_message(user) start");
1707        // Image parts intentionally excluded — base64 payloads too large for message history.
1708        self.persist_message(Role::User, &text, &[], false).await;
1709        turn.metrics_mut().timings.persist_message_ms =
1710            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1711        tracing::debug!(
1712            ms = turn.metrics_snapshot().timings.persist_message_ms,
1713            "turn timing: persist_message(user) done"
1714        );
1715        self.push_message(user_msg);
1716
1717        // Emit pre-LLM context size so the TUI gauge is non-zero before the provider responds.
1718        let context_estimate = self.runtime.providers.cached_prompt_tokens;
1719        self.update_metrics(|m| m.context_tokens = context_estimate);
1720
1721        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1722        // handle_native_tool_calls respectively via metrics.pending_timings.
1723        tracing::debug!("turn timing: process_response start");
1724        let turn_had_error = if let Err(e) = self.process_response().await {
1725            // Detach any in-flight learning tasks before mutating message state.
1726            self.services.learning_engine.learning_tasks.detach_all();
1727            tracing::error!("Response processing failed: {e:#}");
1728
1729            // Record provider failure timestamp so the next turn can skip
1730            // expensive context preparation while providers are known-down.
1731            if e.is_no_providers() {
1732                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1733                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1734                tracing::warn!(
1735                    backoff_secs,
1736                    "no providers available; backing off before next turn"
1737                );
1738                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1739            }
1740
1741            let user_msg = format!("Error: {e:#}");
1742            self.channel.send(&user_msg).await?;
1743            self.msg.messages.pop();
1744            self.recompute_prompt_tokens();
1745            self.channel.flush_chunks().await?;
1746            true
1747        } else {
1748            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1749            // leak into the next turn's context.
1750            self.services.learning_engine.learning_tasks.detach_all();
1751            self.truncate_old_tool_results();
1752            // MagicDocs: spawn background doc updates if any are due (#2702).
1753            self.maybe_update_magic_docs();
1754            // Compression spectrum: fire-and-forget promotion scan (#3305).
1755            self.maybe_spawn_promotion_scan();
1756            false
1757        };
1758        tracing::debug!("turn timing: process_response done");
1759
1760        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1761        if let Some(pipeline) = self.services.quality.clone() {
1762            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1763        }
1764        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1765        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1766        // When self-check appends a flag_marker chunk, this single call covers both
1767        // the main response and the marker, preventing the double response_end of #3243.
1768        let _ = self.channel.flush_chunks().await;
1769
1770        self.maybe_fire_completion_notification(turn, turn_had_error);
1771
1772        self.flush_goal_accounting(tokens_before_turn);
1773
1774        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1775        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1776        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1777        // we harvest those values into Turn before end_turn overwrites pending_timings.
1778        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1779        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1780
1781        Ok(())
1782    }
1783
1784    /// Wire the per-turn cancellation token into the cancel bridge.
1785    ///
1786    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1787    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1788    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1789    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1790        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1791        let token = turn_token.clone();
1792        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1793        self.runtime.lifecycle.cancel_token = turn_token.clone();
1794        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1795            prev.abort();
1796        }
1797        self.runtime.lifecycle.cancel_bridge_handle =
1798            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1799                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1800                move || async move {
1801                    signal.notified().await;
1802                    token.cancel();
1803                },
1804            ));
1805    }
1806
1807    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1808    ///
1809    /// Called at the top of each turn, before any user message processing.
1810    fn reap_background_tasks_and_update_metrics(&mut self) {
1811        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1812        if bg_signal.did_summarize {
1813            self.services.memory.persistence.unsummarized_count = 0;
1814            tracing::debug!("background summarization completed; unsummarized_count reset");
1815        }
1816        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1817        self.update_metrics(|m| {
1818            m.bg_inflight = snap.inflight as u64;
1819            m.bg_dropped = snap.total_dropped();
1820            m.bg_completed = snap.total_completed();
1821            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1822            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1823        });
1824
1825        // Update shell background run rows for TUI panel.
1826        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1827            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1828                .runtime
1829                .lifecycle
1830                .shell_executor_handle
1831                .as_ref()
1832                .map(|e| e.background_runs_snapshot())
1833                .unwrap_or_default()
1834                .into_iter()
1835                .map(|s| crate::metrics::ShellBackgroundRunRow {
1836                    run_id: truncate_shell_run_id(&s.run_id),
1837                    command: truncate_shell_command(&s.command),
1838                    elapsed_secs: s.elapsed_ms / 1000,
1839                })
1840                .collect();
1841            self.update_metrics(|m| {
1842                m.shell_background_runs = shell_rows;
1843            });
1844        }
1845
1846        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1847        // accounted in the snapshot above.
1848        if self
1849            .runtime
1850            .config
1851            .supervisor_config
1852            .abort_enrichment_on_turn
1853        {
1854            self.runtime
1855                .lifecycle
1856                .supervisor
1857                .abort_class(agent_supervisor::TaskClass::Enrichment);
1858        }
1859    }
1860
1861    /// Fire completion notifications and `turn_complete` hooks after each turn.
1862    ///
1863    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1864    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1865    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1866    /// env vars carry no raw assistant output.
1867    ///
1868    /// Gating:
1869    /// - When a `Notifier` is configured, both the notifier and hooks share its
1870    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1871    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1872    ///   notifier path is simply skipped).
1873    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1874        let snap = turn.metrics_snapshot().timings.clone();
1875        let duration_ms = snap
1876            .prepare_context_ms
1877            .saturating_add(snap.llm_chat_ms)
1878            .saturating_add(snap.tool_exec_ms);
1879        let summary = crate::notifications::TurnSummary {
1880            duration_ms,
1881            preview: self.last_assistant_preview(160),
1882            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1883            tool_calls: 0,
1884            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1885            exit_status: if is_error {
1886                crate::notifications::TurnExitStatus::Error
1887            } else {
1888                crate::notifications::TurnExitStatus::Success
1889            },
1890        };
1891
1892        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1893        let gate_ok = self
1894            .runtime
1895            .lifecycle
1896            .notifier
1897            .as_ref()
1898            .is_none_or(|n| n.should_fire(&summary));
1899
1900        // 1) Existing notifier path — unchanged semantics.
1901        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1902            && gate_ok
1903        {
1904            notifier.fire(&summary);
1905        }
1906
1907        // 2) turn_complete hooks — fire-and-forget via supervisor.
1908        // McpManagerDispatch wraps Arc<McpManager> and is 'static, so it can be moved
1909        // into the async block satisfying tokio::spawn's bound. The &dyn McpDispatch
1910        // borrow is created inside the future from the owned dispatch value.
1911        let hooks = self.services.session.hooks_config.turn_complete.clone();
1912        if !hooks.is_empty() && gate_ok {
1913            let mut env = std::collections::HashMap::new();
1914            env.insert(
1915                "ZEPH_TURN_DURATION_MS".to_owned(),
1916                summary.duration_ms.to_string(),
1917            );
1918            env.insert(
1919                "ZEPH_TURN_STATUS".to_owned(),
1920                if is_error { "error" } else { "success" }.to_owned(),
1921            );
1922            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1923            env.insert(
1924                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1925                summary.llm_requests.to_string(),
1926            );
1927            let dispatch = self.mcp_dispatch();
1928            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1929            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1930                agent_supervisor::TaskClass::Telemetry,
1931                "turn-complete-hooks",
1932                async move {
1933                    let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1934                        .as_ref()
1935                        .map(|d| d as &dyn zeph_subagent::McpDispatch);
1936                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1937                    {
1938                        tracing::warn!(error = %e, "turn_complete hook failed");
1939                    }
1940                },
1941            );
1942        }
1943    }
1944
1945    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1946    /// accounting as a tracked background task.
1947    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1948        let goal_snap = self
1949            .services
1950            .goal_accounting
1951            .as_ref()
1952            .and_then(|a| a.snapshot());
1953        self.update_metrics(|m| m.active_goal = goal_snap);
1954
1955        if let Some(ref accounting) = self.services.goal_accounting {
1956            let tokens_after = self
1957                .runtime
1958                .metrics
1959                .metrics_tx
1960                .as_ref()
1961                .map_or(0, |tx| tx.borrow().total_tokens);
1962            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1963            let mut spawned: Option<
1964                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1965            > = None;
1966            accounting.on_turn_complete(turn_tokens, |fut| {
1967                spawned = Some(fut);
1968            });
1969            if let Some(fut) = spawned {
1970                let _ = self.runtime.lifecycle.supervisor.spawn(
1971                    agent_supervisor::TaskClass::Telemetry,
1972                    "goal-accounting",
1973                    fut,
1974                );
1975            }
1976        }
1977    }
1978
    /// Screen the user input before it enters the turn pipeline.
    ///
    /// Two layers run in order: the optional guardrail check, then (only with the
    /// `classifiers` feature and `scan_user_input` enabled) the injection classifier.
    ///
    /// Returns `Ok(true)` if the input was blocked and the caller should return `Ok(())`
    /// immediately; a block notice has already been sent to the channel in that case.
    /// Returns `Ok(false)` when processing should continue.
    ///
    /// Channel send/flush failures are deliberately ignored here; no path in this body
    /// produces an `Err`.
    #[tracing::instrument(
        name = "core.agent.pre_process_security",
        skip_all,
        level = "debug",
        err
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
        if let Some(ref guardrail) = self.services.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    // Block mode: tell the user, flush, and stop the turn.
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Warn mode: notify but continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // Fail-closed: a failed check blocks the input. The user-facing
                    // message omits the error text; it is only written to the log.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Fail-open: log and let the input through.
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                // Any other verdict lets the input through without comment.
                _ => {}
            }
        }

        // ML classifier: lightweight injection detection on user input boundary.
        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
        // direct user chat. Disabled by default to prevent false positives on benign messages.
        #[cfg(feature = "classifiers")]
        if self.services.security.sanitizer.scan_user_input() {
            match self
                .services
                .security
                .sanitizer
                .classify_injection(trimmed)
                .await
            {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    // Metrics are pushed here because the early return below skips the
                    // push at the end of the function.
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                // Soft signal: logged only, the input is still processed.
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
                _ => {}
            }
        }
        // Reached on every non-blocked path, whether or not the classifier ran.
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
2059
2060    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
2061    ///
2062    /// Skips context preparation entirely when providers failed on the previous turn and the
2063    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
2064    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
2065    /// are rate-limited or unavailable (#3357).
2066    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2067        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2068        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2069
2070        // Skip expensive memory recall / embedding when providers are known-down.
2071        let providers_recently_failed = self
2072            .runtime
2073            .lifecycle
2074            .last_no_providers_at
2075            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2076
2077        if providers_recently_failed {
2078            tracing::warn!(
2079                backoff_secs,
2080                "skipping context preparation: providers were unavailable on last turn"
2081            );
2082            return;
2083        }
2084
2085        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2086        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2087        {
2088            Ok(()) => {}
2089            Err(_elapsed) => {
2090                tracing::warn!(
2091                    timeout_secs = prep_timeout_secs,
2092                    "context preparation timed out; proceeding with degraded context"
2093                );
2094            }
2095        }
2096    }
2097
2098    #[tracing::instrument(
2099        name = "core.agent.advance_context_lifecycle",
2100        skip_all,
2101        level = "debug"
2102    )]
2103    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
2104        // Reset per-message pruning cache at the start of each turn (#2298).
2105        self.services.mcp.pruning_cache.reset();
2106
2107        // Extract before rebuild_system_prompt so the value is not tainted
2108        // by the secrets-bearing system prompt (ConversationId is just an i64).
2109        let conv_id = self.services.memory.persistence.conversation_id;
2110        self.rebuild_system_prompt(text).await;
2111
2112        self.detect_and_record_corrections(trimmed, conv_id).await;
2113        self.services.learning_engine.tick();
2114        self.analyze_and_learn().await;
2115        self.sync_graph_counts().await;
2116
2117        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
2118        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
2119        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
2120        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
2121        self.context_manager
2122            .set_compaction_state(self.context_manager.compaction_state().advance_turn());
2123
2124        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
2125        {
2126            self.services.focus.tick();
2127
2128            // SideQuest eviction: runs every N user turns when enabled.
2129            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
2130            let sidequest_should_fire = self.services.sidequest.tick();
2131            if sidequest_should_fire
2132                && !self
2133                    .context_manager
2134                    .compaction_state()
2135                    .is_compacted_this_turn()
2136            {
2137                self.maybe_sidequest_eviction();
2138            }
2139        }
2140
2141        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
2142        // gated on graph + experience config, and only when both stores are attached.
2143        {
2144            let cfg = &self.services.memory.extraction.graph_config.experience;
2145            if cfg.enabled
2146                && cfg.evolution_sweep_enabled
2147                && cfg.evolution_sweep_interval > 0
2148                && self
2149                    .services
2150                    .sidequest
2151                    .turn_counter
2152                    .checked_rem(cfg.evolution_sweep_interval as u64)
2153                    == Some(0)
2154                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2155                && let (Some(exp), Some(graph)) =
2156                    (memory.experience.as_ref(), memory.graph_store.as_ref())
2157            {
2158                let exp = std::sync::Arc::clone(exp);
2159                let graph = std::sync::Arc::clone(graph);
2160                let threshold = cfg.confidence_prune_threshold;
2161                let turn = self.services.sidequest.turn_counter;
2162                let accepted = self.runtime.lifecycle.supervisor.spawn(
2163                    agent_supervisor::TaskClass::Telemetry,
2164                    "experience-sweep",
2165                    async move {
2166                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
2167                            Ok(stats) => tracing::info!(
2168                                turn,
2169                                self_loops = stats.pruned_self_loops,
2170                                low_confidence = stats.pruned_low_confidence,
2171                                "evolution sweep complete",
2172                            ),
2173                            Err(e) => tracing::warn!(
2174                                turn,
2175                                error = %e,
2176                                "evolution sweep failed",
2177                            ),
2178                        }
2179                    },
2180                );
2181                if !accepted {
2182                    tracing::warn!(
2183                        turn = self.services.sidequest.turn_counter,
2184                        "experience-sweep dropped (telemetry class at capacity)",
2185                    );
2186                }
2187            }
2188        }
2189
2190        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
2191        if let Some(warning) = self.cache_expiry_warning() {
2192            tracing::info!(warning, "cache expiry warning");
2193            let _ = self.channel.send_status(&warning).await;
2194        }
2195
2196        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
2197        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
2198        self.maybe_time_based_microcompact();
2199
2200        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2201        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2202        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2203        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2204        self.maybe_apply_deferred_summaries();
2205        self.flush_deferred_summaries().await;
2206
2207        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2208        if let Err(e) = self.maybe_proactive_compress().await {
2209            tracing::warn!("proactive compression failed: {e:#}");
2210        }
2211
2212        if let Err(e) = self.maybe_compact().await {
2213            tracing::warn!("context compaction failed: {e:#}");
2214        }
2215
2216        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2217            tracing::warn!("context preparation failed: {e:#}");
2218        }
2219
2220        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
2221        self.provider
2222            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2223
2224        self.services.learning_engine.reset_reflection();
2225    }
2226
2227    fn build_user_message(
2228        &mut self,
2229        text: &str,
2230        image_parts: Vec<zeph_llm::provider::MessagePart>,
2231    ) -> Message {
2232        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2233        all_image_parts.extend(image_parts);
2234
2235        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2236            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2237                text: text.to_owned(),
2238            }];
2239            parts.extend(all_image_parts);
2240            Message::from_parts(Role::User, parts)
2241        } else {
2242            if !all_image_parts.is_empty() {
2243                tracing::warn!(
2244                    count = all_image_parts.len(),
2245                    "image attachments dropped: provider does not support vision"
2246                );
2247            }
2248            Message {
2249                role: Role::User,
2250                content: text.to_owned(),
2251                parts: vec![],
2252                metadata: MessageMetadata::default(),
2253            }
2254        }
2255    }
2256
2257    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2258    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2259    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2260    fn drain_background_completions(&mut self) {
2261        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2262
2263        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2264            return;
2265        };
2266        // Non-blocking drain: collect all completions that are already ready.
2267        while let Ok(completion) = rx.try_recv() {
2268            if self.runtime.lifecycle.pending_background_completions.len()
2269                >= BACKGROUND_COMPLETION_BUFFER_CAP
2270            {
2271                tracing::warn!(
2272                    run_id = %completion.run_id,
2273                    "background completion buffer full; dropping run result"
2274                );
2275                // Buffer is full: drop the oldest queued completion and push a sentinel
2276                // for the new (incoming) run so the LLM is informed its result was lost.
2277                self.runtime
2278                    .lifecycle
2279                    .pending_background_completions
2280                    .pop_front();
2281                self.runtime
2282                    .lifecycle
2283                    .pending_background_completions
2284                    .push_back(zeph_tools::BackgroundCompletion {
2285                        run_id: completion.run_id,
2286                        exit_code: -1,
2287                        success: false,
2288                        elapsed_ms: 0,
2289                        command: completion.command,
2290                        output: format!(
2291                            "[background result for run {} dropped: buffer overflow]",
2292                            completion.run_id
2293                        ),
2294                    });
2295            } else {
2296                self.runtime
2297                    .lifecycle
2298                    .pending_background_completions
2299                    .push_back(completion);
2300            }
2301        }
2302    }
2303
2304    /// Format and drain `pending_background_completions` into a prefix string, then
2305    /// return the final merged text (prefix + user message). When there are no pending
2306    /// completions the original text is returned unchanged.
2307    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2308        if self
2309            .runtime
2310            .lifecycle
2311            .pending_background_completions
2312            .is_empty()
2313        {
2314            return user_text.to_owned();
2315        }
2316        let mut parts = String::new();
2317        for completion in self
2318            .runtime
2319            .lifecycle
2320            .pending_background_completions
2321            .drain(..)
2322        {
2323            let _ = write!(
2324                parts,
2325                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2326                completion.run_id,
2327                completion.exit_code,
2328                completion.success,
2329                completion.elapsed_ms,
2330                completion.command,
2331                completion.output,
2332            );
2333        }
2334        parts.push_str(user_text);
2335        parts
2336    }
2337
2338    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
2339    /// channel. Returns a human-readable status string and success flag suitable for
2340    /// sending to the user and emitting lifecycle events.
2341    async fn poll_subagent_until_done(
2342        &mut self,
2343        task_id: &str,
2344        label: &str,
2345    ) -> Option<(String, bool)> {
2346        use zeph_subagent::SubAgentState;
2347        let result = loop {
2348            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2349
2350            // Bridge secret requests from sub-agent to channel.confirm().
2351            // Fetch the pending request first, then release the borrow before
2352            // calling channel.confirm() (which requires &mut self).
2353            #[allow(clippy::redundant_closure_for_method_calls)]
2354            let pending = self
2355                .services
2356                .orchestration
2357                .subagent_manager
2358                .as_mut()
2359                .and_then(|m| m.try_recv_secret_request());
2360            if let Some((req_task_id, req)) = pending {
2361                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2362                // (SEC-P1-02), so it is safe to embed in the prompt string.
2363                let confirm_prompt = format!(
2364                    "Sub-agent requests secret '{}'. Allow?",
2365                    crate::text::truncate_to_chars(&req.secret_key, 100)
2366                );
2367                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2368                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2369                    if approved {
2370                        let ttl = std::time::Duration::from_mins(5);
2371                        let key = req.secret_key.clone();
2372                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2373                            let _ = mgr.deliver_secret(&req_task_id, key);
2374                        }
2375                    } else {
2376                        let _ = mgr.deny_secret(&req_task_id);
2377                    }
2378                }
2379            }
2380
2381            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2382            let statuses = mgr.statuses();
2383            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2384                break (format!("{label} completed (no status available)."), true);
2385            };
2386            match status.state {
2387                SubAgentState::Completed => {
2388                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2389                    break (format!("{label} completed: {msg}"), true);
2390                }
2391                SubAgentState::Failed => {
2392                    let msg = status
2393                        .last_message
2394                        .clone()
2395                        .unwrap_or_else(|| "unknown error".into());
2396                    break (format!("{label} failed: {msg}"), false);
2397                }
2398                SubAgentState::Canceled => {
2399                    break (format!("{label} was cancelled."), false);
2400                }
2401                _ => {
2402                    let _ = self
2403                        .channel
2404                        .send_status(&format!(
2405                            "{label}: turn {}/{}",
2406                            status.turns_used,
2407                            self.services
2408                                .orchestration
2409                                .subagent_manager
2410                                .as_ref()
2411                                .and_then(|m| m.agents_def(task_id))
2412                                .map_or(20, |d| d.permissions.max_turns)
2413                        ))
2414                        .await;
2415                }
2416            }
2417        };
2418        Some(result)
2419    }
2420
2421    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2422    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2423    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2424        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2425        let full_ids: Vec<String> = mgr
2426            .statuses()
2427            .into_iter()
2428            .map(|(tid, _)| tid)
2429            .filter(|tid| tid.starts_with(prefix))
2430            .collect();
2431        Some(match full_ids.as_slice() {
2432            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2433            [fid] => Ok(fid.clone()),
2434            _ => Err(format!(
2435                "Ambiguous id prefix '{prefix}': matches {} agents",
2436                full_ids.len()
2437            )),
2438        })
2439    }
2440
2441    fn handle_agent_list(&self) -> Option<String> {
2442        use std::fmt::Write as _;
2443        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2444        let defs = mgr.definitions();
2445        if defs.is_empty() {
2446            return Some("No sub-agent definitions found.".into());
2447        }
2448        let mut out = String::from("Available sub-agents:\n");
2449        for d in defs {
2450            let memory_label = match d.memory {
2451                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2452                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2453                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2454                Some(_) => " [memory:unknown]",
2455                None => "",
2456            };
2457            if let Some(ref src) = d.source {
2458                let _ = writeln!(
2459                    out,
2460                    "  {}{} — {} ({})",
2461                    d.name, memory_label, d.description, src
2462                );
2463            } else {
2464                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2465            }
2466        }
2467        Some(out)
2468    }
2469
2470    fn handle_agent_status(&self) -> Option<String> {
2471        use std::fmt::Write as _;
2472        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2473        let statuses = mgr.statuses();
2474        if statuses.is_empty() {
2475            return Some("No active sub-agents.".into());
2476        }
2477        let mut out = String::from("Active sub-agents:\n");
2478        for (id, s) in &statuses {
2479            let state = format!("{:?}", s.state).to_lowercase();
2480            let elapsed = s.started_at.elapsed().as_secs();
2481            let _ = writeln!(
2482                out,
2483                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2484                short = &id[..8.min(id.len())],
2485                t = s.turns_used,
2486                msg = s.last_message.as_deref().unwrap_or(""),
2487            );
2488            // Show memory directory path for agents with memory enabled.
2489            if let Some(def) = mgr.agents_def(id)
2490                && let Some(scope) = def.memory
2491                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2492            {
2493                let _ = writeln!(out, "       memory: {}", dir.display());
2494            }
2495        }
2496        Some(out)
2497    }
2498
2499    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2500        let full_id = match self.resolve_agent_id_prefix(id)? {
2501            Ok(fid) => fid,
2502            Err(msg) => return Some(msg),
2503        };
2504        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2505        if let Some((tid, req)) = mgr.try_recv_secret_request()
2506            && tid == full_id
2507        {
2508            let key = req.secret_key.clone();
2509            let ttl = std::time::Duration::from_mins(5);
2510            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2511                return Some(format!("Approve failed: {e}"));
2512            }
2513            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2514                return Some(format!("Secret delivery failed: {e}"));
2515            }
2516            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2517        }
2518        Some(format!(
2519            "No pending secret request for sub-agent '{full_id}'."
2520        ))
2521    }
2522
2523    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2524        let full_id = match self.resolve_agent_id_prefix(id)? {
2525            Ok(fid) => fid,
2526            Err(msg) => return Some(msg),
2527        };
2528        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2529        match mgr.deny_secret(&full_id) {
2530            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2531            Err(e) => Some(format!("Deny failed: {e}")),
2532        }
2533    }
2534
2535    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2536        use zeph_subagent::AgentCommand;
2537
2538        match cmd {
2539            AgentCommand::List => self.handle_agent_list(),
2540            AgentCommand::Background { name, prompt } => {
2541                self.handle_agent_background(&name, &prompt)
2542            }
2543            AgentCommand::Spawn { name, prompt }
2544            | AgentCommand::Mention {
2545                agent: name,
2546                prompt,
2547            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2548            AgentCommand::Status => self.handle_agent_status(),
2549            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2550            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2551            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2552            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2553            _ => None,
2554        }
2555    }
2556
2557    /// Return the sub-agent definitions section formatted for the `/agents` fleet view.
2558    ///
2559    /// Produces a "Sub-agents:" header followed by one line per definition.
2560    /// Returns an empty string when no sub-agent manager is configured.
2561    pub(crate) fn handle_agents_definitions_list(&self) -> String {
2562        use std::fmt::Write as _;
2563
2564        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2565            return String::new();
2566        };
2567        let defs = mgr.definitions();
2568        if defs.is_empty() {
2569            return String::new();
2570        }
2571        let mut out = String::from("Sub-agents:\n");
2572        for d in defs {
2573            let memory_label = match d.memory {
2574                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2575                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2576                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2577                Some(_) => " [memory:unknown]",
2578                None => "",
2579            };
2580            if let Some(ref src) = d.source {
2581                let _ = writeln!(
2582                    out,
2583                    "  {}{} — {} ({})",
2584                    d.name, memory_label, d.description, src
2585                );
2586            } else {
2587                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2588            }
2589        }
2590        out
2591    }
2592
2593    /// Execute an `/agents` CRUD subcommand and return a formatted string.
2594    ///
2595    /// Handles `show`, `create`, `edit`, `delete` (the `list` case is handled by
2596    /// [`handle_agents_definitions_list`] and never reaches this method).
2597    pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2598        use zeph_subagent::AgentsCommand;
2599
2600        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2601            return "Sub-agent manager is not available.".to_owned();
2602        };
2603
2604        match cmd {
2605            AgentsCommand::List => self.handle_agents_definitions_list(),
2606            AgentsCommand::Show { name } => {
2607                match mgr.definitions().iter().find(|d| d.name == name) {
2608                    Some(d) => format!(
2609                        "Agent: {}\nDescription: {}\nSource: {}\n",
2610                        d.name,
2611                        d.description,
2612                        d.source.as_deref().unwrap_or("unknown"),
2613                    ),
2614                    None => format!("No sub-agent definition named '{name}'."),
2615                }
2616            }
2617            AgentsCommand::Create { name } => {
2618                format!(
2619                    "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2620                     See the sub-agent documentation for the required frontmatter."
2621                )
2622            }
2623            AgentsCommand::Edit { name } => {
2624                format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2625            }
2626            AgentsCommand::Delete { name } => {
2627                format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2628            }
2629            _ => "Unknown agents command.".to_owned(),
2630        }
2631    }
2632
2633    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2634        let provider = self.provider.clone();
2635        let tool_executor = Arc::clone(&self.tool_executor);
2636        let skills = self.filtered_skills_for(name);
2637        let cfg = self.services.orchestration.subagent_config.clone();
2638        let spawn_ctx = self.build_spawn_context(&cfg);
2639        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2640        match mgr.spawn(
2641            name,
2642            prompt,
2643            provider,
2644            tool_executor,
2645            skills,
2646            &cfg,
2647            spawn_ctx,
2648        ) {
2649            Ok(id) => Some(format!(
2650                "Sub-agent '{name}' started in background (id: {short})",
2651                short = &id[..8.min(id.len())]
2652            )),
2653            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2654        }
2655    }
2656
2657    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2658        let provider = self.provider.clone();
2659        let tool_executor = Arc::clone(&self.tool_executor);
2660        let skills = self.filtered_skills_for(name);
2661        let cfg = self.services.orchestration.subagent_config.clone();
2662        let spawn_ctx = self.build_spawn_context(&cfg);
2663        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2664        let task_id = match mgr.spawn(
2665            name,
2666            prompt,
2667            provider,
2668            tool_executor,
2669            skills,
2670            &cfg,
2671            spawn_ctx,
2672        ) {
2673            Ok(id) => id,
2674            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2675        };
2676        let short = task_id[..8.min(task_id.len())].to_owned();
2677        let _ = self
2678            .channel
2679            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2680            .await;
2681        let _ = self
2682            .channel
2683            .notify_foreground_subagent_started(&task_id, name)
2684            .await;
2685        let label = format!("Sub-agent '{name}'");
2686        let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2687            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2688            let _ = self
2689                .channel
2690                .notify_foreground_subagent_completed(&task_id, name, false)
2691                .await;
2692            return None;
2693        };
2694        let _ = self
2695            .channel
2696            .notify_foreground_subagent_completed(&task_id, name, success)
2697            .await;
2698        Some(result)
2699    }
2700
2701    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2702        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2703        // Accept prefix match on task_id.
2704        let ids: Vec<String> = mgr
2705            .statuses()
2706            .into_iter()
2707            .map(|(task_id, _)| task_id)
2708            .filter(|task_id| task_id.starts_with(id))
2709            .collect();
2710        match ids.as_slice() {
2711            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2712            [full_id] => {
2713                let full_id = full_id.clone();
2714                match mgr.cancel(&full_id) {
2715                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2716                    Err(e) => Some(format!("Cancel failed: {e}")),
2717                }
2718            }
2719            _ => Some(format!(
2720                "Ambiguous id prefix '{id}': matches {} agents",
2721                ids.len()
2722            )),
2723        }
2724    }
2725
2726    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2727        let cfg = self.services.orchestration.subagent_config.clone();
2728        // Resolve definition name from transcript meta before spawning so we can
2729        // look up skills by definition name rather than the UUID prefix (S1 fix).
2730        let def_name = {
2731            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2732            match mgr.def_name_for_resume(id, &cfg) {
2733                Ok(name) => name,
2734                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2735            }
2736        };
2737        let skills = self.filtered_skills_for(&def_name);
2738        let provider = self.provider.clone();
2739        let tool_executor = Arc::clone(&self.tool_executor);
2740        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2741        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg, None)
2742        {
2743            Ok(pair) => pair,
2744            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2745        };
2746        let short = task_id[..8.min(task_id.len())].to_owned();
2747        let _ = self
2748            .channel
2749            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2750            .await;
2751        let _ = self
2752            .channel
2753            .notify_foreground_subagent_started(&task_id, &def_name)
2754            .await;
2755        let Some((result, success)) = self
2756            .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2757            .await
2758        else {
2759            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2760            let _ = self
2761                .channel
2762                .notify_foreground_subagent_completed(&task_id, &def_name, false)
2763                .await;
2764            return None;
2765        };
2766        let _ = self
2767            .channel
2768            .notify_foreground_subagent_completed(&task_id, &def_name, success)
2769            .await;
2770        Some(result)
2771    }
2772
2773    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2774        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2775        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2776        let reg = self.services.skill.registry.read();
2777        match zeph_subagent::filter_skills(&reg, &def.skills) {
2778            Ok(skills) => {
2779                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2780                if bodies.is_empty() {
2781                    None
2782                } else {
2783                    Some(bodies)
2784                }
2785            }
2786            Err(e) => {
2787                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2788                None
2789            }
2790        }
2791    }
2792
2793    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2794    fn build_spawn_context(
2795        &self,
2796        cfg: &zeph_config::SubAgentConfig,
2797    ) -> zeph_subagent::SpawnContext {
2798        zeph_subagent::SpawnContext {
2799            parent_messages: self.extract_parent_messages(cfg),
2800            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2801            parent_provider_name: {
2802                let name = &self.runtime.config.active_provider_name;
2803                if name.is_empty() {
2804                    None
2805                } else {
2806                    Some(name.clone())
2807                }
2808            },
2809            spawn_depth: self.runtime.config.spawn_depth,
2810            mcp_tool_names: self.extract_mcp_tool_names(),
2811            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2812            seed_trajectory_score: {
2813                let child = self.services.security.trajectory.spawn_child();
2814                let score = child.score_now();
2815                if score > 0.0 { Some(score) } else { None }
2816            },
2817            content_isolation: self.runtime.config.security.content_isolation.clone(),
2818            orchestrator_name: Some("zeph".to_owned()),
2819            orchestrator_role: Some("orchestrator".to_owned()),
2820            session_mcp_servers: Vec::new(),
2821            // Constraint propagation (#3993): populated by orchestration layer when spawning
2822            // with explicit trust/tool restrictions. Top-level agent sessions leave these None.
2823            ..Default::default()
2824        }
2825    }
2826
2827    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2828    ///
2829    /// Filters system messages, applies `context_window_turns` and `max_parent_messages` caps,
2830    /// applies a 25% context window cap using a 4-chars-per-token heuristic, prunes orphaned
2831    /// `ToolUse`/`ToolResult` pairs at the slice boundary, and optionally sanitizes text parts
2832    /// through the IPI pipeline according to `parent_context_policy`.
2833    fn extract_parent_messages(
2834        &self,
2835        config: &zeph_config::SubAgentConfig,
2836    ) -> Vec<zeph_llm::provider::Message> {
2837        use zeph_config::ParentContextPolicy;
2838        use zeph_llm::provider::Role;
2839
2840        if config.parent_context_policy == ParentContextPolicy::None
2841            || config.context_window_turns == 0
2842        {
2843            return Vec::new();
2844        }
2845
2846        let non_system: Vec<_> = self
2847            .msg
2848            .messages
2849            .iter()
2850            .filter(|m| m.role != Role::System)
2851            .cloned()
2852            .collect();
2853
2854        let take_count = config
2855            .context_window_turns
2856            .saturating_mul(2)
2857            .min(config.max_parent_messages);
2858        let start = non_system.len().saturating_sub(take_count);
2859        let mut msgs = non_system[start..].to_vec();
2860
2861        // Cap at 25% of model context window and prune orphaned tool pairs.
2862        let max_chars = 128_000usize / 4;
2863        let requested = msgs.len();
2864        trim_parent_messages(&mut msgs, max_chars);
2865        if msgs.len() < requested {
2866            tracing::info!(
2867                kept = msgs.len(),
2868                requested,
2869                "[subagent] truncated parent history due to token budget or orphan pruning"
2870            );
2871        }
2872
2873        if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2874            use zeph_sanitizer::{ContentSource, ContentSourceKind};
2875            let source =
2876                ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2877            msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2878        }
2879
2880        msgs
2881    }
2882
2883    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2884    fn extract_mcp_tool_names(&self) -> Vec<String> {
2885        self.tool_executor
2886            .tool_definitions_erased()
2887            .into_iter()
2888            .filter(|t| t.id.starts_with("mcp_"))
2889            .map(|t| t.id.to_string())
2890            .collect()
2891    }
2892
2893    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2894    ///
2895    /// Must be called from a blocking context (uses synchronous FS I/O).
2896    fn classify_source_kind(
2897        skill_dir: &std::path::Path,
2898        managed_dir: Option<&std::path::PathBuf>,
2899        bundled_names: &std::collections::HashSet<String>,
2900    ) -> zeph_memory::store::SourceKind {
2901        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2902            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2903            let has_marker = skill_dir.join(".bundled").exists();
2904            if has_marker && bundled_names.contains(skill_name) {
2905                zeph_memory::store::SourceKind::Bundled
2906            } else {
2907                if has_marker {
2908                    tracing::warn!(
2909                        skill = %skill_name,
2910                        "skill has .bundled marker but is not in the bundled skill \
2911                         allowlist — classifying as Hub"
2912                    );
2913                }
2914                zeph_memory::store::SourceKind::Hub
2915            }
2916        } else {
2917            zeph_memory::store::SourceKind::Local
2918        }
2919    }
2920
2921    /// Update trust DB records for all reloaded skills.
2922    async fn update_trust_for_reloaded_skills(
2923        &mut self,
2924        all_meta: &[zeph_skills::loader::SkillMeta],
2925    ) {
2926        // Clone Arc before any .await so no &self fields are held across suspension points.
2927        let memory = self.services.memory.persistence.memory.clone();
2928        let Some(memory) = memory else {
2929            return;
2930        };
2931        let trust_cfg = self.services.skill.trust_config.clone();
2932        let managed_dir = self.services.skill.managed_dir.clone();
2933        let bundled_names: std::collections::HashSet<String> =
2934            zeph_skills::bundled_skill_names().into_iter().collect();
2935        for meta in all_meta {
2936            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2937            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2938            let skill_dir = meta.skill_dir.clone();
2939            let managed_dir_ref = managed_dir.clone();
2940            let bundled_names_ref = bundled_names.clone();
2941            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2942                tokio::task::spawn_blocking(move || {
2943                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2944                    let source_kind = Self::classify_source_kind(
2945                        &skill_dir,
2946                        managed_dir_ref.as_ref(),
2947                        &bundled_names_ref,
2948                    );
2949                    Some((hash, source_kind))
2950                })
2951                .await
2952                .unwrap_or(None);
2953
2954            let Some((current_hash, source_kind)) = fs_result else {
2955                tracing::warn!("failed to compute hash for '{}'", meta.name);
2956                continue;
2957            };
2958            let initial_level = match source_kind {
2959                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2960                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2961                    &trust_cfg.local_level
2962                }
2963                _ => &trust_cfg.default_level,
2964            };
2965            let existing = memory
2966                .sqlite()
2967                .load_skill_trust(&meta.name)
2968                .await
2969                .ok()
2970                .flatten();
2971            let trust_level = if let Some(ref row) = existing {
2972                if row.blake3_hash != current_hash {
2973                    trust_cfg.hash_mismatch_level
2974                } else if row.source_kind != source_kind {
2975                    // source_kind changed (e.g., hub → bundled on upgrade).
2976                    // Never override an explicit operator block. For active trust levels,
2977                    // adopt the source-kind initial level when it grants more trust.
2978                    let stored = row.trust_level;
2979                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2980                        stored
2981                    } else {
2982                        *initial_level
2983                    }
2984                } else {
2985                    row.trust_level
2986                }
2987            } else {
2988                *initial_level
2989            };
2990            let source_path = meta.skill_dir.to_str();
2991            if let Err(e) = memory
2992                .sqlite()
2993                .upsert_skill_trust(
2994                    &meta.name,
2995                    trust_level,
2996                    source_kind,
2997                    None,
2998                    source_path,
2999                    &current_hash,
3000                )
3001                .await
3002            {
3003                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
3004            }
3005        }
3006    }
3007
3008    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
3009    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
3010        let provider = self.embedding_provider.clone();
3011        let embed_timeout =
3012            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
3013        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
3014            let owned = text.to_owned();
3015            let p = provider.clone();
3016            Box::pin(async move {
3017                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
3018                    result
3019                } else {
3020                    tracing::warn!(
3021                        timeout_secs = embed_timeout.as_secs(),
3022                        "skill matcher: embedding timed out"
3023                    );
3024                    Err(zeph_llm::LlmError::Timeout)
3025                }
3026            })
3027        };
3028
3029        let needs_inmemory_rebuild = !self
3030            .services
3031            .skill
3032            .matcher
3033            .as_ref()
3034            .is_some_and(SkillMatcherBackend::is_qdrant);
3035
3036        if needs_inmemory_rebuild {
3037            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
3038                .await
3039                .map(SkillMatcherBackend::InMemory);
3040        } else if let Some(ref mut backend) = self.services.skill.matcher {
3041            let _ = self.channel.send_status("syncing skill index...").await;
3042            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
3043                self.services.session.status_tx.clone().map(
3044                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
3045                        Box::new(move |completed, total| {
3046                            let msg = format!("Syncing skills: {completed}/{total}");
3047                            let _ = tx.send(msg);
3048                        })
3049                    },
3050                );
3051            if let Err(e) = backend
3052                .sync(
3053                    all_meta,
3054                    &self.services.skill.embedding_model,
3055                    embed_fn,
3056                    on_progress,
3057                )
3058                .await
3059            {
3060                tracing::warn!("failed to sync skill embeddings: {e:#}");
3061            }
3062        }
3063
3064        if self.services.skill.hybrid_search {
3065            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
3066            let _ = self.channel.send_status("rebuilding search index...").await;
3067            self.services.skill.rebuild_bm25(&descs);
3068        }
3069    }
3070
3071    #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
3072    async fn reload_skills(&mut self) {
3073        let old_fp = self.services.skill.fingerprint();
3074        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
3075            let plugin_dirs = supplier();
3076            let mut paths = self.services.skill.skill_paths.clone();
3077            for dir in plugin_dirs {
3078                if !paths.contains(&dir) {
3079                    paths.push(dir);
3080                }
3081            }
3082            paths
3083        } else {
3084            self.services.skill.skill_paths.clone()
3085        };
3086        self.services.skill.registry.write().reload(&reload_paths);
3087        if self.services.skill.fingerprint() == old_fp {
3088            return;
3089        }
3090        let _ = self.channel.send_status("reloading skills...").await;
3091
3092        let all_meta = self
3093            .services
3094            .skill
3095            .registry
3096            .read()
3097            .all_meta()
3098            .into_iter()
3099            .cloned()
3100            .collect::<Vec<_>>();
3101
3102        self.update_trust_for_reloaded_skills(&all_meta).await;
3103
3104        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3105        self.rebuild_skill_matcher(&all_meta_refs).await;
3106
3107        let all_skills: Vec<Skill> = {
3108            let reg = self.services.skill.registry.read();
3109            reg.all_meta()
3110                .iter()
3111                .filter_map(|m| reg.skill(&m.name).ok())
3112                .collect()
3113        };
3114        let trust_map = self.build_skill_trust_map().await;
3115        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3116        let skills_prompt =
3117            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
3118        self.services
3119            .skill
3120            .last_skills_prompt
3121            .clone_from(&skills_prompt);
3122        let system_prompt = build_system_prompt(&skills_prompt, None);
3123        if let Some(msg) = self.msg.messages.first_mut() {
3124            msg.content = system_prompt;
3125        }
3126
3127        let _ = self.channel.send_status("").await;
3128        tracing::info!(
3129            "reloaded {} skill(s)",
3130            self.services.skill.registry.read().all_meta().len()
3131        );
3132    }
3133
3134    async fn reload_instructions(&mut self) {
3135        // Drain any additional queued events before reloading to avoid redundant reloads.
3136        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3137            while rx.try_recv().is_ok() {}
3138        }
3139        let Some(ref state) = self.runtime.instructions.reload_state else {
3140            return;
3141        };
3142        let base_dir = state.base_dir.clone();
3143        let provider_kinds = state.provider_kinds.clone();
3144        let explicit_files = state.explicit_files.clone();
3145        let auto_detect = state.auto_detect;
3146        let new_blocks = crate::instructions::load_instructions_async(
3147            base_dir,
3148            provider_kinds,
3149            explicit_files,
3150            auto_detect,
3151        )
3152        .await;
3153        let old_sources: std::collections::HashSet<_> = self
3154            .runtime
3155            .instructions
3156            .blocks
3157            .iter()
3158            .map(|b| &b.source)
3159            .collect();
3160        let new_sources: std::collections::HashSet<_> =
3161            new_blocks.iter().map(|b| &b.source).collect();
3162        for added in new_sources.difference(&old_sources) {
3163            tracing::info!(path = %added.display(), "instruction file added");
3164        }
3165        for removed in old_sources.difference(&new_sources) {
3166            tracing::info!(path = %removed.display(), "instruction file removed");
3167        }
3168        tracing::info!(
3169            old_count = self.runtime.instructions.blocks.len(),
3170            new_count = new_blocks.len(),
3171            "reloaded instruction files"
3172        );
3173        self.runtime.instructions.blocks = new_blocks;
3174    }
3175
3176    #[allow(clippy::too_many_lines)]
3177    fn reload_config(&mut self) {
3178        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
3179            return;
3180        };
3181        let Some(config) = self.load_config_with_overlay(&path) else {
3182            return;
3183        };
3184        let budget_tokens = resolve_context_budget(&config, &self.provider);
3185        self.runtime.config.security = config.security;
3186        self.runtime.config.timeouts = config.timeouts;
3187        self.runtime.config.redact_credentials = config.memory.redact_credentials;
3188        self.services.memory.persistence.history_limit = config.memory.history_limit;
3189        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
3190        self.services.memory.compaction.summarization_threshold =
3191            config.memory.summarization_threshold;
3192        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
3193        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
3194        self.services.skill.min_injection_score = config.skills.min_injection_score;
3195        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3196        self.services.skill.hybrid_search = config.skills.hybrid_search;
3197        {
3198            let alpha = config.skills.bm25_alpha;
3199            if !(0.0..=1.0).contains(&alpha) {
3200                tracing::warn!(
3201                    bm25_alpha = alpha,
3202                    "bm25_alpha is outside [0.0, 1.0]; clamping to valid range"
3203                );
3204            }
3205            self.services.skill.bm25_alpha = alpha.clamp(0.0, 1.0);
3206        }
3207        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
3208        self.services.skill.confusability_threshold =
3209            config.skills.confusability_threshold.clamp(0.0, 1.0);
3210        self.services.skill.group_structured = config.skills.group_structured;
3211        self.services.skill.support_similarity_threshold =
3212            config.skills.support_similarity_threshold;
3213        config
3214            .skills
3215            .query_rewrite_provider
3216            .as_str()
3217            .clone_into(&mut self.services.skill.query_rewrite_provider_name);
3218        config
3219            .skills
3220            .generation_provider
3221            .as_str()
3222            .clone_into(&mut self.services.skill.generation_provider_name);
3223        config
3224            .skills
3225            .disambiguate_provider
3226            .as_str()
3227            .clone_into(&mut self.services.skill.disambiguate_provider_name);
3228        self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
3229        self.services.skill.semantic_scan = config.skills.semantic_scan;
3230        config
3231            .skills
3232            .semantic_scan_provider
3233            .as_str()
3234            .clone_into(&mut self.services.skill.semantic_scan_provider);
3235        self.services.skill.generation_output_dir =
3236            config.skills.generation_output_dir.as_deref().map(|p| {
3237                if let Some(stripped) = p.strip_prefix("~/") {
3238                    dirs::home_dir()
3239                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
3240                } else {
3241                    std::path::PathBuf::from(p)
3242                }
3243            });
3244
3245        self.context_manager.budget = Some(
3246            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
3247        );
3248
3249        {
3250            let graph_cfg = &config.memory.graph;
3251            if graph_cfg.rpe.enabled {
3252                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
3253                if self.services.memory.extraction.rpe_router.is_none() {
3254                    self.services.memory.extraction.rpe_router =
3255                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
3256                            graph_cfg.rpe.threshold,
3257                            graph_cfg.rpe.max_skip_turns,
3258                        )));
3259                }
3260            } else {
3261                self.services.memory.extraction.rpe_router = None;
3262            }
3263            self.services.memory.extraction.graph_config = graph_cfg.clone();
3264        }
3265        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3266        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3267        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3268        self.context_manager
3269            .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
3270        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3271        self.context_manager.compression = config.memory.compression.clone();
3272        self.context_manager.routing = config.memory.store_routing.clone();
3273        // Resolve routing_classifier_provider from the provider pool (#2484).
3274        self.context_manager.store_routing_provider = if config
3275            .memory
3276            .store_routing
3277            .routing_classifier_provider
3278            .is_empty()
3279        {
3280            None
3281        } else {
3282            let resolved = self.resolve_background_provider(
3283                config
3284                    .memory
3285                    .store_routing
3286                    .routing_classifier_provider
3287                    .as_str(),
3288            );
3289            Some(std::sync::Arc::new(resolved))
3290        };
3291        self.services
3292            .memory
3293            .persistence
3294            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3295
3296        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3297        self.services.index.repo_map_ttl =
3298            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3299
3300        self.services
3301            .session
3302            .hooks_config
3303            .cwd_changed
3304            .clone_from(&config.hooks.cwd_changed);
3305        self.services
3306            .session
3307            .hooks_config
3308            .permission_denied
3309            .clone_from(&config.hooks.permission_denied);
3310        self.services
3311            .session
3312            .hooks_config
3313            .turn_complete
3314            .clone_from(&config.hooks.turn_complete);
3315        // file_changed_hooks require watcher restart to take effect — skipped here.
3316
3317        tracing::info!("config reloaded");
3318    }
3319
3320    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
3321    ///
3322    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
3323    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3324        let mut config = match Config::load(path) {
3325            Ok(c) => c,
3326            Err(e) => {
3327                tracing::warn!("config reload failed: {e:#}");
3328                return None;
3329            }
3330        };
3331
3332        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3333        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3334            None
3335        } else {
3336            match zeph_plugins::apply_plugin_config_overlays(
3337                &mut config,
3338                &self.runtime.lifecycle.plugins_dir,
3339            ) {
3340                Ok(o) => Some(o),
3341                Err(e) => {
3342                    tracing::warn!(
3343                        "plugin overlay merge failed during reload: {e:#}; \
3344                         keeping previous runtime state"
3345                    );
3346                    return None;
3347                }
3348            }
3349        };
3350
3351        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3352        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3353        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3354        if let Some(ref overlay) = new_overlay {
3355            self.warn_on_shell_overlay_divergence(overlay, &config);
3356        }
3357        Some(config)
3358    }
3359
3360    /// React to shell policy divergence detected on hot-reload.
3361    ///
3362    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3363    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3364    /// — emit a warn + status banner when it changes.
3365    fn warn_on_shell_overlay_divergence(
3366        &self,
3367        new_overlay: &zeph_plugins::ResolvedOverlay,
3368        config: &Config,
3369    ) {
3370        let new_blocked: Vec<String> = {
3371            let mut v = config.tools.shell.blocked_commands.clone();
3372            v.sort();
3373            v
3374        };
3375        let new_allowed: Vec<String> = {
3376            let mut v = config.tools.shell.allowed_commands.clone();
3377            v.sort();
3378            v
3379        };
3380
3381        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3382        let blocked_changed = new_blocked != startup.blocked;
3383        let allowed_changed = new_allowed != startup.allowed;
3384
3385        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3386        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3387            h.rebuild(&config.tools.shell);
3388            tracing::info!(
3389                blocked_count = h.snapshot_blocked().len(),
3390                "shell blocked_commands rebuilt from hot-reload"
3391            );
3392        }
3393
3394        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3395        // executor construction time. Warn loudly so the user restarts.
3396        //
3397        // Note: when base `allowed_commands` is empty (the default), the overlay's
3398        // intersection semantics keep it empty, so this branch is silently unreachable
3399        // for users who do not set a non-empty base list.
3400        if allowed_changed {
3401            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3402                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3403            tracing::warn!("{msg}");
3404            if let Some(ref tx) = self.services.session.status_tx {
3405                let _ = tx.send(msg.to_owned());
3406            }
3407        }
3408
3409        let _ = new_overlay;
3410    }
3411
3412    /// Run `SideQuest` tool output eviction pass (#1885).
3413    ///
3414    /// PERF-1 fix: two-phase non-blocking design.
3415    ///
3416    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3417    /// validate and apply it immediately.
3418    ///
3419    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3420    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3421    /// next turn, so the current agent turn is never blocked by the LLM call.
3422    fn maybe_sidequest_eviction(&mut self) {
3423        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3424        // strategy — the two systems share the same pool of evictable tool outputs and can
3425        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3426        if self.services.sidequest.config.enabled {
3427            use crate::config::PruningStrategy;
3428            if !matches!(
3429                self.context_manager.compression.pruning_strategy,
3430                PruningStrategy::Reactive
3431            ) {
3432                tracing::warn!(
3433                    strategy = ?self.context_manager.compression.pruning_strategy,
3434                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3435                     consider disabling sidequest.enabled to avoid redundant eviction"
3436                );
3437            }
3438        }
3439
3440        // Guard: do not evict while a focus session is active.
3441        if self.services.focus.is_active() {
3442            tracing::debug!("sidequest: skipping — focus session active");
3443            // Drop any pending result — cursors may be stale relative to focus truncation.
3444            self.services.compression.pending_sidequest_result = None;
3445            return;
3446        }
3447
3448        // Phase 1: apply pending result from last turn's background LLM call.
3449        self.sidequest_apply_pending();
3450
3451        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3452        self.sidequest_schedule_next();
3453    }
3454
3455    fn sidequest_apply_pending(&mut self) {
3456        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3457            return;
3458        };
3459        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3460        // and we reschedule below.
3461        let result = match handle.try_join() {
3462            Ok(result) => result,
3463            Err(_handle) => {
3464                // Task still running — drop it; a fresh one is scheduled below.
3465                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3466                return;
3467            }
3468        };
3469        match result {
3470            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3471                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3472                let freed = self.services.sidequest.apply_eviction(
3473                    &mut self.msg.messages,
3474                    &evicted_indices,
3475                    &self.runtime.metrics.token_counter,
3476                );
3477                if freed > 0 {
3478                    self.recompute_prompt_tokens();
3479                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3480                    // cooldown=0: eviction does not impose post-compaction cooldown.
3481                    self.context_manager.set_compaction_state(
3482                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3483                            cooldown: 0,
3484                        },
3485                    );
3486                    tracing::info!(
3487                        freed_tokens = freed,
3488                        evicted_cursors = evicted_indices.len(),
3489                        pass = self.services.sidequest.passes_run,
3490                        "sidequest eviction complete"
3491                    );
3492                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3493                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3494                    }
3495                    if let Some(ref tx) = self.services.session.status_tx {
3496                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3497                    }
3498                } else {
3499                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3500                    if let Some(ref tx) = self.services.session.status_tx {
3501                        let _ = tx.send(String::new());
3502                    }
3503                }
3504            }
3505            Ok(None | Some(_)) => {
3506                tracing::debug!("sidequest: pending result: no cursors to evict");
3507                if let Some(ref tx) = self.services.session.status_tx {
3508                    let _ = tx.send(String::new());
3509                }
3510            }
3511            Err(e) => {
3512                tracing::debug!("sidequest: background task error: {e}");
3513                if let Some(ref tx) = self.services.session.status_tx {
3514                    let _ = tx.send(String::new());
3515                }
3516            }
3517        }
3518    }
3519
3520    fn sidequest_schedule_next(&mut self) {
3521        use zeph_llm::provider::{Message, MessageMetadata, Role};
3522
3523        self.services
3524            .sidequest
3525            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3526
3527        if self.services.sidequest.tool_output_cursors.is_empty() {
3528            tracing::debug!("sidequest: no eligible cursors");
3529            return;
3530        }
3531
3532        let prompt = self.services.sidequest.build_eviction_prompt();
3533        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3534        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3535        // Clone the provider so the spawn closure owns it without borrowing self.
3536        let provider = self.summary_or_primary_provider().clone();
3537
3538        let eviction_future = async move {
3539            let msgs = [Message {
3540                role: Role::User,
3541                content: prompt,
3542                parts: vec![],
3543                metadata: MessageMetadata::default(),
3544            }];
3545            let response =
3546                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3547                    .await
3548                {
3549                    Ok(Ok(r)) => r,
3550                    Ok(Err(e)) => {
3551                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3552                        return None;
3553                    }
3554                    Err(_) => {
3555                        tracing::debug!("sidequest bg: LLM call timed out");
3556                        return None;
3557                    }
3558                };
3559
3560            let start = response.find('{')?;
3561            let end = response.rfind('}')?;
3562            if start > end {
3563                return None;
3564            }
3565            let json_slice = &response[start..=end];
3566            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3567            let mut valid: Vec<usize> = parsed
3568                .del_cursors
3569                .into_iter()
3570                .filter(|&c| c < n_cursors)
3571                .collect();
3572            valid.sort_unstable();
3573            valid.dedup();
3574            #[allow(
3575                clippy::cast_precision_loss,
3576                clippy::cast_possible_truncation,
3577                clippy::cast_sign_loss
3578            )]
3579            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3580            valid.truncate(max_evict);
3581            Some(valid)
3582        };
3583        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3584            std::sync::Arc::from("agent.sidequest.eviction"),
3585            move || eviction_future,
3586        );
3587        self.services.compression.pending_sidequest_result = Some(handle);
3588        tracing::debug!("sidequest: background LLM eviction task spawned");
3589        if let Some(ref tx) = self.services.session.status_tx {
3590            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3591        }
3592    }
3593
3594    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3595    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3596        self.services
3597            .mcp
3598            .manager
3599            .as_ref()
3600            .map(|m| McpManagerDispatch(Arc::clone(m)))
3601    }
3602
3603    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3604    ///
3605    /// Called after each tool batch completes. The check is a single syscall and has
3606    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3607    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3608    pub(crate) async fn check_cwd_changed(&mut self) {
3609        let current = match std::env::current_dir() {
3610            Ok(p) => p,
3611            Err(e) => {
3612                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3613                return;
3614            }
3615        };
3616        if current == self.runtime.lifecycle.last_known_cwd {
3617            return;
3618        }
3619        let old_cwd =
3620            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3621        self.services.session.env_context.working_dir = current.display().to_string();
3622
3623        tracing::info!(
3624            old = %old_cwd.display(),
3625            new = %current.display(),
3626            "working directory changed"
3627        );
3628
3629        let _ = self
3630            .channel
3631            .send_status("Working directory changed\u{2026}")
3632            .await;
3633
3634        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3635        if hooks.is_empty() {
3636            tracing::debug!("CwdChanged: no hooks configured, skipping");
3637        } else {
3638            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3639            let mut env = std::collections::HashMap::new();
3640            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3641            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3642            let dispatch = self.mcp_dispatch();
3643            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3644                .as_ref()
3645                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3646            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3647                tracing::warn!(error = %e, "CwdChanged hook failed");
3648            } else {
3649                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3650            }
3651        }
3652
3653        let _ = self.channel.send_status("").await;
3654    }
3655
3656    /// Handle a `FileChangedEvent` from the file watcher.
3657    pub(crate) async fn handle_file_changed(
3658        &mut self,
3659        event: crate::file_watcher::FileChangedEvent,
3660    ) {
3661        tracing::info!(path = %event.path.display(), "file changed");
3662
3663        let _ = self
3664            .channel
3665            .send_status("Running file-change hook\u{2026}")
3666            .await;
3667
3668        let hooks = self
3669            .services
3670            .session
3671            .hooks_config
3672            .file_changed_hooks
3673            .clone();
3674        if hooks.is_empty() {
3675            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3676        } else {
3677            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3678            let mut env = std::collections::HashMap::new();
3679            env.insert(
3680                "ZEPH_CHANGED_PATH".to_owned(),
3681                event.path.display().to_string(),
3682            );
3683            let dispatch = self.mcp_dispatch();
3684            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3685                .as_ref()
3686                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3687            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3688                tracing::warn!(error = %e, "FileChanged hook failed");
3689            } else {
3690                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3691            }
3692        }
3693
3694        let _ = self.channel.send_status("").await;
3695    }
3696
3697    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3698    /// background scan task.
3699    ///
3700    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3701    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3702    ///
3703    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3704    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3705    /// blocking the turn.
3706    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3707        let Some(engine) = self.services.promotion_engine.clone() else {
3708            return;
3709        };
3710
3711        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3712            return;
3713        };
3714
3715        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3716        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3717        let promotion_window = 200usize;
3718
3719        let accepted = self.runtime.lifecycle.supervisor.spawn(
3720            agent_supervisor::TaskClass::Enrichment,
3721            "compression_spectrum.promotion_scan",
3722            async move {
3723                let window = match memory.load_promotion_window(promotion_window).await {
3724                    Ok(w) => w,
3725                    Err(e) => {
3726                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3727                        return;
3728                    }
3729                };
3730
3731                if window.is_empty() {
3732                    return;
3733                }
3734
3735                let candidates = match engine.scan(&window).await {
3736                    Ok(c) => c,
3737                    Err(e) => {
3738                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3739                        return;
3740                    }
3741                };
3742
3743                for candidate in &candidates {
3744                    if let Err(e) = engine.promote(candidate).await {
3745                        tracing::warn!(
3746                            signature = %candidate.signature,
3747                            error = %e,
3748                            "promotion scan: promote failed"
3749                        );
3750                    }
3751                }
3752
3753                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3754            }
3755            .instrument(tracing::info_span!("memory.compression.promote.background")),
3756        );
3757
3758        if accepted {
3759            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3760        }
3761    }
3762}
3763/// Estimates the JSON payload size of a single [`zeph_llm::provider::Message`] for token-budget
3764/// accounting.
3765///
3766/// When `parts` is empty the message is a legacy text-only message and `content.len()` is used
3767/// directly. Otherwise each part is measured individually so that structured variants (images,
3768/// tool invocations, thinking blocks) are accounted for rather than relying on the already-flat
3769/// `content` string, which may not reflect the actual API payload size.
3770pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3771    use zeph_llm::provider::MessagePart;
3772    if m.parts.is_empty() {
3773        return m.content.len();
3774    }
3775    m.parts
3776        .iter()
3777        .map(|p| match p {
3778            MessagePart::Text { text }
3779            | MessagePart::Recall { text }
3780            | MessagePart::CodeContext { text }
3781            | MessagePart::Summary { text }
3782            | MessagePart::CrossSession { text } => text.len(),
3783            MessagePart::ToolOutput { body, .. } => body.len(),
3784            MessagePart::ToolUse { id, name, input } => {
3785                50 + id.len() + name.len() + input.to_string().len()
3786            }
3787            MessagePart::ToolResult {
3788                tool_use_id,
3789                content,
3790                ..
3791            } => 50 + tool_use_id.len() + content.len(),
3792            MessagePart::Image(img) => img.data.len() * 4 / 3,
3793            MessagePart::ThinkingBlock {
3794                thinking,
3795                signature,
3796            } => 50 + thinking.len() + signature.len(),
3797            MessagePart::RedactedThinkingBlock { data } => data.len(),
3798            MessagePart::Compaction { summary } => summary.len(),
3799            _ => 0,
3800        })
3801        .sum()
3802}
3803
3804/// Applies token-budget truncation and orphaned-tool-pair pruning to a parent message slice.
3805///
3806/// Budget truncation keeps the **most recent** messages that fit within `max_chars`
3807/// (a suffix), so the subagent always receives the freshest context.
3808///
3809/// Two passes are performed after budget truncation:
3810///
3811/// 1. Remove `ToolResult` parts from user messages whose matching `ToolUse` is no longer in the
3812///    slice (truncated away).
3813/// 2. Remove `ToolUse` parts from **interior** assistant messages whose matching `ToolResult`
3814///    was removed in pass 1 or was already absent. The trailing assistant message is exempt —
3815///    its unanswered `ToolUse` calls are not orphaned; the slice just ends before the result.
3816///
3817/// Messages that become fully empty after pruning are removed from `msgs`.
3818///
3819/// `rebuild_content` is called **only** when `retain` actually removed parts — preserving the
3820/// existing `content` field (and any `ThinkingBlock` text embedded there) for unmodified
3821/// messages.
3822pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3823    use zeph_llm::provider::{MessagePart, Role};
3824
3825    // Token-budget cap: keep the most recent messages that fit within max_chars.
3826    // We iterate from the end (newest) and drain from the front once the budget is exceeded,
3827    // so the subagent always receives the most recent context rather than stale early messages.
3828    let mut total_chars = 0usize;
3829    let mut drop_before = 0usize; // index of the first message to keep
3830    for (i, m) in msgs.iter().enumerate().rev() {
3831        total_chars += estimate_parts_size(m);
3832        if total_chars > max_chars {
3833            drop_before = i + 1;
3834            break;
3835        }
3836    }
3837    if drop_before > 0 {
3838        msgs.drain(..drop_before);
3839    }
3840
3841    // Pass 1: collect ToolUse IDs emitted by assistant messages; prune orphaned ToolResult
3842    // parts from user messages that reference a ToolUse no longer present in the slice.
3843    // Use owned Strings to avoid holding immutable borrows across the subsequent mutable loop.
3844    let emitted_tool_ids: std::collections::HashSet<String> = msgs
3845        .iter()
3846        .filter(|m| m.role == Role::Assistant)
3847        .flat_map(|m| m.parts.iter())
3848        .filter_map(|p| {
3849            if let MessagePart::ToolUse { id, .. } = p {
3850                Some(id.clone())
3851            } else {
3852                None
3853            }
3854        })
3855        .collect();
3856
3857    let mut orphans_removed = 0usize;
3858    for m in msgs.iter_mut() {
3859        if m.role != Role::User || m.parts.is_empty() {
3860            continue;
3861        }
3862        let before = m.parts.len();
3863        m.parts.retain(|p| match p {
3864            MessagePart::ToolResult { tool_use_id, .. } => {
3865                emitted_tool_ids.contains(tool_use_id.as_str())
3866            }
3867            _ => true,
3868        });
3869        let dropped = before - m.parts.len();
3870        if dropped > 0 {
3871            orphans_removed += dropped;
3872            if m.parts.is_empty() {
3873                m.content.clear();
3874            } else {
3875                m.rebuild_content();
3876            }
3877        }
3878    }
3879
3880    // Pass 2: collect ToolResult IDs present in user messages after pass 1; prune ToolUse
3881    // parts from assistant messages whose result is confirmed absent.
3882    //
3883    // The trailing assistant message is exempt: it may legitimately contain unanswered
3884    // ToolUse calls (the slice ends before the result arrives). Only interior assistant
3885    // messages — those followed by at least one user message — can have provably orphaned
3886    // ToolUse parts (the conversation moved on without answering them).
3887    let consumed_tool_ids: std::collections::HashSet<String> = msgs
3888        .iter()
3889        .filter(|m| m.role == Role::User)
3890        .flat_map(|m| m.parts.iter())
3891        .filter_map(|p| {
3892            if let MessagePart::ToolResult { tool_use_id, .. } = p {
3893                Some(tool_use_id.clone())
3894            } else {
3895                None
3896            }
3897        })
3898        .collect();
3899
3900    // Index of the last assistant message — exempt from pass 2.
3901    let last_assistant_idx = msgs
3902        .iter()
3903        .enumerate()
3904        .rev()
3905        .find(|(_, m)| m.role == Role::Assistant)
3906        .map(|(i, _)| i);
3907
3908    for (idx, m) in msgs.iter_mut().enumerate() {
3909        if m.role != Role::Assistant || m.parts.is_empty() {
3910            continue;
3911        }
3912        // Skip the trailing assistant message — its unanswered ToolUse calls are not orphaned.
3913        if Some(idx) == last_assistant_idx {
3914            continue;
3915        }
3916        let before = m.parts.len();
3917        m.parts.retain(|p| match p {
3918            MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3919            _ => true,
3920        });
3921        let dropped = before - m.parts.len();
3922        if dropped > 0 {
3923            orphans_removed += dropped;
3924            if m.parts.is_empty() {
3925                m.content.clear();
3926            } else {
3927                m.rebuild_content();
3928            }
3929        }
3930    }
3931
3932    // Remove messages that were emptied by orphan pruning.
3933    msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3934
3935    if orphans_removed > 0 {
3936        tracing::debug!(
3937            orphans = orphans_removed,
3938            "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3939        );
3940    }
3941}
3942
3943/// Sanitize text parts of `msgs` through the IPI pipeline.
3944///
3945/// Only [`MessagePart::Text`] parts are passed through the sanitizer; structured parts
3946/// (`ToolUse`, `ToolResult`, `Recall`, `CodeContext`) are left untouched.  After sanitization
3947/// the message `content` field is rebuilt to stay consistent with the updated parts.
3948fn sanitize_parent_messages(
3949    mut msgs: Vec<zeph_llm::provider::Message>,
3950    sanitizer: &zeph_sanitizer::ContentSanitizer,
3951    source: &zeph_sanitizer::ContentSource,
3952) -> Vec<zeph_llm::provider::Message> {
3953    use zeph_llm::provider::MessagePart;
3954    for msg in &mut msgs {
3955        let mut changed = false;
3956        for part in &mut msg.parts {
3957            if let MessagePart::Text { text } = part {
3958                let clean = sanitizer.sanitize(text, source.clone());
3959                if clean.body != *text {
3960                    *text = clean.body;
3961                    changed = true;
3962                }
3963            }
3964        }
3965        if changed {
3966            msg.rebuild_content();
3967        }
3968    }
3969    msgs
3970}
3971
3972/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3973///
3974/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3975/// `zeph-subagent` to `zeph-mcp`.
3976struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3977
3978impl zeph_subagent::McpDispatch for McpManagerDispatch {
3979    fn call_tool<'a>(
3980        &'a self,
3981        server: &'a str,
3982        tool: &'a str,
3983        args: serde_json::Value,
3984    ) -> std::pin::Pin<
3985        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3986    > {
3987        Box::pin(async move {
3988            self.0
3989                .call_tool(server, tool, args)
3990                .await
3991                .map(|result| {
3992                    // Extract text content from the MCP response as a JSON value.
3993                    let texts: Vec<serde_json::Value> = result
3994                        .content
3995                        .iter()
3996                        .filter_map(|c| {
3997                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3998                                Some(serde_json::Value::String(t.text.clone()))
3999                            } else {
4000                                None
4001                            }
4002                        })
4003                        .collect();
4004                    serde_json::Value::Array(texts)
4005                })
4006                .map_err(|e| e.to_string())
4007        })
4008    }
4009}
4010
4011pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
4012    while !*rx.borrow_and_update() {
4013        if rx.changed().await.is_err() {
4014            std::future::pending::<()>().await;
4015        }
4016    }
4017}
4018
4019pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4020    match rx {
4021        Some(inner) => {
4022            if let Some(v) = inner.recv().await {
4023                Some(v)
4024            } else {
4025                *rx = None;
4026                std::future::pending().await
4027            }
4028        }
4029        None => std::future::pending().await,
4030    }
4031}
4032
/// Truncate a background run command to at most 80 characters for TUI display.
///
/// Commands of 80 characters or fewer are returned unchanged. Longer commands are cut to
/// their first 79 characters followed by a single `…`, so the result is exactly 80
/// characters long.
///
/// The limit is counted in Unicode scalar values (`char`s), not bytes, so a short
/// non-ASCII command is not truncated merely because its UTF-8 encoding exceeds 80 bytes.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_CHARS: usize = 80;

    // Fast path: a string of at most MAX_CHARS bytes cannot hold more than MAX_CHARS chars.
    if cmd.len() <= MAX_CHARS {
        return cmd.to_owned();
    }

    // Position on the 80th character (index 79). Its byte offset is where a truncated
    // command is cut, leaving room for the ellipsis.
    let mut tail = cmd.char_indices().skip(MAX_CHARS - 1);
    let Some((cut, _)) = tail.next() else {
        return cmd.to_owned();
    };
    // No 81st character: the command fits as-is.
    if tail.next().is_none() {
        return cmd.to_owned();
    }
    format!("{}…", &cmd[..cut])
}
4046
/// Return the first 8 characters of a run-id hex string for compact TUI display.
///
/// Ids shorter than 8 characters are returned unchanged.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the 9th character, if any, marks the end of the 8-character prefix.
    match id.char_indices().nth(8) {
        Some((end, _)) => id[..end].to_owned(),
        None => id.to_owned(),
    }
}
4051
4052pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
4053    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
4054        if let Some(ctx_size) = provider.context_window() {
4055            tracing::info!(
4056                model_context = ctx_size,
4057                "auto-configured context budget on reload"
4058            );
4059            ctx_size
4060        } else {
4061            0
4062        }
4063    } else {
4064        config.memory.context_budget_tokens
4065    };
4066    if tokens == 0 {
4067        tracing::warn!(
4068            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
4069        );
4070        128_000
4071    } else {
4072        tokens
4073    }
4074}
4075
4076#[cfg(test)]
4077mod tests;
4078
4079#[cfg(test)]
4080pub(crate) use tests::agent_tests;
4081
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed future returned by [`CommandHandler::handle`].
    type HandleFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Slash-command stub that exists only in `#[cfg(test)]` builds.
    ///
    /// Its handler always fails, which drives the `Some(Err(CommandError))` arm of the
    /// session/debug registry dispatch block so the non-fatal error path can be tested
    /// without production command validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignore both the context and the arguments and fail with a fixed error.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandleFuture<'a> {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}