1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15#[cfg(feature = "cocoon")]
16mod cocoon_cmd;
17mod command_context_impls;
18pub(super) mod compression_feedback;
19mod context;
20mod context_impls;
21pub(crate) mod context_manager;
22mod corrections;
23pub mod error;
24mod experiment_cmd;
25pub(crate) mod focus;
26mod index;
27mod learning;
28pub(crate) mod learning_engine;
29mod log_commands;
30mod loop_event;
31mod lsp_commands;
32mod magic_docs;
33mod mcp;
34pub(crate) mod memcot;
35mod message_queue;
36mod microcompact;
37mod model_commands;
38mod persistence;
39#[cfg(feature = "scheduler")]
40mod plan;
41mod policy_commands;
42mod provider_cmd;
43mod quality_hook;
44pub(crate) mod rate_limiter;
45#[cfg(feature = "scheduler")]
46mod scheduler_commands;
47#[cfg(feature = "scheduler")]
48mod scheduler_loop;
49mod scope_commands;
50pub mod session_config;
51mod session_digest;
52pub mod shadow_sentinel;
53pub(crate) mod sidequest;
54mod skill_management;
55pub mod slash_commands;
56pub mod speculative;
57pub(crate) mod state;
58pub(crate) mod task_injection;
59pub(crate) mod tool_execution;
60pub(crate) mod tool_orchestrator;
61pub mod trajectory;
62mod trajectory_commands;
63mod trust_commands;
64pub mod turn;
65mod utils;
66pub(crate) mod vigil;
67
68use std::collections::{HashMap, VecDeque};
69use std::fmt::Write as _;
70use std::sync::Arc;
71
72use parking_lot::RwLock;
73
74use tokio::sync::{mpsc, watch};
75use tokio_util::sync::CancellationToken;
76use zeph_llm::any::AnyProvider;
77use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
78use zeph_memory::TokenCounter;
79use zeph_memory::semantic::SemanticMemory;
80use zeph_skills::loader::Skill;
81use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
82use zeph_skills::prompt::format_skills_prompt;
83use zeph_skills::registry::SkillRegistry;
84use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
85
86use crate::channel::Channel;
87use crate::config::Config;
88use crate::context::{ContextBudget, build_system_prompt};
89use zeph_common::text::estimate_tokens;
90
91use loop_event::LoopEvent;
92use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
93use state::MessageState;
94
/// Window size used by doom-loop detection.
// NOTE(review): the consumer of this constant is not visible in this part of the
// file; presumably it is the number of recent turns/outputs compared — confirm.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Re-exported so that sibling modules can reach the prefix through `crate::agent`.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prepended to a scheduler-provided prompt before it is resolved as an
/// agent message (see the `LoopEvent::ScheduledTask` arm of `Agent::run`).
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence appended after the body of a formatted tool output.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Wraps a tool's raw output in the `[tool output: <name>]` header followed by a
/// fenced code block.
///
/// The result is `"[tool output: {tool_name}]\n```\n{body}\n```"`. Neither
/// argument is escaped or trimmed.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    // `concat` sizes the destination from the pieces up front, so the string is
    // built with a single allocation.
    [
        "[tool output: ",
        tool_name,
        "]\n```\n",
        body,
        TOOL_OUTPUT_SUFFIX,
    ]
    .concat()
}
117
/// The agent: owns the LLM provider(s), the I/O channel, the tool executor and
/// all per-session state, and drives the main loop via [`Agent::run`].
pub struct Agent<C: Channel> {
    /// Provider used for chat calls; can be swapped at the top of each loop
    /// iteration by `apply_provider_override`.
    provider: AnyProvider,
    /// Provider intended for embeddings. `Agent::new` initialises it as a clone
    /// of `provider`.
    embedding_provider: AnyProvider,
    /// Channel used to receive user messages and to send replies/status updates.
    channel: C,
    /// Type-erased tool executor, shared behind an `Arc`.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation messages plus the pending message queue and related
    /// persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager (tracks, among other things, turns since the last hard
    /// compaction).
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestration state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Aggregated subsystem state (memory, skills, MCP, security, orchestration, ...).
    pub(super) services: state::Services,

