1mod agent_access_impl;
5pub(crate) mod agent_supervisor;
6mod autodream;
7mod builder;
8mod command_context_impls;
9pub(crate) mod compaction_strategy;
10pub(super) mod compression_feedback;
11mod context;
12mod context_impls;
13pub(crate) mod context_manager;
14mod corrections;
15pub mod error;
16mod experiment_cmd;
17pub(super) mod feedback_detector;
18pub(crate) mod focus;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23mod loop_event;
24mod lsp_commands;
25mod magic_docs;
26mod mcp;
27mod message_queue;
28mod microcompact;
29mod model_commands;
30mod persistence;
31#[cfg(feature = "scheduler")]
32mod plan;
33mod policy_commands;
34mod provider_cmd;
35#[cfg(feature = "self-check")]
36mod quality_hook;
37pub(crate) mod rate_limiter;
38#[cfg(feature = "scheduler")]
39mod scheduler_commands;
40#[cfg(feature = "scheduler")]
41mod scheduler_loop;
42pub mod session_config;
43mod session_digest;
44pub(crate) mod sidequest;
45mod skill_management;
46pub mod slash_commands;
47pub mod speculative;
48pub(crate) mod state;
49pub(crate) mod task_injection;
50pub(crate) mod tool_execution;
51pub(crate) mod tool_orchestrator;
52mod trust_commands;
53pub mod turn;
54mod utils;
55pub(crate) mod vigil;
56
57use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use parking_lot::RwLock;
61
62use tokio::sync::{mpsc, watch};
63use tokio_util::sync::CancellationToken;
64use zeph_llm::any::AnyProvider;
65use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
66use zeph_memory::TokenCounter;
67use zeph_memory::semantic::SemanticMemory;
68use zeph_skills::loader::Skill;
69use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
70use zeph_skills::prompt::format_skills_prompt;
71use zeph_skills::registry::SkillRegistry;
72use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
73
74use crate::channel::Channel;
75use crate::config::Config;
76use crate::context::{ContextBudget, build_system_prompt};
77use zeph_common::text::estimate_tokens;
78
79use loop_event::LoopEvent;
80use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
81use state::CompressionState;
82use state::{
83 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
84 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
85 RuntimeConfig, SecurityState, SessionState, SkillState, ToolState,
86};
87
88pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
89pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
90pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
91pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
92pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
93pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
94pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
95pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
96pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
97pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
98pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

/// Renders a tool invocation's output as a fenced code block preceded by a
/// `[tool output: <name>]` header, preallocating the exact final capacity
/// so the string is built with a single allocation.
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    const HEADER: &str = "[tool output: ";
    const FENCE_OPEN: &str = "]\n```\n";
    let mut out = String::with_capacity(
        HEADER.len() + tool_name.len() + FENCE_OPEN.len() + body.len() + TOOL_OUTPUT_SUFFIX.len(),
    );
    out.push_str(HEADER);
    out.push_str(tool_name);
    out.push_str(FENCE_OPEN);
    out.push_str(body);
    out.push_str(TOOL_OUTPUT_SUFFIX);
    out
}
119
/// Core agent: owns the LLM providers, the user-facing channel, the tool
/// executor, and all per-session state, and drives the read–dispatch–respond
/// loop (see `run`).
pub struct Agent<C: Channel> {
    /// Primary provider used for chat completions.
    provider: AnyProvider,
    /// Provider used for embeddings; `new` initializes it as a clone of
    /// `provider`.
    embedding_provider: AnyProvider,
    /// Transport for user messages and agent output.
    channel: C,
    /// Type-erased tool executor, shared (`Arc`) with sub-systems.
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    /// Conversation messages, queued inputs, and deferred DB bookkeeping.
    pub(super) msg: MessageState,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    /// Security state; holds the per-turn set of user-provided URLs
    /// (cleared in `begin_turn`).
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    pub(super) compression: CompressionState,
    /// Lifecycle plumbing: shutdown/reload channels, cancel token, supervisor.
    pub(super) lifecycle: LifecycleState,
    /// Provider-related runtime state (override slot, STT, token counts).
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    /// Sub-agent manager and topology advisor live here.
    pub(super) orchestration: OrchestrationState,
    pub(super) focus: focus::FocusState,
    pub(super) sidequest: sidequest::SidequestState,
    pub(super) tool_state: ToolState,
    /// Optional self-check pipeline; only present with the `self-check` feature.
    #[cfg(feature = "self-check")]
    pub(super) quality: Option<std::sync::Arc<crate::quality::SelfCheckPipeline>>,
}
190
/// Control-flow outcome of applying a slash-command dispatch result
/// inside the `run` loop.
enum DispatchFlow {
    /// The command requested exit; break out of the run loop.
    Break,
    /// The command was fully handled; start the next loop iteration.
    Continue,
    /// No handler claimed the input; fall through to later dispatch stages.
    Fallthrough,
}
200
201impl<C: Channel> Agent<C> {
202 #[must_use]
226 pub fn new(
227 provider: AnyProvider,
228 channel: C,
229 registry: SkillRegistry,
230 matcher: Option<SkillMatcherBackend>,
231 max_active_skills: usize,
232 tool_executor: impl ToolExecutor + 'static,
233 ) -> Self {
234 let registry = Arc::new(RwLock::new(registry));
235 let embedding_provider = provider.clone();
236 Self::new_with_registry_arc(
237 provider,
238 embedding_provider,
239 channel,
240 registry,
241 matcher,
242 max_active_skills,
243 tool_executor,
244 )
245 }
246
    /// Builds an agent from an already-shared skill registry and separate
    /// chat/embedding providers.
    ///
    /// Loads all skills under a short-lived registry read lock, renders the
    /// initial skills/system prompt, seeds the message list with the system
    /// message, and default-initializes every state sub-struct.
    #[must_use]
    #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
        provider: AnyProvider,
        embedding_provider: AnyProvider,
        channel: C,
        registry: Arc<RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize skills inside a block so the read lock is released
        // before the registry Arc is moved into SkillState below.
        let all_skills: Vec<Skill> = {
            let reg = registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust/health data exists yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Estimate before the prompt string is moved into the first message.
        let initial_prompt_tokens = estimate_tokens(&system_prompt) as u64;
        let token_counter = Arc::new(TokenCounter::new());
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                // The system prompt is always message 0; later code skips it
                // with `.skip(1)` when counting user messages.
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
                last_persisted_message_id: None,
                deferred_db_hide_ids: Vec::new(),
                deferred_db_summaries: Vec::new(),
            },
            memory_state: MemoryState::default(),
            skill_state: SkillState::new(registry, matcher, max_active_skills, skills_prompt),
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState::default(),
            debug_state: DebugState::default(),
            runtime: RuntimeConfig::default(),
            mcp: McpState::default(),
            index: IndexState::default(),
            session: SessionState::new(),
            instructions: InstructionState::default(),
            security: SecurityState::default(),
            experiments: ExperimentState::new(),
            compression: CompressionState::default(),
            lifecycle: LifecycleState::new(),
            providers: ProviderState::new(initial_prompt_tokens),
            metrics: MetricsState::new(token_counter),
            orchestration: OrchestrationState::default(),
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_state: ToolState::default(),
            #[cfg(feature = "self-check")]
            quality: None,
        }
    }
325
326 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
331 let Some(mgr) = &mut self.orchestration.subagent_manager else {
332 return vec![];
333 };
334
335 let finished: Vec<String> = mgr
336 .statuses()
337 .into_iter()
338 .filter_map(|(id, status)| {
339 if matches!(
340 status.state,
341 zeph_subagent::SubAgentState::Completed
342 | zeph_subagent::SubAgentState::Failed
343 | zeph_subagent::SubAgentState::Canceled
344 ) {
345 Some(id)
346 } else {
347 None
348 }
349 })
350 .collect();
351
352 let mut results = vec![];
353 for task_id in finished {
354 match mgr.collect(&task_id).await {
355 Ok(result) => results.push((task_id, result)),
356 Err(e) => {
357 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
358 }
359 }
360 }
361 results
362 }
363
    /// Asks the LLM for a structured shutdown summary, bounded by the
    /// configured shutdown-summary timeout.
    ///
    /// On a structured-call error or timeout, falls back to a plain-text
    /// chat call (`plain_text_summary_fallback`), which itself gets the
    /// same timeout budget. Returns `None` only if the fallback also fails.
    async fn call_llm_for_session_summary(
        &self,
        chat_messages: &[Message],
    ) -> Option<zeph_memory::StructuredSummary> {
        let timeout_dur = std::time::Duration::from_secs(
            self.memory_state.compaction.shutdown_summary_timeout_secs,
        );
        match tokio::time::timeout(
            timeout_dur,
            self.provider
                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
        )
        .await
        {
            // Outer Ok = no timeout; inner Ok = provider succeeded.
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
            Err(_) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
                    self.memory_state.compaction.shutdown_summary_timeout_secs
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
        }
    }
