1mod acp_commands;
10mod agent_access_impl;
11pub(crate) mod agent_supervisor;
12mod autodream;
13mod builder;
14pub(crate) mod channel_impl;
15mod command_context_impls;
16pub(super) mod compression_feedback;
17mod context;
18mod context_impls;
19pub(crate) mod context_manager;
20mod corrections;
21pub mod error;
22mod experiment_cmd;
23pub(crate) mod focus;
24mod index;
25mod learning;
26pub(crate) mod learning_engine;
27mod log_commands;
28mod loop_event;
29mod lsp_commands;
30mod magic_docs;
31mod mcp;
32pub(crate) mod memcot;
33mod message_queue;
34mod microcompact;
35mod model_commands;
36mod persistence;
37#[cfg(feature = "scheduler")]
38mod plan;
39mod policy_commands;
40mod provider_cmd;
41#[cfg(feature = "self-check")]
42mod quality_hook;
43pub(crate) mod rate_limiter;
44#[cfg(feature = "scheduler")]
45mod scheduler_commands;
46#[cfg(feature = "scheduler")]
47mod scheduler_loop;
48mod scope_commands;
49pub mod session_config;
50mod session_digest;
51pub(crate) mod sidequest;
52mod skill_management;
53pub mod slash_commands;
54pub mod speculative;
55pub(crate) mod state;
56pub(crate) mod task_injection;
57pub(crate) mod tool_execution;
58pub(crate) mod tool_orchestrator;
59pub mod trajectory;
60mod trajectory_commands;
61mod trust_commands;
62pub mod turn;
63mod utils;
64pub(crate) mod vigil;
65
66use std::collections::{HashMap, VecDeque};
67use std::fmt::Write as _;
68use std::sync::Arc;
69
70use parking_lot::RwLock;
71
72use tokio::sync::{mpsc, watch};
73use tokio_util::sync::CancellationToken;
74use zeph_llm::any::AnyProvider;
75use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
76use zeph_memory::TokenCounter;
77use zeph_memory::semantic::SemanticMemory;
78use zeph_skills::loader::Skill;
79use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
80use zeph_skills::prompt::format_skills_prompt;
81use zeph_skills::registry::SkillRegistry;
82use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
83
84use crate::channel::Channel;
85use crate::config::Config;
86use crate::context::{ContextBudget, build_system_prompt};
87use zeph_common::text::estimate_tokens;
88
89use loop_event::LoopEvent;
90use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
91use state::MessageState;
92
/// Window size used for doom-loop (repeating behaviour) detection.
/// NOTE(review): the detection logic is not in this chunk — confirm what the window counts.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Re-exported so crate code can refer to the prefix through this module.
pub(crate) use zeph_agent_context::helpers::CODE_CONTEXT_PREFIX;
/// Prefix prepended to a scheduler-supplied prompt before it is injected as an agent turn.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
/// Closing fence appended by `format_tool_output` after the tool output body.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
100
101pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
102 use std::fmt::Write;
103 let capacity = "[tool output: ".len()
104 + tool_name.len()
105 + "]\n```\n".len()
106 + body.len()
107 + TOOL_OUTPUT_SUFFIX.len();
108 let mut buf = String::with_capacity(capacity);
109 let _ = write!(
110 buf,
111 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
112 );
113 buf
114}
115
/// Conversational agent: receives input over a [`Channel`], talks to an LLM provider,
/// executes tools, and owns all per-session state.
///
/// `C` is the channel implementation used for user input and output.
///
/// Note: struct fields are dropped in declaration order, so reordering fields changes
/// teardown order.
pub struct Agent<C: Channel> {
    /// Provider used for chat requests (e.g. shutdown summaries); may be replaced at runtime
    /// by a pending ACP provider override.
    provider: AnyProvider,
    /// Second provider handle; `Agent::new` initialises it as a clone of `provider`.
    /// NOTE(review): presumably used for embeddings (per the name) — confirm at use sites.
    embedding_provider: AnyProvider,
    /// Channel used to receive messages and to send replies and status updates.
    channel: C,
    /// Type-erased executor for tool calls.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,

    /// Conversation history (index 0 is the system prompt) and the pending message queue.
    pub(super) msg: MessageState,
    /// Context bookkeeping, e.g. turns since the last hard compaction.
    pub(super) context_manager: context_manager::ContextManager,
    /// Tool orchestration state, including the optional audit logger.
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,

    /// Long-lived services: memory, skills, MCP, security, orchestration, compression, etc.
    pub(super) services: state::Services,

    /// Runtime state: config, lifecycle, providers, metrics, debug, and instructions.
    pub(super) runtime: state::AgentRuntime,
}
170
/// What the main `run` loop should do after a slash-command dispatch attempt.
enum DispatchFlow {
    /// Leave the main loop (the command requested exit).
    Break,
    /// Skip the rest of this iteration; the command was fully handled.
    Continue,
    /// No command matched; proceed to the next dispatch stage.
    Fallthrough,
}
180
181impl<C: Channel> Agent<C> {
182 #[must_use]
206 pub fn new(
207 provider: AnyProvider,
208 channel: C,
209 registry: SkillRegistry,
210 matcher: Option<SkillMatcherBackend>,
211 max_active_skills: usize,
212 tool_executor: impl ToolExecutor + 'static,
213 ) -> Self {
214 let registry = Arc::new(RwLock::new(registry));
215 let embedding_provider = provider.clone();
216 Self::new_with_registry_arc(
217 provider,
218 embedding_provider,
219 channel,
220 registry,
221 matcher,
222 max_active_skills,
223 tool_executor,
224 )
225 }
226
    /// Creates an agent around an already-shared skill registry.
    ///
    /// Builds the initial system prompt from every skill the registry can load (metadata
    /// entries whose `skill(name)` lookup fails are omitted), formatted with empty trust and
    /// health maps. The message history starts with that prompt as its only `System` message.
    /// Services and runtime state are freshly constructed: mostly `Default`/`new()`, with
    /// optional components set to `None`. The prompt's estimated token count seeds the
    /// provider state.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `max_active_skills` is 0.
    #[must_use]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        use state::{
            AgentRuntime, CompressionState, DebugState, ExperimentState, FeedbackState, IndexState,
            InstructionState, LifecycleState, McpState, MemoryState, MetricsState,
            OrchestrationState, ProviderState, RuntimeConfig, SecurityState, Services,
            SessionState, SkillState, ToolState,
        };

        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Load full skill definitions under a short-lived read lock; entries whose skill
        // fails to load are silently skipped.
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.skill(&m.name).ok())
                .collect()
        };
        // The initial prompt is formatted with empty trust and health maps.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Estimated (not exact) token count of the initial system prompt.
        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());

        let services = Services {
            memory: MemoryState::default(),
            skill: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            goal_accounting: None,
            #[cfg(feature = "self-check")]
            quality: None,
            proactive_explorer: None,
            promotion_engine: None,
            taco_compressor: None,
            speculation_engine: None,
        };

        let runtime = AgentRuntime {
            config: RuntimeConfig::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            debug: DebugState::default(),
            instructions: InstructionState::default(),
        };

        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                // The system prompt is always message 0.
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            services,
            runtime,
        }
    }
