1mod acp_commands;
5mod agent_access_impl;
6pub(crate) mod agent_supervisor;
7mod autodream;
8mod builder;
9mod command_context_impls;
10pub(crate) mod compaction_strategy;
11pub(super) mod compression_feedback;
12mod context;
13mod context_impls;
14pub(crate) mod context_manager;
15mod corrections;
16pub mod error;
17mod experiment_cmd;
18pub(super) mod feedback_detector;
19pub(crate) mod focus;
20mod index;
21mod learning;
22pub(crate) mod learning_engine;
23mod log_commands;
24mod loop_event;
25mod lsp_commands;
26mod magic_docs;
27mod mcp;
28mod message_queue;
29mod microcompact;
30mod model_commands;
31mod persistence;
32#[cfg(feature = "scheduler")]
33mod plan;
34mod policy_commands;
35mod provider_cmd;
36#[cfg(feature = "self-check")]
37mod quality_hook;
38pub(crate) mod rate_limiter;
39#[cfg(feature = "scheduler")]
40mod scheduler_commands;
41#[cfg(feature = "scheduler")]
42mod scheduler_loop;
43pub mod session_config;
44mod session_digest;
45pub(crate) mod sidequest;
46mod skill_management;
47pub mod slash_commands;
48pub mod speculative;
49pub(crate) mod state;
50pub(crate) mod task_injection;
51pub(crate) mod tool_execution;
52pub(crate) mod tool_orchestrator;
53mod trust_commands;
54pub mod turn;
55mod utils;
56pub(crate) mod vigil;
57
58use std::collections::{HashMap, VecDeque};
59use std::fmt::Write as _;
60use std::sync::Arc;
61
62use parking_lot::RwLock;
63
64use tokio::sync::{mpsc, watch};
65use tokio_util::sync::CancellationToken;
66use zeph_llm::any::AnyProvider;
67use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
68use zeph_memory::TokenCounter;
69use zeph_memory::semantic::SemanticMemory;
70use zeph_skills::loader::Skill;
71use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
72use zeph_skills::prompt::format_skills_prompt;
73use zeph_skills::registry::SkillRegistry;
74use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
75
76use crate::channel::Channel;
77use crate::config::Config;
78use crate::context::{ContextBudget, build_system_prompt};
79use zeph_common::text::estimate_tokens;
80
81use loop_event::LoopEvent;
82use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
83use state::CompressionState;
84use state::{
85 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
86 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
87 RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
88};
89
/// Window size used by doom-loop (repetition) detection — number of recent
/// items inspected; presumably turns. TODO confirm unit against usage sites.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// The following prefixes tag synthetic context blocks injected into the
// conversation so they can be recognized later (e.g. stripped or compacted).
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Prepended to prompts injected via `LoopEvent::ScheduledTask` in `run`.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Opening fragment of LSP note tags; completed at the call site.
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Closing code fence appended to every formatted tool-output message.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Formats a tool's output as a fenced code block tagged with the tool name,
/// e.g. `[tool output: bash]` followed by the body in triple backticks.
///
/// Computes the exact final length up front so the string is built with a
/// single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    let capacity = "[tool output: ".len()
        + tool_name.len()
        + "]\n```\n".len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    // `write!` into a `String` is infallible, so the Result is intentionally
    // ignored. The `Write` trait is already in scope from the module-level
    // `use std::fmt::Write as _;`, so no redundant local import is needed.
    let _ = write!(
        buf,
        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
    );
    buf
}
121
/// Core agent state, generic over the I/O [`Channel`] implementation.
///
/// State is partitioned into sub-structs (see the `state` module) so each
/// loop step can borrow only the slice of state it needs.
pub struct Agent<C: Channel> {
    // Main chat LLM and a (possibly identical) provider used for embeddings.
    provider: AnyProvider,
    embedding_provider: AnyProvider,
    channel: C,
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    // Conversation, memory, and skill state.
    pub(super) msg: MessageState,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    // Engines driving context assembly, tool routing, and learning.
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    // Remaining grouped sub-states (defaults unless configured otherwise).
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    pub(super) compression: CompressionState,
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    pub(super) orchestration: OrchestrationState,
    pub(super) focus: focus::FocusState,
    pub(super) sidequest: sidequest::SidequestState,
    pub(super) tool_state: ToolState,
    // Optional subsystems, present only when enabled/configured.
    #[cfg(feature = "self-check")]
    pub(super) quality: Option<std::sync::Arc<crate::quality::SelfCheckPipeline>>,
    pub(super) proactive_explorer:
        Option<std::sync::Arc<zeph_skills::proactive::ProactiveExplorer>>,
    pub(super) promotion_engine:
        Option<std::sync::Arc<zeph_memory::compression::promotion::PromotionEngine>>,
}
202
/// Control-flow outcome of a slash-command dispatch stage inside `run`.
enum DispatchFlow {
    /// A command requested shutdown: exit the main loop.
    Break,
    /// The command was fully handled: start the next loop iteration.
    Continue,
    /// No handler matched: fall through to the next dispatch stage.
    Fallthrough,
}
212
213impl<C: Channel> Agent<C> {
214 #[must_use]
238 pub fn new(
239 provider: AnyProvider,
240 channel: C,
241 registry: SkillRegistry,
242 matcher: Option<SkillMatcherBackend>,
243 max_active_skills: usize,
244 tool_executor: impl ToolExecutor + 'static,
245 ) -> Self {
246 let registry = Arc::new(RwLock::new(registry));
247 let embedding_provider = provider.clone();
248 Self::new_with_registry_arc(
249 provider,
250 embedding_provider,
251 channel,
252 registry,
253 matcher,
254 max_active_skills,
255 tool_executor,
256 )
257 }
258
    /// Builds an agent from pre-shared state: an already-`Arc`'d registry and
    /// a dedicated embedding provider.
    ///
    /// Renders the initial skills prompt from every skill in the registry,
    /// builds the system prompt from it, and seeds the message list with that
    /// single system message. All other sub-states start at their defaults.
    #[must_use]
    #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize all skills inside a scope so the read lock is held only
        // for the duration of this collection.
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust/health data exists yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // The conversation always starts with the system prompt at index 0.
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            memory_state: MemoryState::default(),
            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            debug_state: DebugState::default(),
            runtime: RuntimeConfig::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            instructions: InstructionState::default(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            #[cfg(feature = "self-check")]
            quality: None,
            proactive_explorer: None,
            promotion_engine: None,
        }
    }
339
340 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
345 let Some(mgr) = &mut self.orchestration.subagent_manager else {
346 return vec![];
347 };
348
349 let finished: Vec<String> = mgr
350 .statuses()
351 .into_iter()
352 .filter_map(|(id, status)| {
353 if matches!(
354 status.state,
355 zeph_subagent::SubAgentState::Completed
356 | zeph_subagent::SubAgentState::Failed
357 | zeph_subagent::SubAgentState::Canceled
358 ) {
359 Some(id)
360 } else {
361 None
362 }
363 })
364 .collect();
365
366 let mut results = vec![];
367 for task_id in finished {
368 match mgr.collect(&task_id).await {
369 Ok(result) => results.push((task_id, result)),
370 Err(e) => {
371 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
372 }
373 }
374 }
375 results
376 }
377
    /// Asks the LLM for a structured session summary, bounded by the
    /// configured shutdown-summary timeout.
    ///
    /// On a structured-call error or timeout, retries once as plain-text chat
    /// via [`Self::plain_text_summary_fallback`]; returns `None` when both
    /// attempts fail.
    async fn call_llm_for_session_summary(
        &self,
        chat_messages: &[Message],
    ) -> Option<zeph_memory::StructuredSummary> {
        let timeout_dur = std::time::Duration::from_secs(
            self.memory_state.compaction.shutdown_summary_timeout_secs,
        );
        match tokio::time::timeout(
            timeout_dur,
            self.provider
                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
        )
        .await
        {
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
            Err(_) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
                    self.memory_state.compaction.shutdown_summary_timeout_secs
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
        }
    }