404
405 async fn plain_text_summary_fallback(
406 &self,
407 chat_messages: &[Message],
408 timeout_dur: std::time::Duration,
409 ) -> Option<zeph_memory::StructuredSummary> {
410 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
411 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
412 summary: plain,
413 key_facts: vec![],
414 entities: vec![],
415 }),
416 Ok(Err(e)) => {
417 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
418 None
419 }
420 Err(_) => {
421 tracing::warn!("shutdown summary: plain LLM fallback timed out");
422 None
423 }
424 }
425 }
426
    /// On shutdown, finds `ToolUse` parts in the last assistant message that
    /// have no matching `ToolResult` in any later user message, and persists
    /// tombstone ("cancelled") results for them so the stored transcript has
    /// no dangling tool calls.
    async fn flush_orphaned_tool_use_on_shutdown(&mut self) {
        use zeph_llm::provider::{MessagePart, Role};

        let msgs = &self.msg.messages;
        // Only the most recent assistant message can hold in-flight calls.
        let Some(asst_idx) = msgs.iter().rposition(|m| m.role == Role::Assistant) else {
            return;
        };
        let asst_msg = &msgs[asst_idx];
        // All tool calls issued by that assistant message.
        let tool_use_ids: Vec<(&str, &str, &serde_json::Value)> = asst_msg
            .parts
            .iter()
            .filter_map(|p| {
                if let MessagePart::ToolUse { id, name, input } = p {
                    Some((id.as_str(), name.as_str(), input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_ids.is_empty() {
            return;
        }

        // Ids that already received a ToolResult in messages after asst_idx.
        // `.get(..)` + `.flatten()` handles asst_idx being the last message.
        let paired_ids: std::collections::HashSet<&str> = msgs
            .get(asst_idx + 1..)
            .into_iter()
            .flatten()
            .filter(|m| m.role == Role::User)
            .flat_map(|m| m.parts.iter())
            .filter_map(|p| {
                if let MessagePart::ToolResult { tool_use_id, .. } = p {
                    Some(tool_use_id.as_str())
                } else {
                    None
                }
            })
            .collect();

        // Reconstruct full requests for the calls that never got a result.
        let unpaired: Vec<zeph_llm::provider::ToolUseRequest> = tool_use_ids
            .iter()
            .filter(|(id, _, _)| !paired_ids.contains(*id))
            .map(|(id, name, input)| zeph_llm::provider::ToolUseRequest {
                id: (*id).to_owned(),
                name: (*name).to_owned().into(),
                input: (*input).clone(),
            })
            .collect();

        if unpaired.is_empty() {
            return;
        }

        tracing::info!(
            count = unpaired.len(),
            "shutdown: persisting tombstone ToolResults for unpaired in-flight tool calls"
        );
        self.persist_cancelled_tool_results(&unpaired).await;
    }
494
    /// Generates and stores a session summary at shutdown, if enabled and
    /// worthwhile.
    ///
    /// Bails out early when: the feature is disabled, memory/conversation id
    /// is missing, the session already has a summary, there are too few user
    /// messages, or `shutdown_summary_max_messages` is zero. Otherwise builds
    /// a summarization prompt from the most recent non-system messages, calls
    /// the LLM (with fallback/timeout handling), and stores the result.
    async fn maybe_store_shutdown_summary(&mut self) {
        if !self.memory_state.compaction.shutdown_summary {
            return;
        }
        let Some(memory) = self.memory_state.persistence.memory.clone() else {
            return;
        };
        let Some(conversation_id) = self.memory_state.persistence.conversation_id else {
            return;
        };

        // Never overwrite an existing summary; on check failure, skip rather
        // than risk a duplicate.
        match memory.has_session_summary(conversation_id).await {
            Ok(true) => {
                tracing::debug!("shutdown summary: session already has a summary, skipping");
                return;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
                return;
            }
        }

        // skip(1) excludes the system prompt at index 0.
        let user_count = self
            .msg
            .messages
            .iter()
            .skip(1)
            .filter(|m| m.role == Role::User)
            .count();
        if user_count < self.memory_state.compaction.shutdown_summary_min_messages {
            tracing::debug!(
                user_count,
                min = self.memory_state.compaction.shutdown_summary_min_messages,
                "shutdown summary: too few user messages, skipping"
            );
            return;
        }

        let _ = self.channel.send_status("Saving session summary...").await;

        let max = self.memory_state.compaction.shutdown_summary_max_messages;
        if max == 0 {
            tracing::debug!("shutdown summary: max_messages=0, skipping");
            return;
        }
        // Keep only the trailing `max` non-system messages for the prompt.
        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
        let slice = if non_system.len() > max {
            &non_system[non_system.len() - max..]
        } else {
            &non_system[..]
        };

        // MessageId(0) is a placeholder; the prompt builder only needs
        // role + content here.
        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
            .iter()
            .map(|m| {
                let role = match m.role {
                    Role::User => "user".to_owned(),
                    Role::Assistant => "assistant".to_owned(),
                    Role::System => "system".to_owned(),
                };
                (zeph_memory::MessageId(0), role, m.content.clone())
            })
            .collect();

        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
        let chat_messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];

        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
            // Clear the "Saving..." status before giving up.
            let _ = self.channel.send_status("").await;
            return;
        };

        if let Err(e) = memory
            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
            .await
        {
            tracing::warn!("shutdown summary: storage failed: {e:#}");
        } else {
            tracing::info!(
                conversation_id = conversation_id.0,
                "shutdown summary stored"
            );
        }

        // Clear the status line regardless of storage outcome.
        let _ = self.channel.send_status("").await;
    }
601
    /// Performs orderly shutdown: persists router/advisor state, stops
    /// sub-agents and MCP servers, flushes final metrics, tombstones any
    /// in-flight tool calls, aborts supervised background tasks, and stores
    /// the shutdown summary and session digest.
    pub async fn shutdown(&mut self) {
        let _ = self.channel.send_status("Shutting down...").await;

        self.provider.save_router_state();

        // Best-effort persistence of the topology advisor's learned state.
        if let Some(ref advisor) = self.orchestration.topology_advisor
            && let Err(e) = advisor.save()
        {
            tracing::warn!(error = %e, "adaptorch: failed to persist state");
        }

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the final partial window since the last hard compaction,
        // then clear it so it is not counted twice.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Log aggregate filtering/compaction stats, if any were recorded.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        // Must run before the summary/digest so persisted transcripts have
        // no dangling ToolUse entries.
        self.flush_orphaned_tool_use_on_shutdown().await;

        self.lifecycle.supervisor.abort_all();

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }
686
    /// Rebuilds the sub-agent metrics snapshot from the manager's current
    /// statuses and publishes it via `update_metrics`. No-op when no
    /// sub-agent manager is configured.
    fn refresh_subagent_metrics(&mut self) {
        let Some(ref mgr) = self.orchestration.subagent_manager else {
            return;
        };
        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
            .statuses()
            .into_iter()
            .map(|(id, s)| {
                let def = mgr.agents_def(&id);
                crate::metrics::SubAgentMetrics {
                    // Fall back to a short id prefix when the definition is
                    // gone; `8.min(id.len())` guards against short ids.
                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
                    id: id.clone(),
                    state: format!("{:?}", s.state).to_lowercase(),
                    turns_used: s.turns_used,
                    // 20 is the displayed default when no definition exists.
                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
                    background: def.is_some_and(|d| d.permissions.background),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                    // Empty string doubles as "default mode" in the metrics.
                    permission_mode: def.map_or_else(String::new, |d| {
                        use zeph_subagent::def::PermissionMode;
                        match d.permissions.permission_mode {
                            PermissionMode::Default => String::new(),
                            PermissionMode::AcceptEdits => "accept_edits".into(),
                            PermissionMode::DontAsk => "dont_ask".into(),
                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
                            PermissionMode::Plan => "plan".into(),
                        }
                    }),
                    transcript_dir: mgr
                        .agent_transcript_dir(&id)
                        .map(|p| p.to_string_lossy().into_owned()),
                }
            })
            .collect();
        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
    }
728
729 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
731 let completed = self.poll_subagents().await;
732 for (task_id, result) in completed {
733 let notice = if result.is_empty() {
734 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
735 } else {
736 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
737 };
738 if let Err(e) = self.channel.send(¬ice).await {
739 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
740 }
741 }
742 Ok(())
743 }
744
    /// Main agent loop: waits for input events, routes slash commands through
    /// two command registries (session-level, then agent-level), handles
    /// built-in commands, and forwards everything else to
    /// `process_user_message`.
    ///
    /// Returns when the channel closes, a shutdown event fires, or a command
    /// requests exit.
    #[allow(clippy::too_many_lines)] pub async fn run(&mut self) -> Result<(), error::AgentError>
    where
        C: 'static,
    {
        // Block until model warmup signals readiness (if a warmup watch
        // channel was installed and it has not completed yet).
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;
        self.maybe_send_resume_recap().await;

        loop {
            // Per-iteration housekeeping before picking up the next input.
            self.apply_provider_override();
            self.check_tool_refresh().await;
            self.process_pending_elicitations().await;
            self.refresh_subagent_metrics();
            self.notify_completed_subagents().await?;
            self.drain_channel();

            // Queued messages take priority over new events.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolution (STT, image MIME).
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                match self.next_event().await? {
                    None | Some(LoopEvent::Shutdown) => break,
                    Some(LoopEvent::SkillReload) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(LoopEvent::InstructionReload) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(LoopEvent::ConfigReload) => {
                        self.reload_config();
                        continue;
                    }
                    Some(LoopEvent::UpdateNotification(msg)) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ExperimentCompleted(msg)) => {
                        self.experiments.cancel = None;
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(LoopEvent::ScheduledTask(prompt)) => {
                        // Scheduled tasks run as a normal agent turn with a
                        // recognizable prefix.
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        let msg = crate::channel::ChannelMessage {
                            text,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::TaskInjected(injection)) => {
                        if let Some(ref mut ls) = self.lifecycle.user_loop {
                            ls.iteration += 1;
                            tracing::info!(iteration = ls.iteration, "loop: tick");
                        }
                        let msg = crate::channel::ChannelMessage {
                            text: injection.prompt,
                            attachments: Vec::new(),
                        };
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                    Some(LoopEvent::FileChanged(event)) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                    Some(LoopEvent::Message(msg)) => {
                        self.drain_channel();
                        self.resolve_message(msg).await
                    }
                }
            };

            let trimmed = text.trim();

            // Record any flagged URLs typed directly into a slash command so
            // exfiltration checks can treat them as user-provided.
            if trimmed.starts_with('/') {
                let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
                if !slash_urls.is_empty() {
                    self.security.user_provided_urls.write().extend(slash_urls);
                }
            }

            // Stage 1: session-level command registry (exit/clear/debug/help),
            // dispatched with a NullAgent so it cannot touch agent state.
            let session_impl = command_context_impls::SessionAccessImpl {
                supports_exit: self.channel.supports_exit(),
            };
            let mut messages_impl = command_context_impls::MessageAccessImpl {
                msg: &mut self.msg,
                tool_state: &mut self.tool_state,
                providers: &mut self.providers,
                metrics: &self.metrics,
                security: &mut self.security,
                tool_orchestrator: &mut self.tool_orchestrator,
            };
            let mut sink_adapter = crate::channel::ChannelSinkAdapter(&mut self.channel);
            let mut null_agent = zeph_commands::NullAgent;
            let registry_handled = {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::debug::{
                    DebugDumpCommand, DumpFormatCommand, LogCommand,
                };
                use zeph_commands::handlers::help::HelpCommand;
                use zeph_commands::handlers::session::{
                    ClearCommand, ClearQueueCommand, ExitCommand, QuitCommand, ResetCommand,
                };

                let mut reg = CommandRegistry::new();
                reg.register(ExitCommand);
                reg.register(QuitCommand);
                reg.register(ClearCommand);
                reg.register(ResetCommand);
                reg.register(ClearQueueCommand);
                reg.register(LogCommand);
                reg.register(DebugDumpCommand);
                reg.register(DumpFormatCommand);
                reg.register(HelpCommand);
                #[cfg(test)]
                reg.register(test_stubs::TestErrorCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut sink_adapter,
                    debug: &mut self.debug_state,
                    messages: &mut messages_impl,
                    session: &session_impl,
                    agent: &mut null_agent,
                };
                reg.dispatch(&mut ctx, trimmed).await
            };
            let session_reg_missed = registry_handled.is_none();
            match self
                .apply_dispatch_result(registry_handled, trimmed, false)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Stage 2: agent-level command registry, dispatched with the real
            // agent (`agent: self`) and null accessors for everything else.
            // Only runs if stage 1 did not claim the command.
            let mut agent_null_debug = command_context_impls::NullDebugAccess;
            let mut agent_null_messages = command_context_impls::NullMessageAccess;
            let agent_null_session = command_context_impls::NullSessionAccess;
            let mut agent_null_sink = zeph_commands::NullSink;
            let agent_result: Option<
                Result<zeph_commands::CommandOutput, zeph_commands::CommandError>,
            > = if session_reg_missed {
                use zeph_commands::CommandRegistry;
                use zeph_commands::handlers::{
                    agent_cmd::AgentCommand,
                    compaction::{CompactCommand, NewConversationCommand, RecapCommand},
                    experiment::ExperimentCommand,
                    loop_cmd::LoopCommand,
                    lsp::LspCommand,
                    mcp::McpCommand,
                    memory::{GraphCommand, GuidelinesCommand, MemoryCommand},
                    misc::{CacheStatsCommand, ImageCommand},
                    model::{ModelCommand, ProviderCommand},
                    plan::PlanCommand,
                    plugins::PluginsCommand,
                    policy::PolicyCommand,
                    scheduler::SchedulerCommand,
                    skill::{FeedbackCommand, SkillCommand, SkillsCommand},
                    status::{FocusCommand, GuardrailCommand, SideQuestCommand, StatusCommand},
                };

                let mut agent_reg = CommandRegistry::new();
                agent_reg.register(MemoryCommand);
                agent_reg.register(GraphCommand);
                agent_reg.register(GuidelinesCommand);
                agent_reg.register(ModelCommand);
                agent_reg.register(ProviderCommand);
                agent_reg.register(SkillCommand);
                agent_reg.register(SkillsCommand);
                agent_reg.register(FeedbackCommand);
                agent_reg.register(McpCommand);
                agent_reg.register(PolicyCommand);
                agent_reg.register(SchedulerCommand);
                agent_reg.register(LspCommand);
                agent_reg.register(CacheStatsCommand);
                agent_reg.register(ImageCommand);
                agent_reg.register(StatusCommand);
                agent_reg.register(GuardrailCommand);
                agent_reg.register(FocusCommand);
                agent_reg.register(SideQuestCommand);
                agent_reg.register(AgentCommand);
                agent_reg.register(CompactCommand);
                agent_reg.register(NewConversationCommand);
                agent_reg.register(RecapCommand);
                agent_reg.register(ExperimentCommand);
                agent_reg.register(PlanCommand);
                agent_reg.register(LoopCommand);
                agent_reg.register(PluginsCommand);

                let mut ctx = zeph_commands::CommandContext {
                    sink: &mut agent_null_sink,
                    debug: &mut agent_null_debug,
                    messages: &mut agent_null_messages,
                    session: &agent_null_session,
                    agent: self,
                };
                agent_reg.dispatch(&mut ctx, trimmed).await
            } else {
                None
            };
            match self
                .apply_dispatch_result(agent_result, trimmed, true)
                .await
            {
                DispatchFlow::Break => break,
                DispatchFlow::Continue => continue,
                DispatchFlow::Fallthrough => {
                }
            }

            // Stage 3: legacy built-in commands (Some(true) = exit).
            match self.handle_builtin_command(trimmed) {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            // Not a command: run it as a normal user turn.
            self.process_user_message(text, image_parts).await?;
        }

        self.maybe_autodream().await;

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1029
1030 async fn apply_dispatch_result(
1036 &mut self,
1037 result: Option<Result<zeph_commands::CommandOutput, zeph_commands::CommandError>>,
1038 command: &str,
1039 with_learning: bool,
1040 ) -> DispatchFlow {
1041 match result {
1042 Some(Ok(zeph_commands::CommandOutput::Exit)) => {
1043 let _ = self.channel.flush_chunks().await;
1044 DispatchFlow::Break
1045 }
1046 Some(Ok(
1047 zeph_commands::CommandOutput::Continue | zeph_commands::CommandOutput::Silent,
1048 )) => {
1049 let _ = self.channel.flush_chunks().await;
1050 DispatchFlow::Continue
1051 }
1052 Some(Ok(zeph_commands::CommandOutput::Message(msg))) => {
1053 let _ = self.channel.send(&msg).await;
1054 let _ = self.channel.flush_chunks().await;
1055 if with_learning {
1056 self.maybe_trigger_post_command_learning(command).await;
1057 }
1058 DispatchFlow::Continue
1059 }
1060 Some(Err(e)) => {
1061 let _ = self.channel.send(&e.to_string()).await;
1062 let _ = self.channel.flush_chunks().await;
1063 tracing::warn!(command = %command, error = %e.0, "slash command failed");
1064 DispatchFlow::Continue
1065 }
1066 None => DispatchFlow::Fallthrough,
1067 }
1068 }
1069
1070 fn apply_provider_override(&mut self) {
1072 if let Some(ref slot) = self.providers.provider_override
1073 && let Some(new_provider) = slot.write().take()
1074 {
1075 tracing::debug!(provider = new_provider.name(), "ACP model override applied");
1076 self.provider = new_provider;
1077 }
1078 }
1079
    /// Waits for the next loop event from any input source: the channel,
    /// shutdown signal, reload watchers, scheduler, user loop ticks, and
    /// file-change notifications.
    ///
    /// Returns `Ok(None)` when the channel closes or when a user-loop tick
    /// fires after cancellation — the caller (`run`) treats `None` as
    /// shutdown.
    async fn next_event(&mut self) -> Result<Option<LoopEvent>, error::AgentError> {
        let event = tokio::select! {
            result = self.channel.recv() => {
                // Channel messages short-circuit; a closed channel yields None.
                return Ok(result?.map(LoopEvent::Message));
            }
            () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                tracing::info!("shutting down");
                LoopEvent::Shutdown
            }
            Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                LoopEvent::SkillReload
            }
            Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                LoopEvent::InstructionReload
            }
            Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                LoopEvent::ConfigReload
            }
            Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                LoopEvent::UpdateNotification(msg)
            }
            Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                LoopEvent::ExperimentCompleted(msg)
            }
            Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                tracing::info!("scheduler: injecting custom task as agent turn");
                LoopEvent::ScheduledTask(prompt)
            }
            // User-loop tick: only armed when a user loop exists and has not
            // been cancelled; otherwise this branch pends forever.
            () = async {
                if let Some(ref mut ls) = self.lifecycle.user_loop {
                    if ls.cancel_tx.is_cancelled() {
                        std::future::pending::<()>().await;
                    } else {
                        ls.interval.tick().await;
                    }
                } else {
                    std::future::pending::<()>().await;
                }
            } => {
                // Re-check under a fresh borrow: the loop may have been
                // cancelled between the tick and this arm running.
                let Some(ls) = self.lifecycle.user_loop.as_ref() else {
                    return Ok(None);
                };
                if ls.cancel_tx.is_cancelled() {
                    self.lifecycle.user_loop = None;
                    return Ok(None);
                }
                let prompt = ls.prompt.clone();
                LoopEvent::TaskInjected(task_injection::TaskInjection { prompt })
            }
            Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                LoopEvent::FileChanged(event)
            }
        };
        Ok(Some(event))
    }
