1mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35pub(crate) mod rate_limiter;
36#[cfg(feature = "scheduler")]
37mod scheduler_commands;
38#[cfg(feature = "scheduler")]
39mod scheduler_loop;
40pub mod session_config;
41mod session_digest;
42pub(crate) mod sidequest;
43mod skill_management;
44pub mod slash_commands;
45pub(crate) mod state;
46pub(crate) mod tool_execution;
47pub(crate) mod tool_orchestrator;
48mod trust_commands;
49pub mod turn;
50mod utils;
51
52use std::collections::{HashMap, VecDeque};
53use std::sync::Arc;
54
55use parking_lot::RwLock;
56
57use tokio::sync::{mpsc, watch};
58use tokio_util::sync::CancellationToken;
59use zeph_llm::any::AnyProvider;
60use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
61use zeph_memory::TokenCounter;
62use zeph_memory::semantic::SemanticMemory;
63use zeph_skills::loader::Skill;
64use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
65use zeph_skills::prompt::format_skills_prompt;
66use zeph_skills::registry::SkillRegistry;
67use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
68
69use crate::channel::Channel;
70use crate::config::Config;
71use crate::context::{ContextBudget, build_system_prompt};
72use zeph_common::text::estimate_tokens;
73
74use loop_event::LoopEvent;
75use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
76use state::CompressionState;
77use state::{
78 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
79 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
80 RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
81};
82
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps a tool's raw output in a fenced code block tagged with the tool name:
/// `[tool output: <name>]\n```\n<body>\n````.
///
/// The buffer is pre-sized so the whole concatenation is a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER: &str = "[tool output: ";
    const FENCE_OPEN: &str = "]\n```\n";
    let total = HEADER.len()
        + tool_name.len()
        + FENCE_OPEN.len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut out = String::with_capacity(total);
    out.push_str(HEADER);
    out.push_str(tool_name);
    out.push_str(FENCE_OPEN);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
114
/// Core agent: owns the LLM providers, the user-facing channel, the tool
/// executor, and the per-concern state bundles that the loop methods below
/// operate on. Generic over the I/O [`Channel`] implementation.
pub struct Agent<C: Channel> {
    /// Primary LLM provider used for chat / typed completions.
    provider: AnyProvider,
    /// Provider used for embeddings; [`Agent::new`] clones `provider` for this.
    embedding_provider: AnyProvider,
    /// User-facing I/O channel (messages, status lines, chunk flushing).
    channel: C,
    /// Type-erased tool executor shared with spawned work.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history, queued messages, and deferred persistence work.
    pub(super) msg: MessageState,
    /// Semantic memory, persistence handles, and compaction settings.
    pub(super) memory_state: MemoryState,
    /// Skill registry, matcher, and the rendered skills prompt.
    pub(super) skill_state: SkillState,
    /// Context-window bookkeeping (e.g. turns since last hard compaction).
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool dispatch/coordination state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    /// Learning/feedback processing engine.
    pub(super) learning_engine: learning_engine::LearningEngine,
    /// Feedback-detection state.
    pub(super) feedback: FeedbackState,
    /// Runtime-tunable configuration (e.g. supervisor settings).
    pub(super) runtime: RuntimeConfig,
    /// MCP server manager state.
    pub(super) mcp: McpState,
    /// Code-index state.
    pub(super) index: IndexState,
    /// Session-level state.
    pub(super) session: SessionState,
    /// Debug tracing: iteration counter, spans, trace collector.
    pub(super) debug_state: DebugState,
    /// Instruction files and their reload channel.
    pub(super) instructions: InstructionState,
    /// Security state, including the per-turn user-provided URL set.
    pub(super) security: SecurityState,
    /// Experiment runner state (cancel handle, completion notifications).
    pub(super) experiments: ExperimentState,
    /// Conversation-compression state.
    pub(super) compression: CompressionState,
    /// Lifecycle plumbing: shutdown signal, cancel token/bridge, supervisor,
    /// reload/update/file-watch receivers.
    pub(super) lifecycle: LifecycleState,
    /// Provider-related state: overrides, STT, prompt token accounting.
    pub(super) providers: ProviderState,
    /// Metrics sink, token counter, and pending turn timings.
    pub(super) metrics: MetricsState,
    /// Sub-agent orchestration (manager lives here when configured).
    pub(super) orchestration: OrchestrationState,
    /// Focus-mode state.
    pub(super) focus: focus::FocusState,
    /// Side-quest tracking state.
    pub(super) sidequest: sidequest::SidequestState,
    /// Per-tool state (approvals, caches, etc. — see `state::ToolState`).
    pub(super) tool_state: ToolState,
}
182
183impl<C: Channel> Agent<C> {
184 #[must_use]
208 pub fn new(
209 provider: AnyProvider,
210 channel: C,
211 registry: SkillRegistry,
212 matcher: Option<SkillMatcherBackend>,
213 max_active_skills: usize,
214 tool_executor: impl ToolExecutor + 'static,
215 ) -> Self {
216 let registry = Arc::new(RwLock::new(registry));
217 let embedding_provider = provider.clone();
218 Self::new_with_registry_arc(
219 provider,
220 embedding_provider,
221 channel,
222 registry,
223 matcher,
224 max_active_skills,
225 tool_executor,
226 )
227 }
228
    /// Builds an agent around a shared skill registry.
    ///
    /// Loads every skill from the registry, renders the initial skills prompt
    /// and system prompt from it, seeds the message history with that system
    /// prompt, and default-initializes every state bundle.
    ///
    /// Debug builds assert `max_active_skills > 0`.
    #[must_use]
    #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize full skills from metadata; skills that fail to load are
        // silently dropped (`.ok()`).
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // Fresh agent: no trust scores or health data yet.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // History always starts with the system prompt at index 0; loop
            // code below relies on this (e.g. `.skip(1)` when counting turns).
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            memory_state: MemoryState::default(),
            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            debug_state: DebugState::default(),
            runtime: RuntimeConfig::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            instructions: InstructionState::default(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
        }
    }
