1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15#[cfg(feature = "cocoon")]
16mod cocoon_cmd;
17mod command_context_impls;
18pub(super) mod compression_feedback;
19mod context;
20mod context_impls;
21pub(crate) mod context_manager;
22mod corrections;
23pub mod error;
24mod experiment_cmd;
25pub(crate) mod focus;
26mod index;
27mod learning;
28pub(crate) mod learning_engine;
29mod log_commands;
30mod loop_event;
31mod lsp_commands;
32mod magic_docs;
33mod mcp;
34pub(crate) mod memcot;
35mod message_queue;
36mod microcompact;
37mod model_commands;
38mod persistence;
39#[cfg(feature = "scheduler")]
40mod plan;
41mod policy_commands;
42mod provider_cmd;
43mod quality_hook;
44pub(crate) mod rate_limiter;
45#[cfg(feature = "scheduler")]
46mod scheduler_commands;
47#[cfg(feature = "scheduler")]
48mod scheduler_loop;
49mod scope_commands;
50pub mod session_config;
51mod session_digest;
52pub mod shadow_sentinel;
53pub(crate) mod sidequest;
54mod skill_management;
55pub mod slash_commands;
56pub mod speculative;
57pub(crate) mod state;
58pub(crate) mod task_injection;
59pub(crate) mod tool_execution;
60pub(crate) mod tool_orchestrator;
61pub mod trajectory;
62mod trajectory_commands;
63mod trust_commands;
64pub mod turn;
65mod utils;
66pub(crate) mod vigil;
67
68use std::collections::{HashMap, VecDeque};
69use std::fmt::Write as _;
70use std::sync::Arc;
71
72use parking_lot::RwLock;
73
74use tokio::sync::{mpsc, watch};
75use tokio_util::sync::CancellationToken;
76use zeph_llm::any::AnyProvider;
77use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
78use zeph_memory::TokenCounter;
79use zeph_memory::semantic::SemanticMemory;
80use zeph_skills::loader::Skill;
81use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
82use zeph_skills::prompt::format_skills_prompt;
83use zeph_skills::registry::SkillRegistry;
84use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
85
86use crate::channel::Channel;
87use crate::config::Config;
88use crate::context::{ContextBudget, build_system_prompt};
89use zeph_common::text::estimate_tokens;
90
91use loop_event::LoopEvent;
92use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
93use state::MessageState;
94
/// Window size constant, by its name related to doom-loop detection.
///
/// NOTE(review): not referenced in the visible part of this file — confirm the exact
/// semantics (what is counted over the window) at its use sites.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Crate-visible re-export of the constant defined in `zeph_agent_context::helpers`.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Text prepended to a scheduled task's prompt before `Agent::run` resolves it as a message.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing code fence that `format_tool_output` appends after the tool output body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
102
103pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
104 use std::fmt::Write;
105 let capacity = "[tool output: ".len()
106 + tool_name.len()
107 + "]\n```\n".len()
108 + body.len()
109 + TOOL_OUTPUT_SUFFIX.len();
110 let mut buf = String::with_capacity(capacity);
111 let _ = write!(
112 buf,
113 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
114 );
115 buf
116}
117
/// The agent: owns the LLM provider(s), the I/O channel, the tool executor and all
/// per-session state, and drives the interaction loop via `run`.
///
/// `C` is the channel type used to receive user input and send output.
pub struct Agent<C: Channel> {
    /// Primary LLM provider used for chat calls; can be swapped at runtime by
    /// `apply_provider_override`.
    provider: AnyProvider,
    /// Second provider handle; `Agent::new` initializes it as a clone of `provider`.
    /// Presumably used for embedding requests — confirm against its use sites.
    embedding_provider: AnyProvider,
    /// Channel used to receive messages and to send replies and status updates.
    channel: C,
    /// Type-erased, shared tool executor.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation messages, the queued-message buffer and related persistence bookkeeping.
    pub(super) msg: MessageState,
    /// Context manager state (e.g. the turns-since-last-hard-compaction counter read at shutdown).
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestration state.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Service-level state: memory, skills, MCP, security, orchestration, and so on.
    pub(super) services: state::Services,

    /// Runtime state: config, lifecycle, providers, metrics, debug and instructions.
    pub(super) runtime: state::AgentRuntime,
}
172
/// Outcome of applying a slash-command dispatch result inside the `run` loop.
enum DispatchFlow {
    /// Leave the main loop (the command requested exit).
    Break,
    /// The command was handled; skip to the next loop iteration.
    Continue,
    /// No command matched; keep processing the input in the current iteration.
    Fallthrough,
}
182
183impl<C: Channel> Agent<C> {
184 #[must_use]
208 pub fn new(
209 provider: AnyProvider,
210 channel: C,
211 registry: SkillRegistry,
212 matcher: Option<SkillMatcherBackend>,
213 max_active_skills: usize,
214 tool_executor: impl ToolExecutor + 'static,
215 ) -> Self {
216 let registry = Arc::new(RwLock::new(registry));
217 let embedding_provider = provider.clone();
218 Self::new_with_registry_arc(
219 provider,
220 embedding_provider,
221 channel,
222 registry,
223 matcher,
224 max_active_skills,
225 tool_executor,
226 )
227 }
228
    /// Creates an agent from an already shared skill registry and an explicit
    /// embedding provider.
    ///
    /// Builds the initial system prompt from every skill the registry can load,
    /// seeds the message history with that prompt as the single system message,
    /// and initializes all service and runtime state with defaults.
    ///
    /// In debug builds this asserts that `max_active_skills` is greater than zero.
    #[must_use]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        use state::{
            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
            SessionState, SkillState, ToolState,
        };

        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Load every skill listed in the registry metadata; skills that fail to load
        // are silently skipped. The read lock is released at the end of this block.
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.skill(&m.name).ok())
                .collect()
        };
        // No trust or health data exists yet, so the prompt is formatted with empty maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Estimated token size of the system prompt, handed to the provider state below.
        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());

        // All optional subsystems start out disabled (`None`).
        let services = Services {
            memory: MemoryState::default(),
            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            goal_accounting: None,
            quality: None,
            proactive_explorer: None,
            promotion_engine: None,
            taco_compressor: None,
            speculation_engine: None,
        };

        let runtime = AgentRuntime {
            config: RuntimeConfig::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            debug: DebugState::default(),
            instructions: InstructionState::default(),
        };

        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                // The history starts with exactly one message: the system prompt.
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            services,
            runtime,
        }
    }