    /// Runtime state (config, lifecycle, providers, metrics, debug, instructions).
    pub(super) runtime: state::AgentRuntime,
}
172
/// What the main loop in `Agent::run` should do after a slash-command dispatch
/// attempt (produced by `apply_dispatch_result`).
enum DispatchFlow {
    /// Leave the main loop (the command requested exit).
    Break,
    /// The command was handled (successfully or with an error); start the next
    /// loop iteration.
    Continue,
    /// No command in the registry matched; continue with the next handler.
    Fallthrough,
}
182
183impl<C: Channel> Agent<C> {
184 #[must_use]
208 pub fn new(
209 provider: AnyProvider,
210 channel: C,
211 registry: SkillRegistry,
212 matcher: Option<SkillMatcherBackend>,
213 max_active_skills: usize,
214 tool_executor: impl ToolExecutor + 'static,
215 ) -> Self {
216 let registry = Arc::new(RwLock::new(registry));
217 let embedding_provider = provider.clone();
218 Self::new_with_registry_arc(
219 provider,
220 embedding_provider,
221 channel,
222 registry,
223 matcher,
224 max_active_skills,
225 tool_executor,
226 )
227 }
228
229 #[must_use]
236 pub fn new_with_registry_arc(
237 provider: AnyProvider,
238 embedding_provider: AnyProvider,
239 channel: C,
240 registry: Arc<RwLock<SkillRegistry>>,
241 matcher: Option<SkillMatcherBackend>,
242 max_active_skills: usize,
243 tool_executor: impl ToolExecutor + 'static,
244 ) -> Self {
245 use state::{
246 AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
247 InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
248 OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
249 SessionState, SkillState, ToolState,
250 };
251
252 debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
253 let all_skills: Vec<Skill> = {
254 let reg = registry.read();
255 reg.all_meta()
256 .iter()
257 .filter_map(|m| reg.skill(&m.name).ok())
258 .collect()
259 };
260 let empty_trust = HashMap::new();
261 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
262 let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
263 let system_prompt = build_system_prompt(&skills_prompt, None);
264 tracing::debug!(len = system_prompt.len(), "initial system prompt built");
265 tracing::trace!(prompt = %system_prompt, "full system prompt");
266
267 let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
268 let token_counter = Arc::new(TokenCounter::new());
269
270 let services = Services {
271 memory: MemoryState::default(),
272 skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
273 learning_engine: learning_engine::LearningEngine::new(),
274 feedback: FeedbackState::default(),
275 mcp: McpState::default(),
276 index: IndexState::default(),
277 session: SessionState::new(),
278 security: SecurityState::default(),
279 experiments: ExperimentState::new(),
280 compression: CompressionState::default(),
281 orchestration: OrchestrationState::default(),
282 focus: focus::FocusState::default(),
283 sidequest: sidequest::SidequestState::default(),
284 tool_state: ToolState::default(),
285 goal_accounting: None,
286 quality: None,
287 proactive_explorer: None,
288 promotion_engine: None,
289 taco_compressor: None,
290 speculation_engine: None,
291 };
292
293 let runtime = AgentRuntime {
294 config: RuntimeConfig::default(),
295 lifecycle: LifecycleState::new(),
296 providers: ProviderState::new(initial_prompt_tokens),
297 metrics: MetricsState::new(token_counter),
298 debug: DebugState::default(),
299 instructions: InstructionState::default(),
300 };
301
302 Self {
303 provider,
304 embedding_provider,
305 channel,
306 tool_executor: Arc::new(tool_executor),
307 msg: MessageState {
308 messages: vec![Message {
309 role: Role::System,
310 content: system_prompt,
311 parts: vec![],
312 metadata: MessageMetadata::default(),
313 }],
314 message_queue: VecDeque::new(),
315 pending_image_parts: Vec::new(),
316 last_persisted_message_id: None,
317 deferred_db_hide_ids: Vec::new(),
318 deferred_db_summaries: Vec::new(),
319 },
320 context_manager: context_manager::ContextManager::new(),
321 tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
322 services,
323 runtime,
324 }
325 }
326
327 #[must_use]
340 pub fn into_channel(self) -> C {
341 self.channel
342 }
343
344 #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
349 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
350 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
351 return vec![];
352 };
353
354 let finished: Vec<String> = mgr
355 .statuses()
356 .into_iter()
357 .filter_map(|(id, status)| {
358 if matches!(
359 status.state,
360 zeph_subagent::SubAgentState::Completed
361 | zeph_subagent::SubAgentState::Failed
362 | zeph_subagent::SubAgentState::Canceled
363 ) {
364 Some(id)
365 } else {
366 None
367 }
368 })
369 .collect();
370
371 let mut results = vec![];
372 for task_id in finished {
373 match mgr.collect(&task_id).await {
374 Ok(result) => results.push((task_id, result)),
375 Err(e) => {
376 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
377 }
378 }
379 }
380 results
381 }
382
383 async fn call_llm_for_session_summary(
392 &self,
393 chat_messages: &[Message],
394 ) -> Option<zeph_memory::StructuredSummary> {
395 let timeout_dur = std::time::Duration::from_secs(
396 self.services
397 .memory
398 .compaction
399 .shutdown_summary_timeout_secs,
400 );
401 match tokio::time::timeout(
402 timeout_dur,
403 self.provider
404 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
405 )
406 .await
407 {
408 Ok(Ok(s)) => Some(s),
409 Ok(Err(e)) => {
410 tracing::warn!(
411 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
412 );
413 self.plain_text_summary_fallback(chat_messages, timeout_dur)
414 .await
415 }
416 Err(_) => {
417 tracing::warn!(
418 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
419 self.services
420 .memory
421 .compaction
422 .shutdown_summary_timeout_secs
423 );
424 self.plain_text_summary_fallback(chat_messages, timeout_dur)
425 .await
426 }
427 }
428 }
429
430 async fn plain_text_summary_fallback(
431 &self,
432 chat_messages: &[Message],
433 timeout_dur: std::time::Duration,
434 ) -> Option<zeph_memory::StructuredSummary> {
435 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
436 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
437 summary: plain,
438 key_facts: vec![],
439 entities: vec![],
440 }),
441 Ok(Err(e)) => {
442 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
443 None
444 }
445 Err(_) => {
446 tracing::warn!("shutdown summary: plain LLM fallback timed out");
447 None
448 }
449 }
450 }
451
452 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
457 use zeph_llm::provider::{MessagePart, Role};
458
459 let msgs = &self.msg.messages;
463 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
465 return;
466 };
467 let asst_msg = &msgs[asst_idx];
468 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
469 .parts
470 .iter()
471 .filter_map(|p| {
472 if let MessagePart::ToolUse { id, name, input } = p {
473 Some((id.as_str(), name.as_str(), input))
474 } else {
475 None
476 }
477 })
478 .collect();
479 if tool_use_ids.is_empty() {
480 return;
481 }
482
483 let paired_ids: std::collections::HashSet<&str> = msgs
485 .get(asst_idx + 1..)
486 .into_iter()
487 .flatten()
488 .filter(|m| m.role == Role::User)
489 .flat_map(|m| m.parts.iter())
490 .filter_map(|p| {
491 if let MessagePart::ToolResult { tool_use_id, .. } = p {
492 Some(tool_use_id.as_str())
493 } else {
494 None
495 }
496 })
497 .collect();
498
499 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
500 .iter()
501 .filter(|(id, _, _)| !paired_ids.contains(*id))
502 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
503 id: (*id).to_owned(),
504 name: (*name).to_owned().into(),
505 input: (*input).clone(),
506 })
507 .collect();
508
509 if unpaired.is_empty() {
510 return;
511 }
512
513 tracing::info!(
514 count = unpaired.len(),
515 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
516 );
517 self.persist_cancelled_tool_results(&unpaired).await;
518 }
519
520 async fn maybe_store_shutdown_summary(&mut self) {
530 if !self.services.memory.compaction.shutdown_summary {
531 return;
532 }
533 let Some(memory) = self.services.memory.persistence.memory.clone() else {
534 return;
535 };
536 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
537 return;
538 };
539
540 match memory.has_session_summary(conversation_id).await {
542 Ok(true) => {
543 tracing::debug!("shutdown summary: session already has a summary, skipping");
544 return;
545 }
546 Ok(false) => {}
547 Err(e) => {
548 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
549 return;
550 }
551 }
552
553 let user_count = self
555 .msg
556 .messages
557 .iter()
558 .skip(1)
559 .filter(|m| m.role == Role::User)
560 .count();
561 if user_count
562 < self
563 .services
564 .memory
565 .compaction
566 .shutdown_summary_min_messages
567 {
568 tracing::debug!(
569 user_count,
570 min = self
571 .services
572 .memory
573 .compaction
574 .shutdown_summary_min_messages,
575 "shutdown summary: too few user messages, skipping"
576 );
577 return;
578 }
579
580 let _ = self.channel.send_status("Saving session summary...").await;
582
583 let max = self
585 .services
586 .memory
587 .compaction
588 .shutdown_summary_max_messages;
589 if max == 0 {
590 tracing::debug!("shutdown summary: max_messages=0, skipping");
591 return;
592 }
593 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
594 let slice = if non_system.len() > max {
595 &non_system[non_system.len() - max..]
596 } else {
597 &non_system[..]
598 };
599
600 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
601 .iter()
602 .map(|m| {
603 let role = match m.role {
604 Role::User => "user".to_owned(),
605 Role::Assistant => "assistant".to_owned(),
606 Role::System => "system".to_owned(),
607 };
608 (zeph_memory::MessageId(0), role, m.content.clone())
609 })
610 .collect();
611
612 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
613 let chat_messages = vec![Message {
614 role: Role::User,
615 content: prompt,
616 parts: vec![],
617 metadata: MessageMetadata::default(),
618 }];
619
620 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
621 let _ = self.channel.send_status("").await;
622 return;
623 };
624
625 if let Err(e) = memory
626 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
627 .await
628 {
629 tracing::warn!("shutdown summary: storage failed: {e:#}");
630 } else {
631 tracing::info!(
632 conversation_id = conversation_id.0,
633 "shutdown summary stored"
634 );
635 }
636
637 let _ = self.channel.send_status("").await;
639 }
640
    /// Shuts the agent down: persists state, stops sub-systems and background
    /// tasks, then stores the optional session summary and digest.
    ///
    /// Every step is best-effort; failures are logged or ignored and never
    /// abort the remaining steps.
    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state().await;

        // Persist the topology advisor state, if one is configured.
        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the still-open "turns since last hard compaction" counter in the
        // metrics, then reset it.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log end-of-session statistics. The borrowed snapshot `m` lives only
        // inside this block, i.e. it is released before the next `.await`.
        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Persist tombstone results for tool calls that never got an answer.
        self.flush_orphaned_tool_use_on_shutdown().await;

        // Abort supervised background tasks.
        self.runtime.lifecycle.supervisor.abort_all();

        // Abort pending compression-related background handles, if any.
        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        self.services.learning_engine.learning_tasks.abort_all();

        // NOTE(review): presumably gives the aborted tasks a few scheduler turns
        // to wind down before the summary LLM calls start — confirm.
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
754
755 fn refresh_subagent_metrics(&mut self) {
762 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
763 return;
764 };
765 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
766 .statuses()
767 .into_iter()
768 .map(|(id, s)| {
769 let def = mgr.agents_def(&id);
770 crate::metrics::SubAgentMetrics {
771 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
772 id: id.clone(),
773 state: format!("{:?}", s.state).to_lowercase(),
774 turns_used: s.turns_used,
775 max_turns: def.map_or(20, |d| d.permissions.max_turns),
776 background: def.is_some_and(|d| d.permissions.background),
777 elapsed_secs: s.started_at.elapsed().as_secs(),
778 permission_mode: def.map_or_else(String::new, |d| {
779 use zeph_subagent::def::PermissionMode;
780 match d.permissions.permission_mode {
781 PermissionMode::Default => String::new(),
782 PermissionMode::AcceptEdits => "accept_edits".into(),
783 PermissionMode::DontAsk => "dont_ask".into(),
784 PermissionMode::BypassPermissions => "bypass_permissions".into(),
785 PermissionMode::Plan => "plan".into(),
786 }
787 }),
788 transcript_dir: mgr
789 .agent_transcript_dir(&id)
790 .map(|p| p.to_string_lossy().into_owned()),
791 }
792 })
793 .collect();
794 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
795 }
796
797 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
799 let completed = self.poll_subagents().await;
800 for (task_id, result) in completed {
801 let notice = if result.is_empty() {
802 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
803 } else {
804 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
805 };
806 if let Err(e) = self.channel.send(¬ice).await {
807 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
808 }
809 }
810 Ok(())
811 }
812
    /// Runs the agent's main loop until the channel closes, a shutdown is
    /// signalled, or a command requests exit.
    ///
    /// Each iteration performs housekeeping, obtains the next input (a queued
    /// message first, otherwise the next [`LoopEvent`]), tries the slash-command
    /// registries and the built-in commands, and finally processes the input as a
    /// user message.
    ///
    /// # Errors
    ///
    /// Propagates errors from `notify_completed_subagents`, `next_event` and
    /// `process_user_message`.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    // One long function by design: it is the single dispatch point of the loop.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // If a warmup signal exists and is not yet `true`, wait for one change
        // and warn when the value is still `false` afterwards.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over new events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need to be resolved.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // `None` and an explicit shutdown both end the loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment is over; drop its cancel handle.
                        self.services.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled prompts are turned into a regular message.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // Count the tick of the user-defined loop, if still present.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Remember whether this message came from a guest context.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // URLs typed as part of a slash command are recorded as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // First registry: session/debug commands. They work on borrowed parts
            // of the agent and receive a `NullAgent` instead of `self`.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // Captured before `registry_handled` is moved into the call below.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            // Second registry: agent-level commands. Here `self` is passed as the
            // agent and every other context slot is a null implementation.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // Agent-level commands may trigger post-command learning (`true`).
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            // Built-in commands: `Some(true)` exits, `Some(false)` skips to the
            // next iteration, `None` means the input is not a built-in command.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command: process as a regular user message (untrimmed text).
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        // Finalize the debug trace collector, if one is active.
        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1140
