1mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35pub(crate) mod rate_limiter;
36#[cfg(feature = "scheduler")]
37mod scheduler_commands;
38#[cfg(feature = "scheduler")]
39mod scheduler_loop;
40pub mod session_config;
41mod session_digest;
42pub(crate) mod sidequest;
43mod skill_management;
44pub mod slash_commands;
45pub mod speculative;
46pub(crate) mod state;
47pub(crate) mod tool_execution;
48pub(crate) mod tool_orchestrator;
49mod trust_commands;
50pub mod turn;
51mod utils;
52pub(crate) mod vigil;
53
54use std::collections::{HashMap, VecDeque};
55use std::sync::Arc;
56
57use parking_lot::RwLock;
58
59use tokio::sync::{mpsc, watch};
60use tokio_util::sync::CancellationToken;
61use zeph_llm::any::AnyProvider;
62use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
63use zeph_memory::TokenCounter;
64use zeph_memory::semantic::SemanticMemory;
65use zeph_skills::loader::Skill;
66use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
67use zeph_skills::prompt::format_skills_prompt;
68use zeph_skills::registry::SkillRegistry;
69use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
70
71use crate::channel::Channel;
72use crate::config::Config;
73use crate::context::{ContextBudget, build_system_prompt};
74use zeph_common::text::estimate_tokens;
75
76use loop_event::LoopEvent;
77use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
78use state::CompressionState;
79use state::{
80 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
81 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
82 RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
83};
84
/// Number of recent iterations inspected when detecting a "doom loop" of
/// repeated behavior. NOTE(review): exact unit (turns vs tool calls) is not
/// visible in this file — confirm at the detection site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// The prefixes below tag synthetic context blocks injected into the
// conversation, presumably so they can be recognized again later (e.g. for
// stripping or compaction) — confirm at the use sites.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Prepended to scheduler-injected prompts (see `LoopEvent::ScheduledTask`
/// handling in `run`).
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing fence appended to every formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Renders a tool's output as a fenced code block tagged with the tool name:
/// `[tool output: <name>]\n```\n<body>\n````.
///
/// Pre-sizes the buffer so the result is built with a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER: &str = "[tool output: ";
    const OPEN_FENCE: &str = "]\n```\n";
    let mut out = String::with_capacity(
        HEADER.len() + tool_name.len() + OPEN_FENCE.len() + body.len() + TOOL_OUTPUT_SUFFIX.len(),
    );
    out.push_str(HEADER);
    out.push_str(tool_name);
    out.push_str(OPEN_FENCE);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
