Skip to main content

zeph_core/agent/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7pub(super) mod compression_feedback;
8mod context;
9pub(crate) mod context_manager;
10mod corrections;
11pub mod error;
12mod experiment_cmd;
13pub(super) mod feedback_detector;
14pub(crate) mod focus;
15mod graph_commands;
16mod guidelines_commands;
17mod index;
18mod learning;
19pub(crate) mod learning_engine;
20mod log_commands;
21mod lsp_commands;
22mod mcp;
23mod memory_commands;
24mod message_queue;
25mod model_commands;
26mod persistence;
27mod plan;
28mod policy_commands;
29mod provider_cmd;
30pub(crate) mod rate_limiter;
31#[cfg(feature = "scheduler")]
32mod scheduler_commands;
33mod scheduler_loop;
34pub mod session_config;
35mod session_digest;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70use state::CompressionState;
71use state::{
72    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
73    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
74    RuntimeConfig, SecurityState, SessionState, SkillState,
75};
76
77pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
78pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
79pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
80pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
81pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
82pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
83pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
84pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
85pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
86pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
87/// Prefix used for LSP context messages (`Role::System`) injected into message history.
88/// The tool-pair summarizer targets User/Assistant pairs and skips System messages,
89/// so these notes are never accidentally summarized. `remove_lsp_messages` uses this
90/// prefix to clear stale notes before each fresh injection.
91pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
92pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
93
94pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
95    use std::fmt::Write;
96    let capacity = "[tool output: ".len()
97        + tool_name.len()
98        + "]\n```\n".len()
99        + body.len()
100        + TOOL_OUTPUT_SUFFIX.len();
101    let mut buf = String::with_capacity(capacity);
102    let _ = write!(
103        buf,
104        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
105    );
106    buf
107}
108
109pub struct Agent<C: Channel> {
110    provider: AnyProvider,
111    /// Dedicated embedding provider. Resolved once at bootstrap from `[[llm.providers]]`
112    /// (the entry with `embed = true`, or first entry with `embedding_model` set).
113    /// Falls back to `provider.clone()` when no dedicated entry exists.
114    /// **Never replaced** by `/provider switch`.
115    embedding_provider: AnyProvider,
116    channel: C,
117    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
118    pub(super) msg: MessageState,
119    pub(super) memory_state: MemoryState,
120    pub(super) skill_state: SkillState,
121    pub(super) context_manager: context_manager::ContextManager,
122    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
123    pub(super) learning_engine: learning_engine::LearningEngine,
124    pub(super) feedback: FeedbackState,
125    pub(super) runtime: RuntimeConfig,
126    pub(super) mcp: McpState,
127    pub(super) index: IndexState,
128    pub(super) session: SessionState,
129    pub(super) debug_state: DebugState,
130    pub(super) instructions: InstructionState,
131    pub(super) security: SecurityState,
132    pub(super) experiments: ExperimentState,
133    pub(super) compression: CompressionState,
134    pub(super) lifecycle: LifecycleState,
135    pub(super) providers: ProviderState,
136    pub(super) metrics: MetricsState,
137    pub(super) orchestration: OrchestrationState,
138    /// Focus agent state: active session tracking, knowledge block, reminder counters (#1850).
139    pub(super) focus: focus::FocusState,
140    /// `SideQuest` state: cursor tracking, turn counter, eviction stats (#1885).
141    pub(super) sidequest: sidequest::SidequestState,
142    /// Dynamic tool schema filter: pre-computed tool embeddings for per-turn filtering (#2020).
143    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
144    /// Cached filtered tool IDs for the current user turn. Set by `compute_filtered_tool_ids()`
145    /// in `rebuild_system_prompt()`, consumed by the native tool loop on iteration 0.
146    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
147    /// Tool dependency graph for sequential tool availability (issue #2024).
148    /// Built once from config, applied per-turn after tool schema filtering.
149    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
150    /// Always-on tool IDs, mirrored from the tool schema filter for dependency gate bypass.
151    pub(super) dependency_always_on: HashSet<String>,
152    /// Tool IDs that completed successfully in the current session.
153    /// Grows monotonically per session; cleared on `/clear`.
154    /// NOTE: bounded by session length, typically < 1000 entries.
155    pub(super) completed_tool_ids: HashSet<String>,
156    /// DB row ID of the most recently persisted message. Set by `persist_message`;
157    /// consumed by `push_message` call sites to populate `metadata.db_id` on in-memory messages.
158    pub(super) last_persisted_message_id: Option<i64>,
159    /// DB message IDs pending hide after deferred tool pair summarization.
160    pub(super) deferred_db_hide_ids: Vec<i64>,
161    /// Summary texts pending insertion after deferred tool pair summarization.
162    pub(super) deferred_db_summaries: Vec<String>,
163    /// Runtime middleware layers for LLM calls and tool dispatch (#2286).
164    ///
165    /// Default: empty vec (zero-cost — loops never iterate).
166    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
167    /// Current tool loop iteration index within the active user turn. Reset to 0 at turn start,
168    /// incremented each iteration. Used to compute remaining tool call budget for `BudgetHint` (#2267).
169    pub(super) current_tool_iteration: usize,
170}
171
172impl<C: Channel> Agent<C> {
173    #[must_use]
174    pub fn new(
175        provider: AnyProvider,
176        channel: C,
177        registry: SkillRegistry,
178        matcher: Option<SkillMatcherBackend>,
179        max_active_skills: usize,
180        tool_executor: impl ToolExecutor + 'static,
181    ) -> Self {
182        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
183        Self::new_with_registry_arc(
184            provider,
185            channel,
186            registry,
187            matcher,
188            max_active_skills,
189            tool_executor,
190        )
191    }
192
193    /// Create an agent from a pre-wrapped registry Arc, allowing the caller to
194    /// share the same Arc with other components (e.g. [`crate::SkillLoaderExecutor`]).
195    ///
196    /// # Panics
197    ///
198    /// Panics if the registry `RwLock` is poisoned.
199    #[must_use]
200    #[allow(clippy::too_many_lines)] // flat struct literal initializing all Agent sub-structs — one field per sub-struct, cannot be split further
201    pub fn new_with_registry_arc(
202        provider: AnyProvider,
203        channel: C,
204        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
205        matcher: Option<SkillMatcherBackend>,
206        max_active_skills: usize,
207        tool_executor: impl ToolExecutor + 'static,
208    ) -> Self {
209        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
210        let all_skills: Vec<Skill> = {
211            let reg = registry.read().expect("registry read lock poisoned");
212            reg.all_meta()
213                .iter()
214                .filter_map(|m| reg.get_skill(&m.name).ok())
215                .collect()
216        };
217        let empty_trust = HashMap::new();
218        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
219        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
220        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
221        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
222        tracing::trace!(prompt = %system_prompt, "full system prompt");
223
224        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
225        let (_tx, rx) = watch::channel(false);
226        let token_counter = Arc::new(TokenCounter::new());
227        // Always create the receiver side of the experiment notification channel so the
228        // select! branch in the agent loop compiles unconditionally. The sender is only
229        // stored when the experiments feature is enabled (it is only used in experiment_cmd.rs).
230        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
231        let embedding_provider = provider.clone();
232        Self {
233            provider,
234            embedding_provider,
235            channel,
236            tool_executor: Arc::new(tool_executor),
237            msg: MessageState {
238                messages: vec![Message {
239                    role: Role::System,
240                    content: system_prompt,
241                    parts: vec![],
242                    metadata: MessageMetadata::default(),
243                }],
244                message_queue: VecDeque::new(),
245                pending_image_parts: Vec::new(),
246            },
247            memory_state: MemoryState {
248                memory: None,
249                conversation_id: None,
250                history_limit: 50,
251                recall_limit: 5,
252                summarization_threshold: 50,
253                cross_session_score_threshold: 0.35,
254                autosave_assistant: false,
255                autosave_min_length: 20,
256                tool_call_cutoff: 6,
257                unsummarized_count: 0,
258                document_config: crate::config::DocumentConfig::default(),
259                graph_config: crate::config::GraphConfig::default(),
260                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
261                shutdown_summary: true,
262                shutdown_summary_min_messages: 4,
263                shutdown_summary_max_messages: 20,
264                shutdown_summary_timeout_secs: 10,
265                structured_summaries: false,
266                last_recall_confidence: None,
267                digest_config: crate::config::DigestConfig::default(),
268                cached_session_digest: None,
269                context_strategy: crate::config::ContextStrategy::default(),
270                crossover_turn_threshold: 20,
271                rpe_router: None,
272                goal_text: None,
273            },
274            skill_state: SkillState {
275                registry,
276                skill_paths: Vec::new(),
277                managed_dir: None,
278                trust_config: crate::config::TrustConfig::default(),
279                matcher,
280                max_active_skills,
281                disambiguation_threshold: 0.20,
282                min_injection_score: 0.20,
283                embedding_model: String::new(),
284                skill_reload_rx: None,
285                active_skill_names: Vec::new(),
286                last_skills_prompt: skills_prompt,
287                prompt_mode: SkillPromptMode::Auto,
288                available_custom_secrets: HashMap::new(),
289                cosine_weight: 0.7,
290                hybrid_search: false,
291                bm25_index: None,
292                two_stage_matching: false,
293                confusability_threshold: 0.0,
294                rl_head: None,
295                rl_weight: 0.3,
296                rl_warmup_updates: 50,
297                generation_output_dir: None,
298                generation_provider_name: String::new(),
299            },
300            context_manager: context_manager::ContextManager::new(),
301            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
302            learning_engine: learning_engine::LearningEngine::new(),
303            feedback: FeedbackState {
304                detector: feedback_detector::FeedbackDetector::new(0.6),
305                judge: None,
306                llm_classifier: None,
307            },
308            debug_state: DebugState {
309                debug_dumper: None,
310                dump_format: crate::debug_dump::DumpFormat::default(),
311                trace_collector: None,
312                iteration_counter: 0,
313                anomaly_detector: None,
314                reasoning_model_warning: true,
315                logging_config: crate::config::LoggingConfig::default(),
316                dump_dir: None,
317                trace_service_name: String::new(),
318                trace_redact: true,
319                current_iteration_span_id: None,
320            },
321            runtime: RuntimeConfig {
322                security: SecurityConfig::default(),
323                timeouts: TimeoutConfig::default(),
324                model_name: String::new(),
325                active_provider_name: String::new(),
326                permission_policy: zeph_tools::PermissionPolicy::default(),
327                redact_credentials: true,
328                rate_limiter: rate_limiter::ToolRateLimiter::new(
329                    rate_limiter::RateLimitConfig::default(),
330                ),
331                semantic_cache_enabled: false,
332                semantic_cache_threshold: 0.95,
333                semantic_cache_max_candidates: 10,
334                dependency_config: zeph_tools::DependencyConfig::default(),
335                adversarial_policy_info: None,
336                spawn_depth: 0,
337                budget_hint_enabled: true,
338                channel_skills: zeph_config::ChannelSkillsConfig::default(),
339            },
340            mcp: McpState {
341                tools: Vec::new(),
342                registry: None,
343                manager: None,
344                allowed_commands: Vec::new(),
345                max_dynamic: 10,
346                elicitation_rx: None,
347                shared_tools: None,
348                tool_rx: None,
349                server_outcomes: Vec::new(),
350                pruning_cache: zeph_mcp::PruningCache::new(),
351                pruning_provider: None,
352                pruning_enabled: false,
353                pruning_params: zeph_mcp::PruningParams::default(),
354                semantic_index: None,
355                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
356                discovery_params: zeph_mcp::DiscoveryParams::default(),
357                discovery_provider: None,
358                elicitation_warn_sensitive_fields: true,
359            },
360            index: IndexState {
361                retriever: None,
362                repo_map_tokens: 0,
363                cached_repo_map: None,
364                repo_map_ttl: std::time::Duration::from_secs(300),
365            },
366            session: SessionState {
367                env_context: EnvironmentContext::gather(""),
368                response_cache: None,
369                parent_tool_use_id: None,
370                status_tx: None,
371                lsp_hooks: None,
372                policy_config: None,
373                hooks_config: state::HooksConfigSnapshot::default(),
374            },
375            instructions: InstructionState {
376                blocks: Vec::new(),
377                reload_rx: None,
378                reload_state: None,
379            },
380            security: SecurityState {
381                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
382                quarantine_summarizer: None,
383                is_acp_session: false,
384                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
385                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
386                ),
387                flagged_urls: std::collections::HashSet::new(),
388                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
389                    std::collections::HashSet::new(),
390                )),
391                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
392                    zeph_sanitizer::pii::PiiFilterConfig::default(),
393                ),
394                #[cfg(feature = "classifiers")]
395                pii_ner_backend: None,
396                #[cfg(feature = "classifiers")]
397                pii_ner_timeout_ms: 5000,
398                #[cfg(feature = "classifiers")]
399                pii_ner_max_chars: 8192,
400                #[cfg(feature = "classifiers")]
401                pii_ner_circuit_breaker_threshold: 2,
402                #[cfg(feature = "classifiers")]
403                pii_ner_consecutive_timeouts: 0,
404                #[cfg(feature = "classifiers")]
405                pii_ner_tripped: false,
406                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
407                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
408                ),
409                guardrail: None,
410                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
411                    zeph_config::ResponseVerificationConfig::default(),
412                ),
413                causal_analyzer: None,
414            },
415            experiments: ExperimentState {
416                config: crate::config::ExperimentConfig::default(),
417                cancel: None,
418                baseline: crate::experiments::ConfigSnapshot::default(),
419                eval_provider: None,
420                notify_rx: Some(exp_notify_rx),
421                notify_tx: exp_notify_tx,
422            },
423            compression: CompressionState {
424                current_task_goal: None,
425                task_goal_user_msg_hash: None,
426                pending_task_goal: None,
427                pending_sidequest_result: None,
428                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
429                pending_subgoal: None,
430                subgoal_user_msg_hash: None,
431            },
432            lifecycle: LifecycleState {
433                shutdown: rx,
434                start_time: Instant::now(),
435                cancel_signal: Arc::new(Notify::new()),
436                cancel_token: CancellationToken::new(),
437                config_path: None,
438                config_reload_rx: None,
439                warmup_ready: None,
440                update_notify_rx: None,
441                custom_task_rx: None,
442                last_known_cwd: std::env::current_dir().unwrap_or_default(),
443                file_changed_rx: None,
444                file_watcher: None,
445            },
446            providers: ProviderState {
447                summary_provider: None,
448                provider_override: None,
449                judge_provider: None,
450                probe_provider: None,
451                compress_provider: None,
452                cached_prompt_tokens: initial_prompt_tokens,
453                server_compaction_active: false,
454                stt: None,
455                provider_pool: Vec::new(),
456                provider_config_snapshot: None,
457            },
458            metrics: MetricsState {
459                metrics_tx: None,
460                cost_tracker: None,
461                token_counter,
462                extended_context: false,
463                classifier_metrics: None,
464            },
465            orchestration: OrchestrationState {
466                planner_provider: None,
467                verify_provider: None,
468                pending_graph: None,
469                plan_cancel_token: None,
470                subagent_manager: None,
471                subagent_config: crate::config::SubAgentConfig::default(),
472                orchestration_config: crate::config::OrchestrationConfig::default(),
473                plan_cache: None,
474                pending_goal_embedding: None,
475            },
476            focus: focus::FocusState::default(),
477            sidequest: sidequest::SidequestState::default(),
478            tool_schema_filter: None,
479            cached_filtered_tool_ids: None,
480            dependency_graph: None,
481            dependency_always_on: HashSet::new(),
482            completed_tool_ids: HashSet::new(),
483            last_persisted_message_id: None,
484            deferred_db_hide_ids: Vec::new(),
485            deferred_db_summaries: Vec::new(),
486            runtime_layers: Vec::new(),
487            current_tool_iteration: 0,
488        }
489    }
490
491    /// Poll all active sub-agents for completed/failed/canceled results.
492    ///
493    /// Non-blocking: returns immediately with a list of `(task_id, result)` pairs
494    /// for agents that have finished. Each completed agent is removed from the manager.
495    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
496        let Some(mgr) = &mut self.orchestration.subagent_manager else {
497            return vec![];
498        };
499
500        let finished: Vec<String> = mgr
501            .statuses()
502            .into_iter()
503            .filter_map(|(id, status)| {
504                if matches!(
505                    status.state,
506                    crate::subagent::SubAgentState::Completed
507                        | crate::subagent::SubAgentState::Failed
508                        | crate::subagent::SubAgentState::Canceled
509                ) {
510                    Some(id)
511                } else {
512                    None
513                }
514            })
515            .collect();
516
517        let mut results = vec![];
518        for task_id in finished {
519            match mgr.collect(&task_id).await {
520                Ok(result) => results.push((task_id, result)),
521                Err(e) => {
522                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
523                }
524            }
525        }
526        results
527    }
528
529    /// Call the LLM to generate a structured session summary with a configurable timeout.
530    ///
531    /// Falls back to plain-text chat if structured output fails or times out. Returns `None` on
532    /// any failure, logging a warning — callers must treat `None` as "skip storage".
533    ///
534    /// Each LLM attempt is bounded by `shutdown_summary_timeout_secs`; in the worst case
535    /// (structured call times out and plain-text fallback also times out) this adds up to
536    /// `2 * shutdown_summary_timeout_secs` of shutdown latency.
537    async fn call_llm_for_session_summary(
538        &self,
539        chat_messages: &[Message],
540    ) -> Option<zeph_memory::StructuredSummary> {
541        let timeout_dur =
542            std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
543        match tokio::time::timeout(
544            timeout_dur,
545            self.provider
546                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
547        )
548        .await
549        {
550            Ok(Ok(s)) => Some(s),
551            Ok(Err(e)) => {
552                tracing::warn!(
553                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
554                );
555                self.plain_text_summary_fallback(chat_messages, timeout_dur)
556                    .await
557            }
558            Err(_) => {
559                tracing::warn!(
560                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
561                    self.memory_state.shutdown_summary_timeout_secs
562                );
563                self.plain_text_summary_fallback(chat_messages, timeout_dur)
564                    .await
565            }
566        }
567    }
568
569    async fn plain_text_summary_fallback(
570        &self,
571        chat_messages: &[Message],
572        timeout_dur: std::time::Duration,
573    ) -> Option<zeph_memory::StructuredSummary> {
574        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
575            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
576                summary: plain,
577                key_facts: vec![],
578                entities: vec![],
579            }),
580            Ok(Err(e)) => {
581                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
582                None
583            }
584            Err(_) => {
585                tracing::warn!("shutdown summary: plain LLM fallback timed out");
586                None
587            }
588        }
589    }
590
591    /// Persist tombstone `ToolResult` messages for any assistant `ToolUse` parts that were written
592    /// to the DB during this session but never paired with a `ToolResult` (e.g. because stdin
593    /// closed while tool execution was in progress). Without this the next session startup strips
594    /// those assistant messages and emits orphan warnings.
595    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
596        use zeph_llm::provider::{MessagePart, Role};
597
598        // Walk messages in reverse: if the last assistant message (ignoring any trailing
599        // system messages) has ToolUse parts and is NOT immediately followed by a user
600        // message whose ToolResult ids cover those ToolUse ids, persist tombstones.
601        let msgs = &self.msg.messages;
602        // Find last assistant message index.
603        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
604            return;
605        };
606        let asst_msg = &msgs[asst_idx];
607        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
608            .parts
609            .iter()
610            .filter_map(|p| {
611                if let MessagePart::ToolUse { id, name, input } = p {
612                    Some((id.as_str(), name.as_str(), input))
613                } else {
614                    None
615                }
616            })
617            .collect();
618        if tool_use_ids.is_empty() {
619            return;
620        }
621
622        // Check whether a following user message already pairs all ToolUse ids.
623        let paired_ids: std::collections::HashSet<&str> = msgs
624            .get(asst_idx + 1..)
625            .into_iter()
626            .flatten()
627            .filter(|m| m.role == Role::User)
628            .flat_map(|m| m.parts.iter())
629            .filter_map(|p| {
630                if let MessagePart::ToolResult { tool_use_id, .. } = p {
631                    Some(tool_use_id.as_str())
632                } else {
633                    None
634                }
635            })
636            .collect();
637
638        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
639            .iter()
640            .filter(|(id, _, _)| !paired_ids.contains(*id))
641            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
642                id: (*id).to_owned(),
643                name: (*name).to_owned(),
644                input: (*input).clone(),
645            })
646            .collect();
647
648        if unpaired.is_empty() {
649            return;
650        }
651
652        tracing::info!(
653            count = unpaired.len(),
654            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
655        );
656        self.persist_cancelled_tool_results(&unpaired).await;
657    }
658
659    /// Generate and store a lightweight session summary at shutdown when no hard compaction fired.
660    ///
661    /// Guards:
662    /// - `shutdown_summary` config must be enabled
663    /// - `conversation_id` must be set (memory must be attached)
664    /// - no existing session summary in the store (primary guard — resilient to failed Qdrant writes)
665    /// - at least `shutdown_summary_min_messages` user-turn messages in history
666    ///
667    /// All errors are logged as warnings and swallowed — shutdown must never fail.
668    async fn maybe_store_shutdown_summary(&mut self) {
669        if !self.memory_state.shutdown_summary {
670            return;
671        }
672        let Some(memory) = self.memory_state.memory.clone() else {
673            return;
674        };
675        let Some(conversation_id) = self.memory_state.conversation_id else {
676            return;
677        };
678
679        // Primary guard: check if a summary already exists (handles failed Qdrant writes too).
680        match memory.has_session_summary(conversation_id).await {
681            Ok(true) => {
682                tracing::debug!("shutdown summary: session already has a summary, skipping");
683                return;
684            }
685            Ok(false) => {}
686            Err(e) => {
687                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
688                return;
689            }
690        }
691
692        // Count user-turn messages only (skip system prompt at index 0).
693        let user_count = self
694            .msg
695            .messages
696            .iter()
697            .skip(1)
698            .filter(|m| m.role == Role::User)
699            .count();
700        if user_count < self.memory_state.shutdown_summary_min_messages {
701            tracing::debug!(
702                user_count,
703                min = self.memory_state.shutdown_summary_min_messages,
704                "shutdown summary: too few user messages, skipping"
705            );
706            return;
707        }
708
709        // TUI status — send errors silently ignored (TUI may already be gone at shutdown).
710        let _ = self.channel.send_status("Saving session summary...").await;
711
712        // Collect last N messages (skip system prompt at index 0).
713        let max = self.memory_state.shutdown_summary_max_messages;
714        if max == 0 {
715            tracing::debug!("shutdown summary: max_messages=0, skipping");
716            return;
717        }
718        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
719        let slice = if non_system.len() > max {
720            &non_system[non_system.len() - max..]
721        } else {
722            &non_system[..]
723        };
724
725        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
726            .iter()
727            .map(|m| {
728                let role = match m.role {
729                    Role::User => "user".to_owned(),
730                    Role::Assistant => "assistant".to_owned(),
731                    Role::System => "system".to_owned(),
732                };
733                (zeph_memory::MessageId(0), role, m.content.clone())
734            })
735            .collect();
736
737        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
738        let chat_messages = vec![Message {
739            role: Role::User,
740            content: prompt,
741            parts: vec![],
742            metadata: MessageMetadata::default(),
743        }];
744
745        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
746            let _ = self.channel.send_status("").await;
747            return;
748        };
749
750        if let Err(e) = memory
751            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
752            .await
753        {
754            tracing::warn!("shutdown summary: storage failed: {e:#}");
755        } else {
756            tracing::info!(
757                conversation_id = conversation_id.0,
758                "shutdown summary stored"
759            );
760        }
761
762        // Clear TUI status.
763        let _ = self.channel.send_status("").await;
764    }
765
766    pub async fn shutdown(&mut self) {
767        self.channel.send("Shutting down...").await.ok();
768
769        // CRIT-1: persist Thompson state accumulated during this session.
770        self.provider.save_router_state();
771
772        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
773            mgr.shutdown_all();
774        }
775
776        if let Some(ref manager) = self.mcp.manager {
777            manager.shutdown_all_shared().await;
778        }
779
780        // Finalize compaction trajectory: push the last open segment into the Vec.
781        // This segment would otherwise only be pushed when the next hard compaction fires,
782        // which never happens at session end.
783        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
784            self.update_metrics(|m| {
785                m.compaction_turns_after_hard.push(turns);
786            });
787            self.context_manager.turns_since_last_hard_compaction = None;
788        }
789
790        if let Some(ref tx) = self.metrics.metrics_tx {
791            let m = tx.borrow();
792            if m.filter_applications > 0 {
793                #[allow(clippy::cast_precision_loss)]
794                let pct = if m.filter_raw_tokens > 0 {
795                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
796                } else {
797                    0.0
798                };
799                tracing::info!(
800                    raw_tokens = m.filter_raw_tokens,
801                    saved_tokens = m.filter_saved_tokens,
802                    applications = m.filter_applications,
803                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
804                    m.filter_saved_tokens,
805                );
806            }
807            if m.compaction_hard_count > 0 {
808                tracing::info!(
809                    hard_compactions = m.compaction_hard_count,
810                    turns_after_hard = ?m.compaction_turns_after_hard,
811                    "hard compaction trajectory"
812                );
813            }
814        }
815
816        // Flush tombstone ToolResults for any assistant ToolUse that was persisted but never
817        // paired with a ToolResult (e.g. stdin EOF mid-execution). Without this the next session
818        // startup strips the orphaned ToolUse and emits warnings.
819        self.flush_orphaned_tool_use_on_shutdown().await;
820
821        self.maybe_store_shutdown_summary().await;
822        self.maybe_store_session_digest().await;
823
824        tracing::info!("agent shutdown complete");
825    }
826
827    /// Run the chat loop, receiving messages via the channel until EOF or shutdown.
828    ///
829    /// # Errors
830    ///
831    /// Returns an error if channel I/O or LLM communication fails.
832    /// Refresh sub-agent metrics snapshot for the TUI metrics panel.
833    fn refresh_subagent_metrics(&mut self) {
834        let Some(ref mgr) = self.orchestration.subagent_manager else {
835            return;
836        };
837        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
838            .statuses()
839            .into_iter()
840            .map(|(id, s)| {
841                let def = mgr.agents_def(&id);
842                crate::metrics::SubAgentMetrics {
843                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
844                    id: id.clone(),
845                    state: format!("{:?}", s.state).to_lowercase(),
846                    turns_used: s.turns_used,
847                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
848                    background: def.is_some_and(|d| d.permissions.background),
849                    elapsed_secs: s.started_at.elapsed().as_secs(),
850                    permission_mode: def.map_or_else(String::new, |d| {
851                        use crate::subagent::def::PermissionMode;
852                        match d.permissions.permission_mode {
853                            PermissionMode::Default => String::new(),
854                            PermissionMode::AcceptEdits => "accept_edits".into(),
855                            PermissionMode::DontAsk => "dont_ask".into(),
856                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
857                            PermissionMode::Plan => "plan".into(),
858                        }
859                    }),
860                    transcript_dir: mgr
861                        .agent_transcript_dir(&id)
862                        .map(|p| p.to_string_lossy().into_owned()),
863                }
864            })
865            .collect();
866        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
867    }
868
869    /// Non-blocking poll: notify the user when background sub-agents complete.
870    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
871        let completed = self.poll_subagents().await;
872        for (task_id, result) in completed {
873            let notice = if result.is_empty() {
874                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
875            } else {
876                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
877            };
878            if let Err(e) = self.channel.send(&notice).await {
879                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
880            }
881        }
882        Ok(())
883    }
884
885    /// Run the agent main loop.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the channel, LLM provider, or tool execution encounters a fatal error.
890    pub async fn run(&mut self) -> Result<(), error::AgentError> {
891        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
892            && !*rx.borrow()
893        {
894            let _ = rx.changed().await;
895            if !*rx.borrow() {
896                tracing::warn!("model warmup did not complete successfully");
897            }
898        }
899
900        // Load the session digest once at session start for context injection.
901        self.load_and_cache_session_digest().await;
902
903        loop {
904            // Apply any pending provider override (from ACP set_session_config_option).
905            if let Some(ref slot) = self.providers.provider_override
906                && let Some(new_provider) = slot
907                    .write()
908                    .unwrap_or_else(std::sync::PoisonError::into_inner)
909                    .take()
910            {
911                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
912                self.provider = new_provider;
913            }
914
915            // Poll for MCP tool list updates from tools/list_changed notifications.
916            self.check_tool_refresh().await;
917
918            // Process any pending MCP elicitation requests from MCP servers.
919            self.process_pending_elicitations().await;
920
921            // Refresh sub-agent status in metrics before polling.
922            self.refresh_subagent_metrics();
923
924            // Non-blocking poll: notify user when background sub-agents complete.
925            self.notify_completed_subagents().await?;
926
927            self.drain_channel();
928
929            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
930                self.notify_queue_count().await;
931                if queued.raw_attachments.is_empty() {
932                    (queued.text, queued.image_parts)
933                } else {
934                    let msg = crate::channel::ChannelMessage {
935                        text: queued.text,
936                        attachments: queued.raw_attachments,
937                    };
938                    self.resolve_message(msg).await
939                }
940            } else {
941                let incoming = tokio::select! {
942                    result = self.channel.recv() => result?,
943                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
944                        tracing::info!("shutting down");
945                        break;
946                    }
947                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
948                        self.reload_skills().await;
949                        continue;
950                    }
951                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
952                        self.reload_instructions();
953                        continue;
954                    }
955                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
956                        self.reload_config();
957                        continue;
958                    }
959                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
960                        if let Err(e) = self.channel.send(&msg).await {
961                            tracing::warn!("failed to send update notification: {e}");
962                        }
963                        continue;
964                    }
965                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
966                        // Experiment engine completed (ok or err). Clear the cancel token so
967                        // status reports idle and new experiments can be started.
968                        { self.experiments.cancel = None; }
969                        if let Err(e) = self.channel.send(&msg).await {
970                            tracing::warn!("failed to send experiment completion: {e}");
971                        }
972                        continue;
973                    }
974                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
975                        tracing::info!("scheduler: injecting custom task as agent turn");
976                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
977                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
978                    }
979                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
980                        self.handle_file_changed(event).await;
981                        continue;
982                    }
983                };
984                let Some(msg) = incoming else { break };
985                self.drain_channel();
986                self.resolve_message(msg).await
987            };
988
989            let trimmed = text.trim();
990
991            match self.handle_builtin_command(trimmed).await? {
992                Some(true) => break,
993                Some(false) => continue,
994                None => {}
995            }
996
997            self.process_user_message(text, image_parts).await?;
998        }
999
1000        // Flush trace collector on normal exit (C-04: Drop handles error/panic paths).
1001        if let Some(ref mut tc) = self.debug_state.trace_collector {
1002            tc.finish();
1003        }
1004
1005        Ok(())
1006    }
1007
1008    /// Handle `/debug-dump` and `/debug-dump <path>` commands.
1009    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1010        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1011        if arg.is_empty() {
1012            match &self.debug_state.debug_dumper {
1013                Some(d) => {
1014                    let _ = self
1015                        .channel
1016                        .send(&format!("Debug dump active: {}", d.dir().display()))
1017                        .await;
1018                }
1019                None => {
1020                    let _ = self
1021                        .channel
1022                        .send(
1023                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1024                             or start with `--debug-dump [dir]`.",
1025                        )
1026                        .await;
1027                }
1028            }
1029            return;
1030        }
1031        let dir = std::path::PathBuf::from(arg);
1032        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1033            Ok(dumper) => {
1034                let path = dumper.dir().display().to_string();
1035                self.debug_state.debug_dumper = Some(dumper);
1036                let _ = self
1037                    .channel
1038                    .send(&format!("Debug dump enabled: {path}"))
1039                    .await;
1040            }
1041            Err(e) => {
1042                let _ = self
1043                    .channel
1044                    .send(&format!("Failed to enable debug dump: {e}"))
1045                    .await;
1046            }
1047        }
1048    }
1049
1050    /// Handle `/dump-format <json|raw|trace>` command — switch debug dump format at runtime.
1051    async fn handle_dump_format_command(&mut self, trimmed: &str) {
1052        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
1053        if arg.is_empty() {
1054            let _ = self
1055                .channel
1056                .send(&format!(
1057                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
1058                    self.debug_state.dump_format
1059                ))
1060                .await;
1061            return;
1062        }
1063        let new_format = match arg {
1064            "json" => crate::debug_dump::DumpFormat::Json,
1065            "raw" => crate::debug_dump::DumpFormat::Raw,
1066            "trace" => crate::debug_dump::DumpFormat::Trace,
1067            other => {
1068                let _ = self
1069                    .channel
1070                    .send(&format!(
1071                        "Unknown format '{other}'. Valid values: json, raw, trace."
1072                    ))
1073                    .await;
1074                return;
1075            }
1076        };
1077        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
1078        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
1079
1080        // CR-04: when switching TO trace, create a fresh TracingCollector.
1081        if now_trace
1082            && !was_trace
1083            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
1084        {
1085            let service_name = self.debug_state.trace_service_name.clone();
1086            let redact = self.debug_state.trace_redact;
1087            match crate::debug_dump::trace::TracingCollector::new(
1088                dump_dir.as_path(),
1089                &service_name,
1090                redact,
1091                None,
1092            ) {
1093                Ok(collector) => {
1094                    self.debug_state.trace_collector = Some(collector);
1095                }
1096                Err(e) => {
1097                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
1098                }
1099            }
1100        }
1101        // CR-04: when switching AWAY from trace, flush and drop the collector.
1102        if was_trace
1103            && !now_trace
1104            && let Some(mut tc) = self.debug_state.trace_collector.take()
1105        {
1106            tc.finish();
1107        }
1108
1109        self.debug_state.dump_format = new_format;
1110        let _ = self
1111            .channel
1112            .send(&format!("Debug dump format set to: {arg}"))
1113            .await;
1114    }
1115
1116    async fn resolve_message(
1117        &self,
1118        msg: crate::channel::ChannelMessage,
1119    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1120        use crate::channel::{Attachment, AttachmentKind};
1121        use zeph_llm::provider::{ImageData, MessagePart};
1122
1123        let text_base = msg.text.clone();
1124
1125        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1126            .attachments
1127            .into_iter()
1128            .partition(|a| a.kind == AttachmentKind::Audio);
1129
1130        tracing::debug!(
1131            audio = audio_attachments.len(),
1132            has_stt = self.providers.stt.is_some(),
1133            "resolve_message attachments"
1134        );
1135
1136        let text = if !audio_attachments.is_empty()
1137            && let Some(stt) = self.providers.stt.as_ref()
1138        {
1139            let mut transcribed_parts = Vec::new();
1140            for attachment in &audio_attachments {
1141                if attachment.data.len() > MAX_AUDIO_BYTES {
1142                    tracing::warn!(
1143                        size = attachment.data.len(),
1144                        max = MAX_AUDIO_BYTES,
1145                        "audio attachment exceeds size limit, skipping"
1146                    );
1147                    continue;
1148                }
1149                match stt
1150                    .transcribe(&attachment.data, attachment.filename.as_deref())
1151                    .await
1152                {
1153                    Ok(result) => {
1154                        tracing::info!(
1155                            len = result.text.len(),
1156                            language = ?result.language,
1157                            "audio transcribed"
1158                        );
1159                        transcribed_parts.push(result.text);
1160                    }
1161                    Err(e) => {
1162                        tracing::error!(error = %e, "audio transcription failed");
1163                    }
1164                }
1165            }
1166            if transcribed_parts.is_empty() {
1167                text_base
1168            } else {
1169                let transcribed = transcribed_parts.join("\n");
1170                if text_base.is_empty() {
1171                    transcribed
1172                } else {
1173                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1174                }
1175            }
1176        } else {
1177            if !audio_attachments.is_empty() {
1178                tracing::warn!(
1179                    count = audio_attachments.len(),
1180                    "audio attachments received but no STT provider configured, dropping"
1181                );
1182            }
1183            text_base
1184        };
1185
1186        let mut image_parts = Vec::new();
1187        for attachment in image_attachments {
1188            if attachment.data.len() > MAX_IMAGE_BYTES {
1189                tracing::warn!(
1190                    size = attachment.data.len(),
1191                    max = MAX_IMAGE_BYTES,
1192                    "image attachment exceeds size limit, skipping"
1193                );
1194                continue;
1195            }
1196            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1197            image_parts.push(MessagePart::Image(Box::new(ImageData {
1198                data: attachment.data,
1199                mime_type,
1200            })));
1201        }
1202
1203        (text, image_parts)
1204    }
1205
1206    async fn process_user_message(
1207        &mut self,
1208        text: String,
1209        image_parts: Vec<zeph_llm::provider::MessagePart>,
1210    ) -> Result<(), error::AgentError> {
1211        // Record iteration start in trace collector (C-02: owned guard, no borrow held).
1212        let iteration_index = self.debug_state.iteration_counter;
1213        self.debug_state.iteration_counter += 1;
1214        if let Some(ref mut tc) = self.debug_state.trace_collector {
1215            tc.begin_iteration(iteration_index, text.trim());
1216            // CR-01: store the span ID so LLM/tool execution can attach child spans.
1217            self.debug_state.current_iteration_span_id =
1218                tc.current_iteration_span_id(iteration_index);
1219        }
1220
1221        let result = self
1222            .process_user_message_inner(text, image_parts, iteration_index)
1223            .await;
1224
1225        // Close iteration span regardless of outcome (partial trace preserved on error).
1226        if let Some(ref mut tc) = self.debug_state.trace_collector {
1227            let status = if result.is_ok() {
1228                crate::debug_dump::trace::SpanStatus::Ok
1229            } else {
1230                crate::debug_dump::trace::SpanStatus::Error {
1231                    message: "iteration failed".to_owned(),
1232                }
1233            };
1234            tc.end_iteration(iteration_index, status);
1235        }
1236        self.debug_state.current_iteration_span_id = None;
1237
1238        result
1239    }
1240
1241    async fn process_user_message_inner(
1242        &mut self,
1243        text: String,
1244        image_parts: Vec<zeph_llm::provider::MessagePart>,
1245        iteration_index: usize,
1246    ) -> Result<(), error::AgentError> {
1247        let _ = iteration_index; // Used indirectly via debug_state.current_iteration_span_id.
1248        self.lifecycle.cancel_token = CancellationToken::new();
1249        let signal = Arc::clone(&self.lifecycle.cancel_signal);
1250        let token = self.lifecycle.cancel_token.clone();
1251        tokio::spawn(async move {
1252            signal.notified().await;
1253            token.cancel();
1254        });
1255        let trimmed = text.trim();
1256
1257        if let Some(result) = self.dispatch_slash_command(trimmed).await {
1258            return result;
1259        }
1260
1261        self.check_pending_rollbacks().await;
1262
1263        if self.pre_process_security(trimmed).await? {
1264            return Ok(());
1265        }
1266
1267        self.advance_context_lifecycle(&text, trimmed).await;
1268
1269        let user_msg = self.build_user_message(&text, image_parts);
1270
1271        // Extract URLs from user input and add to user_provided_urls for grounding checks.
1272        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1273        if !urls.is_empty()
1274            && let Ok(mut set) = self.security.user_provided_urls.write()
1275        {
1276            set.extend(urls);
1277        }
1278
1279        // Capture raw user input as goal text for A-MAC goal-conditioned write gating (#2483).
1280        // Derived from the raw input text before context assembly to avoid timing dependencies.
1281        self.memory_state.goal_text = Some(text.clone());
1282
1283        // Image parts intentionally excluded — base64 payloads too large for message history.
1284        self.persist_message(Role::User, &text, &[], false).await;
1285        self.push_message(user_msg);
1286
1287        if let Err(e) = self.process_response().await {
1288            tracing::error!("Response processing failed: {e:#}");
1289            let user_msg = format!("Error: {e:#}");
1290            self.channel.send(&user_msg).await?;
1291            self.msg.messages.pop();
1292            self.recompute_prompt_tokens();
1293            self.channel.flush_chunks().await?;
1294        }
1295
1296        Ok(())
1297    }
1298
1299    // Returns true if the input was blocked and the caller should return Ok(()) immediately.
1300    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1301        // Guardrail: LLM-based prompt injection pre-screening at the user input boundary.
1302        if let Some(ref guardrail) = self.security.guardrail {
1303            use zeph_sanitizer::guardrail::GuardrailVerdict;
1304            let verdict = guardrail.check(trimmed).await;
1305            match &verdict {
1306                GuardrailVerdict::Flagged { reason, .. } => {
1307                    tracing::warn!(
1308                        reason = %reason,
1309                        should_block = verdict.should_block(),
1310                        "guardrail flagged user input"
1311                    );
1312                    if verdict.should_block() {
1313                        let msg = format!("[guardrail] Input blocked: {reason}");
1314                        let _ = self.channel.send(&msg).await;
1315                        let _ = self.channel.flush_chunks().await;
1316                        return Ok(true);
1317                    }
1318                    // Warn mode: notify but continue.
1319                    let _ = self
1320                        .channel
1321                        .send(&format!("[guardrail] Warning: {reason}"))
1322                        .await;
1323                }
1324                GuardrailVerdict::Error { error } => {
1325                    if guardrail.error_should_block() {
1326                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1327                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1328                        let _ = self.channel.send(msg).await;
1329                        let _ = self.channel.flush_chunks().await;
1330                        return Ok(true);
1331                    }
1332                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1333                }
1334                GuardrailVerdict::Safe => {}
1335            }
1336        }
1337
1338        // ML classifier: lightweight injection detection on user input boundary.
1339        // Runs after guardrail (LLM-based) to layer defenses. On detection, blocks and returns.
1340        // Falls back to regex on classifier error/timeout — never degrades below regex baseline.
1341        // Gated by `scan_user_input`: DeBERTa is tuned for external/untrusted content, not
1342        // direct user chat. Disabled by default to prevent false positives on benign messages.
1343        #[cfg(feature = "classifiers")]
1344        if self.security.sanitizer.scan_user_input() {
1345            match self.security.sanitizer.classify_injection(trimmed).await {
1346                zeph_sanitizer::InjectionVerdict::Blocked => {
1347                    self.push_classifier_metrics();
1348                    let _ = self
1349                        .channel
1350                        .send("[security] Input blocked: injection detected by classifier.")
1351                        .await;
1352                    let _ = self.channel.flush_chunks().await;
1353                    return Ok(true);
1354                }
1355                zeph_sanitizer::InjectionVerdict::Suspicious => {
1356                    tracing::warn!("injection_classifier soft_signal on user input");
1357                }
1358                zeph_sanitizer::InjectionVerdict::Clean => {}
1359            }
1360        }
1361        #[cfg(feature = "classifiers")]
1362        self.push_classifier_metrics();
1363
1364        Ok(false)
1365    }
1366
1367    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1368        // Reset per-message pruning cache at the start of each turn (#2298).
1369        self.mcp.pruning_cache.reset();
1370
1371        // Extract before rebuild_system_prompt so the value is not tainted
1372        // by the secrets-bearing system prompt (ConversationId is just an i64).
1373        let conv_id = self.memory_state.conversation_id;
1374        self.rebuild_system_prompt(text).await;
1375
1376        self.detect_and_record_corrections(trimmed, conv_id).await;
1377        self.learning_engine.tick();
1378        self.analyze_and_learn().await;
1379        self.sync_graph_counts().await;
1380
1381        // Reset per-turn compaction guard FIRST so SideQuest sees a clean slate (C2 fix).
1382        // complete_focus and maybe_sidequest_eviction set this flag when they run (C1 fix).
1383        // advance_turn() transitions CompactedThisTurn → Cooling/Ready; all other states
1384        // pass through unchanged. See CompactionState::advance_turn for ordering guarantees.
1385        self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1386
1387        // Tick Focus Agent and SideQuest turn counters (#1850, #1885).
1388        {
1389            self.focus.tick();
1390
1391            // SideQuest eviction: runs every N user turns when enabled.
1392            // Skipped when is_compacted_this_turn (focus truncation or prior eviction ran).
1393            let sidequest_should_fire = self.sidequest.tick();
1394            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1395                self.maybe_sidequest_eviction();
1396            }
1397        }
1398
1399        // Tier 0: batch-apply deferred tool summaries when approaching context limit.
1400        // This is a pure in-memory operation (no LLM call) — summaries were pre-computed
1401        // during the tool loop. Intentionally does NOT set compacted_this_turn, so
1402        // proactive/reactive compaction may still fire if tokens remain above their thresholds.
1403        self.maybe_apply_deferred_summaries();
1404        self.flush_deferred_summaries().await;
1405
1406        // Proactive compression fires first (if configured); if it runs, reactive is skipped.
1407        if let Err(e) = self.maybe_proactive_compress().await {
1408            tracing::warn!("proactive compression failed: {e:#}");
1409        }
1410
1411        if let Err(e) = self.maybe_compact().await {
1412            tracing::warn!("context compaction failed: {e:#}");
1413        }
1414
1415        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
1416            tracing::warn!("context preparation failed: {e:#}");
1417        }
1418
1419        // MAR: propagate top-1 recall confidence to the router for cost-aware routing.
1420        self.provider
1421            .set_memory_confidence(self.memory_state.last_recall_confidence);
1422
1423        self.learning_engine.reset_reflection();
1424    }
1425
1426    fn build_user_message(
1427        &mut self,
1428        text: &str,
1429        image_parts: Vec<zeph_llm::provider::MessagePart>,
1430    ) -> Message {
1431        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1432        all_image_parts.extend(image_parts);
1433
1434        if !all_image_parts.is_empty() && self.provider.supports_vision() {
1435            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1436                text: text.to_owned(),
1437            }];
1438            parts.extend(all_image_parts);
1439            Message::from_parts(Role::User, parts)
1440        } else {
1441            if !all_image_parts.is_empty() {
1442                tracing::warn!(
1443                    count = all_image_parts.len(),
1444                    "image attachments dropped: provider does not support vision"
1445                );
1446            }
1447            Message {
1448                role: Role::User,
1449                content: text.to_owned(),
1450                parts: vec![],
1451                metadata: MessageMetadata::default(),
1452            }
1453        }
1454    }
1455
1456    /// Poll a sub-agent until it reaches a terminal state, bridging secret requests to the
1457    /// channel. Returns a human-readable status string suitable for sending to the user.
1458    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1459        use crate::subagent::SubAgentState;
1460        let result = loop {
1461            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1462
1463            // Bridge secret requests from sub-agent to channel.confirm().
1464            // Fetch the pending request first, then release the borrow before
1465            // calling channel.confirm() (which requires &mut self).
1466            #[allow(clippy::redundant_closure_for_method_calls)]
1467            let pending = self
1468                .orchestration
1469                .subagent_manager
1470                .as_mut()
1471                .and_then(|m| m.try_recv_secret_request());
1472            if let Some((req_task_id, req)) = pending {
1473                // req.secret_key is pre-validated to [a-zA-Z0-9_-] in manager.rs
1474                // (SEC-P1-02), so it is safe to embed in the prompt string.
1475                let confirm_prompt = format!(
1476                    "Sub-agent requests secret '{}'. Allow?",
1477                    crate::text::truncate_to_chars(&req.secret_key, 100)
1478                );
1479                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1480                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1481                    if approved {
1482                        let ttl = std::time::Duration::from_secs(300);
1483                        let key = req.secret_key.clone();
1484                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1485                            let _ = mgr.deliver_secret(&req_task_id, key);
1486                        }
1487                    } else {
1488                        let _ = mgr.deny_secret(&req_task_id);
1489                    }
1490                }
1491            }
1492
1493            let mgr = self.orchestration.subagent_manager.as_ref()?;
1494            let statuses = mgr.statuses();
1495            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1496                break format!("{label} completed (no status available).");
1497            };
1498            match status.state {
1499                SubAgentState::Completed => {
1500                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1501                    break format!("{label} completed: {msg}");
1502                }
1503                SubAgentState::Failed => {
1504                    let msg = status
1505                        .last_message
1506                        .clone()
1507                        .unwrap_or_else(|| "unknown error".into());
1508                    break format!("{label} failed: {msg}");
1509                }
1510                SubAgentState::Canceled => {
1511                    break format!("{label} was cancelled.");
1512                }
1513                _ => {
1514                    let _ = self
1515                        .channel
1516                        .send_status(&format!(
1517                            "{label}: turn {}/{}",
1518                            status.turns_used,
1519                            self.orchestration
1520                                .subagent_manager
1521                                .as_ref()
1522                                .and_then(|m| m.agents_def(task_id))
1523                                .map_or(20, |d| d.permissions.max_turns)
1524                        ))
1525                        .await;
1526                }
1527            }
1528        };
1529        Some(result)
1530    }
1531
1532    /// Resolve a unique full `task_id` from a prefix. Returns `None` if the manager is absent,
1533    /// `Some(Err(msg))` on ambiguity/not-found, `Some(Ok(full_id))` on success.
1534    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1535        let mgr = self.orchestration.subagent_manager.as_mut()?;
1536        let full_ids: Vec<String> = mgr
1537            .statuses()
1538            .into_iter()
1539            .map(|(tid, _)| tid)
1540            .filter(|tid| tid.starts_with(prefix))
1541            .collect();
1542        Some(match full_ids.as_slice() {
1543            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1544            [fid] => Ok(fid.clone()),
1545            _ => Err(format!(
1546                "Ambiguous id prefix '{prefix}': matches {} agents",
1547                full_ids.len()
1548            )),
1549        })
1550    }
1551
1552    fn handle_agent_list(&self) -> Option<String> {
1553        use std::fmt::Write as _;
1554        let mgr = self.orchestration.subagent_manager.as_ref()?;
1555        let defs = mgr.definitions();
1556        if defs.is_empty() {
1557            return Some("No sub-agent definitions found.".into());
1558        }
1559        let mut out = String::from("Available sub-agents:\n");
1560        for d in defs {
1561            let memory_label = match d.memory {
1562                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
1563                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
1564                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
1565                None => "",
1566            };
1567            if let Some(ref src) = d.source {
1568                let _ = writeln!(
1569                    out,
1570                    "  {}{} — {} ({})",
1571                    d.name, memory_label, d.description, src
1572                );
1573            } else {
1574                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
1575            }
1576        }
1577        Some(out)
1578    }
1579
1580    fn handle_agent_status(&self) -> Option<String> {
1581        use std::fmt::Write as _;
1582        let mgr = self.orchestration.subagent_manager.as_ref()?;
1583        let statuses = mgr.statuses();
1584        if statuses.is_empty() {
1585            return Some("No active sub-agents.".into());
1586        }
1587        let mut out = String::from("Active sub-agents:\n");
1588        for (id, s) in &statuses {
1589            let state = format!("{:?}", s.state).to_lowercase();
1590            let elapsed = s.started_at.elapsed().as_secs();
1591            let _ = writeln!(
1592                out,
1593                "  [{short}] {state}  turns={t}  elapsed={elapsed}s  {msg}",
1594                short = &id[..8.min(id.len())],
1595                t = s.turns_used,
1596                msg = s.last_message.as_deref().unwrap_or(""),
1597            );
1598            // Show memory directory path for agents with memory enabled.
1599            if let Some(def) = mgr.agents_def(id)
1600                && let Some(scope) = def.memory
1601                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
1602            {
1603                let _ = writeln!(out, "       memory: {}", dir.display());
1604            }
1605        }
1606        Some(out)
1607    }
1608
1609    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1610        let full_id = match self.resolve_agent_id_prefix(id)? {
1611            Ok(fid) => fid,
1612            Err(msg) => return Some(msg),
1613        };
1614        let mgr = self.orchestration.subagent_manager.as_mut()?;
1615        if let Some((tid, req)) = mgr.try_recv_secret_request()
1616            && tid == full_id
1617        {
1618            let key = req.secret_key.clone();
1619            let ttl = std::time::Duration::from_secs(300);
1620            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1621                return Some(format!("Approve failed: {e}"));
1622            }
1623            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1624                return Some(format!("Secret delivery failed: {e}"));
1625            }
1626            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1627        }
1628        Some(format!(
1629            "No pending secret request for sub-agent '{full_id}'."
1630        ))
1631    }
1632
1633    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1634        let full_id = match self.resolve_agent_id_prefix(id)? {
1635            Ok(fid) => fid,
1636            Err(msg) => return Some(msg),
1637        };
1638        let mgr = self.orchestration.subagent_manager.as_mut()?;
1639        match mgr.deny_secret(&full_id) {
1640            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1641            Err(e) => Some(format!("Deny failed: {e}")),
1642        }
1643    }
1644
1645    #[allow(clippy::too_many_lines)]
1646    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
1647        use crate::subagent::AgentCommand;
1648
1649        match cmd {
1650            AgentCommand::List => self.handle_agent_list(),
1651            AgentCommand::Background { name, prompt } => {
1652                let provider = self.provider.clone();
1653                let tool_executor = Arc::clone(&self.tool_executor);
1654                let skills = self.filtered_skills_for(&name);
1655                let cfg = self.orchestration.subagent_config.clone();
1656                let spawn_ctx = self.build_spawn_context(&cfg);
1657                let mgr = self.orchestration.subagent_manager.as_mut()?;
1658                match mgr.spawn(
1659                    &name,
1660                    &prompt,
1661                    provider,
1662                    tool_executor,
1663                    skills,
1664                    &cfg,
1665                    spawn_ctx,
1666                ) {
1667                    Ok(id) => Some(format!(
1668                        "Sub-agent '{name}' started in background (id: {short})",
1669                        short = &id[..8.min(id.len())]
1670                    )),
1671                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1672                }
1673            }
1674            AgentCommand::Spawn { name, prompt }
1675            | AgentCommand::Mention {
1676                agent: name,
1677                prompt,
1678            } => {
1679                // Foreground spawn: launch and await completion, streaming status to user.
1680                let provider = self.provider.clone();
1681                let tool_executor = Arc::clone(&self.tool_executor);
1682                let skills = self.filtered_skills_for(&name);
1683                let cfg = self.orchestration.subagent_config.clone();
1684                let spawn_ctx = self.build_spawn_context(&cfg);
1685                let mgr = self.orchestration.subagent_manager.as_mut()?;
1686                let task_id = match mgr.spawn(
1687                    &name,
1688                    &prompt,
1689                    provider,
1690                    tool_executor,
1691                    skills,
1692                    &cfg,
1693                    spawn_ctx,
1694                ) {
1695                    Ok(id) => id,
1696                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1697                };
1698                let short = task_id[..8.min(task_id.len())].to_owned();
1699                let _ = self
1700                    .channel
1701                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1702                    .await;
1703                let label = format!("Sub-agent '{name}'");
1704                self.poll_subagent_until_done(&task_id, &label).await
1705            }
1706            AgentCommand::Status => self.handle_agent_status(),
1707            AgentCommand::Cancel { id } => {
1708                let mgr = self.orchestration.subagent_manager.as_mut()?;
1709                // Accept prefix match on task_id.
1710                let ids: Vec<String> = mgr
1711                    .statuses()
1712                    .into_iter()
1713                    .map(|(task_id, _)| task_id)
1714                    .filter(|task_id| task_id.starts_with(&id))
1715                    .collect();
1716                match ids.as_slice() {
1717                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
1718                    [full_id] => {
1719                        let full_id = full_id.clone();
1720                        match mgr.cancel(&full_id) {
1721                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1722                            Err(e) => Some(format!("Cancel failed: {e}")),
1723                        }
1724                    }
1725                    _ => Some(format!(
1726                        "Ambiguous id prefix '{id}': matches {} agents",
1727                        ids.len()
1728                    )),
1729                }
1730            }
1731            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1732            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1733            AgentCommand::Resume { id, prompt } => {
1734                let cfg = self.orchestration.subagent_config.clone();
1735                // Resolve definition name from transcript meta before spawning so we can
1736                // look up skills by definition name rather than the UUID prefix (S1 fix).
1737                let def_name = {
1738                    let mgr = self.orchestration.subagent_manager.as_ref()?;
1739                    match mgr.def_name_for_resume(&id, &cfg) {
1740                        Ok(name) => name,
1741                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1742                    }
1743                };
1744                let skills = self.filtered_skills_for(&def_name);
1745                let provider = self.provider.clone();
1746                let tool_executor = Arc::clone(&self.tool_executor);
1747                let mgr = self.orchestration.subagent_manager.as_mut()?;
1748                let (task_id, _) =
1749                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1750                        Ok(pair) => pair,
1751                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1752                    };
1753                let short = task_id[..8.min(task_id.len())].to_owned();
1754                let _ = self
1755                    .channel
1756                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1757                    .await;
1758                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1759                    .await
1760            }
1761        }
1762    }
1763
1764    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1765        let mgr = self.orchestration.subagent_manager.as_ref()?;
1766        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1767        let reg = self
1768            .skill_state
1769            .registry
1770            .read()
1771            .expect("registry read lock");
1772        match crate::subagent::filter_skills(&reg, &def.skills) {
1773            Ok(skills) => {
1774                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1775                if bodies.is_empty() {
1776                    None
1777                } else {
1778                    Some(bodies)
1779                }
1780            }
1781            Err(e) => {
1782                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1783                None
1784            }
1785        }
1786    }
1787
1788    /// Build a `SpawnContext` from current agent state for sub-agent spawning.
1789    fn build_spawn_context(
1790        &self,
1791        cfg: &zeph_config::SubAgentConfig,
1792    ) -> crate::subagent::SpawnContext {
1793        crate::subagent::SpawnContext {
1794            parent_messages: self.extract_parent_messages(cfg),
1795            parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1796            parent_provider_name: {
1797                let name = &self.runtime.active_provider_name;
1798                if name.is_empty() {
1799                    None
1800                } else {
1801                    Some(name.clone())
1802                }
1803            },
1804            spawn_depth: self.runtime.spawn_depth,
1805            mcp_tool_names: self.extract_mcp_tool_names(),
1806        }
1807    }
1808
1809    /// Extract recent parent messages for history propagation (Section 5.7 in spec).
1810    ///
1811    /// Filters system messages, takes last `context_window_turns * 2` messages,
1812    /// and applies a 25% context window cap using a 4-chars-per-token heuristic.
1813    fn extract_parent_messages(
1814        &self,
1815        config: &zeph_config::SubAgentConfig,
1816    ) -> Vec<zeph_llm::provider::Message> {
1817        use zeph_llm::provider::Role;
1818        if config.context_window_turns == 0 {
1819            return Vec::new();
1820        }
1821        let non_system: Vec<_> = self
1822            .msg
1823            .messages
1824            .iter()
1825            .filter(|m| m.role != Role::System)
1826            .cloned()
1827            .collect();
1828        let take_count = config.context_window_turns * 2;
1829        let start = non_system.len().saturating_sub(take_count);
1830        let mut msgs = non_system[start..].to_vec();
1831
1832        // Cap at 25% of model context window (rough 4-chars-per-token heuristic).
1833        let max_chars = 128_000usize / 4; // conservative default; 25% of 128K tokens
1834        let mut total_chars: usize = 0;
1835        let mut keep = msgs.len();
1836        for (i, m) in msgs.iter().enumerate() {
1837            total_chars += m.content.len();
1838            if total_chars > max_chars {
1839                keep = i;
1840                break;
1841            }
1842        }
1843        if keep < msgs.len() {
1844            tracing::info!(
1845                kept = keep,
1846                requested = config.context_window_turns * 2,
1847                "[subagent] truncated parent history from {} to {} turns due to token budget",
1848                config.context_window_turns * 2,
1849                keep
1850            );
1851            msgs.truncate(keep);
1852        }
1853        msgs
1854    }
1855
1856    /// Extract MCP tool names from the tool executor for diagnostic annotation.
1857    fn extract_mcp_tool_names(&self) -> Vec<String> {
1858        self.tool_executor
1859            .tool_definitions_erased()
1860            .into_iter()
1861            .filter(|t| t.id.starts_with("mcp_"))
1862            .map(|t| t.id.to_string())
1863            .collect()
1864    }
1865
1866    /// Update trust DB records for all reloaded skills.
1867    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
1868        let Some(ref memory) = self.memory_state.memory else {
1869            return;
1870        };
1871        let trust_cfg = self.skill_state.trust_config.clone();
1872        let managed_dir = self.skill_state.managed_dir.clone();
1873        for meta in all_meta {
1874            let source_kind = if managed_dir
1875                .as_ref()
1876                .is_some_and(|d| meta.skill_dir.starts_with(d))
1877            {
1878                zeph_memory::store::SourceKind::Hub
1879            } else {
1880                zeph_memory::store::SourceKind::Local
1881            };
1882            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
1883                &trust_cfg.default_level
1884            } else {
1885                &trust_cfg.local_level
1886            };
1887            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
1888                Ok(current_hash) => {
1889                    let existing = memory
1890                        .sqlite()
1891                        .load_skill_trust(&meta.name)
1892                        .await
1893                        .ok()
1894                        .flatten();
1895                    let trust_level_str = if let Some(ref row) = existing {
1896                        if row.blake3_hash == current_hash {
1897                            row.trust_level.clone()
1898                        } else {
1899                            trust_cfg.hash_mismatch_level.to_string()
1900                        }
1901                    } else {
1902                        initial_level.to_string()
1903                    };
1904                    let source_path = meta.skill_dir.to_str();
1905                    if let Err(e) = memory
1906                        .sqlite()
1907                        .upsert_skill_trust(
1908                            &meta.name,
1909                            &trust_level_str,
1910                            source_kind,
1911                            None,
1912                            source_path,
1913                            &current_hash,
1914                        )
1915                        .await
1916                    {
1917                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
1918                    }
1919                }
1920                Err(e) => {
1921                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
1922                }
1923            }
1924        }
1925    }
1926
1927    /// Rebuild or sync the in-memory skill matcher and BM25 index after a registry update.
1928    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
1929        let provider = self.embedding_provider.clone();
1930        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
1931            let owned = text.to_owned();
1932            let p = provider.clone();
1933            Box::pin(async move { p.embed(&owned).await })
1934        };
1935
1936        let needs_inmemory_rebuild = !self
1937            .skill_state
1938            .matcher
1939            .as_ref()
1940            .is_some_and(SkillMatcherBackend::is_qdrant);
1941
1942        if needs_inmemory_rebuild {
1943            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
1944                .await
1945                .map(SkillMatcherBackend::InMemory);
1946        } else if let Some(ref mut backend) = self.skill_state.matcher {
1947            let _ = self.channel.send_status("syncing skill index...").await;
1948            if let Err(e) = backend
1949                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
1950                .await
1951            {
1952                tracing::warn!("failed to sync skill embeddings: {e:#}");
1953            }
1954        }
1955
1956        if self.skill_state.hybrid_search {
1957            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
1958            let _ = self.channel.send_status("rebuilding search index...").await;
1959            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
1960        }
1961    }
1962
1963    async fn reload_skills(&mut self) {
1964        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
1965        if new_registry.fingerprint()
1966            == self
1967                .skill_state
1968                .registry
1969                .read()
1970                .expect("registry read lock")
1971                .fingerprint()
1972        {
1973            return;
1974        }
1975        let _ = self.channel.send_status("reloading skills...").await;
1976        *self
1977            .skill_state
1978            .registry
1979            .write()
1980            .expect("registry write lock") = new_registry;
1981
1982        let all_meta = self
1983            .skill_state
1984            .registry
1985            .read()
1986            .expect("registry read lock")
1987            .all_meta()
1988            .into_iter()
1989            .cloned()
1990            .collect::<Vec<_>>();
1991
1992        self.update_trust_for_reloaded_skills(&all_meta).await;
1993
1994        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
1995        self.rebuild_skill_matcher(&all_meta_refs).await;
1996
1997        let all_skills: Vec<Skill> = {
1998            let reg = self
1999                .skill_state
2000                .registry
2001                .read()
2002                .expect("registry read lock");
2003            reg.all_meta()
2004                .iter()
2005                .filter_map(|m| reg.get_skill(&m.name).ok())
2006                .collect()
2007        };
2008        let trust_map = self.build_skill_trust_map().await;
2009        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2010        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
2011        self.skill_state
2012            .last_skills_prompt
2013            .clone_from(&skills_prompt);
2014        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
2015        if let Some(msg) = self.msg.messages.first_mut() {
2016            msg.content = system_prompt;
2017        }
2018
2019        let _ = self.channel.send_status("").await;
2020        tracing::info!(
2021            "reloaded {} skill(s)",
2022            self.skill_state
2023                .registry
2024                .read()
2025                .expect("registry read lock")
2026                .all_meta()
2027                .len()
2028        );
2029    }
2030
2031    fn reload_instructions(&mut self) {
2032        // Drain any additional queued events before reloading to avoid redundant reloads.
2033        if let Some(ref mut rx) = self.instructions.reload_rx {
2034            while rx.try_recv().is_ok() {}
2035        }
2036        let Some(ref state) = self.instructions.reload_state else {
2037            return;
2038        };
2039        let new_blocks = crate::instructions::load_instructions(
2040            &state.base_dir,
2041            &state.provider_kinds,
2042            &state.explicit_files,
2043            state.auto_detect,
2044        );
2045        let old_sources: std::collections::HashSet<_> =
2046            self.instructions.blocks.iter().map(|b| &b.source).collect();
2047        let new_sources: std::collections::HashSet<_> =
2048            new_blocks.iter().map(|b| &b.source).collect();
2049        for added in new_sources.difference(&old_sources) {
2050            tracing::info!(path = %added.display(), "instruction file added");
2051        }
2052        for removed in old_sources.difference(&new_sources) {
2053            tracing::info!(path = %removed.display(), "instruction file removed");
2054        }
2055        tracing::info!(
2056            old_count = self.instructions.blocks.len(),
2057            new_count = new_blocks.len(),
2058            "reloaded instruction files"
2059        );
2060        self.instructions.blocks = new_blocks;
2061    }
2062
2063    fn reload_config(&mut self) {
2064        let Some(ref path) = self.lifecycle.config_path else {
2065            return;
2066        };
2067        let config = match Config::load(path) {
2068            Ok(c) => c,
2069            Err(e) => {
2070                tracing::warn!("config reload failed: {e:#}");
2071                return;
2072            }
2073        };
2074
2075        self.runtime.security = config.security;
2076        self.runtime.timeouts = config.timeouts;
2077        self.runtime.redact_credentials = config.memory.redact_credentials;
2078        self.memory_state.history_limit = config.memory.history_limit;
2079        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
2080        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
2081        self.skill_state.max_active_skills = config.skills.max_active_skills;
2082        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2083        self.skill_state.min_injection_score = config.skills.min_injection_score;
2084        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2085        self.skill_state.hybrid_search = config.skills.hybrid_search;
2086        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
2087        self.skill_state.confusability_threshold =
2088            config.skills.confusability_threshold.clamp(0.0, 1.0);
2089        config
2090            .skills
2091            .generation_provider
2092            .as_str()
2093            .clone_into(&mut self.skill_state.generation_provider_name);
2094        self.skill_state.generation_output_dir =
2095            config.skills.generation_output_dir.as_deref().map(|p| {
2096                if let Some(stripped) = p.strip_prefix("~/") {
2097                    dirs::home_dir()
2098                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2099                } else {
2100                    std::path::PathBuf::from(p)
2101                }
2102            });
2103
2104        if config.memory.context_budget_tokens > 0 {
2105            self.context_manager.budget = Some(
2106                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
2107                    .with_graph_enabled(config.memory.graph.enabled),
2108            );
2109        } else {
2110            self.context_manager.budget = None;
2111        }
2112
2113        {
2114            let graph_cfg = &config.memory.graph;
2115            if graph_cfg.rpe.enabled {
2116                // Re-create router only if it doesn't exist yet; preserve state on hot-reload.
2117                if self.memory_state.rpe_router.is_none() {
2118                    self.memory_state.rpe_router =
2119                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2120                            graph_cfg.rpe.threshold,
2121                            graph_cfg.rpe.max_skip_turns,
2122                        )));
2123                }
2124            } else {
2125                self.memory_state.rpe_router = None;
2126            }
2127            self.memory_state.graph_config = graph_cfg.clone();
2128        }
2129        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2130        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2131        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2132        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2133        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2134        self.context_manager.compression = config.memory.compression.clone();
2135        self.context_manager.routing = config.memory.store_routing.clone();
2136        // Resolve routing_classifier_provider from the provider pool (#2484).
2137        self.context_manager.store_routing_provider = if config
2138            .memory
2139            .store_routing
2140            .routing_classifier_provider
2141            .is_empty()
2142        {
2143            None
2144        } else {
2145            let resolved = self.resolve_background_provider(
2146                &config.memory.store_routing.routing_classifier_provider,
2147            );
2148            Some(std::sync::Arc::new(resolved))
2149        };
2150        self.memory_state.cross_session_score_threshold =
2151            config.memory.cross_session_score_threshold;
2152
2153        self.index.repo_map_tokens = config.index.repo_map_tokens;
2154        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2155
2156        tracing::info!("config reloaded");
2157    }
2158
2159    /// `/focus` slash command: display Focus Agent status.
2160    async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
2161        use std::fmt::Write;
2162        let mut out = String::from("Focus Agent status\n\n");
2163        let _ = writeln!(out, "Enabled:          {}", self.focus.config.enabled);
2164        let _ = writeln!(out, "Active session:   {}", self.focus.is_active());
2165        if let Some(ref scope) = self.focus.active_scope {
2166            let _ = writeln!(out, "Active scope:     {scope}");
2167        }
2168        let _ = writeln!(
2169            out,
2170            "Knowledge blocks: {}",
2171            self.focus.knowledge_blocks.len()
2172        );
2173        let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
2174        self.channel.send(&out).await?;
2175        Ok(())
2176    }
2177
2178    /// `/sidequest` slash command: display `SideQuest` eviction stats.
2179    async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
2180        use std::fmt::Write;
2181        let mut out = String::from("SideQuest status\n\n");
2182        let _ = writeln!(out, "Enabled:        {}", self.sidequest.config.enabled);
2183        let _ = writeln!(
2184            out,
2185            "Interval turns: {}",
2186            self.sidequest.config.interval_turns
2187        );
2188        let _ = writeln!(out, "Turn counter:   {}", self.sidequest.turn_counter);
2189        let _ = writeln!(out, "Passes run:     {}", self.sidequest.passes_run);
2190        let _ = writeln!(
2191            out,
2192            "Total evicted:  {} tool outputs",
2193            self.sidequest.total_evicted
2194        );
2195        self.channel.send(&out).await?;
2196        Ok(())
2197    }
2198
2199    /// Run `SideQuest` tool output eviction pass (#1885).
2200    ///
2201    /// PERF-1 fix: two-phase non-blocking design.
2202    ///
2203    /// Phase 1 (apply, this turn): check for a background LLM result spawned last turn,
2204    /// validate and apply it immediately.
2205    ///
2206    /// Phase 2 (schedule, this turn): rebuild cursors and spawn a background `tokio::spawn`
2207    /// task for the LLM call. The result is stored in `pending_sidequest_result` and applied
2208    /// next turn, so the current agent turn is never blocked by the LLM call.
2209    #[allow(clippy::too_many_lines)]
2210    fn maybe_sidequest_eviction(&mut self) {
2211        use zeph_llm::provider::{Message, MessageMetadata, Role};
2212
2213        // S1 runtime guard: warn when SideQuest is enabled alongside a non-Reactive pruning
2214        // strategy — the two systems share the same pool of evictable tool outputs and can
2215        // interfere. Disable sidequest.enabled when pruning_strategy != Reactive.
2216        if self.sidequest.config.enabled {
2217            use crate::config::PruningStrategy;
2218            if !matches!(
2219                self.context_manager.compression.pruning_strategy,
2220                PruningStrategy::Reactive
2221            ) {
2222                tracing::warn!(
2223                    strategy = ?self.context_manager.compression.pruning_strategy,
2224                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
2225                     consider disabling sidequest.enabled to avoid redundant eviction"
2226                );
2227            }
2228        }
2229
2230        // Guard: do not evict while a focus session is active.
2231        if self.focus.is_active() {
2232            tracing::debug!("sidequest: skipping — focus session active");
2233            // Drop any pending result — cursors may be stale relative to focus truncation.
2234            self.compression.pending_sidequest_result = None;
2235            return;
2236        }
2237
2238        // Phase 1: apply pending result from last turn's background LLM call.
2239        if let Some(handle) = self.compression.pending_sidequest_result.take() {
2240            // `now_or_never` avoids blocking — if the task isn't done yet, skip this turn.
2241            use futures::FutureExt as _;
2242            match handle.now_or_never() {
2243                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2244                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2245                    let freed = self.sidequest.apply_eviction(
2246                        &mut self.msg.messages,
2247                        &evicted_indices,
2248                        &self.metrics.token_counter,
2249                    );
2250                    if freed > 0 {
2251                        self.recompute_prompt_tokens();
2252                        // C1 fix: prevent maybe_compact() from firing in the same turn.
2253                        // cooldown=0: eviction does not impose post-compaction cooldown.
2254                        self.context_manager.compaction =
2255                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
2256                                cooldown: 0,
2257                            };
2258                        tracing::info!(
2259                            freed_tokens = freed,
2260                            evicted_cursors = evicted_indices.len(),
2261                            pass = self.sidequest.passes_run,
2262                            "sidequest eviction complete"
2263                        );
2264                        if let Some(ref d) = self.debug_state.debug_dumper {
2265                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2266                        }
2267                        if let Some(ref tx) = self.session.status_tx {
2268                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2269                        }
2270                    } else {
2271                        // apply_eviction returned 0 — clear spinner so it doesn't dangle.
2272                        if let Some(ref tx) = self.session.status_tx {
2273                            let _ = tx.send(String::new());
2274                        }
2275                    }
2276                }
2277                Some(Ok(None | Some(_))) => {
2278                    tracing::debug!("sidequest: pending result: no cursors to evict");
2279                    if let Some(ref tx) = self.session.status_tx {
2280                        let _ = tx.send(String::new());
2281                    }
2282                }
2283                Some(Err(e)) => {
2284                    tracing::debug!("sidequest: background task panicked: {e}");
2285                    if let Some(ref tx) = self.session.status_tx {
2286                        let _ = tx.send(String::new());
2287                    }
2288                }
2289                None => {
2290                    // Task still running — re-store and wait another turn.
2291                    // We already took it; we'd need to re-spawn, but instead just drop and
2292                    // schedule fresh below to keep the cursor list current.
2293                    tracing::debug!(
2294                        "sidequest: background LLM task not yet complete, rescheduling"
2295                    );
2296                }
2297            }
2298        }
2299
2300        // Phase 2: rebuild cursors and schedule the next background eviction LLM call.
2301        self.sidequest
2302            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2303
2304        if self.sidequest.tool_output_cursors.is_empty() {
2305            tracing::debug!("sidequest: no eligible cursors");
2306            return;
2307        }
2308
2309        let prompt = self.sidequest.build_eviction_prompt();
2310        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2311        let n_cursors = self.sidequest.tool_output_cursors.len();
2312        // Clone the provider so the spawn closure owns it without borrowing self.
2313        let provider = self.summary_or_primary_provider().clone();
2314
2315        // Spawn background task: the LLM call runs without blocking the agent loop.
2316        let handle = tokio::spawn(async move {
2317            let msgs = [Message {
2318                role: Role::User,
2319                content: prompt,
2320                parts: vec![],
2321                metadata: MessageMetadata::default(),
2322            }];
2323            let response =
2324                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2325                    .await
2326                {
2327                    Ok(Ok(r)) => r,
2328                    Ok(Err(e)) => {
2329                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2330                        return None;
2331                    }
2332                    Err(_) => {
2333                        tracing::debug!("sidequest bg: LLM call timed out");
2334                        return None;
2335                    }
2336                };
2337
2338            let start = response.find('{')?;
2339            let end = response.rfind('}')?;
2340            if start > end {
2341                return None;
2342            }
2343            let json_slice = &response[start..=end];
2344            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2345            let mut valid: Vec<usize> = parsed
2346                .del_cursors
2347                .into_iter()
2348                .filter(|&c| c < n_cursors)
2349                .collect();
2350            valid.sort_unstable();
2351            valid.dedup();
2352            #[allow(
2353                clippy::cast_precision_loss,
2354                clippy::cast_possible_truncation,
2355                clippy::cast_sign_loss
2356            )]
2357            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2358            valid.truncate(max_evict);
2359            Some(valid)
2360        });
2361
2362        self.compression.pending_sidequest_result = Some(handle);
2363        tracing::debug!("sidequest: background LLM eviction task spawned");
2364        if let Some(ref tx) = self.session.status_tx {
2365            let _ = tx.send("SideQuest: scoring tool outputs...".into());
2366        }
2367    }
2368
2369    /// Check if the process cwd has changed since last call and fire `CwdChanged` hooks.
2370    ///
2371    /// Called after each tool batch completes. The check is a single syscall and has
2372    /// negligible cost. Only fires when cwd actually changed (defense-in-depth: normally
2373    /// only `set_working_directory` changes cwd; shell child processes cannot affect it).
2374    pub(crate) async fn check_cwd_changed(&mut self) {
2375        let current = match std::env::current_dir() {
2376            Ok(p) => p,
2377            Err(e) => {
2378                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2379                return;
2380            }
2381        };
2382        if current == self.lifecycle.last_known_cwd {
2383            return;
2384        }
2385        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2386        self.session.env_context.working_dir = current.display().to_string();
2387
2388        tracing::info!(
2389            old = %old_cwd.display(),
2390            new = %current.display(),
2391            "working directory changed"
2392        );
2393
2394        let _ = self
2395            .channel
2396            .send_status("Working directory changed\u{2026}")
2397            .await;
2398
2399        let hooks = self.session.hooks_config.cwd_changed.clone();
2400        if !hooks.is_empty() {
2401            let mut env = std::collections::HashMap::new();
2402            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2403            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2404            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2405                tracing::warn!(error = %e, "CwdChanged hook failed");
2406            }
2407        }
2408
2409        let _ = self.channel.send_status("").await;
2410    }
2411
2412    /// Handle a `FileChangedEvent` from the file watcher.
2413    pub(crate) async fn handle_file_changed(
2414        &mut self,
2415        event: crate::file_watcher::FileChangedEvent,
2416    ) {
2417        tracing::info!(path = %event.path.display(), "file changed");
2418
2419        let _ = self
2420            .channel
2421            .send_status("Running file-change hook\u{2026}")
2422            .await;
2423
2424        let hooks = self.session.hooks_config.file_changed_hooks.clone();
2425        if !hooks.is_empty() {
2426            let mut env = std::collections::HashMap::new();
2427            env.insert(
2428                "ZEPH_CHANGED_PATH".to_owned(),
2429                event.path.display().to_string(),
2430            );
2431            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2432                tracing::warn!(error = %e, "FileChanged hook failed");
2433            }
2434        }
2435
2436        let _ = self.channel.send_status("").await;
2437    }
2438}
2439pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2440    while !*rx.borrow_and_update() {
2441        if rx.changed().await.is_err() {
2442            std::future::pending::<()>().await;
2443        }
2444    }
2445}
2446
2447pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2448    match rx {
2449        Some(inner) => {
2450            if let Some(v) = inner.recv().await {
2451                Some(v)
2452            } else {
2453                *rx = None;
2454                std::future::pending().await
2455            }
2456        }
2457        None => std::future::pending().await,
2458    }
2459}
2460
2461#[cfg(test)]
2462mod tests;
2463
2464#[cfg(test)]
2465pub(crate) use tests::agent_tests;