1145
    /// Converts a raw channel message into prompt text plus LLM image parts.
    ///
    /// Audio attachments are transcribed via the configured STT provider and
    /// prepended to the text (dropped with a warning when no STT provider is
    /// set, or when an attachment exceeds `MAX_AUDIO_BYTES`). Image
    /// attachments within `MAX_IMAGE_BYTES` become `MessagePart::Image`
    /// entries with a MIME type guessed from the filename.
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        // Split attachments by kind: audio goes through STT, the rest is
        // treated as images below.
        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.providers.stt.is_some(),
            "resolve_message attachments"
        );

        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                // Transcription failures are logged per-attachment; the
                // remaining attachments are still processed.
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                // Transcript goes first so the model sees what was said
                // before any typed text.
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }
1235
1236 fn begin_turn(&mut self, input: turn::TurnInput) -> turn::Turn {
1243 let id = turn::TurnId(self.debug_state.iteration_counter as u64);
1244 self.debug_state.iteration_counter += 1;
1245 self.lifecycle.cancel_token = CancellationToken::new();
1246 self.security.user_provided_urls.write().clear();
1247 turn::Turn::new(id, input)
1248 }
1249
    /// Closes a turn: stashes the turn's timing measurements into the
    /// pending slot, flushes them, and clears the per-turn intent snapshot
    /// recorded for the vigil monitor.
    fn end_turn(&mut self, turn: turn::Turn) {
        // Timings are staged in `pending_timings` so flush_turn_timings can
        // pick them up.
        self.metrics.pending_timings = turn.metrics.timings;
        self.flush_turn_timings();
        self.session.current_turn_intent = None;
    }
