1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod autonomous_turn;
14mod builder;
15pub(crate) mod channel_impl;
16#[cfg(feature = "cocoon")]
17mod cocoon_cmd;
18mod command_context_impls;
19pub(super) mod compression_feedback;
20mod context;
21mod context_impls;
22pub(crate) mod context_manager;
23mod corrections;
24pub mod error;
25mod experiment_cmd;
26pub(crate) mod focus;
27mod heuristic_promotion;
28mod index;
29mod learning;
30pub(crate) mod learning_engine;
31mod log_commands;
32mod loop_event;
33mod lsp_commands;
34mod magic_docs;
35mod mcp;
36pub(crate) mod memcot;
37mod message_queue;
38mod microcompact;
39mod model_commands;
40mod persistence;
41#[cfg(feature = "scheduler")]
42mod plan;
43mod policy_commands;
44mod provider_cmd;
45mod quality_hook;
46pub(crate) mod rate_limiter;
47#[cfg(feature = "scheduler")]
48mod scheduler_commands;
49#[cfg(feature = "scheduler")]
50mod scheduler_loop;
51mod scope_commands;
52pub mod session_config;
53mod session_digest;
54pub mod shadow_sentinel;
55pub(crate) mod sidequest;
56mod skill_management;
57pub mod slash_commands;
58pub mod speculative;
59pub(crate) mod state;
60pub(crate) mod task_injection;
61pub(crate) mod tool_execution;
62pub(crate) mod tool_orchestrator;
63mod trace_extraction;
64pub mod trajectory;
65mod trajectory_commands;
66mod trust_commands;
67pub mod turn;
68mod utils;
69pub(crate) mod vigil;
70
71use std::collections::{HashMap, VecDeque};
72use std::fmt::Write as _;
73use std::sync::Arc;
74
75use parking_lot::RwLock;
76
77use tokio::sync::{mpsc, watch};
78use tokio_util::sync::CancellationToken;
79use zeph_llm::any::AnyProvider;
80use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
81use zeph_memory::TokenCounter;
82use zeph_memory::semantic::SemanticMemory;
83use zeph_skills::loader::Skill;
84use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
85use zeph_skills::prompt::format_skills_prompt;
86use zeph_skills::registry::SkillRegistry;
87use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
88
89use tracing::Instrument as _;
90
91use crate::channel::Channel;
92use crate::config::Config;
93use crate::context::{ContextBudget, build_system_prompt};
94use zeph_common::text::estimate_tokens;
95
96use loop_event::LoopEvent;
97use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
98use state::MessageState;
99
/// Window size for doom-loop detection.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the number of
/// recent turns/outputs that are compared — confirm at the use site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Crate-visible re-export of the code-context prefix defined in `zeph_agent_context::helpers`.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prepended to a scheduler-supplied prompt before it is handled as an agent turn
/// (see the `LoopEvent::ScheduledTask` arm of `Agent::run`).
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Trailing newline plus closing code fence appended by [`format_tool_output`].
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
107
108pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
109 use std::fmt::Write;
110 let capacity = "[tool output: ".len()
111 + tool_name.len()
112 + "]\n```\n".len()
113 + body.len()
114 + TOOL_OUTPUT_SUFFIX.len();
115 let mut buf = String::with_capacity(capacity);
116 let _ = write!(
117 buf,
118 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
119 );
120 buf
121}
122
/// Conversational agent bound to a single [`Channel`].
///
/// Built with [`Agent::new`] or [`Agent::new_with_registry_arc`], driven by [`Agent::run`],
/// and torn down with [`Agent::shutdown`].
pub struct Agent<C: Channel> {
    /// Active LLM provider. Replaced in place by `apply_provider_override` when an override
    /// is pending.
    provider: AnyProvider,
    /// Provider supplied separately at construction; [`Agent::new`] passes a clone of
    /// `provider`. NOTE(review): presumably used for embedding calls — confirm at use sites.
    embedding_provider: AnyProvider,
    /// I/O channel used to receive messages and to send replies and status updates.
    /// Handed back to the caller by [`Agent::into_channel`].
    channel: C,
    /// Type-erased tool executor, shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation state: the message history (index 0 is the system prompt), the queue of
    /// not-yet-processed messages, pending image parts and deferred persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager; among other things tracks the turns elapsed since the last hard
    /// compaction (read during shutdown).
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestrator state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Bundle of subsystem state (memory, skills, MCP, security, orchestration, experiments,
    /// compression, learning, autonomous driver, ...).
    pub(super) services: state::Services,

    /// Runtime state (config, lifecycle, provider state, metrics, debug, instructions,
    /// ephemeral plugins).
    pub(super) runtime: state::AgentRuntime,
}
177
/// What the main loop in `Agent::run` should do after a slash-command dispatch attempt.
enum DispatchFlow {
    /// The command requested exit: leave the run loop.
    Break,
    /// The command was handled (successfully or with an error): start the next iteration.
    Continue,
    /// No handler matched: keep processing the input in the current iteration.
    Fallthrough,
}
187
188impl<C: Channel> Agent<C> {
189 #[must_use]
213 pub fn new(
214 provider: AnyProvider,
215 channel: C,
216 registry: SkillRegistry,
217 matcher: Option<SkillMatcherBackend>,
218 max_active_skills: usize,
219 tool_executor: impl ToolExecutor + 'static,
220 ) -> Self {
221 let registry = Arc::new(RwLock::new(registry));
222 let embedding_provider = provider.clone();
223 Self::new_with_registry_arc(
224 provider,
225 embedding_provider,
226 channel,
227 registry,
228 matcher,
229 max_active_skills,
230 tool_executor,
231 )
232 }
233
234 #[must_use]
241 pub fn new_with_registry_arc(
242 provider: AnyProvider,
243 embedding_provider: AnyProvider,
244 channel: C,
245 registry: Arc<RwLock<SkillRegistry>>,
246 matcher: Option<SkillMatcherBackend>,
247 max_active_skills: usize,
248 tool_executor: impl ToolExecutor + 'static,
249 ) -> Self {
250 use state::{
251 AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
252 InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
253 OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
254 SessionState, SkillState, ToolState,
255 };
256
257 debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
258 let all_skills: Vec<Skill> = {
259 let reg = registry.read();
260 reg.all_meta()
261 .iter()
262 .filter_map(|m| reg.skill(&m.name).ok())
263 .collect()
264 };
265 let empty_trust = HashMap::new();
266 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
267 let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
268 let system_prompt = build_system_prompt(&skills_prompt, None);
269 tracing::debug!(len = system_prompt.len(), "initial system prompt built");
270 tracing::trace!(prompt = %system_prompt, "full system prompt");
271
272 let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
273 let token_counter = Arc::new(TokenCounter::new());
274
275 let services = Services {
276 memory: MemoryState::default(),
277 skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
278 learning_engine: learning_engine::LearningEngine::new(),
279 feedback: FeedbackState::default(),
280 mcp: McpState::default(),
281 index: IndexState::default(),
282 session: SessionState::new(),
283 security: SecurityState::default(),
284 experiments: ExperimentState::new(),
285 compression: CompressionState::default(),
286 orchestration: OrchestrationState::default(),
287 focus: focus::FocusState::default(),
288 sidequest: sidequest::SidequestState::default(),
289 tool_state: ToolState::default(),
290 goal_accounting: None,
291 quality: None,
292 proactive_explorer: None,
293 promotion_engine: None,
294 taco_compressor: None,
295 speculation_engine: None,
296 autonomous: crate::goal::AutonomousDriver::new(tokio::time::Duration::from_millis(500)),
297 autonomous_registry: crate::goal::AutonomousRegistry::new(),
298 };
299
300 let runtime = AgentRuntime {
301 config: RuntimeConfig::default(),
302 lifecycle: LifecycleState::new(),
303 providers: ProviderState::new(initial_prompt_tokens),
304 metrics: MetricsState::new(token_counter),
305 debug: DebugState::default(),
306 instructions: InstructionState::default(),
307 ephemeral_plugins: Vec::new(),
308 };
309
310 Self {
311 provider,
312 embedding_provider,
313 channel,
314 tool_executor: Arc::new(tool_executor),
315 msg: MessageState {
316 messages: vec![Message {
317 role: Role::System,
318 content: system_prompt,
319 parts: vec![],
320 metadata: MessageMetadata::default(),
321 }],
322 message_queue: VecDeque::new(),
323 pending_image_parts: Vec::new(),
324 last_persisted_message_id: None,
325 deferred_db_hide_ids: Vec::new(),
326 deferred_db_summaries: Vec::new(),
327 },
328 context_manager: context_manager::ContextManager::new(),
329 tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
330 services,
331 runtime,
332 }
333 }
334
335 #[must_use]
348 pub fn into_channel(self) -> C {
349 self.channel
350 }
351
352 #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
357 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
358 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
359 return vec![];
360 };
361
362 let finished: Vec<String> = mgr
363 .statuses()
364 .into_iter()
365 .filter_map(|(id, status)| {
366 if matches!(
367 status.state,
368 zeph_subagent::SubAgentState::Completed
369 | zeph_subagent::SubAgentState::Failed
370 | zeph_subagent::SubAgentState::Canceled
371 ) {
372 Some(id)
373 } else {
374 None
375 }
376 })
377 .collect();
378
379 let mut results = vec![];
380 for task_id in finished {
381 match mgr.collect(&task_id).await {
382 Ok(result) => results.push((task_id, result)),
383 Err(e) => {
384 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
385 }
386 }
387 }
388 results
389 }
390
391 async fn call_llm_for_session_summary(
400 &self,
401 chat_messages: &[Message],
402 ) -> Option<zeph_memory::StructuredSummary> {
403 let provider = self.resolve_background_provider(
404 &self.services.memory.compaction.shutdown_summary_provider,
405 );
406 let timeout_dur = std::time::Duration::from_secs(
407 self.services
408 .memory
409 .compaction
410 .shutdown_summary_timeout_secs,
411 );
412 match tokio::time::timeout(
413 timeout_dur,
414 provider.chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
415 )
416 .await
417 {
418 Ok(Ok(s)) => Some(s),
419 Ok(Err(e)) => {
420 tracing::warn!(
421 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
422 );
423 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
424 .await
425 }
426 Err(_) => {
427 tracing::warn!(
428 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
429 self.services
430 .memory
431 .compaction
432 .shutdown_summary_timeout_secs
433 );
434 self.plain_text_summary_fallback(&provider, chat_messages, timeout_dur)
435 .await
436 }
437 }
438 }
439
440 async fn plain_text_summary_fallback(
441 &self,
442 provider: &zeph_llm::any::AnyProvider,
443 chat_messages: &[Message],
444 timeout_dur: std::time::Duration,
445 ) -> Option<zeph_memory::StructuredSummary> {
446 match tokio::time::timeout(timeout_dur, provider.chat(chat_messages)).await {
447 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
448 summary: plain,
449 key_facts: vec![],
450 entities: vec![],
451 }),
452 Ok(Err(e)) => {
453 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
454 None
455 }
456 Err(_) => {
457 tracing::warn!("shutdown summary: plain LLM fallback timed out");
458 None
459 }
460 }
461 }
462
463 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
468 use zeph_llm::provider::{MessagePart, Role};
469
470 let msgs = &self.msg.messages;
474 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
476 return;
477 };
478 let asst_msg = &msgs[asst_idx];
479 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
480 .parts
481 .iter()
482 .filter_map(|p| {
483 if let MessagePart::ToolUse { id, name, input } = p {
484 Some((id.as_str(), name.as_str(), input))
485 } else {
486 None
487 }
488 })
489 .collect();
490 if tool_use_ids.is_empty() {
491 return;
492 }
493
494 let paired_ids: std::collections::HashSet<&str> = msgs
496 .get(asst_idx + 1..)
497 .into_iter()
498 .flatten()
499 .filter(|m| m.role == Role::User)
500 .flat_map(|m| m.parts.iter())
501 .filter_map(|p| {
502 if let MessagePart::ToolResult { tool_use_id, .. } = p {
503 Some(tool_use_id.as_str())
504 } else {
505 None
506 }
507 })
508 .collect();
509
510 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
511 .iter()
512 .filter(|(id, _, _)| !paired_ids.contains(*id))
513 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
514 id: (*id).to_owned(),
515 name: (*name).to_owned().into(),
516 input: (*input).clone(),
517 })
518 .collect();
519
520 if unpaired.is_empty() {
521 return;
522 }
523
524 tracing::info!(
525 count = unpaired.len(),
526 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
527 );
528 self.persist_cancelled_tool_results(&unpaired).await;
529 }
530
531 async fn maybe_store_shutdown_summary(&mut self) {
541 if !self.services.memory.compaction.shutdown_summary {
542 return;
543 }
544 let Some(memory) = self.services.memory.persistence.memory.clone() else {
545 return;
546 };
547 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
548 return;
549 };
550
551 match memory.has_session_summary(conversation_id).await {
553 Ok(true) => {
554 tracing::debug!("shutdown summary: session already has a summary, skipping");
555 return;
556 }
557 Ok(false) => {}
558 Err(e) => {
559 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
560 return;
561 }
562 }
563
564 let user_count = self
566 .msg
567 .messages
568 .iter()
569 .skip(1)
570 .filter(|m| m.role == Role::User)
571 .count();
572 if user_count
573 < self
574 .services
575 .memory
576 .compaction
577 .shutdown_summary_min_messages
578 {
579 tracing::debug!(
580 user_count,
581 min = self
582 .services
583 .memory
584 .compaction
585 .shutdown_summary_min_messages,
586 "shutdown summary: too few user messages, skipping"
587 );
588 return;
589 }
590
591 let _ = self.channel.send_status("Saving session summary...").await;
593
594 let max = self
596 .services
597 .memory
598 .compaction
599 .shutdown_summary_max_messages;
600 if max == 0 {
601 tracing::debug!("shutdown summary: max_messages=0, skipping");
602 return;
603 }
604 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
605 let slice = if non_system.len() > max {
606 &non_system[non_system.len() - max..]
607 } else {
608 &non_system[..]
609 };
610
611 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
612 .iter()
613 .map(|m| {
614 let role = match m.role {
615 Role::Assistant => "assistant".to_owned(),
616 Role::System => "system".to_owned(),
617 Role::User | _ => "user".to_owned(),
618 };
619 (zeph_memory::MessageId(0), role, m.content.clone())
620 })
621 .collect();
622
623 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
624 let chat_messages = vec![Message {
625 role: Role::User,
626 content: prompt,
627 parts: vec![],
628 metadata: MessageMetadata::default(),
629 }];
630
631 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
632 let _ = self.channel.send_status("").await;
633 return;
634 };
635
636 if let Err(e) = memory
637 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
638 .await
639 {
640 tracing::warn!("shutdown summary: storage failed: {e:#}");
641 } else {
642 tracing::info!(
643 conversation_id = conversation_id.0,
644 "shutdown summary stored"
645 );
646 }
647
648 let _ = self.channel.send_status("").await;
650 }
651
652 #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
668 pub async fn shutdown(&mut self) {
669 let _ = self.channel.send_status("Shutting down...").await;
670
671 self.provider.save_router_state().await;
673
674 if let Some(ref advisor) = self.services.orchestration.topology_advisor
676 && let Err(e) = advisor.save()
677 {
678 tracing::warn!(error = %e, "adaptorch: failed to persist state");
679 }
680
681 if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
682 mgr.shutdown_all();
683 }
684
685 if let Some(ref manager) = self.services.mcp.manager {
686 manager.shutdown_all_shared().await;
687 }
688
689 if let Some(turns) = self.context_manager.turns_since_last_hard_compaction() {
693 self.update_metrics(|m| {
694 m.compaction_turns_after_hard.push(turns);
695 });
696 self.context_manager
697 .set_turns_since_last_hard_compaction(None);
698 }
699
700 if let Some(ref tx) = self.runtime.metrics.metrics_tx {
701 let m = tx.borrow();
702 if m.filter_applications > 0 {
703 #[allow(clippy::cast_precision_loss)]
704 let pct = if m.filter_raw_tokens > 0 {
705 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
706 } else {
707 0.0
708 };
709 tracing::info!(
710 raw_tokens = m.filter_raw_tokens,
711 saved_tokens = m.filter_saved_tokens,
712 applications = m.filter_applications,
713 "tool output filtering saved ~{} tokens ({pct:.0}%)",
714 m.filter_saved_tokens,
715 );
716 }
717 if m.compaction_hard_count > 0 {
718 tracing::info!(
719 hard_compactions = m.compaction_hard_count,
720 turns_after_hard = ?m.compaction_turns_after_hard,
721 "hard compaction trajectory"
722 );
723 }
724 }
725
726 self.flush_orphaned_tool_use_on_shutdown().await;
730
731 if let Some(ref token) = self.services.experiments.cancel {
734 token.cancel();
735 }
736 if let Some(h) = self.services.experiments.handle.take() {
737 h.abort();
738 }
739
740 if let Some(memory) = self.services.memory.persistence.memory.as_ref() {
744 memory.cancel_graph_extraction();
745 }
746
747 self.runtime.lifecycle.supervisor.abort_all();
749
750 if let Some(h) = self.services.compression.pending_task_goal.take() {
753 h.abort();
754 }
755 if let Some(h) = self.services.compression.pending_sidequest_result.take() {
756 h.abort();
757 }
758 if let Some(h) = self.services.compression.pending_subgoal.take() {
759 h.abort();
760 }
761
762 self.services.learning_engine.learning_tasks.abort_all();
764
765 if let Some(h) = self.services.learning_engine.trace_extraction_handle.take() {
768 let deadline = std::time::Duration::from_mins(2);
769 match tokio::time::timeout(deadline, h).await {
770 Ok(Ok(())) => {}
771 Ok(Err(e)) => tracing::warn!("trace_extraction: task panicked at shutdown: {e}"),
772 Err(_) => tracing::warn!(
773 "trace_extraction: timed out at shutdown ({}s), aborting",
774 deadline.as_secs()
775 ),
776 }
777 }
778
779 if let Some(h) = self
782 .services
783 .learning_engine
784 .heuristic_promotion_handle
785 .take()
786 {
787 h.abort();
788 }
789
790 if let Some(ref sentinel) = self.services.security.shadow_sentinel {
792 sentinel.drain_pending().await;
793 }
794
795 for _ in 0..4 {
800 tokio::task::yield_now().await;
801 }
802
803 self.maybe_store_shutdown_summary().await;
804 self.maybe_store_session_digest().await;
805
806 tracing::info!("agent shutdown complete");
807 }
808
809 fn refresh_subagent_metrics(&mut self) {
816 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
817 return;
818 };
819 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
820 .statuses()
821 .into_iter()
822 .map(|(id, s)| {
823 let def = mgr.agents_def(&id);
824 crate::metrics::SubAgentMetrics {
825 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
826 id: id.clone(),
827 state: format!("{:?}", s.state).to_lowercase(),
828 turns_used: s.turns_used,
829 max_turns: def.map_or(20, |d| d.permissions.max_turns),
830 background: def.is_some_and(|d| d.permissions.background),
831 elapsed_secs: s.started_at.elapsed().as_secs(),
832 permission_mode: def.map_or_else(String::new, |d| {
833 use zeph_subagent::def::PermissionMode;
834 match d.permissions.permission_mode {
835 PermissionMode::AcceptEdits => "accept_edits".into(),
836 PermissionMode::DontAsk => "dont_ask".into(),
837 PermissionMode::BypassPermissions => "bypass_permissions".into(),
838 PermissionMode::Plan => "plan".into(),
839 _ => String::new(),
840 }
841 }),
842 transcript_dir: mgr
843 .agent_transcript_dir(&id)
844 .map(|p| p.to_string_lossy().into_owned()),
845 }
846 })
847 .collect();
848 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
849 }
850
851 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
853 let completed = self.poll_subagents().await;
854 for (task_id, result) in completed {
855 let notice = if result.is_empty() {
856 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
857 } else {
858 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
859 };
860 if let Err(e) = self.channel.send(¬ice).await {
861 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
862 }
863 }
864 Ok(())
865 }
866
    /// Runs the agent's main loop.
    ///
    /// After optional model warmup and session restoration, each iteration:
    /// 1. applies pending provider overrides and refreshes tools, elicitations and
    ///    sub-agent state;
    /// 2. takes the next input, either from the internal message queue or from
    ///    [`Self::next_event`];
    /// 3. tries the input against the session-level command registry, then the agent-level
    ///    command registry, then `handle_builtin_command`;
    /// 4. otherwise processes it as a user message.
    ///
    /// The loop ends when `next_event` yields `None` or `LoopEvent::Shutdown`, or when a
    /// command requests exit. Post-loop hooks (autodream, skill extraction from trace,
    /// trace collector finish) run before returning.
    ///
    /// # Errors
    ///
    /// Propagates errors from `notify_completed_subagents`, `next_event` and
    /// `process_user_message`.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // If a warmup signal exists and is not yet `true`, wait for one change and warn if
        // it is still not `true` afterwards.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        self.maybe_start_heuristic_promotion();

        loop {
            // Per-iteration housekeeping before taking the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over new events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving (see `resolve_message`).
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions().await;
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment is over: drop its cancel token and task handle.
                        self.services.experiments.cancel = None;
                        self.services.experiments.handle = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled prompts are run as a regular turn with a fixed prefix.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count the tick when a user loop is active.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::AutonomousTick) => {
                        // Errors in an autonomous turn are logged and do not end the loop.
                        if let Err(e) = self.run_autonomous_turn().await {
                            tracing::warn!(error = %e, "autonomous turn error");
                        }
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Record the guest-context flag of the incoming message on the session.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // For slash commands, record any flagged URLs in the input as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Whether the channel supports exit doubles as the `trusted` flag for dispatch.
            let trusted = self.channel.supports_exit();
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: trusted,
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            // Session-level handlers get real sink/debug/message/session access but only a
            // `NullAgent` for agent access.
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed, trusted).await
            };
            // Remember whether the first registry matched before the result is consumed.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Agent-level handlers get `self` as the agent and null implementations for
            // everything else.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    agents_fleet::AgentsFleetCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(AgentsFleetCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed, trusted).await
            } else {
                None
            };
            // Finalise an autonomous session start left pending by the dispatch above:
            // log it and sync the registry entry.
            if let Some((cancelled_id, new_id)) = self.services.autonomous.flush_pending_start() {
                if let Some(cid) = cancelled_id {
                    tracing::info!(
                        goal_id = cid,
                        "autonomous: previous session cancelled for new goal"
                    );
                }
                self.sync_registry_entry();
                tracing::info!(goal_id = new_id, "autonomous: session started");
            }

            // Agent-level commands opt in to post-command learning (`with_learning = true`).
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Built-in commands: `Some(true)` exits the loop, `Some(false)` means handled.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command: process as a regular user message (untrimmed text).
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        self.maybe_extract_skills_from_trace().await;

        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1227
