1mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7pub(super) mod compression_feedback;
8mod context;
9pub(crate) mod context_manager;
10mod corrections;
11pub mod error;
12mod experiment_cmd;
13pub(super) mod feedback_detector;
14pub(crate) mod focus;
15mod graph_commands;
16mod guidelines_commands;
17mod index;
18mod learning;
19pub(crate) mod learning_engine;
20mod log_commands;
21mod lsp_commands;
22mod mcp;
23mod memory_commands;
24mod message_queue;
25mod model_commands;
26mod persistence;
27mod plan;
28mod policy_commands;
29mod provider_cmd;
30pub(crate) mod rate_limiter;
31#[cfg(feature = "scheduler")]
32mod scheduler_commands;
33mod scheduler_loop;
34pub mod session_config;
35mod session_digest;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70use state::CompressionState;
71use state::{
72 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
73 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
74 RuntimeConfig, SecurityState, SessionState, SkillState,
75};
76
/// Window size used by doom-loop detection.
///
/// NOTE(review): the detection logic is not in this file — confirm whether the unit is
/// iterations, tool calls or messages.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;

// Marker prefixes for context blocks injected into the conversation. The consumers live
// outside this file; presumably they are also used to recognise these blocks again later.

/// Markdown header that opens an injected block of retrieved documents.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
/// Marker line that opens injected semantic-recall context.
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Marker line that opens injected code context.
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
/// Marker line that opens injected conversation summaries.
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Marker line that opens injected cross-session context.
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
/// Marker line that opens injected past corrections.
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Marker line that opens injected graph facts.
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Text prepended to scheduler-injected custom task prompts before they run as an agent turn
/// (see `Agent::run`).
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Marker line that opens the digest carried over from a previous session.
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Opening of an LSP note marker. It is deliberately left unterminated; presumably the
/// writer appends the note kind and a closing `]` — TODO confirm.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing code fence (a newline followed by three backticks) appended after every tool
/// output body produced by [`format_tool_output`].
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps a tool's raw output in a labelled Markdown code fence.
///
/// The result is a `[tool output: <tool_name>]` header line, an opening fence line, `body`
/// verbatim, and then [`TOOL_OUTPUT_SUFFIX`]. The buffer is sized up front so that building
/// the string never reallocates.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    let needed = [HEADER_OPEN, tool_name, HEADER_CLOSE, body, TOOL_OUTPUT_SUFFIX]
        .iter()
        .map(|piece| piece.len())
        .sum::<usize>();

    let mut out = String::with_capacity(needed);
    out.push_str(HEADER_OPEN);
    out.push_str(tool_name);
    out.push_str(HEADER_CLOSE);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
108
/// The conversational agent. It owns the LLM provider, the I/O channel, the tool executor and
/// all per-concern runtime state.
///
/// It is generic over the [`Channel`] used to receive user input and send output. Instances
/// are created with [`Agent::new`] or [`Agent::new_with_registry_arc`], which initialise every
/// sub-state below with defaults.
pub struct Agent<C: Channel> {
    /// Main LLM provider. Shutdown summaries are requested from it, and `run` may replace it
    /// with a value taken from `providers.provider_override`.
    provider: AnyProvider,
    /// Provider intended for embedding requests; the constructor sets it to a clone of
    /// `provider`.
    embedding_provider: AnyProvider,
    /// Channel used to receive user messages and to send replies and status updates.
    channel: C,
    /// Type-erased tool executor.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Message history (index 0 holds the system prompt at construction), queued input and
    /// pending image parts.
    pub(super) msg: MessageState,
    /// Memory handle, conversation id, and recall / summarisation / shutdown-summary settings.
    pub(super) memory_state: MemoryState,
    /// Skill registry, matcher and skill-prompt settings.
    pub(super) skill_state: SkillState,
    /// Context-window management; among other things it counts turns since the last hard
    /// compaction.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool-call orchestration state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    /// Learning-engine state.
    pub(super) learning_engine: learning_engine::LearningEngine,
    /// Feedback detector plus optional judge and LLM classifier.
    pub(super) feedback: FeedbackState,
    /// Runtime configuration: security, timeouts, model/provider names, permission policy,
    /// rate limiting, semantic cache and related knobs.
    pub(super) runtime: RuntimeConfig,
    /// MCP tools, registry/manager, and tool pruning and discovery settings.
    pub(super) mcp: McpState,
    /// Code-index retriever and cached repo map.
    pub(super) index: IndexState,
    /// Per-session environment context, response cache, status sender and hook configuration.
    pub(super) session: SessionState,
    /// Debug dumper, dump format, trace collector and anomaly detection.
    pub(super) debug_state: DebugState,
    /// Instruction blocks and their reload channel.
    pub(super) instructions: InstructionState,
    /// Sanitiser, exfiltration guard, PII filtering, memory-write validation and related
    /// security state.
    pub(super) security: SecurityState,
    /// Experiment configuration, cancellation handle and completion-notification channel.
    pub(super) experiments: ExperimentState,
    /// Task-goal and subgoal tracking state.
    pub(super) compression: CompressionState,
    /// Shutdown watch, cancellation primitives, start time, and reload/watcher channels.
    pub(super) lifecycle: LifecycleState,
    /// Auxiliary providers (summary, judge, probe, compress, STT), the provider override slot
    /// and the cached prompt-token count.
    pub(super) providers: ProviderState,
    /// Metrics sender, cost tracker and token counter.
    pub(super) metrics: MetricsState,
    /// Planner/verifier providers, pending task graph and sub-agent manager.
    pub(super) orchestration: OrchestrationState,
    /// Focus state (default-initialised by the constructor).
    pub(super) focus: focus::FocusState,
    /// Sidequest state (default-initialised by the constructor).
    pub(super) sidequest: sidequest::SidequestState,
    /// Optional tool-schema filter (`None` at construction).
    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
    /// Cached result of tool filtering (`None` at construction). Presumably recomputed when
    /// the tool set changes — TODO confirm.
    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
    /// Optional tool dependency graph (`None` at construction).
    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
    /// Tool ids kept available regardless of dependency state (empty at construction; the
    /// semantics are inferred from the name — confirm).
    pub(super) dependency_always_on: HashSet<String>,
    /// Ids of tools that have completed (empty at construction).
    pub(super) completed_tool_ids: HashSet<String>,
    /// Database id of the last persisted message, if any (`None` at construction).
    pub(super) last_persisted_message_id: Option<i64>,
    /// Message ids queued for a deferred database "hide" update (empty at construction).
    pub(super) deferred_db_hide_ids: Vec<i64>,
    /// Summaries queued for deferred database persistence (empty at construction).
    pub(super) deferred_db_summaries: Vec<String>,
    /// Registered runtime layers (empty at construction).
    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
    /// Index of the current tool iteration within a turn (0 at construction).
    pub(super) current_tool_iteration: usize,
}
171
172impl<C: Channel> Agent<C> {
173 #[must_use]
174 pub fn new(
175 provider: AnyProvider,
176 channel: C,
177 registry: SkillRegistry,
178 matcher: Option<SkillMatcherBackend>,
179 max_active_skills: usize,
180 tool_executor: impl ToolExecutor + 'static,
181 ) -> Self {
182 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
183 Self::new_with_registry_arc(
184 provider,
185 channel,
186 registry,
187 matcher,
188 max_active_skills,
189 tool_executor,
190 )
191 }
192
    /// Builds an agent around an already-shared skill registry; every other setting starts at
    /// its default.
    ///
    /// The initial system prompt is built from all skills currently in `registry`, formatted
    /// with empty trust and health maps. It becomes the first message of the history. The
    /// cached prompt-token count is seeded with a rough `bytes / 4` estimate of that prompt.
    /// Embeddings initially use a clone of `provider`.
    ///
    /// # Panics
    ///
    /// Panics if the registry's `RwLock` is poisoned. In debug builds, it also panics when
    /// `max_active_skills` is 0.
    #[must_use]
    // The body is one large struct literal initialising every sub-state, which exceeds
    // clippy's function-length limit.
    #[allow(clippy::too_many_lines)]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Snapshot every skill currently registered; skills whose lookup fails are skipped.
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust or health data exists yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough heuristic: about 4 bytes of prompt text per token.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // NOTE(review): `_tx` is dropped when this constructor returns, so the receiver stored in
        // `lifecycle.shutdown` has no live sender unless it is replaced later (e.g. by a builder
        // method). Confirm that `shutdown_signal` treats a closed watch channel as intended.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // Experiment-completion notifications (buffer of 4); both ends live in `ExperimentState`.
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        // Embeddings default to the main provider.
        let embedding_provider = provider.clone();
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                // The history always starts with the system prompt at index 0.
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
            },
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
                shutdown_summary: true,
                shutdown_summary_min_messages: 4,
                shutdown_summary_max_messages: 20,
                shutdown_summary_timeout_secs: 10,
                structured_summaries: false,
                last_recall_confidence: None,
                digest_config: crate::config::DigestConfig::default(),
                cached_session_digest: None,
                context_strategy: crate::config::ContextStrategy::default(),
                crossover_turn_threshold: 20,
                rpe_router: None,
                goal_text: None,
                persona_config: crate::config::PersonaConfig::default(),
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.20,
                min_injection_score: 0.20,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                // Cached so later prompt rebuilds can compare against it.
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
                two_stage_matching: false,
                confusability_threshold: 0.0,
                rl_head: None,
                rl_weight: 0.3,
                rl_warmup_updates: 50,
                generation_output_dir: None,
                generation_provider_name: String::new(),
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState {
                detector: feedback_detector::FeedbackDetector::new(0.6),
                judge: None,
                llm_classifier: None,
            },
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                reasoning_model_warning: true,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                active_provider_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
                rate_limiter: rate_limiter::ToolRateLimiter::new(
                    rate_limiter::RateLimitConfig::default(),
                ),
                semantic_cache_enabled: false,
                semantic_cache_threshold: 0.95,
                semantic_cache_max_candidates: 10,
                dependency_config: zeph_tools::DependencyConfig::default(),
                adversarial_policy_info: None,
                spawn_depth: 0,
                budget_hint_enabled: true,
                channel_skills: zeph_config::ChannelSkillsConfig::default(),
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                elicitation_rx: None,
                shared_tools: None,
                tool_rx: None,
                server_outcomes: Vec::new(),
                pruning_cache: zeph_mcp::PruningCache::new(),
                pruning_provider: None,
                pruning_enabled: false,
                pruning_params: zeph_mcp::PruningParams::default(),
                semantic_index: None,
                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
                discovery_params: zeph_mcp::DiscoveryParams::default(),
                discovery_provider: None,
                elicitation_warn_sensitive_fields: true,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            session: SessionState {
                env_context: EnvironmentContext::gather(""),
                response_cache: None,
                parent_tool_use_id: None,
                status_tx: None,
                lsp_hooks: None,
                policy_config: None,
                hooks_config: state::HooksConfigSnapshot::default(),
            },
            instructions: InstructionState {
                blocks: Vec::new(),
                reload_rx: None,
                reload_state: None,
            },
            security: SecurityState {
                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
                quarantine_summarizer: None,
                is_acp_session: false,
                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
                    std::collections::HashSet::new(),
                )),
                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
                    zeph_sanitizer::pii::PiiFilterConfig::default(),
                ),
                // NER-based PII detection fields exist only with the `classifiers` feature.
                #[cfg(feature = "classifiers")]
                pii_ner_backend: None,
                #[cfg(feature = "classifiers")]
                pii_ner_timeout_ms: 5000,
                #[cfg(feature = "classifiers")]
                pii_ner_max_chars: 8192,
                #[cfg(feature = "classifiers")]
                pii_ner_circuit_breaker_threshold: 2,
                #[cfg(feature = "classifiers")]
                pii_ner_consecutive_timeouts: 0,
                #[cfg(feature = "classifiers")]
                pii_ner_tripped: false,
                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
                guardrail: None,
                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
                    zeph_config::ResponseVerificationConfig::default(),
                ),
                causal_analyzer: None,
            },
            experiments: ExperimentState {
                config: crate::config::ExperimentConfig::default(),
                cancel: None,
                baseline: crate::experiments::ConfigSnapshot::default(),
                eval_provider: None,
                notify_rx: Some(exp_notify_rx),
                notify_tx: exp_notify_tx,
            },
            compression: CompressionState {
                current_task_goal: None,
                task_goal_user_msg_hash: None,
                pending_task_goal: None,
                pending_sidequest_result: None,
                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
                pending_subgoal: None,
                subgoal_user_msg_hash: None,
            },
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
                // Falls back to an empty path if the working directory cannot be read.
                last_known_cwd: std::env::current_dir().unwrap_or_default(),
                file_changed_rx: None,
                file_watcher: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                probe_provider: None,
                compress_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
                provider_pool: Vec::new(),
                provider_config_snapshot: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
                classifier_metrics: None,
            },
            orchestration: OrchestrationState {
                planner_provider: None,
                verify_provider: None,
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
                plan_cache: None,
                pending_goal_embedding: None,
            },
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_schema_filter: None,
            cached_filtered_tool_ids: None,
            dependency_graph: None,
            dependency_always_on: HashSet::new(),
            completed_tool_ids: HashSet::new(),
            last_persisted_message_id: None,
            deferred_db_hide_ids: Vec::new(),
            deferred_db_summaries: Vec::new(),
            runtime_layers: Vec::new(),
            current_tool_iteration: 0,
        }
    }