418
419 async fn plain_text_summary_fallback(
420 &self,
421 chat_messages: &[Message],
422 timeout_dur: std::time::Duration,
423 ) -> Option<zeph_memory::StructuredSummary> {
424 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
425 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
426 summary: plain,
427 key_facts: vec![],
428 entities: vec![],
429 }),
430 Ok(Err(e)) => {
431 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
432 None
433 }
434 Err(_) => {
435 tracing::warn!("shutdown summary: plain LLM fallback timed out");
436 None
437 }
438 }
439 }
440
    /// On shutdown, synthesizes tombstone `ToolResult`s for any `ToolUse`
    /// parts in the last assistant message that never received a result, so
    /// the persisted transcript never contains an unanswered tool call.
    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
        use zeph_llm::provider::{MessagePart, Role};

        let msgs = &self.msg.messages;
        // Only the most recent assistant message can hold in-flight calls.
        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
            return;
        };
        let asst_msg = &msgs[asst_idx];
        // Every (id, name, input) tool invocation in that assistant message.
        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let MessagePart::ToolUse { id, name, input } = p {
                    Some((id.as_str(), name.as_str(), input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_ids.is_empty() {
            return;
        }

        // Ids already answered by a ToolResult in a later user message.
        let paired_ids: std::collections::HashSet<&str> = msgs
            .get(asst_idx + 1..)
            .into_iter()
            .flatten()
            .filter(|m| m.role == Role::User)
            .flat_map(|m| m.parts.iter())
            .filter_map(|p| {
                if let MessagePart::ToolResult { tool_use_id, .. } = p {
                    Some(tool_use_id.as_str())
                } else {
                    None
                }
            })
            .collect();

        // Whatever is left unanswered gets a cancelled-tombstone result.
        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
            .iter()
            .filter(|(id, _, _)| !paired_ids.contains(*id))
            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
                id: (*id).to_owned(),
                name: (*name).to_owned().into(),
                input: (*input).clone(),
            })
            .collect();

        if unpaired.is_empty() {
            return;
        }

        tracing::info!(
            count = unpaired.len(),
            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
        );
        self.persist_cancelled_tool_results(&unpaired).await;
    }
508
509 async fn maybe_store_shutdown_summary(&mut self) {
519 if !self.memory_state.compaction.shutdown_summary {
520 return;
521 }
522 let Some(memory) = self.memory_state.persistence.memory.clone() else {
523 return;
524 };
525 let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
526 return;
527 };
528
529 match memory.has_session_summary(conversation_id).await {
531 Ok(true) => {
532 tracing::debug!("shutdown summary: session already has a summary, skipping");
533 return;
534 }
535 Ok(false) => {}
536 Err(e) => {
537 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
538 return;
539 }
540 }
541
542 let user_count = self
544 .msg
545 .messages
546 .iter()
547 .skip(1)
548 .filter(|m| m.role == Role::User)
549 .count();
550 if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
551 tracing::debug!(
552 user_count,
553 min = self.memory_state.compaction.shutdown_summary_min_messages,
554 "shutdown summary: too few user messages, skipping"
555 );
556 return;
557 }
558
559 let _ = self.channel.send_status("Saving session summary...").await;
561
562 let max = self.memory_state.compaction.shutdown_summary_max_messages;
564 if max == 0 {
565 tracing::debug!("shutdown summary: max_messages=0, skipping");
566 return;
567 }
568 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
569 let slice = if non_system.len() > max {
570 &non_system[non_system.len() - max..]
571 } else {
572 &non_system[..]
573 };
574
575 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
576 .iter()
577 .map(|m| {
578 let role = match m.role {
579 Role::User => "user".to_owned(),
580 Role::Assistant => "assistant".to_owned(),
581 Role::System => "system".to_owned(),
582 };
583 (zeph_memory::MessageId(0), role, m.content.clone())
584 })
585 .collect();
586
587 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
588 let chat_messages = vec![Message {
589 role: Role::User,
590 content: prompt,
591 parts: vec![],
592 metadata: MessageMetadata::default(),
593 }];
594
595 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
596 let _ = self.channel.send_status("").await;
597 return;
598 };
599
600 if let Err(e) = memory
601 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
602 .await
603 {
604 tracing::warn!("shutdown summary: storage failed: {e:#}");
605 } else {
606 tracing::info!(
607 conversation_id = conversation_id.0,
608 "shutdown summary stored"
609 );
610 }
611
612 let _ = self.channel.send_status("").await;
614 }
615
    /// Gracefully shuts the agent down: persists router/advisor state, stops
    /// sub-agents and MCP servers, flushes final metrics, tombstones in-flight
    /// tool calls, aborts supervised tasks, and stores end-of-session
    /// summaries. Individual failures are logged, never propagated.
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state();

        if let Some(ref advisor) = self.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the final partial window since the last hard compaction.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log aggregate filtering/compaction stats for this session.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Pair dangling ToolUse parts with tombstone results before the
        // summaries below read the transcript.
        self.flush_orphaned_tool_use_on_shutdown().await;

        self.lifecycle.supervisor.abort_all();

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
700
    /// Publishes a per-sub-agent metrics snapshot to the metrics sink.
    ///
    /// No-op when no sub-agent manager is installed.
    fn refresh_subagent_metrics(&mut self) {
        let Some(ref mgr) = self.orchestration.subagent_manager else {
            return;
        };
        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
            .statuses()
            .into_iter()
            .map(|(id, s)| {
                let def = mgr.agents_def(&id);
                crate::metrics::SubAgentMetrics {
                    // Fall back to a truncated id when the definition is gone.
                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
                    id: id.clone(),
                    state: format!("{:?}", s.state).to_lowercase(),
                    turns_used: s.turns_used,
                    // 20 is used as the max-turns fallback when no definition
                    // exists -- presumably the default cap; TODO confirm
                    // against zeph_subagent defaults.
                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
                    background: def.is_some_and(|d| d.permissions.background),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                    // Empty string encodes the Default permission mode.
                    permission_mode: def.map_or_else(String::new, |d| {
                        use zeph_subagent::def::PermissionMode;
                        match d.permissions.permission_mode {
                            PermissionMode::Default => String::new(),
                            PermissionMode::AcceptEdits => "accept_edits".into(),
                            PermissionMode::DontAsk => "dont_ask".into(),
                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
                            PermissionMode::Plan => "plan".into(),
                        }
                    }),
                    transcript_dir: mgr
                        .agent_transcript_dir(&id)
                        .map(|p| p.to_string_lossy().into_owned()),
                }
            })
            .collect();
        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
    }
