1mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7pub(super) mod compression_feedback;
8mod context;
9pub(crate) mod context_manager;
10mod corrections;
11pub mod error;
12mod experiment_cmd;
13pub(super) mod feedback_detector;
14pub(crate) mod focus;
15mod graph_commands;
16mod guidelines_commands;
17mod index;
18mod learning;
19pub(crate) mod learning_engine;
20mod log_commands;
21mod lsp_commands;
22mod mcp;
23mod memory_commands;
24mod message_queue;
25mod model_commands;
26mod persistence;
27mod plan;
28mod policy_commands;
29mod provider_cmd;
30pub(crate) mod rate_limiter;
31#[cfg(feature = "scheduler")]
32mod scheduler_commands;
33mod scheduler_loop;
34pub mod session_config;
35mod session_digest;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70use state::CompressionState;
71use state::{
72 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
73 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
74 RuntimeConfig, SecurityState, SessionState, SkillState,
75};
76
// NOTE(review): most consumers of these constants live outside this part of the
// file; descriptions marked "presumably" are inferred from names and values only.

/// NOTE(review): presumably the number of recent iterations compared when
/// detecting a repeating ("doom loop") response - confirm at the usage site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
/// Marker text; presumably opens the injected document-retrieval context block.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
/// Marker text; presumably opens the injected semantic-recall context block.
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Marker text; presumably opens the injected code-context block.
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
/// Marker text; presumably opens the injected conversation-summaries block.
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Marker text; presumably opens the injected cross-session context block.
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
/// Marker text; presumably opens the injected past-corrections block.
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Marker text; presumably opens the injected graph-facts block.
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Prepended by `Agent::run` to prompts received on the custom-task channel
/// before they are processed as a regular turn.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Marker text; presumably opens the injected digest of a previous session.
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Opening fragment of an LSP note marker (note the missing closing bracket);
/// presumably completed by the code that emits the note - confirm.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing fence appended after the body by [`format_tool_output`].
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps a tool's output in the standard fenced envelope.
///
/// The result is `[tool output: <tool_name>]`, a newline, an opening code fence,
/// `body`, and finally [`TOOL_OUTPUT_SUFFIX`]. Neither argument is escaped or
/// otherwise altered.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    // Reserve the exact final size so the pushes below never reallocate.
    let total_len = HEADER_OPEN.len()
        + tool_name.len()
        + HEADER_CLOSE.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();

    let mut out = String::with_capacity(total_len);
    out.push_str(HEADER_OPEN);
    out.push_str(tool_name);
    out.push_str(HEADER_CLOSE);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
108
/// Conversational agent bound to one [`Channel`] implementation.
///
/// Bundles the LLM providers, the channel, the tool executor and a set of
/// per-concern state structs (messages, memory, skills, MCP, security, ...).
/// Instances are created with [`Agent::new`] or [`Agent::new_with_registry_arc`].
pub struct Agent<C: Channel> {
    /// Provider used for chat calls; `run` replaces it when a provider override is pending.
    provider: AnyProvider,
    /// Initialised as a clone of `provider` by the constructor.
    /// NOTE(review): presumably used for embedding requests - confirm.
    embedding_provider: AnyProvider,
    /// Channel used to receive input and to send replies and status updates.
    channel: C,
    /// Type-erased tool executor shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Message history (starting with the system prompt), the pending message
    /// queue and pending image parts.
    pub(super) msg: MessageState,
    /// Memory backend handle, conversation id and memory/summary tuning values.
    pub(super) memory_state: MemoryState,
    /// Skill registry, matcher and skill-selection tuning values.
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    /// Debug-dump and trace-collection state (see the `/debug-dump` and
    /// `/dump-format` handlers).
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    pub(super) compression: CompressionState,
    /// Shutdown/cancel signals, reload receivers and other lifecycle handles.
    pub(super) lifecycle: LifecycleState,
    /// Optional auxiliary providers (summary, judge, probe, ...) and the STT backend.
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    /// Sub-agent manager and planning state.
    pub(super) orchestration: OrchestrationState,
    pub(super) focus: focus::FocusState,
    pub(super) sidequest: sidequest::SidequestState,
    // NOTE(review): the fields below all start out empty/`None`; their exact
    // roles are defined by code outside this section - confirm before relying
    // on the names alone.
    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
    pub(super) dependency_always_on: HashSet<String>,
    pub(super) completed_tool_ids: HashSet<String>,
    pub(super) last_persisted_message_id: Option<i64>,
    pub(super) deferred_db_hide_ids: Vec<i64>,
    pub(super) deferred_db_summaries: Vec<String>,
    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
    pub(super) current_tool_iteration: usize,
}
171
172impl<C: Channel> Agent<C> {
173 #[must_use]
174 pub fn new(
175 provider: AnyProvider,
176 channel: C,
177 registry: SkillRegistry,
178 matcher: Option<SkillMatcherBackend>,
179 max_active_skills: usize,
180 tool_executor: impl ToolExecutor + 'static,
181 ) -> Self {
182 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
183 Self::new_with_registry_arc(
184 provider,
185 channel,
186 registry,
187 matcher,
188 max_active_skills,
189 tool_executor,
190 )
191 }
192
    /// Builds an agent around an already-shared skill registry.
    ///
    /// Reads every loadable skill from `registry`, renders the initial skills
    /// prompt and system prompt from them, and seeds the message history with that
    /// system prompt as its only message. All remaining state is initialised with
    /// the literal defaults spelled out in the body; the embedding provider starts
    /// as a clone of `provider`.
    ///
    /// # Panics
    ///
    /// Panics if the `registry` read lock is poisoned. In debug builds it also
    /// panics when `max_active_skills` is zero.
    #[must_use]
    // The body is a single large struct literal; splitting it would not aid clarity.
    #[allow(clippy::too_many_lines)]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Skills whose lookup fails are skipped silently (`.ok()` in `filter_map`).
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // The initial prompt is rendered with empty trust and health maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough token estimate: prompt length in bytes divided by four.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // Only the receiver is stored; the sender half is dropped when this function returns.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // Bounded channel (capacity 4) whose messages `run` forwards to the user channel.
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        let embedding_provider = provider.clone();
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                // History starts with the system prompt as its only entry.
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
            },
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
                shutdown_summary: true,
                shutdown_summary_min_messages: 4,
                shutdown_summary_max_messages: 20,
                shutdown_summary_timeout_secs: 10,
                structured_summaries: false,
                last_recall_confidence: None,
                digest_config: crate::config::DigestConfig::default(),
                cached_session_digest: None,
                context_strategy: crate::config::ContextStrategy::default(),
                crossover_turn_threshold: 20,
                rpe_router: None,
                goal_text: None,
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.20,
                min_injection_score: 0.20,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
                two_stage_matching: false,
                confusability_threshold: 0.0,
                rl_head: None,
                rl_weight: 0.3,
                rl_warmup_updates: 50,
                generation_output_dir: None,
                generation_provider_name: String::new(),
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState {
                detector: feedback_detector::FeedbackDetector::new(0.6),
                judge: None,
                llm_classifier: None,
            },
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                reasoning_model_warning: true,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                active_provider_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
                rate_limiter: rate_limiter::ToolRateLimiter::new(
                    rate_limiter::RateLimitConfig::default(),
                ),
                semantic_cache_enabled: false,
                semantic_cache_threshold: 0.95,
                semantic_cache_max_candidates: 10,
                dependency_config: zeph_tools::DependencyConfig::default(),
                adversarial_policy_info: None,
                spawn_depth: 0,
                budget_hint_enabled: true,
                channel_skills: zeph_config::ChannelSkillsConfig::default(),
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                elicitation_rx: None,
                shared_tools: None,
                tool_rx: None,
                server_outcomes: Vec::new(),
                pruning_cache: zeph_mcp::PruningCache::new(),
                pruning_provider: None,
                pruning_enabled: false,
                pruning_params: zeph_mcp::PruningParams::default(),
                semantic_index: None,
                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
                discovery_params: zeph_mcp::DiscoveryParams::default(),
                discovery_provider: None,
                elicitation_warn_sensitive_fields: true,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            session: SessionState {
                env_context: EnvironmentContext::gather(""),
                response_cache: None,
                parent_tool_use_id: None,
                status_tx: None,
                lsp_hooks: None,
                policy_config: None,
                hooks_config: state::HooksConfigSnapshot::default(),
            },
            instructions: InstructionState {
                blocks: Vec::new(),
                reload_rx: None,
                reload_state: None,
            },
            security: SecurityState {
                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
                quarantine_summarizer: None,
                is_acp_session: false,
                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
                    std::collections::HashSet::new(),
                )),
                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
                    zeph_sanitizer::pii::PiiFilterConfig::default(),
                ),
                // The NER-related fields exist only when the `classifiers` feature is enabled.
                #[cfg(feature = "classifiers")]
                pii_ner_backend: None,
                #[cfg(feature = "classifiers")]
                pii_ner_timeout_ms: 5000,
                #[cfg(feature = "classifiers")]
                pii_ner_max_chars: 8192,
                #[cfg(feature = "classifiers")]
                pii_ner_circuit_breaker_threshold: 2,
                #[cfg(feature = "classifiers")]
                pii_ner_consecutive_timeouts: 0,
                #[cfg(feature = "classifiers")]
                pii_ner_tripped: false,
                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
                guardrail: None,
                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
                    zeph_config::ResponseVerificationConfig::default(),
                ),
                causal_analyzer: None,
            },
            experiments: ExperimentState {
                config: crate::config::ExperimentConfig::default(),
                cancel: None,
                baseline: crate::experiments::ConfigSnapshot::default(),
                eval_provider: None,
                notify_rx: Some(exp_notify_rx),
                notify_tx: exp_notify_tx,
            },
            compression: CompressionState {
                current_task_goal: None,
                task_goal_user_msg_hash: None,
                pending_task_goal: None,
                pending_sidequest_result: None,
                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
                pending_subgoal: None,
                subgoal_user_msg_hash: None,
            },
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
                // Falls back to an empty path when the working directory cannot be read.
                last_known_cwd: std::env::current_dir().unwrap_or_default(),
                file_changed_rx: None,
                file_watcher: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                probe_provider: None,
                compress_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
                provider_pool: Vec::new(),
                provider_config_snapshot: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
                classifier_metrics: None,
            },
            orchestration: OrchestrationState {
                planner_provider: None,
                verify_provider: None,
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
                plan_cache: None,
                pending_goal_embedding: None,
            },
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_schema_filter: None,
            cached_filtered_tool_ids: None,
            dependency_graph: None,
            dependency_always_on: HashSet::new(),
            completed_tool_ids: HashSet::new(),
            last_persisted_message_id: None,
            deferred_db_hide_ids: Vec::new(),
            deferred_db_summaries: Vec::new(),
            runtime_layers: Vec::new(),
            current_tool_iteration: 0,
        }
    }