116
/// Central agent: owns the conversation, the LLM providers, and every
/// sub-system's state, generic over the user-facing [`Channel`] transport.
pub struct Agent<C: Channel> {
    /// LLM provider used for chat completions.
    provider: AnyProvider,
    /// Provider used for embedding calls ([`Agent::new`] clones the chat
    /// provider for this).
    embedding_provider: AnyProvider,
    /// User-facing I/O transport.
    channel: C,
    /// Type-erased tool executor shared across the crate.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation history, queued messages, and persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Memory persistence and compaction settings/state.
    pub(super) memory_state: MemoryState,
    /// Skill registry, matcher, and skills-prompt state.
    pub(super) skill_state: SkillState,
    /// Context-window management (tracks compaction lifecycle).
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool-call orchestration state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    /// Learning subsystem.
    pub(super) learning_engine: learning_engine::LearningEngine,
    /// User-feedback detection state.
    pub(super) feedback: FeedbackState,
    /// Runtime configuration (e.g. supervisor settings).
    pub(super) runtime: RuntimeConfig,
    /// MCP server manager state.
    pub(super) mcp: McpState,
    /// Code-index state.
    pub(super) index: IndexState,
    /// Per-session state (e.g. current turn intent).
    pub(super) session: SessionState,
    /// Debug tracing: iteration counter, spans, trace collector.
    pub(super) debug_state: DebugState,
    /// Instruction files plus their reload receiver.
    pub(super) instructions: InstructionState,
    /// Security state (vigil, user-provided URL tracking).
    pub(super) security: SecurityState,
    /// Experiment runner state.
    pub(super) experiments: ExperimentState,
    /// Message-compression state.
    pub(super) compression: CompressionState,
    /// Lifecycle plumbing: shutdown/cancel signals, supervisor, reload
    /// receivers.
    pub(super) lifecycle: LifecycleState,
    /// Provider overrides, STT provider, and prompt-token accounting.
    pub(super) providers: ProviderState,
    /// Metrics channel and token counter.
    pub(super) metrics: MetricsState,
    /// Sub-agent manager and topology advisor.
    pub(super) orchestration: OrchestrationState,
    /// Focus-mode state.
    pub(super) focus: focus::FocusState,
    /// Side-quest state.
    pub(super) sidequest: sidequest::SidequestState,
    /// Tool-related per-turn state.
    pub(super) tool_state: ToolState,
}
184
185impl<C: Channel> Agent<C> {
186 #[must_use]
210 pub fn new(
211 provider: AnyProvider,
212 channel: C,
213 registry: SkillRegistry,
214 matcher: Option<SkillMatcherBackend>,
215 max_active_skills: usize,
216 tool_executor: impl ToolExecutor + 'static,
217 ) -> Self {
218 let registry = Arc::new(RwLock::new(registry));
219 let embedding_provider = provider.clone();
220 Self::new_with_registry_arc(
221 provider,
222 embedding_provider,
223 channel,
224 registry,
225 matcher,
226 max_active_skills,
227 tool_executor,
228 )
229 }
230
231 #[must_use]
238 #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
240 provider: AnyProvider,
241 embedding_provider: AnyProvider,
242 channel: C,
243 registry: Arc<RwLock<SkillRegistry>>,
244 matcher: Option<SkillMatcherBackend>,
245 max_active_skills: usize,
246 tool_executor: impl ToolExecutor + 'static,
247 ) -> Self {
248 debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
249 let all_skills: Vec<Skill> = {
250 let reg = registry.read();
251 reg.all_meta()
252 .iter()
253 .filter_map(|m| reg.get_skill(&m.name).ok())
254 .collect()
255 };
256 let empty_trust = HashMap::new();
257 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
258 let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
259 let system_prompt = build_system_prompt(&skills_prompt, None);
260 tracing::debug!(len = system_prompt.len(), "initial system prompt built");
261 tracing::trace!(prompt = %system_prompt, "full system prompt");
262
263 let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
264 let token_counter = Arc::new(TokenCounter::new());
265 Self {
266 provider,
267 embedding_provider,
268 channel,
269 tool_executor: Arc::new(tool_executor),
270 msg: MessageState {
271 messages: vec![Message {
272 role: Role::System,
273 content: system_prompt,
274 parts: vec![],
275 metadata: MessageMetadata::default(),
276 }],
277 message_queue: VecDeque::new(),
278 pending_image_parts: Vec::new(),
279 last_persisted_message_id: None,
280 deferred_db_hide_ids: Vec::new(),
281 deferred_db_summaries: Vec::new(),
282 },
283 memory_state: MemoryState::default(),
284 skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
285 context_manager: context_manager::ContextManager::new(),
286 tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
287 learning_engine: learning_engine::LearningEngine::new(),
288 feedback: FeedbackState::default(),
289 debug_state: DebugState::default(),
290 runtime: RuntimeConfig::default(),
291 mcp: McpState::default(),
292 index: IndexState::default(),
293 session: SessionState::new(),
294 instructions: InstructionState::default(),
295 security: SecurityState::default(),
296 experiments: ExperimentState::new(),
297 compression: CompressionState::default(),
298 lifecycle: LifecycleState::new(),
299 providers: ProviderState::new(initial_prompt_tokens),
300 metrics: MetricsState::new(token_counter),
301 orchestration: OrchestrationState::default(),
302 focus: focus::FocusState::default(),
303 sidequest: sidequest::SidequestState::default(),
304 tool_state: ToolState::default(),
305 }
306 }
307
308 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
313 let Some(mgr) = &mut self.orchestration.subagent_manager else {
314 return vec![];
315 };
316
317 let finished: Vec<String> = mgr
318 .statuses()
319 .into_iter()
320 .filter_map(|(id, status)| {
321 if matches!(
322 status.state,
323 zeph_subagent::SubAgentState::Completed
324 | zeph_subagent::SubAgentState::Failed
325 | zeph_subagent::SubAgentState::Canceled
326 ) {
327 Some(id)
328 } else {
329 None
330 }
331 })
332 .collect();
333
334 let mut results = vec![];
335 for task_id in finished {
336 match mgr.collect(&task_id).await {
337 Ok(result) => results.push((task_id, result)),
338 Err(e) => {
339 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
340 }
341 }
342 }
343 results
344 }
345
346 async fn call_llm_for_session_summary(
355 &self,
356 chat_messages: &[Message],
357 ) -> Option<zeph_memory::StructuredSummary> {
358 let timeout_dur = std::time::Duration::from_secs(
359 self.memory_state.compaction.shutdown_summary_timeout_secs,
360 );
361 match tokio::time::timeout(
362 timeout_dur,
363 self.provider
364 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
365 )
366 .await
367 {
368 Ok(Ok(s)) => Some(s),
369 Ok(Err(e)) => {
370 tracing::warn!(
371 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
372 );
373 self.plain_text_summary_fallback(chat_messages, timeout_dur)
374 .await
375 }
376 Err(_) => {
377 tracing::warn!(
378 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
379 self.memory_state.compaction.shutdown_summary_timeout_secs
380 );
381 self.plain_text_summary_fallback(chat_messages, timeout_dur)
382 .await
383 }
384 }
385 }
386
387 async fn plain_text_summary_fallback(
388 &self,
389 chat_messages: &[Message],
390 timeout_dur: std::time::Duration,
391 ) -> Option<zeph_memory::StructuredSummary> {
392 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
393 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
394 summary: plain,
395 key_facts: vec![],
396 entities: vec![],
397 }),
398 Ok(Err(e)) => {
399 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
400 None
401 }
402 Err(_) => {
403 tracing::warn!("shutdown summary: plain LLM fallback timed out");
404 None
405 }
406 }
407 }
408
    /// On shutdown, finds `ToolUse` calls in the last assistant message that
    /// never received a `ToolResult` and persists tombstone results for them
    /// (via `persist_cancelled_tool_results`), so every tool call in the
    /// stored transcript ends up answered.
    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
        use zeph_llm::provider::{MessagePart, Role};

        let msgs = &self.msg.messages;
        // Only the most recent assistant message can hold unpaired calls.
        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
            return;
        };
        let asst_msg = &msgs[asst_idx];
        // Every ToolUse part in that message: (id, name, input).
        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let MessagePart::ToolUse { id, name, input } = p {
                    Some((id.as_str(), name.as_str(), input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_ids.is_empty() {
            return;
        }

        // ToolResults answering those calls live in user-role messages that
        // follow the assistant message; collect the ids already answered.
        let paired_ids: std::collections::HashSet<&str> = msgs
            .get(asst_idx + 1..)
            .into_iter()
            .flatten()
            .filter(|m| m.role == Role::User)
            .flat_map(|m| m.parts.iter())
            .filter_map(|p| {
                if let MessagePart::ToolResult { tool_use_id, .. } = p {
                    Some(tool_use_id.as_str())
                } else {
                    None
                }
            })
            .collect();

        // Rebuild owned requests for every call that has no paired result.
        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
            .iter()
            .filter(|(id, _, _)| !paired_ids.contains(*id))
            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
                id: (*id).to_owned(),
                name: (*name).to_owned().into(),
                input: (*input).clone(),
            })
            .collect();

        if unpaired.is_empty() {
            return;
        }

        tracing::info!(
            count = unpaired.len(),
            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
        );
        self.persist_cancelled_tool_results(&unpaired).await;
    }