305
306 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
311 let Some(mgr) = &mut self.orchestration.subagent_manager else {
312 return vec![];
313 };
314
315 let finished: Vec<String> = mgr
316 .statuses()
317 .into_iter()
318 .filter_map(|(id, status)| {
319 if matches!(
320 status.state,
321 zeph_subagent::SubAgentState::Completed
322 | zeph_subagent::SubAgentState::Failed
323 | zeph_subagent::SubAgentState::Canceled
324 ) {
325 Some(id)
326 } else {
327 None
328 }
329 })
330 .collect();
331
332 let mut results = vec![];
333 for task_id in finished {
334 match mgr.collect(&task_id).await {
335 Ok(result) => results.push((task_id, result)),
336 Err(e) => {
337 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
338 }
339 }
340 }
341 results
342 }
343
344 async fn call_llm_for_session_summary(
353 &self,
354 chat_messages: &[Message],
355 ) -> Option<zeph_memory::StructuredSummary> {
356 let timeout_dur = std::time::Duration::from_secs(
357 self.memory_state.compaction.shutdown_summary_timeout_secs,
358 );
359 match tokio::time::timeout(
360 timeout_dur,
361 self.provider
362 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
363 )
364 .await
365 {
366 Ok(Ok(s)) => Some(s),
367 Ok(Err(e)) => {
368 tracing::warn!(
369 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
370 );
371 self.plain_text_summary_fallback(chat_messages, timeout_dur)
372 .await
373 }
374 Err(_) => {
375 tracing::warn!(
376 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
377 self.memory_state.compaction.shutdown_summary_timeout_secs
378 );
379 self.plain_text_summary_fallback(chat_messages, timeout_dur)
380 .await
381 }
382 }
383 }
384
385 async fn plain_text_summary_fallback(
386 &self,
387 chat_messages: &[Message],
388 timeout_dur: std::time::Duration,
389 ) -> Option<zeph_memory::StructuredSummary> {
390 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
391 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
392 summary: plain,
393 key_facts: vec![],
394 entities: vec![],
395 }),
396 Ok(Err(e)) => {
397 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
398 None
399 }
400 Err(_) => {
401 tracing::warn!("shutdown summary: plain LLM fallback timed out");
402 None
403 }
404 }
405 }
406
407 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
412 use zeph_llm::provider::{MessagePart, Role};
413
414 let msgs = &self.msg.messages;
418 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
420 return;
421 };
422 let asst_msg = &msgs[asst_idx];
423 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
424 .parts
425 .iter()
426 .filter_map(|p| {
427 if let MessagePart::ToolUse { id, name, input } = p {
428 Some((id.as_str(), name.as_str(), input))
429 } else {
430 None
431 }
432 })
433 .collect();
434 if tool_use_ids.is_empty() {
435 return;
436 }
437
438 let paired_ids: std::collections::HashSet<&str> = msgs
440 .get(asst_idx + 1..)
441 .into_iter()
442 .flatten()
443 .filter(|m| m.role == Role::User)
444 .flat_map(|m| m.parts.iter())
445 .filter_map(|p| {
446 if let MessagePart::ToolResult { tool_use_id, .. } = p {
447 Some(tool_use_id.as_str())
448 } else {
449 None
450 }
451 })
452 .collect();
453
454 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
455 .iter()
456 .filter(|(id, _, _)| !paired_ids.contains(*id))
457 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
458 id: (*id).to_owned(),
459 name: (*name).to_owned().into(),
460 input: (*input).clone(),
461 })
462 .collect();
463
464 if unpaired.is_empty() {
465 return;
466 }
467
468 tracing::info!(
469 count = unpaired.len(),
470 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
471 );
472 self.persist_cancelled_tool_results(&unpaired).await;
473 }
474
475 async fn maybe_store_shutdown_summary(&mut self) {
485 if !self.memory_state.compaction.shutdown_summary {
486 return;
487 }
488 let Some(memory) = self.memory_state.persistence.memory.clone() else {
489 return;
490 };
491 let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
492 return;
493 };
494
495 match memory.has_session_summary(conversation_id).await {
497 Ok(true) => {
498 tracing::debug!("shutdown summary: session already has a summary, skipping");
499 return;
500 }
501 Ok(false) => {}
502 Err(e) => {
503 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
504 return;
505 }
506 }
507
508 let user_count = self
510 .msg
511 .messages
512 .iter()
513 .skip(1)
514 .filter(|m| m.role == Role::User)
515 .count();
516 if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
517 tracing::debug!(
518 user_count,
519 min = self.memory_state.compaction.shutdown_summary_min_messages,
520 "shutdown summary: too few user messages, skipping"
521 );
522 return;
523 }
524
525 let _ = self.channel.send_status("Saving session summary...").await;
527
528 let max = self.memory_state.compaction.shutdown_summary_max_messages;
530 if max == 0 {
531 tracing::debug!("shutdown summary: max_messages=0, skipping");
532 return;
533 }
534 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
535 let slice = if non_system.len() > max {
536 &non_system[non_system.len() - max..]
537 } else {
538 &non_system[..]
539 };
540
541 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
542 .iter()
543 .map(|m| {
544 let role = match m.role {
545 Role::User => "user".to_owned(),
546 Role::Assistant => "assistant".to_owned(),
547 Role::System => "system".to_owned(),
548 };
549 (zeph_memory::MessageId(0), role, m.content.clone())
550 })
551 .collect();
552
553 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
554 let chat_messages = vec![Message {
555 role: Role::User,
556 content: prompt,
557 parts: vec![],
558 metadata: MessageMetadata::default(),
559 }];
560
561 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
562 let _ = self.channel.send_status("").await;
563 return;
564 };
565
566 if let Err(e) = memory
567 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
568 .await
569 {
570 tracing::warn!("shutdown summary: storage failed: {e:#}");
571 } else {
572 tracing::info!(
573 conversation_id = conversation_id.0,
574 "shutdown summary stored"
575 );
576 }
577
578 let _ = self.channel.send_status("").await;
580 }
581
    /// Performs ordered agent teardown: notifies the user, persists router
    /// state, stops sub-agents and MCP servers, flushes compaction and
    /// filtering metrics, tombstones in-flight tool calls, aborts background
    /// tasks, and stores the shutdown summary/digest.
    pub async fn shutdown(&mut self) {
        self.channel.send("Shutting down...").await.ok();

        self.provider.save_router_state();

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Flush the trailing "turns since last hard compaction" sample.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log summary statistics about tool-output filtering and compaction.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Keep the persisted transcript well-formed before summarizing it.
        self.flush_orphaned_tool_use_on_shutdown().await;

        self.lifecycle.supervisor.abort_all();

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
659
660 fn refresh_subagent_metrics(&mut self) {
667 let Some(ref mgr) = self.orchestration.subagent_manager else {
668 return;
669 };
670 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
671 .statuses()
672 .into_iter()
673 .map(|(id, s)| {
674 let def = mgr.agents_def(&id);
675 crate::metrics::SubAgentMetrics {
676 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
677 id: id.clone(),
678 state: format!("{:?}", s.state).to_lowercase(),
679 turns_used: s.turns_used,
680 max_turns: def.map_or(20, |d| d.permissions.max_turns),
681 background: def.is_some_and(|d| d.permissions.background),
682 elapsed_secs: s.started_at.elapsed().as_secs(),
683 permission_mode: def.map_or_else(String::new, |d| {
684 use zeph_subagent::def::PermissionMode;
685 match d.permissions.permission_mode {
686 PermissionMode::Default => String::new(),
687 PermissionMode::AcceptEdits => "accept_edits".into(),
688 PermissionMode::DontAsk => "dont_ask".into(),
689 PermissionMode::BypassPermissions => "bypass_permissions".into(),
690 PermissionMode::Plan => "plan".into(),
691 }
692 }),
693 transcript_dir: mgr
694 .agent_transcript_dir(&id)
695 .map(|p| p.to_string_lossy().into_owned()),
696 }
697 })
698 .collect();
699 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
700 }
701
702 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
704 let completed = self.poll_subagents().await;
705 for (task_id, result) in completed {
706 let notice = if result.is_empty() {
707 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
708 } else {
709 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
710 };
711 if let Err(e) = self.channel.send(¬ice).await {
712 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
713 }
714 }
715 Ok(())
716 }
717
    /// Main agent event loop.
    ///
    /// Waits for warmup (when configured), then loops: housekeeping →
    /// dequeue/receive input → slash-command URL capture → session-command
    /// registry → agent-command registry → builtin commands → full user turn.
    /// Exits on shutdown events or exit commands, then runs autodream and
    /// finishes the trace collector.
    #[allow(clippy::too_many_lines)] pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Block until model warmup signals readiness (if a warmup watch exists).
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;

        loop {
            // Per-iteration housekeeping before reading the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over fresh loop events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Re-run attachment resolution for raw attachments.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Inject the scheduled task as if the user typed it.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // URLs typed inside slash commands count as user-provided for
            // later exfiltration checks.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // First dispatch stage: session-level commands that only need
            // message/debug access (agent access stubbed with NullAgent).
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            match registry_handled {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not a session command; fall through to the agent registry.
                }
            }

            // Second dispatch stage: commands that need full `&mut self`
            // (message/debug/session access stubbed with Null* impls). Only
            // runs when stage one did not handle the input.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if registry_handled.is_none() {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand},
                    experiment::ExperimentCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            match agent_result {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    self.maybe_trigger_post_command_learning(trimmed).await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not an agent command either; try legacy builtins.
                }
            }

            // Third dispatch stage: legacy builtin commands.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command at all: run a full LLM turn.
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1005
1006 fn apply_provider_override(&mut self) {
1008 if let Some(ref slot) = self.providers.provider_override
1009 && let Some(new_provider) = slot.write().take()
1010 {
1011 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1012 self.provider = new_provider;
1013 }
1014 }
1015
    /// Waits for the next loop event from any source: the user channel,
    /// shutdown signal, hot-reload watchers, update/experiment notifications,
    /// scheduled tasks, or file-change events.
    ///
    /// Returns `Ok(None)` when the channel closes; channel messages
    /// short-circuit directly out of the `select!`.
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            result = self.channel.recv() => {
                // Channel result carries its own Option (None = closed).
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1057
    /// Converts an incoming channel message into `(text, image_parts)`.
    ///
    /// Audio attachments are transcribed via the configured STT provider and
    /// prepended to the text (dropped with a warning when no STT provider is
    /// set); image attachments become [`MessagePart::Image`] parts. Oversized
    /// attachments are skipped with a warning.
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        // Split attachments: audio goes to STT, everything else is treated
        // as an image below.
        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.providers.stt.is_some(),
            "resolve_message attachments"
        );

        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                // Enforce the audio size cap; skip (not fail) oversized clips.
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        // Failed clips are dropped; remaining ones still count.
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                // Prepend transcriptions so typed text keeps its position.
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            // Enforce the image size cap; skip (not fail) oversized images.
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            // MIME is inferred from the filename extension (see detect_image_mime).
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }
1147
1148 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1155 let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1156 self.debug_state.iteration_counter += 1;
1157 self.lifecycle.cancel_token = CancellationToken::new();
1158 self.security.user_provided_urls.write().clear();
1159 turn::Turn::new(id, input)
1160 }
1161
    /// Finalizes a turn: moves its timing metrics into the pending slot,
    /// then flushes them to the metrics sink.
    fn end_turn(&mut self, turn: turn::Turn) {
        self.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
    }
