Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15#[cfg(feature = "cocoon")]
16mod cocoon_cmd;
17mod command_context_impls;
18pub(super) mod compression_feedback;
19mod context;
20mod context_impls;
21pub(crate) mod context_manager;
22mod corrections;
23pub mod error;
24mod experiment_cmd;
25pub(crate) mod focus;
26mod index;
27mod learning;
28pub(crate) mod learning_engine;
29mod log_commands;
30mod loop_event;
31mod lsp_commands;
32mod magic_docs;
33mod mcp;
34pub(crate) mod memcot;
35mod message_queue;
36mod microcompact;
37mod model_commands;
38mod persistence;
39#[cfg(feature = "scheduler")]
40mod plan;
41mod policy_commands;
42mod provider_cmd;
43mod quality_hook;
44pub(crate) mod rate_limiter;
45#[cfg(feature = "scheduler")]
46mod scheduler_commands;
47#[cfg(feature = "scheduler")]
48mod scheduler_loop;
49mod scope_commands;
50pub mod session_config;
51mod session_digest;
52pub mod shadow_sentinel;
53pub(crate) mod sidequest;
54mod skill_management;
55pub mod slash_commands;
56pub mod speculative;
57pub(crate) mod state;
58pub(crate) mod task_injection;
59pub(crate) mod tool_execution;
60pub(crate) mod tool_orchestrator;
61pub mod trajectory;
62mod trajectory_commands;
63mod trust_commands;
64pub mod turn;
65mod utils;
66pub(crate) mod vigil;
67
68use std::collections::{HashMap, VecDeque};
69use std::fmt::Write as _;
70use std::sync::Arc;
71
72use parking_lot::RwLock;
73
74use tokio::sync::{mpsc, watch};
75use tokio_util::sync::CancellationToken;
76use zeph_llm::any::AnyProvider;
77use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
78use zeph_memory::TokenCounter;
79use zeph_memory::semantic::SemanticMemory;
80use zeph_skills::loader::Skill;
81use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
82use zeph_skills::prompt::format_skills_prompt;
83use zeph_skills::registry::SkillRegistry;
84use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
85
86use crate::channel::Channel;
87use crate::config::Config;
88use crate::context::{ContextBudget, build_system_prompt};
89use zeph_common::text::estimate_tokens;
90
91use loop_event::LoopEvent;
92use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
93use state::MessageState;
94
/// Window size used by doom-loop detection.
///
/// NOTE(review): the consumer is not in this file section — presumably the number of most
/// recent items that are compared to detect a repeating loop; confirm at the usage site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Instruction prefix for scheduled-task prompts.
///
/// NOTE(review): presumably prepended to the task text when a scheduled task is injected
/// as a message — confirm at the usage site.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing code fence (preceded by a newline) that terminates a formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wrap a tool's output in a labelled, fenced block.
///
/// The result consists of the header `[tool output: <tool_name>]`, a newline, an opening
/// code fence on its own line, `body` verbatim, and finally [`TOOL_OUTPUT_SUFFIX`].
/// The buffer is sized up front so building the string performs a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    let total_len = HEADER_OPEN.len()
        + tool_name.len()
        + HEADER_CLOSE.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();

    let mut out = String::with_capacity(total_len);
    out.push_str(HEADER_OPEN);
    out.push_str(tool_name);
    out.push_str(HEADER_CLOSE);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
117
/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
/// tool orchestration, and multi-channel I/O.
///
/// The agent maintains conversation history, manages LLM provider state, coordinates tool
/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
///
/// # Architecture
///
/// - **Message state**: Conversation history with system prompt, message queue, and metadata
/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
/// - **Skill state**: Registry, matching engine, and self-learning evolution
/// - **Context manager**: Token budgeting, context assembly, and summarization
/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
/// - **MCP client**: Multi-server support for Model Context Protocol
/// - **Index state**: AST-based code indexing and semantic retrieval
/// - **Security**: Sanitization, exfiltration detection, adversarial probes
/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
///
/// # Channel Contract
///
/// The agent requires a [`Channel`] implementation for user interaction:
/// - Sends agent responses via `channel.send(message)`
/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
/// - Supports structured events: tool invocations, tool output, streaming updates
///
/// # Lifecycle
///
/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
/// 2. Run main loop with [`Self::run`]
/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
pub struct Agent<C: Channel> {
    // --- I/O & primary providers (kept inline) ---
    /// Primary LLM provider; used for chat calls (e.g. the shutdown session summary) and
    /// asked to save its router state during shutdown.
    provider: AnyProvider,
    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
    /// Falls back to `provider.clone()` when no dedicated entry exists.
    /// **Never replaced** by `/provider switch`.
    embedding_provider: AnyProvider,
    /// I/O channel for user interaction; handed back to the caller by `into_channel`.
    channel: C,
    /// Type-erased tool executor, stored behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    // --- Conversation core (kept inline) ---
    /// Conversation messages (the system prompt is the first entry), the pending message
    /// queue, pending image parts, and deferred database bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager; among other things it tracks the turns since the last hard compaction.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestrator state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    // --- Aggregated background services ---
    /// Subsystem state grouped in one struct: memory, skills, learning, feedback, MCP, index,
    /// session, security, experiments, compression, orchestration, focus, sidequest, tools,
    /// plus several optional engines that start out as `None`.
    pub(super) services: state::Services,

    // --- Aggregated runtime / lifecycle / telemetry ---
    /// Runtime configuration, lifecycle, provider state, metrics, debug, and instruction state.
    pub(super) runtime: state::AgentRuntime,
}
172
/// Control flow signal returned by [`Agent::apply_dispatch_result`].
///
/// Each variant corresponds to exactly one action the agent loop takes after a command
/// dispatch attempt: `break`, `continue`, or falling through to the code that follows.
enum DispatchFlow {
    /// The command requested exit; the agent loop should `break`.
    Break,
    /// The command was handled; the agent loop should `continue`.
    Continue,
    /// The command was not recognised; the agent loop should fall through.
    Fallthrough,
}
182
183impl<C: Channel> Agent<C> {
184    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
185    ///
186    /// # Arguments
187    ///
188    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
189    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
190    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
191    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
192    ///   skills are matched by exact name only
193    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
194    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
195    ///
196    /// # Initialization
197    ///
198    /// The constructor:
199    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
200    /// 2. Builds the system prompt from registered skills
201    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
202    /// 4. Returns a ready-to-run agent
203    ///
204    /// # Panics
205    ///
206    /// Panics if `max_active_skills` is 0.
207    #[must_use]
208    pub fn new(
209        provider: AnyProvider,
210        channel: C,
211        registry: SkillRegistry,
212        matcher: Option<SkillMatcherBackend>,
213        max_active_skills: usize,
214        tool_executor: impl ToolExecutor + 'static,
215    ) -> Self {
216        let registry = Arc::new(RwLock::new(registry));
217        let embedding_provider = provider.clone();
218        Self::new_with_registry_arc(
219            provider,
220            embedding_provider,
221            channel,
222            registry,
223            matcher,
224            max_active_skills,
225            tool_executor,
226        )
227    }
228
229    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
230    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
231    ///
232    /// # Panics
233    ///
234    /// Panics if the registry `RwLock` is poisoned.
235    #[must_use]
236    pub fn new_with_registry_arc(
237        provider: AnyProvider,
238        embedding_provider: AnyProvider,
239        channel: C,
240        registry: Arc<RwLock<SkillRegistry>>,
241        matcher: Option<SkillMatcherBackend>,
242        max_active_skills: usize,
243        tool_executor: impl ToolExecutor + 'static,
244    ) -> Self {
245        use state::{
246            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
247            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
248            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
249            SessionState, SkillState, ToolState,
250        };
251
252        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
253        let all_skills: Vec<Skill> = {
254            let reg = registry.read();
255            reg.all_meta()
256                .iter()
257                .filter_map(|m| reg.skill(&m.name).ok())
258                .collect()
259        };
260        let empty_trust = HashMap::new();
261        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
262        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
263        let system_prompt = build_system_prompt(&skills_prompt, None);
264        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
265        tracing::trace!(prompt = %system_prompt, "full system prompt");
266
267        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
268        let token_counter = Arc::new(TokenCounter::new());
269
270        let services = Services {
271            memory: MemoryState::default(),
272            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
273            learning_engine: learning_engine::LearningEngine::new(),
274            feedback: FeedbackState::default(),
275            mcp: McpState::default(),
276            index: IndexState::default(),
277            session: SessionState::new(),
278            security: SecurityState::default(),
279            experiments: ExperimentState::new(),
280            compression: CompressionState::default(),
281            orchestration: OrchestrationState::default(),
282            focus: focus::FocusState::default(),
283            sidequest: sidequest::SidequestState::default(),
284            tool_state: ToolState::default(),
285            goal_accounting: None,
286            quality: None,
287            proactive_explorer: None,
288            promotion_engine: None,
289            taco_compressor: None,
290            speculation_engine: None,
291        };
292
293        let runtime = AgentRuntime {
294            config: RuntimeConfig::default(),
295            lifecycle: LifecycleState::new(),
296            providers: ProviderState::new(initial_prompt_tokens),
297            metrics: MetricsState::new(token_counter),
298            debug: DebugState::default(),
299            instructions: InstructionState::default(),
300        };
301
302        Self {
303            provider,
304            embedding_provider,
305            channel,
306            tool_executor: Arc::new(tool_executor),
307            msg: MessageState {
308                messages: vec![Message {
309                    role: Role::System,
310                    content: system_prompt,
311                    parts: vec![],
312                    metadata: MessageMetadata::default(),
313                }],
314                message_queue: VecDeque::new(),
315                pending_image_parts: Vec::new(),
316                last_persisted_message_id: None,
317                deferred_db_hide_ids: Vec::new(),
318                deferred_db_summaries: Vec::new(),
319            },
320            context_manager: context_manager::ContextManager::new(),
321            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
322            services,
323            runtime,
324        }
325    }
326
327    /// Consume the agent and return the inner channel.
328    ///
329    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
330    /// read captured responses from a headless channel such as `BenchmarkChannel`).
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// # use zeph_core::agent::Agent;
336    /// // After agent.run().await completes, consume the agent to retrieve the channel.
337    /// // let channel: MyChannel = agent.into_channel();
338    /// ```
339    #[must_use]
340    pub fn into_channel(self) -> C {
341        self.channel
342    }
343
344    /// Poll all active sub-agents for completed/failed/canceled results.
345    ///
346    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
347    /// for agents that have finished. Each completed agent is removed from the manager.
348    #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
349    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
350        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
351            return vec![];
352        };
353
354        let finished: Vec<String> = mgr
355            .statuses()
356            .into_iter()
357            .filter_map(|(id, status)| {
358                if matches!(
359                    status.state,
360                    zeph_subagent::SubAgentState::Completed
361                        | zeph_subagent::SubAgentState::Failed
362                        | zeph_subagent::SubAgentState::Canceled
363                ) {
364                    Some(id)
365                } else {
366                    None
367                }
368            })
369            .collect();
370
371        let mut results = vec![];
372        for task_id in finished {
373            match mgr.collect(&task_id).await {
374                Ok(result) => results.push((task_id, result)),
375                Err(e) => {
376                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
377                }
378            }
379        }
380        results
381    }
382
383    /// Call the LLM to generate a structured session summary with a configurable timeout.
384    ///
385    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
386    /// any failure, logging a warning — callers must treat `None` as "skip storage".
387    ///
388    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
389    /// (structured call times out and plain-text fallback also times out) this adds up to
390    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
391    async fn call_llm_for_session_summary(
392        &self,
393        chat_messages: &[Message],
394    ) -> Option<zeph_memory::StructuredSummary> {
395        let timeout_dur = std::time::Duration::from_secs(
396            self.services
397                .memory
398                .compaction
399                .shutdown_summary_timeout_secs,
400        );
401        match tokio::time::timeout(
402            timeout_dur,
403            self.provider
404                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
405        )
406        .await
407        {
408            Ok(Ok(s)) => Some(s),
409            Ok(Err(e)) => {
410                tracing::warn!(
411                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
412                );
413                self.plain_text_summary_fallback(chat_messages, timeout_dur)
414                    .await
415            }
416            Err(_) => {
417                tracing::warn!(
418                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
419                    self.services
420                        .memory
421                        .compaction
422                        .shutdown_summary_timeout_secs
423                );
424                self.plain_text_summary_fallback(chat_messages, timeout_dur)
425                    .await
426            }
427        }
428    }
429
430    async fn plain_text_summary_fallback(
431        &self,
432        chat_messages: &[Message],
433        timeout_dur: std::time::Duration,
434    ) -> Option<zeph_memory::StructuredSummary> {
435        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
436            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
437                summary: plain,
438                key_facts: vec![],
439                entities: vec![],
440            }),
441            Ok(Err(e)) => {
442                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
443                None
444            }
445            Err(_) => {
446                tracing::warn!("shutdown summary: plain LLM fallback timed out");
447                None
448            }
449        }
450    }
451
452    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
453    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
454    /// closed while tool execution was in progress). Without this the next session startup strips
455    /// those assistant messages and emits orphan warnings.
456    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
457        use zeph_llm::provider::{MessagePart, Role};
458
459        // Walk messages in reverse: if the last assistant message (ignoring any trailing
460        // system messages) has ToolUse parts and is NOT immediately followed by a user
461        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
462        let msgs = &self.msg.messages;
463        // Find last assistant message index.
464        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
465            return;
466        };
467        let asst_msg = &msgs[asst_idx];
468        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
469            .parts
470            .iter()
471            .filter_map(|p| {
472                if let MessagePart::ToolUse { id, name, input } = p {
473                    Some((id.as_str(), name.as_str(), input))
474                } else {
475                    None
476                }
477            })
478            .collect();
479        if tool_use_ids.is_empty() {
480            return;
481        }
482
483        // Check whether a following user message already pairs all ToolUse ids.
484        let paired_ids: std::collections::HashSet<&str> = msgs
485            .get(asst_idx + 1..)
486            .into_iter()
487            .flatten()
488            .filter(|m| m.role == Role::User)
489            .flat_map(|m| m.parts.iter())
490            .filter_map(|p| {
491                if let MessagePart::ToolResult { tool_use_id, .. } = p {
492                    Some(tool_use_id.as_str())
493                } else {
494                    None
495                }
496            })
497            .collect();
498
499        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
500            .iter()
501            .filter(|(id, _, _)| !paired_ids.contains(*id))
502            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
503                id: (*id).to_owned(),
504                name: (*name).to_owned().into(),
505                input: (*input).clone(),
506            })
507            .collect();
508
509        if unpaired.is_empty() {
510            return;
511        }
512
513        tracing::info!(
514            count = unpaired.len(),
515            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
516        );
517        self.persist_cancelled_tool_results(&unpaired).await;
518    }
519
520    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
521    ///
522    /// Guards:
523    /// - `shutdown_summary` config must be enabled
524    /// - `conversation_id` must be set (memory must be attached)
525    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
526    /// - at least `shutdown_summary_min_messages` user-turn messages in history
527    ///
528    /// All errors are logged as warnings and swallowed — shutdown must never fail.
529    async fn maybe_store_shutdown_summary(&mut self) {
530        if !self.services.memory.compaction.shutdown_summary {
531            return;
532        }
533        let Some(memory) = self.services.memory.persistence.memory.clone() else {
534            return;
535        };
536        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
537            return;
538        };
539
540        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
541        match memory.has_session_summary(conversation_id).await {
542            Ok(true) => {
543                tracing::debug!("shutdown summary: session already has a summary, skipping");
544                return;
545            }
546            Ok(false) => {}
547            Err(e) => {
548                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
549                return;
550            }
551        }
552
553        // Count user-turn messages only (skip system prompt at index 0).
554        let user_count = self
555            .msg
556            .messages
557            .iter()
558            .skip(1)
559            .filter(|m| m.role == Role::User)
560            .count();
561        if user_count
562            < self
563                .services
564                .memory
565                .compaction
566                .shutdown_summary_min_messages
567        {
568            tracing::debug!(
569                user_count,
570                min = self
571                    .services
572                    .memory
573                    .compaction
574                    .shutdown_summary_min_messages,
575                "shutdown summary: too few user messages, skipping"
576            );
577            return;
578        }
579
580        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
581        let _ = self.channel.send_status("Saving session summary...").await;
582
583        // Collect last N messages (skip system prompt at index 0).
584        let max = self
585            .services
586            .memory
587            .compaction
588            .shutdown_summary_max_messages;
589        if max == 0 {
590            tracing::debug!("shutdown summary: max_messages=0, skipping");
591            return;
592        }
593        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
594        let slice = if non_system.len() > max {
595            &non_system[non_system.len() - max..]
596        } else {
597            &non_system[..]
598        };
599
600        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
601            .iter()
602            .map(|m| {
603                let role = match m.role {
604                    Role::User => "user".to_owned(),
605                    Role::Assistant => "assistant".to_owned(),
606                    Role::System => "system".to_owned(),
607                };
608                (zeph_memory::MessageId(0), role, m.content.clone())
609            })
610            .collect();
611
612        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
613        let chat_messages = vec![Message {
614            role: Role::User,
615            content: prompt,
616            parts: vec![],
617            metadata: MessageMetadata::default(),
618        }];
619
620        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
621            let _ = self.channel.send_status("").await;
622            return;
623        };
624
625        if let Err(e) = memory
626            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
627            .await
628        {
629            tracing::warn!("shutdown summary: storage failed: {e:#}");
630        } else {
631            tracing::info!(
632                conversation_id = conversation_id.0,
633                "shutdown summary stored"
634            );
635        }
636
637        // Clear TUI status.
638        let _ = self.channel.send_status("").await;
639    }
640
    /// Gracefully shut down the agent, stopping background work and persisting session state.
    ///
    /// Steps, in the order they run:
    ///
    /// 1. **Status** — sends "Shutting down..." on the channel (send errors are ignored)
    /// 2. **Provider state** — saves LLM router state (e.g., Thompson sampling counters) and,
    ///    when a topology advisor is present, its state (a failure is logged as a warning)
    /// 3. **Sub-agents** — calls `shutdown_all` on the sub-agent manager, if any
    /// 4. **MCP servers** — shuts down all servers of the MCP manager, if any
    /// 5. **Metrics finalization** — records the still-open compaction segment and logs
    ///    tool-output-filter and hard-compaction statistics
    /// 6. **Orphaned tool calls** — persists tombstone `ToolResult`s for unpaired `ToolUse` parts
    /// 7. **Background tasks** — aborts supervisor-tracked tasks, pending compression task
    ///    handles, and learning tasks, then yields a few times so they can observe cancellation
    /// 8. **Session summary and digest** — calls `maybe_store_shutdown_summary` and
    ///    `maybe_store_session_digest`
    ///
    /// Call this before dropping the agent to ensure no data loss.
    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
    pub async fn shutdown(&mut self) {
        // NOTE(review): an earlier version of this doc also listed flushing deferred database
        // writes and saving evolved skill definitions. Neither is visible in this body —
        // confirm whether one of the callees covers them.
        let _ = self.channel.send_status("Shutting down...").await;

        // CRIT-1: persist Thompson state accumulated during this session.
        self.provider.save_router_state().await;

        // Persist AdaptOrch Beta-arm table alongside Thompson state.
        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Finalize compaction trajectory: push the last open segment into the Vec.
        // This segment would otherwise only be pushed when the next hard compaction fires,
        // which never happens at session end.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log end-of-session statistics from the current metrics snapshot. The borrow guard
        // `m` is scoped to this block and released before the next `.await`.
        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                // Percentage of raw tokens saved; guarded against division by zero.
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
        // startup strips the orphaned ToolUse and emits warnings.
        self.flush_orphaned_tool_use_on_shutdown().await;

        // NOTE: forcibly aborts in-flight Enrichment and Telemetry tasks tracked by the
        // supervisor. Before the sprint-1 refactor, experiment sessions were detached
        // tokio::spawn calls that survived shutdown; they are now intentionally untracked
        // (see experiment_cmd.rs) and will continue running until their own CancellationToken
        // is triggered or the process exits.
        self.runtime.lifecycle.supervisor.abort_all();

        // Abort background task handles not tracked by BackgroundSupervisor.
        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
        self.services.learning_engine.learning_tasks.abort_all();

        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
        // observe cancellation at their next .await point. Without yielding here the summary
        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        // Summary and digest run last, after background work has been cancelled.
        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
754
755    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if channel I/O or LLM communication fails.
760    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
761    fn refresh_subagent_metrics(&mut self) {
762        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
763            return;
764        };
765        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
766            .statuses()
767            .into_iter()
768            .map(|(id, s)| {
769                let def = mgr.agents_def(&id);
770                crate::metrics::SubAgentMetrics {
771                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
772                    id: id.clone(),
773                    state: format!("{:?}", s.state).to_lowercase(),
774                    turns_used: s.turns_used,
775                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
776                    background: def.is_some_and(|d| d.permissions.background),
777                    elapsed_secs: s.started_at.elapsed().as_secs(),
778                    permission_mode: def.map_or_else(String::new, |d| {
779                        use zeph_subagent::def::PermissionMode;
780                        match d.permissions.permission_mode {
781                            PermissionMode::Default => String::new(),
782                            PermissionMode::AcceptEdits => "accept_edits".into(),
783                            PermissionMode::DontAsk => "dont_ask".into(),
784                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
785                            PermissionMode::Plan => "plan".into(),
786                        }
787                    }),
788                    transcript_dir: mgr
789                        .agent_transcript_dir(&id)
790                        .map(|p| p.to_string_lossy().into_owned()),
791                }
792            })
793            .collect();
794        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
795    }
796
797    /// Non-blocking poll: notify the user when background sub-agents complete.
798    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
799        let completed = self.poll_subagents().await;
800        for (task_id, result) in completed {
801            let notice = if result.is_empty() {
802                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
803            } else {
804                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
805            };
806            if let Err(e) = self.channel.send(&notice).await {
807                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
808            }
809        }
810        Ok(())
811    }
812
813    /// Run the agent main loop.
814    ///
    /// Before looping, waits for model warmup (when a warmup signal is present and not yet
    /// set), restores the channel's provider preference, caches the session digest and
    /// optionally sends a resume recap.
    ///
    /// Each iteration performs housekeeping, then takes the next input from the internal
    /// message queue or, when the queue is empty, from `next_event`. The input is offered to
    /// the session/debug command registry, then to the agent command registry, then to
    /// `handle_builtin_command`; anything left unhandled goes to `process_user_message`.
    ///
    /// After the loop ends, `maybe_autodream` runs and the trace collector (if any) is
    /// finished.
    ///