1141 async fn apply_dispatch_result(
1147 &mut self,
1148 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1149 command: &str,
1150 with_learning: bool,
1151 ) -> DispatchFlow {
1152 match result {
1153 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1154 let _ = self.channel.flush_chunks().await;
1155 DispatchFlow::Break
1156 }
1157 Some(Ok(
1158 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1159 )) => {
1160 let _ = self.channel.flush_chunks().await;
1161 DispatchFlow::Continue
1162 }
1163 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1164 let _ = self.channel.send(&msg).await;
1165 let _ = self.channel.flush_chunks().await;
1166 if with_learning {
1167 self.maybe_trigger_post_command_learning(command).await;
1168 }
1169 DispatchFlow::Continue
1170 }
1171 Some(Err(e)) => {
1172 let _ = self.channel.send(&e.to_string()).await;
1173 let _ = self.channel.flush_chunks().await;
1174 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1175 DispatchFlow::Continue
1176 }
1177 None => DispatchFlow::Fallthrough,
1178 }
1179 }
1180
1181 fn apply_provider_override(&mut self) {
1183 if let Some(ref slot) = self.runtime.providers.provider_override
1184 && let Some(new_provider) = slot.write().take()
1185 {
1186 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1187 self.provider = new_provider;
1188 }
1189 }
1190
    /// Waits for the next thing the main loop has to react to and returns it as
    /// a [`LoopEvent`].
    ///
    /// All sources are raced in one `tokio::select!`: the channel, the shutdown
    /// signal, the reload/notification receivers, the scheduler, the user-defined
    /// loop timer and the file watcher.
    ///
    /// Returns `Ok(None)` when the channel yields `None`, or, in the loop-timer
    /// arm, when the user loop is absent or cancelled at the time the arm runs.
    ///
    /// # Errors
    ///
    /// Propagates an error returned by `channel.recv()`.
    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            // Channel input returns directly; `None` is passed through unchanged.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            // The `Some(..)` patterns below disable a branch for this call when
            // its receiver yields `None`.
            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-defined loop timer: completes on the next interval tick. With
            // no loop, or a cancelled one, this future never completes.
            () = async {
                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // NOTE(review): `Agent::run` treats `Ok(None)` like a shutdown and
                // leaves its loop. If the loop can be cancelled while the tick is
                // being awaited, the `Ok(None)` returned below would stop the
                // agent — confirm that this is intended.
                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.runtime.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1257
1258 #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1259 async fn resolve_message(
1260 &self,
1261 msg: crate::channel::ChannelMessage,
1262 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1263 use crate::channel::{Attachment, AttachmentKind};
1264 use zeph_llm::provider::{ImageData, MessagePart};
1265
1266 let text_base = msg.text.clone();
1267
1268 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1269 .attachments
1270 .into_iter()
1271 .partition(|a| a.kind == AttachmentKind::Audio);
1272
1273 tracing::debug!(
1274 audio = audio_attachments.len(),
1275 has_stt = self.runtime.providers.stt.is_some(),
1276 "resolve_message attachments"
1277 );
1278
1279 let text = if !audio_attachments.is_empty()
1280 && let Some(stt) = self.runtime.providers.stt.as_ref()
1281 {
1282 let mut transcribed_parts = Vec::new();
1283 for attachment in &audio_attachments {
1284 if attachment.data.len() > MAX_AUDIO_BYTES {
1285 tracing::warn!(
1286 size = attachment.data.len(),
1287 max = MAX_AUDIO_BYTES,
1288 "audio attachment exceeds size limit, skipping"
1289 );
1290 continue;
1291 }
1292 match stt
1293 .transcribe(&attachment.data, attachment.filename.as_deref())
1294 .await
1295 {
1296 Ok(result) => {
1297 tracing::info!(
1298 len = result.text.len(),
1299 language = ?result.language,
1300 "audio transcribed"
1301 );
1302 transcribed_parts.push(result.text);
1303 }
1304 Err(e) => {
1305 tracing::error!(error = %e, "audio transcription failed");
1306 }
1307 }
1308 }
1309 if transcribed_parts.is_empty() {
1310 text_base
1311 } else {
1312 let transcribed = transcribed_parts.join("\n");
1313 if text_base.is_empty() {
1314 transcribed
1315 } else {
1316 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1317 }
1318 }
1319 } else {
1320 if !audio_attachments.is_empty() {
1321 tracing::warn!(
1322 count = audio_attachments.len(),
1323 "audio attachments received but no STT provider configured, dropping"
1324 );
1325 }
1326 text_base
1327 };
1328
1329 let mut image_parts = Vec::new();
1330 for attachment in image_attachments {
1331 if attachment.data.len() > MAX_IMAGE_BYTES {
1332 tracing::warn!(
1333 size = attachment.data.len(),
1334 max = MAX_IMAGE_BYTES,
1335 "image attachment exceeds size limit, skipping"
1336 );
1337 continue;
1338 }
1339 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1340 image_parts.push(MessagePart::Image(Box::new(ImageData {
1341 data: attachment.data,
1342 mime_type,
1343 })));
1344 }
1345
1346 (text, image_parts)
1347 }
1348
1349 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1356 let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1357 self.runtime.debug.iteration_counter += 1;
1358 let cancel_token = CancellationToken::new();
1359 self.runtime.lifecycle.cancel_token = cancel_token.clone();
1361 self.services.security.user_provided_urls.write().clear();
1362 self.runtime.lifecycle.turn_llm_requests = 0;
1364
1365 {
1367 let pending: Vec<u8> = {
1368 let mut q = self.services.security.trajectory_signal_queue.lock();
1369 std::mem::take(&mut *q)
1370 };
1371 for code in pending {
1372 self.services
1373 .security
1374 .trajectory
1375 .record(crate::agent::trajectory::RiskSignal::from_code(code));
1376 }
1377 }
1378 if self.services.security.trajectory.advance_turn()
1381 && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1382 {
1383 let entry = zeph_tools::AuditEntry {
1384 timestamp: zeph_tools::chrono_now(),
1385 tool: "<sentinel>".to_owned().into(),
1386 command: String::new(),
1387 result: zeph_tools::AuditResult::Success,
1388 duration_ms: 0,
1389 error_category: Some("trajectory_auto_recover".to_owned()),
1390 error_domain: Some("security".to_owned()),
1391 error_phase: None,
1392 claim_source: None,
1393 mcp_server_id: None,
1394 injection_flagged: false,
1395 embedding_anomalous: false,
1396 cross_boundary_mcp_to_acp: false,
1397 adversarial_policy_decision: None,
1398 exit_code: None,
1399 truncated: false,
1400 caller_id: None,
1401 policy_match: None,
1402 correlation_id: None,
1403 vigil_risk: None,
1404 execution_env: None,
1405 resolved_cwd: None,
1406 scope_at_definition: None,
1407 scope_at_dispatch: None,
1408 };
1409 self.runtime.lifecycle.supervisor.spawn(
1410 crate::agent::agent_supervisor::TaskClass::Telemetry,
1411 "trajectory-auto-recover-audit",
1412 async move { logger.log(&entry).await },
1413 );
1414 }
1415 if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1417 sentinel.advance_turn();
1418 }
1419 let risk_level = self.services.security.trajectory.current_risk();
1421 *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1422 if let Some(alert) = self.services.security.trajectory.poll_alert() {
1424 let msg = format!(
1425 "[trajectory] Risk level: {:?} (score={:.2})",
1426 alert.level, alert.score
1427 );
1428 tracing::warn!(
1429 level = ?alert.level,
1430 score = alert.score,
1431 "trajectory sentinel alert"
1432 );
1433 if let Some(ref tx) = self.services.session.status_tx {
1434 let _ = tx.send(msg);
1435 }
1436 }
1437
1438 let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1439 turn::Turn::new(context, input)
1440 }
1441
1442 fn end_turn(&mut self, turn: turn::Turn) {
1449 self.runtime.metrics.pending_timings = turn.metrics.timings;
1450 self.flush_turn_timings();
1451 self.services.session.current_turn_intent = None;
1453 self.services.session.is_guest_context = false;
1455 if let Some(ref engine) = self.services.speculation_engine {
1457 let metrics = engine.end_turn();
1458 if metrics.committed > 0 || metrics.cancelled > 0 {
1459 tracing::debug!(
1460 committed = metrics.committed,
1461 cancelled = metrics.cancelled,
1462 wasted_ms = metrics.wasted_ms,
1463 "speculation: turn boundary metrics"
1464 );
1465 }
1466 }
1467 }
1468
1469 #[tracing::instrument(
1470 name = "core.agent.process_user_message",
1471 skip_all,
1472 level = "debug",
1473 fields(turn_id),
1474 err
1475 )]
1476 async fn process_user_message(
1477 &mut self,
1478 text: String,
1479 image_parts: Vec<zeph_llm::provider::MessagePart>,
1480 ) -> Result<(), error::AgentError> {
1481 let input = turn::TurnInput::new(text, image_parts);
1482 let mut t = self.begin_turn(input);
1483
1484 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1485 tracing::Span::current().record("turn_id", t.id().0);
1486 self.runtime
1488 .debug
1489 .start_iteration_span(turn_idx, t.input.text.trim());
1490
1491 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1492
1493 let span_status = if result.is_ok() {
1495 crate::debug_dump::trace::SpanStatus::Ok
1496 } else {
1497 crate::debug_dump::trace::SpanStatus::Error {
1498 message: "iteration failed".to_owned(),
1499 }
1500 };
1501 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1502
1503 self.end_turn(t);
1504 result
1505 }
1506
1507 #[tracing::instrument(
1508 name = "core.agent.process_user_message_inner",
1509 skip_all,
1510 level = "debug",
1511 err
1512 )]
1513 async fn process_user_message_inner(
1514 &mut self,
1515 turn: &mut turn::Turn,
1516 ) -> Result<(), error::AgentError> {
1517 self.reap_background_tasks_and_update_metrics();
1518
1519 let tokens_before_turn = self
1520 .runtime
1521 .metrics
1522 .metrics_tx
1523 .as_ref()
1524 .map_or(0, |tx| tx.borrow().total_tokens);
1525
1526 self.drain_background_completions();
1530
1531 self.wire_cancel_bridge(turn.cancel_token());
1532
1533 let text = turn.input.text.clone();
1535 let trimmed_owned = text.trim().to_owned();
1536 let trimmed = trimmed_owned.as_str();
1537
1538 if self.services.security.vigil.is_some() {
1541 let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1542 self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1543 }
1544
1545 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1546 return result;
1547 }
1548
1549 self.check_pending_rollbacks().await;
1550
1551 if self.pre_process_security(trimmed).await? {
1552 return Ok(());
1553 }
1554
1555 let t_ctx = std::time::Instant::now();
1556 tracing::debug!("turn timing: prepare_context start");
1557 self.advance_context_lifecycle_guarded(&text, trimmed).await;
1558 turn.metrics_mut().timings.prepare_context_ms =
1559 u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1560 tracing::debug!(
1561 ms = turn.metrics_snapshot().timings.prepare_context_ms,
1562 "turn timing: prepare_context done"
1563 );
1564
1565 let image_parts = std::mem::take(&mut turn.input.image_parts);
1566 let merged_text = self.build_user_message_text_with_bg_completions(&text);
1570 let user_msg = self.build_user_message(&merged_text, image_parts);
1571
1572 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1575 if !urls.is_empty() {
1576 self.services
1577 .security
1578 .user_provided_urls
1579 .write()
1580 .extend(urls);
1581 }
1582
1583 self.services.memory.extraction.goal_text = Some(text.clone());
1586
1587 let t_persist = std::time::Instant::now();
1588 tracing::debug!("turn timing: persist_message(user) start");
1589 self.persist_message(Role::User, &text, &[], false).await;
1591 turn.metrics_mut().timings.persist_message_ms =
1592 u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1593 tracing::debug!(
1594 ms = turn.metrics_snapshot().timings.persist_message_ms,
1595 "turn timing: persist_message(user) done"
1596 );
1597 self.push_message(user_msg);
1598
1599 tracing::debug!("turn timing: process_response start");
1602 let turn_had_error = if let Err(e) = self.process_response().await {
1603 self.services.learning_engine.learning_tasks.detach_all();
1605 tracing::error!("Response processing failed: {e:#}");
1606
1607 if e.is_no_providers() {
1610 self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1611 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1612 tracing::warn!(
1613 backoff_secs,
1614 "no providers available; backing off before next turn"
1615 );
1616 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1617 }
1618
1619 let user_msg = format!("Error: {e:#}");
1620 self.channel.send(&user_msg).await?;
1621 self.msg.messages.pop();
1622 self.recompute_prompt_tokens();
1623 self.channel.flush_chunks().await?;
1624 true
1625 } else {
1626 self.services.learning_engine.learning_tasks.detach_all();
1629 self.truncate_old_tool_results();
1630 self.maybe_update_magic_docs();
1632 self.maybe_spawn_promotion_scan();
1634 false
1635 };
1636 tracing::debug!("turn timing: process_response done");
1637
1638 if let Some(pipeline) = self.services.quality.clone() {
1640 self.run_self_check_for_turn(pipeline, turn.id().0).await;
1641 }
1642 let _ = self.channel.flush_chunks().await;
1647
1648 self.maybe_fire_completion_notification(turn, turn_had_error);
1649
1650 self.flush_goal_accounting(tokens_before_turn);
1651
1652 turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1657 turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1658
1659 Ok(())
1660 }
1661
1662 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1668 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1669 let token = turn_token.clone();
1670 self.runtime.lifecycle.cancel_token = turn_token.clone();
1672 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1673 prev.abort();
1674 }
1675 self.runtime.lifecycle.cancel_bridge_handle =
1676 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1677 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1678 move || async move {
1679 signal.notified().await;
1680 token.cancel();
1681 },
1682 ));
1683 }
1684
1685 fn reap_background_tasks_and_update_metrics(&mut self) {
1689 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1690 if bg_signal.did_summarize {
1691 self.services.memory.persistence.unsummarized_count = 0;
1692 tracing::debug!("background summarization completed; unsummarized_count reset");
1693 }
1694 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1695 self.update_metrics(|m| {
1696 m.bg_inflight = snap.inflight as u64;
1697 m.bg_dropped = snap.total_dropped();
1698 m.bg_completed = snap.total_completed();
1699 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1700 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1701 });
1702
1703 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1705 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1706 .runtime
1707 .lifecycle
1708 .shell_executor_handle
1709 .as_ref()
1710 .map(|e| e.background_runs_snapshot())
1711 .unwrap_or_default()
1712 .into_iter()
1713 .map(|s| crate::metrics::ShellBackgroundRunRow {
1714 run_id: truncate_shell_run_id(&s.run_id),
1715 command: truncate_shell_command(&s.command),
1716 elapsed_secs: s.elapsed_ms / 1000,
1717 })
1718 .collect();
1719 self.update_metrics(|m| {
1720 m.shell_background_runs = shell_rows;
1721 });
1722 }
1723
1724 if self
1727 .runtime
1728 .config
1729 .supervisor_config
1730 .abort_enrichment_on_turn
1731 {
1732 self.runtime
1733 .lifecycle
1734 .supervisor
1735 .abort_class(agent_supervisor::TaskClass::Enrichment);
1736 }
1737 }
1738
1739 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1752 let snap = turn.metrics_snapshot().timings.clone();
1753 let duration_ms = snap
1754 .prepare_context_ms
1755 .saturating_add(snap.llm_chat_ms)
1756 .saturating_add(snap.tool_exec_ms);
1757 let summary = crate::notifications::TurnSummary {
1758 duration_ms,
1759 preview: self.last_assistant_preview(160),
1760 tool_calls: 0,
1762 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1763 exit_status: if is_error {
1764 crate::notifications::TurnExitStatus::Error
1765 } else {
1766 crate::notifications::TurnExitStatus::Success
1767 },
1768 };
1769
1770 let gate_ok = self
1772 .runtime
1773 .lifecycle
1774 .notifier
1775 .as_ref()
1776 .is_none_or(|n| n.should_fire(&summary));
1777
1778 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1780 && gate_ok
1781 {
1782 notifier.fire(&summary);
1783 }
1784
1785 let hooks = self.services.session.hooks_config.turn_complete.clone();
1790 if !hooks.is_empty() && gate_ok {
1791 let mut env = std::collections::HashMap::new();
1792 env.insert(
1793 "ZEPH_TURN_DURATION_MS".to_owned(),
1794 summary.duration_ms.to_string(),
1795 );
1796 env.insert(
1797 "ZEPH_TURN_STATUS".to_owned(),
1798 if is_error { "error" } else { "success" }.to_owned(),
1799 );
1800 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1801 env.insert(
1802 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1803 summary.llm_requests.to_string(),
1804 );
1805 let dispatch = self.mcp_dispatch();
1806 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1807 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1808 agent_supervisor::TaskClass::Telemetry,
1809 "turn-complete-hooks",
1810 async move {
1811 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
1812 .as_ref()
1813 .map(|d| d as &dyn zeph_subagent::McpDispatch);
1814 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
1815 tracing::warn!(error = %e, "turn_complete hook failed");
1816 }
1817 },
1818 );
1819 }
1820 }
1821
1822 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1825 let goal_snap = self
1826 .services
1827 .goal_accounting
1828 .as_ref()
1829 .and_then(|a| a.snapshot());
1830 self.update_metrics(|m| m.active_goal = goal_snap);
1831
1832 if let Some(ref accounting) = self.services.goal_accounting {
1833 let tokens_after = self
1834 .runtime
1835 .metrics
1836 .metrics_tx
1837 .as_ref()
1838 .map_or(0, |tx| tx.borrow().total_tokens);
1839 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1840 let mut spawned: Option<
1841 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1842 > = None;
1843 accounting.on_turn_complete(turn_tokens, |fut| {
1844 spawned = Some(fut);
1845 });
1846 if let Some(fut) = spawned {
1847 let _ = self.runtime.lifecycle.supervisor.spawn(
1848 agent_supervisor::TaskClass::Telemetry,
1849 "goal-accounting",
1850 fut,
1851 );
1852 }
1853 }
1854 }
1855
1856 #[tracing::instrument(
1858 name = "core.agent.pre_process_security",
1859 skip_all,
1860 level = "debug",
1861 err
1862 )]
1863 async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1864 if let Some(ref guardrail) = self.services.security.guardrail {
1866 use zeph_sanitizer::guardrail::GuardrailVerdict;
1867 let verdict = guardrail.check(trimmed).await;
1868 match &verdict {
1869 GuardrailVerdict::Flagged { reason, .. } => {
1870 tracing::warn!(
1871 reason = %reason,
1872 should_block = verdict.should_block(),
1873 "guardrail flagged user input"
1874 );
1875 if verdict.should_block() {
1876 let msg = format!("[guardrail] Input blocked: {reason}");
1877 let _ = self.channel.send(&msg).await;
1878 let _ = self.channel.flush_chunks().await;
1879 return Ok(true);
1880 }
1881 let _ = self
1883 .channel
1884 .send(&format!("[guardrail] Warning: {reason}"))
1885 .await;
1886 }
1887 GuardrailVerdict::Error { error } => {
1888 if guardrail.error_should_block() {
1889 tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1890 let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1891 let _ = self.channel.send(msg).await;
1892 let _ = self.channel.flush_chunks().await;
1893 return Ok(true);
1894 }
1895 tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1896 }
1897 GuardrailVerdict::Safe => {}
1898 }
1899 }
1900
1901 #[cfg(feature = "classifiers")]
1907 if self.services.security.sanitizer.scan_user_input() {
1908 match self
1909 .services
1910 .security
1911 .sanitizer
1912 .classify_injection(trimmed)
1913 .await
1914 {
1915 zeph_sanitizer::InjectionVerdict::Blocked => {
1916 self.push_classifier_metrics();
1917 let _ = self
1918 .channel
1919 .send("[security] Input blocked: injection detected by classifier.")
1920 .await;
1921 let _ = self.channel.flush_chunks().await;
1922 return Ok(true);
1923 }
1924 zeph_sanitizer::InjectionVerdict::Suspicious => {
1925 tracing::warn!("injection_classifier soft_signal on user input");
1926 }
1927 zeph_sanitizer::InjectionVerdict::Clean => {}
1928 }
1929 }
1930 #[cfg(feature = "classifiers")]
1931 self.push_classifier_metrics();
1932
1933 Ok(false)
1934 }
1935
1936 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1943 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1944 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1945
1946 let providers_recently_failed = self
1948 .runtime
1949 .lifecycle
1950 .last_no_providers_at
1951 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1952
1953 if providers_recently_failed {
1954 tracing::warn!(
1955 backoff_secs,
1956 "skipping context preparation: providers were unavailable on last turn"
1957 );
1958 return;
1959 }
1960
1961 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1962 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1963 {
1964 Ok(()) => {}
1965 Err(_elapsed) => {
1966 tracing::warn!(
1967 timeout_secs = prep_timeout_secs,
1968 "context preparation timed out; proceeding with degraded context"
1969 );
1970 }
1971 }
1972 }
1973
1974 #[tracing::instrument(
1975 name = "core.agent.advance_context_lifecycle",
1976 skip_all,
1977 level = "debug"
1978 )]
1979 async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1980 self.services.mcp.pruning_cache.reset();
1982
1983 let conv_id = self.services.memory.persistence.conversation_id;
1986 self.rebuild_system_prompt(text).await;
1987
1988 self.detect_and_record_corrections(trimmed, conv_id).await;
1989 self.services.learning_engine.tick();
1990 self.analyze_and_learn().await;
1991 self.sync_graph_counts().await;
1992
1993 self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1998
1999 {
2001 self.services.focus.tick();
2002
2003 let sidequest_should_fire = self.services.sidequest.tick();
2006 if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
2007 self.maybe_sidequest_eviction();
2008 }
2009 }
2010
2011 {
2014 let cfg = &self.services.memory.extraction.graph_config.experience;
2015 if cfg.enabled
2016 && cfg.evolution_sweep_enabled
2017 && cfg.evolution_sweep_interval > 0
2018 && self
2019 .services
2020 .sidequest
2021 .turn_counter
2022 .checked_rem(cfg.evolution_sweep_interval as u64)
2023 == Some(0)
2024 && let Some(memory) = self.services.memory.persistence.memory.as_ref()
2025 && let (Some(exp), Some(graph)) =
2026 (memory.experience.as_ref(), memory.graph_store.as_ref())
2027 {
2028 let exp = std::sync::Arc::clone(exp);
2029 let graph = std::sync::Arc::clone(graph);
2030 let threshold = cfg.confidence_prune_threshold;
2031 let turn = self.services.sidequest.turn_counter;
2032 let accepted = self.runtime.lifecycle.supervisor.spawn(
2033 agent_supervisor::TaskClass::Telemetry,
2034 "experience-sweep",
2035 async move {
2036 match exp.evolution_sweep(graph.as_ref(), threshold).await {
2037 Ok(stats) => tracing::info!(
2038 turn,
2039 self_loops = stats.pruned_self_loops,
2040 low_confidence = stats.pruned_low_confidence,
2041 "evolution sweep complete",
2042 ),
2043 Err(e) => tracing::warn!(
2044 turn,
2045 error = %e,
2046 "evolution sweep failed",
2047 ),
2048 }
2049 },
2050 );
2051 if !accepted {
2052 tracing::warn!(
2053 turn = self.services.sidequest.turn_counter,
2054 "experience-sweep dropped (telemetry class at capacity)",
2055 );
2056 }
2057 }
2058 }
2059
2060 if let Some(warning) = self.cache_expiry_warning() {
2062 tracing::info!(warning, "cache expiry warning");
2063 let _ = self.channel.send_status(&warning).await;
2064 }
2065
2066 self.maybe_time_based_microcompact();
2069
2070 self.maybe_apply_deferred_summaries();
2075 self.flush_deferred_summaries().await;
2076
2077 if let Err(e) = self.maybe_proactive_compress().await {
2079 tracing::warn!("proactive compression failed: {e:#}");
2080 }
2081
2082 if let Err(e) = self.maybe_compact().await {
2083 tracing::warn!("context compaction failed: {e:#}");
2084 }
2085
2086 if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2087 tracing::warn!("context preparation failed: {e:#}");
2088 }
2089
2090 self.provider
2092 .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2093
2094 self.services.learning_engine.reset_reflection();
2095 }
2096
2097 fn build_user_message(
2098 &mut self,
2099 text: &str,
2100 image_parts: Vec<zeph_llm::provider::MessagePart>,
2101 ) -> Message {
2102 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2103 all_image_parts.extend(image_parts);
2104
2105 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2106 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2107 text: text.to_owned(),
2108 }];
2109 parts.extend(all_image_parts);
2110 Message::from_parts(Role::User, parts)
2111 } else {
2112 if !all_image_parts.is_empty() {
2113 tracing::warn!(
2114 count = all_image_parts.len(),
2115 "image attachments dropped: provider does not support vision"
2116 );
2117 }
2118 Message {
2119 role: Role::User,
2120 content: text.to_owned(),
2121 parts: vec![],
2122 metadata: MessageMetadata::default(),
2123 }
2124 }
2125 }
2126
2127 fn drain_background_completions(&mut self) {
2131 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2132
2133 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2134 return;
2135 };
2136 while let Ok(completion) = rx.try_recv() {
2138 if self.runtime.lifecycle.pending_background_completions.len()
2139 >= BACKGROUND_COMPLETION_BUFFER_CAP
2140 {
2141 tracing::warn!(
2142 run_id = %completion.run_id,
2143 "background completion buffer full; dropping run result"
2144 );
2145 self.runtime
2148 .lifecycle
2149 .pending_background_completions
2150 .pop_front();
2151 self.runtime
2152 .lifecycle
2153 .pending_background_completions
2154 .push_back(zeph_tools::BackgroundCompletion {
2155 run_id: completion.run_id,
2156 exit_code: -1,
2157 success: false,
2158 elapsed_ms: 0,
2159 command: completion.command,
2160 output: format!(
2161 "[background result for run {} dropped: buffer overflow]",
2162 completion.run_id
2163 ),
2164 });
2165 } else {
2166 self.runtime
2167 .lifecycle
2168 .pending_background_completions
2169 .push_back(completion);
2170 }
2171 }
2172 }
2173
2174 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2178 if self
2179 .runtime
2180 .lifecycle
2181 .pending_background_completions
2182 .is_empty()
2183 {
2184 return user_text.to_owned();
2185 }
2186 let mut parts = String::new();
2187 for completion in self
2188 .runtime
2189 .lifecycle
2190 .pending_background_completions
2191 .drain(..)
2192 {
2193 let _ = write!(
2194 parts,
2195 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2196 completion.run_id,
2197 completion.exit_code,
2198 completion.success,
2199 completion.elapsed_ms,
2200 completion.command,
2201 completion.output,
2202 );
2203 }
2204 parts.push_str(user_text);
2205 parts
2206 }
2207
2208 async fn poll_subagent_until_done(
2212 &mut self,
2213 task_id: &str,
2214 label: &str,
2215 ) -> Option<(String, bool)> {
2216 use zeph_subagent::SubAgentState;
2217 let result = loop {
2218 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2219
2220 #[allow(clippy::redundant_closure_for_method_calls)]
2224 let pending = self
2225 .services
2226 .orchestration
2227 .subagent_manager
2228 .as_mut()
2229 .and_then(|m| m.try_recv_secret_request());
2230 if let Some((req_task_id, req)) = pending {
2231 let confirm_prompt = format!(
2234 "Sub-agent requests secret '{}'. Allow?",
2235 crate::text::truncate_to_chars(&req.secret_key, 100)
2236 );
2237 let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2238 if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2239 if approved {
2240 let ttl = std::time::Duration::from_mins(5);
2241 let key = req.secret_key.clone();
2242 if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2243 let _ = mgr.deliver_secret(&req_task_id, key);
2244 }
2245 } else {
2246 let _ = mgr.deny_secret(&req_task_id);
2247 }
2248 }
2249 }
2250
2251 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2252 let statuses = mgr.statuses();
2253 let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2254 break (format!("{label} completed (no status available)."), true);
2255 };
2256 match status.state {
2257 SubAgentState::Completed => {
2258 let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2259 break (format!("{label} completed: {msg}"), true);
2260 }
2261 SubAgentState::Failed => {
2262 let msg = status
2263 .last_message
2264 .clone()
2265 .unwrap_or_else(|| "unknown error".into());
2266 break (format!("{label} failed: {msg}"), false);
2267 }
2268 SubAgentState::Canceled => {
2269 break (format!("{label} was cancelled."), false);
2270 }
2271 _ => {
2272 let _ = self
2273 .channel
2274 .send_status(&format!(
2275 "{label}: turn {}/{}",
2276 status.turns_used,
2277 self.services
2278 .orchestration
2279 .subagent_manager
2280 .as_ref()
2281 .and_then(|m| m.agents_def(task_id))
2282 .map_or(20, |d| d.permissions.max_turns)
2283 ))
2284 .await;
2285 }
2286 }
2287 };
2288 Some(result)
2289 }
2290
2291 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2294 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2295 let full_ids: Vec<String> = mgr
2296 .statuses()
2297 .into_iter()
2298 .map(|(tid, _)| tid)
2299 .filter(|tid| tid.starts_with(prefix))
2300 .collect();
2301 Some(match full_ids.as_slice() {
2302 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2303 [fid] => Ok(fid.clone()),
2304 _ => Err(format!(
2305 "Ambiguous id prefix '{prefix}': matches {} agents",
2306 full_ids.len()
2307 )),
2308 })
2309 }
2310
2311 fn handle_agent_list(&self) -> Option<String> {
2312 use std::fmt::Write as _;
2313 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2314 let defs = mgr.definitions();
2315 if defs.is_empty() {
2316 return Some("No sub-agent definitions found.".into());
2317 }
2318 let mut out = String::from("Available sub-agents:\n");
2319 for d in defs {
2320 let memory_label = match d.memory {
2321 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2322 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2323 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2324 None => "",
2325 };
2326 if let Some(ref src) = d.source {
2327 let _ = writeln!(
2328 out,
2329 " {}{} — {} ({})",
2330 d.name, memory_label, d.description, src
2331 );
2332 } else {
2333 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2334 }
2335 }
2336 Some(out)
2337 }
2338
2339 fn handle_agent_status(&self) -> Option<String> {
2340 use std::fmt::Write as _;
2341 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2342 let statuses = mgr.statuses();
2343 if statuses.is_empty() {
2344 return Some("No active sub-agents.".into());
2345 }
2346 let mut out = String::from("Active sub-agents:\n");
2347 for (id, s) in &statuses {
2348 let state = format!("{:?}", s.state).to_lowercase();
2349 let elapsed = s.started_at.elapsed().as_secs();
2350 let _ = writeln!(
2351 out,
2352 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2353 short = &id[..8.min(id.len())],
2354 t = s.turns_used,
2355 msg = s.last_message.as_deref().unwrap_or(""),
2356 );
2357 if let Some(def) = mgr.agents_def(id)
2359 && let Some(scope) = def.memory
2360 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2361 {
2362 let _ = writeln!(out, " memory: {}", dir.display());
2363 }
2364 }
2365 Some(out)
2366 }
2367
2368 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2369 let full_id = match self.resolve_agent_id_prefix(id)? {
2370 Ok(fid) => fid,
2371 Err(msg) => return Some(msg),
2372 };
2373 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2374 if let Some((tid, req)) = mgr.try_recv_secret_request()
2375 && tid == full_id
2376 {
2377 let key = req.secret_key.clone();
2378 let ttl = std::time::Duration::from_mins(5);
2379 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2380 return Some(format!("Approve failed: {e}"));
2381 }
2382 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2383 return Some(format!("Secret delivery failed: {e}"));
2384 }
2385 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2386 }
2387 Some(format!(
2388 "No pending secret request for sub-agent '{full_id}'."
2389 ))
2390 }
2391
2392 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2393 let full_id = match self.resolve_agent_id_prefix(id)? {
2394 Ok(fid) => fid,
2395 Err(msg) => return Some(msg),
2396 };
2397 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2398 match mgr.deny_secret(&full_id) {
2399 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2400 Err(e) => Some(format!("Deny failed: {e}")),
2401 }
2402 }
2403
2404 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2405 use zeph_subagent::AgentCommand;
2406
2407 match cmd {
2408 AgentCommand::List => self.handle_agent_list(),
2409 AgentCommand::Background { name, prompt } => {
2410 self.handle_agent_background(&name, &prompt)
2411 }
2412 AgentCommand::Spawn { name, prompt }
2413 | AgentCommand::Mention {
2414 agent: name,
2415 prompt,
2416 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2417 AgentCommand::Status => self.handle_agent_status(),
2418 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2419 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2420 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2421 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2422 }
2423 }
2424
2425 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2426 let provider = self.provider.clone();
2427 let tool_executor = Arc::clone(&self.tool_executor);
2428 let skills = self.filtered_skills_for(name);
2429 let cfg = self.services.orchestration.subagent_config.clone();
2430 let spawn_ctx = self.build_spawn_context(&cfg);
2431 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2432 match mgr.spawn(
2433 name,
2434 prompt,
2435 provider,
2436 tool_executor,
2437 skills,
2438 &cfg,
2439 spawn_ctx,
2440 ) {
2441 Ok(id) => Some(format!(
2442 "Sub-agent '{name}' started in background (id: {short})",
2443 short = &id[..8.min(id.len())]
2444 )),
2445 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2446 }
2447 }
2448
2449 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2450 let provider = self.provider.clone();
2451 let tool_executor = Arc::clone(&self.tool_executor);
2452 let skills = self.filtered_skills_for(name);
2453 let cfg = self.services.orchestration.subagent_config.clone();
2454 let spawn_ctx = self.build_spawn_context(&cfg);
2455 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2456 let task_id = match mgr.spawn(
2457 name,
2458 prompt,
2459 provider,
2460 tool_executor,
2461 skills,
2462 &cfg,
2463 spawn_ctx,
2464 ) {
2465 Ok(id) => id,
2466 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2467 };
2468 let short = task_id[..8.min(task_id.len())].to_owned();
2469 let _ = self
2470 .channel
2471 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2472 .await;
2473 let _ = self
2474 .channel
2475 .notify_foreground_subagent_started(&task_id, name)
2476 .await;
2477 let label = format!("Sub-agent '{name}'");
2478 let Some((result, success)) = self.poll_subagent_until_done(&task_id, &label).await else {
2479 let _ = self
2481 .channel
2482 .notify_foreground_subagent_completed(&task_id, name, false)
2483 .await;
2484 return None;
2485 };
2486 let _ = self
2487 .channel
2488 .notify_foreground_subagent_completed(&task_id, name, success)
2489 .await;
2490 Some(result)
2491 }
2492
2493 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2494 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2495 let ids: Vec<String> = mgr
2497 .statuses()
2498 .into_iter()
2499 .map(|(task_id, _)| task_id)
2500 .filter(|task_id| task_id.starts_with(id))
2501 .collect();
2502 match ids.as_slice() {
2503 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2504 [full_id] => {
2505 let full_id = full_id.clone();
2506 match mgr.cancel(&full_id) {
2507 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2508 Err(e) => Some(format!("Cancel failed: {e}")),
2509 }
2510 }
2511 _ => Some(format!(
2512 "Ambiguous id prefix '{id}': matches {} agents",
2513 ids.len()
2514 )),
2515 }
2516 }
2517
2518 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2519 let cfg = self.services.orchestration.subagent_config.clone();
2520 let def_name = {
2523 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2524 match mgr.def_name_for_resume(id, &cfg) {
2525 Ok(name) => name,
2526 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2527 }
2528 };
2529 let skills = self.filtered_skills_for(&def_name);
2530 let provider = self.provider.clone();
2531 let tool_executor = Arc::clone(&self.tool_executor);
2532 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2533 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2534 Ok(pair) => pair,
2535 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2536 };
2537 let short = task_id[..8.min(task_id.len())].to_owned();
2538 let _ = self
2539 .channel
2540 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2541 .await;
2542 let _ = self
2543 .channel
2544 .notify_foreground_subagent_started(&task_id, &def_name)
2545 .await;
2546 let Some((result, success)) = self
2547 .poll_subagent_until_done(&task_id, "Resumed sub-agent")
2548 .await
2549 else {
2550 let _ = self
2552 .channel
2553 .notify_foreground_subagent_completed(&task_id, &def_name, false)
2554 .await;
2555 return None;
2556 };
2557 let _ = self
2558 .channel
2559 .notify_foreground_subagent_completed(&task_id, &def_name, success)
2560 .await;
2561 Some(result)
2562 }
2563
2564 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2565 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2566 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2567 let reg = self.services.skill.registry.read();
2568 match zeph_subagent::filter_skills(®, &def.skills) {
2569 Ok(skills) => {
2570 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2571 if bodies.is_empty() {
2572 None
2573 } else {
2574 Some(bodies)
2575 }
2576 }
2577 Err(e) => {
2578 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2579 None
2580 }
2581 }
2582 }
2583
2584 fn build_spawn_context(
2586 &self,
2587 cfg: &zeph_config::SubAgentConfig,
2588 ) -> zeph_subagent::SpawnContext {
2589 zeph_subagent::SpawnContext {
2590 parent_messages: self.extract_parent_messages(cfg),
2591 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2592 parent_provider_name: {
2593 let name = &self.runtime.config.active_provider_name;
2594 if name.is_empty() {
2595 None
2596 } else {
2597 Some(name.clone())
2598 }
2599 },
2600 spawn_depth: self.runtime.config.spawn_depth,
2601 mcp_tool_names: self.extract_mcp_tool_names(),
2602 seed_trajectory_score: {
2604 let child = self.services.security.trajectory.spawn_child();
2605 let score = child.score_now();
2606 if score > 0.0 { Some(score) } else { None }
2607 },
2608 }
2609 }
2610
2611 fn extract_parent_messages(
2617 &self,
2618 config: &zeph_config::SubAgentConfig,
2619 ) -> Vec<zeph_llm::provider::Message> {
2620 use zeph_llm::provider::Role;
2621 if config.context_window_turns == 0 {
2622 return Vec::new();
2623 }
2624 let non_system: Vec<_> = self
2625 .msg
2626 .messages
2627 .iter()
2628 .filter(|m| m.role != Role::System)
2629 .cloned()
2630 .collect();
2631 let take_count = config.context_window_turns * 2;
2632 let start = non_system.len().saturating_sub(take_count);
2633 let mut msgs = non_system[start..].to_vec();
2634
2635 let max_chars = 128_000usize / 4;
2637 let requested = msgs.len();
2638 trim_parent_messages(&mut msgs, max_chars);
2639 if msgs.len() < requested {
2640 tracing::info!(
2641 kept = msgs.len(),
2642 requested,
2643 "[subagent] truncated parent history due to token budget or orphan pruning"
2644 );
2645 }
2646 msgs
2647 }
2648
2649 fn extract_mcp_tool_names(&self) -> Vec<String> {
2651 self.tool_executor
2652 .tool_definitions_erased()
2653 .into_iter()
2654 .filter(|t| t.id.starts_with("mcp_"))
2655 .map(|t| t.id.to_string())
2656 .collect()
2657 }
2658
2659 fn classify_source_kind(
2663 skill_dir: &std::path::Path,
2664 managed_dir: Option<&std::path::PathBuf>,
2665 bundled_names: &std::collections::HashSet<String>,
2666 ) -> zeph_memory::store::SourceKind {
2667 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2668 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2669 let has_marker = skill_dir.join(".bundled").exists();
2670 if has_marker && bundled_names.contains(skill_name) {
2671 zeph_memory::store::SourceKind::Bundled
2672 } else {
2673 if has_marker {
2674 tracing::warn!(
2675 skill = %skill_name,
2676 "skill has .bundled marker but is not in the bundled skill \
2677 allowlist — classifying as Hub"
2678 );
2679 }
2680 zeph_memory::store::SourceKind::Hub
2681 }
2682 } else {
2683 zeph_memory::store::SourceKind::Local
2684 }
2685 }
2686
2687 async fn update_trust_for_reloaded_skills(
2689 &mut self,
2690 all_meta: &[zeph_skills::loader::SkillMeta],
2691 ) {
2692 let memory = self.services.memory.persistence.memory.clone();
2694 let Some(memory) = memory else {
2695 return;
2696 };
2697 let trust_cfg = self.services.skill.trust_config.clone();
2698 let managed_dir = self.services.skill.managed_dir.clone();
2699 let bundled_names: std::collections::HashSet<String> =
2700 zeph_skills::bundled_skill_names().into_iter().collect();
2701 for meta in all_meta {
2702 let skill_dir = meta.skill_dir.clone();
2705 let managed_dir_ref = managed_dir.clone();
2706 let bundled_names_ref = bundled_names.clone();
2707 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2708 tokio::task::spawn_blocking(move || {
2709 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2710 let source_kind = Self::classify_source_kind(
2711 &skill_dir,
2712 managed_dir_ref.as_ref(),
2713 &bundled_names_ref,
2714 );
2715 Some((hash, source_kind))
2716 })
2717 .await
2718 .unwrap_or(None);
2719
2720 let Some((current_hash, source_kind)) = fs_result else {
2721 tracing::warn!("failed to compute hash for '{}'", meta.name);
2722 continue;
2723 };
2724 let initial_level = match source_kind {
2725 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2726 zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2727 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2728 &trust_cfg.local_level
2729 }
2730 };
2731 let existing = memory
2732 .sqlite()
2733 .load_skill_trust(&meta.name)
2734 .await
2735 .ok()
2736 .flatten();
2737 let trust_level_str = if let Some(ref row) = existing {
2738 if row.blake3_hash != current_hash {
2739 trust_cfg.hash_mismatch_level.to_string()
2740 } else if row.source_kind != source_kind {
2741 let stored = row
2745 .trust_level
2746 .parse::<zeph_common::SkillTrustLevel>()
2747 .unwrap_or_else(|_| {
2748 tracing::warn!(
2749 skill = %meta.name,
2750 raw = %row.trust_level,
2751 "unrecognised trust_level in DB, treating as quarantined"
2752 );
2753 zeph_common::SkillTrustLevel::Quarantined
2754 });
2755 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2756 row.trust_level.clone()
2757 } else {
2758 initial_level.to_string()
2759 }
2760 } else {
2761 row.trust_level.clone()
2762 }
2763 } else {
2764 initial_level.to_string()
2765 };
2766 let source_path = meta.skill_dir.to_str();
2767 if let Err(e) = memory
2768 .sqlite()
2769 .upsert_skill_trust(
2770 &meta.name,
2771 &trust_level_str,
2772 source_kind,
2773 None,
2774 source_path,
2775 ¤t_hash,
2776 )
2777 .await
2778 {
2779 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2780 }
2781 }
2782 }
2783
    /// Rebuilds or re-syncs the skill matcher for the given skill metadata.
    ///
    /// When the current matcher is absent or is not the Qdrant backend, a new
    /// in-memory matcher is built from `all_meta`. Otherwise the existing
    /// backend is synced in place, reporting progress through `status_tx` when
    /// one is set. If hybrid search is enabled, the BM25 index is rebuilt from
    /// the skill descriptions afterwards.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout =
            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
        // Embedding callback: each call is bounded by `embed_timeout`, and a
        // timeout is surfaced as `LlmError::Timeout`.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // True when there is no matcher at all or the matcher is not Qdrant-backed.
        let needs_inmemory_rebuild = !self
            .services
            .skill
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            // `SkillMatcher::new` yields an `Option`; `None` leaves no matcher installed.
            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.services.skill.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Optional progress reporter that forwards "completed/total" to the status channel.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
                self.services.session.status_tx.clone().map(
                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
                        Box::new(move |completed, total| {
                            let msg = format!("Syncing skills: {completed}/{total}");
                            let _ = tx.send(msg);
                        })
                    },
                );
            // A sync failure is logged and otherwise ignored.
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.services.skill.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.services.skill.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.services.skill.rebuild_bm25(&descs);
        }
    }