476
477 async fn maybe_store_shutdown_summary(&mut self) {
487 if !self.memory_state.compaction.shutdown_summary {
488 return;
489 }
490 let Some(memory) = self.memory_state.persistence.memory.clone() else {
491 return;
492 };
493 let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
494 return;
495 };
496
497 match memory.has_session_summary(conversation_id).await {
499 Ok(true) => {
500 tracing::debug!("shutdown summary: session already has a summary, skipping");
501 return;
502 }
503 Ok(false) => {}
504 Err(e) => {
505 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
506 return;
507 }
508 }
509
510 let user_count = self
512 .msg
513 .messages
514 .iter()
515 .skip(1)
516 .filter(|m| m.role == Role::User)
517 .count();
518 if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
519 tracing::debug!(
520 user_count,
521 min = self.memory_state.compaction.shutdown_summary_min_messages,
522 "shutdown summary: too few user messages, skipping"
523 );
524 return;
525 }
526
527 let _ = self.channel.send_status("Saving session summary...").await;
529
530 let max = self.memory_state.compaction.shutdown_summary_max_messages;
532 if max == 0 {
533 tracing::debug!("shutdown summary: max_messages=0, skipping");
534 return;
535 }
536 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
537 let slice = if non_system.len() > max {
538 &non_system[non_system.len() - max..]
539 } else {
540 &non_system[..]
541 };
542
543 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
544 .iter()
545 .map(|m| {
546 let role = match m.role {
547 Role::User => "user".to_owned(),
548 Role::Assistant => "assistant".to_owned(),
549 Role::System => "system".to_owned(),
550 };
551 (zeph_memory::MessageId(0), role, m.content.clone())
552 })
553 .collect();
554
555 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
556 let chat_messages = vec![Message {
557 role: Role::User,
558 content: prompt,
559 parts: vec![],
560 metadata: MessageMetadata::default(),
561 }];
562
563 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
564 let _ = self.channel.send_status("").await;
565 return;
566 };
567
568 if let Err(e) = memory
569 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
570 .await
571 {
572 tracing::warn!("shutdown summary: storage failed: {e:#}");
573 } else {
574 tracing::info!(
575 conversation_id = conversation_id.0,
576 "shutdown summary stored"
577 );
578 }
579
580 let _ = self.channel.send_status("").await;
582 }
583
    /// Graceful shutdown sequence: notifies the user, persists router and
    /// advisor state, stops sub-agents and MCP servers, flushes metric
    /// summaries, tombstones in-flight tool calls, aborts background tasks,
    /// and finally stores the shutdown summary and session digest.
    pub async fn shutdown(&mut self) {
        self.channel.send("Shutting down...").await.ok();

        self.provider.save_router_state();

        // Persist adaptive-orchestration state when an advisor is configured.
        if let Some(ref advisor) = self.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the still-open hard-compaction interval before it is lost.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Emit end-of-run metric summaries (filtering savings, compaction
        // trajectory) if any data was gathered during the session.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        self.flush_orphaned_tool_use_on_shutdown().await;

        self.lifecycle.supervisor.abort_all();

        // Summary and digest run last so they see the final transcript state.
        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
668
669 fn refresh_subagent_metrics(&mut self) {
676 let Some(ref mgr) = self.orchestration.subagent_manager else {
677 return;
678 };
679 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
680 .statuses()
681 .into_iter()
682 .map(|(id, s)| {
683 let def = mgr.agents_def(&id);
684 crate::metrics::SubAgentMetrics {
685 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
686 id: id.clone(),
687 state: format!("{:?}", s.state).to_lowercase(),
688 turns_used: s.turns_used,
689 max_turns: def.map_or(20, |d| d.permissions.max_turns),
690 background: def.is_some_and(|d| d.permissions.background),
691 elapsed_secs: s.started_at.elapsed().as_secs(),
692 permission_mode: def.map_or_else(String::new, |d| {
693 use zeph_subagent::def::PermissionMode;
694 match d.permissions.permission_mode {
695 PermissionMode::Default => String::new(),
696 PermissionMode::AcceptEdits => "accept_edits".into(),
697 PermissionMode::DontAsk => "dont_ask".into(),
698 PermissionMode::BypassPermissions => "bypass_permissions".into(),
699 PermissionMode::Plan => "plan".into(),
700 }
701 }),
702 transcript_dir: mgr
703 .agent_transcript_dir(&id)
704 .map(|p| p.to_string_lossy().into_owned()),
705 }
706 })
707 .collect();
708 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
709 }
710
711 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
713 let completed = self.poll_subagents().await;
714 for (task_id, result) in completed {
715 let notice = if result.is_empty() {
716 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
717 } else {
718 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
719 };
720 if let Err(e) = self.channel.send(¬ice).await {
721 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
722 }
723 }
724 Ok(())
725 }
726
    /// Main agent loop.
    ///
    /// Each iteration performs housekeeping (provider override, tool refresh,
    /// sub-agent polling), pulls input (queued message first, otherwise the
    /// next [`LoopEvent`]), routes slash commands through two command
    /// registries plus the legacy built-ins, and hands everything else to
    /// `process_user_message`. Exits on shutdown, channel close, or an exit
    /// command.
    #[allow(clippy::too_many_lines)] pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Wait for model warmup to finish if a warmup watch was provided.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping before consuming any input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over new channel events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolution (STT, images).
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled tasks become synthetic user turns.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // Record URLs typed inside a slash command so later exfiltration
            // checks can treat them as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // First dispatch pass: session/debug commands that only need
            // message/debug state, not the full agent (`NullAgent`).
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            match registry_handled {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not a session/debug command; try the agent registry.
                }
            }

            // Second dispatch pass: commands that need full `&mut self`
            // (only attempted when the first registry did not match).
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if registry_handled.is_none() {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            match agent_result {
                Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                    let _ = self.channel.flush_chunks().await;
                    break;
                }
                Some(Ok(
                    zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
                )) => {
                    let _ = self.channel.flush_chunks().await;
                    continue;
                }
                Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    self.maybe_trigger_post_command_learning(trimmed).await;
                    continue;
                }
                Some(Err(e)) => return Err(error::AgentError::Other(e.0)),
                None => {
                    // Not an agent command either; fall through to built-ins.
                }
            }

            // Legacy built-ins: Some(true) = exit, Some(false) = handled.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Plain user input: run a full turn.
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1016
1017 fn apply_provider_override(&mut self) {
1019 if let Some(ref slot) = self.providers.provider_override
1020 && let Some(new_provider) = slot.write().take()
1021 {
1022 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1023 self.provider = new_provider;
1024 }
1025 }
1026
    /// Waits for the next loop event from any input source.
    ///
    /// Returns `Ok(None)` when the channel closes. The channel branch maps
    /// straight to [`LoopEvent::Message`] and propagates channel errors; the
    /// remaining branches come from optional internal receivers
    /// (`recv_optional` presumably skips absent ones — confirm at its
    /// definition).
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            result = self.channel.recv() => {
                // `None` from the channel means it closed; end the loop.
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1068
1069 async fn resolve_message(
1070 &self,
1071 msg: crate::channel::ChannelMessage,
1072 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1073 use crate::channel::{Attachment, AttachmentKind};
1074 use zeph_llm::provider::{ImageData, MessagePart};
1075
1076 let text_base = msg.text.clone();
1077
1078 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1079 .attachments
1080 .into_iter()
1081 .partition(|a| a.kind == AttachmentKind::Audio);
1082
1083 tracing::debug!(
1084 audio = audio_attachments.len(),
1085 has_stt = self.providers.stt.is_some(),
1086 "resolve_message attachments"
1087 );
1088
1089 let text = if !audio_attachments.is_empty()
1090 && let Some(stt) = self.providers.stt.as_ref()
1091 {
1092 let mut transcribed_parts = Vec::new();
1093 for attachment in &audio_attachments {
1094 if attachment.data.len() > MAX_AUDIO_BYTES {
1095 tracing::warn!(
1096 size = attachment.data.len(),
1097 max = MAX_AUDIO_BYTES,
1098 "audio attachment exceeds size limit, skipping"
1099 );
1100 continue;
1101 }
1102 match stt
1103 .transcribe(&attachment.data, attachment.filename.as_deref())
1104 .await
1105 {
1106 Ok(result) => {
1107 tracing::info!(
1108 len = result.text.len(),
1109 language = ?result.language,
1110 "audio transcribed"
1111 );
1112 transcribed_parts.push(result.text);
1113 }
1114 Err(e) => {
1115 tracing::error!(error = %e, "audio transcription failed");
1116 }
1117 }
1118 }
1119 if transcribed_parts.is_empty() {
1120 text_base
1121 } else {
1122 let transcribed = transcribed_parts.join("\n");
1123 if text_base.is_empty() {
1124 transcribed
1125 } else {
1126 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1127 }
1128 }
1129 } else {
1130 if !audio_attachments.is_empty() {
1131 tracing::warn!(
1132 count = audio_attachments.len(),
1133 "audio attachments received but no STT provider configured, dropping"
1134 );
1135 }
1136 text_base
1137 };
1138
1139 let mut image_parts = Vec::new();
1140 for attachment in image_attachments {
1141 if attachment.data.len() > MAX_IMAGE_BYTES {
1142 tracing::warn!(
1143 size = attachment.data.len(),
1144 max = MAX_IMAGE_BYTES,
1145 "image attachment exceeds size limit, skipping"
1146 );
1147 continue;
1148 }
1149 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1150 image_parts.push(MessagePart::Image(Box::new(ImageData {
1151 data: attachment.data,
1152 mime_type,
1153 })));
1154 }
1155
1156 (text, image_parts)
1157 }
1158
1159 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1166 let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1167 self.debug_state.iteration_counter += 1;
1168 self.lifecycle.cancel_token = CancellationToken::new();
1169 self.security.user_provided_urls.write().clear();
1170 turn::Turn::new(id, input)
1171 }
1172
    /// Finalizes a turn: stages the turn's timing metrics and clears the
    /// per-turn intent snapshot.
    fn end_turn(&mut self, turn: turn::Turn) {
        // Order matters: timings are staged into `pending_timings` first;
        // `flush_turn_timings()` presumably consumes them — keep this order.
        self.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
        self.session.current_turn_intent = None;
    }
