1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod index;
28mod learning;
29pub(crate) mod learning_engine;
30mod log_commands;
31mod loop_event;
32mod lsp_commands;
33mod magic_docs;
34mod mcp;
35pub(crate) mod memcot;
36mod message_queue;
37mod microcompact;
38mod model_commands;
39mod persistence;
40#[cfg(feature = "scheduler")]
41mod plan;
42mod policy_commands;
43mod provider_cmd;
44mod quality_hook;
45pub(crate) mod rate_limiter;
46#[cfg(feature = "scheduler")]
47mod scheduler_commands;
48#[cfg(feature = "scheduler")]
49mod scheduler_loop;
50mod scope_commands;
51pub mod session_config;
52mod session_digest;
53pub mod shadow_sentinel;
54pub(crate) mod sidequest;
55mod skill_management;
56pub mod slash_commands;
57pub mod speculative;
58pub(crate) mod state;
59pub(crate) mod task_injection;
60pub(crate) mod tool_execution;
61pub(crate) mod tool_orchestrator;
62pub mod trajectory;
63mod trajectory_commands;
64mod trust_commands;
65pub mod turn;
66mod utils;
67pub(crate) mod vigil;
68
69use std::collections::{HashMap, VecDeque};
70use std::fmt::Write as _;
71use std::sync::Arc;
72
73use parking_lot::RwLock;
74
75use tokio::sync::{mpsc, watch};
76use tokio_util::sync::CancellationToken;
77use zeph_llm::any::AnyProvider;
78use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
79use zeph_memory::TokenCounter;
80use zeph_memory::semantic::SemanticMemory;
81use zeph_skills::loader::Skill;
82use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
83use zeph_skills::prompt::format_skills_prompt;
84use zeph_skills::registry::SkillRegistry;
85use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
86
87use crate::channel::Channel;
88use crate::config::Config;
89use crate::context::{ContextBudget, build_system_prompt};
90use zeph_common::text::estimate_tokens;
91
92use loop_event::LoopEvent;
93use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
94use state::MessageState;
95
// NOTE(review): by its name this is the window size used for doom-loop detection;
// the consumer is not in this part of the file — confirm the exact semantics there.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Re-exported so code inside this crate can reach the prefix through this module.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prepended to a scheduled task prompt before it is resolved as an agent message.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence appended after the body of a formatted tool-output block.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps `body` in a fenced block labelled with `tool_name`.
///
/// The result is the label line `[tool output: <tool_name>]`, a newline, an opening
/// triple-backtick fence on its own line, the body, and finally [`TOOL_OUTPUT_SUFFIX`].
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const LABEL_OPEN: &str = "[tool output: ";
    const LABEL_CLOSE: &str = "]\n```\n";

    let pieces = [LABEL_OPEN, tool_name, LABEL_CLOSE, body, TOOL_OUTPUT_SUFFIX];

    // Size the buffer once so the pushes below never reallocate.
    let total: usize = pieces.iter().map(|piece| piece.len()).sum();
    let mut out = String::with_capacity(total);
    for piece in pieces {
        out.push_str(piece);
    }
    out
}
118
/// Agent state used by the main loop: LLM providers, the I/O channel, the tool
/// executor, and the grouped conversation / service / runtime state.
pub struct Agent<C: Channel> {
    /// Primary LLM provider; replaced by `apply_provider_override` when an override is pending.
    provider: AnyProvider,
    /// Second provider handle; `Agent::new` initialises it as a clone of `provider`.
    /// NOTE(review): by its name it serves embedding requests — usage is outside this section.
    embedding_provider: AnyProvider,
    /// Channel used to receive incoming messages and to send replies and status updates.
    channel: C,
    /// Type-erased tool executor shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation messages, the queued-input deque and deferred persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager; among other things it tracks turns since the last hard compaction.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestration state (details live in the `tool_orchestrator` module).
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Grouped subsystem state (memory, skills, MCP, security, orchestration, ...).
    pub(super) services: state::Services,