491
492 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
497 let Some(mgr) = &mut self.orchestration.subagent_manager else {
498 return vec![];
499 };
500
501 let finished: Vec<String> = mgr
502 .statuses()
503 .into_iter()
504 .filter_map(|(id, status)| {
505 if matches!(
506 status.state,
507 crate::subagent::SubAgentState::Completed
508 | crate::subagent::SubAgentState::Failed
509 | crate::subagent::SubAgentState::Canceled
510 ) {
511 Some(id)
512 } else {
513 None
514 }
515 })
516 .collect();
517
518 let mut results = vec![];
519 for task_id in finished {
520 match mgr.collect(&task_id).await {
521 Ok(result) => results.push((task_id, result)),
522 Err(e) => {
523 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
524 }
525 }
526 }
527 results
528 }
529
530 async fn call_llm_for_session_summary(
539 &self,
540 chat_messages: &[Message],
541 ) -> Option<zeph_memory::StructuredSummary> {
542 let timeout_dur =
543 std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
544 match tokio::time::timeout(
545 timeout_dur,
546 self.provider
547 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
548 )
549 .await
550 {
551 Ok(Ok(s)) => Some(s),
552 Ok(Err(e)) => {
553 tracing::warn!(
554 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
555 );
556 self.plain_text_summary_fallback(chat_messages, timeout_dur)
557 .await
558 }
559 Err(_) => {
560 tracing::warn!(
561 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
562 self.memory_state.shutdown_summary_timeout_secs
563 );
564 self.plain_text_summary_fallback(chat_messages, timeout_dur)
565 .await
566 }
567 }
568 }
569
570 async fn plain_text_summary_fallback(
571 &self,
572 chat_messages: &[Message],
573 timeout_dur: std::time::Duration,
574 ) -> Option<zeph_memory::StructuredSummary> {
575 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
576 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
577 summary: plain,
578 key_facts: vec![],
579 entities: vec![],
580 }),
581 Ok(Err(e)) => {
582 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
583 None
584 }
585 Err(_) => {
586 tracing::warn!("shutdown summary: plain LLM fallback timed out");
587 None
588 }
589 }
590 }
591
592 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
597 use zeph_llm::provider::{MessagePart, Role};
598
599 let msgs = &self.msg.messages;
603 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
605 return;
606 };
607 let asst_msg = &msgs[asst_idx];
608 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
609 .parts
610 .iter()
611 .filter_map(|p| {
612 if let MessagePart::ToolUse { id, name, input } = p {
613 Some((id.as_str(), name.as_str(), input))
614 } else {
615 None
616 }
617 })
618 .collect();
619 if tool_use_ids.is_empty() {
620 return;
621 }
622
623 let paired_ids: std::collections::HashSet<&str> = msgs
625 .get(asst_idx + 1..)
626 .into_iter()
627 .flatten()
628 .filter(|m| m.role == Role::User)
629 .flat_map(|m| m.parts.iter())
630 .filter_map(|p| {
631 if let MessagePart::ToolResult { tool_use_id, .. } = p {
632 Some(tool_use_id.as_str())
633 } else {
634 None
635 }
636 })
637 .collect();
638
639 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
640 .iter()
641 .filter(|(id, _, _)| !paired_ids.contains(*id))
642 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
643 id: (*id).to_owned(),
644 name: (*name).to_owned(),
645 input: (*input).clone(),
646 })
647 .collect();
648
649 if unpaired.is_empty() {
650 return;
651 }
652
653 tracing::info!(
654 count = unpaired.len(),
655 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
656 );
657 self.persist_cancelled_tool_results(&unpaired).await;
658 }
659
660 async fn maybe_store_shutdown_summary(&mut self) {
670 if !self.memory_state.shutdown_summary {
671 return;
672 }
673 let Some(memory) = self.memory_state.memory.clone() else {
674 return;
675 };
676 let Some(conversation_id) = self.memory_state.conversation_id else {
677 return;
678 };
679
680 match memory.has_session_summary(conversation_id).await {
682 Ok(true) => {
683 tracing::debug!("shutdown summary: session already has a summary, skipping");
684 return;
685 }
686 Ok(false) => {}
687 Err(e) => {
688 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
689 return;
690 }
691 }
692
693 let user_count = self
695 .msg
696 .messages
697 .iter()
698 .skip(1)
699 .filter(|m| m.role == Role::User)
700 .count();
701 if user_count < self.memory_state.shutdown_summary_min_messages {
702 tracing::debug!(
703 user_count,
704 min = self.memory_state.shutdown_summary_min_messages,
705 "shutdown summary: too few user messages, skipping"
706 );
707 return;
708 }
709
710 let _ = self.channel.send_status("Saving session summary...").await;
712
713 let max = self.memory_state.shutdown_summary_max_messages;
715 if max == 0 {
716 tracing::debug!("shutdown summary: max_messages=0, skipping");
717 return;
718 }
719 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
720 let slice = if non_system.len() > max {
721 &non_system[non_system.len() - max..]
722 } else {
723 &non_system[..]
724 };
725
726 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
727 .iter()
728 .map(|m| {
729 let role = match m.role {
730 Role::User => "user".to_owned(),
731 Role::Assistant => "assistant".to_owned(),
732 Role::System => "system".to_owned(),
733 };
734 (zeph_memory::MessageId(0), role, m.content.clone())
735 })
736 .collect();
737
738 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
739 let chat_messages = vec![Message {
740 role: Role::User,
741 content: prompt,
742 parts: vec![],
743 metadata: MessageMetadata::default(),
744 }];
745
746 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
747 let _ = self.channel.send_status("").await;
748 return;
749 };
750
751 if let Err(e) = memory
752 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
753 .await
754 {
755 tracing::warn!("shutdown summary: storage failed: {e:#}");
756 } else {
757 tracing::info!(
758 conversation_id = conversation_id.0,
759 "shutdown summary stored"
760 );
761 }
762
763 let _ = self.channel.send_status("").await;
765 }
766
767 pub async fn shutdown(&mut self) {
768 self.channel.send("Shutting down...").await.ok();
769
770 self.provider.save_router_state();
772
773 if let Some(ref mut mgr) = self.orchestration.subagent_manager {
774 mgr.shutdown_all();
775 }
776
777 if let Some(ref manager) = self.mcp.manager {
778 manager.shutdown_all_shared().await;
779 }
780
781 if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
785 self.update_metrics(|m| {
786 m.compaction_turns_after_hard.push(turns);
787 });
788 self.context_manager.turns_since_last_hard_compaction = None;
789 }
790
791 if let Some(ref tx) = self.metrics.metrics_tx {
792 let m = tx.borrow();
793 if m.filter_applications > 0 {
794 #[allow(clippy::cast_precision_loss)]
795 let pct = if m.filter_raw_tokens > 0 {
796 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
797 } else {
798 0.0
799 };
800 tracing::info!(
801 raw_tokens = m.filter_raw_tokens,
802 saved_tokens = m.filter_saved_tokens,
803 applications = m.filter_applications,
804 "tool output filtering saved ~{} tokens ({pct:.0}%)",
805 m.filter_saved_tokens,
806 );
807 }
808 if m.compaction_hard_count > 0 {
809 tracing::info!(
810 hard_compactions = m.compaction_hard_count,
811 turns_after_hard = ?m.compaction_turns_after_hard,
812 "hard compaction trajectory"
813 );
814 }
815 }
816
817 self.flush_orphaned_tool_use_on_shutdown().await;
821
822 self.maybe_store_shutdown_summary().await;
823 self.maybe_store_session_digest().await;
824
825 tracing::info!("agent shutdown complete");
826 }
827
828 fn refresh_subagent_metrics(&mut self) {
835 let Some(ref mgr) = self.orchestration.subagent_manager else {
836 return;
837 };
838 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
839 .statuses()
840 .into_iter()
841 .map(|(id, s)| {
842 let def = mgr.agents_def(&id);
843 crate::metrics::SubAgentMetrics {
844 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
845 id: id.clone(),
846 state: format!("{:?}", s.state).to_lowercase(),
847 turns_used: s.turns_used,
848 max_turns: def.map_or(20, |d| d.permissions.max_turns),
849 background: def.is_some_and(|d| d.permissions.background),
850 elapsed_secs: s.started_at.elapsed().as_secs(),
851 permission_mode: def.map_or_else(String::new, |d| {
852 use crate::subagent::def::PermissionMode;
853 match d.permissions.permission_mode {
854 PermissionMode::Default => String::new(),
855 PermissionMode::AcceptEdits => "accept_edits".into(),
856 PermissionMode::DontAsk => "dont_ask".into(),
857 PermissionMode::BypassPermissions => "bypass_permissions".into(),
858 PermissionMode::Plan => "plan".into(),
859 }
860 }),
861 transcript_dir: mgr
862 .agent_transcript_dir(&id)
863 .map(|p| p.to_string_lossy().into_owned()),
864 }
865 })
866 .collect();
867 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
868 }
869
870 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
872 let completed = self.poll_subagents().await;
873 for (task_id, result) in completed {
874 let notice = if result.is_empty() {
875 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
876 } else {
877 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
878 };
879 if let Err(e) = self.channel.send(¬ice).await {
880 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
881 }
882 }
883 Ok(())
884 }
885
    /// Runs the agent's main input loop until shutdown.
    ///
    /// Before the loop, it waits for model warmup (if a readiness watch was provided) and
    /// loads the cached session digest. Each iteration:
    /// 1. applies any pending provider override;
    /// 2. refreshes tools, elicitations and sub-agent state;
    /// 3. takes the next input, either from the internal message queue or from the channel.
    ///    While waiting on the channel it also reacts to reload, update, experiment, scheduler
    ///    and file-change events.
    /// 4. handles built-in commands or processes the text as a user message.
    ///
    /// The loop ends when:
    /// - the shutdown signal fires;
    /// - the channel yields `None`;
    /// - a built-in command returns `Some(true)`.
    ///
    /// Any active trace collector is finished on exit.
    ///
    /// # Errors
    ///
    /// Returns an error if any of these fail:
    /// - receiving from the channel;
    /// - notifying completed sub-agents;
    /// - handling a built-in command;
    /// - processing a user message.
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // Wait for warmup only if it is not already reported as done.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;

        loop {
            // Swap in a provider placed in the override slot (logged as an ACP model override);
            // a poisoned lock is recovered rather than propagated.
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            self.check_tool_refresh().await;

            self.process_pending_elicitations().await;

            self.refresh_subagent_metrics();

            self.notify_completed_subagents().await?;

            self.drain_channel();

            // Queued messages take priority over new channel input.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need STT / image resolution.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                // Wait for user input while servicing background events. Branches that end in
                // `continue` restart the outer loop without producing a user turn.
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                        // The experiment finished, so its cancellation handle is no longer needed.
                        { self.experiments.cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        // A scheduled task becomes a synthetic user message for this turn.
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                };
                // `None` from the channel means input has closed: leave the loop.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            // Some(true): exit; Some(false): command handled, next turn; None: not a builtin.
            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1008
1009 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1011 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1012 if arg.is_empty() {
1013 match &self.debug_state.debug_dumper {
1014 Some(d) => {
1015 let _ = self
1016 .channel
1017 .send(&format!("Debug dump active: {}", d.dir().display()))
1018 .await;
1019 }
1020 None => {
1021 let _ = self
1022 .channel
1023 .send(
1024 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1025 or start with `--debug-dump [dir]`.",
1026 )
1027 .await;
1028 }
1029 }
1030 return;
1031 }
1032 let dir = std::path::PathBuf::from(arg);
1033 match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1034 Ok(dumper) => {
1035 let path = dumper.dir().display().to_string();
1036 self.debug_state.debug_dumper = Some(dumper);
1037 let _ = self
1038 .channel
1039 .send(&format!("Debug dump enabled: {path}"))
1040 .await;
1041 }
1042 Err(e) => {
1043 let _ = self
1044 .channel
1045 .send(&format!("Failed to enable debug dump: {e}"))
1046 .await;
1047 }
1048 }
1049 }
1050
1051 async fn handle_dump_format_command(&mut self, trimmed: &str) {
1053 let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
1054 if arg.is_empty() {
1055 let _ = self
1056 .channel
1057 .send(&format!(
1058 "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
1059 self.debug_state.dump_format
1060 ))
1061 .await;
1062 return;
1063 }
1064 let new_format = match arg {
1065 "json" => crate::debug_dump::DumpFormat::Json,
1066 "raw" => crate::debug_dump::DumpFormat::Raw,
1067 "trace" => crate::debug_dump::DumpFormat::Trace,
1068 other => {
1069 let _ = self
1070 .channel
1071 .send(&format!(
1072 "Unknown format '{other}'. Valid values: json, raw, trace."
1073 ))
1074 .await;
1075 return;
1076 }
1077 };
1078 let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
1079 let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
1080
1081 if now_trace
1083 && !was_trace
1084 && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
1085 {
1086 let service_name = self.debug_state.trace_service_name.clone();
1087 let redact = self.debug_state.trace_redact;
1088 match crate::debug_dump::trace::TracingCollector::new(
1089 dump_dir.as_path(),
1090 &service_name,
1091 redact,
1092 None,
1093 ) {
1094 Ok(collector) => {
1095 self.debug_state.trace_collector = Some(collector);
1096 }
1097 Err(e) => {
1098 tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
1099 }
1100 }
1101 }
1102 if was_trace
1104 && !now_trace
1105 && let Some(mut tc) = self.debug_state.trace_collector.take()
1106 {
1107 tc.finish();
1108 }
1109
1110 self.debug_state.dump_format = new_format;
1111 let _ = self
1112 .channel
1113 .send(&format!("Debug dump format set to: {arg}"))
1114 .await;
1115 }
1116
1117 async fn resolve_message(
1118 &self,
1119 msg: crate::channel::ChannelMessage,
1120 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1121 use crate::channel::{Attachment, AttachmentKind};
1122 use zeph_llm::provider::{ImageData, MessagePart};
1123
1124 let text_base = msg.text.clone();
1125
1126 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1127 .attachments
1128 .into_iter()
1129 .partition(|a| a.kind == AttachmentKind::Audio);
1130
1131 tracing::debug!(
1132 audio = audio_attachments.len(),
1133 has_stt = self.providers.stt.is_some(),
1134 "resolve_message attachments"
1135 );
1136
1137 let text = if !audio_attachments.is_empty()
1138 && let Some(stt) = self.providers.stt.as_ref()
1139 {
1140 let mut transcribed_parts = Vec::new();
1141 for attachment in &audio_attachments {
1142 if attachment.data.len() > MAX_AUDIO_BYTES {
1143 tracing::warn!(
1144 size = attachment.data.len(),
1145 max = MAX_AUDIO_BYTES,
1146 "audio attachment exceeds size limit, skipping"
1147 );
1148 continue;
1149 }
1150 match stt
1151 .transcribe(&attachment.data, attachment.filename.as_deref())
1152 .await
1153 {
1154 Ok(result) => {
1155 tracing::info!(
1156 len = result.text.len(),
1157 language = ?result.language,
1158 "audio transcribed"
1159 );
1160 transcribed_parts.push(result.text);
1161 }
1162 Err(e) => {
1163 tracing::error!(error = %e, "audio transcription failed");
1164 }
1165 }
1166 }
1167 if transcribed_parts.is_empty() {
1168 text_base
1169 } else {
1170 let transcribed = transcribed_parts.join("\n");
1171 if text_base.is_empty() {
1172 transcribed
1173 } else {
1174 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1175 }
1176 }
1177 } else {
1178 if !audio_attachments.is_empty() {
1179 tracing::warn!(
1180 count = audio_attachments.len(),
1181 "audio attachments received but no STT provider configured, dropping"
1182 );
1183 }
1184 text_base
1185 };
1186
1187 let mut image_parts = Vec::new();
1188 for attachment in image_attachments {
1189 if attachment.data.len() > MAX_IMAGE_BYTES {
1190 tracing::warn!(
1191 size = attachment.data.len(),
1192 max = MAX_IMAGE_BYTES,
1193 "image attachment exceeds size limit, skipping"
1194 );
1195 continue;
1196 }
1197 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1198 image_parts.push(MessagePart::Image(Box::new(ImageData {
1199 data: attachment.data,
1200 mime_type,
1201 })));
1202 }
1203
1204 (text, image_parts)
1205 }
1206
1207 async fn process_user_message(
1208 &mut self,
1209 text: String,
1210 image_parts: Vec<zeph_llm::provider::MessagePart>,
1211 ) -> Result<(), error::AgentError> {
1212 let iteration_index = self.debug_state.iteration_counter;
1214 self.debug_state.iteration_counter += 1;
1215 if let Some(ref mut tc) = self.debug_state.trace_collector {
1216 tc.begin_iteration(iteration_index, text.trim());
1217 self.debug_state.current_iteration_span_id =
1219 tc.current_iteration_span_id(iteration_index);
1220 }
1221
1222 let result = self
1223 .process_user_message_inner(text, image_parts, iteration_index)
1224 .await;
1225
1226 if let Some(ref mut tc) = self.debug_state.trace_collector {
1228 let status = if result.is_ok() {
1229 crate::debug_dump::trace::SpanStatus::Ok
1230 } else {
1231 crate::debug_dump::trace::SpanStatus::Error {
1232 message: "iteration failed".to_owned(),
1233 }
1234 };
1235 tc.end_iteration(iteration_index, status);
1236 }
1237 self.debug_state.current_iteration_span_id = None;
1238
1239 result
1240 }
1241
1242 async fn process_user_message_inner(
1243 &mut self,
1244 text: String,
1245 image_parts: Vec<zeph_llm::provider::MessagePart>,
1246 iteration_index: usize,
1247 ) -> Result<(), error::AgentError> {
1248 let _ = iteration_index; self.lifecycle.cancel_token = CancellationToken::new();
1250 let signal = Arc::clone(&self.lifecycle.cancel_signal);
1251 let token = self.lifecycle.cancel_token.clone();
1252 tokio::spawn(async move {
1253 signal.notified().await;
1254 token.cancel();
1255 });
1256 let trimmed = text.trim();
1257
1258 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1259 return result;
1260 }
1261
1262 self.check_pending_rollbacks().await;
1263
1264 if self.pre_process_security(trimmed).await? {
1265 return Ok(());
1266 }
1267
1268 self.advance_context_lifecycle(&text, trimmed).await;
1269
1270 let user_msg = self.build_user_message(&text, image_parts);
1271
1272 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1274 if !urls.is_empty()
1275 && let Ok(mut set) = self.security.user_provided_urls.write()
1276 {
1277 set.extend(urls);
1278 }
1279
1280 self.memory_state.goal_text = Some(text.clone());
1283
1284 self.persist_message(Role::User, &text, &[], false).await;
1286 self.push_message(user_msg);
1287
1288 if let Err(e) = self.process_response().await {
1289 tracing::error!("Response processing failed: {e:#}");
1290 let user_msg = format!("Error: {e:#}");
1291 self.channel.send(&user_msg).await?;
1292 self.msg.messages.pop();
1293 self.recompute_prompt_tokens();
1294 self.channel.flush_chunks().await?;
1295 }
1296
1297 Ok(())
1298 }
1299
1300 async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1302 if let Some(ref guardrail) = self.security.guardrail {
1304 use zeph_sanitizer::guardrail::GuardrailVerdict;
1305 let verdict = guardrail.check(trimmed).await;
1306 match &verdict {
1307 GuardrailVerdict::Flagged { reason, .. } => {
1308 tracing::warn!(
1309 reason = %reason,
1310 should_block = verdict.should_block(),
1311 "guardrail flagged user input"
1312 );
1313 if verdict.should_block() {
1314 let msg = format!("[guardrail] Input blocked: {reason}");
1315 let _ = self.channel.send(&msg).await;
1316 let _ = self.channel.flush_chunks().await;
1317 return Ok(true);
1318 }
1319 let _ = self
1321 .channel
1322 .send(&format!("[guardrail] Warning: {reason}"))
1323 .await;
1324 }
1325 GuardrailVerdict::Error { error } => {
1326 if guardrail.error_should_block() {
1327 tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1328 let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1329 let _ = self.channel.send(msg).await;
1330 let _ = self.channel.flush_chunks().await;
1331 return Ok(true);
1332 }
1333 tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1334 }
1335 GuardrailVerdict::Safe => {}
1336 }
1337 }
1338
1339 #[cfg(feature = "classifiers")]
1345 if self.security.sanitizer.scan_user_input() {
1346 match self.security.sanitizer.classify_injection(trimmed).await {
1347 zeph_sanitizer::InjectionVerdict::Blocked => {
1348 self.push_classifier_metrics();
1349 let _ = self
1350 .channel
1351 .send("[security] Input blocked: injection detected by classifier.")
1352 .await;
1353 let _ = self.channel.flush_chunks().await;
1354 return Ok(true);
1355 }
1356 zeph_sanitizer::InjectionVerdict::Suspicious => {
1357 tracing::warn!("injection_classifier soft_signal on user input");
1358 }
1359 zeph_sanitizer::InjectionVerdict::Clean => {}
1360 }
1361 }
1362 #[cfg(feature = "classifiers")]
1363 self.push_classifier_metrics();
1364
1365 Ok(false)
1366 }
1367
/// Per-turn housekeeping that runs after security checks and before the user message is built.
///
/// The order of calls below is significant. For example, the compaction state is advanced
/// before the sidequest check reads `is_compacted_this_turn()`. Failures of proactive
/// compression, compaction and context preparation are logged as warnings and do not abort
/// the turn.
async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
    self.mcp.pruning_cache.reset();

    // Conversation id is read once here and reused for correction recording below.
    let conv_id = self.memory_state.conversation_id;
    self.rebuild_system_prompt(text).await;

    self.detect_and_record_corrections(trimmed, conv_id).await;
    self.learning_engine.tick();
    self.analyze_and_learn().await;
    self.sync_graph_counts().await;

    // Move the compaction state machine to the next turn before anything inspects it.
    self.context_manager.compaction = self.context_manager.compaction.advance_turn();

    {
        self.focus.tick();

        // Sidequest eviction is skipped on turns where compaction already ran.
        let sidequest_should_fire = self.sidequest.tick();
        if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
            self.maybe_sidequest_eviction();
        }
    }

    self.maybe_apply_deferred_summaries();
    self.flush_deferred_summaries().await;

    if let Err(e) = self.maybe_proactive_compress().await {
        tracing::warn!("proactive compression failed: {e:#}");
    }

    if let Err(e) = self.maybe_compact().await {
        tracing::warn!("context compaction failed: {e:#}");
    }

    // NOTE(review): boxed presumably to keep this future small or to break a recursive
    // async type; confirm before removing.
    if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
        tracing::warn!("context preparation failed: {e:#}");
    }

    // Hand the recall confidence from this turn's context preparation to the provider.
    self.provider
        .set_memory_confidence(self.memory_state.last_recall_confidence);

    self.learning_engine.reset_reflection();
}
1426
1427 fn build_user_message(
1428 &mut self,
1429 text: &str,
1430 image_parts: Vec<zeph_llm::provider::MessagePart>,
1431 ) -> Message {
1432 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1433 all_image_parts.extend(image_parts);
1434
1435 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1436 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1437 text: text.to_owned(),
1438 }];
1439 parts.extend(all_image_parts);
1440 Message::from_parts(Role::User, parts)
1441 } else {
1442 if !all_image_parts.is_empty() {
1443 tracing::warn!(
1444 count = all_image_parts.len(),
1445 "image attachments dropped: provider does not support vision"
1446 );
1447 }
1448 Message {
1449 role: Role::User,
1450 content: text.to_owned(),
1451 parts: vec![],
1452 metadata: MessageMetadata::default(),
1453 }
1454 }
1455 }
1456
1457 async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1460 use crate::subagent::SubAgentState;
1461 let result = loop {
1462 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1463
1464 #[allow(clippy::redundant_closure_for_method_calls)]
1468 let pending = self
1469 .orchestration
1470 .subagent_manager
1471 .as_mut()
1472 .and_then(|m| m.try_recv_secret_request());
1473 if let Some((req_task_id, req)) = pending {
1474 let confirm_prompt = format!(
1477 "Sub-agent requests secret '{}'. Allow?",
1478 crate::text::truncate_to_chars(&req.secret_key, 100)
1479 );
1480 let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1481 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1482 if approved {
1483 let ttl = std::time::Duration::from_secs(300);
1484 let key = req.secret_key.clone();
1485 if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1486 let _ = mgr.deliver_secret(&req_task_id, key);
1487 }
1488 } else {
1489 let _ = mgr.deny_secret(&req_task_id);
1490 }
1491 }
1492 }
1493
1494 let mgr = self.orchestration.subagent_manager.as_ref()?;
1495 let statuses = mgr.statuses();
1496 let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1497 break format!("{label} completed (no status available).");
1498 };
1499 match status.state {
1500 SubAgentState::Completed => {
1501 let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1502 break format!("{label} completed: {msg}");
1503 }
1504 SubAgentState::Failed => {
1505 let msg = status
1506 .last_message
1507 .clone()
1508 .unwrap_or_else(|| "unknown error".into());
1509 break format!("{label} failed: {msg}");
1510 }
1511 SubAgentState::Canceled => {
1512 break format!("{label} was cancelled.");
1513 }
1514 _ => {
1515 let _ = self
1516 .channel
1517 .send_status(&format!(
1518 "{label}: turn {}/{}",
1519 status.turns_used,
1520 self.orchestration
1521 .subagent_manager
1522 .as_ref()
1523 .and_then(|m| m.agents_def(task_id))
1524 .map_or(20, |d| d.permissions.max_turns)
1525 ))
1526 .await;
1527 }
1528 }
1529 };
1530 Some(result)
1531 }
1532
1533 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1536 let mgr = self.orchestration.subagent_manager.as_mut()?;
1537 let full_ids: Vec<String> = mgr
1538 .statuses()
1539 .into_iter()
1540 .map(|(tid, _)| tid)
1541 .filter(|tid| tid.starts_with(prefix))
1542 .collect();
1543 Some(match full_ids.as_slice() {
1544 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1545 [fid] => Ok(fid.clone()),
1546 _ => Err(format!(
1547 "Ambiguous id prefix '{prefix}': matches {} agents",
1548 full_ids.len()
1549 )),
1550 })
1551 }
1552
1553 fn handle_agent_list(&self) -> Option<String> {
1554 use std::fmt::Write as _;
1555 let mgr = self.orchestration.subagent_manager.as_ref()?;
1556 let defs = mgr.definitions();
1557 if defs.is_empty() {
1558 return Some("No sub-agent definitions found.".into());
1559 }
1560 let mut out = String::from("Available sub-agents:\n");
1561 for d in defs {
1562 let memory_label = match d.memory {
1563 Some(crate::subagent::MemoryScope::User) => " [memory:user]",
1564 Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
1565 Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
1566 None => "",
1567 };
1568 if let Some(ref src) = d.source {
1569 let _ = writeln!(
1570 out,
1571 " {}{} — {} ({})",
1572 d.name, memory_label, d.description, src
1573 );
1574 } else {
1575 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
1576 }
1577 }
1578 Some(out)
1579 }
1580
1581 fn handle_agent_status(&self) -> Option<String> {
1582 use std::fmt::Write as _;
1583 let mgr = self.orchestration.subagent_manager.as_ref()?;
1584 let statuses = mgr.statuses();
1585 if statuses.is_empty() {
1586 return Some("No active sub-agents.".into());
1587 }
1588 let mut out = String::from("Active sub-agents:\n");
1589 for (id, s) in &statuses {
1590 let state = format!("{:?}", s.state).to_lowercase();
1591 let elapsed = s.started_at.elapsed().as_secs();
1592 let _ = writeln!(
1593 out,
1594 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
1595 short = &id[..8.min(id.len())],
1596 t = s.turns_used,
1597 msg = s.last_message.as_deref().unwrap_or(""),
1598 );
1599 if let Some(def) = mgr.agents_def(id)
1601 && let Some(scope) = def.memory
1602 && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
1603 {
1604 let _ = writeln!(out, " memory: {}", dir.display());
1605 }
1606 }
1607 Some(out)
1608 }
1609
1610 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1611 let full_id = match self.resolve_agent_id_prefix(id)? {
1612 Ok(fid) => fid,
1613 Err(msg) => return Some(msg),
1614 };
1615 let mgr = self.orchestration.subagent_manager.as_mut()?;
1616 if let Some((tid, req)) = mgr.try_recv_secret_request()
1617 && tid == full_id
1618 {
1619 let key = req.secret_key.clone();
1620 let ttl = std::time::Duration::from_secs(300);
1621 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1622 return Some(format!("Approve failed: {e}"));
1623 }
1624 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1625 return Some(format!("Secret delivery failed: {e}"));
1626 }
1627 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1628 }
1629 Some(format!(
1630 "No pending secret request for sub-agent '{full_id}'."
1631 ))
1632 }
1633
1634 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1635 let full_id = match self.resolve_agent_id_prefix(id)? {
1636 Ok(fid) => fid,
1637 Err(msg) => return Some(msg),
1638 };
1639 let mgr = self.orchestration.subagent_manager.as_mut()?;
1640 match mgr.deny_secret(&full_id) {
1641 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1642 Err(e) => Some(format!("Deny failed: {e}")),
1643 }
1644 }
1645
1646 #[allow(clippy::too_many_lines)]
1647 async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
1648 use crate::subagent::AgentCommand;
1649
1650 match cmd {
1651 AgentCommand::List => self.handle_agent_list(),
1652 AgentCommand::Background { name, prompt } => {
1653 let provider = self.provider.clone();
1654 let tool_executor = Arc::clone(&self.tool_executor);
1655 let skills = self.filtered_skills_for(&name);
1656 let cfg = self.orchestration.subagent_config.clone();
1657 let spawn_ctx = self.build_spawn_context(&cfg);
1658 let mgr = self.orchestration.subagent_manager.as_mut()?;
1659 match mgr.spawn(
1660 &name,
1661 &prompt,
1662 provider,
1663 tool_executor,
1664 skills,
1665 &cfg,
1666 spawn_ctx,
1667 ) {
1668 Ok(id) => Some(format!(
1669 "Sub-agent '{name}' started in background (id: {short})",
1670 short = &id[..8.min(id.len())]
1671 )),
1672 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1673 }
1674 }
1675 AgentCommand::Spawn { name, prompt }
1676 | AgentCommand::Mention {
1677 agent: name,
1678 prompt,
1679 } => {
1680 let provider = self.provider.clone();
1682 let tool_executor = Arc::clone(&self.tool_executor);
1683 let skills = self.filtered_skills_for(&name);
1684 let cfg = self.orchestration.subagent_config.clone();
1685 let spawn_ctx = self.build_spawn_context(&cfg);
1686 let mgr = self.orchestration.subagent_manager.as_mut()?;
1687 let task_id = match mgr.spawn(
1688 &name,
1689 &prompt,
1690 provider,
1691 tool_executor,
1692 skills,
1693 &cfg,
1694 spawn_ctx,
1695 ) {
1696 Ok(id) => id,
1697 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1698 };
1699 let short = task_id[..8.min(task_id.len())].to_owned();
1700 let _ = self
1701 .channel
1702 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1703 .await;
1704 let label = format!("Sub-agent '{name}'");
1705 self.poll_subagent_until_done(&task_id, &label).await
1706 }
1707 AgentCommand::Status => self.handle_agent_status(),
1708 AgentCommand::Cancel { id } => {
1709 let mgr = self.orchestration.subagent_manager.as_mut()?;
1710 let ids: Vec<String> = mgr
1712 .statuses()
1713 .into_iter()
1714 .map(|(task_id, _)| task_id)
1715 .filter(|task_id| task_id.starts_with(&id))
1716 .collect();
1717 match ids.as_slice() {
1718 [] => Some(format!("No sub-agent with id prefix '{id}'")),
1719 [full_id] => {
1720 let full_id = full_id.clone();
1721 match mgr.cancel(&full_id) {
1722 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1723 Err(e) => Some(format!("Cancel failed: {e}")),
1724 }
1725 }
1726 _ => Some(format!(
1727 "Ambiguous id prefix '{id}': matches {} agents",
1728 ids.len()
1729 )),
1730 }
1731 }
1732 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1733 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1734 AgentCommand::Resume { id, prompt } => {
1735 let cfg = self.orchestration.subagent_config.clone();
1736 let def_name = {
1739 let mgr = self.orchestration.subagent_manager.as_ref()?;
1740 match mgr.def_name_for_resume(&id, &cfg) {
1741 Ok(name) => name,
1742 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1743 }
1744 };
1745 let skills = self.filtered_skills_for(&def_name);
1746 let provider = self.provider.clone();
1747 let tool_executor = Arc::clone(&self.tool_executor);
1748 let mgr = self.orchestration.subagent_manager.as_mut()?;
1749 let (task_id, _) =
1750 match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1751 Ok(pair) => pair,
1752 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1753 };
1754 let short = task_id[..8.min(task_id.len())].to_owned();
1755 let _ = self
1756 .channel
1757 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1758 .await;
1759 self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1760 .await
1761 }
1762 }
1763 }
1764
1765 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1766 let mgr = self.orchestration.subagent_manager.as_ref()?;
1767 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1768 let reg = self
1769 .skill_state
1770 .registry
1771 .read()
1772 .expect("registry read lock");
1773 match crate::subagent::filter_skills(®, &def.skills) {
1774 Ok(skills) => {
1775 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1776 if bodies.is_empty() {
1777 None
1778 } else {
1779 Some(bodies)
1780 }
1781 }
1782 Err(e) => {
1783 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1784 None
1785 }
1786 }
1787 }
1788
1789 fn build_spawn_context(
1791 &self,
1792 cfg: &zeph_config::SubAgentConfig,
1793 ) -> crate::subagent::SpawnContext {
1794 crate::subagent::SpawnContext {
1795 parent_messages: self.extract_parent_messages(cfg),
1796 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1797 parent_provider_name: {
1798 let name = &self.runtime.active_provider_name;
1799 if name.is_empty() {
1800 None
1801 } else {
1802 Some(name.clone())
1803 }
1804 },
1805 spawn_depth: self.runtime.spawn_depth,
1806 mcp_tool_names: self.extract_mcp_tool_names(),
1807 }
1808 }
1809
1810 fn extract_parent_messages(
1815 &self,
1816 config: &zeph_config::SubAgentConfig,
1817 ) -> Vec<zeph_llm::provider::Message> {
1818 use zeph_llm::provider::Role;
1819 if config.context_window_turns == 0 {
1820 return Vec::new();
1821 }
1822 let non_system: Vec<_> = self
1823 .msg
1824 .messages
1825 .iter()
1826 .filter(|m| m.role != Role::System)
1827 .cloned()
1828 .collect();
1829 let take_count = config.context_window_turns * 2;
1830 let start = non_system.len().saturating_sub(take_count);
1831 let mut msgs = non_system[start..].to_vec();
1832
1833 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
1836 let mut keep = msgs.len();
1837 for (i, m) in msgs.iter().enumerate() {
1838 total_chars += m.content.len();
1839 if total_chars > max_chars {
1840 keep = i;
1841 break;
1842 }
1843 }
1844 if keep < msgs.len() {
1845 tracing::info!(
1846 kept = keep,
1847 requested = config.context_window_turns * 2,
1848 "[subagent] truncated parent history from {} to {} turns due to token budget",
1849 config.context_window_turns * 2,
1850 keep
1851 );
1852 msgs.truncate(keep);
1853 }
1854 msgs
1855 }
1856
1857 fn extract_mcp_tool_names(&self) -> Vec<String> {
1859 self.tool_executor
1860 .tool_definitions_erased()
1861 .into_iter()
1862 .filter(|t| t.id.starts_with("mcp_"))
1863 .map(|t| t.id.to_string())
1864 .collect()
1865 }
1866
1867 async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
1869 let Some(ref memory) = self.memory_state.memory else {
1870 return;
1871 };
1872 let trust_cfg = self.skill_state.trust_config.clone();
1873 let managed_dir = self.skill_state.managed_dir.clone();
1874 for meta in all_meta {
1875 let source_kind = if managed_dir
1876 .as_ref()
1877 .is_some_and(|d| meta.skill_dir.starts_with(d))
1878 {
1879 zeph_memory::store::SourceKind::Hub
1880 } else {
1881 zeph_memory::store::SourceKind::Local
1882 };
1883 let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
1884 &trust_cfg.default_level
1885 } else {
1886 &trust_cfg.local_level
1887 };
1888 match zeph_skills::compute_skill_hash(&meta.skill_dir) {
1889 Ok(current_hash) => {
1890 let existing = memory
1891 .sqlite()
1892 .load_skill_trust(&meta.name)
1893 .await
1894 .ok()
1895 .flatten();
1896 let trust_level_str = if let Some(ref row) = existing {
1897 if row.blake3_hash == current_hash {
1898 row.trust_level.clone()
1899 } else {
1900 trust_cfg.hash_mismatch_level.to_string()
1901 }
1902 } else {
1903 initial_level.to_string()
1904 };
1905 let source_path = meta.skill_dir.to_str();
1906 if let Err(e) = memory
1907 .sqlite()
1908 .upsert_skill_trust(
1909 &meta.name,
1910 &trust_level_str,
1911 source_kind,
1912 None,
1913 source_path,
1914 ¤t_hash,
1915 )
1916 .await
1917 {
1918 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
1919 }
1920 }
1921 Err(e) => {
1922 tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
1923 }
1924 }
1925 }
1926 }
1927
1928 async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
1930 let provider = self.embedding_provider.clone();
1931 let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
1932 let owned = text.to_owned();
1933 let p = provider.clone();
1934 Box::pin(async move { p.embed(&owned).await })
1935 };
1936
1937 let needs_inmemory_rebuild = !self
1938 .skill_state
1939 .matcher
1940 .as_ref()
1941 .is_some_and(SkillMatcherBackend::is_qdrant);
1942
1943 if needs_inmemory_rebuild {
1944 self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
1945 .await
1946 .map(SkillMatcherBackend::InMemory);
1947 } else if let Some(ref mut backend) = self.skill_state.matcher {
1948 let _ = self.channel.send_status("syncing skill index...").await;
1949 if let Err(e) = backend
1950 .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
1951 .await
1952 {
1953 tracing::warn!("failed to sync skill embeddings: {e:#}");
1954 }
1955 }
1956
1957 if self.skill_state.hybrid_search {
1958 let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
1959 let _ = self.channel.send_status("rebuilding search index...").await;
1960 self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
1961 }
1962 }
1963
1964 async fn reload_skills(&mut self) {
1965 let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
1966 if new_registry.fingerprint()
1967 == self
1968 .skill_state
1969 .registry
1970 .read()
1971 .expect("registry read lock")
1972 .fingerprint()
1973 {
1974 return;
1975 }
1976 let _ = self.channel.send_status("reloading skills...").await;
1977 *self
1978 .skill_state
1979 .registry
1980 .write()
1981 .expect("registry write lock") = new_registry;
1982
1983 let all_meta = self
1984 .skill_state
1985 .registry
1986 .read()
1987 .expect("registry read lock")
1988 .all_meta()
1989 .into_iter()
1990 .cloned()
1991 .collect::<Vec<_>>();
1992
1993 self.update_trust_for_reloaded_skills(&all_meta).await;
1994
1995 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
1996 self.rebuild_skill_matcher(&all_meta_refs).await;
1997
1998 let all_skills: Vec<Skill> = {
1999 let reg = self
2000 .skill_state
2001 .registry
2002 .read()
2003 .expect("registry read lock");
2004 reg.all_meta()
2005 .iter()
2006 .filter_map(|m| reg.get_skill(&m.name).ok())
2007 .collect()
2008 };
2009 let trust_map = self.build_skill_trust_map().await;
2010 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2011 let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
2012 self.skill_state
2013 .last_skills_prompt
2014 .clone_from(&skills_prompt);
2015 let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
2016 if let Some(msg) = self.msg.messages.first_mut() {
2017 msg.content = system_prompt;
2018 }
2019
2020 let _ = self.channel.send_status("").await;
2021 tracing::info!(
2022 "reloaded {} skill(s)",
2023 self.skill_state
2024 .registry
2025 .read()
2026 .expect("registry read lock")
2027 .all_meta()
2028 .len()
2029 );
2030 }
2031
2032 fn reload_instructions(&mut self) {
2033 if let Some(ref mut rx) = self.instructions.reload_rx {
2035 while rx.try_recv().is_ok() {}
2036 }
2037 let Some(ref state) = self.instructions.reload_state else {
2038 return;
2039 };
2040 let new_blocks = crate::instructions::load_instructions(
2041 &state.base_dir,
2042 &state.provider_kinds,
2043 &state.explicit_files,
2044 state.auto_detect,
2045 );
2046 let old_sources: std::collections::HashSet<_> =
2047 self.instructions.blocks.iter().map(|b| &b.source).collect();
2048 let new_sources: std::collections::HashSet<_> =
2049 new_blocks.iter().map(|b| &b.source).collect();
2050 for added in new_sources.difference(&old_sources) {
2051 tracing::info!(path = %added.display(), "instruction file added");
2052 }
2053 for removed in old_sources.difference(&new_sources) {
2054 tracing::info!(path = %removed.display(), "instruction file removed");
2055 }
2056 tracing::info!(
2057 old_count = self.instructions.blocks.len(),
2058 new_count = new_blocks.len(),
2059 "reloaded instruction files"
2060 );
2061 self.instructions.blocks = new_blocks;
2062 }
2063
/// Re-reads the config file at `lifecycle.config_path` and applies hot-reloadable settings.
///
/// Does nothing without a config path. A load failure is logged and leaves the current
/// settings untouched. Fields are moved or copied out of the freshly loaded `config` one by
/// one into the runtime, memory, skill, context-manager and index state.
fn reload_config(&mut self) {
    let Some(ref path) = self.lifecycle.config_path else {
        return;
    };
    let config = match Config::load(path) {
        Ok(c) => c,
        Err(e) => {
            tracing::warn!("config reload failed: {e:#}");
            return;
        }
    };

    self.runtime.security = config.security;
    self.runtime.timeouts = config.timeouts;
    self.runtime.redact_credentials = config.memory.redact_credentials;
    self.memory_state.history_limit = config.memory.history_limit;
    self.memory_state.recall_limit = config.memory.semantic.recall_limit;
    self.memory_state.summarization_threshold = config.memory.summarization_threshold;
    self.skill_state.max_active_skills = config.skills.max_active_skills;
    self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
    self.skill_state.min_injection_score = config.skills.min_injection_score;
    // Weights/thresholds are clamped into [0, 1] regardless of what the file says.
    self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
    self.skill_state.hybrid_search = config.skills.hybrid_search;
    self.skill_state.two_stage_matching = config.skills.two_stage_matching;
    self.skill_state.confusability_threshold =
        config.skills.confusability_threshold.clamp(0.0, 1.0);
    config
        .skills
        .generation_provider
        .as_str()
        .clone_into(&mut self.skill_state.generation_provider_name);
    // Only a leading "~/" is expanded to the home directory; if the home directory is
    // unknown, or the path has no such prefix, it is used verbatim.
    self.skill_state.generation_output_dir =
        config.skills.generation_output_dir.as_deref().map(|p| {
            if let Some(stripped) = p.strip_prefix("~/") {
                dirs::home_dir()
                    .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
            } else {
                std::path::PathBuf::from(p)
            }
        });

    // A budget of 0 tokens disables context budgeting entirely.
    // NOTE(review): meaning of the 0.20 argument to ContextBudget::new not visible here.
    if config.memory.context_budget_tokens > 0 {
        self.context_manager.budget = Some(
            ContextBudget::new(config.memory.context_budget_tokens, 0.20)
                .with_graph_enabled(config.memory.graph.enabled),
        );
    } else {
        self.context_manager.budget = None;
    }

    {
        let graph_cfg = &config.memory.graph;
        if graph_cfg.rpe.enabled {
            // NOTE(review): an existing router is kept as-is, so a changed
            // rpe.threshold / rpe.max_skip_turns only takes effect after RPE is disabled
            // and re-enabled (presumably to preserve the router's state — confirm).
            if self.memory_state.rpe_router.is_none() {
                self.memory_state.rpe_router =
                    Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                        graph_cfg.rpe.threshold,
                        graph_cfg.rpe.max_skip_turns,
                    )));
            }
        } else {
            self.memory_state.rpe_router = None;
        }
        self.memory_state.graph_config = graph_cfg.clone();
    }
    self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
    self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
    self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
    self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
    self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
    self.context_manager.compression = config.memory.compression.clone();
    self.context_manager.routing = config.memory.store_routing.clone();
    // An empty provider name means "no dedicated routing classifier provider".
    self.context_manager.store_routing_provider = if config
        .memory
        .store_routing
        .routing_classifier_provider
        .is_empty()
    {
        None
    } else {
        let resolved = self.resolve_background_provider(
            &config.memory.store_routing.routing_classifier_provider,
        );
        Some(std::sync::Arc::new(resolved))
    };
    self.memory_state.cross_session_score_threshold =
        config.memory.cross_session_score_threshold;

    self.index.repo_map_tokens = config.index.repo_map_tokens;
    self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

    tracing::info!("config reloaded");
}
2159
2160 async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
2162 use std::fmt::Write;
2163 let mut out = String::from("Focus Agent status\n\n");
2164 let _ = writeln!(out, "Enabled: {}", self.focus.config.enabled);
2165 let _ = writeln!(out, "Active session: {}", self.focus.is_active());
2166 if let Some(ref scope) = self.focus.active_scope {
2167 let _ = writeln!(out, "Active scope: {scope}");
2168 }
2169 let _ = writeln!(
2170 out,
2171 "Knowledge blocks: {}",
2172 self.focus.knowledge_blocks.len()
2173 );
2174 let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
2175 self.channel.send(&out).await?;
2176 Ok(())
2177 }
2178
2179 async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
2181 use std::fmt::Write;
2182 let mut out = String::from("SideQuest status\n\n");
2183 let _ = writeln!(out, "Enabled: {}", self.sidequest.config.enabled);
2184 let _ = writeln!(
2185 out,
2186 "Interval turns: {}",
2187 self.sidequest.config.interval_turns
2188 );
2189 let _ = writeln!(out, "Turn counter: {}", self.sidequest.turn_counter);
2190 let _ = writeln!(out, "Passes run: {}", self.sidequest.passes_run);
2191 let _ = writeln!(
2192 out,
2193 "Total evicted: {} tool outputs",
2194 self.sidequest.total_evicted
2195 );
2196 self.channel.send(&out).await?;
2197 Ok(())
2198 }
2199
2200 #[allow(clippy::too_many_lines)]
2211 fn maybe_sidequest_eviction(&mut self) {
2212 use zeph_llm::provider::{Message, MessageMetadata, Role};
2213
2214 if self.sidequest.config.enabled {
2218 use crate::config::PruningStrategy;
2219 if !matches!(
2220 self.context_manager.compression.pruning_strategy,
2221 PruningStrategy::Reactive
2222 ) {
2223 tracing::warn!(
2224 strategy = ?self.context_manager.compression.pruning_strategy,
2225 "sidequest is enabled alongside a non-Reactive pruning strategy; \
2226 consider disabling sidequest.enabled to avoid redundant eviction"
2227 );
2228 }
2229 }
2230
2231 if self.focus.is_active() {
2233 tracing::debug!("sidequest: skipping — focus session active");
2234 self.compression.pending_sidequest_result = None;
2236 return;
2237 }
2238
2239 if let Some(handle) = self.compression.pending_sidequest_result.take() {
2241 use futures::FutureExt as _;
2243 match handle.now_or_never() {
2244 Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2245 let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2246 let freed = self.sidequest.apply_eviction(
2247 &mut self.msg.messages,
2248 &evicted_indices,
2249 &self.metrics.token_counter,
2250 );
2251 if freed > 0 {
2252 self.recompute_prompt_tokens();
2253 self.context_manager.compaction =
2256 crate::agent::context_manager::CompactionState::CompactedThisTurn {
2257 cooldown: 0,
2258 };
2259 tracing::info!(
2260 freed_tokens = freed,
2261 evicted_cursors = evicted_indices.len(),
2262 pass = self.sidequest.passes_run,
2263 "sidequest eviction complete"
2264 );
2265 if let Some(ref d) = self.debug_state.debug_dumper {
2266 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2267 }
2268 if let Some(ref tx) = self.session.status_tx {
2269 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2270 }
2271 } else {
2272 if let Some(ref tx) = self.session.status_tx {
2274 let _ = tx.send(String::new());
2275 }
2276 }
2277 }
2278 Some(Ok(None | Some(_))) => {
2279 tracing::debug!("sidequest: pending result: no cursors to evict");
2280 if let Some(ref tx) = self.session.status_tx {
2281 let _ = tx.send(String::new());
2282 }
2283 }
2284 Some(Err(e)) => {
2285 tracing::debug!("sidequest: background task panicked: {e}");
2286 if let Some(ref tx) = self.session.status_tx {
2287 let _ = tx.send(String::new());
2288 }
2289 }
2290 None => {
2291 tracing::debug!(
2295 "sidequest: background LLM task not yet complete, rescheduling"
2296 );
2297 }
2298 }
2299 }
2300
2301 self.sidequest
2303 .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2304
2305 if self.sidequest.tool_output_cursors.is_empty() {
2306 tracing::debug!("sidequest: no eligible cursors");
2307 return;
2308 }
2309
2310 let prompt = self.sidequest.build_eviction_prompt();
2311 let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2312 let n_cursors = self.sidequest.tool_output_cursors.len();
2313 let provider = self.summary_or_primary_provider().clone();
2315
2316 let handle = tokio::spawn(async move {
2318 let msgs = [Message {
2319 role: Role::User,
2320 content: prompt,
2321 parts: vec![],
2322 metadata: MessageMetadata::default(),
2323 }];
2324 let response =
2325 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2326 .await
2327 {
2328 Ok(Ok(r)) => r,
2329 Ok(Err(e)) => {
2330 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2331 return None;
2332 }
2333 Err(_) => {
2334 tracing::debug!("sidequest bg: LLM call timed out");
2335 return None;
2336 }
2337 };
2338
2339 let start = response.find('{')?;
2340 let end = response.rfind('}')?;
2341 if start > end {
2342 return None;
2343 }
2344 let json_slice = &response[start..=end];
2345 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2346 let mut valid: Vec<usize> = parsed
2347 .del_cursors
2348 .into_iter()
2349 .filter(|&c| c < n_cursors)
2350 .collect();
2351 valid.sort_unstable();
2352 valid.dedup();
2353 #[allow(
2354 clippy::cast_precision_loss,
2355 clippy::cast_possible_truncation,
2356 clippy::cast_sign_loss
2357 )]
2358 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2359 valid.truncate(max_evict);
2360 Some(valid)
2361 });
2362
2363 self.compression.pending_sidequest_result = Some(handle);
2364 tracing::debug!("sidequest: background LLM eviction task spawned");
2365 if let Some(ref tx) = self.session.status_tx {
2366 let _ = tx.send("SideQuest: scoring tool outputs...".into());
2367 }
2368 }
2369
2370 pub(crate) async fn check_cwd_changed(&mut self) {
2376 let current = match std::env::current_dir() {
2377 Ok(p) => p,
2378 Err(e) => {
2379 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2380 return;
2381 }
2382 };
2383 if current == self.lifecycle.last_known_cwd {
2384 return;
2385 }
2386 let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2387 self.session.env_context.working_dir = current.display().to_string();
2388
2389 tracing::info!(
2390 old = %old_cwd.display(),
2391 new = %current.display(),
2392 "working directory changed"
2393 );
2394
2395 let _ = self
2396 .channel
2397 .send_status("Working directory changed\u{2026}")
2398 .await;
2399
2400 let hooks = self.session.hooks_config.cwd_changed.clone();
2401 if !hooks.is_empty() {
2402 let mut env = std::collections::HashMap::new();
2403 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2404 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2405 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2406 tracing::warn!(error = %e, "CwdChanged hook failed");
2407 }
2408 }
2409
2410 let _ = self.channel.send_status("").await;
2411 }
2412
2413 pub(crate) async fn handle_file_changed(
2415 &mut self,
2416 event: crate::file_watcher::FileChangedEvent,
2417 ) {
2418 tracing::info!(path = %event.path.display(), "file changed");
2419
2420 let _ = self
2421 .channel
2422 .send_status("Running file-change hook\u{2026}")
2423 .await;
2424
2425 let hooks = self.session.hooks_config.file_changed_hooks.clone();
2426 if !hooks.is_empty() {
2427 let mut env = std::collections::HashMap::new();
2428 env.insert(
2429 "ZEPH_CHANGED_PATH".to_owned(),
2430 event.path.display().to_string(),
2431 );
2432 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2433 tracing::warn!(error = %e, "FileChanged hook failed");
2434 }
2435 }
2436
2437 let _ = self.channel.send_status("").await;
2438 }
2439}
2440pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2441 while !*rx.borrow_and_update() {
2442 if rx.changed().await.is_err() {
2443 std::future::pending::<()>().await;
2444 }
2445 }
2446}
2447
2448pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2449 match rx {
2450 Some(inner) => {
2451 if let Some(v) = inner.recv().await {
2452 Some(v)
2453 } else {
2454 *rx = None;
2455 std::future::pending().await
2456 }
2457 }
2458 None => std::future::pending().await,
2459 }
2460}
2461
2462#[cfg(test)]
2463mod tests;
2464
2465#[cfg(test)]
2466pub(crate) use tests::agent_tests;