815    /// # Errors
816    ///
817    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
818    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
819    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
820    pub async fn run(&mut self) -> Result<(), error::AgentError>
821    where
822        C: 'static,
823    {
        // Wait for the warmup signal at most once: `take()` leaves `None` behind, and the
        // wait is skipped entirely when the flag is already `true`.
824        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
825            && !*rx.borrow()
826        {
827            let _ = rx.changed().await;
828            if !*rx.borrow() {
829                tracing::warn!("model warmup did not complete successfully");
830            }
831        }
832
833        // Restore the last-used provider preference before any user interaction (#3308).
834        self.restore_channel_provider().await;
835
836        // Load the session digest once at session start for context injection.
837        self.load_and_cache_session_digest().await;
838        self.maybe_send_resume_recap().await;
839
840        loop {
            // Per-iteration housekeeping before the next input is picked up.
841            self.apply_provider_override();
842            self.check_tool_refresh().await;
843            self.process_pending_elicitations().await;
844            self.refresh_subagent_metrics();
845            self.notify_completed_subagents().await?;
846            self.drain_channel();
847
            // Queued messages are consumed before waiting for a new event.
848            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
849                self.notify_queue_count().await;
850                if queued.raw_attachments.is_empty() {
851                    (queued.text, queued.image_parts)
852                } else {
                    // Raw attachments still need resolving, so rebuild a channel message.
853                    let msg = crate::channel::ChannelMessage {
854                        text: queued.text,
855                        attachments: queued.raw_attachments,
856                        is_guest_context: false,
857                        is_from_bot: false,
858                    };
859                    self.resolve_message(msg).await
860                }
861            } else {
862                match self.next_event().await? {
                    // A `None` event and an explicit shutdown both end the loop.
863                    None | Some(LoopEvent::Shutdown) => break,
864                    Some(LoopEvent::SkillReload) => {
865                        self.reload_skills().await;
866                        continue;
867                    }
868                    Some(LoopEvent::InstructionReload) => {
869                        self.reload_instructions();
870                        continue;
871                    }
872                    Some(LoopEvent::ConfigReload) => {
873                        self.reload_config();
874                        continue;
875                    }
876                    Some(LoopEvent::UpdateNotification(msg)) => {
877                        if let Err(e) = self.channel.send(&msg).await {
878                            tracing::warn!("failed to send update notification: {e}");
879                        }
880                        continue;
881                    }
882                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // Clear the stored experiment cancel handle before reporting.
883                        self.services.experiments.cancel = None;
884                        if let Err(e) = self.channel.send(&msg).await {
885                            tracing::warn!("failed to send experiment completion: {e}");
886                        }
887                        continue;
888                    }
889                    Some(LoopEvent::ScheduledTask(prompt)) => {
890                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
891                        let msg = crate::channel::ChannelMessage {
892                            text,
893                            attachments: Vec::new(),
894                            is_guest_context: false,
895                            is_from_bot: false,
896                        };
897                        self.drain_channel();
898                        self.resolve_message(msg).await
899                    }
900                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count the tick on the active user loop, if there still is one.
901                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
902                            ls.iteration += 1;
903                            tracing::info!(iteration = ls.iteration, "loop: tick");
904                        }
905                        let msg = crate::channel::ChannelMessage {
906                            text: injection.prompt,
907                            attachments: Vec::new(),
908                            is_guest_context: false,
909                            is_from_bot: false,
910                        };
911                        self.drain_channel();
912                        self.resolve_message(msg).await
913                    }
914                    Some(LoopEvent::FileChanged(event)) => {
915                        self.handle_file_changed(event).await;
916                        continue;
917                    }
918                    Some(LoopEvent::Message(msg)) => {
                        // Record the guest-context flag of the incoming message on the session.
919                        self.services.session.is_guest_context = msg.is_guest_context;
920                        self.drain_channel();
921                        self.resolve_message(msg).await
922                    }
923                }
924            };
925
926            let trimmed = text.trim();
927
928            // M3: extract flagged URLs from all slash commands before any registry dispatch,
929            // so `/skill install <url>` and similar commands populate user_provided_urls.
930            if trimmed.starts_with('/') {
931                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
932                if !slash_urls.is_empty() {
933                    self.services
934                        .security
935                        .user_provided_urls
936                        .write()
937                        .extend(slash_urls);
938                }
939            }
940
941            // Registry dispatch: two-phase command dispatch.
942            //
943            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
944            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
945            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
946            //
947            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
948            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
949            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
950            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
951            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
952            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
953            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
954            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
955            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
956            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
957            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
958            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
959            //
960            // Drop-order rules enforced here:
961            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
962            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
963            let session_impl = command_context_impls::SessionAccessImpl {
964                supports_exit: self.channel.supports_exit(),
965            };
966            let mut messages_impl = command_context_impls::MessageAccessImpl {
967                msg: &mut self.msg,
968                tool_state: &mut self.services.tool_state,
969                providers: &mut self.runtime.providers,
970                metrics: &self.runtime.metrics,
971                security: &mut self.services.security,
972                tool_orchestrator: &mut self.tool_orchestrator,
973            };
974            // sink_adapter declared before reg so it is dropped after reg (LIFO).
975            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
976            // null_agent must be declared before reg so it lives longer (LIFO drop order).
977            let mut null_agent = zeph_commands::NullAgent;
978            let registry_handled = {
979                use zeph_commands::CommandRegistry;
980                use zeph_commands::handlers::debug::{
981                    DebugDumpCommand, DumpFormatCommand, LogCommand,
982                };
983                use zeph_commands::handlers::help::HelpCommand;
984                use zeph_commands::handlers::session::{
985                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
986                };
987
988                let mut reg = CommandRegistry::new();
989                reg.register(ExitCommand);
990                reg.register(QuitCommand);
991                reg.register(ClearCommand);
992                reg.register(ResetCommand);
993                reg.register(ClearQueueCommand);
994                reg.register(LogCommand);
995                reg.register(DebugDumpCommand);
996                reg.register(DumpFormatCommand);
997                reg.register(HelpCommand);
998                #[cfg(test)]
999                reg.register(test_stubs::TestErrorCommand);
1000
1001                let mut ctx = zeph_commands::CommandContext {
1002                    sink: &mut sink_adapter,
1003                    debug: &mut self.runtime.debug,
1004                    messages: &mut messages_impl,
1005                    session: &session_impl,
1006                    agent: &mut null_agent,
1007                };
1008                reg.dispatch(&mut ctx, trimmed).await
1009            };
            // Remember whether phase 1 produced no result before the value is consumed below.