490
491 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
496 let Some(mgr) = &mut self.orchestration.subagent_manager else {
497 return vec![];
498 };
499
500 let finished: Vec<String> = mgr
501 .statuses()
502 .into_iter()
503 .filter_map(|(id, status)| {
504 if matches!(
505 status.state,
506 crate::subagent::SubAgentState::Completed
507 | crate::subagent::SubAgentState::Failed
508 | crate::subagent::SubAgentState::Canceled
509 ) {
510 Some(id)
511 } else {
512 None
513 }
514 })
515 .collect();
516
517 let mut results = vec![];
518 for task_id in finished {
519 match mgr.collect(&task_id).await {
520 Ok(result) => results.push((task_id, result)),
521 Err(e) => {
522 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
523 }
524 }
525 }
526 results
527 }
528
529 async fn call_llm_for_session_summary(
538 &self,
539 chat_messages: &[Message],
540 ) -> Option<zeph_memory::StructuredSummary> {
541 let timeout_dur =
542 std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
543 match tokio::time::timeout(
544 timeout_dur,
545 self.provider
546 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
547 )
548 .await
549 {
550 Ok(Ok(s)) => Some(s),
551 Ok(Err(e)) => {
552 tracing::warn!(
553 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
554 );
555 self.plain_text_summary_fallback(chat_messages, timeout_dur)
556 .await
557 }
558 Err(_) => {
559 tracing::warn!(
560 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
561 self.memory_state.shutdown_summary_timeout_secs
562 );
563 self.plain_text_summary_fallback(chat_messages, timeout_dur)
564 .await
565 }
566 }
567 }
568
569 async fn plain_text_summary_fallback(
570 &self,
571 chat_messages: &[Message],
572 timeout_dur: std::time::Duration,
573 ) -> Option<zeph_memory::StructuredSummary> {
574 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
575 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
576 summary: plain,
577 key_facts: vec![],
578 entities: vec![],
579 }),
580 Ok(Err(e)) => {
581 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
582 None
583 }
584 Err(_) => {
585 tracing::warn!("shutdown summary: plain LLM fallback timed out");
586 None
587 }
588 }
589 }
590
591 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
596 use zeph_llm::provider::{MessagePart, Role};
597
598 let msgs = &self.msg.messages;
602 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
604 return;
605 };
606 let asst_msg = &msgs[asst_idx];
607 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
608 .parts
609 .iter()
610 .filter_map(|p| {
611 if let MessagePart::ToolUse { id, name, input } = p {
612 Some((id.as_str(), name.as_str(), input))
613 } else {
614 None
615 }
616 })
617 .collect();
618 if tool_use_ids.is_empty() {
619 return;
620 }
621
622 let paired_ids: std::collections::HashSet<&str> = msgs
624 .get(asst_idx + 1..)
625 .into_iter()
626 .flatten()
627 .filter(|m| m.role == Role::User)
628 .flat_map(|m| m.parts.iter())
629 .filter_map(|p| {
630 if let MessagePart::ToolResult { tool_use_id, .. } = p {
631 Some(tool_use_id.as_str())
632 } else {
633 None
634 }
635 })
636 .collect();
637
638 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
639 .iter()
640 .filter(|(id, _, _)| !paired_ids.contains(*id))
641 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
642 id: (*id).to_owned(),
643 name: (*name).to_owned(),
644 input: (*input).clone(),
645 })
646 .collect();
647
648 if unpaired.is_empty() {
649 return;
650 }
651
652 tracing::info!(
653 count = unpaired.len(),
654 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
655 );
656 self.persist_cancelled_tool_results(&unpaired).await;
657 }
658
659 async fn maybe_store_shutdown_summary(&mut self) {
669 if !self.memory_state.shutdown_summary {
670 return;
671 }
672 let Some(memory) = self.memory_state.memory.clone() else {
673 return;
674 };
675 let Some(conversation_id) = self.memory_state.conversation_id else {
676 return;
677 };
678
679 match memory.has_session_summary(conversation_id).await {
681 Ok(true) => {
682 tracing::debug!("shutdown summary: session already has a summary, skipping");
683 return;
684 }
685 Ok(false) => {}
686 Err(e) => {
687 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
688 return;
689 }
690 }
691
692 let user_count = self
694 .msg
695 .messages
696 .iter()
697 .skip(1)
698 .filter(|m| m.role == Role::User)
699 .count();
700 if user_count < self.memory_state.shutdown_summary_min_messages {
701 tracing::debug!(
702 user_count,
703 min = self.memory_state.shutdown_summary_min_messages,
704 "shutdown summary: too few user messages, skipping"
705 );
706 return;
707 }
708
709 let _ = self.channel.send_status("Saving session summary...").await;
711
712 let max = self.memory_state.shutdown_summary_max_messages;
714 if max == 0 {
715 tracing::debug!("shutdown summary: max_messages=0, skipping");
716 return;
717 }
718 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
719 let slice = if non_system.len() > max {
720 &non_system[non_system.len() - max..]
721 } else {
722 &non_system[..]
723 };
724
725 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
726 .iter()
727 .map(|m| {
728 let role = match m.role {
729 Role::User => "user".to_owned(),
730 Role::Assistant => "assistant".to_owned(),
731 Role::System => "system".to_owned(),
732 };
733 (zeph_memory::MessageId(0), role, m.content.clone())
734 })
735 .collect();
736
737 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
738 let chat_messages = vec![Message {
739 role: Role::User,
740 content: prompt,
741 parts: vec![],
742 metadata: MessageMetadata::default(),
743 }];
744
745 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
746 let _ = self.channel.send_status("").await;
747 return;
748 };
749
750 if let Err(e) = memory
751 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
752 .await
753 {
754 tracing::warn!("shutdown summary: storage failed: {e:#}");
755 } else {
756 tracing::info!(
757 conversation_id = conversation_id.0,
758 "shutdown summary stored"
759 );
760 }
761
762 let _ = self.channel.send_status("").await;
764 }
765
766 pub async fn shutdown(&mut self) {
767 self.channel.send("Shutting down...").await.ok();
768
769 self.provider.save_router_state();
771
772 if let Some(ref mut mgr) = self.orchestration.subagent_manager {
773 mgr.shutdown_all();
774 }
775
776 if let Some(ref manager) = self.mcp.manager {
777 manager.shutdown_all_shared().await;
778 }
779
780 if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
784 self.update_metrics(|m| {
785 m.compaction_turns_after_hard.push(turns);
786 });
787 self.context_manager.turns_since_last_hard_compaction = None;
788 }
789
790 if let Some(ref tx) = self.metrics.metrics_tx {
791 let m = tx.borrow();
792 if m.filter_applications > 0 {
793 #[allow(clippy::cast_precision_loss)]
794 let pct = if m.filter_raw_tokens > 0 {
795 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
796 } else {
797 0.0
798 };
799 tracing::info!(
800 raw_tokens = m.filter_raw_tokens,
801 saved_tokens = m.filter_saved_tokens,
802 applications = m.filter_applications,
803 "tool output filtering saved ~{} tokens ({pct:.0}%)",
804 m.filter_saved_tokens,
805 );
806 }
807 if m.compaction_hard_count > 0 {
808 tracing::info!(
809 hard_compactions = m.compaction_hard_count,
810 turns_after_hard = ?m.compaction_turns_after_hard,
811 "hard compaction trajectory"
812 );
813 }
814 }
815
816 self.flush_orphaned_tool_use_on_shutdown().await;
820
821 self.maybe_store_shutdown_summary().await;
822 self.maybe_store_session_digest().await;
823
824 tracing::info!("agent shutdown complete");
825 }
826
827 fn refresh_subagent_metrics(&mut self) {
834 let Some(ref mgr) = self.orchestration.subagent_manager else {
835 return;
836 };
837 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
838 .statuses()
839 .into_iter()
840 .map(|(id, s)| {
841 let def = mgr.agents_def(&id);
842 crate::metrics::SubAgentMetrics {
843 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
844 id: id.clone(),
845 state: format!("{:?}", s.state).to_lowercase(),
846 turns_used: s.turns_used,
847 max_turns: def.map_or(20, |d| d.permissions.max_turns),
848 background: def.is_some_and(|d| d.permissions.background),
849 elapsed_secs: s.started_at.elapsed().as_secs(),
850 permission_mode: def.map_or_else(String::new, |d| {
851 use crate::subagent::def::PermissionMode;
852 match d.permissions.permission_mode {
853 PermissionMode::Default => String::new(),
854 PermissionMode::AcceptEdits => "accept_edits".into(),
855 PermissionMode::DontAsk => "dont_ask".into(),
856 PermissionMode::BypassPermissions => "bypass_permissions".into(),
857 PermissionMode::Plan => "plan".into(),
858 }
859 }),
860 transcript_dir: mgr
861 .agent_transcript_dir(&id)
862 .map(|p| p.to_string_lossy().into_owned()),
863 }
864 })
865 .collect();
866 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
867 }
868
869 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
871 let completed = self.poll_subagents().await;
872 for (task_id, result) in completed {
873 let notice = if result.is_empty() {
874 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
875 } else {
876 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
877 };
878 if let Err(e) = self.channel.send(¬ice).await {
879 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
880 }
881 }
882 Ok(())
883 }
884
    /// Runs the agent's main loop.
    ///
    /// Each iteration applies a pending provider override, runs housekeeping
    /// (tool refresh, pending elicitations, sub-agent metrics and completion
    /// notices), then takes the next input: a queued message if there is one,
    /// otherwise whichever event source fires first. The input is first offered to
    /// the built-in command handler and, if not consumed there, processed as a
    /// user message.
    ///
    /// The loop ends when the shutdown signal fires, the channel yields no more
    /// messages, or a built-in command requests exit. The trace collector, if any,
    /// is finished before returning.
    ///
    /// # Errors
    ///
    /// Propagates errors from `notify_completed_subagents`, the channel's `recv`,
    /// `handle_builtin_command` and `process_user_message`.
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // If a warmup watch is installed and not yet `true`, wait for one change
        // before serving input; warn if it is still `false` afterwards.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;

        loop {
            // Take a pending provider override out of its slot, tolerating a poisoned lock.
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            self.check_tool_refresh().await;

            self.process_pending_elicitations().await;

            self.refresh_subagent_metrics();

            self.notify_completed_subagents().await?;

            self.drain_channel();

            // Queued messages take priority over waiting on any event source.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Attachments that are still raw go through `resolve_message` first.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                // Only the channel and custom-task branches produce a value for
                // `incoming`; every other branch either `continue`s or `break`s.
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                        // Drop the stored cancel handle before forwarding the notification.
                        { self.experiments.cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        // A scheduled prompt is wrapped as an ordinary text-only message.
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                };
                // `None` from the channel ends the loop.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            // `Some(true)` = exit the loop, `Some(false)` = command handled,
            // `None` = not a built-in command.
            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1007
1008 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
1010 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
1011 if arg.is_empty() {
1012 match &self.debug_state.debug_dumper {
1013 Some(d) => {
1014 let _ = self
1015 .channel
1016 .send(&format!("Debug dump active: {}", d.dir().display()))
1017 .await;
1018 }
1019 None => {
1020 let _ = self
1021 .channel
1022 .send(
1023 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
1024 or start with `--debug-dump [dir]`.",
1025 )
1026 .await;
1027 }
1028 }
1029 return;
1030 }
1031 let dir = std::path::PathBuf::from(arg);
1032 match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
1033 Ok(dumper) => {
1034 let path = dumper.dir().display().to_string();
1035 self.debug_state.debug_dumper = Some(dumper);
1036 let _ = self
1037 .channel
1038 .send(&format!("Debug dump enabled: {path}"))
1039 .await;
1040 }
1041 Err(e) => {
1042 let _ = self
1043 .channel
1044 .send(&format!("Failed to enable debug dump: {e}"))
1045 .await;
1046 }
1047 }
1048 }
1049
1050 async fn handle_dump_format_command(&mut self, trimmed: &str) {
1052 let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
1053 if arg.is_empty() {
1054 let _ = self
1055 .channel
1056 .send(&format!(
1057 "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
1058 self.debug_state.dump_format
1059 ))
1060 .await;
1061 return;
1062 }
1063 let new_format = match arg {
1064 "json" => crate::debug_dump::DumpFormat::Json,
1065 "raw" => crate::debug_dump::DumpFormat::Raw,
1066 "trace" => crate::debug_dump::DumpFormat::Trace,
1067 other => {
1068 let _ = self
1069 .channel
1070 .send(&format!(
1071 "Unknown format '{other}'. Valid values: json, raw, trace."
1072 ))
1073 .await;
1074 return;
1075 }
1076 };
1077 let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
1078 let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
1079
1080 if now_trace
1082 && !was_trace
1083 && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
1084 {
1085 let service_name = self.debug_state.trace_service_name.clone();
1086 let redact = self.debug_state.trace_redact;
1087 match crate::debug_dump::trace::TracingCollector::new(
1088 dump_dir.as_path(),
1089 &service_name,
1090 redact,
1091 None,
1092 ) {
1093 Ok(collector) => {
1094 self.debug_state.trace_collector = Some(collector);
1095 }
1096 Err(e) => {
1097 tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
1098 }
1099 }
1100 }
1101 if was_trace
1103 && !now_trace
1104 && let Some(mut tc) = self.debug_state.trace_collector.take()
1105 {
1106 tc.finish();
1107 }
1108
1109 self.debug_state.dump_format = new_format;
1110 let _ = self
1111 .channel
1112 .send(&format!("Debug dump format set to: {arg}"))
1113 .await;
1114 }
1115
1116 async fn resolve_message(
1117 &self,
1118 msg: crate::channel::ChannelMessage,
1119 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1120 use crate::channel::{Attachment, AttachmentKind};
1121 use zeph_llm::provider::{ImageData, MessagePart};
1122
1123 let text_base = msg.text.clone();
1124
1125 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1126 .attachments
1127 .into_iter()
1128 .partition(|a| a.kind == AttachmentKind::Audio);
1129
1130 tracing::debug!(
1131 audio = audio_attachments.len(),
1132 has_stt = self.providers.stt.is_some(),
1133 "resolve_message attachments"
1134 );
1135
1136 let text = if !audio_attachments.is_empty()
1137 && let Some(stt) = self.providers.stt.as_ref()
1138 {
1139 let mut transcribed_parts = Vec::new();
1140 for attachment in &audio_attachments {
1141 if attachment.data.len() > MAX_AUDIO_BYTES {
1142 tracing::warn!(
1143 size = attachment.data.len(),
1144 max = MAX_AUDIO_BYTES,
1145 "audio attachment exceeds size limit, skipping"
1146 );
1147 continue;
1148 }
1149 match stt
1150 .transcribe(&attachment.data, attachment.filename.as_deref())
1151 .await
1152 {
1153 Ok(result) => {
1154 tracing::info!(
1155 len = result.text.len(),
1156 language = ?result.language,
1157 "audio transcribed"
1158 );
1159 transcribed_parts.push(result.text);
1160 }
1161 Err(e) => {
1162 tracing::error!(error = %e, "audio transcription failed");
1163 }
1164 }
1165 }
1166 if transcribed_parts.is_empty() {
1167 text_base
1168 } else {
1169 let transcribed = transcribed_parts.join("\n");
1170 if text_base.is_empty() {
1171 transcribed
1172 } else {
1173 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1174 }
1175 }
1176 } else {
1177 if !audio_attachments.is_empty() {
1178 tracing::warn!(
1179 count = audio_attachments.len(),
1180 "audio attachments received but no STT provider configured, dropping"
1181 );
1182 }
1183 text_base
1184 };
1185
1186 let mut image_parts = Vec::new();
1187 for attachment in image_attachments {
1188 if attachment.data.len() > MAX_IMAGE_BYTES {
1189 tracing::warn!(
1190 size = attachment.data.len(),
1191 max = MAX_IMAGE_BYTES,
1192 "image attachment exceeds size limit, skipping"
1193 );
1194 continue;
1195 }
1196 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1197 image_parts.push(MessagePart::Image(Box::new(ImageData {
1198 data: attachment.data,
1199 mime_type,
1200 })));
1201 }
1202
1203 (text, image_parts)
1204 }
1205
1206 async fn process_user_message(
1207 &mut self,
1208 text: String,
1209 image_parts: Vec<zeph_llm::provider::MessagePart>,
1210 ) -> Result<(), error::AgentError> {
1211 let iteration_index = self.debug_state.iteration_counter;
1213 self.debug_state.iteration_counter += 1;
1214 if let Some(ref mut tc) = self.debug_state.trace_collector {
1215 tc.begin_iteration(iteration_index, text.trim());
1216 self.debug_state.current_iteration_span_id =
1218 tc.current_iteration_span_id(iteration_index);
1219 }
1220
1221 let result = self
1222 .process_user_message_inner(text, image_parts, iteration_index)
1223 .await;
1224
1225 if let Some(ref mut tc) = self.debug_state.trace_collector {
1227 let status = if result.is_ok() {
1228 crate::debug_dump::trace::SpanStatus::Ok
1229 } else {
1230 crate::debug_dump::trace::SpanStatus::Error {
1231 message: "iteration failed".to_owned(),
1232 }
1233 };
1234 tc.end_iteration(iteration_index, status);
1235 }
1236 self.debug_state.current_iteration_span_id = None;
1237
1238 result
1239 }
1240
/// Executes a single user turn: slash-command dispatch, security pre-checks, context
/// preparation, persistence of the user message, and response generation.
///
/// Returns early when the input is a slash command (with that command's result) or when
/// `pre_process_security` reports the input as blocked. A failure in `process_response` is
/// reported to the channel and does not turn into an `Err`; only channel send/flush errors
/// and `pre_process_security` errors propagate.
async fn process_user_message_inner(
    &mut self,
    text: String,
    image_parts: Vec<zeph_llm::provider::MessagePart>,
    iteration_index: usize,
) -> Result<(), error::AgentError> {
    // The iteration index is accepted but not used in this body.
    let _ = iteration_index;
    // Fresh cancellation token for this turn, bridged from the shared cancel signal.
    self.lifecycle.cancel_token = CancellationToken::new();
    let signal = Arc::clone(&self.lifecycle.cancel_signal);
    let token = self.lifecycle.cancel_token.clone();
    // NOTE(review): the watcher task is detached; it stays parked on `notified()` until the
    // signal fires, so one task per turn outlives this call — confirm this is intended.
    tokio::spawn(async move {
        signal.notified().await;
        token.cancel();
    });
    let trimmed = text.trim();

    // Slash commands short-circuit the rest of the turn.
    if let Some(result) = self.dispatch_slash_command(trimmed).await {
        return result;
    }

    self.check_pending_rollbacks().await;

    // `Ok(true)` means the input was blocked; the turn ends without an error.
    if self.pre_process_security(trimmed).await? {
        return Ok(());
    }

    self.advance_context_lifecycle(&text, trimmed).await;

    let user_msg = self.build_user_message(&text, image_parts);

    // Record URLs found in the user's own input; a poisoned lock is silently skipped.
    let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
    if !urls.is_empty()
        && let Ok(mut set) = self.security.user_provided_urls.write()
    {
        set.extend(urls);
    }

    self.memory_state.goal_text = Some(text.clone());

    // Persist first, then add the message to the in-memory history.
    self.persist_message(Role::User, &text, &[], false).await;
    self.push_message(user_msg);

    if let Err(e) = self.process_response().await {
        tracing::error!("Response processing failed: {e:#}");
        let user_msg = format!("Error: {e:#}");
        self.channel.send(&user_msg).await?;
        // Drops the last history entry — presumably the user message pushed above; verify
        // that `process_response` cannot leave additional messages behind on failure.
        self.msg.messages.pop();
        self.recompute_prompt_tokens();
        self.channel.flush_chunks().await?;
    }

    Ok(())
}
1298
/// Runs the security checks that gate user input before it reaches the model.
///
/// Returns `Ok(true)` when the input was blocked (the user has already been notified on the
/// channel) and `Ok(false)` when processing may continue. No path in this body returns `Err`;
/// channel send/flush failures are deliberately ignored.
async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
    // Stage 1: optional guardrail check.
    if let Some(ref guardrail) = self.security.guardrail {
        use zeph_sanitizer::guardrail::GuardrailVerdict;
        let verdict = guardrail.check(trimmed).await;
        match &verdict {
            GuardrailVerdict::Flagged { reason, .. } => {
                tracing::warn!(
                    reason = %reason,
                    should_block = verdict.should_block(),
                    "guardrail flagged user input"
                );
                if verdict.should_block() {
                    let msg = format!("[guardrail] Input blocked: {reason}");
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                // Flagged but not blocking: warn the user and continue.
                let _ = self
                    .channel
                    .send(&format!("[guardrail] Warning: {reason}"))
                    .await;
            }
            GuardrailVerdict::Error { error } => {
                // The guardrail itself failed; whether to block depends on its fail strategy.
                if guardrail.error_should_block() {
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                    let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                    let _ = self.channel.send(msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
            }
            GuardrailVerdict::Safe => {}
        }
    }

    // Stage 2: injection classifier, compiled only with the `classifiers` feature.
    #[cfg(feature = "classifiers")]
    if self.security.sanitizer.scan_user_input() {
        match self.security.sanitizer.classify_injection(trimmed).await {
            zeph_sanitizer::InjectionVerdict::Blocked => {
                // Metrics are pushed here because the early return skips the push below.
                self.push_classifier_metrics();
                let _ = self
                    .channel
                    .send("[security] Input blocked: injection detected by classifier.")
                    .await;
                let _ = self.channel.flush_chunks().await;
                return Ok(true);
            }
            zeph_sanitizer::InjectionVerdict::Suspicious => {
                // Soft signal only: logged, input is still allowed.
                tracing::warn!("injection_classifier soft_signal on user input");
            }
            zeph_sanitizer::InjectionVerdict::Clean => {}
        }
    }
    #[cfg(feature = "classifiers")]
    self.push_classifier_metrics();

    Ok(false)
}
1366
/// Advances all per-turn context state before the user message is added to history.
///
/// The steps run strictly in the order written. Failures of proactive compression,
/// compaction and context preparation are logged as warnings and never abort the turn.
///
/// `text` is the raw user input and `trimmed` its whitespace-trimmed form.
async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
    self.mcp.pruning_cache.reset();

    // Captured before the awaits below so corrections use the id seen at turn start.
    let conv_id = self.memory_state.conversation_id;
    self.rebuild_system_prompt(text).await;

    self.detect_and_record_corrections(trimmed, conv_id).await;
    self.learning_engine.tick();
    self.analyze_and_learn().await;
    self.sync_graph_counts().await;

    // Move the compaction state machine forward by one turn.
    self.context_manager.compaction = self.context_manager.compaction.advance_turn();

    {
        self.focus.tick();

        // SideQuest eviction is skipped when compaction already happened this turn.
        let sidequest_should_fire = self.sidequest.tick();
        if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
            self.maybe_sidequest_eviction();
        }
    }

    self.maybe_apply_deferred_summaries();
    self.flush_deferred_summaries().await;

    if let Err(e) = self.maybe_proactive_compress().await {
        tracing::warn!("proactive compression failed: {e:#}");
    }

    if let Err(e) = self.maybe_compact().await {
        tracing::warn!("context compaction failed: {e:#}");
    }

    // The future is boxed before awaiting — presumably to keep this function's own future
    // small; confirm before removing.
    if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
        tracing::warn!("context preparation failed: {e:#}");
    }

    // Hand the latest recall confidence to the provider.
    self.provider
        .set_memory_confidence(self.memory_state.last_recall_confidence);

    self.learning_engine.reset_reflection();
}
1425
1426 fn build_user_message(
1427 &mut self,
1428 text: &str,
1429 image_parts: Vec<zeph_llm::provider::MessagePart>,
1430 ) -> Message {
1431 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1432 all_image_parts.extend(image_parts);
1433
1434 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1435 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1436 text: text.to_owned(),
1437 }];
1438 parts.extend(all_image_parts);
1439 Message::from_parts(Role::User, parts)
1440 } else {
1441 if !all_image_parts.is_empty() {
1442 tracing::warn!(
1443 count = all_image_parts.len(),
1444 "image attachments dropped: provider does not support vision"
1445 );
1446 }
1447 Message {
1448 role: Role::User,
1449 content: text.to_owned(),
1450 parts: vec![],
1451 metadata: MessageMetadata::default(),
1452 }
1453 }
1454 }
1455
1456 async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
1459 use crate::subagent::SubAgentState;
1460 let result = loop {
1461 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1462
1463 #[allow(clippy::redundant_closure_for_method_calls)]
1467 let pending = self
1468 .orchestration
1469 .subagent_manager
1470 .as_mut()
1471 .and_then(|m| m.try_recv_secret_request());
1472 if let Some((req_task_id, req)) = pending {
1473 let confirm_prompt = format!(
1476 "Sub-agent requests secret '{}'. Allow?",
1477 crate::text::truncate_to_chars(&req.secret_key, 100)
1478 );
1479 let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
1480 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1481 if approved {
1482 let ttl = std::time::Duration::from_secs(300);
1483 let key = req.secret_key.clone();
1484 if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
1485 let _ = mgr.deliver_secret(&req_task_id, key);
1486 }
1487 } else {
1488 let _ = mgr.deny_secret(&req_task_id);
1489 }
1490 }
1491 }
1492
1493 let mgr = self.orchestration.subagent_manager.as_ref()?;
1494 let statuses = mgr.statuses();
1495 let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
1496 break format!("{label} completed (no status available).");
1497 };
1498 match status.state {
1499 SubAgentState::Completed => {
1500 let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
1501 break format!("{label} completed: {msg}");
1502 }
1503 SubAgentState::Failed => {
1504 let msg = status
1505 .last_message
1506 .clone()
1507 .unwrap_or_else(|| "unknown error".into());
1508 break format!("{label} failed: {msg}");
1509 }
1510 SubAgentState::Canceled => {
1511 break format!("{label} was cancelled.");
1512 }
1513 _ => {
1514 let _ = self
1515 .channel
1516 .send_status(&format!(
1517 "{label}: turn {}/{}",
1518 status.turns_used,
1519 self.orchestration
1520 .subagent_manager
1521 .as_ref()
1522 .and_then(|m| m.agents_def(task_id))
1523 .map_or(20, |d| d.permissions.max_turns)
1524 ))
1525 .await;
1526 }
1527 }
1528 };
1529 Some(result)
1530 }
1531
1532 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1535 let mgr = self.orchestration.subagent_manager.as_mut()?;
1536 let full_ids: Vec<String> = mgr
1537 .statuses()
1538 .into_iter()
1539 .map(|(tid, _)| tid)
1540 .filter(|tid| tid.starts_with(prefix))
1541 .collect();
1542 Some(match full_ids.as_slice() {
1543 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1544 [fid] => Ok(fid.clone()),
1545 _ => Err(format!(
1546 "Ambiguous id prefix '{prefix}': matches {} agents",
1547 full_ids.len()
1548 )),
1549 })
1550 }
1551
1552 fn handle_agent_list(&self) -> Option<String> {
1553 use std::fmt::Write as _;
1554 let mgr = self.orchestration.subagent_manager.as_ref()?;
1555 let defs = mgr.definitions();
1556 if defs.is_empty() {
1557 return Some("No sub-agent definitions found.".into());
1558 }
1559 let mut out = String::from("Available sub-agents:\n");
1560 for d in defs {
1561 let memory_label = match d.memory {
1562 Some(crate::subagent::MemoryScope::User) => " [memory:user]",
1563 Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
1564 Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
1565 None => "",
1566 };
1567 if let Some(ref src) = d.source {
1568 let _ = writeln!(
1569 out,
1570 " {}{} — {} ({})",
1571 d.name, memory_label, d.description, src
1572 );
1573 } else {
1574 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
1575 }
1576 }
1577 Some(out)
1578 }
1579
1580 fn handle_agent_status(&self) -> Option<String> {
1581 use std::fmt::Write as _;
1582 let mgr = self.orchestration.subagent_manager.as_ref()?;
1583 let statuses = mgr.statuses();
1584 if statuses.is_empty() {
1585 return Some("No active sub-agents.".into());
1586 }
1587 let mut out = String::from("Active sub-agents:\n");
1588 for (id, s) in &statuses {
1589 let state = format!("{:?}", s.state).to_lowercase();
1590 let elapsed = s.started_at.elapsed().as_secs();
1591 let _ = writeln!(
1592 out,
1593 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
1594 short = &id[..8.min(id.len())],
1595 t = s.turns_used,
1596 msg = s.last_message.as_deref().unwrap_or(""),
1597 );
1598 if let Some(def) = mgr.agents_def(id)
1600 && let Some(scope) = def.memory
1601 && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
1602 {
1603 let _ = writeln!(out, " memory: {}", dir.display());
1604 }
1605 }
1606 Some(out)
1607 }
1608
1609 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
1610 let full_id = match self.resolve_agent_id_prefix(id)? {
1611 Ok(fid) => fid,
1612 Err(msg) => return Some(msg),
1613 };
1614 let mgr = self.orchestration.subagent_manager.as_mut()?;
1615 if let Some((tid, req)) = mgr.try_recv_secret_request()
1616 && tid == full_id
1617 {
1618 let key = req.secret_key.clone();
1619 let ttl = std::time::Duration::from_secs(300);
1620 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
1621 return Some(format!("Approve failed: {e}"));
1622 }
1623 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
1624 return Some(format!("Secret delivery failed: {e}"));
1625 }
1626 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
1627 }
1628 Some(format!(
1629 "No pending secret request for sub-agent '{full_id}'."
1630 ))
1631 }
1632
1633 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1634 let full_id = match self.resolve_agent_id_prefix(id)? {
1635 Ok(fid) => fid,
1636 Err(msg) => return Some(msg),
1637 };
1638 let mgr = self.orchestration.subagent_manager.as_mut()?;
1639 match mgr.deny_secret(&full_id) {
1640 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1641 Err(e) => Some(format!("Deny failed: {e}")),
1642 }
1643 }
1644
1645 #[allow(clippy::too_many_lines)]
1646 async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
1647 use crate::subagent::AgentCommand;
1648
1649 match cmd {
1650 AgentCommand::List => self.handle_agent_list(),
1651 AgentCommand::Background { name, prompt } => {
1652 let provider = self.provider.clone();
1653 let tool_executor = Arc::clone(&self.tool_executor);
1654 let skills = self.filtered_skills_for(&name);
1655 let cfg = self.orchestration.subagent_config.clone();
1656 let spawn_ctx = self.build_spawn_context(&cfg);
1657 let mgr = self.orchestration.subagent_manager.as_mut()?;
1658 match mgr.spawn(
1659 &name,
1660 &prompt,
1661 provider,
1662 tool_executor,
1663 skills,
1664 &cfg,
1665 spawn_ctx,
1666 ) {
1667 Ok(id) => Some(format!(
1668 "Sub-agent '{name}' started in background (id: {short})",
1669 short = &id[..8.min(id.len())]
1670 )),
1671 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
1672 }
1673 }
1674 AgentCommand::Spawn { name, prompt }
1675 | AgentCommand::Mention {
1676 agent: name,
1677 prompt,
1678 } => {
1679 let provider = self.provider.clone();
1681 let tool_executor = Arc::clone(&self.tool_executor);
1682 let skills = self.filtered_skills_for(&name);
1683 let cfg = self.orchestration.subagent_config.clone();
1684 let spawn_ctx = self.build_spawn_context(&cfg);
1685 let mgr = self.orchestration.subagent_manager.as_mut()?;
1686 let task_id = match mgr.spawn(
1687 &name,
1688 &prompt,
1689 provider,
1690 tool_executor,
1691 skills,
1692 &cfg,
1693 spawn_ctx,
1694 ) {
1695 Ok(id) => id,
1696 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
1697 };
1698 let short = task_id[..8.min(task_id.len())].to_owned();
1699 let _ = self
1700 .channel
1701 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
1702 .await;
1703 let label = format!("Sub-agent '{name}'");
1704 self.poll_subagent_until_done(&task_id, &label).await
1705 }
1706 AgentCommand::Status => self.handle_agent_status(),
1707 AgentCommand::Cancel { id } => {
1708 let mgr = self.orchestration.subagent_manager.as_mut()?;
1709 let ids: Vec<String> = mgr
1711 .statuses()
1712 .into_iter()
1713 .map(|(task_id, _)| task_id)
1714 .filter(|task_id| task_id.starts_with(&id))
1715 .collect();
1716 match ids.as_slice() {
1717 [] => Some(format!("No sub-agent with id prefix '{id}'")),
1718 [full_id] => {
1719 let full_id = full_id.clone();
1720 match mgr.cancel(&full_id) {
1721 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
1722 Err(e) => Some(format!("Cancel failed: {e}")),
1723 }
1724 }
1725 _ => Some(format!(
1726 "Ambiguous id prefix '{id}': matches {} agents",
1727 ids.len()
1728 )),
1729 }
1730 }
1731 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
1732 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
1733 AgentCommand::Resume { id, prompt } => {
1734 let cfg = self.orchestration.subagent_config.clone();
1735 let def_name = {
1738 let mgr = self.orchestration.subagent_manager.as_ref()?;
1739 match mgr.def_name_for_resume(&id, &cfg) {
1740 Ok(name) => name,
1741 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1742 }
1743 };
1744 let skills = self.filtered_skills_for(&def_name);
1745 let provider = self.provider.clone();
1746 let tool_executor = Arc::clone(&self.tool_executor);
1747 let mgr = self.orchestration.subagent_manager.as_mut()?;
1748 let (task_id, _) =
1749 match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
1750 Ok(pair) => pair,
1751 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
1752 };
1753 let short = task_id[..8.min(task_id.len())].to_owned();
1754 let _ = self
1755 .channel
1756 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
1757 .await;
1758 self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
1759 .await
1760 }
1761 }
1762 }
1763
1764 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1765 let mgr = self.orchestration.subagent_manager.as_ref()?;
1766 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1767 let reg = self
1768 .skill_state
1769 .registry
1770 .read()
1771 .expect("registry read lock");
1772 match crate::subagent::filter_skills(®, &def.skills) {
1773 Ok(skills) => {
1774 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1775 if bodies.is_empty() {
1776 None
1777 } else {
1778 Some(bodies)
1779 }
1780 }
1781 Err(e) => {
1782 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1783 None
1784 }
1785 }
1786 }
1787
1788 fn build_spawn_context(
1790 &self,
1791 cfg: &zeph_config::SubAgentConfig,
1792 ) -> crate::subagent::SpawnContext {
1793 crate::subagent::SpawnContext {
1794 parent_messages: self.extract_parent_messages(cfg),
1795 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1796 parent_provider_name: {
1797 let name = &self.runtime.active_provider_name;
1798 if name.is_empty() {
1799 None
1800 } else {
1801 Some(name.clone())
1802 }
1803 },
1804 spawn_depth: self.runtime.spawn_depth,
1805 mcp_tool_names: self.extract_mcp_tool_names(),
1806 }
1807 }
1808
1809 fn extract_parent_messages(
1814 &self,
1815 config: &zeph_config::SubAgentConfig,
1816 ) -> Vec<zeph_llm::provider::Message> {
1817 use zeph_llm::provider::Role;
1818 if config.context_window_turns == 0 {
1819 return Vec::new();
1820 }
1821 let non_system: Vec<_> = self
1822 .msg
1823 .messages
1824 .iter()
1825 .filter(|m| m.role != Role::System)
1826 .cloned()
1827 .collect();
1828 let take_count = config.context_window_turns * 2;
1829 let start = non_system.len().saturating_sub(take_count);
1830 let mut msgs = non_system[start..].to_vec();
1831
1832 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
1835 let mut keep = msgs.len();
1836 for (i, m) in msgs.iter().enumerate() {
1837 total_chars += m.content.len();
1838 if total_chars > max_chars {
1839 keep = i;
1840 break;
1841 }
1842 }
1843 if keep < msgs.len() {
1844 tracing::info!(
1845 kept = keep,
1846 requested = config.context_window_turns * 2,
1847 "[subagent] truncated parent history from {} to {} turns due to token budget",
1848 config.context_window_turns * 2,
1849 keep
1850 );
1851 msgs.truncate(keep);
1852 }
1853 msgs
1854 }
1855
1856 fn extract_mcp_tool_names(&self) -> Vec<String> {
1858 self.tool_executor
1859 .tool_definitions_erased()
1860 .into_iter()
1861 .filter(|t| t.id.starts_with("mcp_"))
1862 .map(|t| t.id.to_string())
1863 .collect()
1864 }
1865
1866 async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
1868 let Some(ref memory) = self.memory_state.memory else {
1869 return;
1870 };
1871 let trust_cfg = self.skill_state.trust_config.clone();
1872 let managed_dir = self.skill_state.managed_dir.clone();
1873 for meta in all_meta {
1874 let source_kind = if managed_dir
1875 .as_ref()
1876 .is_some_and(|d| meta.skill_dir.starts_with(d))
1877 {
1878 zeph_memory::store::SourceKind::Hub
1879 } else {
1880 zeph_memory::store::SourceKind::Local
1881 };
1882 let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
1883 &trust_cfg.default_level
1884 } else {
1885 &trust_cfg.local_level
1886 };
1887 match zeph_skills::compute_skill_hash(&meta.skill_dir) {
1888 Ok(current_hash) => {
1889 let existing = memory
1890 .sqlite()
1891 .load_skill_trust(&meta.name)
1892 .await
1893 .ok()
1894 .flatten();
1895 let trust_level_str = if let Some(ref row) = existing {
1896 if row.blake3_hash == current_hash {
1897 row.trust_level.clone()
1898 } else {
1899 trust_cfg.hash_mismatch_level.to_string()
1900 }
1901 } else {
1902 initial_level.to_string()
1903 };
1904 let source_path = meta.skill_dir.to_str();
1905 if let Err(e) = memory
1906 .sqlite()
1907 .upsert_skill_trust(
1908 &meta.name,
1909 &trust_level_str,
1910 source_kind,
1911 None,
1912 source_path,
1913 ¤t_hash,
1914 )
1915 .await
1916 {
1917 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
1918 }
1919 }
1920 Err(e) => {
1921 tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
1922 }
1923 }
1924 }
1925 }
1926
1927 async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
1929 let provider = self.embedding_provider.clone();
1930 let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
1931 let owned = text.to_owned();
1932 let p = provider.clone();
1933 Box::pin(async move { p.embed(&owned).await })
1934 };
1935
1936 let needs_inmemory_rebuild = !self
1937 .skill_state
1938 .matcher
1939 .as_ref()
1940 .is_some_and(SkillMatcherBackend::is_qdrant);
1941
1942 if needs_inmemory_rebuild {
1943 self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
1944 .await
1945 .map(SkillMatcherBackend::InMemory);
1946 } else if let Some(ref mut backend) = self.skill_state.matcher {
1947 let _ = self.channel.send_status("syncing skill index...").await;
1948 if let Err(e) = backend
1949 .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
1950 .await
1951 {
1952 tracing::warn!("failed to sync skill embeddings: {e:#}");
1953 }
1954 }
1955
1956 if self.skill_state.hybrid_search {
1957 let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
1958 let _ = self.channel.send_status("rebuilding search index...").await;
1959 self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
1960 }
1961 }
1962
1963 async fn reload_skills(&mut self) {
1964 let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
1965 if new_registry.fingerprint()
1966 == self
1967 .skill_state
1968 .registry
1969 .read()
1970 .expect("registry read lock")
1971 .fingerprint()
1972 {
1973 return;
1974 }
1975 let _ = self.channel.send_status("reloading skills...").await;
1976 *self
1977 .skill_state
1978 .registry
1979 .write()
1980 .expect("registry write lock") = new_registry;
1981
1982 let all_meta = self
1983 .skill_state
1984 .registry
1985 .read()
1986 .expect("registry read lock")
1987 .all_meta()
1988 .into_iter()
1989 .cloned()
1990 .collect::<Vec<_>>();
1991
1992 self.update_trust_for_reloaded_skills(&all_meta).await;
1993
1994 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
1995 self.rebuild_skill_matcher(&all_meta_refs).await;
1996
1997 let all_skills: Vec<Skill> = {
1998 let reg = self
1999 .skill_state
2000 .registry
2001 .read()
2002 .expect("registry read lock");
2003 reg.all_meta()
2004 .iter()
2005 .filter_map(|m| reg.get_skill(&m.name).ok())
2006 .collect()
2007 };
2008 let trust_map = self.build_skill_trust_map().await;
2009 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2010 let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
2011 self.skill_state
2012 .last_skills_prompt
2013 .clone_from(&skills_prompt);
2014 let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
2015 if let Some(msg) = self.msg.messages.first_mut() {
2016 msg.content = system_prompt;
2017 }
2018
2019 let _ = self.channel.send_status("").await;
2020 tracing::info!(
2021 "reloaded {} skill(s)",
2022 self.skill_state
2023 .registry
2024 .read()
2025 .expect("registry read lock")
2026 .all_meta()
2027 .len()
2028 );
2029 }
2030
2031 fn reload_instructions(&mut self) {
2032 if let Some(ref mut rx) = self.instructions.reload_rx {
2034 while rx.try_recv().is_ok() {}
2035 }
2036 let Some(ref state) = self.instructions.reload_state else {
2037 return;
2038 };
2039 let new_blocks = crate::instructions::load_instructions(
2040 &state.base_dir,
2041 &state.provider_kinds,
2042 &state.explicit_files,
2043 state.auto_detect,
2044 );
2045 let old_sources: std::collections::HashSet<_> =
2046 self.instructions.blocks.iter().map(|b| &b.source).collect();
2047 let new_sources: std::collections::HashSet<_> =
2048 new_blocks.iter().map(|b| &b.source).collect();
2049 for added in new_sources.difference(&old_sources) {
2050 tracing::info!(path = %added.display(), "instruction file added");
2051 }
2052 for removed in old_sources.difference(&new_sources) {
2053 tracing::info!(path = %removed.display(), "instruction file removed");
2054 }
2055 tracing::info!(
2056 old_count = self.instructions.blocks.len(),
2057 new_count = new_blocks.len(),
2058 "reloaded instruction files"
2059 );
2060 self.instructions.blocks = new_blocks;
2061 }
2062
2063 fn reload_config(&mut self) {
2064 let Some(ref path) = self.lifecycle.config_path else {
2065 return;
2066 };
2067 let config = match Config::load(path) {
2068 Ok(c) => c,
2069 Err(e) => {
2070 tracing::warn!("config reload failed: {e:#}");
2071 return;
2072 }
2073 };
2074
2075 self.runtime.security = config.security;
2076 self.runtime.timeouts = config.timeouts;
2077 self.runtime.redact_credentials = config.memory.redact_credentials;
2078 self.memory_state.history_limit = config.memory.history_limit;
2079 self.memory_state.recall_limit = config.memory.semantic.recall_limit;
2080 self.memory_state.summarization_threshold = config.memory.summarization_threshold;
2081 self.skill_state.max_active_skills = config.skills.max_active_skills;
2082 self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
2083 self.skill_state.min_injection_score = config.skills.min_injection_score;
2084 self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2085 self.skill_state.hybrid_search = config.skills.hybrid_search;
2086 self.skill_state.two_stage_matching = config.skills.two_stage_matching;
2087 self.skill_state.confusability_threshold =
2088 config.skills.confusability_threshold.clamp(0.0, 1.0);
2089 config
2090 .skills
2091 .generation_provider
2092 .as_str()
2093 .clone_into(&mut self.skill_state.generation_provider_name);
2094 self.skill_state.generation_output_dir =
2095 config.skills.generation_output_dir.as_deref().map(|p| {
2096 if let Some(stripped) = p.strip_prefix("~/") {
2097 dirs::home_dir()
2098 .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2099 } else {
2100 std::path::PathBuf::from(p)
2101 }
2102 });
2103
2104 if config.memory.context_budget_tokens > 0 {
2105 self.context_manager.budget = Some(
2106 ContextBudget::new(config.memory.context_budget_tokens, 0.20)
2107 .with_graph_enabled(config.memory.graph.enabled),
2108 );
2109 } else {
2110 self.context_manager.budget = None;
2111 }
2112
2113 {
2114 let graph_cfg = &config.memory.graph;
2115 if graph_cfg.rpe.enabled {
2116 if self.memory_state.rpe_router.is_none() {
2118 self.memory_state.rpe_router =
2119 Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2120 graph_cfg.rpe.threshold,
2121 graph_cfg.rpe.max_skip_turns,
2122 )));
2123 }
2124 } else {
2125 self.memory_state.rpe_router = None;
2126 }
2127 self.memory_state.graph_config = graph_cfg.clone();
2128 }
2129 self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2130 self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2131 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2132 self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2133 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2134 self.context_manager.compression = config.memory.compression.clone();
2135 self.context_manager.routing = config.memory.store_routing.clone();
2136 self.context_manager.store_routing_provider = if config
2138 .memory
2139 .store_routing
2140 .routing_classifier_provider
2141 .is_empty()
2142 {
2143 None
2144 } else {
2145 let resolved = self.resolve_background_provider(
2146 &config.memory.store_routing.routing_classifier_provider,
2147 );
2148 Some(std::sync::Arc::new(resolved))
2149 };
2150 self.memory_state.cross_session_score_threshold =
2151 config.memory.cross_session_score_threshold;
2152
2153 self.index.repo_map_tokens = config.index.repo_map_tokens;
2154 self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
2155
2156 tracing::info!("config reloaded");
2157 }
2158
2159 async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
2161 use std::fmt::Write;
2162 let mut out = String::from("Focus Agent status\n\n");
2163 let _ = writeln!(out, "Enabled: {}", self.focus.config.enabled);
2164 let _ = writeln!(out, "Active session: {}", self.focus.is_active());
2165 if let Some(ref scope) = self.focus.active_scope {
2166 let _ = writeln!(out, "Active scope: {scope}");
2167 }
2168 let _ = writeln!(
2169 out,
2170 "Knowledge blocks: {}",
2171 self.focus.knowledge_blocks.len()
2172 );
2173 let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
2174 self.channel.send(&out).await?;
2175 Ok(())
2176 }
2177
2178 async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
2180 use std::fmt::Write;
2181 let mut out = String::from("SideQuest status\n\n");
2182 let _ = writeln!(out, "Enabled: {}", self.sidequest.config.enabled);
2183 let _ = writeln!(
2184 out,
2185 "Interval turns: {}",
2186 self.sidequest.config.interval_turns
2187 );
2188 let _ = writeln!(out, "Turn counter: {}", self.sidequest.turn_counter);
2189 let _ = writeln!(out, "Passes run: {}", self.sidequest.passes_run);
2190 let _ = writeln!(
2191 out,
2192 "Total evicted: {} tool outputs",
2193 self.sidequest.total_evicted
2194 );
2195 self.channel.send(&out).await?;
2196 Ok(())
2197 }
2198
2199 #[allow(clippy::too_many_lines)]
2210 fn maybe_sidequest_eviction(&mut self) {
2211 use zeph_llm::provider::{Message, MessageMetadata, Role};
2212
2213 if self.sidequest.config.enabled {
2217 use crate::config::PruningStrategy;
2218 if !matches!(
2219 self.context_manager.compression.pruning_strategy,
2220 PruningStrategy::Reactive
2221 ) {
2222 tracing::warn!(
2223 strategy = ?self.context_manager.compression.pruning_strategy,
2224 "sidequest is enabled alongside a non-Reactive pruning strategy; \
2225 consider disabling sidequest.enabled to avoid redundant eviction"
2226 );
2227 }
2228 }
2229
2230 if self.focus.is_active() {
2232 tracing::debug!("sidequest: skipping — focus session active");
2233 self.compression.pending_sidequest_result = None;
2235 return;
2236 }
2237
2238 if let Some(handle) = self.compression.pending_sidequest_result.take() {
2240 use futures::FutureExt as _;
2242 match handle.now_or_never() {
2243 Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2244 let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2245 let freed = self.sidequest.apply_eviction(
2246 &mut self.msg.messages,
2247 &evicted_indices,
2248 &self.metrics.token_counter,
2249 );
2250 if freed > 0 {
2251 self.recompute_prompt_tokens();
2252 self.context_manager.compaction =
2255 crate::agent::context_manager::CompactionState::CompactedThisTurn {
2256 cooldown: 0,
2257 };
2258 tracing::info!(
2259 freed_tokens = freed,
2260 evicted_cursors = evicted_indices.len(),
2261 pass = self.sidequest.passes_run,
2262 "sidequest eviction complete"
2263 );
2264 if let Some(ref d) = self.debug_state.debug_dumper {
2265 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2266 }
2267 if let Some(ref tx) = self.session.status_tx {
2268 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2269 }
2270 } else {
2271 if let Some(ref tx) = self.session.status_tx {
2273 let _ = tx.send(String::new());
2274 }
2275 }
2276 }
2277 Some(Ok(None | Some(_))) => {
2278 tracing::debug!("sidequest: pending result: no cursors to evict");
2279 if let Some(ref tx) = self.session.status_tx {
2280 let _ = tx.send(String::new());
2281 }
2282 }
2283 Some(Err(e)) => {
2284 tracing::debug!("sidequest: background task panicked: {e}");
2285 if let Some(ref tx) = self.session.status_tx {
2286 let _ = tx.send(String::new());
2287 }
2288 }
2289 None => {
2290 tracing::debug!(
2294 "sidequest: background LLM task not yet complete, rescheduling"
2295 );
2296 }
2297 }
2298 }
2299
2300 self.sidequest
2302 .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2303
2304 if self.sidequest.tool_output_cursors.is_empty() {
2305 tracing::debug!("sidequest: no eligible cursors");
2306 return;
2307 }
2308
2309 let prompt = self.sidequest.build_eviction_prompt();
2310 let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2311 let n_cursors = self.sidequest.tool_output_cursors.len();
2312 let provider = self.summary_or_primary_provider().clone();
2314
2315 let handle = tokio::spawn(async move {
2317 let msgs = [Message {
2318 role: Role::User,
2319 content: prompt,
2320 parts: vec![],
2321 metadata: MessageMetadata::default(),
2322 }];
2323 let response =
2324 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2325 .await
2326 {
2327 Ok(Ok(r)) => r,
2328 Ok(Err(e)) => {
2329 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2330 return None;
2331 }
2332 Err(_) => {
2333 tracing::debug!("sidequest bg: LLM call timed out");
2334 return None;
2335 }
2336 };
2337
2338 let start = response.find('{')?;
2339 let end = response.rfind('}')?;
2340 if start > end {
2341 return None;
2342 }
2343 let json_slice = &response[start..=end];
2344 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2345 let mut valid: Vec<usize> = parsed
2346 .del_cursors
2347 .into_iter()
2348 .filter(|&c| c < n_cursors)
2349 .collect();
2350 valid.sort_unstable();
2351 valid.dedup();
2352 #[allow(
2353 clippy::cast_precision_loss,
2354 clippy::cast_possible_truncation,
2355 clippy::cast_sign_loss
2356 )]
2357 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2358 valid.truncate(max_evict);
2359 Some(valid)
2360 });
2361
2362 self.compression.pending_sidequest_result = Some(handle);
2363 tracing::debug!("sidequest: background LLM eviction task spawned");
2364 if let Some(ref tx) = self.session.status_tx {
2365 let _ = tx.send("SideQuest: scoring tool outputs...".into());
2366 }
2367 }
2368
2369 pub(crate) async fn check_cwd_changed(&mut self) {
2375 let current = match std::env::current_dir() {
2376 Ok(p) => p,
2377 Err(e) => {
2378 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2379 return;
2380 }
2381 };
2382 if current == self.lifecycle.last_known_cwd {
2383 return;
2384 }
2385 let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2386 self.session.env_context.working_dir = current.display().to_string();
2387
2388 tracing::info!(
2389 old = %old_cwd.display(),
2390 new = %current.display(),
2391 "working directory changed"
2392 );
2393
2394 let _ = self
2395 .channel
2396 .send_status("Working directory changed\u{2026}")
2397 .await;
2398
2399 let hooks = self.session.hooks_config.cwd_changed.clone();
2400 if !hooks.is_empty() {
2401 let mut env = std::collections::HashMap::new();
2402 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2403 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2404 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2405 tracing::warn!(error = %e, "CwdChanged hook failed");
2406 }
2407 }
2408
2409 let _ = self.channel.send_status("").await;
2410 }
2411
2412 pub(crate) async fn handle_file_changed(
2414 &mut self,
2415 event: crate::file_watcher::FileChangedEvent,
2416 ) {
2417 tracing::info!(path = %event.path.display(), "file changed");
2418
2419 let _ = self
2420 .channel
2421 .send_status("Running file-change hook\u{2026}")
2422 .await;
2423
2424 let hooks = self.session.hooks_config.file_changed_hooks.clone();
2425 if !hooks.is_empty() {
2426 let mut env = std::collections::HashMap::new();
2427 env.insert(
2428 "ZEPH_CHANGED_PATH".to_owned(),
2429 event.path.display().to_string(),
2430 );
2431 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2432 tracing::warn!(error = %e, "FileChanged hook failed");
2433 }
2434 }
2435
2436 let _ = self.channel.send_status("").await;
2437 }
2438}
2439pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2440 while !*rx.borrow_and_update() {
2441 if rx.changed().await.is_err() {
2442 std::future::pending::<()>().await;
2443 }
2444 }
2445}
2446
2447pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2448 match rx {
2449 Some(inner) => {
2450 if let Some(v) = inner.recv().await {
2451 Some(v)
2452 } else {
2453 *rx = None;
2454 std::future::pending().await
2455 }
2456 }
2457 None => std::future::pending().await,
2458 }
2459}
2460
2461#[cfg(test)]
2462mod tests;
2463
2464#[cfg(test)]
2465pub(crate) use tests::agent_tests;