1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod heuristic_promotion;
28mod index;
29mod learning;
30pub(crate) mod learning_engine;
31mod log_commands;
32mod loop_event;
33mod lsp_commands;
34mod magic_docs;
35mod mcp;
36pub(crate) mod memcot;
37mod message_queue;
38mod microcompact;
39mod model_commands;
40mod persistence;
41#[cfg(feature = "scheduler")]
42mod plan;
43mod policy_commands;
44mod provider_cmd;
45mod quality_hook;
46pub(crate) mod rate_limiter;
47#[cfg(feature = "scheduler")]
48mod scheduler_commands;
49#[cfg(feature = "scheduler")]
50mod scheduler_loop;
51mod scope_commands;
52pub mod session_config;
53mod session_digest;
54pub mod shadow_sentinel;
55pub(crate) mod sidequest;
56mod skill_management;
57pub mod slash_commands;
58pub mod speculative;
59pub(crate) mod state;
60pub(crate) mod task_injection;
61pub(crate) mod tool_execution;
62pub(crate) mod tool_orchestrator;
63mod trace_extraction;
64pub mod trajectory;
65mod trajectory_commands;
66mod trust_commands;
67pub mod turn;
68mod utils;
69pub(crate) mod vigil;
70
71use std::collections::{HashMap, VecDeque};
72use std::fmt::Write as _;
73use std::sync::Arc;
74
75use parking_lot::RwLock;
76
77use tokio::sync::{mpsc, watch};
78use tokio_util::sync::CancellationToken;
79use zeph_llm::any::AnyProvider;
80use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
81use zeph_memory::TokenCounter;
82use zeph_memory::semantic::SemanticMemory;
83use zeph_skills::loader::Skill;
84use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
85use zeph_skills::prompt::format_skills_prompt;
86use zeph_skills::registry::SkillRegistry;
87use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
88
89use crate::channel::Channel;
90use crate::config::Config;
91use crate::context::{ContextBudget, build_system_prompt};
92use zeph_common::text::estimate_tokens;
93
94use loop_event::LoopEvent;
95use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
96use state::MessageState;
97
// NOTE(review): the name suggests this is the number of recent iterations inspected when
// detecting a repeating ("doom") loop; the consumer is outside this view — confirm.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Re-exported so crate-internal code can reach the prefix through this module.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prepended to a scheduled task's prompt before it is submitted as an agent turn.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing sequence (newline plus a Markdown code fence) that terminates a formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps `body` in a labelled, fenced block attributed to `tool_name`.
///
/// The result consists of the header `[tool output: <tool_name>]`, a newline, an opening
/// Markdown code fence, `body`, and finally [`TOOL_OUTPUT_SUFFIX`]. The buffer is sized up
/// front so the string is built with a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER_OPEN: &str = "[tool output: ";
    const HEADER_CLOSE: &str = "]\n```\n";

    let pieces = [
        HEADER_OPEN,
        tool_name,
        HEADER_CLOSE,
        body,
        TOOL_OUTPUT_SUFFIX,
    ];
    let needed: usize = pieces.iter().map(|piece| piece.len()).sum();

    let mut out = String::with_capacity(needed);
    for piece in pieces {
        out.push_str(piece);
    }
    out
}
120
/// The agent: an LLM provider, a channel of type `C`, a tool executor, and the state the
/// main loop operates on.
pub struct Agent<C: Channel> {
    /// Primary LLM provider. Replaced at runtime when a provider override is applied.
    provider: AnyProvider,
    /// Provider kept separately for embeddings; `Agent::new` initializes it as a clone of
    /// `provider`.
    embedding_provider: AnyProvider,
    /// Channel the agent receives messages from and sends replies and status updates to.
    channel: C,
    /// Type-erased tool executor shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation messages, the queued-message buffer, and persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager; among other things it tracks turns since the last hard compaction.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestration state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Bundle of service-level state (memory, skills, MCP, security, experiments, ...).
    pub(super) services: state::Services,