1010            let session_reg_missed = registry_handled.is_none();
1011            match self
1012                .apply_dispatch_result(registry_handled, trimmed, false)
1013                .await
1014            {
1015                DispatchFlow::Break => break,
1016                DispatchFlow::Continue => continue,
1017                DispatchFlow::Fallthrough => {
1018                    // Not handled by the session/debug registry; try agent-command registry.
1019                }
1020            }
1021
1022            // Agent-command registry: handlers access Agent<C> directly.
1023            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
1024            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
1025            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
1026            // sentinels here will outlive ctx (ctx is declared later, inside the block).
1027            let mut agent_null_debug = command_context_impls::NullDebugAccess;
1028            let mut agent_null_messages = command_context_impls::NullMessageAccess;
1029            let agent_null_session = command_context_impls::NullSessionAccess;
1030            let mut agent_null_sink = zeph_commands::NullSink;
1031            let agent_result: Option<
1032                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
1033            > = if session_reg_missed {
1034                use zeph_commands::CommandRegistry;
1035                use zeph_commands::handlers::{
1036                    acp::AcpCommand,
1037                    agent_cmd::AgentCommand,
1038                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
1039                    experiment::ExperimentCommand,
1040                    goal::GoalCommand,
1041                    loop_cmd::LoopCommand,
1042                    lsp::LspCommand,
1043                    mcp::McpCommand,
1044                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
1045                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
1046                    model::{ModelCommand, ProviderCommand},
1047                    plan::PlanCommand,
1048                    plugins::PluginsCommand,
1049                    policy::PolicyCommand,
1050                    scheduler::SchedulerCommand,
1051                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
1052                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
1053                    trajectory::{ScopeCommand, TrajectoryCommand},
1054                };
1055
1056                let mut agent_reg = CommandRegistry::new();
1057                agent_reg.register(MemoryCommand);
1058                agent_reg.register(GraphCommand);
1059                agent_reg.register(GuidelinesCommand);
1060                agent_reg.register(ModelCommand);
1061                agent_reg.register(ProviderCommand);
1062                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
1063                agent_reg.register(SkillCommand);
1064                agent_reg.register(SkillsCommand);
1065                agent_reg.register(FeedbackCommand);
1066                agent_reg.register(McpCommand);
1067                agent_reg.register(PolicyCommand);
1068                agent_reg.register(SchedulerCommand);
1069                agent_reg.register(LspCommand);
1070                // Phase 4 migrations (Send-safe commands):
1071                agent_reg.register(CacheStatsCommand);
1072                agent_reg.register(ImageCommand);
1073                agent_reg.register(NotifyTestCommand);
1074                agent_reg.register(StatusCommand);
1075                agent_reg.register(GuardrailCommand);
1076                agent_reg.register(FocusCommand);
1077                agent_reg.register(SideQuestCommand);
1078                agent_reg.register(AgentCommand);
1079                // Phase 5 migrations (Send-compatible):
1080                agent_reg.register(CompactCommand);
1081                agent_reg.register(NewConversationCommand);
1082                agent_reg.register(RecapCommand);
1083                agent_reg.register(ExperimentCommand);
1084                agent_reg.register(PlanCommand);
1085                agent_reg.register(LoopCommand);
1086                agent_reg.register(PluginsCommand);
1087                agent_reg.register(AcpCommand);
1088                #[cfg(feature = "cocoon")]
1089                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
1090                agent_reg.register(TrajectoryCommand);
1091                agent_reg.register(ScopeCommand);
1092                agent_reg.register(GoalCommand);
1093
1094                let mut ctx = zeph_commands::CommandContext {
1095                    sink: &mut agent_null_sink,
1096                    debug: &mut agent_null_debug,
1097                    messages: &mut agent_null_messages,
1098                    session: &agent_null_session,
1099                    agent: self,
1100                };
1101                // self is reborrowed; ctx drops at end of this block.
1102                agent_reg.dispatch(&mut ctx, trimmed).await
1103            } else {
1104                None
1105            };
1106            // self.channel is available again here (ctx borrow dropped above).
1107            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
1108            // inside apply_dispatch_result when with_learning = true.
1109            match self
1110                .apply_dispatch_result(agent_result, trimmed, true)
1111                .await
1112            {
1113                DispatchFlow::Break => break,
1114                DispatchFlow::Continue => continue,
1115                DispatchFlow::Fallthrough => {
1116                    // Not handled by agent registry; fall through to existing dispatch.
1117                }
1118            }
1119
            // `Some(true)` ends the loop, `Some(false)` skips to the next iteration, and
            // `None` falls through to `process_user_message`.
1120            match self.handle_builtin_command(trimmed) {
1121                Some(true) => break,
1122                Some(false) => continue,
1123                None => {}
1124            }
1125
1126            self.process_user_message(text, image_parts).await?;
1127        }
1128
1129        // autoDream: run background memory consolidation if conditions are met (#2697).
1130        // Runs with a timeout — partial state is acceptable for MVP.
1131        self.maybe_autodream().await;
1132
1133        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1134        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
1135            tc.finish();
1136        }
1137
1138        Ok(())
1139    }
1140
1141    /// Dispatch a slash-command registry result and flush the channel.
1142    ///
1143    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1144    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1145    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1146    async fn apply_dispatch_result(
1147        &mut self,
1148        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1149        command: &str,
1150        with_learning: bool,
1151    ) -> DispatchFlow {
1152        match result {
1153            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1154                let _ = self.channel.flush_chunks().await;
1155                DispatchFlow::Break
1156            }
1157            Some(Ok(
1158                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1159            )) => {
1160                let _ = self.channel.flush_chunks().await;
1161                DispatchFlow::Continue
1162            }
1163            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1164                let _ = self.channel.send(&msg).await;
1165                let _ = self.channel.flush_chunks().await;
1166                if with_learning {
1167                    self.maybe_trigger_post_command_learning(command).await;
1168                }
1169                DispatchFlow::Continue
1170            }
1171            Some(Err(e)) => {
1172                let _ = self.channel.send(&e.to_string()).await;
1173                let _ = self.channel.flush_chunks().await;
1174                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1175                DispatchFlow::Continue
1176            }
1177            None => DispatchFlow::Fallthrough,
1178        }
1179    }
1180
1181    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1182    fn apply_provider_override(&mut self) {
1183        if let Some(ref slot) = self.runtime.providers.provider_override
1184            && let Some(new_provider) = slot.write().take()
1185        {
1186            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1187            self.provider = new_provider;
1188        }
1189    }
1190
1191    /// Poll all event sources and return the next [`LoopEvent`].
1192    ///
1193    /// Returns `None` when the inbound channel closes (graceful shutdown).
1194    ///
1195    /// # Errors
1196    ///
1197    /// Propagates channel receive errors.
1198    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
1199    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1200        let event = tokio::select! {
1201            result = self.channel.recv() => {
1202                return Ok(result?.map(LoopEvent::Message));
1203            }
1204            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1205                tracing::info!("shutting down");
1206                LoopEvent::Shutdown
1207            }
1208            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1209                LoopEvent::SkillReload
1210            }
1211            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1212                LoopEvent::InstructionReload
1213            }
1214            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1215                LoopEvent::ConfigReload
1216            }
1217            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1218                LoopEvent::UpdateNotification(msg)
1219            }
1220            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1221                LoopEvent::ExperimentCompleted(msg)
1222            }
1223            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1224                tracing::info!("scheduler: injecting custom task as agent turn");
1225                LoopEvent::ScheduledTask(prompt)
1226            }
1227            () = async {
1228                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1229                    if ls.cancel_tx.is_cancelled() {
1230                        std::future::pending::<()>().await;
1231                    } else {
1232                        ls.interval.tick().await;
1233                    }
1234                } else {
1235                    std::future::pending::<()>().await;
1236                }
1237            } => {
1238                // Re-check user_loop after the tick — /loop stop may have fired between the
1239                // interval firing and this arm executing. Returning Ok(None) causes the caller
1240                // to `continue` without injecting a stale or empty prompt.
1241                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1242                    return Ok(None);
1243                };
1244                if ls.cancel_tx.is_cancelled() {
1245                    self.runtime.lifecycle.user_loop = None;
1246                    return Ok(None);
1247                }
1248                let prompt = ls.prompt.clone();
1249                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1250            }
1251            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1252                LoopEvent::FileChanged(event)
1253            }
1254        };
1255        Ok(Some(event))
1256    }
1257
1258    #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1259    async fn resolve_message(
1260        &self,
1261        msg: crate::channel::ChannelMessage,
1262    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1263        use crate::channel::{Attachment, AttachmentKind};
1264        use zeph_llm::provider::{ImageData, MessagePart};
1265
1266        let text_base = msg.text.clone();
1267
1268        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1269            .attachments
1270            .into_iter()
1271            .partition(|a| a.kind == AttachmentKind::Audio);
1272
1273        tracing::debug!(
1274            audio = audio_attachments.len(),
1275            has_stt = self.runtime.providers.stt.is_some(),
1276            "resolve_message attachments"
1277        );
1278
1279        let text = if !audio_attachments.is_empty()
1280            && let Some(stt) = self.runtime.providers.stt.as_ref()
1281        {
1282            let mut transcribed_parts = Vec::new();
1283            for attachment in &audio_attachments {
1284                if attachment.data.len() > MAX_AUDIO_BYTES {
1285                    tracing::warn!(
1286                        size = attachment.data.len(),
1287                        max = MAX_AUDIO_BYTES,
1288                        "audio attachment exceeds size limit, skipping"
1289                    );
1290                    continue;
1291                }
1292                match stt
1293                    .transcribe(&attachment.data, attachment.filename.as_deref())
1294                    .await
1295                {
1296                    Ok(result) => {
1297                        tracing::info!(
1298                            len = result.text.len(),
1299                            language = ?result.language,
1300                            "audio transcribed"
1301                        );
1302                        transcribed_parts.push(result.text);
1303                    }
1304                    Err(e) => {
1305                        tracing::error!(error = %e, "audio transcription failed");
1306                    }
1307                }
1308            }
1309            if transcribed_parts.is_empty() {
1310                text_base
1311            } else {
1312                let transcribed = transcribed_parts.join("\n");
1313                if text_base.is_empty() {
1314                    transcribed
1315                } else {
1316                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1317                }
1318            }
1319        } else {
1320            if !audio_attachments.is_empty() {
1321                tracing::warn!(
1322                    count = audio_attachments.len(),
1323                    "audio attachments received but no STT provider configured, dropping"
1324                );
1325            }
1326            text_base
1327        };
1328
1329        let mut image_parts = Vec::new();
1330        for attachment in image_attachments {
1331            if attachment.data.len() > MAX_IMAGE_BYTES {
1332                tracing::warn!(
1333                    size = attachment.data.len(),
1334                    max = MAX_IMAGE_BYTES,
1335                    "image attachment exceeds size limit, skipping"
1336                );
1337                continue;
1338            }
1339            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1340            image_parts.push(MessagePart::Image(Box::new(ImageData {
1341                data: attachment.data,
1342                mime_type,
1343            })));
1344        }
1345
1346        (text, image_parts)
1347    }
1348
1349    /// Create a new [`Turn`] for the given input and advance the turn counter.
1350    ///
1351    /// Clears per-turn state that must not carry over between turns:
1352    /// - per-turn `CancellationToken` (new token for each turn)
1353    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1354    ///   `process_user_message_inner` after security checks)
    /// - per-turn LLM request counter (reset to 0)
    ///
    /// Also drains queued trajectory risk signals into the trajectory sentinel, advances the
    /// sentinel (spawning an audit-log task when `advance_turn()` returns `true` and an audit
    /// logger is configured), advances the shadow sentinel if present, publishes the current
    /// risk level to the shared risk slot, and forwards any pending trajectory alert to
    /// `status_tx`.
1355    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
        // The turn id is the counter value before the increment.
1356        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1357        self.runtime.debug.iteration_counter += 1;
1358        let cancel_token = CancellationToken::new();
1359        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1360        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1361        self.services.security.user_provided_urls.write().clear();
1362        // Reset per-turn LLM request counter for the notification gate.
1363        self.runtime.lifecycle.turn_llm_requests = 0;
1364
1365        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
1366        {
            // Take the queued codes while holding the lock; the guard is released at the end
            // of the inner block, before the signals are recorded.
1367            let pending: Vec<u8> = {
1368                let mut q = self.services.security.trajectory_signal_queue.lock();
1369                std::mem::take(&mut *q)
1370            };
1371            for code in pending {
1372                self.services
1373                    .security
1374                    .trajectory
1375                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
1376            }
1377        }
1378        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
1379        // F5: write auto-recover audit entry when sentinel hard-resets.
1380        if self.services.security.trajectory.advance_turn()
1381            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1382        {
            // Synthetic entry: no real tool ran, so the tool name is a placeholder and the
            // event is identified by `error_category` / `error_domain`.
1383            let entry = zeph_tools::AuditEntry {
1384                timestamp: zeph_tools::chrono_now(),
1385                tool: "<sentinel>".to_owned().into(),
1386                command: String::new(),
1387                result: zeph_tools::AuditResult::Success,
1388                duration_ms: 0,
1389                error_category: Some("trajectory_auto_recover".to_owned()),
1390                error_domain: Some("security".to_owned()),
1391                error_phase: None,
1392                claim_source: None,
1393                mcp_server_id: None,
1394                injection_flagged: false,
1395                embedding_anomalous: false,
1396                cross_boundary_mcp_to_acp: false,
1397                adversarial_policy_decision: None,
1398                exit_code: None,
1399                truncated: false,
1400                caller_id: None,
1401                policy_match: None,
1402                correlation_id: None,
1403                vigil_risk: None,
1404                execution_env: None,
1405                resolved_cwd: None,
1406                scope_at_definition: None,
1407                scope_at_dispatch: None,
1408            };
            // Write the entry from a supervised telemetry task so this method stays synchronous.
1409            self.runtime.lifecycle.supervisor.spawn(
1410                crate::agent::agent_supervisor::TaskClass::Telemetry,
1411                "trajectory-auto-recover-audit",
1412                async move { logger.log(&entry).await },
1413            );
1414        }
1415        // Spec 050 Phase 2: reset per-turn probe counter before any tool dispatch.
1416        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1417            sentinel.advance_turn();
1418        }
1419        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
1420        let risk_level = self.services.security.trajectory.current_risk();
1421        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1422        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
1423        if let Some(alert) = self.services.security.trajectory.poll_alert() {
1424            let msg = format!(
1425                "[trajectory] Risk level: {:?} (score={:.2})",
1426                alert.level, alert.score
1427            );
1428            tracing::warn!(
1429                level = ?alert.level,
1430                score = alert.score,
1431                "trajectory sentinel alert"
1432            );
            // The status message is best effort: a failed send is ignored.
1433            if let Some(ref tx) = self.services.session.status_tx {
1434                let _ = tx.send(msg);
1435            }
1436        }
1437
        // The turn carries the fresh cancellation token and the configured timeouts.
