Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15mod command_context_impls;
16pub(super) mod compression_feedback;
17mod context;
18mod context_impls;
19pub(crate) mod context_manager;
20mod corrections;
21pub mod error;
22mod experiment_cmd;
23pub(crate) mod focus;
24mod index;
25mod learning;
26pub(crate) mod learning_engine;
27mod log_commands;
28mod loop_event;
29mod lsp_commands;
30mod magic_docs;
31mod mcp;
32mod message_queue;
33mod microcompact;
34mod model_commands;
35mod persistence;
36#[cfg(feature = "scheduler")]
37mod plan;
38mod policy_commands;
39mod provider_cmd;
40#[cfg(feature = "self-check")]
41mod quality_hook;
42pub(crate) mod rate_limiter;
43#[cfg(feature = "scheduler")]
44mod scheduler_commands;
45#[cfg(feature = "scheduler")]
46mod scheduler_loop;
47pub mod session_config;
48mod session_digest;
49pub(crate) mod sidequest;
50mod skill_management;
51pub mod slash_commands;
52pub mod speculative;
53pub(crate) mod state;
54pub(crate) mod task_injection;
55pub(crate) mod tool_execution;
56pub(crate) mod tool_orchestrator;
57mod trust_commands;
58pub mod turn;
59mod utils;
60pub(crate) mod vigil;
61
62use std::collections::{HashMap, VecDeque};
63use std::fmt::Write as _;
64use std::sync::Arc;
65
66use parking_lot::RwLock;
67
68use tokio::sync::{mpsc, watch};
69use tokio_util::sync::CancellationToken;
70use zeph_llm::any::AnyProvider;
71use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
72use zeph_memory::TokenCounter;
73use zeph_memory::semantic::SemanticMemory;
74use zeph_skills::loader::Skill;
75use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
76use zeph_skills::prompt::format_skills_prompt;
77use zeph_skills::registry::SkillRegistry;
78use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
79
80use crate::channel::Channel;
81use crate::config::Config;
82use crate::context::{ContextBudget, build_system_prompt};
83use zeph_common::text::estimate_tokens;
84
85use loop_event::LoopEvent;
86use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
87use state::MessageState;
88
89pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
90// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
91// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
92// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
93pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
94pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
95pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
96
97pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
98    use std::fmt::Write;
99    let capacity = "[tool output: ".len()
100        + tool_name.len()
101        + "]\n```\n".len()
102        + body.len()
103        + TOOL_OUTPUT_SUFFIX.len();
104    let mut buf = String::with_capacity(capacity);
105    let _ = write!(
106        buf,
107        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
108    );
109    buf
110}
111
112/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
113/// tool orchestration, and multi-channel I/O.
114///
115/// The agent maintains conversation history, manages LLM provider state, coordinates tool
116/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
117/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
118///
119/// # Architecture
120///
121/// - **Message state**: Conversation history with system prompt, message queue, and metadata
122/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
123/// - **Skill state**: Registry, matching engine, and self-learning evolution
124/// - **Context manager**: Token budgeting, context assembly, and summarization
125/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
126/// - **MCP client**: Multi-server support for Model Context Protocol
127/// - **Index state**: AST-based code indexing and semantic retrieval
128/// - **Security**: Sanitization, exfiltration detection, adversarial probes
129/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
130///
131/// # Channel Contract
132///
133/// The agent requires a [`Channel`] implementation for user interaction:
134/// - Sends agent responses via `channel.send(message)`
135/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
136/// - Supports structured events: tool invocations, tool output, streaming updates
137///
138/// # Lifecycle
139///
140/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
141/// 2. Run main loop with [`Self::run`]
142/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
143///
144pub struct Agent<C: Channel> {
145    // --- I/O & primary providers (kept inline) ---
146    provider: AnyProvider,
147    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
148    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
149    /// Falls back to `provider.clone()` when no dedicated entry exists.
150    /// **Never replaced** by `/provider switch`.
151    embedding_provider: AnyProvider,
152    channel: C,
153    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
154
155    // --- Conversation core (kept inline) ---
156    pub(super) msg: MessageState,
157    pub(super) context_manager: context_manager::ContextManager,
158    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
159
160    // --- Aggregated background services ---
161    pub(super) services: state::Services,
162
163    // --- Aggregated runtime / lifecycle / telemetry ---
164    pub(super) runtime: state::AgentRuntime,
165}
166
167/// Control flow signal returned by [`Agent::apply_dispatch_result`].
168enum DispatchFlow {
169    /// The command requested exit; the agent loop should `break`.
170    Break,
171    /// The command was handled; the agent loop should `continue`.
172    Continue,
173    /// The command was not recognised; the agent loop should fall through.
174    Fallthrough,
175}
176
177impl<C: Channel> Agent<C> {
178    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
179    ///
180    /// # Arguments
181    ///
182    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
183    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
184    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
185    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
186    ///   skills are matched by exact name only
187    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
188    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
189    ///
190    /// # Initialization
191    ///
192    /// The constructor:
193    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
194    /// 2. Builds the system prompt from registered skills
195    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
196    /// 4. Returns a ready-to-run agent
197    ///
198    /// # Panics
199    ///
200    /// Panics if `max_active_skills` is 0.
201    #[must_use]
202    pub fn new(
203        provider: AnyProvider,
204        channel: C,
205        registry: SkillRegistry,
206        matcher: Option<SkillMatcherBackend>,
207        max_active_skills: usize,
208        tool_executor: impl ToolExecutor + 'static,
209    ) -> Self {
210        let registry = Arc::new(RwLock::new(registry));
211        let embedding_provider = provider.clone();
212        Self::new_with_registry_arc(
213            provider,
214            embedding_provider,
215            channel,
216            registry,
217            matcher,
218            max_active_skills,
219            tool_executor,
220        )
221    }
222
223    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
224    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
225    ///
226    /// # Panics
227    ///
228    /// Panics if the registry `RwLock` is poisoned.
229    #[must_use]
230    pub fn new_with_registry_arc(
231        provider: AnyProvider,
232        embedding_provider: AnyProvider,
233        channel: C,
234        registry: Arc<RwLock<SkillRegistry>>,
235        matcher: Option<SkillMatcherBackend>,
236        max_active_skills: usize,
237        tool_executor: impl ToolExecutor + 'static,
238    ) -> Self {
239        use state::{
240            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
241            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
242            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
243            SessionState, SkillState, ToolState,
244        };
245
246        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
247        let all_skills: Vec<Skill> = {
248            let reg = registry.read();
249            reg.all_meta()
250                .iter()
251                .filter_map(|m| reg.skill(&m.name).ok())
252                .collect()
253        };
254        let empty_trust = HashMap::new();
255        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
256        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
257        let system_prompt = build_system_prompt(&skills_prompt, None);
258        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
259        tracing::trace!(prompt = %system_prompt, "full system prompt");
260
261        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
262        let token_counter = Arc::new(TokenCounter::new());
263
264        let services = Services {
265            memory: MemoryState::default(),
266            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
267            learning_engine: learning_engine::LearningEngine::new(),
268            feedback: FeedbackState::default(),
269            mcp: McpState::default(),
270            index: IndexState::default(),
271            session: SessionState::new(),
272            security: SecurityState::default(),
273            experiments: ExperimentState::new(),
274            compression: CompressionState::default(),
275            orchestration: OrchestrationState::default(),
276            focus: focus::FocusState::default(),
277            sidequest: sidequest::SidequestState::default(),
278            tool_state: ToolState::default(),
279            #[cfg(feature = "self-check")]
280            quality: None,
281            proactive_explorer: None,
282            promotion_engine: None,
283        };
284
285        let runtime = AgentRuntime {
286            config: RuntimeConfig::default(),
287            lifecycle: LifecycleState::new(),
288            providers: ProviderState::new(initial_prompt_tokens),
289            metrics: MetricsState::new(token_counter),
290            debug: DebugState::default(),
291            instructions: InstructionState::default(),
292        };
293
294        Self {
295            provider,
296            embedding_provider,
297            channel,
298            tool_executor: Arc::new(tool_executor),
299            msg: MessageState {
300                messages: vec![Message {
301                    role: Role::System,
302                    content: system_prompt,
303                    parts: vec![],
304                    metadata: MessageMetadata::default(),
305                }],
306                message_queue: VecDeque::new(),
307                pending_image_parts: Vec::new(),
308                last_persisted_message_id: None,
309                deferred_db_hide_ids: Vec::new(),
310                deferred_db_summaries: Vec::new(),
311            },
312            context_manager: context_manager::ContextManager::new(),
313            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
314            services,
315            runtime,
316        }
317    }
318
319    /// Consume the agent and return the inner channel.
320    ///
321    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
322    /// read captured responses from a headless channel such as `BenchmarkChannel`).
323    ///
324    /// # Examples
325    ///
326    /// ```no_run
327    /// # use zeph_core::agent::Agent;
328    /// // After agent.run().await completes, consume the agent to retrieve the channel.
329    /// // let channel: MyChannel = agent.into_channel();
330    /// ```
331    #[must_use]
332    pub fn into_channel(self) -> C {
333        self.channel
334    }
335
336    /// Poll all active sub-agents for completed/failed/canceled results.
337    ///
338    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
339    /// for agents that have finished. Each completed agent is removed from the manager.
340    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
341        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
342            return vec![];
343        };
344
345        let finished: Vec<String> = mgr
346            .statuses()
347            .into_iter()
348            .filter_map(|(id, status)| {
349                if matches!(
350                    status.state,
351                    zeph_subagent::SubAgentState::Completed
352                        | zeph_subagent::SubAgentState::Failed
353                        | zeph_subagent::SubAgentState::Canceled
354                ) {
355                    Some(id)
356                } else {
357                    None
358                }
359            })
360            .collect();
361
362        let mut results = vec![];
363        for task_id in finished {
364            match mgr.collect(&task_id).await {
365                Ok(result) => results.push((task_id, result)),
366                Err(e) => {
367                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
368                }
369            }
370        }
371        results
372    }
373
374    /// Call the LLM to generate a structured session summary with a configurable timeout.
375    ///
376    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
377    /// any failure, logging a warning — callers must treat `None` as "skip storage".
378    ///
379    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
380    /// (structured call times out and plain-text fallback also times out) this adds up to
381    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
382    async fn call_llm_for_session_summary(
383        &self,
384        chat_messages: &[Message],
385    ) -> Option<zeph_memory::StructuredSummary> {
386        let timeout_dur = std::time::Duration::from_secs(
387            self.services
388                .memory
389                .compaction
390                .shutdown_summary_timeout_secs,
391        );
392        match tokio::time::timeout(
393            timeout_dur,
394            self.provider
395                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
396        )
397        .await
398        {
399            Ok(Ok(s)) => Some(s),
400            Ok(Err(e)) => {
401                tracing::warn!(
402                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
403                );
404                self.plain_text_summary_fallback(chat_messages, timeout_dur)
405                    .await
406            }
407            Err(_) => {
408                tracing::warn!(
409                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
410                    self.services
411                        .memory
412                        .compaction
413                        .shutdown_summary_timeout_secs
414                );
415                self.plain_text_summary_fallback(chat_messages, timeout_dur)
416                    .await
417            }
418        }
419    }
420
421    async fn plain_text_summary_fallback(
422        &self,
423        chat_messages: &[Message],
424        timeout_dur: std::time::Duration,
425    ) -> Option<zeph_memory::StructuredSummary> {
426        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
427            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
428                summary: plain,
429                key_facts: vec![],
430                entities: vec![],
431            }),
432            Ok(Err(e)) => {
433                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
434                None
435            }
436            Err(_) => {
437                tracing::warn!("shutdown summary: plain LLM fallback timed out");
438                None
439            }
440        }
441    }
442
443    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
444    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
445    /// closed while tool execution was in progress). Without this the next session startup strips
446    /// those assistant messages and emits orphan warnings.
447    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
448        use zeph_llm::provider::{MessagePart, Role};
449
450        // Walk messages in reverse: if the last assistant message (ignoring any trailing
451        // system messages) has ToolUse parts and is NOT immediately followed by a user
452        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
453        let msgs = &self.msg.messages;
454        // Find last assistant message index.
455        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
456            return;
457        };
458        let asst_msg = &msgs[asst_idx];
459        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
460            .parts
461            .iter()
462            .filter_map(|p| {
463                if let MessagePart::ToolUse { id, name, input } = p {
464                    Some((id.as_str(), name.as_str(), input))
465                } else {
466                    None
467                }
468            })
469            .collect();
470        if tool_use_ids.is_empty() {
471            return;
472        }
473
474        // Check whether a following user message already pairs all ToolUse ids.
475        let paired_ids: std::collections::HashSet<&str> = msgs
476            .get(asst_idx + 1..)
477            .into_iter()
478            .flatten()
479            .filter(|m| m.role == Role::User)
480            .flat_map(|m| m.parts.iter())
481            .filter_map(|p| {
482                if let MessagePart::ToolResult { tool_use_id, .. } = p {
483                    Some(tool_use_id.as_str())
484                } else {
485                    None
486                }
487            })
488            .collect();
489
490        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
491            .iter()
492            .filter(|(id, _, _)| !paired_ids.contains(*id))
493            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
494                id: (*id).to_owned(),
495                name: (*name).to_owned().into(),
496                input: (*input).clone(),
497            })
498            .collect();
499
500        if unpaired.is_empty() {
501            return;
502        }
503
504        tracing::info!(
505            count = unpaired.len(),
506            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
507        );
508        self.persist_cancelled_tool_results(&unpaired).await;
509    }
510
511    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
512    ///
513    /// Guards:
514    /// - `shutdown_summary` config must be enabled
515    /// - `conversation_id` must be set (memory must be attached)
516    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
517    /// - at least `shutdown_summary_min_messages` user-turn messages in history
518    ///
519    /// All errors are logged as warnings and swallowed — shutdown must never fail.
520    async fn maybe_store_shutdown_summary(&mut self) {
521        if !self.services.memory.compaction.shutdown_summary {
522            return;
523        }
524        let Some(memory) = self.services.memory.persistence.memory.clone() else {
525            return;
526        };
527        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
528            return;
529        };
530
531        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
532        match memory.has_session_summary(conversation_id).await {
533            Ok(true) => {
534                tracing::debug!("shutdown summary: session already has a summary, skipping");
535                return;
536            }
537            Ok(false) => {}
538            Err(e) => {
539                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
540                return;
541            }
542        }
543
544        // Count user-turn messages only (skip system prompt at index 0).
545        let user_count = self
546            .msg
547            .messages
548            .iter()
549            .skip(1)
550            .filter(|m| m.role == Role::User)
551            .count();
552        if user_count
553            < self
554                .services
555                .memory
556                .compaction
557                .shutdown_summary_min_messages
558        {
559            tracing::debug!(
560                user_count,
561                min = self
562                    .services
563                    .memory
564                    .compaction
565                    .shutdown_summary_min_messages,
566                "shutdown summary: too few user messages, skipping"
567            );
568            return;
569        }
570
571        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
572        let _ = self.channel.send_status("Saving session summary...").await;
573
574        // Collect last N messages (skip system prompt at index 0).
575        let max = self
576            .services
577            .memory
578            .compaction
579            .shutdown_summary_max_messages;
580        if max == 0 {
581            tracing::debug!("shutdown summary: max_messages=0, skipping");
582            return;
583        }
584        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
585        let slice = if non_system.len() > max {
586            &non_system[non_system.len() - max..]
587        } else {
588            &non_system[..]
589        };
590
591        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
592            .iter()
593            .map(|m| {
594                let role = match m.role {
595                    Role::User => "user".to_owned(),
596                    Role::Assistant => "assistant".to_owned(),
597                    Role::System => "system".to_owned(),
598                };
599                (zeph_memory::MessageId(0), role, m.content.clone())
600            })
601            .collect();
602
603        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
604        let chat_messages = vec![Message {
605            role: Role::User,
606            content: prompt,
607            parts: vec![],
608            metadata: MessageMetadata::default(),
609        }];
610
611        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
612            let _ = self.channel.send_status("").await;
613            return;
614        };
615
616        if let Err(e) = memory
617            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
618            .await
619        {
620            tracing::warn!("shutdown summary: storage failed: {e:#}");
621        } else {
622            tracing::info!(
623                conversation_id = conversation_id.0,
624                "shutdown summary stored"
625            );
626        }
627
628        // Clear TUI status.
629        let _ = self.channel.send_status("").await;
630    }
631
632    /// Gracefully shut down the agent and persist state.
633    ///
634    /// Performs the following cleanup:
635    ///
636    /// 1. **Message persistence** — Deferred database writes (hide/summary operations)
637    ///    are flushed to memory or disk
638    /// 2. **Provider state** — LLM router state (e.g., Thompson sampling counters) is saved
639    ///    to the vault
640    /// 3. **Sub-agents** — All active sub-agent tasks are terminated
641    /// 4. **MCP servers** — All connected Model Context Protocol servers are shut down
642    /// 5. **Metrics finalization** — Compaction metrics and session metrics are recorded
643    /// 6. **Memory finalization** — Vector stores and semantic indices are flushed
644    /// 7. **Skill state** — Self-learning engine saves evolved skill definitions
645    ///
646    /// Call this before dropping the agent to ensure no data loss.
647    pub async fn shutdown(&mut self) {
648        let _ = self.channel.send_status("Shutting down...").await;
649
650        // CRIT-1: persist Thompson state accumulated during this session.
651        self.provider.save_router_state().await;
652
653        // Persist AdaptOrch Beta-arm table alongside Thompson state.
654        if let Some(ref advisor) = self.services.orchestration.topology_advisor
655            && let Err(e) = advisor.save()
656        {
657            tracing::warn!(error = %e, "adaptorch: failed to persist state");
658        }
659
660        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
661            mgr.shutdown_all();
662        }
663
664        if let Some(ref manager) = self.services.mcp.manager {
665            manager.shutdown_all_shared().await;
666        }
667
668        // Finalize compaction trajectory: push the last open segment into the Vec.
669        // This segment would otherwise only be pushed when the next hard compaction fires,
670        // which never happens at session end.
671        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
672            self.update_metrics(|m| {
673                m.compaction_turns_after_hard.push(turns);
674            });
675            self.context_manager.turns_since_last_hard_compaction = None;
676        }
677
678        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
679            let m = tx.borrow();
680            if m.filter_applications > 0 {
681                #[allow(clippy::cast_precision_loss)]
682                let pct = if m.filter_raw_tokens > 0 {
683                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
684                } else {
685                    0.0
686                };
687                tracing::info!(
688                    raw_tokens = m.filter_raw_tokens,
689                    saved_tokens = m.filter_saved_tokens,
690                    applications = m.filter_applications,
691                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
692                    m.filter_saved_tokens,
693                );
694            }
695            if m.compaction_hard_count > 0 {
696                tracing::info!(
697                    hard_compactions = m.compaction_hard_count,
698                    turns_after_hard = ?m.compaction_turns_after_hard,
699                    "hard compaction trajectory"
700                );
701            }
702        }
703
704        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
705        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
706        // startup strips the orphaned ToolUse and emits warnings.
707        self.flush_orphaned_tool_use_on_shutdown().await;
708
709        // NOTE: forcibly aborts in-flight Enrichment and Telemetry tasks tracked by the
710        // supervisor. Before the sprint-1 refactor, experiment sessions were detached
711        // tokio::spawn calls that survived shutdown; they are now intentionally untracked
712        // (see experiment_cmd.rs) and will continue running until their own CancellationToken
713        // is triggered or the process exits.
714        self.runtime.lifecycle.supervisor.abort_all();
715
716        // Abort background task handles not tracked by BackgroundSupervisor.
717        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
718        if let Some(h) = self.services.compression.pending_task_goal.take() {
719            h.abort();
720        }
721        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
722            h.abort();
723        }
724        if let Some(h) = self.services.compression.pending_subgoal.take() {
725            h.abort();
726        }
727
728        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
729        self.services.learning_engine.learning_tasks.abort_all();
730
731        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
732        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
733        // observe cancellation at their next .await point. Without yielding here the summary
734        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
735        for _ in 0..4 {
736            tokio::task::yield_now().await;
737        }
738
739        self.maybe_store_shutdown_summary().await;
740        self.maybe_store_session_digest().await;
741
742        tracing::info!("agent shutdown complete");
743    }
744
745    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if channel I/O or LLM communication fails.
750    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
751    fn refresh_subagent_metrics(&mut self) {
752        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
753            return;
754        };
755        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
756            .statuses()
757            .into_iter()
758            .map(|(id, s)| {
759                let def = mgr.agents_def(&id);
760                crate::metrics::SubAgentMetrics {
761                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
762                    id: id.clone(),
763                    state: format!("{:?}", s.state).to_lowercase(),
764                    turns_used: s.turns_used,
765                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
766                    background: def.is_some_and(|d| d.permissions.background),
767                    elapsed_secs: s.started_at.elapsed().as_secs(),
768                    permission_mode: def.map_or_else(String::new, |d| {
769                        use zeph_subagent::def::PermissionMode;
770                        match d.permissions.permission_mode {
771                            PermissionMode::Default => String::new(),
772                            PermissionMode::AcceptEdits => "accept_edits".into(),
773                            PermissionMode::DontAsk => "dont_ask".into(),
774                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
775                            PermissionMode::Plan => "plan".into(),
776                        }
777                    }),
778                    transcript_dir: mgr
779                        .agent_transcript_dir(&id)
780                        .map(|p| p.to_string_lossy().into_owned()),
781                }
782            })
783            .collect();
784        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
785    }
786
787    /// Non-blocking poll: notify the user when background sub-agents complete.
788    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
789        let completed = self.poll_subagents().await;
790        for (task_id, result) in completed {
791            let notice = if result.is_empty() {
792                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
793            } else {
794                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
795            };
796            if let Err(e) = self.channel.send(&notice).await {
797                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
798            }
799        }
800        Ok(())
801    }
802
803    /// Run the agent main loop.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
808    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
809    pub async fn run(&mut self) -> Result<(), error::AgentError>
810    where
811        C: 'static,
812    {
813        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
814            && !*rx.borrow()
815        {
816            let _ = rx.changed().await;
817            if !*rx.borrow() {
818                tracing::warn!("model warmup did not complete successfully");
819            }
820        }
821
822        // Restore the last-used provider preference before any user interaction (#3308).
823        self.restore_channel_provider().await;
824
825        // Load the session digest once at session start for context injection.
826        self.load_and_cache_session_digest().await;
827        self.maybe_send_resume_recap().await;
828
829        loop {
830            self.apply_provider_override();
831            self.check_tool_refresh().await;
832            self.process_pending_elicitations().await;
833            self.refresh_subagent_metrics();
834            self.notify_completed_subagents().await?;
835            self.drain_channel();
836
837            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
838                self.notify_queue_count().await;
839                if queued.raw_attachments.is_empty() {
840                    (queued.text, queued.image_parts)
841                } else {
842                    let msg = crate::channel::ChannelMessage {
843                        text: queued.text,
844                        attachments: queued.raw_attachments,
845                    };
846                    self.resolve_message(msg).await
847                }
848            } else {
849                match self.next_event().await? {
850                    None | Some(LoopEvent::Shutdown) => break,
851                    Some(LoopEvent::SkillReload) => {
852                        self.reload_skills().await;
853                        continue;
854                    }
855                    Some(LoopEvent::InstructionReload) => {
856                        self.reload_instructions();
857                        continue;
858                    }
859                    Some(LoopEvent::ConfigReload) => {
860                        self.reload_config();
861                        continue;
862                    }
863                    Some(LoopEvent::UpdateNotification(msg)) => {
864                        if let Err(e) = self.channel.send(&msg).await {
865                            tracing::warn!("failed to send update notification: {e}");
866                        }
867                        continue;
868                    }
869                    Some(LoopEvent::ExperimentCompleted(msg)) => {
870                        self.services.experiments.cancel = None;
871                        if let Err(e) = self.channel.send(&msg).await {
872                            tracing::warn!("failed to send experiment completion: {e}");
873                        }
874                        continue;
875                    }
876                    Some(LoopEvent::ScheduledTask(prompt)) => {
877                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
878                        let msg = crate::channel::ChannelMessage {
879                            text,
880                            attachments: Vec::new(),
881                        };
882                        self.drain_channel();
883                        self.resolve_message(msg).await
884                    }
885                    Some(LoopEvent::TaskInjected(injection)) => {
886                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
887                            ls.iteration += 1;
888                            tracing::info!(iteration = ls.iteration, "loop: tick");
889                        }
890                        let msg = crate::channel::ChannelMessage {
891                            text: injection.prompt,
892                            attachments: Vec::new(),
893                        };
894                        self.drain_channel();
895                        self.resolve_message(msg).await
896                    }
897                    Some(LoopEvent::FileChanged(event)) => {
898                        self.handle_file_changed(event).await;
899                        continue;
900                    }
901                    Some(LoopEvent::Message(msg)) => {
902                        self.drain_channel();
903                        self.resolve_message(msg).await
904                    }
905                }
906            };
907
908            let trimmed = text.trim();
909
910            // M3: extract flagged URLs from all slash commands before any registry dispatch,
911            // so `/skill install <url>` and similar commands populate user_provided_urls.
912            if trimmed.starts_with('/') {
913                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
914                if !slash_urls.is_empty() {
915                    self.services
916                        .security
917                        .user_provided_urls
918                        .write()
919                        .extend(slash_urls);
920                }
921            }
922
923            // Registry dispatch: two-phase command dispatch.
924            //
925            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
926            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
927            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
928            //
929            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
930            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
931            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
932            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
933            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
934            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
935            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
936            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
937            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
938            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
939            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
940            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
941            //
942            // Drop-order rules enforced here:
943            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
944            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
945            let session_impl = command_context_impls::SessionAccessImpl {
946                supports_exit: self.channel.supports_exit(),
947            };
948            let mut messages_impl = command_context_impls::MessageAccessImpl {
949                msg: &mut self.msg,
950                tool_state: &mut self.services.tool_state,
951                providers: &mut self.runtime.providers,
952                metrics: &self.runtime.metrics,
953                security: &mut self.services.security,
954                tool_orchestrator: &mut self.tool_orchestrator,
955            };
956            // sink_adapter declared before reg so it is dropped after reg (LIFO).
957            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
958            // null_agent must be declared before reg so it lives longer (LIFO drop order).
959            let mut null_agent = zeph_commands::NullAgent;
960            let registry_handled = {
961                use zeph_commands::CommandRegistry;
962                use zeph_commands::handlers::debug::{
963                    DebugDumpCommand, DumpFormatCommand, LogCommand,
964                };
965                use zeph_commands::handlers::help::HelpCommand;
966                use zeph_commands::handlers::session::{
967                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
968                };
969
970                let mut reg = CommandRegistry::new();
971                reg.register(ExitCommand);
972                reg.register(QuitCommand);
973                reg.register(ClearCommand);
974                reg.register(ResetCommand);
975                reg.register(ClearQueueCommand);
976                reg.register(LogCommand);
977                reg.register(DebugDumpCommand);
978                reg.register(DumpFormatCommand);
979                reg.register(HelpCommand);
980                #[cfg(test)]
981                reg.register(test_stubs::TestErrorCommand);
982
983                let mut ctx = zeph_commands::CommandContext {
984                    sink: &mut sink_adapter,
985                    debug: &mut self.runtime.debug,
986                    messages: &mut messages_impl,
987                    session: &session_impl,
988                    agent: &mut null_agent,
989                };
990                reg.dispatch(&mut ctx, trimmed).await
991            };
992            let session_reg_missed = registry_handled.is_none();
993            match self
994                .apply_dispatch_result(registry_handled, trimmed, false)
995                .await
996            {
997                DispatchFlow::Break => break,
998                DispatchFlow::Continue => continue,
999                DispatchFlow::Fallthrough => {
1000                    // Not handled by the session/debug registry; try agent-command registry.
1001                }
1002            }
1003
1004            // Agent-command registry: handlers access Agent<C> directly.
1005            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
1006            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
1007            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
1008            // sentinels here will outlive ctx (ctx is declared later, inside the block).
1009            let mut agent_null_debug = command_context_impls::NullDebugAccess;
1010            let mut agent_null_messages = command_context_impls::NullMessageAccess;
1011            let agent_null_session = command_context_impls::NullSessionAccess;
1012            let mut agent_null_sink = zeph_commands::NullSink;
1013            let agent_result: Option<
1014                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
1015            > = if session_reg_missed {
1016                use zeph_commands::CommandRegistry;
1017                use zeph_commands::handlers::{
1018                    acp::AcpCommand,
1019                    agent_cmd::AgentCommand,
1020                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
1021                    experiment::ExperimentCommand,
1022                    loop_cmd::LoopCommand,
1023                    lsp::LspCommand,
1024                    mcp::McpCommand,
1025                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
1026                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
1027                    model::{ModelCommand, ProviderCommand},
1028                    plan::PlanCommand,
1029                    plugins::PluginsCommand,
1030                    policy::PolicyCommand,
1031                    scheduler::SchedulerCommand,
1032                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
1033                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
1034                };
1035
1036                let mut agent_reg = CommandRegistry::new();
1037                agent_reg.register(MemoryCommand);
1038                agent_reg.register(GraphCommand);
1039                agent_reg.register(GuidelinesCommand);
1040                agent_reg.register(ModelCommand);
1041                agent_reg.register(ProviderCommand);
1042                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
1043                agent_reg.register(SkillCommand);
1044                agent_reg.register(SkillsCommand);
1045                agent_reg.register(FeedbackCommand);
1046                agent_reg.register(McpCommand);
1047                agent_reg.register(PolicyCommand);
1048                agent_reg.register(SchedulerCommand);
1049                agent_reg.register(LspCommand);
1050                // Phase 4 migrations (Send-safe commands):
1051                agent_reg.register(CacheStatsCommand);
1052                agent_reg.register(ImageCommand);
1053                agent_reg.register(NotifyTestCommand);
1054                agent_reg.register(StatusCommand);
1055                agent_reg.register(GuardrailCommand);
1056                agent_reg.register(FocusCommand);
1057                agent_reg.register(SideQuestCommand);
1058                agent_reg.register(AgentCommand);
1059                // Phase 5 migrations (Send-compatible):
1060                agent_reg.register(CompactCommand);
1061                agent_reg.register(NewConversationCommand);
1062                agent_reg.register(RecapCommand);
1063                agent_reg.register(ExperimentCommand);
1064                agent_reg.register(PlanCommand);
1065                agent_reg.register(LoopCommand);
1066                agent_reg.register(PluginsCommand);
1067                agent_reg.register(AcpCommand);
1068
1069                let mut ctx = zeph_commands::CommandContext {
1070                    sink: &mut agent_null_sink,
1071                    debug: &mut agent_null_debug,
1072                    messages: &mut agent_null_messages,
1073                    session: &agent_null_session,
1074                    agent: self,
1075                };
1076                // self is reborrowed; ctx drops at end of this block.
1077                agent_reg.dispatch(&mut ctx, trimmed).await
1078            } else {
1079                None
1080            };
1081            // self.channel is available again here (ctx borrow dropped above).
1082            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
1083            // inside apply_dispatch_result when with_learning = true.
1084            match self
1085                .apply_dispatch_result(agent_result, trimmed, true)
1086                .await
1087            {
1088                DispatchFlow::Break => break,
1089                DispatchFlow::Continue => continue,
1090                DispatchFlow::Fallthrough => {
1091                    // Not handled by agent registry; fall through to existing dispatch.
1092                }
1093            }
1094
1095            match self.handle_builtin_command(trimmed) {
1096                Some(true) => break,
1097                Some(false) => continue,
1098                None => {}
1099            }
1100
1101            self.process_user_message(text, image_parts).await?;
1102        }
1103
1104        // autoDream: run background memory consolidation if conditions are met (#2697).
1105        // Runs with a timeout — partial state is acceptable for MVP.
1106        self.maybe_autodream().await;
1107
1108        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1109        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
1110            tc.finish();
1111        }
1112
1113        Ok(())
1114    }
1115
1116    /// Dispatch a slash-command registry result and flush the channel.
1117    ///
1118    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1119    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1120    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1121    async fn apply_dispatch_result(
1122        &mut self,
1123        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1124        command: &str,
1125        with_learning: bool,
1126    ) -> DispatchFlow {
1127        match result {
1128            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1129                let _ = self.channel.flush_chunks().await;
1130                DispatchFlow::Break
1131            }
1132            Some(Ok(
1133                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1134            )) => {
1135                let _ = self.channel.flush_chunks().await;
1136                DispatchFlow::Continue
1137            }
1138            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1139                let _ = self.channel.send(&msg).await;
1140                let _ = self.channel.flush_chunks().await;
1141                if with_learning {
1142                    self.maybe_trigger_post_command_learning(command).await;
1143                }
1144                DispatchFlow::Continue
1145            }
1146            Some(Err(e)) => {
1147                let _ = self.channel.send(&e.to_string()).await;
1148                let _ = self.channel.flush_chunks().await;
1149                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1150                DispatchFlow::Continue
1151            }
1152            None => DispatchFlow::Fallthrough,
1153        }
1154    }
1155
1156    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1157    fn apply_provider_override(&mut self) {
1158        if let Some(ref slot) = self.runtime.providers.provider_override
1159            && let Some(new_provider) = slot.write().take()
1160        {
1161            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1162            self.provider = new_provider;
1163        }
1164    }
1165
1166    /// Poll all event sources and return the next [`LoopEvent`].
1167    ///
1168    /// Returns `None` when the inbound channel closes (graceful shutdown).
1169    ///
1170    /// # Errors
1171    ///
1172    /// Propagates channel receive errors.
1173    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1174        let event = tokio::select! {
1175            result = self.channel.recv() => {
1176                return Ok(result?.map(LoopEvent::Message));
1177            }
1178            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1179                tracing::info!("shutting down");
1180                LoopEvent::Shutdown
1181            }
1182            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1183                LoopEvent::SkillReload
1184            }
1185            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1186                LoopEvent::InstructionReload
1187            }
1188            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1189                LoopEvent::ConfigReload
1190            }
1191            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1192                LoopEvent::UpdateNotification(msg)
1193            }
1194            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1195                LoopEvent::ExperimentCompleted(msg)
1196            }
1197            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1198                tracing::info!("scheduler: injecting custom task as agent turn");
1199                LoopEvent::ScheduledTask(prompt)
1200            }
1201            () = async {
1202                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1203                    if ls.cancel_tx.is_cancelled() {
1204                        std::future::pending::<()>().await;
1205                    } else {
1206                        ls.interval.tick().await;
1207                    }
1208                } else {
1209                    std::future::pending::<()>().await;
1210                }
1211            } => {
1212                // Re-check user_loop after the tick — /loop stop may have fired between the
1213                // interval firing and this arm executing. Returning Ok(None) causes the caller
1214                // to `continue` without injecting a stale or empty prompt.
1215                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1216                    return Ok(None);
1217                };
1218                if ls.cancel_tx.is_cancelled() {
1219                    self.runtime.lifecycle.user_loop = None;
1220                    return Ok(None);
1221                }
1222                let prompt = ls.prompt.clone();
1223                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1224            }
1225            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1226                LoopEvent::FileChanged(event)
1227            }
1228        };
1229        Ok(Some(event))
1230    }
1231
1232    async fn resolve_message(
1233        &self,
1234        msg: crate::channel::ChannelMessage,
1235    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1236        use crate::channel::{Attachment, AttachmentKind};
1237        use zeph_llm::provider::{ImageData, MessagePart};
1238
1239        let text_base = msg.text.clone();
1240
1241        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1242            .attachments
1243            .into_iter()
1244            .partition(|a| a.kind == AttachmentKind::Audio);
1245
1246        tracing::debug!(
1247            audio = audio_attachments.len(),
1248            has_stt = self.runtime.providers.stt.is_some(),
1249            "resolve_message attachments"
1250        );
1251
1252        let text = if !audio_attachments.is_empty()
1253            && let Some(stt) = self.runtime.providers.stt.as_ref()
1254        {
1255            let mut transcribed_parts = Vec::new();
1256            for attachment in &audio_attachments {
1257                if attachment.data.len() > MAX_AUDIO_BYTES {
1258                    tracing::warn!(
1259                        size = attachment.data.len(),
1260                        max = MAX_AUDIO_BYTES,
1261                        "audio attachment exceeds size limit, skipping"
1262                    );
1263                    continue;
1264                }
1265                match stt
1266                    .transcribe(&attachment.data, attachment.filename.as_deref())
1267                    .await
1268                {
1269                    Ok(result) => {
1270                        tracing::info!(
1271                            len = result.text.len(),
1272                            language = ?result.language,
1273                            "audio transcribed"
1274                        );
1275                        transcribed_parts.push(result.text);
1276                    }
1277                    Err(e) => {
1278                        tracing::error!(error = %e, "audio transcription failed");
1279                    }
1280                }
1281            }
1282            if transcribed_parts.is_empty() {
1283                text_base
1284            } else {
1285                let transcribed = transcribed_parts.join("\n");
1286                if text_base.is_empty() {
1287                    transcribed
1288                } else {
1289                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1290                }
1291            }
1292        } else {
1293            if !audio_attachments.is_empty() {
1294                tracing::warn!(
1295                    count = audio_attachments.len(),
1296                    "audio attachments received but no STT provider configured, dropping"
1297                );
1298            }
1299            text_base
1300        };
1301
1302        let mut image_parts = Vec::new();
1303        for attachment in image_attachments {
1304            if attachment.data.len() > MAX_IMAGE_BYTES {
1305                tracing::warn!(
1306                    size = attachment.data.len(),
1307                    max = MAX_IMAGE_BYTES,
1308                    "image attachment exceeds size limit, skipping"
1309                );
1310                continue;
1311            }
1312            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1313            image_parts.push(MessagePart::Image(Box::new(ImageData {
1314                data: attachment.data,
1315                mime_type,
1316            })));
1317        }
1318
1319        (text, image_parts)
1320    }
1321
1322    /// Create a new [`Turn`] for the given input and advance the turn counter.
1323    ///
1324    /// Clears per-turn state that must not carry over between turns:
1325    /// - per-turn `CancellationToken` (new token for each turn)
1326    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1327    ///   `process_user_message_inner` after security checks)
1328    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1329        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1330        self.runtime.debug.iteration_counter += 1;
1331        let cancel_token = CancellationToken::new();
1332        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1333        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1334        self.services.security.user_provided_urls.write().clear();
1335        // Reset per-turn LLM request counter for the notification gate.
1336        self.runtime.lifecycle.turn_llm_requests = 0;
1337
1338        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1339        turn::Turn::new(context, input)
1340    }
1341
1342    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1343    ///
1344    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1345    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1346    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1347    /// is populated from it here so the rest of the pipeline is unchanged.
1348    fn end_turn(&mut self, turn: turn::Turn) {
1349        self.runtime.metrics.pending_timings = turn.metrics.timings;
1350        self.flush_turn_timings();
1351        // Clear per-turn intent (FR-008): must not persist across turns.
1352        self.services.session.current_turn_intent = None;
1353    }
1354
1355    #[cfg_attr(
1356        feature = "profiling",
1357        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1358    )]
1359    async fn process_user_message(
1360        &mut self,
1361        text: String,
1362        image_parts: Vec<zeph_llm::provider::MessagePart>,
1363    ) -> Result<(), error::AgentError> {
1364        let input = turn::TurnInput::new(text, image_parts);
1365        let mut t = self.begin_turn(input);
1366
1367        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1368        tracing::Span::current().record("turn_id", t.id().0);
1369        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1370        self.runtime
1371            .debug
1372            .start_iteration_span(turn_idx, t.input.text.trim());
1373
1374        let result = self.process_user_message_inner(&mut t).await;
1375
1376        // Close iteration span regardless of outcome (partial trace preserved on error).
1377        let span_status = if result.is_ok() {
1378            crate::debug_dump::trace::SpanStatus::Ok
1379        } else {
1380            crate::debug_dump::trace::SpanStatus::Error {
1381                message: "iteration failed".to_owned(),
1382            }
1383        };
1384        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1385
1386        self.end_turn(t);
1387        result
1388    }
1389
1390    async fn process_user_message_inner(
1391        &mut self,
1392        turn: &mut turn::Turn,
1393    ) -> Result<(), error::AgentError> {
1394        self.reap_background_tasks_and_update_metrics();
1395
1396        // Drain any background shell completions that arrived since the last turn.
1397        // They are buffered in `pending_background_completions` and merged with the
1398        // real user message into a single user-role block below (N1 invariant).
1399        self.drain_background_completions();
1400
1401        self.wire_cancel_bridge(turn.cancel_token());
1402
1403        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1404        let text = turn.input.text.clone();
1405        let trimmed_owned = text.trim().to_owned();
1406        let trimmed = trimmed_owned.as_str();
1407
1408        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1409        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1410        if self.services.security.vigil.is_some() {
1411            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1412            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1413        }
1414
1415        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1416            return result;
1417        }
1418
1419        self.check_pending_rollbacks().await;
1420
1421        if self.pre_process_security(trimmed).await? {
1422            return Ok(());
1423        }
1424
1425        let t_ctx = std::time::Instant::now();
1426        tracing::debug!("turn timing: prepare_context start");
1427        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1428        turn.metrics_mut().timings.prepare_context_ms =
1429            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1430        tracing::debug!(
1431            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1432            "turn timing: prepare_context done"
1433        );
1434
1435        let image_parts = std::mem::take(&mut turn.input.image_parts);
1436        // Prepend any background completion blocks to the user text. All completions and the
1437        // user message MUST be merged into a single user-role block to satisfy the strict
1438        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1439        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1440        let user_msg = self.build_user_message(&merged_text, image_parts);
1441
1442        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1443        // URL set was cleared in begin_turn; re-populate for this turn.
1444        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1445        if !urls.is_empty() {
1446            self.services
1447                .security
1448                .user_provided_urls
1449                .write()
1450                .extend(urls);
1451        }
1452
1453        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1454        // Derived from the raw input text before context assembly to avoid timing dependencies.
1455        self.services.memory.extraction.goal_text = Some(text.clone());
1456
1457        let t_persist = std::time::Instant::now();
1458        tracing::debug!("turn timing: persist_message(user) start");
1459        // Image parts intentionally excluded — base64 payloads too large for message history.
1460        self.persist_message(Role::User, &text, &[], false).await;
1461        turn.metrics_mut().timings.persist_message_ms =
1462            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1463        tracing::debug!(
1464            ms = turn.metrics_snapshot().timings.persist_message_ms,
1465            "turn timing: persist_message(user) done"
1466        );
1467        self.push_message(user_msg);
1468
1469        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1470        // handle_native_tool_calls respectively via metrics.pending_timings.
1471        tracing::debug!("turn timing: process_response start");
1472        let turn_had_error = if let Err(e) = self.process_response().await {
1473            // Detach any in-flight learning tasks before mutating message state.
1474            self.services.learning_engine.learning_tasks.detach_all();
1475            tracing::error!("Response processing failed: {e:#}");
1476
1477            // Record provider failure timestamp so the next turn can skip
1478            // expensive context preparation while providers are known-down.
1479            if e.is_no_providers() {
1480                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1481                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1482                tracing::warn!(
1483                    backoff_secs,
1484                    "no providers available; backing off before next turn"
1485                );
1486                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1487            }
1488
1489            let user_msg = format!("Error: {e:#}");
1490            self.channel.send(&user_msg).await?;
1491            self.msg.messages.pop();
1492            self.recompute_prompt_tokens();
1493            self.channel.flush_chunks().await?;
1494            true
1495        } else {
1496            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1497            // leak into the next turn's context.
1498            self.services.learning_engine.learning_tasks.detach_all();
1499            self.truncate_old_tool_results();
1500            // MagicDocs: spawn background doc updates if any are due (#2702).
1501            self.maybe_update_magic_docs();
1502            // Compression spectrum: fire-and-forget promotion scan (#3305).
1503            self.maybe_spawn_promotion_scan();
1504            false
1505        };
1506        tracing::debug!("turn timing: process_response done");
1507
1508        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1509        #[cfg(feature = "self-check")]
1510        if let Some(pipeline) = self.services.quality.clone() {
1511            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1512        }
1513        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1514        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1515        // When self-check appends a flag_marker chunk, this single call covers both
1516        // the main response and the marker, preventing the double response_end of #3243.
1517        let _ = self.channel.flush_chunks().await;
1518
1519        self.maybe_fire_completion_notification(turn, turn_had_error);
1520
1521        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1522        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1523        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1524        // we harvest those values into Turn before end_turn overwrites pending_timings.
1525        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1526        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1527
1528        Ok(())
1529    }
1530
1531    /// Wire the per-turn cancellation token into the cancel bridge.
1532    ///
1533    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1534    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1535    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1536    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1537        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1538        let token = turn_token.clone();
1539        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1540        self.runtime.lifecycle.cancel_token = turn_token.clone();
1541        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1542            prev.abort();
1543        }
1544        self.runtime.lifecycle.cancel_bridge_handle =
1545            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1546                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1547                move || async move {
1548                    signal.notified().await;
1549                    token.cancel();
1550                },
1551            ));
1552    }
1553
1554    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1555    ///
1556    /// Called at the top of each turn, before any user message processing.
1557    fn reap_background_tasks_and_update_metrics(&mut self) {
1558        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1559        if bg_signal.did_summarize {
1560            self.services.memory.persistence.unsummarized_count = 0;
1561            tracing::debug!("background summarization completed; unsummarized_count reset");
1562        }
1563        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1564        self.update_metrics(|m| {
1565            m.bg_inflight = snap.inflight as u64;
1566            m.bg_dropped = snap.total_dropped();
1567            m.bg_completed = snap.total_completed();
1568            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1569            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1570        });
1571
1572        // Update shell background run rows for TUI panel.
1573        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1574            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1575                .runtime
1576                .lifecycle
1577                .shell_executor_handle
1578                .as_ref()
1579                .map(|e| e.background_runs_snapshot())
1580                .unwrap_or_default()
1581                .into_iter()
1582                .map(|s| crate::metrics::ShellBackgroundRunRow {
1583                    run_id: truncate_shell_run_id(&s.run_id),
1584                    command: truncate_shell_command(&s.command),
1585                    elapsed_secs: s.elapsed_ms / 1000,
1586                })
1587                .collect();
1588            self.update_metrics(|m| {
1589                m.shell_background_runs = shell_rows;
1590            });
1591        }
1592
1593        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1594        // accounted in the snapshot above.
1595        if self
1596            .runtime
1597            .config
1598            .supervisor_config
1599            .abort_enrichment_on_turn
1600        {
1601            self.runtime
1602                .lifecycle
1603                .supervisor
1604                .abort_class(agent_supervisor::TaskClass::Enrichment);
1605        }
1606    }
1607
1608    /// Fire completion notifications and `turn_complete` hooks after each turn.
1609    ///
1610    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1611    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1612    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1613    /// env vars carry no raw assistant output.
1614    ///
1615    /// Gating:
1616    /// - When a `Notifier` is configured, both the notifier and hooks share its
1617    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1618    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1619    ///   notifier path is simply skipped).
1620    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1621        let snap = turn.metrics_snapshot().timings.clone();
1622        let duration_ms = snap
1623            .prepare_context_ms
1624            .saturating_add(snap.llm_chat_ms)
1625            .saturating_add(snap.tool_exec_ms);
1626        let summary = crate::notifications::TurnSummary {
1627            duration_ms,
1628            preview: self.last_assistant_preview(160),
1629            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1630            tool_calls: 0,
1631            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1632            exit_status: if is_error {
1633                crate::notifications::TurnExitStatus::Error
1634            } else {
1635                crate::notifications::TurnExitStatus::Success
1636            },
1637        };
1638
1639        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1640        let gate_ok = self
1641            .runtime
1642            .lifecycle
1643            .notifier
1644            .as_ref()
1645            .is_none_or(|n| n.should_fire(&summary));
1646
1647        // 1) Existing notifier path — unchanged semantics.
1648        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1649            && gate_ok
1650        {
1651            notifier.fire(&summary);
1652        }
1653
1654        // 2) turn_complete hooks — fire-and-forget, matching the Notifier::fire pattern.
1655        // MCP dispatch is omitted: turn_complete hooks are expected to be Command-type
1656        // (shell scripts for desktop notifications, status-bar updates, etc.). McpTool
1657        // hooks in this context would require a Send + 'static MCP handle; defer that
1658        // extension if needed via a follow-up.
1659        let hooks = self.services.session.hooks_config.turn_complete.clone();
1660        if !hooks.is_empty() && gate_ok {
1661            let mut env = std::collections::HashMap::new();
1662            env.insert(
1663                "ZEPH_TURN_DURATION_MS".to_owned(),
1664                summary.duration_ms.to_string(),
1665            );
1666            env.insert(
1667                "ZEPH_TURN_STATUS".to_owned(),
1668                if is_error { "error" } else { "success" }.to_owned(),
1669            );
1670            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1671            env.insert(
1672                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1673                summary.llm_requests.to_string(),
1674            );
1675            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1676            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1677                agent_supervisor::TaskClass::Telemetry,
1678                "turn-complete-hooks",
1679                async move {
1680                    // Explicitly 'static so the future satisfies tokio::spawn's bound.
1681                    // None holds no actual reference; the annotation is vacuously satisfied.
1682                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1683                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1684                        tracing::warn!(error = %e, "turn_complete hook failed");
1685                    }
1686                },
1687            );
1688        }
1689    }
1690
1691    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1692    #[cfg_attr(
1693        feature = "profiling",
1694        tracing::instrument(name = "agent.security_prescreen", skip_all)
1695    )]
1696    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1697        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1698        if let Some(ref guardrail) = self.services.security.guardrail {
1699            use zeph_sanitizer::guardrail::GuardrailVerdict;
1700            let verdict = guardrail.check(trimmed).await;
1701            match &verdict {
1702                GuardrailVerdict::Flagged { reason, .. } => {
1703                    tracing::warn!(
1704                        reason = %reason,
1705                        should_block = verdict.should_block(),
1706                        "guardrail flagged user input"
1707                    );
1708                    if verdict.should_block() {
1709                        let msg = format!("[guardrail] Input blocked: {reason}");
1710                        let _ = self.channel.send(&msg).await;
1711                        let _ = self.channel.flush_chunks().await;
1712                        return Ok(true);
1713                    }
1714                    // Warn mode: notify but continue.
1715                    let _ = self
1716                        .channel
1717                        .send(&format!("[guardrail] Warning: {reason}"))
1718                        .await;
1719                }
1720                GuardrailVerdict::Error { error } => {
1721                    if guardrail.error_should_block() {
1722                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1723                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1724                        let _ = self.channel.send(msg).await;
1725                        let _ = self.channel.flush_chunks().await;
1726                        return Ok(true);
1727                    }
1728                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1729                }
1730                GuardrailVerdict::Safe => {}
1731            }
1732        }
1733
1734        // ML classifier: lightweight injection detection on user input boundary.
1735        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1736        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1737        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1738        // direct user chat. Disabled by default to prevent false positives on benign messages.
1739        #[cfg(feature = "classifiers")]
1740        if self.services.security.sanitizer.scan_user_input() {
1741            match self
1742                .services
1743                .security
1744                .sanitizer
1745                .classify_injection(trimmed)
1746                .await
1747            {
1748                zeph_sanitizer::InjectionVerdict::Blocked => {
1749                    self.push_classifier_metrics();
1750                    let _ = self
1751                        .channel
1752                        .send("[security] Input blocked: injection detected by classifier.")
1753                        .await;
1754                    let _ = self.channel.flush_chunks().await;
1755                    return Ok(true);
1756                }
1757                zeph_sanitizer::InjectionVerdict::Suspicious => {
1758                    tracing::warn!("injection_classifier soft_signal on user input");
1759                }
1760                zeph_sanitizer::InjectionVerdict::Clean => {}
1761            }
1762        }
1763        #[cfg(feature = "classifiers")]
1764        self.push_classifier_metrics();
1765
1766        Ok(false)
1767    }
1768
1769    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1770    ///
1771    /// Skips context preparation entirely when providers failed on the previous turn and the
1772    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1773    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1774    /// are rate-limited or unavailable (#3357).
1775    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1776        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1777        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1778
1779        // Skip expensive memory recall / embedding when providers are known-down.
1780        let providers_recently_failed = self
1781            .runtime
1782            .lifecycle
1783            .last_no_providers_at
1784            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1785
1786        if providers_recently_failed {
1787            tracing::warn!(
1788                backoff_secs,
1789                "skipping context preparation: providers were unavailable on last turn"
1790            );
1791            return;
1792        }
1793
1794        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1795        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1796        {
1797            Ok(()) => {}
1798            Err(_elapsed) => {
1799                tracing::warn!(
1800                    timeout_secs = prep_timeout_secs,
1801                    "context preparation timed out; proceeding with degraded context"
1802                );
1803            }
1804        }
1805    }
1806
1807    #[cfg_attr(
1808        feature = "profiling",
1809        tracing::instrument(name = "agent.prepare_context", skip_all)
1810    )]
1811    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1812        // Reset per-message pruning cache at the start of each turn (#2298).
1813        self.services.mcp.pruning_cache.reset();
1814
1815        // Extract before rebuild_system_prompt so the value is not tainted
1816        // by the secrets-bearing system prompt (ConversationId is just an i64).
1817        let conv_id = self.services.memory.persistence.conversation_id;
1818        self.rebuild_system_prompt(text).await;
1819
1820        self.detect_and_record_corrections(trimmed, conv_id).await;
1821        self.services.learning_engine.tick();
1822        self.analyze_and_learn().await;
1823        self.sync_graph_counts().await;
1824
1825        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1826        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1827        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1828        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1829        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1830
1831        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1832        {
1833            self.services.focus.tick();
1834
1835            // SideQuest eviction: runs every N user turns when enabled.
1836            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1837            let sidequest_should_fire = self.services.sidequest.tick();
1838            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1839                self.maybe_sidequest_eviction();
1840            }
1841        }
1842
1843        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
1844        // gated on graph + experience config, and only when both stores are attached.
1845        {
1846            let cfg = &self.services.memory.extraction.graph_config.experience;
1847            if cfg.enabled
1848                && cfg.evolution_sweep_enabled
1849                && cfg.evolution_sweep_interval > 0
1850                && self
1851                    .services
1852                    .sidequest
1853                    .turn_counter
1854                    .checked_rem(cfg.evolution_sweep_interval as u64)
1855                    == Some(0)
1856                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
1857                && let (Some(exp), Some(graph)) =
1858                    (memory.experience.as_ref(), memory.graph_store.as_ref())
1859            {
1860                let exp = std::sync::Arc::clone(exp);
1861                let graph = std::sync::Arc::clone(graph);
1862                let threshold = cfg.confidence_prune_threshold;
1863                let turn = self.services.sidequest.turn_counter;
1864                let accepted = self.runtime.lifecycle.supervisor.spawn(
1865                    agent_supervisor::TaskClass::Telemetry,
1866                    "experience-sweep",
1867                    async move {
1868                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
1869                            Ok(stats) => tracing::info!(
1870                                turn,
1871                                self_loops = stats.pruned_self_loops,
1872                                low_confidence = stats.pruned_low_confidence,
1873                                "evolution sweep complete",
1874                            ),
1875                            Err(e) => tracing::warn!(
1876                                turn,
1877                                error = %e,
1878                                "evolution sweep failed",
1879                            ),
1880                        }
1881                    },
1882                );
1883                if !accepted {
1884                    tracing::warn!(
1885                        turn = self.services.sidequest.turn_counter,
1886                        "experience-sweep dropped (telemetry class at capacity)",
1887                    );
1888                }
1889            }
1890        }
1891
1892        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
1893        if let Some(warning) = self.cache_expiry_warning() {
1894            tracing::info!(warning, "cache expiry warning");
1895            let _ = self.channel.send_status(&warning).await;
1896        }
1897
1898        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
1899        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
1900        self.maybe_time_based_microcompact();
1901
1902        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
1903        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
1904        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
1905        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
1906        self.maybe_apply_deferred_summaries();
1907        self.flush_deferred_summaries().await;
1908
1909        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
1910        if let Err(e) = self.maybe_proactive_compress().await {
1911            tracing::warn!("proactive compression failed: {e:#}");
1912        }
1913
1914        if let Err(e) = self.maybe_compact().await {
1915            tracing::warn!("context compaction failed: {e:#}");
1916        }
1917
1918        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
1919            tracing::warn!("context preparation failed: {e:#}");
1920        }
1921
1922        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
1923        self.provider
1924            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
1925
1926        self.services.learning_engine.reset_reflection();
1927    }
1928
1929    fn build_user_message(
1930        &mut self,
1931        text: &str,
1932        image_parts: Vec<zeph_llm::provider::MessagePart>,
1933    ) -> Message {
1934        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1935        all_image_parts.extend(image_parts);
1936
1937        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1938            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1939                text: text.to_owned(),
1940            }];
1941            parts.extend(all_image_parts);
1942            Message::from_parts(Role::User, parts)
1943        } else {
1944            if !all_image_parts.is_empty() {
1945                tracing::warn!(
1946                    count = all_image_parts.len(),
1947                    "image attachments dropped: provider does not support vision"
1948                );
1949            }
1950            Message {
1951                role: Role::User,
1952                content: text.to_owned(),
1953                parts: vec![],
1954                metadata: MessageMetadata::default(),
1955            }
1956        }
1957    }
1958
1959    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
1960    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
1961    /// on overflow the oldest entry is evicted and a placeholder is inserted.
1962    fn drain_background_completions(&mut self) {
1963        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
1964
1965        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
1966            return;
1967        };
1968        // Non-blocking drain: collect all completions that are already ready.
1969        while let Ok(completion) = rx.try_recv() {
1970            if self.runtime.lifecycle.pending_background_completions.len()
1971                >= BACKGROUND_COMPLETION_BUFFER_CAP
1972            {
1973                tracing::warn!(
1974                    run_id = %completion.run_id,
1975                    "background completion buffer full; dropping run result"
1976                );
1977                // Buffer is full: drop the oldest queued completion and push a sentinel
1978                // for the new (incoming) run so the LLM is informed its result was lost.
1979                self.runtime
1980                    .lifecycle
1981                    .pending_background_completions
1982                    .pop_front();
1983                self.runtime
1984                    .lifecycle
1985                    .pending_background_completions
1986                    .push_back(zeph_tools::BackgroundCompletion {
1987                        run_id: completion.run_id,
1988                        exit_code: -1,
1989                        success: false,
1990                        elapsed_ms: 0,
1991                        command: completion.command,
1992                        output: format!(
1993                            "[background result for run {} dropped: buffer overflow]",
1994                            completion.run_id
1995                        ),
1996                    });
1997            } else {
1998                self.runtime
1999                    .lifecycle
2000                    .pending_background_completions
2001                    .push_back(completion);
2002            }
2003        }
2004    }
2005
2006    /// Format and drain `pending_background_completions` into a prefix string, then
2007    /// return the final merged text (prefix + user message). When there are no pending
2008    /// completions the original text is returned unchanged.
2009    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2010        if self
2011            .runtime
2012            .lifecycle
2013            .pending_background_completions
2014            .is_empty()
2015        {
2016            return user_text.to_owned();
2017        }
2018        let mut parts = String::new();
2019        for completion in self
2020            .runtime
2021            .lifecycle
2022            .pending_background_completions
2023            .drain(..)
2024        {
2025            let _ = write!(
2026                parts,
2027                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2028                completion.run_id,
2029                completion.exit_code,
2030                completion.success,
2031                completion.elapsed_ms,
2032                completion.command,
2033                completion.output,
2034            );
2035        }
2036        parts.push_str(user_text);
2037        parts
2038    }
2039
2040    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
2041    /// channel. Returns a human-readable status string suitable for sending to the user.
2042    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
2043        use zeph_subagent::SubAgentState;
2044        let result = loop {
2045            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2046
2047            // Bridge secret requests from sub-agent to channel.confirm().
2048            // Fetch the pending request first, then release the borrow before
2049            // calling channel.confirm() (which requires &mut self).
2050            #[allow(clippy::redundant_closure_for_method_calls)]
2051            let pending = self
2052                .services
2053                .orchestration
2054                .subagent_manager
2055                .as_mut()
2056                .and_then(|m| m.try_recv_secret_request());
2057            if let Some((req_task_id, req)) = pending {
2058                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2059                // (SEC-P1-02), so it is safe to embed in the prompt string.
2060                let confirm_prompt = format!(
2061                    "Sub-agent requests secret '{}'. Allow?",
2062                    crate::text::truncate_to_chars(&req.secret_key, 100)
2063                );
2064                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2065                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2066                    if approved {
2067                        let ttl = std::time::Duration::from_mins(5);
2068                        let key = req.secret_key.clone();
2069                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2070                            let _ = mgr.deliver_secret(&req_task_id, key);
2071                        }
2072                    } else {
2073                        let _ = mgr.deny_secret(&req_task_id);
2074                    }
2075                }
2076            }
2077
2078            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2079            let statuses = mgr.statuses();
2080            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2081                break format!("{label} completed (no status available).");
2082            };
2083            match status.state {
2084                SubAgentState::Completed => {
2085                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2086                    break format!("{label} completed: {msg}");
2087                }
2088                SubAgentState::Failed => {
2089                    let msg = status
2090                        .last_message
2091                        .clone()
2092                        .unwrap_or_else(|| "unknown error".into());
2093                    break format!("{label} failed: {msg}");
2094                }
2095                SubAgentState::Canceled => {
2096                    break format!("{label} was cancelled.");
2097                }
2098                _ => {
2099                    let _ = self
2100                        .channel
2101                        .send_status(&format!(
2102                            "{label}: turn {}/{}",
2103                            status.turns_used,
2104                            self.services
2105                                .orchestration
2106                                .subagent_manager
2107                                .as_ref()
2108                                .and_then(|m| m.agents_def(task_id))
2109                                .map_or(20, |d| d.permissions.max_turns)
2110                        ))
2111                        .await;
2112                }
2113            }
2114        };
2115        Some(result)
2116    }
2117
2118    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2119    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2120    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2121        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2122        let full_ids: Vec<String> = mgr
2123            .statuses()
2124            .into_iter()
2125            .map(|(tid, _)| tid)
2126            .filter(|tid| tid.starts_with(prefix))
2127            .collect();
2128        Some(match full_ids.as_slice() {
2129            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2130            [fid] => Ok(fid.clone()),
2131            _ => Err(format!(
2132                "Ambiguous id prefix '{prefix}': matches {} agents",
2133                full_ids.len()
2134            )),
2135        })
2136    }
2137
2138    fn handle_agent_list(&self) -> Option<String> {
2139        use std::fmt::Write as _;
2140        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2141        let defs = mgr.definitions();
2142        if defs.is_empty() {
2143            return Some("No sub-agent definitions found.".into());
2144        }
2145        let mut out = String::from("Available sub-agents:\n");
2146        for d in defs {
2147            let memory_label = match d.memory {
2148                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2149                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2150                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2151                None => "",
2152            };
2153            if let Some(ref src) = d.source {
2154                let _ = writeln!(
2155                    out,
2156                    "  {}{} — {} ({})",
2157                    d.name, memory_label, d.description, src
2158                );
2159            } else {
2160                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2161            }
2162        }
2163        Some(out)
2164    }
2165
2166    fn handle_agent_status(&self) -> Option<String> {
2167        use std::fmt::Write as _;
2168        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2169        let statuses = mgr.statuses();
2170        if statuses.is_empty() {
2171            return Some("No active sub-agents.".into());
2172        }
2173        let mut out = String::from("Active sub-agents:\n");
2174        for (id, s) in &statuses {
2175            let state = format!("{:?}", s.state).to_lowercase();
2176            let elapsed = s.started_at.elapsed().as_secs();
2177            let _ = writeln!(
2178                out,
2179                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2180                short = &id[..8.min(id.len())],
2181                t = s.turns_used,
2182                msg = s.last_message.as_deref().unwrap_or(""),
2183            );
2184            // Show memory directory path for agents with memory enabled.
2185            if let Some(def) = mgr.agents_def(id)
2186                && let Some(scope) = def.memory
2187                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2188            {
2189                let _ = writeln!(out, "       memory: {}", dir.display());
2190            }
2191        }
2192        Some(out)
2193    }
2194
2195    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2196        let full_id = match self.resolve_agent_id_prefix(id)? {
2197            Ok(fid) => fid,
2198            Err(msg) => return Some(msg),
2199        };
2200        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2201        if let Some((tid, req)) = mgr.try_recv_secret_request()
2202            && tid == full_id
2203        {
2204            let key = req.secret_key.clone();
2205            let ttl = std::time::Duration::from_mins(5);
2206            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2207                return Some(format!("Approve failed: {e}"));
2208            }
2209            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2210                return Some(format!("Secret delivery failed: {e}"));
2211            }
2212            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2213        }
2214        Some(format!(
2215            "No pending secret request for sub-agent '{full_id}'."
2216        ))
2217    }
2218
2219    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2220        let full_id = match self.resolve_agent_id_prefix(id)? {
2221            Ok(fid) => fid,
2222            Err(msg) => return Some(msg),
2223        };
2224        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2225        match mgr.deny_secret(&full_id) {
2226            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2227            Err(e) => Some(format!("Deny failed: {e}")),
2228        }
2229    }
2230
2231    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2232        use zeph_subagent::AgentCommand;
2233
2234        match cmd {
2235            AgentCommand::List => self.handle_agent_list(),
2236            AgentCommand::Background { name, prompt } => {
2237                self.handle_agent_background(&name, &prompt)
2238            }
2239            AgentCommand::Spawn { name, prompt }
2240            | AgentCommand::Mention {
2241                agent: name,
2242                prompt,
2243            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2244            AgentCommand::Status => self.handle_agent_status(),
2245            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2246            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2247            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2248            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2249        }
2250    }
2251
2252    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2253        let provider = self.provider.clone();
2254        let tool_executor = Arc::clone(&self.tool_executor);
2255        let skills = self.filtered_skills_for(name);
2256        let cfg = self.services.orchestration.subagent_config.clone();
2257        let spawn_ctx = self.build_spawn_context(&cfg);
2258        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2259        match mgr.spawn(
2260            name,
2261            prompt,
2262            provider,
2263            tool_executor,
2264            skills,
2265            &cfg,
2266            spawn_ctx,
2267        ) {
2268            Ok(id) => Some(format!(
2269                "Sub-agent '{name}' started in background (id: {short})",
2270                short = &id[..8.min(id.len())]
2271            )),
2272            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2273        }
2274    }
2275
2276    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2277        let provider = self.provider.clone();
2278        let tool_executor = Arc::clone(&self.tool_executor);
2279        let skills = self.filtered_skills_for(name);
2280        let cfg = self.services.orchestration.subagent_config.clone();
2281        let spawn_ctx = self.build_spawn_context(&cfg);
2282        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2283        let task_id = match mgr.spawn(
2284            name,
2285            prompt,
2286            provider,
2287            tool_executor,
2288            skills,
2289            &cfg,
2290            spawn_ctx,
2291        ) {
2292            Ok(id) => id,
2293            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2294        };
2295        let short = task_id[..8.min(task_id.len())].to_owned();
2296        let _ = self
2297            .channel
2298            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2299            .await;
2300        let label = format!("Sub-agent '{name}'");
2301        self.poll_subagent_until_done(&task_id, &label).await
2302    }
2303
2304    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2305        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2306        // Accept prefix match on task_id.
2307        let ids: Vec<String> = mgr
2308            .statuses()
2309            .into_iter()
2310            .map(|(task_id, _)| task_id)
2311            .filter(|task_id| task_id.starts_with(id))
2312            .collect();
2313        match ids.as_slice() {
2314            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2315            [full_id] => {
2316                let full_id = full_id.clone();
2317                match mgr.cancel(&full_id) {
2318                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2319                    Err(e) => Some(format!("Cancel failed: {e}")),
2320                }
2321            }
2322            _ => Some(format!(
2323                "Ambiguous id prefix '{id}': matches {} agents",
2324                ids.len()
2325            )),
2326        }
2327    }
2328
2329    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2330        let cfg = self.services.orchestration.subagent_config.clone();
2331        // Resolve definition name from transcript meta before spawning so we can
2332        // look up skills by definition name rather than the UUID prefix (S1 fix).
2333        let def_name = {
2334            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2335            match mgr.def_name_for_resume(id, &cfg) {
2336                Ok(name) => name,
2337                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2338            }
2339        };
2340        let skills = self.filtered_skills_for(&def_name);
2341        let provider = self.provider.clone();
2342        let tool_executor = Arc::clone(&self.tool_executor);
2343        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2344        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2345            Ok(pair) => pair,
2346            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2347        };
2348        let short = task_id[..8.min(task_id.len())].to_owned();
2349        let _ = self
2350            .channel
2351            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2352            .await;
2353        self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
2354            .await
2355    }
2356
2357    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2358        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2359        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2360        let reg = self.services.skill.registry.read();
2361        match zeph_subagent::filter_skills(&reg, &def.skills) {
2362            Ok(skills) => {
2363                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2364                if bodies.is_empty() {
2365                    None
2366                } else {
2367                    Some(bodies)
2368                }
2369            }
2370            Err(e) => {
2371                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2372                None
2373            }
2374        }
2375    }
2376
2377    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2378    fn build_spawn_context(
2379        &self,
2380        cfg: &zeph_config::SubAgentConfig,
2381    ) -> zeph_subagent::SpawnContext {
2382        zeph_subagent::SpawnContext {
2383            parent_messages: self.extract_parent_messages(cfg),
2384            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2385            parent_provider_name: {
2386                let name = &self.runtime.config.active_provider_name;
2387                if name.is_empty() {
2388                    None
2389                } else {
2390                    Some(name.clone())
2391                }
2392            },
2393            spawn_depth: self.runtime.config.spawn_depth,
2394            mcp_tool_names: self.extract_mcp_tool_names(),
2395        }
2396    }
2397
2398    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2399    ///
2400    /// Filters system messages, takes last `context_window_turns * 2` messages,
2401    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
2402    fn extract_parent_messages(
2403        &self,
2404        config: &zeph_config::SubAgentConfig,
2405    ) -> Vec<zeph_llm::provider::Message> {
2406        use zeph_llm::provider::Role;
2407        if config.context_window_turns == 0 {
2408            return Vec::new();
2409        }
2410        let non_system: Vec<_> = self
2411            .msg
2412            .messages
2413            .iter()
2414            .filter(|m| m.role != Role::System)
2415            .cloned()
2416            .collect();
2417        let take_count = config.context_window_turns * 2;
2418        let start = non_system.len().saturating_sub(take_count);
2419        let mut msgs = non_system[start..].to_vec();
2420
2421        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
2422        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
2423        let mut total_chars: usize = 0;
2424        let mut keep = msgs.len();
2425        for (i, m) in msgs.iter().enumerate() {
2426            total_chars += m.content.len();
2427            if total_chars > max_chars {
2428                keep = i;
2429                break;
2430            }
2431        }
2432        if keep < msgs.len() {
2433            tracing::info!(
2434                kept = keep,
2435                requested = config.context_window_turns * 2,
2436                "[subagent] truncated parent history from {} to {} turns due to token budget",
2437                config.context_window_turns * 2,
2438                keep
2439            );
2440            msgs.truncate(keep);
2441        }
2442        msgs
2443    }
2444
2445    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2446    fn extract_mcp_tool_names(&self) -> Vec<String> {
2447        self.tool_executor
2448            .tool_definitions_erased()
2449            .into_iter()
2450            .filter(|t| t.id.starts_with("mcp_"))
2451            .map(|t| t.id.to_string())
2452            .collect()
2453    }
2454
2455    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2456    ///
2457    /// Must be called from a blocking context (uses synchronous FS I/O).
2458    fn classify_source_kind(
2459        skill_dir: &std::path::Path,
2460        managed_dir: Option<&std::path::PathBuf>,
2461        bundled_names: &std::collections::HashSet<String>,
2462    ) -> zeph_memory::store::SourceKind {
2463        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2464            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2465            let has_marker = skill_dir.join(".bundled").exists();
2466            if has_marker && bundled_names.contains(skill_name) {
2467                zeph_memory::store::SourceKind::Bundled
2468            } else {
2469                if has_marker {
2470                    tracing::warn!(
2471                        skill = %skill_name,
2472                        "skill has .bundled marker but is not in the bundled skill \
2473                         allowlist — classifying as Hub"
2474                    );
2475                }
2476                zeph_memory::store::SourceKind::Hub
2477            }
2478        } else {
2479            zeph_memory::store::SourceKind::Local
2480        }
2481    }
2482
2483    /// Update trust DB records for all reloaded skills.
2484    async fn update_trust_for_reloaded_skills(
2485        &mut self,
2486        all_meta: &[zeph_skills::loader::SkillMeta],
2487    ) {
2488        // Clone Arc before any .await so no &self fields are held across suspension points.
2489        let memory = self.services.memory.persistence.memory.clone();
2490        let Some(memory) = memory else {
2491            return;
2492        };
2493        let trust_cfg = self.services.skill.trust_config.clone();
2494        let managed_dir = self.services.skill.managed_dir.clone();
2495        let bundled_names: std::collections::HashSet<String> =
2496            zeph_skills::bundled_skill_names().into_iter().collect();
2497        for meta in all_meta {
2498            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2499            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2500            let skill_dir = meta.skill_dir.clone();
2501            let managed_dir_ref = managed_dir.clone();
2502            let bundled_names_ref = bundled_names.clone();
2503            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2504                tokio::task::spawn_blocking(move || {
2505                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2506                    let source_kind = Self::classify_source_kind(
2507                        &skill_dir,
2508                        managed_dir_ref.as_ref(),
2509                        &bundled_names_ref,
2510                    );
2511                    Some((hash, source_kind))
2512                })
2513                .await
2514                .unwrap_or(None);
2515
2516            let Some((current_hash, source_kind)) = fs_result else {
2517                tracing::warn!("failed to compute hash for '{}'", meta.name);
2518                continue;
2519            };
2520            let initial_level = match source_kind {
2521                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2522                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2523                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2524                    &trust_cfg.local_level
2525                }
2526            };
2527            let existing = memory
2528                .sqlite()
2529                .load_skill_trust(&meta.name)
2530                .await
2531                .ok()
2532                .flatten();
2533            let trust_level_str = if let Some(ref row) = existing {
2534                if row.blake3_hash != current_hash {
2535                    trust_cfg.hash_mismatch_level.to_string()
2536                } else if row.source_kind != source_kind {
2537                    // source_kind changed (e.g., hub → bundled on upgrade).
2538                    // Never override an explicit operator block. For active trust levels,
2539                    // adopt the source-kind initial level when it grants more trust.
2540                    let stored = row
2541                        .trust_level
2542                        .parse::<zeph_common::SkillTrustLevel>()
2543                        .unwrap_or_else(|_| {
2544                            tracing::warn!(
2545                                skill = %meta.name,
2546                                raw = %row.trust_level,
2547                                "unrecognised trust_level in DB, treating as quarantined"
2548                            );
2549                            zeph_common::SkillTrustLevel::Quarantined
2550                        });
2551                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2552                        row.trust_level.clone()
2553                    } else {
2554                        initial_level.to_string()
2555                    }
2556                } else {
2557                    row.trust_level.clone()
2558                }
2559            } else {
2560                initial_level.to_string()
2561            };
2562            let source_path = meta.skill_dir.to_str();
2563            if let Err(e) = memory
2564                .sqlite()
2565                .upsert_skill_trust(
2566                    &meta.name,
2567                    &trust_level_str,
2568                    source_kind,
2569                    None,
2570                    source_path,
2571                    &current_hash,
2572                )
2573                .await
2574            {
2575                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2576            }
2577        }
2578    }
2579
2580    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2581    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2582        let provider = self.embedding_provider.clone();
2583        let embed_timeout =
2584            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2585        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2586            let owned = text.to_owned();
2587            let p = provider.clone();
2588            Box::pin(async move {
2589                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2590                    result
2591                } else {
2592                    tracing::warn!(
2593                        timeout_secs = embed_timeout.as_secs(),
2594                        "skill matcher: embedding timed out"
2595                    );
2596                    Err(zeph_llm::LlmError::Timeout)
2597                }
2598            })
2599        };
2600
2601        let needs_inmemory_rebuild = !self
2602            .services
2603            .skill
2604            .matcher
2605            .as_ref()
2606            .is_some_and(SkillMatcherBackend::is_qdrant);
2607
2608        if needs_inmemory_rebuild {
2609            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2610                .await
2611                .map(SkillMatcherBackend::InMemory);
2612        } else if let Some(ref mut backend) = self.services.skill.matcher {
2613            let _ = self.channel.send_status("syncing skill index...").await;
2614            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2615                self.services.session.status_tx.clone().map(
2616                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
2617                        Box::new(move |completed, total| {
2618                            let msg = format!("Syncing skills: {completed}/{total}");
2619                            let _ = tx.send(msg);
2620                        })
2621                    },
2622                );
2623            if let Err(e) = backend
2624                .sync(
2625                    all_meta,
2626                    &self.services.skill.embedding_model,
2627                    embed_fn,
2628                    on_progress,
2629                )
2630                .await
2631            {
2632                tracing::warn!("failed to sync skill embeddings: {e:#}");
2633            }
2634        }
2635
2636        if self.services.skill.hybrid_search {
2637            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2638            let _ = self.channel.send_status("rebuilding search index...").await;
2639            self.services.skill.rebuild_bm25(&descs);
2640        }
2641    }
2642
2643    #[cfg_attr(
2644        feature = "profiling",
2645        tracing::instrument(name = "skill.hot_reload", skip_all)
2646    )]
2647    async fn reload_skills(&mut self) {
2648        let old_fp = self.services.skill.fingerprint();
2649        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2650            let plugin_dirs = supplier();
2651            let mut paths = self.services.skill.skill_paths.clone();
2652            for dir in plugin_dirs {
2653                if !paths.contains(&dir) {
2654                    paths.push(dir);
2655                }
2656            }
2657            paths
2658        } else {
2659            self.services.skill.skill_paths.clone()
2660        };
2661        self.services.skill.registry.write().reload(&reload_paths);
2662        if self.services.skill.fingerprint() == old_fp {
2663            return;
2664        }
2665        let _ = self.channel.send_status("reloading skills...").await;
2666
2667        let all_meta = self
2668            .services
2669            .skill
2670            .registry
2671            .read()
2672            .all_meta()
2673            .into_iter()
2674            .cloned()
2675            .collect::<Vec<_>>();
2676
2677        self.update_trust_for_reloaded_skills(&all_meta).await;
2678
2679        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2680        self.rebuild_skill_matcher(&all_meta_refs).await;
2681
2682        let all_skills: Vec<Skill> = {
2683            let reg = self.services.skill.registry.read();
2684            reg.all_meta()
2685                .iter()
2686                .filter_map(|m| reg.skill(&m.name).ok())
2687                .collect()
2688        };
2689        let trust_map = self.build_skill_trust_map().await;
2690        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2691        let skills_prompt =
2692            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2693        self.services
2694            .skill
2695            .last_skills_prompt
2696            .clone_from(&skills_prompt);
2697        let system_prompt = build_system_prompt(&skills_prompt, None);
2698        if let Some(msg) = self.msg.messages.first_mut() {
2699            msg.content = system_prompt;
2700        }
2701
2702        let _ = self.channel.send_status("").await;
2703        tracing::info!(
2704            "reloaded {} skill(s)",
2705            self.services.skill.registry.read().all_meta().len()
2706        );
2707    }
2708
2709    fn reload_instructions(&mut self) {
2710        // Drain any additional queued events before reloading to avoid redundant reloads.
2711        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2712            while rx.try_recv().is_ok() {}
2713        }
2714        let Some(ref state) = self.runtime.instructions.reload_state else {
2715            return;
2716        };
2717        let new_blocks = crate::instructions::load_instructions(
2718            &state.base_dir,
2719            &state.provider_kinds,
2720            &state.explicit_files,
2721            state.auto_detect,
2722        );
2723        let old_sources: std::collections::HashSet<_> = self
2724            .runtime
2725            .instructions
2726            .blocks
2727            .iter()
2728            .map(|b| &b.source)
2729            .collect();
2730        let new_sources: std::collections::HashSet<_> =
2731            new_blocks.iter().map(|b| &b.source).collect();
2732        for added in new_sources.difference(&old_sources) {
2733            tracing::info!(path = %added.display(), "instruction file added");
2734        }
2735        for removed in old_sources.difference(&new_sources) {
2736            tracing::info!(path = %removed.display(), "instruction file removed");
2737        }
2738        tracing::info!(
2739            old_count = self.runtime.instructions.blocks.len(),
2740            new_count = new_blocks.len(),
2741            "reloaded instruction files"
2742        );
2743        self.runtime.instructions.blocks = new_blocks;
2744    }
2745
2746    fn reload_config(&mut self) {
2747        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
2748            return;
2749        };
2750        let Some(config) = self.load_config_with_overlay(&path) else {
2751            return;
2752        };
2753        let budget_tokens = resolve_context_budget(&config, &self.provider);
2754        self.runtime.config.security = config.security;
2755        self.runtime.config.timeouts = config.timeouts;
2756        self.runtime.config.redact_credentials = config.memory.redact_credentials;
2757        self.services.memory.persistence.history_limit = config.memory.history_limit;
2758        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
2759        self.services.memory.compaction.summarization_threshold =
2760            config.memory.summarization_threshold;
2761        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
2762        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
2763        self.services.skill.min_injection_score = config.skills.min_injection_score;
2764        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2765        self.services.skill.hybrid_search = config.skills.hybrid_search;
2766        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
2767        self.services.skill.confusability_threshold =
2768            config.skills.confusability_threshold.clamp(0.0, 1.0);
2769        config
2770            .skills
2771            .generation_provider
2772            .as_str()
2773            .clone_into(&mut self.services.skill.generation_provider_name);
2774        self.services.skill.generation_output_dir =
2775            config.skills.generation_output_dir.as_deref().map(|p| {
2776                if let Some(stripped) = p.strip_prefix("~/") {
2777                    dirs::home_dir()
2778                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2779                } else {
2780                    std::path::PathBuf::from(p)
2781                }
2782            });
2783
2784        self.context_manager.budget = Some(
2785            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2786        );
2787
2788        {
2789            let graph_cfg = &config.memory.graph;
2790            if graph_cfg.rpe.enabled {
2791                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2792                if self.services.memory.extraction.rpe_router.is_none() {
2793                    self.services.memory.extraction.rpe_router =
2794                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2795                            graph_cfg.rpe.threshold,
2796                            graph_cfg.rpe.max_skip_turns,
2797                        )));
2798                }
2799            } else {
2800                self.services.memory.extraction.rpe_router = None;
2801            }
2802            self.services.memory.extraction.graph_config = graph_cfg.clone();
2803        }
2804        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2805        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2806        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2807        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2808        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2809        self.context_manager.compression = config.memory.compression.clone();
2810        self.context_manager.routing = config.memory.store_routing.clone();
2811        // Resolve routing_classifier_provider from the provider pool (#2484).
2812        self.context_manager.store_routing_provider = if config
2813            .memory
2814            .store_routing
2815            .routing_classifier_provider
2816            .is_empty()
2817        {
2818            None
2819        } else {
2820            let resolved = self.resolve_background_provider(
2821                &config.memory.store_routing.routing_classifier_provider,
2822            );
2823            Some(std::sync::Arc::new(resolved))
2824        };
2825        self.services
2826            .memory
2827            .persistence
2828            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
2829
2830        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
2831        self.services.index.repo_map_ttl =
2832            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2833
2834        tracing::info!("config reloaded");
2835    }
2836
2837    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
2838    ///
2839    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
2840    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2841        let mut config = match Config::load(path) {
2842            Ok(c) => c,
2843            Err(e) => {
2844                tracing::warn!("config reload failed: {e:#}");
2845                return None;
2846            }
2847        };
2848
2849        // Re-apply plugin overlays. On error, keep previous runtime state intact.
2850        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
2851            None
2852        } else {
2853            match zeph_plugins::apply_plugin_config_overlays(
2854                &mut config,
2855                &self.runtime.lifecycle.plugins_dir,
2856            ) {
2857                Ok(o) => Some(o),
2858                Err(e) => {
2859                    tracing::warn!(
2860                        "plugin overlay merge failed during reload: {e:#}; \
2861                         keeping previous runtime state"
2862                    );
2863                    return None;
2864                }
2865            }
2866        };
2867
2868        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
2869        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
2870        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
2871        if let Some(ref overlay) = new_overlay {
2872            self.warn_on_shell_overlay_divergence(overlay, &config);
2873        }
2874        Some(config)
2875    }
2876
2877    /// React to shell policy divergence detected on hot-reload.
2878    ///
2879    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
2880    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
2881    /// — emit a warn + status banner when it changes.
2882    fn warn_on_shell_overlay_divergence(
2883        &self,
2884        new_overlay: &zeph_plugins::ResolvedOverlay,
2885        config: &Config,
2886    ) {
2887        let new_blocked: Vec<String> = {
2888            let mut v = config.tools.shell.blocked_commands.clone();
2889            v.sort();
2890            v
2891        };
2892        let new_allowed: Vec<String> = {
2893            let mut v = config.tools.shell.allowed_commands.clone();
2894            v.sort();
2895            v
2896        };
2897
2898        let startup = &self.runtime.lifecycle.startup_shell_overlay;
2899        let blocked_changed = new_blocked != startup.blocked;
2900        let allowed_changed = new_allowed != startup.allowed;
2901
2902        // blocked_commands IS rebuilt live — emit info-level confirmation only.
2903        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
2904            h.rebuild(&config.tools.shell);
2905            tracing::info!(
2906                blocked_count = h.snapshot_blocked().len(),
2907                "shell blocked_commands rebuilt from hot-reload"
2908            );
2909        }
2910
2911        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
2912        // executor construction time. Warn loudly so the user restarts.
2913        //
2914        // Note: when base `allowed_commands` is empty (the default), the overlay's
2915        // intersection semantics keep it empty, so this branch is silently unreachable
2916        // for users who do not set a non-empty base list.
2917        if allowed_changed {
2918            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2919                 for sandbox path recomputation (blocked_commands was rebuilt live)";
2920            tracing::warn!("{msg}");
2921            if let Some(ref tx) = self.services.session.status_tx {
2922                let _ = tx.send(msg.to_owned());
2923            }
2924        }
2925
2926        let _ = new_overlay;
2927    }
2928
2929    /// Run `SideQuest` tool output eviction pass (#1885).
2930    ///
2931    /// PERF-1 fix: two-phase non-blocking design.
2932    ///
2933    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2934    /// validate and apply it immediately.
2935    ///
2936    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2937    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2938    /// next turn, so the current agent turn is never blocked by the LLM call.
2939    fn maybe_sidequest_eviction(&mut self) {
2940        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2941        // strategy — the two systems share the same pool of evictable tool outputs and can
2942        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2943        if self.services.sidequest.config.enabled {
2944            use crate::config::PruningStrategy;
2945            if !matches!(
2946                self.context_manager.compression.pruning_strategy,
2947                PruningStrategy::Reactive
2948            ) {
2949                tracing::warn!(
2950                    strategy = ?self.context_manager.compression.pruning_strategy,
2951                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2952                     consider disabling sidequest.enabled to avoid redundant eviction"
2953                );
2954            }
2955        }
2956
2957        // Guard: do not evict while a focus session is active.
2958        if self.services.focus.is_active() {
2959            tracing::debug!("sidequest: skipping — focus session active");
2960            // Drop any pending result — cursors may be stale relative to focus truncation.
2961            self.services.compression.pending_sidequest_result = None;
2962            return;
2963        }
2964
2965        // Phase 1: apply pending result from last turn's background LLM call.
2966        self.sidequest_apply_pending();
2967
2968        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2969        self.sidequest_schedule_next();
2970    }
2971
2972    fn sidequest_apply_pending(&mut self) {
2973        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
2974            return;
2975        };
2976        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
2977        // and we reschedule below.
2978        let result = match handle.try_join() {
2979            Ok(result) => result,
2980            Err(_handle) => {
2981                // Task still running — drop it; a fresh one is scheduled below.
2982                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
2983                return;
2984            }
2985        };
2986        match result {
2987            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
2988                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
2989                let freed = self.services.sidequest.apply_eviction(
2990                    &mut self.msg.messages,
2991                    &evicted_indices,
2992                    &self.runtime.metrics.token_counter,
2993                );
2994                if freed > 0 {
2995                    self.recompute_prompt_tokens();
2996                    // C1 fix: prevent maybe_compact() from firing in the same turn.
2997                    // cooldown=0: eviction does not impose post-compaction cooldown.
2998                    self.context_manager.compaction =
2999                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3000                            cooldown: 0,
3001                        };
3002                    tracing::info!(
3003                        freed_tokens = freed,
3004                        evicted_cursors = evicted_indices.len(),
3005                        pass = self.services.sidequest.passes_run,
3006                        "sidequest eviction complete"
3007                    );
3008                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3009                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3010                    }
3011                    if let Some(ref tx) = self.services.session.status_tx {
3012                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3013                    }
3014                } else {
3015                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3016                    if let Some(ref tx) = self.services.session.status_tx {
3017                        let _ = tx.send(String::new());
3018                    }
3019                }
3020            }
3021            Ok(None | Some(_)) => {
3022                tracing::debug!("sidequest: pending result: no cursors to evict");
3023                if let Some(ref tx) = self.services.session.status_tx {
3024                    let _ = tx.send(String::new());
3025                }
3026            }
3027            Err(e) => {
3028                tracing::debug!("sidequest: background task error: {e}");
3029                if let Some(ref tx) = self.services.session.status_tx {
3030                    let _ = tx.send(String::new());
3031                }
3032            }
3033        }
3034    }
3035
3036    fn sidequest_schedule_next(&mut self) {
3037        use zeph_llm::provider::{Message, MessageMetadata, Role};
3038
3039        self.services
3040            .sidequest
3041            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3042
3043        if self.services.sidequest.tool_output_cursors.is_empty() {
3044            tracing::debug!("sidequest: no eligible cursors");
3045            return;
3046        }
3047
3048        let prompt = self.services.sidequest.build_eviction_prompt();
3049        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3050        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3051        // Clone the provider so the spawn closure owns it without borrowing self.
3052        let provider = self.summary_or_primary_provider().clone();
3053
3054        let eviction_future = async move {
3055            let msgs = [Message {
3056                role: Role::User,
3057                content: prompt,
3058                parts: vec![],
3059                metadata: MessageMetadata::default(),
3060            }];
3061            let response =
3062                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3063                    .await
3064                {
3065                    Ok(Ok(r)) => r,
3066                    Ok(Err(e)) => {
3067                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3068                        return None;
3069                    }
3070                    Err(_) => {
3071                        tracing::debug!("sidequest bg: LLM call timed out");
3072                        return None;
3073                    }
3074                };
3075
3076            let start = response.find('{')?;
3077            let end = response.rfind('}')?;
3078            if start > end {
3079                return None;
3080            }
3081            let json_slice = &response[start..=end];
3082            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3083            let mut valid: Vec<usize> = parsed
3084                .del_cursors
3085                .into_iter()
3086                .filter(|&c| c < n_cursors)
3087                .collect();
3088            valid.sort_unstable();
3089            valid.dedup();
3090            #[allow(
3091                clippy::cast_precision_loss,
3092                clippy::cast_possible_truncation,
3093                clippy::cast_sign_loss
3094            )]
3095            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3096            valid.truncate(max_evict);
3097            Some(valid)
3098        };
3099        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3100            std::sync::Arc::from("agent.sidequest.eviction"),
3101            move || eviction_future,
3102        );
3103        self.services.compression.pending_sidequest_result = Some(handle);
3104        tracing::debug!("sidequest: background LLM eviction task spawned");
3105        if let Some(ref tx) = self.services.session.status_tx {
3106            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3107        }
3108    }
3109
3110    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3111    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3112        self.services
3113            .mcp
3114            .manager
3115            .as_ref()
3116            .map(|m| McpManagerDispatch(Arc::clone(m)))
3117    }
3118
3119    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3120    ///
3121    /// Called after each tool batch completes. The check is a single syscall and has
3122    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3123    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3124    pub(crate) async fn check_cwd_changed(&mut self) {
3125        let current = match std::env::current_dir() {
3126            Ok(p) => p,
3127            Err(e) => {
3128                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3129                return;
3130            }
3131        };
3132        if current == self.runtime.lifecycle.last_known_cwd {
3133            return;
3134        }
3135        let old_cwd =
3136            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3137        self.services.session.env_context.working_dir = current.display().to_string();
3138
3139        tracing::info!(
3140            old = %old_cwd.display(),
3141            new = %current.display(),
3142            "working directory changed"
3143        );
3144
3145        let _ = self
3146            .channel
3147            .send_status("Working directory changed\u{2026}")
3148            .await;
3149
3150        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3151        if !hooks.is_empty() {
3152            let mut env = std::collections::HashMap::new();
3153            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3154            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3155            let dispatch = self.mcp_dispatch();
3156            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3157                .as_ref()
3158                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3159            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3160                tracing::warn!(error = %e, "CwdChanged hook failed");
3161            }
3162        }
3163
3164        let _ = self.channel.send_status("").await;
3165    }
3166
3167    /// Handle a `FileChangedEvent` from the file watcher.
3168    pub(crate) async fn handle_file_changed(
3169        &mut self,
3170        event: crate::file_watcher::FileChangedEvent,
3171    ) {
3172        tracing::info!(path = %event.path.display(), "file changed");
3173
3174        let _ = self
3175            .channel
3176            .send_status("Running file-change hook\u{2026}")
3177            .await;
3178
3179        let hooks = self
3180            .services
3181            .session
3182            .hooks_config
3183            .file_changed_hooks
3184            .clone();
3185        if !hooks.is_empty() {
3186            let mut env = std::collections::HashMap::new();
3187            env.insert(
3188                "ZEPH_CHANGED_PATH".to_owned(),
3189                event.path.display().to_string(),
3190            );
3191            let dispatch = self.mcp_dispatch();
3192            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3193                .as_ref()
3194                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3195            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3196                tracing::warn!(error = %e, "FileChanged hook failed");
3197            }
3198        }
3199
3200        let _ = self.channel.send_status("").await;
3201    }
3202
3203    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3204    /// background scan task.
3205    ///
3206    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3207    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3208    ///
3209    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3210    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3211    /// blocking the turn.
3212    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3213        let Some(engine) = self.services.promotion_engine.clone() else {
3214            return;
3215        };
3216
3217        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3218            return;
3219        };
3220
3221        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3222        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3223        let promotion_window = 200usize;
3224
3225        let accepted = self.runtime.lifecycle.supervisor.spawn(
3226            agent_supervisor::TaskClass::Enrichment,
3227            "compression_spectrum.promotion_scan",
3228            async move {
3229                let span = tracing::info_span!("memory.compression.promote.background");
3230                let _enter = span.enter();
3231
3232                let window = match memory.load_promotion_window(promotion_window).await {
3233                    Ok(w) => w,
3234                    Err(e) => {
3235                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3236                        return;
3237                    }
3238                };
3239
3240                if window.is_empty() {
3241                    return;
3242                }
3243
3244                let candidates = match engine.scan(&window).await {
3245                    Ok(c) => c,
3246                    Err(e) => {
3247                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3248                        return;
3249                    }
3250                };
3251
3252                for candidate in &candidates {
3253                    if let Err(e) = engine.promote(candidate).await {
3254                        tracing::warn!(
3255                            signature = %candidate.signature,
3256                            error = %e,
3257                            "promotion scan: promote failed"
3258                        );
3259                    }
3260                }
3261
3262                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3263            },
3264        );
3265
3266        if accepted {
3267            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3268        }
3269    }
3270}
3271/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3272///
3273/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3274/// `zeph-subagent` to `zeph-mcp`.
3275struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3276
3277impl zeph_subagent::McpDispatch for McpManagerDispatch {
3278    fn call_tool<'a>(
3279        &'a self,
3280        server: &'a str,
3281        tool: &'a str,
3282        args: serde_json::Value,
3283    ) -> std::pin::Pin<
3284        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3285    > {
3286        Box::pin(async move {
3287            self.0
3288                .call_tool(server, tool, args)
3289                .await
3290                .map(|result| {
3291                    // Extract text content from the MCP response as a JSON value.
3292                    let texts: Vec<serde_json::Value> = result
3293                        .content
3294                        .iter()
3295                        .filter_map(|c| {
3296                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3297                                Some(serde_json::Value::String(t.text.clone()))
3298                            } else {
3299                                None
3300                            }
3301                        })
3302                        .collect();
3303                    serde_json::Value::Array(texts)
3304                })
3305                .map_err(|e| e.to_string())
3306        })
3307    }
3308}
3309
3310pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3311    while !*rx.borrow_and_update() {
3312        if rx.changed().await.is_err() {
3313            std::future::pending::<()>().await;
3314        }
3315    }
3316}
3317
3318pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3319    match rx {
3320        Some(inner) => {
3321            if let Some(v) = inner.recv().await {
3322                Some(v)
3323            } else {
3324                *rx = None;
3325                std::future::pending().await
3326            }
3327        }
3328        None => std::future::pending().await,
3329    }
3330}
3331
3332/// Resolve the effective context budget from config, applying the `auto_budget` fallback.
3333///
3334/// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
3335/// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
3336/// context window; if still 0, fall back to 128 000 tokens.
3337/// Truncate a background run command to at most 80 characters for TUI display.
3338fn truncate_shell_command(cmd: &str) -> String {
3339    if cmd.len() <= 80 {
3340        return cmd.to_owned();
3341    }
3342    let end = cmd.floor_char_boundary(79);
3343    format!("{}…", &cmd[..end])
3344}
3345
3346/// Take the first 8 characters of a run-id hex string for compact TUI display.
3347fn truncate_shell_run_id(id: &str) -> String {
3348    id.chars().take(8).collect()
3349}
3350
3351pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3352    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3353        if let Some(ctx_size) = provider.context_window() {
3354            tracing::info!(
3355                model_context = ctx_size,
3356                "auto-configured context budget on reload"
3357            );
3358            ctx_size
3359        } else {
3360            0
3361        }
3362    } else {
3363        config.memory.context_budget_tokens
3364    };
3365    if tokens == 0 {
3366        tracing::warn!(
3367            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3368        );
3369        128_000
3370    } else {
3371        tokens
3372    }
3373}
3374
3375#[cfg(test)]
3376mod tests;
3377
3378#[cfg(test)]
3379pub(crate) use tests::agent_tests;
3380
3381#[cfg(test)]
3382mod test_stubs {
3383    use std::pin::Pin;
3384
3385    use zeph_commands::{
3386        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
3387    };
3388
3389    /// Stub slash command registered only in `#[cfg(test)]` builds.
3390    ///
3391    /// Triggers the `Some(Err(CommandError))` arm in the session/debug registry
3392    /// dispatch block so the non-fatal error path can be tested without production
3393    /// command validation logic.
3394    pub(super) struct TestErrorCommand;
3395
3396    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
3397        fn name(&self) -> &'static str {
3398            "/test-error"
3399        }
3400
3401        fn description(&self) -> &'static str {
3402            "Test stub: always returns CommandError"
3403        }
3404
3405        fn category(&self) -> SlashCategory {
3406            SlashCategory::Session
3407        }
3408
3409        fn handle<'a>(
3410            &'a self,
3411            _ctx: &'a mut CommandContext<'_>,
3412            _args: &'a str,
3413        ) -> Pin<
3414            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
3415        > {
3416            Box::pin(async { Err(CommandError::new("boom")) })
3417        }
3418    }
3419}