Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod index;
28mod learning;
29pub(crate) mod learning_engine;
30mod log_commands;
31mod loop_event;
32mod lsp_commands;
33mod magic_docs;
34mod mcp;
35pub(crate) mod memcot;
36mod message_queue;
37mod microcompact;
38mod model_commands;
39mod persistence;
40#[cfg(feature = "scheduler")]
41mod plan;
42mod policy_commands;
43mod provider_cmd;
44mod quality_hook;
45pub(crate) mod rate_limiter;
46#[cfg(feature = "scheduler")]
47mod scheduler_commands;
48#[cfg(feature = "scheduler")]
49mod scheduler_loop;
50mod scope_commands;
51pub mod session_config;
52mod session_digest;
53pub mod shadow_sentinel;
54pub(crate) mod sidequest;
55mod skill_management;
56pub mod slash_commands;
57pub mod speculative;
58pub(crate) mod state;
59pub(crate) mod task_injection;
60pub(crate) mod tool_execution;
61pub(crate) mod tool_orchestrator;
62pub mod trajectory;
63mod trajectory_commands;
64mod trust_commands;
65pub mod turn;
66mod utils;
67pub(crate) mod vigil;
68
69use std::collections::{HashMap, VecDeque};
70use std::fmt::Write as _;
71use std::sync::Arc;
72
73use parking_lot::RwLock;
74
75use tokio::sync::{mpsc, watch};
76use tokio_util::sync::CancellationToken;
77use zeph_llm::any::AnyProvider;
78use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
79use zeph_memory::TokenCounter;
80use zeph_memory::semantic::SemanticMemory;
81use zeph_skills::loader::Skill;
82use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
83use zeph_skills::prompt::format_skills_prompt;
84use zeph_skills::registry::SkillRegistry;
85use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
86
87use crate::channel::Channel;
88use crate::config::Config;
89use crate::context::{ContextBudget, build_system_prompt};
90use zeph_common::text::estimate_tokens;
91
92use loop_event::LoopEvent;
93use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
94use state::MessageState;
95
96pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
97// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
98// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
99// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
100pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
101pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence appended after the body by [`format_tool_output`].
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Render a tool's output as a labelled fenced block.
///
/// The result has the shape:
///
/// ```text
/// [tool output: <tool_name>]
/// ```
/// <body>
/// ```
/// ```
///
/// The buffer is allocated once with the exact final length, and the pieces are appended
/// with `push_str`, which cannot fail.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    let exact_len = HEADER_OPEN.len()
        + tool_name.len()
        + HEADER_CLOSE.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();

    let mut rendered = String::with_capacity(exact_len);
    rendered.push_str(HEADER_OPEN);
    rendered.push_str(tool_name);
    rendered.push_str(HEADER_CLOSE);
    rendered.push_str(body);
    rendered.push_str(TOOL_OUTPUT_SUFFIX);
    rendered
}
118
119/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
120/// tool orchestration, and multi-channel I/O.
121///
122/// The agent maintains conversation history, manages LLM provider state, coordinates tool
123/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
124/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
125///
126/// # Architecture
127///
128/// - **Message state**: Conversation history with system prompt, message queue, and metadata
129/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
130/// - **Skill state**: Registry, matching engine, and self-learning evolution
131/// - **Context manager**: Token budgeting, context assembly, and summarization
132/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
133/// - **MCP client**: Multi-server support for Model Context Protocol
134/// - **Index state**: AST-based code indexing and semantic retrieval
135/// - **Security**: Sanitization, exfiltration detection, adversarial probes
136/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
137///
138/// # Channel Contract
139///
140/// The agent requires a [`Channel`] implementation for user interaction:
141/// - Sends agent responses via `channel.send(message)`
142/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
143/// - Supports structured events: tool invocations, tool output, streaming updates
144///
145/// # Lifecycle
146///
147/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
148/// 2. Run main loop with [`Self::run`]
149/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
150///
151pub struct Agent<C: Channel> {
152    // --- I/O & primary providers (kept inline) ---
153    provider: AnyProvider,
154    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
155    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
156    /// Falls back to `provider.clone()` when no dedicated entry exists.
157    /// **Never replaced** by `/provider switch`.
158    embedding_provider: AnyProvider,
159    channel: C,
160    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
161
162    // --- Conversation core (kept inline) ---
163    pub(super) msg: MessageState,
164    pub(super) context_manager: context_manager::ContextManager,
165    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
166
167    // --- Aggregated background services ---
168    pub(super) services: state::Services,
169
170    // --- Aggregated runtime / lifecycle / telemetry ---
171    pub(super) runtime: state::AgentRuntime,
172}
173
/// Tells the agent loop what to do after [`Agent::apply_dispatch_result`] has processed a
/// command dispatch.
enum DispatchFlow {
    /// Exit was requested: the agent loop should `break`.
    Break,
    /// The command was handled: the agent loop should `continue` with the next iteration.
    Continue,
    /// The command was not recognised: the agent loop should fall through to normal handling.
    Fallthrough,
}
183
184impl<C: Channel> Agent<C> {
185    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
186    ///
187    /// # Arguments
188    ///
189    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
190    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
191    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
192    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
193    ///   skills are matched by exact name only
194    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
195    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
196    ///
197    /// # Initialization
198    ///
199    /// The constructor:
200    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
201    /// 2. Builds the system prompt from registered skills
202    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
203    /// 4. Returns a ready-to-run agent
204    ///
205    /// # Panics
206    ///
207    /// Panics if `max_active_skills` is 0.
208    #[must_use]
209    pub fn new(
210        provider: AnyProvider,
211        channel: C,
212        registry: SkillRegistry,
213        matcher: Option<SkillMatcherBackend>,
214        max_active_skills: usize,
215        tool_executor: impl ToolExecutor + 'static,
216    ) -> Self {
217        let registry = Arc::new(RwLock::new(registry));
218        let embedding_provider = provider.clone();
219        Self::new_with_registry_arc(
220            provider,
221            embedding_provider,
222            channel,
223            registry,
224            matcher,
225            max_active_skills,
226            tool_executor,
227        )
228    }
229
230    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
231    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
232    ///
233    /// # Panics
234    ///
235    /// Panics if the registry `RwLock` is poisoned.
236    #[must_use]
237    pub fn new_with_registry_arc(
238        provider: AnyProvider,
239        embedding_provider: AnyProvider,
240        channel: C,
241        registry: Arc<RwLock<SkillRegistry>>,
242        matcher: Option<SkillMatcherBackend>,
243        max_active_skills: usize,
244        tool_executor: impl ToolExecutor + 'static,
245    ) -> Self {
246        use state::{
247            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
248            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
249            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
250            SessionState, SkillState, ToolState,
251        };
252
253        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
254        let all_skills: Vec<Skill> = {
255            let reg = registry.read();
256            reg.all_meta()
257                .iter()
258                .filter_map(|m| reg.skill(&m.name).ok())
259                .collect()
260        };
261        let empty_trust = HashMap::new();
262        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
263        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
264        let system_prompt = build_system_prompt(&skills_prompt, None);
265        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
266        tracing::trace!(prompt = %system_prompt, "full system prompt");
267
268        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
269        let token_counter = Arc::new(TokenCounter::new());
270
271        let services = Services {
272            memory: MemoryState::default(),
273            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
274            learning_engine: learning_engine::LearningEngine::new(),
275            feedback: FeedbackState::default(),
276            mcp: McpState::default(),
277            index: IndexState::default(),
278            session: SessionState::new(),
279            security: SecurityState::default(),
280            experiments: ExperimentState::new(),
281            compression: CompressionState::default(),
282            orchestration: OrchestrationState::default(),
283            focus: focus::FocusState::default(),
284            sidequest: sidequest::SidequestState::default(),
285            tool_state: ToolState::default(),
286            goal_accounting: None,
287            quality: None,
288            proactive_explorer: None,
289            promotion_engine: None,
290            taco_compressor: None,
291            speculation_engine: None,
292            autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
293            autonomous_registry: crate::goal::AutonomousRegistry::new(),
294        };
295
296        let runtime = AgentRuntime {
297            config: RuntimeConfig::default(),
298            lifecycle: LifecycleState::new(),
299            providers: ProviderState::new(initial_prompt_tokens),
300            metrics: MetricsState::new(token_counter),
301            debug: DebugState::default(),
302            instructions: InstructionState::default(),
303        };
304
305        Self {
306            provider,
307            embedding_provider,
308            channel,
309            tool_executor: Arc::new(tool_executor),
310            msg: MessageState {
311                messages: vec![Message {
312                    role: Role::System,
313                    content: system_prompt,
314                    parts: vec![],
315                    metadata: MessageMetadata::default(),
316                }],
317                message_queue: VecDeque::new(),
318                pending_image_parts: Vec::new(),
319                last_persisted_message_id: None,
320                deferred_db_hide_ids: Vec::new(),
321                deferred_db_summaries: Vec::new(),
322            },
323            context_manager: context_manager::ContextManager::new(),
324            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
325            services,
326            runtime,
327        }
328    }
329
330    /// Consume the agent and return the inner channel.
331    ///
332    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
333    /// read captured responses from a headless channel such as `BenchmarkChannel`).
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # use zeph_core::agent::Agent;
339    /// // After agent.run().await completes, consume the agent to retrieve the channel.
340    /// // let channel: MyChannel = agent.into_channel();
341    /// ```
342    #[must_use]
343    pub fn into_channel(self) -> C {
344        self.channel
345    }
346
347    /// Poll all active sub-agents for completed/failed/canceled results.
348    ///
349    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
350    /// for agents that have finished. Each completed agent is removed from the manager.
351    #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
352    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
353        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
354            return vec![];
355        };
356
357        let finished: Vec<String> = mgr
358            .statuses()
359            .into_iter()
360            .filter_map(|(id, status)| {
361                if matches!(
362                    status.state,
363                    zeph_subagent::SubAgentState::Completed
364                        | zeph_subagent::SubAgentState::Failed
365                        | zeph_subagent::SubAgentState::Canceled
366                ) {
367                    Some(id)
368                } else {
369                    None
370                }
371            })
372            .collect();
373
374        let mut results = vec![];
375        for task_id in finished {
376            match mgr.collect(&task_id).await {
377                Ok(result) => results.push((task_id, result)),
378                Err(e) => {
379                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
380                }
381            }
382        }
383        results
384    }
385
386    /// Call the LLM to generate a structured session summary with a configurable timeout.
387    ///
388    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
389    /// any failure, logging a warning — callers must treat `None` as "skip storage".
390    ///
391    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
392    /// (structured call times out and plain-text fallback also times out) this adds up to
393    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
394    async fn call_llm_for_session_summary(
395        &self,
396        chat_messages: &[Message],
397    ) -> Option<zeph_memory::StructuredSummary> {
398        let provider = self.resolve_background_provider(
399            &self.services.memory.compaction.shutdown_summary_provider,
400        );
401        let timeout_dur = std::time::Duration::from_secs(
402            self.services
403                .memory
404                .compaction
405                .shutdown_summary_timeout_secs,
406        );
407        match tokio::time::timeout(
408            timeout_dur,
409            provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
410        )
411        .await
412        {
413            Ok(Ok(s)) => Some(s),
414            Ok(Err(e)) => {
415                tracing::warn!(
416                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
417                );
418                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
419                    .await
420            }
421            Err(_) => {
422                tracing::warn!(
423                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
424                    self.services
425                        .memory
426                        .compaction
427                        .shutdown_summary_timeout_secs
428                );
429                self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
430                    .await
431            }
432        }
433    }
434
435    async fn plain_text_summary_fallback(
436        &self,
437        provider: &zeph_llm::any::AnyProvider,
438        chat_messages: &[Message],
439        timeout_dur: std::time::Duration,
440    ) -> Option<zeph_memory::StructuredSummary> {
441        match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
442            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
443                summary: plain,
444                key_facts: vec![],
445                entities: vec![],
446            }),
447            Ok(Err(e)) => {
448                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
449                None
450            }
451            Err(_) => {
452                tracing::warn!("shutdown summary: plain LLM fallback timed out");
453                None
454            }
455        }
456    }
457
458    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
459    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
460    /// closed while tool execution was in progress). Without this the next session startup strips
461    /// those assistant messages and emits orphan warnings.
462    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
463        use zeph_llm::provider::{MessagePart, Role};
464
465        // Walk messages in reverse: if the last assistant message (ignoring any trailing
466        // system messages) has ToolUse parts and is NOT immediately followed by a user
467        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
468        let msgs = &self.msg.messages;
469        // Find last assistant message index.
470        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
471            return;
472        };
473        let asst_msg = &msgs[asst_idx];
474        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
475            .parts
476            .iter()
477            .filter_map(|p| {
478                if let MessagePart::ToolUse { id, name, input } = p {
479                    Some((id.as_str(), name.as_str(), input))
480                } else {
481                    None
482                }
483            })
484            .collect();
485        if tool_use_ids.is_empty() {
486            return;
487        }
488
489        // Check whether a following user message already pairs all ToolUse ids.
490        let paired_ids: std::collections::HashSet<&str> = msgs
491            .get(asst_idx + 1..)
492            .into_iter()
493            .flatten()
494            .filter(|m| m.role == Role::User)
495            .flat_map(|m| m.parts.iter())
496            .filter_map(|p| {
497                if let MessagePart::ToolResult { tool_use_id, .. } = p {
498                    Some(tool_use_id.as_str())
499                } else {
500                    None
501                }
502            })
503            .collect();
504
505        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
506            .iter()
507            .filter(|(id, _, _)| !paired_ids.contains(*id))
508            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
509                id: (*id).to_owned(),
510                name: (*name).to_owned().into(),
511                input: (*input).clone(),
512            })
513            .collect();
514
515        if unpaired.is_empty() {
516            return;
517        }
518
519        tracing::info!(
520            count = unpaired.len(),
521            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
522        );
523        self.persist_cancelled_tool_results(&unpaired).await;
524    }
525
526    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
527    ///
528    /// Guards:
529    /// - `shutdown_summary` config must be enabled
530    /// - `conversation_id` must be set (memory must be attached)
531    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
532    /// - at least `shutdown_summary_min_messages` user-turn messages in history
533    ///
534    /// All errors are logged as warnings and swallowed — shutdown must never fail.
535    async fn maybe_store_shutdown_summary(&mut self) {
536        if !self.services.memory.compaction.shutdown_summary {
537            return;
538        }
539        let Some(memory) = self.services.memory.persistence.memory.clone() else {
540            return;
541        };
542        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
543            return;
544        };
545
546        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
547        match memory.has_session_summary(conversation_id).await {
548            Ok(true) => {
549                tracing::debug!("shutdown summary: session already has a summary, skipping");
550                return;
551            }
552            Ok(false) => {}
553            Err(e) => {
554                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
555                return;
556            }
557        }
558
559        // Count user-turn messages only (skip system prompt at index 0).
560        let user_count = self
561            .msg
562            .messages
563            .iter()
564            .skip(1)
565            .filter(|m| m.role == Role::User)
566            .count();
567        if user_count
568            < self
569                .services
570                .memory
571                .compaction
572                .shutdown_summary_min_messages
573        {
574            tracing::debug!(
575                user_count,
576                min = self
577                    .services
578                    .memory
579                    .compaction
580                    .shutdown_summary_min_messages,
581                "shutdown summary: too few user messages, skipping"
582            );
583            return;
584        }
585
586        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
587        let _ = self.channel.send_status("Saving session summary...").await;
588
589        // Collect last N messages (skip system prompt at index 0).
590        let max = self
591            .services
592            .memory
593            .compaction
594            .shutdown_summary_max_messages;
595        if max == 0 {
596            tracing::debug!("shutdown summary: max_messages=0, skipping");
597            return;
598        }
599        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
600        let slice = if non_system.len() > max {
601            &non_system[non_system.len() - max..]
602        } else {
603            &non_system[..]
604        };
605
606        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
607            .iter()
608            .map(|m| {
609                let role = match m.role {
610                    Role::User => "user".to_owned(),
611                    Role::Assistant => "assistant".to_owned(),
612                    Role::System => "system".to_owned(),
613                };
614                (zeph_memory::MessageId(0), role, m.content.clone())
615            })
616            .collect();
617
618        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
619        let chat_messages = vec![Message {
620            role: Role::User,
621            content: prompt,
622            parts: vec![],
623            metadata: MessageMetadata::default(),
624        }];
625
626        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
627            let _ = self.channel.send_status("").await;
628            return;
629        };
630
631        if let Err(e) = memory
632            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
633            .await
634        {
635            tracing::warn!("shutdown summary: storage failed: {e:#}");
636        } else {
637            tracing::info!(
638                conversation_id = conversation_id.0,
639                "shutdown summary stored"
640            );
641        }
642
643        // Clear TUI status.
644        let _ = self.channel.send_status("").await;
645    }
646
647    /// Gracefully shut down the agent and persist state.
648    ///
649    /// Performs the following cleanup:
650    ///
651    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
652    ///    are flushed to memory or disk
653    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
654    ///    to the vault
655    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
656    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
657    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
658    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
659    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
660    ///
661    /// Call this before dropping the agent to ensure no data loss.
662    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
663    pub async fn shutdown(&mut self) {
664        let _ = self.channel.send_status("Shutting down...").await;
665
666        // CRIT-1: persist Thompson state accumulated during this session.
667        self.provider.save_router_state().await;
668
669        // Persist AdaptOrch Beta-arm table alongside Thompson state.
670        if let Some(ref advisor) = self.services.orchestration.topology_advisor
671            && let Err(e) = advisor.save()
672        {
673            tracing::warn!(error = %e, "adaptorch: failed to persist state");
674        }
675
676        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
677            mgr.shutdown_all();
678        }
679
680        if let Some(ref manager) = self.services.mcp.manager {
681            manager.shutdown_all_shared().await;
682        }
683
684        // Finalize compaction trajectory: push the last open segment into the Vec.
685        // This segment would otherwise only be pushed when the next hard compaction fires,
686        // which never happens at session end.
687        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
688            self.update_metrics(|m| {
689                m.compaction_turns_after_hard.push(turns);
690            });
691            self.context_manager
692                .set_turns_since_last_hard_compaction(None);
693        }
694
695        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
696            let m = tx.borrow();
697            if m.filter_applications > 0 {
698                #[allow(clippy::cast_precision_loss)]
699                let pct = if m.filter_raw_tokens > 0 {
700                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
701                } else {
702                    0.0
703                };
704                tracing::info!(
705                    raw_tokens = m.filter_raw_tokens,
706                    saved_tokens = m.filter_saved_tokens,
707                    applications = m.filter_applications,
708                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
709                    m.filter_saved_tokens,
710                );
711            }
712            if m.compaction_hard_count > 0 {
713                tracing::info!(
714                    hard_compactions = m.compaction_hard_count,
715                    turns_after_hard = ?m.compaction_turns_after_hard,
716                    "hard compaction trajectory"
717                );
718            }
719        }
720
721        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
722        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
723        // startup strips the orphaned ToolUse and emits warnings.
724        self.flush_orphaned_tool_use_on_shutdown().await;
725
726        // Signal the experiment CancellationToken first so the task can clean up gracefully,
727        // then abort the handle to guarantee it does not outlive the agent regardless.
728        if let Some(ref token) = self.services.experiments.cancel {
729            token.cancel();
730        }
731        if let Some(h) = self.services.experiments.handle.take() {
732            h.abort();
733        }
734
735        // Forcibly abort in-flight Enrichment and Telemetry tasks tracked by the supervisor.
736        self.runtime.lifecycle.supervisor.abort_all();
737
738        // Abort background task handles not tracked by BackgroundSupervisor.
739        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
740        if let Some(h) = self.services.compression.pending_task_goal.take() {
741            h.abort();
742        }
743        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
744            h.abort();
745        }
746        if let Some(h) = self.services.compression.pending_subgoal.take() {
747            h.abort();
748        }
749
750        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
751        self.services.learning_engine.learning_tasks.abort_all();
752
753        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
754        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
755        // observe cancellation at their next .await point. Without yielding here the summary
756        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
757        for _ in 0..4 {
758            tokio::task::yield_now().await;
759        }
760
761        self.maybe_store_shutdown_summary().await;
762        self.maybe_store_session_digest().await;
763
764        tracing::info!("agent shutdown complete");
765    }
766
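    // NOTE(review): the text below describes the chat-loop entry point (`run`), not the helper
    // that follows. It is kept as a plain comment so rustdoc does not attach it to the wrong
    // item; re-attach it to `run` if that method lacks this documentation.
    //
    //   Run the chat loop, receiving messages via the channel until EOF or shutdown.
    //
    //   # Errors
    //
    //   Returns an error if channel I/O or LLM communication fails.

    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.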
773    fn refresh_subagent_metrics(&mut self) {
774        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
775            return;
776        };
777        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
778            .statuses()
779            .into_iter()
780            .map(|(id, s)| {
781                let def = mgr.agents_def(&id);
782                crate::metrics::SubAgentMetrics {
783                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
784                    id: id.clone(),
785                    state: format!("{:?}", s.state).to_lowercase(),
786                    turns_used: s.turns_used,
787                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
788                    background: def.is_some_and(|d| d.permissions.background),
789                    elapsed_secs: s.started_at.elapsed().as_secs(),
790                    permission_mode: def.map_or_else(String::new, |d| {
791                        use zeph_subagent::def::PermissionMode;
792                        match d.permissions.permission_mode {
793                            PermissionMode::Default => String::new(),
794                            PermissionMode::AcceptEdits => "accept_edits".into(),
795                            PermissionMode::DontAsk => "dont_ask".into(),
796                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
797                            PermissionMode::Plan => "plan".into(),
798                        }
799                    }),
800                    transcript_dir: mgr
801                        .agent_transcript_dir(&id)
802                        .map(|p| p.to_string_lossy().into_owned()),
803                }
804            })
805            .collect();
806        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
807    }
808
809    /// Non-blocking poll: notify the user when background sub-agents complete.
810    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
811        let completed = self.poll_subagents().await;
812        for (task_id, result) in completed {
813            let notice = if result.is_empty() {
814                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
815            } else {
816                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
817            };
818            if let Err(e) = self.channel.send(&notice).await {
819                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
820            }
821        }
822        Ok(())
823    }
824
    /// Run the agent main loop.
    ///
    /// After an optional wait for model warmup and the session restore steps, each iteration
    /// takes the next input (a queued message first, otherwise the next [`LoopEvent`]), offers
    /// it to the session/debug command registry, then to the agent-command registry, then to
    /// the builtin command handler, and finally processes it as a regular user message.
    ///
    /// When the loop ends, `maybe_autodream` runs and the trace collector (if any) is finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Wait once for the warmup signal if it has not reported ready yet. The receiver is
        // taken, so this happens at most once; a failed wait is ignored and only a still-false
        // value is logged.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Restore the last-used provider preference before any user interaction (#3308).
        self.restore_channel_provider().await;

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping, run before the next input is picked up.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over polling for a new event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Attachments that are still raw go through `resolve_message` first.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // Any `None` from `next_event` ends the run loop, exactly like an explicit
                    // shutdown event.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions().await;
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment is over: drop its cancel token and task handle.
                        self.services.experiments.cancel = None;
                        self.services.experiments.handle = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::AutonomousTick) => {
                        // A failed autonomous turn is logged and does not stop the loop.
                        if let Err(e) = self.run_autonomous_turn().await {
                            tracing::warn!(error = %e, "autonomous turn error");
                        }
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Registry dispatch: two-phase command dispatch.
            //
            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
            //
            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
            //
            // Drop-order rules enforced here:
            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // Remember whether phase 1 declined the input before the result is consumed below;
            // phase 2 only dispatches when it did.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    agents_fleet::AgentsFleetCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(AgentsFleetCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).

            // S1 fix: drain any pending autonomous session start queued by handle_goal.
            // handle_goal runs inside Box::pin(async move) and cannot borrow &mut self directly,
            // so it writes to pending_start_arc. We consume it here where &mut self is free.
            if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
                if let Some(cid) = cancelled_id {
                    tracing::info!(
                        goal_id = cid,
                        "autonomous: previous session cancelled for new goal"
                    );
                }
                self.sync_registry_entry();
                tracing::info!(goal_id = new_id, "autonomous: session started");
            }

            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
            // inside apply_dispatch_result when with_learning = true.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Builtin commands: `Some(true)` ends the loop, `Some(false)` skips to the next
            // iteration, `None` means the input is not a builtin command.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // No command layer claimed the input: treat it as a regular user message.
            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1176
1177    /// Dispatch a slash-command registry result and flush the channel.
1178    ///
1179    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1180    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1181    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1182    async fn apply_dispatch_result(
1183        &mut self,
1184        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1185        command: &str,
1186        with_learning: bool,
1187    ) -> DispatchFlow {
1188        match result {
1189            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1190                let _ = self.channel.flush_chunks().await;
1191                DispatchFlow::Break
1192            }
1193            Some(Ok(
1194                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1195            )) => {
1196                let _ = self.channel.flush_chunks().await;
1197                DispatchFlow::Continue
1198            }
1199            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1200                let _ = self.channel.send(&msg).await;
1201                let _ = self.channel.flush_chunks().await;
1202                if with_learning {
1203                    self.maybe_trigger_post_command_learning(command).await;
1204                }
1205                DispatchFlow::Continue
1206            }
1207            Some(Err(e)) => {
1208                let _ = self.channel.send(&e.to_string()).await;
1209                let _ = self.channel.flush_chunks().await;
1210                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1211                DispatchFlow::Continue
1212            }
1213            None => DispatchFlow::Fallthrough,
1214        }
1215    }
1216
1217    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1218    fn apply_provider_override(&mut self) {
1219        if let Some(ref slot) = self.runtime.providers.provider_override
1220            && let Some(new_provider) = slot.write().take()
1221        {
1222            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1223            self.provider = new_provider;
1224        }
1225    }
1226
1227    /// Poll all event sources and return the next [`LoopEvent`].
1228    ///
1229    /// Returns `None` when the inbound channel closes (graceful shutdown).
1230    ///
1231    /// # Errors
1232    ///
1233    /// Propagates channel receive errors.
1234    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
1235    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1236        let event = tokio::select! {
1237            result = self.channel.recv() => {
1238                return Ok(result?.map(LoopEvent::Message));
1239            }
1240            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1241                tracing::info!("shutting down");
1242                LoopEvent::Shutdown
1243            }
1244            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1245                LoopEvent::SkillReload
1246            }
1247            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1248                LoopEvent::InstructionReload
1249            }
1250            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1251                LoopEvent::ConfigReload
1252            }
1253            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1254                LoopEvent::UpdateNotification(msg)
1255            }
1256            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1257                LoopEvent::ExperimentCompleted(msg)
1258            }
1259            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1260                tracing::info!("scheduler: injecting custom task as agent turn");
1261                LoopEvent::ScheduledTask(prompt)
1262            }
1263            () = async {
1264                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1265                    if ls.cancel_tx.is_cancelled() {
1266                        std::future::pending::<()>().await;
1267                    } else {
1268                        ls.interval.tick().await;
1269                    }
1270                } else {
1271                    std::future::pending::<()>().await;
1272                }
1273            } => {
1274                // Re-check user_loop after the tick — /loop stop may have fired between the
1275                // interval firing and this arm executing. Returning Ok(None) causes the caller
1276                // to `continue` without injecting a stale or empty prompt.
1277                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1278                    return Ok(None);
1279                };
1280                if ls.cancel_tx.is_cancelled() {
1281                    self.runtime.lifecycle.user_loop = None;
1282                    return Ok(None);
1283                }
1284                let prompt = ls.prompt.clone();
1285                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1286            }
1287            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1288                LoopEvent::FileChanged(event)
1289            }
1290            // Autonomous goal tick: fires when a running session is active.
1291            () = self.services.autonomous.next_tick(),
1292                if self.services.autonomous.should_tick() => {
1293                LoopEvent::AutonomousTick
1294            }
1295        };
1296        Ok(Some(event))
1297    }
1298
1299    #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1300    async fn resolve_message(
1301        &self,
1302        msg: crate::channel::ChannelMessage,
1303    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1304        use crate::channel::{Attachment, AttachmentKind};
1305        use zeph_llm::provider::{ImageData, MessagePart};
1306
1307        let text_base = msg.text.clone();
1308
1309        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1310            .attachments
1311            .into_iter()
1312            .partition(|a| a.kind == AttachmentKind::Audio);
1313
1314        tracing::debug!(
1315            audio = audio_attachments.len(),
1316            has_stt = self.runtime.providers.stt.is_some(),
1317            "resolve_message attachments"
1318        );
1319
1320        let text = if !audio_attachments.is_empty()
1321            && let Some(stt) = self.runtime.providers.stt.as_ref()
1322        {
1323            let mut transcribed_parts = Vec::new();
1324            for attachment in &audio_attachments {
1325                if attachment.data.len() > MAX_AUDIO_BYTES {
1326                    tracing::warn!(
1327                        size = attachment.data.len(),
1328                        max = MAX_AUDIO_BYTES,
1329                        "audio attachment exceeds size limit, skipping"
1330                    );
1331                    continue;
1332                }
1333                match stt
1334                    .transcribe(&attachment.data, attachment.filename.as_deref())
1335                    .await
1336                {
1337                    Ok(result) => {
1338                        tracing::info!(
1339                            len = result.text.len(),
1340                            language = ?result.language,
1341                            "audio transcribed"
1342                        );
1343                        transcribed_parts.push(result.text);
1344                    }
1345                    Err(e) => {
1346                        tracing::error!(error = %e, "audio transcription failed");
1347                    }
1348                }
1349            }
1350            if transcribed_parts.is_empty() {
1351                text_base
1352            } else {
1353                let transcribed = transcribed_parts.join("\n");
1354                if text_base.is_empty() {
1355                    transcribed
1356                } else {
1357                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1358                }
1359            }
1360        } else {
1361            if !audio_attachments.is_empty() {
1362                tracing::warn!(
1363                    count = audio_attachments.len(),
1364                    "audio attachments received but no STT provider configured, dropping"
1365                );
1366            }
1367            text_base
1368        };
1369
1370        let mut image_parts = Vec::new();
1371        for attachment in image_attachments {
1372            if attachment.data.len() > MAX_IMAGE_BYTES {
1373                tracing::warn!(
1374                    size = attachment.data.len(),
1375                    max = MAX_IMAGE_BYTES,
1376                    "image attachment exceeds size limit, skipping"
1377                );
1378                continue;
1379            }
1380            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1381            image_parts.push(MessagePart::Image(Box::new(ImageData {
1382                data: attachment.data,
1383                mime_type,
1384            })));
1385        }
1386
1387        (text, image_parts)
1388    }
1389
1390    /// Create a new [`Turn`] for the given input and advance the turn counter.
1391    ///
1392    /// Clears per-turn state that must not carry over between turns:
1393    /// - per-turn `CancellationToken` (new token for each turn)
1394    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1395    ///   `process_user_message_inner` after security checks)
1396    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1397        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1398        self.runtime.debug.iteration_counter += 1;
1399        let cancel_token = CancellationToken::new();
1400        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1401        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1402        self.services.security.user_provided_urls.write().clear();
1403        // Reset per-turn LLM request counter for the notification gate.
1404        self.runtime.lifecycle.turn_llm_requests = 0;
1405
1406        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
1407        {
1408            let pending: Vec<u8> = {
1409                let mut q = self.services.security.trajectory_signal_queue.lock();
1410                std::mem::take(&mut *q)
1411            };
1412            for code in pending {
1413                self.services
1414                    .security
1415                    .trajectory
1416                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
1417            }
1418        }
1419        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
1420        // F5: write auto-recover audit entry when sentinel hard-resets.
1421        if self.services.security.trajectory.advance_turn()
1422            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1423        {
1424            let entry = zeph_tools::AuditEntry {
1425                timestamp: zeph_tools::chrono_now(),
1426                tool: "<sentinel>".to_owned().into(),
1427                command: String::new(),
1428                result: zeph_tools::AuditResult::Success,
1429                duration_ms: 0,
1430                error_category: Some("trajectory_auto_recover".to_owned()),
1431                error_domain: Some("security".to_owned()),
1432                error_phase: None,
1433                claim_source: None,
1434                mcp_server_id: None,
1435                injection_flagged: false,
1436                embedding_anomalous: false,
1437                cross_boundary_mcp_to_acp: false,
1438                adversarial_policy_decision: None,
1439                exit_code: None,
1440                truncated: false,
1441                caller_id: None,
1442                policy_match: None,
1443                correlation_id: None,
1444                vigil_risk: None,
1445                execution_env: None,
1446                resolved_cwd: None,
1447                scope_at_definition: None,
1448                scope_at_dispatch: None,
1449            };
1450            self.runtime.lifecycle.supervisor.spawn(
1451                crate::agent::agent_supervisor::TaskClass::Telemetry,
1452                "trajectory-auto-recover-audit",
1453                async move { logger.log(&entry).await },
1454            );
1455        }
1456        // Spec 050 Phase 2: reset per-turn probe counter before any tool dispatch.
1457        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1458            sentinel.advance_turn();
1459        }
1460        // Reset per-turn risk chain state so scores don't bleed across turns.
1461        if let Some(ref acc) = self.services.security.risk_chain_accumulator {
1462            acc.reset();
1463        }
1464        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
1465        let risk_level = self.services.security.trajectory.current_risk();
1466        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1467        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
1468        if let Some(alert) = self.services.security.trajectory.poll_alert() {
1469            let msg = format!(
1470                "[trajectory] Risk level: {:?} (score={:.2})",
1471                alert.level, alert.score
1472            );
1473            tracing::warn!(
1474                level = ?alert.level,
1475                score = alert.score,
1476                "trajectory sentinel alert"
1477            );
1478            if let Some(ref tx) = self.services.session.status_tx {
1479                let _ = tx.send(msg);
1480            }
1481        }
1482
1483        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
1484            .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
1485        turn::Turn::new(context, input)
1486    }
1487
1488    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1489    ///
1490    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1491    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1492    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1493    /// is populated from it here so the rest of the pipeline is unchanged.
1494    fn end_turn(&mut self, turn: turn::Turn) {
1495        self.runtime.metrics.pending_timings = turn.metrics.timings;
1496        self.flush_turn_timings();
1497        // Clear per-turn intent (FR-008): must not persist across turns.
1498        self.services.session.current_turn_intent = None;
1499        // Clear guest context flag: each turn is independently classified.
1500        self.services.session.is_guest_context = false;
1501        // Cancel all in-flight speculative handles at turn boundary.
1502        if let Some(ref engine) = self.services.speculation_engine {
1503            let metrics = engine.end_turn();
1504            if metrics.committed > 0 || metrics.cancelled > 0 {
1505                tracing::debug!(
1506                    committed = metrics.committed,
1507                    cancelled = metrics.cancelled,
1508                    wasted_ms = metrics.wasted_ms,
1509                    "speculation: turn boundary metrics"
1510                );
1511            }
1512        }
1513    }
1514
1515    #[tracing::instrument(
1516        name = "core.agent.process_user_message",
1517        skip_all,
1518        level = "debug",
1519        fields(turn_id),
1520        err
1521    )]
1522    async fn process_user_message(
1523        &mut self,
1524        text: String,
1525        image_parts: Vec<zeph_llm::provider::MessagePart>,
1526    ) -> Result<(), error::AgentError> {
1527        let input = turn::TurnInput::new(text, image_parts);
1528        let mut t = self.begin_turn(input);
1529
1530        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1531        tracing::Span::current().record("turn_id", t.id().0);
1532        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1533        self.runtime
1534            .debug
1535            .start_iteration_span(turn_idx, t.input.text.trim());
1536
1537        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1538
1539        // Close iteration span regardless of outcome (partial trace preserved on error).
1540        let span_status = if result.is_ok() {
1541            crate::debug_dump::trace::SpanStatus::Ok
1542        } else {
1543            crate::debug_dump::trace::SpanStatus::Error {
1544                message: "iteration failed".to_owned(),
1545            }
1546        };
1547        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1548
1549        self.end_turn(t);
1550        result
1551    }
1552
1553    #[tracing::instrument(
1554        name = "core.agent.process_user_message_inner",
1555        skip_all,
1556        level = "debug",
1557        err
1558    )]
1559    async fn process_user_message_inner(
1560        &mut self,
1561        turn: &mut turn::Turn,
1562    ) -> Result<(), error::AgentError> {
1563        self.reap_background_tasks_and_update_metrics();
1564
1565        let tokens_before_turn = self
1566            .runtime
1567            .metrics
1568            .metrics_tx
1569            .as_ref()
1570            .map_or(0, |tx| tx.borrow().total_tokens);
1571
1572        // Drain any background shell completions that arrived since the last turn.
1573        // They are buffered in `pending_background_completions` and merged with the
1574        // real user message into a single user-role block below (N1 invariant).
1575        self.drain_background_completions();
1576
1577        self.wire_cancel_bridge(turn.cancel_token());
1578
1579        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1580        let text = turn.input.text.clone();
1581        let trimmed_owned = text.trim().to_owned();
1582        let trimmed = trimmed_owned.as_str();
1583
1584        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1585        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1586        if self.services.security.vigil.is_some() {
1587            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1588            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1589        }
1590
1591        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1592            return result;
1593        }
1594
1595        self.check_pending_rollbacks().await;
1596
1597        if self.pre_process_security(trimmed).await? {
1598            return Ok(());
1599        }
1600
1601        let t_ctx = std::time::Instant::now();
1602        tracing::debug!("turn timing: prepare_context start");
1603        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1604        turn.metrics_mut().timings.prepare_context_ms =
1605            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1606        tracing::debug!(
1607            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1608            "turn timing: prepare_context done"
1609        );
1610        // Emit projected token count so TUI can display it before the LLM call.
1611        let _ = self
1612            .channel
1613            .send_context_estimate(
1614                usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
1615            )
1616            .await;
1617
1618        let image_parts = std::mem::take(&mut turn.input.image_parts);
1619        // Prepend any background completion blocks to the user text. All completions and the
1620        // user message MUST be merged into a single user-role block to satisfy the strict
1621        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1622        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1623        let user_msg = self.build_user_message(&merged_text, image_parts);
1624
1625        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1626        // URL set was cleared in begin_turn; re-populate for this turn.
1627        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1628        if !urls.is_empty() {
1629            self.services
1630                .security
1631                .user_provided_urls
1632                .write()
1633                .extend(urls);
1634        }
1635
1636        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1637        // Derived from the raw input text before context assembly to avoid timing dependencies.
1638        self.services.memory.extraction.goal_text = Some(text.clone());
1639
1640        let t_persist = std::time::Instant::now();
1641        tracing::debug!("turn timing: persist_message(user) start");
1642        // Image parts intentionally excluded — base64 payloads too large for message history.
1643        self.persist_message(Role::User, &text, &[], false).await;
1644        turn.metrics_mut().timings.persist_message_ms =
1645            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1646        tracing::debug!(
1647            ms = turn.metrics_snapshot().timings.persist_message_ms,
1648            "turn timing: persist_message(user) done"
1649        );
1650        self.push_message(user_msg);
1651
1652        // Emit pre-LLM context size so the TUI gauge is non-zero before the provider responds.
1653        let context_estimate = self.runtime.providers.cached_prompt_tokens;
1654        self.update_metrics(|m| m.context_tokens = context_estimate);
1655
1656        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1657        // handle_native_tool_calls respectively via metrics.pending_timings.
1658        tracing::debug!("turn timing: process_response start");
1659        let turn_had_error = if let Err(e) = self.process_response().await {
1660            // Detach any in-flight learning tasks before mutating message state.
1661            self.services.learning_engine.learning_tasks.detach_all();
1662            tracing::error!("Response processing failed: {e:#}");
1663
1664            // Record provider failure timestamp so the next turn can skip
1665            // expensive context preparation while providers are known-down.
1666            if e.is_no_providers() {
1667                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1668                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1669                tracing::warn!(
1670                    backoff_secs,
1671                    "no providers available; backing off before next turn"
1672                );
1673                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1674            }
1675
1676            let user_msg = format!("Error: {e:#}");
1677            self.channel.send(&user_msg).await?;
1678            self.msg.messages.pop();
1679            self.recompute_prompt_tokens();
1680            self.channel.flush_chunks().await?;
1681            true
1682        } else {
1683            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1684            // leak into the next turn's context.
1685            self.services.learning_engine.learning_tasks.detach_all();
1686            self.truncate_old_tool_results();
1687            // MagicDocs: spawn background doc updates if any are due (#2702).
1688            self.maybe_update_magic_docs();
1689            // Compression spectrum: fire-and-forget promotion scan (#3305).
1690            self.maybe_spawn_promotion_scan();
1691            false
1692        };
1693        tracing::debug!("turn timing: process_response done");
1694
1695        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1696        if let Some(pipeline) = self.services.quality.clone() {
1697            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1698        }
1699        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1700        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1701        // When self-check appends a flag_marker chunk, this single call covers both
1702        // the main response and the marker, preventing the double response_end of #3243.
1703        let _ = self.channel.flush_chunks().await;
1704
1705        self.maybe_fire_completion_notification(turn, turn_had_error);
1706
1707        self.flush_goal_accounting(tokens_before_turn);
1708
1709        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1710        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1711        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1712        // we harvest those values into Turn before end_turn overwrites pending_timings.
1713        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1714        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1715
1716        Ok(())
1717    }
1718
1719    /// Wire the per-turn cancellation token into the cancel bridge.
1720    ///
1721    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1722    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1723    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1724    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1725        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1726        let token = turn_token.clone();
1727        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1728        self.runtime.lifecycle.cancel_token = turn_token.clone();
1729        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1730            prev.abort();
1731        }
1732        self.runtime.lifecycle.cancel_bridge_handle =
1733            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1734                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1735                move || async move {
1736                    signal.notified().await;
1737                    token.cancel();
1738                },
1739            ));
1740    }
1741
1742    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1743    ///
1744    /// Called at the top of each turn, before any user message processing.
1745    fn reap_background_tasks_and_update_metrics(&mut self) {
1746        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1747        if bg_signal.did_summarize {
1748            self.services.memory.persistence.unsummarized_count = 0;
1749            tracing::debug!("background summarization completed; unsummarized_count reset");
1750        }
1751        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1752        self.update_metrics(|m| {
1753            m.bg_inflight = snap.inflight as u64;
1754            m.bg_dropped = snap.total_dropped();
1755            m.bg_completed = snap.total_completed();
1756            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1757            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1758        });
1759
1760        // Update shell background run rows for TUI panel.
1761        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1762            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1763                .runtime
1764                .lifecycle
1765                .shell_executor_handle
1766                .as_ref()
1767                .map(|e| e.background_runs_snapshot())
1768                .unwrap_or_default()
1769                .into_iter()
1770                .map(|s| crate::metrics::ShellBackgroundRunRow {
1771                    run_id: truncate_shell_run_id(&s.run_id),
1772                    command: truncate_shell_command(&s.command),
1773                    elapsed_secs: s.elapsed_ms / 1000,
1774                })
1775                .collect();
1776            self.update_metrics(|m| {
1777                m.shell_background_runs = shell_rows;
1778            });
1779        }
1780
1781        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1782        // accounted in the snapshot above.
1783        if self
1784            .runtime
1785            .config
1786            .supervisor_config
1787            .abort_enrichment_on_turn
1788        {
1789            self.runtime
1790                .lifecycle
1791                .supervisor
1792                .abort_class(agent_supervisor::TaskClass::Enrichment);
1793        }
1794    }
1795
1796    /// Fire completion notifications and `turn_complete` hooks after each turn.
1797    ///
1798    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1799    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1800    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1801    /// env vars carry no raw assistant output.
1802    ///
1803    /// Gating:
1804    /// - When a `Notifier` is configured, both the notifier and hooks share its
1805    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1806    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1807    ///   notifier path is simply skipped).
1808    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1809        let snap = turn.metrics_snapshot().timings.clone();
1810        let duration_ms = snap
1811            .prepare_context_ms
1812            .saturating_add(snap.llm_chat_ms)
1813            .saturating_add(snap.tool_exec_ms);
1814        let summary = crate::notifications::TurnSummary {
1815            duration_ms,
1816            preview: self.last_assistant_preview(160),
1817            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1818            tool_calls: 0,
1819            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1820            exit_status: if is_error {
1821                crate::notifications::TurnExitStatus::Error
1822            } else {
1823                crate::notifications::TurnExitStatus::Success
1824            },
1825        };
1826
1827        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1828        let gate_ok = self
1829            .runtime
1830            .lifecycle
1831            .notifier
1832            .as_ref()
1833            .is_none_or(|n| n.should_fire(&summary));
1834
1835        // 1) Existing notifier path — unchanged semantics.
1836        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1837            && gate_ok
1838        {
1839            notifier.fire(&summary);
1840        }
1841
1842        // 2) turn_complete hooks — fire-and-forget via supervisor.
1843        // McpManagerDispatch wraps Arc<McpManager> and is 'static, so it can be moved
1844        // into the async block satisfying tokio::spawn's bound. The &dyn McpDispatch
1845        // borrow is created inside the future from the owned dispatch value.
1846        let hooks = self.services.session.hooks_config.turn_complete.clone();
1847        if !hooks.is_empty() && gate_ok {
1848            let mut env = std::collections::HashMap::new();
1849            env.insert(
1850                "ZEPH_TURN_DURATION_MS".to_owned(),
1851                summary.duration_ms.to_string(),
1852            );
1853            env.insert(
1854                "ZEPH_TURN_STATUS".to_owned(),
1855                if is_error { "error" } else { "success" }.to_owned(),
1856            );
1857            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1858            env.insert(
1859                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1860                summary.llm_requests.to_string(),
1861            );
1862            let dispatch = self.mcp_dispatch();
1863            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1864            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1865                agent_supervisor::TaskClass::Telemetry,
1866                "turn-complete-hooks",
1867                async move {
1868                    let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1869                        .as_ref()
1870                        .map(|d| d as &dyn zeph_subagent::McpDispatch);
1871                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1872                    {
1873                        tracing::warn!(error = %e, "turn_complete hook failed");
1874                    }
1875                },
1876            );
1877        }
1878    }
1879
1880    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1881    /// accounting as a tracked background task.
1882    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1883        let goal_snap = self
1884            .services
1885            .goal_accounting
1886            .as_ref()
1887            .and_then(|a| a.snapshot());
1888        self.update_metrics(|m| m.active_goal = goal_snap);
1889
1890        if let Some(ref accounting) = self.services.goal_accounting {
1891            let tokens_after = self
1892                .runtime
1893                .metrics
1894                .metrics_tx
1895                .as_ref()
1896                .map_or(0, |tx| tx.borrow().total_tokens);
1897            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1898            let mut spawned: Option<
1899                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1900            > = None;
1901            accounting.on_turn_complete(turn_tokens, |fut| {
1902                spawned = Some(fut);
1903            });
1904            if let Some(fut) = spawned {
1905                let _ = self.runtime.lifecycle.supervisor.spawn(
1906                    agent_supervisor::TaskClass::Telemetry,
1907                    "goal-accounting",
1908                    fut,
1909                );
1910            }
1911        }
1912    }
1913
1914    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1915    #[tracing::instrument(
1916        name = "core.agent.pre_process_security",
1917        skip_all,
1918        level = "debug",
1919        err
1920    )]
1921    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1922        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1923        if let Some(ref guardrail) = self.services.security.guardrail {
1924            use zeph_sanitizer::guardrail::GuardrailVerdict;
1925            let verdict = guardrail.check(trimmed).await;
1926            match &verdict {
1927                GuardrailVerdict::Flagged { reason, .. } => {
1928                    tracing::warn!(
1929                        reason = %reason,
1930                        should_block = verdict.should_block(),
1931                        "guardrail flagged user input"
1932                    );
1933                    if verdict.should_block() {
1934                        let msg = format!("[guardrail] Input blocked: {reason}");
1935                        let _ = self.channel.send(&msg).await;
1936                        let _ = self.channel.flush_chunks().await;
1937                        return Ok(true);
1938                    }
1939                    // Warn mode: notify but continue.
1940                    let _ = self
1941                        .channel
1942                        .send(&format!("[guardrail] Warning: {reason}"))
1943                        .await;
1944                }
1945                GuardrailVerdict::Error { error } => {
1946                    if guardrail.error_should_block() {
1947                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1948                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1949                        let _ = self.channel.send(msg).await;
1950                        let _ = self.channel.flush_chunks().await;
1951                        return Ok(true);
1952                    }
1953                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1954                }
1955                GuardrailVerdict::Safe => {}
1956            }
1957        }
1958
1959        // ML classifier: lightweight injection detection on user input boundary.
1960        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1961        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1962        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1963        // direct user chat. Disabled by default to prevent false positives on benign messages.
1964        #[cfg(feature = "classifiers")]
1965        if self.services.security.sanitizer.scan_user_input() {
1966            match self
1967                .services
1968                .security
1969                .sanitizer
1970                .classify_injection(trimmed)
1971                .await
1972            {
1973                zeph_sanitizer::InjectionVerdict::Blocked => {
1974                    self.push_classifier_metrics();
1975                    let _ = self
1976                        .channel
1977                        .send("[security] Input blocked: injection detected by classifier.")
1978                        .await;
1979                    let _ = self.channel.flush_chunks().await;
1980                    return Ok(true);
1981                }
1982                zeph_sanitizer::InjectionVerdict::Suspicious => {
1983                    tracing::warn!("injection_classifier soft_signal on user input");
1984                }
1985                zeph_sanitizer::InjectionVerdict::Clean => {}
1986            }
1987        }
1988        #[cfg(feature = "classifiers")]
1989        self.push_classifier_metrics();
1990
1991        Ok(false)
1992    }
1993
1994    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1995    ///
1996    /// Skips context preparation entirely when providers failed on the previous turn and the
1997    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1998    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1999    /// are rate-limited or unavailable (#3357).
2000    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2001        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2002        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2003
2004        // Skip expensive memory recall / embedding when providers are known-down.
2005        let providers_recently_failed = self
2006            .runtime
2007            .lifecycle
2008            .last_no_providers_at
2009            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2010
2011        if providers_recently_failed {
2012            tracing::warn!(
2013                backoff_secs,
2014                "skipping context preparation: providers were unavailable on last turn"
2015            );
2016            return;
2017        }
2018
2019        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2020        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2021        {
2022            Ok(()) => {}
2023            Err(_elapsed) => {
2024                tracing::warn!(
2025                    timeout_secs = prep_timeout_secs,
2026                    "context preparation timed out; proceeding with degraded context"
2027                );
2028            }
2029        }
2030    }
2031
    /// Run the per-turn context lifecycle ahead of the model call.
    ///
    /// The steps execute strictly in the order written: the MCP pruning cache is reset,
    /// the system prompt is rebuilt from `text`, correction detection and learning hooks
    /// run, the compaction state is advanced, the Focus and SideQuest counters tick, an
    /// optional experience evolution sweep is handed to the supervisor, and finally the
    /// microcompact, deferred-summary, proactive-compression, compaction and
    /// `prepare_context` stages run.
    ///
    /// Errors returned by proactive compression, compaction and `prepare_context` are
    /// logged at `warn` level and never abort the turn.
    ///
    /// * `text` — passed to `rebuild_system_prompt`.
    /// * `trimmed` — passed to correction detection and `prepare_context`.
    #[tracing::instrument(
        name = "core.agent.advance_context_lifecycle",
        skip_all,
        level = "debug"
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        // Reset per-message pruning cache at the start of each turn (#2298).
        self.services.mcp.pruning_cache.reset();

        // Extract before rebuild_system_prompt so the value is not tainted
        // by the secrets-bearing system prompt (ConversationId is just an i64).
        let conv_id = self.services.memory.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.services.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
        self.context_manager
            .set_compaction_state(self.context_manager.compaction_state().advance_turn());

        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
        {
            self.services.focus.tick();

            // SideQuest eviction: runs every N user turns when enabled.
            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
            let sidequest_should_fire = self.services.sidequest.tick();
            if sidequest_should_fire
                && !self
                    .context_manager
                    .compaction_state()
                    .is_compacted_this_turn()
            {
                self.maybe_sidequest_eviction();
            }
        }

        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
        // gated on graph + experience config, and only when both stores are attached.
        {
            let cfg = &self.services.memory.extraction.graph_config.experience;
            // The sweep cadence reuses the SideQuest turn counter; `checked_rem` yields
            // `None` for a zero divisor, so the comparison with `Some(0)` cannot panic.
            if cfg.enabled
                && cfg.evolution_sweep_enabled
                && cfg.evolution_sweep_interval > 0
                && self
                    .services
                    .sidequest
                    .turn_counter
                    .checked_rem(cfg.evolution_sweep_interval as u64)
                    == Some(0)
                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
                && let (Some(exp), Some(graph)) =
                    (memory.experience.as_ref(), memory.graph_store.as_ref())
            {
                // Clone the handles so the spawned task owns everything it touches.
                let exp = std::sync::Arc::clone(exp);
                let graph = std::sync::Arc::clone(graph);
                let threshold = cfg.confidence_prune_threshold;
                let turn = self.services.sidequest.turn_counter;
                let accepted = self.runtime.lifecycle.supervisor.spawn(
                    agent_supervisor::TaskClass::Telemetry,
                    "experience-sweep",
                    async move {
                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
                            Ok(stats) => tracing::info!(
                                turn,
                                self_loops = stats.pruned_self_loops,
                                low_confidence = stats.pruned_low_confidence,
                                "evolution sweep complete",
                            ),
                            Err(e) => tracing::warn!(
                                turn,
                                error = %e,
                                "evolution sweep failed",
                            ),
                        }
                    },
                );
                // `spawn` returning false means the task was not accepted; the sweep is
                // simply skipped for this turn.
                if !accepted {
                    tracing::warn!(
                        turn = self.services.sidequest.turn_counter,
                        "experience-sweep dropped (telemetry class at capacity)",
                    );
                }
            }
        }

        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            // Best-effort: a failed status send is deliberately ignored.
            let _ = self.channel.send_status(&warning).await;
        }

        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
        self.maybe_time_based_microcompact();

        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // NOTE(review): the future is boxed here, presumably to keep this function's own
        // future small — confirm before removing the `Box::pin`.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
        self.provider
            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);

        self.services.learning_engine.reset_reflection();
    }
2160
2161    fn build_user_message(
2162        &mut self,
2163        text: &str,
2164        image_parts: Vec<zeph_llm::provider::MessagePart>,
2165    ) -> Message {
2166        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2167        all_image_parts.extend(image_parts);
2168
2169        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2170            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2171                text: text.to_owned(),
2172            }];
2173            parts.extend(all_image_parts);
2174            Message::from_parts(Role::User, parts)
2175        } else {
2176            if !all_image_parts.is_empty() {
2177                tracing::warn!(
2178                    count = all_image_parts.len(),
2179                    "image attachments dropped: provider does not support vision"
2180                );
2181            }
2182            Message {
2183                role: Role::User,
2184                content: text.to_owned(),
2185                parts: vec![],
2186                metadata: MessageMetadata::default(),
2187            }
2188        }
2189    }
2190
2191    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2192    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2193    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2194    fn drain_background_completions(&mut self) {
2195        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2196
2197        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2198            return;
2199        };
2200        // Non-blocking drain: collect all completions that are already ready.
2201        while let Ok(completion) = rx.try_recv() {
2202            if self.runtime.lifecycle.pending_background_completions.len()
2203                >= BACKGROUND_COMPLETION_BUFFER_CAP
2204            {
2205                tracing::warn!(
2206                    run_id = %completion.run_id,
2207                    "background completion buffer full; dropping run result"
2208                );
2209                // Buffer is full: drop the oldest queued completion and push a sentinel
2210                // for the new (incoming) run so the LLM is informed its result was lost.
2211                self.runtime
2212                    .lifecycle
2213                    .pending_background_completions
2214                    .pop_front();
2215                self.runtime
2216                    .lifecycle
2217                    .pending_background_completions
2218                    .push_back(zeph_tools::BackgroundCompletion {
2219                        run_id: completion.run_id,
2220                        exit_code: -1,
2221                        success: false,
2222                        elapsed_ms: 0,
2223                        command: completion.command,
2224                        output: format!(
2225                            "[background result for run {} dropped: buffer overflow]",
2226                            completion.run_id
2227                        ),
2228                    });
2229            } else {
2230                self.runtime
2231                    .lifecycle
2232                    .pending_background_completions
2233                    .push_back(completion);
2234            }
2235        }
2236    }
2237
2238    /// Format and drain `pending_background_completions` into a prefix string, then
2239    /// return the final merged text (prefix + user message). When there are no pending
2240    /// completions the original text is returned unchanged.
2241    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2242        if self
2243            .runtime
2244            .lifecycle
2245            .pending_background_completions
2246            .is_empty()
2247        {
2248            return user_text.to_owned();
2249        }
2250        let mut parts = String::new();
2251        for completion in self
2252            .runtime
2253            .lifecycle
2254            .pending_background_completions
2255            .drain(..)
2256        {
2257            let _ = write!(
2258                parts,
2259                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2260                completion.run_id,
2261                completion.exit_code,
2262                completion.success,
2263                completion.elapsed_ms,
2264                completion.command,
2265                completion.output,
2266            );
2267        }
2268        parts.push_str(user_text);
2269        parts
2270    }
2271
    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
    /// channel. Returns a human-readable status string and success flag suitable for
    /// sending to the user and emitting lifecycle events.
    ///
    /// Each iteration sleeps 500 ms, forwards at most one pending secret request to
    /// `channel.confirm()`, and then inspects the status of `task_id`:
    ///
    /// * `Completed` → `("{label} completed: …", true)`
    /// * `Failed` → `("{label} failed: …", false)`
    /// * `Canceled` → `("{label} was cancelled.", false)`
    /// * no status entry for `task_id` → reported as completed with `true`
    /// * any other state → a progress status is sent and polling continues
    ///
    /// Returns `None` when the sub-agent manager is absent at the status check.
    async fn poll_subagent_until_done(
        &mut self,
        task_id: &str,
        label: &str,
    ) -> Option<(String, bool)> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            // The sleep comes first, so even the initial status check is delayed by 500 ms.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Bridge secret requests from sub-agent to channel.confirm().
            // Fetch the pending request first, then release the borrow before
            // calling channel.confirm() (which requires &mut self).
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .services
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            // The request is handled for whichever task id it carries, which need not be
            // the `task_id` being polled.
            if let Some((req_task_id, req)) = pending {
                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
                // (SEC-P1-02), so it is safe to embed in the prompt string.
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A channel error while confirming is treated as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                    if approved {
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        // Delivery is attempted only after approval succeeded; failures of
                        // either step are ignored here.
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            // `?` ends the whole poll with `None` if the manager is no longer present.
            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                // No status entry for this task: reported as a successful completion.
                break (format!("{label} completed (no status available)."), true);
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break (format!("{label} completed: {msg}"), true);
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break (format!("{label} failed: {msg}"), false);
                }
                SubAgentState::Canceled => {
                    break (format!("{label} was cancelled."), false);
                }
                _ => {
                    // Still running: report progress as "turn used/max". The maximum falls
                    // back to 20 when no definition is found for the task.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.services
                                .orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
2354
2355    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2356    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2357    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2358        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2359        let full_ids: Vec<String> = mgr
2360            .statuses()
2361            .into_iter()
2362            .map(|(tid, _)| tid)
2363            .filter(|tid| tid.starts_with(prefix))
2364            .collect();
2365        Some(match full_ids.as_slice() {
2366            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2367            [fid] => Ok(fid.clone()),
2368            _ => Err(format!(
2369                "Ambiguous id prefix '{prefix}': matches {} agents",
2370                full_ids.len()
2371            )),
2372        })
2373    }
2374
2375    fn handle_agent_list(&self) -> Option<String> {
2376        use std::fmt::Write as _;
2377        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2378        let defs = mgr.definitions();
2379        if defs.is_empty() {
2380            return Some("No sub-agent definitions found.".into());
2381        }
2382        let mut out = String::from("Available sub-agents:\n");
2383        for d in defs {
2384            let memory_label = match d.memory {
2385                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2386                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2387                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2388                None => "",
2389            };
2390            if let Some(ref src) = d.source {
2391                let _ = writeln!(
2392                    out,
2393                    "  {}{} — {} ({})",
2394                    d.name, memory_label, d.description, src
2395                );
2396            } else {
2397                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2398            }
2399        }
2400        Some(out)
2401    }
2402
2403    fn handle_agent_status(&self) -> Option<String> {
2404        use std::fmt::Write as _;
2405        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2406        let statuses = mgr.statuses();
2407        if statuses.is_empty() {
2408            return Some("No active sub-agents.".into());
2409        }
2410        let mut out = String::from("Active sub-agents:\n");
2411        for (id, s) in &statuses {
2412            let state = format!("{:?}", s.state).to_lowercase();
2413            let elapsed = s.started_at.elapsed().as_secs();
2414            let _ = writeln!(
2415                out,
2416                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2417                short = &id[..8.min(id.len())],
2418                t = s.turns_used,
2419                msg = s.last_message.as_deref().unwrap_or(""),
2420            );
2421            // Show memory directory path for agents with memory enabled.
2422            if let Some(def) = mgr.agents_def(id)
2423                && let Some(scope) = def.memory
2424                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2425            {
2426                let _ = writeln!(out, "       memory: {}", dir.display());
2427            }
2428        }
2429        Some(out)
2430    }
2431
2432    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2433        let full_id = match self.resolve_agent_id_prefix(id)? {
2434            Ok(fid) => fid,
2435            Err(msg) => return Some(msg),
2436        };
2437        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2438        if let Some((tid, req)) = mgr.try_recv_secret_request()
2439            && tid == full_id
2440        {
2441            let key = req.secret_key.clone();
2442            let ttl = std::time::Duration::from_mins(5);
2443            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2444                return Some(format!("Approve failed: {e}"));
2445            }
2446            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2447                return Some(format!("Secret delivery failed: {e}"));
2448            }
2449            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2450        }
2451        Some(format!(
2452            "No pending secret request for sub-agent '{full_id}'."
2453        ))
2454    }
2455
2456    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2457        let full_id = match self.resolve_agent_id_prefix(id)? {
2458            Ok(fid) => fid,
2459            Err(msg) => return Some(msg),
2460        };
2461        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2462        match mgr.deny_secret(&full_id) {
2463            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2464            Err(e) => Some(format!("Deny failed: {e}")),
2465        }
2466    }
2467
2468    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2469        use zeph_subagent::AgentCommand;
2470
2471        match cmd {
2472            AgentCommand::List => self.handle_agent_list(),
2473            AgentCommand::Background { name, prompt } => {
2474                self.handle_agent_background(&name, &prompt)
2475            }
2476            AgentCommand::Spawn { name, prompt }
2477            | AgentCommand::Mention {
2478                agent: name,
2479                prompt,
2480            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2481            AgentCommand::Status => self.handle_agent_status(),
2482            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2483            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2484            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2485            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2486        }
2487    }
2488
2489    /// Return the sub-agent definitions section formatted for the `/agents` fleet view.
2490    ///
2491    /// Produces a "Sub-agents:" header followed by one line per definition.
2492    /// Returns an empty string when no sub-agent manager is configured.
2493    pub(crate) fn handle_agents_definitions_list(&self) -> String {
2494        use std::fmt::Write as _;
2495
2496        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2497            return String::new();
2498        };
2499        let defs = mgr.definitions();
2500        if defs.is_empty() {
2501            return String::new();
2502        }
2503        let mut out = String::from("Sub-agents:\n");
2504        for d in defs {
2505            let memory_label = match d.memory {
2506                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2507                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2508                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2509                None => "",
2510            };
2511            if let Some(ref src) = d.source {
2512                let _ = writeln!(
2513                    out,
2514                    "  {}{} — {} ({})",
2515                    d.name, memory_label, d.description, src
2516                );
2517            } else {
2518                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2519            }
2520        }
2521        out
2522    }
2523
2524    /// Execute an `/agents` CRUD subcommand and return a formatted string.
2525    ///
2526    /// Handles `show`, `create`, `edit`, `delete` (the `list` case is handled by
2527    /// [`handle_agents_definitions_list`] and never reaches this method).
2528    pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2529        use zeph_subagent::AgentsCommand;
2530
2531        let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2532            return "Sub-agent manager is not available.".to_owned();
2533        };
2534
2535        match cmd {
2536            AgentsCommand::List => self.handle_agents_definitions_list(),
2537            AgentsCommand::Show { name } => {
2538                match mgr.definitions().iter().find(|d| d.name == name) {
2539                    Some(d) => format!(
2540                        "Agent: {}\nDescription: {}\nSource: {}\n",
2541                        d.name,
2542                        d.description,
2543                        d.source.as_deref().unwrap_or("unknown"),
2544                    ),
2545                    None => format!("No sub-agent definition named '{name}'."),
2546                }
2547            }
2548            AgentsCommand::Create { name } => {
2549                format!(
2550                    "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2551                     See the sub-agent documentation for the required frontmatter."
2552                )
2553            }
2554            AgentsCommand::Edit { name } => {
2555                format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2556            }
2557            AgentsCommand::Delete { name } => {
2558                format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2559            }
2560        }
2561    }
2562
2563    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2564        let provider = self.provider.clone();
2565        let tool_executor = Arc::clone(&self.tool_executor);
2566        let skills = self.filtered_skills_for(name);
2567        let cfg = self.services.orchestration.subagent_config.clone();
2568        let spawn_ctx = self.build_spawn_context(&cfg);
2569        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2570        match mgr.spawn(
2571            name,
2572            prompt,
2573            provider,
2574            tool_executor,
2575            skills,
2576            &cfg,
2577            spawn_ctx,
2578        ) {
2579            Ok(id) => Some(format!(
2580                "Sub-agent '{name}' started in background (id: {short})",
2581                short = &id[..8.min(id.len())]
2582            )),
2583            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2584        }
2585    }
2586
2587    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2588        let provider = self.provider.clone();
2589        let tool_executor = Arc::clone(&self.tool_executor);
2590        let skills = self.filtered_skills_for(name);
2591        let cfg = self.services.orchestration.subagent_config.clone();
2592        let spawn_ctx = self.build_spawn_context(&cfg);
2593        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2594        let task_id = match mgr.spawn(
2595            name,
2596            prompt,
2597            provider,
2598            tool_executor,
2599            skills,
2600            &cfg,
2601            spawn_ctx,
2602        ) {
2603            Ok(id) => id,
2604            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2605        };
2606        let short = task_id[..8.min(task_id.len())].to_owned();
2607        let _ = self
2608            .channel
2609            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2610            .await;
2611        let _ = self
2612            .channel
2613            .notify_foreground_subagent_started(&task_id, name)
2614            .await;
2615        let label = format!("Sub-agent '{name}'");
2616        let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2617            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2618            let _ = self
2619                .channel
2620                .notify_foreground_subagent_completed(&task_id, name, false)
2621                .await;
2622            return None;
2623        };
2624        let _ = self
2625            .channel
2626            .notify_foreground_subagent_completed(&task_id, name, success)
2627            .await;
2628        Some(result)
2629    }
2630
2631    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2632        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2633        // Accept prefix match on task_id.
2634        let ids: Vec<String> = mgr
2635            .statuses()
2636            .into_iter()
2637            .map(|(task_id, _)| task_id)
2638            .filter(|task_id| task_id.starts_with(id))
2639            .collect();
2640        match ids.as_slice() {
2641            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2642            [full_id] => {
2643                let full_id = full_id.clone();
2644                match mgr.cancel(&full_id) {
2645                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2646                    Err(e) => Some(format!("Cancel failed: {e}")),
2647                }
2648            }
2649            _ => Some(format!(
2650                "Ambiguous id prefix '{id}': matches {} agents",
2651                ids.len()
2652            )),
2653        }
2654    }
2655
2656    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2657        let cfg = self.services.orchestration.subagent_config.clone();
2658        // Resolve definition name from transcript meta before spawning so we can
2659        // look up skills by definition name rather than the UUID prefix (S1 fix).
2660        let def_name = {
2661            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2662            match mgr.def_name_for_resume(id, &cfg) {
2663                Ok(name) => name,
2664                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2665            }
2666        };
2667        let skills = self.filtered_skills_for(&def_name);
2668        let provider = self.provider.clone();
2669        let tool_executor = Arc::clone(&self.tool_executor);
2670        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2671        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2672            Ok(pair) => pair,
2673            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2674        };
2675        let short = task_id[..8.min(task_id.len())].to_owned();
2676        let _ = self
2677            .channel
2678            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2679            .await;
2680        let _ = self
2681            .channel
2682            .notify_foreground_subagent_started(&task_id, &def_name)
2683            .await;
2684        let Some((result, success)) = self
2685            .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2686            .await
2687        else {
2688            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2689            let _ = self
2690                .channel
2691                .notify_foreground_subagent_completed(&task_id, &def_name, false)
2692                .await;
2693            return None;
2694        };
2695        let _ = self
2696            .channel
2697            .notify_foreground_subagent_completed(&task_id, &def_name, success)
2698            .await;
2699        Some(result)
2700    }
2701
2702    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2703        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2704        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2705        let reg = self.services.skill.registry.read();
2706        match zeph_subagent::filter_skills(&reg, &def.skills) {
2707            Ok(skills) => {
2708                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2709                if bodies.is_empty() {
2710                    None
2711                } else {
2712                    Some(bodies)
2713                }
2714            }
2715            Err(e) => {
2716                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2717                None
2718            }
2719        }
2720    }
2721
2722    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2723    fn build_spawn_context(
2724        &self,
2725        cfg: &zeph_config::SubAgentConfig,
2726    ) -> zeph_subagent::SpawnContext {
2727        zeph_subagent::SpawnContext {
2728            parent_messages: self.extract_parent_messages(cfg),
2729            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2730            parent_provider_name: {
2731                let name = &self.runtime.config.active_provider_name;
2732                if name.is_empty() {
2733                    None
2734                } else {
2735                    Some(name.clone())
2736                }
2737            },
2738            spawn_depth: self.runtime.config.spawn_depth,
2739            mcp_tool_names: self.extract_mcp_tool_names(),
2740            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2741            seed_trajectory_score: {
2742                let child = self.services.security.trajectory.spawn_child();
2743                let score = child.score_now();
2744                if score > 0.0 { Some(score) } else { None }
2745            },
2746            content_isolation: self.runtime.config.security.content_isolation.clone(),
2747            orchestrator_name: Some("zeph".to_owned()),
2748            orchestrator_role: Some("orchestrator".to_owned()),
2749            session_mcp_servers: Vec::new(),
2750        }
2751    }
2752
2753    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2754    ///
2755    /// Filters system messages, applies `context_window_turns` and `max_parent_messages` caps,
2756    /// applies a 25% context window cap using a 4-chars-per-token heuristic, prunes orphaned
2757    /// `ToolUse`/`ToolResult` pairs at the slice boundary, and optionally sanitizes text parts
2758    /// through the IPI pipeline according to `parent_context_policy`.
2759    fn extract_parent_messages(
2760        &self,
2761        config: &zeph_config::SubAgentConfig,
2762    ) -> Vec<zeph_llm::provider::Message> {
2763        use zeph_config::ParentContextPolicy;
2764        use zeph_llm::provider::Role;
2765
2766        if config.parent_context_policy == ParentContextPolicy::None
2767            || config.context_window_turns == 0
2768        {
2769            return Vec::new();
2770        }
2771
2772        let non_system: Vec<_> = self
2773            .msg
2774            .messages
2775            .iter()
2776            .filter(|m| m.role != Role::System)
2777            .cloned()
2778            .collect();
2779
2780        let take_count = config
2781            .context_window_turns
2782            .saturating_mul(2)
2783            .min(config.max_parent_messages);
2784        let start = non_system.len().saturating_sub(take_count);
2785        let mut msgs = non_system[start..].to_vec();
2786
2787        // Cap at 25% of model context window and prune orphaned tool pairs.
2788        let max_chars = 128_000usize / 4;
2789        let requested = msgs.len();
2790        trim_parent_messages(&mut msgs, max_chars);
2791        if msgs.len() < requested {
2792            tracing::info!(
2793                kept = msgs.len(),
2794                requested,
2795                "[subagent] truncated parent history due to token budget or orphan pruning"
2796            );
2797        }
2798
2799        if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2800            use zeph_sanitizer::{ContentSource, ContentSourceKind};
2801            let source =
2802                ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2803            msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2804        }
2805
2806        msgs
2807    }
2808
2809    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2810    fn extract_mcp_tool_names(&self) -> Vec<String> {
2811        self.tool_executor
2812            .tool_definitions_erased()
2813            .into_iter()
2814            .filter(|t| t.id.starts_with("mcp_"))
2815            .map(|t| t.id.to_string())
2816            .collect()
2817    }
2818
2819    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2820    ///
2821    /// Must be called from a blocking context (uses synchronous FS I/O).
2822    fn classify_source_kind(
2823        skill_dir: &std::path::Path,
2824        managed_dir: Option<&std::path::PathBuf>,
2825        bundled_names: &std::collections::HashSet<String>,
2826    ) -> zeph_memory::store::SourceKind {
2827        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2828            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2829            let has_marker = skill_dir.join(".bundled").exists();
2830            if has_marker && bundled_names.contains(skill_name) {
2831                zeph_memory::store::SourceKind::Bundled
2832            } else {
2833                if has_marker {
2834                    tracing::warn!(
2835                        skill = %skill_name,
2836                        "skill has .bundled marker but is not in the bundled skill \
2837                         allowlist — classifying as Hub"
2838                    );
2839                }
2840                zeph_memory::store::SourceKind::Hub
2841            }
2842        } else {
2843            zeph_memory::store::SourceKind::Local
2844        }
2845    }
2846
2847    /// Update trust DB records for all reloaded skills.
2848    async fn update_trust_for_reloaded_skills(
2849        &mut self,
2850        all_meta: &[zeph_skills::loader::SkillMeta],
2851    ) {
2852        // Clone Arc before any .await so no &self fields are held across suspension points.
2853        let memory = self.services.memory.persistence.memory.clone();
2854        let Some(memory) = memory else {
2855            return;
2856        };
2857        let trust_cfg = self.services.skill.trust_config.clone();
2858        let managed_dir = self.services.skill.managed_dir.clone();
2859        let bundled_names: std::collections::HashSet<String> =
2860            zeph_skills::bundled_skill_names().into_iter().collect();
2861        for meta in all_meta {
2862            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2863            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2864            let skill_dir = meta.skill_dir.clone();
2865            let managed_dir_ref = managed_dir.clone();
2866            let bundled_names_ref = bundled_names.clone();
2867            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2868                tokio::task::spawn_blocking(move || {
2869                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2870                    let source_kind = Self::classify_source_kind(
2871                        &skill_dir,
2872                        managed_dir_ref.as_ref(),
2873                        &bundled_names_ref,
2874                    );
2875                    Some((hash, source_kind))
2876                })
2877                .await
2878                .unwrap_or(None);
2879
2880            let Some((current_hash, source_kind)) = fs_result else {
2881                tracing::warn!("failed to compute hash for '{}'", meta.name);
2882                continue;
2883            };
2884            let initial_level = match source_kind {
2885                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2886                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2887                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2888                    &trust_cfg.local_level
2889                }
2890            };
2891            let existing = memory
2892                .sqlite()
2893                .load_skill_trust(&meta.name)
2894                .await
2895                .ok()
2896                .flatten();
2897            let trust_level = if let Some(ref row) = existing {
2898                if row.blake3_hash != current_hash {
2899                    trust_cfg.hash_mismatch_level
2900                } else if row.source_kind != source_kind {
2901                    // source_kind changed (e.g., hub → bundled on upgrade).
2902                    // Never override an explicit operator block. For active trust levels,
2903                    // adopt the source-kind initial level when it grants more trust.
2904                    let stored = row.trust_level;
2905                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2906                        stored
2907                    } else {
2908                        *initial_level
2909                    }
2910                } else {
2911                    row.trust_level
2912                }
2913            } else {
2914                *initial_level
2915            };
2916            let source_path = meta.skill_dir.to_str();
2917            if let Err(e) = memory
2918                .sqlite()
2919                .upsert_skill_trust(
2920                    &meta.name,
2921                    trust_level,
2922                    source_kind,
2923                    None,
2924                    source_path,
2925                    &current_hash,
2926                )
2927                .await
2928            {
2929                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2930            }
2931        }
2932    }
2933
2934    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2935    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2936        let provider = self.embedding_provider.clone();
2937        let embed_timeout =
2938            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2939        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2940            let owned = text.to_owned();
2941            let p = provider.clone();
2942            Box::pin(async move {
2943                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2944                    result
2945                } else {
2946                    tracing::warn!(
2947                        timeout_secs = embed_timeout.as_secs(),
2948                        "skill matcher: embedding timed out"
2949                    );
2950                    Err(zeph_llm::LlmError::Timeout)
2951                }
2952            })
2953        };
2954
2955        let needs_inmemory_rebuild = !self
2956            .services
2957            .skill
2958            .matcher
2959            .as_ref()
2960            .is_some_and(SkillMatcherBackend::is_qdrant);
2961
2962        if needs_inmemory_rebuild {
2963            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2964                .await
2965                .map(SkillMatcherBackend::InMemory);
2966        } else if let Some(ref mut backend) = self.services.skill.matcher {
2967            let _ = self.channel.send_status("syncing skill index...").await;
2968            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2969                self.services.session.status_tx.clone().map(
2970                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
2971                        Box::new(move |completed, total| {
2972                            let msg = format!("Syncing skills: {completed}/{total}");
2973                            let _ = tx.send(msg);
2974                        })
2975                    },
2976                );
2977            if let Err(e) = backend
2978                .sync(
2979                    all_meta,
2980                    &self.services.skill.embedding_model,
2981                    embed_fn,
2982                    on_progress,
2983                )
2984                .await
2985            {
2986                tracing::warn!("failed to sync skill embeddings: {e:#}");
2987            }
2988        }
2989
2990        if self.services.skill.hybrid_search {
2991            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2992            let _ = self.channel.send_status("rebuilding search index...").await;
2993            self.services.skill.rebuild_bm25(&descs);
2994        }
2995    }
2996
2997    #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
2998    async fn reload_skills(&mut self) {
2999        let old_fp = self.services.skill.fingerprint();
3000        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
3001            let plugin_dirs = supplier();
3002            let mut paths = self.services.skill.skill_paths.clone();
3003            for dir in plugin_dirs {
3004                if !paths.contains(&dir) {
3005                    paths.push(dir);
3006                }
3007            }
3008            paths
3009        } else {
3010            self.services.skill.skill_paths.clone()
3011        };
3012        self.services.skill.registry.write().reload(&reload_paths);
3013        if self.services.skill.fingerprint() == old_fp {
3014            return;
3015        }
3016        let _ = self.channel.send_status("reloading skills...").await;
3017
3018        let all_meta = self
3019            .services
3020            .skill
3021            .registry
3022            .read()
3023            .all_meta()
3024            .into_iter()
3025            .cloned()
3026            .collect::<Vec<_>>();
3027
3028        self.update_trust_for_reloaded_skills(&all_meta).await;
3029
3030        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3031        self.rebuild_skill_matcher(&all_meta_refs).await;
3032
3033        let all_skills: Vec<Skill> = {
3034            let reg = self.services.skill.registry.read();
3035            reg.all_meta()
3036                .iter()
3037                .filter_map(|m| reg.skill(&m.name).ok())
3038                .collect()
3039        };
3040        let trust_map = self.build_skill_trust_map().await;
3041        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3042        let skills_prompt =
3043            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
3044        self.services
3045            .skill
3046            .last_skills_prompt
3047            .clone_from(&skills_prompt);
3048        let system_prompt = build_system_prompt(&skills_prompt, None);
3049        if let Some(msg) = self.msg.messages.first_mut() {
3050            msg.content = system_prompt;
3051        }
3052
3053        let _ = self.channel.send_status("").await;
3054        tracing::info!(
3055            "reloaded {} skill(s)",
3056            self.services.skill.registry.read().all_meta().len()
3057        );
3058    }
3059
3060    async fn reload_instructions(&mut self) {
3061        // Drain any additional queued events before reloading to avoid redundant reloads.
3062        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3063            while rx.try_recv().is_ok() {}
3064        }
3065        let Some(ref state) = self.runtime.instructions.reload_state else {
3066            return;
3067        };
3068        let base_dir = state.base_dir.clone();
3069        let provider_kinds = state.provider_kinds.clone();
3070        let explicit_files = state.explicit_files.clone();
3071        let auto_detect = state.auto_detect;
3072        let new_blocks = crate::instructions::load_instructions_async(
3073            base_dir,
3074            provider_kinds,
3075            explicit_files,
3076            auto_detect,
3077        )
3078        .await;
3079        let old_sources: std::collections::HashSet<_> = self
3080            .runtime
3081            .instructions
3082            .blocks
3083            .iter()
3084            .map(|b| &b.source)
3085            .collect();
3086        let new_sources: std::collections::HashSet<_> =
3087            new_blocks.iter().map(|b| &b.source).collect();
3088        for added in new_sources.difference(&old_sources) {
3089            tracing::info!(path = %added.display(), "instruction file added");
3090        }
3091        for removed in old_sources.difference(&new_sources) {
3092            tracing::info!(path = %removed.display(), "instruction file removed");
3093        }
3094        tracing::info!(
3095            old_count = self.runtime.instructions.blocks.len(),
3096            new_count = new_blocks.len(),
3097            "reloaded instruction files"
3098        );
3099        self.runtime.instructions.blocks = new_blocks;
3100    }
3101
3102    #[allow(clippy::too_many_lines)]
3103    fn reload_config(&mut self) {
3104        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
3105            return;
3106        };
3107        let Some(config) = self.load_config_with_overlay(&path) else {
3108            return;
3109        };
3110        let budget_tokens = resolve_context_budget(&config, &self.provider);
3111        self.runtime.config.security = config.security;
3112        self.runtime.config.timeouts = config.timeouts;
3113        self.runtime.config.redact_credentials = config.memory.redact_credentials;
3114        self.services.memory.persistence.history_limit = config.memory.history_limit;
3115        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
3116        self.services.memory.compaction.summarization_threshold =
3117            config.memory.summarization_threshold;
3118        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
3119        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
3120        self.services.skill.min_injection_score = config.skills.min_injection_score;
3121        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3122        self.services.skill.hybrid_search = config.skills.hybrid_search;
3123        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
3124        self.services.skill.confusability_threshold =
3125            config.skills.confusability_threshold.clamp(0.0, 1.0);
3126        config
3127            .skills
3128            .generation_provider
3129            .as_str()
3130            .clone_into(&mut self.services.skill.generation_provider_name);
3131        config
3132            .skills
3133            .disambiguate_provider
3134            .as_str()
3135            .clone_into(&mut self.services.skill.disambiguate_provider_name);
3136        self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
3137        self.services.skill.generation_output_dir =
3138            config.skills.generation_output_dir.as_deref().map(|p| {
3139                if let Some(stripped) = p.strip_prefix("~/") {
3140                    dirs::home_dir()
3141                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
3142                } else {
3143                    std::path::PathBuf::from(p)
3144                }
3145            });
3146
3147        self.context_manager.budget = Some(
3148            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
3149        );
3150
3151        {
3152            let graph_cfg = &config.memory.graph;
3153            if graph_cfg.rpe.enabled {
3154                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
3155                if self.services.memory.extraction.rpe_router.is_none() {
3156                    self.services.memory.extraction.rpe_router =
3157                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
3158                            graph_cfg.rpe.threshold,
3159                            graph_cfg.rpe.max_skip_turns,
3160                        )));
3161                }
3162            } else {
3163                self.services.memory.extraction.rpe_router = None;
3164            }
3165            self.services.memory.extraction.graph_config = graph_cfg.clone();
3166        }
3167        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3168        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3169        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3170        self.context_manager
3171            .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
3172        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3173        self.context_manager.compression = config.memory.compression.clone();
3174        self.context_manager.routing = config.memory.store_routing.clone();
3175        // Resolve routing_classifier_provider from the provider pool (#2484).
3176        self.context_manager.store_routing_provider = if config
3177            .memory
3178            .store_routing
3179            .routing_classifier_provider
3180            .is_empty()
3181        {
3182            None
3183        } else {
3184            let resolved = self.resolve_background_provider(
3185                config
3186                    .memory
3187                    .store_routing
3188                    .routing_classifier_provider
3189                    .as_str(),
3190            );
3191            Some(std::sync::Arc::new(resolved))
3192        };
3193        self.services
3194            .memory
3195            .persistence
3196            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3197
3198        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3199        self.services.index.repo_map_ttl =
3200            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3201
3202        self.services
3203            .session
3204            .hooks_config
3205            .cwd_changed
3206            .clone_from(&config.hooks.cwd_changed);
3207        self.services
3208            .session
3209            .hooks_config
3210            .permission_denied
3211            .clone_from(&config.hooks.permission_denied);
3212        self.services
3213            .session
3214            .hooks_config
3215            .turn_complete
3216            .clone_from(&config.hooks.turn_complete);
3217        // file_changed_hooks require watcher restart to take effect — skipped here.
3218
3219        tracing::info!("config reloaded");
3220    }
3221
3222    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
3223    ///
3224    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
3225    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3226        let mut config = match Config::load(path) {
3227            Ok(c) => c,
3228            Err(e) => {
3229                tracing::warn!("config reload failed: {e:#}");
3230                return None;
3231            }
3232        };
3233
3234        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3235        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3236            None
3237        } else {
3238            match zeph_plugins::apply_plugin_config_overlays(
3239                &mut config,
3240                &self.runtime.lifecycle.plugins_dir,
3241            ) {
3242                Ok(o) => Some(o),
3243                Err(e) => {
3244                    tracing::warn!(
3245                        "plugin overlay merge failed during reload: {e:#}; \
3246                         keeping previous runtime state"
3247                    );
3248                    return None;
3249                }
3250            }
3251        };
3252
3253        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3254        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3255        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3256        if let Some(ref overlay) = new_overlay {
3257            self.warn_on_shell_overlay_divergence(overlay, &config);
3258        }
3259        Some(config)
3260    }
3261
3262    /// React to shell policy divergence detected on hot-reload.
3263    ///
3264    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3265    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3266    /// — emit a warn + status banner when it changes.
3267    fn warn_on_shell_overlay_divergence(
3268        &self,
3269        new_overlay: &zeph_plugins::ResolvedOverlay,
3270        config: &Config,
3271    ) {
3272        let new_blocked: Vec<String> = {
3273            let mut v = config.tools.shell.blocked_commands.clone();
3274            v.sort();
3275            v
3276        };
3277        let new_allowed: Vec<String> = {
3278            let mut v = config.tools.shell.allowed_commands.clone();
3279            v.sort();
3280            v
3281        };
3282
3283        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3284        let blocked_changed = new_blocked != startup.blocked;
3285        let allowed_changed = new_allowed != startup.allowed;
3286
3287        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3288        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3289            h.rebuild(&config.tools.shell);
3290            tracing::info!(
3291                blocked_count = h.snapshot_blocked().len(),
3292                "shell blocked_commands rebuilt from hot-reload"
3293            );
3294        }
3295
3296        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3297        // executor construction time. Warn loudly so the user restarts.
3298        //
3299        // Note: when base `allowed_commands` is empty (the default), the overlay's
3300        // intersection semantics keep it empty, so this branch is silently unreachable
3301        // for users who do not set a non-empty base list.
3302        if allowed_changed {
3303            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3304                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3305            tracing::warn!("{msg}");
3306            if let Some(ref tx) = self.services.session.status_tx {
3307                let _ = tx.send(msg.to_owned());
3308            }
3309        }
3310
3311        let _ = new_overlay;
3312    }
3313
3314    /// Run `SideQuest` tool output eviction pass (#1885).
3315    ///
3316    /// PERF-1 fix: two-phase non-blocking design.
3317    ///
3318    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3319    /// validate and apply it immediately.
3320    ///
3321    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3322    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3323    /// next turn, so the current agent turn is never blocked by the LLM call.
3324    fn maybe_sidequest_eviction(&mut self) {
3325        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3326        // strategy — the two systems share the same pool of evictable tool outputs and can
3327        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3328        if self.services.sidequest.config.enabled {
3329            use crate::config::PruningStrategy;
3330            if !matches!(
3331                self.context_manager.compression.pruning_strategy,
3332                PruningStrategy::Reactive
3333            ) {
3334                tracing::warn!(
3335                    strategy = ?self.context_manager.compression.pruning_strategy,
3336                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3337                     consider disabling sidequest.enabled to avoid redundant eviction"
3338                );
3339            }
3340        }
3341
3342        // Guard: do not evict while a focus session is active.
3343        if self.services.focus.is_active() {
3344            tracing::debug!("sidequest: skipping — focus session active");
3345            // Drop any pending result — cursors may be stale relative to focus truncation.
3346            self.services.compression.pending_sidequest_result = None;
3347            return;
3348        }
3349
3350        // Phase 1: apply pending result from last turn's background LLM call.
3351        self.sidequest_apply_pending();
3352
3353        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3354        self.sidequest_schedule_next();
3355    }
3356
3357    fn sidequest_apply_pending(&mut self) {
3358        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3359            return;
3360        };
3361        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3362        // and we reschedule below.
3363        let result = match handle.try_join() {
3364            Ok(result) => result,
3365            Err(_handle) => {
3366                // Task still running — drop it; a fresh one is scheduled below.
3367                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3368                return;
3369            }
3370        };
3371        match result {
3372            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3373                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3374                let freed = self.services.sidequest.apply_eviction(
3375                    &mut self.msg.messages,
3376                    &evicted_indices,
3377                    &self.runtime.metrics.token_counter,
3378                );
3379                if freed > 0 {
3380                    self.recompute_prompt_tokens();
3381                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3382                    // cooldown=0: eviction does not impose post-compaction cooldown.
3383                    self.context_manager.set_compaction_state(
3384                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3385                            cooldown: 0,
3386                        },
3387                    );
3388                    tracing::info!(
3389                        freed_tokens = freed,
3390                        evicted_cursors = evicted_indices.len(),
3391                        pass = self.services.sidequest.passes_run,
3392                        "sidequest eviction complete"
3393                    );
3394                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3395                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3396                    }
3397                    if let Some(ref tx) = self.services.session.status_tx {
3398                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3399                    }
3400                } else {
3401                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3402                    if let Some(ref tx) = self.services.session.status_tx {
3403                        let _ = tx.send(String::new());
3404                    }
3405                }
3406            }
3407            Ok(None | Some(_)) => {
3408                tracing::debug!("sidequest: pending result: no cursors to evict");
3409                if let Some(ref tx) = self.services.session.status_tx {
3410                    let _ = tx.send(String::new());
3411                }
3412            }
3413            Err(e) => {
3414                tracing::debug!("sidequest: background task error: {e}");
3415                if let Some(ref tx) = self.services.session.status_tx {
3416                    let _ = tx.send(String::new());
3417                }
3418            }
3419        }
3420    }
3421
3422    fn sidequest_schedule_next(&mut self) {
3423        use zeph_llm::provider::{Message, MessageMetadata, Role};
3424
3425        self.services
3426            .sidequest
3427            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3428
3429        if self.services.sidequest.tool_output_cursors.is_empty() {
3430            tracing::debug!("sidequest: no eligible cursors");
3431            return;
3432        }
3433
3434        let prompt = self.services.sidequest.build_eviction_prompt();
3435        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3436        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3437        // Clone the provider so the spawn closure owns it without borrowing self.
3438        let provider = self.summary_or_primary_provider().clone();
3439
3440        let eviction_future = async move {
3441            let msgs = [Message {
3442                role: Role::User,
3443                content: prompt,
3444                parts: vec![],
3445                metadata: MessageMetadata::default(),
3446            }];
3447            let response =
3448                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3449                    .await
3450                {
3451                    Ok(Ok(r)) => r,
3452                    Ok(Err(e)) => {
3453                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3454                        return None;
3455                    }
3456                    Err(_) => {
3457                        tracing::debug!("sidequest bg: LLM call timed out");
3458                        return None;
3459                    }
3460                };
3461
3462            let start = response.find('{')?;
3463            let end = response.rfind('}')?;
3464            if start > end {
3465                return None;
3466            }
3467            let json_slice = &response[start..=end];
3468            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3469            let mut valid: Vec<usize> = parsed
3470                .del_cursors
3471                .into_iter()
3472                .filter(|&c| c < n_cursors)
3473                .collect();
3474            valid.sort_unstable();
3475            valid.dedup();
3476            #[allow(
3477                clippy::cast_precision_loss,
3478                clippy::cast_possible_truncation,
3479                clippy::cast_sign_loss
3480            )]
3481            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3482            valid.truncate(max_evict);
3483            Some(valid)
3484        };
3485        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3486            std::sync::Arc::from("agent.sidequest.eviction"),
3487            move || eviction_future,
3488        );
3489        self.services.compression.pending_sidequest_result = Some(handle);
3490        tracing::debug!("sidequest: background LLM eviction task spawned");
3491        if let Some(ref tx) = self.services.session.status_tx {
3492            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3493        }
3494    }
3495
3496    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3497    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3498        self.services
3499            .mcp
3500            .manager
3501            .as_ref()
3502            .map(|m| McpManagerDispatch(Arc::clone(m)))
3503    }
3504
3505    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3506    ///
3507    /// Called after each tool batch completes. The check is a single syscall and has
3508    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3509    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3510    pub(crate) async fn check_cwd_changed(&mut self) {
3511        let current = match std::env::current_dir() {
3512            Ok(p) => p,
3513            Err(e) => {
3514                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3515                return;
3516            }
3517        };
3518        if current == self.runtime.lifecycle.last_known_cwd {
3519            return;
3520        }
3521        let old_cwd =
3522            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3523        self.services.session.env_context.working_dir = current.display().to_string();
3524
3525        tracing::info!(
3526            old = %old_cwd.display(),
3527            new = %current.display(),
3528            "working directory changed"
3529        );
3530
3531        let _ = self
3532            .channel
3533            .send_status("Working directory changed\u{2026}")
3534            .await;
3535
3536        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3537        if hooks.is_empty() {
3538            tracing::debug!("CwdChanged: no hooks configured, skipping");
3539        } else {
3540            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3541            let mut env = std::collections::HashMap::new();
3542            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3543            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3544            let dispatch = self.mcp_dispatch();
3545            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3546                .as_ref()
3547                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3548            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3549                tracing::warn!(error = %e, "CwdChanged hook failed");
3550            } else {
3551                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3552            }
3553        }
3554
3555        let _ = self.channel.send_status("").await;
3556    }
3557
3558    /// Handle a `FileChangedEvent` from the file watcher.
3559    pub(crate) async fn handle_file_changed(
3560        &mut self,
3561        event: crate::file_watcher::FileChangedEvent,
3562    ) {
3563        tracing::info!(path = %event.path.display(), "file changed");
3564
3565        let _ = self
3566            .channel
3567            .send_status("Running file-change hook\u{2026}")
3568            .await;
3569
3570        let hooks = self
3571            .services
3572            .session
3573            .hooks_config
3574            .file_changed_hooks
3575            .clone();
3576        if hooks.is_empty() {
3577            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3578        } else {
3579            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3580            let mut env = std::collections::HashMap::new();
3581            env.insert(
3582                "ZEPH_CHANGED_PATH".to_owned(),
3583                event.path.display().to_string(),
3584            );
3585            let dispatch = self.mcp_dispatch();
3586            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3587                .as_ref()
3588                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3589            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3590                tracing::warn!(error = %e, "FileChanged hook failed");
3591            } else {
3592                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3593            }
3594        }
3595
3596        let _ = self.channel.send_status("").await;
3597    }
3598
3599    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3600    /// background scan task.
3601    ///
3602    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3603    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3604    ///
3605    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3606    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3607    /// blocking the turn.
3608    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3609        let Some(engine) = self.services.promotion_engine.clone() else {
3610            return;
3611        };
3612
3613        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3614            return;
3615        };
3616
3617        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3618        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3619        let promotion_window = 200usize;
3620
3621        let accepted = self.runtime.lifecycle.supervisor.spawn(
3622            agent_supervisor::TaskClass::Enrichment,
3623            "compression_spectrum.promotion_scan",
3624            async move {
3625                let span = tracing::info_span!("memory.compression.promote.background");
3626                let _enter = span.enter();
3627
3628                let window = match memory.load_promotion_window(promotion_window).await {
3629                    Ok(w) => w,
3630                    Err(e) => {
3631                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3632                        return;
3633                    }
3634                };
3635
3636                if window.is_empty() {
3637                    return;
3638                }
3639
3640                let candidates = match engine.scan(&window).await {
3641                    Ok(c) => c,
3642                    Err(e) => {
3643                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3644                        return;
3645                    }
3646                };
3647
3648                for candidate in &candidates {
3649                    if let Err(e) = engine.promote(candidate).await {
3650                        tracing::warn!(
3651                            signature = %candidate.signature,
3652                            error = %e,
3653                            "promotion scan: promote failed"
3654                        );
3655                    }
3656                }
3657
3658                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3659            },
3660        );
3661
3662        if accepted {
3663            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3664        }
3665    }
3666}
3667/// Estimates the JSON payload size of a single [`zeph_llm::provider::Message`] for token-budget
3668/// accounting.
3669///
3670/// When `parts` is empty the message is a legacy text-only message and `content.len()` is used
3671/// directly. Otherwise each part is measured individually so that structured variants (images,
3672/// tool invocations, thinking blocks) are accounted for rather than relying on the already-flat
3673/// `content` string, which may not reflect the actual API payload size.
3674pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3675    use zeph_llm::provider::MessagePart;
3676    if m.parts.is_empty() {
3677        return m.content.len();
3678    }
3679    m.parts
3680        .iter()
3681        .map(|p| match p {
3682            MessagePart::Text { text }
3683            | MessagePart::Recall { text }
3684            | MessagePart::CodeContext { text }
3685            | MessagePart::Summary { text }
3686            | MessagePart::CrossSession { text } => text.len(),
3687            MessagePart::ToolOutput { body, .. } => body.len(),
3688            MessagePart::ToolUse { id, name, input } => {
3689                50 + id.len() + name.len() + input.to_string().len()
3690            }
3691            MessagePart::ToolResult {
3692                tool_use_id,
3693                content,
3694                ..
3695            } => 50 + tool_use_id.len() + content.len(),
3696            MessagePart::Image(img) => img.data.len() * 4 / 3,
3697            MessagePart::ThinkingBlock {
3698                thinking,
3699                signature,
3700            } => 50 + thinking.len() + signature.len(),
3701            MessagePart::RedactedThinkingBlock { data } => data.len(),
3702            MessagePart::Compaction { summary } => summary.len(),
3703        })
3704        .sum()
3705}
3706
3707/// Applies token-budget truncation and orphaned-tool-pair pruning to a parent message slice.
3708///
3709/// Budget truncation keeps the **most recent** messages that fit within `max_chars`
3710/// (a suffix), so the subagent always receives the freshest context.
3711///
3712/// Two passes are performed after budget truncation:
3713///
3714/// 1. Remove `ToolResult` parts from user messages whose matching `ToolUse` is no longer in the
3715///    slice (truncated away).
3716/// 2. Remove `ToolUse` parts from **interior** assistant messages whose matching `ToolResult`
3717///    was removed in pass 1 or was already absent. The trailing assistant message is exempt —
3718///    its unanswered `ToolUse` calls are not orphaned; the slice just ends before the result.
3719///
3720/// Messages that become fully empty after pruning are removed from `msgs`.
3721///
3722/// `rebuild_content` is called **only** when `retain` actually removed parts — preserving the
3723/// existing `content` field (and any `ThinkingBlock` text embedded there) for unmodified
3724/// messages.
3725pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3726    use zeph_llm::provider::{MessagePart, Role};
3727
3728    // Token-budget cap: keep the most recent messages that fit within max_chars.
3729    // We iterate from the end (newest) and drain from the front once the budget is exceeded,
3730    // so the subagent always receives the most recent context rather than stale early messages.
3731    let mut total_chars = 0usize;
3732    let mut drop_before = 0usize; // index of the first message to keep
3733    for (i, m) in msgs.iter().enumerate().rev() {
3734        total_chars += estimate_parts_size(m);
3735        if total_chars > max_chars {
3736            drop_before = i + 1;
3737            break;
3738        }
3739    }
3740    if drop_before > 0 {
3741        msgs.drain(..drop_before);
3742    }
3743
3744    // Pass 1: collect ToolUse IDs emitted by assistant messages; prune orphaned ToolResult
3745    // parts from user messages that reference a ToolUse no longer present in the slice.
3746    // Use owned Strings to avoid holding immutable borrows across the subsequent mutable loop.
3747    let emitted_tool_ids: std::collections::HashSet<String> = msgs
3748        .iter()
3749        .filter(|m| m.role == Role::Assistant)
3750        .flat_map(|m| m.parts.iter())
3751        .filter_map(|p| {
3752            if let MessagePart::ToolUse { id, .. } = p {
3753                Some(id.clone())
3754            } else {
3755                None
3756            }
3757        })
3758        .collect();
3759
3760    let mut orphans_removed = 0usize;
3761    for m in msgs.iter_mut() {
3762        if m.role != Role::User || m.parts.is_empty() {
3763            continue;
3764        }
3765        let before = m.parts.len();
3766        m.parts.retain(|p| match p {
3767            MessagePart::ToolResult { tool_use_id, .. } => {
3768                emitted_tool_ids.contains(tool_use_id.as_str())
3769            }
3770            _ => true,
3771        });
3772        let dropped = before - m.parts.len();
3773        if dropped > 0 {
3774            orphans_removed += dropped;
3775            if m.parts.is_empty() {
3776                m.content.clear();
3777            } else {
3778                m.rebuild_content();
3779            }
3780        }
3781    }
3782
3783    // Pass 2: collect ToolResult IDs present in user messages after pass 1; prune ToolUse
3784    // parts from assistant messages whose result is confirmed absent.
3785    //
3786    // The trailing assistant message is exempt: it may legitimately contain unanswered
3787    // ToolUse calls (the slice ends before the result arrives). Only interior assistant
3788    // messages — those followed by at least one user message — can have provably orphaned
3789    // ToolUse parts (the conversation moved on without answering them).
3790    let consumed_tool_ids: std::collections::HashSet<String> = msgs
3791        .iter()
3792        .filter(|m| m.role == Role::User)
3793        .flat_map(|m| m.parts.iter())
3794        .filter_map(|p| {
3795            if let MessagePart::ToolResult { tool_use_id, .. } = p {
3796                Some(tool_use_id.clone())
3797            } else {
3798                None
3799            }
3800        })
3801        .collect();
3802
3803    // Index of the last assistant message — exempt from pass 2.
3804    let last_assistant_idx = msgs
3805        .iter()
3806        .enumerate()
3807        .rev()
3808        .find(|(_, m)| m.role == Role::Assistant)
3809        .map(|(i, _)| i);
3810
3811    for (idx, m) in msgs.iter_mut().enumerate() {
3812        if m.role != Role::Assistant || m.parts.is_empty() {
3813            continue;
3814        }
3815        // Skip the trailing assistant message — its unanswered ToolUse calls are not orphaned.
3816        if Some(idx) == last_assistant_idx {
3817            continue;
3818        }
3819        let before = m.parts.len();
3820        m.parts.retain(|p| match p {
3821            MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3822            _ => true,
3823        });
3824        let dropped = before - m.parts.len();
3825        if dropped > 0 {
3826            orphans_removed += dropped;
3827            if m.parts.is_empty() {
3828                m.content.clear();
3829            } else {
3830                m.rebuild_content();
3831            }
3832        }
3833    }
3834
3835    // Remove messages that were emptied by orphan pruning.
3836    msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3837
3838    if orphans_removed > 0 {
3839        tracing::debug!(
3840            orphans = orphans_removed,
3841            "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3842        );
3843    }
3844}
3845
3846/// Sanitize text parts of `msgs` through the IPI pipeline.
3847///
3848/// Only [`MessagePart::Text`] parts are passed through the sanitizer; structured parts
3849/// (`ToolUse`, `ToolResult`, `Recall`, `CodeContext`) are left untouched.  After sanitization
3850/// the message `content` field is rebuilt to stay consistent with the updated parts.
3851fn sanitize_parent_messages(
3852    mut msgs: Vec<zeph_llm::provider::Message>,
3853    sanitizer: &zeph_sanitizer::ContentSanitizer,
3854    source: &zeph_sanitizer::ContentSource,
3855) -> Vec<zeph_llm::provider::Message> {
3856    use zeph_llm::provider::MessagePart;
3857    for msg in &mut msgs {
3858        let mut changed = false;
3859        for part in &mut msg.parts {
3860            if let MessagePart::Text { text } = part {
3861                let clean = sanitizer.sanitize(text, source.clone());
3862                if clean.body != *text {
3863                    *text = clean.body;
3864                    changed = true;
3865                }
3866            }
3867        }
3868        if changed {
3869            msg.rebuild_content();
3870        }
3871    }
3872    msgs
3873}
3874
3875/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3876///
3877/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3878/// `zeph-subagent` to `zeph-mcp`.
3879struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3880
3881impl zeph_subagent::McpDispatch for McpManagerDispatch {
3882    fn call_tool<'a>(
3883        &'a self,
3884        server: &'a str,
3885        tool: &'a str,
3886        args: serde_json::Value,
3887    ) -> std::pin::Pin<
3888        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3889    > {
3890        Box::pin(async move {
3891            self.0
3892                .call_tool(server, tool, args)
3893                .await
3894                .map(|result| {
3895                    // Extract text content from the MCP response as a JSON value.
3896                    let texts: Vec<serde_json::Value> = result
3897                        .content
3898                        .iter()
3899                        .filter_map(|c| {
3900                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3901                                Some(serde_json::Value::String(t.text.clone()))
3902                            } else {
3903                                None
3904                            }
3905                        })
3906                        .collect();
3907                    serde_json::Value::Array(texts)
3908                })
3909                .map_err(|e| e.to_string())
3910        })
3911    }
3912}
3913
3914pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3915    while !*rx.borrow_and_update() {
3916        if rx.changed().await.is_err() {
3917            std::future::pending::<()>().await;
3918        }
3919    }
3920}
3921
3922pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3923    match rx {
3924        Some(inner) => {
3925            if let Some(v) = inner.recv().await {
3926                Some(v)
3927            } else {
3928                *rx = None;
3929                std::future::pending().await
3930            }
3931        }
3932        None => std::future::pending().await,
3933    }
3934}
3935
3936/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
3937///
3938/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
3939/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
3940/// context window; if still 0, fall back to 128 000 tokens.
3941/// Truncate a background run command to at most 80 characters for TUI display.
3942fn truncate_shell_command(cmd: &str) -> String {
3943    if cmd.len() <= 80 {
3944        return cmd.to_owned();
3945    }
3946    let end = cmd.floor_char_boundary(79);
3947    format!("{}…", &cmd[..end])
3948}
3949
/// Take the first 8 characters of a run-id hex string for compact TUI display.
///
/// Ids with 8 or fewer characters are returned whole.
fn truncate_shell_run_id(id: &str) -> String {
    match id.char_indices().nth(8) {
        // Byte offset of the ninth character: everything before it is the first eight.
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3954
3955pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3956    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3957        if let Some(ctx_size) = provider.context_window() {
3958            tracing::info!(
3959                model_context = ctx_size,
3960                "auto-configured context budget on reload"
3961            );
3962            ctx_size
3963        } else {
3964            0
3965        }
3966    } else {
3967        config.memory.context_budget_tokens
3968    };
3969    if tokens == 0 {
3970        tracing::warn!(
3971            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3972        );
3973        128_000
3974    } else {
3975        tokens
3976    }
3977}
3978
3979#[cfg(test)]
3980mod tests;
3981
3982#[cfg(test)]
3983pub(crate) use tests::agent_tests;
3984
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed future returned by [`CommandHandler::handle`].
    type HandlerFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Stub slash command registered only in `#[cfg(test)]` builds.
    ///
    /// Triggers the `Some(Err(CommandError))` arm in the session/debug registry
    /// dispatch block so the non-fatal error path can be tested without production
    /// command validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and always fails with the message `boom`.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandlerFuture<'a> {
            Box::pin(async {
                let failure = CommandError::new("boom");
                Err(failure)
            })
        }
    }
}
4023}