1185
    /// Runs one complete user turn end to end.
    ///
    /// Wraps `process_user_message_inner` with turn bookkeeping: a fresh
    /// [`turn::Turn`], a debug iteration span whose status mirrors the
    /// result, and `end_turn` metric finalization.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
    )]
    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        let input = turn::TurnInput::new(text, image_parts);
        let mut t = self.begin_turn(input);

        // Turn ids are u64; clamp to usize::MAX on narrower targets.
        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
        tracing::Span::current().record("turn_id", t.id().0);
        self.debug_state
            .start_iteration_span(turn_idx, t.input.text.trim());

        let result = self.process_user_message_inner(&mut t).await;

        // Reflect the inner outcome on the debug span before closing it.
        let span_status = if result.is_ok() {
            crate::debug_dump::trace::SpanStatus::Ok
        } else {
            crate::debug_dump::trace::SpanStatus::Error {
                message: "iteration failed".to_owned(),
            }
        };
        self.debug_state.end_iteration_span(turn_idx, span_status);

        self.end_turn(t);
        result
    }
1219
    /// Core per-turn pipeline: background-task reaping, cancellation wiring,
    /// slash-command dispatch, security pre-screen, context preparation,
    /// message persistence, and finally LLM response processing.
    ///
    /// Statement order matters throughout; each phase feeds state the next
    /// phase reads.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        // Reap finished background tasks first so their results (e.g. a
        // completed summarization) are visible to this turn.
        let bg_signal = self.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        {
            // Mirror the supervisor's task counters into the shared metrics.
            let snap = self.lifecycle.supervisor.metrics_snapshot();
            self.update_metrics(|m| {
                m.bg_inflight = snap.inflight as u64;
                m.bg_dropped = snap.total_dropped();
                m.bg_completed = snap.total_completed();
                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
            });
        }

        // Optionally cancel in-flight enrichment work when a new turn starts.
        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }

        // Bridge the agent-wide cancel signal to this turn's token: a single
        // spawned task waits on the Notify and cancels the turn token. The
        // previous bridge (if any) is aborted so it cannot cancel this turn.
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // When vigil is active, keep a (UTF-8-safe) prefix of the input as
        // the turn's intent for later auditing.
        if self.security.vigil.is_some() {
            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
        }

        // Slash commands short-circuit the normal chat pipeline entirely.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail/classifier screen; `true` means the input was blocked.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        // Move image parts out of the turn so they are attached exactly once.
        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let user_msg = self.build_user_message(&text, image_parts);

        // Remember any suspicious URLs the user themselves supplied, so later
        // exfiltration checks can distinguish them from model-introduced ones.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        tracing::debug!("turn timing: process_response start");
        if let Err(e) = self.process_response().await {
            // On failure: detach learning tasks, surface the error to the
            // user, and roll the failed user message back out of context.
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        } else {
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            self.maybe_update_magic_docs();
        }
        tracing::debug!("turn timing: process_response done");

        // Timings recorded deeper in the pipeline are staged in
        // metrics.pending_timings; copy them onto this turn's record.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1356
    /// Pre-screen user input before it reaches the model.
    ///
    /// Runs the optional guardrail check, then (behind the `classifiers`
    /// feature) the injection classifier. Returns `Ok(true)` when the input
    /// was blocked and the turn should end, `Ok(false)` to continue.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        // Blocking path: tell the user and stop the turn.
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Flagged but not blocking: warn and let the turn proceed.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // Fail-closed blocks on checker failure; fail-open allows.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        // Second line of defence: ML injection classifier (feature-gated).
        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        // Publish classifier metrics even when scanning was skipped above.
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1428
    /// Advance all per-turn context machinery before the user message is sent:
    /// prompt rebuild, learning hooks, compaction/eviction passes, and the
    /// final context preparation. The phases run in a deliberate order —
    /// compaction decisions depend on state updated by the earlier steps.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        // Stale MCP pruning decisions must not survive into a new turn.
        self.mcp.pruning_cache.reset();

        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        // Learning pipeline: correction detection, periodic tick, analysis,
        // and graph-count sync.
        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Per-turn compaction state machine advances exactly once per turn.
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        {
            self.focus.tick();

            let sidequest_should_fire = self.sidequest.tick();
            // Skip sidequest eviction when compaction already fired this turn
            // to avoid shrinking the context twice.
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        self.maybe_time_based_microcompact();

        // Apply then persist any summaries deferred from earlier turns.
        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Each compression/compaction step is best-effort: failures are
        // logged but never abort the turn.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // Box::pin: prepare_context's future is large; keep this frame small.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1501
1502 fn build_user_message(
1503 &mut self,
1504 text: &str,
1505 image_parts: Vec<zeph_llm::provider::MessagePart>,
1506 ) -> Message {
1507 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1508 all_image_parts.extend(image_parts);
1509
1510 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1511 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1512 text: text.to_owned(),
1513 }];
1514 parts.extend(all_image_parts);
1515 Message::from_parts(Role::User, parts)
1516 } else {
1517 if !all_image_parts.is_empty() {
1518 tracing::warn!(
1519 count = all_image_parts.len(),
1520 "image attachments dropped: provider does not support vision"
1521 );
1522 }
1523 Message {
1524 role: Role::User,
1525 content: text.to_owned(),
1526 parts: vec![],
1527 metadata: MessageMetadata::default(),
1528 }
1529 }
1530 }
1531
    /// Poll a spawned sub-agent every 500ms until it reaches a terminal state,
    /// servicing any secret-access requests it raises along the way.
    ///
    /// Returns a user-facing summary string, or `None` if the sub-agent
    /// manager disappears mid-poll (the `?` on `as_ref()` below).
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            #[allow(clippy::redundant_closure_for_method_calls)]
            // A sub-agent may ask for a secret while running; surface the
            // request to the user for interactive approval.
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A failed confirm (channel error) counts as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets are granted with a 5-minute TTL.
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // Status entry gone: treat as completion without detail.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: report progress as turns-used vs the
                    // definition's max_turns (defaults to 20 when unknown).
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1607
1608 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1611 let mgr = self.orchestration.subagent_manager.as_mut()?;
1612 let full_ids: Vec<String> = mgr
1613 .statuses()
1614 .into_iter()
1615 .map(|(tid, _)| tid)
1616 .filter(|tid| tid.starts_with(prefix))
1617 .collect();
1618 Some(match full_ids.as_slice() {
1619 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1620 [fid] => Ok(fid.clone()),
1621 _ => Err(format!(
1622 "Ambiguous id prefix '{prefix}': matches {} agents",
1623 full_ids.len()
1624 )),
1625 })
1626 }
1627
    /// Render the configured sub-agent definitions as a user-facing listing.
    /// Returns `None` when no sub-agent manager is configured.
    fn handle_agent_list(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let defs = mgr.definitions();
        if defs.is_empty() {
            return Some("No sub-agent definitions found.".into());
        }
        let mut out = String::from("Available sub-agents:\n");
        for d in defs {
            // Annotate each entry with its persistent-memory scope, if any.
            let memory_label = match d.memory {
                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
                None => "",
            };
            // Include the source path/origin when the definition records one.
            if let Some(ref src) = d.source {
                let _ = writeln!(
                    out,
                    "  {}{} — {} ({})",
                    d.name, memory_label, d.description, src
                );
            } else {
                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
            }
        }
        Some(out)
    }