325
326 #[must_use]
339 pub fn into_channel(self) -> C {
340 self.channel
341 }
342
343 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
348 let Some(mgr) = &mut self.services.orchestration.subagent_manager else {
349 return vec![];
350 };
351
352 let finished: Vec<String> = mgr
353 .statuses()
354 .into_iter()
355 .filter_map(|(id, status)| {
356 if matches!(
357 status.state,
358 zeph_subagent::SubAgentState::Completed
359 | zeph_subagent::SubAgentState::Failed
360 | zeph_subagent::SubAgentState::Canceled
361 ) {
362 Some(id)
363 } else {
364 None
365 }
366 })
367 .collect();
368
369 let mut results = vec![];
370 for task_id in finished {
371 match mgr.collect(&task_id).await {
372 Ok(result) => results.push((task_id, result)),
373 Err(e) => {
374 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
375 }
376 }
377 }
378 results
379 }
380
381 async fn call_llm_for_session_summary(
390 &self,
391 chat_messages: &[Message],
392 ) -> Option<zeph_memory::StructuredSummary> {
393 let timeout_dur = std::time::Duration::from_secs(
394 self.services
395 .memory
396 .compaction
397 .shutdown_summary_timeout_secs,
398 );
399 match tokio::time::timeout(
400 timeout_dur,
401 self.provider
402 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
403 )
404 .await
405 {
406 Ok(Ok(s)) => Some(s),
407 Ok(Err(e)) => {
408 tracing::warn!(
409 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
410 );
411 self.plain_text_summary_fallback(chat_messages, timeout_dur)
412 .await
413 }
414 Err(_) => {
415 tracing::warn!(
416 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
417 self.services
418 .memory
419 .compaction
420 .shutdown_summary_timeout_secs
421 );
422 self.plain_text_summary_fallback(chat_messages, timeout_dur)
423 .await
424 }
425 }
426 }
427
428 async fn plain_text_summary_fallback(
429 &self,
430 chat_messages: &[Message],
431 timeout_dur: std::time::Duration,
432 ) -> Option<zeph_memory::StructuredSummary> {
433 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
434 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
435 summary: plain,
436 key_facts: vec![],
437 entities: vec![],
438 }),
439 Ok(Err(e)) => {
440 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
441 None
442 }
443 Err(_) => {
444 tracing::warn!("shutdown summary: plain LLM fallback timed out");
445 None
446 }
447 }
448 }
449
450 async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
455 use zeph_llm::provider::{MessagePart, Role};
456
457 let msgs = &self.msg.messages;
461 let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
463 return;
464 };
465 let asst_msg = &msgs[asst_idx];
466 let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
467 .parts
468 .iter()
469 .filter_map(|p| {
470 if let MessagePart::ToolUse { id, name, input } = p {
471 Some((id.as_str(), name.as_str(), input))
472 } else {
473 None
474 }
475 })
476 .collect();
477 if tool_use_ids.is_empty() {
478 return;
479 }
480
481 let paired_ids: std::collections::HashSet<&str> = msgs
483 .get(asst_idx + 1..)
484 .into_iter()
485 .flatten()
486 .filter(|m| m.role == Role::User)
487 .flat_map(|m| m.parts.iter())
488 .filter_map(|p| {
489 if let MessagePart::ToolResult { tool_use_id, .. } = p {
490 Some(tool_use_id.as_str())
491 } else {
492 None
493 }
494 })
495 .collect();
496
497 let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
498 .iter()
499 .filter(|(id, _, _)| !paired_ids.contains(*id))
500 .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
501 id: (*id).to_owned(),
502 name: (*name).to_owned().into(),
503 input: (*input).clone(),
504 })
505 .collect();
506
507 if unpaired.is_empty() {
508 return;
509 }
510
511 tracing::info!(
512 count = unpaired.len(),
513 "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
514 );
515 self.persist_cancelled_tool_results(&unpaired).await;
516 }
517
518 async fn maybe_store_shutdown_summary(&mut self) {
528 if !self.services.memory.compaction.shutdown_summary {
529 return;
530 }
531 let Some(memory) = self.services.memory.persistence.memory.clone() else {
532 return;
533 };
534 let Some(conversation_id) = self.services.memory.persistence.conversation_id else {
535 return;
536 };
537
538 match memory.has_session_summary(conversation_id).await {
540 Ok(true) => {
541 tracing::debug!("shutdown summary: session already has a summary, skipping");
542 return;
543 }
544 Ok(false) => {}
545 Err(e) => {
546 tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
547 return;
548 }
549 }
550
551 let user_count = self
553 .msg
554 .messages
555 .iter()
556 .skip(1)
557 .filter(|m| m.role == Role::User)
558 .count();
559 if user_count
560 < self
561 .services
562 .memory
563 .compaction
564 .shutdown_summary_min_messages
565 {
566 tracing::debug!(
567 user_count,
568 min = self
569 .services
570 .memory
571 .compaction
572 .shutdown_summary_min_messages,
573 "shutdown summary: too few user messages, skipping"
574 );
575 return;
576 }
577
578 let _ = self.channel.send_status("Saving session summary...").await;
580
581 let max = self
583 .services
584 .memory
585 .compaction
586 .shutdown_summary_max_messages;
587 if max == 0 {
588 tracing::debug!("shutdown summary: max_messages=0, skipping");
589 return;
590 }
591 let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
592 let slice = if non_system.len() > max {
593 &non_system[non_system.len() - max..]
594 } else {
595 &non_system[..]
596 };
597
598 let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
599 .iter()
600 .map(|m| {
601 let role = match m.role {
602 Role::User => "user".to_owned(),
603 Role::Assistant => "assistant".to_owned(),
604 Role::System => "system".to_owned(),
605 };
606 (zeph_memory::MessageId(0), role, m.content.clone())
607 })
608 .collect();
609
610 let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
611 let chat_messages = vec![Message {
612 role: Role::User,
613 content: prompt,
614 parts: vec![],
615 metadata: MessageMetadata::default(),
616 }];
617
618 let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
619 let _ = self.channel.send_status("").await;
620 return;
621 };
622
623 if let Err(e) = memory
624 .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
625 .await
626 {
627 tracing::warn!("shutdown summary: storage failed: {e:#}");
628 } else {
629 tracing::info!(
630 conversation_id = conversation_id.0,
631 "shutdown summary stored"
632 );
633 }
634
635 let _ = self.channel.send_status("").await;
637 }
638
    /// Shuts the agent down.
    ///
    /// Steps, in order:
    /// 1. Show a "Shutting down..." status and save the provider's router state.
    /// 2. Persist the topology advisor, if any.
    /// 3. Stop all sub-agents, and call `shutdown_all_shared` on the MCP manager.
    /// 4. Record and log end-of-session metrics.
    /// 5. Persist tombstones for unpaired tool calls.
    /// 6. Abort supervised, compression, and learning tasks, then yield to the runtime a
    ///    few times.
    /// 7. Store the shutdown summary and the session digest.
    ///
    /// Errors along the way are logged or ignored, never returned.
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state().await;

        if let Some(ref advisor) = self.services.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.services.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.services.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the still-open "turns since last hard compaction" count so it is not lost.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log end-of-session statistics from the current metrics snapshot. The watch borrow
        // is held only inside this block, which contains no `.await`.
        if let Some(ref tx) = self.runtime.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Write tombstone ToolResults for tool calls that never received a result.
        self.flush_orphaned_tool_use_on_shutdown().await;

        self.runtime.lifecycle.supervisor.abort_all();

        // Abort task handles still pending in the compression state.
        if let Some(h) = self.services.compression.pending_task_goal.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_sidequest_result.take() {
            h.abort();
        }
        if let Some(h) = self.services.compression.pending_subgoal.take() {
            h.abort();
        }

        self.services.learning_engine.learning_tasks.abort_all();

        // NOTE(review): presumably lets aborted tasks observe their cancellation before the
        // summary LLM calls below; the count of 4 looks heuristic — confirm.
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
751
752 fn refresh_subagent_metrics(&mut self) {
759 let Some(ref mgr) = self.services.orchestration.subagent_manager else {
760 return;
761 };
762 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
763 .statuses()
764 .into_iter()
765 .map(|(id, s)| {
766 let def = mgr.agents_def(&id);
767 crate::metrics::SubAgentMetrics {
768 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
769 id: id.clone(),
770 state: format!("{:?}", s.state).to_lowercase(),
771 turns_used: s.turns_used,
772 max_turns: def.map_or(20, |d| d.permissions.max_turns),
773 background: def.is_some_and(|d| d.permissions.background),
774 elapsed_secs: s.started_at.elapsed().as_secs(),
775 permission_mode: def.map_or_else(String::new, |d| {
776 use zeph_subagent::def::PermissionMode;
777 match d.permissions.permission_mode {
778 PermissionMode::Default => String::new(),
779 PermissionMode::AcceptEdits => "accept_edits".into(),
780 PermissionMode::DontAsk => "dont_ask".into(),
781 PermissionMode::BypassPermissions => "bypass_permissions".into(),
782 PermissionMode::Plan => "plan".into(),
783 }
784 }),
785 transcript_dir: mgr
786 .agent_transcript_dir(&id)
787 .map(|p| p.to_string_lossy().into_owned()),
788 }
789 })
790 .collect();
791 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
792 }
793
794 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
796 let completed = self.poll_subagents().await;
797 for (task_id, result) in completed {
798 let notice = if result.is_empty() {
799 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
800 } else {
801 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
802 };
803 if let Err(e) = self.channel.send(¬ice).await {
804 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
805 }
806 }
807 Ok(())
808 }
809
    /// Runs the agent's main event loop until shutdown.
    ///
    /// Before looping:
    /// - wait for model warm-up, if a warm-up signal is set and not yet complete;
    /// - restore the channel's provider selection;
    /// - load the cached session digest and possibly send a resume recap.
    ///
    /// Each iteration does housekeeping (provider override, tool refresh, elicitations,
    /// sub-agent metrics and notices, channel drain). It then takes the next input — a queued
    /// message first, otherwise the next `LoopEvent` — and routes it through four stages:
    ///
    /// 1. session-level slash commands (exit, quit, clear, reset, log, debug dump, help, …);
    /// 2. if stage 1 matched nothing, agent-level slash commands (memory, model, skill, …);
    /// 3. built-in commands via `handle_builtin_command`;
    /// 4. otherwise, normal processing via `process_user_message`.
    ///
    /// After the loop exits, runs `maybe_autodream` and finishes the trace collector, if any.
    ///
    /// # Errors
    ///
    /// Propagates errors from `next_event`, `notify_completed_subagents`, and
    /// `process_user_message`.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Wait for warm-up only if it has not already signalled success. A `false` value
        // after the change notification means warm-up failed; this is logged, not fatal.
        if let Some(mut rx) = self.runtime.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.restore_channel_provider().await;

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping, run before taking the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages are served before waiting for new events. Queued raw
            // attachments still go through `resolve_message` (audio transcription, images).
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                // Non-input events are handled in place and `continue` the loop. Only
                // messages, scheduled tasks, and loop-task injections produce input text.
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.services.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // For slash commands, add URLs flagged by the exfiltration scanner to this
            // turn's `user_provided_urls` (that set is cleared in `begin_turn`).
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.services
                        .security
                        .user_provided_urls
                        .write()
                        .extend(slash_urls);
                }
            }

            // Stage 1: session-level commands. These get real sink/debug/message/session
            // accessors but only a `NullAgent`.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.services.tool_state,
                providers: &mut self.runtime.providers,
                metrics: &self.runtime.metrics,
                security: &mut self.services.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.runtime.debug,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            // Stage 2: agent-level commands, dispatched only if stage 1 matched nothing.
            // Here the agent itself is lent as `agent` and the other accessors are nulls.
            // NOTE(review): presumably because `self` cannot be borrowed through both at
            // once — confirm.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    acp::AcpCommand,
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    goal::GoalCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand, NotifyTestCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                    trajectory::{ScopeCommand, TrajectoryCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(NotifyTestCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);
                agent_reg.register(AcpCommand);
                agent_reg.register(TrajectoryCommand);
                agent_reg.register(ScopeCommand);
                agent_reg.register(GoalCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            // Agent-level command output also triggers post-command learning.
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {}
            }

            // Stage 3: built-in commands (`Some(true)` = exit, `Some(false)` = handled).
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Stage 4: not a command — process as a normal user turn.
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.runtime.debug.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1127
1128 async fn apply_dispatch_result(
1134 &mut self,
1135 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1136 command: &str,
1137 with_learning: bool,
1138 ) -> DispatchFlow {
1139 match result {
1140 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1141 let _ = self.channel.flush_chunks().await;
1142 DispatchFlow::Break
1143 }
1144 Some(Ok(
1145 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1146 )) => {
1147 let _ = self.channel.flush_chunks().await;
1148 DispatchFlow::Continue
1149 }
1150 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1151 let _ = self.channel.send(&msg).await;
1152 let _ = self.channel.flush_chunks().await;
1153 if with_learning {
1154 self.maybe_trigger_post_command_learning(command).await;
1155 }
1156 DispatchFlow::Continue
1157 }
1158 Some(Err(e)) => {
1159 let _ = self.channel.send(&e.to_string()).await;
1160 let _ = self.channel.flush_chunks().await;
1161 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1162 DispatchFlow::Continue
1163 }
1164 None => DispatchFlow::Fallthrough,
1165 }
1166 }
1167
1168 fn apply_provider_override(&mut self) {
1170 if let Some(ref slot) = self.runtime.providers.provider_override
1171 && let Some(new_provider) = slot.write().take()
1172 {
1173 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1174 self.provider = new_provider;
1175 }
1176 }
1177
1178 async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
1186 let event = tokio::select! {
1187 result = self.channel.recv() => {
1188 return Ok(result?.map(LoopEvent::Message));
1189 }
1190 () = shutdown_signal(&mut self.runtime.lifecycle.shutdown) => {
1191 tracing::info!("shutting down");
1192 LoopEvent::Shutdown
1193 }
1194 Some(_) = recv_optional(&mut self.services.skill.skill_reload_rx) => {
1195 LoopEvent::SkillReload
1196 }
1197 Some(_) = recv_optional(&mut self.runtime.instructions.reload_rx) => {
1198 LoopEvent::InstructionReload
1199 }
1200 Some(_) = recv_optional(&mut self.runtime.lifecycle.config_reload_rx) => {
1201 LoopEvent::ConfigReload
1202 }
1203 Some(msg) = recv_optional(&mut self.runtime.lifecycle.update_notify_rx) => {
1204 LoopEvent::UpdateNotification(msg)
1205 }
1206 Some(msg) = recv_optional(&mut self.services.experiments.notify_rx) => {
1207 LoopEvent::ExperimentCompleted(msg)
1208 }
1209 Some(prompt) = recv_optional(&mut self.runtime.lifecycle.custom_task_rx) => {
1210 tracing::info!("scheduler: injecting custom task as agent turn");
1211 LoopEvent::ScheduledTask(prompt)
1212 }
1213 () = async {
1214 if let Some(ref mut ls) = self.runtime.lifecycle.user_loop {
1215 if ls.cancel_tx.is_cancelled() {
1216 std::future::pending::<()>().await;
1217 } else {
1218 ls.interval.tick().await;
1219 }
1220 } else {
1221 std::future::pending::<()>().await;
1222 }
1223 } => {
1224 let Some(ls) = self.runtime.lifecycle.user_loop.as_ref() else {
1228 return Ok(None);
1229 };
1230 if ls.cancel_tx.is_cancelled() {
1231 self.runtime.lifecycle.user_loop = None;
1232 return Ok(None);
1233 }
1234 let prompt = ls.prompt.clone();
1235 LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
1236 }
1237 Some(event) = recv_optional(&mut self.runtime.lifecycle.file_changed_rx) => {
1238 LoopEvent::FileChanged(event)
1239 }
1240 };
1241 Ok(Some(event))
1242 }
1243
1244 async fn resolve_message(
1245 &self,
1246 msg: crate::channel::ChannelMessage,
1247 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
1248 use crate::channel::{Attachment, AttachmentKind};
1249 use zeph_llm::provider::{ImageData, MessagePart};
1250
1251 let text_base = msg.text.clone();
1252
1253 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
1254 .attachments
1255 .into_iter()
1256 .partition(|a| a.kind == AttachmentKind::Audio);
1257
1258 tracing::debug!(
1259 audio = audio_attachments.len(),
1260 has_stt = self.runtime.providers.stt.is_some(),
1261 "resolve_message attachments"
1262 );
1263
1264 let text = if !audio_attachments.is_empty()
1265 && let Some(stt) = self.runtime.providers.stt.as_ref()
1266 {
1267 let mut transcribed_parts = Vec::new();
1268 for attachment in &audio_attachments {
1269 if attachment.data.len() > MAX_AUDIO_BYTES {
1270 tracing::warn!(
1271 size = attachment.data.len(),
1272 max = MAX_AUDIO_BYTES,
1273 "audio attachment exceeds size limit, skipping"
1274 );
1275 continue;
1276 }
1277 match stt
1278 .transcribe(&attachment.data, attachment.filename.as_deref())
1279 .await
1280 {
1281 Ok(result) => {
1282 tracing::info!(
1283 len = result.text.len(),
1284 language = ?result.language,
1285 "audio transcribed"
1286 );
1287 transcribed_parts.push(result.text);
1288 }
1289 Err(e) => {
1290 tracing::error!(error = %e, "audio transcription failed");
1291 }
1292 }
1293 }
1294 if transcribed_parts.is_empty() {
1295 text_base
1296 } else {
1297 let transcribed = transcribed_parts.join("\n");
1298 if text_base.is_empty() {
1299 transcribed
1300 } else {
1301 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
1302 }
1303 }
1304 } else {
1305 if !audio_attachments.is_empty() {
1306 tracing::warn!(
1307 count = audio_attachments.len(),
1308 "audio attachments received but no STT provider configured, dropping"
1309 );
1310 }
1311 text_base
1312 };
1313
1314 let mut image_parts = Vec::new();
1315 for attachment in image_attachments {
1316 if attachment.data.len() > MAX_IMAGE_BYTES {
1317 tracing::warn!(
1318 size = attachment.data.len(),
1319 max = MAX_IMAGE_BYTES,
1320 "image attachment exceeds size limit, skipping"
1321 );
1322 continue;
1323 }
1324 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
1325 image_parts.push(MessagePart::Image(Box::new(ImageData {
1326 data: attachment.data,
1327 mime_type,
1328 })));
1329 }
1330
1331 (text, image_parts)
1332 }
1333
1334 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1341 let id = turn::TurnId(self.runtime.debug.iteration_counter as u64);
1342 self.runtime.debug.iteration_counter += 1;
1343 let cancel_token = CancellationToken::new();
1344 self.runtime.lifecycle.cancel_token = cancel_token.clone();
1346 self.services.security.user_provided_urls.write().clear();
1347 self.runtime.lifecycle.turn_llm_requests = 0;
1349
1350 {
1352 let pending: Vec<u8> = {
1353 let mut q = self.services.security.trajectory_signal_queue.lock();
1354 std::mem::take(&mut *q)
1355 };
1356 for code in pending {
1357 self.services
1358 .security
1359 .trajectory
1360 .record(crate::agent::trajectory::RiskSignal::from_code(code));
1361 }
1362 }
1363 if self.services.security.trajectory.advance_turn()
1366 && let Some(logger) = self.tool_orchestrator.audit_logger.clone()
1367 {
1368 let entry = zeph_tools::AuditEntry {
1369 timestamp: zeph_tools::chrono_now(),
1370 tool: "<sentinel>".to_owned().into(),
1371 command: String::new(),
1372 result: zeph_tools::AuditResult::Success,
1373 duration_ms: 0,
1374 error_category: Some("trajectory_auto_recover".to_owned()),
1375 error_domain: Some("security".to_owned()),
1376 error_phase: None,
1377 claim_source: None,
1378 mcp_server_id: None,
1379 injection_flagged: false,
1380 embedding_anomalous: false,
1381 cross_boundary_mcp_to_acp: false,
1382 adversarial_policy_decision: None,
1383 exit_code: None,
1384 truncated: false,
1385 caller_id: None,
1386 policy_match: None,
1387 correlation_id: None,
1388 vigil_risk: None,
1389 execution_env: None,
1390 resolved_cwd: None,
1391 scope_at_definition: None,
1392 scope_at_dispatch: None,
1393 };
1394 self.runtime.lifecycle.supervisor.spawn(
1395 crate::agent::agent_supervisor::TaskClass::Telemetry,
1396 "trajectory-auto-recover-audit",
1397 async move { logger.log(&entry).await },
1398 );
1399 }
1400 let risk_level = self.services.security.trajectory.current_risk();
1402 *self.services.security.trajectory_risk_slot.write() = u8::from(risk_level);
1403 if let Some(alert) = self.services.security.trajectory.poll_alert() {
1405 let msg = format!(
1406 "[trajectory] Risk level: {:?} (score={:.2})",
1407 alert.level, alert.score
1408 );
1409 tracing::warn!(
1410 level = ?alert.level,
1411 score = alert.score,
1412 "trajectory sentinel alert"
1413 );
1414 if let Some(ref tx) = self.services.session.status_tx {
1415 let _ = tx.send(msg);
1416 }
1417 }
1418
1419 let context = turn::TurnContext::new(id, cancel_token, self.runtime.config.timeouts);
1420 turn::Turn::new(context, input)
1421 }
1422
1423 fn end_turn(&mut self, turn: turn::Turn) {
1430 self.runtime.metrics.pending_timings = turn.metrics.timings;
1431 self.flush_turn_timings();
1432 self.services.session.current_turn_intent = None;
1434 if let Some(ref engine) = self.services.speculation_engine {
1436 let metrics = engine.end_turn();
1437 if metrics.committed > 0 || metrics.cancelled > 0 {
1438 tracing::debug!(
1439 committed = metrics.committed,
1440 cancelled = metrics.cancelled,
1441 wasted_ms = metrics.wasted_ms,
1442 "speculation: turn boundary metrics"
1443 );
1444 }
1445 }
1446 }
1447
1448 #[cfg_attr(
1449 feature = "profiling",
1450 tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
1451 )]
1452 async fn process_user_message(
1453 &mut self,
1454 text: String,
1455 image_parts: Vec<zeph_llm::provider::MessagePart>,
1456 ) -> Result<(), error::AgentError> {
1457 let input = turn::TurnInput::new(text, image_parts);
1458 let mut t = self.begin_turn(input);
1459
1460 let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
1461 tracing::Span::current().record("turn_id", t.id().0);
1462 self.runtime
1464 .debug
1465 .start_iteration_span(turn_idx, t.input.text.trim());
1466
1467 let result = Box::pin(self.process_user_message_inner(&mut t)).await;
1468
1469 let span_status = if result.is_ok() {
1471 crate::debug_dump::trace::SpanStatus::Ok
1472 } else {
1473 crate::debug_dump::trace::SpanStatus::Error {
1474 message: "iteration failed".to_owned(),
1475 }
1476 };
1477 self.runtime.debug.end_iteration_span(turn_idx, span_status);
1478
1479 self.end_turn(t);
1480 result
1481 }
1482
1483 async fn process_user_message_inner(
1484 &mut self,
1485 turn: &mut turn::Turn,
1486 ) -> Result<(), error::AgentError> {
1487 self.reap_background_tasks_and_update_metrics();
1488
1489 let tokens_before_turn = self
1490 .runtime
1491 .metrics
1492 .metrics_tx
1493 .as_ref()
1494 .map_or(0, |tx| tx.borrow().total_tokens);
1495
1496 self.drain_background_completions();
1500
1501 self.wire_cancel_bridge(turn.cancel_token());
1502
1503 let text = turn.input.text.clone();
1505 let trimmed_owned = text.trim().to_owned();
1506 let trimmed = trimmed_owned.as_str();
1507
1508 if self.services.security.vigil.is_some() {
1511 let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
1512 self.services.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
1513 }
1514
1515 if let Some(result) = self.dispatch_slash_command(trimmed).await {
1516 return result;
1517 }
1518
1519 self.check_pending_rollbacks().await;
1520
1521 if self.pre_process_security(trimmed).await? {
1522 return Ok(());
1523 }
1524
1525 let t_ctx = std::time::Instant::now();
1526 tracing::debug!("turn timing: prepare_context start");
1527 self.advance_context_lifecycle_guarded(&text, trimmed).await;
1528 turn.metrics_mut().timings.prepare_context_ms =
1529 u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
1530 tracing::debug!(
1531 ms = turn.metrics_snapshot().timings.prepare_context_ms,
1532 "turn timing: prepare_context done"
1533 );
1534
1535 let image_parts = std::mem::take(&mut turn.input.image_parts);
1536 let merged_text = self.build_user_message_text_with_bg_completions(&text);
1540 let user_msg = self.build_user_message(&merged_text, image_parts);
1541
1542 let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
1545 if !urls.is_empty() {
1546 self.services
1547 .security
1548 .user_provided_urls
1549 .write()
1550 .extend(urls);
1551 }
1552
1553 self.services.memory.extraction.goal_text = Some(text.clone());
1556
1557 let t_persist = std::time::Instant::now();
1558 tracing::debug!("turn timing: persist_message(user) start");
1559 self.persist_message(Role::User, &text, &[], false).await;
1561 turn.metrics_mut().timings.persist_message_ms =
1562 u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
1563 tracing::debug!(
1564 ms = turn.metrics_snapshot().timings.persist_message_ms,
1565 "turn timing: persist_message(user) done"
1566 );
1567 self.push_message(user_msg);
1568
1569 tracing::debug!("turn timing: process_response start");
1572 let turn_had_error = if let Err(e) = self.process_response().await {
1573 self.services.learning_engine.learning_tasks.detach_all();
1575 tracing::error!("Response processing failed: {e:#}");
1576
1577 if e.is_no_providers() {
1580 self.runtime.lifecycle.last_no_providers_at = Some(std::time::Instant::now());
1581 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1582 tracing::warn!(
1583 backoff_secs,
1584 "no providers available; backing off before next turn"
1585 );
1586 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
1587 }
1588
1589 let user_msg = format!("Error: {e:#}");
1590 self.channel.send(&user_msg).await?;
1591 self.msg.messages.pop();
1592 self.recompute_prompt_tokens();
1593 self.channel.flush_chunks().await?;
1594 true
1595 } else {
1596 self.services.learning_engine.learning_tasks.detach_all();
1599 self.truncate_old_tool_results();
1600 self.maybe_update_magic_docs();
1602 self.maybe_spawn_promotion_scan();
1604 false
1605 };
1606 tracing::debug!("turn timing: process_response done");
1607
1608 #[cfg(feature = "self-check")]
1610 if let Some(pipeline) = self.services.quality.clone() {
1611 self.run_self_check_for_turn(pipeline, turn.id().0).await;
1612 }
1613 let _ = self.channel.flush_chunks().await;
1618
1619 self.maybe_fire_completion_notification(turn, turn_had_error);
1620
1621 self.flush_goal_accounting(tokens_before_turn);
1622
1623 turn.metrics_mut().timings.llm_chat_ms = self.runtime.metrics.pending_timings.llm_chat_ms;
1628 turn.metrics_mut().timings.tool_exec_ms = self.runtime.metrics.pending_timings.tool_exec_ms;
1629
1630 Ok(())
1631 }
1632
1633 fn wire_cancel_bridge(&mut self, turn_token: &tokio_util::sync::CancellationToken) {
1639 let signal = Arc::clone(&self.runtime.lifecycle.cancel_signal);
1640 let token = turn_token.clone();
1641 self.runtime.lifecycle.cancel_token = turn_token.clone();
1643 if let Some(prev) = self.runtime.lifecycle.cancel_bridge_handle.take() {
1644 prev.abort();
1645 }
1646 self.runtime.lifecycle.cancel_bridge_handle =
1647 Some(self.runtime.lifecycle.task_supervisor.spawn_oneshot(
1648 std::sync::Arc::from("agent.lifecycle.cancel_bridge"),
1649 move || async move {
1650 signal.notified().await;
1651 token.cancel();
1652 },
1653 ));
1654 }
1655
1656 fn reap_background_tasks_and_update_metrics(&mut self) {
1660 let bg_signal = self.runtime.lifecycle.supervisor.reap();
1661 if bg_signal.did_summarize {
1662 self.services.memory.persistence.unsummarized_count = 0;
1663 tracing::debug!("background summarization completed; unsummarized_count reset");
1664 }
1665 let snap = self.runtime.lifecycle.supervisor.metrics_snapshot();
1666 self.update_metrics(|m| {
1667 m.bg_inflight = snap.inflight as u64;
1668 m.bg_dropped = snap.total_dropped();
1669 m.bg_completed = snap.total_completed();
1670 m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
1671 m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
1672 });
1673
1674 if self.runtime.lifecycle.shell_executor_handle.is_some() {
1676 let shell_rows: Vec<crate::metrics::ShellBackgroundRunRow> = self
1677 .runtime
1678 .lifecycle
1679 .shell_executor_handle
1680 .as_ref()
1681 .map(|e| e.background_runs_snapshot())
1682 .unwrap_or_default()
1683 .into_iter()
1684 .map(|s| crate::metrics::ShellBackgroundRunRow {
1685 run_id: truncate_shell_run_id(&s.run_id),
1686 command: truncate_shell_command(&s.command),
1687 elapsed_secs: s.elapsed_ms / 1000,
1688 })
1689 .collect();
1690 self.update_metrics(|m| {
1691 m.shell_background_runs = shell_rows;
1692 });
1693 }
1694
1695 if self
1698 .runtime
1699 .config
1700 .supervisor_config
1701 .abort_enrichment_on_turn
1702 {
1703 self.runtime
1704 .lifecycle
1705 .supervisor
1706 .abort_class(agent_supervisor::TaskClass::Enrichment);
1707 }
1708 }
1709
1710 fn maybe_fire_completion_notification(&mut self, turn: &turn::Turn, is_error: bool) {
1723 let snap = turn.metrics_snapshot().timings.clone();
1724 let duration_ms = snap
1725 .prepare_context_ms
1726 .saturating_add(snap.llm_chat_ms)
1727 .saturating_add(snap.tool_exec_ms);
1728 let summary = crate::notifications::TurnSummary {
1729 duration_ms,
1730 preview: self.last_assistant_preview(160),
1731 tool_calls: 0,
1733 llm_requests: self.runtime.lifecycle.turn_llm_requests,
1734 exit_status: if is_error {
1735 crate::notifications::TurnExitStatus::Error
1736 } else {
1737 crate::notifications::TurnExitStatus::Success
1738 },
1739 };
1740
1741 let gate_ok = self
1743 .runtime
1744 .lifecycle
1745 .notifier
1746 .as_ref()
1747 .is_none_or(|n| n.should_fire(&summary));
1748
1749 if let Some(ref notifier) = self.runtime.lifecycle.notifier
1751 && gate_ok
1752 {
1753 notifier.fire(&summary);
1754 }
1755
1756 let hooks = self.services.session.hooks_config.turn_complete.clone();
1762 if !hooks.is_empty() && gate_ok {
1763 let mut env = std::collections::HashMap::new();
1764 env.insert(
1765 "ZEPH_TURN_DURATION_MS".to_owned(),
1766 summary.duration_ms.to_string(),
1767 );
1768 env.insert(
1769 "ZEPH_TURN_STATUS".to_owned(),
1770 if is_error { "error" } else { "success" }.to_owned(),
1771 );
1772 env.insert("ZEPH_TURN_PREVIEW".to_owned(), summary.preview.clone());
1773 env.insert(
1774 "ZEPH_TURN_LLM_REQUESTS".to_owned(),
1775 summary.llm_requests.to_string(),
1776 );
1777 let _span = tracing::info_span!("core.agent.turn_hooks").entered();
1778 let _accepted = self.runtime.lifecycle.supervisor.spawn(
1779 agent_supervisor::TaskClass::Telemetry,
1780 "turn-complete-hooks",
1781 async move {
1782 let no_mcp: Option<&'static dyn zeph_subagent::McpDispatch> = None;
1785 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, no_mcp).await {
1786 tracing::warn!(error = %e, "turn_complete hook failed");
1787 }
1788 },
1789 );
1790 }
1791 }
1792
1793 fn flush_goal_accounting(&mut self, tokens_before: u64) {
1796 let goal_snap = self
1797 .services
1798 .goal_accounting
1799 .as_ref()
1800 .and_then(|a| a.snapshot());
1801 self.update_metrics(|m| m.active_goal = goal_snap);
1802
1803 if let Some(ref accounting) = self.services.goal_accounting {
1804 let tokens_after = self
1805 .runtime
1806 .metrics
1807 .metrics_tx
1808 .as_ref()
1809 .map_or(0, |tx| tx.borrow().total_tokens);
1810 let turn_tokens = tokens_after.saturating_sub(tokens_before);
1811 let mut spawned: Option<
1812 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
1813 > = None;
1814 accounting.on_turn_complete(turn_tokens, |fut| {
1815 spawned = Some(fut);
1816 });
1817 if let Some(fut) = spawned {
1818 let _ = self.runtime.lifecycle.supervisor.spawn(
1819 agent_supervisor::TaskClass::Telemetry,
1820 "goal-accounting",
1821 fut,
1822 );
1823 }
1824 }
1825 }
1826
1827 #[cfg_attr(
1829 feature = "profiling",
1830 tracing::instrument(name = "agent.security_prescreen", skip_all)
1831 )]
1832 async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
1833 if let Some(ref guardrail) = self.services.security.guardrail {
1835 use zeph_sanitizer::guardrail::GuardrailVerdict;
1836 let verdict = guardrail.check(trimmed).await;
1837 match &verdict {
1838 GuardrailVerdict::Flagged { reason, .. } => {
1839 tracing::warn!(
1840 reason = %reason,
1841 should_block = verdict.should_block(),
1842 "guardrail flagged user input"
1843 );
1844 if verdict.should_block() {
1845 let msg = format!("[guardrail] Input blocked: {reason}");
1846 let _ = self.channel.send(&msg).await;
1847 let _ = self.channel.flush_chunks().await;
1848 return Ok(true);
1849 }
1850 let _ = self
1852 .channel
1853 .send(&format!("[guardrail] Warning: {reason}"))
1854 .await;
1855 }
1856 GuardrailVerdict::Error { error } => {
1857 if guardrail.error_should_block() {
1858 tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
1859 let msg = "[guardrail] Input blocked: check failed (see logs for details)";
1860 let _ = self.channel.send(msg).await;
1861 let _ = self.channel.flush_chunks().await;
1862 return Ok(true);
1863 }
1864 tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
1865 }
1866 GuardrailVerdict::Safe => {}
1867 }
1868 }
1869
1870 #[cfg(feature = "classifiers")]
1876 if self.services.security.sanitizer.scan_user_input() {
1877 match self
1878 .services
1879 .security
1880 .sanitizer
1881 .classify_injection(trimmed)
1882 .await
1883 {
1884 zeph_sanitizer::InjectionVerdict::Blocked => {
1885 self.push_classifier_metrics();
1886 let _ = self
1887 .channel
1888 .send("[security] Input blocked: injection detected by classifier.")
1889 .await;
1890 let _ = self.channel.flush_chunks().await;
1891 return Ok(true);
1892 }
1893 zeph_sanitizer::InjectionVerdict::Suspicious => {
1894 tracing::warn!("injection_classifier soft_signal on user input");
1895 }
1896 zeph_sanitizer::InjectionVerdict::Clean => {}
1897 }
1898 }
1899 #[cfg(feature = "classifiers")]
1900 self.push_classifier_metrics();
1901
1902 Ok(false)
1903 }
1904
1905 async fn advance_context_lifecycle_guarded(&mut self, text: &str, trimmed: &str) {
1912 let backoff_secs = self.runtime.config.timeouts.no_providers_backoff_secs;
1913 let prep_timeout_secs = self.runtime.config.timeouts.context_prep_timeout_secs;
1914
1915 let providers_recently_failed = self
1917 .runtime
1918 .lifecycle
1919 .last_no_providers_at
1920 .is_some_and(|t| t.elapsed().as_secs() < backoff_secs);
1921
1922 if providers_recently_failed {
1923 tracing::warn!(
1924 backoff_secs,
1925 "skipping context preparation: providers were unavailable on last turn"
1926 );
1927 return;
1928 }
1929
1930 let timeout_dur = std::time::Duration::from_secs(prep_timeout_secs);
1931 match tokio::time::timeout(timeout_dur, self.advance_context_lifecycle(text, trimmed)).await
1932 {
1933 Ok(()) => {}
1934 Err(_elapsed) => {
1935 tracing::warn!(
1936 timeout_secs = prep_timeout_secs,
1937 "context preparation timed out; proceeding with degraded context"
1938 );
1939 }
1940 }
1941 }
1942
1943 #[cfg_attr(
1944 feature = "profiling",
1945 tracing::instrument(name = "agent.prepare_context", skip_all)
1946 )]
1947 async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
1948 self.services.mcp.pruning_cache.reset();
1950
1951 let conv_id = self.services.memory.persistence.conversation_id;
1954 self.rebuild_system_prompt(text).await;
1955
1956 self.detect_and_record_corrections(trimmed, conv_id).await;
1957 self.services.learning_engine.tick();
1958 self.analyze_and_learn().await;
1959 self.sync_graph_counts().await;
1960
1961 self.context_manager.compaction = self.context_manager.compaction.advance_turn();
1966
1967 {
1969 self.services.focus.tick();
1970
1971 let sidequest_should_fire = self.services.sidequest.tick();
1974 if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
1975 self.maybe_sidequest_eviction();
1976 }
1977 }
1978
1979 {
1982 let cfg = &self.services.memory.extraction.graph_config.experience;
1983 if cfg.enabled
1984 && cfg.evolution_sweep_enabled
1985 && cfg.evolution_sweep_interval > 0
1986 && self
1987 .services
1988 .sidequest
1989 .turn_counter
1990 .checked_rem(cfg.evolution_sweep_interval as u64)
1991 == Some(0)
1992 && let Some(memory) = self.services.memory.persistence.memory.as_ref()
1993 && let (Some(exp), Some(graph)) =
1994 (memory.experience.as_ref(), memory.graph_store.as_ref())
1995 {
1996 let exp = std::sync::Arc::clone(exp);
1997 let graph = std::sync::Arc::clone(graph);
1998 let threshold = cfg.confidence_prune_threshold;
1999 let turn = self.services.sidequest.turn_counter;
2000 let accepted = self.runtime.lifecycle.supervisor.spawn(
2001 agent_supervisor::TaskClass::Telemetry,
2002 "experience-sweep",
2003 async move {
2004 match exp.evolution_sweep(graph.as_ref(), threshold).await {
2005 Ok(stats) => tracing::info!(
2006 turn,
2007 self_loops = stats.pruned_self_loops,
2008 low_confidence = stats.pruned_low_confidence,
2009 "evolution sweep complete",
2010 ),
2011 Err(e) => tracing::warn!(
2012 turn,
2013 error = %e,
2014 "evolution sweep failed",
2015 ),
2016 }
2017 },
2018 );
2019 if !accepted {
2020 tracing::warn!(
2021 turn = self.services.sidequest.turn_counter,
2022 "experience-sweep dropped (telemetry class at capacity)",
2023 );
2024 }
2025 }
2026 }
2027
2028 if let Some(warning) = self.cache_expiry_warning() {
2030 tracing::info!(warning, "cache expiry warning");
2031 let _ = self.channel.send_status(&warning).await;
2032 }
2033
2034 self.maybe_time_based_microcompact();
2037
2038 self.maybe_apply_deferred_summaries();
2043 self.flush_deferred_summaries().await;
2044
2045 if let Err(e) = self.maybe_proactive_compress().await {
2047 tracing::warn!("proactive compression failed: {e:#}");
2048 }
2049
2050 if let Err(e) = self.maybe_compact().await {
2051 tracing::warn!("context compaction failed: {e:#}");
2052 }
2053
2054 if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2055 tracing::warn!("context preparation failed: {e:#}");
2056 }
2057
2058 self.provider
2060 .set_memory_confidence(self.services.memory.persistence.last_recall_confidence);
2061
2062 self.services.learning_engine.reset_reflection();
2063 }
2064
2065 fn build_user_message(
2066 &mut self,
2067 text: &str,
2068 image_parts: Vec<zeph_llm::provider::MessagePart>,
2069 ) -> Message {
2070 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
2071 all_image_parts.extend(image_parts);
2072
2073 if !all_image_parts.is_empty() && self.provider.supports_vision() {
2074 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
2075 text: text.to_owned(),
2076 }];
2077 parts.extend(all_image_parts);
2078 Message::from_parts(Role::User, parts)
2079 } else {
2080 if !all_image_parts.is_empty() {
2081 tracing::warn!(
2082 count = all_image_parts.len(),
2083 "image attachments dropped: provider does not support vision"
2084 );
2085 }
2086 Message {
2087 role: Role::User,
2088 content: text.to_owned(),
2089 parts: vec![],
2090 metadata: MessageMetadata::default(),
2091 }
2092 }
2093 }
2094
2095 fn drain_background_completions(&mut self) {
2099 const BACKGROUND_COMPLETION_BUFFER_CAP: usize = 16;
2100
2101 let Some(ref mut rx) = self.runtime.lifecycle.background_completion_rx else {
2102 return;
2103 };
2104 while let Ok(completion) = rx.try_recv() {
2106 if self.runtime.lifecycle.pending_background_completions.len()
2107 >= BACKGROUND_COMPLETION_BUFFER_CAP
2108 {
2109 tracing::warn!(
2110 run_id = %completion.run_id,
2111 "background completion buffer full; dropping run result"
2112 );
2113 self.runtime
2116 .lifecycle
2117 .pending_background_completions
2118 .pop_front();
2119 self.runtime
2120 .lifecycle
2121 .pending_background_completions
2122 .push_back(zeph_tools::BackgroundCompletion {
2123 run_id: completion.run_id,
2124 exit_code: -1,
2125 success: false,
2126 elapsed_ms: 0,
2127 command: completion.command,
2128 output: format!(
2129 "[background result for run {} dropped: buffer overflow]",
2130 completion.run_id
2131 ),
2132 });
2133 } else {
2134 self.runtime
2135 .lifecycle
2136 .pending_background_completions
2137 .push_back(completion);
2138 }
2139 }
2140 }
2141
2142 fn build_user_message_text_with_bg_completions(&mut self, user_text: &str) -> String {
2146 if self
2147 .runtime
2148 .lifecycle
2149 .pending_background_completions
2150 .is_empty()
2151 {
2152 return user_text.to_owned();
2153 }
2154 let mut parts = String::new();
2155 for completion in self
2156 .runtime
2157 .lifecycle
2158 .pending_background_completions
2159 .drain(..)
2160 {
2161 let _ = write!(
2162 parts,
2163 "[Background task {} completed]\nexit_code: {}\nsuccess: {}\nelapsed_ms: {}\ncommand: {}\n\n{}\n\n",
2164 completion.run_id,
2165 completion.exit_code,
2166 completion.success,
2167 completion.elapsed_ms,
2168 completion.command,
2169 completion.output,
2170 );
2171 }
2172 parts.push_str(user_text);
2173 parts
2174 }
2175
2176 async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
2179 use zeph_subagent::SubAgentState;
2180 let result = loop {
2181 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2182
2183 #[allow(clippy::redundant_closure_for_method_calls)]
2187 let pending = self
2188 .services
2189 .orchestration
2190 .subagent_manager
2191 .as_mut()
2192 .and_then(|m| m.try_recv_secret_request());
2193 if let Some((req_task_id, req)) = pending {
2194 let confirm_prompt = format!(
2197 "Sub-agent requests secret '{}'. Allow?",
2198 crate::text::truncate_to_chars(&req.secret_key, 100)
2199 );
2200 let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
2201 if let Some(mgr) = self.services.orchestration.subagent_manager.as_mut() {
2202 if approved {
2203 let ttl = std::time::Duration::from_mins(5);
2204 let key = req.secret_key.clone();
2205 if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
2206 let _ = mgr.deliver_secret(&req_task_id, key);
2207 }
2208 } else {
2209 let _ = mgr.deny_secret(&req_task_id);
2210 }
2211 }
2212 }
2213
2214 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2215 let statuses = mgr.statuses();
2216 let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
2217 break format!("{label} completed (no status available).");
2218 };
2219 match status.state {
2220 SubAgentState::Completed => {
2221 let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
2222 break format!("{label} completed: {msg}");
2223 }
2224 SubAgentState::Failed => {
2225 let msg = status
2226 .last_message
2227 .clone()
2228 .unwrap_or_else(|| "unknown error".into());
2229 break format!("{label} failed: {msg}");
2230 }
2231 SubAgentState::Canceled => {
2232 break format!("{label} was cancelled.");
2233 }
2234 _ => {
2235 let _ = self
2236 .channel
2237 .send_status(&format!(
2238 "{label}: turn {}/{}",
2239 status.turns_used,
2240 self.services
2241 .orchestration
2242 .subagent_manager
2243 .as_ref()
2244 .and_then(|m| m.agents_def(task_id))
2245 .map_or(20, |d| d.permissions.max_turns)
2246 ))
2247 .await;
2248 }
2249 }
2250 };
2251 Some(result)
2252 }
2253
2254 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
2257 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2258 let full_ids: Vec<String> = mgr
2259 .statuses()
2260 .into_iter()
2261 .map(|(tid, _)| tid)
2262 .filter(|tid| tid.starts_with(prefix))
2263 .collect();
2264 Some(match full_ids.as_slice() {
2265 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
2266 [fid] => Ok(fid.clone()),
2267 _ => Err(format!(
2268 "Ambiguous id prefix '{prefix}': matches {} agents",
2269 full_ids.len()
2270 )),
2271 })
2272 }
2273
2274 fn handle_agent_list(&self) -> Option<String> {
2275 use std::fmt::Write as _;
2276 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2277 let defs = mgr.definitions();
2278 if defs.is_empty() {
2279 return Some("No sub-agent definitions found.".into());
2280 }
2281 let mut out = String::from("Available sub-agents:\n");
2282 for d in defs {
2283 let memory_label = match d.memory {
2284 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
2285 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
2286 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
2287 None => "",
2288 };
2289 if let Some(ref src) = d.source {
2290 let _ = writeln!(
2291 out,
2292 " {}{} — {} ({})",
2293 d.name, memory_label, d.description, src
2294 );
2295 } else {
2296 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
2297 }
2298 }
2299 Some(out)
2300 }
2301
2302 fn handle_agent_status(&self) -> Option<String> {
2303 use std::fmt::Write as _;
2304 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2305 let statuses = mgr.statuses();
2306 if statuses.is_empty() {
2307 return Some("No active sub-agents.".into());
2308 }
2309 let mut out = String::from("Active sub-agents:\n");
2310 for (id, s) in &statuses {
2311 let state = format!("{:?}", s.state).to_lowercase();
2312 let elapsed = s.started_at.elapsed().as_secs();
2313 let _ = writeln!(
2314 out,
2315 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
2316 short = &id[..8.min(id.len())],
2317 t = s.turns_used,
2318 msg = s.last_message.as_deref().unwrap_or(""),
2319 );
2320 if let Some(def) = mgr.agents_def(id)
2322 && let Some(scope) = def.memory
2323 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
2324 {
2325 let _ = writeln!(out, " memory: {}", dir.display());
2326 }
2327 }
2328 Some(out)
2329 }
2330
2331 fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
2332 let full_id = match self.resolve_agent_id_prefix(id)? {
2333 Ok(fid) => fid,
2334 Err(msg) => return Some(msg),
2335 };
2336 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2337 if let Some((tid, req)) = mgr.try_recv_secret_request()
2338 && tid == full_id
2339 {
2340 let key = req.secret_key.clone();
2341 let ttl = std::time::Duration::from_mins(5);
2342 if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
2343 return Some(format!("Approve failed: {e}"));
2344 }
2345 if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
2346 return Some(format!("Secret delivery failed: {e}"));
2347 }
2348 return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
2349 }
2350 Some(format!(
2351 "No pending secret request for sub-agent '{full_id}'."
2352 ))
2353 }
2354
2355 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
2356 let full_id = match self.resolve_agent_id_prefix(id)? {
2357 Ok(fid) => fid,
2358 Err(msg) => return Some(msg),
2359 };
2360 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2361 match mgr.deny_secret(&full_id) {
2362 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
2363 Err(e) => Some(format!("Deny failed: {e}")),
2364 }
2365 }
2366
2367 async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
2368 use zeph_subagent::AgentCommand;
2369
2370 match cmd {
2371 AgentCommand::List => self.handle_agent_list(),
2372 AgentCommand::Background { name, prompt } => {
2373 self.handle_agent_background(&name, &prompt)
2374 }
2375 AgentCommand::Spawn { name, prompt }
2376 | AgentCommand::Mention {
2377 agent: name,
2378 prompt,
2379 } => self.handle_agent_spawn_foreground(&name, &prompt).await,
2380 AgentCommand::Status => self.handle_agent_status(),
2381 AgentCommand::Cancel { id } => self.handle_agent_cancel(&id),
2382 AgentCommand::Approve { id } => self.handle_agent_approve(&id),
2383 AgentCommand::Deny { id } => self.handle_agent_deny(&id),
2384 AgentCommand::Resume { id, prompt } => self.handle_agent_resume(&id, &prompt).await,
2385 }
2386 }
2387
2388 fn handle_agent_background(&mut self, name: &str, prompt: &str) -> Option<String> {
2389 let provider = self.provider.clone();
2390 let tool_executor = Arc::clone(&self.tool_executor);
2391 let skills = self.filtered_skills_for(name);
2392 let cfg = self.services.orchestration.subagent_config.clone();
2393 let spawn_ctx = self.build_spawn_context(&cfg);
2394 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2395 match mgr.spawn(
2396 name,
2397 prompt,
2398 provider,
2399 tool_executor,
2400 skills,
2401 &cfg,
2402 spawn_ctx,
2403 ) {
2404 Ok(id) => Some(format!(
2405 "Sub-agent '{name}' started in background (id: {short})",
2406 short = &id[..8.min(id.len())]
2407 )),
2408 Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
2409 }
2410 }
2411
2412 async fn handle_agent_spawn_foreground(&mut self, name: &str, prompt: &str) -> Option<String> {
2413 let provider = self.provider.clone();
2414 let tool_executor = Arc::clone(&self.tool_executor);
2415 let skills = self.filtered_skills_for(name);
2416 let cfg = self.services.orchestration.subagent_config.clone();
2417 let spawn_ctx = self.build_spawn_context(&cfg);
2418 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2419 let task_id = match mgr.spawn(
2420 name,
2421 prompt,
2422 provider,
2423 tool_executor,
2424 skills,
2425 &cfg,
2426 spawn_ctx,
2427 ) {
2428 Ok(id) => id,
2429 Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
2430 };
2431 let short = task_id[..8.min(task_id.len())].to_owned();
2432 let _ = self
2433 .channel
2434 .send(&format!("Sub-agent '{name}' running... (id: {short})"))
2435 .await;
2436 let label = format!("Sub-agent '{name}'");
2437 self.poll_subagent_until_done(&task_id, &label).await
2438 }
2439
2440 fn handle_agent_cancel(&mut self, id: &str) -> Option<String> {
2441 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2442 let ids: Vec<String> = mgr
2444 .statuses()
2445 .into_iter()
2446 .map(|(task_id, _)| task_id)
2447 .filter(|task_id| task_id.starts_with(id))
2448 .collect();
2449 match ids.as_slice() {
2450 [] => Some(format!("No sub-agent with id prefix '{id}'")),
2451 [full_id] => {
2452 let full_id = full_id.clone();
2453 match mgr.cancel(&full_id) {
2454 Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
2455 Err(e) => Some(format!("Cancel failed: {e}")),
2456 }
2457 }
2458 _ => Some(format!(
2459 "Ambiguous id prefix '{id}': matches {} agents",
2460 ids.len()
2461 )),
2462 }
2463 }
2464
2465 async fn handle_agent_resume(&mut self, id: &str, prompt: &str) -> Option<String> {
2466 let cfg = self.services.orchestration.subagent_config.clone();
2467 let def_name = {
2470 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2471 match mgr.def_name_for_resume(id, &cfg) {
2472 Ok(name) => name,
2473 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2474 }
2475 };
2476 let skills = self.filtered_skills_for(&def_name);
2477 let provider = self.provider.clone();
2478 let tool_executor = Arc::clone(&self.tool_executor);
2479 let mgr = self.services.orchestration.subagent_manager.as_mut()?;
2480 let (task_id, _) = match mgr.resume(id, prompt, provider, tool_executor, skills, &cfg) {
2481 Ok(pair) => pair,
2482 Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
2483 };
2484 let short = task_id[..8.min(task_id.len())].to_owned();
2485 let _ = self
2486 .channel
2487 .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
2488 .await;
2489 self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
2490 .await
2491 }
2492
2493 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
2494 let mgr = self.services.orchestration.subagent_manager.as_ref()?;
2495 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
2496 let reg = self.services.skill.registry.read();
2497 match zeph_subagent::filter_skills(®, &def.skills) {
2498 Ok(skills) => {
2499 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
2500 if bodies.is_empty() {
2501 None
2502 } else {
2503 Some(bodies)
2504 }
2505 }
2506 Err(e) => {
2507 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
2508 None
2509 }
2510 }
2511 }
2512
2513 fn build_spawn_context(
2515 &self,
2516 cfg: &zeph_config::SubAgentConfig,
2517 ) -> zeph_subagent::SpawnContext {
2518 zeph_subagent::SpawnContext {
2519 parent_messages: self.extract_parent_messages(cfg),
2520 parent_cancel: Some(self.runtime.lifecycle.cancel_token.clone()),
2521 parent_provider_name: {
2522 let name = &self.runtime.config.active_provider_name;
2523 if name.is_empty() {
2524 None
2525 } else {
2526 Some(name.clone())
2527 }
2528 },
2529 spawn_depth: self.runtime.config.spawn_depth,
2530 mcp_tool_names: self.extract_mcp_tool_names(),
2531 seed_trajectory_score: {
2533 let child = self.services.security.trajectory.spawn_child();
2534 let score = child.score_now();
2535 if score > 0.0 { Some(score) } else { None }
2536 },
2537 }
2538 }
2539
2540 fn extract_parent_messages(
2545 &self,
2546 config: &zeph_config::SubAgentConfig,
2547 ) -> Vec<zeph_llm::provider::Message> {
2548 use zeph_llm::provider::Role;
2549 if config.context_window_turns == 0 {
2550 return Vec::new();
2551 }
2552 let non_system: Vec<_> = self
2553 .msg
2554 .messages
2555 .iter()
2556 .filter(|m| m.role != Role::System)
2557 .cloned()
2558 .collect();
2559 let take_count = config.context_window_turns * 2;
2560 let start = non_system.len().saturating_sub(take_count);
2561 let mut msgs = non_system[start..].to_vec();
2562
2563 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
2566 let mut keep = msgs.len();
2567 for (i, m) in msgs.iter().enumerate() {
2568 total_chars += m.content.len();
2569 if total_chars > max_chars {
2570 keep = i;
2571 break;
2572 }
2573 }
2574 if keep < msgs.len() {
2575 tracing::info!(
2576 kept = keep,
2577 requested = config.context_window_turns * 2,
2578 "[subagent] truncated parent history from {} to {} turns due to token budget",
2579 config.context_window_turns * 2,
2580 keep
2581 );
2582 msgs.truncate(keep);
2583 }
2584 msgs
2585 }
2586
2587 fn extract_mcp_tool_names(&self) -> Vec<String> {
2589 self.tool_executor
2590 .tool_definitions_erased()
2591 .into_iter()
2592 .filter(|t| t.id.starts_with("mcp_"))
2593 .map(|t| t.id.to_string())
2594 .collect()
2595 }
2596
2597 fn classify_source_kind(
2601 skill_dir: &std::path::Path,
2602 managed_dir: Option<&std::path::PathBuf>,
2603 bundled_names: &std::collections::HashSet<String>,
2604 ) -> zeph_memory::store::SourceKind {
2605 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2606 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2607 let has_marker = skill_dir.join(".bundled").exists();
2608 if has_marker && bundled_names.contains(skill_name) {
2609 zeph_memory::store::SourceKind::Bundled
2610 } else {
2611 if has_marker {
2612 tracing::warn!(
2613 skill = %skill_name,
2614 "skill has .bundled marker but is not in the bundled skill \
2615 allowlist — classifying as Hub"
2616 );
2617 }
2618 zeph_memory::store::SourceKind::Hub
2619 }
2620 } else {
2621 zeph_memory::store::SourceKind::Local
2622 }
2623 }
2624
2625 async fn update_trust_for_reloaded_skills(
2627 &mut self,
2628 all_meta: &[zeph_skills::loader::SkillMeta],
2629 ) {
2630 let memory = self.services.memory.persistence.memory.clone();
2632 let Some(memory) = memory else {
2633 return;
2634 };
2635 let trust_cfg = self.services.skill.trust_config.clone();
2636 let managed_dir = self.services.skill.managed_dir.clone();
2637 let bundled_names: std::collections::HashSet<String> =
2638 zeph_skills::bundled_skill_names().into_iter().collect();
2639 for meta in all_meta {
2640 let skill_dir = meta.skill_dir.clone();
2643 let managed_dir_ref = managed_dir.clone();
2644 let bundled_names_ref = bundled_names.clone();
2645 let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
2646 tokio::task::spawn_blocking(move || {
2647 let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
2648 let source_kind = Self::classify_source_kind(
2649 &skill_dir,
2650 managed_dir_ref.as_ref(),
2651 &bundled_names_ref,
2652 );
2653 Some((hash, source_kind))
2654 })
2655 .await
2656 .unwrap_or(None);
2657
2658 let Some((current_hash, source_kind)) = fs_result else {
2659 tracing::warn!("failed to compute hash for '{}'", meta.name);
2660 continue;
2661 };
2662 let initial_level = match source_kind {
2663 zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
2664 zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
2665 zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
2666 &trust_cfg.local_level
2667 }
2668 };
2669 let existing = memory
2670 .sqlite()
2671 .load_skill_trust(&meta.name)
2672 .await
2673 .ok()
2674 .flatten();
2675 let trust_level_str = if let Some(ref row) = existing {
2676 if row.blake3_hash != current_hash {
2677 trust_cfg.hash_mismatch_level.to_string()
2678 } else if row.source_kind != source_kind {
2679 let stored = row
2683 .trust_level
2684 .parse::<zeph_common::SkillTrustLevel>()
2685 .unwrap_or_else(|_| {
2686 tracing::warn!(
2687 skill = %meta.name,
2688 raw = %row.trust_level,
2689 "unrecognised trust_level in DB, treating as quarantined"
2690 );
2691 zeph_common::SkillTrustLevel::Quarantined
2692 });
2693 if !stored.is_active() || stored.severity() <= initial_level.severity() {
2694 row.trust_level.clone()
2695 } else {
2696 initial_level.to_string()
2697 }
2698 } else {
2699 row.trust_level.clone()
2700 }
2701 } else {
2702 initial_level.to_string()
2703 };
2704 let source_path = meta.skill_dir.to_str();
2705 if let Err(e) = memory
2706 .sqlite()
2707 .upsert_skill_trust(
2708 &meta.name,
2709 &trust_level_str,
2710 source_kind,
2711 None,
2712 source_path,
2713 ¤t_hash,
2714 )
2715 .await
2716 {
2717 tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
2718 }
2719 }
2720 }
2721
    /// Rebuilds or resynchronises the skill matcher, and optionally the BM25 index, for `all_meta`.
    ///
    /// Embeddings go through `self.embedding_provider`. Each call is bounded by
    /// `timeouts.embedding_seconds`, and a timed-out call yields `LlmError::Timeout`.
    ///
    /// - When the current matcher is not a Qdrant backend (or there is no matcher), it is
    ///   replaced by the result of `SkillMatcher::new`, which may be `None`.
    /// - With a Qdrant backend, the existing backend is synced instead. Progress is forwarded
    ///   to `status_tx` when available. Sync failures are logged and otherwise ignored.
    /// - When hybrid search is enabled, the BM25 index is rebuilt from skill descriptions.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout =
            std::time::Duration::from_secs(self.runtime.config.timeouts.embedding_seconds);
        // Each embedding request gets its own timeout, so one stuck call cannot stall the rebuild.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // Only an existing Qdrant backend is synced in place. Anything else (including no
        // matcher at all) is rebuilt in memory from scratch.
        let needs_inmemory_rebuild = !self
            .services
            .skill
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.services.skill.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.services.skill.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Progress callback forwarding "completed/total" updates to the status channel, if any.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> =
                self.services.session.status_tx.clone().map(
                    |tx| -> Box<dyn Fn(usize, usize) + Send> {
                        Box::new(move |completed, total| {
                            let msg = format!("Syncing skills: {completed}/{total}");
                            let _ = tx.send(msg);
                        })
                    },
                );
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.services.skill.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.services.skill.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.services.skill.rebuild_bm25(&descs);
        }
    }