1262
    /// Processes one user message as a complete turn.
    ///
    /// Creates the `Turn`, records its id on the tracing span, runs the
    /// inner pipeline, marks the debug iteration span Ok/Error from the
    /// result, and always finalizes the turn (`end_turn`) before returning.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.turn", skip_all, fields(turn_id))
    )]
    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        let input = turn::TurnInput::new(text, image_parts);
        let mut t = self.begin_turn(input);

        // Turn ids are u64; clamp to usize::MAX on narrower targets.
        let turn_idx = usize::try_from(t.id().0).unwrap_or(usize::MAX);
        tracing::Span::current().record("turn_id", t.id().0);
        self.debug_state
            .start_iteration_span(turn_idx, t.input.text.trim());

        let result = self.process_user_message_inner(&mut t).await;

        let span_status = if result.is_ok() {
            crate::debug_dump::trace::SpanStatus::Ok
        } else {
            crate::debug_dump::trace::SpanStatus::Error {
                message: "iteration failed".to_owned(),
            }
        };
        self.debug_state.end_iteration_span(turn_idx, span_status);

        // end_turn runs on both success and failure so timings and intent
        // state are always flushed.
        self.end_turn(t);
        result
    }
1296
    /// The per-turn pipeline: background-task bookkeeping, cancellation
    /// wiring, slash-command dispatch, security pre-screen, context
    /// preparation, persistence, and the LLM response loop.
    ///
    /// # Errors
    /// Propagates channel send failures and errors from the security
    /// pre-screen; errors from `process_response` itself are reported to the
    /// user over the channel instead of bubbling up.
    async fn process_user_message_inner(
        &mut self,
        turn: &mut turn::Turn,
    ) -> Result<(), error::AgentError> {
        // Reap finished background tasks first so their effects (e.g. a
        // completed background summarization) are visible to this turn.
        let bg_signal = self.lifecycle.supervisor.reap();
        if bg_signal.did_summarize {
            self.memory_state.persistence.unsummarized_count = 0;
            tracing::debug!("background summarization completed; unsummarized_count reset");
        }
        {
            // Publish a snapshot of background-task counters into metrics.
            let snap = self.lifecycle.supervisor.metrics_snapshot();
            self.update_metrics(|m| {
                m.bg_inflight = snap.inflight as u64;
                m.bg_dropped = snap.total_dropped();
                m.bg_completed = snap.total_completed();
                m.bg_enrichment_inflight = snap.class_inflight[0] as u64;
                m.bg_telemetry_inflight = snap.class_inflight[1] as u64;
            });
        }

        if self.runtime.supervisor_config.abort_enrichment_on_turn {
            self.lifecycle
                .supervisor
                .abort_class(agent_supervisor::TaskClass::Enrichment);
        }

        // Bridge the process-wide cancel signal onto this turn's token,
        // aborting any bridge task left over from a previous turn.
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = turn.cancel_token.clone();
        self.lifecycle.cancel_token = turn.cancel_token.clone();
        if let Some(prev) = self.lifecycle.cancel_bridge_handle.take() {
            prev.abort();
        }
        self.lifecycle.cancel_bridge_handle = Some(tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        }));

        let text = turn.input.text.clone();
        let trimmed_owned = text.trim().to_owned();
        let trimmed = trimmed_owned.as_str();

        // Snapshot up to 1024 bytes of the input (cut on a char boundary) as
        // this turn's intent for the vigil security monitor.
        if self.security.vigil.is_some() {
            let intent_len = trimmed.floor_char_boundary(1024.min(trimmed.len()));
            self.session.current_turn_intent = Some(trimmed[..intent_len].to_owned());
        }

        // Slash commands short-circuit the rest of the pipeline.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail/classifier pre-screen; `true` means the input was blocked.
        if self.pre_process_security(trimmed).await? {
            return Ok(());
        }

        let t_ctx = std::time::Instant::now();
        tracing::debug!("turn timing: prepare_context start");
        self.advance_context_lifecycle(&text, trimmed).await;
        turn.metrics_mut().timings.prepare_context_ms =
            u64::try_from(t_ctx.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.prepare_context_ms,
            "turn timing: prepare_context done"
        );

        let image_parts = std::mem::take(&mut turn.input.image_parts);
        let user_msg = self.build_user_message(&text, image_parts);

        // Record URLs the user explicitly supplied so exfiltration checks
        // can tell them apart from model-produced ones.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty() {
            self.security.user_provided_urls.write().extend(urls);
        }

        self.memory_state.extraction.goal_text = Some(text.clone());

        let t_persist = std::time::Instant::now();
        tracing::debug!("turn timing: persist_message(user) start");
        self.persist_message(Role::User, &text, &[], false).await;
        turn.metrics_mut().timings.persist_message_ms =
            u64::try_from(t_persist.elapsed().as_millis()).unwrap_or(u64::MAX);
        tracing::debug!(
            ms = turn.metrics_snapshot().timings.persist_message_ms,
            "turn timing: persist_message(user) done"
        );
        self.push_message(user_msg);

        tracing::debug!("turn timing: process_response start");
        if let Err(e) = self.process_response().await {
            self.learning_engine.learning_tasks.detach_all();
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // Remove the just-pushed user message from in-memory history and
            // rebase the prompt token count on the shortened list.
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        } else {
            self.learning_engine.learning_tasks.detach_all();
            self.truncate_old_tool_results();
            self.maybe_update_magic_docs();
        }
        tracing::debug!("turn timing: process_response done");

        #[cfg(feature = "self-check")]
        if let Some(pipeline) = self.quality.clone() {
            self.run_self_check_for_turn(pipeline, turn.id().0).await;
        }
        let _ = self.channel.flush_chunks().await;

        // Copy sub-phase timings recorded during process_response into this
        // turn's metrics.
        turn.metrics_mut().timings.llm_chat_ms = self.metrics.pending_timings.llm_chat_ms;
        turn.metrics_mut().timings.tool_exec_ms = self.metrics.pending_timings.tool_exec_ms;

        Ok(())
    }
