Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15mod command_context_impls;
16pub(super) mod compression_feedback;
17mod context;
18mod context_impls;
19pub(crate) mod context_manager;
20mod corrections;
21pub mod error;
22mod experiment_cmd;
23pub(crate) mod focus;
24mod index;
25mod learning;
26pub(crate) mod learning_engine;
27mod log_commands;
28mod loop_event;
29mod lsp_commands;
30mod magic_docs;
31mod mcp;
32pub(crate) mod memcot;
33mod message_queue;
34mod microcompact;
35mod model_commands;
36mod persistence;
37#[cfg(feature = "scheduler")]
38mod plan;
39mod policy_commands;
40mod provider_cmd;
41#[cfg(feature = "self-check")]
42mod quality_hook;
43pub(crate) mod rate_limiter;
44#[cfg(feature = "scheduler")]
45mod scheduler_commands;
46#[cfg(feature = "scheduler")]
47mod scheduler_loop;
48mod scope_commands;
49pub mod session_config;
50mod session_digest;
51pub(crate) mod sidequest;
52mod skill_management;
53pub mod slash_commands;
54pub mod speculative;
55pub(crate) mod state;
56pub(crate) mod task_injection;
57pub(crate) mod tool_execution;
58pub(crate) mod tool_orchestrator;
59pub mod trajectory;
60mod trajectory_commands;
61mod trust_commands;
62pub mod turn;
63mod utils;
64pub(crate) mod vigil;
65
66use std::collections::{HashMap, VecDeque};
67use std::fmt::Write as _;
68use std::sync::Arc;
69
70use parking_lot::RwLock;
71
72use tokio::sync::{mpsc, watch};
73use tokio_util::sync::CancellationToken;
74use zeph_llm::any::AnyProvider;
75use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
76use zeph_memory::TokenCounter;
77use zeph_memory::semantic::SemanticMemory;
78use zeph_skills::loader::Skill;
79use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
80use zeph_skills::prompt::format_skills_prompt;
81use zeph_skills::registry::SkillRegistry;
82use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
83
84use crate::channel::Channel;
85use crate::config::Config;
86use crate::context::{ContextBudget, build_system_prompt};
87use zeph_common::text::estimate_tokens;
88
89use loop_event::LoopEvent;
90use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
91use state::MessageState;
92
93pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
94// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
95// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
96// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
97pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
98pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence appended after every tool-output body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Render a tool's output as a labelled fenced block.
///
/// The result has the shape `[tool output: <tool_name>]`, a newline, an opening
/// triple-backtick fence, the `body` verbatim, and finally [`TOOL_OUTPUT_SUFFIX`].
/// The buffer is sized up front from the lengths of all pieces, so building the
/// string performs a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    let pieces = [
        HEADER_OPEN,
        tool_name,
        HEADER_CLOSE,
        body,
        TOOL_OUTPUT_SUFFIX,
    ];
    let total_len: usize = pieces.iter().map(|piece| piece.len()).sum();

    let mut rendered = String::with_capacity(total_len);
    for piece in pieces {
        rendered.push_str(piece);
    }
    rendered
}
115
116/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
117/// tool orchestration, and multi-channel I/O.
118///
119/// The agent maintains conversation history, manages LLM provider state, coordinates tool
120/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
121/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
122///
123/// # Architecture
124///
125/// - **Message state**: Conversation history with system prompt, message queue, and metadata
126/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
127/// - **Skill state**: Registry, matching engine, and self-learning evolution
128/// - **Context manager**: Token budgeting, context assembly, and summarization
129/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
130/// - **MCP client**: Multi-server support for Model Context Protocol
131/// - **Index state**: AST-based code indexing and semantic retrieval
132/// - **Security**: Sanitization, exfiltration detection, adversarial probes
133/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
134///
135/// # Channel Contract
136///
137/// The agent requires a [`Channel`] implementation for user interaction:
138/// - Sends agent responses via `channel.send(message)`
139/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
140/// - Supports structured events: tool invocations, tool output, streaming updates
141///
142/// # Lifecycle
143///
144/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
145/// 2. Run main loop with [`Self::run`]
146/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
147///
148pub struct Agent<C: Channel> {
149    // --- I/O & primary providers (kept inline) ---
150    provider: AnyProvider,
151    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
152    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
153    /// Falls back to `provider.clone()` when no dedicated entry exists.
154    /// **Never replaced** by `/provider switch`.
155    embedding_provider: AnyProvider,
156    channel: C,
157    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
158
159    // --- Conversation core (kept inline) ---
160    pub(super) msg: MessageState,
161    pub(super) context_manager: context_manager::ContextManager,
162    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
163
164    // --- Aggregated background services ---
165    pub(super) services: state::Services,
166
167    // --- Aggregated runtime / lifecycle / telemetry ---
168    pub(super) runtime: state::AgentRuntime,
169}
170
/// Tells the agent loop what to do after [`Agent::apply_dispatch_result`] has
/// processed a command dispatch.
enum DispatchFlow {
    /// Exit was requested: the agent loop should `break`.
    Break,
    /// The command was handled: the agent loop should `continue` with the next iteration.
    Continue,
    /// The input was not a recognised command: the agent loop should fall through
    /// to its regular processing.
    Fallthrough,
}
180
181impl<C: Channel> Agent<C> {
182    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
183    ///
184    /// # Arguments
185    ///
186    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
187    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
188    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
189    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
190    ///   skills are matched by exact name only
191    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
192    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
193    ///
194    /// # Initialization
195    ///
196    /// The constructor:
197    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
198    /// 2. Builds the system prompt from registered skills
199    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
200    /// 4. Returns a ready-to-run agent
201    ///
202    /// # Panics
203    ///
204    /// Panics if `max_active_skills` is 0.
205    #[must_use]
206    pub fn new(
207        provider: AnyProvider,
208        channel: C,
209        registry: SkillRegistry,
210        matcher: Option<SkillMatcherBackend>,
211        max_active_skills: usize,
212        tool_executor: impl ToolExecutor + 'static,
213    ) -> Self {
214        let registry = Arc::new(RwLock::new(registry));
215        let embedding_provider = provider.clone();
216        Self::new_with_registry_arc(
217            provider,
218            embedding_provider,
219            channel,
220            registry,
221            matcher,
222            max_active_skills,
223            tool_executor,
224        )
225    }
226
227    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
228    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
229    ///
230    /// # Panics
231    ///
232    /// Panics if the registry `RwLock` is poisoned.
233    #[must_use]
234    pub fn new_with_registry_arc(
235        provider: AnyProvider,
236        embedding_provider: AnyProvider,
237        channel: C,
238        registry: Arc<RwLock<SkillRegistry>>,
239        matcher: Option<SkillMatcherBackend>,
240        max_active_skills: usize,
241        tool_executor: impl ToolExecutor + 'static,
242    ) -> Self {
243        use state::{
244            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
245            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
246            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
247            SessionState, SkillState, ToolState,
248        };
249
250        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
251        let all_skills: Vec<Skill> = {
252            let reg = registry.read();
253            reg.all_meta()
254                .iter()
255                .filter_map(|m| reg.skill(&m.name).ok())
256                .collect()
257        };
258        let empty_trust = HashMap::new();
259        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
260        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
261        let system_prompt = build_system_prompt(&skills_prompt, None);
262        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
263        tracing::trace!(prompt = %system_prompt, "full system prompt");
264
265        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
266        let token_counter = Arc::new(TokenCounter::new());
267
268        let services = Services {
269            memory: MemoryState::default(),
270            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
271            learning_engine: learning_engine::LearningEngine::new(),
272            feedback: FeedbackState::default(),
273            mcp: McpState::default(),
274            index: IndexState::default(),
275            session: SessionState::new(),
276            security: SecurityState::default(),
277            experiments: ExperimentState::new(),
278            compression: CompressionState::default(),
279            orchestration: OrchestrationState::default(),
280            focus: focus::FocusState::default(),
281            sidequest: sidequest::SidequestState::default(),
282            tool_state: ToolState::default(),
283            goal_accounting: None,
284            #[cfg(feature = "self-check")]
285            quality: None,
286            proactive_explorer: None,
287            promotion_engine: None,
288            taco_compressor: None,
289            speculation_engine: None,
290        };
291
292        let runtime = AgentRuntime {
293            config: RuntimeConfig::default(),
294            lifecycle: LifecycleState::new(),
295            providers: ProviderState::new(initial_prompt_tokens),
296            metrics: MetricsState::new(token_counter),
297            debug: DebugState::default(),
298            instructions: InstructionState::default(),
299        };
300
301        Self {
302            provider,
303            embedding_provider,
304            channel,
305            tool_executor: Arc::new(tool_executor),
306            msg: MessageState {
307                messages: vec![Message {
308                    role: Role::System,
309                    content: system_prompt,
310                    parts: vec![],
311                    metadata: MessageMetadata::default(),
312                }],
313                message_queue: VecDeque::new(),
314                pending_image_parts: Vec::new(),
315                last_persisted_message_id: None,
316                deferred_db_hide_ids: Vec::new(),
317                deferred_db_summaries: Vec::new(),
318            },
319            context_manager: context_manager::ContextManager::new(),
320            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
321            services,
322            runtime,
323        }
324    }
325
326    /// Consume the agent and return the inner channel.
327    ///
328    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
329    /// read captured responses from a headless channel such as `BenchmarkChannel`).
330    ///
331    /// # Examples
332    ///
333    /// ```no_run
334    /// # use zeph_core::agent::Agent;
335    /// // After agent.run().await completes, consume the agent to retrieve the channel.
336    /// // let channel: MyChannel = agent.into_channel();
337    /// ```
338    #[must_use]
339    pub fn into_channel(self) -> C {
340        self.channel
341    }
342
343    /// Poll all active sub-agents for completed/failed/canceled results.
344    ///
345    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
346    /// for agents that have finished. Each completed agent is removed from the manager.
347    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
348        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
349            return vec![];
350        };
351
352        let finished: Vec<String> = mgr
353            .statuses()
354            .into_iter()
355            .filter_map(|(id, status)| {
356                if matches!(
357                    status.state,
358                    zeph_subagent::SubAgentState::Completed
359                        | zeph_subagent::SubAgentState::Failed
360                        | zeph_subagent::SubAgentState::Canceled
361                ) {
362                    Some(id)
363                } else {
364                    None
365                }
366            })
367            .collect();
368
369        let mut results = vec![];
370        for task_id in finished {
371            match mgr.collect(&task_id).await {
372                Ok(result) => results.push((task_id, result)),
373                Err(e) => {
374                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
375                }
376            }
377        }
378        results
379    }
380
381    /// Call the LLM to generate a structured session summary with a configurable timeout.
382    ///
383    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
384    /// any failure, logging a warning — callers must treat `None` as "skip storage".
385    ///
386    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
387    /// (structured call times out and plain-text fallback also times out) this adds up to
388    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
389    async fn call_llm_for_session_summary(
390        &self,
391        chat_messages: &[Message],
392    ) -> Option<zeph_memory::StructuredSummary> {
393        let timeout_dur = std::time::Duration::from_secs(
394            self.services
395                .memory
396                .compaction
397                .shutdown_summary_timeout_secs,
398        );
399        match tokio::time::timeout(
400            timeout_dur,
401            self.provider
402                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
403        )
404        .await
405        {
406            Ok(Ok(s)) => Some(s),
407            Ok(Err(e)) => {
408                tracing::warn!(
409                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
410                );
411                self.plain_text_summary_fallback(chat_messages, timeout_dur)
412                    .await
413            }
414            Err(_) => {
415                tracing::warn!(
416                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
417                    self.services
418                        .memory
419                        .compaction
420                        .shutdown_summary_timeout_secs
421                );
422                self.plain_text_summary_fallback(chat_messages, timeout_dur)
423                    .await
424            }
425        }
426    }
427
428    async fn plain_text_summary_fallback(
429        &self,
430        chat_messages: &[Message],
431        timeout_dur: std::time::Duration,
432    ) -> Option<zeph_memory::StructuredSummary> {
433        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
434            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
435                summary: plain,
436                key_facts: vec![],
437                entities: vec![],
438            }),
439            Ok(Err(e)) => {
440                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
441                None
442            }
443            Err(_) => {
444                tracing::warn!("shutdown summary: plain LLM fallback timed out");
445                None
446            }
447        }
448    }
449
450    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
451    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
452    /// closed while tool execution was in progress). Without this the next session startup strips
453    /// those assistant messages and emits orphan warnings.
454    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
455        use zeph_llm::provider::{MessagePart, Role};
456
457        // Walk messages in reverse: if the last assistant message (ignoring any trailing
458        // system messages) has ToolUse parts and is NOT immediately followed by a user
459        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
460        let msgs = &self.msg.messages;
461        // Find last assistant message index.
462        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
463            return;
464        };
465        let asst_msg = &msgs[asst_idx];
466        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
467            .parts
468            .iter()
469            .filter_map(|p| {
470                if let MessagePart::ToolUse { id, name, input } = p {
471                    Some((id.as_str(), name.as_str(), input))
472                } else {
473                    None
474                }
475            })
476            .collect();
477        if tool_use_ids.is_empty() {
478            return;
479        }
480
481        // Check whether a following user message already pairs all ToolUse ids.
482        let paired_ids: std::collections::HashSet<&str> = msgs
483            .get(asst_idx + 1..)
484            .into_iter()
485            .flatten()
486            .filter(|m| m.role == Role::User)
487            .flat_map(|m| m.parts.iter())
488            .filter_map(|p| {
489                if let MessagePart::ToolResult { tool_use_id, .. } = p {
490                    Some(tool_use_id.as_str())
491                } else {
492                    None
493                }
494            })
495            .collect();
496
497        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
498            .iter()
499            .filter(|(id, _, _)| !paired_ids.contains(*id))
500            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
501                id: (*id).to_owned(),
502                name: (*name).to_owned().into(),
503                input: (*input).clone(),
504            })
505            .collect();
506
507        if unpaired.is_empty() {
508            return;
509        }
510
511        tracing::info!(
512            count = unpaired.len(),
513            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
514        );
515        self.persist_cancelled_tool_results(&unpaired).await;
516    }
517
518    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
519    ///
520    /// Guards:
521    /// - `shutdown_summary` config must be enabled
522    /// - `conversation_id` must be set (memory must be attached)
523    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
524    /// - at least `shutdown_summary_min_messages` user-turn messages in history
525    ///
526    /// All errors are logged as warnings and swallowed — shutdown must never fail.
527    async fn maybe_store_shutdown_summary(&mut self) {
528        if !self.services.memory.compaction.shutdown_summary {
529            return;
530        }
531        let Some(memory) = self.services.memory.persistence.memory.clone() else {
532            return;
533        };
534        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
535            return;
536        };
537
538        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
539        match memory.has_session_summary(conversation_id).await {
540            Ok(true) => {
541                tracing::debug!("shutdown summary: session already has a summary, skipping");
542                return;
543            }
544            Ok(false) => {}
545            Err(e) => {
546                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
547                return;
548            }
549        }
550
551        // Count user-turn messages only (skip system prompt at index 0).
552        let user_count = self
553            .msg
554            .messages
555            .iter()
556            .skip(1)
557            .filter(|m| m.role == Role::User)
558            .count();
559        if user_count
560            < self
561                .services
562                .memory
563                .compaction
564                .shutdown_summary_min_messages
565        {
566            tracing::debug!(
567                user_count,
568                min = self
569                    .services
570                    .memory
571                    .compaction
572                    .shutdown_summary_min_messages,
573                "shutdown summary: too few user messages, skipping"
574            );
575            return;
576        }
577
578        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
579        let _ = self.channel.send_status("Saving session summary...").await;
580
581        // Collect last N messages (skip system prompt at index 0).
582        let max = self
583            .services
584            .memory
585            .compaction
586            .shutdown_summary_max_messages;
587        if max == 0 {
588            tracing::debug!("shutdown summary: max_messages=0, skipping");
589            return;
590        }
591        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
592        let slice = if non_system.len() > max {
593            &non_system[non_system.len() - max..]
594        } else {
595            &non_system[..]
596        };
597
598        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
599            .iter()
600            .map(|m| {
601                let role = match m.role {
602                    Role::User => "user".to_owned(),
603                    Role::Assistant => "assistant".to_owned(),
604                    Role::System => "system".to_owned(),
605                };
606                (zeph_memory::MessageId(0), role, m.content.clone())
607            })
608            .collect();
609
610        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
611        let chat_messages = vec![Message {
612            role: Role::User,
613            content: prompt,
614            parts: vec![],
615            metadata: MessageMetadata::default(),
616        }];
617
618        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
619            let _ = self.channel.send_status("").await;
620            return;
621        };
622
623        if let Err(e) = memory
624            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
625            .await
626        {
627            tracing::warn!("shutdown summary: storage failed: {e:#}");
628        } else {
629            tracing::info!(
630                conversation_id = conversation_id.0,
631                "shutdown summary stored"
632            );
633        }
634
635        // Clear TUI status.
636        let _ = self.channel.send_status("").await;
637    }
638
639    /// Gracefully shut down the agent and persist state.
640    ///
641    /// Performs the following cleanup:
642    ///
643    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
644    ///    are flushed to memory or disk
645    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
646    ///    to the vault
647    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
648    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
649    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
650    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
651    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
652    ///
653    /// Call this before dropping the agent to ensure no data loss.
654    pub async fn shutdown(&mut self) {
655        let _ = self.channel.send_status("Shutting down...").await;
656
657        // CRIT-1: persist Thompson state accumulated during this session.
658        self.provider.save_router_state().await;
659
660        // Persist AdaptOrch Beta-arm table alongside Thompson state.
661        if let Some(ref advisor) = self.services.orchestration.topology_advisor
662            && let Err(e) = advisor.save()
663        {
664            tracing::warn!(error = %e, "adaptorch: failed to persist state");
665        }
666
667        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
668            mgr.shutdown_all();
669        }
670
671        if let Some(ref manager) = self.services.mcp.manager {
672            manager.shutdown_all_shared().await;
673        }
674
675        // Finalize compaction trajectory: push the last open segment into the Vec.
676        // This segment would otherwise only be pushed when the next hard compaction fires,
677        // which never happens at session end.
678        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
679            self.update_metrics(|m| {
680                m.compaction_turns_after_hard.push(turns);
681            });
682            self.context_manager.turns_since_last_hard_compaction = None;
683        }
684
685        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
686            let m = tx.borrow();
687            if m.filter_applications > 0 {
688                #[allow(clippy::cast_precision_loss)]
689                let pct = if m.filter_raw_tokens > 0 {
690                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
691                } else {
692                    0.0
693                };
694                tracing::info!(
695                    raw_tokens = m.filter_raw_tokens,
696                    saved_tokens = m.filter_saved_tokens,
697                    applications = m.filter_applications,
698                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
699                    m.filter_saved_tokens,
700                );
701            }
702            if m.compaction_hard_count > 0 {
703                tracing::info!(
704                    hard_compactions = m.compaction_hard_count,
705                    turns_after_hard = ?m.compaction_turns_after_hard,
706                    "hard compaction trajectory"
707                );
708            }
709        }
710
711        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
712        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
713        // startup strips the orphaned ToolUse and emits warnings.
714        self.flush_orphaned_tool_use_on_shutdown().await;
715
716        // NOTE: forcibly aborts in-flight Enrichment and Telemetry tasks tracked by the
717        // supervisor. Before the sprint-1 refactor, experiment sessions were detached
718        // tokio::spawn calls that survived shutdown; they are now intentionally untracked
719        // (see experiment_cmd.rs) and will continue running until their own CancellationToken
720        // is triggered or the process exits.
721        self.runtime.lifecycle.supervisor.abort_all();
722
723        // Abort background task handles not tracked by BackgroundSupervisor.
724        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
725        if let Some(h) = self.services.compression.pending_task_goal.take() {
726            h.abort();
727        }
728        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
729            h.abort();
730        }
731        if let Some(h) = self.services.compression.pending_subgoal.take() {
732            h.abort();
733        }
734
735        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
736        self.services.learning_engine.learning_tasks.abort_all();
737
738        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
739        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
740        // observe cancellation at their next .await point. Without yielding here the summary
741        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
742        for _ in 0..4 {
743            tokio::task::yield_now().await;
744        }
745
746        self.maybe_store_shutdown_summary().await;
747        self.maybe_store_session_digest().await;
748
749        tracing::info!("agent shutdown complete");
750    }
751
752    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
753    ///
754    /// # Errors
755    ///
756    /// Returns an error if channel I/O or LLM communication fails.
757    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
758    fn refresh_subagent_metrics(&mut self) {
759        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
760            return;
761        };
762        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
763            .statuses()
764            .into_iter()
765            .map(|(id, s)| {
766                let def = mgr.agents_def(&id);
767                crate::metrics::SubAgentMetrics {
768                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
769                    id: id.clone(),
770                    state: format!("{:?}", s.state).to_lowercase(),
771                    turns_used: s.turns_used,
772                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
773                    background: def.is_some_and(|d| d.permissions.background),
774                    elapsed_secs: s.started_at.elapsed().as_secs(),
775                    permission_mode: def.map_or_else(String::new, |d| {
776                        use zeph_subagent::def::PermissionMode;
777                        match d.permissions.permission_mode {
778                            PermissionMode::Default => String::new(),
779                            PermissionMode::AcceptEdits => "accept_edits".into(),
780                            PermissionMode::DontAsk => "dont_ask".into(),
781                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
782                            PermissionMode::Plan => "plan".into(),
783                        }
784                    }),
785                    transcript_dir: mgr
786                        .agent_transcript_dir(&id)
787                        .map(|p| p.to_string_lossy().into_owned()),
788                }
789            })
790            .collect();
791        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
792    }
793
794    /// Non-blocking poll: notify the user when background sub-agents complete.
795    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
796        let completed = self.poll_subagents().await;
797        for (task_id, result) in completed {
798            let notice = if result.is_empty() {
799                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
800            } else {
801                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
802            };
803            if let Err(e) = self.channel.send(&notice).await {
804                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
805            }
806        }
807        Ok(())
808    }
809
810    /// Run the agent main loop.
       ///
       /// Waits for model warmup when a readiness receiver is present, restores the channel
       /// provider preference, loads the session digest, and then loops. Each iteration runs
       /// housekeeping, takes the next queued message or loop event, offers the text to the
       /// session/debug command registry, then the agent command registry, then the builtin
       /// command handler, and finally passes anything unhandled to `process_user_message`.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
815    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
816    pub async fn run(&mut self) -> Result<(), error::AgentError>
817    where
818        C: 'static,
819    {
           // If the warmup flag is not yet true, wait for one change notification and warn
           // when the flag is still false afterwards.
820        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
821            && !*rx.borrow()
822        {
823            let _ = rx.changed().await;
824            if !*rx.borrow() {
825                tracing::warn!("model warmup did not complete successfully");
826            }
827        }
828
829        // Restore the last-used provider preference before any user interaction (#3308).
830        self.restore_channel_provider().await;
831
832        // Load the session digest once at session start for context injection.
833        self.load_and_cache_session_digest().await;
834        self.maybe_send_resume_recap().await;
835
           // Main loop: one iteration per queued message or loop event.
836        loop {
               // Per-iteration housekeeping, run before any input is consumed.
837            self.apply_provider_override();
838            self.check_tool_refresh().await;
839            self.process_pending_elicitations().await;
840            self.refresh_subagent_metrics();
841            self.notify_completed_subagents().await?;
842            self.drain_channel();
843
               // Messages already waiting in the queue take priority over new events.
844            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
845                self.notify_queue_count().await;
846                if queued.raw_attachments.is_empty() {
847                    (queued.text, queued.image_parts)
848                } else {
849                    let msg = crate::channel::ChannelMessage {
850                        text: queued.text,
851                        attachments: queued.raw_attachments,
852                    };
853                    self.resolve_message(msg).await
854                }
855            } else {
856                match self.next_event().await? {
                       // `None` or an explicit shutdown event ends the loop.
857                    None | Some(LoopEvent::Shutdown) => break,
858                    Some(LoopEvent::SkillReload) => {
859                        self.reload_skills().await;
860                        continue;
861                    }
862                    Some(LoopEvent::InstructionReload) => {
863                        self.reload_instructions();
864                        continue;
865                    }
866                    Some(LoopEvent::ConfigReload) => {
867                        self.reload_config();
868                        continue;
869                    }
870                    Some(LoopEvent::UpdateNotification(msg)) => {
871                        if let Err(e) = self.channel.send(&msg).await {
872                            tracing::warn!("failed to send update notification: {e}");
873                        }
874                        continue;
875                    }
876                    Some(LoopEvent::ExperimentCompleted(msg)) => {
877                        self.services.experiments.cancel = None;
878                        if let Err(e) = self.channel.send(&msg).await {
879                            tracing::warn!("failed to send experiment completion: {e}");
880                        }
881                        continue;
882                    }
883                    Some(LoopEvent::ScheduledTask(prompt)) => {
884                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
885                        let msg = crate::channel::ChannelMessage {
886                            text,
887                            attachments: Vec::new(),
888                        };
889                        self.drain_channel();
890                        self.resolve_message(msg).await
891                    }
892                    Some(LoopEvent::TaskInjected(injection)) => {
893                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
894                            ls.iteration += 1;
895                            tracing::info!(iteration = ls.iteration, "loop: tick");
896                        }
897                        let msg = crate::channel::ChannelMessage {
898                            text: injection.prompt,
899                            attachments: Vec::new(),
900                        };
901                        self.drain_channel();
902                        self.resolve_message(msg).await
903                    }
904                    Some(LoopEvent::FileChanged(event)) => {
905                        self.handle_file_changed(event).await;
906                        continue;
907                    }
908                    Some(LoopEvent::Message(msg)) => {
909                        self.drain_channel();
910                        self.resolve_message(msg).await
911                    }
912                }
913            };
914
915            let trimmed = text.trim();
916
917            // M3: extract flagged URLs from all slash commands before any registry dispatch,
918            // so `/skill install <url>` and similar commands populate user_provided_urls.
919            if trimmed.starts_with('/') {
920                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
921                if !slash_urls.is_empty() {
922                    self.services
923                        .security
924                        .user_provided_urls
925                        .write()
926                        .extend(slash_urls);
927                }
928            }
929
930            // Registry dispatch: two-phase command dispatch.
931            //
932            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
933            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
934            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
935            //
936            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
937            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
938            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
939            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
940            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
941            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
942            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
943            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
944            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
945            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
946            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
947            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
948            //
949            // Drop-order rules enforced here:
950            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
951            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
952            let session_impl = command_context_impls::SessionAccessImpl {
953                supports_exit: self.channel.supports_exit(),
954            };
955            let mut messages_impl = command_context_impls::MessageAccessImpl {
956                msg: &mut self.msg,
957                tool_state: &mut self.services.tool_state,
958                providers: &mut self.runtime.providers,
959                metrics: &self.runtime.metrics,
960                security: &mut self.services.security,
961                tool_orchestrator: &mut self.tool_orchestrator,
962            };
963            // sink_adapter declared before reg so it is dropped after reg (LIFO).
964            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
965            // null_agent must be declared before reg so it lives longer (LIFO drop order).
966            let mut null_agent = zeph_commands::NullAgent;
967            let registry_handled = {
968                use zeph_commands::CommandRegistry;
969                use zeph_commands::handlers::debug::{
970                    DebugDumpCommand, DumpFormatCommand, LogCommand,
971                };
972                use zeph_commands::handlers::help::HelpCommand;
973                use zeph_commands::handlers::session::{
974                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
975                };
976
977                let mut reg = CommandRegistry::new();
978                reg.register(ExitCommand);
979                reg.register(QuitCommand);
980                reg.register(ClearCommand);
981                reg.register(ResetCommand);
982                reg.register(ClearQueueCommand);
983                reg.register(LogCommand);
984                reg.register(DebugDumpCommand);
985                reg.register(DumpFormatCommand);
986                reg.register(HelpCommand);
987                #[cfg(test)]
988                reg.register(test_stubs::TestErrorCommand);
989
990                let mut ctx = zeph_commands::CommandContext {
991                    sink: &mut sink_adapter,
992                    debug: &mut self.runtime.debug,
993                    messages: &mut messages_impl,
994                    session: &session_impl,
995                    agent: &mut null_agent,
996                };
997                reg.dispatch(&mut ctx, trimmed).await
998            };
               // Record whether the first registry declined the command before
               // `registry_handled` is moved into `apply_dispatch_result`.
999            let session_reg_missed = registry_handled.is_none();
1000            match self
1001                .apply_dispatch_result(registry_handled, trimmed, false)
1002                .await
1003            {
1004                DispatchFlow::Break => break,
1005                DispatchFlow::Continue => continue,
1006                DispatchFlow::Fallthrough => {
1007                    // Not handled by the session/debug registry; try agent-command registry.
1008                }
1009            }
1010
1011            // Agent-command registry: handlers access Agent<C> directly.
1012            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
1013            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
1014            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
1015            // sentinels here will outlive ctx (ctx is declared later, inside the block).
1016            let mut agent_null_debug = command_context_impls::NullDebugAccess;
1017            let mut agent_null_messages = command_context_impls::NullMessageAccess;
1018            let agent_null_session = command_context_impls::NullSessionAccess;
1019            let mut agent_null_sink = zeph_commands::NullSink;
1020            let agent_result: Option<
1021                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
1022            > = if session_reg_missed {
1023                use zeph_commands::CommandRegistry;
1024                use zeph_commands::handlers::{
1025                    acp::AcpCommand,
1026                    agent_cmd::AgentCommand,
1027                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
1028                    experiment::ExperimentCommand,
1029                    goal::GoalCommand,
1030                    loop_cmd::LoopCommand,
1031                    lsp::LspCommand,
1032                    mcp::McpCommand,
1033                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
1034                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
1035                    model::{ModelCommand, ProviderCommand},
1036                    plan::PlanCommand,
1037                    plugins::PluginsCommand,
1038                    policy::PolicyCommand,
1039                    scheduler::SchedulerCommand,
1040                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
1041                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
1042                    trajectory::{ScopeCommand, TrajectoryCommand},
1043                };
1044
1045                let mut agent_reg = CommandRegistry::new();
1046                agent_reg.register(MemoryCommand);
1047                agent_reg.register(GraphCommand);
1048                agent_reg.register(GuidelinesCommand);
1049                agent_reg.register(ModelCommand);
1050                agent_reg.register(ProviderCommand);
1051                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
1052                agent_reg.register(SkillCommand);
1053                agent_reg.register(SkillsCommand);
1054                agent_reg.register(FeedbackCommand);
1055                agent_reg.register(McpCommand);
1056                agent_reg.register(PolicyCommand);
1057                agent_reg.register(SchedulerCommand);
1058                agent_reg.register(LspCommand);
1059                // Phase 4 migrations (Send-safe commands):
1060                agent_reg.register(CacheStatsCommand);
1061                agent_reg.register(ImageCommand);
1062                agent_reg.register(NotifyTestCommand);
1063                agent_reg.register(StatusCommand);
1064                agent_reg.register(GuardrailCommand);
1065                agent_reg.register(FocusCommand);
1066                agent_reg.register(SideQuestCommand);
1067                agent_reg.register(AgentCommand);
1068                // Phase 5 migrations (Send-compatible):
1069                agent_reg.register(CompactCommand);
1070                agent_reg.register(NewConversationCommand);
1071                agent_reg.register(RecapCommand);
1072                agent_reg.register(ExperimentCommand);
1073                agent_reg.register(PlanCommand);
1074                agent_reg.register(LoopCommand);
1075                agent_reg.register(PluginsCommand);
1076                agent_reg.register(AcpCommand);
1077                agent_reg.register(TrajectoryCommand);
1078                agent_reg.register(ScopeCommand);
1079                agent_reg.register(GoalCommand);
1080
1081                let mut ctx = zeph_commands::CommandContext {
1082                    sink: &mut agent_null_sink,
1083                    debug: &mut agent_null_debug,
1084                    messages: &mut agent_null_messages,
1085                    session: &agent_null_session,
1086                    agent: self,
1087                };
1088                // self is reborrowed; ctx drops at end of this block.
1089                agent_reg.dispatch(&mut ctx, trimmed).await
1090            } else {
1091                None
1092            };
1093            // self.channel is available again here (ctx borrow dropped above).
1094            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
1095            // inside apply_dispatch_result when with_learning = true.
1096            match self
1097                .apply_dispatch_result(agent_result, trimmed, true)
1098                .await
1099            {
1100                DispatchFlow::Break => break,
1101                DispatchFlow::Continue => continue,
1102                DispatchFlow::Fallthrough => {
1103                    // Not handled by agent registry; fall through to existing dispatch.
1104                }
1105            }
1106
                // `Some(true)` ends the loop, `Some(false)` skips to the next iteration, and
                // `None` falls through to regular message processing below.
1107            match self.handle_builtin_command(trimmed) {
1108                Some(true) => break,
1109                Some(false) => continue,
1110                None => {}
1111            }
1112
1113            self.process_user_message(text, image_parts).await?;
1114        }
1115
1116        // autoDream: run background memory consolidation if conditions are met (#2697).
1117        // Runs with a timeout — partial state is acceptable for MVP.
1118        self.maybe_autodream().await;
1119
1120        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1121        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
1122            tc.finish();
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// Dispatch a slash-command registry result and flush the channel.
1129    ///
1130    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1131    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1132    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1133    async fn apply_dispatch_result(
1134        &mut self,
1135        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1136        command: &str,
1137        with_learning: bool,
1138    ) -> DispatchFlow {
1139        match result {
1140            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1141                let _ = self.channel.flush_chunks().await;
1142                DispatchFlow::Break
1143            }
1144            Some(Ok(
1145                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1146            )) => {
1147                let _ = self.channel.flush_chunks().await;
1148                DispatchFlow::Continue
1149            }
1150            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1151                let _ = self.channel.send(&msg).await;
1152                let _ = self.channel.flush_chunks().await;
1153                if with_learning {
1154                    self.maybe_trigger_post_command_learning(command).await;
1155                }
1156                DispatchFlow::Continue
1157            }
1158            Some(Err(e)) => {
1159                let _ = self.channel.send(&e.to_string()).await;
1160                let _ = self.channel.flush_chunks().await;
1161                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1162                DispatchFlow::Continue
1163            }
1164            None => DispatchFlow::Fallthrough,
1165        }
1166    }
1167
1168    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1169    fn apply_provider_override(&mut self) {
1170        if let Some(ref slot) = self.runtime.providers.provider_override
1171            && let Some(new_provider) = slot.write().take()
1172        {
1173            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1174            self.provider = new_provider;
1175        }
1176    }
1177
1178    /// Poll all event sources and return the next [`LoopEvent`].
1179    ///
1180    /// Returns `None` when the inbound channel closes (graceful shutdown).
1181    ///
1182    /// # Errors
1183    ///
1184    /// Propagates channel receive errors.
1185    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1186        let event = tokio::select! {
1187            result = self.channel.recv() => {
1188                return Ok(result?.map(LoopEvent::Message));
1189            }
1190            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1191                tracing::info!("shutting down");
1192                LoopEvent::Shutdown
1193            }
1194            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1195                LoopEvent::SkillReload
1196            }
1197            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1198                LoopEvent::InstructionReload
1199            }
1200            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1201                LoopEvent::ConfigReload
1202            }
1203            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1204                LoopEvent::UpdateNotification(msg)
1205            }
1206            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1207                LoopEvent::ExperimentCompleted(msg)
1208            }
1209            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1210                tracing::info!("scheduler: injecting custom task as agent turn");
1211                LoopEvent::ScheduledTask(prompt)
1212            }
1213            () = async {
1214                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1215                    if ls.cancel_tx.is_cancelled() {
1216                        std::future::pending::<()>().await;
1217                    } else {
1218                        ls.interval.tick().await;
1219                    }
1220                } else {
1221                    std::future::pending::<()>().await;
1222                }
1223            } => {
1224                // Re-check user_loop after the tick — /loop stop may have fired between the
1225                // interval firing and this arm executing. Returning Ok(None) causes the caller
1226                // to `continue` without injecting a stale or empty prompt.
1227                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1228                    return Ok(None);
1229                };
1230                if ls.cancel_tx.is_cancelled() {
1231                    self.runtime.lifecycle.user_loop = None;
1232                    return Ok(None);
1233                }
1234                let prompt = ls.prompt.clone();
1235                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1236            }
1237            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1238                LoopEvent::FileChanged(event)
1239            }
1240        };
1241        Ok(Some(event))
1242    }
1243
1244    async fn resolve_message(
1245        &self,
1246        msg: crate::channel::ChannelMessage,
1247    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1248        use crate::channel::{Attachment, AttachmentKind};
1249        use zeph_llm::provider::{ImageData, MessagePart};
1250
1251        let text_base = msg.text.clone();
1252
1253        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1254            .attachments
1255            .into_iter()
1256            .partition(|a| a.kind == AttachmentKind::Audio);
1257
1258        tracing::debug!(
1259            audio = audio_attachments.len(),
1260            has_stt = self.runtime.providers.stt.is_some(),
1261            "resolve_message attachments"
1262        );
1263
1264        let text = if !audio_attachments.is_empty()
1265            && let Some(stt) = self.runtime.providers.stt.as_ref()
1266        {
1267            let mut transcribed_parts = Vec::new();
1268            for attachment in &audio_attachments {
1269                if attachment.data.len() > MAX_AUDIO_BYTES {
1270                    tracing::warn!(
1271                        size = attachment.data.len(),
1272                        max = MAX_AUDIO_BYTES,
1273                        "audio attachment exceeds size limit, skipping"
1274                    );
1275                    continue;
1276                }
1277                match stt
1278                    .transcribe(&attachment.data, attachment.filename.as_deref())
1279                    .await
1280                {
1281                    Ok(result) => {
1282                        tracing::info!(
1283                            len = result.text.len(),
1284                            language = ?result.language,
1285                            "audio transcribed"
1286                        );
1287                        transcribed_parts.push(result.text);
1288                    }
1289                    Err(e) => {
1290                        tracing::error!(error = %e, "audio transcription failed");
1291                    }
1292                }
1293            }
1294            if transcribed_parts.is_empty() {
1295                text_base
1296            } else {
1297                let transcribed = transcribed_parts.join("\n");
1298                if text_base.is_empty() {
1299                    transcribed
1300                } else {
1301                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1302                }
1303            }
1304        } else {
1305            if !audio_attachments.is_empty() {
1306                tracing::warn!(
1307                    count = audio_attachments.len(),
1308                    "audio attachments received but no STT provider configured, dropping"
1309                );
1310            }
1311            text_base
1312        };
1313
1314        let mut image_parts = Vec::new();
1315        for attachment in image_attachments {
1316            if attachment.data.len() > MAX_IMAGE_BYTES {
1317                tracing::warn!(
1318                    size = attachment.data.len(),
1319                    max = MAX_IMAGE_BYTES,
1320                    "image attachment exceeds size limit, skipping"
1321                );
1322                continue;
1323            }
1324            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1325            image_parts.push(MessagePart::Image(Box::new(ImageData {
1326                data: attachment.data,
1327                mime_type,
1328            })));
1329        }
1330
1331        (text, image_parts)
1332    }
1333
1334    /// Create a new [`Turn`] for the given input and advance the turn counter.
1335    ///
1336    /// Clears per-turn state that must not carry over between turns:
1337    /// - per-turn `CancellationToken` (new token for each turn)
1338    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1339    ///   `process_user_message_inner` after security checks)
1340    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1341        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1342        self.runtime.debug.iteration_counter += 1;
1343        let cancel_token = CancellationToken::new();
1344        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1345        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1346        self.services.security.user_provided_urls.write().clear();
1347        // Reset per-turn LLM request counter for the notification gate.
1348        self.runtime.lifecycle.turn_llm_requests = 0;
1349
1350        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
1351        {
1352            let pending: Vec<u8> = {
1353                let mut q = self.services.security.trajectory_signal_queue.lock();
1354                std::mem::take(&mut *q)
1355            };
1356            for code in pending {
1357                self.services
1358                    .security
1359                    .trajectory
1360                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
1361            }
1362        }
1363        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
1364        // F5: write auto-recover audit entry when sentinel hard-resets.
1365        if self.services.security.trajectory.advance_turn()
1366            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1367        {
1368            let entry = zeph_tools::AuditEntry {
1369                timestamp: zeph_tools::chrono_now(),
1370                tool: "<sentinel>".to_owned().into(),
1371                command: String::new(),
1372                result: zeph_tools::AuditResult::Success,
1373                duration_ms: 0,
1374                error_category: Some("trajectory_auto_recover".to_owned()),
1375                error_domain: Some("security".to_owned()),
1376                error_phase: None,
1377                claim_source: None,
1378                mcp_server_id: None,
1379                injection_flagged: false,
1380                embedding_anomalous: false,
1381                cross_boundary_mcp_to_acp: false,
1382                adversarial_policy_decision: None,
1383                exit_code: None,
1384                truncated: false,
1385                caller_id: None,
1386                policy_match: None,
1387                correlation_id: None,
1388                vigil_risk: None,
1389                execution_env: None,
1390                resolved_cwd: None,
1391                scope_at_definition: None,
1392                scope_at_dispatch: None,
1393            };
1394            self.runtime.lifecycle.supervisor.spawn(
1395                crate::agent::agent_supervisor::TaskClass::Telemetry,
1396                "trajectory-auto-recover-audit",
1397                async move { logger.log(&entry).await },
1398            );
1399        }
1400        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
1401        let risk_level = self.services.security.trajectory.current_risk();
1402        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1403        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
1404        if let Some(alert) = self.services.security.trajectory.poll_alert() {
1405            let msg = format!(
1406                "[trajectory] Risk level: {:?} (score={:.2})",
1407                alert.level, alert.score
1408            );
1409            tracing::warn!(
1410                level = ?alert.level,
1411                score = alert.score,
1412                "trajectory sentinel alert"
1413            );
1414            if let Some(ref tx) = self.services.session.status_tx {
1415                let _ = tx.send(msg);
1416            }
1417        }
1418
1419        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1420        turn::Turn::new(context, input)
1421    }
1422
1423    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1424    ///
1425    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1426    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1427    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1428    /// is populated from it here so the rest of the pipeline is unchanged.
1429    fn end_turn(&mut self, turn: turn::Turn) {
1430        self.runtime.metrics.pending_timings = turn.metrics.timings;
1431        self.flush_turn_timings();
1432        // Clear per-turn intent (FR-008): must not persist across turns.
1433        self.services.session.current_turn_intent = None;
1434        // Cancel all in-flight speculative handles at turn boundary.
1435        if let Some(ref engine) = self.services.speculation_engine {
1436            let metrics = engine.end_turn();
1437            if metrics.committed > 0 || metrics.cancelled > 0 {
1438                tracing::debug!(
1439                    committed = metrics.committed,
1440                    cancelled = metrics.cancelled,
1441                    wasted_ms = metrics.wasted_ms,
1442                    "speculation: turn boundary metrics"
1443                );
1444            }
1445        }
1446    }
1447
1448    #[cfg_attr(
1449        feature = "profiling",
1450        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1451    )]
1452    async fn process_user_message(
1453        &mut self,
1454        text: String,
1455        image_parts: Vec<zeph_llm::provider::MessagePart>,
1456    ) -> Result<(), error::AgentError> {
1457        let input = turn::TurnInput::new(text, image_parts);
1458        let mut t = self.begin_turn(input);
1459
1460        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1461        tracing::Span::current().record("turn_id", t.id().0);
1462        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1463        self.runtime
1464            .debug
1465            .start_iteration_span(turn_idx, t.input.text.trim());
1466
1467        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1468
1469        // Close iteration span regardless of outcome (partial trace preserved on error).
1470        let span_status = if result.is_ok() {
1471            crate::debug_dump::trace::SpanStatus::Ok
1472        } else {
1473            crate::debug_dump::trace::SpanStatus::Error {
1474                message: "iteration failed".to_owned(),
1475            }
1476        };
1477        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1478
1479        self.end_turn(t);
1480        result
1481    }
1482
1483    async fn process_user_message_inner(
1484        &mut self,
1485        turn: &mut turn::Turn,
1486    ) -> Result<(), error::AgentError> {
1487        self.reap_background_tasks_and_update_metrics();
1488
1489        let tokens_before_turn = self
1490            .runtime
1491            .metrics
1492            .metrics_tx
1493            .as_ref()
1494            .map_or(0, |tx| tx.borrow().total_tokens);
1495
1496        // Drain any background shell completions that arrived since the last turn.
1497        // They are buffered in `pending_background_completions` and merged with the
1498        // real user message into a single user-role block below (N1 invariant).
1499        self.drain_background_completions();
1500
1501        self.wire_cancel_bridge(turn.cancel_token());
1502
1503        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1504        let text = turn.input.text.clone();
1505        let trimmed_owned = text.trim().to_owned();
1506        let trimmed = trimmed_owned.as_str();
1507
1508        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1509        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1510        if self.services.security.vigil.is_some() {
1511            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1512            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1513        }
1514
1515        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1516            return result;
1517        }
1518
1519        self.check_pending_rollbacks().await;
1520
1521        if self.pre_process_security(trimmed).await? {
1522            return Ok(());
1523        }
1524
1525        let t_ctx = std::time::Instant::now();
1526        tracing::debug!("turn timing: prepare_context start");
1527        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1528        turn.metrics_mut().timings.prepare_context_ms =
1529            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1530        tracing::debug!(
1531            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1532            "turn timing: prepare_context done"
1533        );
1534
1535        let image_parts = std::mem::take(&mut turn.input.image_parts);
1536        // Prepend any background completion blocks to the user text. All completions and the
1537        // user message MUST be merged into a single user-role block to satisfy the strict
1538        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1539        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1540        let user_msg = self.build_user_message(&merged_text, image_parts);
1541
1542        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1543        // URL set was cleared in begin_turn; re-populate for this turn.
1544        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1545        if !urls.is_empty() {
1546            self.services
1547                .security
1548                .user_provided_urls
1549                .write()
1550                .extend(urls);
1551        }
1552
1553        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1554        // Derived from the raw input text before context assembly to avoid timing dependencies.
1555        self.services.memory.extraction.goal_text = Some(text.clone());
1556
1557        let t_persist = std::time::Instant::now();
1558        tracing::debug!("turn timing: persist_message(user) start");
1559        // Image parts intentionally excluded — base64 payloads too large for message history.
1560        self.persist_message(Role::User, &text, &[], false).await;
1561        turn.metrics_mut().timings.persist_message_ms =
1562            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1563        tracing::debug!(
1564            ms = turn.metrics_snapshot().timings.persist_message_ms,
1565            "turn timing: persist_message(user) done"
1566        );
1567        self.push_message(user_msg);
1568
1569        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1570        // handle_native_tool_calls respectively via metrics.pending_timings.
1571        tracing::debug!("turn timing: process_response start");
1572        let turn_had_error = if let Err(e) = self.process_response().await {
1573            // Detach any in-flight learning tasks before mutating message state.
1574            self.services.learning_engine.learning_tasks.detach_all();
1575            tracing::error!("Response processing failed: {e:#}");
1576
1577            // Record provider failure timestamp so the next turn can skip
1578            // expensive context preparation while providers are known-down.
1579            if e.is_no_providers() {
1580                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1581                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1582                tracing::warn!(
1583                    backoff_secs,
1584                    "no providers available; backing off before next turn"
1585                );
1586                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1587            }
1588
1589            let user_msg = format!("Error: {e:#}");
1590            self.channel.send(&user_msg).await?;
1591            self.msg.messages.pop();
1592            self.recompute_prompt_tokens();
1593            self.channel.flush_chunks().await?;
1594            true
1595        } else {
1596            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1597            // leak into the next turn's context.
1598            self.services.learning_engine.learning_tasks.detach_all();
1599            self.truncate_old_tool_results();
1600            // MagicDocs: spawn background doc updates if any are due (#2702).
1601            self.maybe_update_magic_docs();
1602            // Compression spectrum: fire-and-forget promotion scan (#3305).
1603            self.maybe_spawn_promotion_scan();
1604            false
1605        };
1606        tracing::debug!("turn timing: process_response done");
1607
1608        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1609        #[cfg(feature = "self-check")]
1610        if let Some(pipeline) = self.services.quality.clone() {
1611            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1612        }
1613        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1614        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1615        // When self-check appends a flag_marker chunk, this single call covers both
1616        // the main response and the marker, preventing the double response_end of #3243.
1617        let _ = self.channel.flush_chunks().await;
1618
1619        self.maybe_fire_completion_notification(turn, turn_had_error);
1620
1621        self.flush_goal_accounting(tokens_before_turn);
1622
1623        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1624        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1625        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1626        // we harvest those values into Turn before end_turn overwrites pending_timings.
1627        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1628        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1629
1630        Ok(())
1631    }
1632
1633    /// Wire the per-turn cancellation token into the cancel bridge.
1634    ///
1635    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1636    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1637    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1638    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1639        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1640        let token = turn_token.clone();
1641        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1642        self.runtime.lifecycle.cancel_token = turn_token.clone();
1643        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1644            prev.abort();
1645        }
1646        self.runtime.lifecycle.cancel_bridge_handle =
1647            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1648                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1649                move || async move {
1650                    signal.notified().await;
1651                    token.cancel();
1652                },
1653            ));
1654    }
1655
1656    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1657    ///
1658    /// Called at the top of each turn, before any user message processing.
1659    fn reap_background_tasks_and_update_metrics(&mut self) {
1660        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1661        if bg_signal.did_summarize {
1662            self.services.memory.persistence.unsummarized_count = 0;
1663            tracing::debug!("background summarization completed; unsummarized_count reset");
1664        }
1665        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1666        self.update_metrics(|m| {
1667            m.bg_inflight = snap.inflight as u64;
1668            m.bg_dropped = snap.total_dropped();
1669            m.bg_completed = snap.total_completed();
1670            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1671            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1672        });
1673
1674        // Update shell background run rows for TUI panel.
1675        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1676            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1677                .runtime
1678                .lifecycle
1679                .shell_executor_handle
1680                .as_ref()
1681                .map(|e| e.background_runs_snapshot())
1682                .unwrap_or_default()
1683                .into_iter()
1684                .map(|s| crate::metrics::ShellBackgroundRunRow {
1685                    run_id: truncate_shell_run_id(&s.run_id),
1686                    command: truncate_shell_command(&s.command),
1687                    elapsed_secs: s.elapsed_ms / 1000,
1688                })
1689                .collect();
1690            self.update_metrics(|m| {
1691                m.shell_background_runs = shell_rows;
1692            });
1693        }
1694
1695        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1696        // accounted in the snapshot above.
1697        if self
1698            .runtime
1699            .config
1700            .supervisor_config
1701            .abort_enrichment_on_turn
1702        {
1703            self.runtime
1704                .lifecycle
1705                .supervisor
1706                .abort_class(agent_supervisor::TaskClass::Enrichment);
1707        }
1708    }
1709
1710    /// Fire completion notifications and `turn_complete` hooks after each turn.
1711    ///
1712    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1713    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1714    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1715    /// env vars carry no raw assistant output.
1716    ///
1717    /// Gating:
1718    /// - When a `Notifier` is configured, both the notifier and hooks share its
1719    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1720    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1721    ///   notifier path is simply skipped).
1722    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1723        let snap = turn.metrics_snapshot().timings.clone();
1724        let duration_ms = snap
1725            .prepare_context_ms
1726            .saturating_add(snap.llm_chat_ms)
1727            .saturating_add(snap.tool_exec_ms);
1728        let summary = crate::notifications::TurnSummary {
1729            duration_ms,
1730            preview: self.last_assistant_preview(160),
1731            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1732            tool_calls: 0,
1733            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1734            exit_status: if is_error {
1735                crate::notifications::TurnExitStatus::Error
1736            } else {
1737                crate::notifications::TurnExitStatus::Success
1738            },
1739        };
1740
1741        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1742        let gate_ok = self
1743            .runtime
1744            .lifecycle
1745            .notifier
1746            .as_ref()
1747            .is_none_or(|n| n.should_fire(&summary));
1748
1749        // 1) Existing notifier path — unchanged semantics.
1750        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1751            && gate_ok
1752        {
1753            notifier.fire(&summary);
1754        }
1755
1756        // 2) turn_complete hooks — fire-and-forget, matching the Notifier::fire pattern.
1757        // MCP dispatch is omitted: turn_complete hooks are expected to be Command-type
1758        // (shell scripts for desktop notifications, status-bar updates, etc.). McpTool
1759        // hooks in this context would require a Send + 'static MCP handle; defer that
1760        // extension if needed via a follow-up.
1761        let hooks = self.services.session.hooks_config.turn_complete.clone();
1762        if !hooks.is_empty() && gate_ok {
1763            let mut env = std::collections::HashMap::new();
1764            env.insert(
1765                "ZEPH_TURN_DURATION_MS".to_owned(),
1766                summary.duration_ms.to_string(),
1767            );
1768            env.insert(
1769                "ZEPH_TURN_STATUS".to_owned(),
1770                if is_error { "error" } else { "success" }.to_owned(),
1771            );
1772            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1773            env.insert(
1774                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1775                summary.llm_requests.to_string(),
1776            );
1777            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1778            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1779                agent_supervisor::TaskClass::Telemetry,
1780                "turn-complete-hooks",
1781                async move {
1782                    // Explicitly 'static so the future satisfies tokio::spawn's bound.
1783                    // None holds no actual reference; the annotation is vacuously satisfied.
1784                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1785                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1786                        tracing::warn!(error = %e, "turn_complete hook failed");
1787                    }
1788                },
1789            );
1790        }
1791    }
1792
1793    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1794    /// accounting as a tracked background task.
1795    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1796        let goal_snap = self
1797            .services
1798            .goal_accounting
1799            .as_ref()
1800            .and_then(|a| a.snapshot());
1801        self.update_metrics(|m| m.active_goal = goal_snap);
1802
1803        if let Some(ref accounting) = self.services.goal_accounting {
1804            let tokens_after = self
1805                .runtime
1806                .metrics
1807                .metrics_tx
1808                .as_ref()
1809                .map_or(0, |tx| tx.borrow().total_tokens);
1810            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1811            let mut spawned: Option<
1812                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1813            > = None;
1814            accounting.on_turn_complete(turn_tokens, |fut| {
1815                spawned = Some(fut);
1816            });
1817            if let Some(fut) = spawned {
1818                let _ = self.runtime.lifecycle.supervisor.spawn(
1819                    agent_supervisor::TaskClass::Telemetry,
1820                    "goal-accounting",
1821                    fut,
1822                );
1823            }
1824        }
1825    }
1826
1827    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1828    #[cfg_attr(
1829        feature = "profiling",
1830        tracing::instrument(name = "agent.security_prescreen", skip_all)
1831    )]
1832    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1833        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1834        if let Some(ref guardrail) = self.services.security.guardrail {
1835            use zeph_sanitizer::guardrail::GuardrailVerdict;
1836            let verdict = guardrail.check(trimmed).await;
1837            match &verdict {
1838                GuardrailVerdict::Flagged { reason, .. } => {
1839                    tracing::warn!(
1840                        reason = %reason,
1841                        should_block = verdict.should_block(),
1842                        "guardrail flagged user input"
1843                    );
1844                    if verdict.should_block() {
1845                        let msg = format!("[guardrail] Input blocked: {reason}");
1846                        let _ = self.channel.send(&msg).await;
1847                        let _ = self.channel.flush_chunks().await;
1848                        return Ok(true);
1849                    }
1850                    // Warn mode: notify but continue.
1851                    let _ = self
1852                        .channel
1853                        .send(&format!("[guardrail] Warning: {reason}"))
1854                        .await;
1855                }
1856                GuardrailVerdict::Error { error } => {
1857                    if guardrail.error_should_block() {
1858                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1859                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1860                        let _ = self.channel.send(msg).await;
1861                        let _ = self.channel.flush_chunks().await;
1862                        return Ok(true);
1863                    }
1864                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1865                }
1866                GuardrailVerdict::Safe => {}
1867            }
1868        }
1869
1870        // ML classifier: lightweight injection detection on user input boundary.
1871        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1872        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1873        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1874        // direct user chat. Disabled by default to prevent false positives on benign messages.
1875        #[cfg(feature = "classifiers")]
1876        if self.services.security.sanitizer.scan_user_input() {
1877            match self
1878                .services
1879                .security
1880                .sanitizer
1881                .classify_injection(trimmed)
1882                .await
1883            {
1884                zeph_sanitizer::InjectionVerdict::Blocked => {
1885                    self.push_classifier_metrics();
1886                    let _ = self
1887                        .channel
1888                        .send("[security] Input blocked: injection detected by classifier.")
1889                        .await;
1890                    let _ = self.channel.flush_chunks().await;
1891                    return Ok(true);
1892                }
1893                zeph_sanitizer::InjectionVerdict::Suspicious => {
1894                    tracing::warn!("injection_classifier soft_signal on user input");
1895                }
1896                zeph_sanitizer::InjectionVerdict::Clean => {}
1897            }
1898        }
1899        #[cfg(feature = "classifiers")]
1900        self.push_classifier_metrics();
1901
1902        Ok(false)
1903    }
1904
1905    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1906    ///
1907    /// Skips context preparation entirely when providers failed on the previous turn and the
1908    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1909    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1910    /// are rate-limited or unavailable (#3357).
1911    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1912        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1913        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1914
1915        // Skip expensive memory recall / embedding when providers are known-down.
1916        let providers_recently_failed = self
1917            .runtime
1918            .lifecycle
1919            .last_no_providers_at
1920            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1921
1922        if providers_recently_failed {
1923            tracing::warn!(
1924                backoff_secs,
1925                "skipping context preparation: providers were unavailable on last turn"
1926            );
1927            return;
1928        }
1929
1930        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1931        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1932        {
1933            Ok(()) => {}
1934            Err(_elapsed) => {
1935                tracing::warn!(
1936                    timeout_secs = prep_timeout_secs,
1937                    "context preparation timed out; proceeding with degraded context"
1938                );
1939            }
1940        }
1941    }
1942
1943    #[cfg_attr(
1944        feature = "profiling",
1945        tracing::instrument(name = "agent.prepare_context", skip_all)
1946    )]
1947    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1948        // Reset per-message pruning cache at the start of each turn (#2298).
1949        self.services.mcp.pruning_cache.reset();
1950
1951        // Extract before rebuild_system_prompt so the value is not tainted
1952        // by the secrets-bearing system prompt (ConversationId is just an i64).
1953        let conv_id = self.services.memory.persistence.conversation_id;
1954        self.rebuild_system_prompt(text).await;
1955
1956        self.detect_and_record_corrections(trimmed, conv_id).await;
1957        self.services.learning_engine.tick();
1958        self.analyze_and_learn().await;
1959        self.sync_graph_counts().await;
1960
1961        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1962        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1963        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1964        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1965        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1966
1967        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1968        {
1969            self.services.focus.tick();
1970
1971            // SideQuest eviction: runs every N user turns when enabled.
1972            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1973            let sidequest_should_fire = self.services.sidequest.tick();
1974            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1975                self.maybe_sidequest_eviction();
1976            }
1977        }
1978
1979        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
1980        // gated on graph + experience config, and only when both stores are attached.
1981        {
1982            let cfg = &self.services.memory.extraction.graph_config.experience;
1983            if cfg.enabled
1984                && cfg.evolution_sweep_enabled
1985                && cfg.evolution_sweep_interval > 0
1986                && self
1987                    .services
1988                    .sidequest
1989                    .turn_counter
1990                    .checked_rem(cfg.evolution_sweep_interval as u64)
1991                    == Some(0)
1992                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
1993                && let (Some(exp), Some(graph)) =
1994                    (memory.experience.as_ref(), memory.graph_store.as_ref())
1995            {
1996                let exp = std::sync::Arc::clone(exp);
1997                let graph = std::sync::Arc::clone(graph);
1998                let threshold = cfg.confidence_prune_threshold;
1999                let turn = self.services.sidequest.turn_counter;
2000                let accepted = self.runtime.lifecycle.supervisor.spawn(
2001                    agent_supervisor::TaskClass::Telemetry,
2002                    "experience-sweep",
2003                    async move {
2004                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
2005                            Ok(stats) => tracing::info!(
2006                                turn,
2007                                self_loops = stats.pruned_self_loops,
2008                                low_confidence = stats.pruned_low_confidence,
2009                                "evolution sweep complete",
2010                            ),
2011                            Err(e) => tracing::warn!(
2012                                turn,
2013                                error = %e,
2014                                "evolution sweep failed",
2015                            ),
2016                        }
2017                    },
2018                );
2019                if !accepted {
2020                    tracing::warn!(
2021                        turn = self.services.sidequest.turn_counter,
2022                        "experience-sweep dropped (telemetry class at capacity)",
2023                    );
2024                }
2025            }
2026        }
2027
2028        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
2029        if let Some(warning) = self.cache_expiry_warning() {
2030            tracing::info!(warning, "cache expiry warning");
2031            let _ = self.channel.send_status(&warning).await;
2032        }
2033
2034        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
2035        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
2036        self.maybe_time_based_microcompact();
2037
2038        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2039        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2040        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2041        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2042        self.maybe_apply_deferred_summaries();
2043        self.flush_deferred_summaries().await;
2044
2045        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2046        if let Err(e) = self.maybe_proactive_compress().await {
2047            tracing::warn!("proactive compression failed: {e:#}");
2048        }
2049
2050        if let Err(e) = self.maybe_compact().await {
2051            tracing::warn!("context compaction failed: {e:#}");
2052        }
2053
2054        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2055            tracing::warn!("context preparation failed: {e:#}");
2056        }
2057
2058        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
2059        self.provider
2060            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2061
2062        self.services.learning_engine.reset_reflection();
2063    }
2064
2065    fn build_user_message(
2066        &mut self,
2067        text: &str,
2068        image_parts: Vec<zeph_llm::provider::MessagePart>,
2069    ) -> Message {
2070        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2071        all_image_parts.extend(image_parts);
2072
2073        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2074            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2075                text: text.to_owned(),
2076            }];
2077            parts.extend(all_image_parts);
2078            Message::from_parts(Role::User, parts)
2079        } else {
2080            if !all_image_parts.is_empty() {
2081                tracing::warn!(
2082                    count = all_image_parts.len(),
2083                    "image attachments dropped: provider does not support vision"
2084                );
2085            }
2086            Message {
2087                role: Role::User,
2088                content: text.to_owned(),
2089                parts: vec![],
2090                metadata: MessageMetadata::default(),
2091            }
2092        }
2093    }
2094
2095    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2096    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2097    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2098    fn drain_background_completions(&mut self) {
2099        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2100
2101        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2102            return;
2103        };
2104        // Non-blocking drain: collect all completions that are already ready.
2105        while let Ok(completion) = rx.try_recv() {
2106            if self.runtime.lifecycle.pending_background_completions.len()
2107                >= BACKGROUND_COMPLETION_BUFFER_CAP
2108            {
2109                tracing::warn!(
2110                    run_id = %completion.run_id,
2111                    "background completion buffer full; dropping run result"
2112                );
2113                // Buffer is full: drop the oldest queued completion and push a sentinel
2114                // for the new (incoming) run so the LLM is informed its result was lost.
2115                self.runtime
2116                    .lifecycle
2117                    .pending_background_completions
2118                    .pop_front();
2119                self.runtime
2120                    .lifecycle
2121                    .pending_background_completions
2122                    .push_back(zeph_tools::BackgroundCompletion {
2123                        run_id: completion.run_id,
2124                        exit_code: -1,
2125                        success: false,
2126                        elapsed_ms: 0,
2127                        command: completion.command,
2128                        output: format!(
2129                            "[background result for run {} dropped: buffer overflow]",
2130                            completion.run_id
2131                        ),
2132                    });
2133            } else {
2134                self.runtime
2135                    .lifecycle
2136                    .pending_background_completions
2137                    .push_back(completion);
2138            }
2139        }
2140    }
2141
2142    /// Format and drain `pending_background_completions` into a prefix string, then
2143    /// return the final merged text (prefix + user message). When there are no pending
2144    /// completions the original text is returned unchanged.
2145    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2146        if self
2147            .runtime
2148            .lifecycle
2149            .pending_background_completions
2150            .is_empty()
2151        {
2152            return user_text.to_owned();
2153        }
2154        let mut parts = String::new();
2155        for completion in self
2156            .runtime
2157            .lifecycle
2158            .pending_background_completions
2159            .drain(..)
2160        {
2161            let _ = write!(
2162                parts,
2163                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2164                completion.run_id,
2165                completion.exit_code,
2166                completion.success,
2167                completion.elapsed_ms,
2168                completion.command,
2169                completion.output,
2170            );
2171        }
2172        parts.push_str(user_text);
2173        parts
2174    }
2175
    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
    /// channel. Returns a human-readable status string suitable for sending to the user.
    ///
    /// Each iteration sleeps 500 ms, handles at most one pending secret request, and then
    /// inspects the status entry whose id equals `task_id`:
    ///
    /// - `Completed`, `Failed` and `Canceled` end the loop with a message prefixed by `label`.
    /// - A missing status entry also ends the loop ("completed (no status available)").
    /// - Any other state sends a "turn used/max" progress line to the channel and keeps polling.
    ///
    /// Returns `None` if the sub-agent manager is absent when statuses are read.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            // The sleep comes first, so even an already-finished sub-agent is reported
            // only after one polling interval.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Bridge secret requests from sub-agent to channel.confirm().
            // Fetch the pending request first, then release the borrow before
            // calling channel.confirm() (which requires &mut self).
            // NOTE(review): the request's task id is not compared with `task_id`, so a
            // request from any sub-agent known to the manager is bridged here.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .services
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
                // (SEC-P1-02), so it is safe to embed in the prompt string.
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A failed confirmation is treated the same as an explicit "no".
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approval is granted with a five-minute TTL.
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        // Delivery is attempted only after approval succeeded; errors from
                        // approve/deliver/deny are deliberately ignored here.
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            // `?` ends the whole function with `None` when there is no manager.
            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                // No entry for this task: reported as completed without details.
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                // Every non-terminal state: report progress and poll again.
                _ => {
                    // The turn limit comes from the agent definition; 20 is shown when
                    // no definition is found for `task_id`. Send failures are ignored.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.services
                                .orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
2253
2254    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2255    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2256    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2257        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2258        let full_ids: Vec<String> = mgr
2259            .statuses()
2260            .into_iter()
2261            .map(|(tid, _)| tid)
2262            .filter(|tid| tid.starts_with(prefix))
2263            .collect();
2264        Some(match full_ids.as_slice() {
2265            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2266            [fid] => Ok(fid.clone()),
2267            _ => Err(format!(
2268                "Ambiguous id prefix '{prefix}': matches {} agents",
2269                full_ids.len()
2270            )),
2271        })
2272    }
2273
2274    fn handle_agent_list(&self) -> Option<String> {
2275        use std::fmt::Write as _;
2276        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2277        let defs = mgr.definitions();
2278        if defs.is_empty() {
2279            return Some("No sub-agent definitions found.".into());
2280        }
2281        let mut out = String::from("Available sub-agents:\n");
2282        for d in defs {
2283            let memory_label = match d.memory {
2284                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2285                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2286                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2287                None => "",
2288            };
2289            if let Some(ref src) = d.source {
2290                let _ = writeln!(
2291                    out,
2292                    "  {}{} — {} ({})",
2293                    d.name, memory_label, d.description, src
2294                );
2295            } else {
2296                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2297            }
2298        }
2299        Some(out)
2300    }
2301
2302    fn handle_agent_status(&self) -> Option<String> {
2303        use std::fmt::Write as _;
2304        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2305        let statuses = mgr.statuses();
2306        if statuses.is_empty() {
2307            return Some("No active sub-agents.".into());
2308        }
2309        let mut out = String::from("Active sub-agents:\n");
2310        for (id, s) in &statuses {
2311            let state = format!("{:?}", s.state).to_lowercase();
2312            let elapsed = s.started_at.elapsed().as_secs();
2313            let _ = writeln!(
2314                out,
2315                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2316                short = &id[..8.min(id.len())],
2317                t = s.turns_used,
2318                msg = s.last_message.as_deref().unwrap_or(""),
2319            );
2320            // Show memory directory path for agents with memory enabled.
2321            if let Some(def) = mgr.agents_def(id)
2322                && let Some(scope) = def.memory
2323                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2324            {
2325                let _ = writeln!(out, "       memory: {}", dir.display());
2326            }
2327        }
2328        Some(out)
2329    }
2330
2331    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2332        let full_id = match self.resolve_agent_id_prefix(id)? {
2333            Ok(fid) => fid,
2334            Err(msg) => return Some(msg),
2335        };
2336        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2337        if let Some((tid, req)) = mgr.try_recv_secret_request()
2338            && tid == full_id
2339        {
2340            let key = req.secret_key.clone();
2341            let ttl = std::time::Duration::from_mins(5);
2342            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2343                return Some(format!("Approve failed: {e}"));
2344            }
2345            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2346                return Some(format!("Secret delivery failed: {e}"));
2347            }
2348            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2349        }
2350        Some(format!(
2351            "No pending secret request for sub-agent '{full_id}'."
2352        ))
2353    }
2354
2355    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2356        let full_id = match self.resolve_agent_id_prefix(id)? {
2357            Ok(fid) => fid,
2358            Err(msg) => return Some(msg),
2359        };
2360        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2361        match mgr.deny_secret(&full_id) {
2362            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2363            Err(e) => Some(format!("Deny failed: {e}")),
2364        }
2365    }
2366
2367    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2368        use zeph_subagent::AgentCommand;
2369
2370        match cmd {
2371            AgentCommand::List => self.handle_agent_list(),
2372            AgentCommand::Background { name, prompt } => {
2373                self.handle_agent_background(&name, &prompt)
2374            }
2375            AgentCommand::Spawn { name, prompt }
2376            | AgentCommand::Mention {
2377                agent: name,
2378                prompt,
2379            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2380            AgentCommand::Status => self.handle_agent_status(),
2381            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2382            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2383            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2384            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2385        }
2386    }
2387
2388    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2389        let provider = self.provider.clone();
2390        let tool_executor = Arc::clone(&self.tool_executor);
2391        let skills = self.filtered_skills_for(name);
2392        let cfg = self.services.orchestration.subagent_config.clone();
2393        let spawn_ctx = self.build_spawn_context(&cfg);
2394        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2395        match mgr.spawn(
2396            name,
2397            prompt,
2398            provider,
2399            tool_executor,
2400            skills,
2401            &cfg,
2402            spawn_ctx,
2403        ) {
2404            Ok(id) => Some(format!(
2405                "Sub-agent '{name}' started in background (id: {short})",
2406                short = &id[..8.min(id.len())]
2407            )),
2408            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2409        }
2410    }
2411
2412    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2413        let provider = self.provider.clone();
2414        let tool_executor = Arc::clone(&self.tool_executor);
2415        let skills = self.filtered_skills_for(name);
2416        let cfg = self.services.orchestration.subagent_config.clone();
2417        let spawn_ctx = self.build_spawn_context(&cfg);
2418        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2419        let task_id = match mgr.spawn(
2420            name,
2421            prompt,
2422            provider,
2423            tool_executor,
2424            skills,
2425            &cfg,
2426            spawn_ctx,
2427        ) {
2428            Ok(id) => id,
2429            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2430        };
2431        let short = task_id[..8.min(task_id.len())].to_owned();
2432        let _ = self
2433            .channel
2434            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2435            .await;
2436        let label = format!("Sub-agent '{name}'");
2437        self.poll_subagent_until_done(&task_id, &label).await
2438    }
2439
2440    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2441        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2442        // Accept prefix match on task_id.
2443        let ids: Vec<String> = mgr
2444            .statuses()
2445            .into_iter()
2446            .map(|(task_id, _)| task_id)
2447            .filter(|task_id| task_id.starts_with(id))
2448            .collect();
2449        match ids.as_slice() {
2450            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2451            [full_id] => {
2452                let full_id = full_id.clone();
2453                match mgr.cancel(&full_id) {
2454                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2455                    Err(e) => Some(format!("Cancel failed: {e}")),
2456                }
2457            }
2458            _ => Some(format!(
2459                "Ambiguous id prefix '{id}': matches {} agents",
2460                ids.len()
2461            )),
2462        }
2463    }
2464
2465    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2466        let cfg = self.services.orchestration.subagent_config.clone();
2467        // Resolve definition name from transcript meta before spawning so we can
2468        // look up skills by definition name rather than the UUID prefix (S1 fix).
2469        let def_name = {
2470            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2471            match mgr.def_name_for_resume(id, &cfg) {
2472                Ok(name) => name,
2473                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2474            }
2475        };
2476        let skills = self.filtered_skills_for(&def_name);
2477        let provider = self.provider.clone();
2478        let tool_executor = Arc::clone(&self.tool_executor);
2479        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2480        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2481            Ok(pair) => pair,
2482            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2483        };
2484        let short = task_id[..8.min(task_id.len())].to_owned();
2485        let _ = self
2486            .channel
2487            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2488            .await;
2489        self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
2490            .await
2491    }
2492
2493    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2494        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2495        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2496        let reg = self.services.skill.registry.read();
2497        match zeph_subagent::filter_skills(&reg, &def.skills) {
2498            Ok(skills) => {
2499                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2500                if bodies.is_empty() {
2501                    None
2502                } else {
2503                    Some(bodies)
2504                }
2505            }
2506            Err(e) => {
2507                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2508                None
2509            }
2510        }
2511    }
2512
2513    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2514    fn build_spawn_context(
2515        &self,
2516        cfg: &zeph_config::SubAgentConfig,
2517    ) -> zeph_subagent::SpawnContext {
2518        zeph_subagent::SpawnContext {
2519            parent_messages: self.extract_parent_messages(cfg),
2520            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2521            parent_provider_name: {
2522                let name = &self.runtime.config.active_provider_name;
2523                if name.is_empty() {
2524                    None
2525                } else {
2526                    Some(name.clone())
2527                }
2528            },
2529            spawn_depth: self.runtime.config.spawn_depth,
2530            mcp_tool_names: self.extract_mcp_tool_names(),
2531            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2532            seed_trajectory_score: {
2533                let child = self.services.security.trajectory.spawn_child();
2534                let score = child.score_now();
2535                if score > 0.0 { Some(score) } else { None }
2536            },
2537        }
2538    }
2539
2540    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2541    ///
2542    /// Filters system messages, takes last `context_window_turns * 2` messages,
2543    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
2544    fn extract_parent_messages(
2545        &self,
2546        config: &zeph_config::SubAgentConfig,
2547    ) -> Vec<zeph_llm::provider::Message> {
2548        use zeph_llm::provider::Role;
2549        if config.context_window_turns == 0 {
2550            return Vec::new();
2551        }
2552        let non_system: Vec<_> = self
2553            .msg
2554            .messages
2555            .iter()
2556            .filter(|m| m.role != Role::System)
2557            .cloned()
2558            .collect();
2559        let take_count = config.context_window_turns * 2;
2560        let start = non_system.len().saturating_sub(take_count);
2561        let mut msgs = non_system[start..].to_vec();
2562
2563        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
2564        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
2565        let mut total_chars: usize = 0;
2566        let mut keep = msgs.len();
2567        for (i, m) in msgs.iter().enumerate() {
2568            total_chars += m.content.len();
2569            if total_chars > max_chars {
2570                keep = i;
2571                break;
2572            }
2573        }
2574        if keep < msgs.len() {
2575            tracing::info!(
2576                kept = keep,
2577                requested = config.context_window_turns * 2,
2578                "[subagent] truncated parent history from {} to {} turns due to token budget",
2579                config.context_window_turns * 2,
2580                keep
2581            );
2582            msgs.truncate(keep);
2583        }
2584        msgs
2585    }
2586
2587    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2588    fn extract_mcp_tool_names(&self) -> Vec<String> {
2589        self.tool_executor
2590            .tool_definitions_erased()
2591            .into_iter()
2592            .filter(|t| t.id.starts_with("mcp_"))
2593            .map(|t| t.id.to_string())
2594            .collect()
2595    }
2596
2597    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2598    ///
2599    /// Must be called from a blocking context (uses synchronous FS I/O).
2600    fn classify_source_kind(
2601        skill_dir: &std::path::Path,
2602        managed_dir: Option<&std::path::PathBuf>,
2603        bundled_names: &std::collections::HashSet<String>,
2604    ) -> zeph_memory::store::SourceKind {
2605        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2606            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2607            let has_marker = skill_dir.join(".bundled").exists();
2608            if has_marker && bundled_names.contains(skill_name) {
2609                zeph_memory::store::SourceKind::Bundled
2610            } else {
2611                if has_marker {
2612                    tracing::warn!(
2613                        skill = %skill_name,
2614                        "skill has .bundled marker but is not in the bundled skill \
2615                         allowlist — classifying as Hub"
2616                    );
2617                }
2618                zeph_memory::store::SourceKind::Hub
2619            }
2620        } else {
2621            zeph_memory::store::SourceKind::Local
2622        }
2623    }
2624
2625    /// Update trust DB records for all reloaded skills.
2626    async fn update_trust_for_reloaded_skills(
2627        &mut self,
2628        all_meta: &[zeph_skills::loader::SkillMeta],
2629    ) {
2630        // Clone Arc before any .await so no &self fields are held across suspension points.
2631        let memory = self.services.memory.persistence.memory.clone();
2632        let Some(memory) = memory else {
2633            return;
2634        };
2635        let trust_cfg = self.services.skill.trust_config.clone();
2636        let managed_dir = self.services.skill.managed_dir.clone();
2637        let bundled_names: std::collections::HashSet<String> =
2638            zeph_skills::bundled_skill_names().into_iter().collect();
2639        for meta in all_meta {
2640            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2641            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2642            let skill_dir = meta.skill_dir.clone();
2643            let managed_dir_ref = managed_dir.clone();
2644            let bundled_names_ref = bundled_names.clone();
2645            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2646                tokio::task::spawn_blocking(move || {
2647                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2648                    let source_kind = Self::classify_source_kind(
2649                        &skill_dir,
2650                        managed_dir_ref.as_ref(),
2651                        &bundled_names_ref,
2652                    );
2653                    Some((hash, source_kind))
2654                })
2655                .await
2656                .unwrap_or(None);
2657
2658            let Some((current_hash, source_kind)) = fs_result else {
2659                tracing::warn!("failed to compute hash for '{}'", meta.name);
2660                continue;
2661            };
2662            let initial_level = match source_kind {
2663                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2664                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2665                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2666                    &trust_cfg.local_level
2667                }
2668            };
2669            let existing = memory
2670                .sqlite()
2671                .load_skill_trust(&meta.name)
2672                .await
2673                .ok()
2674                .flatten();
2675            let trust_level_str = if let Some(ref row) = existing {
2676                if row.blake3_hash != current_hash {
2677                    trust_cfg.hash_mismatch_level.to_string()
2678                } else if row.source_kind != source_kind {
2679                    // source_kind changed (e.g., hub → bundled on upgrade).
2680                    // Never override an explicit operator block. For active trust levels,
2681                    // adopt the source-kind initial level when it grants more trust.
2682                    let stored = row
2683                        .trust_level
2684                        .parse::<zeph_common::SkillTrustLevel>()
2685                        .unwrap_or_else(|_| {
2686                            tracing::warn!(
2687                                skill = %meta.name,
2688                                raw = %row.trust_level,
2689                                "unrecognised trust_level in DB, treating as quarantined"
2690                            );
2691                            zeph_common::SkillTrustLevel::Quarantined
2692                        });
2693                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2694                        row.trust_level.clone()
2695                    } else {
2696                        initial_level.to_string()
2697                    }
2698                } else {
2699                    row.trust_level.clone()
2700                }
2701            } else {
2702                initial_level.to_string()
2703            };
2704            let source_path = meta.skill_dir.to_str();
2705            if let Err(e) = memory
2706                .sqlite()
2707                .upsert_skill_trust(
2708                    &meta.name,
2709                    &trust_level_str,
2710                    source_kind,
2711                    None,
2712                    source_path,
2713                    &current_hash,
2714                )
2715                .await
2716            {
2717                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2718            }
2719        }
2720    }
2721
2722    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2723    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2724        let provider = self.embedding_provider.clone();
2725        let embed_timeout =
2726            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2727        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2728            let owned = text.to_owned();
2729            let p = provider.clone();
2730            Box::pin(async move {
2731                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2732                    result
2733                } else {
2734                    tracing::warn!(
2735                        timeout_secs = embed_timeout.as_secs(),
2736                        "skill matcher: embedding timed out"
2737                    );
2738                    Err(zeph_llm::LlmError::Timeout)
2739                }
2740            })
2741        };
2742
2743        let needs_inmemory_rebuild = !self
2744            .services
2745            .skill
2746            .matcher
2747            .as_ref()
2748            .is_some_and(SkillMatcherBackend::is_qdrant);
2749
2750        if needs_inmemory_rebuild {
2751            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2752                .await
2753                .map(SkillMatcherBackend::InMemory);
2754        } else if let Some(ref mut backend) = self.services.skill.matcher {
2755            let _ = self.channel.send_status("syncing skill index...").await;
2756            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2757                self.services.session.status_tx.clone().map(
2758                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
2759                        Box::new(move |completed, total| {
2760                            let msg = format!("Syncing skills: {completed}/{total}");
2761                            let _ = tx.send(msg);
2762                        })
2763                    },
2764                );
2765            if let Err(e) = backend
2766                .sync(
2767                    all_meta,
2768                    &self.services.skill.embedding_model,
2769                    embed_fn,
2770                    on_progress,
2771                )
2772                .await
2773            {
2774                tracing::warn!("failed to sync skill embeddings: {e:#}");
2775            }
2776        }
2777
2778        if self.services.skill.hybrid_search {
2779            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2780            let _ = self.channel.send_status("rebuilding search index...").await;
2781            self.services.skill.rebuild_bm25(&descs);
2782        }
2783    }
2784
2785    #[cfg_attr(
2786        feature = "profiling",
2787        tracing::instrument(name = "skill.hot_reload", skip_all)
2788    )]
2789    async fn reload_skills(&mut self) {
2790        let old_fp = self.services.skill.fingerprint();
2791        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2792            let plugin_dirs = supplier();
2793            let mut paths = self.services.skill.skill_paths.clone();
2794            for dir in plugin_dirs {
2795                if !paths.contains(&dir) {
2796                    paths.push(dir);
2797                }
2798            }
2799            paths
2800        } else {
2801            self.services.skill.skill_paths.clone()
2802        };
2803        self.services.skill.registry.write().reload(&reload_paths);
2804        if self.services.skill.fingerprint() == old_fp {
2805            return;
2806        }
2807        let _ = self.channel.send_status("reloading skills...").await;
2808
2809        let all_meta = self
2810            .services
2811            .skill
2812            .registry
2813            .read()
2814            .all_meta()
2815            .into_iter()
2816            .cloned()
2817            .collect::<Vec<_>>();
2818
2819        self.update_trust_for_reloaded_skills(&all_meta).await;
2820
2821        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2822        self.rebuild_skill_matcher(&all_meta_refs).await;
2823
2824        let all_skills: Vec<Skill> = {
2825            let reg = self.services.skill.registry.read();
2826            reg.all_meta()
2827                .iter()
2828                .filter_map(|m| reg.skill(&m.name).ok())
2829                .collect()
2830        };
2831        let trust_map = self.build_skill_trust_map().await;
2832        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2833        let skills_prompt =
2834            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2835        self.services
2836            .skill
2837            .last_skills_prompt
2838            .clone_from(&skills_prompt);
2839        let system_prompt = build_system_prompt(&skills_prompt, None);
2840        if let Some(msg) = self.msg.messages.first_mut() {
2841            msg.content = system_prompt;
2842        }
2843
2844        let _ = self.channel.send_status("").await;
2845        tracing::info!(
2846            "reloaded {} skill(s)",
2847            self.services.skill.registry.read().all_meta().len()
2848        );
2849    }
2850
2851    fn reload_instructions(&mut self) {
2852        // Drain any additional queued events before reloading to avoid redundant reloads.
2853        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2854            while rx.try_recv().is_ok() {}
2855        }
2856        let Some(ref state) = self.runtime.instructions.reload_state else {
2857            return;
2858        };
2859        let new_blocks = crate::instructions::load_instructions(
2860            &state.base_dir,
2861            &state.provider_kinds,
2862            &state.explicit_files,
2863            state.auto_detect,
2864        );
2865        let old_sources: std::collections::HashSet<_> = self
2866            .runtime
2867            .instructions
2868            .blocks
2869            .iter()
2870            .map(|b| &b.source)
2871            .collect();
2872        let new_sources: std::collections::HashSet<_> =
2873            new_blocks.iter().map(|b| &b.source).collect();
2874        for added in new_sources.difference(&old_sources) {
2875            tracing::info!(path = %added.display(), "instruction file added");
2876        }
2877        for removed in old_sources.difference(&new_sources) {
2878            tracing::info!(path = %removed.display(), "instruction file removed");
2879        }
2880        tracing::info!(
2881            old_count = self.runtime.instructions.blocks.len(),
2882            new_count = new_blocks.len(),
2883            "reloaded instruction files"
2884        );
2885        self.runtime.instructions.blocks = new_blocks;
2886    }
2887
    /// Hot-reload runtime settings from the config file recorded in
    /// `runtime.lifecycle.config_path`.
    ///
    /// Returns without touching any state when no config path is set or when
    /// `load_config_with_overlay` yields `None`. Otherwise the freshly loaded values are
    /// copied into the runtime config, memory, skill, context-manager, index and hook
    /// settings held by the agent.
    fn reload_config(&mut self) {
        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        // Resolve the budget first: it borrows the whole `config`, which is partially moved
        // out of by the assignments below.
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        self.runtime.config.security = config.security;
        self.runtime.config.timeouts = config.timeouts;
        self.runtime.config.redact_credentials = config.memory.redact_credentials;
        self.services.memory.persistence.history_limit = config.memory.history_limit;
        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.services.memory.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        // Skill matching knobs; the two weights/thresholds below are clamped to [0.0, 1.0].
        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.services.skill.min_injection_score = config.skills.min_injection_score;
        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.services.skill.hybrid_search = config.skills.hybrid_search;
        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
        self.services.skill.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.services.skill.generation_provider_name);
        // Expand a leading `~/` against the home directory; if the home directory cannot be
        // determined the path is used verbatim.
        self.services.skill.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        // NOTE(review): the meaning of the literal 0.20 is defined by `ContextBudget::new`,
        // which is not visible here — confirm before changing it.
        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
                if self.services.memory.extraction.rpe_router.is_none() {
                    self.services.memory.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                // RPE disabled in the new config: drop any existing router.
                self.services.memory.extraction.rpe_router = None;
            }
            self.services.memory.extraction.graph_config = graph_cfg.clone();
        }
        // Compaction, pruning, compression and store-routing settings of the context manager.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // Resolve routing_classifier_provider from the provider pool (#2484).
        // An empty provider name clears the store-routing provider.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.services
            .memory
            .persistence
            .cross_session_score_threshold = config.memory.cross_session_score_threshold;

        // Repository-map settings; the TTL is configured in whole seconds.
        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
        self.services.index.repo_map_ttl =
            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        // Hook lists that can be swapped live.
        self.services
            .session
            .hooks_config
            .cwd_changed
            .clone_from(&config.hooks.cwd_changed);
        self.services
            .session
            .hooks_config
            .permission_denied
            .clone_from(&config.hooks.permission_denied);
        self.services
            .session
            .hooks_config
            .turn_complete
            .clone_from(&config.hooks.turn_complete);
        // file_changed_hooks require watcher restart to take effect — skipped here.

        tracing::info!("config reloaded");
    }
2995
2996    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
2997    ///
2998    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
2999    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3000        let mut config = match Config::load(path) {
3001            Ok(c) => c,
3002            Err(e) => {
3003                tracing::warn!("config reload failed: {e:#}");
3004                return None;
3005            }
3006        };
3007
3008        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3009        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3010            None
3011        } else {
3012            match zeph_plugins::apply_plugin_config_overlays(
3013                &mut config,
3014                &self.runtime.lifecycle.plugins_dir,
3015            ) {
3016                Ok(o) => Some(o),
3017                Err(e) => {
3018                    tracing::warn!(
3019                        "plugin overlay merge failed during reload: {e:#}; \
3020                         keeping previous runtime state"
3021                    );
3022                    return None;
3023                }
3024            }
3025        };
3026
3027        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3028        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3029        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3030        if let Some(ref overlay) = new_overlay {
3031            self.warn_on_shell_overlay_divergence(overlay, &config);
3032        }
3033        Some(config)
3034    }
3035
3036    /// React to shell policy divergence detected on hot-reload.
3037    ///
3038    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3039    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3040    /// — emit a warn + status banner when it changes.
3041    fn warn_on_shell_overlay_divergence(
3042        &self,
3043        new_overlay: &zeph_plugins::ResolvedOverlay,
3044        config: &Config,
3045    ) {
3046        let new_blocked: Vec<String> = {
3047            let mut v = config.tools.shell.blocked_commands.clone();
3048            v.sort();
3049            v
3050        };
3051        let new_allowed: Vec<String> = {
3052            let mut v = config.tools.shell.allowed_commands.clone();
3053            v.sort();
3054            v
3055        };
3056
3057        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3058        let blocked_changed = new_blocked != startup.blocked;
3059        let allowed_changed = new_allowed != startup.allowed;
3060
3061        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3062        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3063            h.rebuild(&config.tools.shell);
3064            tracing::info!(
3065                blocked_count = h.snapshot_blocked().len(),
3066                "shell blocked_commands rebuilt from hot-reload"
3067            );
3068        }
3069
3070        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3071        // executor construction time. Warn loudly so the user restarts.
3072        //
3073        // Note: when base `allowed_commands` is empty (the default), the overlay's
3074        // intersection semantics keep it empty, so this branch is silently unreachable
3075        // for users who do not set a non-empty base list.
3076        if allowed_changed {
3077            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3078                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3079            tracing::warn!("{msg}");
3080            if let Some(ref tx) = self.services.session.status_tx {
3081                let _ = tx.send(msg.to_owned());
3082            }
3083        }
3084
3085        let _ = new_overlay;
3086    }
3087
3088    /// Run `SideQuest` tool output eviction pass (#1885).
3089    ///
3090    /// PERF-1 fix: two-phase non-blocking design.
3091    ///
3092    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3093    /// validate and apply it immediately.
3094    ///
3095    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3096    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3097    /// next turn, so the current agent turn is never blocked by the LLM call.
3098    fn maybe_sidequest_eviction(&mut self) {
3099        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3100        // strategy — the two systems share the same pool of evictable tool outputs and can
3101        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3102        if self.services.sidequest.config.enabled {
3103            use crate::config::PruningStrategy;
3104            if !matches!(
3105                self.context_manager.compression.pruning_strategy,
3106                PruningStrategy::Reactive
3107            ) {
3108                tracing::warn!(
3109                    strategy = ?self.context_manager.compression.pruning_strategy,
3110                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3111                     consider disabling sidequest.enabled to avoid redundant eviction"
3112                );
3113            }
3114        }
3115
3116        // Guard: do not evict while a focus session is active.
3117        if self.services.focus.is_active() {
3118            tracing::debug!("sidequest: skipping — focus session active");
3119            // Drop any pending result — cursors may be stale relative to focus truncation.
3120            self.services.compression.pending_sidequest_result = None;
3121            return;
3122        }
3123
3124        // Phase 1: apply pending result from last turn's background LLM call.
3125        self.sidequest_apply_pending();
3126
3127        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3128        self.sidequest_schedule_next();
3129    }
3130
3131    fn sidequest_apply_pending(&mut self) {
3132        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3133            return;
3134        };
3135        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3136        // and we reschedule below.
3137        let result = match handle.try_join() {
3138            Ok(result) => result,
3139            Err(_handle) => {
3140                // Task still running — drop it; a fresh one is scheduled below.
3141                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3142                return;
3143            }
3144        };
3145        match result {
3146            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3147                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3148                let freed = self.services.sidequest.apply_eviction(
3149                    &mut self.msg.messages,
3150                    &evicted_indices,
3151                    &self.runtime.metrics.token_counter,
3152                );
3153                if freed > 0 {
3154                    self.recompute_prompt_tokens();
3155                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3156                    // cooldown=0: eviction does not impose post-compaction cooldown.
3157                    self.context_manager.compaction =
3158                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3159                            cooldown: 0,
3160                        };
3161                    tracing::info!(
3162                        freed_tokens = freed,
3163                        evicted_cursors = evicted_indices.len(),
3164                        pass = self.services.sidequest.passes_run,
3165                        "sidequest eviction complete"
3166                    );
3167                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3168                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3169                    }
3170                    if let Some(ref tx) = self.services.session.status_tx {
3171                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3172                    }
3173                } else {
3174                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3175                    if let Some(ref tx) = self.services.session.status_tx {
3176                        let _ = tx.send(String::new());
3177                    }
3178                }
3179            }
3180            Ok(None | Some(_)) => {
3181                tracing::debug!("sidequest: pending result: no cursors to evict");
3182                if let Some(ref tx) = self.services.session.status_tx {
3183                    let _ = tx.send(String::new());
3184                }
3185            }
3186            Err(e) => {
3187                tracing::debug!("sidequest: background task error: {e}");
3188                if let Some(ref tx) = self.services.session.status_tx {
3189                    let _ = tx.send(String::new());
3190                }
3191            }
3192        }
3193    }
3194
3195    fn sidequest_schedule_next(&mut self) {
3196        use zeph_llm::provider::{Message, MessageMetadata, Role};
3197
3198        self.services
3199            .sidequest
3200            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3201
3202        if self.services.sidequest.tool_output_cursors.is_empty() {
3203            tracing::debug!("sidequest: no eligible cursors");
3204            return;
3205        }
3206
3207        let prompt = self.services.sidequest.build_eviction_prompt();
3208        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3209        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3210        // Clone the provider so the spawn closure owns it without borrowing self.
3211        let provider = self.summary_or_primary_provider().clone();
3212
3213        let eviction_future = async move {
3214            let msgs = [Message {
3215                role: Role::User,
3216                content: prompt,
3217                parts: vec![],
3218                metadata: MessageMetadata::default(),
3219            }];
3220            let response =
3221                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3222                    .await
3223                {
3224                    Ok(Ok(r)) => r,
3225                    Ok(Err(e)) => {
3226                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3227                        return None;
3228                    }
3229                    Err(_) => {
3230                        tracing::debug!("sidequest bg: LLM call timed out");
3231                        return None;
3232                    }
3233                };
3234
3235            let start = response.find('{')?;
3236            let end = response.rfind('}')?;
3237            if start > end {
3238                return None;
3239            }
3240            let json_slice = &response[start..=end];
3241            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3242            let mut valid: Vec<usize> = parsed
3243                .del_cursors
3244                .into_iter()
3245                .filter(|&c| c < n_cursors)
3246                .collect();
3247            valid.sort_unstable();
3248            valid.dedup();
3249            #[allow(
3250                clippy::cast_precision_loss,
3251                clippy::cast_possible_truncation,
3252                clippy::cast_sign_loss
3253            )]
3254            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3255            valid.truncate(max_evict);
3256            Some(valid)
3257        };
3258        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3259            std::sync::Arc::from("agent.sidequest.eviction"),
3260            move || eviction_future,
3261        );
3262        self.services.compression.pending_sidequest_result = Some(handle);
3263        tracing::debug!("sidequest: background LLM eviction task spawned");
3264        if let Some(ref tx) = self.services.session.status_tx {
3265            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3266        }
3267    }
3268
3269    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3270    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3271        self.services
3272            .mcp
3273            .manager
3274            .as_ref()
3275            .map(|m| McpManagerDispatch(Arc::clone(m)))
3276    }
3277
3278    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3279    ///
3280    /// Called after each tool batch completes. The check is a single syscall and has
3281    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3282    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3283    pub(crate) async fn check_cwd_changed(&mut self) {
3284        let current = match std::env::current_dir() {
3285            Ok(p) => p,
3286            Err(e) => {
3287                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3288                return;
3289            }
3290        };
3291        if current == self.runtime.lifecycle.last_known_cwd {
3292            return;
3293        }
3294        let old_cwd =
3295            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3296        self.services.session.env_context.working_dir = current.display().to_string();
3297
3298        tracing::info!(
3299            old = %old_cwd.display(),
3300            new = %current.display(),
3301            "working directory changed"
3302        );
3303
3304        let _ = self
3305            .channel
3306            .send_status("Working directory changed\u{2026}")
3307            .await;
3308
3309        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3310        if hooks.is_empty() {
3311            tracing::debug!("CwdChanged: no hooks configured, skipping");
3312        } else {
3313            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3314            let mut env = std::collections::HashMap::new();
3315            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3316            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3317            let dispatch = self.mcp_dispatch();
3318            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3319                .as_ref()
3320                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3321            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3322                tracing::warn!(error = %e, "CwdChanged hook failed");
3323            } else {
3324                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3325            }
3326        }
3327
3328        let _ = self.channel.send_status("").await;
3329    }
3330
3331    /// Handle a `FileChangedEvent` from the file watcher.
3332    pub(crate) async fn handle_file_changed(
3333        &mut self,
3334        event: crate::file_watcher::FileChangedEvent,
3335    ) {
3336        tracing::info!(path = %event.path.display(), "file changed");
3337
3338        let _ = self
3339            .channel
3340            .send_status("Running file-change hook\u{2026}")
3341            .await;
3342
3343        let hooks = self
3344            .services
3345            .session
3346            .hooks_config
3347            .file_changed_hooks
3348            .clone();
3349        if hooks.is_empty() {
3350            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3351        } else {
3352            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3353            let mut env = std::collections::HashMap::new();
3354            env.insert(
3355                "ZEPH_CHANGED_PATH".to_owned(),
3356                event.path.display().to_string(),
3357            );
3358            let dispatch = self.mcp_dispatch();
3359            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3360                .as_ref()
3361                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3362            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3363                tracing::warn!(error = %e, "FileChanged hook failed");
3364            } else {
3365                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3366            }
3367        }
3368
3369        let _ = self.channel.send_status("").await;
3370    }
3371
3372    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3373    /// background scan task.
3374    ///
3375    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3376    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3377    ///
3378    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3379    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3380    /// blocking the turn.
3381    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3382        let Some(engine) = self.services.promotion_engine.clone() else {
3383            return;
3384        };
3385
3386        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3387            return;
3388        };
3389
3390        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3391        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3392        let promotion_window = 200usize;
3393
3394        let accepted = self.runtime.lifecycle.supervisor.spawn(
3395            agent_supervisor::TaskClass::Enrichment,
3396            "compression_spectrum.promotion_scan",
3397            async move {
3398                let span = tracing::info_span!("memory.compression.promote.background");
3399                let _enter = span.enter();
3400
3401                let window = match memory.load_promotion_window(promotion_window).await {
3402                    Ok(w) => w,
3403                    Err(e) => {
3404                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3405                        return;
3406                    }
3407                };
3408
3409                if window.is_empty() {
3410                    return;
3411                }
3412
3413                let candidates = match engine.scan(&window).await {
3414                    Ok(c) => c,
3415                    Err(e) => {
3416                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3417                        return;
3418                    }
3419                };
3420
3421                for candidate in &candidates {
3422                    if let Err(e) = engine.promote(candidate).await {
3423                        tracing::warn!(
3424                            signature = %candidate.signature,
3425                            error = %e,
3426                            "promotion scan: promote failed"
3427                        );
3428                    }
3429                }
3430
3431                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3432            },
3433        );
3434
3435        if accepted {
3436            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3437        }
3438    }
3439}
3440/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3441///
3442/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3443/// `zeph-subagent` to `zeph-mcp`.
3444struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3445
3446impl zeph_subagent::McpDispatch for McpManagerDispatch {
3447    fn call_tool<'a>(
3448        &'a self,
3449        server: &'a str,
3450        tool: &'a str,
3451        args: serde_json::Value,
3452    ) -> std::pin::Pin<
3453        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3454    > {
3455        Box::pin(async move {
3456            self.0
3457                .call_tool(server, tool, args)
3458                .await
3459                .map(|result| {
3460                    // Extract text content from the MCP response as a JSON value.
3461                    let texts: Vec<serde_json::Value> = result
3462                        .content
3463                        .iter()
3464                        .filter_map(|c| {
3465                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3466                                Some(serde_json::Value::String(t.text.clone()))
3467                            } else {
3468                                None
3469                            }
3470                        })
3471                        .collect();
3472                    serde_json::Value::Array(texts)
3473                })
3474                .map_err(|e| e.to_string())
3475        })
3476    }
3477}
3478
3479pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3480    while !*rx.borrow_and_update() {
3481        if rx.changed().await.is_err() {
3482            std::future::pending::<()>().await;
3483        }
3484    }
3485}
3486
3487pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3488    match rx {
3489        Some(inner) => {
3490            if let Some(v) = inner.recv().await {
3491                Some(v)
3492            } else {
3493                *rx = None;
3494                std::future::pending().await
3495            }
3496        }
3497        None => std::future::pending().await,
3498    }
3499}
3500
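// NOTE: the paragraph below documents `resolve_context_budget` (defined further down in this
// file), not the helper that immediately follows it. It is kept as a plain comment so rustdoc
// does not attach it to the wrong item.
//
// Resolve the effective context budget from config, applying the `auto_budget` fallback.
//
// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
// context window; if still 0, fall back to 128 000 tokens.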
3506/// Truncate a background run command to at most 80 characters for TUI display.
3507fn truncate_shell_command(cmd: &str) -> String {
3508    if cmd.len() <= 80 {
3509        return cmd.to_owned();
3510    }
3511    let end = cmd.floor_char_boundary(79);
3512    format!("{}…", &cmd[..end])
3513}
3514
/// Return the leading 8 characters of a run id for compact TUI display.
///
/// Ids with 8 or fewer characters are returned in full.
fn truncate_shell_run_id(id: &str) -> String {
    // The byte offset of the ninth character (if any) is where the prefix ends.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3519
3520pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3521    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3522        if let Some(ctx_size) = provider.context_window() {
3523            tracing::info!(
3524                model_context = ctx_size,
3525                "auto-configured context budget on reload"
3526            );
3527            ctx_size
3528        } else {
3529            0
3530        }
3531    } else {
3532        config.memory.context_budget_tokens
3533    };
3534    if tokens == 0 {
3535        tracing::warn!(
3536            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3537        );
3538        128_000
3539    } else {
3540        tokens
3541    }
3542}
3543
3544#[cfg(test)]
3545mod tests;
3546
3547#[cfg(test)]
3548pub(crate) use tests::agent_tests;
3549
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed future produced by [`TestErrorCommand`]'s `handle` implementation.
    type HandleFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Slash-command stub that exists only in `#[cfg(test)]` builds.
    ///
    /// Its handler always fails, which drives the `Some(Err(CommandError))` arm of
    /// the session/debug registry dispatch block. That lets the non-fatal error
    /// path be exercised without relying on production command validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        /// Ignore the context and arguments and fail with a fixed error.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandleFuture<'a> {
            Box::pin(async {
                let failure = CommandError::new("boom");
                Err(failure)
            })
        }
    }
}