2846
2847 #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
2848 async fn reload_skills(&mut self) {
2849 let old_fp = self.services.skill.fingerprint();
2850 let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2851 let plugin_dirs = supplier();
2852 let mut paths = self.services.skill.skill_paths.clone();
2853 for dir in plugin_dirs {
2854 if !paths.contains(&dir) {
2855 paths.push(dir);
2856 }
2857 }
2858 paths
2859 } else {
2860 self.services.skill.skill_paths.clone()
2861 };
2862 self.services.skill.registry.write().reload(&reload_paths);
2863 if self.services.skill.fingerprint() == old_fp {
2864 return;
2865 }
2866 let _ = self.channel.send_status("reloading skills...").await;
2867
2868 let all_meta = self
2869 .services
2870 .skill
2871 .registry
2872 .read()
2873 .all_meta()
2874 .into_iter()
2875 .cloned()
2876 .collect::<Vec<_>>();
2877
2878 self.update_trust_for_reloaded_skills(&all_meta).await;
2879
2880 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2881 self.rebuild_skill_matcher(&all_meta_refs).await;
2882
2883 let all_skills: Vec<Skill> = {
2884 let reg = self.services.skill.registry.read();
2885 reg.all_meta()
2886 .iter()
2887 .filter_map(|m| reg.skill(&m.name).ok())
2888 .collect()
2889 };
2890 let trust_map = self.build_skill_trust_map().await;
2891 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2892 let skills_prompt =
2893 state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2894 self.services
2895 .skill
2896 .last_skills_prompt
2897 .clone_from(&skills_prompt);
2898 let system_prompt = build_system_prompt(&skills_prompt, None);
2899 if let Some(msg) = self.msg.messages.first_mut() {
2900 msg.content = system_prompt;
2901 }
2902
2903 let _ = self.channel.send_status("").await;
2904 tracing::info!(
2905 "reloaded {} skill(s)",
2906 self.services.skill.registry.read().all_meta().len()
2907 );
2908 }
2909
2910 fn reload_instructions(&mut self) {
2911 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2913 while rx.try_recv().is_ok() {}
2914 }
2915 let Some(ref state) = self.runtime.instructions.reload_state else {
2916 return;
2917 };
2918 let new_blocks = crate::instructions::load_instructions(
2919 &state.base_dir,
2920 &state.provider_kinds,
2921 &state.explicit_files,
2922 state.auto_detect,
2923 );
2924 let old_sources: std::collections::HashSet<_> = self
2925 .runtime
2926 .instructions
2927 .blocks
2928 .iter()
2929 .map(|b| &b.source)
2930 .collect();
2931 let new_sources: std::collections::HashSet<_> =
2932 new_blocks.iter().map(|b| &b.source).collect();
2933 for added in new_sources.difference(&old_sources) {
2934 tracing::info!(path = %added.display(), "instruction file added");
2935 }
2936 for removed in old_sources.difference(&new_sources) {
2937 tracing::info!(path = %removed.display(), "instruction file removed");
2938 }
2939 tracing::info!(
2940 old_count = self.runtime.instructions.blocks.len(),
2941 new_count = new_blocks.len(),
2942 "reloaded instruction files"
2943 );
2944 self.runtime.instructions.blocks = new_blocks;
2945 }
2946
    /// Re-reads the configuration file and copies the hot-reloadable settings
    /// into the running agent.
    ///
    /// Does nothing when no config path is known or when loading (including
    /// the plugin overlay merge) fails. Only the fields assigned below are
    /// refreshed; anything else in `Config` keeps its current runtime value.
    fn reload_config(&mut self) {
        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        // Resolved before `config` is partially moved out of below.
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        // Runtime, memory persistence and compaction settings.
        self.runtime.config.security = config.security;
        self.runtime.config.timeouts = config.timeouts;
        self.runtime.config.redact_credentials = config.memory.redact_credentials;
        self.services.memory.persistence.history_limit = config.memory.history_limit;
        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.services.memory.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        // Skill matching settings; the two ratio-like values are clamped to [0, 1].
        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.services.skill.min_injection_score = config.skills.min_injection_score;
        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.services.skill.hybrid_search = config.skills.hybrid_search;
        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
        self.services.skill.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.services.skill.generation_provider_name);
        // A leading `~/` is expanded against the home directory; if no home
        // directory is available the path is used as written.
        self.services.skill.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // An existing router is kept as-is; a new one is only created
                // when none exists yet.
                if self.services.memory.extraction.rpe_router.is_none() {
                    self.services.memory.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.services.memory.extraction.rpe_router = None;
            }
            self.services.memory.extraction.graph_config = graph_cfg.clone();
        }
        // Context-manager thresholds, compression and store-routing settings.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // An empty classifier provider name means no dedicated routing provider.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.services
            .memory
            .persistence
            .cross_session_score_threshold = config.memory.cross_session_score_threshold;

        // Repository index settings.
        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
        self.services.index.repo_map_ttl =
            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        // Hook lists that are refreshed on reload.
        self.services
            .session
            .hooks_config
            .cwd_changed
            .clone_from(&config.hooks.cwd_changed);
        self.services
            .session
            .hooks_config
            .permission_denied
            .clone_from(&config.hooks.permission_denied);
        self.services
            .session
            .hooks_config
            .turn_complete
            .clone_from(&config.hooks.turn_complete);
        tracing::info!("config reloaded");
    }