1444
    /// Screens the trimmed user input before the turn proceeds.
    ///
    /// Runs the optional guardrail check first, then (behind the
    /// `classifiers` feature) the local injection classifier. Returns
    /// `Ok(true)` when the input was blocked and the turn should stop,
    /// `Ok(false)` to continue.
    ///
    /// # Errors
    /// Currently never fails — channel send errors here are deliberately
    /// ignored; the `Result` keeps the signature uniform with the pipeline.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.security_prescreen", skip_all)
    )]
    async fn pre_process_security(&mut self, trimmed: &str) -> Result<bool, error::AgentError> {
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    // Flagged below the blocking threshold: warn the user and
                    // let the input through.
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // A failed check resolves by fail strategy: closed blocks
                    // the input, open allows it.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(true);
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(true);
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    // Soft signal only: log it but do not block.
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        Ok(false)
    }
1516
    /// Advances every per-turn context subsystem before the user message is
    /// appended: prompt rebuild, correction/learning passes, focus and
    /// sidequest ticks, microcompaction, deferred summaries, proactive
    /// compression, compaction, and finally `prepare_context`.
    ///
    /// Failures in individual stages are logged and swallowed so one
    /// misbehaving subsystem cannot abort the turn.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "agent.prepare_context", skip_all)
    )]
    async fn advance_context_lifecycle(&mut self, text: &str, trimmed: &str) {
        // Invalidate cached MCP pruning decisions from the previous turn.
        self.mcp.pruning_cache.reset();

        let conv_id = self.memory_state.persistence.conversation_id;
        self.rebuild_system_prompt(text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        {
            self.focus.tick();

            // Sidequest eviction is skipped on turns where compaction already
            // rewrote the context this turn.
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        if let Some(warning) = self.cache_expiry_warning() {
            tracing::info!(warning, "cache expiry warning");
            let _ = self.channel.send_status(&warning).await;
        }

        self.maybe_time_based_microcompact();

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        // NOTE(review): Box::pin here presumably bounds this future's size or
        // breaks async recursion inside prepare_context — confirm.
        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.memory_state.persistence.last_recall_confidence);

        self.learning_engine.reset_reflection();
    }
1589
1590 fn build_user_message(
1591 &mut self,
1592 text: &str,
1593 image_parts: Vec<zeph_llm::provider::MessagePart>,
1594 ) -> Message {
1595 let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
1596 all_image_parts.extend(image_parts);
1597
1598 if !all_image_parts.is_empty() && self.provider.supports_vision() {
1599 let mut parts = vec![zeph_llm::provider::MessagePart::Text {
1600 text: text.to_owned(),
1601 }];
1602 parts.extend(all_image_parts);
1603 Message::from_parts(Role::User, parts)
1604 } else {
1605 if !all_image_parts.is_empty() {
1606 tracing::warn!(
1607 count = all_image_parts.len(),
1608 "image attachments dropped: provider does not support vision"
1609 );
1610 }
1611 Message {
1612 role: Role::User,
1613 content: text.to_owned(),
1614 parts: vec![],
1615 metadata: MessageMetadata::default(),
1616 }
1617 }
1618 }
1619
    /// Polls a spawned sub-agent every 500 ms until it reaches a terminal
    /// state (completed, failed, or cancelled), servicing secret-approval
    /// requests and emitting progress status lines meanwhile.
    ///
    /// Returns `None` only if the sub-agent manager disappears mid-poll;
    /// otherwise a user-facing completion message for `label`.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use zeph_subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            #[allow(clippy::redundant_closure_for_method_calls)]
            // A sub-agent may request a secret while we wait: confirm with
            // the user, then approve (5-minute TTL) and deliver, or deny.
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A failed confirm (channel gone) counts as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        let ttl = std::time::Duration::from_mins(5);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                // Task is no longer in the status table: treat as finished.
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: report "turn X/Y", defaulting the max to
                    // 20 when the agent definition is unavailable.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
1695
1696 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
1699 let mgr = self.orchestration.subagent_manager.as_mut()?;
1700 let full_ids: Vec<String> = mgr
1701 .statuses()
1702 .into_iter()
1703 .map(|(tid, _)| tid)
1704 .filter(|tid| tid.starts_with(prefix))
1705 .collect();
1706 Some(match full_ids.as_slice() {
1707 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
1708 [fid] => Ok(fid.clone()),
1709 _ => Err(format!(
1710 "Ambiguous id prefix '{prefix}': matches {} agents",
1711 full_ids.len()
1712 )),
1713 })
1714 }
1715
1716 fn handle_agent_list(&self) -> Option<String> {
1717 use std::fmt::Write as _;
1718 let mgr = self.orchestration.subagent_manager.as_ref()?;
1719 let defs = mgr.definitions();
1720 if defs.is_empty() {
1721 return Some("No sub-agent definitions found.".into());
1722 }
1723 let mut out = String::from("Available sub-agents:\n");
1724 for d in defs {
1725 let memory_label = match d.memory {
1726 Some(zeph_subagent::MemoryScope::User) => " [memory:user]",
1727 Some(zeph_subagent::MemoryScope::Project) => " [memory:project]",
1728 Some(zeph_subagent::MemoryScope::Local) => " [memory:local]",
1729 None => "",
1730 };
1731 if let Some(ref src) = d.source {
1732 let _ = writeln!(
1733 out,
1734 " {}{} — {} ({})",
1735 d.name, memory_label, d.description, src
1736 );
1737 } else {
1738 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
1739 }
1740 }
1741 Some(out)
1742 }
1743
1744 fn handle_agent_status(&self) -> Option<String> {
1745 use std::fmt::Write as _;
1746 let mgr = self.orchestration.subagent_manager.as_ref()?;
1747 let statuses = mgr.statuses();
1748 if statuses.is_empty() {
1749 return Some("No active sub-agents.".into());
1750 }
1751 let mut out = String::from("Active sub-agents:\n");
1752 for (id, s) in &statuses {
1753 let state = format!("{:?}", s.state).to_lowercase();
1754 let elapsed = s.started_at.elapsed().as_secs();
1755 let _ = writeln!(
1756 out,
1757 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
1758 short = &id[..8.min(id.len())],
1759 t = s.turns_used,
1760 msg = s.last_message.as_deref().unwrap_or(""),
1761 );
1762 if let Some(def) = mgr.agents_def(id)
1764 && let Some(scope) = def.memory
1765 && let Ok(dir) = zeph_subagent::memory::resolve_memory_dir(scope, &def.name)
1766 {
1767 let _ = writeln!(out, " memory: {}", dir.display());
1768 }
1769 }
1770 Some(out)
1771 }
1772
    /// Approves a pending secret request for the sub-agent matching the id
    /// prefix `id`: grants the secret with a 5-minute TTL and delivers it.
    /// Returns a user-facing message, or `None` when no sub-agent manager is
    /// configured.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        // NOTE(review): try_recv_secret_request pops a pending request. When
        // the popped request belongs to a *different* sub-agent than
        // `full_id`, it is not re-queued here — confirm the manager re-emits
        // unconsumed requests, otherwise that request is silently lost.
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            let ttl = std::time::Duration::from_mins(5);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
1796
1797 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
1798 let full_id = match self.resolve_agent_id_prefix(id)? {
1799 Ok(fid) => fid,
1800 Err(msg) => return Some(msg),
1801 };
1802 let mgr = self.orchestration.subagent_manager.as_mut()?;
1803 match mgr.deny_secret(&full_id) {
1804 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
1805 Err(e) => Some(format!("Deny failed: {e}")),
1806 }
1807 }
1808
    #[allow(clippy::too_many_lines)]
    /// Dispatches a parsed `/agent` command to the matching handler.
    ///
    /// Returns the user-facing reply, or `None` when no sub-agent manager is
    /// configured (or the manager disappears while a spawned agent is being
    /// polled).
    async fn handle_agent_command(&mut self, cmd: zeph_subagent::AgentCommand) -> Option<String> {
        use zeph_subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            // Fire-and-forget spawn: report the short id and return.
            AgentCommand::Background { name, prompt } => {
                // Gather everything that borrows &self before taking the
                // mutable manager borrow.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Foreground spawn (explicit command or @-mention): announce it,
            // then block polling until the sub-agent finishes.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let cfg = self.orchestration.subagent_config.clone();
                let spawn_ctx = self.build_spawn_context(&cfg);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let task_id = match mgr.spawn(
                    &name,
                    &prompt,
                    provider,
                    tool_executor,
                    skills,
                    &cfg,
                    spawn_ctx,
                ) {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            // Cancel resolves the id prefix inline against mgr.statuses().
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            // Resume a previous task under a new task id, then poll it.
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Scope the shared manager borrow so we can re-borrow it
                // mutably below.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
1927
1928 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
1929 let mgr = self.orchestration.subagent_manager.as_ref()?;
1930 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
1931 let reg = self.skill_state.registry.read();
1932 match zeph_subagent::filter_skills(®, &def.skills) {
1933 Ok(skills) => {
1934 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
1935 if bodies.is_empty() {
1936 None
1937 } else {
1938 Some(bodies)
1939 }
1940 }
1941 Err(e) => {
1942 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
1943 None
1944 }
1945 }
1946 }
1947
1948 fn build_spawn_context(
1950 &self,
1951 cfg: &zeph_config::SubAgentConfig,
1952 ) -> zeph_subagent::SpawnContext {
1953 zeph_subagent::SpawnContext {
1954 parent_messages: self.extract_parent_messages(cfg),
1955 parent_cancel: Some(self.lifecycle.cancel_token.clone()),
1956 parent_provider_name: {
1957 let name = &self.runtime.active_provider_name;
1958 if name.is_empty() {
1959 None
1960 } else {
1961 Some(name.clone())
1962 }
1963 },
1964 spawn_depth: self.runtime.spawn_depth,
1965 mcp_tool_names: self.extract_mcp_tool_names(),
1966 }
1967 }
1968
1969 fn extract_parent_messages(
1974 &self,
1975 config: &zeph_config::SubAgentConfig,
1976 ) -> Vec<zeph_llm::provider::Message> {
1977 use zeph_llm::provider::Role;
1978 if config.context_window_turns == 0 {
1979 return Vec::new();
1980 }
1981 let non_system: Vec<_> = self
1982 .msg
1983 .messages
1984 .iter()
1985 .filter(|m| m.role != Role::System)
1986 .cloned()
1987 .collect();
1988 let take_count = config.context_window_turns * 2;
1989 let start = non_system.len().saturating_sub(take_count);
1990 let mut msgs = non_system[start..].to_vec();
1991
1992 let max_chars = 128_000usize / 4; let mut total_chars: usize = 0;
1995 let mut keep = msgs.len();
1996 for (i, m) in msgs.iter().enumerate() {
1997 total_chars += m.content.len();
1998 if total_chars > max_chars {
1999 keep = i;
2000 break;
2001 }
2002 }
2003 if keep < msgs.len() {
2004 tracing::info!(
2005 kept = keep,
2006 requested = config.context_window_turns * 2,
2007 "[subagent] truncated parent history from {} to {} turns due to token budget",
2008 config.context_window_turns * 2,
2009 keep
2010 );
2011 msgs.truncate(keep);
2012 }
2013 msgs
2014 }
2015
2016 fn extract_mcp_tool_names(&self) -> Vec<String> {
2018 self.tool_executor
2019 .tool_definitions_erased()
2020 .into_iter()
2021 .filter(|t| t.id.starts_with("mcp_"))
2022 .map(|t| t.id.to_string())
2023 .collect()
2024 }
2025
2026 fn classify_source_kind(
2030 skill_dir: &std::path::Path,
2031 managed_dir: Option<&std::path::PathBuf>,
2032 bundled_names: &std::collections::HashSet<String>,
2033 ) -> zeph_memory::store::SourceKind {
2034 if managed_dir.is_some_and(|d| skill_dir.starts_with(d)) {
2035 let skill_name = skill_dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
2036 let has_marker = skill_dir.join(".bundled").exists();
2037 if has_marker && bundled_names.contains(skill_name) {
2038 zeph_memory::store::SourceKind::Bundled
2039 } else {
2040 if has_marker {
2041 tracing::warn!(
2042 skill = %skill_name,
2043 "skill has .bundled marker but is not in the bundled skill \
2044 allowlist — classifying as Hub"
2045 );
2046 }
2047 zeph_memory::store::SourceKind::Hub
2048 }
2049 } else {
2050 zeph_memory::store::SourceKind::Local
2051 }
2052 }
2053
    /// Recomputes and persists the trust record of every reloaded skill.
    ///
    /// For each skill, its content hash and source kind are computed on a
    /// blocking thread, then reconciled against the stored trust row:
    /// - hash changed        → demote to `hash_mismatch_level`
    /// - source kind changed → keep the stored level only when it is
    ///   inactive or no more severe than the new kind's initial level;
    ///   otherwise reset to the initial level
    /// - otherwise           → keep the stored level
    /// - no stored row       → the source kind's configured initial level
    async fn update_trust_for_reloaded_skills(
        &mut self,
        all_meta: &[zeph_skills::loader::SkillMeta],
    ) {
        // Trust rows live in SQLite; nothing to do without a memory backend.
        let memory = self.memory_state.persistence.memory.clone();
        let Some(memory) = memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        let bundled_names: std::collections::HashSet<String> =
            zeph_skills::bundled_skill_names().into_iter().collect();
        for meta in all_meta {
            // Hashing and classification are run off the async runtime
            // (compute_skill_hash presumably does filesystem work).
            let skill_dir = meta.skill_dir.clone();
            let managed_dir_ref = managed_dir.clone();
            let bundled_names_ref = bundled_names.clone();
            let fs_result: Option<(String, zeph_memory::store::SourceKind)> =
                tokio::task::spawn_blocking(move || {
                    let hash = zeph_skills::compute_skill_hash(&skill_dir).ok()?;
                    let source_kind = Self::classify_source_kind(
                        &skill_dir,
                        managed_dir_ref.as_ref(),
                        &bundled_names_ref,
                    );
                    Some((hash, source_kind))
                })
                .await
                .unwrap_or(None);

            let Some((current_hash, source_kind)) = fs_result else {
                tracing::warn!("failed to compute hash for '{}'", meta.name);
                continue;
            };
            // Initial trust level a skill of this source kind would receive
            // with no prior record.
            let initial_level = match source_kind {
                zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
                zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
                zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
                    &trust_cfg.local_level
                }
            };
            let existing = memory
                .sqlite()
                .load_skill_trust(&meta.name)
                .await
                .ok()
                .flatten();
            let trust_level_str = if let Some(ref row) = existing {
                if row.blake3_hash != current_hash {
                    // Content changed since the last trust decision.
                    trust_cfg.hash_mismatch_level.to_string()
                } else if row.source_kind != source_kind {
                    // Source kind changed: decide whether the stored level
                    // may be carried over. An unparseable stored level is
                    // treated as Quarantined.
                    let stored = row
                        .trust_level
                        .parse::<zeph_tools::SkillTrustLevel>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                skill = %meta.name,
                                raw = %row.trust_level,
                                "unrecognised trust_level in DB, treating as quarantined"
                            );
                            zeph_tools::SkillTrustLevel::Quarantined
                        });
                    if !stored.is_active() || stored.severity() <= initial_level.severity() {
                        row.trust_level.clone()
                    } else {
                        initial_level.to_string()
                    }
                } else {
                    // Unchanged hash and kind: keep the stored level.
                    row.trust_level.clone()
                }
            } else {
                initial_level.to_string()
            };
            let source_path = meta.skill_dir.to_str();
            if let Err(e) = memory
                .sqlite()
                .upsert_skill_trust(
                    &meta.name,
                    &trust_level_str,
                    source_kind,
                    None,
                    source_path,
                    &current_hash,
                )
                .await
            {
                tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
            }
        }
    }