1172
1173 #[cfg_attr(
1174 feature = "profiling",
1175 tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1176 )]
1177 async fn process_user_message(
1178 &mut self,
1179 text: String,
1180 image_parts: Vec<zeph_llm::provider::MessagePart>,
1181 ) -> Result<(), error::AgentError> {
1182 let input = turn::TurnInput::new(text, image_parts);
1183 let mut t = self.begin_turn(input);
1184
1185 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1186 tracing::Span::current().record("turn_id", t.id().0);
1187 self.debug_state
1189 .start_iteration_span(turn_idx, t.input.text.trim());
1190
1191 let result = self.process_user_message_inner(&mut t).await;
1192
1193 let span_status = if result.is_ok() {
1195 crate::debug_dump::trace::SpanStatus::Ok
1196 } else {
1197 crate::debug_dump::trace::SpanStatus::Error {
1198 message: "iteration failed".to_owned(),
1199 }
1200 };
1201 self.debug_state.end_iteration_span(turn_idx, span_status);
1202
1203 self.end_turn(t);
1204 result
1205 }
1206
    /// Core of a single turn, run inside the turn opened by `begin_turn`:
    /// background-task reaping, cancellation bridging, slash-command
    /// dispatch, security pre-screening, context preparation, persistence,
    /// and finally the LLM response loop.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        // Reap finished background tasks first so their side effects (e.g.
        // a completed background summarization) are visible to this turn.
        let bg_signal = self.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        {
            // Publish a snapshot of the supervisor's counters into metrics.
            let snap = self.lifecycle.supervisor.metrics_snapshot();
            self.update_metrics(|m| {
                m.bg_inflight = snap.inflight as u64;
                m.bg_dropped = snap.total_dropped();
                m.bg_completed = snap.total_completed();
                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
            });
        }

        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            // Optionally cancel in-flight enrichment tasks so they cannot
            // run concurrently with the new turn.
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }

        // Bridge the agent-wide cancel signal to this turn's token: a
        // spawned task forwards the next notification to the token. The
        // previous bridge (if any) is aborted so a stale signal cannot
        // cancel the new turn.
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // Slash commands short-circuit the rest of the turn.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail / injection pre-screen; `true` means the input was
        // blocked and the turn ends without touching the model.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let user_msg = self.build_user_message(&text, image_parts);

        // URLs the user pasted themselves are recorded so exfiltration
        // checks can treat them as user-sanctioned for this turn.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        tracing::debug!("turn timing: process_response start");
        if let Err(e) = self.process_response().await {
            // On failure: detach learning tasks, report the error to the
            // user, and roll the user message back out of the in-memory
            // history so a retry starts clean.
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        } else {
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            self.maybe_update_magic_docs();
        }
        tracing::debug!("turn timing: process_response done");

        // LLM/tool timings are accumulated into the shared metrics slot by
        // the response loop; copy them onto this turn's record.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1336
    /// Pre-screen raw user input before it reaches the model.
    ///
    /// Runs the optional guardrail check and, with the `classifiers`
    /// feature, the prompt-injection classifier. Returns `Ok(true)` when
    /// the input was blocked (the turn should end quietly), `Ok(false)`
    /// when processing may continue.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        // Channel failures are ignored: blocking must not
                        // itself error the turn.
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Flagged below the blocking threshold: warn, continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    if guardrail.error_should_block() {
                        // Fail-closed: a broken guardrail blocks the input.
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Fail-open: log and let the input through.
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    // Soft signal only: logged, never blocked.
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        // Publish classifier counters even when scanning was skipped.
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1408
    /// Advance all per-turn context bookkeeping before the user message is
    /// appended: system-prompt rebuild, learning ticks, focus/sidequest
    /// timers, compression/compaction passes, and final context prep.
    /// The ordering is significant — later compaction decisions read state
    /// updated by the earlier steps.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        self.mcp.pruning_cache.reset();

        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // One compaction-state step per turn (cooldowns / per-turn flags).
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        {
            self.focus.tick();

            let sidequest_should_fire = self.sidequest.tick();
            // Skip sidequest eviction when this turn already compacted.
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        self.maybe_time_based_microcompact();

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Compression/compaction/preparation failures are non-fatal: the
        // turn proceeds with whatever context state it has.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // Box::pin: presumably to keep this large future off the caller's
        // stack frame — TODO confirm the original motivation.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1481
1482 fn build_user_message(
1483 &mut self,
1484 text: &str,
1485 image_parts: Vec<zeph_llm::provider::MessagePart>,
1486 ) -> Message {
1487 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1488 all_image_parts.extend(image_parts);
1489
1490 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1491 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1492 text: text.to_owned(),
1493 }];
1494 parts.extend(all_image_parts);
1495 Message::from_parts(Role::User, parts)
1496 } else {
1497 if !all_image_parts.is_empty() {
1498 tracing::warn!(
1499 count = all_image_parts.len(),
1500 "image attachments dropped: provider does not support vision"
1501 );
1502 }
1503 Message {
1504 role: Role::User,
1505 content: text.to_owned(),
1506 parts: vec![],
1507 metadata: MessageMetadata::default(),
1508 }
1509 }
1510 }
1511
    /// Poll a spawned sub-agent every 500ms until it reaches a terminal
    /// state, returning a user-facing summary line.
    ///
    /// While waiting, also services secret-access requests raised by the
    /// sub-agent (asking the user for confirmation) and emits periodic
    /// progress updates over the status channel. Returns `None` only if
    /// the sub-agent manager disappears mid-poll.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Handle at most one pending secret request per poll tick.
            // allow: the closure form is clearer than a method path here.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // Default to deny when the channel cannot confirm.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets are leased for 5 minutes.
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // No status row for the task id: treat it as finished.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: report "turns used / max turns"; the
                    // max falls back to 20 when the definition is missing.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1587