1228 async fn apply_dispatch_result(
1234 &mut self,
1235 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1236 command: &str,
1237 with_learning: bool,
1238 ) -> DispatchFlow {
1239 match result {
1240 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1241 let _ = self.channel.flush_chunks().await;
1242 DispatchFlow::Break
1243 }
1244 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1245 let _ = self.channel.send(&msg).await;
1246 let _ = self.channel.flush_chunks().await;
1247 if with_learning {
1248 self.maybe_trigger_post_command_learning(command).await;
1249 }
1250 DispatchFlow::Continue
1251 }
1252 Some(Ok(_)) => {
1253 let _ = self.channel.flush_chunks().await;
1254 DispatchFlow::Continue
1255 }
1256 Some(Err(e)) => {
1257 let _ = self.channel.send(&e.to_string()).await;
1258 let _ = self.channel.flush_chunks().await;
1259 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1260 DispatchFlow::Continue
1261 }
1262 None => DispatchFlow::Fallthrough,
1263 }
1264 }
1265
1266 fn apply_provider_override(&mut self) {
1268 if let Some(ref slot) = self.runtime.providers.provider_override
1269 && let Some(new_provider) = slot.write().take()
1270 {
1271 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1272 self.provider = new_provider;
1273 }
1274 }
1275
    /// Waits for the next thing the main loop should react to.
    ///
    /// Races the channel against the shutdown signal, the reload/notification receivers,
    /// the scheduler's custom-task receiver, the user-loop interval, the file-change
    /// receiver and the autonomous tick, and returns whichever fires first as a
    /// [`LoopEvent`].
    ///
    /// Returns `Ok(None)` when the channel yields `None`, or when the user-loop branch
    /// fires but the loop is gone or cancelled (a cancelled loop is cleared here).
    /// `Agent::run` treats `None` as a signal to stop.
    ///
    /// # Errors
    ///
    /// Propagates errors from `channel.recv()`.
    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            // Channel input returns directly so a `None` from the channel is passed through.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-loop tick: never resolves when no loop is configured or it was cancelled.
            () = async {
                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                // Re-check cancellation after the tick and clear a cancelled loop.
                if ls.cancel_tx.is_cancelled() {
                    self.runtime.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
            // Only polled while the autonomous driver reports that it should tick.
            () = self.services.autonomous.next_tick(),
                if self.services.autonomous.should_tick() => {
                LoopEvent::AutonomousTick
            }
        };
        Ok(Some(event))
    }