742
743 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
745 let completed = self.poll_subagents().await;
746 for (task_id, result) in completed {
747 let notice = if result.is_empty() {
748 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
749 } else {
750 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
751 };
752 if let Err(e) = self.channel.send(¬ice).await {
753 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
754 }
755 }
756 Ok(())
757 }
758
    /// Main agent event loop.
    ///
    /// Waits for model warmup, restores session state, then loops: drains
    /// queued/channel messages and lifecycle events, routes slash commands
    /// through two command registries (session-level, then agent-level) plus
    /// the built-in handler, and forwards everything else to
    /// `process_user_message`. Returns when the channel closes or shutdown is
    /// requested.
    #[allow(clippy::too_many_lines)] pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Best-effort wait for warmup readiness; proceed with a warning if it
        // never signals success.
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping before consuming input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over new loop events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // Record URLs typed inside slash commands so downstream
            // exfiltration checks can treat them as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // Stage 1: session-level commands (exit/clear/help/debug...),
            // dispatched against lightweight context impls borrowing agent
            // sub-state; the command's agent slot is a null impl.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Stage 2: agent-level commands, tried only when stage 1 had no
            // match. These need `self` as the command agent, so the other
            // context slots are filled with null impls.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Stage 3: built-in commands, then treat the input as a normal
            // user message for the LLM turn.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1049
1050 async fn apply_dispatch_result(
1056 &mut self,
1057 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1058 command: &str,
1059 with_learning: bool,
1060 ) -> DispatchFlow {
1061 match result {
1062 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1063 let _ = self.channel.flush_chunks().await;
1064 DispatchFlow::Break
1065 }
1066 Some(Ok(
1067 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1068 )) => {
1069 let _ = self.channel.flush_chunks().await;
1070 DispatchFlow::Continue
1071 }
1072 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1073 let _ = self.channel.send(&msg).await;
1074 let _ = self.channel.flush_chunks().await;
1075 if with_learning {
1076 self.maybe_trigger_post_command_learning(command).await;
1077 }
1078 DispatchFlow::Continue
1079 }
1080 Some(Err(e)) => {
1081 let _ = self.channel.send(&e.to_string()).await;
1082 let _ = self.channel.flush_chunks().await;
1083 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1084 DispatchFlow::Continue
1085 }
1086 None => DispatchFlow::Fallthrough,
1087 }
1088 }
1089
1090 fn apply_provider_override(&mut self) {
1092 if let Some(ref slot) = self.providers.provider_override
1093 && let Some(new_provider) = slot.write().take()
1094 {
1095 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1096 self.provider = new_provider;
1097 }
1098 }
1099
    /// Waits for the next loop event from any of the agent's input sources.
    ///
    /// Channel messages return directly from inside the select; all other
    /// arms produce a [`LoopEvent`]. Returns `Ok(None)` when a user-loop tick
    /// fires after the loop has been cancelled or removed.
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-loop tick: this arm pends forever when no loop is active or
            // the loop has been cancelled, so it can never win spuriously.
            () = async {
                if let Some(ref mut ls) = self.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // Re-check: the loop may have been cancelled or removed
                // between the tick completing and this arm running.
                let Some(ls) = self.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1165
    /// Converts a raw channel message into prompt text plus image parts.
    ///
    /// Audio attachments are transcribed via the configured STT provider and
    /// prepended to the text; without an STT provider they are dropped with a
    /// warning. Oversized audio/image attachments are skipped with a warning.
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        // Split attachments: audio goes to STT, the rest are treated as images.
        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.providers.stt.is_some(),
            "resolve_message attachments"
        );

        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        // Best-effort: a failed transcription drops that clip.
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            // MIME type is inferred from the filename extension.
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }
1255
    /// Starts a new turn: allocates the next turn id and resets per-turn state
    /// (cancellation token, user-provided URL set, LLM request counter).
    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
        let id = turn::TurnId(self.debug_state.iteration_counter as u64);
        self.debug_state.iteration_counter += 1;
        // Fresh token so a cancellation from a previous turn cannot leak into this one.
        self.lifecycle.cancel_token = CancellationToken::new();
        self.security.user_provided_urls.write().clear();
        self.lifecycle.turn_llm_requests = 0;
        turn::Turn::new(id, input)
    }
1271
    /// Finalizes a turn: publishes its timing metrics (pending_timings must be
    /// set before flush_turn_timings reads them) and clears the per-turn intent.
    fn end_turn(&mut self, turn: turn::Turn) {
        self.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
        self.session.current_turn_intent = None;
    }
1284
1285 #[cfg_attr(
1286 feature = "profiling",
1287 tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1288 )]
1289 async fn process_user_message(
1290 &mut self,
1291 text: String,
1292 image_parts: Vec<zeph_llm::provider::MessagePart>,
1293 ) -> Result<(), error::AgentError> {
1294 let input = turn::TurnInput::new(text, image_parts);
1295 let mut t = self.begin_turn(input);
1296
1297 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1298 tracing::Span::current().record("turn_id", t.id().0);
1299 self.debug_state
1301 .start_iteration_span(turn_idx, t.input.text.trim());
1302
1303 let result = self.process_user_message_inner(&mut t).await;
1304
1305 let span_status = if result.is_ok() {
1307 crate::debug_dump::trace::SpanStatus::Ok
1308 } else {
1309 crate::debug_dump::trace::SpanStatus::Error {
1310 message: "iteration failed".to_owned(),
1311 }
1312 };
1313 self.debug_state.end_iteration_span(turn_idx, span_status);
1314
1315 self.end_turn(t);
1316 result
1317 }
1318
    /// Runs the body of one user turn: background-task reaping, cancellation
    /// bridging, security pre-screen, context preparation, persistence, LLM
    /// response processing, and post-turn notification/metrics.
    ///
    /// Response-processing errors are reported to the channel and the user
    /// message is rolled back, but the turn still completes.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        self.reap_background_tasks_and_update_metrics();

        self.drain_background_completions();

        // Bridge the global cancel signal into this turn's token, aborting any
        // bridge task left over from a previous turn.
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // Snapshot a truncated (char-boundary-safe) intent for the vigil subsystem.
        if self.security.vigil.is_some() {
            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
        }

        // Slash commands short-circuit the turn entirely.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail / classifier pre-screen; `true` means the input was blocked.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle_guarded(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let merged_text = self.build_user_message_text_with_bg_completions(&text);
        let user_msg = self.build_user_message(&merged_text, image_parts);

        // Remember user-supplied URLs so exfiltration checks can allow them later.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        tracing::debug!("turn timing: process_response start");
        let turn_had_error = if let Err(e) = self.process_response().await {
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");

            // No reachable providers: remember the failure time and sleep out
            // the backoff so the next turn doesn't immediately fail again.
            if e.is_no_providers() {
                self.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
                let backoff_secs = self.runtime.timeouts.no_providers_backoff_secs;
                tracing::warn!(
                    backoff_secs,
                    "no providers available; backing off before next turn"
                );
                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
            }

            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // Roll back the just-pushed user message so a retry won't duplicate it.
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
            true
        } else {
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            self.maybe_update_magic_docs();
            self.maybe_spawn_promotion_scan();
            false
        };
        tracing::debug!("turn timing: process_response done");

        #[cfg(feature = "self-check")]
        if let Some(pipeline) = self.quality.clone() {
            self.run_self_check_for_turn(pipeline, turn.id().0).await;
        }
        let _ = self.channel.flush_chunks().await;

        self.maybe_fire_completion_notification(turn, turn_had_error);

        // Copy timings accumulated during process_response into this turn.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1468
    /// Reaps finished supervisor tasks, publishes background-task counters to
    /// the metrics sink, and optionally aborts in-flight enrichment tasks at
    /// turn start (per supervisor configuration).
    fn reap_background_tasks_and_update_metrics(&mut self) {
        let bg_signal = self.lifecycle.supervisor.reap();
        // A completed background summarization covers everything outstanding.
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        let snap = self.lifecycle.supervisor.metrics_snapshot();
        self.update_metrics(|m| {
            m.bg_inflight = snap.inflight as u64;
            m.bg_dropped = snap.total_dropped();
            m.bg_completed = snap.total_completed();
            // class_inflight indices: 0 = enrichment, 1 = telemetry (per supervisor layout).
            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
        });
        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }
    }
