mod acp_commands;
mod agent_access_impl;
pub(crate) mod agent_supervisor;
mod autodream;
mod builder;
pub(crate) mod channel_impl;
mod command_context_impls;
pub(super) mod compression_feedback;
mod context;
mod context_impls;
pub(crate) mod context_manager;
mod corrections;
pub mod error;
mod experiment_cmd;
pub(crate) mod focus;
mod index;
mod learning;
pub(crate) mod learning_engine;
mod log_commands;
mod loop_event;
mod lsp_commands;
mod magic_docs;
mod mcp;
mod message_queue;
mod microcompact;
mod model_commands;
mod persistence;
#[cfg(feature = "scheduler")]
mod plan;
mod policy_commands;
mod provider_cmd;
#[cfg(feature = "self-check")]
mod quality_hook;
pub(crate) mod rate_limiter;
#[cfg(feature = "scheduler")]
mod scheduler_commands;
#[cfg(feature = "scheduler")]
mod scheduler_loop;
pub mod session_config;
mod session_digest;
pub(crate) mod sidequest;
mod skill_management;
pub mod slash_commands;
pub mod speculative;
pub(crate) mod state;
pub(crate) mod task_injection;
pub(crate) mod tool_execution;
pub(crate) mod tool_orchestrator;
mod trust_commands;
pub mod turn;
mod utils;
pub(crate) mod vigil;

use std::collections::{HashMap, VecDeque};
use std::fmt::Write as _;
use std::sync::Arc;

use parking_lot::RwLock;

use tokio::sync::{mpsc, watch};
use tokio_util::sync::CancellationToken;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
use zeph_memory::TokenCounter;
use zeph_memory::semantic::SemanticMemory;
use zeph_skills::loader::Skill;
use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
use zeph_skills::prompt::format_skills_prompt;
use zeph_skills::registry::SkillRegistry;
use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};

use crate::channel::Channel;
use crate::config::Config;
use crate::context::{ContextBudget, build_system_prompt};
use zeph_common::text::estimate_tokens;

use loop_event::LoopEvent;
use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
use state::MessageState;

pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

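/// Renders a tool's output as a `[tool output: <name>]` header followed by a
/// fenced block, preallocating the exact final capacity so the `write!` below
/// never reallocates.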
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    use std::fmt::Write;
    let capacity = "[tool output: ".len()
        + tool_name.len()
        + "]\n```\n".len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    let _ = write!(
        buf,
        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
    );
    buf
}

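/// Core agent state: the LLM provider pair, the user-facing channel, the tool
/// executor, and all per-session message, context, service, and runtime state.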
pub struct Agent<C: Channel> {
    provider: AnyProvider,
    embedding_provider: AnyProvider,
    channel: C,
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    pub(super) msg: MessageState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    pub(super) services: state::Services,

    pub(super) runtime: state::AgentRuntime,
}

enum DispatchFlow {
    /// A command requested exit; break out of the run loop.
    Break,
    /// The command was handled; skip to the next loop iteration.
    Continue,
    /// No registry handled the command; fall through to the next dispatcher.
    Fallthrough,
}

impl<C: Channel> Agent<C> {
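    /// Builds an agent that owns its skill registry, reusing `provider` as the
    /// embedding provider. Delegates to [`Self::new_with_registry_arc`].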
    #[must_use]
    pub fn new(
        provider: AnyProvider,
        channel: C,
        registry: SkillRegistry,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        let registry = Arc::new(RwLock::new(registry));
        let embedding_provider = provider.clone();
        Self::new_with_registry_arc(
            provider,
            embedding_provider,
            channel,
            registry,
            matcher,
            max_active_skills,
            tool_executor,
        )
    }

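    /// Builds an agent from a shared skill registry: loads every registered
    /// skill, renders the skills prompt and initial system prompt, and wires up
    /// default service and runtime state.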
    #[must_use]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        use state::{
            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
            SessionState, SkillState, ToolState,
        };

        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.skill(&m.name).ok())
                .collect()
        };
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());

        let services = Services {
            memory: MemoryState::default(),
            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            #[cfg(feature = "self-check")]
            quality: None,
            proactive_explorer: None,
            promotion_engine: None,
        };

        let runtime = AgentRuntime {
            config: RuntimeConfig::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            debug: DebugState::default(),
            instructions: InstructionState::default(),
        };

        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            services,
            runtime,
        }
    }

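    /// Consumes the agent, returning the underlying channel.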
    #[must_use]
    pub fn into_channel(self) -> C {
        self.channel
    }

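    /// Collects results from sub-agents that reached a terminal state
    /// (completed, failed, or canceled), as `(task_id, result)` pairs.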
    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
        let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
            return vec![];
        };

        let finished: Vec<String> = mgr
            .statuses()
            .into_iter()
            .filter_map(|(id, status)| {
                if matches!(
                    status.state,
                    zeph_subagent::SubAgentState::Completed
                        | zeph_subagent::SubAgentState::Failed
                        | zeph_subagent::SubAgentState::Canceled
                ) {
                    Some(id)
                } else {
                    None
                }
            })
            .collect();

        let mut results = vec![];
        for task_id in finished {
            match mgr.collect(&task_id).await {
                Ok(result) => results.push((task_id, result)),
                Err(e) => {
                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
                }
            }
        }
        results
    }

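    /// Requests a structured session summary from the provider, falling back
    /// to a plain-text completion when the structured call fails or times out.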
    async fn call_llm_for_session_summary(
        &self,
        chat_messages: &[Message],
    ) -> Option<zeph_memory::StructuredSummary> {
        let timeout_dur = std::time::Duration::from_secs(
            self.services
                .memory
                .compaction
                .shutdown_summary_timeout_secs,
        );
        match tokio::time::timeout(
            timeout_dur,
            self.provider
                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
        )
        .await
        {
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
            Err(_) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
                    self.services
                        .memory
                        .compaction
                        .shutdown_summary_timeout_secs
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
        }
    }

    async fn plain_text_summary_fallback(
        &self,
        chat_messages: &[Message],
        timeout_dur: std::time::Duration,
    ) -> Option<zeph_memory::StructuredSummary> {
        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
                summary: plain,
                key_facts: vec![],
                entities: vec![],
            }),
            Ok(Err(e)) => {
                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
                None
            }
            Err(_) => {
                tracing::warn!("shutdown summary: plain LLM fallback timed out");
                None
            }
        }
    }

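    /// On shutdown, finds `ToolUse` parts on the last assistant message that
    /// have no matching `ToolResult` and persists tombstone results for them,
    /// so no dangling tool calls are left in the transcript.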
    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
        use zeph_llm::provider::{MessagePart, Role};

        let msgs = &self.msg.messages;
        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
            return;
        };
        let asst_msg = &msgs[asst_idx];
        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let MessagePart::ToolUse { id, name, input } = p {
                    Some((id.as_str(), name.as_str(), input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_ids.is_empty() {
            return;
        }

        let paired_ids: std::collections::HashSet<&str> = msgs
            .get(asst_idx + 1..)
            .into_iter()
            .flatten()
            .filter(|m| m.role == Role::User)
            .flat_map(|m| m.parts.iter())
            .filter_map(|p| {
                if let MessagePart::ToolResult { tool_use_id, .. } = p {
                    Some(tool_use_id.as_str())
                } else {
                    None
                }
            })
            .collect();

        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
            .iter()
            .filter(|(id, _, _)| !paired_ids.contains(*id))
            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
                id: (*id).to_owned(),
                name: (*name).to_owned().into(),
                input: (*input).clone(),
            })
            .collect();

        if unpaired.is_empty() {
            return;
        }

        tracing::info!(
            count = unpaired.len(),
            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
        );
        self.persist_cancelled_tool_results(&unpaired).await;
    }

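    /// Generates and stores a session summary at shutdown, unless summaries
    /// are disabled, one already exists, or the conversation is too short.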
    async fn maybe_store_shutdown_summary(&mut self) {
        if !self.services.memory.compaction.shutdown_summary {
            return;
        }
        let Some(memory) = self.services.memory.persistence.memory.clone() else {
            return;
        };
        let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
            return;
        };

        match memory.has_session_summary(conversation_id).await {
            Ok(true) => {
                tracing::debug!("shutdown summary: session already has a summary, skipping");
                return;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
                return;
            }
        }

        let user_count = self
            .msg
            .messages
            .iter()
            .skip(1)
            .filter(|m| m.role == Role::User)
            .count();
        if user_count
            < self
                .services
                .memory
                .compaction
                .shutdown_summary_min_messages
        {
            tracing::debug!(
                user_count,
                min = self
                    .services
                    .memory
                    .compaction
                    .shutdown_summary_min_messages,
                "shutdown summary: too few user messages, skipping"
            );
            return;
        }

        let _ = self.channel.send_status("Saving session summary...").await;

        let max = self
            .services
            .memory
            .compaction
            .shutdown_summary_max_messages;
        if max == 0 {
            tracing::debug!("shutdown summary: max_messages=0, skipping");
            return;
        }
        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
        let slice = if non_system.len() > max {
            &non_system[non_system.len() - max..]
        } else {
            &non_system[..]
        };

        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
            .iter()
            .map(|m| {
                let role = match m.role {
                    Role::User => "user".to_owned(),
                    Role::Assistant => "assistant".to_owned(),
                    Role::System => "system".to_owned(),
                };
                (zeph_memory::MessageId(0), role, m.content.clone())
            })
            .collect();

        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
        let chat_messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];

        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
            let _ = self.channel.send_status("").await;
            return;
        };

        if let Err(e) = memory
            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
            .await
        {
            tracing::warn!("shutdown summary: storage failed: {e:#}");
        } else {
            tracing::info!(
                conversation_id = conversation_id.0,
                "shutdown summary stored"
            );
        }

        let _ = self.channel.send_status("").await;
    }

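    /// Graceful shutdown: persists router and advisor state, stops sub-agents
    /// and MCP servers, logs final metrics, flushes orphaned tool calls,
    /// aborts background tasks, then stores the session summary and digest.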
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state().await;

        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        self.flush_orphaned_tool_use_on_shutdown().await;

        self.runtime.lifecycle.supervisor.abort_all();

        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        self.services.learning_engine.learning_tasks.abort_all();

        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }

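    /// Mirrors current sub-agent statuses into the metrics snapshot.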
    fn refresh_subagent_metrics(&mut self) {
        let Some(ref mgr) = self.services.orchestration.subagent_manager else {
            return;
        };
        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
            .statuses()
            .into_iter()
            .map(|(id, s)| {
                let def = mgr.agents_def(&id);
                crate::metrics::SubAgentMetrics {
                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
                    id: id.clone(),
                    state: format!("{:?}", s.state).to_lowercase(),
                    turns_used: s.turns_used,
                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
                    background: def.is_some_and(|d| d.permissions.background),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                    permission_mode: def.map_or_else(String::new, |d| {
                        use zeph_subagent::def::PermissionMode;
                        match d.permissions.permission_mode {
                            PermissionMode::Default => String::new(),
                            PermissionMode::AcceptEdits => "accept_edits".into(),
                            PermissionMode::DontAsk => "dont_ask".into(),
                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
                            PermissionMode::Plan => "plan".into(),
                        }
                    }),
                    transcript_dir: mgr
                        .agent_transcript_dir(&id)
                        .map(|p| p.to_string_lossy().into_owned()),
                }
            })
            .collect();
        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
    }

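    /// Sends a completion notice to the channel for every sub-agent that
    /// finished since the last poll.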
    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
        let completed = self.poll_subagents().await;
        for (task_id, result) in completed {
            let notice = if result.is_empty() {
                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
            } else {
                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
            };
            if let Err(e) = self.channel.send(&notice).await {
                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
            }
        }
        Ok(())
    }

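    /// Main event loop: waits for channel messages and lifecycle events,
    /// routes slash commands through the session and agent command registries,
    /// and hands everything else to [`Self::process_user_message`]. Returns
    /// when the channel closes or shutdown/exit is requested.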
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.services.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }

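    /// Translates a command registry dispatch result into loop control flow,
    /// sending any command output or error text to the channel on the way.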
    async fn apply_dispatch_result(
        &mut self,
        result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
        command: &str,
        with_learning: bool,
    ) -> DispatchFlow {
        match result {
            Some(Ok(zeph_commands::CommandOutput::Exit)) => {
                let _ = self.channel.flush_chunks().await;
                DispatchFlow::Break
            }
            Some(Ok(
                zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
            )) => {
                let _ = self.channel.flush_chunks().await;
                DispatchFlow::Continue
            }
            Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
                let _ = self.channel.send(&msg).await;
                let _ = self.channel.flush_chunks().await;
                if with_learning {
                    self.maybe_trigger_post_command_learning(command).await;
                }
                DispatchFlow::Continue
            }
            Some(Err(e)) => {
                let _ = self.channel.send(&e.to_string()).await;
                let _ = self.channel.flush_chunks().await;
                tracing::warn!(command = %command, error = %e.0, "slash command failed");
                DispatchFlow::Continue
            }
            None => DispatchFlow::Fallthrough,
        }
    }

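    /// Swaps in a queued provider override (e.g. an ACP model change), if any.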
    fn apply_provider_override(&mut self) {
        if let Some(ref slot) = self.runtime.providers.provider_override
            && let Some(new_provider) = slot.write().take()
        {
            tracing::debug!(provider = new_provider.name(), "ACP model override applied");
            self.provider = new_provider;
        }
    }

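    /// Waits for the next loop event: a channel message, shutdown signal,
    /// reload notification, scheduled or user-loop task, or file change.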
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            () = async {
                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.runtime.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }

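    /// Converts a raw channel message into prompt text plus image parts:
    /// audio attachments are transcribed via STT when a provider is
    /// configured, and size-checked image attachments become
    /// `MessagePart::Image` values.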
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.runtime.providers.stt.is_some(),
            "resolve_message attachments"
        );

        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.runtime.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }

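    /// Starts a new turn: assigns the turn id, installs a fresh cancellation
    /// token, and resets per-turn security and request counters.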
    fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
        let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
        self.runtime.debug.iteration_counter += 1;
        let cancel_token = CancellationToken::new();
        self.runtime.lifecycle.cancel_token = cancel_token.clone();
        self.services.security.user_provided_urls.write().clear();
        self.runtime.lifecycle.turn_llm_requests = 0;

        let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
        turn::Turn::new(context, input)
    }

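    /// Finalizes a turn: flushes its timing metrics and clears the turn intent.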
    fn end_turn(&mut self, turn: turn::Turn) {
        self.runtime.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
        self.services.session.current_turn_intent = None;
    }

    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
    )]
    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        let input = turn::TurnInput::new(text, image_parts);
        let mut t = self.begin_turn(input);

        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
        tracing::Span::current().record("turn_id", t.id().0);
        self.runtime
            .debug
            .start_iteration_span(turn_idx, t.input.text.trim());

        let result = self.process_user_message_inner(&mut t).await;

        let span_status = if result.is_ok() {
            crate::debug_dump::trace::SpanStatus::Ok
        } else {
            crate::debug_dump::trace::SpanStatus::Error {
                message: "iteration failed".to_owned(),
            }
        };
        self.runtime.debug.end_iteration_span(turn_idx, span_status);

        self.end_turn(t);
        result
    }

    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        self.reap_background_tasks_and_update_metrics();

        self.drain_background_completions();

        self.wire_cancel_bridge(turn.cancel_token());

        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        if self.services.security.vigil.is_some() {
            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
            self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
        }

        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle_guarded(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let merged_text = self.build_user_message_text_with_bg_completions(&text);
        let user_msg = self.build_user_message(&merged_text, image_parts);

        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.services
                .security
                .user_provided_urls
                .write()
                .extend(urls);
        }

        self.services.memory.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        tracing::debug!("turn timing: process_response start");
        let turn_had_error = if let Err(e) = self.process_response().await {
            self.services.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");

            if e.is_no_providers() {
                self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
                let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
                tracing::warn!(
                    backoff_secs,
                    "no providers available; backing off before next turn"
                );
                tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
            }

            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
            true
        } else {
            self.services.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            self.maybe_update_magic_docs();
            self.maybe_spawn_promotion_scan();
            false
        };
        tracing::debug!("turn timing: process_response done");

        #[cfg(feature = "self-check")]
        if let Some(pipeline) = self.services.quality.clone() {
            self.run_self_check_for_turn(pipeline, turn.id().0).await;
        }
        let _ = self.channel.flush_chunks().await;

        self.maybe_fire_completion_notification(turn, turn_had_error);

        turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }

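    /// Bridges the global cancel signal to this turn's cancellation token,
    /// aborting any bridge task left over from a previous turn.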
    fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
        let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
        let token = turn_token.clone();
        self.runtime.lifecycle.cancel_token = turn_token.clone();
        if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.runtime.lifecycle.cancel_bridge_handle =
            Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
                std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
                move || async move {
                    signal.notified().await;
                    token.cancel();
                },
            ));
    }

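    /// Reaps finished background tasks and publishes supervisor and background
    /// shell-run metrics; optionally aborts enrichment tasks when configured.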
    fn reap_background_tasks_and_update_metrics(&mut self) {
        let bg_signal = self.runtime.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.services.memory.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
        self.update_metrics(|m| {
            m.bg_inflight = snap.inflight as u64;
            m.bg_dropped = snap.total_dropped();
            m.bg_completed = snap.total_completed();
            m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
            m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
        });

        if self.runtime.lifecycle.shell_executor_handle.is_some() {
            let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
                .runtime
                .lifecycle
                .shell_executor_handle
                .as_ref()
                .map(|e| e.background_runs_snapshot())
                .unwrap_or_default()
                .into_iter()
                .map(|s| crate::metrics::ShellBackgroundRunRow {
                    run_id: truncate_shell_run_id(&s.run_id),
                    command: truncate_shell_command(&s.command),
                    elapsed_secs: s.elapsed_ms / 1000,
                })
                .collect();
            self.update_metrics(|m| {
                m.shell_background_runs = shell_rows;
            });
        }

        if self
            .runtime
            .config
            .supervisor_config
            .abort_enrichment_on_turn
        {
            self.runtime
                .lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }
    }

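    /// Fires the turn-complete notification and any configured `turn_complete`
    /// hooks, both gated by the notifier's own filtering.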
    fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
        let snap = turn.metrics_snapshot().timings.clone();
        let duration_ms = snap
            .prepare_context_ms
            .saturating_add(snap.llm_chat_ms)
            .saturating_add(snap.tool_exec_ms);
        let summary = crate::notifications::TurnSummary {
            duration_ms,
            preview: self.last_assistant_preview(160),
            tool_calls: 0,
            llm_requests: self.runtime.lifecycle.turn_llm_requests,
            exit_status: if is_error {
                crate::notifications::TurnExitStatus::Error
            } else {
                crate::notifications::TurnExitStatus::Success
            },
        };

        let gate_ok = self
            .runtime
            .lifecycle
            .notifier
            .as_ref()
            .is_none_or(|n| n.should_fire(&summary));

        if let Some(ref notifier) = self.runtime.lifecycle.notifier
            && gate_ok
        {
            notifier.fire(&summary);
        }

        let hooks = self.services.session.hooks_config.turn_complete.clone();
        if !hooks.is_empty() && gate_ok {
            let mut env = std::collections::HashMap::new();
            env.insert(
                "ZEPH_TURN_DURATION_MS".to_owned(),
                summary.duration_ms.to_string(),
            );
            env.insert(
                "ZEPH_TURN_STATUS".to_owned(),
                if is_error { "error" } else { "success" }.to_owned(),
            );
            env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
            env.insert(
                "ZEPH_TURN_LLM_REQUESTS".to_owned(),
                summary.llm_requests.to_string(),
            );
            let _span = tracing::info_span!("core.agent.turn_hooks").entered();
            let _accepted = self.runtime.lifecycle.supervisor.spawn(
                agent_supervisor::TaskClass::Telemetry,
                "turn-complete-hooks",
                async move {
                    let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
                    if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
                        tracing::warn!(error = %e, "turn_complete hook failed");
                    }
                },
            );
        }
    }

    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        if let Some(ref guardrail) = self.services.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        #[cfg(feature = "classifiers")]
        if self.services.security.sanitizer.scan_user_input() {
            match self
                .services
                .security
                .sanitizer
                .classify_injection(trimmed)
                .await
            {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }

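    /// Runs context preparation under a timeout, skipping it entirely while
    /// still inside the no-providers backoff window.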
    async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
        let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
        let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;

        let providers_recently_failed = self
            .runtime
            .lifecycle
            .last_no_providers_at
            .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);

        if providers_recently_failed {
            tracing::warn!(
                backoff_secs,
                "skipping context preparation: providers were unavailable on last turn"
            );
            return;
        }

        let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
        match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
        {
            Ok(()) => {}
            Err(_elapsed) => {
                tracing::warn!(
                    timeout_secs = prep_timeout_secs,
                    "context preparation timed out; proceeding with degraded context"
                );
            }
        }
    }

    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        self.services.mcp.pruning_cache.reset();

        let conv_id = self.services.memory.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.services.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        {
            self.services.focus.tick();

            let sidequest_should_fire = self.services.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        {
            let cfg = &self.services.memory.extraction.graph_config.experience;
            if cfg.enabled
                && cfg.evolution_sweep_enabled
                && cfg.evolution_sweep_interval > 0
                && self
                    .services
                    .sidequest
                    .turn_counter
                    .checked_rem(cfg.evolution_sweep_interval as u64)
                    == Some(0)
                && let Some(memory) = self.services.memory.persistence.memory.as_ref()
                && let (Some(exp), Some(graph)) =
                    (memory.experience.as_ref(), memory.graph_store.as_ref())
            {
                let exp = std::sync::Arc::clone(exp);
                let graph = std::sync::Arc::clone(graph);
                let threshold = cfg.confidence_prune_threshold;
                let turn = self.services.sidequest.turn_counter;
                let accepted = self.runtime.lifecycle.supervisor.spawn(
                    agent_supervisor::TaskClass::Telemetry,
                    "experience-sweep",
                    async move {
                        match exp.evolution_sweep(graph.as_ref(), threshold).await {
                            Ok(stats) => tracing::info!(
                                turn,
                                self_loops = stats.pruned_self_loops,
                                low_confidence = stats.pruned_low_confidence,
                                "evolution sweep complete",
                            ),
                            Err(e) => tracing::warn!(
                                turn,
                                error = %e,
                                "evolution sweep failed",
                            ),
                        }
                    },
                );
                if !accepted {
                    tracing::warn!(
                        turn = self.services.sidequest.turn_counter,
                        "experience-sweep dropped (telemetry class at capacity)",
                    );
                }
            }
        }

        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        self.maybe_time_based_microcompact();

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);

        self.services.learning_engine.reset_reflection();
    }

    fn build_user_message(
        &mut self,
        text: &str,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Message {
        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
        all_image_parts.extend(image_parts);

        if !all_image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text {
                text: text.to_owned(),
            }];
            parts.extend(all_image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !all_image_parts.is_empty() {
                tracing::warn!(
                    count = all_image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.to_owned(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        }
    }

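    /// Drains background shell completions into a bounded buffer; when full,
    /// the oldest entry is evicted and the new completion is recorded as a
    /// tombstone noting that its output was dropped.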
    fn drain_background_completions(&mut self) {
        const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;

        let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
            return;
        };
        while let Ok(completion) = rx.try_recv() {
            if self.runtime.lifecycle.pending_background_completions.len()
                >= BACKGROUND_COMPLETION_BUFFER_CAP
            {
                tracing::warn!(
                    run_id = %completion.run_id,
                    "background completion buffer full; dropping run result"
                );
                // Build the tombstone text before moving `run_id` into the struct.
                let output = format!(
                    "[background result for run {} dropped: buffer overflow]",
                    completion.run_id
                );
                self.runtime
                    .lifecycle
                    .pending_background_completions
                    .pop_front();
                self.runtime
                    .lifecycle
                    .pending_background_completions
                    .push_back(zeph_tools::BackgroundCompletion {
                        run_id: completion.run_id,
                        exit_code: -1,
                        success: false,
                        elapsed_ms: 0,
                        command: completion.command,
                        output,
                    });
            } else {
                self.runtime
                    .lifecycle
                    .pending_background_completions
                    .push_back(completion);
            }
        }
    }

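    /// Prepends any pending background task results to the user's text so the
    /// model sees them at the start of the next turn.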
    fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
        if self
            .runtime
            .lifecycle
            .pending_background_completions
            .is_empty()
        {
            return user_text.to_owned();
        }
        let mut parts = String::new();
        for completion in self
            .runtime
            .lifecycle
            .pending_background_completions
            .drain(..)
        {
            let _ = write!(
                parts,
                "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
                completion.run_id,
                completion.exit_code,
                completion.success,
                completion.elapsed_ms,
                completion.command,
                completion.output,
            );
        }
        parts.push_str(user_text);
        parts
    }

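    /// Polls a foreground sub-agent every 500ms until it reaches a terminal
    /// state, relaying secret-approval requests to the user and streaming
    /// turn-progress status updates along the way.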
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .services
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                    if approved {
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.services
                                .orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }

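    /// Resolves a task-id prefix to a unique full id, or an error message when
    /// the prefix matches zero or several sub-agents.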
    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        let full_ids: Vec<String> = mgr
            .statuses()
            .into_iter()
            .map(|(tid, _)| tid)
            .filter(|tid| tid.starts_with(prefix))
            .collect();
        Some(match full_ids.as_slice() {
            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
            [fid] => Ok(fid.clone()),
            _ => Err(format!(
                "Ambiguous id prefix '{prefix}': matches {} agents",
                full_ids.len()
            )),
        })
    }

    fn handle_agent_list(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let defs = mgr.definitions();
        if defs.is_empty() {
            return Some("No sub-agent definitions found.".into());
        }
        let mut out = String::from("Available sub-agents:\n");
        for d in defs {
            let memory_label = match d.memory {
                Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
                Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
                Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
                None => "",
            };
            if let Some(ref src) = d.source {
                let _ = writeln!(
                    out,
                    "  {}{} — {} ({})",
                    d.name, memory_label, d.description, src
                );
            } else {
                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
            }
        }
        Some(out)
    }

    fn handle_agent_status(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        if statuses.is_empty() {
            return Some("No active sub-agents.".into());
        }
        let mut out = String::from("Active sub-agents:\n");
        for (id, s) in &statuses {
            let state = format!("{:?}", s.state).to_lowercase();
            let elapsed = s.started_at.elapsed().as_secs();
            let _ = writeln!(
                out,
                "  [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                short = &id[..8.min(id.len())],
                t = s.turns_used,
                msg = s.last_message.as_deref().unwrap_or(""),
            );
            if let Some(def) = mgr.agents_def(id)
                && let Some(scope) = def.memory
                && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
            {
                let _ = writeln!(out, "    memory: {}", dir.display());
            }
        }
        Some(out)
    }

    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            let ttl = std::time::Duration::from_mins(5);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }

    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        match mgr.deny_secret(&full_id) {
            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
            Err(e) => Some(format!("Deny failed: {e}")),
        }
    }

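    /// Dispatches a parsed `AgentCommand` to its handler. `Spawn` and
    /// `Mention` share the foreground-spawn path.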
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                self.handle_agent_background(&name, &prompt)
            }
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => self.handle_agent_spawn_foreground(&name, &prompt).await,
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
        }
    }

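    /// Spawns a sub-agent in the background and returns immediately with its
    /// short id; progress is visible via `AgentCommand::Status`.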
    fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(name);
        let cfg = self.services.orchestration.subagent_config.clone();
        let spawn_ctx = self.build_spawn_context(&cfg);
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        match mgr.spawn(
            name,
            prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            spawn_ctx,
        ) {
            Ok(id) => Some(format!(
                "Sub-agent '{name}' started in background (id: {short})",
                short = &id[..8.min(id.len())]
            )),
            Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
        }
    }

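    /// Spawns a sub-agent and blocks the current turn, polling it to
    /// completion and returning its final output.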
    async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(name);
        let cfg = self.services.orchestration.subagent_config.clone();
        let spawn_ctx = self.build_spawn_context(&cfg);
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        let task_id = match mgr.spawn(
            name,
            prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            spawn_ctx,
        ) {
            Ok(id) => id,
            Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
        };
        let short = task_id[..8.min(task_id.len())].to_owned();
        let _ = self
            .channel
            .send(&format!("Sub-agent '{name}' running... (id: {short})"))
            .await;
        let label = format!("Sub-agent '{name}'");
        self.poll_subagent_until_done(&task_id, &label).await
    }

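    /// Cancels the sub-agent whose task id matches `id`, using the same
    /// prefix-resolution rules as approve/deny.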
    fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        match mgr.cancel(&full_id) {
            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
            Err(e) => Some(format!("Cancel failed: {e}")),
        }
    }

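    /// Resumes an existing sub-agent session with a fresh prompt under a new
    /// task id, then polls it to completion in the foreground.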
    async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
        let cfg = self.services.orchestration.subagent_config.clone();
        let def_name = {
            let mgr = self.services.orchestration.subagent_manager.as_ref()?;
            match mgr.def_name_for_resume(id, &cfg) {
                Ok(name) => name,
                Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
            }
        };
        let skills = self.filtered_skills_for(&def_name);
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let mgr = self.services.orchestration.subagent_manager.as_mut()?;
        let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
            Ok(pair) => pair,
            Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
        };
        let short = task_id[..8.min(task_id.len())].to_owned();
        let _ = self
            .channel
            .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
            .await;
        self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
            .await
    }

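    /// Returns the skill bodies allowed for `agent_name` per its definition's
    /// skill filter, or `None` when nothing matches or filtering fails.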
    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
        let reg = self.services.skill.registry.read();
        match zeph_subagent::filter_skills(&reg, &def.skills) {
            Ok(skills) => {
                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
                if bodies.is_empty() {
                    None
                } else {
                    Some(bodies)
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
                None
            }
        }
    }

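    /// Assembles the `SpawnContext` handed to a new sub-agent: recent parent
    /// messages, the parent cancellation token, the active provider name (if
    /// any), the current spawn depth, and the exposed MCP tool ids.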
    fn build_spawn_context(
        &self,
        cfg: &zeph_config::SubAgentConfig,
    ) -> zeph_subagent::SpawnContext {
        zeph_subagent::SpawnContext {
            parent_messages: self.extract_parent_messages(cfg),
            parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
            parent_provider_name: {
                let name = &self.runtime.config.active_provider_name;
                if name.is_empty() {
                    None
                } else {
                    Some(name.clone())
                }
            },
            spawn_depth: self.runtime.config.spawn_depth,
            mcp_tool_names: self.extract_mcp_tool_names(),
        }
    }

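    /// Collects the last `context_window_turns * 2` non-system messages for a
    /// sub-agent, then trims the tail to a 32_000-character budget.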
    fn extract_parent_messages(
        &self,
        config: &zeph_config::SubAgentConfig,
    ) -> Vec<zeph_llm::provider::Message> {
        use zeph_llm::provider::Role;
        if config.context_window_turns == 0 {
            return Vec::new();
        }
        let non_system: Vec<_> = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role != Role::System)
            .cloned()
            .collect();
        let take_count = config.context_window_turns * 2;
        let start = non_system.len().saturating_sub(take_count);
        let mut msgs = non_system[start..].to_vec();

        // Character budget for forwarded history: 128_000 / 4 = 32_000 chars.
        let max_chars = 128_000usize / 4;
        let mut total_chars: usize = 0;
        let mut keep = msgs.len();
        for (i, m) in msgs.iter().enumerate() {
            total_chars += m.content.len();
            if total_chars > max_chars {
                keep = i;
                break;
            }
        }
        if keep < msgs.len() {
            tracing::info!(
                kept = keep,
                requested = config.context_window_turns * 2,
                "[subagent] truncated parent history from {} to {} messages due to character budget",
                config.context_window_turns * 2,
                keep
            );
            msgs.truncate(keep);
        }
        msgs
    }

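    /// Lists tool ids carrying the `mcp_` prefix so sub-agents know which MCP
    /// tools the parent exposes.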
    fn extract_mcp_tool_names(&self) -> Vec<String> {
        self.tool_executor
            .tool_definitions_erased()
            .into_iter()
            .filter(|t| t.id.starts_with("mcp_"))
            .map(|t| t.id.to_string())
            .collect()
    }

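    /// Classifies where a skill came from: inside the managed directory it is
    /// `Bundled` only when a `.bundled` marker is backed by the bundled-skill
    /// allowlist (otherwise `Hub`); everything outside is `Local`.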
    fn classify_source_kind(
        skill_dir: &std::path::Path,
        managed_dir: Option<&std::path::PathBuf>,
        bundled_names: &std::collections::HashSet<String>,
    ) -> zeph_memory::store::SourceKind {
        if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
            let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
            let has_marker = skill_dir.join(".bundled").exists();
            if has_marker && bundled_names.contains(skill_name) {
                zeph_memory::store::SourceKind::Bundled
            } else {
                if has_marker {
                    tracing::warn!(
                        skill = %skill_name,
                        "skill has .bundled marker but is not in the bundled skill \
                         allowlist — classifying as Hub"
                    );
                }
                zeph_memory::store::SourceKind::Hub
            }
        } else {
            zeph_memory::store::SourceKind::Local
        }
    }

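    /// Recomputes each reloaded skill's hash and source kind on the blocking
    /// pool, then upserts its trust record: a hash mismatch drops the skill to
    /// the configured mismatch level, and when only the source kind changed
    /// the stored level is kept unless its severity exceeds the default for
    /// the new kind.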
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        let memory = self.services.memory.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.services.skill.trust_config.clone();
        let managed_dir = self.services.skill.managed_dir.clone();
        let bundled_names: std::collections::HashSet<String> =
            zeph_skills::bundled_skill_names().into_iter().collect();
        for meta in all_meta {
            let skill_dir = meta.skill_dir.clone();
            let managed_dir_ref = managed_dir.clone();
            let bundled_names_ref = bundled_names.clone();
            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
                tokio::task::spawn_blocking(move || {
                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
                    let source_kind = Self::classify_source_kind(
                        &skill_dir,
                        managed_dir_ref.as_ref(),
                        &bundled_names_ref,
                    );
                    Some((hash, source_kind))
                })
                .await
                .unwrap_or(None);

            let Some((current_hash, source_kind)) = fs_result else {
                tracing::warn!("failed to compute hash for '{}'", meta.name);
                continue;
            };
            let initial_level = match source_kind {
                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
                    &trust_cfg.local_level
                }
            };
            let existing = memory
                .sqlite()
                .load_skill_trust(&meta.name)
                .await
                .ok()
                .flatten();
            let trust_level_str = if let Some(ref row) = existing {
                if row.blake3_hash != current_hash {
                    trust_cfg.hash_mismatch_level.to_string()
                } else if row.source_kind != source_kind {
                    let stored = row
                        .trust_level
                        .parse::<zeph_common::SkillTrustLevel>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                skill = %meta.name,
                                raw = %row.trust_level,
                                "unrecognised trust_level in DB, treating as quarantined"
                            );
                            zeph_common::SkillTrustLevel::Quarantined
                        });
                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
                        row.trust_level.clone()
                    } else {
                        initial_level.to_string()
                    }
                } else {
                    row.trust_level.clone()
                }
            } else {
                initial_level.to_string()
            };
            let source_path = meta.skill_dir.to_str();
            if let Err(e) = memory
                .sqlite()
                .upsert_skill_trust(
                    &meta.name,
                    &trust_level_str,
                    source_kind,
                    None,
                    source_path,
                    &current_hash,
                )
                .await
            {
                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
            }
        }
    }

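    /// Rebuilds or syncs the skill matcher after a registry reload. A Qdrant
    /// backend is synced in place, reporting progress over the status channel;
    /// any other state triggers a full in-memory rebuild. Embedding calls are
    /// wrapped in the configured timeout, and the BM25 index is rebuilt when
    /// hybrid search is enabled.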
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout =
            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        let needs_inmemory_rebuild = !self
            .services
            .skill
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.services.skill.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
                self.services.session.status_tx.clone().map(
                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
                        Box::new(move |completed, total| {
                            let msg = format!("Syncing skills: {completed}/{total}");
                            let _ = tx.send(msg);
                        })
                    },
                );
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.services.skill.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.services.skill.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.services.skill.rebuild_bm25(&descs);
        }
    }

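    /// Hot-reloads skills from disk (plus plugin-supplied directories) and,
    /// only if the registry fingerprint changed, refreshes trust records, the
    /// matcher, and the skills section of the system prompt.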
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.services.skill.fingerprint();
        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.services.skill.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.services.skill.skill_paths.clone()
        };
        self.services.skill.registry.write().reload(&reload_paths);
        if self.services.skill.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        let all_meta = self
            .services
            .skill
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        let all_skills: Vec<Skill> = {
            let reg = self.services.skill.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt =
            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.services
            .skill
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.services.skill.registry.read().all_meta().len()
        );
    }

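    /// Drains queued reload notifications and reloads instruction files,
    /// logging which sources were added or removed.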
    fn reload_instructions(&mut self) {
        if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
            while rx.try_recv().is_ok() {}
        }
        let Some(ref state) = self.runtime.instructions.reload_state else {
            return;
        };
        let new_blocks = crate::instructions::load_instructions(
            &state.base_dir,
            &state.provider_kinds,
            &state.explicit_files,
            state.auto_detect,
        );
        let old_sources: std::collections::HashSet<_> = self
            .runtime
            .instructions
            .blocks
            .iter()
            .map(|b| &b.source)
            .collect();
        let new_sources: std::collections::HashSet<_> =
            new_blocks.iter().map(|b| &b.source).collect();
        for added in new_sources.difference(&old_sources) {
            tracing::info!(path = %added.display(), "instruction file added");
        }
        for removed in old_sources.difference(&new_sources) {
            tracing::info!(path = %removed.display(), "instruction file removed");
        }
        tracing::info!(
            old_count = self.runtime.instructions.blocks.len(),
            new_count = new_blocks.len(),
            "reloaded instruction files"
        );
        self.runtime.instructions.blocks = new_blocks;
    }

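    /// Applies a hot-reloaded config: security and timeout settings, memory
    /// and skill tuning, the context budget, RPE routing, compaction
    /// thresholds, store routing, and repo-map settings.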
    fn reload_config(&mut self) {
        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        self.runtime.config.security = config.security;
        self.runtime.config.timeouts = config.timeouts;
        self.runtime.config.redact_credentials = config.memory.redact_credentials;
        self.services.memory.persistence.history_limit = config.memory.history_limit;
        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.services.memory.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.services.skill.min_injection_score = config.skills.min_injection_score;
        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.services.skill.hybrid_search = config.skills.hybrid_search;
        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
        self.services.skill.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.services.skill.generation_provider_name);
        self.services.skill.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                if self.services.memory.extraction.rpe_router.is_none() {
                    self.services.memory.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.services.memory.extraction.rpe_router = None;
            }
            self.services.memory.extraction.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.services
            .memory
            .persistence
            .cross_session_score_threshold = config.memory.cross_session_score_threshold;

        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
        self.services.index.repo_map_ttl =
            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }

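    /// Loads the config from `path` and merges plugin overlays. Returns
    /// `None` (keeping the previous runtime state) when loading or overlay
    /// merging fails.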
    fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
        let mut config = match Config::load(path) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!("config reload failed: {e:#}");
                return None;
            }
        };

        let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
            None
        } else {
            match zeph_plugins::apply_plugin_config_overlays(
                &mut config,
                &self.runtime.lifecycle.plugins_dir,
            ) {
                Ok(o) => Some(o),
                Err(e) => {
                    tracing::warn!(
                        "plugin overlay merge failed during reload: {e:#}; \
                         keeping previous runtime state"
                    );
                    return None;
                }
            }
        };

        if let Some(ref overlay) = new_overlay {
            self.warn_on_shell_overlay_divergence(overlay, &config);
        }
        Some(config)
    }

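    /// Compares the shell command lists against the startup snapshot:
    /// `blocked_commands` is rebuilt live via the policy handle, while an
    /// `allowed_commands` change only emits a restart-required warning because
    /// sandbox paths are computed at startup.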
    fn warn_on_shell_overlay_divergence(
        &self,
        new_overlay: &zeph_plugins::ResolvedOverlay,
        config: &Config,
    ) {
        let new_blocked: Vec<String> = {
            let mut v = config.tools.shell.blocked_commands.clone();
            v.sort();
            v
        };
        let new_allowed: Vec<String> = {
            let mut v = config.tools.shell.allowed_commands.clone();
            v.sort();
            v
        };

        let startup = &self.runtime.lifecycle.startup_shell_overlay;
        let blocked_changed = new_blocked != startup.blocked;
        let allowed_changed = new_allowed != startup.allowed;

        if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
            h.rebuild(&config.tools.shell);
            tracing::info!(
                blocked_count = h.snapshot_blocked().len(),
                "shell blocked_commands rebuilt from hot-reload"
            );
        }

        if allowed_changed {
            let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
                       for sandbox path recomputation (blocked_commands was rebuilt live)";
            tracing::warn!("{msg}");
            if let Some(ref tx) = self.services.session.status_tx {
                let _ = tx.send(msg.to_owned());
            }
        }

        let _ = new_overlay;
    }

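    /// Drives the SideQuest eviction cycle: warns when it overlaps a
    /// non-Reactive pruning strategy, skips (and drops any pending result)
    /// while a focus session is active, then applies a finished background
    /// pass and schedules the next one.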
    fn maybe_sidequest_eviction(&mut self) {
        if self.services.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                     consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        if self.services.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active");
            self.services.compression.pending_sidequest_result = None;
            return;
        }

        self.sidequest_apply_pending();

        self.sidequest_schedule_next();
    }

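    /// Applies a completed background eviction pass, if any: frees evicted
    /// tool-output tokens, recomputes prompt tokens, and marks the turn as
    /// compacted. A still-running task is dropped here and replaced when the
    /// caller schedules the next pass.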
    fn sidequest_apply_pending(&mut self) {
        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
            return;
        };
        let result = match handle.try_join() {
            Ok(result) => result,
            Err(_handle) => {
                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
                return;
            }
        };
        match result {
            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
                let freed = self.services.sidequest.apply_eviction(
                    &mut self.msg.messages,
                    &evicted_indices,
                    &self.runtime.metrics.token_counter,
                );
                if freed > 0 {
                    self.recompute_prompt_tokens();
                    self.context_manager.compaction =
                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
                            cooldown: 0,
                        };
                    tracing::info!(
                        freed_tokens = freed,
                        evicted_cursors = evicted_indices.len(),
                        pass = self.services.sidequest.passes_run,
                        "sidequest eviction complete"
                    );
                    if let Some(ref d) = self.runtime.debug.debug_dumper {
                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                    }
                    if let Some(ref tx) = self.services.session.status_tx {
                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                    }
                } else if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
            Ok(None | Some(_)) => {
                tracing::debug!("sidequest: pending result: no cursors to evict");
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
            Err(e) => {
                tracing::debug!("sidequest: background task error: {e}");
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
        }
    }

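    /// Spawns the next background eviction pass: asks the summary (or primary)
    /// provider, under a 5s timeout, which tool-output cursors to drop,
    /// extracts the JSON object between the first `{` and last `}` of the
    /// reply, and caps the eviction count at `max_eviction_ratio` of the
    /// eligible cursors.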
    fn sidequest_schedule_next(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        self.services
            .sidequest
            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);

        if self.services.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        let prompt = self.services.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
        let n_cursors = self.services.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        let eviction_future = async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        };
        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
            std::sync::Arc::from("agent.sidequest.eviction"),
            move || eviction_future,
        );
        self.services.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.services.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }

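    /// Wraps the MCP manager, when present, in a dispatch adapter usable by
    /// hook execution.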
    fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
        self.services
            .mcp
            .manager
            .as_ref()
            .map(|m| McpManagerDispatch(Arc::clone(m)))
    }

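    /// Detects a changed working directory, updates the cached cwd and env
    /// context, and fires any configured `cwd_changed` hooks with
    /// `ZEPH_OLD_CWD`/`ZEPH_NEW_CWD` set.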
    pub(crate) async fn check_cwd_changed(&mut self) {
        let current = match std::env::current_dir() {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
                return;
            }
        };
        if current == self.runtime.lifecycle.last_known_cwd {
            return;
        }
        let old_cwd =
            std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
        self.services.session.env_context.working_dir = current.display().to_string();

        tracing::info!(
            old = %old_cwd.display(),
            new = %current.display(),
            "working directory changed"
        );

        let _ = self
            .channel
            .send_status("Working directory changed\u{2026}")
            .await;

        let hooks = self.services.session.hooks_config.cwd_changed.clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
            let dispatch = self.mcp_dispatch();
            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
                .as_ref()
                .map(|d| d as &dyn zeph_subagent::McpDispatch);
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
                tracing::warn!(error = %e, "CwdChanged hook failed");
            }
        }

        let _ = self.channel.send_status("").await;
    }

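    /// Fires configured file-change hooks for a watcher event, passing the
    /// path via `ZEPH_CHANGED_PATH`.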
    pub(crate) async fn handle_file_changed(
        &mut self,
        event: crate::file_watcher::FileChangedEvent,
    ) {
        tracing::info!(path = %event.path.display(), "file changed");

        let _ = self
            .channel
            .send_status("Running file-change hook\u{2026}")
            .await;

        let hooks = self
            .services
            .session
            .hooks_config
            .file_changed_hooks
            .clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert(
                "ZEPH_CHANGED_PATH".to_owned(),
                event.path.display().to_string(),
            );
            let dispatch = self.mcp_dispatch();
            let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
                .as_ref()
                .map(|d| d as &dyn zeph_subagent::McpDispatch);
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
                tracing::warn!(error = %e, "FileChanged hook failed");
            }
        }

        let _ = self.channel.send_status("").await;
    }

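    /// Enqueues a background promotion scan over the most recent 200 memory
    /// entries: clusters them via the promotion engine and promotes each
    /// candidate, logging failures without aborting the batch.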
    pub(super) fn maybe_spawn_promotion_scan(&mut self) {
        let Some(engine) = self.services.promotion_engine.clone() else {
            return;
        };

        let Some(memory) = self.services.memory.persistence.memory.clone() else {
            return;
        };

        let promotion_window = 200usize;

        let accepted = self.runtime.lifecycle.supervisor.spawn(
            agent_supervisor::TaskClass::Enrichment,
            "compression_spectrum.promotion_scan",
            async move {
                let span = tracing::info_span!("memory.compression.promote.background");
                let _enter = span.enter();

                let window = match memory.load_promotion_window(promotion_window).await {
                    Ok(w) => w,
                    Err(e) => {
                        tracing::warn!(error = %e, "promotion scan: failed to load window");
                        return;
                    }
                };

                if window.is_empty() {
                    return;
                }

                let candidates = match engine.scan(&window).await {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::warn!(error = %e, "promotion scan: clustering failed");
                        return;
                    }
                };

                for candidate in &candidates {
                    if let Err(e) = engine.promote(candidate).await {
                        tracing::warn!(
                            signature = %candidate.signature,
                            error = %e,
                            "promotion scan: promote failed"
                        );
                    }
                }

                tracing::info!(candidates = candidates.len(), "promotion scan: complete");
            },
        );

        if accepted {
            tracing::debug!("compression_spectrum: promotion scan task enqueued");
        }
    }
}
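
/// Adapter exposing `zeph_mcp::McpManager` through the
/// `zeph_subagent::McpDispatch` trait; tool-call results are flattened to a
/// JSON array of their text parts.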
struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);

impl zeph_subagent::McpDispatch for McpManagerDispatch {
    fn call_tool<'a>(
        &'a self,
        server: &'a str,
        tool: &'a str,
        args: serde_json::Value,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
    > {
        Box::pin(async move {
            self.0
                .call_tool(server, tool, args)
                .await
                .map(|result| {
                    let texts: Vec<serde_json::Value> = result
                        .content
                        .iter()
                        .filter_map(|c| {
                            if let rmcp::model::RawContent::Text(t) = &c.raw {
                                Some(serde_json::Value::String(t.text.clone()))
                            } else {
                                None
                            }
                        })
                        .collect();
                    serde_json::Value::Array(texts)
                })
                .map_err(|e| e.to_string())
        })
    }
}

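/// Resolves once the shutdown flag becomes `true`. If the sender is dropped
/// first, this pends forever, so callers in a `select!` simply never take the
/// shutdown branch. A minimal usage sketch (names are illustrative, not from
/// this crate):
///
/// ```ignore
/// tokio::select! {
///     () = shutdown_signal(&mut shutdown_rx) => break,
///     event = next_event() => handle(event),
/// }
/// ```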
pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
    while !*rx.borrow_and_update() {
        if rx.changed().await.is_err() {
            std::future::pending::<()>().await;
        }
    }
}

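/// Receives from an optional channel. A closed channel is set to `None` and,
/// like an absent one, pends forever; this makes a disarmed branch inert
/// inside `select!`. A hedged sketch (receiver and handler names are
/// illustrative):
///
/// ```ignore
/// tokio::select! {
///     Some(msg) = recv_optional(&mut maybe_rx) => handle(msg),
///     () = shutdown_signal(&mut shutdown_rx) => {}
/// }
/// ```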
pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
    match rx {
        Some(inner) => {
            if let Some(v) = inner.recv().await {
                Some(v)
            } else {
                *rx = None;
                std::future::pending().await
            }
        }
        None => std::future::pending().await,
    }
}

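/// Truncates a shell command for display: inputs over 80 bytes are cut at a
/// char boundary at or before byte 79 and suffixed with `…`.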
fn truncate_shell_command(cmd: &str) -> String {
    if cmd.len() <= 80 {
        return cmd.to_owned();
    }
    let end = cmd.floor_char_boundary(79);
    format!("{}…", &cmd[..end])
}

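/// First eight characters of a shell run id, for compact display.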
fn truncate_shell_run_id(id: &str) -> String {
    id.chars().take(8).collect()
}

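/// Resolves the context budget in tokens: an explicit
/// `memory.context_budget_tokens` wins; with `auto_budget` set and no explicit
/// value, the provider's context window is used; anything that still resolves
/// to 0 falls back to 128_000 tokens.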
pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
    let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
        if let Some(ctx_size) = provider.context_window() {
            tracing::info!(
                model_context = ctx_size,
                "auto-configured context budget on reload"
            );
            ctx_size
        } else {
            0
        }
    } else {
        config.memory.context_budget_tokens
    };
    if tokens == 0 {
        tracing::warn!(
            "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
        );
        128_000
    } else {
        tokens
    }
}

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::agent_tests;

#[cfg(test)]
mod test_stubs {
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

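    /// Command stub whose handler always fails; used to exercise the slash
    /// command error path in tests.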
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<
            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
        > {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}