2784
    /// Hot-reloads skills from the configured skill paths plus any plugin-supplied directories.
    ///
    /// Returns early when the skill fingerprint is unchanged after the registry reload.
    /// Otherwise it:
    /// - refreshes trust records;
    /// - rebuilds the skill matcher;
    /// - regenerates the skills prompt;
    /// - writes the resulting system prompt into the first message of the conversation history.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.services.skill.fingerprint();
        // Configured skill paths, plus plugin-supplied directories that are not already listed.
        let reload_paths = if let Some(ref supplier) = self.services.skill.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.services.skill.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.services.skill.skill_paths.clone()
        };
        self.services.skill.registry.write().reload(&reload_paths);
        // Skip the rebuild entirely when the fingerprint did not change.
        if self.services.skill.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        // Owned copies, so the registry lock is not held across the awaits below.
        let all_meta = self
            .services
            .skill
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Skills for which `reg.skill` returns an error are omitted from the prompt.
        let all_skills: Vec<Skill> = {
            let reg = self.services.skill.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // NOTE(review): the prompt is rebuilt with an empty health map here. Confirm that health
        // data is intentionally not used on hot reload.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt =
            state::SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.services
            .skill
            .last_skills_prompt
            .clone_from(&skills_prompt);
        // Overwrite the content of the first history message with the regenerated system prompt.
        let system_prompt = build_system_prompt(&skills_prompt, None);
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.services.skill.registry.read().all_meta().len()
        );
    }