1347
1348 #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1349 async fn resolve_message(
1350 &self,
1351 msg: crate::channel::ChannelMessage,
1352 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1353 use crate::channel::{Attachment, AttachmentKind};
1354 use zeph_llm::provider::{ImageData, MessagePart};
1355
1356 let text_base = msg.text.clone();
1357
1358 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1359 .attachments
1360 .into_iter()
1361 .partition(|a| a.kind == AttachmentKind::Audio);
1362
1363 tracing::debug!(
1364 audio = audio_attachments.len(),
1365 has_stt = self.runtime.providers.stt.is_some(),
1366 "resolve_message attachments"
1367 );
1368
1369 let text = if !audio_attachments.is_empty()
1370 && let Some(stt) = self.runtime.providers.stt.as_ref()
1371 {
1372 let mut transcribed_parts = Vec::new();
1373 for attachment in &audio_attachments {
1374 if attachment.data.len() > MAX_AUDIO_BYTES {
1375 tracing::warn!(
1376 size = attachment.data.len(),
1377 max = MAX_AUDIO_BYTES,
1378 "audio attachment exceeds size limit, skipping"
1379 );
1380 continue;
1381 }
1382 match stt
1383 .transcribe(&attachment.data, attachment.filename.as_deref())
1384 .await
1385 {
1386 Ok(result) => {
1387 tracing::info!(
1388 len = result.text.len(),
1389 language = ?result.language,
1390 "audio transcribed"
1391 );
1392 transcribed_parts.push(result.text);
1393 }
1394 Err(e) => {
1395 tracing::error!(error = %e, "audio transcription failed");
1396 }
1397 }
1398 }
1399 if transcribed_parts.is_empty() {
1400 text_base
1401 } else {
1402 let transcribed = transcribed_parts.join("\n");
1403 if text_base.is_empty() {
1404 transcribed
1405 } else {
1406 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1407 }
1408 }
1409 } else {
1410 if !audio_attachments.is_empty() {
1411 tracing::warn!(
1412 count = audio_attachments.len(),
1413 "audio attachments received but no STT provider configured, dropping"
1414 );
1415 }
1416 text_base
1417 };
1418
1419 let mut image_parts = Vec::new();
1420 for attachment in image_attachments {
1421 if attachment.data.len() > MAX_IMAGE_BYTES {
1422 tracing::warn!(
1423 size = attachment.data.len(),
1424 max = MAX_IMAGE_BYTES,
1425 "image attachment exceeds size limit, skipping"
1426 );
1427 continue;
1428 }
1429 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1430 image_parts.push(MessagePart::Image(Box::new(ImageData {
1431 data: attachment.data,
1432 mime_type,
1433 })));
1434 }
1435
1436 (text, image_parts)
1437 }
1438
1439 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1446 let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1447 self.runtime.debug.iteration_counter += 1;
1448 let cancel_token = CancellationToken::new();
1449 self.runtime.lifecycle.cancel_token = cancel_token.clone();
1451 self.services.security.user_provided_urls.write().clear();
1452 self.runtime.lifecycle.turn_llm_requests = 0;
1454
1455 {
1458 use zeph_memory::shadow::{AuditSignalType as MageSignal, Severity as MageSev};
1459 let pending: Vec<u8> = {
1460 let mut q = self.services.security.trajectory_signal_queue.lock();
1461 std::mem::take(&mut *q)
1462 };
1463 self.services.security.mage_accumulator.advance_turn();
1464 for code in pending {
1465 self.services
1466 .security
1467 .trajectory
1468 .record(crate::agent::trajectory::RiskSignal::from_code(code));
1469 let mage_signal: Option<(MageSignal, MageSev)> = match code {
1472 1 => Some((MageSignal::PolicyViolation, MageSev::Medium)),
1473 2 => Some((MageSignal::ToolChainAnomaly, MageSev::Medium)),
1474 6 => Some((MageSignal::PromptInjectionPattern, MageSev::Medium)),
1475 7 => Some((MageSignal::PromptInjectionPattern, MageSev::High)),
1476 _ => None,
1477 };
1478 if let Some((sig, sev)) = mage_signal {
1479 self.services.security.mage_accumulator.ingest(sig, sev);
1480 }
1481 }
1482 }
1483 if self.services.security.trajectory.advance_turn()
1486 && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1487 {
1488 let entry = zeph_tools::AuditEntry {
1489 timestamp: zeph_tools::chrono_now(),
1490 tool: "<sentinel>".to_owned().into(),
1491 command: String::new(),
1492 result: zeph_tools::AuditResult::Success,
1493 duration_ms: 0,
1494 error_category: Some("trajectory_auto_recover".to_owned()),
1495 error_domain: Some("security".to_owned()),
1496 error_phase: None,
1497 claim_source: None,
1498 mcp_server_id: None,
1499 injection_flagged: false,
1500 embedding_anomalous: false,
1501 cross_boundary_mcp_to_acp: false,
1502 adversarial_policy_decision: None,
1503 exit_code: None,
1504 truncated: false,
1505 caller_id: None,
1506 skill_name: None,
1507 policy_match: None,
1508 correlation_id: None,
1509 vigil_risk: None,
1510 execution_env: None,
1511 resolved_cwd: None,
1512 scope_at_definition: None,
1513 scope_at_dispatch: None,
1514 };
1515 self.runtime.lifecycle.supervisor.spawn(
1516 crate::agent::agent_supervisor::TaskClass::Telemetry,
1517 "trajectory-auto-recover-audit",
1518 async move { logger.log(&entry).await },
1519 );
1520 }
1521 if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1523 sentinel.advance_turn();
1524 }
1525 if let Some(ref acc) = self.services.security.risk_chain_accumulator {
1527 acc.reset();
1528 }
1529 let risk_level = self.services.security.trajectory.current_risk();
1531 *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1532 if let Some(alert) = self.services.security.trajectory.poll_alert() {
1534 let msg = format!(
1535 "[trajectory] Risk level: {:?} (score={:.2})",
1536 alert.level, alert.score
1537 );
1538 tracing::warn!(
1539 level = ?alert.level,
1540 score = alert.score,
1541 "trajectory sentinel alert"
1542 );
1543 if let Some(ref tx) = self.services.session.status_tx {
1544 let _ = tx.send(msg);
1545 }
1546 }
1547
1548 let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts)
1549 .with_tool_allowlist(self.runtime.config.channel_tool_allowlist.clone());
1550 turn::Turn::new(context, input)
1551 }
1552
1553 fn end_turn(&mut self, turn: turn::Turn) {
1560 self.runtime.metrics.pending_timings = turn.metrics.timings;
1561 self.flush_turn_timings();
1562 self.services.session.current_turn_intent = None;
1564 self.services.session.is_guest_context = false;
1566 if let Some(ref engine) = self.services.speculation_engine {
1568 let metrics = engine.end_turn();
1569 if metrics.committed > 0 || metrics.cancelled > 0 {
1570 tracing::debug!(
1571 committed = metrics.committed,
1572 cancelled = metrics.cancelled,
1573 wasted_ms = metrics.wasted_ms,
1574 "speculation: turn boundary metrics"
1575 );
1576 }
1577 }
1578 }
1579
1580 #[tracing::instrument(
1581 name = "core.agent.process_user_message",
1582 skip_all,
1583 level = "debug",
1584 fields(turn_id),
1585 err
1586 )]
1587 async fn process_user_message(
1588 &mut self,
1589 text: String,
1590 image_parts: Vec<zeph_llm::provider::MessagePart>,
1591 ) -> Result<(), error::AgentError> {
1592 let input = turn::TurnInput::new(text, image_parts);
1593 let mut t = self.begin_turn(input);
1594
1595 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1596 tracing::Span::current().record("turn_id", t.id().0);
1597 self.runtime
1599 .debug
1600 .start_iteration_span(turn_idx, t.input.text.trim());
1601
1602 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1603
1604 let span_status = if result.is_ok() {
1606 crate::debug_dump::trace::SpanStatus::Ok
1607 } else {
1608 crate::debug_dump::trace::SpanStatus::Error {
1609 message: "iteration failed".to_owned(),
1610 }
1611 };
1612 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1613
1614 self.end_turn(t);
1615 result
1616 }
1617
1618 #[tracing::instrument(
1619 name = "core.agent.process_user_message_inner",
1620 skip_all,
1621 level = "debug",
1622 err
1623 )]
1624 async fn process_user_message_inner(
1625 &mut self,
1626 turn: &mut turn::Turn,
1627 ) -> Result<(), error::AgentError> {
1628 self.reap_background_tasks_and_update_metrics();
1629
1630 let tokens_before_turn = self
1631 .runtime
1632 .metrics
1633 .metrics_tx
1634 .as_ref()
1635 .map_or(0, |tx| tx.borrow().total_tokens);
1636
1637 self.drain_background_completions();
1641
1642 self.wire_cancel_bridge(turn.cancel_token());
1643
1644 let text = turn.input.text.clone();
1646 let trimmed_owned = text.trim().to_owned();
1647 let trimmed = trimmed_owned.as_str();
1648
1649 if self.services.security.vigil.is_some() {
1652 let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1653 self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1654 }
1655
1656 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1657 return result;
1658 }
1659
1660 self.check_pending_rollbacks().await;
1661
1662 if self.pre_process_security(trimmed).await? {
1663 return Ok(());
1664 }
1665
1666 let t_ctx = std::time::Instant::now();
1667 tracing::debug!("turn timing: prepare_context start");
1668 self.advance_context_lifecycle_guarded(&text, trimmed).await;
1669 turn.metrics_mut().timings.prepare_context_ms =
1670 u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1671 tracing::debug!(
1672 ms = turn.metrics_snapshot().timings.prepare_context_ms,
1673 "turn timing: prepare_context done"
1674 );
1675 let _ = self
1677 .channel
1678 .send_context_estimate(
1679 usize::try_from(self.runtime.providers.cached_prompt_tokens).unwrap_or(usize::MAX),
1680 )
1681 .await;
1682
1683 let image_parts = std::mem::take(&mut turn.input.image_parts);
1684 let merged_text = self.build_user_message_text_with_bg_completions(&text);
1688 let user_msg = self.build_user_message(&merged_text, image_parts);
1689
1690 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1693 if !urls.is_empty() {
1694 self.services
1695 .security
1696 .user_provided_urls
1697 .write()
1698 .extend(urls);
1699 }
1700
1701 self.services.memory.extraction.goal_text = Some(text.clone());
1704
1705 let t_persist = std::time::Instant::now();
1706 tracing::debug!("turn timing: persist_message(user) start");
1707 self.persist_message(Role::User, &text, &[], false).await;
1709 turn.metrics_mut().timings.persist_message_ms =
1710 u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1711 tracing::debug!(
1712 ms = turn.metrics_snapshot().timings.persist_message_ms,
1713 "turn timing: persist_message(user) done"
1714 );
1715 self.push_message(user_msg);
1716
1717 let context_estimate = self.runtime.providers.cached_prompt_tokens;
1719 self.update_metrics(|m| m.context_tokens = context_estimate);
1720
1721 tracing::debug!("turn timing: process_response start");
1724 let turn_had_error = if let Err(e) = self.process_response().await {
1725 self.services.learning_engine.learning_tasks.detach_all();
1727 tracing::error!("Response processing failed: {e:#}");
1728
1729 if e.is_no_providers() {
1732 self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1733 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1734 tracing::warn!(
1735 backoff_secs,
1736 "no providers available; backing off before next turn"
1737 );
1738 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1739 }
1740
1741 let user_msg = format!("Error: {e:#}");
1742 self.channel.send(&user_msg).await?;
1743 self.msg.messages.pop();
1744 self.recompute_prompt_tokens();
1745 self.channel.flush_chunks().await?;
1746 true
1747 } else {
1748 self.services.learning_engine.learning_tasks.detach_all();
1751 self.truncate_old_tool_results();
1752 self.maybe_update_magic_docs();
1754 self.maybe_spawn_promotion_scan();
1756 false
1757 };
1758 tracing::debug!("turn timing: process_response done");
1759
1760 if let Some(pipeline) = self.services.quality.clone() {
1762 self.run_self_check_for_turn(pipeline, turn.id().0).await;
1763 }
1764 let _ = self.channel.flush_chunks().await;
1769
1770 self.maybe_fire_completion_notification(turn, turn_had_error);
1771
1772 self.flush_goal_accounting(tokens_before_turn);
1773
1774 turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1779 turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1780
1781 Ok(())
1782 }
1783
1784 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1790 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1791 let token = turn_token.clone();
1792 self.runtime.lifecycle.cancel_token = turn_token.clone();
1794 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1795 prev.abort();
1796 }
1797 self.runtime.lifecycle.cancel_bridge_handle =
1798 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1799 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1800 move || async move {
1801 signal.notified().await;
1802 token.cancel();
1803 },
1804 ));
1805 }
1806
1807 fn reap_background_tasks_and_update_metrics(&mut self) {
1811 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1812 if bg_signal.did_summarize {
1813 self.services.memory.persistence.unsummarized_count = 0;
1814 tracing::debug!("background summarization completed; unsummarized_count reset");
1815 }
1816 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1817 self.update_metrics(|m| {
1818 m.bg_inflight = snap.inflight as u64;
1819 m.bg_dropped = snap.total_dropped();
1820 m.bg_completed = snap.total_completed();
1821 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1822 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1823 });
1824
1825 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1827 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1828 .runtime
1829 .lifecycle
1830 .shell_executor_handle
1831 .as_ref()
1832 .map(|e| e.background_runs_snapshot())
1833 .unwrap_or_default()
1834 .into_iter()
1835 .map(|s| crate::metrics::ShellBackgroundRunRow {
1836 run_id: truncate_shell_run_id(&s.run_id),
1837 command: truncate_shell_command(&s.command),
1838 elapsed_secs: s.elapsed_ms / 1000,
1839 })
1840 .collect();
1841 self.update_metrics(|m| {
1842 m.shell_background_runs = shell_rows;
1843 });
1844 }
1845
1846 if self
1849 .runtime
1850 .config
1851 .supervisor_config
1852 .abort_enrichment_on_turn
1853 {
1854 self.runtime
1855 .lifecycle
1856 .supervisor
1857 .abort_class(agent_supervisor::TaskClass::Enrichment);
1858 }
1859 }
1860
1861 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1874 let snap = turn.metrics_snapshot().timings.clone();
1875 let duration_ms = snap
1876 .prepare_context_ms
1877 .saturating_add(snap.llm_chat_ms)
1878 .saturating_add(snap.tool_exec_ms);
1879 let summary = crate::notifications::TurnSummary {
1880 duration_ms,
1881 preview: self.last_assistant_preview(160),
1882 tool_calls: 0,
1884 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1885 exit_status: if is_error {
1886 crate::notifications::TurnExitStatus::Error
1887 } else {
1888 crate::notifications::TurnExitStatus::Success
1889 },
1890 };
1891
1892 let gate_ok = self
1894 .runtime
1895 .lifecycle
1896 .notifier
1897 .as_ref()
1898 .is_none_or(|n| n.should_fire(&summary));
1899
1900 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1902 && gate_ok
1903 {
1904 notifier.fire(&summary);
1905 }
1906
1907 let hooks = self.services.session.hooks_config.turn_complete.clone();
1912 if !hooks.is_empty() && gate_ok {
1913 let mut env = std::collections::HashMap::new();
1914 env.insert(
1915 "ZEPH_TURN_DURATION_MS".to_owned(),
1916 summary.duration_ms.to_string(),
1917 );
1918 env.insert(
1919 "ZEPH_TURN_STATUS".to_owned(),
1920 if is_error { "error" } else { "success" }.to_owned(),
1921 );
1922 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1923 env.insert(
1924 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1925 summary.llm_requests.to_string(),
1926 );
1927 let dispatch = self.mcp_dispatch();
1928 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1929 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1930 agent_supervisor::TaskClass::Telemetry,
1931 "turn-complete-hooks",
1932 async move {
1933 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1934 .as_ref()
1935 .map(|d| d as &dyn zeph_subagent::McpDispatch);
1936 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await
1937 {
1938 tracing::warn!(error = %e, "turn_complete hook failed");
1939 }
1940 },
1941 );
1942 }
1943 }
1944
1945 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1948 let goal_snap = self
1949 .services
1950 .goal_accounting
1951 .as_ref()
1952 .and_then(|a| a.snapshot());
1953 self.update_metrics(|m| m.active_goal = goal_snap);
1954
1955 if let Some(ref accounting) = self.services.goal_accounting {
1956 let tokens_after = self
1957 .runtime
1958 .metrics
1959 .metrics_tx
1960 .as_ref()
1961 .map_or(0, |tx| tx.borrow().total_tokens);
1962 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1963 let mut spawned: Option<
1964 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1965 > = None;
1966 accounting.on_turn_complete(turn_tokens, |fut| {
1967 spawned = Some(fut);
1968 });
1969 if let Some(fut) = spawned {
1970 let _ = self.runtime.lifecycle.supervisor.spawn(
1971 agent_supervisor::TaskClass::Telemetry,
1972 "goal-accounting",
1973 fut,
1974 );
1975 }
1976 }
1977 }
1978
1979 #[tracing::instrument(
1981 name = "core.agent.pre_process_security",
1982 skip_all,
1983 level = "debug",
1984 err
1985 )]
1986 async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1987 if let Some(ref guardrail) = self.services.security.guardrail {
1989 use zeph_sanitizer::guardrail::GuardrailVerdict;
1990 let verdict = guardrail.check(trimmed).await;
1991 match &verdict {
1992 GuardrailVerdict::Flagged { reason, .. } => {
1993 tracing::warn!(
1994 reason = %reason,
1995 should_block = verdict.should_block(),
1996 "guardrail flagged user input"
1997 );
1998 if verdict.should_block() {
1999 let msg = format!("[guardrail] Input blocked: {reason}");
2000 let _ = self.channel.send(&msg).await;
2001 let _ = self.channel.flush_chunks().await;
2002 return Ok(true);
2003 }
2004 let _ = self
2006 .channel
2007 .send(&format!("[guardrail] Warning: {reason}"))
2008 .await;
2009 }
2010 GuardrailVerdict::Error { error } => {
2011 if guardrail.error_should_block() {
2012 tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
2013 let msg = "[guardrail] Input blocked: check failed (see logs for details)";
2014 let _ = self.channel.send(msg).await;
2015 let _ = self.channel.flush_chunks().await;
2016 return Ok(true);
2017 }
2018 tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
2019 }
2020 _ => {}
2021 }
2022 }
2023
2024 #[cfg(feature = "classifiers")]
2030 if self.services.security.sanitizer.scan_user_input() {
2031 match self
2032 .services
2033 .security
2034 .sanitizer
2035 .classify_injection(trimmed)
2036 .await
2037 {
2038 zeph_sanitizer::InjectionVerdict::Blocked => {
2039 self.push_classifier_metrics();
2040 let _ = self
2041 .channel
2042 .send("[security] Input blocked: injection detected by classifier.")
2043 .await;
2044 let _ = self.channel.flush_chunks().await;
2045 return Ok(true);
2046 }
2047 zeph_sanitizer::InjectionVerdict::Suspicious => {
2048 tracing::warn!("injection_classifier soft_signal on user input");
2049 }
2050 zeph_sanitizer::InjectionVerdict::Clean => {}
2051 _ => {}
2052 }
2053 }
2054 #[cfg(feature = "classifiers")]
2055 self.push_classifier_metrics();
2056
2057 Ok(false)
2058 }
2059
2060 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
2067 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
2068 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
2069
2070 let providers_recently_failed = self
2072 .runtime
2073 .lifecycle
2074 .last_no_providers_at
2075 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
2076
2077 if providers_recently_failed {
2078 tracing::warn!(
2079 backoff_secs,
2080 "skipping context preparation: providers were unavailable on last turn"
2081 );
2082 return;
2083 }
2084
2085 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
2086 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
2087 {
2088 Ok(()) => {}
2089 Err(_elapsed) => {
2090 tracing::warn!(
2091 timeout_secs = prep_timeout_secs,
2092 "context preparation timed out; proceeding with degraded context"
2093 );
2094 }
2095 }
2096 }
2097
2098 #[tracing::instrument(
2099 name = "core.agent.advance_context_lifecycle",
2100 skip_all,
2101 level = "debug"
2102 )]
2103 async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
2104 self.services.mcp.pruning_cache.reset();
2106
2107 let conv_id = self.services.memory.persistence.conversation_id;
2110 self.rebuild_system_prompt(text).await;
2111
2112 self.detect_and_record_corrections(trimmed, conv_id).await;
2113 self.services.learning_engine.tick();
2114 self.analyze_and_learn().await;
2115 self.sync_graph_counts().await;
2116
2117 self.context_manager
2122 .set_compaction_state(self.context_manager.compaction_state().advance_turn());
2123
2124 {
2126 self.services.focus.tick();
2127
2128 let sidequest_should_fire = self.services.sidequest.tick();
2131 if sidequest_should_fire
2132 && !self
2133 .context_manager
2134 .compaction_state()
2135 .is_compacted_this_turn()
2136 {
2137 self.maybe_sidequest_eviction();
2138 }
2139 }
2140
2141 {
2144 let cfg = &self.services.memory.extraction.graph_config.experience;
2145 if cfg.enabled
2146 && cfg.evolution_sweep_enabled
2147 && cfg.evolution_sweep_interval > 0
2148 && self
2149 .services
2150 .sidequest
2151 .turn_counter
2152 .checked_rem(cfg.evolution_sweep_interval as u64)
2153 == Some(0)
2154 && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2155 && let (Some(exp), Some(graph)) =
2156 (memory.experience.as_ref(), memory.graph_store.as_ref())
2157 {
2158 let exp = std::sync::Arc::clone(exp);
2159 let graph = std::sync::Arc::clone(graph);
2160 let threshold = cfg.confidence_prune_threshold;
2161 let turn = self.services.sidequest.turn_counter;
2162 let accepted = self.runtime.lifecycle.supervisor.spawn(
2163 agent_supervisor::TaskClass::Telemetry,
2164 "experience-sweep",
2165 async move {
2166 match exp.evolution_sweep(graph.as_ref(), threshold).await {
2167 Ok(stats) => tracing::info!(
2168 turn,
2169 self_loops = stats.pruned_self_loops,
2170 low_confidence = stats.pruned_low_confidence,
2171 "evolution sweep complete",
2172 ),
2173 Err(e) => tracing::warn!(
2174 turn,
2175 error = %e,
2176 "evolution sweep failed",
2177 ),
2178 }
2179 },
2180 );
2181 if !accepted {
2182 tracing::warn!(
2183 turn = self.services.sidequest.turn_counter,
2184 "experience-sweep dropped (telemetry class at capacity)",
2185 );
2186 }
2187 }
2188 }
2189
2190 if let Some(warning) = self.cache_expiry_warning() {
2192 tracing::info!(warning, "cache expiry warning");
2193 let _ = self.channel.send_status(&warning).await;
2194 }
2195
2196 self.maybe_time_based_microcompact();
2199
2200 self.maybe_apply_deferred_summaries();
2205 self.flush_deferred_summaries().await;
2206
2207 if let Err(e) = self.maybe_proactive_compress().await {
2209 tracing::warn!("proactive compression failed: {e:#}");
2210 }
2211
2212 if let Err(e) = self.maybe_compact().await {
2213 tracing::warn!("context compaction failed: {e:#}");
2214 }
2215
2216 if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2217 tracing::warn!("context preparation failed: {e:#}");
2218 }
2219
2220 self.provider
2222 .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2223
2224 self.services.learning_engine.reset_reflection();
2225 }
2226
2227 fn build_user_message(
2228 &mut self,
2229 text: &str,
2230 image_parts: Vec<zeph_llm::provider::MessagePart>,
2231 ) -> Message {
2232 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2233 all_image_parts.extend(image_parts);
2234
2235 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2236 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2237 text: text.to_owned(),
2238 }];
2239 parts.extend(all_image_parts);
2240 Message::from_parts(Role::User, parts)
2241 } else {
2242 if !all_image_parts.is_empty() {
2243 tracing::warn!(
2244 count = all_image_parts.len(),
2245 "image attachments dropped: provider does not support vision"
2246 );
2247 }
2248 Message {
2249 role: Role::User,
2250 content: text.to_owned(),
2251 parts: vec![],
2252 metadata: MessageMetadata::default(),
2253 }
2254 }
2255 }
2256
2257 fn drain_background_completions(&mut self) {
2261 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2262
2263 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2264 return;
2265 };
2266 while let Ok(completion) = rx.try_recv() {
2268 if self.runtime.lifecycle.pending_background_completions.len()
2269 >= BACKGROUND_COMPLETION_BUFFER_CAP
2270 {
2271 tracing::warn!(
2272 run_id = %completion.run_id,
2273 "background completion buffer full; dropping run result"
2274 );
2275 self.runtime
2278 .lifecycle
2279 .pending_background_completions
2280 .pop_front();
2281 self.runtime
2282 .lifecycle
2283 .pending_background_completions
2284 .push_back(zeph_tools::BackgroundCompletion {
2285 run_id: completion.run_id,
2286 exit_code: -1,
2287 success: false,
2288 elapsed_ms: 0,
2289 command: completion.command,
2290 output: format!(
2291 "[background result for run {} dropped: buffer overflow]",
2292 completion.run_id
2293 ),
2294 });
2295 } else {
2296 self.runtime
2297 .lifecycle
2298 .pending_background_completions
2299 .push_back(completion);
2300 }
2301 }
2302 }
2303
2304 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2308 if self
2309 .runtime
2310 .lifecycle
2311 .pending_background_completions
2312 .is_empty()
2313 {
2314 return user_text.to_owned();
2315 }
2316 let mut parts = String::new();
2317 for completion in self
2318 .runtime
2319 .lifecycle
2320 .pending_background_completions
2321 .drain(..)
2322 {
2323 let _ = write!(
2324 parts,
2325 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2326 completion.run_id,
2327 completion.exit_code,
2328 completion.success,
2329 completion.elapsed_ms,
2330 completion.command,
2331 completion.output,
2332 );
2333 }
2334 parts.push_str(user_text);
2335 parts
2336 }
2337
2338 async fn poll_subagent_until_done(
2342 &mut self,
2343 task_id: &str,
2344 label: &str,
2345 ) -> Option<(String, bool)> {
2346 use zeph_subagent::SubAgentState;
2347 let result = loop {
2348 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2349
2350 #[allow(clippy::redundant_closure_for_method_calls)]
2354 let pending = self
2355 .services
2356 .orchestration
2357 .subagent_manager
2358 .as_mut()
2359 .and_then(|m| m.try_recv_secret_request());
2360 if let Some((req_task_id, req)) = pending {
2361 let confirm_prompt = format!(
2364 "Sub-agent requests secret '{}'. Allow?",
2365 crate::text::truncate_to_chars(&req.secret_key, 100)
2366 );
2367 let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2368 if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2369 if approved {
2370 let ttl = std::time::Duration::from_mins(5);
2371 let key = req.secret_key.clone();
2372 if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2373 let _ = mgr.deliver_secret(&req_task_id, key);
2374 }
2375 } else {
2376 let _ = mgr.deny_secret(&req_task_id);
2377 }
2378 }
2379 }
2380
2381 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2382 let statuses = mgr.statuses();
2383 let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2384 break (format!("{label} completed (no status available)."), true);
2385 };
2386 match status.state {
2387 SubAgentState::Completed => {
2388 let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2389 break (format!("{label} completed: {msg}"), true);
2390 }
2391 SubAgentState::Failed => {
2392 let msg = status
2393 .last_message
2394 .clone()
2395 .unwrap_or_else(|| "unknown error".into());
2396 break (format!("{label} failed: {msg}"), false);
2397 }
2398 SubAgentState::Canceled => {
2399 break (format!("{label} was cancelled."), false);
2400 }
2401 _ => {
2402 let _ = self
2403 .channel
2404 .send_status(&format!(
2405 "{label}: turn {}/{}",
2406 status.turns_used,
2407 self.services
2408 .orchestration
2409 .subagent_manager
2410 .as_ref()
2411 .and_then(|m| m.agents_def(task_id))
2412 .map_or(20, |d| d.permissions.max_turns)
2413 ))
2414 .await;
2415 }
2416 }
2417 };
2418 Some(result)
2419 }
2420
2421 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2424 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2425 let full_ids: Vec<String> = mgr
2426 .statuses()
2427 .into_iter()
2428 .map(|(tid, _)| tid)
2429 .filter(|tid| tid.starts_with(prefix))
2430 .collect();
2431 Some(match full_ids.as_slice() {
2432 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2433 [fid] => Ok(fid.clone()),
2434 _ => Err(format!(
2435 "Ambiguous id prefix '{prefix}': matches {} agents",
2436 full_ids.len()
2437 )),
2438 })
2439 }
2440
2441 fn handle_agent_list(&self) -> Option<String> {
2442 use std::fmt::Write as _;
2443 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2444 let defs = mgr.definitions();
2445 if defs.is_empty() {
2446 return Some("No sub-agent definitions found.".into());
2447 }
2448 let mut out = String::from("Available sub-agents:\n");
2449 for d in defs {
2450 let memory_label = match d.memory {
2451 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2452 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2453 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2454 Some(_) => " [memory:unknown]",
2455 None => "",
2456 };
2457 if let Some(ref src) = d.source {
2458 let _ = writeln!(
2459 out,
2460 " {}{} — {} ({})",
2461 d.name, memory_label, d.description, src
2462 );
2463 } else {
2464 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2465 }
2466 }
2467 Some(out)
2468 }
2469
2470 fn handle_agent_status(&self) -> Option<String> {
2471 use std::fmt::Write as _;
2472 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2473 let statuses = mgr.statuses();
2474 if statuses.is_empty() {
2475 return Some("No active sub-agents.".into());
2476 }
2477 let mut out = String::from("Active sub-agents:\n");
2478 for (id, s) in &statuses {
2479 let state = format!("{:?}", s.state).to_lowercase();
2480 let elapsed = s.started_at.elapsed().as_secs();
2481 let _ = writeln!(
2482 out,
2483 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2484 short = &id[..8.min(id.len())],
2485 t = s.turns_used,
2486 msg = s.last_message.as_deref().unwrap_or(""),
2487 );
2488 if let Some(def) = mgr.agents_def(id)
2490 && let Some(scope) = def.memory
2491 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2492 {
2493 let _ = writeln!(out, " memory: {}", dir.display());
2494 }
2495 }
2496 Some(out)
2497 }
2498
2499 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2500 let full_id = match self.resolve_agent_id_prefix(id)? {
2501 Ok(fid) => fid,
2502 Err(msg) => return Some(msg),
2503 };
2504 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2505 if let Some((tid, req)) = mgr.try_recv_secret_request()
2506 && tid == full_id
2507 {
2508 let key = req.secret_key.clone();
2509 let ttl = std::time::Duration::from_mins(5);
2510 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2511 return Some(format!("Approve failed: {e}"));
2512 }
2513 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2514 return Some(format!("Secret delivery failed: {e}"));
2515 }
2516 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2517 }
2518 Some(format!(
2519 "No pending secret request for sub-agent '{full_id}'."
2520 ))
2521 }
2522
2523 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2524 let full_id = match self.resolve_agent_id_prefix(id)? {
2525 Ok(fid) => fid,
2526 Err(msg) => return Some(msg),
2527 };
2528 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2529 match mgr.deny_secret(&full_id) {
2530 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2531 Err(e) => Some(format!("Deny failed: {e}")),
2532 }
2533 }
2534
2535 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2536 use zeph_subagent::AgentCommand;
2537
2538 match cmd {
2539 AgentCommand::List => self.handle_agent_list(),
2540 AgentCommand::Background { name, prompt } => {
2541 self.handle_agent_background(&name, &prompt)
2542 }
2543 AgentCommand::Spawn { name, prompt }
2544 | AgentCommand::Mention {
2545 agent: name,
2546 prompt,
2547 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2548 AgentCommand::Status => self.handle_agent_status(),
2549 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2550 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2551 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2552 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2553 _ => None,
2554 }
2555 }
2556
2557 pub(crate) fn handle_agents_definitions_list(&self) -> String {
2562 use std::fmt::Write as _;
2563
2564 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2565 return String::new();
2566 };
2567 let defs = mgr.definitions();
2568 if defs.is_empty() {
2569 return String::new();
2570 }
2571 let mut out = String::from("Sub-agents:\n");
2572 for d in defs {
2573 let memory_label = match d.memory {
2574 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2575 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2576 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2577 Some(_) => " [memory:unknown]",
2578 None => "",
2579 };
2580 if let Some(ref src) = d.source {
2581 let _ = writeln!(
2582 out,
2583 " {}{} — {} ({})",
2584 d.name, memory_label, d.description, src
2585 );
2586 } else {
2587 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2588 }
2589 }
2590 out
2591 }
2592
2593 pub(crate) fn handle_agents_crud(&mut self, cmd: zeph_subagent::AgentsCommand) -> String {
2598 use zeph_subagent::AgentsCommand;
2599
2600 let Some(mgr) = self.services.orchestration.subagent_manager.as_ref() else {
2601 return "Sub-agent manager is not available.".to_owned();
2602 };
2603
2604 match cmd {
2605 AgentsCommand::List => self.handle_agents_definitions_list(),
2606 AgentsCommand::Show { name } => {
2607 match mgr.definitions().iter().find(|d| d.name == name) {
2608 Some(d) => format!(
2609 "Agent: {}\nDescription: {}\nSource: {}\n",
2610 d.name,
2611 d.description,
2612 d.source.as_deref().unwrap_or("unknown"),
2613 ),
2614 None => format!("No sub-agent definition named '{name}'."),
2615 }
2616 }
2617 AgentsCommand::Create { name } => {
2618 format!(
2619 "To create a sub-agent definition, create a file at `.zeph/agents/{name}.md`.\n\
2620 See the sub-agent documentation for the required frontmatter."
2621 )
2622 }
2623 AgentsCommand::Edit { name } => {
2624 format!("To edit '{name}', open its definition file in `.zeph/agents/{name}.md`.")
2625 }
2626 AgentsCommand::Delete { name } => {
2627 format!("To delete '{name}', remove the file `.zeph/agents/{name}.md`.")
2628 }
2629 _ => "Unknown agents command.".to_owned(),
2630 }
2631 }
2632
2633 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2634 let provider = self.provider.clone();
2635 let tool_executor = Arc::clone(&self.tool_executor);
2636 let skills = self.filtered_skills_for(name);
2637 let cfg = self.services.orchestration.subagent_config.clone();
2638 let spawn_ctx = self.build_spawn_context(&cfg);
2639 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2640 match mgr.spawn(
2641 name,
2642 prompt,
2643 provider,
2644 tool_executor,
2645 skills,
2646 &cfg,
2647 spawn_ctx,
2648 ) {
2649 Ok(id) => Some(format!(
2650 "Sub-agent '{name}' started in background (id: {short})",
2651 short = &id[..8.min(id.len())]
2652 )),
2653 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2654 }
2655 }
2656
2657 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2658 let provider = self.provider.clone();
2659 let tool_executor = Arc::clone(&self.tool_executor);
2660 let skills = self.filtered_skills_for(name);
2661 let cfg = self.services.orchestration.subagent_config.clone();
2662 let spawn_ctx = self.build_spawn_context(&cfg);
2663 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2664 let task_id = match mgr.spawn(
2665 name,
2666 prompt,
2667 provider,
2668 tool_executor,
2669 skills,
2670 &cfg,
2671 spawn_ctx,
2672 ) {
2673 Ok(id) => id,
2674 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2675 };
2676 let short = task_id[..8.min(task_id.len())].to_owned();
2677 let _ = self
2678 .channel
2679 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2680 .await;
2681 let _ = self
2682 .channel
2683 .notify_foreground_subagent_started(&task_id, name)
2684 .await;
2685 let label = format!("Sub-agent '{name}'");
2686 let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2687 let _ = self
2689 .channel
2690 .notify_foreground_subagent_completed(&task_id, name, false)
2691 .await;
2692 return None;
2693 };
2694 let _ = self
2695 .channel
2696 .notify_foreground_subagent_completed(&task_id, name, success)
2697 .await;
2698 Some(result)
2699 }
2700
2701 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2702 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2703 let ids: Vec<String> = mgr
2705 .statuses()
2706 .into_iter()
2707 .map(|(task_id, _)| task_id)
2708 .filter(|task_id| task_id.starts_with(id))
2709 .collect();
2710 match ids.as_slice() {
2711 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2712 [full_id] => {
2713 let full_id = full_id.clone();
2714 match mgr.cancel(&full_id) {
2715 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2716 Err(e) => Some(format!("Cancel failed: {e}")),
2717 }
2718 }
2719 _ => Some(format!(
2720 "Ambiguous id prefix '{id}': matches {} agents",
2721 ids.len()
2722 )),
2723 }
2724 }
2725
2726 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2727 let cfg = self.services.orchestration.subagent_config.clone();
2728 let def_name = {
2731 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2732 match mgr.def_name_for_resume(id, &cfg) {
2733 Ok(name) => name,
2734 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2735 }
2736 };
2737 let skills = self.filtered_skills_for(&def_name);
2738 let provider = self.provider.clone();
2739 let tool_executor = Arc::clone(&self.tool_executor);
2740 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2741 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg, None)
2742 {
2743 Ok(pair) => pair,
2744 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2745 };
2746 let short = task_id[..8.min(task_id.len())].to_owned();
2747 let _ = self
2748 .channel
2749 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2750 .await;
2751 let _ = self
2752 .channel
2753 .notify_foreground_subagent_started(&task_id, &def_name)
2754 .await;
2755 let Some((result, success)) = self
2756 .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2757 .await
2758 else {
2759 let _ = self
2761 .channel
2762 .notify_foreground_subagent_completed(&task_id, &def_name, false)
2763 .await;
2764 return None;
2765 };
2766 let _ = self
2767 .channel
2768 .notify_foreground_subagent_completed(&task_id, &def_name, success)
2769 .await;
2770 Some(result)
2771 }
2772
2773 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2774 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2775 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2776 let reg = self.services.skill.registry.read();
2777 match zeph_subagent::filter_skills(®, &def.skills) {
2778 Ok(skills) => {
2779 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2780 if bodies.is_empty() {
2781 None
2782 } else {
2783 Some(bodies)
2784 }
2785 }
2786 Err(e) => {
2787 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2788 None
2789 }
2790 }
2791 }
2792
2793 fn build_spawn_context(
2795 &self,
2796 cfg: &zeph_config::SubAgentConfig,
2797 ) -> zeph_subagent::SpawnContext {
2798 zeph_subagent::SpawnContext {
2799 parent_messages: self.extract_parent_messages(cfg),
2800 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2801 parent_provider_name: {
2802 let name = &self.runtime.config.active_provider_name;
2803 if name.is_empty() {
2804 None
2805 } else {
2806 Some(name.clone())
2807 }
2808 },
2809 spawn_depth: self.runtime.config.spawn_depth,
2810 mcp_tool_names: self.extract_mcp_tool_names(),
2811 seed_trajectory_score: {
2813 let child = self.services.security.trajectory.spawn_child();
2814 let score = child.score_now();
2815 if score > 0.0 { Some(score) } else { None }
2816 },
2817 content_isolation: self.runtime.config.security.content_isolation.clone(),
2818 orchestrator_name: Some("zeph".to_owned()),
2819 orchestrator_role: Some("orchestrator".to_owned()),
2820 session_mcp_servers: Vec::new(),
2821 ..Default::default()
2824 }
2825 }
2826
2827 fn extract_parent_messages(
2834 &self,
2835 config: &zeph_config::SubAgentConfig,
2836 ) -> Vec<zeph_llm::provider::Message> {
2837 use zeph_config::ParentContextPolicy;
2838 use zeph_llm::provider::Role;
2839
2840 if config.parent_context_policy == ParentContextPolicy::None
2841 || config.context_window_turns == 0
2842 {
2843 return Vec::new();
2844 }
2845
2846 let non_system: Vec<_> = self
2847 .msg
2848 .messages
2849 .iter()
2850 .filter(|m| m.role != Role::System)
2851 .cloned()
2852 .collect();
2853
2854 let take_count = config
2855 .context_window_turns
2856 .saturating_mul(2)
2857 .min(config.max_parent_messages);
2858 let start = non_system.len().saturating_sub(take_count);
2859 let mut msgs = non_system[start..].to_vec();
2860
2861 let max_chars = 128_000usize / 4;
2863 let requested = msgs.len();
2864 trim_parent_messages(&mut msgs, max_chars);
2865 if msgs.len() < requested {
2866 tracing::info!(
2867 kept = msgs.len(),
2868 requested,
2869 "[subagent] truncated parent history due to token budget or orphan pruning"
2870 );
2871 }
2872
2873 if config.parent_context_policy == ParentContextPolicy::InheritSanitized {
2874 use zeph_sanitizer::{ContentSource, ContentSourceKind};
2875 let source =
2876 ContentSource::new(ContentSourceKind::A2aMessage).with_identifier("parent_history");
2877 msgs = sanitize_parent_messages(msgs, &self.services.security.sanitizer, &source);
2878 }
2879
2880 msgs
2881 }
2882
2883 fn extract_mcp_tool_names(&self) -> Vec<String> {
2885 self.tool_executor
2886 .tool_definitions_erased()
2887 .into_iter()
2888 .filter(|t| t.id.starts_with("mcp_"))
2889 .map(|t| t.id.to_string())
2890 .collect()
2891 }
2892
2893 fn classify_source_kind(
2897 skill_dir: &std::path::Path,
2898 managed_dir: Option<&std::path::PathBuf>,
2899 bundled_names: &std::collections::HashSet<String>,
2900 ) -> zeph_memory::store::SourceKind {
2901 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2902 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2903 let has_marker = skill_dir.join(".bundled").exists();
2904 if has_marker && bundled_names.contains(skill_name) {
2905 zeph_memory::store::SourceKind::Bundled
2906 } else {
2907 if has_marker {
2908 tracing::warn!(
2909 skill = %skill_name,
2910 "skill has .bundled marker but is not in the bundled skill \
2911 allowlist — classifying as Hub"
2912 );
2913 }
2914 zeph_memory::store::SourceKind::Hub
2915 }
2916 } else {
2917 zeph_memory::store::SourceKind::Local
2918 }
2919 }
2920
2921 async fn update_trust_for_reloaded_skills(
2923 &mut self,
2924 all_meta: &[zeph_skills::loader::SkillMeta],
2925 ) {
2926 let memory = self.services.memory.persistence.memory.clone();
2928 let Some(memory) = memory else {
2929 return;
2930 };
2931 let trust_cfg = self.services.skill.trust_config.clone();
2932 let managed_dir = self.services.skill.managed_dir.clone();
2933 let bundled_names: std::collections::HashSet<String> =
2934 zeph_skills::bundled_skill_names().into_iter().collect();
2935 for meta in all_meta {
2936 let skill_dir = meta.skill_dir.clone();
2939 let managed_dir_ref = managed_dir.clone();
2940 let bundled_names_ref = bundled_names.clone();
2941 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2942 tokio::task::spawn_blocking(move || {
2943 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2944 let source_kind = Self::classify_source_kind(
2945 &skill_dir,
2946 managed_dir_ref.as_ref(),
2947 &bundled_names_ref,
2948 );
2949 Some((hash, source_kind))
2950 })
2951 .await
2952 .unwrap_or(None);
2953
2954 let Some((current_hash, source_kind)) = fs_result else {
2955 tracing::warn!("failed to compute hash for '{}'", meta.name);
2956 continue;
2957 };
2958 let initial_level = match source_kind {
2959 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2960 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2961 &trust_cfg.local_level
2962 }
2963 _ => &trust_cfg.default_level,
2964 };
2965 let existing = memory
2966 .sqlite()
2967 .load_skill_trust(&meta.name)
2968 .await
2969 .ok()
2970 .flatten();
2971 let trust_level = if let Some(ref row) = existing {
2972 if row.blake3_hash != current_hash {
2973 trust_cfg.hash_mismatch_level
2974 } else if row.source_kind != source_kind {
2975 let stored = row.trust_level;
2979 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2980 stored
2981 } else {
2982 *initial_level
2983 }
2984 } else {
2985 row.trust_level
2986 }
2987 } else {
2988 *initial_level
2989 };
2990 let source_path = meta.skill_dir.to_str();
2991 if let Err(e) = memory
2992 .sqlite()
2993 .upsert_skill_trust(
2994 &meta.name,
2995 trust_level,
2996 source_kind,
2997 None,
2998 source_path,
2999 ¤t_hash,
3000 )
3001 .await
3002 {
3003 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
3004 }
3005 }
3006 }
3007
/// Rebuilds or re-syncs the skill matcher for the given skill metadata.
///
/// When the current matcher is absent or does not report `is_qdrant`, a new
/// in-memory matcher is built from `all_meta`. Otherwise the existing backend
/// is synced in place, reporting progress through the session status channel
/// when one is set. If hybrid search is enabled, the BM25 index is then
/// rebuilt from the skill descriptions.
async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
    let provider = self.embedding_provider.clone();
    let embed_timeout =
        std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
    // Embedding callback handed to the matcher; each call is bounded by
    // `embed_timeout` and maps a timeout to `LlmError::Timeout`.
    let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
        let owned = text.to_owned();
        let p = provider.clone();
        Box::pin(async move {
            if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                result
            } else {
                tracing::warn!(
                    timeout_secs = embed_timeout.as_secs(),
                    "skill matcher: embedding timed out"
                );
                Err(zeph_llm::LlmError::Timeout)
            }
        })
    };

    // True when there is no matcher yet or the current one is not Qdrant-backed.
    let needs_inmemory_rebuild = !self
        .services
        .skill
        .matcher
        .as_ref()
        .is_some_and(SkillMatcherBackend::is_qdrant);

    if needs_inmemory_rebuild {
        self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
            .await
            .map(SkillMatcherBackend::InMemory);
    } else if let Some(ref mut backend) = self.services.skill.matcher {
        let _ = self.channel.send_status("syncing skill index...").await;
        // Optional progress reporter that forwards "completed/total" to the
        // status channel; send errors are ignored.
        let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
            self.services.session.status_tx.clone().map(
                |tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                },
            );
        // A failed sync is logged and otherwise ignored.
        if let Err(e) = backend
            .sync(
                all_meta,
                &self.services.skill.embedding_model,
                embed_fn,
                on_progress,
            )
            .await
        {
            tracing::warn!("failed to sync skill embeddings: {e:#}");
        }
    }

    if self.services.skill.hybrid_search {
        let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
        let _ = self.channel.send_status("rebuilding search index...").await;
        self.services.skill.rebuild_bm25(&descs);
    }
}
3070
3071 #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
3072 async fn reload_skills(&mut self) {
3073 let old_fp = self.services.skill.fingerprint();
3074 let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
3075 let plugin_dirs = supplier();
3076 let mut paths = self.services.skill.skill_paths.clone();
3077 for dir in plugin_dirs {
3078 if !paths.contains(&dir) {
3079 paths.push(dir);
3080 }
3081 }
3082 paths
3083 } else {
3084 self.services.skill.skill_paths.clone()
3085 };
3086 self.services.skill.registry.write().reload(&reload_paths);
3087 if self.services.skill.fingerprint() == old_fp {
3088 return;
3089 }
3090 let _ = self.channel.send_status("reloading skills...").await;
3091
3092 let all_meta = self
3093 .services
3094 .skill
3095 .registry
3096 .read()
3097 .all_meta()
3098 .into_iter()
3099 .cloned()
3100 .collect::<Vec<_>>();
3101
3102 self.update_trust_for_reloaded_skills(&all_meta).await;
3103
3104 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
3105 self.rebuild_skill_matcher(&all_meta_refs).await;
3106
3107 let all_skills: Vec<Skill> = {
3108 let reg = self.services.skill.registry.read();
3109 reg.all_meta()
3110 .iter()
3111 .filter_map(|m| reg.skill(&m.name).ok())
3112 .collect()
3113 };
3114 let trust_map = self.build_skill_trust_map().await;
3115 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
3116 let skills_prompt =
3117 state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
3118 self.services
3119 .skill
3120 .last_skills_prompt
3121 .clone_from(&skills_prompt);
3122 let system_prompt = build_system_prompt(&skills_prompt, None);
3123 if let Some(msg) = self.msg.messages.first_mut() {
3124 msg.content = system_prompt;
3125 }
3126
3127 let _ = self.channel.send_status("").await;
3128 tracing::info!(
3129 "reloaded {} skill(s)",
3130 self.services.skill.registry.read().all_meta().len()
3131 );
3132 }
3133
3134 async fn reload_instructions(&mut self) {
3135 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
3137 while rx.try_recv().is_ok() {}
3138 }
3139 let Some(ref state) = self.runtime.instructions.reload_state else {
3140 return;
3141 };
3142 let base_dir = state.base_dir.clone();
3143 let provider_kinds = state.provider_kinds.clone();
3144 let explicit_files = state.explicit_files.clone();
3145 let auto_detect = state.auto_detect;
3146 let new_blocks = crate::instructions::load_instructions_async(
3147 base_dir,
3148 provider_kinds,
3149 explicit_files,
3150 auto_detect,
3151 )
3152 .await;
3153 let old_sources: std::collections::HashSet<_> = self
3154 .runtime
3155 .instructions
3156 .blocks
3157 .iter()
3158 .map(|b| &b.source)
3159 .collect();
3160 let new_sources: std::collections::HashSet<_> =
3161 new_blocks.iter().map(|b| &b.source).collect();
3162 for added in new_sources.difference(&old_sources) {
3163 tracing::info!(path = %added.display(), "instruction file added");
3164 }
3165 for removed in old_sources.difference(&new_sources) {
3166 tracing::info!(path = %removed.display(), "instruction file removed");
3167 }
3168 tracing::info!(
3169 old_count = self.runtime.instructions.blocks.len(),
3170 new_count = new_blocks.len(),
3171 "reloaded instruction files"
3172 );
3173 self.runtime.instructions.blocks = new_blocks;
3174 }
3175
/// Re-reads the configuration file and applies the reloadable settings to the
/// running agent.
///
/// Returns without changes when no config path is recorded or when
/// `load_config_with_overlay` yields no config. Otherwise copies security,
/// timeout, memory, skill, context-manager, index and hook settings from the
/// freshly loaded config into the live state.
#[allow(clippy::too_many_lines)]
fn reload_config(&mut self) {
    let Some(path) = self.runtime.lifecycle.config_path.clone() else {
        return;
    };
    let Some(config) = self.load_config_with_overlay(&path) else {
        return;
    };
    // Computed while `config` is still whole; fields are moved out of it below.
    let budget_tokens = resolve_context_budget(&config, &self.provider);
    self.runtime.config.security = config.security;
    self.runtime.config.timeouts = config.timeouts;
    self.runtime.config.redact_credentials = config.memory.redact_credentials;
    self.services.memory.persistence.history_limit = config.memory.history_limit;
    self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
    self.services.memory.compaction.summarization_threshold =
        config.memory.summarization_threshold;
    // Skill matching settings; selected weights and thresholds are clamped to [0.0, 1.0].
    self.services.skill.max_active_skills = config.skills.max_active_skills.get();
    self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
    self.services.skill.min_injection_score = config.skills.min_injection_score;
    self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
    self.services.skill.hybrid_search = config.skills.hybrid_search;
    {
        // Out-of-range `bm25_alpha` is reported, then clamped rather than rejected.
        let alpha = config.skills.bm25_alpha;
        if !(0.0..=1.0).contains(&alpha) {
            tracing::warn!(
                bm25_alpha = alpha,
                "bm25_alpha is outside [0.0, 1.0]; clamping to valid range"
            );
        }
        self.services.skill.bm25_alpha = alpha.clamp(0.0, 1.0);
    }
    self.services.skill.two_stage_matching = config.skills.two_stage_matching;
    self.services.skill.confusability_threshold =
        config.skills.confusability_threshold.clamp(0.0, 1.0);
    self.services.skill.group_structured = config.skills.group_structured;
    self.services.skill.support_similarity_threshold =
        config.skills.support_similarity_threshold;
    // Provider names are copied with `clone_into`, which writes into the
    // existing destination strings.
    config
        .skills
        .query_rewrite_provider
        .as_str()
        .clone_into(&mut self.services.skill.query_rewrite_provider_name);
    config
        .skills
        .generation_provider
        .as_str()
        .clone_into(&mut self.services.skill.generation_provider_name);
    config
        .skills
        .disambiguate_provider
        .as_str()
        .clone_into(&mut self.services.skill.disambiguate_provider_name);
    self.services.skill.generation_timeout_ms = config.skills.generation_timeout_ms;
    self.services.skill.semantic_scan = config.skills.semantic_scan;
    config
        .skills
        .semantic_scan_provider
        .as_str()
        .clone_into(&mut self.services.skill.semantic_scan_provider);
    // Expand a leading `~/` against the home directory; if the home directory
    // is unknown the path is used verbatim.
    self.services.skill.generation_output_dir =
        config.skills.generation_output_dir.as_deref().map(|p| {
            if let Some(stripped) = p.strip_prefix("~/") {
                dirs::home_dir()
                    .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
            } else {
                std::path::PathBuf::from(p)
            }
        });

    self.context_manager.budget = Some(
        ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
    );

    {
        let graph_cfg = &config.memory.graph;
        if graph_cfg.rpe.enabled {
            // Create the router only when absent, so an existing router is
            // kept across reloads.
            if self.services.memory.extraction.rpe_router.is_none() {
                self.services.memory.extraction.rpe_router =
                    Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                        graph_cfg.rpe.threshold,
                        graph_cfg.rpe.max_skip_turns,
                    )));
            }
        } else {
            self.services.memory.extraction.rpe_router = None;
        }
        self.services.memory.extraction.graph_config = graph_cfg.clone();
    }
    // Compaction and pruning settings of the context manager.
    self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
    self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
    self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
    self.context_manager
        .set_compaction_cooldown_turns(config.memory.compaction_cooldown_turns);
    self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
    self.context_manager.compression = config.memory.compression.clone();
    self.context_manager.routing = config.memory.store_routing.clone();
    // An empty classifier provider name means no dedicated routing provider.
    self.context_manager.store_routing_provider = if config
        .memory
        .store_routing
        .routing_classifier_provider
        .is_empty()
    {
        None
    } else {
        let resolved = self.resolve_background_provider(
            config
                .memory
                .store_routing
                .routing_classifier_provider
                .as_str(),
        );
        Some(std::sync::Arc::new(resolved))
    };
    self.services
        .memory
        .persistence
        .cross_session_score_threshold = config.memory.cross_session_score_threshold;

    self.services.index.repo_map_tokens = config.index.repo_map_tokens;
    self.services.index.repo_map_ttl =
        std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

    // Hook lists are replaced in place with the reloaded ones.
    self.services
        .session
        .hooks_config
        .cwd_changed
        .clone_from(&config.hooks.cwd_changed);
    self.services
        .session
        .hooks_config
        .permission_denied
        .clone_from(&config.hooks.permission_denied);
    self.services
        .session
        .hooks_config
        .turn_complete
        .clone_from(&config.hooks.turn_complete);
    tracing::info!("config reloaded");
}
3319
3320 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3324 let mut config = match Config::load(path) {
3325 Ok(c) => c,
3326 Err(e) => {
3327 tracing::warn!("config reload failed: {e:#}");
3328 return None;
3329 }
3330 };
3331
3332 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3334 None
3335 } else {
3336 match zeph_plugins::apply_plugin_config_overlays(
3337 &mut config,
3338 &self.runtime.lifecycle.plugins_dir,
3339 ) {
3340 Ok(o) => Some(o),
3341 Err(e) => {
3342 tracing::warn!(
3343 "plugin overlay merge failed during reload: {e:#}; \
3344 keeping previous runtime state"
3345 );
3346 return None;
3347 }
3348 }
3349 };
3350
3351 if let Some(ref overlay) = new_overlay {
3355 self.warn_on_shell_overlay_divergence(overlay, &config);
3356 }
3357 Some(config)
3358 }
3359
3360 fn warn_on_shell_overlay_divergence(
3366 &self,
3367 new_overlay: &zeph_plugins::ResolvedOverlay,
3368 config: &Config,
3369 ) {
3370 let new_blocked: Vec<String> = {
3371 let mut v = config.tools.shell.blocked_commands.clone();
3372 v.sort();
3373 v
3374 };
3375 let new_allowed: Vec<String> = {
3376 let mut v = config.tools.shell.allowed_commands.clone();
3377 v.sort();
3378 v
3379 };
3380
3381 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3382 let blocked_changed = new_blocked != startup.blocked;
3383 let allowed_changed = new_allowed != startup.allowed;
3384
3385 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3387 h.rebuild(&config.tools.shell);
3388 tracing::info!(
3389 blocked_count = h.snapshot_blocked().len(),
3390 "shell blocked_commands rebuilt from hot-reload"
3391 );
3392 }
3393
3394 if allowed_changed {
3401 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3402 for sandbox path recomputation (blocked_commands was rebuilt live)";
3403 tracing::warn!("{msg}");
3404 if let Some(ref tx) = self.services.session.status_tx {
3405 let _ = tx.send(msg.to_owned());
3406 }
3407 }
3408
3409 let _ = new_overlay;
3410 }
3411
3412 fn maybe_sidequest_eviction(&mut self) {
3423 if self.services.sidequest.config.enabled {
3427 use crate::config::PruningStrategy;
3428 if !matches!(
3429 self.context_manager.compression.pruning_strategy,
3430 PruningStrategy::Reactive
3431 ) {
3432 tracing::warn!(
3433 strategy = ?self.context_manager.compression.pruning_strategy,
3434 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3435 consider disabling sidequest.enabled to avoid redundant eviction"
3436 );
3437 }
3438 }
3439
3440 if self.services.focus.is_active() {
3442 tracing::debug!("sidequest: skipping — focus session active");
3443 self.services.compression.pending_sidequest_result = None;
3445 return;
3446 }
3447
3448 self.sidequest_apply_pending();
3450
3451 self.sidequest_schedule_next();
3453 }
3454
3455 fn sidequest_apply_pending(&mut self) {
3456 let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3457 return;
3458 };
3459 let result = match handle.try_join() {
3462 Ok(result) => result,
3463 Err(_handle) => {
3464 tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3466 return;
3467 }
3468 };
3469 match result {
3470 Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3471 let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3472 let freed = self.services.sidequest.apply_eviction(
3473 &mut self.msg.messages,
3474 &evicted_indices,
3475 &self.runtime.metrics.token_counter,
3476 );
3477 if freed > 0 {
3478 self.recompute_prompt_tokens();
3479 self.context_manager.set_compaction_state(
3482 crate::agent::context_manager::CompactionState::CompactedThisTurn {
3483 cooldown: 0,
3484 },
3485 );
3486 tracing::info!(
3487 freed_tokens = freed,
3488 evicted_cursors = evicted_indices.len(),
3489 pass = self.services.sidequest.passes_run,
3490 "sidequest eviction complete"
3491 );
3492 if let Some(ref d) = self.runtime.debug.debug_dumper {
3493 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3494 }
3495 if let Some(ref tx) = self.services.session.status_tx {
3496 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3497 }
3498 } else {
3499 if let Some(ref tx) = self.services.session.status_tx {
3501 let _ = tx.send(String::new());
3502 }
3503 }
3504 }
3505 Ok(None | Some(_)) => {
3506 tracing::debug!("sidequest: pending result: no cursors to evict");
3507 if let Some(ref tx) = self.services.session.status_tx {
3508 let _ = tx.send(String::new());
3509 }
3510 }
3511 Err(e) => {
3512 tracing::debug!("sidequest: background task error: {e}");
3513 if let Some(ref tx) = self.services.session.status_tx {
3514 let _ = tx.send(String::new());
3515 }
3516 }
3517 }
3518 }
3519
/// Spawns a background task that asks the LLM which tool-output cursors to
/// evict, and stores its handle as the pending sidequest result.
///
/// Cursors are rebuilt from the current messages first; when none are
/// eligible, nothing is scheduled. The task's output is the list of cursor
/// indices to evict, or `None` when the call fails, times out, or the response
/// cannot be parsed.
fn sidequest_schedule_next(&mut self) {
    use zeph_llm::provider::{Message, MessageMetadata, Role};

    self.services
        .sidequest
        .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);

    if self.services.sidequest.tool_output_cursors.is_empty() {
        tracing::debug!("sidequest: no eligible cursors");
        return;
    }

    // Everything the background task needs is captured by value here.
    let prompt = self.services.sidequest.build_eviction_prompt();
    let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
    let n_cursors = self.services.sidequest.tool_output_cursors.len();
    let provider = self.summary_or_primary_provider().clone();

    let eviction_future = async move {
        let msgs = [Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];
        // The LLM call is bounded to 5 seconds; failure and timeout both yield `None`.
        let response =
            match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                .await
            {
                Ok(Ok(r)) => r,
                Ok(Err(e)) => {
                    tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                    return None;
                }
                Err(_) => {
                    tracing::debug!("sidequest bg: LLM call timed out");
                    return None;
                }
            };

        // Parse the span from the first `{` to the last `}` as the JSON
        // payload, ignoring any text around it.
        let start = response.find('{')?;
        let end = response.rfind('}')?;
        if start > end {
            return None;
        }
        let json_slice = &response[start..=end];
        let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
        // Keep only in-range cursor indices, sorted and without duplicates.
        let mut valid: Vec<usize> = parsed
            .del_cursors
            .into_iter()
            .filter(|&c| c < n_cursors)
            .collect();
        valid.sort_unstable();
        valid.dedup();
        // Cap the eviction count at ceil(n_cursors * max_eviction_ratio).
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss
        )]
        let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
        valid.truncate(max_evict);
        Some(valid)
    };
    let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
        std::sync::Arc::from("agent.sidequest.eviction"),
        move || eviction_future,
    );
    self.services.compression.pending_sidequest_result = Some(handle);
    tracing::debug!("sidequest: background LLM eviction task spawned");
    if let Some(ref tx) = self.services.session.status_tx {
        let _ = tx.send("SideQuest: scoring tool outputs...".into());
    }
}
3593
3594 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3596 self.services
3597 .mcp
3598 .manager
3599 .as_ref()
3600 .map(|m| McpManagerDispatch(Arc::clone(m)))
3601 }
3602
3603 pub(crate) async fn check_cwd_changed(&mut self) {
3609 let current = match std::env::current_dir() {
3610 Ok(p) => p,
3611 Err(e) => {
3612 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3613 return;
3614 }
3615 };
3616 if current == self.runtime.lifecycle.last_known_cwd {
3617 return;
3618 }
3619 let old_cwd =
3620 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3621 self.services.session.env_context.working_dir = current.display().to_string();
3622
3623 tracing::info!(
3624 old = %old_cwd.display(),
3625 new = %current.display(),
3626 "working directory changed"
3627 );
3628
3629 let _ = self
3630 .channel
3631 .send_status("Working directory changed\u{2026}")
3632 .await;
3633
3634 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3635 if hooks.is_empty() {
3636 tracing::debug!("CwdChanged: no hooks configured, skipping");
3637 } else {
3638 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3639 let mut env = std::collections::HashMap::new();
3640 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3641 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3642 let dispatch = self.mcp_dispatch();
3643 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3644 .as_ref()
3645 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3646 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3647 tracing::warn!(error = %e, "CwdChanged hook failed");
3648 } else {
3649 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3650 }
3651 }
3652
3653 let _ = self.channel.send_status("").await;
3654 }
3655
3656 pub(crate) async fn handle_file_changed(
3658 &mut self,
3659 event: crate::file_watcher::FileChangedEvent,
3660 ) {
3661 tracing::info!(path = %event.path.display(), "file changed");
3662
3663 let _ = self
3664 .channel
3665 .send_status("Running file-change hook\u{2026}")
3666 .await;
3667
3668 let hooks = self
3669 .services
3670 .session
3671 .hooks_config
3672 .file_changed_hooks
3673 .clone();
3674 if hooks.is_empty() {
3675 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3676 } else {
3677 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3678 let mut env = std::collections::HashMap::new();
3679 env.insert(
3680 "ZEPH_CHANGED_PATH".to_owned(),
3681 event.path.display().to_string(),
3682 );
3683 let dispatch = self.mcp_dispatch();
3684 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3685 .as_ref()
3686 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3687 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp, None).await {
3688 tracing::warn!(error = %e, "FileChanged hook failed");
3689 } else {
3690 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3691 }
3692 }
3693
3694 let _ = self.channel.send_status("").await;
3695 }
3696
3697 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3707 let Some(engine) = self.services.promotion_engine.clone() else {
3708 return;
3709 };
3710
3711 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3712 return;
3713 };
3714
3715 let promotion_window = 200usize;
3718
3719 let accepted = self.runtime.lifecycle.supervisor.spawn(
3720 agent_supervisor::TaskClass::Enrichment,
3721 "compression_spectrum.promotion_scan",
3722 async move {
3723 let window = match memory.load_promotion_window(promotion_window).await {
3724 Ok(w) => w,
3725 Err(e) => {
3726 tracing::warn!(error = %e, "promotion scan: failed to load window");
3727 return;
3728 }
3729 };
3730
3731 if window.is_empty() {
3732 return;
3733 }
3734
3735 let candidates = match engine.scan(&window).await {
3736 Ok(c) => c,
3737 Err(e) => {
3738 tracing::warn!(error = %e, "promotion scan: clustering failed");
3739 return;
3740 }
3741 };
3742
3743 for candidate in &candidates {
3744 if let Err(e) = engine.promote(candidate).await {
3745 tracing::warn!(
3746 signature = %candidate.signature,
3747 error = %e,
3748 "promotion scan: promote failed"
3749 );
3750 }
3751 }
3752
3753 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3754 }
3755 .instrument(tracing::info_span!("memory.compression.promote.background")),
3756 );
3757
3758 if accepted {
3759 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3760 }
3761 }
3762}
3763pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3771 use zeph_llm::provider::MessagePart;
3772 if m.parts.is_empty() {
3773 return m.content.len();
3774 }
3775 m.parts
3776 .iter()
3777 .map(|p| match p {
3778 MessagePart::Text { text }
3779 | MessagePart::Recall { text }
3780 | MessagePart::CodeContext { text }
3781 | MessagePart::Summary { text }
3782 | MessagePart::CrossSession { text } => text.len(),
3783 MessagePart::ToolOutput { body, .. } => body.len(),
3784 MessagePart::ToolUse { id, name, input } => {
3785 50 + id.len() + name.len() + input.to_string().len()
3786 }
3787 MessagePart::ToolResult {
3788 tool_use_id,
3789 content,
3790 ..
3791 } => 50 + tool_use_id.len() + content.len(),
3792 MessagePart::Image(img) => img.data.len() * 4 / 3,
3793 MessagePart::ThinkingBlock {
3794 thinking,
3795 signature,
3796 } => 50 + thinking.len() + signature.len(),
3797 MessagePart::RedactedThinkingBlock { data } => data.len(),
3798 MessagePart::Compaction { summary } => summary.len(),
3799 _ => 0,
3800 })
3801 .sum()
3802}
3803
3804pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3823 use zeph_llm::provider::{MessagePart, Role};
3824
3825 let mut total_chars = 0usize;
3829 let mut drop_before = 0usize; for (i, m) in msgs.iter().enumerate().rev() {
3831 total_chars += estimate_parts_size(m);
3832 if total_chars > max_chars {
3833 drop_before = i + 1;
3834 break;
3835 }
3836 }
3837 if drop_before > 0 {
3838 msgs.drain(..drop_before);
3839 }
3840
3841 let emitted_tool_ids: std::collections::HashSet<String> = msgs
3845 .iter()
3846 .filter(|m| m.role == Role::Assistant)
3847 .flat_map(|m| m.parts.iter())
3848 .filter_map(|p| {
3849 if let MessagePart::ToolUse { id, .. } = p {
3850 Some(id.clone())
3851 } else {
3852 None
3853 }
3854 })
3855 .collect();
3856
3857 let mut orphans_removed = 0usize;
3858 for m in msgs.iter_mut() {
3859 if m.role != Role::User || m.parts.is_empty() {
3860 continue;
3861 }
3862 let before = m.parts.len();
3863 m.parts.retain(|p| match p {
3864 MessagePart::ToolResult { tool_use_id, .. } => {
3865 emitted_tool_ids.contains(tool_use_id.as_str())
3866 }
3867 _ => true,
3868 });
3869 let dropped = before - m.parts.len();
3870 if dropped > 0 {
3871 orphans_removed += dropped;
3872 if m.parts.is_empty() {
3873 m.content.clear();
3874 } else {
3875 m.rebuild_content();
3876 }
3877 }
3878 }
3879
3880 let consumed_tool_ids: std::collections::HashSet<String> = msgs
3888 .iter()
3889 .filter(|m| m.role == Role::User)
3890 .flat_map(|m| m.parts.iter())
3891 .filter_map(|p| {
3892 if let MessagePart::ToolResult { tool_use_id, .. } = p {
3893 Some(tool_use_id.clone())
3894 } else {
3895 None
3896 }
3897 })
3898 .collect();
3899
3900 let last_assistant_idx = msgs
3902 .iter()
3903 .enumerate()
3904 .rev()
3905 .find(|(_, m)| m.role == Role::Assistant)
3906 .map(|(i, _)| i);
3907
3908 for (idx, m) in msgs.iter_mut().enumerate() {
3909 if m.role != Role::Assistant || m.parts.is_empty() {
3910 continue;
3911 }
3912 if Some(idx) == last_assistant_idx {
3914 continue;
3915 }
3916 let before = m.parts.len();
3917 m.parts.retain(|p| match p {
3918 MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3919 _ => true,
3920 });
3921 let dropped = before - m.parts.len();
3922 if dropped > 0 {
3923 orphans_removed += dropped;
3924 if m.parts.is_empty() {
3925 m.content.clear();
3926 } else {
3927 m.rebuild_content();
3928 }
3929 }
3930 }
3931
3932 msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3934
3935 if orphans_removed > 0 {
3936 tracing::debug!(
3937 orphans = orphans_removed,
3938 "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3939 );
3940 }
3941}
3942
3943fn sanitize_parent_messages(
3949 mut msgs: Vec<zeph_llm::provider::Message>,
3950 sanitizer: &zeph_sanitizer::ContentSanitizer,
3951 source: &zeph_sanitizer::ContentSource,
3952) -> Vec<zeph_llm::provider::Message> {
3953 use zeph_llm::provider::MessagePart;
3954 for msg in &mut msgs {
3955 let mut changed = false;
3956 for part in &mut msg.parts {
3957 if let MessagePart::Text { text } = part {
3958 let clean = sanitizer.sanitize(text, source.clone());
3959 if clean.body != *text {
3960 *text = clean.body;
3961 changed = true;
3962 }
3963 }
3964 }
3965 if changed {
3966 msg.rebuild_content();
3967 }
3968 }
3969 msgs
3970}
3971
3972struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3977
3978impl zeph_subagent::McpDispatch for McpManagerDispatch {
3979 fn call_tool<'a>(
3980 &'a self,
3981 server: &'a str,
3982 tool: &'a str,
3983 args: serde_json::Value,
3984 ) -> std::pin::Pin<
3985 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3986 > {
3987 Box::pin(async move {
3988 self.0
3989 .call_tool(server, tool, args)
3990 .await
3991 .map(|result| {
3992 let texts: Vec<serde_json::Value> = result
3994 .content
3995 .iter()
3996 .filter_map(|c| {
3997 if let rmcp::model::RawContent::Text(t) = &c.raw {
3998 Some(serde_json::Value::String(t.text.clone()))
3999 } else {
4000 None
4001 }
4002 })
4003 .collect();
4004 serde_json::Value::Array(texts)
4005 })
4006 .map_err(|e| e.to_string())
4007 })
4008 }
4009}
4010
4011pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
4012 while !*rx.borrow_and_update() {
4013 if rx.changed().await.is_err() {
4014 std::future::pending::<()>().await;
4015 }
4016 }
4017}
4018
4019pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4020 match rx {
4021 Some(inner) => {
4022 if let Some(v) = inner.recv().await {
4023 Some(v)
4024 } else {
4025 *rx = None;
4026 std::future::pending().await
4027 }
4028 }
4029 None => std::future::pending().await,
4030 }
4031}
4032
/// Shortens a shell command to at most 80 characters for display.
///
/// Commands of up to 80 characters are returned unchanged. Longer commands
/// are cut to their first 79 characters followed by `…`, giving exactly 80
/// characters.
///
/// Lengths are counted in `char`s rather than bytes, so a command containing
/// multi-byte characters is only truncated when it really exceeds the
/// 80-character budget, and the cut always falls on a character boundary.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_CHARS: usize = 80;

    let mut offsets = cmd.char_indices().map(|(offset, _)| offset);

    // Byte offset of the 80th character: a truncated command ends just before it.
    let Some(cut) = offsets.nth(MAX_CHARS - 1) else {
        // Fewer than 80 characters.
        return cmd.to_owned();
    };
    if offsets.next().is_none() {
        // Exactly 80 characters: still fits.
        return cmd.to_owned();
    }

    format!("{}…", &cmd[..cut])
}
4046
/// Returns the first 8 characters of `id` (the whole string when shorter).
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the 9th character, or the full length if there is none.
    let cut = id
        .char_indices()
        .nth(8)
        .map_or(id.len(), |(offset, _)| offset);
    id[..cut].to_owned()
}
4051
4052pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
4053 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
4054 if let Some(ctx_size) = provider.context_window() {
4055 tracing::info!(
4056 model_context = ctx_size,
4057 "auto-configured context budget on reload"
4058 );
4059 ctx_size
4060 } else {
4061 0
4062 }
4063 } else {
4064 config.memory.context_budget_tokens
4065 };
4066 if tokens == 0 {
4067 tracing::warn!(
4068 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
4069 );
4070 128_000
4071 } else {
4072 tokens
4073 }
4074}
4075
4076#[cfg(test)]
4077mod tests;
4078
4079#[cfg(test)]
4080pub(crate) use tests::agent_tests;
4081
#[cfg(test)]
mod test_stubs {
    //! Command stubs compiled only for tests.

    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Slash command registered as `/test-error` whose handler always fails
    /// with a `CommandError` carrying the message `"boom"`.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and resolves to an error.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>> {
            Box::pin(async { Err::<CommandOutput, _>(CommandError::new("boom")) })
        }
    }
}