1588 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1591 let mgr = self.orchestration.subagent_manager.as_mut()?;
1592 let full_ids: Vec<String> = mgr
1593 .statuses()
1594 .into_iter()
1595 .map(|(tid, _)| tid)
1596 .filter(|tid| tid.starts_with(prefix))
1597 .collect();
1598 Some(match full_ids.as_slice() {
1599 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1600 [fid] => Ok(fid.clone()),
1601 _ => Err(format!(
1602 "Ambiguous id prefix '{prefix}': matches {} agents",
1603 full_ids.len()
1604 )),
1605 })
1606 }
1607
1608 fn handle_agent_list(&self) -> Option<String> {
1609 use std::fmt::Write as _;
1610 let mgr = self.orchestration.subagent_manager.as_ref()?;
1611 let defs = mgr.definitions();
1612 if defs.is_empty() {
1613 return Some("No sub-agent definitions found.".into());
1614 }
1615 let mut out = String::from("Available sub-agents:\n");
1616 for d in defs {
1617 let memory_label = match d.memory {
1618 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
1619 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
1620 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
1621 None => "",
1622 };
1623 if let Some(ref src) = d.source {
1624 let _ = writeln!(
1625 out,
1626 " {}{} — {} ({})",
1627 d.name, memory_label, d.description, src
1628 );
1629 } else {
1630 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
1631 }
1632 }
1633 Some(out)
1634 }
1635
1636 fn handle_agent_status(&self) -> Option<String> {
1637 use std::fmt::Write as _;
1638 let mgr = self.orchestration.subagent_manager.as_ref()?;
1639 let statuses = mgr.statuses();
1640 if statuses.is_empty() {
1641 return Some("No active sub-agents.".into());
1642 }
1643 let mut out = String::from("Active sub-agents:\n");
1644 for (id, s) in &statuses {
1645 let state = format!("{:?}", s.state).to_lowercase();
1646 let elapsed = s.started_at.elapsed().as_secs();
1647 let _ = writeln!(
1648 out,
1649 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
1650 short = &id[..8.min(id.len())],
1651 t = s.turns_used,
1652 msg = s.last_message.as_deref().unwrap_or(""),
1653 );
1654 if let Some(def) = mgr.agents_def(id)
1656 && let Some(scope) = def.memory
1657 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
1658 {
1659 let _ = writeln!(out, " memory: {}", dir.display());
1660 }
1661 }
1662 Some(out)
1663 }
1664
    /// Approve the pending secret request for the sub-agent matching the
    /// given id prefix: grant a 5-minute lease and deliver the secret.
    /// Returns a user-facing outcome message, or `None` when no manager
    /// is configured.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        // NOTE(review): try_recv_secret_request() pops a request off the
        // queue; when the popped request belongs to a *different* task id
        // it appears to be dropped here rather than re-queued — confirm
        // the manager re-delivers unmatched requests.
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            // Approved secrets are leased for 5 minutes.
            let ttl = std::time::Duration::from_secs(300);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