2850
2851 fn reload_instructions(&mut self) {
2852 if let Some(ref mut rx) = self.runtime.instructions.reload_rx {
2854 while rx.try_recv().is_ok() {}
2855 }
2856 let Some(ref state) = self.runtime.instructions.reload_state else {
2857 return;
2858 };
2859 let new_blocks = crate::instructions::load_instructions(
2860 &state.base_dir,
2861 &state.provider_kinds,
2862 &state.explicit_files,
2863 state.auto_detect,
2864 );
2865 let old_sources: std::collections::HashSet<_> = self
2866 .runtime
2867 .instructions
2868 .blocks
2869 .iter()
2870 .map(|b| &b.source)
2871 .collect();
2872 let new_sources: std::collections::HashSet<_> =
2873 new_blocks.iter().map(|b| &b.source).collect();
2874 for added in new_sources.difference(&old_sources) {
2875 tracing::info!(path = %added.display(), "instruction file added");
2876 }
2877 for removed in old_sources.difference(&new_sources) {
2878 tracing::info!(path = %removed.display(), "instruction file removed");
2879 }
2880 tracing::info!(
2881 old_count = self.runtime.instructions.blocks.len(),
2882 new_count = new_blocks.len(),
2883 "reloaded instruction files"
2884 );
2885 self.runtime.instructions.blocks = new_blocks;
2886 }
2887
    /// Re-reads the configuration file (including plugin overlays) and applies the
    /// hot-reloadable subset of settings to the running agent.
    ///
    /// Returns without changing anything when no config path is known or when
    /// `load_config_with_overlay` returns `None`. Settings not assigned below keep their current
    /// values.
    fn reload_config(&mut self) {
        let Some(path) = self.runtime.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        // Resolved up front: this call borrows the whole `config`, and several fields are moved
        // out of `config` below.
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        self.runtime.config.security = config.security;
        self.runtime.config.timeouts = config.timeouts;
        self.runtime.config.redact_credentials = config.memory.redact_credentials;
        self.services.memory.persistence.history_limit = config.memory.history_limit;
        self.services.memory.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.services.memory.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        self.services.skill.max_active_skills = config.skills.max_active_skills.get();
        self.services.skill.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.services.skill.min_injection_score = config.skills.min_injection_score;
        self.services.skill.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.services.skill.hybrid_search = config.skills.hybrid_search;
        self.services.skill.two_stage_matching = config.skills.two_stage_matching;
        self.services.skill.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.services.skill.generation_provider_name);
        // A leading "~/" is expanded to the home directory. If the home directory cannot be
        // determined, the path is used verbatim.
        self.services.skill.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        // NOTE(review): the meaning of the 0.20 argument is defined by `ContextBudget::new`;
        // confirm it and consider giving it a name.
        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            // The RPE router is only created when missing, so an existing router instance is
            // kept across reloads. It is dropped when RPE is disabled.
            // NOTE(review): an existing router keeps the threshold/max_skip_turns it was built
            // with. Confirm that changes to those values are meant to wait for re-creation.
            if graph_cfg.rpe.enabled {
                if self.services.memory.extraction.rpe_router.is_none() {
                    self.services.memory.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.services.memory.extraction.rpe_router = None;
            }
            self.services.memory.extraction.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // An empty classifier provider name leaves `store_routing_provider` unset.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.services
            .memory
            .persistence
            .cross_session_score_threshold = config.memory.cross_session_score_threshold;

        self.services.index.repo_map_tokens = config.index.repo_map_tokens;
        self.services.index.repo_map_ttl =
            std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        // Only these three hook lists are refreshed on reload.
        // NOTE(review): other lists on `hooks_config` (e.g. `file_changed_hooks`) are not
        // updated here. Confirm this is intended.
        self.services
            .session
            .hooks_config
            .cwd_changed
            .clone_from(&config.hooks.cwd_changed);
        self.services
            .session
            .hooks_config
            .permission_denied
            .clone_from(&config.hooks.permission_denied);
        self.services
            .session
            .hooks_config
            .turn_complete
            .clone_from(&config.hooks.turn_complete);
        tracing::info!("config reloaded");
    }
