Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// TODO(arch-revised-2026-04-26): agent/ module split deferred to epic/m49+/agent-split.
5// Hard prerequisite: 100% task_supervisor adoption (currently 10/31 raw tokio::spawn
6// sites in this directory). See arch-assessment-revised-2026-04-26T02-04-23.md PR 1
7// and PR 8. Do not split this file until PR 1 is merged and a /specs/ entry exists.
8
9mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15#[cfg(feature = "cocoon")]
16mod cocoon_cmd;
17mod command_context_impls;
18pub(super) mod compression_feedback;
19mod context;
20mod context_impls;
21pub(crate) mod context_manager;
22mod corrections;
23pub mod error;
24mod experiment_cmd;
25pub(crate) mod focus;
26mod index;
27mod learning;
28pub(crate) mod learning_engine;
29mod log_commands;
30mod loop_event;
31mod lsp_commands;
32mod magic_docs;
33mod mcp;
34pub(crate) mod memcot;
35mod message_queue;
36mod microcompact;
37mod model_commands;
38mod persistence;
39#[cfg(feature = "scheduler")]
40mod plan;
41mod policy_commands;
42mod provider_cmd;
43mod quality_hook;
44pub(crate) mod rate_limiter;
45#[cfg(feature = "scheduler")]
46mod scheduler_commands;
47#[cfg(feature = "scheduler")]
48mod scheduler_loop;
49mod scope_commands;
50pub mod session_config;
51mod session_digest;
52pub mod shadow_sentinel;
53pub(crate) mod sidequest;
54mod skill_management;
55pub mod slash_commands;
56pub mod speculative;
57pub(crate) mod state;
58pub(crate) mod task_injection;
59pub(crate) mod tool_execution;
60pub(crate) mod tool_orchestrator;
61pub mod trajectory;
62mod trajectory_commands;
63mod trust_commands;
64pub mod turn;
65mod utils;
66pub(crate) mod vigil;
67
68use std::collections::{HashMap, VecDeque};
69use std::fmt::Write as _;
70use std::sync::Arc;
71
72use parking_lot::RwLock;
73
74use tokio::sync::{mpsc, watch};
75use tokio_util::sync::CancellationToken;
76use zeph_llm::any::AnyProvider;
77use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
78use zeph_memory::TokenCounter;
79use zeph_memory::semantic::SemanticMemory;
80use zeph_skills::loader::Skill;
81use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
82use zeph_skills::prompt::format_skills_prompt;
83use zeph_skills::registry::SkillRegistry;
84use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
85
86use crate::channel::Channel;
87use crate::config::Config;
88use crate::context::{ContextBudget, build_system_prompt};
89use zeph_common::text::estimate_tokens;
90
91use loop_event::LoopEvent;
92use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
93use state::MessageState;
94
/// Window size used by doom-loop detection.
// NOTE(review): presumably the number of most recent turns compared when looking for a
// repeating loop — confirm against the use sites, which are not in this file section.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// CODE_CONTEXT_PREFIX is re-exported from zeph-agent-context::helpers so callers inside
// zeph-core that build system-prompt injections can use it without depending on zeph-agent-context
// directly. SESSION_DIGEST_PREFIX was removed when assembly migrated to ContextService.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prefix for scheduled-task prompts.
// NOTE(review): presumably prepended to the task description when a scheduled task is
// injected as a user message — confirm against the scheduler code.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence (newline plus three backticks) that [`format_tool_output`] appends after
/// the tool body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
102
103pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
104    use std::fmt::Write;
105    let capacity = "[tool output: ".len()
106        + tool_name.len()
107        + "]\n```\n".len()
108        + body.len()
109        + TOOL_OUTPUT_SUFFIX.len();
110    let mut buf = String::with_capacity(capacity);
111    let _ = write!(
112        buf,
113        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
114    );
115    buf
116}
117
/// Zeph agent: autonomous AI system with multi-model inference, semantic memory, skills,
/// tool orchestration, and multi-channel I/O.
///
/// The agent maintains conversation history, manages LLM provider state, coordinates tool
/// execution, and orchestrates memory and skill subsystems. It communicates with the outside
/// world via the [`Channel`] trait, enabling support for CLI, Telegram, TUI, or custom I/O.
///
/// # Architecture
///
/// - **Message state**: Conversation history with system prompt, message queue, and metadata
/// - **Memory state**: `SQLite` + Qdrant vector store for semantic search and compaction
/// - **Skill state**: Registry, matching engine, and self-learning evolution
/// - **Context manager**: Token budgeting, context assembly, and summarization
/// - **Tool orchestrator**: DAG-based multi-tool execution with streaming output
/// - **MCP client**: Multi-server support for Model Context Protocol
/// - **Index state**: AST-based code indexing and semantic retrieval
/// - **Security**: Sanitization, exfiltration detection, adversarial probes
/// - **Metrics**: Token usage, latency, cost, and anomaly tracking
///
/// # Channel Contract
///
/// The agent requires a [`Channel`] implementation for user interaction:
/// - Sends agent responses via `channel.send(message)`
/// - Receives user input via `channel.recv()` / `channel.recv_internal()`
/// - Supports structured events: tool invocations, tool output, streaming updates
///
/// # Lifecycle
///
/// 1. Create with [`Self::new`] or [`Self::new_with_registry_arc`]
/// 2. Run main loop with [`Self::run`]
/// 3. Clean up with [`Self::shutdown`] to persist state and close resources
pub struct Agent<C: Channel> {
    // --- I/O & primary providers (kept inline) ---
    /// Primary LLM provider. In this file it serves the shutdown-summary chat calls and
    /// has its router state saved during [`Self::shutdown`].
    provider: AnyProvider,
    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
    /// Falls back to `provider.clone()` when no dedicated entry exists.
    /// **Never replaced** by `/provider switch`.
    embedding_provider: AnyProvider,
    /// I/O channel to the user; handed back to the caller by [`Self::into_channel`].
    channel: C,
    /// Type-erased tool executor, shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    // --- Conversation core (kept inline) ---
    /// Conversation messages (the system prompt is the first entry), the message queue,
    /// and deferred persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager; also tracks `turns_since_last_hard_compaction`, which
    /// [`Self::shutdown`] finalizes.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestrator state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    // --- Aggregated background services ---
    /// Aggregated subsystem state: memory, skills, learning, MCP, orchestration,
    /// compression, and the optional engines initialized to `None` by the constructor.
    pub(super) services: state::Services,

    // --- Aggregated runtime / lifecycle / telemetry ---
    /// Aggregated runtime state: config, lifecycle (task supervisor), provider state,
    /// metrics, debug, and instructions.
    pub(super) runtime: state::AgentRuntime,
}
172
/// Control flow signal returned by [`Agent::apply_dispatch_result`].
///
/// Each variant tells the agent loop which control-flow action to take after a command
/// dispatch attempt.
enum DispatchFlow {
    /// The command requested exit; the agent loop should `break`.
    Break,
    /// The command was handled; the agent loop should `continue`.
    Continue,
    /// The command was not recognised; the agent loop should fall through.
    Fallthrough,
}
182
183impl<C: Channel> Agent<C> {
184    /// Create a new agent instance with the given LLM provider, I/O channel, and subsystems.
185    ///
186    /// # Arguments
187    ///
188    /// * `provider` — Multi-model LLM provider (Claude, `OpenAI`, Ollama, Candle)
189    /// * `channel` — I/O abstraction for user interaction (CLI, Telegram, TUI, etc.)
190    /// * `registry` — Skill registry; moved into an internal `Arc<RwLock<_>>` for sharing
191    /// * `matcher` — Optional semantic skill matcher (e.g., Qdrant, BM25). If `None`,
192    ///   skills are matched by exact name only
193    /// * `max_active_skills` — Max concurrent skills in execution (must be > 0)
194    /// * `tool_executor` — Trait object for executing shell, web, and custom tools
195    ///
196    /// # Initialization
197    ///
198    /// The constructor:
199    /// 1. Wraps the skill registry into `Arc<RwLock<_>>` internally
200    /// 2. Builds the system prompt from registered skills
201    /// 3. Initializes all subsystems (memory, context manager, metrics, security)
202    /// 4. Returns a ready-to-run agent
203    ///
204    /// # Panics
205    ///
206    /// Panics if `max_active_skills` is 0.
207    #[must_use]
208    pub fn new(
209        provider: AnyProvider,
210        channel: C,
211        registry: SkillRegistry,
212        matcher: Option<SkillMatcherBackend>,
213        max_active_skills: usize,
214        tool_executor: impl ToolExecutor + 'static,
215    ) -> Self {
216        let registry = Arc::new(RwLock::new(registry));
217        let embedding_provider = provider.clone();
218        Self::new_with_registry_arc(
219            provider,
220            embedding_provider,
221            channel,
222            registry,
223            matcher,
224            max_active_skills,
225            tool_executor,
226        )
227    }
228
229    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
230    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
231    ///
232    /// # Panics
233    ///
234    /// Panics if the registry `RwLock` is poisoned.
235    #[must_use]
236    pub fn new_with_registry_arc(
237        provider: AnyProvider,
238        embedding_provider: AnyProvider,
239        channel: C,
240        registry: Arc<RwLock<SkillRegistry>>,
241        matcher: Option<SkillMatcherBackend>,
242        max_active_skills: usize,
243        tool_executor: impl ToolExecutor + 'static,
244    ) -> Self {
245        use state::{
246            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
247            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
248            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
249            SessionState, SkillState, ToolState,
250        };
251
252        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
253        let all_skills: Vec<Skill> = {
254            let reg = registry.read();
255            reg.all_meta()
256                .iter()
257                .filter_map(|m| reg.skill(&m.name).ok())
258                .collect()
259        };
260        let empty_trust = HashMap::new();
261        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
262        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
263        let system_prompt = build_system_prompt(&skills_prompt, None);
264        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
265        tracing::trace!(prompt = %system_prompt, "full system prompt");
266
267        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
268        let token_counter = Arc::new(TokenCounter::new());
269
270        let services = Services {
271            memory: MemoryState::default(),
272            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
273            learning_engine: learning_engine::LearningEngine::new(),
274            feedback: FeedbackState::default(),
275            mcp: McpState::default(),
276            index: IndexState::default(),
277            session: SessionState::new(),
278            security: SecurityState::default(),
279            experiments: ExperimentState::new(),
280            compression: CompressionState::default(),
281            orchestration: OrchestrationState::default(),
282            focus: focus::FocusState::default(),
283            sidequest: sidequest::SidequestState::default(),
284            tool_state: ToolState::default(),
285            goal_accounting: None,
286            quality: None,
287            proactive_explorer: None,
288            promotion_engine: None,
289            taco_compressor: None,
290            speculation_engine: None,
291        };
292
293        let runtime = AgentRuntime {
294            config: RuntimeConfig::default(),
295            lifecycle: LifecycleState::new(),
296            providers: ProviderState::new(initial_prompt_tokens),
297            metrics: MetricsState::new(token_counter),
298            debug: DebugState::default(),
299            instructions: InstructionState::default(),
300        };
301
302        Self {
303            provider,
304            embedding_provider,
305            channel,
306            tool_executor: Arc::new(tool_executor),
307            msg: MessageState {
308                messages: vec![Message {
309                    role: Role::System,
310                    content: system_prompt,
311                    parts: vec![],
312                    metadata: MessageMetadata::default(),
313                }],
314                message_queue: VecDeque::new(),
315                pending_image_parts: Vec::new(),
316                last_persisted_message_id: None,
317                deferred_db_hide_ids: Vec::new(),
318                deferred_db_summaries: Vec::new(),
319            },
320            context_manager: context_manager::ContextManager::new(),
321            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
322            services,
323            runtime,
324        }
325    }
326
327    /// Consume the agent and return the inner channel.
328    ///
329    /// Call this after [`run`][Agent::run] completes to retrieve the I/O channel (e.g., to
330    /// read captured responses from a headless channel such as `BenchmarkChannel`).
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// # use zeph_core::agent::Agent;
336    /// // After agent.run().await completes, consume the agent to retrieve the channel.
337    /// // let channel: MyChannel = agent.into_channel();
338    /// ```
339    #[must_use]
340    pub fn into_channel(self) -> C {
341        self.channel
342    }
343
344    /// Poll all active sub-agents for completed/failed/canceled results.
345    ///
346    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
347    /// for agents that have finished. Each completed agent is removed from the manager.
348    #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
349    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
350        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
351            return vec![];
352        };
353
354        let finished: Vec<String> = mgr
355            .statuses()
356            .into_iter()
357            .filter_map(|(id, status)| {
358                if matches!(
359                    status.state,
360                    zeph_subagent::SubAgentState::Completed
361                        | zeph_subagent::SubAgentState::Failed
362                        | zeph_subagent::SubAgentState::Canceled
363                ) {
364                    Some(id)
365                } else {
366                    None
367                }
368            })
369            .collect();
370
371        let mut results = vec![];
372        for task_id in finished {
373            match mgr.collect(&task_id).await {
374                Ok(result) => results.push((task_id, result)),
375                Err(e) => {
376                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
377                }
378            }
379        }
380        results
381    }
382
383    /// Call the LLM to generate a structured session summary with a configurable timeout.
384    ///
385    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
386    /// any failure, logging a warning — callers must treat `None` as "skip storage".
387    ///
388    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
389    /// (structured call times out and plain-text fallback also times out) this adds up to
390    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
391    async fn call_llm_for_session_summary(
392        &self,
393        chat_messages: &[Message],
394    ) -> Option<zeph_memory::StructuredSummary> {
395        let timeout_dur = std::time::Duration::from_secs(
396            self.services
397                .memory
398                .compaction
399                .shutdown_summary_timeout_secs,
400        );
401        match tokio::time::timeout(
402            timeout_dur,
403            self.provider
404                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
405        )
406        .await
407        {
408            Ok(Ok(s)) => Some(s),
409            Ok(Err(e)) => {
410                tracing::warn!(
411                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
412                );
413                self.plain_text_summary_fallback(chat_messages, timeout_dur)
414                    .await
415            }
416            Err(_) => {
417                tracing::warn!(
418                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
419                    self.services
420                        .memory
421                        .compaction
422                        .shutdown_summary_timeout_secs
423                );
424                self.plain_text_summary_fallback(chat_messages, timeout_dur)
425                    .await
426            }
427        }
428    }
429
430    async fn plain_text_summary_fallback(
431        &self,
432        chat_messages: &[Message],
433        timeout_dur: std::time::Duration,
434    ) -> Option<zeph_memory::StructuredSummary> {
435        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
436            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
437                summary: plain,
438                key_facts: vec![],
439                entities: vec![],
440            }),
441            Ok(Err(e)) => {
442                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
443                None
444            }
445            Err(_) => {
446                tracing::warn!("shutdown summary: plain LLM fallback timed out");
447                None
448            }
449        }
450    }
451
452    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
453    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
454    /// closed while tool execution was in progress). Without this the next session startup strips
455    /// those assistant messages and emits orphan warnings.
456    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
457        use zeph_llm::provider::{MessagePart, Role};
458
459        // Walk messages in reverse: if the last assistant message (ignoring any trailing
460        // system messages) has ToolUse parts and is NOT immediately followed by a user
461        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
462        let msgs = &self.msg.messages;
463        // Find last assistant message index.
464        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
465            return;
466        };
467        let asst_msg = &msgs[asst_idx];
468        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
469            .parts
470            .iter()
471            .filter_map(|p| {
472                if let MessagePart::ToolUse { id, name, input } = p {
473                    Some((id.as_str(), name.as_str(), input))
474                } else {
475                    None
476                }
477            })
478            .collect();
479        if tool_use_ids.is_empty() {
480            return;
481        }
482
483        // Check whether a following user message already pairs all ToolUse ids.
484        let paired_ids: std::collections::HashSet<&str> = msgs
485            .get(asst_idx + 1..)
486            .into_iter()
487            .flatten()
488            .filter(|m| m.role == Role::User)
489            .flat_map(|m| m.parts.iter())
490            .filter_map(|p| {
491                if let MessagePart::ToolResult { tool_use_id, .. } = p {
492                    Some(tool_use_id.as_str())
493                } else {
494                    None
495                }
496            })
497            .collect();
498
499        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
500            .iter()
501            .filter(|(id, _, _)| !paired_ids.contains(*id))
502            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
503                id: (*id).to_owned(),
504                name: (*name).to_owned().into(),
505                input: (*input).clone(),
506            })
507            .collect();
508
509        if unpaired.is_empty() {
510            return;
511        }
512
513        tracing::info!(
514            count = unpaired.len(),
515            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
516        );
517        self.persist_cancelled_tool_results(&unpaired).await;
518    }
519
520    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
521    ///
522    /// Guards:
523    /// - `shutdown_summary` config must be enabled
524    /// - `conversation_id` must be set (memory must be attached)
525    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
526    /// - at least `shutdown_summary_min_messages` user-turn messages in history
527    ///
528    /// All errors are logged as warnings and swallowed — shutdown must never fail.
529    async fn maybe_store_shutdown_summary(&mut self) {
530        if !self.services.memory.compaction.shutdown_summary {
531            return;
532        }
533        let Some(memory) = self.services.memory.persistence.memory.clone() else {
534            return;
535        };
536        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
537            return;
538        };
539
540        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
541        match memory.has_session_summary(conversation_id).await {
542            Ok(true) => {
543                tracing::debug!("shutdown summary: session already has a summary, skipping");
544                return;
545            }
546            Ok(false) => {}
547            Err(e) => {
548                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
549                return;
550            }
551        }
552
553        // Count user-turn messages only (skip system prompt at index 0).
554        let user_count = self
555            .msg
556            .messages
557            .iter()
558            .skip(1)
559            .filter(|m| m.role == Role::User)
560            .count();
561        if user_count
562            < self
563                .services
564                .memory
565                .compaction
566                .shutdown_summary_min_messages
567        {
568            tracing::debug!(
569                user_count,
570                min = self
571                    .services
572                    .memory
573                    .compaction
574                    .shutdown_summary_min_messages,
575                "shutdown summary: too few user messages, skipping"
576            );
577            return;
578        }
579
580        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
581        let _ = self.channel.send_status("Saving session summary...").await;
582
583        // Collect last N messages (skip system prompt at index 0).
584        let max = self
585            .services
586            .memory
587            .compaction
588            .shutdown_summary_max_messages;
589        if max == 0 {
590            tracing::debug!("shutdown summary: max_messages=0, skipping");
591            return;
592        }
593        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
594        let slice = if non_system.len() > max {
595            &non_system[non_system.len() - max..]
596        } else {
597            &non_system[..]
598        };
599
600        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
601            .iter()
602            .map(|m| {
603                let role = match m.role {
604                    Role::User => "user".to_owned(),
605                    Role::Assistant => "assistant".to_owned(),
606                    Role::System => "system".to_owned(),
607                };
608                (zeph_memory::MessageId(0), role, m.content.clone())
609            })
610            .collect();
611
612        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
613        let chat_messages = vec![Message {
614            role: Role::User,
615            content: prompt,
616            parts: vec![],
617            metadata: MessageMetadata::default(),
618        }];
619
620        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
621            let _ = self.channel.send_status("").await;
622            return;
623        };
624
625        if let Err(e) = memory
626            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
627            .await
628        {
629            tracing::warn!("shutdown summary: storage failed: {e:#}");
630        } else {
631            tracing::info!(
632                conversation_id = conversation_id.0,
633                "shutdown summary stored"
634            );
635        }
636
637        // Clear TUI status.
638        let _ = self.channel.send_status("").await;
639    }
640
    /// Gracefully shut down the agent and persist state.
    ///
    /// Steps, in the order they run:
    ///
    /// 1. **Status** — sends "Shutting down..." on the channel (send errors are ignored)
    /// 2. **Provider state** — saves the LLM router state via `save_router_state`
    /// 3. **Topology advisor** — saves its state when one is configured; a failure is
    ///    logged as a warning
    /// 4. **Sub-agents** — `shutdown_all` on the sub-agent manager, when present
    /// 5. **MCP servers** — `shutdown_all_shared` on the MCP manager, when present
    /// 6. **Metrics finalization** — pushes the last open compaction segment into the
    ///    metrics, then logs tool-output filter savings and the hard-compaction trajectory
    /// 7. **Orphaned tool calls** — persists tombstone `ToolResult`s for unpaired `ToolUse`s
    /// 8. **Background tasks** — aborts supervisor-tracked tasks, pending compression
    ///    handles, and learning tasks, then yields to the scheduler a few times
    /// 9. **Session summary and digest** — stores the shutdown summary and session digest
    ///
    /// Nothing here returns an error to the caller; failures are logged or ignored.
    ///
    /// Call this before dropping the agent to ensure no data loss.
    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        // CRIT-1: persist Thompson state accumulated during this session.
        self.provider.save_router_state().await;

        // Persist AdaptOrch Beta-arm table alongside Thompson state.
        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Finalize compaction trajectory: push the last open segment into the Vec.
        // This segment would otherwise only be pushed when the next hard compaction fires,
        // which never happens at session end.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log end-of-session metrics. The borrowed snapshot `m` is confined to this block
        // and is released before the next `.await`.
        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                // Percentage of raw tool-output tokens removed by filtering; guarded
                // against division by zero.
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
        // startup strips the orphaned ToolUse and emits warnings.
        self.flush_orphaned_tool_use_on_shutdown().await;

        // NOTE: forcibly aborts in-flight Enrichment and Telemetry tasks tracked by the
        // supervisor. Before the sprint-1 refactor, experiment sessions were detached
        // tokio::spawn calls that survived shutdown; they are now intentionally untracked
        // (see experiment_cmd.rs) and will continue running until their own CancellationToken
        // is triggered or the process exits.
        self.runtime.lifecycle.supervisor.abort_all();

        // Abort background task handles not tracked by BackgroundSupervisor.
        // Per the Await Discipline rule, fire-and-forget handles must be aborted on shutdown.
        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        // Abort learning tasks (JoinSet detached at turn boundaries but not on shutdown).
        self.services.learning_engine.learning_tasks.abort_all();

        // Allow cancelled tasks to release their HTTP connections before the summary LLM call.
        // abort_all() posts cancellation signals but does not drain tasks; aborted futures only
        // observe cancellation at their next .await point. Without yielding here the summary
        // call races in-flight enrichment HTTP connections for the same API rate-limit budget.
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        // Summary and digest run last, after background tasks have been aborted.
        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
754
755    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
756    ///
757    /// # Errors
758    ///
759    /// Returns an error if channel I/O or LLM communication fails.
760    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
761    fn refresh_subagent_metrics(&mut self) {
762        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
763            return;
764        };
765        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
766            .statuses()
767            .into_iter()
768            .map(|(id, s)| {
769                let def = mgr.agents_def(&id);
770                crate::metrics::SubAgentMetrics {
771                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
772                    id: id.clone(),
773                    state: format!("{:?}", s.state).to_lowercase(),
774                    turns_used: s.turns_used,
775                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
776                    background: def.is_some_and(|d| d.permissions.background),
777                    elapsed_secs: s.started_at.elapsed().as_secs(),
778                    permission_mode: def.map_or_else(String::new, |d| {
779                        use zeph_subagent::def::PermissionMode;
780                        match d.permissions.permission_mode {
781                            PermissionMode::Default => String::new(),
782                            PermissionMode::AcceptEdits => "accept_edits".into(),
783                            PermissionMode::DontAsk => "dont_ask".into(),
784                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
785                            PermissionMode::Plan => "plan".into(),
786                        }
787                    }),
788                    transcript_dir: mgr
789                        .agent_transcript_dir(&id)
790                        .map(|p| p.to_string_lossy().into_owned()),
791                }
792            })
793            .collect();
794        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
795    }
796
797    /// Non-blocking poll: notify the user when background sub-agents complete.
798    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
799        let completed = self.poll_subagents().await;
800        for (task_id, result) in completed {
801            let notice = if result.is_empty() {
802                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
803            } else {
804                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
805            };
806            if let Err(e) = self.channel.send(&notice).await {
807                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
808            }
809        }
810        Ok(())
811    }
812
    /// Run the agent main loop.
    ///
    /// Waits for model warmup (when a warmup signal is present), restores the channel's
    /// provider preference, loads the session digest and sends the resume recap, then loops.
    /// Each iteration takes the next input from the message queue or from `next_event`,
    /// offers it to the session/debug command registry, then to the agent command registry,
    /// then to `handle_builtin_command`, and finally hands it to `process_user_message`.
    /// After the loop ends, `maybe_autodream` runs and the trace collector (if any) is
    /// finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)] // run loop is inherently large; each branch is independent
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Wait for the warmup signal only when it has not already reported `true`.
        // The receiver is taken, so this wait happens at most once per agent.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        // Restore the last-used provider preference before any user interaction (#3308).
        self.restore_channel_provider().await;

        // Load the session digest once at session start for context injection.
        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping, performed before the next input is selected.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over polling for new events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving, so rebuild a channel message
                    // and send it through `resolve_message`.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // `None` from `next_event` and an explicit shutdown both end the loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // Clear the experiment cancel handle before reporting completion.
                        self.services.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled prompts are tagged with `SCHEDULED_TASK_PREFIX` and then
                        // processed like a message without attachments.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count the tick on the active user loop, if there is one.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Record the guest-context flag of the inbound message on the session.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // M3: extract flagged URLs from all slash commands before any registry dispatch,
            // so `/skill install <url>` and similar commands populate user_provided_urls.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Registry dispatch: two-phase command dispatch.
            //
            // Phase 1 (session/debug): handlers that need sink + debug + messages but NOT agent.
            // Phase 2 (agent): handlers that need &mut Agent directly; use null sentinels for
            // the other CommandContext fields to satisfy the type but avoid borrow conflicts.
            //
            // STRUCTURAL NOTE (C4 — borrow-checker constraint, not deferred by oversight):
            // A `TurnState<'a, C>` struct grouping disjoint `&mut Agent<C>` sub-fields would
            // eliminate the LIFO-sentinel ordering below. The obstacle: `AgentAccess` is
            // implemented on `Agent<C>` itself (see `agent_access_impl.rs`), which accesses
            // fields like `memory_state`, `providers`, `mcp`, and `skill_state`. Those fields
            // overlap with what a `TurnState` would need to borrow, so `AgentBackend::Real`
            // cannot simultaneously hold `&mut Agent` while `TurnState` holds `&mut Agent.providers`.
            // The fix requires splitting `Agent<C>` fields into two disjoint sub-structs and moving
            // `AgentAccess` to the sub-struct that is disjoint from `TurnState`'s borrow set.
            // That restructuring touches `agent_access_impl.rs`, `state.rs`, `builder.rs`, all
            // command handlers, and the binary crate — estimated > 300 lines across > 5 files.
            // Track as a multi-PR refactor; the current sentinel pattern is correct and safe.
            //
            // Drop-order rules enforced here:
            //   - `sink_adapter` / `null_agent` declared before the registry block → dropped after.
            //   - Phase-2 sentinels declared before `ctx` → dropped after `ctx`.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            // sink_adapter declared before reg so it is dropped after reg (LIFO).
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // null_agent must be declared before reg so it lives longer (LIFO drop order).
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                // The session/debug registry is rebuilt on every loop iteration.
                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // Remember whether phase 1 produced no result before the result is consumed below.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by the session/debug registry; try agent-command registry.
                }
            }

            // Agent-command registry: handlers access Agent<C> directly.
            // Null sentinels declared here so they outlive ctx regardless of whether the `if`
            // block is entered. `ctx` borrows both `self` and the sentinels; it must drop before
            // any subsequent `self.channel.*` calls. Because Rust drops in LIFO order, the
            // sentinels here will outlive ctx (ctx is declared later, inside the block).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                // The agent registry is likewise rebuilt on every loop iteration.
                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                // Phase 6 migrations: /skill, /skills, /feedback use clone-before-await pattern.
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                // Phase 4 migrations (Send-safe commands):
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                // Phase 5 migrations (Send-compatible):
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                // self is reborrowed; ctx drops at end of this block.
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // self.channel is available again here (ctx borrow dropped above).
            // Post-dispatch learning hook for `/skill reject` / `/feedback` is triggered
            // inside apply_dispatch_result when with_learning = true.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled by agent registry; fall through to existing dispatch.
                }
            }

            // Builtin commands: `Some(true)` ends the loop, `Some(false)` skips to the next
            // iteration, `None` means the input is not a builtin command.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // No command handler claimed the input: process it as a regular user message.
            self.process_user_message(text, image_parts).await?;
        }

        // autoDream: run background memory consolidation if conditions are met (#2697).
        // Runs with a timeout — partial state is acceptable for MVP.
        self.maybe_autodream().await;

        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1140
1141    /// Dispatch a slash-command registry result and flush the channel.
1142    ///
1143    /// Returns [`DispatchFlow::Break`] on exit, [`DispatchFlow::Continue`] when handled, or
1144    /// [`DispatchFlow::Fallthrough`] when `result` is `None`.
1145    /// When `with_learning` is `true`, triggers the post-command learning hook for `Message` output.
1146    async fn apply_dispatch_result(
1147        &mut self,
1148        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1149        command: &str,
1150        with_learning: bool,
1151    ) -> DispatchFlow {
1152        match result {
1153            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1154                let _ = self.channel.flush_chunks().await;
1155                DispatchFlow::Break
1156            }
1157            Some(Ok(
1158                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1159            )) => {
1160                let _ = self.channel.flush_chunks().await;
1161                DispatchFlow::Continue
1162            }
1163            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1164                let _ = self.channel.send(&msg).await;
1165                let _ = self.channel.flush_chunks().await;
1166                if with_learning {
1167                    self.maybe_trigger_post_command_learning(command).await;
1168                }
1169                DispatchFlow::Continue
1170            }
1171            Some(Err(e)) => {
1172                let _ = self.channel.send(&e.to_string()).await;
1173                let _ = self.channel.flush_chunks().await;
1174                tracing::warn!(command = %command, error = %e.0, "slash command failed");
1175                DispatchFlow::Continue
1176            }
1177            None => DispatchFlow::Fallthrough,
1178        }
1179    }
1180
1181    /// Apply any pending LLM provider override from ACP `set_session_config_option`.
1182    fn apply_provider_override(&mut self) {
1183        if let Some(ref slot) = self.runtime.providers.provider_override
1184            && let Some(new_provider) = slot.write().take()
1185        {
1186            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1187            self.provider = new_provider;
1188        }
1189    }
1190
1191    /// Poll all event sources and return the next [`LoopEvent`].
1192    ///
1193    /// Returns `None` when the inbound channel closes (graceful shutdown).
1194    ///
1195    /// # Errors
1196    ///
1197    /// Propagates channel receive errors.
1198    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
1199    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1200        let event = tokio::select! {
1201            result = self.channel.recv() => {
1202                return Ok(result?.map(LoopEvent::Message));
1203            }
1204            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1205                tracing::info!("shutting down");
1206                LoopEvent::Shutdown
1207            }
1208            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1209                LoopEvent::SkillReload
1210            }
1211            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1212                LoopEvent::InstructionReload
1213            }
1214            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1215                LoopEvent::ConfigReload
1216            }
1217            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1218                LoopEvent::UpdateNotification(msg)
1219            }
1220            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1221                LoopEvent::ExperimentCompleted(msg)
1222            }
1223            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1224                tracing::info!("scheduler: injecting custom task as agent turn");
1225                LoopEvent::ScheduledTask(prompt)
1226            }
1227            () = async {
1228                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1229                    if ls.cancel_tx.is_cancelled() {
1230                        std::future::pending::<()>().await;
1231                    } else {
1232                        ls.interval.tick().await;
1233                    }
1234                } else {
1235                    std::future::pending::<()>().await;
1236                }
1237            } => {
1238                // Re-check user_loop after the tick — /loop stop may have fired between the
1239                // interval firing and this arm executing. Returning Ok(None) causes the caller
1240                // to `continue` without injecting a stale or empty prompt.
1241                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1242                    return Ok(None);
1243                };
1244                if ls.cancel_tx.is_cancelled() {
1245                    self.runtime.lifecycle.user_loop = None;
1246                    return Ok(None);
1247                }
1248                let prompt = ls.prompt.clone();
1249                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1250            }
1251            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1252                LoopEvent::FileChanged(event)
1253            }
1254        };
1255        Ok(Some(event))
1256    }
1257
1258    #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1259    async fn resolve_message(
1260        &self,
1261        msg: crate::channel::ChannelMessage,
1262    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1263        use crate::channel::{Attachment, AttachmentKind};
1264        use zeph_llm::provider::{ImageData, MessagePart};
1265
1266        let text_base = msg.text.clone();
1267
1268        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1269            .attachments
1270            .into_iter()
1271            .partition(|a| a.kind == AttachmentKind::Audio);
1272
1273        tracing::debug!(
1274            audio = audio_attachments.len(),
1275            has_stt = self.runtime.providers.stt.is_some(),
1276            "resolve_message attachments"
1277        );
1278
1279        let text = if !audio_attachments.is_empty()
1280            && let Some(stt) = self.runtime.providers.stt.as_ref()
1281        {
1282            let mut transcribed_parts = Vec::new();
1283            for attachment in &audio_attachments {
1284                if attachment.data.len() > MAX_AUDIO_BYTES {
1285                    tracing::warn!(
1286                        size = attachment.data.len(),
1287                        max = MAX_AUDIO_BYTES,
1288                        "audio attachment exceeds size limit, skipping"
1289                    );
1290                    continue;
1291                }
1292                match stt
1293                    .transcribe(&attachment.data, attachment.filename.as_deref())
1294                    .await
1295                {
1296                    Ok(result) => {
1297                        tracing::info!(
1298                            len = result.text.len(),
1299                            language = ?result.language,
1300                            "audio transcribed"
1301                        );
1302                        transcribed_parts.push(result.text);
1303                    }
1304                    Err(e) => {
1305                        tracing::error!(error = %e, "audio transcription failed");
1306                    }
1307                }
1308            }
1309            if transcribed_parts.is_empty() {
1310                text_base
1311            } else {
1312                let transcribed = transcribed_parts.join("\n");
1313                if text_base.is_empty() {
1314                    transcribed
1315                } else {
1316                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1317                }
1318            }
1319        } else {
1320            if !audio_attachments.is_empty() {
1321                tracing::warn!(
1322                    count = audio_attachments.len(),
1323                    "audio attachments received but no STT provider configured, dropping"
1324                );
1325            }
1326            text_base
1327        };
1328
1329        let mut image_parts = Vec::new();
1330        for attachment in image_attachments {
1331            if attachment.data.len() > MAX_IMAGE_BYTES {
1332                tracing::warn!(
1333                    size = attachment.data.len(),
1334                    max = MAX_IMAGE_BYTES,
1335                    "image attachment exceeds size limit, skipping"
1336                );
1337                continue;
1338            }
1339            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1340            image_parts.push(MessagePart::Image(Box::new(ImageData {
1341                data: attachment.data,
1342                mime_type,
1343            })));
1344        }
1345
1346        (text, image_parts)
1347    }
1348
1349    /// Create a new [`Turn`] for the given input and advance the turn counter.
1350    ///
1351    /// Clears per-turn state that must not carry over between turns:
1352    /// - per-turn `CancellationToken` (new token for each turn)
1353    /// - per-turn URL set in `SecurityState` (cleared here; re-populated in
1354    ///   `process_user_message_inner` after security checks)
1355    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1356        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1357        self.runtime.debug.iteration_counter += 1;
1358        let cancel_token = CancellationToken::new();
1359        // keep agent-wide token in sync with per-turn token — TODO(#3498): consolidate in Phase 2
1360        self.runtime.lifecycle.cancel_token = cancel_token.clone();
1361        self.services.security.user_provided_urls.write().clear();
1362        // Reset per-turn LLM request counter for the notification gate.
1363        self.runtime.lifecycle.turn_llm_requests = 0;
1364
1365        // Spec 050 §2: drain pending risk signals from executor layers before advancing.
1366        {
1367            let pending: Vec<u8> = {
1368                let mut q = self.services.security.trajectory_signal_queue.lock();
1369                std::mem::take(&mut *q)
1370            };
1371            for code in pending {
1372                self.services
1373                    .security
1374                    .trajectory
1375                    .record(crate::agent::trajectory::RiskSignal::from_code(code));
1376            }
1377        }
1378        // Spec 050 Invariant 2: advance trajectory sentinel BEFORE any gate evaluation.
1379        // F5: write auto-recover audit entry when sentinel hard-resets.
1380        if self.services.security.trajectory.advance_turn()
1381            && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1382        {
1383            let entry = zeph_tools::AuditEntry {
1384                timestamp: zeph_tools::chrono_now(),
1385                tool: "<sentinel>".to_owned().into(),
1386                command: String::new(),
1387                result: zeph_tools::AuditResult::Success,
1388                duration_ms: 0,
1389                error_category: Some("trajectory_auto_recover".to_owned()),
1390                error_domain: Some("security".to_owned()),
1391                error_phase: None,
1392                claim_source: None,
1393                mcp_server_id: None,
1394                injection_flagged: false,
1395                embedding_anomalous: false,
1396                cross_boundary_mcp_to_acp: false,
1397                adversarial_policy_decision: None,
1398                exit_code: None,
1399                truncated: false,
1400                caller_id: None,
1401                policy_match: None,
1402                correlation_id: None,
1403                vigil_risk: None,
1404                execution_env: None,
1405                resolved_cwd: None,
1406                scope_at_definition: None,
1407                scope_at_dispatch: None,
1408            };
1409            self.runtime.lifecycle.supervisor.spawn(
1410                crate::agent::agent_supervisor::TaskClass::Telemetry,
1411                "trajectory-auto-recover-audit",
1412                async move { logger.log(&entry).await },
1413            );
1414        }
1415        // Spec 050 Phase 2: reset per-turn probe counter before any tool dispatch.
1416        if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1417            sentinel.advance_turn();
1418        }
1419        // Publish updated risk level to the shared slot so PolicyGateExecutor can read it.
1420        let risk_level = self.services.security.trajectory.current_risk();
1421        *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1422        // TUI/CLI: emit a status indicator when risk reaches High or Critical (NFR-CG-006).
1423        if let Some(alert) = self.services.security.trajectory.poll_alert() {
1424            let msg = format!(
1425                "[trajectory] Risk level: {:?} (score={:.2})",
1426                alert.level, alert.score
1427            );
1428            tracing::warn!(
1429                level = ?alert.level,
1430                score = alert.score,
1431                "trajectory sentinel alert"
1432            );
1433            if let Some(ref tx) = self.services.session.status_tx {
1434                let _ = tx.send(msg);
1435            }
1436        }
1437
1438        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1439        turn::Turn::new(context, input)
1440    }
1441
1442    /// Finalise a turn: copy accumulated timings into `MetricsState` and flush.
1443    ///
1444    /// Must be called exactly once per turn, after `process_user_message_inner` returns
1445    /// (regardless of success or error). Corresponds to the M2 resolution in the spec:
1446    /// `TurnMetrics.timings` is the single source of truth; `MetricsState.pending_timings`
1447    /// is populated from it here so the rest of the pipeline is unchanged.
1448    fn end_turn(&mut self, turn: turn::Turn) {
1449        self.runtime.metrics.pending_timings = turn.metrics.timings;
1450        self.flush_turn_timings();
1451        // Clear per-turn intent (FR-008): must not persist across turns.
1452        self.services.session.current_turn_intent = None;
1453        // Clear guest context flag: each turn is independently classified.
1454        self.services.session.is_guest_context = false;
1455        // Cancel all in-flight speculative handles at turn boundary.
1456        if let Some(ref engine) = self.services.speculation_engine {
1457            let metrics = engine.end_turn();
1458            if metrics.committed > 0 || metrics.cancelled > 0 {
1459                tracing::debug!(
1460                    committed = metrics.committed,
1461                    cancelled = metrics.cancelled,
1462                    wasted_ms = metrics.wasted_ms,
1463                    "speculation: turn boundary metrics"
1464                );
1465            }
1466        }
1467    }
1468
1469    #[tracing::instrument(
1470        name = "core.agent.process_user_message",
1471        skip_all,
1472        level = "debug",
1473        fields(turn_id),
1474        err
1475    )]
1476    async fn process_user_message(
1477        &mut self,
1478        text: String,
1479        image_parts: Vec<zeph_llm::provider::MessagePart>,
1480    ) -> Result<(), error::AgentError> {
1481        let input = turn::TurnInput::new(text, image_parts);
1482        let mut t = self.begin_turn(input);
1483
1484        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1485        tracing::Span::current().record("turn_id", t.id().0);
1486        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1487        self.runtime
1488            .debug
1489            .start_iteration_span(turn_idx, t.input.text.trim());
1490
1491        let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1492
1493        // Close iteration span regardless of outcome (partial trace preserved on error).
1494        let span_status = if result.is_ok() {
1495            crate::debug_dump::trace::SpanStatus::Ok
1496        } else {
1497            crate::debug_dump::trace::SpanStatus::Error {
1498                message: "iteration failed".to_owned(),
1499            }
1500        };
1501        self.runtime.debug.end_iteration_span(turn_idx, span_status);
1502
1503        self.end_turn(t);
1504        result
1505    }
1506
1507    #[tracing::instrument(
1508        name = "core.agent.process_user_message_inner",
1509        skip_all,
1510        level = "debug",
1511        err
1512    )]
1513    async fn process_user_message_inner(
1514        &mut self,
1515        turn: &mut turn::Turn,
1516    ) -> Result<(), error::AgentError> {
1517        self.reap_background_tasks_and_update_metrics();
1518
1519        let tokens_before_turn = self
1520            .runtime
1521            .metrics
1522            .metrics_tx
1523            .as_ref()
1524            .map_or(0, |tx| tx.borrow().total_tokens);
1525
1526        // Drain any background shell completions that arrived since the last turn.
1527        // They are buffered in `pending_background_completions` and merged with the
1528        // real user message into a single user-role block below (N1 invariant).
1529        self.drain_background_completions();
1530
1531        self.wire_cancel_bridge(turn.cancel_token());
1532
1533        // Clone text out of Turn so we can hold both `&str` borrows and mutate turn.metrics.
1534        let text = turn.input.text.clone();
1535        let trimmed_owned = text.trim().to_owned();
1536        let trimmed = trimmed_owned.as_str();
1537
1538        // Capture current-turn intent for VIGIL gate (FR-007). Truncated to 1024 chars.
1539        // Must be set BEFORE any tool call; cleared at end_turn (FR-008).
1540        if self.services.security.vigil.is_some() {
1541            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1542            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1543        }
1544
1545        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1546            return result;
1547        }
1548
1549        self.check_pending_rollbacks().await;
1550
1551        if self.pre_process_security(trimmed).await? {
1552            return Ok(());
1553        }
1554
1555        let t_ctx = std::time::Instant::now();
1556        tracing::debug!("turn timing: prepare_context start");
1557        self.advance_context_lifecycle_guarded(&text, trimmed).await;
1558        turn.metrics_mut().timings.prepare_context_ms =
1559            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1560        tracing::debug!(
1561            ms = turn.metrics_snapshot().timings.prepare_context_ms,
1562            "turn timing: prepare_context done"
1563        );
1564
1565        let image_parts = std::mem::take(&mut turn.input.image_parts);
1566        // Prepend any background completion blocks to the user text. All completions and the
1567        // user message MUST be merged into a single user-role block to satisfy the strict
1568        // user/assistant alternation rule (Anthropic Messages API — N1 invariant).
1569        let merged_text = self.build_user_message_text_with_bg_completions(&text);
1570        let user_msg = self.build_user_message(&merged_text, image_parts);
1571
1572        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1573        // URL set was cleared in begin_turn; re-populate for this turn.
1574        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1575        if !urls.is_empty() {
1576            self.services
1577                .security
1578                .user_provided_urls
1579                .write()
1580                .extend(urls);
1581        }
1582
1583        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1584        // Derived from the raw input text before context assembly to avoid timing dependencies.
1585        self.services.memory.extraction.goal_text = Some(text.clone());
1586
1587        let t_persist = std::time::Instant::now();
1588        tracing::debug!("turn timing: persist_message(user) start");
1589        // Image parts intentionally excluded — base64 payloads too large for message history.
1590        self.persist_message(Role::User, &text, &[], false).await;
1591        turn.metrics_mut().timings.persist_message_ms =
1592            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1593        tracing::debug!(
1594            ms = turn.metrics_snapshot().timings.persist_message_ms,
1595            "turn timing: persist_message(user) done"
1596        );
1597        self.push_message(user_msg);
1598
1599        // llm_chat_ms and tool_exec_ms are accumulated inside call_chat_with_tools and
1600        // handle_native_tool_calls respectively via metrics.pending_timings.
1601        tracing::debug!("turn timing: process_response start");
1602        let turn_had_error = if let Err(e) = self.process_response().await {
1603            // Detach any in-flight learning tasks before mutating message state.
1604            self.services.learning_engine.learning_tasks.detach_all();
1605            tracing::error!("Response processing failed: {e:#}");
1606
1607            // Record provider failure timestamp so the next turn can skip
1608            // expensive context preparation while providers are known-down.
1609            if e.is_no_providers() {
1610                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1611                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1612                tracing::warn!(
1613                    backoff_secs,
1614                    "no providers available; backing off before next turn"
1615                );
1616                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1617            }
1618
1619            let user_msg = format!("Error: {e:#}");
1620            self.channel.send(&user_msg).await?;
1621            self.msg.messages.pop();
1622            self.recompute_prompt_tokens();
1623            self.channel.flush_chunks().await?;
1624            true
1625        } else {
1626            // Detach learning tasks spawned this turn — they are fire-and-forget and must not
1627            // leak into the next turn's context.
1628            self.services.learning_engine.learning_tasks.detach_all();
1629            self.truncate_old_tool_results();
1630            // MagicDocs: spawn background doc updates if any are due (#2702).
1631            self.maybe_update_magic_docs();
1632            // Compression spectrum: fire-and-forget promotion scan (#3305).
1633            self.maybe_spawn_promotion_scan();
1634            false
1635        };
1636        tracing::debug!("turn timing: process_response done");
1637
1638        // MARCH self-check hook: runs after every successful response, including cache-hit path.
1639        if let Some(pipeline) = self.services.quality.clone() {
1640            self.run_self_check_for_turn(pipeline, turn.id().0).await;
1641        }
1642        // Flush pending response chunks and emit ResponseEnd exactly once per turn.
1643        // send() no longer emits ResponseEnd — flush_chunks() is the sole emitter.
1644        // When self-check appends a flag_marker chunk, this single call covers both
1645        // the main response and the marker, preventing the double response_end of #3243.
1646        let _ = self.channel.flush_chunks().await;
1647
1648        self.maybe_fire_completion_notification(turn, turn_had_error);
1649
1650        self.flush_goal_accounting(tokens_before_turn);
1651
1652        // Collect llm_chat_ms and tool_exec_ms from MetricsState.pending_timings (accumulated
1653        // by the tool execution chain) into turn.metrics so end_turn can flush them.
1654        // This is the Phase 1 bridging: existing code writes to pending_timings directly;
1655        // we harvest those values into Turn before end_turn overwrites pending_timings.
1656        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1657        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1658
1659        Ok(())
1660    }
1661
1662    /// Wire the per-turn cancellation token into the cancel bridge.
1663    ///
1664    /// The bridge translates `cancel_signal` (Notify) into a `CancellationToken` cancel so that
1665    /// channel-level abort requests propagate to the in-flight LLM call. The previous bridge task
1666    /// is aborted before a new one is spawned to prevent unbounded accumulation (#2737).
1667    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1668        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1669        let token = turn_token.clone();
1670        // Keep lifecycle.cancel_token in sync so existing code that reads it still works.
1671        self.runtime.lifecycle.cancel_token = turn_token.clone();
1672        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1673            prev.abort();
1674        }
1675        self.runtime.lifecycle.cancel_bridge_handle =
1676            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1677                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1678                move || async move {
1679                    signal.notified().await;
1680                    token.cancel();
1681                },
1682            ));
1683    }
1684
1685    /// Reap completed background tasks, apply summarization signal, and update supervisor metrics.
1686    ///
1687    /// Called at the top of each turn, before any user message processing.
1688    fn reap_background_tasks_and_update_metrics(&mut self) {
1689        let bg_signal = self.runtime.lifecycle.supervisor.reap();
1690        if bg_signal.did_summarize {
1691            self.services.memory.persistence.unsummarized_count = 0;
1692            tracing::debug!("background summarization completed; unsummarized_count reset");
1693        }
1694        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1695        self.update_metrics(|m| {
1696            m.bg_inflight = snap.inflight as u64;
1697            m.bg_dropped = snap.total_dropped();
1698            m.bg_completed = snap.total_completed();
1699            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1700            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1701        });
1702
1703        // Update shell background run rows for TUI panel.
1704        if self.runtime.lifecycle.shell_executor_handle.is_some() {
1705            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1706                .runtime
1707                .lifecycle
1708                .shell_executor_handle
1709                .as_ref()
1710                .map(|e| e.background_runs_snapshot())
1711                .unwrap_or_default()
1712                .into_iter()
1713                .map(|s| crate::metrics::ShellBackgroundRunRow {
1714                    run_id: truncate_shell_run_id(&s.run_id),
1715                    command: truncate_shell_command(&s.command),
1716                    elapsed_secs: s.elapsed_ms / 1000,
1717                })
1718                .collect();
1719            self.update_metrics(|m| {
1720                m.shell_background_runs = shell_rows;
1721            });
1722        }
1723
1724        // Intentional ordering: reap() runs before abort_class() so completed tasks are
1725        // accounted in the snapshot above.
1726        if self
1727            .runtime
1728            .config
1729            .supervisor_config
1730            .abort_enrichment_on_turn
1731        {
1732            self.runtime
1733                .lifecycle
1734                .supervisor
1735                .abort_class(agent_supervisor::TaskClass::Enrichment);
1736        }
1737    }
1738
1739    /// Fire completion notifications and `turn_complete` hooks after each turn.
1740    ///
1741    /// Builds [`crate::notifications::TurnSummary`] once and reuses it for both the
1742    /// [`crate::notifications::Notifier`] and any `[[hooks.turn_complete]]` entries. The
1743    /// `preview` field is already redacted by [`Self::last_assistant_preview`], so hook
1744    /// env vars carry no raw assistant output.
1745    ///
1746    /// Gating:
1747    /// - When a `Notifier` is configured, both the notifier and hooks share its
1748    ///   `should_fire` gate (`min_turn_duration_ms`, `only_on_error`, `enabled`).
1749    /// - When no `Notifier` is configured, hooks fire on every turn completion (the
1750    ///   notifier path is simply skipped).
1751    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1752        let snap = turn.metrics_snapshot().timings.clone();
1753        let duration_ms = snap
1754            .prepare_context_ms
1755            .saturating_add(snap.llm_chat_ms)
1756            .saturating_add(snap.tool_exec_ms);
1757        let summary = crate::notifications::TurnSummary {
1758            duration_ms,
1759            preview: self.last_assistant_preview(160),
1760            // TODO: wire turn_tool_calls counter once LifecycleState tracks it (Phase 2).
1761            tool_calls: 0,
1762            llm_requests: self.runtime.lifecycle.turn_llm_requests,
1763            exit_status: if is_error {
1764                crate::notifications::TurnExitStatus::Error
1765            } else {
1766                crate::notifications::TurnExitStatus::Success
1767            },
1768        };
1769
1770        // Gate evaluation: notifier's should_fire result (or unconditional when absent).
1771        let gate_ok = self
1772            .runtime
1773            .lifecycle
1774            .notifier
1775            .as_ref()
1776            .is_none_or(|n| n.should_fire(&summary));
1777
1778        // 1) Existing notifier path — unchanged semantics.
1779        if let Some(ref notifier) = self.runtime.lifecycle.notifier
1780            && gate_ok
1781        {
1782            notifier.fire(&summary);
1783        }
1784
1785        // 2) turn_complete hooks — fire-and-forget, matching the Notifier::fire pattern.
1786        // MCP dispatch is omitted: turn_complete hooks are expected to be Command-type
1787        // (shell scripts for desktop notifications, status-bar updates, etc.). McpTool
1788        // hooks in this context would require a Send + 'static MCP handle; defer that
1789        // extension if needed via a follow-up.
1790        let hooks = self.services.session.hooks_config.turn_complete.clone();
1791        if !hooks.is_empty() && gate_ok {
1792            let mut env = std::collections::HashMap::new();
1793            env.insert(
1794                "ZEPH_TURN_DURATION_MS".to_owned(),
1795                summary.duration_ms.to_string(),
1796            );
1797            env.insert(
1798                "ZEPH_TURN_STATUS".to_owned(),
1799                if is_error { "error" } else { "success" }.to_owned(),
1800            );
1801            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1802            env.insert(
1803                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1804                summary.llm_requests.to_string(),
1805            );
1806            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1807            let _accepted = self.runtime.lifecycle.supervisor.spawn(
1808                agent_supervisor::TaskClass::Telemetry,
1809                "turn-complete-hooks",
1810                async move {
1811                    // Explicitly 'static so the future satisfies tokio::spawn's bound.
1812                    // None holds no actual reference; the annotation is vacuously satisfied.
1813                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1814                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1815                        tracing::warn!(error = %e, "turn_complete hook failed");
1816                    }
1817                },
1818            );
1819        }
1820    }
1821
1822    /// Publish the active goal snapshot to `MetricsSnapshot` and fire `on_turn_complete`
1823    /// accounting as a tracked background task.
1824    fn flush_goal_accounting(&mut self, tokens_before: u64) {
1825        let goal_snap = self
1826            .services
1827            .goal_accounting
1828            .as_ref()
1829            .and_then(|a| a.snapshot());
1830        self.update_metrics(|m| m.active_goal = goal_snap);
1831
1832        if let Some(ref accounting) = self.services.goal_accounting {
1833            let tokens_after = self
1834                .runtime
1835                .metrics
1836                .metrics_tx
1837                .as_ref()
1838                .map_or(0, |tx| tx.borrow().total_tokens);
1839            let turn_tokens = tokens_after.saturating_sub(tokens_before);
1840            let mut spawned: Option<
1841                std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1842            > = None;
1843            accounting.on_turn_complete(turn_tokens, |fut| {
1844                spawned = Some(fut);
1845            });
1846            if let Some(fut) = spawned {
1847                let _ = self.runtime.lifecycle.supervisor.spawn(
1848                    agent_supervisor::TaskClass::Telemetry,
1849                    "goal-accounting",
1850                    fut,
1851                );
1852            }
1853        }
1854    }
1855
1856    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1857    #[tracing::instrument(
1858        name = "core.agent.pre_process_security",
1859        skip_all,
1860        level = "debug",
1861        err
1862    )]
1863    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1864        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1865        if let Some(ref guardrail) = self.services.security.guardrail {
1866            use zeph_sanitizer::guardrail::GuardrailVerdict;
1867            let verdict = guardrail.check(trimmed).await;
1868            match &verdict {
1869                GuardrailVerdict::Flagged { reason, .. } => {
1870                    tracing::warn!(
1871                        reason = %reason,
1872                        should_block = verdict.should_block(),
1873                        "guardrail flagged user input"
1874                    );
1875                    if verdict.should_block() {
1876                        let msg = format!("[guardrail] Input blocked: {reason}");
1877                        let _ = self.channel.send(&msg).await;
1878                        let _ = self.channel.flush_chunks().await;
1879                        return Ok(true);
1880                    }
1881                    // Warn mode: notify but continue.
1882                    let _ = self
1883                        .channel
1884                        .send(&format!("[guardrail] Warning: {reason}"))
1885                        .await;
1886                }
1887                GuardrailVerdict::Error { error } => {
1888                    if guardrail.error_should_block() {
1889                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1890                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1891                        let _ = self.channel.send(msg).await;
1892                        let _ = self.channel.flush_chunks().await;
1893                        return Ok(true);
1894                    }
1895                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1896                }
1897                GuardrailVerdict::Safe => {}
1898            }
1899        }
1900
1901        // ML classifier: lightweight injection detection on user input boundary.
1902        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1903        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1904        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1905        // direct user chat. Disabled by default to prevent false positives on benign messages.
1906        #[cfg(feature = "classifiers")]
1907        if self.services.security.sanitizer.scan_user_input() {
1908            match self
1909                .services
1910                .security
1911                .sanitizer
1912                .classify_injection(trimmed)
1913                .await
1914            {
1915                zeph_sanitizer::InjectionVerdict::Blocked => {
1916                    self.push_classifier_metrics();
1917                    let _ = self
1918                        .channel
1919                        .send("[security] Input blocked: injection detected by classifier.")
1920                        .await;
1921                    let _ = self.channel.flush_chunks().await;
1922                    return Ok(true);
1923                }
1924                zeph_sanitizer::InjectionVerdict::Suspicious => {
1925                    tracing::warn!("injection_classifier soft_signal on user input");
1926                }
1927                zeph_sanitizer::InjectionVerdict::Clean => {}
1928            }
1929        }
1930        #[cfg(feature = "classifiers")]
1931        self.push_classifier_metrics();
1932
1933        Ok(false)
1934    }
1935
1936    /// Run `advance_context_lifecycle` with provider-health gating and a wall-clock timeout.
1937    ///
1938    /// Skips context preparation entirely when providers failed on the previous turn and the
1939    /// `no_providers_backoff_secs` window has not yet elapsed. When providers are available,
1940    /// wraps the call with `context_prep_timeout_secs` to prevent a stall when embed backends
1941    /// are rate-limited or unavailable (#3357).
1942    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1943        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1944        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1945
1946        // Skip expensive memory recall / embedding when providers are known-down.
1947        let providers_recently_failed = self
1948            .runtime
1949            .lifecycle
1950            .last_no_providers_at
1951            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1952
1953        if providers_recently_failed {
1954            tracing::warn!(
1955                backoff_secs,
1956                "skipping context preparation: providers were unavailable on last turn"
1957            );
1958            return;
1959        }
1960
1961        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1962        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1963        {
1964            Ok(()) => {}
1965            Err(_elapsed) => {
1966                tracing::warn!(
1967                    timeout_secs = prep_timeout_secs,
1968                    "context preparation timed out; proceeding with degraded context"
1969                );
1970            }
1971        }
1972    }
1973
1974    #[tracing::instrument(
1975        name = "core.agent.advance_context_lifecycle",
1976        skip_all,
1977        level = "debug"
1978    )]
1979    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1980        // Reset per-message pruning cache at the start of each turn (#2298).
1981        self.services.mcp.pruning_cache.reset();
1982
1983        // Extract before rebuild_system_prompt so the value is not tainted
1984        // by the secrets-bearing system prompt (ConversationId is just an i64).
1985        let conv_id = self.services.memory.persistence.conversation_id;
1986        self.rebuild_system_prompt(text).await;
1987
1988        self.detect_and_record_corrections(trimmed, conv_id).await;
1989        self.services.learning_engine.tick();
1990        self.analyze_and_learn().await;
1991        self.sync_graph_counts().await;
1992
1993        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1994        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1995        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1996        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1997        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1998
1999        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
2000        {
2001            self.services.focus.tick();
2002
2003            // SideQuest eviction: runs every N user turns when enabled.
2004            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
2005            let sidequest_should_fire = self.services.sidequest.tick();
2006            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
2007                self.maybe_sidequest_eviction();
2008            }
2009        }
2010
2011        // Experience memory: evolution sweep (fire-and-forget). Runs every N user turns,
2012        // gated on graph + experience config, and only when both stores are attached.
2013        {
2014            let cfg = &self.services.memory.extraction.graph_config.experience;
2015            if cfg.enabled
2016                && cfg.evolution_sweep_enabled
2017                && cfg.evolution_sweep_interval > 0
2018                && self
2019                    .services
2020                    .sidequest
2021                    .turn_counter
2022                    .checked_rem(cfg.evolution_sweep_interval as u64)
2023                    == Some(0)
2024                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2025                && let (Some(exp), Some(graph)) =
2026                    (memory.experience.as_ref(), memory.graph_store.as_ref())
2027            {
2028                let exp = std::sync::Arc::clone(exp);
2029                let graph = std::sync::Arc::clone(graph);
2030                let threshold = cfg.confidence_prune_threshold;
2031                let turn = self.services.sidequest.turn_counter;
2032                let accepted = self.runtime.lifecycle.supervisor.spawn(
2033                    agent_supervisor::TaskClass::Telemetry,
2034                    "experience-sweep",
2035                    async move {
2036                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
2037                            Ok(stats) => tracing::info!(
2038                                turn,
2039                                self_loops = stats.pruned_self_loops,
2040                                low_confidence = stats.pruned_low_confidence,
2041                                "evolution sweep complete",
2042                            ),
2043                            Err(e) => tracing::warn!(
2044                                turn,
2045                                error = %e,
2046                                "evolution sweep failed",
2047                            ),
2048                        }
2049                    },
2050                );
2051                if !accepted {
2052                    tracing::warn!(
2053                        turn = self.services.sidequest.turn_counter,
2054                        "experience-sweep dropped (telemetry class at capacity)",
2055                    );
2056                }
2057            }
2058        }
2059
2060        // Cache-expiry warning (#2715): notify user when prompt cache has likely expired.
2061        if let Some(warning) = self.cache_expiry_warning() {
2062            tracing::info!(warning, "cache expiry warning");
2063            let _ = self.channel.send_status(&warning).await;
2064        }
2065
2066        // Time-based microcompact (#2699): strip stale low-value tool outputs before compaction.
2067        // Zero-LLM-cost; runs only when session gap exceeds configured threshold.
2068        self.maybe_time_based_microcompact();
2069
2070        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
2071        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
2072        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
2073        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
2074        self.maybe_apply_deferred_summaries();
2075        self.flush_deferred_summaries().await;
2076
2077        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
2078        if let Err(e) = self.maybe_proactive_compress().await {
2079            tracing::warn!("proactive compression failed: {e:#}");
2080        }
2081
2082        if let Err(e) = self.maybe_compact().await {
2083            tracing::warn!("context compaction failed: {e:#}");
2084        }
2085
2086        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2087            tracing::warn!("context preparation failed: {e:#}");
2088        }
2089
2090        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
2091        self.provider
2092            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2093
2094        self.services.learning_engine.reset_reflection();
2095    }
2096
2097    fn build_user_message(
2098        &mut self,
2099        text: &str,
2100        image_parts: Vec<zeph_llm::provider::MessagePart>,
2101    ) -> Message {
2102        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2103        all_image_parts.extend(image_parts);
2104
2105        if !all_image_parts.is_empty() && self.provider.supports_vision() {
2106            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2107                text: text.to_owned(),
2108            }];
2109            parts.extend(all_image_parts);
2110            Message::from_parts(Role::User, parts)
2111        } else {
2112            if !all_image_parts.is_empty() {
2113                tracing::warn!(
2114                    count = all_image_parts.len(),
2115                    "image attachments dropped: provider does not support vision"
2116                );
2117            }
2118            Message {
2119                role: Role::User,
2120                content: text.to_owned(),
2121                parts: vec![],
2122                metadata: MessageMetadata::default(),
2123            }
2124        }
2125    }
2126
2127    /// Drain any ready [`zeph_tools::BackgroundCompletion`]s from the channel into
2128    /// `pending_background_completions`. Bounded by `BACKGROUND_COMPLETION_BUFFER_CAP`;
2129    /// on overflow the oldest entry is evicted and a placeholder is inserted.
2130    fn drain_background_completions(&mut self) {
2131        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2132
2133        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2134            return;
2135        };
2136        // Non-blocking drain: collect all completions that are already ready.
2137        while let Ok(completion) = rx.try_recv() {
2138            if self.runtime.lifecycle.pending_background_completions.len()
2139                >= BACKGROUND_COMPLETION_BUFFER_CAP
2140            {
2141                tracing::warn!(
2142                    run_id = %completion.run_id,
2143                    "background completion buffer full; dropping run result"
2144                );
2145                // Buffer is full: drop the oldest queued completion and push a sentinel
2146                // for the new (incoming) run so the LLM is informed its result was lost.
2147                self.runtime
2148                    .lifecycle
2149                    .pending_background_completions
2150                    .pop_front();
2151                self.runtime
2152                    .lifecycle
2153                    .pending_background_completions
2154                    .push_back(zeph_tools::BackgroundCompletion {
2155                        run_id: completion.run_id,
2156                        exit_code: -1,
2157                        success: false,
2158                        elapsed_ms: 0,
2159                        command: completion.command,
2160                        output: format!(
2161                            "[background result for run {} dropped: buffer overflow]",
2162                            completion.run_id
2163                        ),
2164                    });
2165            } else {
2166                self.runtime
2167                    .lifecycle
2168                    .pending_background_completions
2169                    .push_back(completion);
2170            }
2171        }
2172    }
2173
2174    /// Format and drain `pending_background_completions` into a prefix string, then
2175    /// return the final merged text (prefix + user message). When there are no pending
2176    /// completions the original text is returned unchanged.
2177    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2178        if self
2179            .runtime
2180            .lifecycle
2181            .pending_background_completions
2182            .is_empty()
2183        {
2184            return user_text.to_owned();
2185        }
2186        let mut parts = String::new();
2187        for completion in self
2188            .runtime
2189            .lifecycle
2190            .pending_background_completions
2191            .drain(..)
2192        {
2193            let _ = write!(
2194                parts,
2195                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2196                completion.run_id,
2197                completion.exit_code,
2198                completion.success,
2199                completion.elapsed_ms,
2200                completion.command,
2201                completion.output,
2202            );
2203        }
2204        parts.push_str(user_text);
2205        parts
2206    }
2207
2208    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
2209    /// channel. Returns a human-readable status string suitable for sending to the user.
2210    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
2211        use zeph_subagent::SubAgentState;
2212        let result = loop {
2213            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2214
2215            // Bridge secret requests from sub-agent to channel.confirm().
2216            // Fetch the pending request first, then release the borrow before
2217            // calling channel.confirm() (which requires &mut self).
2218            #[allow(clippy::redundant_closure_for_method_calls)]
2219            let pending = self
2220                .services
2221                .orchestration
2222                .subagent_manager
2223                .as_mut()
2224                .and_then(|m| m.try_recv_secret_request());
2225            if let Some((req_task_id, req)) = pending {
2226                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
2227                // (SEC-P1-02), so it is safe to embed in the prompt string.
2228                let confirm_prompt = format!(
2229                    "Sub-agent requests secret '{}'. Allow?",
2230                    crate::text::truncate_to_chars(&req.secret_key, 100)
2231                );
2232                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2233                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2234                    if approved {
2235                        let ttl = std::time::Duration::from_mins(5);
2236                        let key = req.secret_key.clone();
2237                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2238                            let _ = mgr.deliver_secret(&req_task_id, key);
2239                        }
2240                    } else {
2241                        let _ = mgr.deny_secret(&req_task_id);
2242                    }
2243                }
2244            }
2245
2246            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2247            let statuses = mgr.statuses();
2248            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2249                break format!("{label} completed (no status available).");
2250            };
2251            match status.state {
2252                SubAgentState::Completed => {
2253                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2254                    break format!("{label} completed: {msg}");
2255                }
2256                SubAgentState::Failed => {
2257                    let msg = status
2258                        .last_message
2259                        .clone()
2260                        .unwrap_or_else(|| "unknown error".into());
2261                    break format!("{label} failed: {msg}");
2262                }
2263                SubAgentState::Canceled => {
2264                    break format!("{label} was cancelled.");
2265                }
2266                _ => {
2267                    let _ = self
2268                        .channel
2269                        .send_status(&format!(
2270                            "{label}: turn {}/{}",
2271                            status.turns_used,
2272                            self.services
2273                                .orchestration
2274                                .subagent_manager
2275                                .as_ref()
2276                                .and_then(|m| m.agents_def(task_id))
2277                                .map_or(20, |d| d.permissions.max_turns)
2278                        ))
2279                        .await;
2280                }
2281            }
2282        };
2283        Some(result)
2284    }
2285
2286    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
2287    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
2288    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2289        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2290        let full_ids: Vec<String> = mgr
2291            .statuses()
2292            .into_iter()
2293            .map(|(tid, _)| tid)
2294            .filter(|tid| tid.starts_with(prefix))
2295            .collect();
2296        Some(match full_ids.as_slice() {
2297            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2298            [fid] => Ok(fid.clone()),
2299            _ => Err(format!(
2300                "Ambiguous id prefix '{prefix}': matches {} agents",
2301                full_ids.len()
2302            )),
2303        })
2304    }
2305
2306    fn handle_agent_list(&self) -> Option<String> {
2307        use std::fmt::Write as _;
2308        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2309        let defs = mgr.definitions();
2310        if defs.is_empty() {
2311            return Some("No sub-agent definitions found.".into());
2312        }
2313        let mut out = String::from("Available sub-agents:\n");
2314        for d in defs {
2315            let memory_label = match d.memory {
2316                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2317                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2318                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2319                None => "",
2320            };
2321            if let Some(ref src) = d.source {
2322                let _ = writeln!(
2323                    out,
2324                    "  {}{} — {} ({})",
2325                    d.name, memory_label, d.description, src
2326                );
2327            } else {
2328                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
2329            }
2330        }
2331        Some(out)
2332    }
2333
2334    fn handle_agent_status(&self) -> Option<String> {
2335        use std::fmt::Write as _;
2336        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2337        let statuses = mgr.statuses();
2338        if statuses.is_empty() {
2339            return Some("No active sub-agents.".into());
2340        }
2341        let mut out = String::from("Active sub-agents:\n");
2342        for (id, s) in &statuses {
2343            let state = format!("{:?}", s.state).to_lowercase();
2344            let elapsed = s.started_at.elapsed().as_secs();
2345            let _ = writeln!(
2346                out,
2347                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
2348                short = &id[..8.min(id.len())],
2349                t = s.turns_used,
2350                msg = s.last_message.as_deref().unwrap_or(""),
2351            );
2352            // Show memory directory path for agents with memory enabled.
2353            if let Some(def) = mgr.agents_def(id)
2354                && let Some(scope) = def.memory
2355                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2356            {
2357                let _ = writeln!(out, "       memory: {}", dir.display());
2358            }
2359        }
2360        Some(out)
2361    }
2362
2363    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2364        let full_id = match self.resolve_agent_id_prefix(id)? {
2365            Ok(fid) => fid,
2366            Err(msg) => return Some(msg),
2367        };
2368        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2369        if let Some((tid, req)) = mgr.try_recv_secret_request()
2370            && tid == full_id
2371        {
2372            let key = req.secret_key.clone();
2373            let ttl = std::time::Duration::from_mins(5);
2374            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2375                return Some(format!("Approve failed: {e}"));
2376            }
2377            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2378                return Some(format!("Secret delivery failed: {e}"));
2379            }
2380            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2381        }
2382        Some(format!(
2383            "No pending secret request for sub-agent '{full_id}'."
2384        ))
2385    }
2386
2387    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2388        let full_id = match self.resolve_agent_id_prefix(id)? {
2389            Ok(fid) => fid,
2390            Err(msg) => return Some(msg),
2391        };
2392        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2393        match mgr.deny_secret(&full_id) {
2394            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2395            Err(e) => Some(format!("Deny failed: {e}")),
2396        }
2397    }
2398
2399    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2400        use zeph_subagent::AgentCommand;
2401
2402        match cmd {
2403            AgentCommand::List => self.handle_agent_list(),
2404            AgentCommand::Background { name, prompt } => {
2405                self.handle_agent_background(&name, &prompt)
2406            }
2407            AgentCommand::Spawn { name, prompt }
2408            | AgentCommand::Mention {
2409                agent: name,
2410                prompt,
2411            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2412            AgentCommand::Status => self.handle_agent_status(),
2413            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2414            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2415            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2416            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2417        }
2418    }
2419
2420    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2421        let provider = self.provider.clone();
2422        let tool_executor = Arc::clone(&self.tool_executor);
2423        let skills = self.filtered_skills_for(name);
2424        let cfg = self.services.orchestration.subagent_config.clone();
2425        let spawn_ctx = self.build_spawn_context(&cfg);
2426        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2427        match mgr.spawn(
2428            name,
2429            prompt,
2430            provider,
2431            tool_executor,
2432            skills,
2433            &cfg,
2434            spawn_ctx,
2435        ) {
2436            Ok(id) => Some(format!(
2437                "Sub-agent '{name}' started in background (id: {short})",
2438                short = &id[..8.min(id.len())]
2439            )),
2440            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2441        }
2442    }
2443
2444    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2445        let provider = self.provider.clone();
2446        let tool_executor = Arc::clone(&self.tool_executor);
2447        let skills = self.filtered_skills_for(name);
2448        let cfg = self.services.orchestration.subagent_config.clone();
2449        let spawn_ctx = self.build_spawn_context(&cfg);
2450        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2451        let task_id = match mgr.spawn(
2452            name,
2453            prompt,
2454            provider,
2455            tool_executor,
2456            skills,
2457            &cfg,
2458            spawn_ctx,
2459        ) {
2460            Ok(id) => id,
2461            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2462        };
2463        let short = task_id[..8.min(task_id.len())].to_owned();
2464        let _ = self
2465            .channel
2466            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2467            .await;
2468        let label = format!("Sub-agent '{name}'");
2469        self.poll_subagent_until_done(&task_id, &label).await
2470    }
2471
2472    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2473        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2474        // Accept prefix match on task_id.
2475        let ids: Vec<String> = mgr
2476            .statuses()
2477            .into_iter()
2478            .map(|(task_id, _)| task_id)
2479            .filter(|task_id| task_id.starts_with(id))
2480            .collect();
2481        match ids.as_slice() {
2482            [] => Some(format!("No sub-agent with id prefix '{id}'")),
2483            [full_id] => {
2484                let full_id = full_id.clone();
2485                match mgr.cancel(&full_id) {
2486                    Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2487                    Err(e) => Some(format!("Cancel failed: {e}")),
2488                }
2489            }
2490            _ => Some(format!(
2491                "Ambiguous id prefix '{id}': matches {} agents",
2492                ids.len()
2493            )),
2494        }
2495    }
2496
2497    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2498        let cfg = self.services.orchestration.subagent_config.clone();
2499        // Resolve definition name from transcript meta before spawning so we can
2500        // look up skills by definition name rather than the UUID prefix (S1 fix).
2501        let def_name = {
2502            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2503            match mgr.def_name_for_resume(id, &cfg) {
2504                Ok(name) => name,
2505                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2506            }
2507        };
2508        let skills = self.filtered_skills_for(&def_name);
2509        let provider = self.provider.clone();
2510        let tool_executor = Arc::clone(&self.tool_executor);
2511        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2512        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2513            Ok(pair) => pair,
2514            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2515        };
2516        let short = task_id[..8.min(task_id.len())].to_owned();
2517        let _ = self
2518            .channel
2519            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2520            .await;
2521        self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
2522            .await
2523    }
2524
2525    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2526        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2527        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2528        let reg = self.services.skill.registry.read();
2529        match zeph_subagent::filter_skills(&reg, &def.skills) {
2530            Ok(skills) => {
2531                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2532                if bodies.is_empty() {
2533                    None
2534                } else {
2535                    Some(bodies)
2536                }
2537            }
2538            Err(e) => {
2539                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2540                None
2541            }
2542        }
2543    }
2544
2545    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
2546    fn build_spawn_context(
2547        &self,
2548        cfg: &zeph_config::SubAgentConfig,
2549    ) -> zeph_subagent::SpawnContext {
2550        zeph_subagent::SpawnContext {
2551            parent_messages: self.extract_parent_messages(cfg),
2552            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2553            parent_provider_name: {
2554                let name = &self.runtime.config.active_provider_name;
2555                if name.is_empty() {
2556                    None
2557                } else {
2558                    Some(name.clone())
2559                }
2560            },
2561            spawn_depth: self.runtime.config.spawn_depth,
2562            mcp_tool_names: self.extract_mcp_tool_names(),
2563            // F3 spec 050 §4: propagate seeded score when parent is >= Elevated.
2564            seed_trajectory_score: {
2565                let child = self.services.security.trajectory.spawn_child();
2566                let score = child.score_now();
2567                if score > 0.0 { Some(score) } else { None }
2568            },
2569        }
2570    }
2571
2572    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
2573    ///
2574    /// Filters system messages, takes last `context_window_turns * 2` messages,
2575    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
2576    fn extract_parent_messages(
2577        &self,
2578        config: &zeph_config::SubAgentConfig,
2579    ) -> Vec<zeph_llm::provider::Message> {
2580        use zeph_llm::provider::Role;
2581        if config.context_window_turns == 0 {
2582            return Vec::new();
2583        }
2584        let non_system: Vec<_> = self
2585            .msg
2586            .messages
2587            .iter()
2588            .filter(|m| m.role != Role::System)
2589            .cloned()
2590            .collect();
2591        let take_count = config.context_window_turns * 2;
2592        let start = non_system.len().saturating_sub(take_count);
2593        let mut msgs = non_system[start..].to_vec();
2594
2595        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
2596        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
2597        let mut total_chars: usize = 0;
2598        let mut keep = msgs.len();
2599        for (i, m) in msgs.iter().enumerate() {
2600            total_chars += m.content.len();
2601            if total_chars > max_chars {
2602                keep = i;
2603                break;
2604            }
2605        }
2606        if keep < msgs.len() {
2607            tracing::info!(
2608                kept = keep,
2609                requested = config.context_window_turns * 2,
2610                "[subagent] truncated parent history from {} to {} turns due to token budget",
2611                config.context_window_turns * 2,
2612                keep
2613            );
2614            msgs.truncate(keep);
2615        }
2616        msgs
2617    }
2618
2619    /// Extract MCP tool names from the tool executor for diagnostic annotation.
2620    fn extract_mcp_tool_names(&self) -> Vec<String> {
2621        self.tool_executor
2622            .tool_definitions_erased()
2623            .into_iter()
2624            .filter(|t| t.id.starts_with("mcp_"))
2625            .map(|t| t.id.to_string())
2626            .collect()
2627    }
2628
2629    /// Classify a skill directory's source kind using on-disk markers and the bundled allowlist.
2630    ///
2631    /// Must be called from a blocking context (uses synchronous FS I/O).
2632    fn classify_source_kind(
2633        skill_dir: &std::path::Path,
2634        managed_dir: Option<&std::path::PathBuf>,
2635        bundled_names: &std::collections::HashSet<String>,
2636    ) -> zeph_memory::store::SourceKind {
2637        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2638            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2639            let has_marker = skill_dir.join(".bundled").exists();
2640            if has_marker && bundled_names.contains(skill_name) {
2641                zeph_memory::store::SourceKind::Bundled
2642            } else {
2643                if has_marker {
2644                    tracing::warn!(
2645                        skill = %skill_name,
2646                        "skill has .bundled marker but is not in the bundled skill \
2647                         allowlist — classifying as Hub"
2648                    );
2649                }
2650                zeph_memory::store::SourceKind::Hub
2651            }
2652        } else {
2653            zeph_memory::store::SourceKind::Local
2654        }
2655    }
2656
2657    /// Update trust DB records for all reloaded skills.
2658    async fn update_trust_for_reloaded_skills(
2659        &mut self,
2660        all_meta: &[zeph_skills::loader::SkillMeta],
2661    ) {
2662        // Clone Arc before any .await so no &self fields are held across suspension points.
2663        let memory = self.services.memory.persistence.memory.clone();
2664        let Some(memory) = memory else {
2665            return;
2666        };
2667        let trust_cfg = self.services.skill.trust_config.clone();
2668        let managed_dir = self.services.skill.managed_dir.clone();
2669        let bundled_names: std::collections::HashSet<String> =
2670            zeph_skills::bundled_skill_names().into_iter().collect();
2671        for meta in all_meta {
2672            // Compute hash and classify source_kind in spawn_blocking — both are blocking FS calls
2673            // (.bundled marker .exists() and compute_skill_hash both do std::fs I/O).
2674            let skill_dir = meta.skill_dir.clone();
2675            let managed_dir_ref = managed_dir.clone();
2676            let bundled_names_ref = bundled_names.clone();
2677            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2678                tokio::task::spawn_blocking(move || {
2679                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2680                    let source_kind = Self::classify_source_kind(
2681                        &skill_dir,
2682                        managed_dir_ref.as_ref(),
2683                        &bundled_names_ref,
2684                    );
2685                    Some((hash, source_kind))
2686                })
2687                .await
2688                .unwrap_or(None);
2689
2690            let Some((current_hash, source_kind)) = fs_result else {
2691                tracing::warn!("failed to compute hash for '{}'", meta.name);
2692                continue;
2693            };
2694            let initial_level = match source_kind {
2695                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2696                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2697                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2698                    &trust_cfg.local_level
2699                }
2700            };
2701            let existing = memory
2702                .sqlite()
2703                .load_skill_trust(&meta.name)
2704                .await
2705                .ok()
2706                .flatten();
2707            let trust_level_str = if let Some(ref row) = existing {
2708                if row.blake3_hash != current_hash {
2709                    trust_cfg.hash_mismatch_level.to_string()
2710                } else if row.source_kind != source_kind {
2711                    // source_kind changed (e.g., hub → bundled on upgrade).
2712                    // Never override an explicit operator block. For active trust levels,
2713                    // adopt the source-kind initial level when it grants more trust.
2714                    let stored = row
2715                        .trust_level
2716                        .parse::<zeph_common::SkillTrustLevel>()
2717                        .unwrap_or_else(|_| {
2718                            tracing::warn!(
2719                                skill = %meta.name,
2720                                raw = %row.trust_level,
2721                                "unrecognised trust_level in DB, treating as quarantined"
2722                            );
2723                            zeph_common::SkillTrustLevel::Quarantined
2724                        });
2725                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
2726                        row.trust_level.clone()
2727                    } else {
2728                        initial_level.to_string()
2729                    }
2730                } else {
2731                    row.trust_level.clone()
2732                }
2733            } else {
2734                initial_level.to_string()
2735            };
2736            let source_path = meta.skill_dir.to_str();
2737            if let Err(e) = memory
2738                .sqlite()
2739                .upsert_skill_trust(
2740                    &meta.name,
2741                    &trust_level_str,
2742                    source_kind,
2743                    None,
2744                    source_path,
2745                    &current_hash,
2746                )
2747                .await
2748            {
2749                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2750            }
2751        }
2752    }
2753
2754    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
2755    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2756        let provider = self.embedding_provider.clone();
2757        let embed_timeout =
2758            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2759        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2760            let owned = text.to_owned();
2761            let p = provider.clone();
2762            Box::pin(async move {
2763                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2764                    result
2765                } else {
2766                    tracing::warn!(
2767                        timeout_secs = embed_timeout.as_secs(),
2768                        "skill matcher: embedding timed out"
2769                    );
2770                    Err(zeph_llm::LlmError::Timeout)
2771                }
2772            })
2773        };
2774
2775        let needs_inmemory_rebuild = !self
2776            .services
2777            .skill
2778            .matcher
2779            .as_ref()
2780            .is_some_and(SkillMatcherBackend::is_qdrant);
2781
2782        if needs_inmemory_rebuild {
2783            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2784                .await
2785                .map(SkillMatcherBackend::InMemory);
2786        } else if let Some(ref mut backend) = self.services.skill.matcher {
2787            let _ = self.channel.send_status("syncing skill index...").await;
2788            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2789                self.services.session.status_tx.clone().map(
2790                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
2791                        Box::new(move |completed, total| {
2792                            let msg = format!("Syncing skills: {completed}/{total}");
2793                            let _ = tx.send(msg);
2794                        })
2795                    },
2796                );
2797            if let Err(e) = backend
2798                .sync(
2799                    all_meta,
2800                    &self.services.skill.embedding_model,
2801                    embed_fn,
2802                    on_progress,
2803                )
2804                .await
2805            {
2806                tracing::warn!("failed to sync skill embeddings: {e:#}");
2807            }
2808        }
2809
2810        if self.services.skill.hybrid_search {
2811            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2812            let _ = self.channel.send_status("rebuilding search index...").await;
2813            self.services.skill.rebuild_bm25(&descs);
2814        }
2815    }
2816
2817    #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
2818    async fn reload_skills(&mut self) {
2819        let old_fp = self.services.skill.fingerprint();
2820        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2821            let plugin_dirs = supplier();
2822            let mut paths = self.services.skill.skill_paths.clone();
2823            for dir in plugin_dirs {
2824                if !paths.contains(&dir) {
2825                    paths.push(dir);
2826                }
2827            }
2828            paths
2829        } else {
2830            self.services.skill.skill_paths.clone()
2831        };
2832        self.services.skill.registry.write().reload(&reload_paths);
2833        if self.services.skill.fingerprint() == old_fp {
2834            return;
2835        }
2836        let _ = self.channel.send_status("reloading skills...").await;
2837
2838        let all_meta = self
2839            .services
2840            .skill
2841            .registry
2842            .read()
2843            .all_meta()
2844            .into_iter()
2845            .cloned()
2846            .collect::<Vec<_>>();
2847
2848        self.update_trust_for_reloaded_skills(&all_meta).await;
2849
2850        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2851        self.rebuild_skill_matcher(&all_meta_refs).await;
2852
2853        let all_skills: Vec<Skill> = {
2854            let reg = self.services.skill.registry.read();
2855            reg.all_meta()
2856                .iter()
2857                .filter_map(|m| reg.skill(&m.name).ok())
2858                .collect()
2859        };
2860        let trust_map = self.build_skill_trust_map().await;
2861        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2862        let skills_prompt =
2863            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2864        self.services
2865            .skill
2866            .last_skills_prompt
2867            .clone_from(&skills_prompt);
2868        let system_prompt = build_system_prompt(&skills_prompt, None);
2869        if let Some(msg) = self.msg.messages.first_mut() {
2870            msg.content = system_prompt;
2871        }
2872
2873        let _ = self.channel.send_status("").await;
2874        tracing::info!(
2875            "reloaded {} skill(s)",
2876            self.services.skill.registry.read().all_meta().len()
2877        );
2878    }
2879
2880    fn reload_instructions(&mut self) {
2881        // Drain any additional queued events before reloading to avoid redundant reloads.
2882        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2883            while rx.try_recv().is_ok() {}
2884        }
2885        let Some(ref state) = self.runtime.instructions.reload_state else {
2886            return;
2887        };
2888        let new_blocks = crate::instructions::load_instructions(
2889            &state.base_dir,
2890            &state.provider_kinds,
2891            &state.explicit_files,
2892            state.auto_detect,
2893        );
2894        let old_sources: std::collections::HashSet<_> = self
2895            .runtime
2896            .instructions
2897            .blocks
2898            .iter()
2899            .map(|b| &b.source)
2900            .collect();
2901        let new_sources: std::collections::HashSet<_> =
2902            new_blocks.iter().map(|b| &b.source).collect();
2903        for added in new_sources.difference(&old_sources) {
2904            tracing::info!(path = %added.display(), "instruction file added");
2905        }
2906        for removed in old_sources.difference(&new_sources) {
2907            tracing::info!(path = %removed.display(), "instruction file removed");
2908        }
2909        tracing::info!(
2910            old_count = self.runtime.instructions.blocks.len(),
2911            new_count = new_blocks.len(),
2912            "reloaded instruction files"
2913        );
2914        self.runtime.instructions.blocks = new_blocks;
2915    }
2916
2917    fn reload_config(&mut self) {
2918        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
2919            return;
2920        };
2921        let Some(config) = self.load_config_with_overlay(&path) else {
2922            return;
2923        };
2924        let budget_tokens = resolve_context_budget(&config, &self.provider);
2925        self.runtime.config.security = config.security;
2926        self.runtime.config.timeouts = config.timeouts;
2927        self.runtime.config.redact_credentials = config.memory.redact_credentials;
2928        self.services.memory.persistence.history_limit = config.memory.history_limit;
2929        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
2930        self.services.memory.compaction.summarization_threshold =
2931            config.memory.summarization_threshold;
2932        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
2933        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
2934        self.services.skill.min_injection_score = config.skills.min_injection_score;
2935        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2936        self.services.skill.hybrid_search = config.skills.hybrid_search;
2937        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
2938        self.services.skill.confusability_threshold =
2939            config.skills.confusability_threshold.clamp(0.0, 1.0);
2940        config
2941            .skills
2942            .generation_provider
2943            .as_str()
2944            .clone_into(&mut self.services.skill.generation_provider_name);
2945        self.services.skill.generation_output_dir =
2946            config.skills.generation_output_dir.as_deref().map(|p| {
2947                if let Some(stripped) = p.strip_prefix("~/") {
2948                    dirs::home_dir()
2949                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2950                } else {
2951                    std::path::PathBuf::from(p)
2952                }
2953            });
2954
2955        self.context_manager.budget = Some(
2956            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2957        );
2958
2959        {
2960            let graph_cfg = &config.memory.graph;
2961            if graph_cfg.rpe.enabled {
2962                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2963                if self.services.memory.extraction.rpe_router.is_none() {
2964                    self.services.memory.extraction.rpe_router =
2965                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2966                            graph_cfg.rpe.threshold,
2967                            graph_cfg.rpe.max_skip_turns,
2968                        )));
2969                }
2970            } else {
2971                self.services.memory.extraction.rpe_router = None;
2972            }
2973            self.services.memory.extraction.graph_config = graph_cfg.clone();
2974        }
2975        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2976        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2977        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2978        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2979        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2980        self.context_manager.compression = config.memory.compression.clone();
2981        self.context_manager.routing = config.memory.store_routing.clone();
2982        // Resolve routing_classifier_provider from the provider pool (#2484).
2983        self.context_manager.store_routing_provider = if config
2984            .memory
2985            .store_routing
2986            .routing_classifier_provider
2987            .is_empty()
2988        {
2989            None
2990        } else {
2991            let resolved = self.resolve_background_provider(
2992                &config.memory.store_routing.routing_classifier_provider,
2993            );
2994            Some(std::sync::Arc::new(resolved))
2995        };
2996        self.services
2997            .memory
2998            .persistence
2999            .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3000
3001        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3002        self.services.index.repo_map_ttl =
3003            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3004
3005        self.services
3006            .session
3007            .hooks_config
3008            .cwd_changed
3009            .clone_from(&config.hooks.cwd_changed);
3010        self.services
3011            .session
3012            .hooks_config
3013            .permission_denied
3014            .clone_from(&config.hooks.permission_denied);
3015        self.services
3016            .session
3017            .hooks_config
3018            .turn_complete
3019            .clone_from(&config.hooks.turn_complete);
3020        // file_changed_hooks require watcher restart to take effect — skipped here.
3021
3022        tracing::info!("config reloaded");
3023    }
3024
3025    /// Load config from disk, apply plugin overlays, and warn on shell divergence.
3026    ///
3027    /// Returns `None` when loading or overlay merge fails (caller keeps prior runtime state).
3028    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3029        let mut config = match Config::load(path) {
3030            Ok(c) => c,
3031            Err(e) => {
3032                tracing::warn!("config reload failed: {e:#}");
3033                return None;
3034            }
3035        };
3036
3037        // Re-apply plugin overlays. On error, keep previous runtime state intact.
3038        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3039            None
3040        } else {
3041            match zeph_plugins::apply_plugin_config_overlays(
3042                &mut config,
3043                &self.runtime.lifecycle.plugins_dir,
3044            ) {
3045                Ok(o) => Some(o),
3046                Err(e) => {
3047                    tracing::warn!(
3048                        "plugin overlay merge failed during reload: {e:#}; \
3049                         keeping previous runtime state"
3050                    );
3051                    return None;
3052                }
3053            }
3054        };
3055
3056        // M4: detect shell-level divergence from the baked-in executor and warn loudly.
3057        // ShellExecutor is not rebuilt on hot-reload; only skill threshold is live.
3058        // A follow-up P2 issue tracks live-rebuild of ShellExecutor.
3059        if let Some(ref overlay) = new_overlay {
3060            self.warn_on_shell_overlay_divergence(overlay, &config);
3061        }
3062        Some(config)
3063    }
3064
3065    /// React to shell policy divergence detected on hot-reload.
3066    ///
3067    /// `blocked_commands` is rebuilt live via `ShellPolicyHandle::rebuild` — no restart needed.
3068    /// `allowed_commands` cannot be rebuilt (feeds sandbox path intersection at construction time)
3069    /// — emit a warn + status banner when it changes.
3070    fn warn_on_shell_overlay_divergence(
3071        &self,
3072        new_overlay: &zeph_plugins::ResolvedOverlay,
3073        config: &Config,
3074    ) {
3075        let new_blocked: Vec<String> = {
3076            let mut v = config.tools.shell.blocked_commands.clone();
3077            v.sort();
3078            v
3079        };
3080        let new_allowed: Vec<String> = {
3081            let mut v = config.tools.shell.allowed_commands.clone();
3082            v.sort();
3083            v
3084        };
3085
3086        let startup = &self.runtime.lifecycle.startup_shell_overlay;
3087        let blocked_changed = new_blocked != startup.blocked;
3088        let allowed_changed = new_allowed != startup.allowed;
3089
3090        // blocked_commands IS rebuilt live — emit info-level confirmation only.
3091        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3092            h.rebuild(&config.tools.shell);
3093            tracing::info!(
3094                blocked_count = h.snapshot_blocked().len(),
3095                "shell blocked_commands rebuilt from hot-reload"
3096            );
3097        }
3098
3099        // allowed_commands cannot be rebuilt — sandbox path intersection is computed at
3100        // executor construction time. Warn loudly so the user restarts.
3101        //
3102        // Note: when base `allowed_commands` is empty (the default), the overlay's
3103        // intersection semantics keep it empty, so this branch is silently unreachable
3104        // for users who do not set a non-empty base list.
3105        if allowed_changed {
3106            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3107                 for sandbox path recomputation (blocked_commands was rebuilt live)";
3108            tracing::warn!("{msg}");
3109            if let Some(ref tx) = self.services.session.status_tx {
3110                let _ = tx.send(msg.to_owned());
3111            }
3112        }
3113
3114        let _ = new_overlay;
3115    }
3116
3117    /// Run `SideQuest` tool output eviction pass (#1885).
3118    ///
3119    /// PERF-1 fix: two-phase non-blocking design.
3120    ///
3121    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
3122    /// validate and apply it immediately.
3123    ///
3124    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
3125    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
3126    /// next turn, so the current agent turn is never blocked by the LLM call.
3127    fn maybe_sidequest_eviction(&mut self) {
3128        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
3129        // strategy — the two systems share the same pool of evictable tool outputs and can
3130        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
3131        if self.services.sidequest.config.enabled {
3132            use crate::config::PruningStrategy;
3133            if !matches!(
3134                self.context_manager.compression.pruning_strategy,
3135                PruningStrategy::Reactive
3136            ) {
3137                tracing::warn!(
3138                    strategy = ?self.context_manager.compression.pruning_strategy,
3139                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
3140                     consider disabling sidequest.enabled to avoid redundant eviction"
3141                );
3142            }
3143        }
3144
3145        // Guard: do not evict while a focus session is active.
3146        if self.services.focus.is_active() {
3147            tracing::debug!("sidequest: skipping — focus session active");
3148            // Drop any pending result — cursors may be stale relative to focus truncation.
3149            self.services.compression.pending_sidequest_result = None;
3150            return;
3151        }
3152
3153        // Phase 1: apply pending result from last turn's background LLM call.
3154        self.sidequest_apply_pending();
3155
3156        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
3157        self.sidequest_schedule_next();
3158    }
3159
3160    fn sidequest_apply_pending(&mut self) {
3161        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3162            return;
3163        };
3164        // `try_join` is non-blocking: if the task isn't done yet, `Err(handle)` is returned
3165        // and we reschedule below.
3166        let result = match handle.try_join() {
3167            Ok(result) => result,
3168            Err(_handle) => {
3169                // Task still running — drop it; a fresh one is scheduled below.
3170                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3171                return;
3172            }
3173        };
3174        match result {
3175            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3176                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3177                let freed = self.services.sidequest.apply_eviction(
3178                    &mut self.msg.messages,
3179                    &evicted_indices,
3180                    &self.runtime.metrics.token_counter,
3181                );
3182                if freed > 0 {
3183                    self.recompute_prompt_tokens();
3184                    // C1 fix: prevent maybe_compact() from firing in the same turn.
3185                    // cooldown=0: eviction does not impose post-compaction cooldown.
3186                    self.context_manager.compaction =
3187                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
3188                            cooldown: 0,
3189                        };
3190                    tracing::info!(
3191                        freed_tokens = freed,
3192                        evicted_cursors = evicted_indices.len(),
3193                        pass = self.services.sidequest.passes_run,
3194                        "sidequest eviction complete"
3195                    );
3196                    if let Some(ref d) = self.runtime.debug.debug_dumper {
3197                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3198                    }
3199                    if let Some(ref tx) = self.services.session.status_tx {
3200                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3201                    }
3202                } else {
3203                    // apply_eviction returned 0 — clear spinner so it doesn't dangle.
3204                    if let Some(ref tx) = self.services.session.status_tx {
3205                        let _ = tx.send(String::new());
3206                    }
3207                }
3208            }
3209            Ok(None | Some(_)) => {
3210                tracing::debug!("sidequest: pending result: no cursors to evict");
3211                if let Some(ref tx) = self.services.session.status_tx {
3212                    let _ = tx.send(String::new());
3213                }
3214            }
3215            Err(e) => {
3216                tracing::debug!("sidequest: background task error: {e}");
3217                if let Some(ref tx) = self.services.session.status_tx {
3218                    let _ = tx.send(String::new());
3219                }
3220            }
3221        }
3222    }
3223
3224    fn sidequest_schedule_next(&mut self) {
3225        use zeph_llm::provider::{Message, MessageMetadata, Role};
3226
3227        self.services
3228            .sidequest
3229            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3230
3231        if self.services.sidequest.tool_output_cursors.is_empty() {
3232            tracing::debug!("sidequest: no eligible cursors");
3233            return;
3234        }
3235
3236        let prompt = self.services.sidequest.build_eviction_prompt();
3237        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3238        let n_cursors = self.services.sidequest.tool_output_cursors.len();
3239        // Clone the provider so the spawn closure owns it without borrowing self.
3240        let provider = self.summary_or_primary_provider().clone();
3241
3242        let eviction_future = async move {
3243            let msgs = [Message {
3244                role: Role::User,
3245                content: prompt,
3246                parts: vec![],
3247                metadata: MessageMetadata::default(),
3248            }];
3249            let response =
3250                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3251                    .await
3252                {
3253                    Ok(Ok(r)) => r,
3254                    Ok(Err(e)) => {
3255                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3256                        return None;
3257                    }
3258                    Err(_) => {
3259                        tracing::debug!("sidequest bg: LLM call timed out");
3260                        return None;
3261                    }
3262                };
3263
3264            let start = response.find('{')?;
3265            let end = response.rfind('}')?;
3266            if start > end {
3267                return None;
3268            }
3269            let json_slice = &response[start..=end];
3270            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3271            let mut valid: Vec<usize> = parsed
3272                .del_cursors
3273                .into_iter()
3274                .filter(|&c| c < n_cursors)
3275                .collect();
3276            valid.sort_unstable();
3277            valid.dedup();
3278            #[allow(
3279                clippy::cast_precision_loss,
3280                clippy::cast_possible_truncation,
3281                clippy::cast_sign_loss
3282            )]
3283            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3284            valid.truncate(max_evict);
3285            Some(valid)
3286        };
3287        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3288            std::sync::Arc::from("agent.sidequest.eviction"),
3289            move || eviction_future,
3290        );
3291        self.services.compression.pending_sidequest_result = Some(handle);
3292        tracing::debug!("sidequest: background LLM eviction task spawned");
3293        if let Some(ref tx) = self.services.session.status_tx {
3294            let _ = tx.send("SideQuest: scoring tool outputs...".into());
3295        }
3296    }
3297
3298    /// Return an `McpDispatch` adapter backed by the agent's MCP manager, if present.
3299    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3300        self.services
3301            .mcp
3302            .manager
3303            .as_ref()
3304            .map(|m| McpManagerDispatch(Arc::clone(m)))
3305    }
3306
3307    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
3308    ///
3309    /// Called after each tool batch completes. The check is a single syscall and has
3310    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
3311    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
3312    pub(crate) async fn check_cwd_changed(&mut self) {
3313        let current = match std::env::current_dir() {
3314            Ok(p) => p,
3315            Err(e) => {
3316                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3317                return;
3318            }
3319        };
3320        if current == self.runtime.lifecycle.last_known_cwd {
3321            return;
3322        }
3323        let old_cwd =
3324            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3325        self.services.session.env_context.working_dir = current.display().to_string();
3326
3327        tracing::info!(
3328            old = %old_cwd.display(),
3329            new = %current.display(),
3330            "working directory changed"
3331        );
3332
3333        let _ = self
3334            .channel
3335            .send_status("Working directory changed\u{2026}")
3336            .await;
3337
3338        let hooks = self.services.session.hooks_config.cwd_changed.clone();
3339        if hooks.is_empty() {
3340            tracing::debug!("CwdChanged: no hooks configured, skipping");
3341        } else {
3342            tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3343            let mut env = std::collections::HashMap::new();
3344            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3345            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3346            let dispatch = self.mcp_dispatch();
3347            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3348                .as_ref()
3349                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3350            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3351                tracing::warn!(error = %e, "CwdChanged hook failed");
3352            } else {
3353                tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3354            }
3355        }
3356
3357        let _ = self.channel.send_status("").await;
3358    }
3359
3360    /// Handle a `FileChangedEvent` from the file watcher.
3361    pub(crate) async fn handle_file_changed(
3362        &mut self,
3363        event: crate::file_watcher::FileChangedEvent,
3364    ) {
3365        tracing::info!(path = %event.path.display(), "file changed");
3366
3367        let _ = self
3368            .channel
3369            .send_status("Running file-change hook\u{2026}")
3370            .await;
3371
3372        let hooks = self
3373            .services
3374            .session
3375            .hooks_config
3376            .file_changed_hooks
3377            .clone();
3378        if hooks.is_empty() {
3379            tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3380        } else {
3381            tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3382            let mut env = std::collections::HashMap::new();
3383            env.insert(
3384                "ZEPH_CHANGED_PATH".to_owned(),
3385                event.path.display().to_string(),
3386            );
3387            let dispatch = self.mcp_dispatch();
3388            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3389                .as_ref()
3390                .map(|d| d as &dyn zeph_subagent::McpDispatch);
3391            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3392                tracing::warn!(error = %e, "FileChanged hook failed");
3393            } else {
3394                tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3395            }
3396        }
3397
3398        let _ = self.channel.send_status("").await;
3399    }
3400
3401    /// If the compression spectrum is enabled and a promotion engine is wired, spawn a
3402    /// background scan task.
3403    ///
3404    /// The task loads the most-recent episodic window from `SemanticMemory`, runs the
3405    /// greedy clustering scan, and calls `promote` for each qualifying candidate.
3406    ///
3407    /// Supervised via [`agent_supervisor::BackgroundSupervisor`] under
3408    /// [`agent_supervisor::TaskClass::Enrichment`] — dropped under high load rather than
3409    /// blocking the turn.
3410    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3411        let Some(engine) = self.services.promotion_engine.clone() else {
3412            return;
3413        };
3414
3415        let Some(memory) = self.services.memory.persistence.memory.clone() else {
3416            return;
3417        };
3418
3419        // Use a conservative window cap. The engine's own PromotionConfig thresholds
3420        // determine whether a cluster actually qualifies; this is just the DB scan limit.
3421        let promotion_window = 200usize;
3422
3423        let accepted = self.runtime.lifecycle.supervisor.spawn(
3424            agent_supervisor::TaskClass::Enrichment,
3425            "compression_spectrum.promotion_scan",
3426            async move {
3427                let span = tracing::info_span!("memory.compression.promote.background");
3428                let _enter = span.enter();
3429
3430                let window = match memory.load_promotion_window(promotion_window).await {
3431                    Ok(w) => w,
3432                    Err(e) => {
3433                        tracing::warn!(error = %e, "promotion scan: failed to load window");
3434                        return;
3435                    }
3436                };
3437
3438                if window.is_empty() {
3439                    return;
3440                }
3441
3442                let candidates = match engine.scan(&window).await {
3443                    Ok(c) => c,
3444                    Err(e) => {
3445                        tracing::warn!(error = %e, "promotion scan: clustering failed");
3446                        return;
3447                    }
3448                };
3449
3450                for candidate in &candidates {
3451                    if let Err(e) = engine.promote(candidate).await {
3452                        tracing::warn!(
3453                            signature = %candidate.signature,
3454                            error = %e,
3455                            "promotion scan: promote failed"
3456                        );
3457                    }
3458                }
3459
3460                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3461            },
3462        );
3463
3464        if accepted {
3465            tracing::debug!("compression_spectrum: promotion scan task enqueued");
3466        }
3467    }
3468}
3469/// Thin wrapper that implements [`zeph_subagent::McpDispatch`] over an [`Arc<zeph_mcp::McpManager>`].
3470///
3471/// Used to pass MCP tool dispatch capability into `fire_hooks` without coupling
3472/// `zeph-subagent` to `zeph-mcp`.
3473struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3474
3475impl zeph_subagent::McpDispatch for McpManagerDispatch {
3476    fn call_tool<'a>(
3477        &'a self,
3478        server: &'a str,
3479        tool: &'a str,
3480        args: serde_json::Value,
3481    ) -> std::pin::Pin<
3482        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3483    > {
3484        Box::pin(async move {
3485            self.0
3486                .call_tool(server, tool, args)
3487                .await
3488                .map(|result| {
3489                    // Extract text content from the MCP response as a JSON value.
3490                    let texts: Vec<serde_json::Value> = result
3491                        .content
3492                        .iter()
3493                        .filter_map(|c| {
3494                            if let rmcp::model::RawContent::Text(t) = &c.raw {
3495                                Some(serde_json::Value::String(t.text.clone()))
3496                            } else {
3497                                None
3498                            }
3499                        })
3500                        .collect();
3501                    serde_json::Value::Array(texts)
3502                })
3503                .map_err(|e| e.to_string())
3504        })
3505    }
3506}
3507
3508pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3509    while !*rx.borrow_and_update() {
3510        if rx.changed().await.is_err() {
3511            std::future::pending::<()>().await;
3512        }
3513    }
3514}
3515
3516pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3517    match rx {
3518        Some(inner) => {
3519            if let Some(v) = inner.recv().await {
3520                Some(v)
3521            } else {
3522                *rx = None;
3523                std::future::pending().await
3524            }
3525        }
3526        None => std::future::pending().await,
3527    }
3528}
3529
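// NOTE: the paragraph below documents `resolve_context_budget`, which is defined after
// the shell-truncation helpers. It is kept as a plain comment so that rustdoc does not
// attach it to `truncate_shell_command`.
//
// Resolve the effective context budget from config, applying the `auto_budget` fallback.
//
// Mirrors `AppBuilder::auto_budget_tokens` so hot-reload and initial startup use the same
// logic: if `auto_budget = true` and `context_budget_tokens == 0`, query the provider's
// context window; if still 0, fall back to 128 000 tokens.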
/// Truncate a background run command to at most 80 characters for TUI display.
///
/// Commands of 80 characters or fewer are returned unchanged. Longer commands keep their
/// first 79 characters and get a trailing `…`, for 80 characters in total.
///
/// The limit is counted in `char`s, not bytes, so a short command containing multi-byte
/// characters is not truncated. The cut always falls on a character boundary.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_CHARS: usize = 80;

    // Start at the 80th character: its byte offset is where a truncated command is cut.
    let mut tail = cmd.char_indices().skip(MAX_CHARS - 1);

    let Some((cut, _)) = tail.next() else {
        // Fewer than 80 characters.
        return cmd.to_owned();
    };
    if tail.next().is_none() {
        // Exactly 80 characters — still fits.
        return cmd.to_owned();
    }

    format!("{}…", &cmd[..cut])
}
3543
/// Shorten a run-id string to its first 8 characters for compact TUI display.
///
/// Ids with 8 characters or fewer are returned unchanged.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the ninth character, if there is one, marks the cut point.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3548
3549pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3550    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3551        if let Some(ctx_size) = provider.context_window() {
3552            tracing::info!(
3553                model_context = ctx_size,
3554                "auto-configured context budget on reload"
3555            );
3556            ctx_size
3557        } else {
3558            0
3559        }
3560    } else {
3561        config.memory.context_budget_tokens
3562    };
3563    if tokens == 0 {
3564        tracing::warn!(
3565            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3566        );
3567        128_000
3568    } else {
3569        tokens
3570    }
3571}
3572
3573#[cfg(test)]
3574mod tests;
3575
3576#[cfg(test)]
3577pub(crate) use tests::agent_tests;
3578
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed, sendable future returned by [`CommandHandler::handle`].
    type HandlerFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Slash-command stub that exists only in `#[cfg(test)]` builds.
    ///
    /// Its handler always fails, which drives the `Some(Err(CommandError))` arm of the
    /// session/debug registry dispatch block. That lets the non-fatal error path be
    /// tested without relying on any production command's validation logic.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        /// Literal command name used to invoke the stub.
        fn name(&self) -> &'static str {
            "/test-error"
        }

        /// Human-readable description of the stub.
        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        /// The stub is listed under the session category.
        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignore the context and arguments and resolve to a `CommandError`.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandlerFuture<'a> {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}