1655
    /// Render the live status of every running sub-agent (state, turns used,
    /// elapsed time, last message, and memory directory when resolvable).
    /// Returns `None` when no sub-agent manager is configured.
    fn handle_agent_status(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        if statuses.is_empty() {
            return Some("No active sub-agents.".into());
        }
        let mut out = String::from("Active sub-agents:\n");
        for (id, s) in &statuses {
            let state = format!("{:?}", s.state).to_lowercase();
            let elapsed = s.started_at.elapsed().as_secs();
            let _ = writeln!(
                out,
                "  [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                // Show an 8-char id prefix; guard against shorter ids.
                short = &id[..8.min(id.len())],
                t = s.turns_used,
                msg = s.last_message.as_deref().unwrap_or(""),
            );
            // If the agent has a memory scope, show where it persists.
            if let Some(def) = mgr.agents_def(id)
                && let Some(scope) = def.memory
                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
            {
                let _ = writeln!(out, "    memory: {}", dir.display());
            }
        }
        Some(out)
    }
1684
    /// Approve a pending secret request for the sub-agent whose id starts
    /// with `id`, granting access with a 5-minute TTL and delivering the
    /// secret immediately.
    ///
    /// NOTE(review): `try_recv_secret_request` appears to pop a request off
    /// the queue even when it belongs to a *different* task id — confirm
    /// whether a non-matching request should be re-queued or denied here.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            let ttl = std::time::Duration::from_mins(5);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