1494
    /// Builds a turn summary and, when the notifier's gate allows it, fires the
    /// desktop notification and any configured `turn_complete` hooks (hooks run
    /// as a detached telemetry-class supervisor task).
    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
        let snap = turn.metrics_snapshot().timings.clone();
        // Turn duration = context prep + LLM + tool execution.
        let duration_ms = snap
            .prepare_context_ms
            .saturating_add(snap.llm_chat_ms)
            .saturating_add(snap.tool_exec_ms);
        let summary = crate::notifications::TurnSummary {
            duration_ms,
            preview: self.last_assistant_preview(160),
            // NOTE(review): tool_calls is hard-coded to 0 here — the per-turn
            // tool-call count does not appear to be tracked; confirm intended.
            tool_calls: 0,
            llm_requests: self.lifecycle.turn_llm_requests,
            exit_status: if is_error {
                crate::notifications::TurnExitStatus::Error
            } else {
                crate::notifications::TurnExitStatus::Success
            },
        };

        // No notifier configured counts as "gate open" so hooks still fire.
        let gate_ok = self
            .lifecycle
            .notifier
            .as_ref()
            .is_none_or(|n| n.should_fire(&summary));

        if let Some(ref notifier) = self.lifecycle.notifier
            && gate_ok
        {
            notifier.fire(&summary);
        }

        let hooks = self.session.hooks_config.turn_complete.clone();
        if !hooks.is_empty() && gate_ok {
            // Expose the turn outcome to hook processes via environment variables.
            let mut env = std::collections::HashMap::new();
            env.insert(
                "ZEPH_TURN_DURATION_MS".to_owned(),
                summary.duration_ms.to_string(),
            );
            env.insert(
                "ZEPH_TURN_STATUS".to_owned(),
                if is_error { "error" } else { "success" }.to_owned(),
            );
            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
            env.insert(
                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
                summary.llm_requests.to_string(),
            );
            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
            let _accepted = self.lifecycle.supervisor.spawn(
                agent_supervisor::TaskClass::Telemetry,
                "turn-complete-hooks",
                async move {
                    // Hooks get no MCP dispatch capability.
                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
                        tracing::warn!(error = %e, "turn_complete hook failed");
                    }
                },
            );
        }
    }
1576
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    /// Pre-screens user input through the guardrail and (when the `classifiers`
    /// feature is enabled) the injection classifier.
    ///
    /// Returns `Ok(true)` when the input was blocked and the turn should end;
    /// `Ok(false)` when processing may continue.
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Flagged but below the block threshold: warn and continue.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // fail_strategy decides whether a guardrail outage blocks input.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    // Soft signal only: logged, not blocked.
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1648
1649 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1656 let backoff_secs = self.runtime.timeouts.no_providers_backoff_secs;
1657 let prep_timeout_secs = self.runtime.timeouts.context_prep_timeout_secs;
1658
1659 let providers_recently_failed = self
1661 .lifecycle
1662 .last_no_providers_at
1663 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1664
1665 if providers_recently_failed {
1666 tracing::warn!(
1667 backoff_secs,
1668 "skipping context preparation: providers were unavailable on last turn"
1669 );
1670 return;
1671 }
1672
1673 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1674 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1675 {
1676 Ok(()) => {}
1677 Err(_elapsed) => {
1678 tracing::warn!(
1679 timeout_secs = prep_timeout_secs,
1680 "context preparation timed out; proceeding with degraded context"
1681 );
1682 }
1683 }
1684 }
1685
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    /// Advances all per-turn context machinery in a fixed order: system-prompt
    /// rebuild, correction/learning passes, compaction bookkeeping, focus and
    /// sidequest ticks, the periodic experience-graph evolution sweep, deferred
    /// summaries, proactive compression, compaction, and finally context
    /// preparation proper. Individual failures are logged, never propagated.
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        self.mcp.pruning_cache.reset();

        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        // Compaction state is advanced before ticks so "compacted this turn"
        // reflects the current turn below.
        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        {
            self.focus.tick();

            // Sidequest eviction is suppressed on turns that already compacted.
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        {
            // Periodic experience-graph evolution sweep, spawned as a
            // telemetry-class background task every `evolution_sweep_interval`
            // turns when experience + graph stores are both available.
            let cfg = &self.memory_state.extraction.graph_config.experience;
            if cfg.enabled
                && cfg.evolution_sweep_enabled
                && cfg.evolution_sweep_interval > 0
                && self
                    .sidequest
                    .turn_counter
                    .checked_rem(cfg.evolution_sweep_interval as u64)
                    == Some(0)
                && let Some(memory) = self.memory_state.persistence.memory.as_ref()
                && let (Some(exp), Some(graph)) =
                    (memory.experience.as_ref(), memory.graph_store.as_ref())
            {
                let exp = std::sync::Arc::clone(exp);
                let graph = std::sync::Arc::clone(graph);
                let threshold = cfg.confidence_prune_threshold;
                let turn = self.sidequest.turn_counter;
                let accepted = self.lifecycle.supervisor.spawn(
                    agent_supervisor::TaskClass::Telemetry,
                    "experience-sweep",
                    async move {
                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
                            Ok(stats) => tracing::info!(
                                turn,
                                self_loops = stats.pruned_self_loops,
                                low_confidence = stats.pruned_low_confidence,
                                "evolution sweep complete",
                            ),
                            Err(e) => tracing::warn!(
                                turn,
                                error = %e,
                                "evolution sweep failed",
                            ),
                        }
                    },
                );
                if !accepted {
                    tracing::warn!(
                        turn = self.sidequest.turn_counter,
                        "experience-sweep dropped (telemetry class at capacity)",
                    );
                }
            }
        }

        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        self.maybe_time_based_microcompact();

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // Boxed: prepare_context's future is large / possibly recursive.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1806
1807 fn build_user_message(
1808 &mut self,
1809 text: &str,
1810 image_parts: Vec<zeph_llm::provider::MessagePart>,
1811 ) -> Message {
1812 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1813 all_image_parts.extend(image_parts);
1814
1815 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1816 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1817 text: text.to_owned(),
1818 }];
1819 parts.extend(all_image_parts);
1820 Message::from_parts(Role::User, parts)
1821 } else {
1822 if !all_image_parts.is_empty() {
1823 tracing::warn!(
1824 count = all_image_parts.len(),
1825 "image attachments dropped: provider does not support vision"
1826 );
1827 }
1828 Message {
1829 role: Role::User,
1830 content: text.to_owned(),
1831 parts: vec![],
1832 metadata: MessageMetadata::default(),
1833 }
1834 }
1835 }
1836
    /// Drains finished background-command results from the completion channel
    /// into the bounded pending buffer.
    ///
    /// On overflow the oldest pending entry is evicted and the incoming result
    /// is replaced by a stub recording that its output was lost.
    /// NOTE(review): the stub carries the *new* completion's run_id while the
    /// evicted entry's full result is discarded — confirm this is intended.
    fn drain_background_completions(&mut self) {
        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;

        let Some(ref mut rx) = self.lifecycle.background_completion_rx else {
            return;
        };
        while let Ok(completion) = rx.try_recv() {
            if self.lifecycle.pending_background_completions.len()
                >= BACKGROUND_COMPLETION_BUFFER_CAP
            {
                tracing::warn!(
                    run_id = %completion.run_id,
                    "background completion buffer full; dropping run result"
                );
                self.lifecycle.pending_background_completions.pop_front();
                self.lifecycle.pending_background_completions.push_back(
                    zeph_tools::BackgroundCompletion {
                        run_id: completion.run_id,
                        exit_code: -1,
                        success: false,
                        elapsed_ms: 0,
                        command: completion.command,
                        output: format!(
                            "[background result for run {} dropped: buffer overflow]",
                            completion.run_id
                        ),
                    },
                );
            } else {
                self.lifecycle
                    .pending_background_completions
                    .push_back(completion);
            }
        }
    }