1438        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1439        turn::Turn::new(context, input)
1440    }
1441
1442    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1443    ///
1444    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1445    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1446    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1447    /// is populated from it here so the rest of the pipeline is unchanged.
1448    fn end_turn(&mut self, turn: turn::Turn) {
1449        self.runtime.metrics.pending_timings = turn.metrics.timings;
1450        self.flush_turn_timings();
1451        // Clear per-turn intent (FR-008): must not persist across turns.
1452        self.services.session.current_turn_intent = None;
1453        // Clear guest context flag: each turn is independently classified.
1454        self.services.session.is_guest_context = false;
1455        // Cancel all in-flight speculative handles at turn boundary.
1456        if let Some(ref engine) = self.services.speculation_engine {
1457            let metrics = engine.end_turn();
1458            if metrics.committed > 0 || metrics.cancelled > 0 {
1459                tracing::debug!(
1460                    committed = metrics.committed,
1461                    cancelled = metrics.cancelled,
1462                    wasted_ms = metrics.wasted_ms,
1463                    "speculation: turn boundary metrics"
1464                );
1465            }
1466        }
1467    }
1468
1469    #[tracing::instrument(
1470        name = "core.agent.process_user_message",
1471        skip_all,
1472        level = "debug",
1473        fields(turn_id),
1474        err
1475    )]
1476    async fn process_user_message(
1477        &mut self,
1478        text: String,
1479        image_parts: Vec<zeph_llm::provider::MessagePart>,
1480    ) -> Result<(), error::AgentError> {
1481        let input = turn::TurnInput::new(text, image_parts);
1482        let mut t = self.begin_turn(input);
1483
1484        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1485        tracing::Span::current().record("turn_id", t.id().0);
1486        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1487        self.runtime
1488            .debug
1489            .start_iteration_span(turn_idx, t.input.text.trim());
1490
1491        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1492
1493        // Close iteration span regardless of outcome (partial trace preserved on error).
1494        let span_status = if result.is_ok() {
1495            crate::debug_dump::trace::SpanStatus::Ok
1496        } else {
1497            crate::debug_dump::trace::SpanStatus::Error {
1498                message: "iteration failed".to_owned(),
1499            }
1500        };
1501        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1502
1503        self.end_turn(t);
1504        result
1505    }
1506
1507    #[tracing::instrument(
1508        name = "core.agent.process_user_message_inner",
1509        skip_all,
1510        level = "debug",
1511        err
1512    )]
1513    async fn process_user_message_inner(
1514        &mut self,
1515        turn: &mut turn::Turn,
1516    ) -> Result<(), error::AgentError> {
1517        self.reap_background_tasks_and_update_metrics();
1518
1519        let tokens_before_turn = self
1520            .runtime
1521            .metrics
1522            .metrics_tx
1523            .as_ref()
1524            .map_or(0, |tx| tx.borrow().total_tokens);
1525
1526        // Drain any background shell completions that arrived since the last turn.
1527        // They are buffered in `pending_background_completions` and merged with the
1528        // real user message into a single user-role block below (N1 invariant).
1529        self.drain_background_completions();
1530
1531        self.wire_cancel_bridge(turn.cancel_token());
1532
1533        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1534        let text = turn.input.text.clone();
1535        let trimmed_owned = text.trim().to_owned();
1536        let trimmed = trimmed_owned.as_str();
1537
1538        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1539        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1540        if self.services.security.vigil.is_some() {
1541            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1542            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1543        }
1544
1545        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1546            return result;
1547        }
1548
1549        self.check_pending_rollbacks().await;
1550
1551        if self.pre_process_security(trimmed).await? {
1552            return Ok(());
1553        }
1554
1555        let t_ctx = std::time::Instant::now();
1556        tracing::debug!("turn timing: prepare_context start");
1557        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1558        turn.metrics_mut().timings.prepare_context_ms =
1559            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1560        tracing::debug!(
1561            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1562            "turn timing: prepare_context done"
1563        );
1564
1565        let image_parts = std::mem::take(&mut turn.input.image_parts);
1566        // Prepend any background completion blocks to the user text. All completions and the
1567        // user message MUST be merged into a single user-role block to satisfy the strict
1568        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1569        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1570        let user_msg = self.build_user_message(&merged_text, image_parts);
1571
1572        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1573        // URL set was cleared in begin_turn; re-populate for this turn.
1574        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1575        if !urls.is_empty() {
1576            self.services
1577                .security
1578                .user_provided_urls
1579                .write()
1580                .extend(urls);
1581        }
1582
1583        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1584        // Derived from the raw input text before context assembly to avoid timing dependencies.
1585        self.services.memory.extraction.goal_text = Some(text.clone());
1586
1587        let t_persist = std::time::Instant::now();
1588        tracing::debug!("turn timing: persist_message(user) start");
1589        // Image parts intentionally excluded — base64 payloads too large for message history.
1590        self.persist_message(Role::User, &text, &[], false).await;
1591        turn.metrics_mut().timings.persist_message_ms =
1592            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1593        tracing::debug!(
1594            ms = turn.metrics_snapshot().timings.persist_message_ms,
1595            "turn timing: persist_message(user) done"
1596        );
1597        self.push_message(user_msg);
1598
1599        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1600        // handle_native_tool_calls respectively via metrics.pending_timings.
1601        tracing::debug!("turn timing: process_response start");
1602        let turn_had_error = if let Err(e) = self.process_response().await {
1603            // Detach any in-flight learning tasks before mutating message state.
1604            self.services.learning_engine.learning_tasks.detach_all();
1605            tracing::error!("Response processing failed: {e:#}");
1606
1607            // Record provider failure timestamp so the next turn can skip
1608            // expensive context preparation while providers are known-down.
1609            if e.is_no_providers() {
1610                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1611                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1612                tracing::warn!(
1613                    backoff_secs,
1614                    "no providers available; backing off before next turn"
1615                );
1616                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1617            }
1618
1619            let user_msg = format!("Error: {e:#}");
1620            self.channel.send(&user_msg).await?;
1621            self.msg.messages.pop();
1622            self.recompute_prompt_tokens();
1623            self.channel.flush_chunks().await?;
1624            true
1625        } else {
1626            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1627            // leak into the next turn's context.
1628            self.services.learning_engine.learning_tasks.detach_all();
1629            self.truncate_old_tool_results();
1630            // MagicDocs: spawn background doc updates if any are due (#2702).
1631            self.maybe_update_magic_docs();
1632            // Compression spectrum: fire-and-forget promotion scan (#3305).
1633            self.maybe_spawn_promotion_scan();
1634            false
1635        };
1636        tracing::debug!("turn timing: process_response done");
1637
1638        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1639        if let Some(pipeline) = self.services.quality.clone() {
1640            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1641        }
1642        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1643        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1644        // When self-check appends a flag_marker chunk, this single call covers both
1645        // the main response and the marker, preventing the double response_end of #3243.
1646        let _ = self.channel.flush_chunks().await;
1647
1648        self.maybe_fire_completion_notification(turn, turn_had_error);
1649
1650        self.flush_goal_accounting(tokens_before_turn);
1651
1652        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1653        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1654        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1655        // we harvest those values into Turn before end_turn overwrites pending_timings.
1656        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1657        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1658
1659        Ok(())
1660    }
1661
1662    /// Wire the per-turn cancellation token into the cancel bridge.
1663    ///
1664    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1665    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1666    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1667    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1668        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1669        let token = turn_token.clone();
1670        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1671        self.runtime.lifecycle.cancel_token = turn_token.clone();
1672        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1673            prev.abort();
1674        }
1675        self.runtime.lifecycle.cancel_bridge_handle =
1676            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1677                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1678                move || async move {
1679                    signal.notified().await;
1680                    token.cancel();
1681                },
1682            ));
1683    }
1684
1685    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1686    ///
1687    /// Called at the top of each turn, before any user message processing.
1688    fn reap_background_tasks_and_update_metrics(&mut self) {
1689        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1690        if bg_signal.did_summarize {
1691            self.services.memory.persistence.unsummarized_count = 0;
1692            tracing::debug!("background summarization completed; unsummarized_count reset");
1693        }
1694        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1695        self.update_metrics(|m| {
1696            m.bg_inflight = snap.inflight as u64;
1697            m.bg_dropped = snap.total_dropped();
1698            m.bg_completed = snap.total_completed();
1699            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1700            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1701        });
1702
1703        // Update shell background run rows for TUI panel.
1704        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1705            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1706                .runtime
1707                .lifecycle
1708                .shell_executor_handle
1709                .as_ref()
1710                .map(|e| e.background_runs_snapshot())
1711                .unwrap_or_default()
1712                .into_iter()
1713                .map(|s| crate::metrics::ShellBackgroundRunRow {
1714                    run_id: truncate_shell_run_id(&s.run_id),
1715                    command: truncate_shell_command(&s.command),
1716                    elapsed_secs: s.elapsed_ms / 1000,
1717                })
1718                .collect();
1719            self.update_metrics(|m| {
1720                m.shell_background_runs = shell_rows;
1721            });
1722        }
1723
1724        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1725        // accounted in the snapshot above.
1726        if self
1727            .runtime
1728            .config
1729            .supervisor_config
1730            .abort_enrichment_on_turn
1731        {
1732            self.runtime
1733                .lifecycle
1734                .supervisor
1735                .abort_class(agent_supervisor::TaskClass::Enrichment);
1736        }
1737    }
1738
1739    /// Fire completion notifications and `turn_complete` hooks after each turn.
1740    ///
1741    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1742    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1743    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1744    /// env vars carry no raw assistant output.
1745    ///
1746    /// Gating:
1747    /// - When a `Notifier` is configured, both the notifier and hooks share its
1748    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1749    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1750    ///   notifier path is simply skipped).
1751    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1752        let snap = turn.metrics_snapshot().timings.clone();
1753        let duration_ms = snap
1754            .prepare_context_ms
1755            .saturating_add(snap.llm_chat_ms)
1756            .saturating_add(snap.tool_exec_ms);
1757        let summary = crate::notifications::TurnSummary {
1758            duration_ms,
1759            preview: self.last_assistant_preview(160),
1760            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1761            tool_calls: 0,
1762            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1763            exit_status: if is_error {
1764                crate::notifications::TurnExitStatus::Error
1765            } else {
1766                crate::notifications::TurnExitStatus::Success
1767            },
1768        };
1769
1770        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1771        let gate_ok = self
1772            .runtime
1773            .lifecycle
1774            .notifier
1775            .as_ref()
1776            .is_none_or(|n| n.should_fire(&summary));
1777
1778        // 1) Existing notifier path — unchanged semantics.
1779        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1780            && gate_ok
1781        {
1782            notifier.fire(&summary);
1783        }
1784
1785        // 2) turn_complete hooks — fire-and-forget via supervisor.
1786        // McpManagerDispatch wraps Arc<McpManager> and is 'static, so it can be moved
1787        // into the async block satisfying tokio::spawn's bound. The &dyn McpDispatch
1788        // borrow is created inside the future from the owned dispatch value.
1789        let hooks = self.services.session.hooks_config.turn_complete.clone();
1790        if !hooks.is_empty() && gate_ok {
1791            let mut env = std::collections::HashMap::new();
1792            env.insert(
1793                "ZEPH_TURN_DURATION_MS".to_owned(),
1794                summary.duration_ms.to_string(),
1795            );
1796            env.insert(
1797                "ZEPH_TURN_STATUS".to_owned(),
1798                if is_error { "error" } else { "success" }.to_owned(),
1799            );
1800            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1801            env.insert(
1802                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1803                summary.llm_requests.to_string(),
1804            );
1805            let dispatch = self.mcp_dispatch();
1806            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1807            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1808                agent_supervisor::TaskClass::Telemetry,
1809                "turn-complete-hooks",
1810                async move {
1811                    let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1812                        .as_ref()
1813                        .map(|d| d as &dyn zeph_subagent::McpDispatch);
1814                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
1815                        tracing::warn!(error = %e, "turn_complete hook failed");
1816                    }
1817                },
1818            );
1819        }
1820    }
1821
1822    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1823    /// accounting as a tracked background task.
1824    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1825        let goal_snap = self
1826            .services
1827            .goal_accounting
1828            .as_ref()
1829            .and_then(|a| a.snapshot());
1830        self.update_metrics(|m| m.active_goal = goal_snap);
1831
1832        if let Some(ref accounting) = self.services.goal_accounting {
1833            let tokens_after = self
1834                .runtime
1835                .metrics
1836                .metrics_tx
1837                .as_ref()
1838                .map_or(0, |tx| tx.borrow().total_tokens);
1839            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1840            let mut spawned: Option<
1841                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1842            > = None;
1843            accounting.on_turn_complete(turn_tokens, |fut| {
1844                spawned = Some(fut);
1845            });
1846            if let Some(fut) = spawned {
1847                let _ = self.runtime.lifecycle.supervisor.spawn(
1848                    agent_supervisor::TaskClass::Telemetry,
1849                    "goal-accounting",
1850                    fut,
1851                );
1852            }
1853        }
1854    }
1855
1856    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1857    #[tracing::instrument(
1858        name = "core.agent.pre_process_security",
1859        skip_all,
1860        level = "debug",
1861        err
1862    )]
1863    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1864        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1865        if let Some(ref guardrail) = self.services.security.guardrail {
1866            use zeph_sanitizer::guardrail::GuardrailVerdict;
1867            let verdict = guardrail.check(trimmed).await;
1868            match &verdict {
1869                GuardrailVerdict::Flagged { reason, .. } => {
1870                    tracing::warn!(
1871                        reason = %reason,
1872                        should_block = verdict.should_block(),
1873                        "guardrail flagged user input"
1874                    );
1875                    if verdict.should_block() {
1876                        let msg = format!("[guardrail] Input blocked: {reason}");
1877                        let _ = self.channel.send(&msg).await;
1878                        let _ = self.channel.flush_chunks().await;
1879                        return Ok(true);
1880                    }
1881                    // Warn mode: notify but continue.
1882                    let _ = self
1883                        .channel
1884                        .send(&format!("[guardrail] Warning: {reason}"))
1885                        .await;
1886                }
1887                GuardrailVerdict::Error { error } => {
1888                    if guardrail.error_should_block() {
1889                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1890                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1891                        let _ = self.channel.send(msg).await;
1892                        let _ = self.channel.flush_chunks().await;
1893                        return Ok(true);
1894                    }
1895                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1896                }
1897                GuardrailVerdict::Safe => {}
1898            }
1899        }
1900
1901        // ML classifier: lightweight injection detection on user input boundary.
1902        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1903        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1904        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1905        // direct user chat. Disabled by default to prevent false positives on benign messages.
1906        #[cfg(feature = "classifiers")]
1907        if self.services.security.sanitizer.scan_user_input() {
1908            match self
1909                .services
1910                .security
1911                .sanitizer
1912                .classify_injection(trimmed)
1913                .await
1914            {
1915                zeph_sanitizer::InjectionVerdict::Blocked => {
1916                    self.push_classifier_metrics();
1917                    let _ = self
1918                        .channel
1919                        .send("[security] Input blocked: injection detected by classifier.")
1920                        .await;
1921                    let _ = self.channel.flush_chunks().await;
1922                    return Ok(true);
1923                }
1924                zeph_sanitizer::InjectionVerdict::Suspicious => {
1925                    tracing::warn!("injection_classifier soft_signal on user input");
1926                }
1927                zeph_sanitizer::InjectionVerdict::Clean => {}
1928            }
1929        }
1930        #[cfg(feature = "classifiers")]
1931        self.push_classifier_metrics();
1932
1933        Ok(false)
1934    }
1935
1936    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1937    ///
1938    /// Skips context preparation entirely when providers failed on the previous turn and the
1939    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1940    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1941    /// are rate-limited or unavailable (#3357).
1942    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1943        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1944        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1945
1946        // Skip expensive memory recall / embedding when providers are known-down.
1947        let providers_recently_failed = self
1948            .runtime
1949            .lifecycle
1950            .last_no_providers_at
1951            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1952
1953        if providers_recently_failed {
1954            tracing::warn!(
1955                backoff_secs,
1956                "skipping context preparation: providers were unavailable on last turn"
1957            );
1958            return;
1959        }
1960
1961        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1962        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1963        {
1964            Ok(()) => {}
1965            Err(_elapsed) => {
1966                tracing::warn!(
1967                    timeout_secs = prep_timeout_secs,
1968                    "context preparation timed out; proceeding with degraded context"
1969                );
1970            }
1971        }
1972    }
1973
1974    #[tracing::instrument(
1975        name = "core.agent.advance_context_lifecycle",
1976        skip_all,
1977        level = "debug"
1978    )]
1979    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1980        // Reset per-message pruning cache at the start of each turn (#2298).
1981        self.services.mcp.pruning_cache.reset();
1982
1983        // Extract before rebuild_system_prompt so the value is not tainted
1984        // by the secrets-bearing system prompt (ConversationId is just an i64).
1985        let conv_id = self.services.memory.persistence.conversation_id;
1986        self.rebuild_system_prompt(text).await;
1987
1988        self.detect_and_record_corrections(trimmed, conv_id).await;
1989        self.services.learning_engine.tick();
1990        self.analyze_and_learn().await;
1991        self.sync_graph_counts().await;
1992
1993        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1994        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1995        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1996        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1997        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1998
1999        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
2000        {
2001            self.services.focus.tick();
2002
2003            // SideQuest eviction: runs every N user turns when enabled.
2004            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
2005            let sidequest_should_fire = self.services.sidequest.tick();
2006            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
2007                self.maybe_sidequest_eviction();
2008            }
2009        }
2010
2011        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
2012        // gated on graph + experience config, and only when both stores are attached.
2013        {
2014            let cfg = &self.services.memory.extraction.graph_config.experience;
2015            if cfg.enabled
2016                && cfg.evolution_sweep_enabled
2017                && cfg.evolution_sweep_interval > 0
2018                && self
2019                    .services
2020                    .sidequest
2021                    .turn_counter
2022                    .checked_rem(cfg.evolution_sweep_interval as u64)
2023                    == Some(0)
2024                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2025                && let (Some(exp), Some(graph)) =
2026                    (memory.experience.as_ref(), memory.graph_store.as_ref())
2027            {
2028                let exp = std::sync::Arc::clone(exp);
2029                let graph = std::sync::Arc::clone(graph);
2030                let threshold = cfg.confidence_prune_threshold;
2031                let turn = self.services.sidequest.turn_counter;
2032                let accepted = self.runtime.lifecycle.supervisor.spawn(
2033                    agent_supervisor::TaskClass::Telemetry,
2034                    "experience-sweep",
2035                    async move {
2036                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
2037                            Ok(stats) => tracing::info!(
2038                                turn,
2039                                self_loops = stats.pruned_self_loops,
2040                                low_confidence = stats.pruned_low_confidence,
2041                                "evolution sweep complete",
2042                            ),
2043                            Err(e) => tracing::warn!(
2044                                turn,
2045                                error = %e,
2046                                "evolution sweep failed",
2047                            ),
2048                        }
2049                    },
2050                );
2051                if !accepted {
2052                    tracing::warn!(
2053                        turn = self.services.sidequest.turn_counter,
2054                        "experience-sweep dropped (telemetry class at capacity)",
2055                    );
2056                }
2057            }
2058        }
2059
2060        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
2061        if let Some(warning) = self.cache_expiry_warning() {
2062            tracing::info!(warning, "cache expiry warning");
2063            let _ = self.channel.send_status(&warning).await;
2064        }
2065
2066        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
2067        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
2068        self.maybe_time_based_microcompact();
2069
2070        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2071        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2072        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2073        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2074        self.maybe_apply_deferred_summaries();
2075        self.flush_deferred_summaries().await;
2076
2077        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2078        if let Err(e) = self.maybe_proactive_compress().await {
2079            tracing::warn!("proactive compression failed: {e:#}");
2080        }
2081
2082        if let Err(e) = self.maybe_compact().await {
2083            tracing::warn!("context compaction failed: {e:#}");
2084        }
2085
2086        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2087            tracing::warn!("context preparation failed: {e:#}");
2088        }
2089
2090        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
2091        self.provider
2092            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2093
2094        self.services.learning_engine.reset_reflection();
2095    }
2096
2097    fn build_user_message(
2098        &mut self,
2099        text: &str,
2100        image_parts: Vec<zeph_llm::provider::MessagePart>,
2101    ) -> Message {
2102        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2103        all_image_parts.extend(image_parts);
2104
2105        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2106            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2107                text: text.to_owned(),
2108            }];
2109            parts.extend(all_image_parts);
2110            Message::from_parts(Role::User, parts)
2111        } else {
2112            if !all_image_parts.is_empty() {
2113                tracing::warn!(
2114                    count = all_image_parts.len(),
2115                    "image attachments dropped: provider does not support vision"
2116                );
2117            }
2118            Message {
2119                role: Role::User,
2120                content: text.to_owned(),
2121                parts: vec![],
2122                metadata: MessageMetadata::default(),
2123            }
2124        }
2125    }
2126
2127    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2128    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2129    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2130    fn drain_background_completions(&mut self) {
2131        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2132
2133        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2134            return;
2135        };
2136        // Non-blocking drain: collect all completions that are already ready.
2137        while let Ok(completion) = rx.try_recv() {
2138            if self.runtime.lifecycle.pending_background_completions.len()
2139                >= BACKGROUND_COMPLETION_BUFFER_CAP
2140            {
2141                tracing::warn!(
2142                    run_id = %completion.run_id,
2143                    "background completion buffer full; dropping run result"
2144                );
2145                // Buffer is full: drop the oldest queued completion and push a sentinel
2146                // for the new (incoming) run so the LLM is informed its result was lost.
2147                self.runtime
2148                    .lifecycle
2149                    .pending_background_completions
2150                    .pop_front();
2151                self.runtime
2152                    .lifecycle
2153                    .pending_background_completions
2154                    .push_back(zeph_tools::BackgroundCompletion {
2155                        run_id: completion.run_id,
2156                        exit_code: -1,
2157                        success: false,
2158                        elapsed_ms: 0,
2159                        command: completion.command,
2160                        output: format!(
2161                            "[background result for run {} dropped: buffer overflow]",
2162                            completion.run_id
2163                        ),
2164                    });
2165            } else {
2166                self.runtime
2167                    .lifecycle
2168                    .pending_background_completions
2169                    .push_back(completion);
2170            }
2171        }
2172    }
2173
2174    /// Format and drain `pending_background_completions` into a prefix string, then
2175    /// return the final merged text (prefix + user message). When there are no pending
2176    /// completions the original text is returned unchanged.
2177    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2178        if self
2179            .runtime
2180            .lifecycle
2181            .pending_background_completions
2182            .is_empty()
2183        {
2184            return user_text.to_owned();
2185        }
2186        let mut parts = String::new();
2187        for completion in self
2188            .runtime
2189            .lifecycle
2190            .pending_background_completions
2191            .drain(..)
2192        {
2193            let _ = write!(
2194                parts,
2195                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2196                completion.run_id,
2197                completion.exit_code,
2198                completion.success,
2199                completion.elapsed_ms,
2200                completion.command,
2201                completion.output,
2202            );
2203        }
2204        parts.push_str(user_text);
2205        parts
2206    }
2207
2208    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
2209    /// channel. Returns a human-readable status string and success flag suitable for
2210    /// sending to the user and emitting lifecycle events.
2211    async fn poll_subagent_until_done(
2212        &mut self,
2213        task_id: &str,
2214        label: &str,
2215    ) -> Option<(String, bool)> {
2216        use zeph_subagent::SubAgentState;
2217        let result = loop {
2218            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2219
2220            // Bridge secret requests from sub-agent to channel.confirm().
2221            // Fetch the pending request first, then release the borrow before
2222            // calling channel.confirm() (which requires &mut self).
2223            #[allow(clippy::redundant_closure_for_method_calls)]
2224            let pending = self
2225                .services
2226                .orchestration
2227                .subagent_manager
2228                .as_mut()
2229                .and_then(|m| m.try_recv_secret_request());
2230            if let Some((req_task_id, req)) = pending {
2231                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2232                // (SEC-P1-02), so it is safe to embed in the prompt string.
2233                let confirm_prompt = format!(
2234                    "Sub-agent requests secret '{}'. Allow?",
2235                    crate::text::truncate_to_chars(&req.secret_key, 100)
2236                );
2237                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2238                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2239                    if approved {
2240                        let ttl = std::time::Duration::from_mins(5);
2241                        let key = req.secret_key.clone();
2242                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2243                            let _ = mgr.deliver_secret(&req_task_id, key);
2244                        }
2245                    } else {
2246                        let _ = mgr.deny_secret(&req_task_id);
2247                    }
2248                }
2249            }
2250
2251            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2252            let statuses = mgr.statuses();
2253            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2254                break (format!("{label} completed (no status available)."), true);
2255            };
2256            match status.state {
2257                SubAgentState::Completed => {
2258                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2259                    break (format!("{label} completed: {msg}"), true);
2260                }
2261                SubAgentState::Failed => {
2262                    let msg = status
2263                        .last_message
2264                        .clone()
2265                        .unwrap_or_else(|| "unknown error".into());
2266                    break (format!("{label} failed: {msg}"), false);
2267                }
2268                SubAgentState::Canceled => {
2269                    break (format!("{label} was cancelled."), false);
2270                }
2271                _ => {
2272                    let _ = self
2273                        .channel
2274                        .send_status(&format!(
2275                            "{label}: turn {}/{}",
2276                            status.turns_used,
2277                            self.services
2278                                .orchestration
2279                                .subagent_manager
2280                                .as_ref()
2281                                .and_then(|m| m.agents_def(task_id))
2282                                .map_or(20, |d| d.permissions.max_turns)
2283                        ))
2284                        .await;
2285                }
2286            }
2287        };
2288        Some(result)
2289    }
2290
2291    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2292    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2293    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2294        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2295        let full_ids: Vec<String> = mgr
2296            .statuses()
2297            .into_iter()
2298            .map(|(tid, _)| tid)
2299            .filter(|tid| tid.starts_with(prefix))
2300            .collect();
2301        Some(match full_ids.as_slice() {
2302            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2303            [fid] => Ok(fid.clone()),
2304            _ => Err(format!(
2305                "Ambiguous id prefix '{prefix}': matches {} agents",
2306                full_ids.len()
2307            )),
2308        })
2309    }
2310
2311    fn handle_agent_list(&self) -> Option<String> {
2312        use std::fmt::Write as _;
2313        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2314        let defs = mgr.definitions();
2315        if defs.is_empty() {
2316            return Some("No sub-agent definitions found.".into());
2317        }
2318        let mut out = String::from("Available sub-agents:\n");
2319        for d in defs {
2320            let memory_label = match d.memory {
2321                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2322                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2323                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2324                None => "",
2325            };
2326            if let Some(ref src) = d.source {
2327                let _ = writeln!(
2328                    out,
2329                    "  {}{} — {} ({})",
2330                    d.name, memory_label, d.description, src
2331                );
2332            } else {
2333                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2334            }
2335        }
2336        Some(out)
2337    }
2338
2339    fn handle_agent_status(&self) -> Option<String> {
2340        use std::fmt::Write as _;
2341        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2342        let statuses = mgr.statuses();
2343        if statuses.is_empty() {
2344            return Some("No active sub-agents.".into());
2345        }
2346        let mut out = String::from("Active sub-agents:\n");
2347        for (id, s) in &statuses {
2348            let state = format!("{:?}", s.state).to_lowercase();
2349            let elapsed = s.started_at.elapsed().as_secs();
2350            let _ = writeln!(
2351                out,
2352                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2353                short = &id[..8.min(id.len())],
2354                t = s.turns_used,
2355                msg = s.last_message.as_deref().unwrap_or(""),
2356            );
2357            // Show memory directory path for agents with memory enabled.
2358            if let Some(def) = mgr.agents_def(id)
2359                && let Some(scope) = def.memory
2360                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2361            {
2362                let _ = writeln!(out, "       memory: {}", dir.display());
2363            }
2364        }
2365        Some(out)
2366    }
2367
2368    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2369        let full_id = match self.resolve_agent_id_prefix(id)? {
2370            Ok(fid) => fid,
2371            Err(msg) => return Some(msg),
2372        };
2373        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2374        if let Some((tid, req)) = mgr.try_recv_secret_request()
2375            && tid == full_id
2376        {
2377            let key = req.secret_key.clone();
2378            let ttl = std::time::Duration::from_mins(5);
2379            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2380                return Some(format!("Approve failed: {e}"));
2381            }
2382            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2383                return Some(format!("Secret delivery failed: {e}"));
2384            }
2385            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2386        }
2387        Some(format!(
2388            "No pending secret request for sub-agent '{full_id}'."
2389        ))
2390    }
2391
2392    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2393        let full_id = match self.resolve_agent_id_prefix(id)? {
2394            Ok(fid) => fid,
2395            Err(msg) => return Some(msg),
2396        };
2397        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2398        match mgr.deny_secret(&full_id) {
2399            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2400            Err(e) => Some(format!("Deny failed: {e}")),
2401        }
2402    }
2403
2404    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2405        use zeph_subagent::AgentCommand;
2406
2407        match cmd {
2408            AgentCommand::List => self.handle_agent_list(),
2409            AgentCommand::Background { name, prompt } => {
2410                self.handle_agent_background(&name, &prompt)
2411            }
2412            AgentCommand::Spawn { name, prompt }
2413            | AgentCommand::Mention {
2414                agent: name,
2415                prompt,
2416            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2417            AgentCommand::Status => self.handle_agent_status(),
2418            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2419            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2420            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2421            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2422        }
2423    }
2424
2425    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2426        let provider = self.provider.clone();
2427        let tool_executor = Arc::clone(&self.tool_executor);
2428        let skills = self.filtered_skills_for(name);
2429        let cfg = self.services.orchestration.subagent_config.clone();
2430        let spawn_ctx = self.build_spawn_context(&cfg);
2431        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2432        match mgr.spawn(
2433            name,
2434            prompt,
2435            provider,
2436            tool_executor,
2437            skills,
2438            &cfg,
2439            spawn_ctx,
2440        ) {
2441            Ok(id) => Some(format!(
2442                "Sub-agent '{name}' started in background (id: {short})",
2443                short = &id[..8.min(id.len())]
2444            )),
2445            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2446        }
2447    }
2448
2449    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2450        let provider = self.provider.clone();
2451        let tool_executor = Arc::clone(&self.tool_executor);
2452        let skills = self.filtered_skills_for(name);
2453        let cfg = self.services.orchestration.subagent_config.clone();
2454        let spawn_ctx = self.build_spawn_context(&cfg);
2455        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2456        let task_id = match mgr.spawn(
2457            name,
2458            prompt,
2459            provider,
2460            tool_executor,
2461            skills,
2462            &cfg,
2463            spawn_ctx,
2464        ) {
2465            Ok(id) => id,
2466            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2467        };
2468        let short = task_id[..8.min(task_id.len())].to_owned();
2469        let _ = self
2470            .channel
2471            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2472            .await;
2473        let _ = self
2474            .channel
2475            .notify_foreground_subagent_started(&task_id, name)
2476            .await;
2477        let label = format!("Sub-agent '{name}'");
2478        let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2479            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2480            let _ = self
2481                .channel
2482                .notify_foreground_subagent_completed(&task_id, name, false)
2483                .await;
2484            return None;
2485        };
2486        let _ = self
2487            .channel
2488            .notify_foreground_subagent_completed(&task_id, name, success)
2489            .await;
2490        Some(result)
2491    }
2492
2493    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2494        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2495        // Accept prefix match on task_id.
2496        let ids: Vec<String> = mgr
2497            .statuses()
2498            .into_iter()
2499            .map(|(task_id, _)| task_id)
2500            .filter(|task_id| task_id.starts_with(id))
2501            .collect();
2502        match ids.as_slice() {
2503            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2504            [full_id] => {
2505                let full_id = full_id.clone();
2506                match mgr.cancel(&full_id) {
2507                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2508                    Err(e) => Some(format!("Cancel failed: {e}")),
2509                }
2510            }
2511            _ => Some(format!(
2512                "Ambiguous id prefix '{id}': matches {} agents",
2513                ids.len()
2514            )),
2515        }
2516    }
2517
2518    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2519        let cfg = self.services.orchestration.subagent_config.clone();
2520        // Resolve definition name from transcript meta before spawning so we can
2521        // look up skills by definition name rather than the UUID prefix (S1 fix).
2522        let def_name = {
2523            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2524            match mgr.def_name_for_resume(id, &cfg) {
2525                Ok(name) => name,
2526                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2527            }
2528        };
2529        let skills = self.filtered_skills_for(&def_name);
2530        let provider = self.provider.clone();
2531        let tool_executor = Arc::clone(&self.tool_executor);
2532        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2533        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2534            Ok(pair) => pair,
2535            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2536        };
2537        let short = task_id[..8.min(task_id.len())].to_owned();
2538        let _ = self
2539            .channel
2540            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2541            .await;
2542        let _ = self
2543            .channel
2544            .notify_foreground_subagent_started(&task_id, &def_name)
2545            .await;
2546        let Some((result, success)) = self
2547            .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2548            .await
2549        else {
2550            // Manager was dropped mid-poll; emit completed(false) so TUI does not stay stuck.
2551            let _ = self
2552                .channel
2553                .notify_foreground_subagent_completed(&task_id, &def_name, false)
2554                .await;
2555            return None;
2556        };
2557        let _ = self
2558            .channel
2559            .notify_foreground_subagent_completed(&task_id, &def_name, success)
2560            .await;
2561        Some(result)
2562    }
2563
2564    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2565        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2566        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2567        let reg = self.services.skill.registry.read();
2568        match zeph_subagent::filter_skills(&reg, &def.skills) {
2569            Ok(skills) => {
2570                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2571                if bodies.is_empty() {
2572                    None
2573                } else {
2574                    Some(bodies)
2575                }
2576            }
2577            Err(e) => {
2578                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2579                None
2580            }
2581        }
2582    }
2583
2584    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2585    fn build_spawn_context(
2586        &self,
2587        cfg: &zeph_config::SubAgentConfig,
2588    ) -> zeph_subagent::SpawnContext {
2589        zeph_subagent::SpawnContext {
2590            parent_messages: self.extract_parent_messages(cfg),
2591            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2592            parent_provider_name: {
2593                let name = &self.runtime.config.active_provider_name;
2594                if name.is_empty() {
2595                    None
2596                } else {
2597                    Some(name.clone())
2598                }
2599            },
2600            spawn_depth: self.runtime.config.spawn_depth,
2601            mcp_tool_names: self.extract_mcp_tool_names(),
2602            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2603            seed_trajectory_score: {
2604                let child = self.services.security.trajectory.spawn_child();
2605                let score = child.score_now();
2606                if score > 0.0 { Some(score) } else { None }
2607            },
2608        }
2609    }
2610
2611    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2612    ///
2613    /// Filters system messages, takes last `context_window_turns * 2` messages,
2614    /// applies a 25% context window cap using a 4-chars-per-token heuristic, and
2615    /// prunes orphaned `ToolUse`/`ToolResult` pairs at the slice boundary.
2616    fn extract_parent_messages(
2617        &self,
2618        config: &zeph_config::SubAgentConfig,
2619    ) -> Vec<zeph_llm::provider::Message> {
2620        use zeph_llm::provider::Role;
2621        if config.context_window_turns == 0 {
2622            return Vec::new();
2623        }
2624        let non_system: Vec<_> = self
2625            .msg
2626            .messages
2627            .iter()
2628            .filter(|m| m.role != Role::System)
2629            .cloned()
2630            .collect();
2631        let take_count = config.context_window_turns * 2;
2632        let start = non_system.len().saturating_sub(take_count);
2633        let mut msgs = non_system[start..].to_vec();
2634
2635        // Cap at 25% of model context window and prune orphaned tool pairs.
2636        let max_chars = 128_000usize / 4;
2637        let requested = msgs.len();
2638        trim_parent_messages(&mut msgs, max_chars);
2639        if msgs.len() < requested {
2640            tracing::info!(
2641                kept = msgs.len(),
2642                requested,
2643                "[subagent] truncated parent history due to token budget or orphan pruning"
2644            );
2645        }
2646        msgs
2647    }
2648
2649    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2650    fn extract_mcp_tool_names(&self) -> Vec<String> {
2651        self.tool_executor
2652            .tool_definitions_erased()
2653            .into_iter()
2654            .filter(|t| t.id.starts_with("mcp_"))
2655            .map(|t| t.id.to_string())
2656            .collect()
2657    }
2658
2659    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2660    ///
2661    /// Must be called from a blocking context (uses synchronous FS I/O).
2662    fn classify_source_kind(
2663        skill_dir: &std::path::Path,
2664        managed_dir: Option<&std::path::PathBuf>,
2665        bundled_names: &std::collections::HashSet<String>,
2666    ) -> zeph_memory::store::SourceKind {
2667        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2668            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2669            let has_marker = skill_dir.join(".bundled").exists();
2670            if has_marker && bundled_names.contains(skill_name) {
2671                zeph_memory::store::SourceKind::Bundled
2672            } else {
2673                if has_marker {
2674                    tracing::warn!(
2675                        skill = %skill_name,
2676                        "skill has .bundled marker but is not in the bundled skill \
2677                         allowlist — classifying as Hub"
2678                    );
2679                }
2680                zeph_memory::store::SourceKind::Hub
2681            }
2682        } else {
2683            zeph_memory::store::SourceKind::Local
2684        }
2685    }
2686
2687    /// Update trust DB records for all reloaded skills.
2688    async fn update_trust_for_reloaded_skills(
2689        &mut self,
2690        all_meta: &[zeph_skills::loader::SkillMeta],
2691    ) {
2692        // Clone Arc before any .await so no &self fields are held across suspension points.
2693        let memory = self.services.memory.persistence.memory.clone();
2694        let Some(memory) = memory else {
2695            return;
2696        };
2697        let trust_cfg = self.services.skill.trust_config.clone();
2698        let managed_dir = self.services.skill.managed_dir.clone();
2699        let bundled_names: std::collections::HashSet<String> =
2700            zeph_skills::bundled_skill_names().into_iter().collect();
2701        for meta in all_meta {
2702            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2703            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2704            let skill_dir = meta.skill_dir.clone();
2705            let managed_dir_ref = managed_dir.clone();
2706            let bundled_names_ref = bundled_names.clone();
2707            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2708                tokio::task::spawn_blocking(move || {
2709                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2710                    let source_kind = Self::classify_source_kind(
2711                        &skill_dir,
2712                        managed_dir_ref.as_ref(),
2713                        &bundled_names_ref,
2714                    );
2715                    Some((hash, source_kind))
2716                })
2717                .await
2718                .unwrap_or(None);
2719
2720            let Some((current_hash, source_kind)) = fs_result else {
2721                tracing::warn!("failed to compute hash for '{}'", meta.name);
2722                continue;
2723            };
2724            let initial_level = match source_kind {
2725                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2726                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2727                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2728                    &trust_cfg.local_level
2729                }
2730            };
2731            let existing = memory
2732                .sqlite()
2733                .load_skill_trust(&meta.name)
2734                .await
2735                .ok()
2736                .flatten();
2737            let trust_level_str = if let Some(ref row) = existing {
2738                if row.blake3_hash != current_hash {
2739                    trust_cfg.hash_mismatch_level.to_string()
2740                } else if row.source_kind != source_kind {
2741                    // source_kind changed (e.g., hub → bundled on upgrade).
2742                    // Never override an explicit operator block. For active trust levels,
2743                    // adopt the source-kind initial level when it grants more trust.
2744                    let stored = row
2745                        .trust_level
2746                        .parse::<zeph_common::SkillTrustLevel>()
2747                        .unwrap_or_else(|_| {
2748                            tracing::warn!(
2749                                skill = %meta.name,
2750                                raw = %row.trust_level,
2751                                "unrecognised trust_level in DB, treating as quarantined"
2752                            );
2753                            zeph_common::SkillTrustLevel::Quarantined
2754                        });
2755                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2756                        row.trust_level.clone()
2757                    } else {
2758                        initial_level.to_string()
2759                    }
2760                } else {
2761                    row.trust_level.clone()
2762                }
2763            } else {
2764                initial_level.to_string()
2765            };
2766            let source_path = meta.skill_dir.to_str();
2767            if let Err(e) = memory
2768                .sqlite()
2769                .upsert_skill_trust(
2770                    &meta.name,
2771                    &trust_level_str,
2772                    source_kind,
2773                    None,
2774                    source_path,
2775                    &current_hash,
2776                )
2777                .await
2778            {
2779                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2780            }
2781        }
2782    }
2783
2784    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2785    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2786        let provider = self.embedding_provider.clone();
2787        let embed_timeout =
2788            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2789        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2790            let owned = text.to_owned();
2791            let p = provider.clone();
2792            Box::pin(async move {
2793                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2794                    result
2795                } else {
2796                    tracing::warn!(
2797                        timeout_secs = embed_timeout.as_secs(),
2798                        "skill matcher: embedding timed out"
2799                    );
2800                    Err(zeph_llm::LlmError::Timeout)
2801                }
2802            })
2803        };
2804
2805        let needs_inmemory_rebuild = !self
2806            .services
2807            .skill
2808            .matcher
2809            .as_ref()
2810            .is_some_and(SkillMatcherBackend::is_qdrant);
2811
2812        if needs_inmemory_rebuild {
2813            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2814                .await
2815                .map(SkillMatcherBackend::InMemory);
2816        } else if let Some(ref mut backend) = self.services.skill.matcher {
2817            let _ = self.channel.send_status("syncing skill index...").await;
2818            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2819                self.services.session.status_tx.clone().map(
2820                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
2821                        Box::new(move |completed, total| {
2822                            let msg = format!("Syncing skills: {completed}/{total}");
2823                            let _ = tx.send(msg);
2824                        })
2825                    },
2826                );
2827            if let Err(e) = backend
2828                .sync(
2829                    all_meta,
2830                    &self.services.skill.embedding_model,
2831                    embed_fn,
2832                    on_progress,
2833                )
2834                .await
2835            {
2836                tracing::warn!("failed to sync skill embeddings: {e:#}");
2837            }
2838        }
2839
2840        if self.services.skill.hybrid_search {
2841            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2842            let _ = self.channel.send_status("rebuilding search index...").await;
2843            self.services.skill.rebuild_bm25(&descs);
2844        }
2845    }
2846
2847    #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
2848    async fn reload_skills(&mut self) {
2849        let old_fp = self.services.skill.fingerprint();
2850        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2851            let plugin_dirs = supplier();
2852            let mut paths = self.services.skill.skill_paths.clone();
2853            for dir in plugin_dirs {
2854                if !paths.contains(&dir) {
2855                    paths.push(dir);
2856                }
2857            }
2858            paths
2859        } else {
2860            self.services.skill.skill_paths.clone()
2861        };
2862        self.services.skill.registry.write().reload(&reload_paths);
2863        if self.services.skill.fingerprint() == old_fp {
2864            return;
2865        }
2866        let _ = self.channel.send_status("reloading skills...").await;
2867
2868        let all_meta = self
2869            .services
2870            .skill
2871            .registry
2872            .read()
2873            .all_meta()
2874            .into_iter()
2875            .cloned()
2876            .collect::<Vec<_>>();
2877
2878        self.update_trust_for_reloaded_skills(&all_meta).await;
2879
2880        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2881        self.rebuild_skill_matcher(&all_meta_refs).await;
2882
2883        let all_skills: Vec<Skill> = {
2884            let reg = self.services.skill.registry.read();
2885            reg.all_meta()
2886                .iter()
2887                .filter_map(|m| reg.skill(&m.name).ok())
2888                .collect()
2889        };
2890        let trust_map = self.build_skill_trust_map().await;
2891        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2892        let skills_prompt =
2893            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2894        self.services
2895            .skill
2896            .last_skills_prompt
2897            .clone_from(&skills_prompt);
2898        let system_prompt = build_system_prompt(&skills_prompt, None);
2899        if let Some(msg) = self.msg.messages.first_mut() {
2900            msg.content = system_prompt;
2901        }
2902
2903        let _ = self.channel.send_status("").await;
2904        tracing::info!(
2905            "reloaded {} skill(s)",
2906            self.services.skill.registry.read().all_meta().len()
2907        );
2908    }
2909
2910    fn reload_instructions(&mut self) {
2911        // Drain any additional queued events before reloading to avoid redundant reloads.
2912        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2913            while rx.try_recv().is_ok() {}
2914        }
2915        let Some(ref state) = self.runtime.instructions.reload_state else {
2916            return;
2917        };
2918        let new_blocks = crate::instructions::load_instructions(
2919            &state.base_dir,
2920            &state.provider_kinds,
2921            &state.explicit_files,
2922            state.auto_detect,
2923        );
2924        let old_sources: std::collections::HashSet<_> = self
2925            .runtime
2926            .instructions
2927            .blocks
2928            .iter()
2929            .map(|b| &b.source)
2930            .collect();
2931        let new_sources: std::collections::HashSet<_> =
2932            new_blocks.iter().map(|b| &b.source).collect();
2933        for added in new_sources.difference(&old_sources) {
2934            tracing::info!(path = %added.display(), "instruction file added");
2935        }
2936        for removed in old_sources.difference(&new_sources) {
2937            tracing::info!(path = %removed.display(), "instruction file removed");
2938        }
2939        tracing::info!(
2940            old_count = self.runtime.instructions.blocks.len(),
2941            new_count = new_blocks.len(),
2942            "reloaded instruction files"
2943        );
2944        self.runtime.instructions.blocks = new_blocks;
2945    }
2946
2947    fn reload_config(&mut self) {
2948        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
2949            return;
2950        };
2951        let Some(config) = self.load_config_with_overlay(&path) else {
2952            return;
2953        };
2954        let budget_tokens = resolve_context_budget(&config, &self.provider);
2955        self.runtime.config.security = config.security;
2956        self.runtime.config.timeouts = config.timeouts;
2957        self.runtime.config.redact_credentials = config.memory.redact_credentials;
2958        self.services.memory.persistence.history_limit = config.memory.history_limit;
2959        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
2960        self.services.memory.compaction.summarization_threshold =
2961            config.memory.summarization_threshold;
2962        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
2963        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
2964        self.services.skill.min_injection_score = config.skills.min_injection_score;
2965        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2966        self.services.skill.hybrid_search = config.skills.hybrid_search;
2967        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
2968        self.services.skill.confusability_threshold =
2969            config.skills.confusability_threshold.clamp(0.0, 1.0);
2970        config
2971            .skills
2972            .generation_provider
2973            .as_str()
2974            .clone_into(&mut self.services.skill.generation_provider_name);
2975        self.services.skill.generation_output_dir =
2976            config.skills.generation_output_dir.as_deref().map(|p| {
2977                if let Some(stripped) = p.strip_prefix("~/") {
2978                    dirs::home_dir()
2979                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2980                } else {
2981                    std::path::PathBuf::from(p)
2982                }
2983            });
2984
2985        self.context_manager.budget = Some(
2986            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2987        );
2988
2989        {
2990            let graph_cfg = &config.memory.graph;
2991            if graph_cfg.rpe.enabled {
2992                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2993                if self.services.memory.extraction.rpe_router.is_none() {
2994                    self.services.memory.extraction.rpe_router =
2995                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2996                            graph_cfg.rpe.threshold,
2997                            graph_cfg.rpe.max_skip_turns,
2998                        )));
2999                }
3000            } else {
3001                self.services.memory.extraction.rpe_router = None;
3002            }
3003            self.services.memory.extraction.graph_config = graph_cfg.clone();
3004        }
3005        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3006        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3007        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3008        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
3009        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3010        self.context_manager.compression = config.memory.compression.clone();
3011        self.context_manager.routing = config.memory.store_routing.clone();
3012        // Resolve routing_classifier_provider from the provider pool (#2484).
3013        self.context_manager.store_routing_provider = if config
3014            .memory
3015            .store_routing
3016            .routing_classifier_provider
3017            .is_empty()
3018        {
3019            None
3020        } else {
3021            let resolved = self.resolve_background_provider(
3022                &config.memory.store_routing.routing_classifier_provider,
3023            );
3024            Some(std::sync::Arc::new(resolved))
3025        };
3026        self.services
3027            .memory
3028            .persistence
3029            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3030
3031        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3032        self.services.index.repo_map_ttl =
3033            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3034
3035        self.services
3036            .session
3037            .hooks_config
3038            .cwd_changed
3039            .clone_from(&config.hooks.cwd_changed);
3040        self.services
3041            .session
3042            .hooks_config
3043            .permission_denied
3044            .clone_from(&config.hooks.permission_denied);
3045        self.services
3046            .session
3047            .hooks_config
3048            .turn_complete
3049            .clone_from(&config.hooks.turn_complete);
3050        // file_changed_hooks require watcher restart to take effect — skipped here.
3051
3052        tracing::info!("config reloaded");
3053    }
3054
3055    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
3056    ///
3057    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
3058    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3059        let mut config = match Config::load(path) {
3060            Ok(c) => c,
3061            Err(e) => {
3062                tracing::warn!("config reload failed: {e:#}");
3063                return None;
3064            }
3065        };
3066
3067        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3068        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3069            None
3070        } else {
3071            match zeph_plugins::apply_plugin_config_overlays(
3072                &mut config,
3073                &self.runtime.lifecycle.plugins_dir,
3074            ) {
3075                Ok(o) => Some(o),
3076                Err(e) => {
3077                    tracing::warn!(
3078                        "plugin overlay merge failed during reload: {e:#}; \
3079                         keeping previous runtime state"
3080                    );
3081                    return None;
3082                }
3083            }
3084        };
3085
3086        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3087        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3088        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3089        if let Some(ref overlay) = new_overlay {
3090            self.warn_on_shell_overlay_divergence(overlay, &config);
3091        }
3092        Some(config)
3093    }
3094
3095    /// React to shell policy divergence detected on hot-reload.
3096    ///
3097    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3098    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3099    /// — emit a warn + status banner when it changes.
3100    fn warn_on_shell_overlay_divergence(
3101        &self,
3102        new_overlay: &zeph_plugins::ResolvedOverlay,
3103        config: &Config,
3104    ) {
3105        let new_blocked: Vec<String> = {
3106            let mut v = config.tools.shell.blocked_commands.clone();
3107            v.sort();
3108            v
3109        };
3110        let new_allowed: Vec<String> = {
3111            let mut v = config.tools.shell.allowed_commands.clone();
3112            v.sort();
3113            v
3114        };
3115
3116        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3117        let blocked_changed = new_blocked != startup.blocked;
3118        let allowed_changed = new_allowed != startup.allowed;
3119
3120        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3121        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3122            h.rebuild(&config.tools.shell);
3123            tracing::info!(
3124                blocked_count = h.snapshot_blocked().len(),
3125                "shell blocked_commands rebuilt from hot-reload"
3126            );
3127        }
3128
3129        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3130        // executor construction time. Warn loudly so the user restarts.
3131        //
3132        // Note: when base `allowed_commands` is empty (the default), the overlay's
3133        // intersection semantics keep it empty, so this branch is silently unreachable
3134        // for users who do not set a non-empty base list.
3135        if allowed_changed {
3136            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3137                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3138            tracing::warn!("{msg}");
3139            if let Some(ref tx) = self.services.session.status_tx {
3140                let _ = tx.send(msg.to_owned());
3141            }
3142        }
3143
3144        let _ = new_overlay;
3145    }
3146
3147    /// Run `SideQuest` tool output eviction pass (#1885).
3148    ///
3149    /// PERF-1 fix: two-phase non-blocking design.
3150    ///
3151    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3152    /// validate and apply it immediately.
3153    ///
3154    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3155    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3156    /// next turn, so the current agent turn is never blocked by the LLM call.
3157    fn maybe_sidequest_eviction(&mut self) {
3158        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3159        // strategy — the two systems share the same pool of evictable tool outputs and can
3160        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3161        if self.services.sidequest.config.enabled {
3162            use crate::config::PruningStrategy;
3163            if !matches!(
3164                self.context_manager.compression.pruning_strategy,
3165                PruningStrategy::Reactive
3166            ) {
3167                tracing::warn!(
3168                    strategy = ?self.context_manager.compression.pruning_strategy,
3169                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3170                     consider disabling sidequest.enabled to avoid redundant eviction"
3171                );
3172            }
3173        }
3174
3175        // Guard: do not evict while a focus session is active.
3176        if self.services.focus.is_active() {
3177            tracing::debug!("sidequest: skipping — focus session active");
3178            // Drop any pending result — cursors may be stale relative to focus truncation.
3179            self.services.compression.pending_sidequest_result = None;
3180            return;
3181        }
3182
3183        // Phase 1: apply pending result from last turn's background LLM call.
3184        self.sidequest_apply_pending();
3185
3186        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3187        self.sidequest_schedule_next();
3188    }
3189
3190    fn sidequest_apply_pending(&mut self) {
3191        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3192            return;
3193        };
3194        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3195        // and we reschedule below.
3196        let result = match handle.try_join() {
3197            Ok(result) => result,
3198            Err(_handle) => {
3199                // Task still running — drop it; a fresh one is scheduled below.
3200                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3201                return;
3202            }
3203        };
3204        match result {
3205            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3206                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3207                let freed = self.services.sidequest.apply_eviction(
3208                    &mut self.msg.messages,
3209                    &evicted_indices,
3210                    &self.runtime.metrics.token_counter,
3211                );
3212                if freed > 0 {
3213                    self.recompute_prompt_tokens();
3214                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3215                    // cooldown=0: eviction does not impose post-compaction cooldown.
3216                    self.context_manager.compaction =
3217                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3218                            cooldown: 0,
3219                        };
3220                    tracing::info!(
3221                        freed_tokens = freed,
3222                        evicted_cursors = evicted_indices.len(),
3223                        pass = self.services.sidequest.passes_run,
3224                        "sidequest eviction complete"
3225                    );
3226                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3227                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3228                    }
3229                    if let Some(ref tx) = self.services.session.status_tx {
3230                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3231                    }
3232                } else {
3233                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3234                    if let Some(ref tx) = self.services.session.status_tx {
3235                        let _ = tx.send(String::new());
3236                    }
3237                }
3238            }
3239            Ok(None | Some(_)) => {
3240                tracing::debug!("sidequest: pending result: no cursors to evict");
3241                if let Some(ref tx) = self.services.session.status_tx {
3242                    let _ = tx.send(String::new());
3243                }
3244            }
3245            Err(e) => {
3246                tracing::debug!("sidequest: background task error: {e}");
3247                if let Some(ref tx) = self.services.session.status_tx {
3248                    let _ = tx.send(String::new());
3249                }
3250            }
3251        }
3252    }
3253
3254    fn sidequest_schedule_next(&mut self) {
3255        use zeph_llm::provider::{Message, MessageMetadata, Role};
3256
3257        self.services
3258            .sidequest
3259            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3260
3261        if self.services.sidequest.tool_output_cursors.is_empty() {
3262            tracing::debug!("sidequest: no eligible cursors");
3263            return;
3264        }
3265
3266        let prompt = self.services.sidequest.build_eviction_prompt();
3267        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3268        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3269        // Clone the provider so the spawn closure owns it without borrowing self.
3270        let provider = self.summary_or_primary_provider().clone();
3271
3272        let eviction_future = async move {
3273            let msgs = [Message {
3274                role: Role::User,
3275                content: prompt,
3276                parts: vec![],
3277                metadata: MessageMetadata::default(),
3278            }];
3279            let response =
3280                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3281                    .await
3282                {
3283                    Ok(Ok(r)) => r,
3284                    Ok(Err(e)) => {
3285                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3286                        return None;
3287                    }
3288                    Err(_) => {
3289                        tracing::debug!("sidequest bg: LLM call timed out");
3290                        return None;
3291                    }
3292                };
3293
3294            let start = response.find('{')?;
3295            let end = response.rfind('}')?;
3296            if start > end {
3297                return None;
3298            }
3299            let json_slice = &response[start..=end];
3300            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3301            let mut valid: Vec<usize> = parsed
3302                .del_cursors
3303                .into_iter()
3304                .filter(|&c| c < n_cursors)
3305                .collect();
3306            valid.sort_unstable();
3307            valid.dedup();
3308            #[allow(
3309                clippy::cast_precision_loss,
3310                clippy::cast_possible_truncation,
3311                clippy::cast_sign_loss
3312            )]
3313            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3314            valid.truncate(max_evict);
3315            Some(valid)
3316        };
3317        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3318            std::sync::Arc::from("agent.sidequest.eviction"),
3319            move || eviction_future,
3320        );
3321        self.services.compression.pending_sidequest_result = Some(handle);
3322        tracing::debug!("sidequest: background LLM eviction task spawned");
3323        if let Some(ref tx) = self.services.session.status_tx {
3324            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3325        }
3326    }
3327
3328    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3329    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3330        self.services
3331            .mcp
3332            .manager
3333            .as_ref()
3334            .map(|m| McpManagerDispatch(Arc::clone(m)))
3335    }
3336
3337    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3338    ///
3339    /// Called after each tool batch completes. The check is a single syscall and has
3340    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3341    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3342    pub(crate) async fn check_cwd_changed(&mut self) {
3343        let current = match std::env::current_dir() {
3344            Ok(p) => p,
3345            Err(e) => {
3346                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3347                return;
3348            }
3349        };
3350        if current == self.runtime.lifecycle.last_known_cwd {
3351            return;
3352        }
3353        let old_cwd =
3354            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3355        self.services.session.env_context.working_dir = current.display().to_string();
3356
3357        tracing::info!(
3358            old = %old_cwd.display(),
3359            new = %current.display(),
3360            "working directory changed"
3361        );
3362
3363        let _ = self
3364            .channel
3365            .send_status("Working directory changed\u{2026}")
3366            .await;
3367
3368        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3369        if hooks.is_empty() {
3370            tracing::debug!("CwdChanged: no hooks configured, skipping");
3371        } else {
3372            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3373            let mut env = std::collections::HashMap::new();
3374            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3375            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3376            let dispatch = self.mcp_dispatch();
3377            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3378                .as_ref()
3379                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3380            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3381                tracing::warn!(error = %e, "CwdChanged hook failed");
3382            } else {
3383                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3384            }
3385        }
3386
3387        let _ = self.channel.send_status("").await;
3388    }
3389
3390    /// Handle a `FileChangedEvent` from the file watcher.
3391    pub(crate) async fn handle_file_changed(
3392        &mut self,
3393        event: crate::file_watcher::FileChangedEvent,
3394    ) {
3395        tracing::info!(path = %event.path.display(), "file changed");
3396
3397        let _ = self
3398            .channel
3399            .send_status("Running file-change hook\u{2026}")
3400            .await;
3401
3402        let hooks = self
3403            .services
3404            .session
3405            .hooks_config
3406            .file_changed_hooks
3407            .clone();
3408        if hooks.is_empty() {
3409            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3410        } else {
3411            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3412            let mut env = std::collections::HashMap::new();
3413            env.insert(
3414                "ZEPH_CHANGED_PATH".to_owned(),
3415                event.path.display().to_string(),
3416            );
3417            let dispatch = self.mcp_dispatch();
3418            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3419                .as_ref()
3420                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3421            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3422                tracing::warn!(error = %e, "FileChanged hook failed");
3423            } else {
3424                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3425            }
3426        }
3427
3428        let _ = self.channel.send_status("").await;
3429    }
3430
3431    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3432    /// background scan task.
3433    ///
3434    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3435    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3436    ///
3437    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3438    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3439    /// blocking the turn.
3440    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3441        let Some(engine) = self.services.promotion_engine.clone() else {
3442            return;
3443        };
3444
3445        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3446            return;
3447        };
3448
3449        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3450        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3451        let promotion_window = 200usize;
3452
3453        let accepted = self.runtime.lifecycle.supervisor.spawn(
3454            agent_supervisor::TaskClass::Enrichment,
3455            "compression_spectrum.promotion_scan",
3456            async move {
3457                let span = tracing::info_span!("memory.compression.promote.background");
3458                let _enter = span.enter();
3459
3460                let window = match memory.load_promotion_window(promotion_window).await {
3461                    Ok(w) => w,
3462                    Err(e) => {
3463                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3464                        return;
3465                    }
3466                };
3467
3468                if window.is_empty() {
3469                    return;
3470                }
3471
3472                let candidates = match engine.scan(&window).await {
3473                    Ok(c) => c,
3474                    Err(e) => {
3475                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3476                        return;
3477                    }
3478                };
3479
3480                for candidate in &candidates {
3481                    if let Err(e) = engine.promote(candidate).await {
3482                        tracing::warn!(
3483                            signature = %candidate.signature,
3484                            error = %e,
3485                            "promotion scan: promote failed"
3486                        );
3487                    }
3488                }
3489
3490                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3491            },
3492        );
3493
3494        if accepted {
3495            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3496        }
3497    }
3498}
3499/// Estimates the JSON payload size of a single [`zeph_llm::provider::Message`] for token-budget
3500/// accounting.
3501///
3502/// When `parts` is empty the message is a legacy text-only message and `content.len()` is used
3503/// directly. Otherwise each part is measured individually so that structured variants (images,
3504/// tool invocations, thinking blocks) are accounted for rather than relying on the already-flat
3505/// `content` string, which may not reflect the actual API payload size.
3506pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3507    use zeph_llm::provider::MessagePart;
3508    if m.parts.is_empty() {
3509        return m.content.len();
3510    }
3511    m.parts
3512        .iter()
3513        .map(|p| match p {
3514            MessagePart::Text { text }
3515            | MessagePart::Recall { text }
3516            | MessagePart::CodeContext { text }
3517            | MessagePart::Summary { text }
3518            | MessagePart::CrossSession { text } => text.len(),
3519            MessagePart::ToolOutput { body, .. } => body.len(),
3520            MessagePart::ToolUse { id, name, input } => {
3521                50 + id.len() + name.len() + input.to_string().len()
3522            }
3523            MessagePart::ToolResult {
3524                tool_use_id,
3525                content,
3526                ..
3527            } => 50 + tool_use_id.len() + content.len(),
3528            MessagePart::Image(img) => img.data.len() * 4 / 3,
3529            MessagePart::ThinkingBlock {
3530                thinking,
3531                signature,
3532            } => 50 + thinking.len() + signature.len(),
3533            MessagePart::RedactedThinkingBlock { data } => data.len(),
3534            MessagePart::Compaction { summary } => summary.len(),
3535        })
3536        .sum()
3537}
3538
3539/// Applies token-budget truncation and orphaned-tool-pair pruning to a parent message slice.
3540///
3541/// Budget truncation keeps the **most recent** messages that fit within `max_chars`
3542/// (a suffix), so the subagent always receives the freshest context.
3543///
3544/// Two passes are performed after budget truncation:
3545///
3546/// 1. Remove `ToolResult` parts from user messages whose matching `ToolUse` is no longer in the
3547///    slice (truncated away).
3548/// 2. Remove `ToolUse` parts from **interior** assistant messages whose matching `ToolResult`
3549///    was removed in pass 1 or was already absent. The trailing assistant message is exempt —
3550///    its unanswered `ToolUse` calls are not orphaned; the slice just ends before the result.
3551///
3552/// Messages that become fully empty after pruning are removed from `msgs`.
3553///
3554/// `rebuild_content` is called **only** when `retain` actually removed parts — preserving the
3555/// existing `content` field (and any `ThinkingBlock` text embedded there) for unmodified
3556/// messages.
3557pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3558    use zeph_llm::provider::{MessagePart, Role};
3559
3560    // Token-budget cap: keep the most recent messages that fit within max_chars.
3561    // We iterate from the end (newest) and drain from the front once the budget is exceeded,
3562    // so the subagent always receives the most recent context rather than stale early messages.
3563    let mut total_chars = 0usize;
3564    let mut drop_before = 0usize; // index of the first message to keep
3565    for (i, m) in msgs.iter().enumerate().rev() {
3566        total_chars += estimate_parts_size(m);
3567        if total_chars > max_chars {
3568            drop_before = i + 1;
3569            break;
3570        }
3571    }
3572    if drop_before > 0 {
3573        msgs.drain(..drop_before);
3574    }
3575
3576    // Pass 1: collect ToolUse IDs emitted by assistant messages; prune orphaned ToolResult
3577    // parts from user messages that reference a ToolUse no longer present in the slice.
3578    // Use owned Strings to avoid holding immutable borrows across the subsequent mutable loop.
3579    let emitted_tool_ids: std::collections::HashSet<String> = msgs
3580        .iter()
3581        .filter(|m| m.role == Role::Assistant)
3582        .flat_map(|m| m.parts.iter())
3583        .filter_map(|p| {
3584            if let MessagePart::ToolUse { id, .. } = p {
3585                Some(id.clone())
3586            } else {
3587                None
3588            }
3589        })
3590        .collect();
3591
3592    let mut orphans_removed = 0usize;
3593    for m in msgs.iter_mut() {
3594        if m.role != Role::User || m.parts.is_empty() {
3595            continue;
3596        }
3597        let before = m.parts.len();
3598        m.parts.retain(|p| match p {
3599            MessagePart::ToolResult { tool_use_id, .. } => {
3600                emitted_tool_ids.contains(tool_use_id.as_str())
3601            }
3602            _ => true,
3603        });
3604        let dropped = before - m.parts.len();
3605        if dropped > 0 {
3606            orphans_removed += dropped;
3607            if m.parts.is_empty() {
3608                m.content.clear();
3609            } else {
3610                m.rebuild_content();
3611            }
3612        }
3613    }
3614
3615    // Pass 2: collect ToolResult IDs present in user messages after pass 1; prune ToolUse
3616    // parts from assistant messages whose result is confirmed absent.
3617    //
3618    // The trailing assistant message is exempt: it may legitimately contain unanswered
3619    // ToolUse calls (the slice ends before the result arrives). Only interior assistant
3620    // messages — those followed by at least one user message — can have provably orphaned
3621    // ToolUse parts (the conversation moved on without answering them).
3622    let consumed_tool_ids: std::collections::HashSet<String> = msgs
3623        .iter()
3624        .filter(|m| m.role == Role::User)
3625        .flat_map(|m| m.parts.iter())
3626        .filter_map(|p| {
3627            if let MessagePart::ToolResult { tool_use_id, .. } = p {
3628                Some(tool_use_id.clone())
3629            } else {
3630                None
3631            }
3632        })
3633        .collect();
3634
3635    // Index of the last assistant message — exempt from pass 2.
3636    let last_assistant_idx = msgs
3637        .iter()
3638        .enumerate()
3639        .rev()
3640        .find(|(_, m)| m.role == Role::Assistant)
3641        .map(|(i, _)| i);
3642
3643    for (idx, m) in msgs.iter_mut().enumerate() {
3644        if m.role != Role::Assistant || m.parts.is_empty() {
3645            continue;
3646        }
3647        // Skip the trailing assistant message — its unanswered ToolUse calls are not orphaned.
3648        if Some(idx) == last_assistant_idx {
3649            continue;
3650        }
3651        let before = m.parts.len();
3652        m.parts.retain(|p| match p {
3653            MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3654            _ => true,
3655        });
3656        let dropped = before - m.parts.len();
3657        if dropped > 0 {
3658            orphans_removed += dropped;
3659            if m.parts.is_empty() {
3660                m.content.clear();
3661            } else {
3662                m.rebuild_content();
3663            }
3664        }
3665    }
3666
3667    // Remove messages that were emptied by orphan pruning.
3668    msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3669
3670    if orphans_removed > 0 {
3671        tracing::debug!(
3672            orphans = orphans_removed,
3673            "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3674        );
3675    }
3676}
3677
3678/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3679///
3680/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3681/// `zeph-subagent` to `zeph-mcp`.
3682struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3683
3684impl zeph_subagent::McpDispatch for McpManagerDispatch {
3685    fn call_tool<'a>(
3686        &'a self,
3687        server: &'a str,
3688        tool: &'a str,
3689        args: serde_json::Value,
3690    ) -> std::pin::Pin<
3691        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3692    > {
3693        Box::pin(async move {
3694            self.0
3695                .call_tool(server, tool, args)
3696                .await
3697                .map(|result| {
3698                    // Extract text content from the MCP response as a JSON value.
3699                    let texts: Vec<serde_json::Value> = result
3700                        .content
3701                        .iter()
3702                        .filter_map(|c| {
3703                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3704                                Some(serde_json::Value::String(t.text.clone()))
3705                            } else {
3706                                None
3707                            }
3708                        })
3709                        .collect();
3710                    serde_json::Value::Array(texts)
3711                })
3712                .map_err(|e| e.to_string())
3713        })
3714    }
3715}
3716
3717pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3718    while !*rx.borrow_and_update() {
3719        if rx.changed().await.is_err() {
3720            std::future::pending::<()>().await;
3721        }
3722    }
3723}
3724
3725pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3726    match rx {
3727        Some(inner) => {
3728            if let Some(v) = inner.recv().await {
3729                Some(v)
3730            } else {
3731                *rx = None;
3732                std::future::pending().await
3733            }
3734        }
3735        None => std::future::pending().await,
3736    }
3737}
3738
// NOTE: the paragraph below documents `resolve_context_budget` (defined further down in this
// file). It is kept as a plain comment so rustdoc does not attach it to
// `truncate_shell_command`.
//
// Resolve the effective context budget from config, applying the `auto_budget` fallback.
//
// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
// context window; if still 0, fall back to 128 000 tokens.