2150
    /// Rebuilds or synchronizes the skill embedding index from reloaded
    /// metadata.
    ///
    /// Each embedding call is wrapped in the configured timeout. A matcher
    /// that is not Qdrant-backed (including none at all) is rebuilt in
    /// memory from scratch; a Qdrant-backed one is incrementally synced with
    /// progress reported through the status channel. With hybrid search
    /// enabled, the BM25 index over skill descriptions is rebuilt as well.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_timeout = std::time::Duration::from_secs(self.runtime.timeouts.embedding_seconds);
        // Closure handed to the matcher: embed one text, failing with
        // LlmError::Timeout when the provider takes too long.
        let embed_fn = move |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move {
                if let Ok(result) = tokio::time::timeout(embed_timeout, p.embed(&owned)).await {
                    result
                } else {
                    tracing::warn!(
                        timeout_secs = embed_timeout.as_secs(),
                        "skill matcher: embedding timed out"
                    );
                    Err(zeph_llm::LlmError::Timeout)
                }
            })
        };

        // No matcher, or a non-Qdrant backend → full in-memory rebuild.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            // Forward "completed/total" progress to the UI status line when
            // a status sender is wired up.
            let on_progress: Option<Box<dyn Fn(usize, usize) + Send>> = self
                .session
                .status_tx
                .clone()
                .map(|tx| -> Box<dyn Fn(usize, usize) + Send> {
                    Box::new(move |completed, total| {
                        let msg = format!("Syncing skills: {completed}/{total}");
                        let _ = tx.send(msg);
                    })
                });
            if let Err(e) = backend
                .sync(
                    all_meta,
                    &self.skill_state.embedding_model,
                    embed_fn,
                    on_progress,
                )
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.rebuild_bm25(&descs);
        }
    }