1878
1879 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
1883 if self.lifecycle.pending_background_completions.is_empty() {
1884 return user_text.to_owned();
1885 }
1886 let mut parts = String::new();
1887 for completion in self.lifecycle.pending_background_completions.drain(..) {
1888 let _ = write!(
1889 parts,
1890 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
1891 completion.run_id,
1892 completion.exit_code,
1893 completion.success,
1894 completion.elapsed_ms,
1895 completion.command,
1896 completion.output,
1897 );
1898 }
1899 parts.push_str(user_text);
1900 parts
1901 }
1902
    /// Polls a spawned sub-agent every 500ms until it reaches a terminal state,
    /// servicing interactive secret-approval requests in between and emitting
    /// periodic progress status lines. Returns a human-readable outcome, or
    /// `None` if the sub-agent manager disappears mid-poll.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // A sub-agent may request a secret mid-run; ask the user to confirm.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets are delivered with a 5-minute TTL.
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // Status entry gone: treat as completed without details.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: report progress as turns used / max turns
                    // (max defaults to 20 when the definition is unavailable).
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1978
1979 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1982 let mgr = self.orchestration.subagent_manager.as_mut()?;
1983 let full_ids: Vec<String> = mgr
1984 .statuses()
1985 .into_iter()
1986 .map(|(tid, _)| tid)
1987 .filter(|tid| tid.starts_with(prefix))
1988 .collect();
1989 Some(match full_ids.as_slice() {
1990 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1991 [fid] => Ok(fid.clone()),
1992 _ => Err(format!(
1993 "Ambiguous id prefix '{prefix}': matches {} agents",
1994 full_ids.len()
1995 )),
1996 })
1997 }
1998
1999 fn handle_agent_list(&self) -> Option<String> {
2000 use std::fmt::Write as _;
2001 let mgr = self.orchestration.subagent_manager.as_ref()?;
2002 let defs = mgr.definitions();
2003 if defs.is_empty() {
2004 return Some("No sub-agent definitions found.".into());
2005 }
2006 let mut out = String::from("Available sub-agents:\n");
2007 for d in defs {
2008 let memory_label = match d.memory {
2009 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2010 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2011 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2012 None => "",
2013 };
2014 if let Some(ref src) = d.source {
2015 let _ = writeln!(
2016 out,
2017 " {}{} — {} ({})",
2018 d.name, memory_label, d.description, src
2019 );
2020 } else {
2021 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2022 }
2023 }
2024 Some(out)
2025 }
2026
    /// Renders the status of all active sub-agents: state, turn count, elapsed
    /// time, last message, and (when resolvable) the agent's memory directory.
    fn handle_agent_status(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        if statuses.is_empty() {
            return Some("No active sub-agents.".into());
        }
        let mut out = String::from("Active sub-agents:\n");
        for (id, s) in &statuses {
            let state = format!("{:?}", s.state).to_lowercase();
            let elapsed = s.started_at.elapsed().as_secs();
            // Ids are abbreviated to their first 8 chars (or fewer, if shorter).
            let _ = writeln!(
                out,
                " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                short = &id[..8.min(id.len())],
                t = s.turns_used,
                msg = s.last_message.as_deref().unwrap_or(""),
            );
            // Append the on-disk memory dir when the definition declares a scope.
            if let Some(def) = mgr.agents_def(id)
                && let Some(scope) = def.memory
                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
            {
                let _ = writeln!(out, " memory: {}", dir.display());
            }
        }
        Some(out)
    }
2055
    /// Approves the pending secret request for the sub-agent matching `id`
    /// (a task-id prefix), granting a 5-minute TTL and delivering the secret.
    ///
    /// NOTE(review): `try_recv_secret_request` pops a request from the queue;
    /// if the popped request belongs to a *different* sub-agent than `full_id`,
    /// it appears to be silently discarded here — confirm intended.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            let ttl = std::time::Duration::from_mins(5);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
2079
2080 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2081 let full_id = match self.resolve_agent_id_prefix(id)? {
2082 Ok(fid) => fid,
2083 Err(msg) => return Some(msg),
2084 };
2085 let mgr = self.orchestration.subagent_manager.as_mut()?;
2086 match mgr.deny_secret(&full_id) {
2087 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2088 Err(e) => Some(format!("Deny failed: {e}")),
2089 }
2090 }
2091
    #[allow(clippy::too_many_lines)]
    /// Dispatches a parsed `AgentCommand` to the sub-agent manager.
    ///
    /// `Background` spawns without waiting; `Spawn`/`Mention` spawn and then
    /// poll to completion; `Resume` restarts a finished agent with a new
    /// prompt. Returns the user-facing response text, or `None` when no
    /// sub-agent manager is configured.
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Gather spawn inputs before mutably borrowing the manager.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Spawn and @-mention behave identically: spawn then wait for completion.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                // Cancel resolves its own prefix (unlike approve/deny) because it
                // needs the manager borrow alive for the cancel call.
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve the definition name first (immutable borrow) so skills
                // can be filtered before the mutable resume call.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