3054
3055 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3059 let mut config = match Config::load(path) {
3060 Ok(c) => c,
3061 Err(e) => {
3062 tracing::warn!("config reload failed: {e:#}");
3063 return None;
3064 }
3065 };
3066
3067 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3069 None
3070 } else {
3071 match zeph_plugins::apply_plugin_config_overlays(
3072 &mut config,
3073 &self.runtime.lifecycle.plugins_dir,
3074 ) {
3075 Ok(o) => Some(o),
3076 Err(e) => {
3077 tracing::warn!(
3078 "plugin overlay merge failed during reload: {e:#}; \
3079 keeping previous runtime state"
3080 );
3081 return None;
3082 }
3083 }
3084 };
3085
3086 if let Some(ref overlay) = new_overlay {
3090 self.warn_on_shell_overlay_divergence(overlay, &config);
3091 }
3092 Some(config)
3093 }
3094
3095 fn warn_on_shell_overlay_divergence(
3101 &self,
3102 new_overlay: &zeph_plugins::ResolvedOverlay,
3103 config: &Config,
3104 ) {
3105 let new_blocked: Vec<String> = {
3106 let mut v = config.tools.shell.blocked_commands.clone();
3107 v.sort();
3108 v
3109 };
3110 let new_allowed: Vec<String> = {
3111 let mut v = config.tools.shell.allowed_commands.clone();
3112 v.sort();
3113 v
3114 };
3115
3116 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3117 let blocked_changed = new_blocked != startup.blocked;
3118 let allowed_changed = new_allowed != startup.allowed;
3119
3120 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3122 h.rebuild(&config.tools.shell);
3123 tracing::info!(
3124 blocked_count = h.snapshot_blocked().len(),
3125 "shell blocked_commands rebuilt from hot-reload"
3126 );
3127 }
3128
3129 if allowed_changed {
3136 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3137 for sandbox path recomputation (blocked_commands was rebuilt live)";
3138 tracing::warn!("{msg}");
3139 if let Some(ref tx) = self.services.session.status_tx {
3140 let _ = tx.send(msg.to_owned());
3141 }
3142 }
3143
3144 let _ = new_overlay;
3145 }
3146
3147 fn maybe_sidequest_eviction(&mut self) {
3158 if self.services.sidequest.config.enabled {
3162 use crate::config::PruningStrategy;
3163 if !matches!(
3164 self.context_manager.compression.pruning_strategy,
3165 PruningStrategy::Reactive
3166 ) {
3167 tracing::warn!(
3168 strategy = ?self.context_manager.compression.pruning_strategy,
3169 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3170 consider disabling sidequest.enabled to avoid redundant eviction"
3171 );
3172 }
3173 }
3174
3175 if self.services.focus.is_active() {
3177 tracing::debug!("sidequest: skipping — focus session active");
3178 self.services.compression.pending_sidequest_result = None;
3180 return;
3181 }
3182
3183 self.sidequest_apply_pending();
3185
3186 self.sidequest_schedule_next();
3188 }
3189
3190 fn sidequest_apply_pending(&mut self) {
3191 let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3192 return;
3193 };
3194 let result = match handle.try_join() {
3197 Ok(result) => result,
3198 Err(_handle) => {
3199 tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3201 return;
3202 }
3203 };
3204 match result {
3205 Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3206 let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3207 let freed = self.services.sidequest.apply_eviction(
3208 &mut self.msg.messages,
3209 &evicted_indices,
3210 &self.runtime.metrics.token_counter,
3211 );
3212 if freed > 0 {
3213 self.recompute_prompt_tokens();
3214 self.context_manager.compaction =
3217 crate::agent::context_manager::CompactionState::CompactedThisTurn {
3218 cooldown: 0,
3219 };
3220 tracing::info!(
3221 freed_tokens = freed,
3222 evicted_cursors = evicted_indices.len(),
3223 pass = self.services.sidequest.passes_run,
3224 "sidequest eviction complete"
3225 );
3226 if let Some(ref d) = self.runtime.debug.debug_dumper {
3227 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3228 }
3229 if let Some(ref tx) = self.services.session.status_tx {
3230 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3231 }
3232 } else {
3233 if let Some(ref tx) = self.services.session.status_tx {
3235 let _ = tx.send(String::new());
3236 }
3237 }
3238 }
3239 Ok(None | Some(_)) => {
3240 tracing::debug!("sidequest: pending result: no cursors to evict");
3241 if let Some(ref tx) = self.services.session.status_tx {
3242 let _ = tx.send(String::new());
3243 }
3244 }
3245 Err(e) => {
3246 tracing::debug!("sidequest: background task error: {e}");
3247 if let Some(ref tx) = self.services.session.status_tx {
3248 let _ = tx.send(String::new());
3249 }
3250 }
3251 }
3252 }
3253
    /// Spawns a background task that asks an LLM which tool-output cursors to evict.
    ///
    /// Cursors are rebuilt from the current messages first; with no eligible
    /// cursors nothing is spawned. The task's handle is stored in
    /// `pending_sidequest_result` for a later `sidequest_apply_pending` call.
    fn sidequest_schedule_next(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        self.services
            .sidequest
            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);

        if self.services.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        // Everything the task needs is captured by value so the future owns its inputs.
        let prompt = self.services.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
        let n_cursors = self.services.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        let eviction_future = async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            // The LLM call is capped at 5 seconds; failure or timeout yields `None`.
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Parse the span from the first `{` to the last `}` as JSON,
            // ignoring any text around it.
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Keep only in-range cursor indices, sorted and without duplicates.
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            // Cap the number of evictions at ceil(n_cursors * max_eviction_ratio).
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        };
        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
            std::sync::Arc::from("agent.sidequest.eviction"),
            move || eviction_future,
        );
        self.services.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.services.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