326
327 #[must_use]
340 pub fn into_channel(self) -> C {
341 self.channel
342 }
343
344 #[tracing::instrument(name = "core.agent.poll_subagents", skip_all, level = "debug")]
349 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
350 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
351 return vec![];
352 };
353
354 let finished: Vec<String> = mgr
355 .statuses()
356 .into_iter()
357 .filter_map(|(id, status)| {
358 if matches!(
359 status.state,
360 zeph_subagent::SubAgentState::Completed
361 | zeph_subagent::SubAgentState::Failed
362 | zeph_subagent::SubAgentState::Canceled
363 ) {
364 Some(id)
365 } else {
366 None
367 }
368 })
369 .collect();
370
371 let mut results = vec![];
372 for task_id in finished {
373 match mgr.collect(&task_id).await {
374 Ok(result) => results.push((task_id, result)),
375 Err(e) => {
376 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
377 }
378 }
379 }
380 results
381 }
382
383 async fn call_llm_for_session_summary(
392 &self,
393 chat_messages: &[Message],
394 ) -> Option<zeph_memory::StructuredSummary> {
395 let timeout_dur = std::time::Duration::from_secs(
396 self.services
397 .memory
398 .compaction
399 .shutdown_summary_timeout_secs,
400 );
401 match tokio::time::timeout(
402 timeout_dur,
403 self.provider
404 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
405 )
406 .await
407 {
408 Ok(Ok(s)) => Some(s),
409 Ok(Err(e)) => {
410 tracing::warn!(
411 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
412 );
413 self.plain_text_summary_fallback(chat_messages, timeout_dur)
414 .await
415 }
416 Err(_) => {
417 tracing::warn!(
418 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
419 self.services
420 .memory
421 .compaction
422 .shutdown_summary_timeout_secs
423 );
424 self.plain_text_summary_fallback(chat_messages, timeout_dur)
425 .await
426 }
427 }
428 }
429
430 async fn plain_text_summary_fallback(
431 &self,
432 chat_messages: &[Message],
433 timeout_dur: std::time::Duration,
434 ) -> Option<zeph_memory::StructuredSummary> {
435 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
436 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
437 summary: plain,
438 key_facts: vec![],
439 entities: vec![],
440 }),
441 Ok(Err(e)) => {
442 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
443 None
444 }
445 Err(_) => {
446 tracing::warn!("shutdown summary: plain LLM fallback timed out");
447 None
448 }
449 }
450 }
451
452 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
457 use zeph_llm::provider::{MessagePart, Role};
458
459 let msgs = &self.msg.messages;
463 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
465 return;
466 };
467 let asst_msg = &msgs[asst_idx];
468 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
469 .parts
470 .iter()
471 .filter_map(|p| {
472 if let MessagePart::ToolUse { id, name, input } = p {
473 Some((id.as_str(), name.as_str(), input))
474 } else {
475 None
476 }
477 })
478 .collect();
479 if tool_use_ids.is_empty() {
480 return;
481 }
482
483 let paired_ids: std::collections::HashSet<&str> = msgs
485 .get(asst_idx + 1..)
486 .into_iter()
487 .flatten()
488 .filter(|m| m.role == Role::User)
489 .flat_map(|m| m.parts.iter())
490 .filter_map(|p| {
491 if let MessagePart::ToolResult { tool_use_id, .. } = p {
492 Some(tool_use_id.as_str())
493 } else {
494 None
495 }
496 })
497 .collect();
498
499 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
500 .iter()
501 .filter(|(id, _, _)| !paired_ids.contains(*id))
502 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
503 id: (*id).to_owned(),
504 name: (*name).to_owned().into(),
505 input: (*input).clone(),
506 })
507 .collect();
508
509 if unpaired.is_empty() {
510 return;
511 }
512
513 tracing::info!(
514 count = unpaired.len(),
515 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
516 );
517 self.persist_cancelled_tool_results(&unpaired).await;
518 }
519
520 async fn maybe_store_shutdown_summary(&mut self) {
530 if !self.services.memory.compaction.shutdown_summary {
531 return;
532 }
533 let Some(memory) = self.services.memory.persistence.memory.clone() else {
534 return;
535 };
536 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
537 return;
538 };
539
540 match memory.has_session_summary(conversation_id).await {
542 Ok(true) => {
543 tracing::debug!("shutdown summary: session already has a summary, skipping");
544 return;
545 }
546 Ok(false) => {}
547 Err(e) => {
548 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
549 return;
550 }
551 }
552
553 let user_count = self
555 .msg
556 .messages
557 .iter()
558 .skip(1)
559 .filter(|m| m.role == Role::User)
560 .count();
561 if user_count
562 < self
563 .services
564 .memory
565 .compaction
566 .shutdown_summary_min_messages
567 {
568 tracing::debug!(
569 user_count,
570 min = self
571 .services
572 .memory
573 .compaction
574 .shutdown_summary_min_messages,
575 "shutdown summary: too few user messages, skipping"
576 );
577 return;
578 }
579
580 let _ = self.channel.send_status("Saving session summary...").await;
582
583 let max = self
585 .services
586 .memory
587 .compaction
588 .shutdown_summary_max_messages;
589 if max == 0 {
590 tracing::debug!("shutdown summary: max_messages=0, skipping");
591 return;
592 }
593 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
594 let slice = if non_system.len() > max {
595 &non_system[non_system.len() - max..]
596 } else {
597 &non_system[..]
598 };
599
600 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
601 .iter()
602 .map(|m| {
603 let role = match m.role {
604 Role::User => "user".to_owned(),
605 Role::Assistant => "assistant".to_owned(),
606 Role::System => "system".to_owned(),
607 };
608 (zeph_memory::MessageId(0), role, m.content.clone())
609 })
610 .collect();
611
612 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
613 let chat_messages = vec![Message {
614 role: Role::User,
615 content: prompt,
616 parts: vec![],
617 metadata: MessageMetadata::default(),
618 }];
619
620 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
621 let _ = self.channel.send_status("").await;
622 return;
623 };
624
625 if let Err(e) = memory
626 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
627 .await
628 {
629 tracing::warn!("shutdown summary: storage failed: {e:#}");
630 } else {
631 tracing::info!(
632 conversation_id = conversation_id.0,
633 "shutdown summary stored"
634 );
635 }
636
637 let _ = self.channel.send_status("").await;
639 }
640
    /// Shuts the agent down.
    ///
    /// In order: announces the shutdown on the channel, saves provider router state,
    /// saves topology-advisor state, shuts down sub-agents and MCP servers, records and
    /// logs final metrics, persists tombstones for in-flight tool calls, aborts
    /// background tasks, and finally stores the shutdown summary and session digest.
    /// Errors along the way are logged and never propagated.
    #[tracing::instrument(name = "core.agent.shutdown", skip_all, level = "debug")]
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state().await;

        // Saving the topology advisor state is best-effort; a failure is only logged.
        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Flush the still-open "turns since last hard compaction" counter into the metrics.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log summary statistics from the current metrics snapshot, if a metrics channel exists.
        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        self.flush_orphaned_tool_use_on_shutdown().await;

        // Abort supervised background tasks and any pending compression-related handles.
        self.runtime.lifecycle.supervisor.abort_all();

        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        self.services.learning_engine.learning_tasks.abort_all();

        // Yield to the scheduler a few times — presumably so the aborted tasks get a
        // chance to observe cancellation before the final LLM calls below (confirm).
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
754
755 fn refresh_subagent_metrics(&mut self) {
762 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
763 return;
764 };
765 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
766 .statuses()
767 .into_iter()
768 .map(|(id, s)| {
769 let def = mgr.agents_def(&id);
770 crate::metrics::SubAgentMetrics {
771 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
772 id: id.clone(),
773 state: format!("{:?}", s.state).to_lowercase(),
774 turns_used: s.turns_used,
775 max_turns: def.map_or(20, |d| d.permissions.max_turns),
776 background: def.is_some_and(|d| d.permissions.background),
777 elapsed_secs: s.started_at.elapsed().as_secs(),
778 permission_mode: def.map_or_else(String::new, |d| {
779 use zeph_subagent::def::PermissionMode;
780 match d.permissions.permission_mode {
781 PermissionMode::Default => String::new(),
782 PermissionMode::AcceptEdits => "accept_edits".into(),
783 PermissionMode::DontAsk => "dont_ask".into(),
784 PermissionMode::BypassPermissions => "bypass_permissions".into(),
785 PermissionMode::Plan => "plan".into(),
786 }
787 }),
788 transcript_dir: mgr
789 .agent_transcript_dir(&id)
790 .map(|p| p.to_string_lossy().into_owned()),
791 }
792 })
793 .collect();
794 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
795 }
796
797 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
799 let completed = self.poll_subagents().await;
800 for (task_id, result) in completed {
801 let notice = if result.is_empty() {
802 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
803 } else {
804 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
805 };
806 if let Err(e) = self.channel.send(¬ice).await {
807 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
808 }
809 }
810 Ok(())
811 }
812
    /// Runs the agent's main loop until the channel closes, a shutdown is signalled,
    /// or a command requests exit.
    ///
    /// Each iteration performs housekeeping (provider override, tool refresh, pending
    /// elicitations, sub-agent metrics and notices, channel drain), obtains the next
    /// input — from the message queue first, otherwise from `next_event` — and then
    /// tries, in order: the session/debug command registry, the agent command
    /// registry, the built-in command handler, and finally `process_user_message`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `notify_completed_subagents`, `next_event` and
    /// `process_user_message`.
    #[tracing::instrument(name = "core.agent.run", skip_all, level = "debug", err)]
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // If a warmup signal was registered and is not yet true, wait for one change
        // and warn if the value is still false afterwards.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping before any new input is taken.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over waiting for a new event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving (transcription / image conversion).
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                        is_guest_context: false,
                        is_from_bot: false,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    // `None` (e.g. the channel closed) and an explicit shutdown both end the loop.
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        // The experiment is over, so its cancel handle is dropped.
                        self.services.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled tasks are handled like a message with a fixed prefix.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        // One tick of the user-defined loop: count it, then run its prompt.
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                            is_guest_context: false,
                            is_from_bot: false,
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        // Remember whether this message arrived in a guest context.
                        self.services.session.is_guest_context = msg.is_guest_context;
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // URLs appearing in a slash command are recorded as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // First dispatch stage: session/debug commands. These receive narrow access
            // objects borrowing individual fields, plus a null agent.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            // `None` means no command in the first registry matched the input.
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not handled here; try the agent command registry next.
                }
            }

            // Second dispatch stage: agent commands. Here the agent itself is passed as the
            // `agent` accessor, and the remaining context slots are filled with null objects.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                #[cfg(feature = "cocoon")]
                agent_reg.register(zeph_commands::handlers::cocoon::CocoonCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // `with_learning = true`: message results from agent commands may trigger
            // post-command learning.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                    // Not an agent command either; fall through to the built-in handler.
                }
            }

            // Built-in commands: `Some(true)` exits the loop, `Some(false)` means handled.
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command at all: treat the input as a regular user message.
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1140
1141 async fn apply_dispatch_result(
1147 &mut self,
1148 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1149 command: &str,
1150 with_learning: bool,
1151 ) -> DispatchFlow {
1152 match result {
1153 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1154 let _ = self.channel.flush_chunks().await;
1155 DispatchFlow::Break
1156 }
1157 Some(Ok(
1158 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1159 )) => {
1160 let _ = self.channel.flush_chunks().await;
1161 DispatchFlow::Continue
1162 }
1163 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1164 let _ = self.channel.send(&msg).await;
1165 let _ = self.channel.flush_chunks().await;
1166 if with_learning {
1167 self.maybe_trigger_post_command_learning(command).await;
1168 }
1169 DispatchFlow::Continue
1170 }
1171 Some(Err(e)) => {
1172 let _ = self.channel.send(&e.to_string()).await;
1173 let _ = self.channel.flush_chunks().await;
1174 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1175 DispatchFlow::Continue
1176 }
1177 None => DispatchFlow::Fallthrough,
1178 }
1179 }
1180
1181 fn apply_provider_override(&mut self) {
1183 if let Some(ref slot) = self.runtime.providers.provider_override
1184 && let Some(new_provider) = slot.write().take()
1185 {
1186 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1187 self.provider = new_provider;
1188 }
1189 }
1190
    /// Waits for the next thing the main loop has to react to and returns it as a
    /// [`LoopEvent`].
    ///
    /// Sources raced against each other: the channel, the shutdown signal, the
    /// skill/instruction/config reload receivers, update and experiment
    /// notifications, scheduled custom tasks, the user-loop interval, and file-change
    /// events.
    ///
    /// Returns `Ok(None)` when the channel yields `None`, and also from the user-loop
    /// branch when the loop state is gone or cancelled.
    ///
    /// # Errors
    ///
    /// Propagates an error returned by `channel.recv()`.
    #[tracing::instrument(name = "core.agent.next_event", skip_all, level = "debug", err)]
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            // Channel input returns directly: `None` from the channel maps to `Ok(None)`.
            result = self.channel.recv() => {
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-loop tick: completes only when a loop exists, is not cancelled, and its
            // interval fires; otherwise this branch stays pending forever.
            () = async {
                if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // NOTE(review): both early returns below yield `Ok(None)`, which `run`
                // treats the same as a closed channel (it breaks the main loop).
                // Confirm that ending the agent is intended when the user loop is
                // cancelled while a tick is pending.
                let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                // Re-check cancellation: it may have happened while waiting for the tick.
                if ls.cancel_tx.is_cancelled() {
                    self.runtime.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1257
1258 #[tracing::instrument(name = "core.agent.resolve_message", skip_all, level = "debug")]
1259 async fn resolve_message(
1260 &self,
1261 msg: crate::channel::ChannelMessage,
1262 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1263 use crate::channel::{Attachment, AttachmentKind};
1264 use zeph_llm::provider::{ImageData, MessagePart};
1265
1266 let text_base = msg.text.clone();
1267
1268 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1269 .attachments
1270 .into_iter()
1271 .partition(|a| a.kind == AttachmentKind::Audio);
1272
1273 tracing::debug!(
1274 audio = audio_attachments.len(),
1275 has_stt = self.runtime.providers.stt.is_some(),
1276 "resolve_message attachments"
1277 );
1278
1279 let text = if !audio_attachments.is_empty()
1280 && let Some(stt) = self.runtime.providers.stt.as_ref()
1281 {
1282 let mut transcribed_parts = Vec::new();
1283 for attachment in &audio_attachments {
1284 if attachment.data.len() > MAX_AUDIO_BYTES {
1285 tracing::warn!(
1286 size = attachment.data.len(),
1287 max = MAX_AUDIO_BYTES,
1288 "audio attachment exceeds size limit, skipping"
1289 );
1290 continue;
1291 }
1292 match stt
1293 .transcribe(&attachment.data, attachment.filename.as_deref())
1294 .await
1295 {
1296 Ok(result) => {
1297 tracing::info!(
1298 len = result.text.len(),
1299 language = ?result.language,
1300 "audio transcribed"
1301 );
1302 transcribed_parts.push(result.text);
1303 }
1304 Err(e) => {
1305 tracing::error!(error = %e, "audio transcription failed");
1306 }
1307 }
1308 }
1309 if transcribed_parts.is_empty() {
1310 text_base
1311 } else {
1312 let transcribed = transcribed_parts.join("\n");
1313 if text_base.is_empty() {
1314 transcribed
1315 } else {
1316 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1317 }
1318 }
1319 } else {
1320 if !audio_attachments.is_empty() {
1321 tracing::warn!(
1322 count = audio_attachments.len(),
1323 "audio attachments received but no STT provider configured, dropping"
1324 );
1325 }
1326 text_base
1327 };
1328
1329 let mut image_parts = Vec::new();
1330 for attachment in image_attachments {
1331 if attachment.data.len() > MAX_IMAGE_BYTES {
1332 tracing::warn!(
1333 size = attachment.data.len(),
1334 max = MAX_IMAGE_BYTES,
1335 "image attachment exceeds size limit, skipping"
1336 );
1337 continue;
1338 }
1339 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1340 image_parts.push(MessagePart::Image(Box::new(ImageData {
1341 data: attachment.data,
1342 mime_type,
1343 })));
1344 }
1345
1346 (text, image_parts)
1347 }
1348
1349 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1356 let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1357 self.runtime.debug.iteration_counter += 1;
1358 let cancel_token = CancellationToken::new();
1359 self.runtime.lifecycle.cancel_token = cancel_token.clone();
1361 self.services.security.user_provided_urls.write().clear();
1362 self.runtime.lifecycle.turn_llm_requests = 0;
1364
1365 {
1367 let pending: Vec<u8> = {
1368 let mut q = self.services.security.trajectory_signal_queue.lock();
1369 std::mem::take(&mut *q)
1370 };
1371 for code in pending {
1372 self.services
1373 .security
1374 .trajectory
1375 .record(crate::agent::trajectory::RiskSignal::from_code(code));
1376 }
1377 }
1378 if self.services.security.trajectory.advance_turn()
1381 && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1382 {
1383 let entry = zeph_tools::AuditEntry {
1384 timestamp: zeph_tools::chrono_now(),
1385 tool: "<sentinel>".to_owned().into(),
1386 command: String::new(),
1387 result: zeph_tools::AuditResult::Success,
1388 duration_ms: 0,
1389 error_category: Some("trajectory_auto_recover".to_owned()),
1390 error_domain: Some("security".to_owned()),
1391 error_phase: None,
1392 claim_source: None,
1393 mcp_server_id: None,
1394 injection_flagged: false,
1395 embedding_anomalous: false,
1396 cross_boundary_mcp_to_acp: false,
1397 adversarial_policy_decision: None,
1398 exit_code: None,
1399 truncated: false,
1400 caller_id: None,
1401 policy_match: None,
1402 correlation_id: None,
1403 vigil_risk: None,
1404 execution_env: None,
1405 resolved_cwd: None,
1406 scope_at_definition: None,
1407 scope_at_dispatch: None,
1408 };
1409 self.runtime.lifecycle.supervisor.spawn(
1410 crate::agent::agent_supervisor::TaskClass::Telemetry,
1411 "trajectory-auto-recover-audit",
1412 async move { logger.log(&entry).await },
1413 );
1414 }
1415 if let Some(ref sentinel) = self.services.security.shadow_sentinel {
1417 sentinel.advance_turn();
1418 }
1419 let risk_level = self.services.security.trajectory.current_risk();
1421 *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1422 if let Some(alert) = self.services.security.trajectory.poll_alert() {
1424 let msg = format!(
1425 "[trajectory] Risk level: {:?} (score={:.2})",
1426 alert.level, alert.score
1427 );
1428 tracing::warn!(
1429 level = ?alert.level,
1430 score = alert.score,
1431 "trajectory sentinel alert"
1432 );
1433 if let Some(ref tx) = self.services.session.status_tx {
1434 let _ = tx.send(msg);
1435 }
1436 }
1437
1438 let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1439 turn::Turn::new(context, input)
1440 }
1441
1442 fn end_turn(&mut self, turn: turn::Turn) {
1449 self.runtime.metrics.pending_timings = turn.metrics.timings;
1450 self.flush_turn_timings();
1451 self.services.session.current_turn_intent = None;
1453 self.services.session.is_guest_context = false;
1455 if let Some(ref engine) = self.services.speculation_engine {
1457 let metrics = engine.end_turn();
1458 if metrics.committed > 0 || metrics.cancelled > 0 {
1459 tracing::debug!(
1460 committed = metrics.committed,
1461 cancelled = metrics.cancelled,
1462 wasted_ms = metrics.wasted_ms,
1463 "speculation: turn boundary metrics"
1464 );
1465 }
1466 }
1467 }
1468
1469 #[tracing::instrument(
1470 name = "core.agent.process_user_message",
1471 skip_all,
1472 level = "debug",
1473 fields(turn_id),
1474 err
1475 )]
1476 async fn process_user_message(
1477 &mut self,
1478 text: String,
1479 image_parts: Vec<zeph_llm::provider::MessagePart>,
1480 ) -> Result<(), error::AgentError> {
1481 let input = turn::TurnInput::new(text, image_parts);
1482 let mut t = self.begin_turn(input);
1483
1484 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1485 tracing::Span::current().record("turn_id", t.id().0);
1486 self.runtime
1488 .debug
1489 .start_iteration_span(turn_idx, t.input.text.trim());
1490
1491 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1492
1493 let span_status = if result.is_ok() {
1495 crate::debug_dump::trace::SpanStatus::Ok
1496 } else {
1497 crate::debug_dump::trace::SpanStatus::Error {
1498 message: "iteration failed".to_owned(),
1499 }
1500 };
1501 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1502
1503 self.end_turn(t);
1504 result
1505 }
1506
1507 #[tracing::instrument(
1508 name = "core.agent.process_user_message_inner",
1509 skip_all,
1510 level = "debug",
1511 err
1512 )]
1513 async fn process_user_message_inner(
1514 &mut self,
1515 turn: &mut turn::Turn,
1516 ) -> Result<(), error::AgentError> {
1517 self.reap_background_tasks_and_update_metrics();
1518
1519 let tokens_before_turn = self
1520 .runtime
1521 .metrics
1522 .metrics_tx
1523 .as_ref()
1524 .map_or(0, |tx| tx.borrow().total_tokens);
1525
1526 self.drain_background_completions();
1530
1531 self.wire_cancel_bridge(turn.cancel_token());
1532
1533 let text = turn.input.text.clone();
1535 let trimmed_owned = text.trim().to_owned();
1536 let trimmed = trimmed_owned.as_str();
1537
1538 if self.services.security.vigil.is_some() {
1541 let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1542 self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1543 }
1544
1545 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1546 return result;
1547 }
1548
1549 self.check_pending_rollbacks().await;
1550
1551 if self.pre_process_security(trimmed).await? {
1552 return Ok(());
1553 }
1554
1555 let t_ctx = std::time::Instant::now();
1556 tracing::debug!("turn timing: prepare_context start");
1557 self.advance_context_lifecycle_guarded(&text, trimmed).await;
1558 turn.metrics_mut().timings.prepare_context_ms =
1559 u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1560 tracing::debug!(
1561 ms = turn.metrics_snapshot().timings.prepare_context_ms,
1562 "turn timing: prepare_context done"
1563 );
1564
1565 let image_parts = std::mem::take(&mut turn.input.image_parts);
1566 let merged_text = self.build_user_message_text_with_bg_completions(&text);
1570 let user_msg = self.build_user_message(&merged_text, image_parts);
1571
1572 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1575 if !urls.is_empty() {
1576 self.services
1577 .security
1578 .user_provided_urls
1579 .write()
1580 .extend(urls);
1581 }
1582
1583 self.services.memory.extraction.goal_text = Some(text.clone());
1586
1587 let t_persist = std::time::Instant::now();
1588 tracing::debug!("turn timing: persist_message(user) start");
1589 self.persist_message(Role::User, &text, &[], false).await;
1591 turn.metrics_mut().timings.persist_message_ms =
1592 u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1593 tracing::debug!(
1594 ms = turn.metrics_snapshot().timings.persist_message_ms,
1595 "turn timing: persist_message(user) done"
1596 );
1597 self.push_message(user_msg);
1598
1599 tracing::debug!("turn timing: process_response start");
1602 let turn_had_error = if let Err(e) = self.process_response().await {
1603 self.services.learning_engine.learning_tasks.detach_all();
1605 tracing::error!("Response processing failed: {e:#}");
1606
1607 if e.is_no_providers() {
1610 self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1611 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1612 tracing::warn!(
1613 backoff_secs,
1614 "no providers available; backing off before next turn"
1615 );
1616 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1617 }
1618
1619 let user_msg = format!("Error: {e:#}");
1620 self.channel.send(&user_msg).await?;
1621 self.msg.messages.pop();
1622 self.recompute_prompt_tokens();
1623 self.channel.flush_chunks().await?;
1624 true
1625 } else {
1626 self.services.learning_engine.learning_tasks.detach_all();
1629 self.truncate_old_tool_results();
1630 self.maybe_update_magic_docs();
1632 self.maybe_spawn_promotion_scan();
1634 false
1635 };
1636 tracing::debug!("turn timing: process_response done");
1637
1638 if let Some(pipeline) = self.services.quality.clone() {
1640 self.run_self_check_for_turn(pipeline, turn.id().0).await;
1641 }
1642 let _ = self.channel.flush_chunks().await;
1647
1648 self.maybe_fire_completion_notification(turn, turn_had_error);
1649
1650 self.flush_goal_accounting(tokens_before_turn);
1651
1652 turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1657 turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1658
1659 Ok(())
1660 }
1661
1662 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1668 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1669 let token = turn_token.clone();
1670 self.runtime.lifecycle.cancel_token = turn_token.clone();
1672 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1673 prev.abort();
1674 }
1675 self.runtime.lifecycle.cancel_bridge_handle =
1676 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1677 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1678 move || async move {
1679 signal.notified().await;
1680 token.cancel();
1681 },
1682 ));
1683 }
1684
1685 fn reap_background_tasks_and_update_metrics(&mut self) {
1689 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1690 if bg_signal.did_summarize {
1691 self.services.memory.persistence.unsummarized_count = 0;
1692 tracing::debug!("background summarization completed; unsummarized_count reset");
1693 }
1694 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1695 self.update_metrics(|m| {
1696 m.bg_inflight = snap.inflight as u64;
1697 m.bg_dropped = snap.total_dropped();
1698 m.bg_completed = snap.total_completed();
1699 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1700 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1701 });
1702
1703 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1705 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1706 .runtime
1707 .lifecycle
1708 .shell_executor_handle
1709 .as_ref()
1710 .map(|e| e.background_runs_snapshot())
1711 .unwrap_or_default()
1712 .into_iter()
1713 .map(|s| crate::metrics::ShellBackgroundRunRow {
1714 run_id: truncate_shell_run_id(&s.run_id),
1715 command: truncate_shell_command(&s.command),
1716 elapsed_secs: s.elapsed_ms / 1000,
1717 })
1718 .collect();
1719 self.update_metrics(|m| {
1720 m.shell_background_runs = shell_rows;
1721 });
1722 }
1723
1724 if self
1727 .runtime
1728 .config
1729 .supervisor_config
1730 .abort_enrichment_on_turn
1731 {
1732 self.runtime
1733 .lifecycle
1734 .supervisor
1735 .abort_class(agent_supervisor::TaskClass::Enrichment);
1736 }
1737 }
1738
1739 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1752 let snap = turn.metrics_snapshot().timings.clone();
1753 let duration_ms = snap
1754 .prepare_context_ms
1755 .saturating_add(snap.llm_chat_ms)
1756 .saturating_add(snap.tool_exec_ms);
1757 let summary = crate::notifications::TurnSummary {
1758 duration_ms,
1759 preview: self.last_assistant_preview(160),
1760 tool_calls: 0,
1762 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1763 exit_status: if is_error {
1764 crate::notifications::TurnExitStatus::Error
1765 } else {
1766 crate::notifications::TurnExitStatus::Success
1767 },
1768 };
1769
1770 let gate_ok = self
1772 .runtime
1773 .lifecycle
1774 .notifier
1775 .as_ref()
1776 .is_none_or(|n| n.should_fire(&summary));
1777
1778 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1780 && gate_ok
1781 {
1782 notifier.fire(&summary);
1783 }
1784
1785 let hooks = self.services.session.hooks_config.turn_complete.clone();
1791 if !hooks.is_empty() && gate_ok {
1792 let mut env = std::collections::HashMap::new();
1793 env.insert(
1794 "ZEPH_TURN_DURATION_MS".to_owned(),
1795 summary.duration_ms.to_string(),
1796 );
1797 env.insert(
1798 "ZEPH_TURN_STATUS".to_owned(),
1799 if is_error { "error" } else { "success" }.to_owned(),
1800 );
1801 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1802 env.insert(
1803 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1804 summary.llm_requests.to_string(),
1805 );
1806 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1807 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1808 agent_supervisor::TaskClass::Telemetry,
1809 "turn-complete-hooks",
1810 async move {
1811 let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1814 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1815 tracing::warn!(error = %e, "turn_complete hook failed");
1816 }
1817 },
1818 );
1819 }
1820 }
1821
1822 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1825 let goal_snap = self
1826 .services
1827 .goal_accounting
1828 .as_ref()
1829 .and_then(|a| a.snapshot());
1830 self.update_metrics(|m| m.active_goal = goal_snap);
1831
1832 if let Some(ref accounting) = self.services.goal_accounting {
1833 let tokens_after = self
1834 .runtime
1835 .metrics
1836 .metrics_tx
1837 .as_ref()
1838 .map_or(0, |tx| tx.borrow().total_tokens);
1839 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1840 let mut spawned: Option<
1841 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1842 > = None;
1843 accounting.on_turn_complete(turn_tokens, |fut| {
1844 spawned = Some(fut);
1845 });
1846 if let Some(fut) = spawned {
1847 let _ = self.runtime.lifecycle.supervisor.spawn(
1848 agent_supervisor::TaskClass::Telemetry,
1849 "goal-accounting",
1850 fut,
1851 );
1852 }
1853 }
1854 }
1855
/// Screens the trimmed user input before the turn continues.
///
/// Runs the guardrail check when a guardrail is configured and, when the `classifiers` feature
/// is compiled in and the sanitizer's `scan_user_input()` returns `true`, the injection
/// classifier.
///
/// Returns `Ok(true)` when the input was blocked (a notice has already been sent on the
/// channel) and `Ok(false)` when processing may continue. Channel send/flush failures are
/// ignored here; no path in this function returns `Err`.
#[tracing::instrument(
    name = "core.agent.pre_process_security",
    skip_all,
    level = "debug",
    err
)]
async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
    if let Some(ref guardrail) = self.services.security.guardrail {
        use zeph_sanitizer::guardrail::GuardrailVerdict;
        let verdict = guardrail.check(trimmed).await;
        match &verdict {
            GuardrailVerdict::Flagged { reason, .. } => {
                tracing::warn!(
                    reason = %reason,
                    should_block = verdict.should_block(),
                    "guardrail flagged user input"
                );
                if verdict.should_block() {
                    let msg = format!("[guardrail] Input blocked: {reason}");
                    let _ = self.channel.send(&msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                // Flagged but not blocking: warn the user and let the turn continue.
                let _ = self
                    .channel
                    .send(&format!("[guardrail] Warning: {reason}"))
                    .await;
            }
            GuardrailVerdict::Error { error } => {
                // Whether a failed check blocks is decided by the guardrail itself.
                if guardrail.error_should_block() {
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                    let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                    let _ = self.channel.send(msg).await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
            }
            GuardrailVerdict::Safe => {}
        }
    }

    // Injection classifier pass; compiled only with the `classifiers` feature.
    #[cfg(feature = "classifiers")]
    if self.services.security.sanitizer.scan_user_input() {
        match self
            .services
            .security
            .sanitizer
            .classify_injection(trimmed)
            .await
        {
            zeph_sanitizer::InjectionVerdict::Blocked => {
                // Metrics are pushed here because the early return skips the push below.
                self.push_classifier_metrics();
                let _ = self
                    .channel
                    .send("[security] Input blocked: injection detected by classifier.")
                    .await;
                let _ = self.channel.flush_chunks().await;
                return Ok(true);
            }
            zeph_sanitizer::InjectionVerdict::Suspicious => {
                // Soft signal only: logged, input is still allowed.
                tracing::warn!("injection_classifier soft_signal on user input");
            }
            zeph_sanitizer::InjectionVerdict::Clean => {}
        }
    }
    #[cfg(feature = "classifiers")]
    self.push_classifier_metrics();

    Ok(false)
}
1935
1936 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1943 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1944 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1945
1946 let providers_recently_failed = self
1948 .runtime
1949 .lifecycle
1950 .last_no_providers_at
1951 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1952
1953 if providers_recently_failed {
1954 tracing::warn!(
1955 backoff_secs,
1956 "skipping context preparation: providers were unavailable on last turn"
1957 );
1958 return;
1959 }
1960
1961 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1962 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1963 {
1964 Ok(()) => {}
1965 Err(_elapsed) => {
1966 tracing::warn!(
1967 timeout_secs = prep_timeout_secs,
1968 "context preparation timed out; proceeding with degraded context"
1969 );
1970 }
1971 }
1972 }
1973
/// Advances all per-turn context machinery ahead of the LLM call.
///
/// `text` is the raw user input (used to rebuild the system prompt); `trimmed` is its trimmed
/// form (used for correction detection and context preparation). Failures of proactive
/// compression, compaction, and context preparation are logged as warnings and never
/// propagated.
#[tracing::instrument(
    name = "core.agent.advance_context_lifecycle",
    skip_all,
    level = "debug"
)]
async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
    self.services.mcp.pruning_cache.reset();

    let conv_id = self.services.memory.persistence.conversation_id;
    self.rebuild_system_prompt(text).await;

    self.detect_and_record_corrections(trimmed, conv_id).await;
    self.services.learning_engine.tick();
    self.analyze_and_learn().await;
    self.sync_graph_counts().await;

    // Move the compaction state machine to its next-turn state.
    self.context_manager.compaction = self.context_manager.compaction.advance_turn();

    {
        self.services.focus.tick();

        let sidequest_should_fire = self.services.sidequest.tick();
        // Sidequest eviction is skipped when compaction reports it already ran this turn.
        if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
            self.maybe_sidequest_eviction();
        }
    }

    {
        // Periodic experience evolution sweep: runs when enabled, on turns where the
        // sidequest turn counter is a multiple of the configured interval, and only when
        // both the experience store and the graph store are available.
        let cfg = &self.services.memory.extraction.graph_config.experience;
        if cfg.enabled
            && cfg.evolution_sweep_enabled
            && cfg.evolution_sweep_interval > 0
            && self
                .services
                .sidequest
                .turn_counter
                .checked_rem(cfg.evolution_sweep_interval as u64)
                == Some(0)
            && let Some(memory) = self.services.memory.persistence.memory.as_ref()
            && let (Some(exp), Some(graph)) =
                (memory.experience.as_ref(), memory.graph_store.as_ref())
        {
            let exp = std::sync::Arc::clone(exp);
            let graph = std::sync::Arc::clone(graph);
            let threshold = cfg.confidence_prune_threshold;
            let turn = self.services.sidequest.turn_counter;
            let accepted = self.runtime.lifecycle.supervisor.spawn(
                agent_supervisor::TaskClass::Telemetry,
                "experience-sweep",
                async move {
                    match exp.evolution_sweep(graph.as_ref(), threshold).await {
                        Ok(stats) => tracing::info!(
                            turn,
                            self_loops = stats.pruned_self_loops,
                            low_confidence = stats.pruned_low_confidence,
                            "evolution sweep complete",
                        ),
                        Err(e) => tracing::warn!(
                            turn,
                            error = %e,
                            "evolution sweep failed",
                        ),
                    }
                },
            );
            // `spawn` returning `false` means the supervisor did not accept the task.
            if !accepted {
                tracing::warn!(
                    turn = self.services.sidequest.turn_counter,
                    "experience-sweep dropped (telemetry class at capacity)",
                );
            }
        }
    }

    if let Some(warning) = self.cache_expiry_warning() {
        tracing::info!(warning, "cache expiry warning");
        let _ = self.channel.send_status(&warning).await;
    }

    self.maybe_time_based_microcompact();

    self.maybe_apply_deferred_summaries();
    self.flush_deferred_summaries().await;

    if let Err(e) = self.maybe_proactive_compress().await {
        tracing::warn!("proactive compression failed: {e:#}");
    }

    if let Err(e) = self.maybe_compact().await {
        tracing::warn!("context compaction failed: {e:#}");
    }

    // Boxed, presumably to keep this future small — TODO confirm.
    if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
        tracing::warn!("context preparation failed: {e:#}");
    }

    // Hand the latest recall confidence to the provider.
    self.provider
        .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);

    self.services.learning_engine.reset_reflection();
}
2096
2097 fn build_user_message(
2098 &mut self,
2099 text: &str,
2100 image_parts: Vec<zeph_llm::provider::MessagePart>,
2101 ) -> Message {
2102 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2103 all_image_parts.extend(image_parts);
2104
2105 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2106 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2107 text: text.to_owned(),
2108 }];
2109 parts.extend(all_image_parts);
2110 Message::from_parts(Role::User, parts)
2111 } else {
2112 if !all_image_parts.is_empty() {
2113 tracing::warn!(
2114 count = all_image_parts.len(),
2115 "image attachments dropped: provider does not support vision"
2116 );
2117 }
2118 Message {
2119 role: Role::User,
2120 content: text.to_owned(),
2121 parts: vec![],
2122 metadata: MessageMetadata::default(),
2123 }
2124 }
2125 }
2126
2127 fn drain_background_completions(&mut self) {
2131 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2132
2133 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2134 return;
2135 };
2136 while let Ok(completion) = rx.try_recv() {
2138 if self.runtime.lifecycle.pending_background_completions.len()
2139 >= BACKGROUND_COMPLETION_BUFFER_CAP
2140 {
2141 tracing::warn!(
2142 run_id = %completion.run_id,
2143 "background completion buffer full; dropping run result"
2144 );
2145 self.runtime
2148 .lifecycle
2149 .pending_background_completions
2150 .pop_front();
2151 self.runtime
2152 .lifecycle
2153 .pending_background_completions
2154 .push_back(zeph_tools::BackgroundCompletion {
2155 run_id: completion.run_id,
2156 exit_code: -1,
2157 success: false,
2158 elapsed_ms: 0,
2159 command: completion.command,
2160 output: format!(
2161 "[background result for run {} dropped: buffer overflow]",
2162 completion.run_id
2163 ),
2164 });
2165 } else {
2166 self.runtime
2167 .lifecycle
2168 .pending_background_completions
2169 .push_back(completion);
2170 }
2171 }
2172 }
2173
2174 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2178 if self
2179 .runtime
2180 .lifecycle
2181 .pending_background_completions
2182 .is_empty()
2183 {
2184 return user_text.to_owned();
2185 }
2186 let mut parts = String::new();
2187 for completion in self
2188 .runtime
2189 .lifecycle
2190 .pending_background_completions
2191 .drain(..)
2192 {
2193 let _ = write!(
2194 parts,
2195 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2196 completion.run_id,
2197 completion.exit_code,
2198 completion.success,
2199 completion.elapsed_ms,
2200 completion.command,
2201 completion.output,
2202 );
2203 }
2204 parts.push_str(user_text);
2205 parts
2206 }
2207
/// Polls the sub-agent `task_id` every 500 ms until it reaches a terminal state and returns a
/// human-readable outcome prefixed with `label`.
///
/// While waiting, each iteration handles at most one pending secret request from any
/// sub-agent by asking the user for confirmation on the channel, and emits a progress status
/// for the polled task.
///
/// Returns `None` if the sub-agent manager is not configured.
async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
    use zeph_subagent::SubAgentState;
    let result = loop {
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        #[allow(clippy::redundant_closure_for_method_calls)]
        let pending = self
            .services
            .orchestration
            .subagent_manager
            .as_mut()
            .and_then(|m| m.try_recv_secret_request());
        if let Some((req_task_id, req)) = pending {
            // The secret key is truncated to 100 characters for display in the prompt.
            let confirm_prompt = format!(
                "Sub-agent requests secret '{}'. Allow?",
                crate::text::truncate_to_chars(&req.secret_key, 100)
            );
            // A failed confirmation is treated as a denial.
            let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
            if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
                if approved {
                    // Approval is granted with a five-minute TTL; delivery happens only if
                    // the approval itself succeeded. Errors from either step are ignored.
                    let ttl = std::time::Duration::from_mins(5);
                    let key = req.secret_key.clone();
                    if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                        let _ = mgr.deliver_secret(&req_task_id, key);
                    }
                } else {
                    let _ = mgr.deny_secret(&req_task_id);
                }
            }
        }

        let mgr = self.services.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        // A task that no longer appears in the status list is reported as completed.
        let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
            break format!("{label} completed (no status available).");
        };
        match status.state {
            SubAgentState::Completed => {
                let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                break format!("{label} completed: {msg}");
            }
            SubAgentState::Failed => {
                let msg = status
                    .last_message
                    .clone()
                    .unwrap_or_else(|| "unknown error".into());
                break format!("{label} failed: {msg}");
            }
            SubAgentState::Canceled => {
                break format!("{label} was cancelled.");
            }
            _ => {
                // Still running: report progress. The turn limit falls back to 20 when no
                // definition is found for the task.
                let _ = self
                    .channel
                    .send_status(&format!(
                        "{label}: turn {}/{}",
                        status.turns_used,
                        self.services
                            .orchestration
                            .subagent_manager
                            .as_ref()
                            .and_then(|m| m.agents_def(task_id))
                            .map_or(20, |d| d.permissions.max_turns)
                    ))
                    .await;
            }
        }
    };
    Some(result)
}
2285
2286 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2289 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2290 let full_ids: Vec<String> = mgr
2291 .statuses()
2292 .into_iter()
2293 .map(|(tid, _)| tid)
2294 .filter(|tid| tid.starts_with(prefix))
2295 .collect();
2296 Some(match full_ids.as_slice() {
2297 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2298 [fid] => Ok(fid.clone()),
2299 _ => Err(format!(
2300 "Ambiguous id prefix '{prefix}': matches {} agents",
2301 full_ids.len()
2302 )),
2303 })
2304 }
2305
2306 fn handle_agent_list(&self) -> Option<String> {
2307 use std::fmt::Write as _;
2308 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2309 let defs = mgr.definitions();
2310 if defs.is_empty() {
2311 return Some("No sub-agent definitions found.".into());
2312 }
2313 let mut out = String::from("Available sub-agents:\n");
2314 for d in defs {
2315 let memory_label = match d.memory {
2316 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2317 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2318 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2319 None => "",
2320 };
2321 if let Some(ref src) = d.source {
2322 let _ = writeln!(
2323 out,
2324 " {}{} — {} ({})",
2325 d.name, memory_label, d.description, src
2326 );
2327 } else {
2328 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2329 }
2330 }
2331 Some(out)
2332 }
2333
2334 fn handle_agent_status(&self) -> Option<String> {
2335 use std::fmt::Write as _;
2336 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2337 let statuses = mgr.statuses();
2338 if statuses.is_empty() {
2339 return Some("No active sub-agents.".into());
2340 }
2341 let mut out = String::from("Active sub-agents:\n");
2342 for (id, s) in &statuses {
2343 let state = format!("{:?}", s.state).to_lowercase();
2344 let elapsed = s.started_at.elapsed().as_secs();
2345 let _ = writeln!(
2346 out,
2347 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2348 short = &id[..8.min(id.len())],
2349 t = s.turns_used,
2350 msg = s.last_message.as_deref().unwrap_or(""),
2351 );
2352 if let Some(def) = mgr.agents_def(id)
2354 && let Some(scope) = def.memory
2355 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2356 {
2357 let _ = writeln!(out, " memory: {}", dir.display());
2358 }
2359 }
2360 Some(out)
2361 }
2362
2363 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2364 let full_id = match self.resolve_agent_id_prefix(id)? {
2365 Ok(fid) => fid,
2366 Err(msg) => return Some(msg),
2367 };
2368 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2369 if let Some((tid, req)) = mgr.try_recv_secret_request()
2370 && tid == full_id
2371 {
2372 let key = req.secret_key.clone();
2373 let ttl = std::time::Duration::from_mins(5);
2374 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2375 return Some(format!("Approve failed: {e}"));
2376 }
2377 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2378 return Some(format!("Secret delivery failed: {e}"));
2379 }
2380 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2381 }
2382 Some(format!(
2383 "No pending secret request for sub-agent '{full_id}'."
2384 ))
2385 }
2386
2387 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2388 let full_id = match self.resolve_agent_id_prefix(id)? {
2389 Ok(fid) => fid,
2390 Err(msg) => return Some(msg),
2391 };
2392 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2393 match mgr.deny_secret(&full_id) {
2394 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2395 Err(e) => Some(format!("Deny failed: {e}")),
2396 }
2397 }
2398
2399 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2400 use zeph_subagent::AgentCommand;
2401
2402 match cmd {
2403 AgentCommand::List => self.handle_agent_list(),
2404 AgentCommand::Background { name, prompt } => {
2405 self.handle_agent_background(&name, &prompt)
2406 }
2407 AgentCommand::Spawn { name, prompt }
2408 | AgentCommand::Mention {
2409 agent: name,
2410 prompt,
2411 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2412 AgentCommand::Status => self.handle_agent_status(),
2413 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2414 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2415 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2416 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2417 }
2418 }
2419
2420 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2421 let provider = self.provider.clone();
2422 let tool_executor = Arc::clone(&self.tool_executor);
2423 let skills = self.filtered_skills_for(name);
2424 let cfg = self.services.orchestration.subagent_config.clone();
2425 let spawn_ctx = self.build_spawn_context(&cfg);
2426 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2427 match mgr.spawn(
2428 name,
2429 prompt,
2430 provider,
2431 tool_executor,
2432 skills,
2433 &cfg,
2434 spawn_ctx,
2435 ) {
2436 Ok(id) => Some(format!(
2437 "Sub-agent '{name}' started in background (id: {short})",
2438 short = &id[..8.min(id.len())]
2439 )),
2440 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2441 }
2442 }
2443
2444 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2445 let provider = self.provider.clone();
2446 let tool_executor = Arc::clone(&self.tool_executor);
2447 let skills = self.filtered_skills_for(name);
2448 let cfg = self.services.orchestration.subagent_config.clone();
2449 let spawn_ctx = self.build_spawn_context(&cfg);
2450 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2451 let task_id = match mgr.spawn(
2452 name,
2453 prompt,
2454 provider,
2455 tool_executor,
2456 skills,
2457 &cfg,
2458 spawn_ctx,
2459 ) {
2460 Ok(id) => id,
2461 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2462 };
2463 let short = task_id[..8.min(task_id.len())].to_owned();
2464 let _ = self
2465 .channel
2466 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2467 .await;
2468 let label = format!("Sub-agent '{name}'");
2469 self.poll_subagent_until_done(&task_id, &label).await
2470 }
2471
2472 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2473 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2474 let ids: Vec<String> = mgr
2476 .statuses()
2477 .into_iter()
2478 .map(|(task_id, _)| task_id)
2479 .filter(|task_id| task_id.starts_with(id))
2480 .collect();
2481 match ids.as_slice() {
2482 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2483 [full_id] => {
2484 let full_id = full_id.clone();
2485 match mgr.cancel(&full_id) {
2486 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2487 Err(e) => Some(format!("Cancel failed: {e}")),
2488 }
2489 }
2490 _ => Some(format!(
2491 "Ambiguous id prefix '{id}': matches {} agents",
2492 ids.len()
2493 )),
2494 }
2495 }
2496
2497 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2498 let cfg = self.services.orchestration.subagent_config.clone();
2499 let def_name = {
2502 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2503 match mgr.def_name_for_resume(id, &cfg) {
2504 Ok(name) => name,
2505 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2506 }
2507 };
2508 let skills = self.filtered_skills_for(&def_name);
2509 let provider = self.provider.clone();
2510 let tool_executor = Arc::clone(&self.tool_executor);
2511 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2512 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2513 Ok(pair) => pair,
2514 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2515 };
2516 let short = task_id[..8.min(task_id.len())].to_owned();
2517 let _ = self
2518 .channel
2519 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2520 .await;
2521 self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
2522 .await
2523 }
2524
2525 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2526 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2527 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2528 let reg = self.services.skill.registry.read();
2529 match zeph_subagent::filter_skills(®, &def.skills) {
2530 Ok(skills) => {
2531 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2532 if bodies.is_empty() {
2533 None
2534 } else {
2535 Some(bodies)
2536 }
2537 }
2538 Err(e) => {
2539 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2540 None
2541 }
2542 }
2543 }
2544
2545 fn build_spawn_context(
2547 &self,
2548 cfg: &zeph_config::SubAgentConfig,
2549 ) -> zeph_subagent::SpawnContext {
2550 zeph_subagent::SpawnContext {
2551 parent_messages: self.extract_parent_messages(cfg),
2552 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2553 parent_provider_name: {
2554 let name = &self.runtime.config.active_provider_name;
2555 if name.is_empty() {
2556 None
2557 } else {
2558 Some(name.clone())
2559 }
2560 },
2561 spawn_depth: self.runtime.config.spawn_depth,
2562 mcp_tool_names: self.extract_mcp_tool_names(),
2563 seed_trajectory_score: {
2565 let child = self.services.security.trajectory.spawn_child();
2566 let score = child.score_now();
2567 if score > 0.0 { Some(score) } else { None }
2568 },
2569 }
2570 }
2571
2572 fn extract_parent_messages(
2577 &self,
2578 config: &zeph_config::SubAgentConfig,
2579 ) -> Vec<zeph_llm::provider::Message> {
2580 use zeph_llm::provider::Role;
2581 if config.context_window_turns == 0 {
2582 return Vec::new();
2583 }
2584 let non_system: Vec<_> = self
2585 .msg
2586 .messages
2587 .iter()
2588 .filter(|m| m.role != Role::System)
2589 .cloned()
2590 .collect();
2591 let take_count = config.context_window_turns * 2;
2592 let start = non_system.len().saturating_sub(take_count);
2593 let mut msgs = non_system[start..].to_vec();
2594
2595 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
2598 let mut keep = msgs.len();
2599 for (i, m) in msgs.iter().enumerate() {
2600 total_chars += m.content.len();
2601 if total_chars > max_chars {
2602 keep = i;
2603 break;
2604 }
2605 }
2606 if keep < msgs.len() {
2607 tracing::info!(
2608 kept = keep,
2609 requested = config.context_window_turns * 2,
2610 "[subagent] truncated parent history from {} to {} turns due to token budget",
2611 config.context_window_turns * 2,
2612 keep
2613 );
2614 msgs.truncate(keep);
2615 }
2616 msgs
2617 }
2618
2619 fn extract_mcp_tool_names(&self) -> Vec<String> {
2621 self.tool_executor
2622 .tool_definitions_erased()
2623 .into_iter()
2624 .filter(|t| t.id.starts_with("mcp_"))
2625 .map(|t| t.id.to_string())
2626 .collect()
2627 }
2628
2629 fn classify_source_kind(
2633 skill_dir: &std::path::Path,
2634 managed_dir: Option<&std::path::PathBuf>,
2635 bundled_names: &std::collections::HashSet<String>,
2636 ) -> zeph_memory::store::SourceKind {
2637 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2638 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2639 let has_marker = skill_dir.join(".bundled").exists();
2640 if has_marker && bundled_names.contains(skill_name) {
2641 zeph_memory::store::SourceKind::Bundled
2642 } else {
2643 if has_marker {
2644 tracing::warn!(
2645 skill = %skill_name,
2646 "skill has .bundled marker but is not in the bundled skill \
2647 allowlist — classifying as Hub"
2648 );
2649 }
2650 zeph_memory::store::SourceKind::Hub
2651 }
2652 } else {
2653 zeph_memory::store::SourceKind::Local
2654 }
2655 }
2656
2657 async fn update_trust_for_reloaded_skills(
2659 &mut self,
2660 all_meta: &[zeph_skills::loader::SkillMeta],
2661 ) {
2662 let memory = self.services.memory.persistence.memory.clone();
2664 let Some(memory) = memory else {
2665 return;
2666 };
2667 let trust_cfg = self.services.skill.trust_config.clone();
2668 let managed_dir = self.services.skill.managed_dir.clone();
2669 let bundled_names: std::collections::HashSet<String> =
2670 zeph_skills::bundled_skill_names().into_iter().collect();
2671 for meta in all_meta {
2672 let skill_dir = meta.skill_dir.clone();
2675 let managed_dir_ref = managed_dir.clone();
2676 let bundled_names_ref = bundled_names.clone();
2677 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2678 tokio::task::spawn_blocking(move || {
2679 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2680 let source_kind = Self::classify_source_kind(
2681 &skill_dir,
2682 managed_dir_ref.as_ref(),
2683 &bundled_names_ref,
2684 );
2685 Some((hash, source_kind))
2686 })
2687 .await
2688 .unwrap_or(None);
2689
2690 let Some((current_hash, source_kind)) = fs_result else {
2691 tracing::warn!("failed to compute hash for '{}'", meta.name);
2692 continue;
2693 };
2694 let initial_level = match source_kind {
2695 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2696 zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2697 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2698 &trust_cfg.local_level
2699 }
2700 };
2701 let existing = memory
2702 .sqlite()
2703 .load_skill_trust(&meta.name)
2704 .await
2705 .ok()
2706 .flatten();
2707 let trust_level_str = if let Some(ref row) = existing {
2708 if row.blake3_hash != current_hash {
2709 trust_cfg.hash_mismatch_level.to_string()
2710 } else if row.source_kind != source_kind {
2711 let stored = row
2715 .trust_level
2716 .parse::<zeph_common::SkillTrustLevel>()
2717 .unwrap_or_else(|_| {
2718 tracing::warn!(
2719 skill = %meta.name,
2720 raw = %row.trust_level,
2721 "unrecognised trust_level in DB, treating as quarantined"
2722 );
2723 zeph_common::SkillTrustLevel::Quarantined
2724 });
2725 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2726 row.trust_level.clone()
2727 } else {
2728 initial_level.to_string()
2729 }
2730 } else {
2731 row.trust_level.clone()
2732 }
2733 } else {
2734 initial_level.to_string()
2735 };
2736 let source_path = meta.skill_dir.to_str();
2737 if let Err(e) = memory
2738 .sqlite()
2739 .upsert_skill_trust(
2740 &meta.name,
2741 &trust_level_str,
2742 source_kind,
2743 None,
2744 source_path,
2745 ¤t_hash,
2746 )
2747 .await
2748 {
2749 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2750 }
2751 }
2752 }
2753
2754 async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
2756 let provider = self.embedding_provider.clone();
2757 let embed_timeout =
2758 std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
2759 let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
2760 let owned = text.to_owned();
2761 let p = provider.clone();
2762 Box::pin(async move {
2763 if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
2764 result
2765 } else {
2766 tracing::warn!(
2767 timeout_secs = embed_timeout.as_secs(),
2768 "skill matcher: embedding timed out"
2769 );
2770 Err(zeph_llm::LlmError::Timeout)
2771 }
2772 })
2773 };
2774
2775 let needs_inmemory_rebuild = !self
2776 .services
2777 .skill
2778 .matcher
2779 .as_ref()
2780 .is_some_and(SkillMatcherBackend::is_qdrant);
2781
2782 if needs_inmemory_rebuild {
2783 self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
2784 .await
2785 .map(SkillMatcherBackend::InMemory);
2786 } else if let Some(ref mut backend) = self.services.skill.matcher {
2787 let _ = self.channel.send_status("syncing skill index...").await;
2788 let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
2789 self.services.session.status_tx.clone().map(
2790 |tx| -> Box<dyn Fn(usize, usize) + Send> {
2791 Box::new(move |completed, total| {
2792 let msg = format!("Syncing skills: {completed}/{total}");
2793 let _ = tx.send(msg);
2794 })
2795 },
2796 );
2797 if let Err(e) = backend
2798 .sync(
2799 all_meta,
2800 &self.services.skill.embedding_model,
2801 embed_fn,
2802 on_progress,
2803 )
2804 .await
2805 {
2806 tracing::warn!("failed to sync skill embeddings: {e:#}");
2807 }
2808 }
2809
2810 if self.services.skill.hybrid_search {
2811 let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
2812 let _ = self.channel.send_status("rebuilding search index...").await;
2813 self.services.skill.rebuild_bm25(&descs);
2814 }
2815 }
2816
2817 #[tracing::instrument(name = "core.agent.reload_skills", skip_all, level = "debug")]
2818 async fn reload_skills(&mut self) {
2819 let old_fp = self.services.skill.fingerprint();
2820 let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
2821 let plugin_dirs = supplier();
2822 let mut paths = self.services.skill.skill_paths.clone();
2823 for dir in plugin_dirs {
2824 if !paths.contains(&dir) {
2825 paths.push(dir);
2826 }
2827 }
2828 paths
2829 } else {
2830 self.services.skill.skill_paths.clone()
2831 };
2832 self.services.skill.registry.write().reload(&reload_paths);
2833 if self.services.skill.fingerprint() == old_fp {
2834 return;
2835 }
2836 let _ = self.channel.send_status("reloading skills...").await;
2837
2838 let all_meta = self
2839 .services
2840 .skill
2841 .registry
2842 .read()
2843 .all_meta()
2844 .into_iter()
2845 .cloned()
2846 .collect::<Vec<_>>();
2847
2848 self.update_trust_for_reloaded_skills(&all_meta).await;
2849
2850 let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
2851 self.rebuild_skill_matcher(&all_meta_refs).await;
2852
2853 let all_skills: Vec<Skill> = {
2854 let reg = self.services.skill.registry.read();
2855 reg.all_meta()
2856 .iter()
2857 .filter_map(|m| reg.skill(&m.name).ok())
2858 .collect()
2859 };
2860 let trust_map = self.build_skill_trust_map().await;
2861 let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
2862 let skills_prompt =
2863 state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
2864 self.services
2865 .skill
2866 .last_skills_prompt
2867 .clone_from(&skills_prompt);
2868 let system_prompt = build_system_prompt(&skills_prompt, None);
2869 if let Some(msg) = self.msg.messages.first_mut() {
2870 msg.content = system_prompt;
2871 }
2872
2873 let _ = self.channel.send_status("").await;
2874 tracing::info!(
2875 "reloaded {} skill(s)",
2876 self.services.skill.registry.read().all_meta().len()
2877 );
2878 }
2879
2880 fn reload_instructions(&mut self) {
2881 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2883 while rx.try_recv().is_ok() {}
2884 }
2885 let Some(ref state) = self.runtime.instructions.reload_state else {
2886 return;
2887 };
2888 let new_blocks = crate::instructions::load_instructions(
2889 &state.base_dir,
2890 &state.provider_kinds,
2891 &state.explicit_files,
2892 state.auto_detect,
2893 );
2894 let old_sources: std::collections::HashSet<_> = self
2895 .runtime
2896 .instructions
2897 .blocks
2898 .iter()
2899 .map(|b| &b.source)
2900 .collect();
2901 let new_sources: std::collections::HashSet<_> =
2902 new_blocks.iter().map(|b| &b.source).collect();
2903 for added in new_sources.difference(&old_sources) {
2904 tracing::info!(path = %added.display(), "instruction file added");
2905 }
2906 for removed in old_sources.difference(&new_sources) {
2907 tracing::info!(path = %removed.display(), "instruction file removed");
2908 }
2909 tracing::info!(
2910 old_count = self.runtime.instructions.blocks.len(),
2911 new_count = new_blocks.len(),
2912 "reloaded instruction files"
2913 );
2914 self.runtime.instructions.blocks = new_blocks;
2915 }
2916
2917 fn reload_config(&mut self) {
2918 let Some(path) = self.runtime.lifecycle.config_path.clone() else {
2919 return;
2920 };
2921 let Some(config) = self.load_config_with_overlay(&path) else {
2922 return;
2923 };
2924 let budget_tokens = resolve_context_budget(&config, &self.provider);
2925 self.runtime.config.security = config.security;
2926 self.runtime.config.timeouts = config.timeouts;
2927 self.runtime.config.redact_credentials = config.memory.redact_credentials;
2928 self.services.memory.persistence.history_limit = config.memory.history_limit;
2929 self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
2930 self.services.memory.compaction.summarization_threshold =
2931 config.memory.summarization_threshold;
2932 self.services.skill.max_active_skills = config.skills.max_active_skills.get();
2933 self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
2934 self.services.skill.min_injection_score = config.skills.min_injection_score;
2935 self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
2936 self.services.skill.hybrid_search = config.skills.hybrid_search;
2937 self.services.skill.two_stage_matching = config.skills.two_stage_matching;
2938 self.services.skill.confusability_threshold =
2939 config.skills.confusability_threshold.clamp(0.0, 1.0);
2940 config
2941 .skills
2942 .generation_provider
2943 .as_str()
2944 .clone_into(&mut self.services.skill.generation_provider_name);
2945 self.services.skill.generation_output_dir =
2946 config.skills.generation_output_dir.as_deref().map(|p| {
2947 if let Some(stripped) = p.strip_prefix("~/") {
2948 dirs::home_dir()
2949 .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
2950 } else {
2951 std::path::PathBuf::from(p)
2952 }
2953 });
2954
2955 self.context_manager.budget = Some(
2956 ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
2957 );
2958
2959 {
2960 let graph_cfg = &config.memory.graph;
2961 if graph_cfg.rpe.enabled {
2962 if self.services.memory.extraction.rpe_router.is_none() {
2964 self.services.memory.extraction.rpe_router =
2965 Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
2966 graph_cfg.rpe.threshold,
2967 graph_cfg.rpe.max_skip_turns,
2968 )));
2969 }
2970 } else {
2971 self.services.memory.extraction.rpe_router = None;
2972 }
2973 self.services.memory.extraction.graph_config = graph_cfg.clone();
2974 }
2975 self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
2976 self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
2977 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
2978 self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
2979 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
2980 self.context_manager.compression = config.memory.compression.clone();
2981 self.context_manager.routing = config.memory.store_routing.clone();
2982 self.context_manager.store_routing_provider = if config
2984 .memory
2985 .store_routing
2986 .routing_classifier_provider
2987 .is_empty()
2988 {
2989 None
2990 } else {
2991 let resolved = self.resolve_background_provider(
2992 &config.memory.store_routing.routing_classifier_provider,
2993 );
2994 Some(std::sync::Arc::new(resolved))
2995 };
2996 self.services
2997 .memory
2998 .persistence
2999 .cross_session_score_threshold = config.memory.cross_session_score_threshold;
3000
3001 self.services.index.repo_map_tokens = config.index.repo_map_tokens;
3002 self.services.index.repo_map_ttl =
3003 std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3004
3005 self.services
3006 .session
3007 .hooks_config
3008 .cwd_changed
3009 .clone_from(&config.hooks.cwd_changed);
3010 self.services
3011 .session
3012 .hooks_config
3013 .permission_denied
3014 .clone_from(&config.hooks.permission_denied);
3015 self.services
3016 .session
3017 .hooks_config
3018 .turn_complete
3019 .clone_from(&config.hooks.turn_complete);
3020 tracing::info!("config reloaded");
3023 }
3024
3025 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3029 let mut config = match Config::load(path) {
3030 Ok(c) => c,
3031 Err(e) => {
3032 tracing::warn!("config reload failed: {e:#}");
3033 return None;
3034 }
3035 };
3036
3037 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3039 None
3040 } else {
3041 match zeph_plugins::apply_plugin_config_overlays(
3042 &mut config,
3043 &self.runtime.lifecycle.plugins_dir,
3044 ) {
3045 Ok(o) => Some(o),
3046 Err(e) => {
3047 tracing::warn!(
3048 "plugin overlay merge failed during reload: {e:#}; \
3049 keeping previous runtime state"
3050 );
3051 return None;
3052 }
3053 }
3054 };
3055
3056 if let Some(ref overlay) = new_overlay {
3060 self.warn_on_shell_overlay_divergence(overlay, &config);
3061 }
3062 Some(config)
3063 }
3064
3065 fn warn_on_shell_overlay_divergence(
3071 &self,
3072 new_overlay: &zeph_plugins::ResolvedOverlay,
3073 config: &Config,
3074 ) {
3075 let new_blocked: Vec<String> = {
3076 let mut v = config.tools.shell.blocked_commands.clone();
3077 v.sort();
3078 v
3079 };
3080 let new_allowed: Vec<String> = {
3081 let mut v = config.tools.shell.allowed_commands.clone();
3082 v.sort();
3083 v
3084 };
3085
3086 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3087 let blocked_changed = new_blocked != startup.blocked;
3088 let allowed_changed = new_allowed != startup.allowed;
3089
3090 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3092 h.rebuild(&config.tools.shell);
3093 tracing::info!(
3094 blocked_count = h.snapshot_blocked().len(),
3095 "shell blocked_commands rebuilt from hot-reload"
3096 );
3097 }
3098
3099 if allowed_changed {
3106 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3107 for sandbox path recomputation (blocked_commands was rebuilt live)";
3108 tracing::warn!("{msg}");
3109 if let Some(ref tx) = self.services.session.status_tx {
3110 let _ = tx.send(msg.to_owned());
3111 }
3112 }
3113
3114 let _ = new_overlay;
3115 }
3116
3117 fn maybe_sidequest_eviction(&mut self) {
3128 if self.services.sidequest.config.enabled {
3132 use crate::config::PruningStrategy;
3133 if !matches!(
3134 self.context_manager.compression.pruning_strategy,
3135 PruningStrategy::Reactive
3136 ) {
3137 tracing::warn!(
3138 strategy = ?self.context_manager.compression.pruning_strategy,
3139 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3140 consider disabling sidequest.enabled to avoid redundant eviction"
3141 );
3142 }
3143 }
3144
3145 if self.services.focus.is_active() {
3147 tracing::debug!("sidequest: skipping — focus session active");
3148 self.services.compression.pending_sidequest_result = None;
3150 return;
3151 }
3152
3153 self.sidequest_apply_pending();
3155
3156 self.sidequest_schedule_next();
3158 }
3159
3160 fn sidequest_apply_pending(&mut self) {
3161 let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
3162 return;
3163 };
3164 let result = match handle.try_join() {
3167 Ok(result) => result,
3168 Err(_handle) => {
3169 tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
3171 return;
3172 }
3173 };
3174 match result {
3175 Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
3176 let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
3177 let freed = self.services.sidequest.apply_eviction(
3178 &mut self.msg.messages,
3179 &evicted_indices,
3180 &self.runtime.metrics.token_counter,
3181 );
3182 if freed > 0 {
3183 self.recompute_prompt_tokens();
3184 self.context_manager.compaction =
3187 crate::agent::context_manager::CompactionState::CompactedThisTurn {
3188 cooldown: 0,
3189 };
3190 tracing::info!(
3191 freed_tokens = freed,
3192 evicted_cursors = evicted_indices.len(),
3193 pass = self.services.sidequest.passes_run,
3194 "sidequest eviction complete"
3195 );
3196 if let Some(ref d) = self.runtime.debug.debug_dumper {
3197 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
3198 }
3199 if let Some(ref tx) = self.services.session.status_tx {
3200 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
3201 }
3202 } else {
3203 if let Some(ref tx) = self.services.session.status_tx {
3205 let _ = tx.send(String::new());
3206 }
3207 }
3208 }
3209 Ok(None | Some(_)) => {
3210 tracing::debug!("sidequest: pending result: no cursors to evict");
3211 if let Some(ref tx) = self.services.session.status_tx {
3212 let _ = tx.send(String::new());
3213 }
3214 }
3215 Err(e) => {
3216 tracing::debug!("sidequest: background task error: {e}");
3217 if let Some(ref tx) = self.services.session.status_tx {
3218 let _ = tx.send(String::new());
3219 }
3220 }
3221 }
3222 }
3223
3224 fn sidequest_schedule_next(&mut self) {
3225 use zeph_llm::provider::{Message, MessageMetadata, Role};
3226
3227 self.services
3228 .sidequest
3229 .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);
3230
3231 if self.services.sidequest.tool_output_cursors.is_empty() {
3232 tracing::debug!("sidequest: no eligible cursors");
3233 return;
3234 }
3235
3236 let prompt = self.services.sidequest.build_eviction_prompt();
3237 let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
3238 let n_cursors = self.services.sidequest.tool_output_cursors.len();
3239 let provider = self.summary_or_primary_provider().clone();
3241
3242 let eviction_future = async move {
3243 let msgs = [Message {
3244 role: Role::User,
3245 content: prompt,
3246 parts: vec![],
3247 metadata: MessageMetadata::default(),
3248 }];
3249 let response =
3250 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
3251 .await
3252 {
3253 Ok(Ok(r)) => r,
3254 Ok(Err(e)) => {
3255 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
3256 return None;
3257 }
3258 Err(_) => {
3259 tracing::debug!("sidequest bg: LLM call timed out");
3260 return None;
3261 }
3262 };
3263
3264 let start = response.find('{')?;
3265 let end = response.rfind('}')?;
3266 if start > end {
3267 return None;
3268 }
3269 let json_slice = &response[start..=end];
3270 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
3271 let mut valid: Vec<usize> = parsed
3272 .del_cursors
3273 .into_iter()
3274 .filter(|&c| c < n_cursors)
3275 .collect();
3276 valid.sort_unstable();
3277 valid.dedup();
3278 #[allow(
3279 clippy::cast_precision_loss,
3280 clippy::cast_possible_truncation,
3281 clippy::cast_sign_loss
3282 )]
3283 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
3284 valid.truncate(max_evict);
3285 Some(valid)
3286 };
3287 let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
3288 std::sync::Arc::from("agent.sidequest.eviction"),
3289 move || eviction_future,
3290 );
3291 self.services.compression.pending_sidequest_result = Some(handle);
3292 tracing::debug!("sidequest: background LLM eviction task spawned");
3293 if let Some(ref tx) = self.services.session.status_tx {
3294 let _ = tx.send("SideQuest: scoring tool outputs...".into());
3295 }
3296 }
3297
3298 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3300 self.services
3301 .mcp
3302 .manager
3303 .as_ref()
3304 .map(|m| McpManagerDispatch(Arc::clone(m)))
3305 }
3306
3307 pub(crate) async fn check_cwd_changed(&mut self) {
3313 let current = match std::env::current_dir() {
3314 Ok(p) => p,
3315 Err(e) => {
3316 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3317 return;
3318 }
3319 };
3320 if current == self.runtime.lifecycle.last_known_cwd {
3321 return;
3322 }
3323 let old_cwd =
3324 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3325 self.services.session.env_context.working_dir = current.display().to_string();
3326
3327 tracing::info!(
3328 old = %old_cwd.display(),
3329 new = %current.display(),
3330 "working directory changed"
3331 );
3332
3333 let _ = self
3334 .channel
3335 .send_status("Working directory changed\u{2026}")
3336 .await;
3337
3338 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3339 if hooks.is_empty() {
3340 tracing::debug!("CwdChanged: no hooks configured, skipping");
3341 } else {
3342 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3343 let mut env = std::collections::HashMap::new();
3344 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3345 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3346 let dispatch = self.mcp_dispatch();
3347 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3348 .as_ref()
3349 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3350 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3351 tracing::warn!(error = %e, "CwdChanged hook failed");
3352 } else {
3353 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3354 }
3355 }
3356
3357 let _ = self.channel.send_status("").await;
3358 }
3359
3360 pub(crate) async fn handle_file_changed(
3362 &mut self,
3363 event: crate::file_watcher::FileChangedEvent,
3364 ) {
3365 tracing::info!(path = %event.path.display(), "file changed");
3366
3367 let _ = self
3368 .channel
3369 .send_status("Running file-change hook\u{2026}")
3370 .await;
3371
3372 let hooks = self
3373 .services
3374 .session
3375 .hooks_config
3376 .file_changed_hooks
3377 .clone();
3378 if hooks.is_empty() {
3379 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3380 } else {
3381 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3382 let mut env = std::collections::HashMap::new();
3383 env.insert(
3384 "ZEPH_CHANGED_PATH".to_owned(),
3385 event.path.display().to_string(),
3386 );
3387 let dispatch = self.mcp_dispatch();
3388 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3389 .as_ref()
3390 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3391 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3392 tracing::warn!(error = %e, "FileChanged hook failed");
3393 } else {
3394 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3395 }
3396 }
3397
3398 let _ = self.channel.send_status("").await;
3399 }
3400
3401 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3411 let Some(engine) = self.services.promotion_engine.clone() else {
3412 return;
3413 };
3414
3415 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3416 return;
3417 };
3418
3419 let promotion_window = 200usize;
3422
3423 let accepted = self.runtime.lifecycle.supervisor.spawn(
3424 agent_supervisor::TaskClass::Enrichment,
3425 "compression_spectrum.promotion_scan",
3426 async move {
3427 let span = tracing::info_span!("memory.compression.promote.background");
3428 let _enter = span.enter();
3429
3430 let window = match memory.load_promotion_window(promotion_window).await {
3431 Ok(w) => w,
3432 Err(e) => {
3433 tracing::warn!(error = %e, "promotion scan: failed to load window");
3434 return;
3435 }
3436 };
3437
3438 if window.is_empty() {
3439 return;
3440 }
3441
3442 let candidates = match engine.scan(&window).await {
3443 Ok(c) => c,
3444 Err(e) => {
3445 tracing::warn!(error = %e, "promotion scan: clustering failed");
3446 return;
3447 }
3448 };
3449
3450 for candidate in &candidates {
3451 if let Err(e) = engine.promote(candidate).await {
3452 tracing::warn!(
3453 signature = %candidate.signature,
3454 error = %e,
3455 "promotion scan: promote failed"
3456 );
3457 }
3458 }
3459
3460 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3461 },
3462 );
3463
3464 if accepted {
3465 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3466 }
3467 }
3468}
3469struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3474
3475impl zeph_subagent::McpDispatch for McpManagerDispatch {
3476 fn call_tool<'a>(
3477 &'a self,
3478 server: &'a str,
3479 tool: &'a str,
3480 args: serde_json::Value,
3481 ) -> std::pin::Pin<
3482 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3483 > {
3484 Box::pin(async move {
3485 self.0
3486 .call_tool(server, tool, args)
3487 .await
3488 .map(|result| {
3489 let texts: Vec<serde_json::Value> = result
3491 .content
3492 .iter()
3493 .filter_map(|c| {
3494 if let rmcp::model::RawContent::Text(t) = &c.raw {
3495 Some(serde_json::Value::String(t.text.clone()))
3496 } else {
3497 None
3498 }
3499 })
3500 .collect();
3501 serde_json::Value::Array(texts)
3502 })
3503 .map_err(|e| e.to_string())
3504 })
3505 }
3506}
3507
3508pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3509 while !*rx.borrow_and_update() {
3510 if rx.changed().await.is_err() {
3511 std::future::pending::<()>().await;
3512 }
3513 }
3514}
3515
3516pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3517 match rx {
3518 Some(inner) => {
3519 if let Some(v) = inner.recv().await {
3520 Some(v)
3521 } else {
3522 *rx = None;
3523 std::future::pending().await
3524 }
3525 }
3526 None => std::future::pending().await,
3527 }
3528}
3529
3530fn truncate_shell_command(cmd: &str) -> String {
3537 if cmd.len() <= 80 {
3538 return cmd.to_owned();
3539 }
3540 let end = cmd.floor_char_boundary(79);
3541 format!("{}…", &cmd[..end])
3542}
3543
/// Returns the first eight characters of `id`, or all of it when shorter.
fn truncate_shell_run_id(id: &str) -> String {
    // Byte offset of the ninth character, if the id has one.
    match id.char_indices().nth(8) {
        Some((cut, _)) => id[..cut].to_owned(),
        None => id.to_owned(),
    }
}
3548
3549pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3550 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3551 if let Some(ctx_size) = provider.context_window() {
3552 tracing::info!(
3553 model_context = ctx_size,
3554 "auto-configured context budget on reload"
3555 );
3556 ctx_size
3557 } else {
3558 0
3559 }
3560 } else {
3561 config.memory.context_budget_tokens
3562 };
3563 if tokens == 0 {
3564 tracing::warn!(
3565 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3566 );
3567 128_000
3568 } else {
3569 tokens
3570 }
3571}
3572
3573#[cfg(test)]
3574mod tests;
3575
3576#[cfg(test)]
3577pub(crate) use tests::agent_tests;
3578
/// Command stubs used only by this module's tests.
#[cfg(test)]
mod test_stubs {
    use std::future::Future;
    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// Slash command `/test-error` whose handler always fails with "boom".
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        /// Ignores the context and arguments and resolves to an error.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>> {
            Box::pin(async {
                let failure = CommandError::new("boom");
                Err(failure)
            })
        }
    }
}