2210
2211 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2212 let mgr = self.orchestration.subagent_manager.as_ref()?;
2213 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2214 let reg = self.skill_state.registry.read();
2215 match zeph_subagent::filter_skills(®, &def.skills) {
2216 Ok(skills) => {
2217 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2218 if bodies.is_empty() {
2219 None
2220 } else {
2221 Some(bodies)
2222 }
2223 }
2224 Err(e) => {
2225 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2226 None
2227 }
2228 }
2229 }
2230
2231 fn build_spawn_context(
2233 &self,
2234 cfg: &zeph_config::SubAgentConfig,
2235 ) -> zeph_subagent::SpawnContext {
2236 zeph_subagent::SpawnContext {
2237 parent_messages: self.extract_parent_messages(cfg),
2238 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
2239 parent_provider_name: {
2240 let name = &self.runtime.active_provider_name;
2241 if name.is_empty() {
2242 None
2243 } else {
2244 Some(name.clone())
2245 }
2246 },
2247 spawn_depth: self.runtime.spawn_depth,
2248 mcp_tool_names: self.extract_mcp_tool_names(),
2249 }
2250 }
2251
2252 fn extract_parent_messages(
2257 &self,
2258 config: &zeph_config::SubAgentConfig,
2259 ) -> Vec<zeph_llm::provider::Message> {
2260 use zeph_llm::provider::Role;
2261 if config.context_window_turns == 0 {
2262 return Vec::new();
2263 }
2264 let non_system: Vec<_> = self
2265 .msg
2266 .messages
2267 .iter()
2268 .filter(|m| m.role != Role::System)
2269 .cloned()
2270 .collect();
2271 let take_count = config.context_window_turns * 2;
2272 let start = non_system.len().saturating_sub(take_count);
2273 let mut msgs = non_system[start..].to_vec();
2274
2275 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
2278 let mut keep = msgs.len();
2279 for (i, m) in msgs.iter().enumerate() {
2280 total_chars += m.content.len();
2281 if total_chars > max_chars {
2282 keep = i;
2283 break;
2284 }
2285 }
2286 if keep < msgs.len() {
2287 tracing::info!(
2288 kept = keep,
2289 requested = config.context_window_turns * 2,
2290 "[subagent] truncated parent history from {} to {} turns due to token budget",
2291 config.context_window_turns * 2,
2292 keep
2293 );
2294 msgs.truncate(keep);
2295 }
2296 msgs
2297 }
2298
2299 fn extract_mcp_tool_names(&self) -> Vec<String> {
2301 self.tool_executor
2302 .tool_definitions_erased()
2303 .into_iter()
2304 .filter(|t| t.id.starts_with("mcp_"))
2305 .map(|t| t.id.to_string())
2306 .collect()
2307 }
2308
2309 fn classify_source_kind(
2313 skill_dir: &std::path::Path,
2314 managed_dir: Option<&std::path::PathBuf>,
2315 bundled_names: &std::collections::HashSet<String>,
2316 ) -> zeph_memory::store::SourceKind {
2317 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2318 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2319 let has_marker = skill_dir.join(".bundled").exists();
2320 if has_marker && bundled_names.contains(skill_name) {
2321 zeph_memory::store::SourceKind::Bundled
2322 } else {
2323 if has_marker {
2324 tracing::warn!(
2325 skill = %skill_name,
2326 "skill has .bundled marker but is not in the bundled skill \
2327 allowlist — classifying as Hub"
2328 );
2329 }
2330 zeph_memory::store::SourceKind::Hub
2331 }
2332 } else {
2333 zeph_memory::store::SourceKind::Local
2334 }
2335 }
2336
    /// Refresh the persisted trust record for every skill present after a
    /// registry reload.
    ///
    /// For each skill the content hash and source classification are
    /// recomputed off the async runtime (blocking filesystem work via
    /// `spawn_blocking`), then the trust level to store is chosen:
    /// - hash differs from the stored record → `hash_mismatch_level`;
    /// - source kind differs → keep the stored level only when it is inactive
    ///   or no stricter than the new kind's initial level, else reset to the
    ///   initial level;
    /// - otherwise keep the stored level, or seed a brand-new skill with the
    ///   initial level for its source kind.
    ///
    /// Best-effort: per-skill failures are logged and the loop continues.
    /// No-op when persistent memory is not configured.
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        let bundled_names: std::collections::HashSet<String> =
            zeph_skills::bundled_skill_names().into_iter().collect();
        for meta in all_meta {
            // Clone owned copies so the blocking closure can be 'static.
            let skill_dir = meta.skill_dir.clone();
            let managed_dir_ref = managed_dir.clone();
            let bundled_names_ref = bundled_names.clone();
            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
                tokio::task::spawn_blocking(move || {
                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
                    let source_kind = Self::classify_source_kind(
                        &skill_dir,
                        managed_dir_ref.as_ref(),
                        &bundled_names_ref,
                    );
                    Some((hash, source_kind))
                })
                .await
                // A panicked blocking task is folded into the "no result" path.
                .unwrap_or(None);

            let Some((current_hash, source_kind)) = fs_result else {
                tracing::warn!("failed to compute hash for '{}'", meta.name);
                continue;
            };
            // Baseline trust level implied by where the skill came from.
            let initial_level = match source_kind {
                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
                    &trust_cfg.local_level
                }
            };
            let existing = memory
                .sqlite()
                .load_skill_trust(&meta.name)
                .await
                .ok()
                .flatten();
            let trust_level_str = if let Some(ref row) = existing {
                if row.blake3_hash != current_hash {
                    // Content changed since the trust decision was made.
                    trust_cfg.hash_mismatch_level.to_string()
                } else if row.source_kind != source_kind {
                    // Same content but the skill moved between source kinds:
                    // re-evaluate whether the stored level may be carried over.
                    let stored = row
                        .trust_level
                        .parse::<zeph_tools::SkillTrustLevel>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                skill = %meta.name,
                                raw = %row.trust_level,
                                "unrecognised trust_level in DB, treating as quarantined"
                            );
                            zeph_tools::SkillTrustLevel::Quarantined
                        });
                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
                        row.trust_level.clone()
                    } else {
                        initial_level.to_string()
                    }
                } else {
                    // Unchanged skill: keep whatever was stored.
                    row.trust_level.clone()
                }
            } else {
                // First time we see this skill: seed from the source kind.
                initial_level.to_string()
            };
            let source_path = meta.skill_dir.to_str();
            if let Err(e) = memory
                .sqlite()
                .upsert_skill_trust(
                    &meta.name,
                    &trust_level_str,
                    source_kind,
                    None,
                    source_path,
                    &current_hash,
                )
                .await
            {
                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
            }
        }
    }
2433
    /// Rebuild or incrementally sync the skill-matching index after the
    /// registry contents changed.
    ///
    /// When the current backend is not a Qdrant matcher (or no matcher
    /// exists), a fresh in-memory matcher is built from `all_meta`. When it
    /// is Qdrant-backed, embeddings are synced in place instead, with
    /// progress reported through `session.status_tx` when available. Each
    /// embedding call is bounded by the configured embedding timeout. When
    /// hybrid search is enabled, the BM25 index over skill descriptions is
    /// rebuilt as well.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Embedding closure handed to the matcher: embeds one text, failing
        // with `LlmError::Timeout` when the provider is too slow.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // Only a Qdrant backend supports in-place sync; everything else is
        // rebuilt from scratch.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Optional progress callback that forwards "N/M" updates to the UI.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                // Non-fatal: the matcher keeps serving its previous index.
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2495
    /// Hot-reload skills from disk (including plugin-supplied directories)
    /// and refresh everything derived from them.
    ///
    /// Cheap no-op when the registry fingerprint is unchanged after the
    /// reload. Otherwise: trust records are updated, the matcher index is
    /// rebuilt or synced, the skills prompt is regenerated, and the leading
    /// system message is rewritten in place.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.skill_state.fingerprint();
        // Merge plugin-provided skill directories into the configured paths,
        // avoiding duplicates.
        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.skill_state.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.skill_state.skill_paths.clone()
        };
        self.skill_state.registry.write().reload(&reload_paths);
        // Nothing actually changed on disk: skip the expensive refresh.
        if self.skill_state.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        // Clone metadata out so the registry read lock is not held across
        // the awaits below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        // The first message holds the system prompt; refresh it in place.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2558