    /// Bundle of runtime state (config, lifecycle, providers, metrics, debug, instructions).
    pub(super) runtime: state::AgentRuntime,
}
175
/// What the main loop should do after a slash-command dispatch attempt.
enum DispatchFlow {
    /// Leave the main loop (the command requested exit).
    Break,
    /// The input was handled; skip to the next loop iteration.
    Continue,
    /// No command matched; keep processing the input in the current iteration.
    Fallthrough,
}
185
186impl<C: Channel> Agent<C> {
187 #[must_use]
211 pub fn new(
212 provider: AnyProvider,
213 channel: C,
214 registry: SkillRegistry,
215 matcher: Option<SkillMatcherBackend>,
216 max_active_skills: usize,
217 tool_executor: impl ToolExecutor + 'static,
218 ) -> Self {
219 let registry = Arc::new(RwLock::new(registry));
220 let embedding_provider = provider.clone();
221 Self::new_with_registry_arc(
222 provider,
223 embedding_provider,
224 channel,
225 registry,
226 matcher,
227 max_active_skills,
228 tool_executor,
229 )
230 }
231
232 #[must_use]
239 pub fn new_with_registry_arc(
240 provider: AnyProvider,
241 embedding_provider: AnyProvider,
242 channel: C,
243 registry: Arc<RwLock<SkillRegistry>>,
244 matcher: Option<SkillMatcherBackend>,
245 max_active_skills: usize,
246 tool_executor: impl ToolExecutor + 'static,
247 ) -> Self {
248 use state::{
249 AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
250 InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
251 OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
252 SessionState, SkillState, ToolState,
253 };
254
255 debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
256 let all_skills: Vec<Skill> = {
257 let reg = registry.read();
258 reg.all_meta()
259 .iter()
260 .filter_map(|m| reg.skill(&m.name).ok())
261 .collect()
262 };
263 let empty_trust = HashMap::new();
264 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
265 let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
266 let system_prompt = build_system_prompt(&skills_prompt, None);
267 tracing::debug!(len = system_prompt.len(), "initial system prompt built");
268 tracing::trace!(prompt = %system_prompt, "full system prompt");
269
270 let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
271 let token_counter = Arc::new(TokenCounter::new());
272
273 let services = Services {
274 memory: MemoryState::default(),
275 skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
276 learning_engine: learning_engine::LearningEngine::new(),
277 feedback: FeedbackState::default(),
278 mcp: McpState::default(),
279 index: IndexState::default(),
280 session: SessionState::new(),
281 security: SecurityState::default(),
282 experiments: ExperimentState::new(),
283 compression: CompressionState::default(),
284 orchestration: OrchestrationState::default(),
285 focus: focus::FocusState::default(),
286 sidequest: sidequest::SidequestState::default(),
287 tool_state: ToolState::default(),
288 goal_accounting: None,
289 quality: None,
290 proactive_explorer: None,
291 promotion_engine: None,
292 taco_compressor: None,
293 speculation_engine: None,
294 autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
295 autonomous_registry: crate::goal::AutonomousRegistry::new(),
296 };
297
298 let runtime = AgentRuntime {
299 config: RuntimeConfig::default(),
300 lifecycle: LifecycleState::new(),
301 providers: ProviderState::new(initial_prompt_tokens),
302 metrics: MetricsState::new(token_counter),
303 debug: DebugState::default(),
304 instructions: InstructionState::default(),
305 };
306
307 Self {
308 provider,
309 embedding_provider,
310 channel,
311 tool_executor: Arc::new(tool_executor),
312 msg: MessageState {
313 messages: vec![Message {
314 role: Role::System,
315 content: system_prompt,
316 parts: vec![],
317 metadata: MessageMetadata::default(),
318 }],
319 message_queue: VecDeque::new(),
320 pending_image_parts: Vec::new(),
321 last_persisted_message_id: None,
322 deferred_db_hide_ids: Vec::new(),
323 deferred_db_summaries: Vec::new(),
324 },
325 context_manager: context_manager::ContextManager::new(),
326 tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
327 services,
328 runtime,
329 }
330 }
331
332 #[must_use]
345 pub fn into_channel(self) -> C {
346 self.channel
347 }
348
349 #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
354 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
355 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
356 return vec![];
357 };
358
359 let finished: Vec<String> = mgr
360 .statuses()
361 .into_iter()
362 .filter_map(|(id, status)| {
363 if matches!(
364 status.state,
365 zeph_subagent::SubAgentState::Completed
366 | zeph_subagent::SubAgentState::Failed
367 | zeph_subagent::SubAgentState::Canceled
368 ) {
369 Some(id)
370 } else {
371 None
372 }
373 })
374 .collect();
375
376 let mut results = vec![];
377 for task_id in finished {
378 match mgr.collect(&task_id).await {
379 Ok(result) => results.push((task_id, result)),
380 Err(e) => {
381 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
382 }
383 }
384 }
385 results
386 }
387
388 async fn call_llm_for_session_summary(
397 &self,
398 chat_messages: &[Message],
399 ) -> Option<zeph_memory::StructuredSummary> {
400 let provider = self.resolve_background_provider(
401 &self.services.memory.compaction.shutdown_summary_provider,
402 );
403 let timeout_dur = std::time::Duration::from_secs(
404 self.services
405 .memory
406 .compaction
407 .shutdown_summary_timeout_secs,
408 );
409 match tokio::time::timeout(
410 timeout_dur,
411 provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
412 )
413 .await
414 {
415 Ok(Ok(s)) => Some(s),
416 Ok(Err(e)) => {
417 tracing::warn!(
418 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
419 );
420 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
421 .await
422 }
423 Err(_) => {
424 tracing::warn!(
425 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
426 self.services
427 .memory
428 .compaction
429 .shutdown_summary_timeout_secs
430 );
431 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
432 .await
433 }
434 }
435 }
436
437 async fn plain_text_summary_fallback(
438 &self,
439 provider: &zeph_llm::any::AnyProvider,
440 chat_messages: &[Message],
441 timeout_dur: std::time::Duration,
442 ) -> Option<zeph_memory::StructuredSummary> {
443 match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
444 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
445 summary: plain,
446 key_facts: vec![],
447 entities: vec![],
448 }),
449 Ok(Err(e)) => {
450 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
451 None
452 }
453 Err(_) => {
454 tracing::warn!("shutdown summary: plain LLM fallback timed out");
455 None
456 }
457 }
458 }
459
460 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
465 use zeph_llm::provider::{MessagePart, Role};
466
467 let msgs = &self.msg.messages;
471 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
473 return;
474 };
475 let asst_msg = &msgs[asst_idx];
476 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
477 .parts
478 .iter()
479 .filter_map(|p| {
480 if let MessagePart::ToolUse { id, name, input } = p {
481 Some((id.as_str(), name.as_str(), input))
482 } else {
483 None
484 }
485 })
486 .collect();
487 if tool_use_ids.is_empty() {
488 return;
489 }
490
491 let paired_ids: std::collections::HashSet<&str> = msgs
493 .get(asst_idx + 1..)
494 .into_iter()
495 .flatten()
496 .filter(|m| m.role == Role::User)
497 .flat_map(|m| m.parts.iter())
498 .filter_map(|p| {
499 if let MessagePart::ToolResult { tool_use_id, .. } = p {
500 Some(tool_use_id.as_str())
501 } else {
502 None
503 }
504 })
505 .collect();
506
507 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
508 .iter()
509 .filter(|(id, _, _)| !paired_ids.contains(*id))
510 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
511 id: (*id).to_owned(),
512 name: (*name).to_owned().into(),
513 input: (*input).clone(),
514 })
515 .collect();
516
517 if unpaired.is_empty() {
518 return;
519 }
520
521 tracing::info!(
522 count = unpaired.len(),
523 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
524 );
525 self.persist_cancelled_tool_results(&unpaired).await;
526 }
527
528 async fn maybe_store_shutdown_summary(&mut self) {
538 if !self.services.memory.compaction.shutdown_summary {
539 return;
540 }
541 let Some(memory) = self.services.memory.persistence.memory.clone() else {
542 return;
543 };
544 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
545 return;
546 };
547
548 match memory.has_session_summary(conversation_id).await {
550 Ok(true) => {
551 tracing::debug!("shutdown summary: session already has a summary, skipping");
552 return;
553 }
554 Ok(false) => {}
555 Err(e) => {
556 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
557 return;
558 }
559 }
560
561 let user_count = self
563 .msg
564 .messages
565 .iter()
566 .skip(1)
567 .filter(|m| m.role == Role::User)
568 .count();
569 if user_count
570 < self
571 .services
572 .memory
573 .compaction
574 .shutdown_summary_min_messages
575 {
576 tracing::debug!(
577 user_count,
578 min = self
579 .services
580 .memory
581 .compaction
582 .shutdown_summary_min_messages,
583 "shutdown summary: too few user messages, skipping"
584 );
585 return;
586 }
587
588 let _ = self.channel.send_status("Saving session summary...").await;
590
591 let max = self
593 .services
594 .memory
595 .compaction
596 .shutdown_summary_max_messages;
597 if max == 0 {
598 tracing::debug!("shutdown summary: max_messages=0, skipping");
599 return;
600 }
601 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
602 let slice = if non_system.len() > max {
603 &non_system[non_system.len() - max..]
604 } else {
605 &non_system[..]
606 };
607
608 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
609 .iter()
610 .map(|m| {
611 let role = match m.role {
612 Role::User => "user".to_owned(),
613 Role::Assistant => "assistant".to_owned(),
614 Role::System => "system".to_owned(),
615 };
616 (zeph_memory::MessageId(0), role, m.content.clone())
617 })
618 .collect();
619
620 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
621 let chat_messages = vec![Message {
622 role: Role::User,
623 content: prompt,
624 parts: vec![],
625 metadata: MessageMetadata::default(),
626 }];
627
628 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
629 let _ = self.channel.send_status("").await;
630 return;
631 };
632
633 if let Err(e) = memory
634 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
635 .await
636 {
637 tracing::warn!("shutdown summary: storage failed: {e:#}");
638 } else {
639 tracing::info!(
640 conversation_id = conversation_id.0,
641 "shutdown summary stored"
642 );
643 }
644
645 let _ = self.channel.send_status("").await;
647 }
648
649 #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
665 pub async fn shutdown(&mut self) {
666 let _ = self.channel.send_status("Shutting down...").await;
667
668 self.provider.save_router_state().await;
670
671 if let Some(ref advisor) = self.services.orchestration.topology_advisor
673 && let Err(e) = advisor.save()
674 {
675 tracing::warn!(error = %e, "adaptorch: failed to persist state");
676 }
677
678 if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
679 mgr.shutdown_all();
680 }
681
682 if let Some(ref manager) = self.services.mcp.manager {
683 manager.shutdown_all_shared().await;
684 }
685
686 if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
690 self.update_metrics(|m| {
691 m.compaction_turns_after_hard.push(turns);
692 });
693 self.context_manager
694 .set_turns_since_last_hard_compaction(None);
695 }
696
697 if let Some(ref tx) = self.runtime.metrics.metrics_tx {
698 let m = tx.borrow();
699 if m.filter_applications > 0 {
700 #[allow(clippy::cast_precision_loss)]
701 let pct = if m.filter_raw_tokens > 0 {
702 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
703 } else {
704 0.0
705 };
706 tracing::info!(
707 raw_tokens = m.filter_raw_tokens,
708 saved_tokens = m.filter_saved_tokens,
709 applications = m.filter_applications,
710 "tool output filtering saved ~{} tokens ({pct:.0}%)",
711 m.filter_saved_tokens,
712 );
713 }
714 if m.compaction_hard_count > 0 {
715 tracing::info!(
716 hard_compactions = m.compaction_hard_count,
717 turns_after_hard = ?m.compaction_turns_after_hard,
718 "hard compaction trajectory"
719 );
720 }
721 }
722
723 self.flush_orphaned_tool_use_on_shutdown().await;
727
728 if let Some(ref token) = self.services.experiments.cancel {
731 token.cancel();
732 }
733 if let Some(h) = self.services.experiments.handle.take() {
734 h.abort();
735 }
736
737 if let Some(memory) = self.services.memory.persistence.memory.as_ref() {
741 memory.cancel_graph_extraction();
742 }
743
744 self.runtime.lifecycle.supervisor.abort_all();
746
747 if let Some(h) = self.services.compression.pending_task_goal.take() {
750 h.abort();
751 }
752 if let Some(h) = self.services.compression.pending_sidequest_result.take() {
753 h.abort();
754 }
755 if let Some(h) = self.services.compression.pending_subgoal.take() {
756 h.abort();
757 }
758
759 self.services.learning_engine.learning_tasks.abort_all();
761
762 if let Some(h) = self.services.learning_engine.trace_extraction_handle.take() {
765 let deadline = std::time::Duration::from_mins(2);
766 match tokio::time::timeout(deadline, h).await {
767 Ok(Ok(())) => {}
768 Ok(Err(e)) => tracing::warn!("trace_extraction: task panicked at shutdown: {e}"),
769 Err(_) => tracing::warn!(
770 "trace_extraction: timed out at shutdown ({}s), aborting",
771 deadline.as_secs()
772 ),
773 }
774 }
775
776 if let Some(h) = self
779 .services
780 .learning_engine
781 .heuristic_promotion_handle
782 .take()
783 {
784 h.abort();
785 }
786
787 if let Some(ref sentinel) = self.services.security.shadow_sentinel {
789 sentinel.drain_pending().await;
790 }
791
792 for _ in 0..4 {
797 tokio::task::yield_now().await;
798 }
799
800 self.maybe_store_shutdown_summary().await;
801 self.maybe_store_session_digest().await;
802
803 tracing::info!("agent shutdown complete");
804 }
805
806 fn refresh_subagent_metrics(&mut self) {
813 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
814 return;
815 };
816 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
817 .statuses()
818 .into_iter()
819 .map(|(id, s)| {
820 let def = mgr.agents_def(&id);
821 crate::metrics::SubAgentMetrics {
822 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
823 id: id.clone(),
824 state: format!("{:?}", s.state).to_lowercase(),
825 turns_used: s.turns_used,
826 max_turns: def.map_or(20, |d| d.permissions.max_turns),
827 background: def.is_some_and(|d| d.permissions.background),
828 elapsed_secs: s.started_at.elapsed().as_secs(),
829 permission_mode: def.map_or_else(String::new, |d| {
830 use zeph_subagent::def::PermissionMode;
831 match d.permissions.permission_mode {
832 PermissionMode::AcceptEdits => "accept_edits".into(),
833 PermissionMode::DontAsk => "dont_ask".into(),
834 PermissionMode::BypassPermissions => "bypass_permissions".into(),
835 PermissionMode::Plan => "plan".into(),
836 _ => String::new(),
837 }
838 }),
839 transcript_dir: mgr
840 .agent_transcript_dir(&id)
841 .map(|p| p.to_string_lossy().into_owned()),
842 }
843 })
844 .collect();
845 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
846 }
847
848 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
850 let completed = self.poll_subagents().await;
851 for (task_id, result) in completed {
852 let notice = if result.is_empty() {
853 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
854 } else {
855 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
856 };
857 if let Err(e) = self.channel.send(¬ice).await {
858 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
859 }
860 }
861 Ok(())
862 }
863
    /// Runs the agent's main loop until the input ends, shutdown is signalled, or a command
    /// requests exit.
    ///
    /// Before looping it waits for model warmup (if a warmup signal is present and not yet
    /// set), restores the channel provider, loads the session digest, optionally sends a
    /// resume recap, and starts heuristic promotion.
    ///
    /// Each iteration obtains one input — a queued message first, otherwise the next loop
    /// event — and then routes it through, in order: the session/debug command registry, the
    /// agent command registry, the built-in command handler, and finally normal user-message
    /// processing.
    ///
    /// # Errors
    ///
    /// Propagates errors from `notify_completed_subagents`, `next_event`, and
    /// `process_user_message`.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // If a warmup signal exists and is not yet set, wait for one change and warn when
        // it is still unset afterwards.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        self.maybe_start_heuristic_promotion();

        loop {
            // Per-iteration housekeeping before any input is taken.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over waiting for a new event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving, so rebuild a channel message.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // No event or an explicit shutdown ends the loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions().await;
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment is over: drop its cancel token and task handle.
                        self.services.experiments.cancel = None;
                        self.services.experiments.handle = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled tasks are submitted as a regular turn with a fixed prefix.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count and log the user-loop tick when a user loop is active.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::AutonomousTick) => {
                        // Errors from an autonomous turn are logged and do not end the loop.
                        if let Err(e) = self.run_autonomous_turn().await {
                            tracing::warn!(error = %e, "autonomous turn error");
                        }
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Record whether this message arrived in a guest context.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // For slash commands, remember any URLs flagged in the command text as
            // user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Stage 1: session/debug commands. These run against borrowed pieces of the
            // agent's state plus a null agent, not against `self` as a whole.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // `None` means no session/debug command matched the input.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not a session/debug command: try the agent command registry next.
                }
            }

            // Stage 2: agent commands. Here `self` is the agent, and the other context
            // slots are filled with null implementations.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    agents_fleet::AgentsFleetCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(AgentsFleetCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // If an autonomous start is pending, flush it now, log it, and sync the
            // registry entry.
            if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
                if let Some(cid) = cancelled_id {
                    tracing::info!(
                        goal_id = cid,
                        "autonomous: previous session cancelled for new goal"
                    );
                }
                self.sync_registry_entry();
                tracing::info!(goal_id = new_id, "autonomous: session started");
            }

            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not an agent command either: fall through to built-in handling.
                }
            }

            // Stage 3: built-in commands. `Some(true)` exits, `Some(false)` means handled.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Stage 4: nothing claimed the input, so treat it as a regular user message.
            self.process_user_message(text, image_parts).await?;
        }

        // Post-loop work, run once after the main loop has ended.
        self.maybe_autodream().await;

        self.maybe_extract_skills_from_trace().await;

        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1223
1224 async fn apply_dispatch_result(
1230 &mut self,
1231 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1232 command: &str,
1233 with_learning: bool,
1234 ) -> DispatchFlow {
1235 match result {
1236 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1237 let _ = self.channel.flush_chunks().await;
1238 DispatchFlow::Break
1239 }
1240 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1241 let _ = self.channel.send(&msg).await;
1242 let _ = self.channel.flush_chunks().await;
1243 if with_learning {
1244 self.maybe_trigger_post_command_learning(command).await;
1245 }
1246 DispatchFlow::Continue
1247 }
1248 Some(Ok(_)) => {
1249 let _ = self.channel.flush_chunks().await;
1250 DispatchFlow::Continue
1251 }
1252 Some(Err(e)) => {
1253 let _ = self.channel.send(&e.to_string()).await;
1254 let _ = self.channel.flush_chunks().await;
1255 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1256 DispatchFlow::Continue
1257 }
1258 None => DispatchFlow::Fallthrough,
1259 }
1260 }
1261
1262 fn apply_provider_override(&mut self) {
1264 if let Some(ref slot) = self.runtime.providers.provider_override
1265 && let Some(new_provider) = slot.write().take()
1266 {
1267 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1268 self.provider = new_provider;
1269 }
1270 }
1271
    /// Waits for the next thing the main loop should react to.
    ///
    /// Races the channel against the shutdown signal, the reload / notification / scheduler
    /// receivers, the user-loop interval, file-change events, and the autonomous tick, and
    /// returns whichever completes first as a [`LoopEvent`].
    ///
    /// Returns `Ok(None)` when the channel yields no message, or when the user-loop tick
    /// fires but the loop is gone or cancelled.
    ///
    /// # Errors
    ///
    /// Propagates an error returned by the channel's `recv`.
    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            // Channel input returns directly: `None` from the channel is passed through.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-loop tick: this branch never completes while no loop is configured or
            // the loop has been cancelled; otherwise it waits for the next interval tick.
            () = async {
                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // Re-check the loop state now that the tick has fired.
                // NOTE(review): `run` treats `Ok(None)` like a shutdown and breaks its main
                // loop — confirm that ending the agent loop is intended on these two paths.
                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.runtime.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
            // Only polled while the autonomous driver reports that it should tick.
            () = self.services.autonomous.next_tick(),
                if self.services.autonomous.should_tick() => {
                LoopEvent::AutonomousTick
            }
        };
        Ok(Some(event))
    }
