Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7pub(super) mod compression_feedback;
8mod context;
9pub(crate) mod context_manager;
10mod corrections;
11pub mod error;
12mod experiment_cmd;
13pub(super) mod feedback_detector;
14pub(crate) mod focus;
15mod graph_commands;
16mod guidelines_commands;
17mod index;
18mod learning;
19pub(crate) mod learning_engine;
20mod log_commands;
21mod lsp_commands;
22mod mcp;
23mod memory_commands;
24mod message_queue;
25mod model_commands;
26mod persistence;
27mod plan;
28mod policy_commands;
29mod provider_cmd;
30pub(crate) mod rate_limiter;
31#[cfg(feature = "scheduler")]
32mod scheduler_commands;
33mod scheduler_loop;
34pub mod session_config;
35mod session_digest;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70use state::CompressionState;
71use state::{
72    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
73    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
74    RuntimeConfig, SecurityState, SessionState, SkillState,
75};
76
77pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
78pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
79pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
80pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
81pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
82pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
83pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
84pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
85pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
86pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
87/// Prefix used for LSP context messages (`Role::System`) injected into message history.
88/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
89/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
90/// prefix to clear stale notes before each fresh injection.
91pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
92pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
93
94pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
95    use std::fmt::Write;
96    let capacity = "[tool output: ".len()
97        + tool_name.len()
98        + "]\n```\n".len()
99        + body.len()
100        + TOOL_OUTPUT_SUFFIX.len();
101    let mut buf = String::with_capacity(capacity);
102    let _ = write!(
103        buf,
104        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
105    );
106    buf
107}
108
109pub struct Agent<C: Channel> {
110    provider: AnyProvider,
111    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
112    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
113    /// Falls back to `provider.clone()` when no dedicated entry exists.
114    /// **Never replaced** by `/provider switch`.
115    embedding_provider: AnyProvider,
116    channel: C,
117    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
118    pub(super) msg: MessageState,
119    pub(super) memory_state: MemoryState,
120    pub(super) skill_state: SkillState,
121    pub(super) context_manager: context_manager::ContextManager,
122    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
123    pub(super) learning_engine: learning_engine::LearningEngine,
124    pub(super) feedback: FeedbackState,
125    pub(super) runtime: RuntimeConfig,
126    pub(super) mcp: McpState,
127    pub(super) index: IndexState,
128    pub(super) session: SessionState,
129    pub(super) debug_state: DebugState,
130    pub(super) instructions: InstructionState,
131    pub(super) security: SecurityState,
132    pub(super) experiments: ExperimentState,
133    pub(super) compression: CompressionState,
134    pub(super) lifecycle: LifecycleState,
135    pub(super) providers: ProviderState,
136    pub(super) metrics: MetricsState,
137    pub(super) orchestration: OrchestrationState,
138    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
139    pub(super) focus: focus::FocusState,
140    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
141    pub(super) sidequest: sidequest::SidequestState,
142    /// Dynamic tool schema filter: pre-computed tool embeddings for per-turn filtering (#2020).
143    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
144    /// Cached filtered tool IDs for the current user turn. Set by `compute_filtered_tool_ids()`
145    /// in `rebuild_system_prompt()`, consumed by the native tool loop on iteration 0.
146    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
147    /// Tool dependency graph for sequential tool availability (issue #2024).
148    /// Built once from config, applied per-turn after tool schema filtering.
149    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
150    /// Always-on tool IDs, mirrored from the tool schema filter for dependency gate bypass.
151    pub(super) dependency_always_on: HashSet<String>,
152    /// Tool IDs that completed successfully in the current session.
153    /// Grows monotonically per session; cleared on `/clear`.
154    /// NOTE: bounded by session length, typically < 1000 entries.
155    pub(super) completed_tool_ids: HashSet<String>,
156    /// DB row ID of the most recently persisted message. Set by `persist_message`;
157    /// consumed by `push_message` call sites to populate `metadata.db_id` on in-memory messages.
158    pub(super) last_persisted_message_id: Option<i64>,
159    /// DB message IDs pending hide after deferred tool pair summarization.
160    pub(super) deferred_db_hide_ids: Vec<i64>,
161    /// Summary texts pending insertion after deferred tool pair summarization.
162    pub(super) deferred_db_summaries: Vec<String>,
163    /// Runtime middleware layers for LLM calls and tool dispatch (#2286).
164    ///
165    /// Default: empty vec (zero-cost — loops never iterate).
166    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
167    /// Current tool loop iteration index within the active user turn. Reset to 0 at turn start,
168    /// incremented each iteration. Used to compute remaining tool call budget for `BudgetHint` (#2267).
169    pub(super) current_tool_iteration: usize,
170}
171
172impl<C: Channel> Agent<C> {
173    #[must_use]
174    pub fn new(
175        provider: AnyProvider,
176        channel: C,
177        registry: SkillRegistry,
178        matcher: Option<SkillMatcherBackend>,
179        max_active_skills: usize,
180        tool_executor: impl ToolExecutor + 'static,
181    ) -> Self {
182        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
183        Self::new_with_registry_arc(
184            provider,
185            channel,
186            registry,
187            matcher,
188            max_active_skills,
189            tool_executor,
190        )
191    }
192
193    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
194    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
195    ///
196    /// # Panics
197    ///
198    /// Panics if the registry `RwLock` is poisoned.
199    #[must_use]
200    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
201    pub fn new_with_registry_arc(
202        provider: AnyProvider,
203        channel: C,
204        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
205        matcher: Option<SkillMatcherBackend>,
206        max_active_skills: usize,
207        tool_executor: impl ToolExecutor + 'static,
208    ) -> Self {
209        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
210        let all_skills: Vec<Skill> = {
211            let reg = registry.read().expect("registry read lock poisoned");
212            reg.all_meta()
213                .iter()
214                .filter_map(|m| reg.get_skill(&m.name).ok())
215                .collect()
216        };
217        let empty_trust = HashMap::new();
218        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
219        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
220        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
221        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
222        tracing::trace!(prompt = %system_prompt, "full system prompt");
223
224        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
225        let (_tx, rx) = watch::channel(false);
226        let token_counter = Arc::new(TokenCounter::new());
227        // Always create the receiver side of the experiment notification channel so the
228        // select! branch in the agent loop compiles unconditionally. The sender is only
229        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
230        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
231        let embedding_provider = provider.clone();
232        Self {
233            provider,
234            embedding_provider,
235            channel,
236            tool_executor: Arc::new(tool_executor),
237            msg: MessageState {
238                messages: vec![Message {
239                    role: Role::System,
240                    content: system_prompt,
241                    parts: vec![],
242                    metadata: MessageMetadata::default(),
243                }],
244                message_queue: VecDeque::new(),
245                pending_image_parts: Vec::new(),
246            },
247            memory_state: MemoryState {
248                memory: None,
249                conversation_id: None,
250                history_limit: 50,
251                recall_limit: 5,
252                summarization_threshold: 50,
253                cross_session_score_threshold: 0.35,
254                autosave_assistant: false,
255                autosave_min_length: 20,
256                tool_call_cutoff: 6,
257                unsummarized_count: 0,
258                document_config: crate::config::DocumentConfig::default(),
259                graph_config: crate::config::GraphConfig::default(),
260                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
261                shutdown_summary: true,
262                shutdown_summary_min_messages: 4,
263                shutdown_summary_max_messages: 20,
264                shutdown_summary_timeout_secs: 10,
265                structured_summaries: false,
266                last_recall_confidence: None,
267                digest_config: crate::config::DigestConfig::default(),
268                cached_session_digest: None,
269                context_strategy: crate::config::ContextStrategy::default(),
270                crossover_turn_threshold: 20,
271                rpe_router: None,
272                goal_text: None,
273                persona_config: crate::config::PersonaConfig::default(),
274            },
275            skill_state: SkillState {
276                registry,
277                skill_paths: Vec::new(),
278                managed_dir: None,
279                trust_config: crate::config::TrustConfig::default(),
280                matcher,
281                max_active_skills,
282                disambiguation_threshold: 0.20,
283                min_injection_score: 0.20,
284                embedding_model: String::new(),
285                skill_reload_rx: None,
286                active_skill_names: Vec::new(),
287                last_skills_prompt: skills_prompt,
288                prompt_mode: SkillPromptMode::Auto,
289                available_custom_secrets: HashMap::new(),
290                cosine_weight: 0.7,
291                hybrid_search: false,
292                bm25_index: None,
293                two_stage_matching: false,
294                confusability_threshold: 0.0,
295                rl_head: None,
296                rl_weight: 0.3,
297                rl_warmup_updates: 50,
298                generation_output_dir: None,
299                generation_provider_name: String::new(),
300            },
301            context_manager: context_manager::ContextManager::new(),
302            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
303            learning_engine: learning_engine::LearningEngine::new(),
304            feedback: FeedbackState {
305                detector: feedback_detector::FeedbackDetector::new(0.6),
306                judge: None,
307                llm_classifier: None,
308            },
309            debug_state: DebugState {
310                debug_dumper: None,
311                dump_format: crate::debug_dump::DumpFormat::default(),
312                trace_collector: None,
313                iteration_counter: 0,
314                anomaly_detector: None,
315                reasoning_model_warning: true,
316                logging_config: crate::config::LoggingConfig::default(),
317                dump_dir: None,
318                trace_service_name: String::new(),
319                trace_redact: true,
320                current_iteration_span_id: None,
321            },
322            runtime: RuntimeConfig {
323                security: SecurityConfig::default(),
324                timeouts: TimeoutConfig::default(),
325                model_name: String::new(),
326                active_provider_name: String::new(),
327                permission_policy: zeph_tools::PermissionPolicy::default(),
328                redact_credentials: true,
329                rate_limiter: rate_limiter::ToolRateLimiter::new(
330                    rate_limiter::RateLimitConfig::default(),
331                ),
332                semantic_cache_enabled: false,
333                semantic_cache_threshold: 0.95,
334                semantic_cache_max_candidates: 10,
335                dependency_config: zeph_tools::DependencyConfig::default(),
336                adversarial_policy_info: None,
337                spawn_depth: 0,
338                budget_hint_enabled: true,
339                channel_skills: zeph_config::ChannelSkillsConfig::default(),
340            },
341            mcp: McpState {
342                tools: Vec::new(),
343                registry: None,
344                manager: None,
345                allowed_commands: Vec::new(),
346                max_dynamic: 10,
347                elicitation_rx: None,
348                shared_tools: None,
349                tool_rx: None,
350                server_outcomes: Vec::new(),
351                pruning_cache: zeph_mcp::PruningCache::new(),
352                pruning_provider: None,
353                pruning_enabled: false,
354                pruning_params: zeph_mcp::PruningParams::default(),
355                semantic_index: None,
356                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
357                discovery_params: zeph_mcp::DiscoveryParams::default(),
358                discovery_provider: None,
359                elicitation_warn_sensitive_fields: true,
360            },
361            index: IndexState {
362                retriever: None,
363                repo_map_tokens: 0,
364                cached_repo_map: None,
365                repo_map_ttl: std::time::Duration::from_secs(300),
366            },
367            session: SessionState {
368                env_context: EnvironmentContext::gather(""),
369                response_cache: None,
370                parent_tool_use_id: None,
371                status_tx: None,
372                lsp_hooks: None,
373                policy_config: None,
374                hooks_config: state::HooksConfigSnapshot::default(),
375            },
376            instructions: InstructionState {
377                blocks: Vec::new(),
378                reload_rx: None,
379                reload_state: None,
380            },
381            security: SecurityState {
382                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
383                quarantine_summarizer: None,
384                is_acp_session: false,
385                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
386                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
387                ),
388                flagged_urls: std::collections::HashSet::new(),
389                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
390                    std::collections::HashSet::new(),
391                )),
392                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
393                    zeph_sanitizer::pii::PiiFilterConfig::default(),
394                ),
395                #[cfg(feature = "classifiers")]
396                pii_ner_backend: None,
397                #[cfg(feature = "classifiers")]
398                pii_ner_timeout_ms: 5000,
399                #[cfg(feature = "classifiers")]
400                pii_ner_max_chars: 8192,
401                #[cfg(feature = "classifiers")]
402                pii_ner_circuit_breaker_threshold: 2,
403                #[cfg(feature = "classifiers")]
404                pii_ner_consecutive_timeouts: 0,
405                #[cfg(feature = "classifiers")]
406                pii_ner_tripped: false,
407                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
408                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
409                ),
410                guardrail: None,
411                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
412                    zeph_config::ResponseVerificationConfig::default(),
413                ),
414                causal_analyzer: None,
415            },
416            experiments: ExperimentState {
417                config: crate::config::ExperimentConfig::default(),
418                cancel: None,
419                baseline: crate::experiments::ConfigSnapshot::default(),
420                eval_provider: None,
421                notify_rx: Some(exp_notify_rx),
422                notify_tx: exp_notify_tx,
423            },
424            compression: CompressionState {
425                current_task_goal: None,
426                task_goal_user_msg_hash: None,
427                pending_task_goal: None,
428                pending_sidequest_result: None,
429                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
430                pending_subgoal: None,
431                subgoal_user_msg_hash: None,
432            },
433            lifecycle: LifecycleState {
434                shutdown: rx,
435                start_time: Instant::now(),
436                cancel_signal: Arc::new(Notify::new()),
437                cancel_token: CancellationToken::new(),
438                config_path: None,
439                config_reload_rx: None,
440                warmup_ready: None,
441                update_notify_rx: None,
442                custom_task_rx: None,
443                last_known_cwd: std::env::current_dir().unwrap_or_default(),
444                file_changed_rx: None,
445                file_watcher: None,
446            },
447            providers: ProviderState {
448                summary_provider: None,
449                provider_override: None,
450                judge_provider: None,
451                probe_provider: None,
452                compress_provider: None,
453                cached_prompt_tokens: initial_prompt_tokens,
454                server_compaction_active: false,
455                stt: None,
456                provider_pool: Vec::new(),
457                provider_config_snapshot: None,
458            },
459            metrics: MetricsState {
460                metrics_tx: None,
461                cost_tracker: None,
462                token_counter,
463                extended_context: false,
464                classifier_metrics: None,
465            },
466            orchestration: OrchestrationState {
467                planner_provider: None,
468                verify_provider: None,
469                pending_graph: None,
470                plan_cancel_token: None,
471                subagent_manager: None,
472                subagent_config: crate::config::SubAgentConfig::default(),
473                orchestration_config: crate::config::OrchestrationConfig::default(),
474                plan_cache: None,
475                pending_goal_embedding: None,
476            },
477            focus: focus::FocusState::default(),
478            sidequest: sidequest::SidequestState::default(),
479            tool_schema_filter: None,
480            cached_filtered_tool_ids: None,
481            dependency_graph: None,
482            dependency_always_on: HashSet::new(),
483            completed_tool_ids: HashSet::new(),
484            last_persisted_message_id: None,
485            deferred_db_hide_ids: Vec::new(),
486            deferred_db_summaries: Vec::new(),
487            runtime_layers: Vec::new(),
488            current_tool_iteration: 0,
489        }
490    }
491
492    /// Poll all active sub-agents for completed/failed/canceled results.
493    ///
494    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
495    /// for agents that have finished. Each completed agent is removed from the manager.
496    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
497        let Some(mgr) = &mut self.orchestration.subagent_manager else {
498            return vec![];
499        };
500
501        let finished: Vec<String> = mgr
502            .statuses()
503            .into_iter()
504            .filter_map(|(id, status)| {
505                if matches!(
506                    status.state,
507                    crate::subagent::SubAgentState::Completed
508                        | crate::subagent::SubAgentState::Failed
509                        | crate::subagent::SubAgentState::Canceled
510                ) {
511                    Some(id)
512                } else {
513                    None
514                }
515            })
516            .collect();
517
518        let mut results = vec![];
519        for task_id in finished {
520            match mgr.collect(&task_id).await {
521                Ok(result) => results.push((task_id, result)),
522                Err(e) => {
523                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
524                }
525            }
526        }
527        results
528    }
529
530    /// Call the LLM to generate a structured session summary with a configurable timeout.
531    ///
532    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
533    /// any failure, logging a warning — callers must treat `None` as "skip storage".
534    ///
535    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
536    /// (structured call times out and plain-text fallback also times out) this adds up to
537    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
538    async fn call_llm_for_session_summary(
539        &self,
540        chat_messages: &[Message],
541    ) -> Option<zeph_memory::StructuredSummary> {
542        let timeout_dur =
543            std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
544        match tokio::time::timeout(
545            timeout_dur,
546            self.provider
547                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
548        )
549        .await
550        {
551            Ok(Ok(s)) => Some(s),
552            Ok(Err(e)) => {
553                tracing::warn!(
554                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
555                );
556                self.plain_text_summary_fallback(chat_messages, timeout_dur)
557                    .await
558            }
559            Err(_) => {
560                tracing::warn!(
561                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
562                    self.memory_state.shutdown_summary_timeout_secs
563                );
564                self.plain_text_summary_fallback(chat_messages, timeout_dur)
565                    .await
566            }
567        }
568    }
569
570    async fn plain_text_summary_fallback(
571        &self,
572        chat_messages: &[Message],
573        timeout_dur: std::time::Duration,
574    ) -> Option<zeph_memory::StructuredSummary> {
575        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
576            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
577                summary: plain,
578                key_facts: vec![],
579                entities: vec![],
580            }),
581            Ok(Err(e)) => {
582                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
583                None
584            }
585            Err(_) => {
586                tracing::warn!("shutdown summary: plain LLM fallback timed out");
587                None
588            }
589        }
590    }
591
592    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
593    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
594    /// closed while tool execution was in progress). Without this the next session startup strips
595    /// those assistant messages and emits orphan warnings.
596    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
597        use zeph_llm::provider::{MessagePart, Role};
598
599        // Walk messages in reverse: if the last assistant message (ignoring any trailing
600        // system messages) has ToolUse parts and is NOT immediately followed by a user
601        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
602        let msgs = &self.msg.messages;
603        // Find last assistant message index.
604        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
605            return;
606        };
607        let asst_msg = &msgs[asst_idx];
608        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
609            .parts
610            .iter()
611            .filter_map(|p| {
612                if let MessagePart::ToolUse { id, name, input } = p {
613                    Some((id.as_str(), name.as_str(), input))
614                } else {
615                    None
616                }
617            })
618            .collect();
619        if tool_use_ids.is_empty() {
620            return;
621        }
622
623        // Check whether a following user message already pairs all ToolUse ids.
624        let paired_ids: std::collections::HashSet<&str> = msgs
625            .get(asst_idx + 1..)
626            .into_iter()
627            .flatten()
628            .filter(|m| m.role == Role::User)
629            .flat_map(|m| m.parts.iter())
630            .filter_map(|p| {
631                if let MessagePart::ToolResult { tool_use_id, .. } = p {
632                    Some(tool_use_id.as_str())
633                } else {
634                    None
635                }
636            })
637            .collect();
638
639        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
640            .iter()
641            .filter(|(id, _, _)| !paired_ids.contains(*id))
642            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
643                id: (*id).to_owned(),
644                name: (*name).to_owned(),
645                input: (*input).clone(),
646            })
647            .collect();
648
649        if unpaired.is_empty() {
650            return;
651        }
652
653        tracing::info!(
654            count = unpaired.len(),
655            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
656        );
657        self.persist_cancelled_tool_results(&unpaired).await;
658    }
659
660    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
661    ///
662    /// Guards:
663    /// - `shutdown_summary` config must be enabled
664    /// - `conversation_id` must be set (memory must be attached)
665    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
666    /// - at least `shutdown_summary_min_messages` user-turn messages in history
667    ///
668    /// All errors are logged as warnings and swallowed — shutdown must never fail.
669    async fn maybe_store_shutdown_summary(&mut self) {
670        if !self.memory_state.shutdown_summary {
671            return;
672        }
673        let Some(memory) = self.memory_state.memory.clone() else {
674            return;
675        };
676        let Some(conversation_id) = self.memory_state.conversation_id else {
677            return;
678        };
679
680        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
681        match memory.has_session_summary(conversation_id).await {
682            Ok(true) => {
683                tracing::debug!("shutdown summary: session already has a summary, skipping");
684                return;
685            }
686            Ok(false) => {}
687            Err(e) => {
688                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
689                return;
690            }
691        }
692
693        // Count user-turn messages only (skip system prompt at index 0).
694        let user_count = self
695            .msg
696            .messages
697            .iter()
698            .skip(1)
699            .filter(|m| m.role == Role::User)
700            .count();
701        if user_count < self.memory_state.shutdown_summary_min_messages {
702            tracing::debug!(
703                user_count,
704                min = self.memory_state.shutdown_summary_min_messages,
705                "shutdown summary: too few user messages, skipping"
706            );
707            return;
708        }
709
710        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
711        let _ = self.channel.send_status("Saving session summary...").await;
712
713        // Collect last N messages (skip system prompt at index 0).
714        let max = self.memory_state.shutdown_summary_max_messages;
715        if max == 0 {
716            tracing::debug!("shutdown summary: max_messages=0, skipping");
717            return;
718        }
719        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
720        let slice = if non_system.len() > max {
721            &non_system[non_system.len() - max..]
722        } else {
723            &non_system[..]
724        };
725
726        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
727            .iter()
728            .map(|m| {
729                let role = match m.role {
730                    Role::User => "user".to_owned(),
731                    Role::Assistant => "assistant".to_owned(),
732                    Role::System => "system".to_owned(),
733                };
734                (zeph_memory::MessageId(0), role, m.content.clone())
735            })
736            .collect();
737
738        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
739        let chat_messages = vec![Message {
740            role: Role::User,
741            content: prompt,
742            parts: vec![],
743            metadata: MessageMetadata::default(),
744        }];
745
746        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
747            let _ = self.channel.send_status("").await;
748            return;
749        };
750
751        if let Err(e) = memory
752            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
753            .await
754        {
755            tracing::warn!("shutdown summary: storage failed: {e:#}");
756        } else {
757            tracing::info!(
758                conversation_id = conversation_id.0,
759                "shutdown summary stored"
760            );
761        }
762
763        // Clear TUI status.
764        let _ = self.channel.send_status("").await;
765    }
766
767    pub async fn shutdown(&mut self) {
768        self.channel.send("Shutting down...").await.ok();
769
770        // CRIT-1: persist Thompson state accumulated during this session.
771        self.provider.save_router_state();
772
773        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
774            mgr.shutdown_all();
775        }
776
777        if let Some(ref manager) = self.mcp.manager {
778            manager.shutdown_all_shared().await;
779        }
780
781        // Finalize compaction trajectory: push the last open segment into the Vec.
782        // This segment would otherwise only be pushed when the next hard compaction fires,
783        // which never happens at session end.
784        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
785            self.update_metrics(|m| {
786                m.compaction_turns_after_hard.push(turns);
787            });
788            self.context_manager.turns_since_last_hard_compaction = None;
789        }
790
791        if let Some(ref tx) = self.metrics.metrics_tx {
792            let m = tx.borrow();
793            if m.filter_applications > 0 {
794                #[allow(clippy::cast_precision_loss)]
795                let pct = if m.filter_raw_tokens > 0 {
796                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
797                } else {
798                    0.0
799                };
800                tracing::info!(
801                    raw_tokens = m.filter_raw_tokens,
802                    saved_tokens = m.filter_saved_tokens,
803                    applications = m.filter_applications,
804                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
805                    m.filter_saved_tokens,
806                );
807            }
808            if m.compaction_hard_count > 0 {
809                tracing::info!(
810                    hard_compactions = m.compaction_hard_count,
811                    turns_after_hard = ?m.compaction_turns_after_hard,
812                    "hard compaction trajectory"
813                );
814            }
815        }
816
817        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
818        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
819        // startup strips the orphaned ToolUse and emits warnings.
820        self.flush_orphaned_tool_use_on_shutdown().await;
821
822        self.maybe_store_shutdown_summary().await;
823        self.maybe_store_session_digest().await;
824
825        tracing::info!("agent shutdown complete");
826    }
827
828    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if channel I/O or LLM communication fails.
833    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
834    fn refresh_subagent_metrics(&mut self) {
835        let Some(ref mgr) = self.orchestration.subagent_manager else {
836            return;
837        };
838        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
839            .statuses()
840            .into_iter()
841            .map(|(id, s)| {
842                let def = mgr.agents_def(&id);
843                crate::metrics::SubAgentMetrics {
844                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
845                    id: id.clone(),
846                    state: format!("{:?}", s.state).to_lowercase(),
847                    turns_used: s.turns_used,
848                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
849                    background: def.is_some_and(|d| d.permissions.background),
850                    elapsed_secs: s.started_at.elapsed().as_secs(),
851                    permission_mode: def.map_or_else(String::new, |d| {
852                        use crate::subagent::def::PermissionMode;
853                        match d.permissions.permission_mode {
854                            PermissionMode::Default => String::new(),
855                            PermissionMode::AcceptEdits => "accept_edits".into(),
856                            PermissionMode::DontAsk => "dont_ask".into(),
857                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
858                            PermissionMode::Plan => "plan".into(),
859                        }
860                    }),
861                    transcript_dir: mgr
862                        .agent_transcript_dir(&id)
863                        .map(|p| p.to_string_lossy().into_owned()),
864                }
865            })
866            .collect();
867        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
868    }
869
870    /// Non-blocking poll: notify the user when background sub-agents complete.
871    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
872        let completed = self.poll_subagents().await;
873        for (task_id, result) in completed {
874            let notice = if result.is_empty() {
875                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
876            } else {
877                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
878            };
879            if let Err(e) = self.channel.send(&notice).await {
880                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
881            }
882        }
883        Ok(())
884    }
885
886    /// Run the agent main loop.
887    ///
888    /// # Errors
889    ///
890    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
891    pub async fn run(&mut self) -> Result<(), error::AgentError> {
892        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
893            && !*rx.borrow()
894        {
895            let _ = rx.changed().await;
896            if !*rx.borrow() {
897                tracing::warn!("model warmup did not complete successfully");
898            }
899        }
900
901        // Load the session digest once at session start for context injection.
902        self.load_and_cache_session_digest().await;
903
904        loop {
905            // Apply any pending provider override (from ACP set_session_config_option).
906            if let Some(ref slot) = self.providers.provider_override
907                && let Some(new_provider) = slot
908                    .write()
909                    .unwrap_or_else(std::sync::PoisonError::into_inner)
910                    .take()
911            {
912                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
913                self.provider = new_provider;
914            }
915
916            // Poll for MCP tool list updates from tools/list_changed notifications.
917            self.check_tool_refresh().await;
918
919            // Process any pending MCP elicitation requests from MCP servers.
920            self.process_pending_elicitations().await;
921
922            // Refresh sub-agent status in metrics before polling.
923            self.refresh_subagent_metrics();
924
925            // Non-blocking poll: notify user when background sub-agents complete.
926            self.notify_completed_subagents().await?;
927
928            self.drain_channel();
929
930            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
931                self.notify_queue_count().await;
932                if queued.raw_attachments.is_empty() {
933                    (queued.text, queued.image_parts)
934                } else {
935                    let msg = crate::channel::ChannelMessage {
936                        text: queued.text,
937                        attachments: queued.raw_attachments,
938                    };
939                    self.resolve_message(msg).await
940                }
941            } else {
942                let incoming = tokio::select! {
943                    result = self.channel.recv() => result?,
944                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
945                        tracing::info!("shutting down");
946                        break;
947                    }
948                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
949                        self.reload_skills().await;
950                        continue;
951                    }
952                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
953                        self.reload_instructions();
954                        continue;
955                    }
956                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
957                        self.reload_config();
958                        continue;
959                    }
960                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
961                        if let Err(e) = self.channel.send(&msg).await {
962                            tracing::warn!("failed to send update notification: {e}");
963                        }
964                        continue;
965                    }
966                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
967                        // Experiment engine completed (ok or err). Clear the cancel token so
968                        // status reports idle and new experiments can be started.
969                        { self.experiments.cancel = None; }
970                        if let Err(e) = self.channel.send(&msg).await {
971                            tracing::warn!("failed to send experiment completion: {e}");
972                        }
973                        continue;
974                    }
975                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
976                        tracing::info!("scheduler: injecting custom task as agent turn");
977                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
978                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
979                    }
980                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
981                        self.handle_file_changed(event).await;
982                        continue;
983                    }
984                };
985                let Some(msg) = incoming else { break };
986                self.drain_channel();
987                self.resolve_message(msg).await
988            };
989
990            let trimmed = text.trim();
991
992            match self.handle_builtin_command(trimmed).await? {
993                Some(true) => break,
994                Some(false) => continue,
995                None => {}
996            }
997
998            self.process_user_message(text, image_parts).await?;
999        }
1000
1001        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1002        if let Some(ref mut tc) = self.debug_state.trace_collector {
1003            tc.finish();
1004        }
1005
1006        Ok(())
1007    }
1008
1009    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
1010    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1011        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1012        if arg.is_empty() {
1013            match &self.debug_state.debug_dumper {
1014                Some(d) => {
1015                    let _ = self
1016                        .channel
1017                        .send(&format!("Debug dump active: {}", d.dir().display()))
1018                        .await;
1019                }
1020                None => {
1021                    let _ = self
1022                        .channel
1023                        .send(
1024                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1025                             or start with `--debug-dump [dir]`.",
1026                        )
1027                        .await;
1028                }
1029            }
1030            return;
1031        }
1032        let dir = std::path::PathBuf::from(arg);
1033        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1034            Ok(dumper) => {
1035                let path = dumper.dir().display().to_string();
1036                self.debug_state.debug_dumper = Some(dumper);
1037                let _ = self
1038                    .channel
1039                    .send(&format!("Debug dump enabled: {path}"))
1040                    .await;
1041            }
1042            Err(e) => {
1043                let _ = self
1044                    .channel
1045                    .send(&format!("Failed to enable debug dump: {e}"))
1046                    .await;
1047            }
1048        }
1049    }
1050
1051    /// Handle `/dump-format <json|raw|trace>` command — switch debug dump format at runtime.
1052    async fn handle_dump_format_command(&mut self, trimmed: &str) {
1053        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
1054        if arg.is_empty() {
1055            let _ = self
1056                .channel
1057                .send(&format!(
1058                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
1059                    self.debug_state.dump_format
1060                ))
1061                .await;
1062            return;
1063        }
1064        let new_format = match arg {
1065            "json" => crate::debug_dump::DumpFormat::Json,
1066            "raw" => crate::debug_dump::DumpFormat::Raw,
1067            "trace" => crate::debug_dump::DumpFormat::Trace,
1068            other => {
1069                let _ = self
1070                    .channel
1071                    .send(&format!(
1072                        "Unknown format '{other}'. Valid values: json, raw, trace."
1073                    ))
1074                    .await;
1075                return;
1076            }
1077        };
1078        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
1079        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
1080
1081        // CR-04: when switching TO trace, create a fresh TracingCollector.
1082        if now_trace
1083            && !was_trace
1084            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
1085        {
1086            let service_name = self.debug_state.trace_service_name.clone();
1087            let redact = self.debug_state.trace_redact;
1088            match crate::debug_dump::trace::TracingCollector::new(
1089                dump_dir.as_path(),
1090                &service_name,
1091                redact,
1092                None,
1093            ) {
1094                Ok(collector) => {
1095                    self.debug_state.trace_collector = Some(collector);
1096                }
1097                Err(e) => {
1098                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
1099                }
1100            }
1101        }
1102        // CR-04: when switching AWAY from trace, flush and drop the collector.
1103        if was_trace
1104            && !now_trace
1105            && let Some(mut tc) = self.debug_state.trace_collector.take()
1106        {
1107            tc.finish();
1108        }
1109
1110        self.debug_state.dump_format = new_format;
1111        let _ = self
1112            .channel
1113            .send(&format!("Debug dump format set to: {arg}"))
1114            .await;
1115    }
1116
1117    async fn resolve_message(
1118        &self,
1119        msg: crate::channel::ChannelMessage,
1120    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1121        use crate::channel::{Attachment, AttachmentKind};
1122        use zeph_llm::provider::{ImageData, MessagePart};
1123
1124        let text_base = msg.text.clone();
1125
1126        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1127            .attachments
1128            .into_iter()
1129            .partition(|a| a.kind == AttachmentKind::Audio);
1130
1131        tracing::debug!(
1132            audio = audio_attachments.len(),
1133            has_stt = self.providers.stt.is_some(),
1134            "resolve_message attachments"
1135        );
1136
1137        let text = if !audio_attachments.is_empty()
1138            && let Some(stt) = self.providers.stt.as_ref()
1139        {
1140            let mut transcribed_parts = Vec::new();
1141            for attachment in &audio_attachments {
1142                if attachment.data.len() > MAX_AUDIO_BYTES {
1143                    tracing::warn!(
1144                        size = attachment.data.len(),
1145                        max = MAX_AUDIO_BYTES,
1146                        "audio attachment exceeds size limit, skipping"
1147                    );
1148                    continue;
1149                }
1150                match stt
1151                    .transcribe(&attachment.data, attachment.filename.as_deref())
1152                    .await
1153                {
1154                    Ok(result) => {
1155                        tracing::info!(
1156                            len = result.text.len(),
1157                            language = ?result.language,
1158                            "audio transcribed"
1159                        );
1160                        transcribed_parts.push(result.text);
1161                    }
1162                    Err(e) => {
1163                        tracing::error!(error = %e, "audio transcription failed");
1164                    }
1165                }
1166            }
1167            if transcribed_parts.is_empty() {
1168                text_base
1169            } else {
1170                let transcribed = transcribed_parts.join("\n");
1171                if text_base.is_empty() {
1172                    transcribed
1173                } else {
1174                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1175                }
1176            }
1177        } else {
1178            if !audio_attachments.is_empty() {
1179                tracing::warn!(
1180                    count = audio_attachments.len(),
1181                    "audio attachments received but no STT provider configured, dropping"
1182                );
1183            }
1184            text_base
1185        };
1186
1187        let mut image_parts = Vec::new();
1188        for attachment in image_attachments {
1189            if attachment.data.len() > MAX_IMAGE_BYTES {
1190                tracing::warn!(
1191                    size = attachment.data.len(),
1192                    max = MAX_IMAGE_BYTES,
1193                    "image attachment exceeds size limit, skipping"
1194                );
1195                continue;
1196            }
1197            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1198            image_parts.push(MessagePart::Image(Box::new(ImageData {
1199                data: attachment.data,
1200                mime_type,
1201            })));
1202        }
1203
1204        (text, image_parts)
1205    }
1206
1207    async fn process_user_message(
1208        &mut self,
1209        text: String,
1210        image_parts: Vec<zeph_llm::provider::MessagePart>,
1211    ) -> Result<(), error::AgentError> {
1212        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1213        let iteration_index = self.debug_state.iteration_counter;
1214        self.debug_state.iteration_counter += 1;
1215        if let Some(ref mut tc) = self.debug_state.trace_collector {
1216            tc.begin_iteration(iteration_index, text.trim());
1217            // CR-01: store the span ID so LLM/tool execution can attach child spans.
1218            self.debug_state.current_iteration_span_id =
1219                tc.current_iteration_span_id(iteration_index);
1220        }
1221
1222        let result = self
1223            .process_user_message_inner(text, image_parts, iteration_index)
1224            .await;
1225
1226        // Close iteration span regardless of outcome (partial trace preserved on error).
1227        if let Some(ref mut tc) = self.debug_state.trace_collector {
1228            let status = if result.is_ok() {
1229                crate::debug_dump::trace::SpanStatus::Ok
1230            } else {
1231                crate::debug_dump::trace::SpanStatus::Error {
1232                    message: "iteration failed".to_owned(),
1233                }
1234            };
1235            tc.end_iteration(iteration_index, status);
1236        }
1237        self.debug_state.current_iteration_span_id = None;
1238
1239        result
1240    }
1241
1242    async fn process_user_message_inner(
1243        &mut self,
1244        text: String,
1245        image_parts: Vec<zeph_llm::provider::MessagePart>,
1246        iteration_index: usize,
1247    ) -> Result<(), error::AgentError> {
1248        let _ = iteration_index; // Used indirectly via debug_state.current_iteration_span_id.
1249        self.lifecycle.cancel_token = CancellationToken::new();
1250        let signal = Arc::clone(&self.lifecycle.cancel_signal);
1251        let token = self.lifecycle.cancel_token.clone();
1252        tokio::spawn(async move {
1253            signal.notified().await;
1254            token.cancel();
1255        });
1256        let trimmed = text.trim();
1257
1258        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1259            return result;
1260        }
1261
1262        self.check_pending_rollbacks().await;
1263
1264        if self.pre_process_security(trimmed).await? {
1265            return Ok(());
1266        }
1267
1268        self.advance_context_lifecycle(&text, trimmed).await;
1269
1270        let user_msg = self.build_user_message(&text, image_parts);
1271
1272        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1273        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1274        if !urls.is_empty()
1275            && let Ok(mut set) = self.security.user_provided_urls.write()
1276        {
1277            set.extend(urls);
1278        }
1279
1280        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1281        // Derived from the raw input text before context assembly to avoid timing dependencies.
1282        self.memory_state.goal_text = Some(text.clone());
1283
1284        // Image parts intentionally excluded — base64 payloads too large for message history.
1285        self.persist_message(Role::User, &text, &[], false).await;
1286        self.push_message(user_msg);
1287
1288        if let Err(e) = self.process_response().await {
1289            tracing::error!("Response processing failed: {e:#}");
1290            let user_msg = format!("Error: {e:#}");
1291            self.channel.send(&user_msg).await?;
1292            self.msg.messages.pop();
1293            self.recompute_prompt_tokens();
1294            self.channel.flush_chunks().await?;
1295        }
1296
1297        Ok(())
1298    }
1299
1300    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1301    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1302        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1303        if let Some(ref guardrail) = self.security.guardrail {
1304            use zeph_sanitizer::guardrail::GuardrailVerdict;
1305            let verdict = guardrail.check(trimmed).await;
1306            match &verdict {
1307                GuardrailVerdict::Flagged { reason, .. } => {
1308                    tracing::warn!(
1309                        reason = %reason,
1310                        should_block = verdict.should_block(),
1311                        "guardrail flagged user input"
1312                    );
1313                    if verdict.should_block() {
1314                        let msg = format!("[guardrail] Input blocked: {reason}");
1315                        let _ = self.channel.send(&msg).await;
1316                        let _ = self.channel.flush_chunks().await;
1317                        return Ok(true);
1318                    }
1319                    // Warn mode: notify but continue.
1320                    let _ = self
1321                        .channel
1322                        .send(&format!("[guardrail] Warning: {reason}"))
1323                        .await;
1324                }
1325                GuardrailVerdict::Error { error } => {
1326                    if guardrail.error_should_block() {
1327                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1328                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1329                        let _ = self.channel.send(msg).await;
1330                        let _ = self.channel.flush_chunks().await;
1331                        return Ok(true);
1332                    }
1333                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1334                }
1335                GuardrailVerdict::Safe => {}
1336            }
1337        }
1338
1339        // ML classifier: lightweight injection detection on user input boundary.
1340        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1341        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1342        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1343        // direct user chat. Disabled by default to prevent false positives on benign messages.
1344        #[cfg(feature = "classifiers")]
1345        if self.security.sanitizer.scan_user_input() {
1346            match self.security.sanitizer.classify_injection(trimmed).await {
1347                zeph_sanitizer::InjectionVerdict::Blocked => {
1348                    self.push_classifier_metrics();
1349                    let _ = self
1350                        .channel
1351                        .send("[security] Input blocked: injection detected by classifier.")
1352                        .await;
1353                    let _ = self.channel.flush_chunks().await;
1354                    return Ok(true);
1355                }
1356                zeph_sanitizer::InjectionVerdict::Suspicious => {
1357                    tracing::warn!("injection_classifier soft_signal on user input");
1358                }
1359                zeph_sanitizer::InjectionVerdict::Clean => {}
1360            }
1361        }
1362        #[cfg(feature = "classifiers")]
1363        self.push_classifier_metrics();
1364
1365        Ok(false)
1366    }
1367
1368    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1369        // Reset per-message pruning cache at the start of each turn (#2298).
1370        self.mcp.pruning_cache.reset();
1371
1372        // Extract before rebuild_system_prompt so the value is not tainted
1373        // by the secrets-bearing system prompt (ConversationId is just an i64).
1374        let conv_id = self.memory_state.conversation_id;
1375        self.rebuild_system_prompt(text).await;
1376
1377        self.detect_and_record_corrections(trimmed, conv_id).await;
1378        self.learning_engine.tick();
1379        self.analyze_and_learn().await;
1380        self.sync_graph_counts().await;
1381
1382        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1383        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1384        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1385        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1386        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1387
1388        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1389        {
1390            self.focus.tick();
1391
1392            // SideQuest eviction: runs every N user turns when enabled.
1393            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1394            let sidequest_should_fire = self.sidequest.tick();
1395            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1396                self.maybe_sidequest_eviction();
1397            }
1398        }
1399
1400        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
1401        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
1402        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
1403        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
1404        self.maybe_apply_deferred_summaries();
1405        self.flush_deferred_summaries().await;
1406
1407        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
1408        if let Err(e) = self.maybe_proactive_compress().await {
1409            tracing::warn!("proactive compression failed: {e:#}");
1410        }
1411
1412        if let Err(e) = self.maybe_compact().await {
1413            tracing::warn!("context compaction failed: {e:#}");
1414        }
1415
1416        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
1417            tracing::warn!("context preparation failed: {e:#}");
1418        }
1419
1420        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
1421        self.provider
1422            .set_memory_confidence(self.memory_state.last_recall_confidence);
1423
1424        self.learning_engine.reset_reflection();
1425    }
1426
1427    fn build_user_message(
1428        &mut self,
1429        text: &str,
1430        image_parts: Vec<zeph_llm::provider::MessagePart>,
1431    ) -> Message {
1432        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1433        all_image_parts.extend(image_parts);
1434
1435        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1436            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1437                text: text.to_owned(),
1438            }];
1439            parts.extend(all_image_parts);
1440            Message::from_parts(Role::User, parts)
1441        } else {
1442            if !all_image_parts.is_empty() {
1443                tracing::warn!(
1444                    count = all_image_parts.len(),
1445                    "image attachments dropped: provider does not support vision"
1446                );
1447            }
1448            Message {
1449                role: Role::User,
1450                content: text.to_owned(),
1451                parts: vec![],
1452                metadata: MessageMetadata::default(),
1453            }
1454        }
1455    }
1456
1457    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
1458    /// channel. Returns a human-readable status string suitable for sending to the user.
1459    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1460        use crate::subagent::SubAgentState;
1461        let result = loop {
1462            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1463
1464            // Bridge secret requests from sub-agent to channel.confirm().
1465            // Fetch the pending request first, then release the borrow before
1466            // calling channel.confirm() (which requires &mut self).
1467            #[allow(clippy::redundant_closure_for_method_calls)]
1468            let pending = self
1469                .orchestration
1470                .subagent_manager
1471                .as_mut()
1472                .and_then(|m| m.try_recv_secret_request());
1473            if let Some((req_task_id, req)) = pending {
1474                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
1475                // (SEC-P1-02), so it is safe to embed in the prompt string.
1476                let confirm_prompt = format!(
1477                    "Sub-agent requests secret '{}'. Allow?",
1478                    crate::text::truncate_to_chars(&req.secret_key, 100)
1479                );
1480                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1481                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1482                    if approved {
1483                        let ttl = std::time::Duration::from_secs(300);
1484                        let key = req.secret_key.clone();
1485                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1486                            let _ = mgr.deliver_secret(&req_task_id, key);
1487                        }
1488                    } else {
1489                        let _ = mgr.deny_secret(&req_task_id);
1490                    }
1491                }
1492            }
1493
1494            let mgr = self.orchestration.subagent_manager.as_ref()?;
1495            let statuses = mgr.statuses();
1496            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1497                break format!("{label} completed (no status available).");
1498            };
1499            match status.state {
1500                SubAgentState::Completed => {
1501                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1502                    break format!("{label} completed: {msg}");
1503                }
1504                SubAgentState::Failed => {
1505                    let msg = status
1506                        .last_message
1507                        .clone()
1508                        .unwrap_or_else(|| "unknown error".into());
1509                    break format!("{label} failed: {msg}");
1510                }
1511                SubAgentState::Canceled => {
1512                    break format!("{label} was cancelled.");
1513                }
1514                _ => {
1515                    let _ = self
1516                        .channel
1517                        .send_status(&format!(
1518                            "{label}: turn {}/{}",
1519                            status.turns_used,
1520                            self.orchestration
1521                                .subagent_manager
1522                                .as_ref()
1523                                .and_then(|m| m.agents_def(task_id))
1524                                .map_or(20, |d| d.permissions.max_turns)
1525                        ))
1526                        .await;
1527                }
1528            }
1529        };
1530        Some(result)
1531    }
1532
1533    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1534    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1535    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1536        let mgr = self.orchestration.subagent_manager.as_mut()?;
1537        let full_ids: Vec<String> = mgr
1538            .statuses()
1539            .into_iter()
1540            .map(|(tid, _)| tid)
1541            .filter(|tid| tid.starts_with(prefix))
1542            .collect();
1543        Some(match full_ids.as_slice() {
1544            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1545            [fid] => Ok(fid.clone()),
1546            _ => Err(format!(
1547                "Ambiguous id prefix '{prefix}': matches {} agents",
1548                full_ids.len()
1549            )),
1550        })
1551    }
1552
1553    fn handle_agent_list(&self) -> Option<String> {
1554        use std::fmt::Write as _;
1555        let mgr = self.orchestration.subagent_manager.as_ref()?;
1556        let defs = mgr.definitions();
1557        if defs.is_empty() {
1558            return Some("No sub-agent definitions found.".into());
1559        }
1560        let mut out = String::from("Available sub-agents:\n");
1561        for d in defs {
1562            let memory_label = match d.memory {
1563                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
1564                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
1565                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
1566                None => "",
1567            };
1568            if let Some(ref src) = d.source {
1569                let _ = writeln!(
1570                    out,
1571                    "  {}{} — {} ({})",
1572                    d.name, memory_label, d.description, src
1573                );
1574            } else {
1575                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
1576            }
1577        }
1578        Some(out)
1579    }
1580
1581    fn handle_agent_status(&self) -> Option<String> {
1582        use std::fmt::Write as _;
1583        let mgr = self.orchestration.subagent_manager.as_ref()?;
1584        let statuses = mgr.statuses();
1585        if statuses.is_empty() {
1586            return Some("No active sub-agents.".into());
1587        }
1588        let mut out = String::from("Active sub-agents:\n");
1589        for (id, s) in &statuses {
1590            let state = format!("{:?}", s.state).to_lowercase();
1591            let elapsed = s.started_at.elapsed().as_secs();
1592            let _ = writeln!(
1593                out,
1594                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
1595                short = &id[..8.min(id.len())],
1596                t = s.turns_used,
1597                msg = s.last_message.as_deref().unwrap_or(""),
1598            );
1599            // Show memory directory path for agents with memory enabled.
1600            if let Some(def) = mgr.agents_def(id)
1601                && let Some(scope) = def.memory
1602                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
1603            {
1604                let _ = writeln!(out, "       memory: {}", dir.display());
1605            }
1606        }
1607        Some(out)
1608    }
1609
1610    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1611        let full_id = match self.resolve_agent_id_prefix(id)? {
1612            Ok(fid) => fid,
1613            Err(msg) => return Some(msg),
1614        };
1615        let mgr = self.orchestration.subagent_manager.as_mut()?;
1616        if let Some((tid, req)) = mgr.try_recv_secret_request()
1617            && tid == full_id
1618        {
1619            let key = req.secret_key.clone();
1620            let ttl = std::time::Duration::from_secs(300);
1621            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1622                return Some(format!("Approve failed: {e}"));
1623            }
1624            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1625                return Some(format!("Secret delivery failed: {e}"));
1626            }
1627            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1628        }
1629        Some(format!(
1630            "No pending secret request for sub-agent '{full_id}'."
1631        ))
1632    }
1633
1634    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1635        let full_id = match self.resolve_agent_id_prefix(id)? {
1636            Ok(fid) => fid,
1637            Err(msg) => return Some(msg),
1638        };
1639        let mgr = self.orchestration.subagent_manager.as_mut()?;
1640        match mgr.deny_secret(&full_id) {
1641            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1642            Err(e) => Some(format!("Deny failed: {e}")),
1643        }
1644    }
1645
1646    #[allow(clippy::too_many_lines)]
1647    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
1648        use crate::subagent::AgentCommand;
1649
1650        match cmd {
1651            AgentCommand::List => self.handle_agent_list(),
1652            AgentCommand::Background { name, prompt } => {
1653                let provider = self.provider.clone();
1654                let tool_executor = Arc::clone(&self.tool_executor);
1655                let skills = self.filtered_skills_for(&name);
1656                let cfg = self.orchestration.subagent_config.clone();
1657                let spawn_ctx = self.build_spawn_context(&cfg);
1658                let mgr = self.orchestration.subagent_manager.as_mut()?;
1659                match mgr.spawn(
1660                    &name,
1661                    &prompt,
1662                    provider,
1663                    tool_executor,
1664                    skills,
1665                    &cfg,
1666                    spawn_ctx,
1667                ) {
1668                    Ok(id) => Some(format!(
1669                        "Sub-agent '{name}' started in background (id: {short})",
1670                        short = &id[..8.min(id.len())]
1671                    )),
1672                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1673                }
1674            }
1675            AgentCommand::Spawn { name, prompt }
1676            | AgentCommand::Mention {
1677                agent: name,
1678                prompt,
1679            } => {
1680                // Foreground spawn: launch and await completion, streaming status to user.
1681                let provider = self.provider.clone();
1682                let tool_executor = Arc::clone(&self.tool_executor);
1683                let skills = self.filtered_skills_for(&name);
1684                let cfg = self.orchestration.subagent_config.clone();
1685                let spawn_ctx = self.build_spawn_context(&cfg);
1686                let mgr = self.orchestration.subagent_manager.as_mut()?;
1687                let task_id = match mgr.spawn(
1688                    &name,
1689                    &prompt,
1690                    provider,
1691                    tool_executor,
1692                    skills,
1693                    &cfg,
1694                    spawn_ctx,
1695                ) {
1696                    Ok(id) => id,
1697                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1698                };
1699                let short = task_id[..8.min(task_id.len())].to_owned();
1700                let _ = self
1701                    .channel
1702                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1703                    .await;
1704                let label = format!("Sub-agent '{name}'");
1705                self.poll_subagent_until_done(&task_id, &label).await
1706            }
1707            AgentCommand::Status => self.handle_agent_status(),
1708            AgentCommand::Cancel { id } => {
1709                let mgr = self.orchestration.subagent_manager.as_mut()?;
1710                // Accept prefix match on task_id.
1711                let ids: Vec<String> = mgr
1712                    .statuses()
1713                    .into_iter()
1714                    .map(|(task_id, _)| task_id)
1715                    .filter(|task_id| task_id.starts_with(&id))
1716                    .collect();
1717                match ids.as_slice() {
1718                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
1719                    [full_id] => {
1720                        let full_id = full_id.clone();
1721                        match mgr.cancel(&full_id) {
1722                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1723                            Err(e) => Some(format!("Cancel failed: {e}")),
1724                        }
1725                    }
1726                    _ => Some(format!(
1727                        "Ambiguous id prefix '{id}': matches {} agents",
1728                        ids.len()
1729                    )),
1730                }
1731            }
1732            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1733            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1734            AgentCommand::Resume { id, prompt } => {
1735                let cfg = self.orchestration.subagent_config.clone();
1736                // Resolve definition name from transcript meta before spawning so we can
1737                // look up skills by definition name rather than the UUID prefix (S1 fix).
1738                let def_name = {
1739                    let mgr = self.orchestration.subagent_manager.as_ref()?;
1740                    match mgr.def_name_for_resume(&id, &cfg) {
1741                        Ok(name) => name,
1742                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1743                    }
1744                };
1745                let skills = self.filtered_skills_for(&def_name);
1746                let provider = self.provider.clone();
1747                let tool_executor = Arc::clone(&self.tool_executor);
1748                let mgr = self.orchestration.subagent_manager.as_mut()?;
1749                let (task_id, _) =
1750                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1751                        Ok(pair) => pair,
1752                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1753                    };
1754                let short = task_id[..8.min(task_id.len())].to_owned();
1755                let _ = self
1756                    .channel
1757                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1758                    .await;
1759                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1760                    .await
1761            }
1762        }
1763    }
1764
1765    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1766        let mgr = self.orchestration.subagent_manager.as_ref()?;
1767        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1768        let reg = self
1769            .skill_state
1770            .registry
1771            .read()
1772            .expect("registry read lock");
1773        match crate::subagent::filter_skills(&reg, &def.skills) {
1774            Ok(skills) => {
1775                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1776                if bodies.is_empty() {
1777                    None
1778                } else {
1779                    Some(bodies)
1780                }
1781            }
1782            Err(e) => {
1783                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1784                None
1785            }
1786        }
1787    }
1788
1789    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
1790    fn build_spawn_context(
1791        &self,
1792        cfg: &zeph_config::SubAgentConfig,
1793    ) -> crate::subagent::SpawnContext {
1794        crate::subagent::SpawnContext {
1795            parent_messages: self.extract_parent_messages(cfg),
1796            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1797            parent_provider_name: {
1798                let name = &self.runtime.active_provider_name;
1799                if name.is_empty() {
1800                    None
1801                } else {
1802                    Some(name.clone())
1803                }
1804            },
1805            spawn_depth: self.runtime.spawn_depth,
1806            mcp_tool_names: self.extract_mcp_tool_names(),
1807        }
1808    }
1809
1810    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
1811    ///
1812    /// Filters system messages, takes last `context_window_turns * 2` messages,
1813    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
1814    fn extract_parent_messages(
1815        &self,
1816        config: &zeph_config::SubAgentConfig,
1817    ) -> Vec<zeph_llm::provider::Message> {
1818        use zeph_llm::provider::Role;
1819        if config.context_window_turns == 0 {
1820            return Vec::new();
1821        }
1822        let non_system: Vec<_> = self
1823            .msg
1824            .messages
1825            .iter()
1826            .filter(|m| m.role != Role::System)
1827            .cloned()
1828            .collect();
1829        let take_count = config.context_window_turns * 2;
1830        let start = non_system.len().saturating_sub(take_count);
1831        let mut msgs = non_system[start..].to_vec();
1832
1833        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
1834        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
1835        let mut total_chars: usize = 0;
1836        let mut keep = msgs.len();
1837        for (i, m) in msgs.iter().enumerate() {
1838            total_chars += m.content.len();
1839            if total_chars > max_chars {
1840                keep = i;
1841                break;
1842            }
1843        }
1844        if keep < msgs.len() {
1845            tracing::info!(
1846                kept = keep,
1847                requested = config.context_window_turns * 2,
1848                "[subagent] truncated parent history from {} to {} turns due to token budget",
1849                config.context_window_turns * 2,
1850                keep
1851            );
1852            msgs.truncate(keep);
1853        }
1854        msgs
1855    }
1856
1857    /// Extract MCP tool names from the tool executor for diagnostic annotation.
1858    fn extract_mcp_tool_names(&self) -> Vec<String> {
1859        self.tool_executor
1860            .tool_definitions_erased()
1861            .into_iter()
1862            .filter(|t| t.id.starts_with("mcp_"))
1863            .map(|t| t.id.to_string())
1864            .collect()
1865    }
1866
1867    /// Update trust DB records for all reloaded skills.
1868    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
1869        let Some(ref memory) = self.memory_state.memory else {
1870            return;
1871        };
1872        let trust_cfg = self.skill_state.trust_config.clone();
1873        let managed_dir = self.skill_state.managed_dir.clone();
1874        for meta in all_meta {
1875            let source_kind = if managed_dir
1876                .as_ref()
1877                .is_some_and(|d| meta.skill_dir.starts_with(d))
1878            {
1879                zeph_memory::store::SourceKind::Hub
1880            } else {
1881                zeph_memory::store::SourceKind::Local
1882            };
1883            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
1884                &trust_cfg.default_level
1885            } else {
1886                &trust_cfg.local_level
1887            };
1888            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
1889                Ok(current_hash) => {
1890                    let existing = memory
1891                        .sqlite()
1892                        .load_skill_trust(&meta.name)
1893                        .await
1894                        .ok()
1895                        .flatten();
1896                    let trust_level_str = if let Some(ref row) = existing {
1897                        if row.blake3_hash == current_hash {
1898                            row.trust_level.clone()
1899                        } else {
1900                            trust_cfg.hash_mismatch_level.to_string()
1901                        }
1902                    } else {
1903                        initial_level.to_string()
1904                    };
1905                    let source_path = meta.skill_dir.to_str();
1906                    if let Err(e) = memory
1907                        .sqlite()
1908                        .upsert_skill_trust(
1909                            &meta.name,
1910                            &trust_level_str,
1911                            source_kind,
1912                            None,
1913                            source_path,
1914                            &current_hash,
1915                        )
1916                        .await
1917                    {
1918                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
1919                    }
1920                }
1921                Err(e) => {
1922                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
1923                }
1924            }
1925        }
1926    }
1927
1928    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
1929    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
1930        let provider = self.embedding_provider.clone();
1931        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
1932            let owned = text.to_owned();
1933            let p = provider.clone();
1934            Box::pin(async move { p.embed(&owned).await })
1935        };
1936
1937        let needs_inmemory_rebuild = !self
1938            .skill_state
1939            .matcher
1940            .as_ref()
1941            .is_some_and(SkillMatcherBackend::is_qdrant);
1942
1943        if needs_inmemory_rebuild {
1944            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
1945                .await
1946                .map(SkillMatcherBackend::InMemory);
1947        } else if let Some(ref mut backend) = self.skill_state.matcher {
1948            let _ = self.channel.send_status("syncing skill index...").await;
1949            if let Err(e) = backend
1950                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
1951                .await
1952            {
1953                tracing::warn!("failed to sync skill embeddings: {e:#}");
1954            }
1955        }
1956
1957        if self.skill_state.hybrid_search {
1958            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
1959            let _ = self.channel.send_status("rebuilding search index...").await;
1960            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
1961        }
1962    }
1963
1964    async fn reload_skills(&mut self) {
1965        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
1966        if new_registry.fingerprint()
1967            == self
1968                .skill_state
1969                .registry
1970                .read()
1971                .expect("registry read lock")
1972                .fingerprint()
1973        {
1974            return;
1975        }
1976        let _ = self.channel.send_status("reloading skills...").await;
1977        *self
1978            .skill_state
1979            .registry
1980            .write()
1981            .expect("registry write lock") = new_registry;
1982
1983        let all_meta = self
1984            .skill_state
1985            .registry
1986            .read()
1987            .expect("registry read lock")
1988            .all_meta()
1989            .into_iter()
1990            .cloned()
1991            .collect::<Vec<_>>();
1992
1993        self.update_trust_for_reloaded_skills(&all_meta).await;
1994
1995        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
1996        self.rebuild_skill_matcher(&all_meta_refs).await;
1997
1998        let all_skills: Vec<Skill> = {
1999            let reg = self
2000                .skill_state
2001                .registry
2002                .read()
2003                .expect("registry read lock");
2004            reg.all_meta()
2005                .iter()
2006                .filter_map(|m| reg.get_skill(&m.name).ok())
2007                .collect()
2008        };
2009        let trust_map = self.build_skill_trust_map().await;
2010        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2011        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
2012        self.skill_state
2013            .last_skills_prompt
2014            .clone_from(&skills_prompt);
2015        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
2016        if let Some(msg) = self.msg.messages.first_mut() {
2017            msg.content = system_prompt;
2018        }
2019
2020        let _ = self.channel.send_status("").await;
2021        tracing::info!(
2022            "reloaded {} skill(s)",
2023            self.skill_state
2024                .registry
2025                .read()
2026                .expect("registry read lock")
2027                .all_meta()
2028                .len()
2029        );
2030    }
2031
2032    fn reload_instructions(&mut self) {
2033        // Drain any additional queued events before reloading to avoid redundant reloads.
2034        if let Some(ref mut rx) = self.instructions.reload_rx {
2035            while rx.try_recv().is_ok() {}
2036        }
2037        let Some(ref state) = self.instructions.reload_state else {
2038            return;
2039        };
2040        let new_blocks = crate::instructions::load_instructions(
2041            &state.base_dir,
2042            &state.provider_kinds,
2043            &state.explicit_files,
2044            state.auto_detect,
2045        );
2046        let old_sources: std::collections::HashSet<_> =
2047            self.instructions.blocks.iter().map(|b| &b.source).collect();
2048        let new_sources: std::collections::HashSet<_> =
2049            new_blocks.iter().map(|b| &b.source).collect();
2050        for added in new_sources.difference(&old_sources) {
2051            tracing::info!(path = %added.display(), "instruction file added");
2052        }
2053        for removed in old_sources.difference(&new_sources) {
2054            tracing::info!(path = %removed.display(), "instruction file removed");
2055        }
2056        tracing::info!(
2057            old_count = self.instructions.blocks.len(),
2058            new_count = new_blocks.len(),
2059            "reloaded instruction files"
2060        );
2061        self.instructions.blocks = new_blocks;
2062    }
2063
2064    fn reload_config(&mut self) {
2065        let Some(ref path) = self.lifecycle.config_path else {
2066            return;
2067        };
2068        let config = match Config::load(path) {
2069            Ok(c) => c,
2070            Err(e) => {
2071                tracing::warn!("config reload failed: {e:#}");
2072                return;
2073            }
2074        };
2075
2076        self.runtime.security = config.security;
2077        self.runtime.timeouts = config.timeouts;
2078        self.runtime.redact_credentials = config.memory.redact_credentials;
2079        self.memory_state.history_limit = config.memory.history_limit;
2080        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
2081        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
2082        self.skill_state.max_active_skills = config.skills.max_active_skills;
2083        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2084        self.skill_state.min_injection_score = config.skills.min_injection_score;
2085        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2086        self.skill_state.hybrid_search = config.skills.hybrid_search;
2087        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
2088        self.skill_state.confusability_threshold =
2089            config.skills.confusability_threshold.clamp(0.0, 1.0);
2090        config
2091            .skills
2092            .generation_provider
2093            .as_str()
2094            .clone_into(&mut self.skill_state.generation_provider_name);
2095        self.skill_state.generation_output_dir =
2096            config.skills.generation_output_dir.as_deref().map(|p| {
2097                if let Some(stripped) = p.strip_prefix("~/") {
2098                    dirs::home_dir()
2099                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2100                } else {
2101                    std::path::PathBuf::from(p)
2102                }
2103            });
2104
2105        if config.memory.context_budget_tokens > 0 {
2106            self.context_manager.budget = Some(
2107                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
2108                    .with_graph_enabled(config.memory.graph.enabled),
2109            );
2110        } else {
2111            self.context_manager.budget = None;
2112        }
2113
2114        {
2115            let graph_cfg = &config.memory.graph;
2116            if graph_cfg.rpe.enabled {
2117                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2118                if self.memory_state.rpe_router.is_none() {
2119                    self.memory_state.rpe_router =
2120                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2121                            graph_cfg.rpe.threshold,
2122                            graph_cfg.rpe.max_skip_turns,
2123                        )));
2124                }
2125            } else {
2126                self.memory_state.rpe_router = None;
2127            }
2128            self.memory_state.graph_config = graph_cfg.clone();
2129        }
2130        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2131        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2132        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2133        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2134        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2135        self.context_manager.compression = config.memory.compression.clone();
2136        self.context_manager.routing = config.memory.store_routing.clone();
2137        // Resolve routing_classifier_provider from the provider pool (#2484).
2138        self.context_manager.store_routing_provider = if config
2139            .memory
2140            .store_routing
2141            .routing_classifier_provider
2142            .is_empty()
2143        {
2144            None
2145        } else {
2146            let resolved = self.resolve_background_provider(
2147                &config.memory.store_routing.routing_classifier_provider,
2148            );
2149            Some(std::sync::Arc::new(resolved))
2150        };
2151        self.memory_state.cross_session_score_threshold =
2152            config.memory.cross_session_score_threshold;
2153
2154        self.index.repo_map_tokens = config.index.repo_map_tokens;
2155        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2156
2157        tracing::info!("config reloaded");
2158    }
2159
2160    /// `/focus` slash command: display Focus Agent status.
2161    async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
2162        use std::fmt::Write;
2163        let mut out = String::from("Focus Agent status\n\n");
2164        let _ = writeln!(out, "Enabled:          {}", self.focus.config.enabled);
2165        let _ = writeln!(out, "Active session:   {}", self.focus.is_active());
2166        if let Some(ref scope) = self.focus.active_scope {
2167            let _ = writeln!(out, "Active scope:     {scope}");
2168        }
2169        let _ = writeln!(
2170            out,
2171            "Knowledge blocks: {}",
2172            self.focus.knowledge_blocks.len()
2173        );
2174        let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
2175        self.channel.send(&out).await?;
2176        Ok(())
2177    }
2178
2179    /// `/sidequest` slash command: display `SideQuest` eviction stats.
2180    async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
2181        use std::fmt::Write;
2182        let mut out = String::from("SideQuest status\n\n");
2183        let _ = writeln!(out, "Enabled:        {}", self.sidequest.config.enabled);
2184        let _ = writeln!(
2185            out,
2186            "Interval turns: {}",
2187            self.sidequest.config.interval_turns
2188        );
2189        let _ = writeln!(out, "Turn counter:   {}", self.sidequest.turn_counter);
2190        let _ = writeln!(out, "Passes run:     {}", self.sidequest.passes_run);
2191        let _ = writeln!(
2192            out,
2193            "Total evicted:  {} tool outputs",
2194            self.sidequest.total_evicted
2195        );
2196        self.channel.send(&out).await?;
2197        Ok(())
2198    }
2199
2200    /// Run `SideQuest` tool output eviction pass (#1885).
2201    ///
2202    /// PERF-1 fix: two-phase non-blocking design.
2203    ///
2204    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2205    /// validate and apply it immediately.
2206    ///
2207    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2208    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2209    /// next turn, so the current agent turn is never blocked by the LLM call.
2210    #[allow(clippy::too_many_lines)]
2211    fn maybe_sidequest_eviction(&mut self) {
2212        use zeph_llm::provider::{Message, MessageMetadata, Role};
2213
2214        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2215        // strategy — the two systems share the same pool of evictable tool outputs and can
2216        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2217        if self.sidequest.config.enabled {
2218            use crate::config::PruningStrategy;
2219            if !matches!(
2220                self.context_manager.compression.pruning_strategy,
2221                PruningStrategy::Reactive
2222            ) {
2223                tracing::warn!(
2224                    strategy = ?self.context_manager.compression.pruning_strategy,
2225                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2226                     consider disabling sidequest.enabled to avoid redundant eviction"
2227                );
2228            }
2229        }
2230
2231        // Guard: do not evict while a focus session is active.
2232        if self.focus.is_active() {
2233            tracing::debug!("sidequest: skipping — focus session active");
2234            // Drop any pending result — cursors may be stale relative to focus truncation.
2235            self.compression.pending_sidequest_result = None;
2236            return;
2237        }
2238
2239        // Phase 1: apply pending result from last turn's background LLM call.
2240        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2241            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2242            use futures::FutureExt as _;
2243            match handle.now_or_never() {
2244                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2245                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2246                    let freed = self.sidequest.apply_eviction(
2247                        &mut self.msg.messages,
2248                        &evicted_indices,
2249                        &self.metrics.token_counter,
2250                    );
2251                    if freed > 0 {
2252                        self.recompute_prompt_tokens();
2253                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2254                        // cooldown=0: eviction does not impose post-compaction cooldown.
2255                        self.context_manager.compaction =
2256                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2257                                cooldown: 0,
2258                            };
2259                        tracing::info!(
2260                            freed_tokens = freed,
2261                            evicted_cursors = evicted_indices.len(),
2262                            pass = self.sidequest.passes_run,
2263                            "sidequest eviction complete"
2264                        );
2265                        if let Some(ref d) = self.debug_state.debug_dumper {
2266                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2267                        }
2268                        if let Some(ref tx) = self.session.status_tx {
2269                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2270                        }
2271                    } else {
2272                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2273                        if let Some(ref tx) = self.session.status_tx {
2274                            let _ = tx.send(String::new());
2275                        }
2276                    }
2277                }
2278                Some(Ok(None | Some(_))) => {
2279                    tracing::debug!("sidequest: pending result: no cursors to evict");
2280                    if let Some(ref tx) = self.session.status_tx {
2281                        let _ = tx.send(String::new());
2282                    }
2283                }
2284                Some(Err(e)) => {
2285                    tracing::debug!("sidequest: background task panicked: {e}");
2286                    if let Some(ref tx) = self.session.status_tx {
2287                        let _ = tx.send(String::new());
2288                    }
2289                }
2290                None => {
2291                    // Task still running — re-store and wait another turn.
2292                    // We already took it; we'd need to re-spawn, but instead just drop and
2293                    // schedule fresh below to keep the cursor list current.
2294                    tracing::debug!(
2295                        "sidequest: background LLM task not yet complete, rescheduling"
2296                    );
2297                }
2298            }
2299        }
2300
2301        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2302        self.sidequest
2303            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2304
2305        if self.sidequest.tool_output_cursors.is_empty() {
2306            tracing::debug!("sidequest: no eligible cursors");
2307            return;
2308        }
2309
2310        let prompt = self.sidequest.build_eviction_prompt();
2311        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2312        let n_cursors = self.sidequest.tool_output_cursors.len();
2313        // Clone the provider so the spawn closure owns it without borrowing self.
2314        let provider = self.summary_or_primary_provider().clone();
2315
2316        // Spawn background task: the LLM call runs without blocking the agent loop.
2317        let handle = tokio::spawn(async move {
2318            let msgs = [Message {
2319                role: Role::User,
2320                content: prompt,
2321                parts: vec![],
2322                metadata: MessageMetadata::default(),
2323            }];
2324            let response =
2325                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2326                    .await
2327                {
2328                    Ok(Ok(r)) => r,
2329                    Ok(Err(e)) => {
2330                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2331                        return None;
2332                    }
2333                    Err(_) => {
2334                        tracing::debug!("sidequest bg: LLM call timed out");
2335                        return None;
2336                    }
2337                };
2338
2339            let start = response.find('{')?;
2340            let end = response.rfind('}')?;
2341            if start > end {
2342                return None;
2343            }
2344            let json_slice = &response[start..=end];
2345            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2346            let mut valid: Vec<usize> = parsed
2347                .del_cursors
2348                .into_iter()
2349                .filter(|&c| c < n_cursors)
2350                .collect();
2351            valid.sort_unstable();
2352            valid.dedup();
2353            #[allow(
2354                clippy::cast_precision_loss,
2355                clippy::cast_possible_truncation,
2356                clippy::cast_sign_loss
2357            )]
2358            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2359            valid.truncate(max_evict);
2360            Some(valid)
2361        });
2362
2363        self.compression.pending_sidequest_result = Some(handle);
2364        tracing::debug!("sidequest: background LLM eviction task spawned");
2365        if let Some(ref tx) = self.session.status_tx {
2366            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2367        }
2368    }
2369
2370    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2371    ///
2372    /// Called after each tool batch completes. The check is a single syscall and has
2373    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2374    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2375    pub(crate) async fn check_cwd_changed(&mut self) {
2376        let current = match std::env::current_dir() {
2377            Ok(p) => p,
2378            Err(e) => {
2379                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2380                return;
2381            }
2382        };
2383        if current == self.lifecycle.last_known_cwd {
2384            return;
2385        }
2386        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2387        self.session.env_context.working_dir = current.display().to_string();
2388
2389        tracing::info!(
2390            old = %old_cwd.display(),
2391            new = %current.display(),
2392            "working directory changed"
2393        );
2394
2395        let _ = self
2396            .channel
2397            .send_status("Working directory changed\u{2026}")
2398            .await;
2399
2400        let hooks = self.session.hooks_config.cwd_changed.clone();
2401        if !hooks.is_empty() {
2402            let mut env = std::collections::HashMap::new();
2403            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2404            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2405            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2406                tracing::warn!(error = %e, "CwdChanged hook failed");
2407            }
2408        }
2409
2410        let _ = self.channel.send_status("").await;
2411    }
2412
2413    /// Handle a `FileChangedEvent` from the file watcher.
2414    pub(crate) async fn handle_file_changed(
2415        &mut self,
2416        event: crate::file_watcher::FileChangedEvent,
2417    ) {
2418        tracing::info!(path = %event.path.display(), "file changed");
2419
2420        let _ = self
2421            .channel
2422            .send_status("Running file-change hook\u{2026}")
2423            .await;
2424
2425        let hooks = self.session.hooks_config.file_changed_hooks.clone();
2426        if !hooks.is_empty() {
2427            let mut env = std::collections::HashMap::new();
2428            env.insert(
2429                "ZEPH_CHANGED_PATH".to_owned(),
2430                event.path.display().to_string(),
2431            );
2432            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2433                tracing::warn!(error = %e, "FileChanged hook failed");
2434            }
2435        }
2436
2437        let _ = self.channel.send_status("").await;
2438    }
2439}
2440pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2441    while !*rx.borrow_and_update() {
2442        if rx.changed().await.is_err() {
2443            std::future::pending::<()>().await;
2444        }
2445    }
2446}
2447
2448pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2449    match rx {
2450        Some(inner) => {
2451            if let Some(v) = inner.recv().await {
2452                Some(v)
2453            } else {
2454                *rx = None;
2455                std::future::pending().await
2456            }
2457        }
2458        None => std::future::pending().await,
2459    }
2460}
2461
2462#[cfg(test)]
2463mod tests;
2464
2465#[cfg(test)]
2466pub(crate) use tests::agent_tests;