2559 fn reload_instructions(&mut self) {
2560 if let Some(ref mut rx) = self.instructions.reload_rx {
2562 while rx.try_recv().is_ok() {}
2563 }
2564 let Some(ref state) = self.instructions.reload_state else {
2565 return;
2566 };
2567 let new_blocks = crate::instructions::load_instructions(
2568 &state.base_dir,
2569 &state.provider_kinds,
2570 &state.explicit_files,
2571 state.auto_detect,
2572 );
2573 let old_sources: std::collections::HashSet<_> =
2574 self.instructions.blocks.iter().map(|b| &b.source).collect();
2575 let new_sources: std::collections::HashSet<_> =
2576 new_blocks.iter().map(|b| &b.source).collect();
2577 for added in new_sources.difference(&old_sources) {
2578 tracing::info!(path = %added.display(), "instruction file added");
2579 }
2580 for removed in old_sources.difference(&new_sources) {
2581 tracing::info!(path = %removed.display(), "instruction file removed");
2582 }
2583 tracing::info!(
2584 old_count = self.instructions.blocks.len(),
2585 new_count = new_blocks.len(),
2586 "reloaded instruction files"
2587 );
2588 self.instructions.blocks = new_blocks;
2589 }
2590
    /// Re-read the config file (plus plugin overlays) and apply the subset of
    /// settings that can change at runtime without a restart: security and
    /// timeout knobs, memory/persistence limits, skill-matching parameters,
    /// context-budget and compaction thresholds, store routing, and index
    /// settings.
    ///
    /// No-op when no config path is recorded or (re)loading fails — the
    /// previous runtime state is kept in that case.
    fn reload_config(&mut self) {
        let Some(path) = self.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        // Weights/thresholds are clamped defensively into [0, 1].
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading "~/" in the generation output dir to $HOME.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Keep an existing router (and its accumulated state); only
                // create one when RPE was just turned on.
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // An empty classifier-provider name disables store routing.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2678
2679 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2683 let mut config = match Config::load(path) {
2684 Ok(c) => c,
2685 Err(e) => {
2686 tracing::warn!("config reload failed: {e:#}");
2687 return None;
2688 }
2689 };
2690
2691 let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2693 None
2694 } else {
2695 match zeph_plugins::apply_plugin_config_overlays(
2696 &mut config,
2697 &self.lifecycle.plugins_dir,
2698 ) {
2699 Ok(o) => Some(o),
2700 Err(e) => {
2701 tracing::warn!(
2702 "plugin overlay merge failed during reload: {e:#}; \
2703 keeping previous runtime state"
2704 );
2705 return None;
2706 }
2707 }
2708 };
2709
2710 if let Some(ref overlay) = new_overlay {
2714 self.warn_on_shell_overlay_divergence(overlay, &config);
2715 }
2716 Some(config)
2717 }
2718
2719 fn warn_on_shell_overlay_divergence(
2725 &self,
2726 new_overlay: &zeph_plugins::ResolvedOverlay,
2727 config: &Config,
2728 ) {
2729 let new_blocked: Vec<String> = {
2730 let mut v = config.tools.shell.blocked_commands.clone();
2731 v.sort();
2732 v
2733 };
2734 let new_allowed: Vec<String> = {
2735 let mut v = config.tools.shell.allowed_commands.clone();
2736 v.sort();
2737 v
2738 };
2739
2740 let startup = &self.lifecycle.startup_shell_overlay;
2741 let blocked_changed = new_blocked != startup.blocked;
2742 let allowed_changed = new_allowed != startup.allowed;
2743
2744 if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2746 h.rebuild(&config.tools.shell);
2747 tracing::info!(
2748 blocked_count = h.snapshot_blocked().len(),
2749 "shell blocked_commands rebuilt from hot-reload"
2750 );
2751 }
2752
2753 if allowed_changed {
2760 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2761 for sandbox path recomputation (blocked_commands was rebuilt live)";
2762 tracing::warn!("{msg}");
2763 if let Some(ref tx) = self.session.status_tx {
2764 let _ = tx.send(msg.to_owned());
2765 }
2766 }
2767
2768 let _ = new_overlay;
2769 }
2770
    /// Drive one step of the SideQuest tool-output eviction state machine.
    ///
    /// Order of operations:
    /// 1. Warn when sidequest is enabled alongside a non-Reactive pruning
    ///    strategy (possible redundant eviction).
    /// 2. Skip entirely — and drop any pending result — while a focus
    ///    session is active.
    /// 3. Poll a previously spawned background scoring task without blocking
    ///    (`now_or_never`); apply its evicted cursor indices to the message
    ///    history if it finished with work to do.
    /// 4. Rebuild cursors and, when any tool outputs are eligible, spawn a
    ///    new background LLM task that scores them and returns the cursor
    ///    indices to evict (validated, deduped, and capped by
    ///    `max_eviction_ratio`).
    #[allow(clippy::too_many_lines)]
    fn maybe_sidequest_eviction(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        if self.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                     consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        if self.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active")
            ;
            self.compression.pending_sidequest_result = None;
            return;
        }

        if let Some(handle) = self.compression.pending_sidequest_result.take() {
            use futures::FutureExt as _;
            // Non-blocking poll: either consume the finished result or let
            // the (taken) handle drop and spawn a replacement below.
            match handle.now_or_never() {
                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
                    // Snapshot cursors before eviction mutates the history,
                    // so the debug dump reflects what the LLM actually scored.
                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
                    let freed = self.sidequest.apply_eviction(
                        &mut self.msg.messages,
                        &evicted_indices,
                        &self.metrics.token_counter,
                    );
                    if freed > 0 {
                        self.recompute_prompt_tokens();
                        // Count this as a compaction so the cooldown logic
                        // sees it.
                        self.context_manager.compaction =
                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
                                cooldown: 0,
                            };
                        tracing::info!(
                            freed_tokens = freed,
                            evicted_cursors = evicted_indices.len(),
                            pass = self.sidequest.passes_run,
                            "sidequest eviction complete"
                        );
                        if let Some(ref d) = self.debug_state.debug_dumper {
                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                        }
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                        }
                    } else {
                        // Nothing freed: clear the status line.
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(String::new());
                        }
                    }
                }
                Some(Ok(None | Some(_))) => {
                    tracing::debug!("sidequest: pending result: no cursors to evict");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                Some(Err(e)) => {
                    tracing::debug!("sidequest: background task panicked: {e}");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                None => {
                    // Still running: the taken handle is dropped (detaching
                    // the task) and a fresh one is spawned below.
                    tracing::debug!(
                        "sidequest: background LLM task not yet complete, rescheduling"
                    );
                }
            }
        }

        self.sidequest
            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);

        if self.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        let prompt = self.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
        let n_cursors = self.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        // Background task: ask the LLM which cursors to evict, parse the
        // JSON object out of its reply, and sanitize the indices.
        let handle = tokio::spawn(async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Extract the outermost {...} span from the free-form reply.
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Drop out-of-range indices, sort, dedup, then cap the eviction
            // count at ceil(n_cursors * max_eviction_ratio).
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        });

        self.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