    /// Grouped runtime state (config, lifecycle, providers, metrics, debug, instructions).
    pub(super) runtime: state::AgentRuntime,
}
173
/// Control-flow decision produced by `apply_dispatch_result` for the main loop in `run`.
enum DispatchFlow {
    /// Leave the main loop (the command requested exit).
    Break,
    /// The command was handled; skip the rest of this iteration.
    Continue,
    /// No registry handled the input; keep processing it in this iteration.
    Fallthrough,
}
183
184impl<C: Channel> Agent<C> {
185 #[must_use]
209 pub fn new(
210 provider: AnyProvider,
211 channel: C,
212 registry: SkillRegistry,
213 matcher: Option<SkillMatcherBackend>,
214 max_active_skills: usize,
215 tool_executor: impl ToolExecutor + 'static,
216 ) -> Self {
217 let registry = Arc::new(RwLock::new(registry));
218 let embedding_provider = provider.clone();
219 Self::new_with_registry_arc(
220 provider,
221 embedding_provider,
222 channel,
223 registry,
224 matcher,
225 max_active_skills,
226 tool_executor,
227 )
228 }
229
230 #[must_use]
237 pub fn new_with_registry_arc(
238 provider: AnyProvider,
239 embedding_provider: AnyProvider,
240 channel: C,
241 registry: Arc<RwLock<SkillRegistry>>,
242 matcher: Option<SkillMatcherBackend>,
243 max_active_skills: usize,
244 tool_executor: impl ToolExecutor + 'static,
245 ) -> Self {
246 use state::{
247 AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
248 InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
249 OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
250 SessionState, SkillState, ToolState,
251 };
252
253 debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
254 let all_skills: Vec<Skill> = {
255 let reg = registry.read();
256 reg.all_meta()
257 .iter()
258 .filter_map(|m| reg.skill(&m.name).ok())
259 .collect()
260 };
261 let empty_trust = HashMap::new();
262 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
263 let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
264 let system_prompt = build_system_prompt(&skills_prompt, None);
265 tracing::debug!(len = system_prompt.len(), "initial system prompt built");
266 tracing::trace!(prompt = %system_prompt, "full system prompt");
267
268 let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
269 let token_counter = Arc::new(TokenCounter::new());
270
271 let services = Services {
272 memory: MemoryState::default(),
273 skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
274 learning_engine: learning_engine::LearningEngine::new(),
275 feedback: FeedbackState::default(),
276 mcp: McpState::default(),
277 index: IndexState::default(),
278 session: SessionState::new(),
279 security: SecurityState::default(),
280 experiments: ExperimentState::new(),
281 compression: CompressionState::default(),
282 orchestration: OrchestrationState::default(),
283 focus: focus::FocusState::default(),
284 sidequest: sidequest::SidequestState::default(),
285 tool_state: ToolState::default(),
286 goal_accounting: None,
287 quality: None,
288 proactive_explorer: None,
289 promotion_engine: None,
290 taco_compressor: None,
291 speculation_engine: None,
292 autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
293 autonomous_registry: crate::goal::AutonomousRegistry::new(),
294 };
295
296 let runtime = AgentRuntime {
297 config: RuntimeConfig::default(),
298 lifecycle: LifecycleState::new(),
299 providers: ProviderState::new(initial_prompt_tokens),
300 metrics: MetricsState::new(token_counter),
301 debug: DebugState::default(),
302 instructions: InstructionState::default(),
303 };
304
305 Self {
306 provider,
307 embedding_provider,
308 channel,
309 tool_executor: Arc::new(tool_executor),
310 msg: MessageState {
311 messages: vec![Message {
312 role: Role::System,
313 content: system_prompt,
314 parts: vec![],
315 metadata: MessageMetadata::default(),
316 }],
317 message_queue: VecDeque::new(),
318 pending_image_parts: Vec::new(),
319 last_persisted_message_id: None,
320 deferred_db_hide_ids: Vec::new(),
321 deferred_db_summaries: Vec::new(),
322 },
323 context_manager: context_manager::ContextManager::new(),
324 tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
325 services,
326 runtime,
327 }
328 }
329
330 #[must_use]
343 pub fn into_channel(self) -> C {
344 self.channel
345 }
346
347 #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
352 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
353 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
354 return vec![];
355 };
356
357 let finished: Vec<String> = mgr
358 .statuses()
359 .into_iter()
360 .filter_map(|(id, status)| {
361 if matches!(
362 status.state,
363 zeph_subagent::SubAgentState::Completed
364 | zeph_subagent::SubAgentState::Failed
365 | zeph_subagent::SubAgentState::Canceled
366 ) {
367 Some(id)
368 } else {
369 None
370 }
371 })
372 .collect();
373
374 let mut results = vec![];
375 for task_id in finished {
376 match mgr.collect(&task_id).await {
377 Ok(result) => results.push((task_id, result)),
378 Err(e) => {
379 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
380 }
381 }
382 }
383 results
384 }
385
386 async fn call_llm_for_session_summary(
395 &self,
396 chat_messages: &[Message],
397 ) -> Option<zeph_memory::StructuredSummary> {
398 let provider = self.resolve_background_provider(
399 &self.services.memory.compaction.shutdown_summary_provider,
400 );
401 let timeout_dur = std::time::Duration::from_secs(
402 self.services
403 .memory
404 .compaction
405 .shutdown_summary_timeout_secs,
406 );
407 match tokio::time::timeout(
408 timeout_dur,
409 provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
410 )
411 .await
412 {
413 Ok(Ok(s)) => Some(s),
414 Ok(Err(e)) => {
415 tracing::warn!(
416 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
417 );
418 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
419 .await
420 }
421 Err(_) => {
422 tracing::warn!(
423 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
424 self.services
425 .memory
426 .compaction
427 .shutdown_summary_timeout_secs
428 );
429 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
430 .await
431 }
432 }
433 }
434
435 async fn plain_text_summary_fallback(
436 &self,
437 provider: &zeph_llm::any::AnyProvider,
438 chat_messages: &[Message],
439 timeout_dur: std::time::Duration,
440 ) -> Option<zeph_memory::StructuredSummary> {
441 match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
442 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
443 summary: plain,
444 key_facts: vec![],
445 entities: vec![],
446 }),
447 Ok(Err(e)) => {
448 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
449 None
450 }
451 Err(_) => {
452 tracing::warn!("shutdown summary: plain LLM fallback timed out");
453 None
454 }
455 }
456 }
457
458 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
463 use zeph_llm::provider::{MessagePart, Role};
464
465 let msgs = &self.msg.messages;
469 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
471 return;
472 };
473 let asst_msg = &msgs[asst_idx];
474 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
475 .parts
476 .iter()
477 .filter_map(|p| {
478 if let MessagePart::ToolUse { id, name, input } = p {
479 Some((id.as_str(), name.as_str(), input))
480 } else {
481 None
482 }
483 })
484 .collect();
485 if tool_use_ids.is_empty() {
486 return;
487 }
488
489 let paired_ids: std::collections::HashSet<&str> = msgs
491 .get(asst_idx + 1..)
492 .into_iter()
493 .flatten()
494 .filter(|m| m.role == Role::User)
495 .flat_map(|m| m.parts.iter())
496 .filter_map(|p| {
497 if let MessagePart::ToolResult { tool_use_id, .. } = p {
498 Some(tool_use_id.as_str())
499 } else {
500 None
501 }
502 })
503 .collect();
504
505 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
506 .iter()
507 .filter(|(id, _, _)| !paired_ids.contains(*id))
508 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
509 id: (*id).to_owned(),
510 name: (*name).to_owned().into(),
511 input: (*input).clone(),
512 })
513 .collect();
514
515 if unpaired.is_empty() {
516 return;
517 }
518
519 tracing::info!(
520 count = unpaired.len(),
521 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
522 );
523 self.persist_cancelled_tool_results(&unpaired).await;
524 }
525
526 async fn maybe_store_shutdown_summary(&mut self) {
536 if !self.services.memory.compaction.shutdown_summary {
537 return;
538 }
539 let Some(memory) = self.services.memory.persistence.memory.clone() else {
540 return;
541 };
542 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
543 return;
544 };
545
546 match memory.has_session_summary(conversation_id).await {
548 Ok(true) => {
549 tracing::debug!("shutdown summary: session already has a summary, skipping");
550 return;
551 }
552 Ok(false) => {}
553 Err(e) => {
554 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
555 return;
556 }
557 }
558
559 let user_count = self
561 .msg
562 .messages
563 .iter()
564 .skip(1)
565 .filter(|m| m.role == Role::User)
566 .count();
567 if user_count
568 < self
569 .services
570 .memory
571 .compaction
572 .shutdown_summary_min_messages
573 {
574 tracing::debug!(
575 user_count,
576 min = self
577 .services
578 .memory
579 .compaction
580 .shutdown_summary_min_messages,
581 "shutdown summary: too few user messages, skipping"
582 );
583 return;
584 }
585
586 let _ = self.channel.send_status("Saving session summary...").await;
588
589 let max = self
591 .services
592 .memory
593 .compaction
594 .shutdown_summary_max_messages;
595 if max == 0 {
596 tracing::debug!("shutdown summary: max_messages=0, skipping");
597 return;
598 }
599 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
600 let slice = if non_system.len() > max {
601 &non_system[non_system.len() - max..]
602 } else {
603 &non_system[..]
604 };
605
606 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
607 .iter()
608 .map(|m| {
609 let role = match m.role {
610 Role::User => "user".to_owned(),
611 Role::Assistant => "assistant".to_owned(),
612 Role::System => "system".to_owned(),
613 };
614 (zeph_memory::MessageId(0), role, m.content.clone())
615 })
616 .collect();
617
618 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
619 let chat_messages = vec![Message {
620 role: Role::User,
621 content: prompt,
622 parts: vec![],
623 metadata: MessageMetadata::default(),
624 }];
625
626 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
627 let _ = self.channel.send_status("").await;
628 return;
629 };
630
631 if let Err(e) = memory
632 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
633 .await
634 {
635 tracing::warn!("shutdown summary: storage failed: {e:#}");
636 } else {
637 tracing::info!(
638 conversation_id = conversation_id.0,
639 "shutdown summary stored"
640 );
641 }
642
643 let _ = self.channel.send_status("").await;
645 }
646
/// Shuts the agent down: persists state, stops background work and writes the
/// end-of-session summaries.
///
/// The steps run in this order:
/// 1. send a "Shutting down..." status (send errors are ignored),
/// 2. save provider router state and, if present, the topology advisor state,
/// 3. shut down sub-agents and MCP servers,
/// 4. record and log compaction / filtering metrics,
/// 5. persist tombstone results for unanswered tool calls,
/// 6. cancel and abort experiment, supervised, compression and learning tasks,
/// 7. store the shutdown summary and the session digest.
///
/// Failures along the way are logged; this method does not return an error.
#[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
pub async fn shutdown(&mut self) {
    let _ = self.channel.send_status("Shutting down...").await;

    self.provider.save_router_state().await;

    // Persisting the topology advisor is best-effort: a failure is only logged.
    if let Some(ref advisor) = self.services.orchestration.topology_advisor
        && let Err(e) = advisor.save()
    {
        tracing::warn!(error = %e, "adaptorch: failed to persist state");
    }

    if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
        mgr.shutdown_all();
    }

    if let Some(ref manager) = self.services.mcp.manager {
        manager.shutdown_all_shared().await;
    }

    // Flush the still-open "turns since last hard compaction" counter into the
    // metrics and reset it so it is not recorded twice.
    if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
        self.update_metrics(|m| {
            m.compaction_turns_after_hard.push(turns);
        });
        self.context_manager
            .set_turns_since_last_hard_compaction(None);
    }

    // Log a final summary of filter savings and hard compactions. The borrowed
    // snapshot `m` is scoped to this block, so it is released before the awaits below.
    if let Some(ref tx) = self.runtime.metrics.metrics_tx {
        let m = tx.borrow();
        if m.filter_applications > 0 {
            #[allow(clippy::cast_precision_loss)]
            let pct = if m.filter_raw_tokens > 0 {
                m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
            } else {
                0.0
            };
            tracing::info!(
                raw_tokens = m.filter_raw_tokens,
                saved_tokens = m.filter_saved_tokens,
                applications = m.filter_applications,
                "tool output filtering saved ~{} tokens ({pct:.0}%)",
                m.filter_saved_tokens,
            );
        }
        if m.compaction_hard_count > 0 {
            tracing::info!(
                hard_compactions = m.compaction_hard_count,
                turns_after_hard = ?m.compaction_turns_after_hard,
                "hard compaction trajectory"
            );
        }
    }

    self.flush_orphaned_tool_use_on_shutdown().await;

    // Stop a running experiment: signal cancellation first, then abort its task.
    if let Some(ref token) = self.services.experiments.cancel {
        token.cancel();
    }
    if let Some(h) = self.services.experiments.handle.take() {
        h.abort();
    }

    self.runtime.lifecycle.supervisor.abort_all();

    // Abort any still-pending compression-related background tasks.
    if let Some(h) = self.services.compression.pending_task_goal.take() {
        h.abort();
    }
    if let Some(h) = self.services.compression.pending_sidequest_result.take() {
        h.abort();
    }
    if let Some(h) = self.services.compression.pending_subgoal.take() {
        h.abort();
    }

    self.services.learning_engine.learning_tasks.abort_all();

    // Yield to the scheduler a few times before writing the summaries.
    // NOTE(review): presumably this lets the aborted tasks observe cancellation
    // and release what they hold — confirm the intent and the count of 4.
    for _ in 0..4 {
        tokio::task::yield_now().await;
    }

    self.maybe_store_shutdown_summary().await;
    self.maybe_store_session_digest().await;

    tracing::info!("agent shutdown complete");
}
766
767 fn refresh_subagent_metrics(&mut self) {
774 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
775 return;
776 };
777 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
778 .statuses()
779 .into_iter()
780 .map(|(id, s)| {
781 let def = mgr.agents_def(&id);
782 crate::metrics::SubAgentMetrics {
783 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
784 id: id.clone(),
785 state: format!("{:?}", s.state).to_lowercase(),
786 turns_used: s.turns_used,
787 max_turns: def.map_or(20, |d| d.permissions.max_turns),
788 background: def.is_some_and(|d| d.permissions.background),
789 elapsed_secs: s.started_at.elapsed().as_secs(),
790 permission_mode: def.map_or_else(String::new, |d| {
791 use zeph_subagent::def::PermissionMode;
792 match d.permissions.permission_mode {
793 PermissionMode::Default => String::new(),
794 PermissionMode::AcceptEdits => "accept_edits".into(),
795 PermissionMode::DontAsk => "dont_ask".into(),
796 PermissionMode::BypassPermissions => "bypass_permissions".into(),
797 PermissionMode::Plan => "plan".into(),
798 }
799 }),
800 transcript_dir: mgr
801 .agent_transcript_dir(&id)
802 .map(|p| p.to_string_lossy().into_owned()),
803 }
804 })
805 .collect();
806 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
807 }
808
809 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
811 let completed = self.poll_subagents().await;
812 for (task_id, result) in completed {
813 let notice = if result.is_empty() {
814 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
815 } else {
816 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
817 };
818 if let Err(e) = self.channel.send(¬ice).await {
819 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
820 }
821 }
822 Ok(())
823 }
824
/// Runs the agent's main loop until the input ends, shutdown is signalled, or a
/// command requests exit.
///
/// Before entering the loop it waits for model warmup (when a warmup receiver is
/// set and not yet ready), restores the channel provider, loads the cached session
/// digest and may send a resume recap.
///
/// Each iteration:
/// 1. applies a pending provider override and refreshes tools, elicitations and
///    sub-agent state,
/// 2. takes the next input — a queued message if there is one, otherwise the next
///    `LoopEvent` (reload/notification events are handled in place and restart
///    the iteration),
/// 3. tries the session/debug command registry, then the agent command registry,
///    then the built-in commands,
/// 4. if none of them handled the input, processes it as a user message.
///
/// # Errors
///
/// Propagates errors from `notify_completed_subagents`, `next_event` and
/// `process_user_message`.
#[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
#[allow(clippy::too_many_lines)]
pub async fn run(&mut self) -> Result<(), error::AgentError>
where
    C: 'static,
{
    // Wait for warmup only when it has not already completed; the receiver is
    // taken, so this happens at most once.
    if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
        && !*rx.borrow()
    {
        let _ = rx.changed().await;
        if !*rx.borrow() {
            tracing::warn!("model warmup did not complete successfully");
        }
    }

    self.restore_channel_provider().await;

    self.load_and_cache_session_digest().await;
    self.maybe_send_resume_recap().await;

    loop {
        self.apply_provider_override();
        self.check_tool_refresh().await;
        self.process_pending_elicitations().await;
        self.refresh_subagent_metrics();
        self.notify_completed_subagents().await?;
        self.drain_channel();

        // Queued messages take priority over waiting for a new event.
        let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
            self.notify_queue_count().await;
            if queued.raw_attachments.is_empty() {
                (queued.text, queued.image_parts)
            } else {
                // Raw attachments still need resolving, so rebuild a channel message.
                let msg = crate::channel::ChannelMessage {
                    text: queued.text,
                    attachments: queued.raw_attachments,
                    is_guest_context: false,
                    is_from_bot: false,
                };
                self.resolve_message(msg).await
            }
        } else {
            match self.next_event().await? {
                // `None` and an explicit shutdown both end the loop.
                None | Some(LoopEvent::Shutdown) => break,
                Some(LoopEvent::SkillReload) => {
                    self.reload_skills().await;
                    continue;
                }
                Some(LoopEvent::InstructionReload) => {
                    self.reload_instructions().await;
                    continue;
                }
                Some(LoopEvent::ConfigReload) => {
                    self.reload_config();
                    continue;
                }
                Some(LoopEvent::UpdateNotification(msg)) => {
                    if let Err(e) = self.channel.send(&msg).await {
                        tracing::warn!("failed to send update notification: {e}");
                    }
                    continue;
                }
                Some(LoopEvent::ExperimentCompleted(msg)) => {
                    // The experiment is over: drop its cancel token and task handle.
                    self.services.experiments.cancel = None;
                    self.services.experiments.handle = None;
                    if let Err(e) = self.channel.send(&msg).await {
                        tracing::warn!("failed to send experiment completion: {e}");
                    }
                    continue;
                }
                Some(LoopEvent::ScheduledTask(prompt)) => {
                    // Scheduled tasks become a synthetic message with a fixed prefix.
                    let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                    let msg = crate::channel::ChannelMessage {
                        text,
                        attachments: Vec::new(),
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.drain_channel();
                    self.resolve_message(msg).await
                }
                Some(LoopEvent::TaskInjected(injection)) => {
                    // Count the tick of the user-defined loop, if it is still active.
                    if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                        ls.iteration += 1;
                        tracing::info!(iteration = ls.iteration, "loop: tick");
                    }
                    let msg = crate::channel::ChannelMessage {
                        text: injection.prompt,
                        attachments: Vec::new(),
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.drain_channel();
                    self.resolve_message(msg).await
                }
                Some(LoopEvent::FileChanged(event)) => {
                    self.handle_file_changed(event).await;
                    continue;
                }
                Some(LoopEvent::AutonomousTick) => {
                    if let Err(e) = self.run_autonomous_turn().await {
                        tracing::warn!(error = %e, "autonomous turn error");
                    }
                    continue;
                }
                Some(LoopEvent::Message(msg)) => {
                    // Remember whether this message came from a guest context.
                    self.services.session.is_guest_context = msg.is_guest_context;
                    self.drain_channel();
                    self.resolve_message(msg).await
                }
            }
        };

        let trimmed = text.trim();

        // For slash commands, record any flagged URLs found in the command text
        // in the set of user-provided URLs.
        if trimmed.starts_with('/') {
            let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
            if !slash_urls.is_empty() {
                self.services
                    .security
                    .user_provided_urls
                    .write()
                    .extend(slash_urls);
            }
        }

        // First pass: session/debug commands. They get access to the real sink,
        // debug and message state, while the agent slot is a `NullAgent`.
        let session_impl = command_context_impls::SessionAccessImpl {
            supports_exit: self.channel.supports_exit(),
        };
        let mut messages_impl = command_context_impls::MessageAccessImpl {
            msg: &mut self.msg,
            tool_state: &mut self.services.tool_state,
            providers: &mut self.runtime.providers,
            metrics: &self.runtime.metrics,
            security: &mut self.services.security,
            tool_orchestrator: &mut self.tool_orchestrator,
        };
        let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
        let mut null_agent = zeph_commands::NullAgent;
        let registry_handled = {
            use zeph_commands::CommandRegistry;
            use zeph_commands::handlers::debug::{
                DebugDumpCommand, DumpFormatCommand, LogCommand,
            };
            use zeph_commands::handlers::help::HelpCommand;
            use zeph_commands::handlers::session::{
                ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
            };

            let mut reg = CommandRegistry::new();
            reg.register(ExitCommand);
            reg.register(QuitCommand);
            reg.register(ClearCommand);
            reg.register(ResetCommand);
            reg.register(ClearQueueCommand);
            reg.register(LogCommand);
            reg.register(DebugDumpCommand);
            reg.register(DumpFormatCommand);
            reg.register(HelpCommand);
            #[cfg(test)]
            reg.register(test_stubs::TestErrorCommand);

            let mut ctx = zeph_commands::CommandContext {
                sink: &mut sink_adapter,
                debug: &mut self.runtime.debug,
                messages: &mut messages_impl,
                session: &session_impl,
                agent: &mut null_agent,
            };
            reg.dispatch(&mut ctx, trimmed).await
        };
        // Remember whether the first registry passed on the input before the
        // result is consumed below.
        let session_reg_missed = registry_handled.is_none();
        match self
            .apply_dispatch_result(registry_handled, trimmed, false)
            .await
        {
            DispatchFlow::Break => break,
            DispatchFlow::Continue => continue,
            DispatchFlow::Fallthrough => {
                // Not a session/debug command: try the agent registry next.
            }
        }

        // Second pass: agent commands. Here `self` fills the agent slot, so the
        // other context slots are null implementations.
        let mut agent_null_debug = command_context_impls::NullDebugAccess;
        let mut agent_null_messages = command_context_impls::NullMessageAccess;
        let agent_null_session = command_context_impls::NullSessionAccess;
        let mut agent_null_sink = zeph_commands::NullSink;
        let agent_result: Option<
            Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
        > = if session_reg_missed {
            use zeph_commands::CommandRegistry;
            use zeph_commands::handlers::{
                acp::AcpCommand,
                agent_cmd::AgentCommand,
                agents_fleet::AgentsFleetCommand,
                compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                experiment::ExperimentCommand,
                goal::GoalCommand,
                loop_cmd::LoopCommand,
                lsp::LspCommand,
                mcp::McpCommand,
                memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                model::{ModelCommand, ProviderCommand},
                plan::PlanCommand,
                plugins::PluginsCommand,
                policy::PolicyCommand,
                scheduler::SchedulerCommand,
                skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                trajectory::{ScopeCommand, TrajectoryCommand},
            };

            let mut agent_reg = CommandRegistry::new();
            agent_reg.register(MemoryCommand);
            agent_reg.register(GraphCommand);
            agent_reg.register(GuidelinesCommand);
            agent_reg.register(ModelCommand);
            agent_reg.register(ProviderCommand);
            agent_reg.register(SkillCommand);
            agent_reg.register(SkillsCommand);
            agent_reg.register(FeedbackCommand);
            agent_reg.register(McpCommand);
            agent_reg.register(PolicyCommand);
            agent_reg.register(SchedulerCommand);
            agent_reg.register(LspCommand);
            agent_reg.register(CacheStatsCommand);
            agent_reg.register(ImageCommand);
            agent_reg.register(NotifyTestCommand);
            agent_reg.register(StatusCommand);
            agent_reg.register(GuardrailCommand);
            agent_reg.register(FocusCommand);
            agent_reg.register(SideQuestCommand);
            agent_reg.register(AgentCommand);
            agent_reg.register(AgentsFleetCommand);
            agent_reg.register(CompactCommand);
            agent_reg.register(NewConversationCommand);
            agent_reg.register(RecapCommand);
            agent_reg.register(ExperimentCommand);
            agent_reg.register(PlanCommand);
            agent_reg.register(LoopCommand);
            agent_reg.register(PluginsCommand);
            agent_reg.register(AcpCommand);
            #[cfg(feature = "cocoon")]
            agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
            agent_reg.register(TrajectoryCommand);
            agent_reg.register(ScopeCommand);
            agent_reg.register(GoalCommand);

            let mut ctx = zeph_commands::CommandContext {
                sink: &mut agent_null_sink,
                debug: &mut agent_null_debug,
                messages: &mut agent_null_messages,
                session: &agent_null_session,
                agent: self,
            };
            agent_reg.dispatch(&mut ctx, trimmed).await
        } else {
            None
        };
        // An autonomous session start may be left pending after the dispatch
        // above; apply it now and log which goal replaced which.
        if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
            if let Some(cid) = cancelled_id {
                tracing::info!(
                    goal_id = cid,
                    "autonomous: previous session cancelled for new goal"
                );
            }
            self.sync_registry_entry();
            tracing::info!(goal_id = new_id, "autonomous: session started");
        }

        match self
            .apply_dispatch_result(agent_result, trimmed, true)
            .await
        {
            DispatchFlow::Break => break,
            DispatchFlow::Continue => continue,
            DispatchFlow::Fallthrough => {
                // Not an agent command either: try the built-in commands.
            }
        }

        // Built-ins: `Some(true)` ends the loop, `Some(false)` means handled.
        match self.handle_builtin_command(trimmed) {
            Some(true) => break,
            Some(false) => continue,
            None => {}
        }

        // Nothing claimed the input, so treat it as a regular user message.
        self.process_user_message(text, image_parts).await?;
    }

    self.maybe_autodream().await;

    if let Some(ref mut tc) = self.runtime.debug.trace_collector {
        tc.finish();
    }

    Ok(())
}
1176
1177 async fn apply_dispatch_result(
1183 &mut self,
1184 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1185 command: &str,
1186 with_learning: bool,
1187 ) -> DispatchFlow {
1188 match result {
1189 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1190 let _ = self.channel.flush_chunks().await;
1191 DispatchFlow::Break
1192 }
1193 Some(Ok(
1194 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1195 )) => {
1196 let _ = self.channel.flush_chunks().await;
1197 DispatchFlow::Continue
1198 }
1199 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1200 let _ = self.channel.send(&msg).await;
1201 let _ = self.channel.flush_chunks().await;
1202 if with_learning {
1203 self.maybe_trigger_post_command_learning(command).await;
1204 }
1205 DispatchFlow::Continue
1206 }
1207 Some(Err(e)) => {
1208 let _ = self.channel.send(&e.to_string()).await;
1209 let _ = self.channel.flush_chunks().await;
1210 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1211 DispatchFlow::Continue
1212 }
1213 None => DispatchFlow::Fallthrough,
1214 }
1215 }
1216
1217 fn apply_provider_override(&mut self) {
1219 if let Some(ref slot) = self.runtime.providers.provider_override
1220 && let Some(new_provider) = slot.write().take()
1221 {
1222 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1223 self.provider = new_provider;
1224 }
1225 }
1226
/// Waits for the next event from any of the agent's input sources.
///
/// Races the channel, the shutdown signal, the reload / notification / scheduler
/// receivers, the user-defined loop interval, file-change events and the
/// autonomous tick, and returns whichever fires first as a `LoopEvent`.
///
/// Returns `Ok(None)` when the channel yields `None`, or when the user-loop arm
/// fires but the loop state is gone or already cancelled.
///
/// # Errors
///
/// Propagates an error returned by `channel.recv()`.
#[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
    let event = tokio::select! {
        // Channel input returns directly: `None` from the channel is passed through.
        result = self.channel.recv() => {
            return Ok(result?.map(LoopEvent::Message));
        }
        () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
            tracing::info!("shutting down");
            LoopEvent::Shutdown
        }
        Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
            LoopEvent::SkillReload
        }
        Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
            LoopEvent::InstructionReload
        }
        Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
            LoopEvent::ConfigReload
        }
        Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
            LoopEvent::UpdateNotification(msg)
        }
        Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
            LoopEvent::ExperimentCompleted(msg)
        }
        Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
            tracing::info!("scheduler: injecting custom task as agent turn");
            LoopEvent::ScheduledTask(prompt)
        }
        // User-defined loop: completes on the next interval tick. Without an
        // active loop, or with a cancelled one, this future never completes, so
        // the arm cannot win the race.
        () = async {
            if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                if ls.cancel_tx.is_cancelled() {
                    std::future::pending::<()>().await;
                } else {
                    ls.interval.tick().await;
                }
            } else {
                std::future::pending::<()>().await;
            }
        } => {
            // Re-check the loop state now that the tick has fired.
            // NOTE(review): both early returns yield `Ok(None)`, which `run`
            // treats like a closed channel and breaks on — confirm that ending
            // the agent loop is intended if cancellation races with the tick.
            let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                return Ok(None);
            };
            if ls.cancel_tx.is_cancelled() {
                self.runtime.lifecycle.user_loop = None;
                return Ok(None);
            }
            let prompt = ls.prompt.clone();
            LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
        }
        Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
            LoopEvent::FileChanged(event)
        }
        // The autonomous tick is only polled while `should_tick()` holds.
        () = self.services.autonomous.next_tick(),
            if self.services.autonomous.should_tick() => {
            LoopEvent::AutonomousTick
        }
    };
    Ok(Some(event))
}
1298
1299 #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1300 async fn resolve_message(
1301 &self,
1302 msg: crate::channel::ChannelMessage,
1303 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1304 use crate::channel::{Attachment, AttachmentKind};
1305 use zeph_llm::provider::{ImageData, MessagePart};
1306
1307 let text_base = msg.text.clone();
1308
1309 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1310 .attachments
1311 .into_iter()
1312 .partition(|a| a.kind == AttachmentKind::Audio);
1313
1314 tracing::debug!(
1315 audio = audio_attachments.len(),
1316 has_stt = self.runtime.providers.stt.is_some(),
1317 "resolve_message attachments"
1318 );
1319
1320 let text = if !audio_attachments.is_empty()
1321 && let Some(stt) = self.runtime.providers.stt.as_ref()
1322 {
1323 let mut transcribed_parts = Vec::new();
1324 for attachment in &audio_attachments {
1325 if attachment.data.len() > MAX_AUDIO_BYTES {
1326 tracing::warn!(
1327 size = attachment.data.len(),
1328 max = MAX_AUDIO_BYTES,
1329 "audio attachment exceeds size limit, skipping"
1330 );
1331 continue;
1332 }
1333 match stt
1334 .transcribe(&attachment.data, attachment.filename.as_deref())
1335 .await
1336 {
1337 Ok(result) => {
1338 tracing::info!(
1339 len = result.text.len(),
1340 language = ?result.language,
1341 "audio transcribed"
1342 );
1343 transcribed_parts.push(result.text);
1344 }
1345 Err(e) => {
1346 tracing::error!(error = %e, "audio transcription failed");
1347 }
1348 }
1349 }
1350 if transcribed_parts.is_empty() {
1351 text_base
1352 } else {
1353 let transcribed = transcribed_parts.join("\n");
1354 if text_base.is_empty() {
1355 transcribed
1356 } else {
1357 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1358 }
1359 }
1360 } else {
1361 if !audio_attachments.is_empty() {
1362 tracing::warn!(
1363 count = audio_attachments.len(),
1364 "audio attachments received but no STT provider configured, dropping"
1365 );
1366 }
1367 text_base
1368 };
1369
1370 let mut image_parts = Vec::new();
1371 for attachment in image_attachments {
1372 if attachment.data.len() > MAX_IMAGE_BYTES {
1373 tracing::warn!(
1374 size = attachment.data.len(),
1375 max = MAX_IMAGE_BYTES,
1376 "image attachment exceeds size limit, skipping"
1377 );
1378 continue;
1379 }
1380 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1381 image_parts.push(MessagePart::Image(Box::new(ImageData {
1382 data: attachment.data,
1383 mime_type,
1384 })));
1385 }
1386
1387 (text, image_parts)
1388 }
1389
/// Opens a new turn: allocates its id and cancellation token, resets per-turn state, and
/// advances the security trackers before building the `turn::Turn`.
///
/// `input` is moved into the returned turn unchanged.
fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
    // The turn id is the current iteration counter value; the counter is bumped afterwards.
    let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
    self.runtime.debug.iteration_counter += 1;
    let cancel_token = CancellationToken::new();
    self.runtime.lifecycle.cancel_token = cancel_token.clone();
    // Per-turn state: flagged URLs and the LLM request counter start from scratch.
    self.services.security.user_provided_urls.write().clear();
    self.runtime.lifecycle.turn_llm_requests = 0;

    {
        // Take the queued signal codes while holding the lock only for the swap, then
        // record them after the guard has been released.
        let pending: Vec<u8> = {
            let mut q = self.services.security.trajectory_signal_queue.lock();
            std::mem::take(&mut *q)
        };
        for code in pending {
            self.services
                .security
                .trajectory
                .record(crate::agent::trajectory::RiskSignal::from_code(code));
        }
    }
    // When `advance_turn()` returns true and an audit logger is configured, a sentinel audit
    // entry is written on a background telemetry task.
    // NOTE(review): the `trajectory_auto_recover` category suggests `true` means the
    // sentinel auto-recovered on this turn — confirm against `advance_turn`.
    if self.services.security.trajectory.advance_turn()
        && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
    {
        let entry = zeph_tools::AuditEntry {
            timestamp: zeph_tools::chrono_now(),
            tool: "<sentinel>".to_owned().into(),
            command: String::new(),
            result: zeph_tools::AuditResult::Success,
            duration_ms: 0,
            error_category: Some("trajectory_auto_recover".to_owned()),
            error_domain: Some("security".to_owned()),
            error_phase: None,
            claim_source: None,
            mcp_server_id: None,
            injection_flagged: false,
            embedding_anomalous: false,
            cross_boundary_mcp_to_acp: false,
            adversarial_policy_decision: None,
            exit_code: None,
            truncated: false,
            caller_id: None,
            policy_match: None,
            correlation_id: None,
            vigil_risk: None,
            execution_env: None,
            resolved_cwd: None,
            scope_at_definition: None,
            scope_at_dispatch: None,
        };
        // The return value of `spawn` is ignored here.
        self.runtime.lifecycle.supervisor.spawn(
            crate::agent::agent_supervisor::TaskClass::Telemetry,
            "trajectory-auto-recover-audit",
            async move { logger.log(&entry).await },
        );
    }
    if let Some(ref sentinel) = self.services.security.shadow_sentinel {
        sentinel.advance_turn();
    }
    if let Some(ref acc) = self.services.security.risk_chain_accumulator {
        acc.reset();
    }
    // Publish the current risk level into the shared slot as a `u8`.
    let risk_level = self.services.security.trajectory.current_risk();
    *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
    if let Some(alert) = self.services.security.trajectory.poll_alert() {
        let msg = format!(
            "[trajectory] Risk level: {:?} (score={:.2})",
            alert.level, alert.score
        );
        tracing::warn!(
            level = ?alert.level,
            score = alert.score,
            "trajectory sentinel alert"
        );
        // Best effort: a failed status send is ignored.
        if let Some(ref tx) = self.services.session.status_tx {
            let _ = tx.send(msg);
        }
    }

    let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
        .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
    turn::Turn::new(context, input)
}
1487
1488 fn end_turn(&mut self, turn: turn::Turn) {
1495 self.runtime.metrics.pending_timings = turn.metrics.timings;
1496 self.flush_turn_timings();
1497 self.services.session.current_turn_intent = None;
1499 self.services.session.is_guest_context = false;
1501 if let Some(ref engine) = self.services.speculation_engine {
1503 let metrics = engine.end_turn();
1504 if metrics.committed > 0 || metrics.cancelled > 0 {
1505 tracing::debug!(
1506 committed = metrics.committed,
1507 cancelled = metrics.cancelled,
1508 wasted_ms = metrics.wasted_ms,
1509 "speculation: turn boundary metrics"
1510 );
1511 }
1512 }
1513 }
1514
1515 #[tracing::instrument(
1516 name = "core.agent.process_user_message",
1517 skip_all,
1518 level = "debug",
1519 fields(turn_id),
1520 err
1521 )]
1522 async fn process_user_message(
1523 &mut self,
1524 text: String,
1525 image_parts: Vec<zeph_llm::provider::MessagePart>,
1526 ) -> Result<(), error::AgentError> {
1527 let input = turn::TurnInput::new(text, image_parts);
1528 let mut t = self.begin_turn(input);
1529
1530 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1531 tracing::Span::current().record("turn_id", t.id().0);
1532 self.runtime
1534 .debug
1535 .start_iteration_span(turn_idx, t.input.text.trim());
1536
1537 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1538
1539 let span_status = if result.is_ok() {
1541 crate::debug_dump::trace::SpanStatus::Ok
1542 } else {
1543 crate::debug_dump::trace::SpanStatus::Error {
1544 message: "iteration failed".to_owned(),
1545 }
1546 };
1547 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1548
1549 self.end_turn(t);
1550 result
1551 }
1552
1553 #[tracing::instrument(
1554 name = "core.agent.process_user_message_inner",
1555 skip_all,
1556 level = "debug",
1557 err
1558 )]
1559 async fn process_user_message_inner(
1560 &mut self,
1561 turn: &mut turn::Turn,
1562 ) -> Result<(), error::AgentError> {
1563 self.reap_background_tasks_and_update_metrics();
1564
1565 let tokens_before_turn = self
1566 .runtime
1567 .metrics
1568 .metrics_tx
1569 .as_ref()
1570 .map_or(0, |tx| tx.borrow().total_tokens);
1571
1572 self.drain_background_completions();
1576
1577 self.wire_cancel_bridge(turn.cancel_token());
1578
1579 let text = turn.input.text.clone();
1581 let trimmed_owned = text.trim().to_owned();
1582 let trimmed = trimmed_owned.as_str();
1583
1584 if self.services.security.vigil.is_some() {
1587 let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1588 self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1589 }
1590
1591 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1592 return result;
1593 }
1594
1595 self.check_pending_rollbacks().await;
1596
1597 if self.pre_process_security(trimmed).await? {
1598 return Ok(());
1599 }
1600
1601 let t_ctx = std::time::Instant::now();
1602 tracing::debug!("turn timing: prepare_context start");
1603 self.advance_context_lifecycle_guarded(&text, trimmed).await;
1604 turn.metrics_mut().timings.prepare_context_ms =
1605 u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1606 tracing::debug!(
1607 ms = turn.metrics_snapshot().timings.prepare_context_ms,
1608 "turn timing: prepare_context done"
1609 );
1610 let _ = self
1612 .channel
1613 .send_context_estimate(
1614 usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
1615 )
1616 .await;
1617
1618 let image_parts = std::mem::take(&mut turn.input.image_parts);
1619 let merged_text = self.build_user_message_text_with_bg_completions(&text);
1623 let user_msg = self.build_user_message(&merged_text, image_parts);
1624
1625 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1628 if !urls.is_empty() {
1629 self.services
1630 .security
1631 .user_provided_urls
1632 .write()
1633 .extend(urls);
1634 }
1635
1636 self.services.memory.extraction.goal_text = Some(text.clone());
1639
1640 let t_persist = std::time::Instant::now();
1641 tracing::debug!("turn timing: persist_message(user) start");
1642 self.persist_message(Role::User, &text, &[], false).await;
1644 turn.metrics_mut().timings.persist_message_ms =
1645 u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1646 tracing::debug!(
1647 ms = turn.metrics_snapshot().timings.persist_message_ms,
1648 "turn timing: persist_message(user) done"
1649 );
1650 self.push_message(user_msg);
1651
1652 let context_estimate = self.runtime.providers.cached_prompt_tokens;
1654 self.update_metrics(|m| m.context_tokens = context_estimate);
1655
1656 tracing::debug!("turn timing: process_response start");
1659 let turn_had_error = if let Err(e) = self.process_response().await {
1660 self.services.learning_engine.learning_tasks.detach_all();
1662 tracing::error!("Response processing failed: {e:#}");
1663
1664 if e.is_no_providers() {
1667 self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1668 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1669 tracing::warn!(
1670 backoff_secs,
1671 "no providers available; backing off before next turn"
1672 );
1673 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1674 }
1675
1676 let user_msg = format!("Error: {e:#}");
1677 self.channel.send(&user_msg).await?;
1678 self.msg.messages.pop();
1679 self.recompute_prompt_tokens();
1680 self.channel.flush_chunks().await?;
1681 true
1682 } else {
1683 self.services.learning_engine.learning_tasks.detach_all();
1686 self.truncate_old_tool_results();
1687 self.maybe_update_magic_docs();
1689 self.maybe_spawn_promotion_scan();
1691 false
1692 };
1693 tracing::debug!("turn timing: process_response done");
1694
1695 if let Some(pipeline) = self.services.quality.clone() {
1697 self.run_self_check_for_turn(pipeline, turn.id().0).await;
1698 }
1699 let _ = self.channel.flush_chunks().await;
1704
1705 self.maybe_fire_completion_notification(turn, turn_had_error);
1706
1707 self.flush_goal_accounting(tokens_before_turn);
1708
1709 turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1714 turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1715
1716 Ok(())
1717 }
1718
1719 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1725 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1726 let token = turn_token.clone();
1727 self.runtime.lifecycle.cancel_token = turn_token.clone();
1729 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1730 prev.abort();
1731 }
1732 self.runtime.lifecycle.cancel_bridge_handle =
1733 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1734 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1735 move || async move {
1736 signal.notified().await;
1737 token.cancel();
1738 },
1739 ));
1740 }
1741
1742 fn reap_background_tasks_and_update_metrics(&mut self) {
1746 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1747 if bg_signal.did_summarize {
1748 self.services.memory.persistence.unsummarized_count = 0;
1749 tracing::debug!("background summarization completed; unsummarized_count reset");
1750 }
1751 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1752 self.update_metrics(|m| {
1753 m.bg_inflight = snap.inflight as u64;
1754 m.bg_dropped = snap.total_dropped();
1755 m.bg_completed = snap.total_completed();
1756 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1757 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1758 });
1759
1760 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1762 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1763 .runtime
1764 .lifecycle
1765 .shell_executor_handle
1766 .as_ref()
1767 .map(|e| e.background_runs_snapshot())
1768 .unwrap_or_default()
1769 .into_iter()
1770 .map(|s| crate::metrics::ShellBackgroundRunRow {
1771 run_id: truncate_shell_run_id(&s.run_id),
1772 command: truncate_shell_command(&s.command),
1773 elapsed_secs: s.elapsed_ms / 1000,
1774 })
1775 .collect();
1776 self.update_metrics(|m| {
1777 m.shell_background_runs = shell_rows;
1778 });
1779 }
1780
1781 if self
1784 .runtime
1785 .config
1786 .supervisor_config
1787 .abort_enrichment_on_turn
1788 {
1789 self.runtime
1790 .lifecycle
1791 .supervisor
1792 .abort_class(agent_supervisor::TaskClass::Enrichment);
1793 }
1794 }
1795
1796 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1809 let snap = turn.metrics_snapshot().timings.clone();
1810 let duration_ms = snap
1811 .prepare_context_ms
1812 .saturating_add(snap.llm_chat_ms)
1813 .saturating_add(snap.tool_exec_ms);
1814 let summary = crate::notifications::TurnSummary {
1815 duration_ms,
1816 preview: self.last_assistant_preview(160),
1817 tool_calls: 0,
1819 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1820 exit_status: if is_error {
1821 crate::notifications::TurnExitStatus::Error
1822 } else {
1823 crate::notifications::TurnExitStatus::Success
1824 },
1825 };
1826
1827 let gate_ok = self
1829 .runtime
1830 .lifecycle
1831 .notifier
1832 .as_ref()
1833 .is_none_or(|n| n.should_fire(&summary));
1834
1835 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1837 && gate_ok
1838 {
1839 notifier.fire(&summary);
1840 }
1841
1842 let hooks = self.services.session.hooks_config.turn_complete.clone();
1847 if !hooks.is_empty() && gate_ok {
1848 let mut env = std::collections::HashMap::new();
1849 env.insert(
1850 "ZEPH_TURN_DURATION_MS".to_owned(),
1851 summary.duration_ms.to_string(),
1852 );
1853 env.insert(
1854 "ZEPH_TURN_STATUS".to_owned(),
1855 if is_error { "error" } else { "success" }.to_owned(),
1856 );
1857 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1858 env.insert(
1859 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1860 summary.llm_requests.to_string(),
1861 );
1862 let dispatch = self.mcp_dispatch();
1863 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1864 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1865 agent_supervisor::TaskClass::Telemetry,
1866 "turn-complete-hooks",
1867 async move {
1868 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1869 .as_ref()
1870 .map(|d| d as &dyn zeph_subagent::McpDispatch);
1871 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1872 {
1873 tracing::warn!(error = %e, "turn_complete hook failed");
1874 }
1875 },
1876 );
1877 }
1878 }
1879
1880 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1883 let goal_snap = self
1884 .services
1885 .goal_accounting
1886 .as_ref()
1887 .and_then(|a| a.snapshot());
1888 self.update_metrics(|m| m.active_goal = goal_snap);
1889
1890 if let Some(ref accounting) = self.services.goal_accounting {
1891 let tokens_after = self
1892 .runtime
1893 .metrics
1894 .metrics_tx
1895 .as_ref()
1896 .map_or(0, |tx| tx.borrow().total_tokens);
1897 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1898 let mut spawned: Option<
1899 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1900 > = None;
1901 accounting.on_turn_complete(turn_tokens, |fut| {
1902 spawned = Some(fut);
1903 });
1904 if let Some(fut) = spawned {
1905 let _ = self.runtime.lifecycle.supervisor.spawn(
1906 agent_supervisor::TaskClass::Telemetry,
1907 "goal-accounting",
1908 fut,
1909 );
1910 }
1911 }
1912 }
1913
/// Runs the input-side security checks on the trimmed user text.
///
/// Checks the optional guardrail first and then, when the `classifiers` feature is enabled
/// and user-input scanning is on, the injection classifier.
///
/// # Returns
///
/// `Ok(true)` when the input was blocked (the user has already been sent a message),
/// `Ok(false)` when processing may continue.
///
/// # Errors
///
/// No error path is visible in this body; channel send failures are ignored.
#[tracing::instrument(
    name = "core.agent.pre_process_security",
    skip_all,
    level = "debug",
    err
)]
async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
    if let Some(ref guardrail) = self.services.security.guardrail {
        use zeph_sanitizer::guardrail::GuardrailVerdict;
        let verdict = guardrail.check(trimmed).await;
        match &verdict {
            GuardrailVerdict::Flagged { reason, .. } => {
                tracing::warn!(
                    reason = %reason,
                    should_block = verdict.should_block(),
                    "guardrail flagged user input"
                );
                if verdict.should_block() {
                    let msg = format!("[guardrail] Input blocked: {reason}");
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                // Flagged but not blocking: warn the user and keep going.
                let _ = self
                    .channel
                    .send(&format!("[guardrail] Warning: {reason}"))
                    .await;
            }
            GuardrailVerdict::Error { error } => {
                // A failed check blocks or allows depending on the guardrail's setting.
                if guardrail.error_should_block() {
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                    let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                    let _ = self.channel.send(msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
            }
            GuardrailVerdict::Safe => {}
        }
    }

    // Injection classifier; compiled in only with the `classifiers` feature.
    #[cfg(feature = "classifiers")]
    if self.services.security.sanitizer.scan_user_input() {
        match self
            .services
            .security
            .sanitizer
            .classify_injection(trimmed)
            .await
        {
            zeph_sanitizer::InjectionVerdict::Blocked => {
                // Metrics are pushed here because the early return skips the push below.
                self.push_classifier_metrics();
                let _ = self
                    .channel
                    .send("[security] Input blocked: injection detected by classifier.")
                    .await;
                let _ = self.channel.flush_chunks().await;
                return Ok(true);
            }
            zeph_sanitizer::InjectionVerdict::Suspicious => {
                tracing::warn!("injection_classifier soft_signal on user input");
            }
            zeph_sanitizer::InjectionVerdict::Clean => {}
        }
    }
    #[cfg(feature = "classifiers")]
    self.push_classifier_metrics();

    Ok(false)
}
1993
1994 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2001 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2002 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2003
2004 let providers_recently_failed = self
2006 .runtime
2007 .lifecycle
2008 .last_no_providers_at
2009 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2010
2011 if providers_recently_failed {
2012 tracing::warn!(
2013 backoff_secs,
2014 "skipping context preparation: providers were unavailable on last turn"
2015 );
2016 return;
2017 }
2018
2019 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2020 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2021 {
2022 Ok(()) => {}
2023 Err(_elapsed) => {
2024 tracing::warn!(
2025 timeout_secs = prep_timeout_secs,
2026 "context preparation timed out; proceeding with degraded context"
2027 );
2028 }
2029 }
2030 }
2031
/// Advances all per-turn context state ahead of the LLM call.
///
/// In order: resets the MCP pruning cache, rebuilds the system prompt, runs correction
/// detection and learning, advances the compaction state, ticks focus and sidequest,
/// optionally schedules an experience evolution sweep, emits a cache-expiry warning, and
/// runs the microcompact / deferred-summary / compression / compaction / context
/// preparation steps. Failures of the compression, compaction and preparation steps are
/// logged and do not abort the sequence.
#[tracing::instrument(
    name = "core.agent.advance_context_lifecycle",
    skip_all,
    level = "debug"
)]
async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
    self.services.mcp.pruning_cache.reset();

    // Captured up front and passed to correction detection below.
    let conv_id = self.services.memory.persistence.conversation_id;
    self.rebuild_system_prompt(text).await;

    self.detect_and_record_corrections(trimmed, conv_id).await;
    self.services.learning_engine.tick();
    self.analyze_and_learn().await;
    self.sync_graph_counts().await;

    self.context_manager
        .set_compaction_state(self.context_manager.compaction_state().advance_turn());

    {
        self.services.focus.tick();

        // Sidequest eviction runs only when its tick asks for it and the context was not
        // already compacted this turn.
        let sidequest_should_fire = self.services.sidequest.tick();
        if sidequest_should_fire
            && !self
                .context_manager
                .compaction_state()
                .is_compacted_this_turn()
        {
            self.maybe_sidequest_eviction();
        }
    }

    {
        // Evolution sweep: fires when enabled, on every `evolution_sweep_interval`-th value
        // of the sidequest turn counter, and only when both the experience store and the
        // graph store are available.
        let cfg = &self.services.memory.extraction.graph_config.experience;
        if cfg.enabled
            && cfg.evolution_sweep_enabled
            && cfg.evolution_sweep_interval > 0
            && self
                .services
                .sidequest
                .turn_counter
                .checked_rem(cfg.evolution_sweep_interval as u64)
                == Some(0)
            && let Some(memory) = self.services.memory.persistence.memory.as_ref()
            && let (Some(exp), Some(graph)) =
                (memory.experience.as_ref(), memory.graph_store.as_ref())
        {
            let exp = std::sync::Arc::clone(exp);
            let graph = std::sync::Arc::clone(graph);
            let threshold = cfg.confidence_prune_threshold;
            let turn = self.services.sidequest.turn_counter;
            let accepted = self.runtime.lifecycle.supervisor.spawn(
                agent_supervisor::TaskClass::Telemetry,
                "experience-sweep",
                async move {
                    match exp.evolution_sweep(graph.as_ref(), threshold).await {
                        Ok(stats) => tracing::info!(
                            turn,
                            self_loops = stats.pruned_self_loops,
                            low_confidence = stats.pruned_low_confidence,
                            "evolution sweep complete",
                        ),
                        Err(e) => tracing::warn!(
                            turn,
                            error = %e,
                            "evolution sweep failed",
                        ),
                    }
                },
            );
            // `spawn` returning false means the task was not accepted.
            if !accepted {
                tracing::warn!(
                    turn = self.services.sidequest.turn_counter,
                    "experience-sweep dropped (telemetry class at capacity)",
                );
            }
        }
    }

    if let Some(warning) = self.cache_expiry_warning() {
        tracing::info!(warning, "cache expiry warning");
        let _ = self.channel.send_status(&warning).await;
    }

    self.maybe_time_based_microcompact();

    self.maybe_apply_deferred_summaries();
    self.flush_deferred_summaries().await;

    if let Err(e) = self.maybe_proactive_compress().await {
        tracing::warn!("proactive compression failed: {e:#}");
    }

    if let Err(e) = self.maybe_compact().await {
        tracing::warn!("context compaction failed: {e:#}");
    }

    if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
        tracing::warn!("context preparation failed: {e:#}");
    }

    // Hand the latest recall confidence to the provider.
    self.provider
        .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);

    self.services.learning_engine.reset_reflection();
}
2160
2161 fn build_user_message(
2162 &mut self,
2163 text: &str,
2164 image_parts: Vec<zeph_llm::provider::MessagePart>,
2165 ) -> Message {
2166 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2167 all_image_parts.extend(image_parts);
2168
2169 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2170 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2171 text: text.to_owned(),
2172 }];
2173 parts.extend(all_image_parts);
2174 Message::from_parts(Role::User, parts)
2175 } else {
2176 if !all_image_parts.is_empty() {
2177 tracing::warn!(
2178 count = all_image_parts.len(),
2179 "image attachments dropped: provider does not support vision"
2180 );
2181 }
2182 Message {
2183 role: Role::User,
2184 content: text.to_owned(),
2185 parts: vec![],
2186 metadata: MessageMetadata::default(),
2187 }
2188 }
2189 }
2190
2191 fn drain_background_completions(&mut self) {
2195 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2196
2197 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2198 return;
2199 };
2200 while let Ok(completion) = rx.try_recv() {
2202 if self.runtime.lifecycle.pending_background_completions.len()
2203 >= BACKGROUND_COMPLETION_BUFFER_CAP
2204 {
2205 tracing::warn!(
2206 run_id = %completion.run_id,
2207 "background completion buffer full; dropping run result"
2208 );
2209 self.runtime
2212 .lifecycle
2213 .pending_background_completions
2214 .pop_front();
2215 self.runtime
2216 .lifecycle
2217 .pending_background_completions
2218 .push_back(zeph_tools::BackgroundCompletion {
2219 run_id: completion.run_id,
2220 exit_code: -1,
2221 success: false,
2222 elapsed_ms: 0,
2223 command: completion.command,
2224 output: format!(
2225 "[background result for run {} dropped: buffer overflow]",
2226 completion.run_id
2227 ),
2228 });
2229 } else {
2230 self.runtime
2231 .lifecycle
2232 .pending_background_completions
2233 .push_back(completion);
2234 }
2235 }
2236 }
2237
2238 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2242 if self
2243 .runtime
2244 .lifecycle
2245 .pending_background_completions
2246 .is_empty()
2247 {
2248 return user_text.to_owned();
2249 }
2250 let mut parts = String::new();
2251 for completion in self
2252 .runtime
2253 .lifecycle
2254 .pending_background_completions
2255 .drain(..)
2256 {
2257 let _ = write!(
2258 parts,
2259 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2260 completion.run_id,
2261 completion.exit_code,
2262 completion.success,
2263 completion.elapsed_ms,
2264 completion.command,
2265 completion.output,
2266 );
2267 }
2268 parts.push_str(user_text);
2269 parts
2270 }
2271
/// Polls a sub-agent every 500 ms until it reaches a terminal state.
///
/// While waiting, pending secret requests are surfaced to the user for confirmation, and a
/// progress status is sent for every non-terminal poll.
///
/// # Parameters
///
/// * `task_id` — id of the sub-agent task to watch.
/// * `label` — prefix used in the status and result messages.
///
/// # Returns
///
/// `Some((message, success))` once the task completed, failed, was cancelled, or no longer
/// has a status entry (reported as success). `None` when no sub-agent manager is configured.
async fn poll_subagent_until_done(
    &mut self,
    task_id: &str,
    label: &str,
) -> Option<(String, bool)> {
    use zeph_subagent::SubAgentState;
    let result = loop {
        // The sleep comes first, so the first status check happens after one interval.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // NOTE(review): the reason the closure form is required here (instead of a method
        // path) is not visible in this block — confirm before removing the allow.
        #[allow(clippy::redundant_closure_for_method_calls)]
        let pending = self
            .services
            .orchestration
            .subagent_manager
            .as_mut()
            .and_then(|m| m.try_recv_secret_request());
        if let Some((req_task_id, req)) = pending {
            // The secret key is truncated to 100 characters for display in the prompt.
            let confirm_prompt = format!(
                "Sub-agent requests secret '{}'. Allow?",
                crate::text::truncate_to_chars(&req.secret_key, 100)
            );
            // A failed confirmation is treated as a denial.
            let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
            if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                if approved {
                    let ttl = std::time::Duration::from_mins(5);
                    let key = req.secret_key.clone();
                    // Deliver only after approval succeeded; delivery errors are ignored.
                    if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                        let _ = mgr.deliver_secret(&req_task_id, key);
                    }
                } else {
                    let _ = mgr.deny_secret(&req_task_id);
                }
            }
        }

        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        // A task with no status entry is reported as completed successfully.
        let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
            break (format!("{label} completed (no status available)."), true);
        };
        match status.state {
            SubAgentState::Completed => {
                let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                break (format!("{label} completed: {msg}"), true);
            }
            SubAgentState::Failed => {
                let msg = status
                    .last_message
                    .clone()
                    .unwrap_or_else(|| "unknown error".into());
                break (format!("{label} failed: {msg}"), false);
            }
            SubAgentState::Canceled => {
                break (format!("{label} was cancelled."), false);
            }
            _ => {
                // Still running: report turns used against the definition's `max_turns`,
                // falling back to 20 when the definition is not found.
                let _ = self
                    .channel
                    .send_status(&format!(
                        "{label}: turn {}/{}",
                        status.turns_used,
                        self.services
                            .orchestration
                            .subagent_manager
                            .as_ref()
                            .and_then(|m| m.agents_def(task_id))
                            .map_or(20, |d| d.permissions.max_turns)
                    ))
                    .await;
            }
        }
    };
    Some(result)
}
2354
2355 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2358 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2359 let full_ids: Vec<String> = mgr
2360 .statuses()
2361 .into_iter()
2362 .map(|(tid, _)| tid)
2363 .filter(|tid| tid.starts_with(prefix))
2364 .collect();
2365 Some(match full_ids.as_slice() {
2366 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2367 [fid] => Ok(fid.clone()),
2368 _ => Err(format!(
2369 "Ambiguous id prefix '{prefix}': matches {} agents",
2370 full_ids.len()
2371 )),
2372 })
2373 }
2374
2375 fn handle_agent_list(&self) -> Option<String> {
2376 use std::fmt::Write as _;
2377 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2378 let defs = mgr.definitions();
2379 if defs.is_empty() {
2380 return Some("No sub-agent definitions found.".into());
2381 }
2382 let mut out = String::from("Available sub-agents:\n");
2383 for d in defs {
2384 let memory_label = match d.memory {
2385 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2386 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2387 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2388 None => "",
2389 };
2390 if let Some(ref src) = d.source {
2391 let _ = writeln!(
2392 out,
2393 " {}{} — {} ({})",
2394 d.name, memory_label, d.description, src
2395 );
2396 } else {
2397 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2398 }
2399 }
2400 Some(out)
2401 }
2402
2403 fn handle_agent_status(&self) -> Option<String> {
2404 use std::fmt::Write as _;
2405 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2406 let statuses = mgr.statuses();
2407 if statuses.is_empty() {
2408 return Some("No active sub-agents.".into());
2409 }
2410 let mut out = String::from("Active sub-agents:\n");
2411 for (id, s) in &statuses {
2412 let state = format!("{:?}", s.state).to_lowercase();
2413 let elapsed = s.started_at.elapsed().as_secs();
2414 let _ = writeln!(
2415 out,
2416 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2417 short = &id[..8.min(id.len())],
2418 t = s.turns_used,
2419 msg = s.last_message.as_deref().unwrap_or(""),
2420 );
2421 if let Some(def) = mgr.agents_def(id)
2423 && let Some(scope) = def.memory
2424 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2425 {
2426 let _ = writeln!(out, " memory: {}", dir.display());
2427 }
2428 }
2429 Some(out)
2430 }
2431
2432 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2433 let full_id = match self.resolve_agent_id_prefix(id)? {
2434 Ok(fid) => fid,
2435 Err(msg) => return Some(msg),
2436 };
2437 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2438 if let Some((tid, req)) = mgr.try_recv_secret_request()
2439 && tid == full_id
2440 {
2441 let key = req.secret_key.clone();
2442 let ttl = std::time::Duration::from_mins(5);
2443 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2444 return Some(format!("Approve failed: {e}"));
2445 }
2446 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2447 return Some(format!("Secret delivery failed: {e}"));
2448 }
2449 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2450 }
2451 Some(format!(
2452 "No pending secret request for sub-agent '{full_id}'."
2453 ))
2454 }
2455
2456 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2457 let full_id = match self.resolve_agent_id_prefix(id)? {
2458 Ok(fid) => fid,
2459 Err(msg) => return Some(msg),
2460 };
2461 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2462 match mgr.deny_secret(&full_id) {
2463 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2464 Err(e) => Some(format!("Deny failed: {e}")),
2465 }
2466 }
2467
2468 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2469 use zeph_subagent::AgentCommand;
2470
2471 match cmd {
2472 AgentCommand::List => self.handle_agent_list(),
2473 AgentCommand::Background { name, prompt } => {
2474 self.handle_agent_background(&name, &prompt)
2475 }
2476 AgentCommand::Spawn { name, prompt }
2477 | AgentCommand::Mention {
2478 agent: name,
2479 prompt,
2480 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2481 AgentCommand::Status => self.handle_agent_status(),
2482 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2483 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2484 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2485 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2486 }
2487 }
2488
2489 pub(crate) fn handle_agents_definitions_list(&self) -> String {
2494 use std::fmt::Write as _;
2495
2496 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2497 return String::new();
2498 };
2499 let defs = mgr.definitions();
2500 if defs.is_empty() {
2501 return String::new();
2502 }
2503 let mut out = String::from("Sub-agents:\n");
2504 for d in defs {
2505 let memory_label = match d.memory {
2506 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2507 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2508 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2509 None => "",
2510 };
2511 if let Some(ref src) = d.source {
2512 let _ = writeln!(
2513 out,
2514 " {}{} — {} ({})",
2515 d.name, memory_label, d.description, src
2516 );
2517 } else {
2518 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2519 }
2520 }
2521 out
2522 }
2523
2524 pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2529 use zeph_subagent::AgentsCommand;
2530
2531 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2532 return "Sub-agent manager is not available.".to_owned();
2533 };
2534
2535 match cmd {
2536 AgentsCommand::List => self.handle_agents_definitions_list(),
2537 AgentsCommand::Show { name } => {
2538 match mgr.definitions().iter().find(|d| d.name == name) {
2539 Some(d) => format!(
2540 "Agent: {}\nDescription: {}\nSource: {}\n",
2541 d.name,
2542 d.description,
2543 d.source.as_deref().unwrap_or("unknown"),
2544 ),
2545 None => format!("No sub-agent definition named '{name}'."),
2546 }
2547 }
2548 AgentsCommand::Create { name } => {
2549 format!(
2550 "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2551 See the sub-agent documentation for the required frontmatter."
2552 )
2553 }
2554 AgentsCommand::Edit { name } => {
2555 format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2556 }
2557 AgentsCommand::Delete { name } => {
2558 format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2559 }
2560 }
2561 }
2562
2563 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2564 let provider = self.provider.clone();
2565 let tool_executor = Arc::clone(&self.tool_executor);
2566 let skills = self.filtered_skills_for(name);
2567 let cfg = self.services.orchestration.subagent_config.clone();
2568 let spawn_ctx = self.build_spawn_context(&cfg);
2569 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2570 match mgr.spawn(
2571 name,
2572 prompt,
2573 provider,
2574 tool_executor,
2575 skills,
2576 &cfg,
2577 spawn_ctx,
2578 ) {
2579 Ok(id) => Some(format!(
2580 "Sub-agent '{name}' started in background (id: {short})",
2581 short = &id[..8.min(id.len())]
2582 )),
2583 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2584 }
2585 }
2586
2587 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2588 let provider = self.provider.clone();
2589 let tool_executor = Arc::clone(&self.tool_executor);
2590 let skills = self.filtered_skills_for(name);
2591 let cfg = self.services.orchestration.subagent_config.clone();
2592 let spawn_ctx = self.build_spawn_context(&cfg);
2593 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2594 let task_id = match mgr.spawn(
2595 name,
2596 prompt,
2597 provider,
2598 tool_executor,
2599 skills,
2600 &cfg,
2601 spawn_ctx,
2602 ) {
2603 Ok(id) => id,
2604 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2605 };
2606 let short = task_id[..8.min(task_id.len())].to_owned();
2607 let _ = self
2608 .channel
2609 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2610 .await;
2611 let _ = self
2612 .channel
2613 .notify_foreground_subagent_started(&task_id, name)
2614 .await;
2615 let label = format!("Sub-agent '{name}'");
2616 let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2617 let _ = self
2619 .channel
2620 .notify_foreground_subagent_completed(&task_id, name, false)
2621 .await;
2622 return None;
2623 };
2624 let _ = self
2625 .channel
2626 .notify_foreground_subagent_completed(&task_id, name, success)
2627 .await;
2628 Some(result)
2629 }
2630
2631 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2632 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2633 let ids: Vec<String> = mgr
2635 .statuses()
2636 .into_iter()
2637 .map(|(task_id, _)| task_id)
2638 .filter(|task_id| task_id.starts_with(id))
2639 .collect();
2640 match ids.as_slice() {
2641 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2642 [full_id] => {
2643 let full_id = full_id.clone();
2644 match mgr.cancel(&full_id) {
2645 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2646 Err(e) => Some(format!("Cancel failed: {e}")),
2647 }
2648 }
2649 _ => Some(format!(
2650 "Ambiguous id prefix '{id}': matches {} agents",
2651 ids.len()
2652 )),
2653 }
2654 }
2655
2656 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2657 let cfg = self.services.orchestration.subagent_config.clone();
2658 let def_name = {
2661 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2662 match mgr.def_name_for_resume(id, &cfg) {
2663 Ok(name) => name,
2664 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2665 }
2666 };
2667 let skills = self.filtered_skills_for(&def_name);
2668 let provider = self.provider.clone();
2669 let tool_executor = Arc::clone(&self.tool_executor);
2670 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2671 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2672 Ok(pair) => pair,
2673 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2674 };
2675 let short = task_id[..8.min(task_id.len())].to_owned();
2676 let _ = self
2677 .channel
2678 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2679 .await;
2680 let _ = self
2681 .channel
2682 .notify_foreground_subagent_started(&task_id, &def_name)
2683 .await;
2684 let Some((result, success)) = self
2685 .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2686 .await
2687 else {
2688 let _ = self
2690 .channel
2691 .notify_foreground_subagent_completed(&task_id, &def_name, false)
2692 .await;
2693 return None;
2694 };
2695 let _ = self
2696 .channel
2697 .notify_foreground_subagent_completed(&task_id, &def_name, success)
2698 .await;
2699 Some(result)
2700 }
2701
2702 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2703 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2704 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2705 let reg = self.services.skill.registry.read();
2706 match zeph_subagent::filter_skills(®, &def.skills) {
2707 Ok(skills) => {
2708 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2709 if bodies.is_empty() {
2710 None
2711 } else {
2712 Some(bodies)
2713 }
2714 }
2715 Err(e) => {
2716 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2717 None
2718 }
2719 }
2720 }
2721
2722 fn build_spawn_context(
2724 &self,
2725 cfg: &zeph_config::SubAgentConfig,
2726 ) -> zeph_subagent::SpawnContext {
2727 zeph_subagent::SpawnContext {
2728 parent_messages: self.extract_parent_messages(cfg),
2729 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2730 parent_provider_name: {
2731 let name = &self.runtime.config.active_provider_name;
2732 if name.is_empty() {
2733 None
2734 } else {
2735 Some(name.clone())
2736 }
2737 },
2738 spawn_depth: self.runtime.config.spawn_depth,
2739 mcp_tool_names: self.extract_mcp_tool_names(),
2740 seed_trajectory_score: {
2742 let child = self.services.security.trajectory.spawn_child();
2743 let score = child.score_now();
2744 if score > 0.0 { Some(score) } else { None }
2745 },
2746 content_isolation: self.runtime.config.security.content_isolation.clone(),
2747 orchestrator_name: Some("zeph".to_owned()),
2748 orchestrator_role: Some("orchestrator".to_owned()),
2749 session_mcp_servers: Vec::new(),
2750 }
2751 }
2752
2753 fn extract_parent_messages(
2760 &self,
2761 config: &zeph_config::SubAgentConfig,
2762 ) -> Vec<zeph_llm::provider::Message> {
2763 use zeph_config::ParentContextPolicy;
2764 use zeph_llm::provider::Role;
2765
2766 if config.parent_context_policy == ParentContextPolicy::None
2767 || config.context_window_turns == 0
2768 {
2769 return Vec::new();
2770 }
2771
2772 let non_system: Vec<_> = self
2773 .msg
2774 .messages
2775 .iter()
2776 .filter(|m| m.role != Role::System)
2777 .cloned()
2778 .collect();
2779
2780 let take_count = config
2781 .context_window_turns
2782 .saturating_mul(2)
2783 .min(config.max_parent_messages);
2784 let start = non_system.len().saturating_sub(take_count);
2785 let mut msgs = non_system[start..].to_vec();
2786
2787 let max_chars = 128_000usize / 4;
2789 let requested = msgs.len();
2790 trim_parent_messages(&mut msgs, max_chars);
2791 if msgs.len() < requested {
2792 tracing::info!(
2793 kept = msgs.len(),
2794 requested,
2795 "[subagent] truncated parent history due to token budget or orphan pruning"
2796 );
2797 }
2798
2799 if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2800 use zeph_sanitizer::{ContentSource, ContentSourceKind};
2801 let source =
2802 ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2803 msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2804 }
2805
2806 msgs
2807 }
2808
2809 fn extract_mcp_tool_names(&self) -> Vec<String> {
2811 self.tool_executor
2812 .tool_definitions_erased()
2813 .into_iter()
2814 .filter(|t| t.id.starts_with("mcp_"))
2815 .map(|t| t.id.to_string())
2816 .collect()
2817 }
2818
2819 fn classify_source_kind(
2823 skill_dir: &std::path::Path,
2824 managed_dir: Option<&std::path::PathBuf>,
2825 bundled_names: &std::collections::HashSet<String>,
2826 ) -> zeph_memory::store::SourceKind {
2827 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2828 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2829 let has_marker = skill_dir.join(".bundled").exists();
2830 if has_marker && bundled_names.contains(skill_name) {
2831 zeph_memory::store::SourceKind::Bundled
2832 } else {
2833 if has_marker {
2834 tracing::warn!(
2835 skill = %skill_name,
2836 "skill has .bundled marker but is not in the bundled skill \
2837 allowlist — classifying as Hub"
2838 );
2839 }
2840 zeph_memory::store::SourceKind::Hub
2841 }
2842 } else {
2843 zeph_memory::store::SourceKind::Local
2844 }
2845 }
2846
2847 async fn update_trust_for_reloaded_skills(
2849 &mut self,
2850 all_meta: &[zeph_skills::loader::SkillMeta],
2851 ) {
2852 let memory = self.services.memory.persistence.memory.clone();
2854 let Some(memory) = memory else {
2855 return;
2856 };
2857 let trust_cfg = self.services.skill.trust_config.clone();
2858 let managed_dir = self.services.skill.managed_dir.clone();
2859 let bundled_names: std::collections::HashSet<String> =
2860 zeph_skills::bundled_skill_names().into_iter().collect();
2861 for meta in all_meta {
2862 let skill_dir = meta.skill_dir.clone();
2865 let managed_dir_ref = managed_dir.clone();
2866 let bundled_names_ref = bundled_names.clone();
2867 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2868 tokio::task::spawn_blocking(move || {
2869 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2870 let source_kind = Self::classify_source_kind(
2871 &skill_dir,
2872 managed_dir_ref.as_ref(),
2873 &bundled_names_ref,
2874 );
2875 Some((hash, source_kind))
2876 })
2877 .await
2878 .unwrap_or(None);
2879
2880 let Some((current_hash, source_kind)) = fs_result else {
2881 tracing::warn!("failed to compute hash for '{}'", meta.name);
2882 continue;
2883 };
2884 let initial_level = match source_kind {
2885 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2886 zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2887 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2888 &trust_cfg.local_level
2889 }
2890 };
2891 let existing = memory
2892 .sqlite()
2893 .load_skill_trust(&meta.name)
2894 .await
2895 .ok()
2896 .flatten();
2897 let trust_level = if let Some(ref row) = existing {
2898 if row.blake3_hash != current_hash {
2899 trust_cfg.hash_mismatch_level
2900 } else if row.source_kind != source_kind {
2901 let stored = row.trust_level;
2905 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2906 stored
2907 } else {
2908 *initial_level
2909 }
2910 } else {
2911 row.trust_level
2912 }
2913 } else {
2914 *initial_level
2915 };
2916 let source_path = meta.skill_dir.to_str();
2917 if let Err(e) = memory
2918 .sqlite()
2919 .upsert_skill_trust(
2920 &meta.name,
2921 trust_level,
2922 source_kind,
2923 None,
2924 source_path,
2925 ¤t_hash,
2926 )
2927 .await
2928 {
2929 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2930 }
2931 }
2932 }
2933
/// Rebuilds or re-syncs the skill matcher for the given skill metadata.
///
/// When there is no matcher, or the current backend is not Qdrant, a new
/// in-memory matcher is built with `SkillMatcher::new`. Otherwise the existing
/// backend is synced in place, with progress forwarded to the session status
/// channel when one exists. If hybrid search is enabled, the BM25 index is then
/// rebuilt from the skill descriptions.
async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
    let provider = self.embedding_provider.clone();
    let embed_timeout =
        std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
    // Embedding callback handed to the matcher. Each call clones the provider and
    // bounds the embed request with `embed_timeout`; a timeout is logged and
    // reported as `LlmError::Timeout`.
    let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
        let owned = text.to_owned();
        let p = provider.clone();
        Box::pin(async move {
            if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                result
            } else {
                tracing::warn!(
                    timeout_secs = embed_timeout.as_secs(),
                    "skill matcher: embedding timed out"
                );
                Err(zeph_llm::LlmError::Timeout)
            }
        })
    };

    // True when no matcher is set or the current backend is not Qdrant.
    let needs_inmemory_rebuild = !self
        .services
        .skill
        .matcher
        .as_ref()
        .is_some_and(SkillMatcherBackend::is_qdrant);

    if needs_inmemory_rebuild {
        // The result of `SkillMatcher::new` replaces the matcher, wrapped as the
        // in-memory backend.
        self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
            .await
            .map(SkillMatcherBackend::InMemory);
    } else if let Some(ref mut backend) = self.services.skill.matcher {
        let _ = self.channel.send_status("syncing skill index...").await;
        // Progress callback: forwards "completed/total" text to the status
        // channel, if the session has one.
        let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
            self.services.session.status_tx.clone().map(
                |tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                },
            );
        // A sync failure is logged and otherwise ignored.
        if let Err(e) = backend
            .sync(
                all_meta,
                &self.services.skill.embedding_model,
                embed_fn,
                on_progress,
            )
            .await
        {
            tracing::warn!("failed to sync skill embeddings: {e:#}");
        }
    }

    if self.services.skill.hybrid_search {
        let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
        let _ = self.channel.send_status("rebuilding search index...").await;
        self.services.skill.rebuild_bm25(&descs);
    }
}
2996
/// Reloads the skill registry and, if its fingerprint changed, refreshes
/// everything derived from it.
///
/// The registry is reloaded from the configured skill paths plus any plugin
/// directories reported by the supplier (duplicates skipped). When the
/// fingerprint is unchanged the function returns early. Otherwise it updates
/// trust records, rebuilds the skill matcher, regenerates the skills prompt, and
/// overwrites the content of the first message with the new system prompt.
#[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
async fn reload_skills(&mut self) {
    // Fingerprint taken before the reload, to detect whether anything changed.
    let old_fp = self.services.skill.fingerprint();
    let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
        let plugin_dirs = supplier();
        let mut paths = self.services.skill.skill_paths.clone();
        for dir in plugin_dirs {
            if !paths.contains(&dir) {
                paths.push(dir);
            }
        }
        paths
    } else {
        self.services.skill.skill_paths.clone()
    };
    self.services.skill.registry.write().reload(&reload_paths);
    if self.services.skill.fingerprint() == old_fp {
        return;
    }
    let _ = self.channel.send_status("reloading skills...").await;

    // Metadata is cloned out so the registry guard is not held while awaiting below.
    let all_meta = self
        .services
        .skill
        .registry
        .read()
        .all_meta()
        .into_iter()
        .cloned()
        .collect::<Vec<_>>();

    self.update_trust_for_reloaded_skills(&all_meta).await;

    let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
    self.rebuild_skill_matcher(&all_meta_refs).await;

    // Skills that fail to load from the registry are silently left out.
    let all_skills: Vec<Skill> = {
        let reg = self.services.skill.registry.read();
        reg.all_meta()
            .iter()
            .filter_map(|m| reg.skill(&m.name).ok())
            .collect()
    };
    let trust_map = self.build_skill_trust_map().await;
    // The prompt is rebuilt with an empty health map.
    let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
    let skills_prompt =
        state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
    self.services
        .skill
        .last_skills_prompt
        .clone_from(&skills_prompt);
    let system_prompt = build_system_prompt(&skills_prompt, None);
    // NOTE(review): assumes the first message is the system prompt — confirm.
    if let Some(msg) = self.msg.messages.first_mut() {
        msg.content = system_prompt;
    }

    // An empty status string is sent once the reload is finished.
    let _ = self.channel.send_status("").await;
    tracing::info!(
        "reloaded {} skill(s)",
        self.services.skill.registry.read().all_meta().len()
    );
}
3059
3060 async fn reload_instructions(&mut self) {
3061 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3063 while rx.try_recv().is_ok() {}
3064 }
3065 let Some(ref state) = self.runtime.instructions.reload_state else {
3066 return;
3067 };
3068 let base_dir = state.base_dir.clone();
3069 let provider_kinds = state.provider_kinds.clone();
3070 let explicit_files = state.explicit_files.clone();
3071 let auto_detect = state.auto_detect;
3072 let new_blocks = crate::instructions::load_instructions_async(
3073 base_dir,
3074 provider_kinds,
3075 explicit_files,
3076 auto_detect,
3077 )
3078 .await;
3079 let old_sources: std::collections::HashSet<_> = self
3080 .runtime
3081 .instructions
3082 .blocks
3083 .iter()
3084 .map(|b| &b.source)
3085 .collect();
3086 let new_sources: std::collections::HashSet<_> =
3087 new_blocks.iter().map(|b| &b.source).collect();
3088 for added in new_sources.difference(&old_sources) {
3089 tracing::info!(path = %added.display(), "instruction file added");
3090 }
3091 for removed in old_sources.difference(&new_sources) {
3092 tracing::info!(path = %removed.display(), "instruction file removed");
3093 }
3094 tracing::info!(
3095 old_count = self.runtime.instructions.blocks.len(),
3096 new_count = new_blocks.len(),
3097 "reloaded instruction files"
3098 );
3099 self.runtime.instructions.blocks = new_blocks;
3100 }
3101
/// Re-reads the configuration file and applies the hot-reloadable settings to
/// the running agent.
///
/// Does nothing when no config path is known or when loading fails (see
/// `load_config_with_overlay`). Only the fields assigned below are updated:
/// security, timeouts, memory/persistence limits, skill matching and generation
/// settings, the context budget and compaction thresholds, graph/RPE settings,
/// store routing, index settings, and three hook lists.
#[allow(clippy::too_many_lines)]
fn reload_config(&mut self) {
    let Some(path) = self.runtime.lifecycle.config_path.clone() else {
        return;
    };
    let Some(config) = self.load_config_with_overlay(&path) else {
        return;
    };
    // Computed first, while `config` is still whole; fields are moved out of it below.
    let budget_tokens = resolve_context_budget(&config, &self.provider);
    self.runtime.config.security = config.security;
    self.runtime.config.timeouts = config.timeouts;
    self.runtime.config.redact_credentials = config.memory.redact_credentials;
    self.services.memory.persistence.history_limit = config.memory.history_limit;
    self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
    self.services.memory.compaction.summarization_threshold =
        config.memory.summarization_threshold;
    self.services.skill.max_active_skills = config.skills.max_active_skills.get();
    self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
    self.services.skill.min_injection_score = config.skills.min_injection_score;
    // Weights/thresholds below are clamped into [0.0, 1.0].
    self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
    self.services.skill.hybrid_search = config.skills.hybrid_search;
    self.services.skill.two_stage_matching = config.skills.two_stage_matching;
    self.services.skill.confusability_threshold =
        config.skills.confusability_threshold.clamp(0.0, 1.0);
    config
        .skills
        .generation_provider
        .as_str()
        .clone_into(&mut self.services.skill.generation_provider_name);
    config
        .skills
        .disambiguate_provider
        .as_str()
        .clone_into(&mut self.services.skill.disambiguate_provider_name);
    self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
    // A leading "~/" is expanded to the home directory; if the home directory
    // cannot be determined the path is used as written.
    self.services.skill.generation_output_dir =
        config.skills.generation_output_dir.as_deref().map(|p| {
            if let Some(stripped) = p.strip_prefix("~/") {
                dirs::home_dir()
                    .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
            } else {
                std::path::PathBuf::from(p)
            }
        });

    // NOTE(review): 0.20 is the second argument of `ContextBudget::new`; its meaning
    // is not visible here — confirm.
    self.context_manager.budget = Some(
        ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
    );

    {
        let graph_cfg = &config.memory.graph;
        if graph_cfg.rpe.enabled {
            // An existing router is left untouched, so a changed threshold or
            // max_skip_turns is not applied to it.
            if self.services.memory.extraction.rpe_router.is_none() {
                self.services.memory.extraction.rpe_router =
                    Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                        graph_cfg.rpe.threshold,
                        graph_cfg.rpe.max_skip_turns,
                    )));
            }
        } else {
            self.services.memory.extraction.rpe_router = None;
        }
        self.services.memory.extraction.graph_config = graph_cfg.clone();
    }
    self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
    self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
    self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
    self.context_manager
        .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
    self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
    self.context_manager.compression = config.memory.compression.clone();
    self.context_manager.routing = config.memory.store_routing.clone();
    // An empty classifier provider name clears the store-routing provider;
    // otherwise the named provider is resolved.
    self.context_manager.store_routing_provider = if config
        .memory
        .store_routing
        .routing_classifier_provider
        .is_empty()
    {
        None
    } else {
        let resolved = self.resolve_background_provider(
            config
                .memory
                .store_routing
                .routing_classifier_provider
                .as_str(),
        );
        Some(std::sync::Arc::new(resolved))
    };
    self.services
        .memory
        .persistence
        .cross_session_score_threshold = config.memory.cross_session_score_threshold;

    self.services.index.repo_map_tokens = config.index.repo_map_tokens;
    self.services.index.repo_map_ttl =
        std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

    // Only these three hook lists are refreshed on reload.
    self.services
        .session
        .hooks_config
        .cwd_changed
        .clone_from(&config.hooks.cwd_changed);
    self.services
        .session
        .hooks_config
        .permission_denied
        .clone_from(&config.hooks.permission_denied);
    self.services
        .session
        .hooks_config
        .turn_complete
        .clone_from(&config.hooks.turn_complete);
    tracing::info!("config reloaded");
}
3221
3222 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3226 let mut config = match Config::load(path) {
3227 Ok(c) => c,
3228 Err(e) => {
3229 tracing::warn!("config reload failed: {e:#}");
3230 return None;
3231 }
3232 };
3233
3234 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3236 None
3237 } else {
3238 match zeph_plugins::apply_plugin_config_overlays(
3239 &mut config,
3240 &self.runtime.lifecycle.plugins_dir,
3241 ) {
3242 Ok(o) => Some(o),
3243 Err(e) => {
3244 tracing::warn!(
3245 "plugin overlay merge failed during reload: {e:#}; \
3246 keeping previous runtime state"
3247 );
3248 return None;
3249 }
3250 }
3251 };
3252
3253 if let Some(ref overlay) = new_overlay {
3257 self.warn_on_shell_overlay_divergence(overlay, &config);
3258 }
3259 Some(config)
3260 }
3261
3262 fn warn_on_shell_overlay_divergence(
3268 &self,
3269 new_overlay: &zeph_plugins::ResolvedOverlay,
3270 config: &Config,
3271 ) {
3272 let new_blocked: Vec<String> = {
3273 let mut v = config.tools.shell.blocked_commands.clone();
3274 v.sort();
3275 v
3276 };
3277 let new_allowed: Vec<String> = {
3278 let mut v = config.tools.shell.allowed_commands.clone();
3279 v.sort();
3280 v
3281 };
3282
3283 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3284 let blocked_changed = new_blocked != startup.blocked;
3285 let allowed_changed = new_allowed != startup.allowed;
3286
3287 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3289 h.rebuild(&config.tools.shell);
3290 tracing::info!(
3291 blocked_count = h.snapshot_blocked().len(),
3292 "shell blocked_commands rebuilt from hot-reload"
3293 );
3294 }
3295
3296 if allowed_changed {
3303 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3304 for sandbox path recomputation (blocked_commands was rebuilt live)";
3305 tracing::warn!("{msg}");
3306 if let Some(ref tx) = self.services.session.status_tx {
3307 let _ = tx.send(msg.to_owned());
3308 }
3309 }
3310
3311 let _ = new_overlay;
3312 }
3313
3314 fn maybe_sidequest_eviction(&mut self) {
3325 if self.services.sidequest.config.enabled {
3329 use crate::config::PruningStrategy;
3330 if !matches!(
3331 self.context_manager.compression.pruning_strategy,
3332 PruningStrategy::Reactive
3333 ) {
3334 tracing::warn!(
3335 strategy = ?self.context_manager.compression.pruning_strategy,
3336 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3337 consider disabling sidequest.enabled to avoid redundant eviction"
3338 );
3339 }
3340 }
3341
3342 if self.services.focus.is_active() {
3344 tracing::debug!("sidequest: skipping — focus session active");
3345 self.services.compression.pending_sidequest_result = None;
3347 return;
3348 }
3349
3350 self.sidequest_apply_pending();
3352
3353 self.sidequest_schedule_next();
3355 }
3356
/// Applies the result of the previously scheduled background eviction task, if
/// there is one and it has finished.
///
/// The pending handle is always taken out of `pending_sidequest_result`. If the
/// task is not complete yet, the function returns without restoring the handle.
/// A non-empty list of cursor indices is applied to the message history; when
/// that frees tokens, prompt tokens are recomputed and the compaction state is
/// set to `CompactedThisTurn`. In every other outcome an empty status string is
/// sent to the status channel, when one exists.
fn sidequest_apply_pending(&mut self) {
    let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
        return;
    };
    let result = match handle.try_join() {
        Ok(result) => result,
        // Not finished yet: the returned handle is dropped here rather than stored back.
        Err(_handle) => {
            tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
            return;
        }
    };
    match result {
        Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
            // Snapshot of the cursors before eviction, used only for the debug dump below.
            let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
            let freed = self.services.sidequest.apply_eviction(
                &mut self.msg.messages,
                &evicted_indices,
                &self.runtime.metrics.token_counter,
            );
            if freed > 0 {
                self.recompute_prompt_tokens();
                self.context_manager.set_compaction_state(
                    crate::agent::context_manager::CompactionState::CompactedThisTurn {
                        cooldown: 0,
                    },
                );
                tracing::info!(
                    freed_tokens = freed,
                    evicted_cursors = evicted_indices.len(),
                    pass = self.services.sidequest.passes_run,
                    "sidequest eviction complete"
                );
                if let Some(ref d) = self.runtime.debug.debug_dumper {
                    d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                }
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                }
            } else {
                // Nothing was freed: send an empty status string.
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
        }
        // No result, or an empty index list (the non-empty case is handled above).
        Ok(None | Some(_)) => {
            tracing::debug!("sidequest: pending result: no cursors to evict");
            if let Some(ref tx) = self.services.session.status_tx {
                let _ = tx.send(String::new());
            }
        }
        Err(e) => {
            tracing::debug!("sidequest: background task error: {e}");
            if let Some(ref tx) = self.services.session.status_tx {
                let _ = tx.send(String::new());
            }
        }
    }
}
3421
/// Schedules a background LLM call that picks which tool-output cursors to evict.
///
/// Cursors are rebuilt from the current messages; with no cursors nothing is
/// scheduled. Otherwise an eviction prompt is sent to the provider as a one-shot
/// supervised task, and its handle is stored in `pending_sidequest_result`.
///
/// The task yields `None` when the LLM call fails or exceeds the 5-second
/// timeout, or when no JSON object can be extracted and parsed from the
/// response. Otherwise it yields the returned cursor indices after dropping
/// out-of-range ones, sorting, deduplicating, and truncating to
/// `ceil(n_cursors * max_eviction_ratio)` entries.
fn sidequest_schedule_next(&mut self) {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    self.services
        .sidequest
        .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);

    if self.services.sidequest.tool_output_cursors.is_empty() {
        tracing::debug!("sidequest: no eligible cursors");
        return;
    }

    // Everything the background task needs is captured by value here.
    let prompt = self.services.sidequest.build_eviction_prompt();
    let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
    let n_cursors = self.services.sidequest.tool_output_cursors.len();
    let provider = self.summary_or_primary_provider().clone();

    let eviction_future = async move {
        let msgs = [Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];
        let response =
            match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                .await
            {
                Ok(Ok(r)) => r,
                Ok(Err(e)) => {
                    tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                    return None;
                }
                Err(_) => {
                    tracing::debug!("sidequest bg: LLM call timed out");
                    return None;
                }
            };

        // Parse the span from the first '{' to the last '}' as JSON, ignoring any
        // surrounding text in the response.
        let start = response.find('{')?;
        let end = response.rfind('}')?;
        if start > end {
            return None;
        }
        let json_slice = &response[start..=end];
        let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
        // Keep only indices that refer to an existing cursor.
        let mut valid: Vec<usize> = parsed
            .del_cursors
            .into_iter()
            .filter(|&c| c < n_cursors)
            .collect();
        valid.sort_unstable();
        valid.dedup();
        // Cap the number of evictions; after sorting, the lowest indices are kept.
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss
        )]
        let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
        valid.truncate(max_evict);
        Some(valid)
    };
    let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
        std::sync::Arc::from("agent.sidequest.eviction"),
        move || eviction_future,
    );
    self.services.compression.pending_sidequest_result = Some(handle);
    tracing::debug!("sidequest: background LLM eviction task spawned");
    if let Some(ref tx) = self.services.session.status_tx {
        let _ = tx.send("SideQuest: scoring tool outputs...".into());
    }
}
3495
3496 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3498 self.services
3499 .mcp
3500 .manager
3501 .as_ref()
3502 .map(|m| McpManagerDispatch(Arc::clone(m)))
3503 }
3504
3505 pub(crate) async fn check_cwd_changed(&mut self) {
3511 let current = match std::env::current_dir() {
3512 Ok(p) => p,
3513 Err(e) => {
3514 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3515 return;
3516 }
3517 };
3518 if current == self.runtime.lifecycle.last_known_cwd {
3519 return;
3520 }
3521 let old_cwd =
3522 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3523 self.services.session.env_context.working_dir = current.display().to_string();
3524
3525 tracing::info!(
3526 old = %old_cwd.display(),
3527 new = %current.display(),
3528 "working directory changed"
3529 );
3530
3531 let _ = self
3532 .channel
3533 .send_status("Working directory changed\u{2026}")
3534 .await;
3535
3536 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3537 if hooks.is_empty() {
3538 tracing::debug!("CwdChanged: no hooks configured, skipping");
3539 } else {
3540 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3541 let mut env = std::collections::HashMap::new();
3542 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3543 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3544 let dispatch = self.mcp_dispatch();
3545 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3546 .as_ref()
3547 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3548 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3549 tracing::warn!(error = %e, "CwdChanged hook failed");
3550 } else {
3551 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3552 }
3553 }
3554
3555 let _ = self.channel.send_status("").await;
3556 }
3557
3558 pub(crate) async fn handle_file_changed(
3560 &mut self,
3561 event: crate::file_watcher::FileChangedEvent,
3562 ) {
3563 tracing::info!(path = %event.path.display(), "file changed");
3564
3565 let _ = self
3566 .channel
3567 .send_status("Running file-change hook\u{2026}")
3568 .await;
3569
3570 let hooks = self
3571 .services
3572 .session
3573 .hooks_config
3574 .file_changed_hooks
3575 .clone();
3576 if hooks.is_empty() {
3577 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3578 } else {
3579 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3580 let mut env = std::collections::HashMap::new();
3581 env.insert(
3582 "ZEPH_CHANGED_PATH".to_owned(),
3583 event.path.display().to_string(),
3584 );
3585 let dispatch = self.mcp_dispatch();
3586 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3587 .as_ref()
3588 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3589 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3590 tracing::warn!(error = %e, "FileChanged hook failed");
3591 } else {
3592 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3593 }
3594 }
3595
3596 let _ = self.channel.send_status("").await;
3597 }
3598
3599 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3609 let Some(engine) = self.services.promotion_engine.clone() else {
3610 return;
3611 };
3612
3613 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3614 return;
3615 };
3616
3617 let promotion_window = 200usize;
3620
3621 let accepted = self.runtime.lifecycle.supervisor.spawn(
3622 agent_supervisor::TaskClass::Enrichment,
3623 "compression_spectrum.promotion_scan",
3624 async move {
3625 let span = tracing::info_span!("memory.compression.promote.background");
3626 let _enter = span.enter();
3627
3628 let window = match memory.load_promotion_window(promotion_window).await {
3629 Ok(w) => w,
3630 Err(e) => {
3631 tracing::warn!(error = %e, "promotion scan: failed to load window");
3632 return;
3633 }
3634 };
3635
3636 if window.is_empty() {
3637 return;
3638 }
3639
3640 let candidates = match engine.scan(&window).await {
3641 Ok(c) => c,
3642 Err(e) => {
3643 tracing::warn!(error = %e, "promotion scan: clustering failed");
3644 return;
3645 }
3646 };
3647
3648 for candidate in &candidates {
3649 if let Err(e) = engine.promote(candidate).await {
3650 tracing::warn!(
3651 signature = %candidate.signature,
3652 error = %e,
3653 "promotion scan: promote failed"
3654 );
3655 }
3656 }
3657
3658 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3659 },
3660 );
3661
3662 if accepted {
3663 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3664 }
3665 }
3666}
3667pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3675 use zeph_llm::provider::MessagePart;
3676 if m.parts.is_empty() {
3677 return m.content.len();
3678 }
3679 m.parts
3680 .iter()
3681 .map(|p| match p {
3682 MessagePart::Text { text }
3683 | MessagePart::Recall { text }
3684 | MessagePart::CodeContext { text }
3685 | MessagePart::Summary { text }
3686 | MessagePart::CrossSession { text } => text.len(),
3687 MessagePart::ToolOutput { body, .. } => body.len(),
3688 MessagePart::ToolUse { id, name, input } => {
3689 50 + id.len() + name.len() + input.to_string().len()
3690 }
3691 MessagePart::ToolResult {
3692 tool_use_id,
3693 content,
3694 ..
3695 } => 50 + tool_use_id.len() + content.len(),
3696 MessagePart::Image(img) => img.data.len() * 4 / 3,
3697 MessagePart::ThinkingBlock {
3698 thinking,
3699 signature,
3700 } => 50 + thinking.len() + signature.len(),
3701 MessagePart::RedactedThinkingBlock { data } => data.len(),
3702 MessagePart::Compaction { summary } => summary.len(),
3703 })
3704 .sum()
3705}
3706
3707pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3726 use zeph_llm::provider::{MessagePart, Role};
3727
3728 let mut total_chars = 0usize;
3732 let mut drop_before = 0usize; for (i, m) in msgs.iter().enumerate().rev() {
3734 total_chars += estimate_parts_size(m);
3735 if total_chars > max_chars {
3736 drop_before = i + 1;
3737 break;
3738 }
3739 }
3740 if drop_before > 0 {
3741 msgs.drain(..drop_before);
3742 }
3743
3744 let emitted_tool_ids: std::collections::HashSet<String> = msgs
3748 .iter()
3749 .filter(|m| m.role == Role::Assistant)
3750 .flat_map(|m| m.parts.iter())
3751 .filter_map(|p| {
3752 if let MessagePart::ToolUse { id, .. } = p {
3753 Some(id.clone())
3754 } else {
3755 None
3756 }
3757 })
3758 .collect();
3759
3760 let mut orphans_removed = 0usize;
3761 for m in msgs.iter_mut() {
3762 if m.role != Role::User || m.parts.is_empty() {
3763 continue;
3764 }
3765 let before = m.parts.len();
3766 m.parts.retain(|p| match p {
3767 MessagePart::ToolResult { tool_use_id, .. } => {
3768 emitted_tool_ids.contains(tool_use_id.as_str())
3769 }
3770 _ => true,
3771 });
3772 let dropped = before - m.parts.len();
3773 if dropped > 0 {
3774 orphans_removed += dropped;
3775 if m.parts.is_empty() {
3776 m.content.clear();
3777 } else {
3778 m.rebuild_content();
3779 }
3780 }
3781 }
3782
3783 let consumed_tool_ids: std::collections::HashSet<String> = msgs
3791 .iter()
3792 .filter(|m| m.role == Role::User)
3793 .flat_map(|m| m.parts.iter())
3794 .filter_map(|p| {
3795 if let MessagePart::ToolResult { tool_use_id, .. } = p {
3796 Some(tool_use_id.clone())
3797 } else {
3798 None
3799 }
3800 })
3801 .collect();
3802
3803 let last_assistant_idx = msgs
3805 .iter()
3806 .enumerate()
3807 .rev()
3808 .find(|(_, m)| m.role == Role::Assistant)
3809 .map(|(i, _)| i);
3810
3811 for (idx, m) in msgs.iter_mut().enumerate() {
3812 if m.role != Role::Assistant || m.parts.is_empty() {
3813 continue;
3814 }
3815 if Some(idx) == last_assistant_idx {
3817 continue;
3818 }
3819 let before = m.parts.len();
3820 m.parts.retain(|p| match p {
3821 MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3822 _ => true,
3823 });
3824 let dropped = before - m.parts.len();
3825 if dropped > 0 {
3826 orphans_removed += dropped;
3827 if m.parts.is_empty() {
3828 m.content.clear();
3829 } else {
3830 m.rebuild_content();
3831 }
3832 }
3833 }
3834
3835 msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3837
3838 if orphans_removed > 0 {
3839 tracing::debug!(
3840 orphans = orphans_removed,
3841 "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3842 );
3843 }
3844}
3845
3846fn sanitize_parent_messages(
3852 mut msgs: Vec<zeph_llm::provider::Message>,
3853 sanitizer: &zeph_sanitizer::ContentSanitizer,
3854 source: &zeph_sanitizer::ContentSource,
3855) -> Vec<zeph_llm::provider::Message> {
3856 use zeph_llm::provider::MessagePart;
3857 for msg in &mut msgs {
3858 let mut changed = false;
3859 for part in &mut msg.parts {
3860 if let MessagePart::Text { text } = part {
3861 let clean = sanitizer.sanitize(text, source.clone());
3862 if clean.body != *text {
3863 *text = clean.body;
3864 changed = true;
3865 }
3866 }
3867 }
3868 if changed {
3869 msg.rebuild_content();
3870 }
3871 }
3872 msgs
3873}
3874
3875struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3880
3881impl zeph_subagent::McpDispatch for McpManagerDispatch {
3882 fn call_tool<'a>(
3883 &'a self,
3884 server: &'a str,
3885 tool: &'a str,
3886 args: serde_json::Value,
3887 ) -> std::pin::Pin<
3888 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3889 > {
3890 Box::pin(async move {
3891 self.0
3892 .call_tool(server, tool, args)
3893 .await
3894 .map(|result| {
3895 let texts: Vec<serde_json::Value> = result
3897 .content
3898 .iter()
3899 .filter_map(|c| {
3900 if let rmcp::model::RawContent::Text(t) = &c.raw {
3901 Some(serde_json::Value::String(t.text.clone()))
3902 } else {
3903 None
3904 }
3905 })
3906 .collect();
3907 serde_json::Value::Array(texts)
3908 })
3909 .map_err(|e| e.to_string())
3910 })
3911 }
3912}
3913
3914pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3915 while !*rx.borrow_and_update() {
3916 if rx.changed().await.is_err() {
3917 std::future::pending::<()>().await;
3918 }
3919 }
3920}
3921
3922pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3923 match rx {
3924 Some(inner) => {
3925 if let Some(v) = inner.recv().await {
3926 Some(v)
3927 } else {
3928 *rx = None;
3929 std::future::pending().await
3930 }
3931 }
3932 None => std::future::pending().await,
3933 }
3934}
3935
3936fn truncate_shell_command(cmd: &str) -> String {
3943 if cmd.len() <= 80 {
3944 return cmd.to_owned();
3945 }
3946 let end = cmd.floor_char_boundary(79);
3947 format!("{}…", &cmd[..end])
3948}
3949
/// Returns the first eight characters of `id`, or all of it when shorter.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the ninth character, if there is one, is where to cut.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3954
3955pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3956 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3957 if let Some(ctx_size) = provider.context_window() {
3958 tracing::info!(
3959 model_context = ctx_size,
3960 "auto-configured context budget on reload"
3961 );
3962 ctx_size
3963 } else {
3964 0
3965 }
3966 } else {
3967 config.memory.context_budget_tokens
3968 };
3969 if tokens == 0 {
3970 tracing::warn!(
3971 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3972 );
3973 128_000
3974 } else {
3975 tokens
3976 }
3977}
3978
3979#[cfg(test)]
3980mod tests;
3981
3982#[cfg(test)]
3983pub(crate) use tests::agent_tests;
3984
/// Command stubs used only by tests.
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Boxed future shape returned by `CommandHandler::handle`.
    type HandlerFuture<'a> =
        Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>;

    /// Slash command `/test-error` whose handler always fails with `"boom"`.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and resolves to a `CommandError`.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> HandlerFuture<'a> {
            let failure = async { Err(CommandError::new("boom")) };
            Box::pin(failure)
        }
    }
}