2212
    /// Hot-reloads skills from disk and rebuilds everything derived from
    /// them: trust records, the matcher index, and the system prompt.
    ///
    /// A fingerprint comparison makes this a no-op when nothing on disk
    /// actually changed.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "skill.hot_reload", skip_all)
    )]
    async fn reload_skills(&mut self) {
        let old_fp = self.skill_state.fingerprint();
        // Plugin-provided skill directories are appended to the configured
        // paths (deduplicated) when a supplier is registered.
        let reload_paths = if let Some(ref supplier) = self.skill_state.plugin_dirs_supplier {
            let plugin_dirs = supplier();
            let mut paths = self.skill_state.skill_paths.clone();
            for dir in plugin_dirs {
                if !paths.contains(&dir) {
                    paths.push(dir);
                }
            }
            paths
        } else {
            self.skill_state.skill_paths.clone()
        };
        self.skill_state.registry.write().reload(&reload_paths);
        // Unchanged fingerprint: nothing to rebuild.
        if self.skill_state.fingerprint() == old_fp {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;

        // Clone metadata out so the registry read lock is not held across
        // the awaits below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        let all_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // Health stats are not recomputed on hot reload; an empty map is
        // passed instead.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = SkillState::rebuild_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None);
        // The system prompt occupies the first message slot.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state.registry.read().all_meta().len()
        );
    }
2275
2276 fn reload_instructions(&mut self) {
2277 if let Some(ref mut rx) = self.instructions.reload_rx {
2279 while rx.try_recv().is_ok() {}
2280 }
2281 let Some(ref state) = self.instructions.reload_state else {
2282 return;
2283 };
2284 let new_blocks = crate::instructions::load_instructions(
2285 &state.base_dir,
2286 &state.provider_kinds,
2287 &state.explicit_files,
2288 state.auto_detect,
2289 );
2290 let old_sources: std::collections::HashSet<_> =
2291 self.instructions.blocks.iter().map(|b| &b.source).collect();
2292 let new_sources: std::collections::HashSet<_> =
2293 new_blocks.iter().map(|b| &b.source).collect();
2294 for added in new_sources.difference(&old_sources) {
2295 tracing::info!(path = %added.display(), "instruction file added");
2296 }
2297 for removed in old_sources.difference(&new_sources) {
2298 tracing::info!(path = %removed.display(), "instruction file removed");
2299 }
2300 tracing::info!(
2301 old_count = self.instructions.blocks.len(),
2302 new_count = new_blocks.len(),
2303 "reloaded instruction files"
2304 );
2305 self.instructions.blocks = new_blocks;
2306 }
2307
    /// Re-reads the config file (plus plugin overlays) and applies the
    /// hot-reloadable settings to the running agent in place.
    ///
    /// Parse or overlay failures leave the previous runtime state untouched
    /// (`load_config_with_overlay` returns `None` and we bail early).
    fn reload_config(&mut self) {
        // No config path recorded at startup → nothing to reload.
        let Some(path) = self.lifecycle.config_path.clone() else {
            return;
        };
        let Some(config) = self.load_config_with_overlay(&path) else {
            return;
        };
        let budget_tokens = resolve_context_budget(&config, &self.provider);
        // Runtime / security knobs.
        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        // Memory persistence and compaction thresholds.
        self.memory_state.persistence.history_limit = config.memory.history_limit;
        self.memory_state.persistence.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.compaction.summarization_threshold =
            config.memory.summarization_threshold;
        // Skill-matching tunables (weights/ratios clamped to [0, 1]).
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);
        // clone_into reuses the existing String allocation where possible.
        config
            .skills
            .generation_provider
            .as_str()
            .clone_into(&mut self.skill_state.generation_provider_name);
        // Expand a leading "~/" in the generation output dir against $HOME;
        // anything else is taken verbatim as a path.
        self.skill_state.generation_output_dir =
            config.skills.generation_output_dir.as_deref().map(|p| {
                if let Some(stripped) = p.strip_prefix("~/") {
                    dirs::home_dir()
                        .map_or_else(|| std::path::PathBuf::from(p), |h| h.join(stripped))
                } else {
                    std::path::PathBuf::from(p)
                }
            });

        // NOTE(review): 0.20 appears to be a reserved fraction of the budget —
        // confirm against ContextBudget::new.
        self.context_manager.budget = Some(
            ContextBudget::new(budget_tokens, 0.20).with_graph_enabled(config.memory.graph.enabled),
        );

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Create the RPE router lazily; an already-present router (and
                // whatever state it holds) is kept as-is.
                if self.memory_state.extraction.rpe_router.is_none() {
                    self.memory_state.extraction.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.extraction.rpe_router = None;
            }
            self.memory_state.extraction.graph_config = graph_cfg.clone();
        }
        // Context-manager compaction / compression knobs.
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        // An empty provider name disables the store-routing classifier.
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.persistence.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        // Repo-map indexing budget and cache TTL.
        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }
2395
2396 fn load_config_with_overlay(&mut self, path: &std::path::Path) -> Option<Config> {
2400 let mut config = match Config::load(path) {
2401 Ok(c) => c,
2402 Err(e) => {
2403 tracing::warn!("config reload failed: {e:#}");
2404 return None;
2405 }
2406 };
2407
2408 let new_overlay = if self.lifecycle.plugins_dir.as_os_str().is_empty() {
2410 None
2411 } else {
2412 match zeph_plugins::apply_plugin_config_overlays(
2413 &mut config,
2414 &self.lifecycle.plugins_dir,
2415 ) {
2416 Ok(o) => Some(o),
2417 Err(e) => {
2418 tracing::warn!(
2419 "plugin overlay merge failed during reload: {e:#}; \
2420 keeping previous runtime state"
2421 );
2422 return None;
2423 }
2424 }
2425 };
2426
2427 if let Some(ref overlay) = new_overlay {
2431 self.warn_on_shell_overlay_divergence(overlay, &config);
2432 }
2433 Some(config)
2434 }
2435
2436 fn warn_on_shell_overlay_divergence(
2442 &self,
2443 new_overlay: &zeph_plugins::ResolvedOverlay,
2444 config: &Config,
2445 ) {
2446 let new_blocked: Vec<String> = {
2447 let mut v = config.tools.shell.blocked_commands.clone();
2448 v.sort();
2449 v
2450 };
2451 let new_allowed: Vec<String> = {
2452 let mut v = config.tools.shell.allowed_commands.clone();
2453 v.sort();
2454 v
2455 };
2456
2457 let startup = &self.lifecycle.startup_shell_overlay;
2458 let blocked_changed = new_blocked != startup.blocked;
2459 let allowed_changed = new_allowed != startup.allowed;
2460
2461 if blocked_changed && let Some(ref h) = self.lifecycle.shell_policy_handle {
2463 h.rebuild(&config.tools.shell);
2464 tracing::info!(
2465 blocked_count = h.snapshot_blocked().len(),
2466 "shell blocked_commands rebuilt from hot-reload"
2467 );
2468 }
2469
2470 if allowed_changed {
2477 let msg = "plugin config overlay changed shell allowed_commands; RESTART REQUIRED \
2478 for sandbox path recomputation (blocked_commands was rebuilt live)";
2479 tracing::warn!("{msg}");
2480 if let Some(ref tx) = self.session.status_tx {
2481 let _ = tx.send(msg.to_owned());
2482 }
2483 }
2484
2485 let _ = new_overlay;
2486 }
2487
2488 #[allow(clippy::too_many_lines)]
2499 fn maybe_sidequest_eviction(&mut self) {
2500 use zeph_llm::provider::{Message, MessageMetadata, Role};
2501
2502 if self.sidequest.config.enabled {
2506 use crate::config::PruningStrategy;
2507 if !matches!(
2508 self.context_manager.compression.pruning_strategy,
2509 PruningStrategy::Reactive
2510 ) {
2511 tracing::warn!(
2512 strategy = ?self.context_manager.compression.pruning_strategy,
2513 "sidequest is enabled alongside a non-Reactive pruning strategy; \
2514 consider disabling sidequest.enabled to avoid redundant eviction"
2515 );
2516 }
2517 }
2518
2519 if self.focus.is_active() {
2521 tracing::debug!("sidequest: skipping — focus session active");
2522 self.compression.pending_sidequest_result = None;
2524 return;
2525 }
2526
2527 if let Some(handle) = self.compression.pending_sidequest_result.take() {
2529 use futures::FutureExt as _;
2531 match handle.now_or_never() {
2532 Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
2533 let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
2534 let freed = self.sidequest.apply_eviction(
2535 &mut self.msg.messages,
2536 &evicted_indices,
2537 &self.metrics.token_counter,
2538 );
2539 if freed > 0 {
2540 self.recompute_prompt_tokens();
2541 self.context_manager.compaction =
2544 crate::agent::context_manager::CompactionState::CompactedThisTurn {
2545 cooldown: 0,
2546 };
2547 tracing::info!(
2548 freed_tokens = freed,
2549 evicted_cursors = evicted_indices.len(),
2550 pass = self.sidequest.passes_run,
2551 "sidequest eviction complete"
2552 );
2553 if let Some(ref d) = self.debug_state.debug_dumper {
2554 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
2555 }
2556 if let Some(ref tx) = self.session.status_tx {
2557 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
2558 }
2559 } else {
2560 if let Some(ref tx) = self.session.status_tx {
2562 let _ = tx.send(String::new());
2563 }
2564 }
2565 }
2566 Some(Ok(None | Some(_))) => {
2567 tracing::debug!("sidequest: pending result: no cursors to evict");
2568 if let Some(ref tx) = self.session.status_tx {
2569 let _ = tx.send(String::new());
2570 }
2571 }
2572 Some(Err(e)) => {
2573 tracing::debug!("sidequest: background task panicked: {e}");
2574 if let Some(ref tx) = self.session.status_tx {
2575 let _ = tx.send(String::new());
2576 }
2577 }
2578 None => {
2579 tracing::debug!(
2583 "sidequest: background LLM task not yet complete, rescheduling"
2584 );
2585 }
2586 }
2587 }
2588
2589 self.sidequest
2591 .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
2592
2593 if self.sidequest.tool_output_cursors.is_empty() {
2594 tracing::debug!("sidequest: no eligible cursors");
2595 return;
2596 }
2597
2598 let prompt = self.sidequest.build_eviction_prompt();
2599 let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
2600 let n_cursors = self.sidequest.tool_output_cursors.len();
2601 let provider = self.summary_or_primary_provider().clone();
2603
2604 let handle = tokio::spawn(async move {
2606 let msgs = [Message {
2607 role: Role::User,
2608 content: prompt,
2609 parts: vec![],
2610 metadata: MessageMetadata::default(),
2611 }];
2612 let response =
2613 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
2614 .await
2615 {
2616 Ok(Ok(r)) => r,
2617 Ok(Err(e)) => {
2618 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
2619 return None;
2620 }
2621 Err(_) => {
2622 tracing::debug!("sidequest bg: LLM call timed out");
2623 return None;
2624 }
2625 };
2626
2627 let start = response.find('{')?;
2628 let end = response.rfind('}')?;
2629 if start > end {
2630 return None;
2631 }
2632 let json_slice = &response[start..=end];
2633 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
2634 let mut valid: Vec<usize> = parsed
2635 .del_cursors
2636 .into_iter()
2637 .filter(|&c| c < n_cursors)
2638 .collect();
2639 valid.sort_unstable();
2640 valid.dedup();
2641 #[allow(
2642 clippy::cast_precision_loss,
2643 clippy::cast_possible_truncation,
2644 clippy::cast_sign_loss
2645 )]
2646 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
2647 valid.truncate(max_evict);
2648 Some(valid)
2649 });
2650
2651 self.compression.pending_sidequest_result = Some(handle);
2652 tracing::debug!("sidequest: background LLM eviction task spawned");
2653 if let Some(ref tx) = self.session.status_tx {
2654 let _ = tx.send("SideQuest: scoring tool outputs...".into());
2655 }
2656 }
2657
    /// Detects a working-directory change since the last check, updates the
    /// cached cwd and the session env context, and fires any configured
    /// `CwdChanged` hooks with the old/new paths exported as environment
    /// variables. No-op when the cwd is unchanged or unreadable.
    pub(crate) async fn check_cwd_changed(&mut self) {
        let current = match std::env::current_dir() {
            Ok(p) => p,
            Err(e) => {
                // cwd can be unreadable (e.g. deleted directory); just skip.
                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
                return;
            }
        };
        if current == self.lifecycle.last_known_cwd {
            return;
        }
        // Swap in the new cwd, keeping the old one for hook env / logging.
        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
        self.session.env_context.working_dir = current.display().to_string();

        tracing::info!(
            old = %old_cwd.display(),
            new = %current.display(),
            "working directory changed"
        );

        // \u{2026} is the ellipsis character for the status line.
        let _ = self
            .channel
            .send_status("Working directory changed\u{2026}")
            .await;

        let hooks = self.session.hooks_config.cwd_changed.clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
                // Hook failures are non-fatal; log and continue.
                tracing::warn!(error = %e, "CwdChanged hook failed");
            }
        }

        // Clear the status line.
        let _ = self.channel.send_status("").await;
    }
2700
2701 pub(crate) async fn handle_file_changed(
2703 &mut self,
2704 event: crate::file_watcher::FileChangedEvent,
2705 ) {
2706 tracing::info!(path = %event.path.display(), "file changed");
2707
2708 let _ = self
2709 .channel
2710 .send_status("Running file-change hook\u{2026}")
2711 .await;
2712
2713 let hooks = self.session.hooks_config.file_changed_hooks.clone();
2714 if !hooks.is_empty() {
2715 let mut env = std::collections::HashMap::new();
2716 env.insert(
2717 "ZEPH_CHANGED_PATH".to_owned(),
2718 event.path.display().to_string(),
2719 );
2720 if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
2721 tracing::warn!(error = %e, "FileChanged hook failed");
2722 }
2723 }
2724
2725 let _ = self.channel.send_status("").await;
2726 }
2727}
2728pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
2729 while !*rx.borrow_and_update() {
2730 if rx.changed().await.is_err() {
2731 std::future::pending::<()>().await;
2732 }
2733 }
2734}
2735
2736pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
2737 match rx {
2738 Some(inner) => {
2739 if let Some(v) = inner.recv().await {
2740 Some(v)
2741 } else {
2742 *rx = None;
2743 std::future::pending().await
2744 }
2745 }
2746 None => std::future::pending().await,
2747 }
2748}
2749
2750pub(crate) fn resolve_context_budget(config: &Config, provider: &AnyProvider) -> usize {
2756 let tokens = if config.memory.auto_budget && config.memory.context_budget_tokens == 0 {
2757 if let Some(ctx_size) = provider.context_window() {
2758 tracing::info!(
2759 model_context = ctx_size,
2760 "auto-configured context budget on reload"
2761 );
2762 ctx_size
2763 } else {
2764 0
2765 }
2766 } else {
2767 config.memory.context_budget_tokens
2768 };
2769 if tokens == 0 {
2770 tracing::warn!(
2771 "context_budget_tokens resolved to 0 on reload — using fallback of 128000 tokens"
2772 );
2773 128_000
2774 } else {
2775 tokens
2776 }
2777}
2778
2779#[cfg(test)]
2780mod tests;
2781
2782#[cfg(test)]
2783pub(crate) use tests::agent_tests;
2784
#[cfg(test)]
mod test_stubs {
    //! Shared stub command handlers for agent tests.

    use std::pin::Pin;

    use zeph_commands::{
        CommandContext, CommandError, CommandHandler, CommandOutput, SlashCategory,
    };

    // Command handler that always fails, for exercising the error path of
    // slash-command dispatch.
    pub(super) struct TestErrorCommand;

    impl CommandHandler<CommandContext<'_>> for TestErrorCommand {
        fn name(&self) -> &'static str {
            "/test-error"
        }

        fn description(&self) -> &'static str {
            "Test stub: always returns CommandError"
        }

        fn category(&self) -> SlashCategory {
            SlashCategory::Session
        }

        // Always resolves to Err("boom"); ctx and args are ignored.
        fn handle<'a>(
            &'a self,
            _ctx: &'a mut CommandContext<'_>,
            _args: &'a str,
        ) -> Pin<
            Box<dyn std::future::Future<Output = Result<CommandOutput, CommandError>> + Send + 'a>,
        > {
            Box::pin(async { Err(CommandError::new("boom")) })
        }
    }
}