1708
1709 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1710 let full_id = match self.resolve_agent_id_prefix(id)? {
1711 Ok(fid) => fid,
1712 Err(msg) => return Some(msg),
1713 };
1714 let mgr = self.orchestration.subagent_manager.as_mut()?;
1715 match mgr.deny_secret(&full_id) {
1716 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1717 Err(e) => Some(format!("Deny failed: {e}")),
1718 }
1719 }
1720
    /// Dispatch a parsed sub-agent command (list/spawn/status/cancel/
    /// approve/deny/resume) and return a user-facing response string.
    /// Returns `None` when no sub-agent manager is configured.
    #[allow(clippy::too_many_lines)]
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            // Fire-and-forget spawn: the sub-agent runs unattended.
            AgentCommand::Background { name, prompt } => {
                // Gather everything that borrows &self BEFORE taking the
                // mutable manager borrow below.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Foreground spawn (explicit or via @mention): spawn then block
            // polling until the sub-agent reaches a terminal state.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            // Cancellation resolves the id prefix inline (not via
            // resolve_agent_id_prefix) to keep the manager borrow local.
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            // Resume a previously-run sub-agent under a fresh task id.
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve the definition name first (immutable borrow) so the
                // skills lookup below doesn't conflict with the later &mut.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
1839
1840 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1841 let mgr = self.orchestration.subagent_manager.as_ref()?;
1842 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1843 let reg = self.skill_state.registry.read();
1844 match zeph_subagent::filter_skills(®, &def.skills) {
1845 Ok(skills) => {
1846 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1847 if bodies.is_empty() {
1848 None
1849 } else {
1850 Some(bodies)
1851 }
1852 }
1853 Err(e) => {
1854 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1855 None
1856 }
1857 }
1858 }
1859
1860 fn build_spawn_context(
1862 &self,
1863 cfg: &zeph_config::SubAgentConfig,
1864 ) -> zeph_subagent::SpawnContext {
1865 zeph_subagent::SpawnContext {
1866 parent_messages: self.extract_parent_messages(cfg),
1867 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1868 parent_provider_name: {
1869 let name = &self.runtime.active_provider_name;
1870 if name.is_empty() {
1871 None
1872 } else {
1873 Some(name.clone())
1874 }
1875 },
1876 spawn_depth: self.runtime.spawn_depth,
1877 mcp_tool_names: self.extract_mcp_tool_names(),
1878 }
1879 }
1880
1881 fn extract_parent_messages(
1886 &self,
1887 config: &zeph_config::SubAgentConfig,
1888 ) -> Vec<zeph_llm::provider::Message> {
1889 use zeph_llm::provider::Role;
1890 if config.context_window_turns == 0 {
1891 return Vec::new();
1892 }
1893 let non_system: Vec<_> = self
1894 .msg
1895 .messages
1896 .iter()
1897 .filter(|m| m.role != Role::System)
1898 .cloned()
1899 .collect();
1900 let take_count = config.context_window_turns * 2;
1901 let start = non_system.len().saturating_sub(take_count);
1902 let mut msgs = non_system[start..].to_vec();
1903
1904 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
1907 let mut keep = msgs.len();
1908 for (i, m) in msgs.iter().enumerate() {
1909 total_chars += m.content.len();
1910 if total_chars > max_chars {
1911 keep = i;
1912 break;
1913 }
1914 }
1915 if keep < msgs.len() {
1916 tracing::info!(
1917 kept = keep,
1918 requested = config.context_window_turns * 2,
1919 "[subagent] truncated parent history from {} to {} turns due to token budget",
1920 config.context_window_turns * 2,
1921 keep
1922 );
1923 msgs.truncate(keep);
1924 }
1925 msgs
1926 }
1927
1928 fn extract_mcp_tool_names(&self) -> Vec<String> {
1930 self.tool_executor
1931 .tool_definitions_erased()
1932 .into_iter()
1933 .filter(|t| t.id.starts_with("mcp_"))
1934 .map(|t| t.id.to_string())
1935 .collect()
1936 }
1937
1938 fn classify_source_kind(
1942 skill_dir: &std::path::Path,
1943 managed_dir: Option<&std::path::PathBuf>,
1944 bundled_names: &std::collections::HashSet<String>,
1945 ) -> zeph_memory::store::SourceKind {
1946 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
1947 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1948 let has_marker = skill_dir.join(".bundled").exists();
1949 if has_marker && bundled_names.contains(skill_name) {
1950 zeph_memory::store::SourceKind::Bundled
1951 } else {
1952 if has_marker {
1953 tracing::warn!(
1954 skill = %skill_name,
1955 "skill has .bundled marker but is not in the bundled skill \
1956 allowlist — classifying as Hub"
1957 );
1958 }
1959 zeph_memory::store::SourceKind::Hub
1960 }
1961 } else {
1962 zeph_memory::store::SourceKind::Local
1963 }
1964 }
1965
    /// Recompute and persist the trust level for every reloaded skill.
    ///
    /// For each skill: hash its directory and classify its source on a
    /// blocking thread, then reconcile against the stored trust row — a hash
    /// mismatch forces the configured mismatch level; a source-kind change
    /// keeps the stored level only if it is inactive or no stricter than the
    /// new source's initial level. No-op when memory is unavailable.
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        let bundled_names: std::collections::HashSet<String> =
            zeph_skills::bundled_skill_names().into_iter().collect();
        for meta in all_meta {
            // Hashing and marker checks hit the filesystem; do them off the
            // async runtime via spawn_blocking.
            let skill_dir = meta.skill_dir.clone();
            let managed_dir_ref = managed_dir.clone();
            let bundled_names_ref = bundled_names.clone();
            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
                tokio::task::spawn_blocking(move || {
                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
                    let source_kind = Self::classify_source_kind(
                        &skill_dir,
                        managed_dir_ref.as_ref(),
                        &bundled_names_ref,
                    );
                    Some((hash, source_kind))
                })
                .await
                // Join errors (panicked/cancelled task) are treated as failure.
                .unwrap_or(None);

            let Some((current_hash, source_kind)) = fs_result else {
                tracing::warn!("failed to compute hash for '{}'", meta.name);
                continue;
            };
            // Default trust level a fresh skill of this source kind receives.
            let initial_level = match source_kind {
                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
                    &trust_cfg.local_level
                }
            };
            let existing = memory
                .sqlite()
                .load_skill_trust(&meta.name)
                .await
                .ok()
                .flatten();
            let trust_level_str = if let Some(ref row) = existing {
                if row.blake3_hash != current_hash {
                    // Content changed since last trust decision: demote to the
                    // configured hash-mismatch level.
                    trust_cfg.hash_mismatch_level.to_string()
                } else if row.source_kind != source_kind {
                    // Source kind changed (e.g. local -> hub). Unparseable DB
                    // values are treated as quarantined, the safest state.
                    let stored = row
                        .trust_level
                        .parse::<zeph_tools::SkillTrustLevel>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                skill = %meta.name,
                                raw = %row.trust_level,
                                "unrecognised trust_level in DB, treating as quarantined"
                            );
                            zeph_tools::SkillTrustLevel::Quarantined
                        });
                    // Keep the stored level unless it is an active level that
                    // is MORE permissive than the new source's initial level.
                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
                        row.trust_level.clone()
                    } else {
                        initial_level.to_string()
                    }
                } else {
                    row.trust_level.clone()
                }
            } else {
                initial_level.to_string()
            };
            let source_path = meta.skill_dir.to_str();
            if let Err(e) = memory
                .sqlite()
                .upsert_skill_trust(
                    &meta.name,
                    &trust_level_str,
                    source_kind,
                    None,
                    source_path,
                    &current_hash,
                )
                .await
            {
                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
            }
        }
    }