1688
1689 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1690 let full_id = match self.resolve_agent_id_prefix(id)? {
1691 Ok(fid) => fid,
1692 Err(msg) => return Some(msg),
1693 };
1694 let mgr = self.orchestration.subagent_manager.as_mut()?;
1695 match mgr.deny_secret(&full_id) {
1696 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1697 Err(e) => Some(format!("Deny failed: {e}")),
1698 }
1699 }
1700
    /// Dispatch a parsed sub-agent command to the sub-agent subsystem.
    ///
    /// Returns the user-facing result text, or `None` when no sub-agent
    /// manager is configured.
    // allow: the dispatch match is long but each arm is self-contained.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Gather everything the spawn needs *before* taking the
                // mutable borrow of the manager.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Spawn and Mention behave identically: spawn, then block
            // (polling) until the sub-agent reaches a terminal state.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Prefix resolution is inlined here because `mgr` is
                // already mutably borrowed from self.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve the definition name in a short immutable-borrow
                // scope so skills can be filtered before the mutable resume.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
1819
1820 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1821 let mgr = self.orchestration.subagent_manager.as_ref()?;
1822 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1823 let reg = self.skill_state.registry.read();
1824 match zeph_subagent::filter_skills(®, &def.skills) {
1825 Ok(skills) => {
1826 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1827 if bodies.is_empty() {
1828 None
1829 } else {
1830 Some(bodies)
1831 }
1832 }
1833 Err(e) => {
1834 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1835 None
1836 }
1837 }
1838 }
1839
1840 fn build_spawn_context(
1842 &self,
1843 cfg: &zeph_config::SubAgentConfig,
1844 ) -> zeph_subagent::SpawnContext {
1845 zeph_subagent::SpawnContext {
1846 parent_messages: self.extract_parent_messages(cfg),
1847 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1848 parent_provider_name: {
1849 let name = &self.runtime.active_provider_name;
1850 if name.is_empty() {
1851 None
1852 } else {
1853 Some(name.clone())
1854 }
1855 },
1856 spawn_depth: self.runtime.spawn_depth,
1857 mcp_tool_names: self.extract_mcp_tool_names(),
1858 }
1859 }
1860
1861 fn extract_parent_messages(
1866 &self,
1867 config: &zeph_config::SubAgentConfig,
1868 ) -> Vec<zeph_llm::provider::Message> {
1869 use zeph_llm::provider::Role;
1870 if config.context_window_turns == 0 {
1871 return Vec::new();
1872 }
1873 let non_system: Vec<_> = self
1874 .msg
1875 .messages
1876 .iter()
1877 .filter(|m| m.role != Role::System)
1878 .cloned()
1879 .collect();
1880 let take_count = config.context_window_turns * 2;
1881 let start = non_system.len().saturating_sub(take_count);
1882 let mut msgs = non_system[start..].to_vec();
1883
1884 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
1887 let mut keep = msgs.len();
1888 for (i, m) in msgs.iter().enumerate() {
1889 total_chars += m.content.len();
1890 if total_chars > max_chars {
1891 keep = i;
1892 break;
1893 }
1894 }
1895 if keep < msgs.len() {
1896 tracing::info!(
1897 kept = keep,
1898 requested = config.context_window_turns * 2,
1899 "[subagent] truncated parent history from {} to {} turns due to token budget",
1900 config.context_window_turns * 2,
1901 keep
1902 );
1903 msgs.truncate(keep);
1904 }
1905 msgs
1906 }
1907
1908 fn extract_mcp_tool_names(&self) -> Vec<String> {
1910 self.tool_executor
1911 .tool_definitions_erased()
1912 .into_iter()
1913 .filter(|t| t.id.starts_with("mcp_"))
1914 .map(|t| t.id.to_string())
1915 .collect()
1916 }
1917
    /// Refresh the persisted trust record for every reloaded skill.
    ///
    /// Skills under the managed (hub) directory are seeded at the
    /// configured default level; local skills at the local level. A skill
    /// whose content hash changed since its stored record is set to the
    /// hash-mismatch level. No-op when persistent memory is disabled.
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            // Skills inside the managed dir came from the hub; anything
            // else is local.
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::store::SourceKind::Hub
            } else {
                zeph_memory::store::SourceKind::Local
            };
            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    // Load failures are treated the same as "no record".
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    // Keep the stored level when the content is unchanged;
                    // demote on hash mismatch; otherwise seed the initial
                    // level for this source kind.
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            row.trust_level.clone()
                        } else {
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }
1983
    /// Rebuild or resync the skill-matching index after the skill set
    /// changed.
    ///
    /// Non-Qdrant (in-memory) backends are rebuilt from scratch; a Qdrant
    /// backend is synced incrementally with progress reported over the
    /// status channel. When hybrid search is enabled the BM25 index is
    /// also rebuilt from skill descriptions.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Embedding closure handed to the matcher; enforces a per-call
        // timeout and maps timeouts to LlmError::Timeout.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // Anything that is not a Qdrant backend (including no matcher at
        // all) gets a full in-memory rebuild.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Progress callback forwards "completed/total" counts to the UI
            // when a status channel is available.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2045
    /// Hot-reload skills from disk when the on-disk set changed.
    ///
    /// Compares registry fingerprints to skip no-op reloads, then refreshes
    /// trust records, the matcher index, and the skills section of the
    /// system prompt (stored in the first message slot).
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        // Fingerprint match means nothing changed on disk — bail early.
        if new_registry.fingerprint() == self.skill_state.fingerprint() {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self.skill_state.registry.write() = new_registry;

        // Clone metadata out of the registry so the read lock is not held
        // across the awaits below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Materialize full skills (lock scoped to this block).
        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        // The system prompt lives in the first message slot.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2096
2097 fn reload_instructions(&mut self) {
2098 if let Some(ref mut rx) = self.instructions.reload_rx {
2100 while rx.try_recv().is_ok() {}
2101 }
2102 let Some(ref state) = self.instructions.reload_state else {
2103 return;
2104 };
2105 let new_blocks = crate::instructions::load_instructions(
2106 &state.base_dir,
2107 &state.provider_kinds,
2108 &state.explicit_files,
2109 state.auto_detect,
2110 );
2111 let old_sources: std::collections::HashSet<_> =
2112 self.instructions.blocks.iter().map(|b| &b.source).collect();
2113 let new_sources: std::collections::HashSet<_> =
2114 new_blocks.iter().map(|b| &b.source).collect();
2115 for added in new_sources.difference(&old_sources) {
2116 tracing::info!(path = %added.display(), "instruction file added");
2117 }
2118 for removed in old_sources.difference(&new_sources) {
2119 tracing::info!(path = %removed.display(), "instruction file removed");
2120 }
2121 tracing::info!(
2122 old_count = self.instructions.blocks.len(),
2123 new_count = new_blocks.len(),
2124 "reloaded instruction files"
2125 );
2126 self.instructions.blocks = new_blocks;
2127 }
2128
    /// Re-read the config file (when a path is known) and apply the
    /// hot-reloadable subset of settings in place. Load failures are
    /// logged and leave the current configuration untouched.
    fn reload_config(&mut self) {
        let Some(ref path) = self.lifecycle.config_path else {
            return;
        };
        let config = match Config::load(path) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!("config reload failed: {e:#}");
                return;
            }
        };

        let budget_tokens = resolve_context_budget(&config, &self.provider);
        // Runtime / memory / skill knobs copied straight from the new config.
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        // Weights/thresholds are defensively clamped into [0, 1].
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading "~/" in the generation output dir to $HOME.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        // 0.20 is presumably the reserved headroom fraction — see
        // ContextBudget::new for the exact semantics.
        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Create the RPE router lazily; keep the existing one (and
                // its accumulated state) when already present.
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // An empty provider name disables the routing classifier.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2221
    /// Runs one step of the SideQuest eviction state machine.
    ///
    /// SideQuest offloads the question "which stale tool outputs can be
    /// dropped?" to a background LLM call so the main loop never blocks:
    /// 1. harvest the result of a previously spawned background task, if any,
    ///    and apply the eviction it recommends;
    /// 2. rebuild the tool-output cursors from the current message list;
    /// 3. spawn a fresh background scoring task whose handle is polled on the
    ///    next call.
    #[allow(clippy::too_many_lines)]
    fn maybe_sidequest_eviction(&mut self) {
        // NOTE(review): redundant with the module-level import of the same
        // items at the top of the file — harmless, kept as-is.
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        if self.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            // SideQuest overlaps with proactive pruning strategies; warn
            // rather than error so the session keeps going.
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                    consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        // A focus session pins the current context; evicting tool output now
        // could remove exactly what the user is focused on. Any in-flight
        // background result is discarded because it may be stale afterwards.
        if self.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active");
            self.compression.pending_sidequest_result = None;
            return;
        }

        // Step 1: harvest the previously spawned background task, if any.
        if let Some(handle) = self.compression.pending_sidequest_result.take() {
            use futures::FutureExt as _;
            // Non-blocking poll. `now_or_never` consumes the JoinHandle, so an
            // unfinished task keeps running detached (its result is dropped).
            match handle.now_or_never() {
                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
                    // Snapshot cursors before eviction mutates them — used
                    // only for the debug dump below.
                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
                    let freed = self.sidequest.apply_eviction(
                        &mut self.msg.messages,
                        &evicted_indices,
                        &self.metrics.token_counter,
                    );
                    if freed > 0 {
                        self.recompute_prompt_tokens();
                        // Mark this turn as compacted so other compaction
                        // machinery does not immediately fire on top of it.
                        self.context_manager.compaction =
                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
                                cooldown: 0,
                            };
                        tracing::info!(
                            freed_tokens = freed,
                            evicted_cursors = evicted_indices.len(),
                            pass = self.sidequest.passes_run,
                            "sidequest eviction complete"
                        );
                        if let Some(ref d) = self.debug_state.debug_dumper {
                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                        }
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                        }
                    } else {
                        // Eviction freed nothing — just clear the status line.
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(String::new());
                        }
                    }
                }
                Some(Ok(None | Some(_))) => {
                    // Task finished but produced nothing to evict
                    // (either `None` or an empty index list).
                    tracing::debug!("sidequest: pending result: no cursors to evict");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                Some(Err(e)) => {
                    // JoinError — the background task panicked or was cancelled.
                    tracing::debug!("sidequest: background task panicked: {e}");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                None => {
                    // NOTE(review): the unfinished task keeps running detached
                    // and a replacement is spawned below, so its eventual
                    // result is discarded — confirm the duplicate LLM call is
                    // intentional "rescheduling" rather than an oversight.
                    tracing::debug!(
                        "sidequest: background LLM task not yet complete, rescheduling"
                    );
                }
            }
        }

        // Step 2: recompute eligible tool-output cursors from the live
        // message list so the new task scores fresh data.
        self.sidequest
            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);

        if self.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        // Step 3: spawn the background scoring task. Everything it needs is
        // captured by value so the future does not borrow `self`.
        let prompt = self.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
        let n_cursors = self.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        let handle = tokio::spawn(async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            // Hard 5-second cap: eviction is best-effort and must never
            // stall the agent waiting on a slow provider.
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Extract the outermost `{...}` span — models often wrap the JSON
            // payload in prose.
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Sanitize model output: drop out-of-range indices, sort, dedup.
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            // Cap how much the model may evict in a single pass.
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        });

        self.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