2995
2996 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
3000 let mut config = match Config::load(path) {
3001 Ok(c) => c,
3002 Err(e) => {
3003 tracing::warn!("config reload failed: {e:#}");
3004 return None;
3005 }
3006 };
3007
3008 let new_overlay = if self.runtime.lifecycle.plugins_dir.as_os_str().is_empty() {
3010 None
3011 } else {
3012 match zeph_plugins::apply_plugin_config_overlays(
3013 &mut config,
3014 &self.runtime.lifecycle.plugins_dir,
3015 ) {
3016 Ok(o) => Some(o),
3017 Err(e) => {
3018 tracing::warn!(
3019 "plugin overlay merge failed during reload: {e:#}; \
3020 keeping previous runtime state"
3021 );
3022 return None;
3023 }
3024 }
3025 };
3026
3027 if let Some(ref overlay) = new_overlay {
3031 self.warn_on_shell_overlay_divergence(overlay, &config);
3032 }
3033 Some(config)
3034 }
3035
3036 fn warn_on_shell_overlay_divergence(
3042 &self,
3043 new_overlay: &zeph_plugins::ResolvedOverlay,
3044 config: &Config,
3045 ) {
3046 let new_blocked: Vec<String> = {
3047 let mut v = config.tools.shell.blocked_commands.clone();
3048 v.sort();
3049 v
3050 };
3051 let new_allowed: Vec<String> = {
3052 let mut v = config.tools.shell.allowed_commands.clone();
3053 v.sort();
3054 v
3055 };
3056
3057 let startup = &self.runtime.lifecycle.startup_shell_overlay;
3058 let blocked_changed = new_blocked != startup.blocked;
3059 let allowed_changed = new_allowed != startup.allowed;
3060
3061 if blocked_changed && let Some(ref h) = self.runtime.lifecycle.shell_policy_handle {
3063 h.rebuild(&config.tools.shell);
3064 tracing::info!(
3065 blocked_count = h.snapshot_blocked().len(),
3066 "shell blocked_commands rebuilt from hot-reload"
3067 );
3068 }
3069
3070 if allowed_changed {
3077 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
3078 for sandbox path recomputation (blocked_commands was rebuilt live)";
3079 tracing::warn!("{msg}");
3080 if let Some(ref tx) = self.services.session.status_tx {
3081 let _ = tx.send(msg.to_owned());
3082 }
3083 }
3084
3085 let _ = new_overlay;
3086 }
3087
3088 fn maybe_sidequest_eviction(&mut self) {
3099 if self.services.sidequest.config.enabled {
3103 use crate::config::PruningStrategy;
3104 if !matches!(
3105 self.context_manager.compression.pruning_strategy,
3106 PruningStrategy::Reactive
3107 ) {
3108 tracing::warn!(
3109 strategy = ?self.context_manager.compression.pruning_strategy,
3110 "sidequest is enabled alongside a non-Reactive pruning strategy; \
3111 consider disabling sidequest.enabled to avoid redundant eviction"
3112 );
3113 }
3114 }
3115
3116 if self.services.focus.is_active() {
3118 tracing::debug!("sidequest: skipping — focus session active");
3119 self.services.compression.pending_sidequest_result = None;
3121 return;
3122 }
3123
3124 self.sidequest_apply_pending();
3126
3127 self.sidequest_schedule_next();
3129 }
3130
    /// Applies the result of a previously spawned SideQuest eviction task, if it has finished.
    ///
    /// - No pending task: no-op.
    /// - Task not finished yet: the handle is dropped and nothing is applied.
    /// - Non-empty list of evicted cursor indices: the eviction is applied to the message
    ///   history. If any tokens were freed, prompt tokens are recomputed, compaction is marked
    ///   as `CompactedThisTurn { cooldown: 0 }`, and the result is logged, dumped (when a debug
    ///   dumper exists) and reported on the status channel. Otherwise the status line is cleared.
    /// - Empty list, `None`, or a task error: logged at debug level and the status line is cleared.
    fn sidequest_apply_pending(&mut self) {
        let Some(handle) = self.services.compression.pending_sidequest_result.take() else {
            return;
        };
        let result = match handle.try_join() {
            Ok(result) => result,
            // NOTE(review): the unfinished handle is dropped rather than stored back, and the
            // caller then schedules a fresh task. Whether dropping it cancels the in-flight task
            // depends on the handle type returned by `spawn_oneshot`; confirm.
            Err(_handle) => {
                tracing::debug!("sidequest: background LLM task not yet complete, rescheduling");
                return;
            }
        };
        match result {
            Ok(Some(evicted_indices)) if !evicted_indices.is_empty() => {
                // Snapshot taken before eviction so the debug dump shows the cursors as scored.
                let cursors_snapshot = self.services.sidequest.tool_output_cursors.clone();
                let freed = self.services.sidequest.apply_eviction(
                    &mut self.msg.messages,
                    &evicted_indices,
                    &self.runtime.metrics.token_counter,
                );
                if freed > 0 {
                    self.recompute_prompt_tokens();
                    // Presumably lets other compaction logic see this turn as already compacted;
                    // confirm against `CompactionState` consumers.
                    self.context_manager.compaction =
                        crate::agent::context_manager::CompactionState::CompactedThisTurn {
                            cooldown: 0,
                        };
                    tracing::info!(
                        freed_tokens = freed,
                        evicted_cursors = evicted_indices.len(),
                        pass = self.services.sidequest.passes_run,
                        "sidequest eviction complete"
                    );
                    if let Some(ref d) = self.runtime.debug.debug_dumper {
                        d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                    }
                    if let Some(ref tx) = self.services.session.status_tx {
                        let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                    }
                } else {
                    if let Some(ref tx) = self.services.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
            }
            Ok(None | Some(_)) => {
                tracing::debug!("sidequest: pending result: no cursors to evict");
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
            Err(e) => {
                tracing::debug!("sidequest: background task error: {e}");
                if let Some(ref tx) = self.services.session.status_tx {
                    let _ = tx.send(String::new());
                }
            }
        }
    }
3194
    /// Rebuilds the tool-output cursors and, if any exist, spawns a background eviction task.
    ///
    /// The task asks the summary (or primary) provider which cursors to evict. Its handle is
    /// stored in `pending_sidequest_result`, where `sidequest_apply_pending` picks it up later.
    /// The task resolves to `None` when:
    /// - the LLM call fails or exceeds 5 seconds;
    /// - no JSON object can be found in the response;
    /// - the JSON object does not parse as an `EvictionResponse`.
    fn sidequest_schedule_next(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        self.services
            .sidequest
            .rebuild_cursors(&self.msg.messages, &self.runtime.metrics.token_counter);

        if self.services.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        // Everything the background future needs is captured by value up front.
        let prompt = self.services.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.services.sidequest.config.max_eviction_ratio;
        let n_cursors = self.services.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        let eviction_future = async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            // Parse the outermost `{ ... }` span, tolerating prose around the JSON.
            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            // Keep only indices of existing cursors, sorted and de-duplicated.
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            // The cap is computed in f32. Float-to-int `as` casts saturate (NaN and negative
            // values become 0), so the lossy casts here cannot panic.
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            // After sorting, truncation keeps the lowest cursor indices.
            valid.truncate(max_evict);
            Some(valid)
        };
        let handle = self.runtime.lifecycle.task_supervisor.spawn_oneshot(
            std::sync::Arc::from("agent.sidequest.eviction"),
            move || eviction_future,
        );
        self.services.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.services.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }
3268
3269 fn mcp_dispatch(&self) -> Option<McpManagerDispatch> {
3271 self.services
3272 .mcp
3273 .manager
3274 .as_ref()
3275 .map(|m| McpManagerDispatch(Arc::clone(m)))
3276 }
3277
3278 pub(crate) async fn check_cwd_changed(&mut self) {
3284 let current = match std::env::current_dir() {
3285 Ok(p) => p,
3286 Err(e) => {
3287 tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
3288 return;
3289 }
3290 };
3291 if current == self.runtime.lifecycle.last_known_cwd {
3292 return;
3293 }
3294 let old_cwd =
3295 std::mem::replace(&mut self.runtime.lifecycle.last_known_cwd, current.clone());
3296 self.services.session.env_context.working_dir = current.display().to_string();
3297
3298 tracing::info!(
3299 old = %old_cwd.display(),
3300 new = %current.display(),
3301 "working directory changed"
3302 );
3303
3304 let _ = self
3305 .channel
3306 .send_status("Working directory changed\u{2026}")
3307 .await;
3308
3309 let hooks = self.services.session.hooks_config.cwd_changed.clone();
3310 if hooks.is_empty() {
3311 tracing::debug!("CwdChanged: no hooks configured, skipping");
3312 } else {
3313 tracing::debug!(count = hooks.len(), "CwdChanged: firing hooks");
3314 let mut env = std::collections::HashMap::new();
3315 env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
3316 env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
3317 let dispatch = self.mcp_dispatch();
3318 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3319 .as_ref()
3320 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3321 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3322 tracing::warn!(error = %e, "CwdChanged hook failed");
3323 } else {
3324 tracing::info!(count = hooks.len(), "CwdChanged: hooks fired");
3325 }
3326 }
3327
3328 let _ = self.channel.send_status("").await;
3329 }
3330
3331 pub(crate) async fn handle_file_changed(
3333 &mut self,
3334 event: crate::file_watcher::FileChangedEvent,
3335 ) {
3336 tracing::info!(path = %event.path.display(), "file changed");
3337
3338 let _ = self
3339 .channel
3340 .send_status("Running file-change hook\u{2026}")
3341 .await;
3342
3343 let hooks = self
3344 .services
3345 .session
3346 .hooks_config
3347 .file_changed_hooks
3348 .clone();
3349 if hooks.is_empty() {
3350 tracing::debug!(path = %event.path.display(), "FileChanged: no hooks configured, skipping");
3351 } else {
3352 tracing::debug!(count = hooks.len(), path = %event.path.display(), "FileChanged: firing hooks");
3353 let mut env = std::collections::HashMap::new();
3354 env.insert(
3355 "ZEPH_CHANGED_PATH".to_owned(),
3356 event.path.display().to_string(),
3357 );
3358 let dispatch = self.mcp_dispatch();
3359 let mcp: Option<&dyn zeph_subagent::McpDispatch> = dispatch
3360 .as_ref()
3361 .map(|d| d as &dyn zeph_subagent::McpDispatch);
3362 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env, mcp).await {
3363 tracing::warn!(error = %e, "FileChanged hook failed");
3364 } else {
3365 tracing::info!(count = hooks.len(), path = %event.path.display(), "FileChanged: hooks fired");
3366 }
3367 }
3368
3369 let _ = self.channel.send_status("").await;
3370 }
3371
3372 pub(super) fn maybe_spawn_promotion_scan(&mut self) {
3382 let Some(engine) = self.services.promotion_engine.clone() else {
3383 return;
3384 };
3385
3386 let Some(memory) = self.services.memory.persistence.memory.clone() else {
3387 return;
3388 };
3389
3390 let promotion_window = 200usize;
3393
3394 let accepted = self.runtime.lifecycle.supervisor.spawn(
3395 agent_supervisor::TaskClass::Enrichment,
3396 "compression_spectrum.promotion_scan",
3397 async move {
3398 let span = tracing::info_span!("memory.compression.promote.background");
3399 let _enter = span.enter();
3400
3401 let window = match memory.load_promotion_window(promotion_window).await {
3402 Ok(w) => w,
3403 Err(e) => {
3404 tracing::warn!(error = %e, "promotion scan: failed to load window");
3405 return;
3406 }
3407 };
3408
3409 if window.is_empty() {
3410 return;
3411 }
3412
3413 let candidates = match engine.scan(&window).await {
3414 Ok(c) => c,
3415 Err(e) => {
3416 tracing::warn!(error = %e, "promotion scan: clustering failed");
3417 return;
3418 }
3419 };
3420
3421 for candidate in &candidates {
3422 if let Err(e) = engine.promote(candidate).await {
3423 tracing::warn!(
3424 signature = %candidate.signature,
3425 error = %e,
3426 "promotion scan: promote failed"
3427 );
3428 }
3429 }
3430
3431 tracing::info!(candidates = candidates.len(), "promotion scan: complete");
3432 },
3433 );
3434
3435 if accepted {
3436 tracing::debug!("compression_spectrum: promotion scan task enqueued");
3437 }
3438 }
3439}
3440struct McpManagerDispatch(Arc<zeph_mcp::McpManager>);
3445
3446impl zeph_subagent::McpDispatch for McpManagerDispatch {
3447 fn call_tool<'a>(
3448 &'a self,
3449 server: &'a str,
3450 tool: &'a str,
3451 args: serde_json::Value,
3452 ) -> std::pin::Pin<
3453 Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'a>,
3454 > {
3455 Box::pin(async move {
3456 self.0
3457 .call_tool(server, tool, args)
3458 .await
3459 .map(|result| {
3460 let texts: Vec<serde_json::Value> = result
3462 .content
3463 .iter()
3464 .filter_map(|c| {
3465 if let rmcp::model::RawContent::Text(t) = &c.raw {
3466 Some(serde_json::Value::String(t.text.clone()))
3467 } else {
3468 None
3469 }
3470 })
3471 .collect();
3472 serde_json::Value::Array(texts)
3473 })
3474 .map_err(|e| e.to_string())
3475 })
3476 }
3477}
3478
3479pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3480 while !*rx.borrow_and_update() {
3481 if rx.changed().await.is_err() {
3482 std::future::pending::<()>().await;
3483 }
3484 }
3485}
3486
3487pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3488 match rx {
3489 Some(inner) => {
3490 if let Some(v) = inner.recv().await {
3491 Some(v)
3492 } else {
3493 *rx = None;
3494 std::future::pending().await
3495 }
3496 }
3497 None => std::future::pending().await,
3498 }
3499}
3500
/// Shortens `cmd` for display.
///
/// Commands of at most 80 bytes are returned unchanged. Longer ones are cut at the last char
/// boundary at or before byte 79, and `…` is appended.
fn truncate_shell_command(cmd: &str) -> String {
    const MAX_BYTES: usize = 80;
    if cmd.len() > MAX_BYTES {
        // Byte 0 is always a char boundary, so the search always succeeds.
        let cut = (0..MAX_BYTES)
            .rev()
            .find(|&i| cmd.is_char_boundary(i))
            .unwrap_or(0);
        let mut out = String::with_capacity(cut + '…'.len_utf8());
        out.push_str(&cmd[..cut]);
        out.push('…');
        out
    } else {
        cmd.to_owned()
    }
}
3514
/// Returns at most the first 8 characters (not bytes) of `id`.
fn truncate_shell_run_id(id: &str) -> String {
    let end = id.char_indices().nth(8).map_or(id.len(), |(idx, _)| idx);
    id[..end].to_owned()
}
3519
3520pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
3521 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
3522 if let Some(ctx_size) = provider.context_window() {
3523 tracing::info!(
3524 model_context = ctx_size,
3525 "auto-configured context budget on reload"
3526 );
3527 ctx_size
3528 } else {
3529 0
3530 }
3531 } else {
3532 config.memory.context_budget_tokens
3533 };
3534 if tokens == 0 {
3535 tracing::warn!(
3536 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
3537 );
3538 128_000
3539 } else {
3540 tokens
3541 }
3542}
3543
3544#[cfg(test)]
3545mod tests;
3546
3547#[cfg(test)]
3548pub(crate) use tests::agent_tests;
3549
#[cfg(test)]
mod test_stubs {
    //! Command handlers that exist only for tests.

    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    /// `/test-error` handler whose `handle` always fails with `CommandError::new("boom")`.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<
            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
        > {
            Box::pin(async move { Err(CommandError::new("boom")) })
        }
    }
}