2062
    /// Rebuild or sync the skill-matching index after skills changed on disk.
    ///
    /// In-memory backends are rebuilt from scratch; a Qdrant backend is
    /// incrementally synced with progress reported over the status channel.
    /// When hybrid search is enabled the BM25 index is rebuilt as well.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Embedding closure handed to the matcher: each call embeds one text
        // with a hard timeout so a stuck provider cannot wedge the rebuild.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // Anything that is not a Qdrant backend (including "no matcher yet")
        // gets a full in-memory rebuild.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Optional progress callback: forwards "completed/total" counts
            // to the status channel when one is attached to the session.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            // Sync failures are non-fatal: stale embeddings degrade matching
            // quality but must not break the agent.
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2124
    /// Hot-reload skills from disk: re-scan skill (and plugin) paths, and if
    /// the registry fingerprint changed, refresh trust records, rebuild the
    /// matcher, and regenerate the system prompt's skills section in place.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.skill_state.fingerprint();
        // Merge plugin-supplied skill directories into the configured paths,
        // deduplicating against the static list.
        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.skill_state.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.skill_state.skill_paths.clone()
        };
        self.skill_state.registry.write().reload(&reload_paths);
        // Fingerprint unchanged: nothing on disk actually changed, so skip
        // the expensive rebuild work.
        if self.skill_state.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        // Clone metadata out of the registry so the read lock is not held
        // across the awaits below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        // The system message is by convention the first entry; rewrite it
        // with the regenerated prompt.
        let system_prompt = build_system_prompt(&skills_prompt, None);
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Empty status clears the "reloading skills..." indicator.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2187
2188 fn reload_instructions(&mut self) {
2189 if let Some(ref mut rx) = self.instructions.reload_rx {
2191 while rx.try_recv().is_ok() {}
2192 }
2193 let Some(ref state) = self.instructions.reload_state else {
2194 return;
2195 };
2196 let new_blocks = crate::instructions::load_instructions(
2197 &state.base_dir,
2198 &state.provider_kinds,
2199 &state.explicit_files,
2200 state.auto_detect,
2201 );
2202 let old_sources: std::collections::HashSet<_> =
2203 self.instructions.blocks.iter().map(|b| &b.source).collect();
2204 let new_sources: std::collections::HashSet<_> =
2205 new_blocks.iter().map(|b| &b.source).collect();
2206 for added in new_sources.difference(&old_sources) {
2207 tracing::info!(path = %added.display(), "instruction file added");
2208 }
2209 for removed in old_sources.difference(&new_sources) {
2210 tracing::info!(path = %removed.display(), "instruction file removed");
2211 }
2212 tracing::info!(
2213 old_count = self.instructions.blocks.len(),
2214 new_count = new_blocks.len(),
2215 "reloaded instruction files"
2216 );
2217 self.instructions.blocks = new_blocks;
2218 }
2219
    /// Re-read the config file (with plugin overlays) and propagate every
    /// hot-reloadable setting into the runtime, memory, skill, context and
    /// index subsystems. No-op when no config path is known or loading fails.
    fn reload_config(&mut self) {
        let Some(path) = self.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        // Runtime + memory persistence settings.
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        // Skill-matching knobs; ratios are clamped into [0, 1].
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading "~/" in the generation output dir to $HOME.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            // RPE router: create lazily when enabled (preserving an existing
            // router's state), tear down when disabled.
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        // Context-manager compaction/compression settings.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // Empty provider name disables the store-routing classifier.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        // Repo-index settings.
        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2307
2308 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2312 let mut config = match Config::load(path) {
2313 Ok(c) => c,
2314 Err(e) => {
2315 tracing::warn!("config reload failed: {e:#}");
2316 return None;
2317 }
2318 };
2319
2320 let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2322 None
2323 } else {
2324 match zeph_plugins::apply_plugin_config_overlays(
2325 &mut config,
2326 &self.lifecycle.plugins_dir,
2327 ) {
2328 Ok(o) => Some(o),
2329 Err(e) => {
2330 tracing::warn!(
2331 "plugin overlay merge failed during reload: {e:#}; \
2332 keeping previous runtime state"
2333 );
2334 return None;
2335 }
2336 }
2337 };
2338
2339 if let Some(ref overlay) = new_overlay {
2343 self.warn_on_shell_overlay_divergence(overlay, &config);
2344 }
2345 Some(config)
2346 }
2347
2348 fn warn_on_shell_overlay_divergence(
2354 &self,
2355 new_overlay: &zeph_plugins::ResolvedOverlay,
2356 config: &Config,
2357 ) {
2358 let new_blocked: Vec<String> = {
2359 let mut v = config.tools.shell.blocked_commands.clone();
2360 v.sort();
2361 v
2362 };
2363 let new_allowed: Vec<String> = {
2364 let mut v = config.tools.shell.allowed_commands.clone();
2365 v.sort();
2366 v
2367 };
2368
2369 let startup = &self.lifecycle.startup_shell_overlay;
2370 let blocked_changed = new_blocked != startup.blocked;
2371 let allowed_changed = new_allowed != startup.allowed;
2372
2373 if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2375 h.rebuild(&config.tools.shell);
2376 tracing::info!(
2377 blocked_count = h.snapshot_blocked().len(),
2378 "shell blocked_commands rebuilt from hot-reload"
2379 );
2380 }
2381
2382 if allowed_changed {
2389 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2390 for sandbox path recomputation (blocked_commands was rebuilt live)";
2391 tracing::warn!("{msg}");
2392 if let Some(ref tx) = self.session.status_tx {
2393 let _ = tx.send(msg.to_owned());
2394 }
2395 }
2396
2397 let _ = new_overlay;
2398 }
2399
    /// Drives one step of the "sidequest" background eviction cycle:
    /// 1. warns if sidequest is enabled alongside a non-Reactive pruning strategy,
    /// 2. skips (and drops any pending result) while a focus session is active,
    /// 3. harvests a previously spawned LLM scoring task without blocking
    ///    (`now_or_never`) and applies any eviction it produced,
    /// 4. rebuilds tool-output cursors and spawns a fresh scoring task.
    ///
    /// The spawned task calls the summary/primary provider with a 5s timeout,
    /// extracts the first `{…}` JSON object from the response, and returns a
    /// sorted, deduped list of cursor indices capped by `max_eviction_ratio`.
    #[allow(clippy::too_many_lines)]
    fn maybe_sidequest_eviction(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        // Sidequest eviction overlaps with proactive pruning strategies;
        // surface the redundancy instead of silently double-evicting.
        if self.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                     consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        // Never evict during a focus session; also discard any in-flight
        // scoring result so stale indices are not applied later.
        if self.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active");
            self.compression.pending_sidequest_result = None;
            return;
        }

        // Poll (without awaiting) the previously spawned scoring task, if any.
        if let Some(handle) = self.compression.pending_sidequest_result.take() {
            use futures::FutureExt as _;
            match handle.now_or_never() {
                // Task finished and produced a non-empty eviction list.
                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
                    // Snapshot cursors before eviction mutates them, for the debug dump.
                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
                    let freed = self.sidequest.apply_eviction(
                        &mut self.msg.messages,
                        &evicted_indices,
                        &self.metrics.token_counter,
                    );
                    if freed > 0 {
                        self.recompute_prompt_tokens();
                        // Mark this turn as compacted so other compaction paths back off.
                        self.context_manager.compaction =
                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
                                cooldown: 0,
                            };
                        tracing::info!(
                            freed_tokens = freed,
                            evicted_cursors = evicted_indices.len(),
                            pass = self.sidequest.passes_run,
                            "sidequest eviction complete"
                        );
                        if let Some(ref d) = self.debug_state.debug_dumper {
                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                        }
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                        }
                    } else {
                        // Nothing actually freed: clear the status line.
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(String::new());
                        }
                    }
                }
                // Task finished with no evictions (None, or an empty list caught
                // by the guard above): just clear the status line.
                Some(Ok(None | Some(_))) => {
                    tracing::debug!("sidequest: pending result: no cursors to evict");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                // JoinError: the background task panicked; treat as a no-op.
                Some(Err(e)) => {
                    tracing::debug!("sidequest: background task panicked: {e}");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                // Still running — `take()` dropped the handle, so a new task is
                // spawned below; the old one resolves detached.
                None => {
                    tracing::debug!(
                        "sidequest: background LLM task not yet complete, rescheduling"
                    );
                }
            }
        }

        // Refresh cursors against the (possibly just-evicted) message list.
        self.sidequest
            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);

        if self.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        // Capture everything the background task needs by value before moving
        // it into the spawned future.
        let prompt = self.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
        let n_cursors = self.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        let handle = tokio::spawn(async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            // Hard 5-second cap so a slow provider cannot stall eviction.
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Extract the outermost {...} span; tolerate prose around the JSON.
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Keep only in-range indices, sorted and deduped.
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            // Cap the eviction count at ceil(n_cursors * max_eviction_ratio).
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        });

        self.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