2940
2941 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
2943 self.mcp
2944 .manager
2945 .as_ref()
2946 .map(|m| McpManagerDispatch(Arc::clone(m)))
2947 }
2948
    /// Detect a change of the process working directory and react to it:
    /// update the cached cwd and the session environment context, log the
    /// transition, and fire any configured `cwd_changed` hooks with
    /// `ZEPH_OLD_CWD` / `ZEPH_NEW_CWD` set in their environment.
    ///
    /// Best-effort: cwd lookup or hook failures are logged, never propagated.
    pub(crate) async fn check_cwd_changed(&mut self) {
        let current = match std::env::current_dir() {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
                return;
            }
        };
        if current == self.lifecycle.last_known_cwd {
            return;
        }
        // Swap in the new cwd while keeping the old one for hooks/logging.
        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
        self.session.env_context.working_dir = current.display().to_string();

        tracing::info!(
            old = %old_cwd.display(),
            new = %current.display(),
            "working directory changed"
        );

        let _ = self
            .channel
            .send_status("Working directory changed\u{2026}")
            .await;

        let hooks = self.session.hooks_config.cwd_changed.clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
            // Hooks may dispatch MCP tool calls when a manager is available.
            let dispatch = self.mcp_dispatch();
            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
                .as_ref()
                .map(|d| d as &dyn zeph_subagent::McpDispatch);
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
                tracing::warn!(error = %e, "CwdChanged hook failed");
            }
        }

        // Clear the status line.
        let _ = self.channel.send_status("").await;
    }
2995
2996 pub(crate) async fn handle_file_changed(
2998 &mut self,
2999 event: crate::file_watcher::FileChangedEvent,
3000 ) {
3001 tracing::info!(path = %event.path.display(), "file changed");
3002
3003 let _ = self
3004 .channel
3005 .send_status("Running file-change hook\u{2026}")
3006 .await;
3007
3008 let hooks = self.session.hooks_config.file_changed_hooks.clone();
3009 if !hooks.is_empty() {
3010 let mut env = std::collections::HashMap::new();
3011 env.insert(
3012 "ZEPH_CHANGED_PATH".to_owned(),
3013 event.path.display().to_string(),
3014 );
3015 let dispatch = self.mcp_dispatch();
3016 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3017 .as_ref()
3018 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3019 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3020 tracing::warn!(error = %e, "FileChanged hook failed");
3021 }
3022 }
3023
3024 let _ = self.channel.send_status("").await;
3025 }
3026
    /// Enqueue a background compression-spectrum promotion scan on the agent
    /// supervisor, when both a promotion engine and persistent memory are
    /// configured.
    ///
    /// The task loads a bounded window of recent entries, clusters it via
    /// the engine, and promotes each candidate. All failures are logged and
    /// non-fatal; the supervisor may also decline the task.
    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
        let Some(engine) = self.promotion_engine.clone() else {
            return;
        };

        let Some(memory) = self.memory_state.persistence.memory.clone() else {
            return;
        };

        // Fixed cap on how many recent entries a single scan considers.
        let promotion_window = 200usize;

        let accepted = self.lifecycle.supervisor.spawn(
            agent_supervisor::TaskClass::Enrichment,
            "compression_spectrum.promotion_scan",
            async move {
                let span = tracing::info_span!("memory.compression.promote.background");
                let _enter = span.enter();

                let window = match memory.load_promotion_window(promotion_window).await {
                    Ok(w) => w,
                    Err(e) => {
                        tracing::warn!(error = %e, "promotion scan: failed to load window");
                        return;
                    }
                };

                if window.is_empty() {
                    return;
                }

                let candidates = match engine.scan(&window).await {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::warn!(error = %e, "promotion scan: clustering failed");
                        return;
                    }
                };

                // Promote each candidate independently; one failure does not
                // abort the rest.
                for candidate in &candidates {
                    if let Err(e) = engine.promote(candidate).await {
                        tracing::warn!(
                            signature = %candidate.signature,
                            error = %e,
                            "promotion scan: promote failed"
                        );
                    }
                }

                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
            },
        );

        if accepted {
            tracing::debug!("compression_spectrum: promotion scan task enqueued");
        }
    }
3094}
/// Newtype over the shared MCP manager so it can be handed to subagent hooks
/// as a `dyn zeph_subagent::McpDispatch` trait object.
struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3100
3101impl zeph_subagent::McpDispatch for McpManagerDispatch {
3102 fn call_tool<'a>(
3103 &'a self,
3104 server: &'a str,
3105 tool: &'a str,
3106 args: serde_json::Value,
3107 ) -> std::pin::Pin<
3108 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3109 > {
3110 Box::pin(async move {
3111 self.0
3112 .call_tool(server, tool, args)
3113 .await
3114 .map(|result| {
3115 let texts: Vec<serde_json::Value> = result
3117 .content
3118 .iter()
3119 .filter_map(|c| {
3120 if let rmcp::model::RawContent::Text(t) = &c.raw {
3121 Some(serde_json::Value::String(t.text.clone()))
3122 } else {
3123 None
3124 }
3125 })
3126 .collect();
3127 serde_json::Value::Array(texts)
3128 })
3129 .map_err(|e| e.to_string())
3130 })
3131 }
3132}
3133
3134pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3135 while !*rx.borrow_and_update() {
3136 if rx.changed().await.is_err() {
3137 std::future::pending::<()>().await;
3138 }
3139 }
3140}
3141
3142pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3143 match rx {
3144 Some(inner) => {
3145 if let Some(v) = inner.recv().await {
3146 Some(v)
3147 } else {
3148 *rx = None;
3149 std::future::pending().await
3150 }
3151 }
3152 None => std::future::pending().await,
3153 }
3154}
3155
3156pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3162 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3163 if let Some(ctx_size) = provider.context_window() {
3164 tracing::info!(
3165 model_context = ctx_size,
3166 "auto-configured context budget on reload"
3167 );
3168 ctx_size
3169 } else {
3170 0
3171 }
3172 } else {
3173 config.memory.context_budget_tokens
3174 };
3175 if tokens == 0 {
3176 tracing::warn!(
3177 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3178 );
3179 128_000
3180 } else {
3181 tokens
3182 }
3183}
3184
3185#[cfg(test)]
3186mod tests;
3187
3188#[cfg(test)]
3189pub(crate) use tests::agent_tests;
3190
#[cfg(test)]
mod test_stubs {
    //! Shared slash-command stubs used by the test suite.

    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Slash-command handler that always fails with a `CommandError`, used
    /// to exercise the dispatcher's error-reporting path.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Always resolves to `Err(CommandError::new("boom"))`, regardless of
        /// context or arguments.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<
            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
        > {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}