3327
3328 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3330 self.services
3331 .mcp
3332 .manager
3333 .as_ref()
3334 .map(|m| McpManagerDispatch(Arc::clone(m)))
3335 }
3336
3337 pub(crate) async fn check_cwd_changed(&mut self) {
3343 let current = match std::env::current_dir() {
3344 Ok(p) => p,
3345 Err(e) => {
3346 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3347 return;
3348 }
3349 };
3350 if current == self.runtime.lifecycle.last_known_cwd {
3351 return;
3352 }
3353 let old_cwd =
3354 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3355 self.services.session.env_context.working_dir = current.display().to_string();
3356
3357 tracing::info!(
3358 old = %old_cwd.display(),
3359 new = %current.display(),
3360 "working directory changed"
3361 );
3362
3363 let _ = self
3364 .channel
3365 .send_status("Working directory changed\u{2026}")
3366 .await;
3367
3368 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3369 if hooks.is_empty() {
3370 tracing::debug!("CwdChanged: no hooks configured, skipping");
3371 } else {
3372 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3373 let mut env = std::collections::HashMap::new();
3374 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3375 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3376 let dispatch = self.mcp_dispatch();
3377 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3378 .as_ref()
3379 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3380 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3381 tracing::warn!(error = %e, "CwdChanged hook failed");
3382 } else {
3383 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3384 }
3385 }
3386
3387 let _ = self.channel.send_status("").await;
3388 }
3389
3390 pub(crate) async fn handle_file_changed(
3392 &mut self,
3393 event: crate::file_watcher::FileChangedEvent,
3394 ) {
3395 tracing::info!(path = %event.path.display(), "file changed");
3396
3397 let _ = self
3398 .channel
3399 .send_status("Running file-change hook\u{2026}")
3400 .await;
3401
3402 let hooks = self
3403 .services
3404 .session
3405 .hooks_config
3406 .file_changed_hooks
3407 .clone();
3408 if hooks.is_empty() {
3409 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3410 } else {
3411 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3412 let mut env = std::collections::HashMap::new();
3413 env.insert(
3414 "ZEPH_CHANGED_PATH".to_owned(),
3415 event.path.display().to_string(),
3416 );
3417 let dispatch = self.mcp_dispatch();
3418 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3419 .as_ref()
3420 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3421 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3422 tracing::warn!(error = %e, "FileChanged hook failed");
3423 } else {
3424 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3425 }
3426 }
3427
3428 let _ = self.channel.send_status("").await;
3429 }
3430
3431 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3441 let Some(engine) = self.services.promotion_engine.clone() else {
3442 return;
3443 };
3444
3445 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3446 return;
3447 };
3448
3449 let promotion_window = 200usize;
3452
3453 let accepted = self.runtime.lifecycle.supervisor.spawn(
3454 agent_supervisor::TaskClass::Enrichment,
3455 "compression_spectrum.promotion_scan",
3456 async move {
3457 let span = tracing::info_span!("memory.compression.promote.background");
3458 let _enter = span.enter();
3459
3460 let window = match memory.load_promotion_window(promotion_window).await {
3461 Ok(w) => w,
3462 Err(e) => {
3463 tracing::warn!(error = %e, "promotion scan: failed to load window");
3464 return;
3465 }
3466 };
3467
3468 if window.is_empty() {
3469 return;
3470 }
3471
3472 let candidates = match engine.scan(&window).await {
3473 Ok(c) => c,
3474 Err(e) => {
3475 tracing::warn!(error = %e, "promotion scan: clustering failed");
3476 return;
3477 }
3478 };
3479
3480 for candidate in &candidates {
3481 if let Err(e) = engine.promote(candidate).await {
3482 tracing::warn!(
3483 signature = %candidate.signature,
3484 error = %e,
3485 "promotion scan: promote failed"
3486 );
3487 }
3488 }
3489
3490 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3491 },
3492 );
3493
3494 if accepted {
3495 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3496 }
3497 }
3498}
3499pub(crate) fn estimate_parts_size(m: &zeph_llm::provider::Message) -> usize {
3507 use zeph_llm::provider::MessagePart;
3508 if m.parts.is_empty() {
3509 return m.content.len();
3510 }
3511 m.parts
3512 .iter()
3513 .map(|p| match p {
3514 MessagePart::Text { text }
3515 | MessagePart::Recall { text }
3516 | MessagePart::CodeContext { text }
3517 | MessagePart::Summary { text }
3518 | MessagePart::CrossSession { text } => text.len(),
3519 MessagePart::ToolOutput { body, .. } => body.len(),
3520 MessagePart::ToolUse { id, name, input } => {
3521 50 + id.len() + name.len() + input.to_string().len()
3522 }
3523 MessagePart::ToolResult {
3524 tool_use_id,
3525 content,
3526 ..
3527 } => 50 + tool_use_id.len() + content.len(),
3528 MessagePart::Image(img) => img.data.len() * 4 / 3,
3529 MessagePart::ThinkingBlock {
3530 thinking,
3531 signature,
3532 } => 50 + thinking.len() + signature.len(),
3533 MessagePart::RedactedThinkingBlock { data } => data.len(),
3534 MessagePart::Compaction { summary } => summary.len(),
3535 })
3536 .sum()
3537}
3538
3539pub(crate) fn trim_parent_messages(msgs: &mut Vec<zeph_llm::provider::Message>, max_chars: usize) {
3558 use zeph_llm::provider::{MessagePart, Role};
3559
3560 let mut total_chars = 0usize;
3564 let mut drop_before = 0usize; for (i, m) in msgs.iter().enumerate().rev() {
3566 total_chars += estimate_parts_size(m);
3567 if total_chars > max_chars {
3568 drop_before = i + 1;
3569 break;
3570 }
3571 }
3572 if drop_before > 0 {
3573 msgs.drain(..drop_before);
3574 }
3575
3576 let emitted_tool_ids: std::collections::HashSet<String> = msgs
3580 .iter()
3581 .filter(|m| m.role == Role::Assistant)
3582 .flat_map(|m| m.parts.iter())
3583 .filter_map(|p| {
3584 if let MessagePart::ToolUse { id, .. } = p {
3585 Some(id.clone())
3586 } else {
3587 None
3588 }
3589 })
3590 .collect();
3591
3592 let mut orphans_removed = 0usize;
3593 for m in msgs.iter_mut() {
3594 if m.role != Role::User || m.parts.is_empty() {
3595 continue;
3596 }
3597 let before = m.parts.len();
3598 m.parts.retain(|p| match p {
3599 MessagePart::ToolResult { tool_use_id, .. } => {
3600 emitted_tool_ids.contains(tool_use_id.as_str())
3601 }
3602 _ => true,
3603 });
3604 let dropped = before - m.parts.len();
3605 if dropped > 0 {
3606 orphans_removed += dropped;
3607 if m.parts.is_empty() {
3608 m.content.clear();
3609 } else {
3610 m.rebuild_content();
3611 }
3612 }
3613 }
3614
3615 let consumed_tool_ids: std::collections::HashSet<String> = msgs
3623 .iter()
3624 .filter(|m| m.role == Role::User)
3625 .flat_map(|m| m.parts.iter())
3626 .filter_map(|p| {
3627 if let MessagePart::ToolResult { tool_use_id, .. } = p {
3628 Some(tool_use_id.clone())
3629 } else {
3630 None
3631 }
3632 })
3633 .collect();
3634
3635 let last_assistant_idx = msgs
3637 .iter()
3638 .enumerate()
3639 .rev()
3640 .find(|(_, m)| m.role == Role::Assistant)
3641 .map(|(i, _)| i);
3642
3643 for (idx, m) in msgs.iter_mut().enumerate() {
3644 if m.role != Role::Assistant || m.parts.is_empty() {
3645 continue;
3646 }
3647 if Some(idx) == last_assistant_idx {
3649 continue;
3650 }
3651 let before = m.parts.len();
3652 m.parts.retain(|p| match p {
3653 MessagePart::ToolUse { id, .. } => consumed_tool_ids.contains(id.as_str()),
3654 _ => true,
3655 });
3656 let dropped = before - m.parts.len();
3657 if dropped > 0 {
3658 orphans_removed += dropped;
3659 if m.parts.is_empty() {
3660 m.content.clear();
3661 } else {
3662 m.rebuild_content();
3663 }
3664 }
3665 }
3666
3667 msgs.retain(|m| !m.content.is_empty() || !m.parts.is_empty());
3669
3670 if orphans_removed > 0 {
3671 tracing::debug!(
3672 orphans = orphans_removed,
3673 "[subagent] pruned orphaned ToolUse/ToolResult parts from parent context boundary"
3674 );
3675 }
3676}
3677
3678struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3683
3684impl zeph_subagent::McpDispatch for McpManagerDispatch {
3685 fn call_tool<'a>(
3686 &'a self,
3687 server: &'a str,
3688 tool: &'a str,
3689 args: serde_json::Value,
3690 ) -> std::pin::Pin<
3691 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3692 > {
3693 Box::pin(async move {
3694 self.0
3695 .call_tool(server, tool, args)
3696 .await
3697 .map(|result| {
3698 let texts: Vec<serde_json::Value> = result
3700 .content
3701 .iter()
3702 .filter_map(|c| {
3703 if let rmcp::model::RawContent::Text(t) = &c.raw {
3704 Some(serde_json::Value::String(t.text.clone()))
3705 } else {
3706 None
3707 }
3708 })
3709 .collect();
3710 serde_json::Value::Array(texts)
3711 })
3712 .map_err(|e| e.to_string())
3713 })
3714 }
3715}
3716
3717pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3718 while !*rx.borrow_and_update() {
3719 if rx.changed().await.is_err() {
3720 std::future::pending::<()>().await;
3721 }
3722 }
3723}
3724
3725pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3726 match rx {
3727 Some(inner) => {
3728 if let Some(v) = inner.recv().await {
3729 Some(v)
3730 } else {
3731 *rx = None;
3732 std::future::pending().await
3733 }
3734 }
3735 None => std::future::pending().await,
3736 }
3737}
3738
3739fn truncate_shell_command(cmd: &str) -> String {
3746 if cmd.len() <= 80 {
3747 return cmd.to_owned();
3748 }
3749 let end = cmd.floor_char_boundary(79);
3750 format!("{}…", &cmd[..end])
3751}
3752
/// Returns the first eight characters of a run id, or the whole id when it is shorter.
fn truncate_shell_run_id(id: &str) -> String {
    // The byte offset of the ninth character, if any, is where the prefix ends.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3757
3758pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3759 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3760 if let Some(ctx_size) = provider.context_window() {
3761 tracing::info!(
3762 model_context = ctx_size,
3763 "auto-configured context budget on reload"
3764 );
3765 ctx_size
3766 } else {
3767 0
3768 }
3769 } else {
3770 config.memory.context_budget_tokens
3771 };
3772 if tokens == 0 {
3773 tracing::warn!(
3774 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3775 );
3776 128_000
3777 } else {
3778 tokens
3779 }
3780}
3781
3782#[cfg(test)]
3783mod tests;
3784
3785#[cfg(test)]
3786pub(crate) use tests::agent_tests;
3787
/// Command stubs used only by tests.
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Slash command `/test-error` whose handler fails on every invocation.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores both the context and the arguments and yields a `CommandError`
        /// carrying the message `"boom"`.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>>
        {
            Box::pin(async {
                let failure = CommandError::new("boom");
                Err(failure)
            })
        }
    }
}