2569
2570 pub(crate) async fn check_cwd_changed(&mut self) {
2576 let current = match std::env::current_dir() {
2577 Ok(p) => p,
2578 Err(e) => {
2579 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
2580 return;
2581 }
2582 };
2583 if current == self.lifecycle.last_known_cwd {
2584 return;
2585 }
2586 let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
2587 self.session.env_context.working_dir = current.display().to_string();
2588
2589 tracing::info!(
2590 old = %old_cwd.display(),
2591 new = %current.display(),
2592 "working directory changed"
2593 );
2594
2595 let _ = self
2596 .channel
2597 .send_status("Working directory changed\u{2026}")
2598 .await;
2599
2600 let hooks = self.session.hooks_config.cwd_changed.clone();
2601 if !hooks.is_empty() {
2602 let mut env = std::collections::HashMap::new();
2603 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
2604 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
2605 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2606 tracing::warn!(error = %e, "CwdChanged hook failed");
2607 }
2608 }
2609
2610 let _ = self.channel.send_status("").await;
2611 }
2612
2613 pub(crate) async fn handle_file_changed(
2615 &mut self,
2616 event: crate::file_watcher::FileChangedEvent,
2617 ) {
2618 tracing::info!(path = %event.path.display(), "file changed");
2619
2620 let _ = self
2621 .channel
2622 .send_status("Running file-change hook\u{2026}")
2623 .await;
2624
2625 let hooks = self.session.hooks_config.file_changed_hooks.clone();
2626 if !hooks.is_empty() {
2627 let mut env = std::collections::HashMap::new();
2628 env.insert(
2629 "ZEPH_CHANGED_PATH".to_owned(),
2630 event.path.display().to_string(),
2631 );
2632 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2633 tracing::warn!(error = %e, "FileChanged hook failed");
2634 }
2635 }
2636
2637 let _ = self.channel.send_status("").await;
2638 }
2639}
2640pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2641 while !*rx.borrow_and_update() {
2642 if rx.changed().await.is_err() {
2643 std::future::pending::<()>().await;
2644 }
2645 }
2646}
2647
2648pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2649 match rx {
2650 Some(inner) => {
2651 if let Some(v) = inner.recv().await {
2652 Some(v)
2653 } else {
2654 *rx = None;
2655 std::future::pending().await
2656 }
2657 }
2658 None => std::future::pending().await,
2659 }
2660}
2661
2662pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2668 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2669 if let Some(ctx_size) = provider.context_window() {
2670 tracing::info!(
2671 model_context = ctx_size,
2672 "auto-configured context budget on reload"
2673 );
2674 ctx_size
2675 } else {
2676 0
2677 }
2678 } else {
2679 config.memory.context_budget_tokens
2680 };
2681 if tokens == 0 {
2682 tracing::warn!(
2683 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2684 );
2685 128_000
2686 } else {
2687 tokens
2688 }
2689}
2690
2691#[cfg(test)]
2692mod tests;
2693
2694#[cfg(test)]
2695pub(crate) use tests::agent_tests;