1343
1344 #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1345 async fn resolve_message(
1346 &self,
1347 msg: crate::channel::ChannelMessage,
1348 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1349 use crate::channel::{Attachment, AttachmentKind};
1350 use zeph_llm::provider::{ImageData, MessagePart};
1351
1352 let text_base = msg.text.clone();
1353
1354 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1355 .attachments
1356 .into_iter()
1357 .partition(|a| a.kind == AttachmentKind::Audio);
1358
1359 tracing::debug!(
1360 audio = audio_attachments.len(),
1361 has_stt = self.runtime.providers.stt.is_some(),
1362 "resolve_message attachments"
1363 );
1364
1365 let text = if !audio_attachments.is_empty()
1366 && let Some(stt) = self.runtime.providers.stt.as_ref()
1367 {
1368 let mut transcribed_parts = Vec::new();
1369 for attachment in &audio_attachments {
1370 if attachment.data.len() > MAX_AUDIO_BYTES {
1371 tracing::warn!(
1372 size = attachment.data.len(),
1373 max = MAX_AUDIO_BYTES,
1374 "audio attachment exceeds size limit, skipping"
1375 );
1376 continue;
1377 }
1378 match stt
1379 .transcribe(&attachment.data, attachment.filename.as_deref())
1380 .await
1381 {
1382 Ok(result) => {
1383 tracing::info!(
1384 len = result.text.len(),
1385 language = ?result.language,
1386 "audio transcribed"
1387 );
1388 transcribed_parts.push(result.text);
1389 }
1390 Err(e) => {
1391 tracing::error!(error = %e, "audio transcription failed");
1392 }
1393 }
1394 }
1395 if transcribed_parts.is_empty() {
1396 text_base
1397 } else {
1398 let transcribed = transcribed_parts.join("\n");
1399 if text_base.is_empty() {
1400 transcribed
1401 } else {
1402 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1403 }
1404 }
1405 } else {
1406 if !audio_attachments.is_empty() {
1407 tracing::warn!(
1408 count = audio_attachments.len(),
1409 "audio attachments received but no STT provider configured, dropping"
1410 );
1411 }
1412 text_base
1413 };
1414
1415 let mut image_parts = Vec::new();
1416 for attachment in image_attachments {
1417 if attachment.data.len() > MAX_IMAGE_BYTES {
1418 tracing::warn!(
1419 size = attachment.data.len(),
1420 max = MAX_IMAGE_BYTES,
1421 "image attachment exceeds size limit, skipping"
1422 );
1423 continue;
1424 }
1425 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1426 image_parts.push(MessagePart::Image(Box::new(ImageData {
1427 data: attachment.data,
1428 mime_type,
1429 })));
1430 }
1431
1432 (text, image_parts)
1433 }
1434
/// Opens a new turn: allocates its id and cancellation token, resets per-turn
/// state, advances the security trackers, and builds the [`turn::Turn`] for `input`.
///
/// The turn id is the current value of the debug iteration counter, which is then
/// incremented.
fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
    let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
    self.runtime.debug.iteration_counter += 1;
    // Each turn gets a fresh token; the lifecycle keeps a clone of it.
    let cancel_token = CancellationToken::new();
    self.runtime.lifecycle.cancel_token = cancel_token.clone();
    // Per-turn state: URLs collected for the previous turn and the LLM request count.
    self.services.security.user_provided_urls.write().clear();
    self.runtime.lifecycle.turn_llm_requests = 0;

    {
        use zeph_memory::shadow::{AuditSignalType as MageSignal, Severity as MageSev};
        // Take the queued signal codes out while holding the lock only for the swap.
        let pending: Vec<u8> = {
            let mut q = self.services.security.trajectory_signal_queue.lock();
            std::mem::take(&mut *q)
        };
        self.services.security.mage_accumulator.advance_turn();
        for code in pending {
            // Every code is recorded in the trajectory tracker.
            self.services
                .security
                .trajectory
                .record(crate::agent::trajectory::RiskSignal::from_code(code));
            // Only codes 1, 2, 6 and 7 are also forwarded to the accumulator.
            let mage_signal: Option<(MageSignal, MageSev)> = match code {
                1 => Some((MageSignal::PolicyViolation, MageSev::Medium)),
                2 => Some((MageSignal::ToolChainAnomaly, MageSev::Medium)),
                6 => Some((MageSignal::PromptInjectionPattern, MageSev::Medium)),
                7 => Some((MageSignal::PromptInjectionPattern, MageSev::High)),
                _ => None,
            };
            if let Some((sig, sev)) = mage_signal {
                self.services.security.mage_accumulator.ingest(sig, sev);
            }
        }
    }
    // When `advance_turn()` returns true and an audit logger is configured, a
    // sentinel audit entry is written from a background telemetry task.
    // NOTE(review): presumably `true` means the trajectory auto-recovered (inferred
    // from the `trajectory_auto_recover` category) — confirm against `trajectory`.
    if self.services.security.trajectory.advance_turn()
        && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
    {
        let entry = zeph_tools::AuditEntry {
            timestamp: zeph_tools::chrono_now(),
            tool: "<sentinel>".to_owned().into(),
            command: String::new(),
            result: zeph_tools::AuditResult::Success,
            duration_ms: 0,
            error_category: Some("trajectory_auto_recover".to_owned()),
            error_domain: Some("security".to_owned()),
            error_phase: None,
            claim_source: None,
            mcp_server_id: None,
            injection_flagged: false,
            embedding_anomalous: false,
            cross_boundary_mcp_to_acp: false,
            adversarial_policy_decision: None,
            exit_code: None,
            truncated: false,
            caller_id: None,
            policy_match: None,
            correlation_id: None,
            vigil_risk: None,
            execution_env: None,
            resolved_cwd: None,
            scope_at_definition: None,
            scope_at_dispatch: None,
        };
        self.runtime.lifecycle.supervisor.spawn(
            crate::agent::agent_supervisor::TaskClass::Telemetry,
            "trajectory-auto-recover-audit",
            async move { logger.log(&entry).await },
        );
    }
    if let Some(ref sentinel) = self.services.security.shadow_sentinel {
        sentinel.advance_turn();
    }
    if let Some(ref acc) = self.services.security.risk_chain_accumulator {
        acc.reset();
    }
    // Publish the current risk level into the shared slot as a `u8`.
    let risk_level = self.services.security.trajectory.current_risk();
    *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
    // A pending alert is logged and, when a status sender exists, sent as status text.
    if let Some(alert) = self.services.security.trajectory.poll_alert() {
        let msg = format!(
            "[trajectory] Risk level: {:?} (score={:.2})",
            alert.level, alert.score
        );
        tracing::warn!(
            level = ?alert.level,
            score = alert.score,
            "trajectory sentinel alert"
        );
        if let Some(ref tx) = self.services.session.status_tx {
            let _ = tx.send(msg);
        }
    }

    let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
        .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
    turn::Turn::new(context, input)
}
1547
1548 fn end_turn(&mut self, turn: turn::Turn) {
1555 self.runtime.metrics.pending_timings = turn.metrics.timings;
1556 self.flush_turn_timings();
1557 self.services.session.current_turn_intent = None;
1559 self.services.session.is_guest_context = false;
1561 if let Some(ref engine) = self.services.speculation_engine {
1563 let metrics = engine.end_turn();
1564 if metrics.committed > 0 || metrics.cancelled > 0 {
1565 tracing::debug!(
1566 committed = metrics.committed,
1567 cancelled = metrics.cancelled,
1568 wasted_ms = metrics.wasted_ms,
1569 "speculation: turn boundary metrics"
1570 );
1571 }
1572 }
1573 }
1574
1575 #[tracing::instrument(
1576 name = "core.agent.process_user_message",
1577 skip_all,
1578 level = "debug",
1579 fields(turn_id),
1580 err
1581 )]
1582 async fn process_user_message(
1583 &mut self,
1584 text: String,
1585 image_parts: Vec<zeph_llm::provider::MessagePart>,
1586 ) -> Result<(), error::AgentError> {
1587 let input = turn::TurnInput::new(text, image_parts);
1588 let mut t = self.begin_turn(input);
1589
1590 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1591 tracing::Span::current().record("turn_id", t.id().0);
1592 self.runtime
1594 .debug
1595 .start_iteration_span(turn_idx, t.input.text.trim());
1596
1597 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1598
1599 let span_status = if result.is_ok() {
1601 crate::debug_dump::trace::SpanStatus::Ok
1602 } else {
1603 crate::debug_dump::trace::SpanStatus::Error {
1604 message: "iteration failed".to_owned(),
1605 }
1606 };
1607 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1608
1609 self.end_turn(t);
1610 result
1611 }
1612
/// Executes the body of a turn: housekeeping, slash-command dispatch, security
/// checks, context preparation, persistence, the LLM response, and post-turn work.
///
/// Returns early (without calling the model) when the input is handled as a slash
/// command or is blocked by `pre_process_security`.
///
/// # Errors
///
/// Propagates errors from slash-command dispatch, from `pre_process_security`, and
/// from channel sends performed while reporting a failed response. A failure of
/// `process_response` itself is reported to the channel and does not make this
/// function return `Err`.
#[tracing::instrument(
    name = "core.agent.process_user_message_inner",
    skip_all,
    level = "debug",
    err
)]
async fn process_user_message_inner(
    &mut self,
    turn: &mut turn::Turn,
) -> Result<(), error::AgentError> {
    self.reap_background_tasks_and_update_metrics();

    // Token total before the turn; `flush_goal_accounting` uses it to compute the
    // tokens spent by this turn. Falls back to 0 without a metrics sender.
    let tokens_before_turn = self
        .runtime
        .metrics
        .metrics_tx
        .as_ref()
        .map_or(0, |tx| tx.borrow().total_tokens);

    self.drain_background_completions();

    self.wire_cancel_bridge(turn.cancel_token());

    let text = turn.input.text.clone();
    let trimmed_owned = text.trim().to_owned();
    let trimmed = trimmed_owned.as_str();

    if self.services.security.vigil.is_some() {
        // Keep at most 1024 bytes of the input as the turn intent, cut on a char
        // boundary so the slice below cannot split a character.
        let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
        self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
    }

    // A handled slash command ends the turn with the command's own result.
    if let Some(result) = self.dispatch_slash_command(trimmed).await {
        return result;
    }

    self.check_pending_rollbacks().await;

    // `true` means the input was blocked and the user was already informed.
    if self.pre_process_security(trimmed).await? {
        return Ok(());
    }

    let t_ctx = std::time::Instant::now();
    tracing::debug!("turn timing: prepare_context start");
    self.advance_context_lifecycle_guarded(&text, trimmed).await;
    turn.metrics_mut().timings.prepare_context_ms =
        u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
    tracing::debug!(
        ms = turn.metrics_snapshot().timings.prepare_context_ms,
        "turn timing: prepare_context done"
    );
    // Best effort: a failure to send the estimate is ignored.
    let _ = self
        .channel
        .send_context_estimate(
            usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
        )
        .await;

    let image_parts = std::mem::take(&mut turn.input.image_parts);
    // The message sent to the model carries buffered background results in front of
    // the user text; the persisted message below uses the user text only.
    let merged_text = self.build_user_message_text_with_bg_completions(&text);
    let user_msg = self.build_user_message(&merged_text, image_parts);

    let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
    if !urls.is_empty() {
        self.services
            .security
            .user_provided_urls
            .write()
            .extend(urls);
    }

    self.services.memory.extraction.goal_text = Some(text.clone());

    let t_persist = std::time::Instant::now();
    tracing::debug!("turn timing: persist_message(user) start");
    self.persist_message(Role::User, &text, &[], false).await;
    turn.metrics_mut().timings.persist_message_ms =
        u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
    tracing::debug!(
        ms = turn.metrics_snapshot().timings.persist_message_ms,
        "turn timing: persist_message(user) done"
    );
    self.push_message(user_msg);

    let context_estimate = self.runtime.providers.cached_prompt_tokens;
    self.update_metrics(|m| m.context_tokens = context_estimate);

    tracing::debug!("turn timing: process_response start");
    let turn_had_error = if let Err(e) = self.process_response().await {
        self.services.learning_engine.learning_tasks.detach_all();
        tracing::error!("Response processing failed: {e:#}");

        if e.is_no_providers() {
            // Record the outage time (read by `advance_context_lifecycle_guarded`)
            // and wait out the configured backoff before finishing the turn.
            self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
            let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
            tracing::warn!(
                backoff_secs,
                "no providers available; backing off before next turn"
            );
            tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
        }

        let user_msg = format!("Error: {e:#}");
        // A failed send propagates here and skips the rollback below.
        self.channel.send(&user_msg).await?;
        // Remove the last message from the history and refresh the token count.
        self.msg.messages.pop();
        self.recompute_prompt_tokens();
        self.channel.flush_chunks().await?;
        true
    } else {
        self.services.learning_engine.learning_tasks.detach_all();
        self.truncate_old_tool_results();
        self.maybe_update_magic_docs();
        self.maybe_spawn_promotion_scan();
        false
    };
    tracing::debug!("turn timing: process_response done");

    if let Some(pipeline) = self.services.quality.clone() {
        self.run_self_check_for_turn(pipeline, turn.id().0).await;
    }
    let _ = self.channel.flush_chunks().await;

    // NOTE(review): the notification reads the turn's timings before `llm_chat_ms`
    // and `tool_exec_ms` are copied in below — confirm the reported duration is
    // meant to exclude them, or that they are populated elsewhere.
    self.maybe_fire_completion_notification(turn, turn_had_error);

    self.flush_goal_accounting(tokens_before_turn);

    // Copy the LLM and tool timings from the pending timings into the turn.
    turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
    turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;

    Ok(())
}
1778
1779 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1785 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1786 let token = turn_token.clone();
1787 self.runtime.lifecycle.cancel_token = turn_token.clone();
1789 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1790 prev.abort();
1791 }
1792 self.runtime.lifecycle.cancel_bridge_handle =
1793 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1794 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1795 move || async move {
1796 signal.notified().await;
1797 token.cancel();
1798 },
1799 ));
1800 }
1801
1802 fn reap_background_tasks_and_update_metrics(&mut self) {
1806 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1807 if bg_signal.did_summarize {
1808 self.services.memory.persistence.unsummarized_count = 0;
1809 tracing::debug!("background summarization completed; unsummarized_count reset");
1810 }
1811 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1812 self.update_metrics(|m| {
1813 m.bg_inflight = snap.inflight as u64;
1814 m.bg_dropped = snap.total_dropped();
1815 m.bg_completed = snap.total_completed();
1816 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1817 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1818 });
1819
1820 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1822 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1823 .runtime
1824 .lifecycle
1825 .shell_executor_handle
1826 .as_ref()
1827 .map(|e| e.background_runs_snapshot())
1828 .unwrap_or_default()
1829 .into_iter()
1830 .map(|s| crate::metrics::ShellBackgroundRunRow {
1831 run_id: truncate_shell_run_id(&s.run_id),
1832 command: truncate_shell_command(&s.command),
1833 elapsed_secs: s.elapsed_ms / 1000,
1834 })
1835 .collect();
1836 self.update_metrics(|m| {
1837 m.shell_background_runs = shell_rows;
1838 });
1839 }
1840
1841 if self
1844 .runtime
1845 .config
1846 .supervisor_config
1847 .abort_enrichment_on_turn
1848 {
1849 self.runtime
1850 .lifecycle
1851 .supervisor
1852 .abort_class(agent_supervisor::TaskClass::Enrichment);
1853 }
1854 }
1855
1856 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1869 let snap = turn.metrics_snapshot().timings.clone();
1870 let duration_ms = snap
1871 .prepare_context_ms
1872 .saturating_add(snap.llm_chat_ms)
1873 .saturating_add(snap.tool_exec_ms);
1874 let summary = crate::notifications::TurnSummary {
1875 duration_ms,
1876 preview: self.last_assistant_preview(160),
1877 tool_calls: 0,
1879 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1880 exit_status: if is_error {
1881 crate::notifications::TurnExitStatus::Error
1882 } else {
1883 crate::notifications::TurnExitStatus::Success
1884 },
1885 };
1886
1887 let gate_ok = self
1889 .runtime
1890 .lifecycle
1891 .notifier
1892 .as_ref()
1893 .is_none_or(|n| n.should_fire(&summary));
1894
1895 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1897 && gate_ok
1898 {
1899 notifier.fire(&summary);
1900 }
1901
1902 let hooks = self.services.session.hooks_config.turn_complete.clone();
1907 if !hooks.is_empty() && gate_ok {
1908 let mut env = std::collections::HashMap::new();
1909 env.insert(
1910 "ZEPH_TURN_DURATION_MS".to_owned(),
1911 summary.duration_ms.to_string(),
1912 );
1913 env.insert(
1914 "ZEPH_TURN_STATUS".to_owned(),
1915 if is_error { "error" } else { "success" }.to_owned(),
1916 );
1917 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1918 env.insert(
1919 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1920 summary.llm_requests.to_string(),
1921 );
1922 let dispatch = self.mcp_dispatch();
1923 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1924 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1925 agent_supervisor::TaskClass::Telemetry,
1926 "turn-complete-hooks",
1927 async move {
1928 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1929 .as_ref()
1930 .map(|d| d as &dyn zeph_subagent::McpDispatch);
1931 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1932 {
1933 tracing::warn!(error = %e, "turn_complete hook failed");
1934 }
1935 },
1936 );
1937 }
1938 }
1939
1940 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1943 let goal_snap = self
1944 .services
1945 .goal_accounting
1946 .as_ref()
1947 .and_then(|a| a.snapshot());
1948 self.update_metrics(|m| m.active_goal = goal_snap);
1949
1950 if let Some(ref accounting) = self.services.goal_accounting {
1951 let tokens_after = self
1952 .runtime
1953 .metrics
1954 .metrics_tx
1955 .as_ref()
1956 .map_or(0, |tx| tx.borrow().total_tokens);
1957 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1958 let mut spawned: Option<
1959 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1960 > = None;
1961 accounting.on_turn_complete(turn_tokens, |fut| {
1962 spawned = Some(fut);
1963 });
1964 if let Some(fut) = spawned {
1965 let _ = self.runtime.lifecycle.supervisor.spawn(
1966 agent_supervisor::TaskClass::Telemetry,
1967 "goal-accounting",
1968 fut,
1969 );
1970 }
1971 }
1972 }
1973
/// Screens user input before it reaches the model.
///
/// Runs the guardrail check when a guardrail is configured and, with the
/// `classifiers` feature, the injection classifier when user-input scanning is on.
///
/// # Returns
///
/// * `Ok(true)` — the input was blocked; a message was already sent to the channel
///   and the caller should stop processing the turn.
/// * `Ok(false)` — the input may proceed (possibly after a warning was sent).
///
/// # Errors
///
/// None of the paths visible in this function produce an `Err`; channel send
/// failures are ignored.
#[tracing::instrument(
    name = "core.agent.pre_process_security",
    skip_all,
    level = "debug",
    err
)]
async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
    if let Some(ref guardrail) = self.services.security.guardrail {
        use zeph_sanitizer::guardrail::GuardrailVerdict;
        let verdict = guardrail.check(trimmed).await;
        match &verdict {
            GuardrailVerdict::Flagged { reason, .. } => {
                tracing::warn!(
                    reason = %reason,
                    should_block = verdict.should_block(),
                    "guardrail flagged user input"
                );
                if verdict.should_block() {
                    let msg = format!("[guardrail] Input blocked: {reason}");
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                // Flagged but not blocking: warn the user and continue.
                let _ = self
                    .channel
                    .send(&format!("[guardrail] Warning: {reason}"))
                    .await;
            }
            GuardrailVerdict::Error { error } => {
                // Whether a failed check blocks depends on the guardrail itself.
                if guardrail.error_should_block() {
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                    // The error detail goes to the log only, not to the user.
                    let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                    let _ = self.channel.send(msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
            }
            GuardrailVerdict::Safe => {}
        }
    }

    // Classifier pass; compiled only with the `classifiers` feature.
    #[cfg(feature = "classifiers")]
    if self.services.security.sanitizer.scan_user_input() {
        match self
            .services
            .security
            .sanitizer
            .classify_injection(trimmed)
            .await
        {
            zeph_sanitizer::InjectionVerdict::Blocked => {
                // Metrics are pushed here because the early return skips the push below.
                self.push_classifier_metrics();
                let _ = self
                    .channel
                    .send("[security] Input blocked: injection detected by classifier.")
                    .await;
                let _ = self.channel.flush_chunks().await;
                return Ok(true);
            }
            zeph_sanitizer::InjectionVerdict::Suspicious => {
                // Logged only; the input is still allowed through.
                tracing::warn!("injection_classifier soft_signal on user input");
            }
            zeph_sanitizer::InjectionVerdict::Clean => {}
            // Any other verdict is treated like `Clean`.
            _ => {}
        }
    }
    #[cfg(feature = "classifiers")]
    self.push_classifier_metrics();

    Ok(false)
}
2054
2055 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2062 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2063 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2064
2065 let providers_recently_failed = self
2067 .runtime
2068 .lifecycle
2069 .last_no_providers_at
2070 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2071
2072 if providers_recently_failed {
2073 tracing::warn!(
2074 backoff_secs,
2075 "skipping context preparation: providers were unavailable on last turn"
2076 );
2077 return;
2078 }
2079
2080 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2081 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2082 {
2083 Ok(()) => {}
2084 Err(_elapsed) => {
2085 tracing::warn!(
2086 timeout_secs = prep_timeout_secs,
2087 "context preparation timed out; proceeding with degraded context"
2088 );
2089 }
2090 }
2091 }
2092
/// Performs the per-turn context maintenance that precedes the model call.
///
/// In order: resets the MCP pruning cache, rebuilds the system prompt, runs the
/// correction/learning steps, advances the compaction state, ticks focus and
/// sidequest tracking, optionally schedules an experience evolution sweep, emits a
/// cache-expiry warning, applies microcompaction and deferred summaries, attempts
/// proactive compression and compaction, and finally prepares the context.
///
/// Failures of compression, compaction, and context preparation are logged and do
/// not abort the sequence.
#[tracing::instrument(
    name = "core.agent.advance_context_lifecycle",
    skip_all,
    level = "debug"
)]
async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
    self.services.mcp.pruning_cache.reset();

    // Read before the learning steps below, which receive it as an argument.
    let conv_id = self.services.memory.persistence.conversation_id;
    self.rebuild_system_prompt(text).await;

    self.detect_and_record_corrections(trimmed, conv_id).await;
    self.services.learning_engine.tick();
    self.analyze_and_learn().await;
    self.sync_graph_counts().await;

    // Move the compaction state forward by one turn.
    self.context_manager
        .set_compaction_state(self.context_manager.compaction_state().advance_turn());

    {
        self.services.focus.tick();

        // Sidequest eviction runs only when its tick asks for it and no compaction
        // has been recorded for this turn.
        let sidequest_should_fire = self.services.sidequest.tick();
        if sidequest_should_fire
            && !self
                .context_manager
                .compaction_state()
                .is_compacted_this_turn()
        {
            self.maybe_sidequest_eviction();
        }
    }

    {
        // Experience evolution sweep: runs every `evolution_sweep_interval` turns of
        // the sidequest turn counter, when enabled and when both the experience
        // store and the graph store are available.
        let cfg = &self.services.memory.extraction.graph_config.experience;
        if cfg.enabled
            && cfg.evolution_sweep_enabled
            && cfg.evolution_sweep_interval > 0
            && self
                .services
                .sidequest
                .turn_counter
                .checked_rem(cfg.evolution_sweep_interval as u64)
                == Some(0)
            && let Some(memory) = self.services.memory.persistence.memory.as_ref()
            && let (Some(exp), Some(graph)) =
                (memory.experience.as_ref(), memory.graph_store.as_ref())
        {
            let exp = std::sync::Arc::clone(exp);
            let graph = std::sync::Arc::clone(graph);
            let threshold = cfg.confidence_prune_threshold;
            let turn = self.services.sidequest.turn_counter;
            // `spawn` reports whether the supervisor accepted the task.
            let accepted = self.runtime.lifecycle.supervisor.spawn(
                agent_supervisor::TaskClass::Telemetry,
                "experience-sweep",
                async move {
                    match exp.evolution_sweep(graph.as_ref(), threshold).await {
                        Ok(stats) => tracing::info!(
                            turn,
                            self_loops = stats.pruned_self_loops,
                            low_confidence = stats.pruned_low_confidence,
                            "evolution sweep complete",
                        ),
                        Err(e) => tracing::warn!(
                            turn,
                            error = %e,
                            "evolution sweep failed",
                        ),
                    }
                },
            );
            if !accepted {
                tracing::warn!(
                    turn = self.services.sidequest.turn_counter,
                    "experience-sweep dropped (telemetry class at capacity)",
                );
            }
        }
    }

    if let Some(warning) = self.cache_expiry_warning() {
        tracing::info!(warning, "cache expiry warning");
        let _ = self.channel.send_status(&warning).await;
    }

    self.maybe_time_based_microcompact();

    self.maybe_apply_deferred_summaries();
    self.flush_deferred_summaries().await;

    // The three steps below log their errors and let the sequence continue.
    if let Err(e) = self.maybe_proactive_compress().await {
        tracing::warn!("proactive compression failed: {e:#}");
    }

    if let Err(e) = self.maybe_compact().await {
        tracing::warn!("context compaction failed: {e:#}");
    }

    if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
        tracing::warn!("context preparation failed: {e:#}");
    }

    // Hand the latest recall confidence to the provider.
    self.provider
        .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);

    self.services.learning_engine.reset_reflection();
}
2221
2222 fn build_user_message(
2223 &mut self,
2224 text: &str,
2225 image_parts: Vec<zeph_llm::provider::MessagePart>,
2226 ) -> Message {
2227 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2228 all_image_parts.extend(image_parts);
2229
2230 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2231 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2232 text: text.to_owned(),
2233 }];
2234 parts.extend(all_image_parts);
2235 Message::from_parts(Role::User, parts)
2236 } else {
2237 if !all_image_parts.is_empty() {
2238 tracing::warn!(
2239 count = all_image_parts.len(),
2240 "image attachments dropped: provider does not support vision"
2241 );
2242 }
2243 Message {
2244 role: Role::User,
2245 content: text.to_owned(),
2246 parts: vec![],
2247 metadata: MessageMetadata::default(),
2248 }
2249 }
2250 }
2251
2252 fn drain_background_completions(&mut self) {
2256 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2257
2258 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2259 return;
2260 };
2261 while let Ok(completion) = rx.try_recv() {
2263 if self.runtime.lifecycle.pending_background_completions.len()
2264 >= BACKGROUND_COMPLETION_BUFFER_CAP
2265 {
2266 tracing::warn!(
2267 run_id = %completion.run_id,
2268 "background completion buffer full; dropping run result"
2269 );
2270 self.runtime
2273 .lifecycle
2274 .pending_background_completions
2275 .pop_front();
2276 self.runtime
2277 .lifecycle
2278 .pending_background_completions
2279 .push_back(zeph_tools::BackgroundCompletion {
2280 run_id: completion.run_id,
2281 exit_code: -1,
2282 success: false,
2283 elapsed_ms: 0,
2284 command: completion.command,
2285 output: format!(
2286 "[background result for run {} dropped: buffer overflow]",
2287 completion.run_id
2288 ),
2289 });
2290 } else {
2291 self.runtime
2292 .lifecycle
2293 .pending_background_completions
2294 .push_back(completion);
2295 }
2296 }
2297 }
2298
2299 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2303 if self
2304 .runtime
2305 .lifecycle
2306 .pending_background_completions
2307 .is_empty()
2308 {
2309 return user_text.to_owned();
2310 }
2311 let mut parts = String::new();
2312 for completion in self
2313 .runtime
2314 .lifecycle
2315 .pending_background_completions
2316 .drain(..)
2317 {
2318 let _ = write!(
2319 parts,
2320 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2321 completion.run_id,
2322 completion.exit_code,
2323 completion.success,
2324 completion.elapsed_ms,
2325 completion.command,
2326 completion.output,
2327 );
2328 }
2329 parts.push_str(user_text);
2330 parts
2331 }
2332
/// Polls a sub-agent every 500 ms until it reaches a terminal state, answering any
/// secret requests that arrive in the meantime.
///
/// `label` is the prefix used in the returned and status messages.
///
/// # Returns
///
/// * `Some((message, true))` — the sub-agent completed, or no status entry exists
///   for `task_id`.
/// * `Some((message, false))` — the sub-agent failed or was cancelled.
/// * `None` — no sub-agent manager is configured.
async fn poll_subagent_until_done(
    &mut self,
    task_id: &str,
    label: &str,
) -> Option<(String, bool)> {
    use zeph_subagent::SubAgentState;
    let result = loop {
        // The wait comes first, so the first status check happens after 500 ms.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // At most one pending secret request is handled per iteration; it may
        // belong to any sub-agent, not only `task_id`.
        #[allow(clippy::redundant_closure_for_method_calls)]
        let pending = self
            .services
            .orchestration
            .subagent_manager
            .as_mut()
            .and_then(|m| m.try_recv_secret_request());
        if let Some((req_task_id, req)) = pending {
            // The key name is shortened to 100 characters for the prompt.
            let confirm_prompt = format!(
                "Sub-agent requests secret '{}'. Allow?",
                crate::text::truncate_to_chars(&req.secret_key, 100)
            );
            // A failed confirmation is treated as a denial.
            let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
            if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                if approved {
                    let ttl = std::time::Duration::from_mins(5);
                    let key = req.secret_key.clone();
                    // Deliver only after the approval was accepted; a delivery
                    // failure is ignored.
                    if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                        let _ = mgr.deliver_secret(&req_task_id, key);
                    }
                } else {
                    let _ = mgr.deny_secret(&req_task_id);
                }
            }
        }

        // Returns `None` from the function when no manager is configured.
        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        // A missing status entry is reported as a successful completion.
        let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
            break (format!("{label} completed (no status available)."), true);
        };
        match status.state {
            SubAgentState::Completed => {
                let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                break (format!("{label} completed: {msg}"), true);
            }
            SubAgentState::Failed => {
                let msg = status
                    .last_message
                    .clone()
                    .unwrap_or_else(|| "unknown error".into());
                break (format!("{label} failed: {msg}"), false);
            }
            SubAgentState::Canceled => {
                break (format!("{label} was cancelled."), false);
            }
            _ => {
                // Still running: report progress as `turns_used / max_turns`, with
                // 20 shown when the agent definition cannot be found.
                let _ = self
                    .channel
                    .send_status(&format!(
                        "{label}: turn {}/{}",
                        status.turns_used,
                        self.services
                            .orchestration
                            .subagent_manager
                            .as_ref()
                            .and_then(|m| m.agents_def(task_id))
                            .map_or(20, |d| d.permissions.max_turns)
                    ))
                    .await;
            }
        }
    };
    Some(result)
}
2415
2416 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2419 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2420 let full_ids: Vec<String> = mgr
2421 .statuses()
2422 .into_iter()
2423 .map(|(tid, _)| tid)
2424 .filter(|tid| tid.starts_with(prefix))
2425 .collect();
2426 Some(match full_ids.as_slice() {
2427 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2428 [fid] => Ok(fid.clone()),
2429 _ => Err(format!(
2430 "Ambiguous id prefix '{prefix}': matches {} agents",
2431 full_ids.len()
2432 )),
2433 })
2434 }
2435
2436 fn handle_agent_list(&self) -> Option<String> {
2437 use std::fmt::Write as _;
2438 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2439 let defs = mgr.definitions();
2440 if defs.is_empty() {
2441 return Some("No sub-agent definitions found.".into());
2442 }
2443 let mut out = String::from("Available sub-agents:\n");
2444 for d in defs {
2445 let memory_label = match d.memory {
2446 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2447 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2448 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2449 Some(_) => " [memory:unknown]",
2450 None => "",
2451 };
2452 if let Some(ref src) = d.source {
2453 let _ = writeln!(
2454 out,
2455 " {}{} — {} ({})",
2456 d.name, memory_label, d.description, src
2457 );
2458 } else {
2459 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2460 }
2461 }
2462 Some(out)
2463 }
2464
2465 fn handle_agent_status(&self) -> Option<String> {
2466 use std::fmt::Write as _;
2467 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2468 let statuses = mgr.statuses();
2469 if statuses.is_empty() {
2470 return Some("No active sub-agents.".into());
2471 }
2472 let mut out = String::from("Active sub-agents:\n");
2473 for (id, s) in &statuses {
2474 let state = format!("{:?}", s.state).to_lowercase();
2475 let elapsed = s.started_at.elapsed().as_secs();
2476 let _ = writeln!(
2477 out,
2478 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2479 short = &id[..8.min(id.len())],
2480 t = s.turns_used,
2481 msg = s.last_message.as_deref().unwrap_or(""),
2482 );
2483 if let Some(def) = mgr.agents_def(id)
2485 && let Some(scope) = def.memory
2486 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2487 {
2488 let _ = writeln!(out, " memory: {}", dir.display());
2489 }
2490 }
2491 Some(out)
2492 }
2493
2494 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2495 let full_id = match self.resolve_agent_id_prefix(id)? {
2496 Ok(fid) => fid,
2497 Err(msg) => return Some(msg),
2498 };
2499 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2500 if let Some((tid, req)) = mgr.try_recv_secret_request()
2501 && tid == full_id
2502 {
2503 let key = req.secret_key.clone();
2504 let ttl = std::time::Duration::from_mins(5);
2505 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2506 return Some(format!("Approve failed: {e}"));
2507 }
2508 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2509 return Some(format!("Secret delivery failed: {e}"));
2510 }
2511 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2512 }
2513 Some(format!(
2514 "No pending secret request for sub-agent '{full_id}'."
2515 ))
2516 }
2517
2518 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2519 let full_id = match self.resolve_agent_id_prefix(id)? {
2520 Ok(fid) => fid,
2521 Err(msg) => return Some(msg),
2522 };
2523 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2524 match mgr.deny_secret(&full_id) {
2525 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2526 Err(e) => Some(format!("Deny failed: {e}")),
2527 }
2528 }
2529
2530 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2531 use zeph_subagent::AgentCommand;
2532
2533 match cmd {
2534 AgentCommand::List => self.handle_agent_list(),
2535 AgentCommand::Background { name, prompt } => {
2536 self.handle_agent_background(&name, &prompt)
2537 }
2538 AgentCommand::Spawn { name, prompt }
2539 | AgentCommand::Mention {
2540 agent: name,
2541 prompt,
2542 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2543 AgentCommand::Status => self.handle_agent_status(),
2544 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2545 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2546 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2547 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2548 _ => None,
2549 }
2550 }
2551
2552 pub(crate) fn handle_agents_definitions_list(&self) -> String {
2557 use std::fmt::Write as _;
2558
2559 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2560 return String::new();
2561 };
2562 let defs = mgr.definitions();
2563 if defs.is_empty() {
2564 return String::new();
2565 }
2566 let mut out = String::from("Sub-agents:\n");
2567 for d in defs {
2568 let memory_label = match d.memory {
2569 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2570 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2571 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2572 Some(_) => " [memory:unknown]",
2573 None => "",
2574 };
2575 if let Some(ref src) = d.source {
2576 let _ = writeln!(
2577 out,
2578 " {}{} — {} ({})",
2579 d.name, memory_label, d.description, src
2580 );
2581 } else {
2582 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2583 }
2584 }
2585 out
2586 }
2587
2588 pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2593 use zeph_subagent::AgentsCommand;
2594
2595 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2596 return "Sub-agent manager is not available.".to_owned();
2597 };
2598
2599 match cmd {
2600 AgentsCommand::List => self.handle_agents_definitions_list(),
2601 AgentsCommand::Show { name } => {
2602 match mgr.definitions().iter().find(|d| d.name == name) {
2603 Some(d) => format!(
2604 "Agent: {}\nDescription: {}\nSource: {}\n",
2605 d.name,
2606 d.description,
2607 d.source.as_deref().unwrap_or("unknown"),
2608 ),
2609 None => format!("No sub-agent definition named '{name}'."),
2610 }
2611 }
2612 AgentsCommand::Create { name } => {
2613 format!(
2614 "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2615 See the sub-agent documentation for the required frontmatter."
2616 )
2617 }
2618 AgentsCommand::Edit { name } => {
2619 format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2620 }
2621 AgentsCommand::Delete { name } => {
2622 format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2623 }
2624 _ => "Unknown agents command.".to_owned(),
2625 }
2626 }
2627
2628 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2629 let provider = self.provider.clone();
2630 let tool_executor = Arc::clone(&self.tool_executor);
2631 let skills = self.filtered_skills_for(name);
2632 let cfg = self.services.orchestration.subagent_config.clone();
2633 let spawn_ctx = self.build_spawn_context(&cfg);
2634 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2635 match mgr.spawn(
2636 name,
2637 prompt,
2638 provider,
2639 tool_executor,
2640 skills,
2641 &cfg,
2642 spawn_ctx,
2643 ) {
2644 Ok(id) => Some(format!(
2645 "Sub-agent '{name}' started in background (id: {short})",
2646 short = &id[..8.min(id.len())]
2647 )),
2648 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2649 }
2650 }
2651
2652 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2653 let provider = self.provider.clone();
2654 let tool_executor = Arc::clone(&self.tool_executor);
2655 let skills = self.filtered_skills_for(name);
2656 let cfg = self.services.orchestration.subagent_config.clone();
2657 let spawn_ctx = self.build_spawn_context(&cfg);
2658 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2659 let task_id = match mgr.spawn(
2660 name,
2661 prompt,
2662 provider,
2663 tool_executor,
2664 skills,
2665 &cfg,
2666 spawn_ctx,
2667 ) {
2668 Ok(id) => id,
2669 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2670 };
2671 let short = task_id[..8.min(task_id.len())].to_owned();
2672 let _ = self
2673 .channel
2674 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2675 .await;
2676 let _ = self
2677 .channel
2678 .notify_foreground_subagent_started(&task_id, name)
2679 .await;
2680 let label = format!("Sub-agent '{name}'");
2681 let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2682 let _ = self
2684 .channel
2685 .notify_foreground_subagent_completed(&task_id, name, false)
2686 .await;
2687 return None;
2688 };
2689 let _ = self
2690 .channel
2691 .notify_foreground_subagent_completed(&task_id, name, success)
2692 .await;
2693 Some(result)
2694 }
2695
2696 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2697 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2698 let ids: Vec<String> = mgr
2700 .statuses()
2701 .into_iter()
2702 .map(|(task_id, _)| task_id)
2703 .filter(|task_id| task_id.starts_with(id))
2704 .collect();
2705 match ids.as_slice() {
2706 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2707 [full_id] => {
2708 let full_id = full_id.clone();
2709 match mgr.cancel(&full_id) {
2710 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2711 Err(e) => Some(format!("Cancel failed: {e}")),
2712 }
2713 }
2714 _ => Some(format!(
2715 "Ambiguous id prefix '{id}': matches {} agents",
2716 ids.len()
2717 )),
2718 }
2719 }
2720
2721 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2722 let cfg = self.services.orchestration.subagent_config.clone();
2723 let def_name = {
2726 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2727 match mgr.def_name_for_resume(id, &cfg) {
2728 Ok(name) => name,
2729 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2730 }
2731 };
2732 let skills = self.filtered_skills_for(&def_name);
2733 let provider = self.provider.clone();
2734 let tool_executor = Arc::clone(&self.tool_executor);
2735 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2736 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2737 Ok(pair) => pair,
2738 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2739 };
2740 let short = task_id[..8.min(task_id.len())].to_owned();
2741 let _ = self
2742 .channel
2743 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2744 .await;
2745 let _ = self
2746 .channel
2747 .notify_foreground_subagent_started(&task_id, &def_name)
2748 .await;
2749 let Some((result, success)) = self
2750 .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2751 .await
2752 else {
2753 let _ = self
2755 .channel
2756 .notify_foreground_subagent_completed(&task_id, &def_name, false)
2757 .await;
2758 return None;
2759 };
2760 let _ = self
2761 .channel
2762 .notify_foreground_subagent_completed(&task_id, &def_name, success)
2763 .await;
2764 Some(result)
2765 }
2766
2767 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2768 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2769 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2770 let reg = self.services.skill.registry.read();
2771 match zeph_subagent::filter_skills(®, &def.skills) {
2772 Ok(skills) => {
2773 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2774 if bodies.is_empty() {
2775 None
2776 } else {
2777 Some(bodies)
2778 }
2779 }
2780 Err(e) => {
2781 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2782 None
2783 }
2784 }
2785 }
2786
2787 fn build_spawn_context(
2789 &self,
2790 cfg: &zeph_config::SubAgentConfig,
2791 ) -> zeph_subagent::SpawnContext {
2792 zeph_subagent::SpawnContext {
2793 parent_messages: self.extract_parent_messages(cfg),
2794 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2795 parent_provider_name: {
2796 let name = &self.runtime.config.active_provider_name;
2797 if name.is_empty() {
2798 None
2799 } else {
2800 Some(name.clone())
2801 }
2802 },
2803 spawn_depth: self.runtime.config.spawn_depth,
2804 mcp_tool_names: self.extract_mcp_tool_names(),
2805 seed_trajectory_score: {
2807 let child = self.services.security.trajectory.spawn_child();
2808 let score = child.score_now();
2809 if score > 0.0 { Some(score) } else { None }
2810 },
2811 content_isolation: self.runtime.config.security.content_isolation.clone(),
2812 orchestrator_name: Some("zeph".to_owned()),
2813 orchestrator_role: Some("orchestrator".to_owned()),
2814 session_mcp_servers: Vec::new(),
2815 }
2816 }
2817
2818 fn extract_parent_messages(
2825 &self,
2826 config: &zeph_config::SubAgentConfig,
2827 ) -> Vec<zeph_llm::provider::Message> {
2828 use zeph_config::ParentContextPolicy;
2829 use zeph_llm::provider::Role;
2830
2831 if config.parent_context_policy == ParentContextPolicy::None
2832 || config.context_window_turns == 0
2833 {
2834 return Vec::new();
2835 }
2836
2837 let non_system: Vec<_> = self
2838 .msg
2839 .messages
2840 .iter()
2841 .filter(|m| m.role != Role::System)
2842 .cloned()
2843 .collect();
2844
2845 let take_count = config
2846 .context_window_turns
2847 .saturating_mul(2)
2848 .min(config.max_parent_messages);
2849 let start = non_system.len().saturating_sub(take_count);
2850 let mut msgs = non_system[start..].to_vec();
2851
2852 let max_chars = 128_000usize / 4;
2854 let requested = msgs.len();
2855 trim_parent_messages(&mut msgs, max_chars);
2856 if msgs.len() < requested {
2857 tracing::info!(
2858 kept = msgs.len(),
2859 requested,
2860 "[subagent] truncated parent history due to token budget or orphan pruning"
2861 );
2862 }
2863
2864 if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2865 use zeph_sanitizer::{ContentSource, ContentSourceKind};
2866 let source =
2867 ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2868 msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2869 }
2870
2871 msgs
2872 }
2873
2874 fn extract_mcp_tool_names(&self) -> Vec<String> {
2876 self.tool_executor
2877 .tool_definitions_erased()
2878 .into_iter()
2879 .filter(|t| t.id.starts_with("mcp_"))
2880 .map(|t| t.id.to_string())
2881 .collect()
2882 }
2883
2884 fn classify_source_kind(
2888 skill_dir: &std::path::Path,
2889 managed_dir: Option<&std::path::PathBuf>,
2890 bundled_names: &std::collections::HashSet<String>,
2891 ) -> zeph_memory::store::SourceKind {
2892 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2893 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2894 let has_marker = skill_dir.join(".bundled").exists();
2895 if has_marker && bundled_names.contains(skill_name) {
2896 zeph_memory::store::SourceKind::Bundled
2897 } else {
2898 if has_marker {
2899 tracing::warn!(
2900 skill = %skill_name,
2901 "skill has .bundled marker but is not in the bundled skill \
2902 allowlist — classifying as Hub"
2903 );
2904 }
2905 zeph_memory::store::SourceKind::Hub
2906 }
2907 } else {
2908 zeph_memory::store::SourceKind::Local
2909 }
2910 }
2911
2912 async fn update_trust_for_reloaded_skills(
2914 &mut self,
2915 all_meta: &[zeph_skills::loader::SkillMeta],
2916 ) {
2917 let memory = self.services.memory.persistence.memory.clone();
2919 let Some(memory) = memory else {
2920 return;
2921 };
2922 let trust_cfg = self.services.skill.trust_config.clone();
2923 let managed_dir = self.services.skill.managed_dir.clone();
2924 let bundled_names: std::collections::HashSet<String> =
2925 zeph_skills::bundled_skill_names().into_iter().collect();
2926 for meta in all_meta {
2927 let skill_dir = meta.skill_dir.clone();
2930 let managed_dir_ref = managed_dir.clone();
2931 let bundled_names_ref = bundled_names.clone();
2932 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2933 tokio::task::spawn_blocking(move || {
2934 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2935 let source_kind = Self::classify_source_kind(
2936 &skill_dir,
2937 managed_dir_ref.as_ref(),
2938 &bundled_names_ref,
2939 );
2940 Some((hash, source_kind))
2941 })
2942 .await
2943 .unwrap_or(None);
2944
2945 let Some((current_hash, source_kind)) = fs_result else {
2946 tracing::warn!("failed to compute hash for '{}'", meta.name);
2947 continue;
2948 };
2949 let initial_level = match source_kind {
2950 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2951 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2952 &trust_cfg.local_level
2953 }
2954 _ => &trust_cfg.default_level,
2955 };
2956 let existing = memory
2957 .sqlite()
2958 .load_skill_trust(&meta.name)
2959 .await
2960 .ok()
2961 .flatten();
2962 let trust_level = if let Some(ref row) = existing {
2963 if row.blake3_hash != current_hash {
2964 trust_cfg.hash_mismatch_level
2965 } else if row.source_kind != source_kind {
2966 let stored = row.trust_level;
2970 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2971 stored
2972 } else {
2973 *initial_level
2974 }
2975 } else {
2976 row.trust_level
2977 }
2978 } else {
2979 *initial_level
2980 };
2981 let source_path = meta.skill_dir.to_str();
2982 if let Err(e) = memory
2983 .sqlite()
2984 .upsert_skill_trust(
2985 &meta.name,
2986 trust_level,
2987 source_kind,
2988 None,
2989 source_path,
2990 ¤t_hash,
2991 )
2992 .await
2993 {
2994 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2995 }
2996 }
2997 }
2998
/// Rebuilds or re-syncs the skill matcher for the given skill metadata.
///
/// When the current matcher is absent or is not the Qdrant backend, a fresh in-memory
/// matcher is built from `all_meta`. Otherwise the existing backend is synced, with
/// progress forwarded to the session status channel when one is present. If hybrid
/// search is enabled, the BM25 index is then rebuilt from the skill descriptions.
async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
    let provider = self.embedding_provider.clone();
    let embed_timeout =
        std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
    // Embedding callback: each call is bounded by `embed_timeout`; a timeout is logged
    // and surfaced as `LlmError::Timeout`.
    let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
        let owned = text.to_owned();
        let p = provider.clone();
        Box::pin(async move {
            if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                result
            } else {
                tracing::warn!(
                    timeout_secs = embed_timeout.as_secs(),
                    "skill matcher: embedding timed out"
                );
                Err(zeph_llm::LlmError::Timeout)
            }
        })
    };

    // True when there is no matcher yet or the matcher is not Qdrant-backed.
    let needs_inmemory_rebuild = !self
        .services
        .skill
        .matcher
        .as_ref()
        .is_some_and(SkillMatcherBackend::is_qdrant);

    if needs_inmemory_rebuild {
        self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
            .await
            .map(SkillMatcherBackend::InMemory);
    } else if let Some(ref mut backend) = self.services.skill.matcher {
        let _ = self.channel.send_status("syncing skill index...").await;
        // Progress callback that reports `completed/total` on the status channel, if any.
        let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
            self.services.session.status_tx.clone().map(
                |tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                },
            );
        // A failed sync is logged; the existing backend stays in place.
        if let Err(e) = backend
            .sync(
                all_meta,
                &self.services.skill.embedding_model,
                embed_fn,
                on_progress,
            )
            .await
        {
            tracing::warn!("failed to sync skill embeddings: {e:#}");
        }
    }

    if self.services.skill.hybrid_search {
        let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
        let _ = self.channel.send_status("rebuilding search index...").await;
        self.services.skill.rebuild_bm25(&descs);
    }
}
3061
3062 #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
3063 async fn reload_skills(&mut self) {
3064 let old_fp = self.services.skill.fingerprint();
3065 let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
3066 let plugin_dirs = supplier();
3067 let mut paths = self.services.skill.skill_paths.clone();
3068 for dir in plugin_dirs {
3069 if !paths.contains(&dir) {
3070 paths.push(dir);
3071 }
3072 }
3073 paths
3074 } else {
3075 self.services.skill.skill_paths.clone()
3076 };
3077 self.services.skill.registry.write().reload(&reload_paths);
3078 if self.services.skill.fingerprint() == old_fp {
3079 return;
3080 }
3081 let _ = self.channel.send_status("reloading skills...").await;
3082
3083 let all_meta = self
3084 .services
3085 .skill
3086 .registry
3087 .read()
3088 .all_meta()
3089 .into_iter()
3090 .cloned()
3091 .collect::<Vec<_>>();
3092
3093 self.update_trust_for_reloaded_skills(&all_meta).await;
3094
3095 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3096 self.rebuild_skill_matcher(&all_meta_refs).await;
3097
3098 let all_skills: Vec<Skill> = {
3099 let reg = self.services.skill.registry.read();
3100 reg.all_meta()
3101 .iter()
3102 .filter_map(|m| reg.skill(&m.name).ok())
3103 .collect()
3104 };
3105 let trust_map = self.build_skill_trust_map().await;
3106 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3107 let skills_prompt =
3108 state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
3109 self.services
3110 .skill
3111 .last_skills_prompt
3112 .clone_from(&skills_prompt);
3113 let system_prompt = build_system_prompt(&skills_prompt, None);
3114 if let Some(msg) = self.msg.messages.first_mut() {
3115 msg.content = system_prompt;
3116 }
3117
3118 let _ = self.channel.send_status("").await;
3119 tracing::info!(
3120 "reloaded {} skill(s)",
3121 self.services.skill.registry.read().all_meta().len()
3122 );
3123 }
3124
3125 async fn reload_instructions(&mut self) {
3126 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3128 while rx.try_recv().is_ok() {}
3129 }
3130 let Some(ref state) = self.runtime.instructions.reload_state else {
3131 return;
3132 };
3133 let base_dir = state.base_dir.clone();
3134 let provider_kinds = state.provider_kinds.clone();
3135 let explicit_files = state.explicit_files.clone();
3136 let auto_detect = state.auto_detect;
3137 let new_blocks = crate::instructions::load_instructions_async(
3138 base_dir,
3139 provider_kinds,
3140 explicit_files,
3141 auto_detect,
3142 )
3143 .await;
3144 let old_sources: std::collections::HashSet<_> = self
3145 .runtime
3146 .instructions
3147 .blocks
3148 .iter()
3149 .map(|b| &b.source)
3150 .collect();
3151 let new_sources: std::collections::HashSet<_> =
3152 new_blocks.iter().map(|b| &b.source).collect();
3153 for added in new_sources.difference(&old_sources) {
3154 tracing::info!(path = %added.display(), "instruction file added");
3155 }
3156 for removed in old_sources.difference(&new_sources) {
3157 tracing::info!(path = %removed.display(), "instruction file removed");
3158 }
3159 tracing::info!(
3160 old_count = self.runtime.instructions.blocks.len(),
3161 new_count = new_blocks.len(),
3162 "reloaded instruction files"
3163 );
3164 self.runtime.instructions.blocks = new_blocks;
3165 }
3166
/// Re-reads the configuration file and applies the hot-reloadable settings.
///
/// Returns without changes when no config path is recorded or when loading (including
/// the plugin overlay merge) fails. Otherwise copies security, timeout, memory, skill,
/// context-manager, index and hook settings from the freshly loaded config into the
/// running agent.
// The function is one long, flat list of field assignments; splitting it would not
// make it clearer, hence the lint allowance.
#[allow(clippy::too_many_lines)]
fn reload_config(&mut self) {
    let Some(path) = self.runtime.lifecycle.config_path.clone() else {
        return;
    };
    let Some(config) = self.load_config_with_overlay(&path) else {
        return;
    };
    // Resolved before any field is moved out of `config` below.
    let budget_tokens = resolve_context_budget(&config, &self.provider);
    self.runtime.config.security = config.security;
    self.runtime.config.timeouts = config.timeouts;
    self.runtime.config.redact_credentials = config.memory.redact_credentials;
    self.services.memory.persistence.history_limit = config.memory.history_limit;
    self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
    self.services.memory.compaction.summarization_threshold =
        config.memory.summarization_threshold;
    // Skill matching and injection settings.
    self.services.skill.max_active_skills = config.skills.max_active_skills.get();
    self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
    self.services.skill.min_injection_score = config.skills.min_injection_score;
    self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
    self.services.skill.hybrid_search = config.skills.hybrid_search;
    {
        // Out-of-range values are clamped, with a warning so the operator notices.
        let alpha = config.skills.bm25_alpha;
        if !(0.0..=1.0).contains(&alpha) {
            tracing::warn!(
                bm25_alpha = alpha,
                "bm25_alpha is outside [0.0, 1.0]; clamping to valid range"
            );
        }
        self.services.skill.bm25_alpha = alpha.clamp(0.0, 1.0);
    }
    self.services.skill.two_stage_matching = config.skills.two_stage_matching;
    self.services.skill.confusability_threshold =
        config.skills.confusability_threshold.clamp(0.0, 1.0);
    self.services.skill.group_structured = config.skills.group_structured;
    self.services.skill.support_similarity_threshold =
        config.skills.support_similarity_threshold;
    // Provider names are copied with `clone_into`, which can reuse the existing
    // `String` allocations.
    config
        .skills
        .query_rewrite_provider
        .as_str()
        .clone_into(&mut self.services.skill.query_rewrite_provider_name);
    config
        .skills
        .generation_provider
        .as_str()
        .clone_into(&mut self.services.skill.generation_provider_name);
    config
        .skills
        .disambiguate_provider
        .as_str()
        .clone_into(&mut self.services.skill.disambiguate_provider_name);
    self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
    // A leading `~/` is expanded against the home directory when one can be
    // determined; otherwise the path is used as written.
    self.services.skill.generation_output_dir =
        config.skills.generation_output_dir.as_deref().map(|p| {
            if let Some(stripped) = p.strip_prefix("~/") {
                dirs::home_dir()
                    .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
            } else {
                std::path::PathBuf::from(p)
            }
        });

    self.context_manager.budget = Some(
        ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
    );

    {
        let graph_cfg = &config.memory.graph;
        if graph_cfg.rpe.enabled {
            // An existing router is kept as-is; a new one is only created when none
            // exists yet.
            if self.services.memory.extraction.rpe_router.is_none() {
                self.services.memory.extraction.rpe_router =
                    Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                        graph_cfg.rpe.threshold,
                        graph_cfg.rpe.max_skip_turns,
                    )));
            }
        } else {
            self.services.memory.extraction.rpe_router = None;
        }
        self.services.memory.extraction.graph_config = graph_cfg.clone();
    }
    // Context-manager compaction, pruning and routing settings.
    self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
    self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
    self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
    self.context_manager
        .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
    self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
    self.context_manager.compression = config.memory.compression.clone();
    self.context_manager.routing = config.memory.store_routing.clone();
    // An empty classifier provider name means no dedicated routing provider.
    self.context_manager.store_routing_provider = if config
        .memory
        .store_routing
        .routing_classifier_provider
        .is_empty()
    {
        None
    } else {
        let resolved = self.resolve_background_provider(
            config
                .memory
                .store_routing
                .routing_classifier_provider
                .as_str(),
        );
        Some(std::sync::Arc::new(resolved))
    };
    self.services
        .memory
        .persistence
        .cross_session_score_threshold = config.memory.cross_session_score_threshold;

    // Repository index settings.
    self.services.index.repo_map_tokens = config.index.repo_map_tokens;
    self.services.index.repo_map_ttl =
        std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

    // Hook lists are replaced in place.
    self.services
        .session
        .hooks_config
        .cwd_changed
        .clone_from(&config.hooks.cwd_changed);
    self.services
        .session
        .hooks_config
        .permission_denied
        .clone_from(&config.hooks.permission_denied);
    self.services
        .session
        .hooks_config
        .turn_complete
        .clone_from(&config.hooks.turn_complete);
    tracing::info!("config reloaded");
}
3304
3305 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3309 let mut config = match Config::load(path) {
3310 Ok(c) => c,
3311 Err(e) => {
3312 tracing::warn!("config reload failed: {e:#}");
3313 return None;
3314 }
3315 };
3316
3317 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3319 None
3320 } else {
3321 match zeph_plugins::apply_plugin_config_overlays(
3322 &mut config,
3323 &self.runtime.lifecycle.plugins_dir,
3324 ) {
3325 Ok(o) => Some(o),
3326 Err(e) => {
3327 tracing::warn!(
3328 "plugin overlay merge failed during reload: {e:#}; \
3329 keeping previous runtime state"
3330 );
3331 return None;
3332 }
3333 }
3334 };
3335
3336 if let Some(ref overlay) = new_overlay {
3340 self.warn_on_shell_overlay_divergence(overlay, &config);
3341 }
3342 Some(config)
3343 }
3344
3345 fn warn_on_shell_overlay_divergence(
3351 &self,
3352 new_overlay: &zeph_plugins::ResolvedOverlay,
3353 config: &Config,
3354 ) {
3355 let new_blocked: Vec<String> = {
3356 let mut v = config.tools.shell.blocked_commands.clone();
3357 v.sort();
3358 v
3359 };
3360 let new_allowed: Vec<String> = {
3361 let mut v = config.tools.shell.allowed_commands.clone();
3362 v.sort();
3363 v
3364 };
3365
3366 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3367 let blocked_changed = new_blocked != startup.blocked;
3368 let allowed_changed = new_allowed != startup.allowed;
3369
3370 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3372 h.rebuild(&config.tools.shell);
3373 tracing::info!(
3374 blocked_count = h.snapshot_blocked().len(),
3375 "shell blocked_commands rebuilt from hot-reload"
3376 );
3377 }
3378
3379 if allowed_changed {
3386 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3387 for sandbox path recomputation (blocked_commands was rebuilt live)";
3388 tracing::warn!("{msg}");
3389 if let Some(ref tx) = self.services.session.status_tx {
3390 let _ = tx.send(msg.to_owned());
3391 }
3392 }
3393
3394 let _ = new_overlay;
3395 }
3396
3397 fn maybe_sidequest_eviction(&mut self) {
3408 if self.services.sidequest.config.enabled {
3412 use crate::config::PruningStrategy;
3413 if !matches!(
3414 self.context_manager.compression.pruning_strategy,
3415 PruningStrategy::Reactive
3416 ) {
3417 tracing::warn!(
3418 strategy = ?self.context_manager.compression.pruning_strategy,
3419 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3420 consider disabling sidequest.enabled to avoid redundant eviction"
3421 );
3422 }
3423 }
3424
3425 if self.services.focus.is_active() {
3427 tracing::debug!("sidequest: skipping — focus session active");
3428 self.services.compression.pending_sidequest_result = None;
3430 return;
3431 }
3432
3433 self.sidequest_apply_pending();
3435
3436 self.sidequest_schedule_next();
3438 }
3439
3440 fn sidequest_apply_pending(&mut self) {
3441 let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3442 return;
3443 };
3444 let result = match handle.try_join() {
3447 Ok(result) => result,
3448 Err(_handle) => {
3449 tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3451 return;
3452 }
3453 };
3454 match result {
3455 Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3456 let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3457 let freed = self.services.sidequest.apply_eviction(
3458 &mut self.msg.messages,
3459 &evicted_indices,
3460 &self.runtime.metrics.token_counter,
3461 );
3462 if freed > 0 {
3463 self.recompute_prompt_tokens();
3464 self.context_manager.set_compaction_state(
3467 crate::agent::context_manager::CompactionState::CompactedThisTurn {
3468 cooldown: 0,
3469 },
3470 );
3471 tracing::info!(
3472 freed_tokens = freed,
3473 evicted_cursors = evicted_indices.len(),
3474 pass = self.services.sidequest.passes_run,
3475 "sidequest eviction complete"
3476 );
3477 if let Some(ref d) = self.runtime.debug.debug_dumper {
3478 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3479 }
3480 if let Some(ref tx) = self.services.session.status_tx {
3481 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3482 }
3483 } else {
3484 if let Some(ref tx) = self.services.session.status_tx {
3486 let _ = tx.send(String::new());
3487 }
3488 }
3489 }
3490 Ok(None | Some(_)) => {
3491 tracing::debug!("sidequest: pending result: no cursors to evict");
3492 if let Some(ref tx) = self.services.session.status_tx {
3493 let _ = tx.send(String::new());
3494 }
3495 }
3496 Err(e) => {
3497 tracing::debug!("sidequest: background task error: {e}");
3498 if let Some(ref tx) = self.services.session.status_tx {
3499 let _ = tx.send(String::new());
3500 }
3501 }
3502 }
3503 }
3504
3505 fn sidequest_schedule_next(&mut self) {
3506 use zeph_llm::provider::{Message, MessageMetadata, Role};
3507
3508 self.services
3509 .sidequest
3510 .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3511
3512 if self.services.sidequest.tool_output_cursors.is_empty() {
3513 tracing::debug!("sidequest: no eligible cursors");
3514 return;
3515 }
3516
3517 let prompt = self.services.sidequest.build_eviction_prompt();
3518 let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3519 let n_cursors = self.services.sidequest.tool_output_cursors.len();
3520 let provider = self.summary_or_primary_provider().clone();
3522
3523 let eviction_future = async move {
3524 let msgs = [Message {
3525 role: Role::User,
3526 content: prompt,
3527 parts: vec![],
3528 metadata: MessageMetadata::default(),
3529 }];
3530 let response =
3531 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3532 .await
3533 {
3534 Ok(Ok(r)) => r,
3535 Ok(Err(e)) => {
3536 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3537 return None;
3538 }
3539 Err(_) => {
3540 tracing::debug!("sidequest bg: LLM call timed out");
3541 return None;
3542 }
3543 };
3544
3545 let start = response.find('{')?;
3546 let end = response.rfind('}')?;
3547 if start > end {
3548 return None;
3549 }
3550 let json_slice = &response[start..=end];
3551 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3552 let mut valid: Vec<usize> = parsed
3553 .del_cursors
3554 .into_iter()
3555 .filter(|&c| c < n_cursors)
3556 .collect();
3557 valid.sort_unstable();
3558 valid.dedup();
3559 #[allow(
3560 clippy::cast_precision_loss,
3561 clippy::cast_possible_truncation,
3562 clippy::cast_sign_loss
3563 )]
3564 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3565 valid.truncate(max_evict);
3566 Some(valid)
3567 };
3568 let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3569 std::sync::Arc::from("agent.sidequest.eviction"),
3570 move || eviction_future,
3571 );
3572 self.services.compression.pending_sidequest_result = Some(handle);
3573 tracing::debug!("sidequest: background LLM eviction task spawned");
3574 if let Some(ref tx) = self.services.session.status_tx {
3575 let _ = tx.send("SideQuest: scoring tool outputs...".into());
3576 }
3577 }
3578
3579 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3581 self.services
3582 .mcp
3583 .manager
3584 .as_ref()
3585 .map(|m| McpManagerDispatch(Arc::clone(m)))
3586 }
3587
3588 pub(crate) async fn check_cwd_changed(&mut self) {
3594 let current = match std::env::current_dir() {
3595 Ok(p) => p,
3596 Err(e) => {
3597 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3598 return;
3599 }
3600 };
3601 if current == self.runtime.lifecycle.last_known_cwd {
3602 return;
3603 }
3604 let old_cwd =
3605 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3606 self.services.session.env_context.working_dir = current.display().to_string();
3607
3608 tracing::info!(
3609 old = %old_cwd.display(),
3610 new = %current.display(),
3611 "working directory changed"
3612 );
3613
3614 let _ = self
3615 .channel
3616 .send_status("Working directory changed\u{2026}")
3617 .await;
3618
3619 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3620 if hooks.is_empty() {
3621 tracing::debug!("CwdChanged: no hooks configured, skipping");
3622 } else {
3623 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3624 let mut env = std::collections::HashMap::new();
3625 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3626 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3627 let dispatch = self.mcp_dispatch();
3628 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3629 .as_ref()
3630 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3631 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3632 tracing::warn!(error = %e, "CwdChanged hook failed");
3633 } else {
3634 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3635 }
3636 }
3637
3638 let _ = self.channel.send_status("").await;
3639 }
3640
3641 pub(crate) async fn handle_file_changed(
3643 &mut self,
3644 event: crate::file_watcher::FileChangedEvent,
3645 ) {
3646 tracing::info!(path = %event.path.display(), "file changed");
3647
3648 let _ = self
3649 .channel
3650 .send_status("Running file-change hook\u{2026}")
3651 .await;
3652
3653 let hooks = self
3654 .services
3655 .session
3656 .hooks_config
3657 .file_changed_hooks
3658 .clone();
3659 if hooks.is_empty() {
3660 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3661 } else {
3662 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3663 let mut env = std::collections::HashMap::new();
3664 env.insert(
3665 "ZEPH_CHANGED_PATH".to_owned(),
3666 event.path.display().to_string(),
3667 );
3668 let dispatch = self.mcp_dispatch();
3669 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3670 .as_ref()
3671 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3672 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3673 tracing::warn!(error = %e, "FileChanged hook failed");
3674 } else {
3675 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3676 }
3677 }
3678
3679 let _ = self.channel.send_status("").await;
3680 }
3681
3682 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3692 let Some(engine) = self.services.promotion_engine.clone() else {
3693 return;
3694 };
3695
3696 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3697 return;
3698 };
3699
3700 let promotion_window = 200usize;
3703
3704 let accepted = self.runtime.lifecycle.supervisor.spawn(
3705 agent_supervisor::TaskClass::Enrichment,
3706 "compression_spectrum.promotion_scan",
3707 async move {
3708 let span = tracing::info_span!("memory.compression.promote.background");
3709 let _enter = span.enter();
3710
3711 let window = match memory.load_promotion_window(promotion_window).await {
3712 Ok(w) => w,
3713 Err(e) => {
3714 tracing::warn!(error = %e, "promotion scan: failed to load window");
3715 return;
3716 }
3717 };
3718
3719 if window.is_empty() {
3720 return;
3721 }
3722
3723 let candidates = match engine.scan(&window).await {
3724 Ok(c) => c,
3725 Err(e) => {
3726 tracing::warn!(error = %e, "promotion scan: clustering failed");
3727 return;
3728 }
3729 };
3730
3731 for candidate in &candidates {
3732 if let Err(e) = engine.promote(candidate).await {
3733 tracing::warn!(
3734 signature = %candidate.signature,
3735 error = %e,
3736 "promotion scan: promote failed"
3737 );
3738 }
3739 }
3740
3741 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3742 },
3743 );
3744
3745 if accepted {
3746 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3747 }
3748 }
3749}
3750pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3758 use zeph_llm::provider::MessagePart;
3759 if m.parts.is_empty() {
3760 return m.content.len();
3761 }
3762 m.parts
3763 .iter()
3764 .map(|p| match p {
3765 MessagePart::Text { text }
3766 | MessagePart::Recall { text }
3767 | MessagePart::CodeContext { text }
3768 | MessagePart::Summary { text }
3769 | MessagePart::CrossSession { text } => text.len(),
3770 MessagePart::ToolOutput { body, .. } => body.len(),
3771 MessagePart::ToolUse { id, name, input } => {
3772 50 + id.len() + name.len() + input.to_string().len()
3773 }
3774 MessagePart::ToolResult {
3775 tool_use_id,
3776 content,
3777 ..
3778 } => 50 + tool_use_id.len() + content.len(),
3779 MessagePart::Image(img) => img.data.len() * 4 / 3,
3780 MessagePart::ThinkingBlock {
3781 thinking,
3782 signature,
3783 } => 50 + thinking.len() + signature.len(),
3784 MessagePart::RedactedThinkingBlock { data } => data.len(),
3785 MessagePart::Compaction { summary } => summary.len(),
3786 _ => 0,
3787 })
3788 .sum()
3789}
3790
3791pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3810 use zeph_llm::provider::{MessagePart, Role};
3811
3812 let mut total_chars = 0usize;
3816 let mut drop_before = 0usize; for (i, m) in msgs.iter().enumerate().rev() {
3818 total_chars += estimate_parts_size(m);
3819 if total_chars > max_chars {
3820 drop_before = i + 1;
3821 break;
3822 }
3823 }
3824 if drop_before > 0 {
3825 msgs.drain(..drop_before);
3826 }
3827
3828 let emitted_tool_ids: std::collections::HashSet<String> = msgs
3832 .iter()
3833 .filter(|m| m.role == Role::Assistant)
3834 .flat_map(|m| m.parts.iter())
3835 .filter_map(|p| {
3836 if let MessagePart::ToolUse { id, .. } = p {
3837 Some(id.clone())
3838 } else {
3839 None
3840 }
3841 })
3842 .collect();
3843
3844 let mut orphans_removed = 0usize;
3845 for m in msgs.iter_mut() {
3846 if m.role != Role::User || m.parts.is_empty() {
3847 continue;
3848 }
3849 let before = m.parts.len();
3850 m.parts.retain(|p| match p {
3851 MessagePart::ToolResult { tool_use_id, .. } => {
3852 emitted_tool_ids.contains(tool_use_id.as_str())
3853 }
3854 _ => true,
3855 });
3856 let dropped = before - m.parts.len();
3857 if dropped > 0 {
3858 orphans_removed += dropped;
3859 if m.parts.is_empty() {
3860 m.content.clear();
3861 } else {
3862 m.rebuild_content();
3863 }
3864 }
3865 }
3866
3867 let consumed_tool_ids: std::collections::HashSet<String> = msgs
3875 .iter()
3876 .filter(|m| m.role == Role::User)
3877 .flat_map(|m| m.parts.iter())
3878 .filter_map(|p| {
3879 if let MessagePart::ToolResult { tool_use_id, .. } = p {
3880 Some(tool_use_id.clone())
3881 } else {
3882 None
3883 }
3884 })
3885 .collect();
3886
3887 let last_assistant_idx = msgs
3889 .iter()
3890 .enumerate()
3891 .rev()
3892 .find(|(_, m)| m.role == Role::Assistant)
3893 .map(|(i, _)| i);
3894
3895 for (idx, m) in msgs.iter_mut().enumerate() {
3896 if m.role != Role::Assistant || m.parts.is_empty() {
3897 continue;
3898 }
3899 if Some(idx) == last_assistant_idx {
3901 continue;
3902 }
3903 let before = m.parts.len();
3904 m.parts.retain(|p| match p {
3905 MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3906 _ => true,
3907 });
3908 let dropped = before - m.parts.len();
3909 if dropped > 0 {
3910 orphans_removed += dropped;
3911 if m.parts.is_empty() {
3912 m.content.clear();
3913 } else {
3914 m.rebuild_content();
3915 }
3916 }
3917 }
3918
3919 msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3921
3922 if orphans_removed > 0 {
3923 tracing::debug!(
3924 orphans = orphans_removed,
3925 "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3926 );
3927 }
3928}
3929
3930fn sanitize_parent_messages(
3936 mut msgs: Vec<zeph_llm::provider::Message>,
3937 sanitizer: &zeph_sanitizer::ContentSanitizer,
3938 source: &zeph_sanitizer::ContentSource,
3939) -> Vec<zeph_llm::provider::Message> {
3940 use zeph_llm::provider::MessagePart;
3941 for msg in &mut msgs {
3942 let mut changed = false;
3943 for part in &mut msg.parts {
3944 if let MessagePart::Text { text } = part {
3945 let clean = sanitizer.sanitize(text, source.clone());
3946 if clean.body != *text {
3947 *text = clean.body;
3948 changed = true;
3949 }
3950 }
3951 }
3952 if changed {
3953 msg.rebuild_content();
3954 }
3955 }
3956 msgs
3957}
3958
3959struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3964
3965impl zeph_subagent::McpDispatch for McpManagerDispatch {
3966 fn call_tool<'a>(
3967 &'a self,
3968 server: &'a str,
3969 tool: &'a str,
3970 args: serde_json::Value,
3971 ) -> std::pin::Pin<
3972 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3973 > {
3974 Box::pin(async move {
3975 self.0
3976 .call_tool(server, tool, args)
3977 .await
3978 .map(|result| {
3979 let texts: Vec<serde_json::Value> = result
3981 .content
3982 .iter()
3983 .filter_map(|c| {
3984 if let rmcp::model::RawContent::Text(t) = &c.raw {
3985 Some(serde_json::Value::String(t.text.clone()))
3986 } else {
3987 None
3988 }
3989 })
3990 .collect();
3991 serde_json::Value::Array(texts)
3992 })
3993 .map_err(|e| e.to_string())
3994 })
3995 }
3996}
3997
3998pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3999 while !*rx.borrow_and_update() {
4000 if rx.changed().await.is_err() {
4001 std::future::pending::<()>().await;
4002 }
4003 }
4004}
4005
4006pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4007 match rx {
4008 Some(inner) => {
4009 if let Some(v) = inner.recv().await {
4010 Some(v)
4011 } else {
4012 *rx = None;
4013 std::future::pending().await
4014 }
4015 }
4016 None => std::future::pending().await,
4017 }
4018}
4019
/// Shortens a shell command to at most 80 characters.
///
/// Commands of 80 characters or fewer are returned unchanged. Longer commands
/// are cut to their first 79 characters followed by a single `…`.
///
/// Lengths are measured in `char`s, not bytes, so a non-ASCII command is not
/// truncated just because its UTF-8 encoding exceeds 80 bytes, and the cut
/// always falls on a character boundary.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_CHARS: usize = 80;

    // Start at the last character that may be kept verbatim (index 79).
    let mut tail = cmd.char_indices().skip(MAX_CHARS - 1);

    let Some((cut, _)) = tail.next() else {
        // Fewer than 80 characters.
        return cmd.to_owned();
    };
    if tail.next().is_none() {
        // Exactly 80 characters: still fits.
        return cmd.to_owned();
    }

    // `cut` is the byte offset of the 80th character; keep the 79 before it.
    format!("{}…", &cmd[..cut])
}
4033
/// Returns the first 8 characters of `id`, or all of it when it is shorter.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the ninth character, if there is one.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
4038
4039pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
4040 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
4041 if let Some(ctx_size) = provider.context_window() {
4042 tracing::info!(
4043 model_context = ctx_size,
4044 "auto-configured context budget on reload"
4045 );
4046 ctx_size
4047 } else {
4048 0
4049 }
4050 } else {
4051 config.memory.context_budget_tokens
4052 };
4053 if tokens == 0 {
4054 tracing::warn!(
4055 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
4056 );
4057 128_000
4058 } else {
4059 tokens
4060 }
4061}
4062
4063#[cfg(test)]
4064mod tests;
4065
4066#[cfg(test)]
4067pub(crate) use tests::agent_tests;
4068
/// Command stubs available to tests only.
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Slash command registered as `/test-error` whose handler always fails.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and resolves to a `CommandError`
        /// built from the message `"boom"`.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>
        {
            Box::pin(async {
                let failure = CommandError::new("boom");
                Err(failure)
            })
        }
    }
}