/// Truncate a background run command to at most 80 characters for TUI display.
///
/// Length is measured in Unicode scalar values (`char`s), not bytes, so commands containing
/// multi-byte text are not cut short prematurely. Commands of 80 characters or fewer are
/// returned unchanged; longer ones are reduced to their first 79 characters followed by `…`.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_CHARS: usize = 80;

    // Walk to the character at position MAX_CHARS - 1; its byte offset is where a truncated
    // command would be cut. This never scans past the first MAX_CHARS + 1 characters.
    let mut tail = cmd.char_indices().skip(MAX_CHARS - 1);
    let Some((cut, _)) = tail.next() else {
        // Fewer than MAX_CHARS characters.
        return cmd.to_owned();
    };
    if tail.next().is_none() {
        // Exactly MAX_CHARS characters: still fits.
        return cmd.to_owned();
    }

    // `cut` comes from `char_indices`, so it is always a valid char boundary.
    format!("{}…", &cmd[..cut])
}
3752
/// Shorten a run-id hex string to its first 8 characters for compact TUI display.
///
/// Ids with 8 characters or fewer are returned unchanged.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset where the ninth character starts, or the full length if there is none.
    let end = id
        .char_indices()
        .nth(8)
        .map_or(id.len(), |(offset, _)| offset);
    id[..end].to_owned()
}
3757
3758pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3759    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3760        if let Some(ctx_size) = provider.context_window() {
3761            tracing::info!(
3762                model_context = ctx_size,
3763                "auto-configured context budget on reload"
3764            );
3765            ctx_size
3766        } else {
3767            0
3768        }
3769    } else {
3770        config.memory.context_budget_tokens
3771    };
3772    if tokens == 0 {
3773        tracing::warn!(
3774            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3775        );
3776        128_000
3777    } else {
3778        tokens
3779    }
3780}
3781
3782#[cfg(test)]
3783mod tests;
3784
3785#[cfg(test)]
3786pub(crate) use tests::agent_tests;
3787
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed, sendable future type returned by [`TestErrorCommand`]'s `handle`.
    type HandleFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Slash-command stub that exists only in `#[cfg(test)]` builds.
    ///
    /// Its handler unconditionally fails, which exercises the `Some(Err(CommandError))` arm
    /// of the session/debug registry dispatch block. This lets the non-fatal error path be
    /// tested without involving any production command's validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        /// Command name under which the stub is registered.
        fn name(&self) -> &'static str {
            "/test-error"
        }

        /// One-line description of the stub.
        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        /// Reported under the session category.
        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and resolves to a `CommandError` ("boom").
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandleFuture<'a> {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}