2391
2392 pub(crate) async fn check_cwd_changed(&mut self) {
2398 let current = match std::env::current_dir() {
2399 Ok(p) => p,
2400 Err(e) => {
2401 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2402 return;
2403 }
2404 };
2405 if current == self.lifecycle.last_known_cwd {
2406 return;
2407 }
2408 let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2409 self.session.env_context.working_dir = current.display().to_string();
2410
2411 tracing::info!(
2412 old = %old_cwd.display(),
2413 new = %current.display(),
2414 "working directory changed"
2415 );
2416
2417 let _ = self
2418 .channel
2419 .send_status("Working directory changed\u{2026}")
2420 .await;
2421
2422 let hooks = self.session.hooks_config.cwd_changed.clone();
2423 if !hooks.is_empty() {
2424 let mut env = std::collections::HashMap::new();
2425 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2426 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2427 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2428 tracing::warn!(error = %e, "CwdChanged hook failed");
2429 }
2430 }
2431
2432 let _ = self.channel.send_status("").await;
2433 }
2434
2435 pub(crate) async fn handle_file_changed(
2437 &mut self,
2438 event: crate::file_watcher::FileChangedEvent,
2439 ) {
2440 tracing::info!(path = %event.path.display(), "file changed");
2441
2442 let _ = self
2443 .channel
2444 .send_status("Running file-change hook\u{2026}")
2445 .await;
2446
2447 let hooks = self.session.hooks_config.file_changed_hooks.clone();
2448 if !hooks.is_empty() {
2449 let mut env = std::collections::HashMap::new();
2450 env.insert(
2451 "ZEPH_CHANGED_PATH".to_owned(),
2452 event.path.display().to_string(),
2453 );
2454 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2455 tracing::warn!(error = %e, "FileChanged hook failed");
2456 }
2457 }
2458
2459 let _ = self.channel.send_status("").await;
2460 }
2461}
2462pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2463 while !*rx.borrow_and_update() {
2464 if rx.changed().await.is_err() {
2465 std::future::pending::<()>().await;
2466 }
2467 }
2468}
2469
2470pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2471 match rx {
2472 Some(inner) => {
2473 if let Some(v) = inner.recv().await {
2474 Some(v)
2475 } else {
2476 *rx = None;
2477 std::future::pending().await
2478 }
2479 }
2480 None => std::future::pending().await,
2481 }
2482}
2483
2484pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2490 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2491 if let Some(ctx_size) = provider.context_window() {
2492 tracing::info!(
2493 model_context = ctx_size,
2494 "auto-configured context budget on reload"
2495 );
2496 ctx_size
2497 } else {
2498 0
2499 }
2500 } else {
2501 config.memory.context_budget_tokens
2502 };
2503 if tokens == 0 {
2504 tracing::warn!(
2505 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2506 );
2507 128_000
2508 } else {
2509 tokens
2510 }
2511}
2512
2513#[cfg(test)]
2514mod tests;
2515
2516#[cfg(test)]
2517pub(crate) use tests::agent_tests;