mod accessors;
mod builder;
pub(crate) mod compaction_strategy;
#[cfg(feature = "compression-guidelines")]
pub(super) mod compression_feedback;
mod context;
pub(crate) mod context_manager;
pub mod error;
#[cfg(feature = "experiments")]
mod experiment_cmd;
pub(super) mod feedback_detector;
pub(crate) mod focus;
mod graph_commands;
#[cfg(feature = "compression-guidelines")]
mod guidelines_commands;
mod index;
mod learning;
pub(crate) mod learning_engine;
mod log_commands;
#[cfg(feature = "lsp-context")]
mod lsp_commands;
mod mcp;
mod memory_commands;
mod message_queue;
mod persistence;
#[cfg(feature = "policy-enforcer")]
mod policy_commands;
mod provider_cmd;
pub(crate) mod rate_limiter;
#[cfg(feature = "scheduler")]
mod scheduler_commands;
pub mod session_config;
mod session_digest;
pub(crate) mod sidequest;
mod skill_management;
pub mod slash_commands;
pub(crate) mod state;
pub(crate) mod tool_execution;
pub(crate) mod tool_orchestrator;
mod trust_commands;
mod utils;

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Instant;

use tokio::sync::{Notify, mpsc, watch};
use tokio_util::sync::CancellationToken;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
use zeph_memory::TokenCounter;
use zeph_memory::semantic::SemanticMemory;
use zeph_skills::loader::Skill;
use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
use zeph_skills::prompt::format_skills_prompt;
use zeph_skills::registry::SkillRegistry;
use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};

use crate::channel::Channel;
use crate::config::Config;
use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
use crate::context::{
    ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
};
use zeph_sanitizer::ContentSanitizer;

use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
#[cfg(feature = "context-compression")]
use state::CompressionState;
use state::{
    DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
    McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
    RuntimeConfig, SecurityState, SessionState, SkillState,
};

pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
pub(crate) const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";

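/// Renders a one-line-per-task summary of a [`crate::orchestration::TaskGraph`]
/// for channel output.
///
/// Illustrative shape of the rendered text (task ids appear however `TaskId`
/// implements `Display`; the field values here are made up):
///
/// ```text
/// Plan: "ship the release"
/// Tasks: 2
///
///  1. [-] draft changelog
///  2. [writer] publish notes (after: 0)
/// ```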
fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
    use std::fmt::Write;
    let mut out = String::new();
    let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
    let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
    let _ = writeln!(out);
    for (i, task) in graph.tasks.iter().enumerate() {
        let deps = if task.depends_on.is_empty() {
            String::new()
        } else {
            let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
            format!(" (after: {})", ids.join(", "))
        };
        let agent = task.agent_hint.as_deref().unwrap_or("-");
        let _ = writeln!(out, " {}. [{}] {}{}", i + 1, agent, task.title, deps);
    }
    out
}

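/// Concatenates the outputs of all completed tasks, newline-separated, and
/// truncates the result to a character budget of `max_tokens * 4`, matching
/// the rough four-chars-per-token heuristic used elsewhere in this module
/// (e.g. the initial prompt token estimate in `new_with_registry_arc`).
///
/// For example, `max_tokens = 1000` yields a 4000-character budget; anything
/// beyond that is dropped and a warning is logged.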
fn collect_and_truncate_task_outputs(
    graph: &crate::orchestration::TaskGraph,
    max_tokens: u32,
) -> String {
    use crate::orchestration::TaskStatus;

    let char_budget = max_tokens as usize * 4;
    let mut raw = String::new();
    for task in &graph.tasks {
        if task.status == TaskStatus::Completed
            && let Some(ref result) = task.result
        {
            if !raw.is_empty() {
                raw.push('\n');
            }
            raw.push_str(&result.output);
        }
    }
    if raw.len() > char_budget {
        tracing::warn!(
            original_len = raw.len(),
            truncated_to = char_budget,
            "whole-plan verify: output truncated to verify_max_tokens * 4 chars"
        );
        raw.chars().take(char_budget).collect()
    } else {
        raw
    }
}

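/// Wraps a tool's output in a labeled fenced block for the conversation log.
///
/// Illustrative result of `format_tool_output("shell", "ok")`, derived from
/// the format string below:
///
/// ````text
/// [tool output: shell]
/// ```
/// ok
/// ```
/// ````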
pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
    use std::fmt::Write;
    let capacity = "[tool output: ".len()
        + tool_name.len()
        + "]\n```\n".len()
        + body.len()
        + TOOL_OUTPUT_SUFFIX.len();
    let mut buf = String::with_capacity(capacity);
    let _ = write!(
        buf,
        "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
    );
    buf
}

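/// The core agent: owns the LLM providers, the I/O channel, the tool
/// executor, and the per-concern state bundles defined in [`state`].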
pub struct Agent<C: Channel> {
    provider: AnyProvider,
    embedding_provider: AnyProvider,
    channel: C,
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    pub(super) msg: MessageState,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    #[cfg(feature = "context-compression")]
    pub(super) compression: CompressionState,
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    pub(super) orchestration: OrchestrationState,
    pub(super) focus: focus::FocusState,
    pub(super) sidequest: sidequest::SidequestState,
    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
    pub(super) dependency_always_on: HashSet<String>,
    pub(super) completed_tool_ids: HashSet<String>,
    pub(super) last_persisted_message_id: Option<i64>,
    pub(super) deferred_db_hide_ids: Vec<i64>,
    pub(super) deferred_db_summaries: Vec<String>,
    pub(super) runtime_layers: Vec<std::sync::Arc<dyn crate::runtime_layer::RuntimeLayer>>,
}

impl<C: Channel> Agent<C> {
    #[must_use]
    pub fn new(
        provider: AnyProvider,
        channel: C,
        registry: SkillRegistry,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
        Self::new_with_registry_arc(
            provider,
            channel,
            registry,
            matcher,
            max_active_skills,
            tool_executor,
        )
    }

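    /// Like [`Agent::new`], but takes an already-shared skill registry so
    /// callers can keep a handle to the same `Arc<RwLock<SkillRegistry>>`.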
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        let embedding_provider = provider.clone();
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
            },
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
                shutdown_summary: true,
                shutdown_summary_min_messages: 4,
                shutdown_summary_max_messages: 20,
                shutdown_summary_timeout_secs: 10,
                structured_summaries: false,
                last_recall_confidence: None,
                digest_config: crate::config::DigestConfig::default(),
                cached_session_digest: None,
                context_strategy: crate::config::ContextStrategy::default(),
                crossover_turn_threshold: 20,
                rpe_router: None,
                goal_text: None,
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.20,
                min_injection_score: 0.20,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
                two_stage_matching: false,
                confusability_threshold: 0.0,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState {
                detector: feedback_detector::FeedbackDetector::new(0.6),
                judge: None,
                llm_classifier: None,
            },
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                reasoning_model_warning: true,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                active_provider_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
                rate_limiter: rate_limiter::ToolRateLimiter::new(
                    rate_limiter::RateLimitConfig::default(),
                ),
                semantic_cache_enabled: false,
                semantic_cache_threshold: 0.95,
                semantic_cache_max_candidates: 10,
                dependency_config: zeph_tools::DependencyConfig::default(),
                #[cfg(feature = "policy-enforcer")]
                adversarial_policy_info: None,
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                elicitation_rx: None,
                shared_tools: None,
                tool_rx: None,
                server_outcomes: Vec::new(),
                pruning_cache: zeph_mcp::PruningCache::new(),
                pruning_provider: None,
                pruning_enabled: false,
                pruning_params: zeph_mcp::PruningParams::default(),
                semantic_index: None,
                discovery_strategy: zeph_mcp::ToolDiscoveryStrategy::default(),
                discovery_params: zeph_mcp::DiscoveryParams::default(),
                discovery_provider: None,
                elicitation_warn_sensitive_fields: true,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            session: SessionState {
                env_context: EnvironmentContext::gather(""),
                response_cache: None,
                parent_tool_use_id: None,
                status_tx: None,
                #[cfg(feature = "lsp-context")]
                lsp_hooks: None,
                #[cfg(feature = "policy-enforcer")]
                policy_config: None,
                hooks_config: state::HooksConfigSnapshot::default(),
            },
            instructions: InstructionState {
                blocks: Vec::new(),
                reload_rx: None,
                reload_state: None,
            },
            security: SecurityState {
                sanitizer: ContentSanitizer::new(
                    &zeph_sanitizer::ContentIsolationConfig::default(),
                ),
                quarantine_summarizer: None,
                is_acp_session: false,
                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
                    std::collections::HashSet::new(),
                )),
                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
                    zeph_sanitizer::pii::PiiFilterConfig::default(),
                ),
                #[cfg(feature = "classifiers")]
                pii_ner_backend: None,
                #[cfg(feature = "classifiers")]
                pii_ner_timeout_ms: 5000,
                #[cfg(feature = "classifiers")]
                pii_ner_max_chars: 8192,
                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
                #[cfg(feature = "guardrail")]
                guardrail: None,
                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
                    zeph_config::ResponseVerificationConfig::default(),
                ),
                causal_analyzer: None,
            },
            experiments: ExperimentState {
                #[cfg(feature = "experiments")]
                config: crate::config::ExperimentConfig::default(),
                #[cfg(feature = "experiments")]
                cancel: None,
                #[cfg(feature = "experiments")]
                baseline: crate::experiments::ConfigSnapshot::default(),
                #[cfg(feature = "experiments")]
                eval_provider: None,
                notify_rx: Some(exp_notify_rx),
                #[cfg(feature = "experiments")]
                notify_tx: exp_notify_tx,
            },
            #[cfg(feature = "context-compression")]
            compression: CompressionState {
                current_task_goal: None,
                task_goal_user_msg_hash: None,
                pending_task_goal: None,
                pending_sidequest_result: None,
                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
                pending_subgoal: None,
                subgoal_user_msg_hash: None,
            },
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
                last_known_cwd: std::env::current_dir().unwrap_or_default(),
                file_changed_rx: None,
                file_watcher: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                probe_provider: None,
                #[cfg(feature = "context-compression")]
                compress_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
                provider_pool: Vec::new(),
                provider_config_snapshot: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
                classifier_metrics: None,
            },
            orchestration: OrchestrationState {
                planner_provider: None,
                verify_provider: None,
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
                plan_cache: None,
                pending_goal_embedding: None,
            },
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_schema_filter: None,
            cached_filtered_tool_ids: None,
            dependency_graph: None,
            dependency_always_on: HashSet::new(),
            completed_tool_ids: HashSet::new(),
            last_persisted_message_id: None,
            deferred_db_hide_ids: Vec::new(),
            deferred_db_summaries: Vec::new(),
            runtime_layers: Vec::new(),
        }
    }

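    /// Collects results from sub-agents that have reached a terminal state
    /// (completed, failed, or canceled), returning `(task_id, result)` pairs.
    ///
    /// Hypothetical usage sketch (agent and channel setup elided):
    ///
    /// ```ignore
    /// for (task_id, output) in agent.poll_subagents().await {
    ///     tracing::info!(%task_id, len = output.len(), "sub-agent finished");
    /// }
    /// ```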
    pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
        let Some(mgr) = &mut self.orchestration.subagent_manager else {
            return vec![];
        };

        let finished: Vec<String> = mgr
            .statuses()
            .into_iter()
            .filter_map(|(id, status)| {
                if matches!(
                    status.state,
                    crate::subagent::SubAgentState::Completed
                        | crate::subagent::SubAgentState::Failed
                        | crate::subagent::SubAgentState::Canceled
                ) {
                    Some(id)
                } else {
                    None
                }
            })
            .collect();

        let mut results = vec![];
        for task_id in finished {
            match mgr.collect(&task_id).await {
                Ok(result) => results.push((task_id, result)),
                Err(e) => {
                    tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
                }
            }
        }
        results
    }

    async fn handle_plan_command(
        &mut self,
        cmd: crate::orchestration::PlanCommand,
    ) -> Result<(), error::AgentError> {
        use crate::orchestration::PlanCommand;

        if !self.config_for_orchestration().enabled {
            self.channel
                .send(
                    "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
                )
                .await?;
            return Ok(());
        }

        match cmd {
            PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
            PlanCommand::Confirm => self.handle_plan_confirm().await,
            PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
            PlanCommand::List => self.handle_plan_list().await,
            PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
            PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
            PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
        }
    }

    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration.orchestration_config
    }

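    /// Lazily constructs the plan cache from the semantic-memory SQLite pool.
    /// A disabled cache, an existing cache, or missing memory short-circuits;
    /// construction errors are logged and planning proceeds uncached.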
    async fn init_plan_cache_if_needed(&mut self) {
        let plan_cache_config = self.orchestration.orchestration_config.plan_cache.clone();
        if !plan_cache_config.enabled || self.orchestration.plan_cache.is_some() {
            return;
        }
        if let Some(ref memory) = self.memory_state.memory {
            let pool = memory.sqlite().pool().clone();
            let embed_model = self.skill_state.embedding_model.clone();
            match crate::orchestration::PlanCache::new(pool, plan_cache_config, &embed_model).await
            {
                Ok(cache) => self.orchestration.plan_cache = Some(cache),
                Err(e) => {
                    tracing::warn!(error = %e, "plan cache: init failed, proceeding without cache");
                }
            }
        } else {
            tracing::warn!("plan cache: memory not configured, proceeding without cache");
        }
    }

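    /// Embeds the normalized goal for plan-cache lookup. Returns `None` when
    /// no cache is configured, the provider does not support embeddings, or
    /// embedding fails; all three cases fail open to uncached planning.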
    async fn goal_embedding_for_cache(&self, goal: &str) -> Option<Vec<f32>> {
        use crate::orchestration::normalize_goal;

        self.orchestration.plan_cache.as_ref()?;
        let normalized = normalize_goal(goal);
        match self.embedding_provider.embed(&normalized).await {
            Ok(emb) => Some(emb),
            Err(zeph_llm::LlmError::EmbedUnsupported { .. }) => {
                tracing::debug!(
                    "plan cache: provider does not support embeddings, skipping cache lookup"
                );
                None
            }
            Err(e) => {
                tracing::warn!(error = %e, "plan cache: goal embedding failed, skipping cache");
                None
            }
        }
    }

    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
        use crate::orchestration::{LlmPlanner, plan_with_cache};

        if self.orchestration.pending_graph.is_some() {
            self.channel
                .send(
                    "A plan is already pending confirmation. \
                     Use /plan confirm to execute it or /plan cancel to discard.",
                )
                .await?;
            return Ok(());
        }

        self.channel.send("Planning task decomposition...").await?;

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let confirm_before_execute = self
            .orchestration
            .orchestration_config
            .confirm_before_execute;

        self.init_plan_cache_if_needed().await;
        let goal_embedding = self.goal_embedding_for_cache(goal).await;

        tracing::debug!(
            cache_enabled = self.orchestration.orchestration_config.plan_cache.enabled,
            has_embedding = goal_embedding.is_some(),
            "plan cache state for goal"
        );

        let planner_provider = self
            .orchestration
            .planner_provider
            .as_ref()
            .unwrap_or(&self.provider)
            .clone();
        let planner = LlmPlanner::new(planner_provider, &self.orchestration.orchestration_config);
        let embed_model = self.skill_state.embedding_model.clone();
        let (graph, planner_usage) = plan_with_cache(
            &planner,
            self.orchestration.plan_cache.as_ref(),
            &self.provider,
            goal_embedding.as_deref(),
            &embed_model,
            goal,
            &available_agents,
            self.orchestration.orchestration_config.max_tasks,
        )
        .await
        .map_err(|e| error::AgentError::Other(e.to_string()))?;

        self.orchestration.pending_goal_embedding = goal_embedding;

        let task_count = graph.tasks.len() as u64;
        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
        let (planner_prompt, planner_completion) = planner_usage.unwrap_or((0, 0));
        self.update_metrics(|m| {
            m.api_calls += 1;
            m.prompt_tokens += planner_prompt;
            m.completion_tokens += planner_completion;
            m.total_tokens = m.prompt_tokens + m.completion_tokens;
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += task_count;
            m.orchestration_graph = Some(snapshot);
        });
        self.record_cost(planner_prompt, planner_completion);
        self.record_cache_usage();

        if confirm_before_execute {
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
                .await?;
            self.orchestration.pending_graph = Some(graph);
        } else {
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Plan ready. Full execution will be available in a future phase.")
                .await?;
            let now = std::time::Instant::now();
            self.update_metrics(|m| {
                if let Some(ref mut s) = m.orchestration_graph {
                    "completed".clone_into(&mut s.status);
                    s.completed_at = Some(now);
                }
            });
        }

        Ok(())
    }

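    /// Sanity-checks a pending graph before execution: sub-agents must be
    /// configured, the graph must have tasks, and a completed or canceled
    /// graph cannot be re-executed. On rejection the graph is put back into
    /// `pending_graph` and `Err(())` is returned so the caller can bail out.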
    async fn validate_pending_graph(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<crate::orchestration::TaskGraph, ()> {
        use crate::orchestration::GraphStatus;

        if self.orchestration.subagent_manager.is_none() {
            let _ = self
                .channel
                .send(
                    "No sub-agents configured. Add sub-agent definitions to config \
                     to enable plan execution.",
                )
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        if graph.tasks.is_empty() {
            let _ = self.channel.send("Plan has no tasks.").await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
            let _ = self
                .channel
                .send(&format!(
                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
                    graph.status
                ))
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        Ok(graph)
    }

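    /// Builds a `DagScheduler` for the graph (resuming when the graph has
    /// already run) and reserves up to `max_parallel` sub-agent slots, capped
    /// at `max_concurrent - 1`, so planning-phase sub-agents cannot starve
    /// plan tasks. Returns the scheduler plus the reservation count, which
    /// the caller must release when the run finishes; on error the
    /// reservation is released here.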
    fn build_dag_scheduler(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
        if max_concurrent < max_parallel + 1 {
            tracing::warn!(
                max_concurrent,
                max_parallel,
                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
                max_parallel + 1
            );
        }

        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.reserve_slots(reserved);
        }

        let scheduler = if graph.status == GraphStatus::Created {
            DagScheduler::new(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        } else {
            DagScheduler::resume_from(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        }
        .map_err(|e| {
            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                mgr.release_reservation(reserved);
            }
            error::AgentError::Other(e.to_string())
        })?;

        let provider_names: Vec<&str> = self
            .providers
            .provider_pool
            .iter()
            .filter_map(|e| e.name.as_deref())
            .collect();
        scheduler
            .validate_verify_config(&provider_names)
            .map_err(|e| {
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    mgr.release_reservation(reserved);
                }
                error::AgentError::Other(e.to_string())
            })?;

        Ok((scheduler, reserved))
    }

    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
        let Some(graph) = self.orchestration.pending_graph.take() else {
            self.channel
                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
                .await?;
            return Ok(());
        };

        let Ok(graph) = self.validate_pending_graph(graph).await else {
            return Ok(());
        };

        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;

        let task_count = scheduler.graph().tasks.len();
        self.channel
            .send(&format!(
                "Confirmed. Executing plan ({task_count} tasks)..."
            ))
            .await?;

        let plan_token = CancellationToken::new();
        self.orchestration.plan_cancel_token = Some(plan_token.clone());

        let scheduler_result = self
            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
            .await;
        self.orchestration.plan_cancel_token = None;

        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.release_reservation(reserved);
        }

        let final_status = scheduler_result?;

        let extra_task_outputs = self
            .run_whole_plan_verify(&mut scheduler, final_status)
            .await;

        let mut completed_graph = scheduler.into_graph();

        if let Some(extra_tasks) = extra_task_outputs {
            completed_graph.tasks.extend(extra_tasks);
        }

        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
        self.update_metrics(|m| {
            m.orchestration_graph = Some(snapshot);
        });

        let result_label = self
            .finalize_plan_execution(completed_graph, final_status)
            .await?;

        let now = std::time::Instant::now();
        self.update_metrics(|m| {
            if let Some(ref mut s) = m.orchestration_graph {
                result_label.clone_into(&mut s.status);
                s.completed_at = Some(now);
            }
        });
        Ok(())
    }

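    /// After a plan completes, optionally verifies the combined task output
    /// against the goal. When verification reports gaps below the confidence
    /// threshold, a replan is requested and the gap tasks run in a separate
    /// partial DAG; the completed extras are returned for merging into the
    /// final graph. Every failure path returns `None` (fail-open).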
    async fn run_whole_plan_verify(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        final_status: crate::orchestration::GraphStatus,
    ) -> Option<Vec<crate::orchestration::TaskNode>> {
        use crate::orchestration::{GraphStatus, PlanVerifier};

        if final_status != GraphStatus::Completed
            || !self.orchestration.orchestration_config.verify_completeness
            || scheduler.max_replans_remaining() == 0
        {
            return None;
        }

        let threshold = scheduler.completeness_threshold();
        let max_tokens = self.orchestration.orchestration_config.verify_max_tokens;
        let max_tasks = self.orchestration.orchestration_config.max_tasks;
        let goal = scheduler.graph().goal.clone();
        let truncated_output = collect_and_truncate_task_outputs(scheduler.graph(), max_tokens);

        if truncated_output.is_empty() {
            return None;
        }

        let verify_provider = self
            .orchestration
            .verify_provider
            .as_ref()
            .unwrap_or(&self.provider)
            .clone();
        let mut verifier =
            PlanVerifier::new(verify_provider, max_tokens, self.security.sanitizer.clone());
        let result = verifier.verify_plan(&goal, &truncated_output).await;

        tracing::debug!(
            complete = result.complete,
            confidence = result.confidence,
            gaps = result.gaps.len(),
            threshold,
            "whole-plan verification result"
        );

        let should_replan =
            !result.complete && result.confidence < f64::from(threshold) && !result.gaps.is_empty();

        if !should_replan {
            return None;
        }

        scheduler.record_whole_plan_replan();

        let next_id = u32::try_from(scheduler.graph().tasks.len()).unwrap_or(u32::MAX);
        let gap_tasks = match verifier
            .replan_from_plan(&goal, &result.gaps, next_id, max_tasks)
            .await
        {
            Ok(tasks) => tasks,
            Err(e) => {
                tracing::warn!(error = %e, "whole-plan replan_from_plan failed (fail-open)");
                return None;
            }
        };

        if gap_tasks.is_empty() {
            return None;
        }

        self.execute_partial_replan_dag(gap_tasks, &goal).await
    }

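    /// Runs replan gap tasks in an isolated scheduler with further replanning
    /// and completeness verification disabled, preventing recursive replans.
    /// Returns the tasks that completed, or `None` if nothing did.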
    async fn execute_partial_replan_dag(
        &mut self,
        gap_tasks: Vec<crate::orchestration::TaskNode>,
        goal: &str,
    ) -> Option<Vec<crate::orchestration::TaskNode>> {
        use crate::orchestration::{DagScheduler, RuleBasedRouter, TaskStatus};

        let mut partial_graph = crate::orchestration::TaskGraph::new(goal);
        partial_graph.tasks = gap_tasks;

        let mut partial_config = self.orchestration.orchestration_config.clone();
        partial_config.max_replans = 0;
        partial_config.verify_completeness = false;

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let mut partial_scheduler = match DagScheduler::new(
            partial_graph,
            &partial_config,
            Box::new(RuleBasedRouter),
            available_agents,
        ) {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "whole-plan replan: failed to create partial DagScheduler (fail-open)"
                );
                return None;
            }
        };

        let partial_task_count = partial_scheduler.graph().tasks.len();
        let cancel_token = CancellationToken::new();
        if let Err(e) = self
            .run_scheduler_loop(&mut partial_scheduler, partial_task_count, cancel_token)
            .await
        {
            tracing::warn!(
                error = %e,
                "whole-plan replan: partial DAG run failed (fail-open)"
            );
        }

        let completed: Vec<_> = partial_scheduler
            .into_graph()
            .tasks
            .into_iter()
            .filter(|t| t.status == TaskStatus::Completed)
            .collect();

        if completed.is_empty() {
            None
        } else {
            Some(completed)
        }
    }

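    /// Applies `Cancel` actions from the scheduler to the sub-agent manager
    /// and surfaces a final status if a `Done` action is encountered.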
    fn cancel_agents_from_actions(
        &mut self,
        cancel_actions: Vec<crate::orchestration::SchedulerAction>,
    ) -> Option<crate::orchestration::GraphStatus> {
        use crate::orchestration::SchedulerAction;
        for action in cancel_actions {
            match action {
                SchedulerAction::Cancel { agent_handle_id } => {
                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                            tracing::trace!(error = %e, "cancel: agent already gone");
                        });
                    }
                }
                SchedulerAction::Done { status } => return Some(status),
                SchedulerAction::Spawn { .. }
                | SchedulerAction::RunInline { .. }
                | SchedulerAction::Verify { .. } => {}
            }
        }
        None
    }

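    /// Spawns a sub-agent for one scheduled task. The `on_done` callback
    /// forwards the task outcome back to the scheduler as a `TaskEvent`.
    /// Returns `(spawned, hit_concurrency_limit, final_status_if_done)`.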
    async fn handle_scheduler_spawn_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        agent_def_name: String,
        prompt: String,
        spawn_counter: &mut usize,
        task_count: usize,
    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());

        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(&agent_def_name);
        let cfg = self.orchestration.subagent_config.clone();
        let event_tx = scheduler.event_sender();

        let mgr = self
            .orchestration
            .subagent_manager
            .as_mut()
            .expect("subagent_manager checked above");

        let on_done = {
            use crate::orchestration::{TaskEvent, TaskOutcome};
            move |handle_id: String, result: Result<String, crate::subagent::SubAgentError>| {
                let outcome = match &result {
                    Ok(output) => TaskOutcome::Completed {
                        output: output.clone(),
                        artifacts: vec![],
                    },
                    Err(e) => TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                };
                let tx = event_tx;
                tokio::spawn(async move {
                    if let Err(e) = tx
                        .send(TaskEvent {
                            task_id,
                            agent_handle_id: handle_id,
                            outcome,
                        })
                        .await
                    {
                        tracing::warn!(
                            error = %e,
                            "failed to send TaskEvent: scheduler may have been dropped"
                        );
                    }
                });
            }
        };

        match mgr.spawn_for_task(
            &agent_def_name,
            &prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            on_done,
        ) {
            Ok(handle_id) => {
                *spawn_counter += 1;
                let _ = self
                    .channel
                    .send_status(&format!(
                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
                    ))
                    .await;
                scheduler.record_spawn(task_id, handle_id, agent_def_name);
                (true, false, None)
            }
            Err(e) => {
                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
                let concurrency_fail =
                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
                let extra = scheduler.record_spawn_failure(task_id, &e);
                let done_status = self.cancel_agents_from_actions(extra);
                (false, concurrency_fail, done_status)
            }
        }
    }

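    /// Executes a task inline on the main agent instead of a sub-agent,
    /// recording it under a synthetic `__inline_<task_id>__` handle and
    /// racing the tool loop against plan cancellation.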
    async fn handle_run_inline_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        prompt: String,
        spawn_counter: usize,
        task_count: usize,
        cancel_token: &CancellationToken,
    ) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());
        let _ = self
            .channel
            .send_status(&format!(
                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
            ))
            .await;

        let handle_id = format!("__inline_{task_id}__");
        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());

        let event_tx = scheduler.event_sender();
        let max_iter = self.tool_orchestrator.max_iterations;
        let outcome = tokio::select! {
            result = self.run_inline_tool_loop(&prompt, max_iter) => {
                match result {
                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
                        output,
                        artifacts: vec![],
                    },
                    Err(e) => crate::orchestration::TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                }
            }
            () = cancel_token.cancelled() => {
                crate::orchestration::TaskOutcome::Failed {
                    error: "canceled".to_string(),
                }
            }
        };
        let event = crate::orchestration::TaskEvent {
            task_id,
            agent_handle_id: handle_id,
            outcome,
        };
        if let Err(e) = event_tx.send(event).await {
            tracing::warn!(%task_id, error = %e, "inline task event send failed");
        }
    }

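    /// Drives the scheduler to completion: each iteration ticks the
    /// scheduler, dispatches the resulting actions (spawn, inline run,
    /// cancel, verify), then waits on the first of: plan cancellation, a
    /// scheduler event, a channel message (`/plan cancel` or queued input),
    /// or agent shutdown. The `biased` select checks cancellation first.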
    #[allow(clippy::too_many_lines)]
    async fn run_scheduler_loop(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_count: usize,
        cancel_token: CancellationToken,
    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
        use crate::orchestration::{PlanVerifier, SchedulerAction};

        let mut spawn_counter: usize = 0;

        let mut denied_secrets: std::collections::HashSet<(String, String)> =
            std::collections::HashSet::new();

        let mut plan_verifier: Option<PlanVerifier<AnyProvider>> = None;

        let final_status = 'tick: loop {
            let actions = scheduler.tick();

            let mut any_spawn_success = false;
            let mut any_concurrency_failure = false;

            for action in actions {
                match action {
                    SchedulerAction::Spawn {
                        task_id,
                        agent_def_name,
                        prompt,
                    } => {
                        let (success, fail, done) = self
                            .handle_scheduler_spawn_action(
                                scheduler,
                                task_id,
                                agent_def_name,
                                prompt,
                                &mut spawn_counter,
                                task_count,
                            )
                            .await;
                        any_spawn_success |= success;
                        any_concurrency_failure |= fail;
                        if let Some(s) = done {
                            break 'tick s;
                        }
                    }
                    SchedulerAction::Cancel { agent_handle_id } => {
                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                tracing::trace!(error = %e, "cancel: agent already gone");
                            });
                        }
                    }
                    SchedulerAction::RunInline { task_id, prompt } => {
                        spawn_counter += 1;
                        self.handle_run_inline_action(
                            scheduler,
                            task_id,
                            prompt,
                            spawn_counter,
                            task_count,
                            &cancel_token,
                        )
                        .await;
                    }
                    SchedulerAction::Done { status } => {
                        break 'tick status;
                    }
                    SchedulerAction::Verify { task_id, output } => {
                        let verify_provider = self
                            .orchestration
                            .verify_provider
                            .as_ref()
                            .unwrap_or(&self.provider)
                            .clone();
                        let max_tokens = self.orchestration.orchestration_config.verify_max_tokens;
                        let threshold = self
                            .orchestration
                            .orchestration_config
                            .completeness_threshold;
                        let sanitizer = self.security.sanitizer.clone();

                        let verifier = plan_verifier.get_or_insert_with(|| {
                            PlanVerifier::new(verify_provider, max_tokens, sanitizer)
                        });

                        let task = scheduler.graph().tasks.get(task_id.index()).cloned();

                        if let Some(task) = task {
                            let result = verifier.verify(&task, &output).await;
                            tracing::debug!(
                                task_id = %task_id,
                                complete = result.complete,
                                confidence = result.confidence,
                                gaps = result.gaps.len(),
                                "per-task verification result"
                            );

                            let should_replan = !result.complete
                                && result.confidence < f64::from(threshold)
                                && result.gaps.iter().any(|g| {
                                    matches!(
                                        g.severity,
                                        crate::orchestration::GapSeverity::Critical
                                            | crate::orchestration::GapSeverity::Important
                                    )
                                });

                            if should_replan {
                                let max_tasks_u32 =
                                    self.orchestration.orchestration_config.max_tasks;
                                let max_tasks = max_tasks_u32 as usize;
                                match verifier
                                    .replan(&task, &result.gaps, scheduler.graph(), max_tasks_u32)
                                    .await
                                {
                                    Ok(new_tasks) if !new_tasks.is_empty() => {
                                        if let Err(e) =
                                            scheduler.inject_tasks(task_id, new_tasks, max_tasks)
                                        {
                                            tracing::warn!(
                                                error = %e,
                                                task_id = %task_id,
                                                "per-task replan inject_tasks failed (fail-open)"
                                            );
                                        }
                                    }
                                    Ok(_) => {}
                                    Err(e) => {
                                        tracing::warn!(
                                            error = %e,
                                            task_id = %task_id,
                                            "per-task replan failed (fail-open)"
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }

            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);

            self.process_pending_secret_requests(&mut denied_secrets)
                .await;

            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
            self.update_metrics(|m| {
                m.orchestration_graph = Some(snapshot);
            });

            tokio::select! {
                biased;
                () = cancel_token.cancelled() => {
                    let cancel_actions = scheduler.cancel_all();
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel during plan cancellation: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. }
                            | SchedulerAction::RunInline { .. }
                            | SchedulerAction::Verify { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
                () = scheduler.wait_event() => {}
                result = self.channel.recv() => {
                    if let Ok(Some(msg)) = result {
                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
                            let _ = self.channel.send_status("Canceling plan...").await;
                            let cancel_actions = scheduler.cancel_all();
                            for ca in cancel_actions {
                                match ca {
                                    SchedulerAction::Cancel { agent_handle_id } => {
                                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
                                            });
                                        }
                                    }
                                    SchedulerAction::Done { status } => {
                                        break 'tick status;
                                    }
                                    SchedulerAction::Spawn { .. }
                                    | SchedulerAction::RunInline { .. }
                                    | SchedulerAction::Verify { .. } => {}
                                }
                            }
                            break 'tick crate::orchestration::GraphStatus::Canceled;
                        }
                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
                    } else {
                        let drain_actions = scheduler.tick();
                        let mut natural_done: Option<crate::orchestration::GraphStatus> = None;
                        for action in drain_actions {
                            match action {
                                SchedulerAction::Cancel { agent_handle_id } => {
                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                            tracing::trace!(
                                                error = %e,
                                                "cancel during drain on channel close: agent already gone"
                                            );
                                        });
                                    }
                                }
                                SchedulerAction::Done { status } => {
                                    natural_done = Some(status);
                                }
                                SchedulerAction::Spawn { .. }
                                | SchedulerAction::RunInline { .. }
                                | SchedulerAction::Verify { .. } => {}
                            }
                        }

                        if let Some(status) = natural_done {
                            break 'tick status;
                        }

                        let cancel_actions = scheduler.cancel_all();
                        let n = cancel_actions
                            .iter()
                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                            .count();
                        let shutdown_status = if self.channel.supports_exit() {
                            crate::orchestration::GraphStatus::Canceled
                        } else {
                            crate::orchestration::GraphStatus::Failed
                        };
                        tracing::warn!(
                            sub_agents = n,
                            supports_exit = self.channel.supports_exit(),
                            status = ?shutdown_status,
                            "scheduler channel closed, canceling running sub-agents"
                        );
                        for action in cancel_actions {
                            match action {
                                SchedulerAction::Cancel { agent_handle_id } => {
                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                            tracing::trace!(
                                                error = %e,
                                                "cancel on channel close: agent already gone"
                                            );
                                        });
                                    }
                                }
                                SchedulerAction::Done { .. }
                                | SchedulerAction::Spawn { .. }
                                | SchedulerAction::RunInline { .. }
                                | SchedulerAction::Verify { .. } => {}
                            }
                        }
                        break 'tick shutdown_status;
                    }
                }
                () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                    let cancel_actions = scheduler.cancel_all();
                    let n = cancel_actions
                        .iter()
                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                        .count();
                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel on shutdown: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. }
                            | SchedulerAction::RunInline { .. }
                            | SchedulerAction::Verify { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
            }
        };

        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
            .await;

        Ok(final_status)
    }

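    /// A minimal chat-with-tools loop that runs against a fresh message list,
    /// leaving the main conversation state untouched. Returns the first plain
    /// text response, or the last assistant text seen once the iteration
    /// limit is reached. MCP elicitation events are serviced while a tool
    /// call is executing.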
    async fn run_inline_tool_loop(
        &mut self,
        prompt: &str,
        max_iterations: usize,
    ) -> Result<String, zeph_llm::LlmError> {
        use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
        use zeph_tools::executor::ToolCall;

        let tool_defs: Vec<ToolDefinition> = self
            .tool_executor
            .tool_definitions_erased()
            .iter()
            .map(tool_execution::tool_def_to_definition)
            .collect();

        tracing::debug!(
            prompt_len = prompt.len(),
            max_iterations,
            tool_count = tool_defs.len(),
            "inline tool loop: starting"
        );

        let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
        let mut last_text = String::new();

        for iteration in 0..max_iterations {
            let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;

            match response {
                ChatResponse::Text(text) => {
                    tracing::debug!(iteration, "inline tool loop: text response, returning");
                    return Ok(text);
                }
                ChatResponse::ToolUse {
                    text, tool_calls, ..
                } => {
                    tracing::debug!(
                        iteration,
                        tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
                        "inline tool loop: tool use"
                    );

                    if let Some(ref t) = text {
                        last_text.clone_from(t);
                    }

                    let mut parts: Vec<MessagePart> = Vec::new();
                    if let Some(ref t) = text
                        && !t.is_empty()
                    {
                        parts.push(MessagePart::Text { text: t.clone() });
                    }
                    for tc in &tool_calls {
                        parts.push(MessagePart::ToolUse {
                            id: tc.id.clone(),
                            name: tc.name.clone(),
                            input: tc.input.clone(),
                        });
                    }
                    messages.push(Message::from_parts(Role::Assistant, parts));

                    let mut result_parts: Vec<MessagePart> = Vec::new();
                    for tc in &tool_calls {
                        let call = ToolCall {
                            tool_id: tc.name.clone(),
                            params: match &tc.input {
                                serde_json::Value::Object(map) => map.clone(),
                                _ => serde_json::Map::new(),
                            },
                        };
                        let output = loop {
                            tokio::select! {
                                result = self.tool_executor.execute_tool_call_erased(&call) => {
                                    break match result {
                                        Ok(Some(out)) => out.summary,
                                        Ok(None) => "(no output)".to_owned(),
                                        Err(e) => format!("[error] {e}"),
                                    };
                                }
                                Some(event) = async {
                                    match self.mcp.elicitation_rx.as_mut() {
                                        Some(rx) => rx.recv().await,
                                        None => std::future::pending().await,
                                    }
                                } => {
                                    self.handle_elicitation_event(event).await;
                                }
                            }
                        };
                        let is_error = output.starts_with("[error]");
                        result_parts.push(MessagePart::ToolResult {
                            tool_use_id: tc.id.clone(),
                            content: output,
                            is_error,
                        });
                    }
                    messages.push(Message::from_parts(Role::User, result_parts));
                }
            }
        }

        tracing::debug!(
            max_iterations,
            last_text_empty = last_text.is_empty(),
            "inline tool loop: iteration limit reached"
        );
        Ok(last_text)
    }

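    /// Drains queued secret requests from sub-agents, asking the user to
    /// confirm each one with a 120-second timeout. Denials are remembered per
    /// `(handle, key)` in `denied` so repeated requests are auto-denied;
    /// approvals are delivered with a 300-second TTL.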
    async fn process_pending_secret_requests(
        &mut self,
        denied: &mut std::collections::HashSet<(String, String)>,
    ) {
        loop {
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
            let Some((req_handle_id, req)) = pending else {
                break;
            };
            let deny_key = (req_handle_id.clone(), req.secret_key.clone());
            if denied.contains(&deny_key) {
                tracing::debug!(
                    handle_id = %req_handle_id,
                    secret_key = %req.secret_key,
                    "skipping duplicate secret prompt for already-denied key"
                );
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    let _ = mgr.deny_secret(&req_handle_id);
                }
                continue;
            }
            let prompt = format!(
                "Sub-agent requests secret '{}'. Allow?{}",
                crate::text::truncate_to_chars(&req.secret_key, 100),
                req.reason
                    .as_deref()
                    .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
                    .unwrap_or_default()
            );
            let approved = tokio::select! {
                result = self.channel.confirm(&prompt) => result.unwrap_or(false),
                () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
                    let _ = self.channel.send("Secret request timed out.").await;
                    false
                }
            };
            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                if approved {
                    let ttl = std::time::Duration::from_secs(300);
                    let key = req.secret_key.clone();
                    if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
                        let _ = mgr.deliver_secret(&req_handle_id, key);
                    }
                } else {
                    denied.insert(deny_key);
                    let _ = mgr.deny_secret(&req_handle_id);
                }
            }
        }
    }

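    /// Reports the plan outcome on the channel (aggregating task outputs on
    /// success, listing failures otherwise), updates metrics, caches
    /// completed plans, and returns a static status label for the metrics
    /// snapshot.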
    #[allow(clippy::too_many_lines)]
    async fn finalize_plan_execution(
        &mut self,
        completed_graph: crate::orchestration::TaskGraph,
        final_status: crate::orchestration::GraphStatus,
    ) -> Result<&'static str, error::AgentError> {
        use std::fmt::Write;

        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};

        let result_label = match final_status {
            GraphStatus::Completed => {
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                let skipped_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
                    .count() as u64;
                self.update_metrics(|m| {
                    m.orchestration.tasks_completed += completed_count;
                    m.orchestration.tasks_skipped += skipped_count;
                });

                let aggregator = LlmAggregator::new(
                    self.provider.clone(),
                    &self.orchestration.orchestration_config,
                );
                match aggregator.aggregate(&completed_graph).await {
                    Ok((synthesis, aggregator_usage)) => {
                        let (aggr_prompt, aggr_completion) = aggregator_usage.unwrap_or((0, 0));
                        self.update_metrics(|m| {
                            m.api_calls += 1;
                            m.prompt_tokens += aggr_prompt;
                            m.completion_tokens += aggr_completion;
                            m.total_tokens = m.prompt_tokens + m.completion_tokens;
                        });
                        self.record_cost(aggr_prompt, aggr_completion);
                        self.record_cache_usage();
                        self.channel.send(&synthesis).await?;
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "aggregation failed");
                        self.channel
                            .send(
                                "Plan completed but aggregation failed. \
                                 Check individual task results.",
                            )
                            .await?;
                    }
                }

                if let Some(ref cache) = self.orchestration.plan_cache
                    && let Some(embedding) = self.orchestration.pending_goal_embedding.take()
                {
                    let embed_model = self.skill_state.embedding_model.clone();
                    if let Err(e) = cache
                        .cache_plan(&completed_graph, &embedding, &embed_model)
                        .await
                    {
                        tracing::warn!(error = %e, "plan cache: failed to cache completed plan");
                    }
                }

                "completed"
            }
            GraphStatus::Failed => {
                let failed_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
                    .collect();
                let cancelled_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Canceled)
                    .collect();
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                let skipped_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
                    .count() as u64;
                self.update_metrics(|m| {
                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
                    m.orchestration.tasks_completed += completed_count;
                    m.orchestration.tasks_skipped += skipped_count;
                });
                let total = completed_graph.tasks.len();
                let msg = if failed_tasks.is_empty() && !cancelled_tasks.is_empty() {
                    format!(
                        "Plan canceled. {}/{} tasks did not run.\n\
                         Use `/plan retry` to retry or check logs for details.",
                        cancelled_tasks.len(),
                        total
                    )
                } else if failed_tasks.is_empty() && cancelled_tasks.is_empty() {
                    tracing::warn!(
                        "plan finished with GraphStatus::Failed but no failed or canceled tasks"
                    );
                    "Plan failed. No task errors recorded; check logs for details.".to_string()
                } else {
                    let mut m = if cancelled_tasks.is_empty() {
                        format!(
                            "Plan failed. {}/{} tasks failed:\n",
                            failed_tasks.len(),
                            total
                        )
                    } else {
                        format!(
                            "Plan failed. {}/{} tasks failed, {} canceled:\n",
                            failed_tasks.len(),
                            total,
                            cancelled_tasks.len()
                        )
                    };
                    for t in &failed_tasks {
                        let err: std::borrow::Cow<str> =
                            t.result.as_ref().map_or("unknown error".into(), |r| {
                                if r.output.len() > 500 {
                                    r.output.chars().take(500).collect::<String>().into()
                                } else {
                                    r.output.as_str().into()
                                }
                            });
                        let _ = writeln!(m, " - {}: {err}", t.title);
                    }
                    m.push_str("\nUse `/plan retry` to retry failed tasks.");
                    m
                };
                self.channel.send(&msg).await?;
                self.orchestration.pending_graph = Some(completed_graph);
                "failed"
            }
            GraphStatus::Paused => {
                self.channel
                    .send(
                        "Plan paused due to a task failure (ask strategy). \
                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
                    )
                    .await?;
                self.orchestration.pending_graph = Some(completed_graph);
                "paused"
            }
            GraphStatus::Canceled => {
                let done_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count();
                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
                let total = completed_graph.tasks.len();
                self.channel
                    .send(&format!(
                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
                    ))
                    .await?;
                self.orchestration.pending_goal_embedding.take();
                "canceled"
            }
            other => {
                tracing::warn!(%other, "unexpected graph status after Done");
                self.channel
                    .send(&format!("Plan ended with status: {other}"))
                    .await?;
                self.orchestration.pending_goal_embedding.take();
                "unknown"
            }
        };
        Ok(result_label)
    }

    async fn handle_plan_status(
        &mut self,
        _graph_id: Option<&str>,
    ) -> Result<(), error::AgentError> {
        use crate::orchestration::GraphStatus;
        let Some(ref graph) = self.orchestration.pending_graph else {
            self.channel.send("No active plan.").await?;
            return Ok(());
        };
        let msg = match graph.status {
            GraphStatus::Created => {
                "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
            }
            GraphStatus::Running => "Plan is currently running.",
            GraphStatus::Paused => {
                "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
            }
            GraphStatus::Failed => {
                "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
            }
            GraphStatus::Completed => "Plan completed successfully.",
            GraphStatus::Canceled => "Plan was canceled.",
        };
        self.channel.send(msg).await?;
        Ok(())
    }

    async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
        if let Some(ref graph) = self.orchestration.pending_graph {
            let summary = format_plan_summary(graph);
            let status_label = match graph.status {
                crate::orchestration::GraphStatus::Created => "awaiting confirmation",
                crate::orchestration::GraphStatus::Running => "running",
                crate::orchestration::GraphStatus::Paused => "paused",
                crate::orchestration::GraphStatus::Failed => "failed (retryable)",
                _ => "unknown",
            };
            self.channel
                .send(&format!("{summary}\nStatus: {status_label}"))
                .await?;
        } else {
            self.channel.send("No recent plans.").await?;
        }
        Ok(())
    }

    async fn handle_plan_cancel(
        &mut self,
        _graph_id: Option<&str>,
    ) -> Result<(), error::AgentError> {
        if let Some(token) = self.orchestration.plan_cancel_token.take() {
            token.cancel();
            self.channel.send("Canceling plan execution...").await?;
        } else if self.orchestration.pending_graph.take().is_some() {
            let now = std::time::Instant::now();
            self.update_metrics(|m| {
                if let Some(ref mut s) = m.orchestration_graph {
                    "canceled".clone_into(&mut s.status);
                    s.completed_at = Some(now);
                }
            });
            self.orchestration.pending_goal_embedding = None;
            self.channel.send("Plan canceled.").await?;
        } else {
            self.channel.send("No active plan to cancel.").await?;
        }
        Ok(())
    }

    async fn handle_plan_resume(
        &mut self,
        graph_id: Option<&str>,
    ) -> Result<(), error::AgentError> {
        use crate::orchestration::GraphStatus;

        let Some(ref graph) = self.orchestration.pending_graph else {
            self.channel
                .send("No paused plan to resume. Use `/plan status` to check the current state.")
                .await?;
            return Ok(());
        };

        if let Some(id) = graph_id
            && graph.id.to_string() != id
        {
            self.channel
                .send(&format!(
                    "Graph id '{id}' does not match the active plan ({}). \
                     Use `/plan status` to see the active plan id.",
                    graph.id
                ))
                .await?;
            return Ok(());
        }

        if graph.status != GraphStatus::Paused {
            self.channel
                .send(&format!(
                    "The active plan is in '{}' status and cannot be resumed. \
                     Only Paused plans can be resumed.",
                    graph.status
                ))
                .await?;
            return Ok(());
        }

        let graph = self.orchestration.pending_graph.take().unwrap();

        tracing::info!(
            graph_id = %graph.id,
            "resuming paused graph"
        );

        self.channel
            .send(&format!(
                "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
                graph.goal
            ))
            .await?;

        self.orchestration.pending_graph = Some(graph);
        Ok(())
    }

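    /// Resets a Failed (or Paused) plan so it can be confirmed and executed
    /// again. Failed tasks are cleared via `dag::reset_for_retry`; tasks stuck
    /// in `Running` (e.g. after an interrupted execution) are returned to
    /// `Ready` and unassigned.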
    async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
        use crate::orchestration::{GraphStatus, dag};

        let Some(ref graph) = self.orchestration.pending_graph else {
            self.channel
                .send("No active plan to retry. Use `/plan status` to check the current state.")
                .await?;
            return Ok(());
        };

        if let Some(id) = graph_id
            && graph.id.to_string() != id
        {
            self.channel
                .send(&format!(
                    "Graph id '{id}' does not match the active plan ({}). \
                     Use `/plan status` to see the active plan id.",
                    graph.id
                ))
                .await?;
            return Ok(());
        }

        if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
            self.channel
                .send(&format!(
                    "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
                    graph.status
                ))
                .await?;
            return Ok(());
        }

        let mut graph = self.orchestration.pending_graph.take().unwrap();

        let failed_count = graph
            .tasks
            .iter()
            .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
            .count();

        dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;

        for task in &mut graph.tasks {
            if task.status == crate::orchestration::TaskStatus::Running {
                task.status = crate::orchestration::TaskStatus::Ready;
                task.assigned_agent = None;
            }
        }

        tracing::info!(
            graph_id = %graph.id,
            failed_count,
            "retrying failed tasks in graph"
        );

        self.channel
            .send(&format!(
                "Retrying {failed_count} failed task(s) in plan: {}\n\
                 Use `/plan confirm` to execute.",
                graph.goal
            ))
            .await?;

        self.orchestration.pending_graph = Some(graph);
        Ok(())
    }

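    /// Asks the LLM for a structured session summary, bounded by the
    /// configured shutdown-summary timeout. If the structured call errors or
    /// times out, falls back to a plain-text completion wrapped in a
    /// `StructuredSummary` with empty key facts and entities.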
    async fn call_llm_for_session_summary(
        &self,
        chat_messages: &[Message],
    ) -> Option<zeph_memory::StructuredSummary> {
        let timeout_dur =
            std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
        match tokio::time::timeout(
            timeout_dur,
            self.provider
                .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
        )
        .await
        {
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
            Err(_) => {
                tracing::warn!(
                    "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
                    self.memory_state.shutdown_summary_timeout_secs
                );
                self.plain_text_summary_fallback(chat_messages, timeout_dur)
                    .await
            }
        }
    }

    async fn plain_text_summary_fallback(
        &self,
        chat_messages: &[Message],
        timeout_dur: std::time::Duration,
    ) -> Option<zeph_memory::StructuredSummary> {
        match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
            Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
                summary: plain,
                key_facts: vec![],
                entities: vec![],
            }),
            Ok(Err(e)) => {
                tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
                None
            }
            Err(_) => {
                tracing::warn!("shutdown summary: plain LLM fallback timed out");
                None
            }
        }
    }

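    /// Stores a session summary on shutdown when every precondition holds:
    /// the feature is enabled, memory and a conversation id exist, the
    /// session has no summary yet, and at least
    /// `shutdown_summary_min_messages` user messages were exchanged. Only the
    /// last `shutdown_summary_max_messages` non-system messages are fed to
    /// the summarizer.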
    async fn maybe_store_shutdown_summary(&mut self) {
        if !self.memory_state.shutdown_summary {
            return;
        }
        let Some(memory) = self.memory_state.memory.clone() else {
            return;
        };
        let Some(conversation_id) = self.memory_state.conversation_id else {
            return;
        };

        match memory.has_session_summary(conversation_id).await {
            Ok(true) => {
                tracing::debug!("shutdown summary: session already has a summary, skipping");
                return;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
                return;
            }
        }

        let user_count = self
            .msg
            .messages
            .iter()
            .skip(1)
            .filter(|m| m.role == Role::User)
            .count();
        if user_count < self.memory_state.shutdown_summary_min_messages {
            tracing::debug!(
                user_count,
                min = self.memory_state.shutdown_summary_min_messages,
                "shutdown summary: too few user messages, skipping"
            );
            return;
        }

        let _ = self.channel.send_status("Saving session summary...").await;

        let max = self.memory_state.shutdown_summary_max_messages;
        if max == 0 {
            tracing::debug!("shutdown summary: max_messages=0, skipping");
            return;
        }
        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
        let slice = if non_system.len() > max {
            &non_system[non_system.len() - max..]
        } else {
            &non_system[..]
        };

        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
            .iter()
            .map(|m| {
                let role = match m.role {
                    Role::User => "user".to_owned(),
                    Role::Assistant => "assistant".to_owned(),
                    Role::System => "system".to_owned(),
                };
                (zeph_memory::MessageId(0), role, m.content.clone())
            })
            .collect();

        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
        let chat_messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];

        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
            let _ = self.channel.send_status("").await;
            return;
        };

        if let Err(e) = memory
            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
            .await
        {
            tracing::warn!("shutdown summary: storage failed: {e:#}");
        } else {
            tracing::info!(
                conversation_id = conversation_id.0,
                "shutdown summary stored"
            );
        }

        let _ = self.channel.send_status("").await;
    }

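    /// Graceful shutdown: persists router state, stops sub-agents and MCP
    /// servers, flushes compaction and filtering metrics to the log, then
    /// stores the shutdown summary and session digest.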
    pub async fn shutdown(&mut self) {
        self.channel.send("Shutting down...").await.ok();

        self.provider.save_router_state();

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }

        self.maybe_store_shutdown_summary().await;
        self.maybe_store_session_digest().await;

        tracing::info!("agent shutdown complete");
    }

    fn refresh_subagent_metrics(&mut self) {
        let Some(ref mgr) = self.orchestration.subagent_manager else {
            return;
        };
        let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
            .statuses()
            .into_iter()
            .map(|(id, s)| {
                let def = mgr.agents_def(&id);
                crate::metrics::SubAgentMetrics {
                    name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
                    id: id.clone(),
                    state: format!("{:?}", s.state).to_lowercase(),
                    turns_used: s.turns_used,
                    max_turns: def.map_or(20, |d| d.permissions.max_turns),
                    background: def.is_some_and(|d| d.permissions.background),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                    permission_mode: def.map_or_else(String::new, |d| {
                        use crate::subagent::def::PermissionMode;
                        match d.permissions.permission_mode {
                            PermissionMode::Default => String::new(),
                            PermissionMode::AcceptEdits => "accept_edits".into(),
                            PermissionMode::DontAsk => "dont_ask".into(),
                            PermissionMode::BypassPermissions => "bypass_permissions".into(),
                            PermissionMode::Plan => "plan".into(),
                        }
                    }),
                    transcript_dir: mgr
                        .agent_transcript_dir(&id)
                        .map(|p| p.to_string_lossy().into_owned()),
                }
            })
            .collect();
        self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
    }

    async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
        let completed = self.poll_subagents().await;
        for (task_id, result) in completed {
            let notice = if result.is_empty() {
                format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
            } else {
                format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
            };
            if let Err(e) = self.channel.send(&notice).await {
                tracing::warn!(error = %e, "failed to send sub-agent completion notice");
            }
        }
        Ok(())
    }

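    /// Main agent loop. Waits for model warmup, then repeatedly picks the
    /// next input (queued messages take priority over the channel and the
    /// background event sources: skill/config/instruction reloads, scheduler
    /// tasks, file-change events), dispatches built-in commands, and hands
    /// everything else to `process_user_message` until shutdown.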
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        self.load_and_cache_session_digest().await;

        loop {
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            self.check_tool_refresh().await;

            self.process_pending_elicitations().await;

            self.refresh_subagent_metrics();

            self.notify_completed_subagents().await?;

            self.drain_channel();

            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                        #[cfg(feature = "experiments")]
                        { self.experiments.cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                    Some(event) = recv_optional(&mut self.lifecycle.file_changed_rx) => {
                        self.handle_file_changed(event).await;
                        continue;
                    }
                };
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }

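    /// Commands that are always available, even while a plan is pending.
    ///
    /// Return contract, as consumed by `run`:
    /// - `Ok(Some(true))`: handled, exit the run loop (`/exit`, `/quit`).
    /// - `Ok(Some(false))`: handled, continue with the next input.
    /// - `Ok(None)`: not a built-in; treat the text as a regular message.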
    #[allow(clippy::too_many_lines)]
    async fn handle_builtin_command(
        &mut self,
        trimmed: &str,
    ) -> Result<Option<bool>, error::AgentError> {
        if trimmed == "/clear-queue" {
            let n = self.clear_queue();
            self.notify_queue_count().await;
            self.channel
                .send(&format!("Cleared {n} queued messages."))
                .await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/compact" {
            if self.msg.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
                match self.compact_context().await {
                    Ok(
                        context::CompactionOutcome::Compacted
                        | context::CompactionOutcome::NoChange,
                    ) => {
                        let _ = self.channel.send("Context compacted successfully.").await;
                    }
                    Ok(context::CompactionOutcome::ProbeRejected) => {
                        let _ = self
                            .channel
                            .send(
                                "Compaction rejected: summary quality below threshold. \
                                 Original context preserved.",
                            )
                            .await;
                    }
                    Err(e) => {
                        let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
                    }
                }
            } else {
                let _ = self.channel.send("Nothing to compact.").await;
            }
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/new" || trimmed.starts_with("/new ") {
            let args = trimmed.strip_prefix("/new").unwrap_or("").trim();
            let keep_plan = args.split_whitespace().any(|a| a == "--keep-plan");
            let no_digest = args.split_whitespace().any(|a| a == "--no-digest");
            match self.reset_conversation(keep_plan, no_digest).await {
                Ok((old_id, new_id)) => {
                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
                    self.channel
                        .send(&format!(
                            "New conversation started. Previous: {old} → Current: {new}{keep_note}"
                        ))
                        .await?;
                }
                Err(e) => {
                    self.channel
                        .send(&format!("Failed to start new conversation: {e}"))
                        .await?;
                }
            }
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/clear" {
            self.clear_history();
            self.tool_orchestrator.clear_cache();
            if let Ok(mut urls) = self.security.user_provided_urls.write() {
                urls.clear();
            }
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/reset" {
            self.clear_history();
            self.tool_orchestrator.clear_cache();
            if let Ok(mut urls) = self.security.user_provided_urls.write() {
                urls.clear();
            }
            self.channel.send("Conversation history reset.").await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/cache-stats" {
            let stats = self.tool_orchestrator.cache_stats();
            self.channel.send(&stats).await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/model" || trimmed.starts_with("/model ") {
            self.handle_model_command(trimmed).await;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/provider" || trimmed.starts_with("/provider ") {
            self.handle_provider_command(trimmed).await;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
            self.handle_debug_dump_command(trimmed).await;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed.starts_with("/dump-format") {
            self.handle_dump_format_command(trimmed).await;
            let _ = self.channel.flush_chunks().await;
            return Ok(Some(false));
        }

        if trimmed == "/exit" || trimmed == "/quit" {
            if self.channel.supports_exit() {
                return Ok(Some(true));
            }
            let _ = self
                .channel
                .send("/exit is not supported in this channel.")
                .await;
            return Ok(Some(false));
        }

        Ok(None)
    }

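    /// Validates `model_id` (non-empty, at most 256 characters, printable
    /// ASCII only) and records it as the active model name.
    ///
    /// A minimal usage sketch (the model id is illustrative):
    ///
    /// ```ignore
    /// agent.set_model("my-model-id")?;
    /// assert!(agent.set_model("").is_err());
    /// ```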
    pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
        if model_id.is_empty() {
            return Err("model id must not be empty".to_string());
        }
        if model_id.len() > 256 {
            return Err("model id exceeds maximum length of 256 characters".to_string());
        }
        if !model_id
            .chars()
            .all(|c| c.is_ascii() && !c.is_ascii_control())
        {
            return Err("model id must contain only printable ASCII characters".to_string());
        }
        self.runtime.model_name = model_id.to_string();
        tracing::info!(model = model_id, "set_model called");
        Ok(())
    }

    async fn handle_model_refresh(&mut self) {
        if let Some(cache_dir) = dirs::cache_dir() {
            let models_dir = cache_dir.join("zeph").join("models");
            if let Ok(entries) = std::fs::read_dir(&models_dir) {
                for entry in entries.flatten() {
                    let path = entry.path();
                    if path.extension().and_then(|e| e.to_str()) == Some("json") {
                        let _ = std::fs::remove_file(&path);
                    }
                }
            }
        }
        match self.provider.list_models_remote().await {
            Ok(models) => {
                let _ = self
                    .channel
                    .send(&format!("Fetched {} models.", models.len()))
                    .await;
            }
            Err(e) => {
                let _ = self
                    .channel
                    .send(&format!("Error fetching models: {e}"))
                    .await;
            }
        }
    }

    async fn handle_model_list(&mut self) {
        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
        let cached = if cache.is_stale() {
            None
        } else {
            cache.load().unwrap_or(None)
        };
        let models = if let Some(m) = cached {
            m
        } else {
            match self.provider.list_models_remote().await {
                Ok(m) => m,
                Err(e) => {
                    let _ = self
                        .channel
                        .send(&format!("Error fetching models: {e}"))
                        .await;
                    return;
                }
            }
        };
        if models.is_empty() {
            let _ = self.channel.send("No models available.").await;
            return;
        }
        let mut lines = vec!["Available models:".to_string()];
        for (i, m) in models.iter().enumerate() {
            lines.push(format!(" {}. {} ({})", i + 1, m.display_name, m.id));
        }
        let _ = self.channel.send(&lines.join("\n")).await;
    }

    async fn handle_model_switch(&mut self, model_id: &str) {
        let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
        let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
        {
            match self.provider.list_models_remote().await {
                Ok(m) if !m.is_empty() => Some(m),
                _ => None,
            }
        } else {
            cache.load().unwrap_or(None)
        };
        if let Some(models) = known_models {
            if !models.iter().any(|m| m.id == model_id) {
                let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
                for m in &models {
                    lines.push(format!(" • {} ({})", m.display_name, m.id));
                }
                let _ = self.channel.send(&lines.join("\n")).await;
                return;
            }
        } else {
            let _ = self
                .channel
                .send(
                    "Model list unavailable, switching anyway — verify your model name is correct.",
                )
                .await;
        }
        match self.set_model(model_id) {
            Ok(()) => {
                let _ = self
                    .channel
                    .send(&format!("Switched to model: {model_id}"))
                    .await;
            }
            Err(e) => {
                let _ = self.channel.send(&format!("Error: {e}")).await;
            }
        }
    }

    async fn handle_model_command(&mut self, trimmed: &str) {
        let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
        if arg == "refresh" {
            self.handle_model_refresh().await;
        } else if arg.is_empty() {
            self.handle_model_list().await;
        } else {
            self.handle_model_switch(arg).await;
        }
    }

    async fn handle_debug_dump_command(&mut self, trimmed: &str) {
        let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
        if arg.is_empty() {
            match &self.debug_state.debug_dumper {
                Some(d) => {
                    let _ = self
                        .channel
                        .send(&format!("Debug dump active: {}", d.dir().display()))
                        .await;
                }
                None => {
                    let _ = self
                        .channel
                        .send(
                            "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
                             or start with `--debug-dump [dir]`.",
                        )
                        .await;
                }
            }
            return;
        }
        let dir = std::path::PathBuf::from(arg);
        match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
            Ok(dumper) => {
                let path = dumper.dir().display().to_string();
                self.debug_state.debug_dumper = Some(dumper);
                let _ = self
                    .channel
                    .send(&format!("Debug dump enabled: {path}"))
                    .await;
            }
            Err(e) => {
                let _ = self
                    .channel
                    .send(&format!("Failed to enable debug dump: {e}"))
                    .await;
            }
        }
    }

    async fn handle_dump_format_command(&mut self, trimmed: &str) {
        let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
        if arg.is_empty() {
            let _ = self
                .channel
                .send(&format!(
                    "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
                    self.debug_state.dump_format
                ))
                .await;
            return;
        }
        let new_format = match arg {
            "json" => crate::debug_dump::DumpFormat::Json,
            "raw" => crate::debug_dump::DumpFormat::Raw,
            "trace" => crate::debug_dump::DumpFormat::Trace,
            other => {
                let _ = self
                    .channel
                    .send(&format!(
                        "Unknown format '{other}'. Valid values: json, raw, trace."
                    ))
                    .await;
                return;
            }
        };
        let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
        let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;

        if now_trace
            && !was_trace
            && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
        {
            let service_name = self.debug_state.trace_service_name.clone();
            let redact = self.debug_state.trace_redact;
            match crate::debug_dump::trace::TracingCollector::new(
                dump_dir.as_path(),
                &service_name,
                redact,
                None,
            ) {
                Ok(collector) => {
                    self.debug_state.trace_collector = Some(collector);
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
                }
            }
        }
        if was_trace
            && !now_trace
            && let Some(mut tc) = self.debug_state.trace_collector.take()
        {
            tc.finish();
        }

        self.debug_state.dump_format = new_format;
        let _ = self
            .channel
            .send(&format!("Debug dump format set to: {arg}"))
            .await;
    }

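    /// Turns a raw channel message into LLM-ready `(text, image_parts)`.
    /// Audio attachments are transcribed through the configured STT provider
    /// and prepended under a `[transcribed audio]` marker (or dropped with a
    /// warning when no provider is set); oversized audio and images are
    /// skipped.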
    async fn resolve_message(
        &self,
        msg: crate::channel::ChannelMessage,
    ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
        use crate::channel::{Attachment, AttachmentKind};
        use zeph_llm::provider::{ImageData, MessagePart};

        let text_base = msg.text.clone();

        let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
            .attachments
            .into_iter()
            .partition(|a| a.kind == AttachmentKind::Audio);

        tracing::debug!(
            audio = audio_attachments.len(),
            has_stt = self.providers.stt.is_some(),
            "resolve_message attachments"
        );

        let text = if !audio_attachments.is_empty()
            && let Some(stt) = self.providers.stt.as_ref()
        {
            let mut transcribed_parts = Vec::new();
            for attachment in &audio_attachments {
                if attachment.data.len() > MAX_AUDIO_BYTES {
                    tracing::warn!(
                        size = attachment.data.len(),
                        max = MAX_AUDIO_BYTES,
                        "audio attachment exceeds size limit, skipping"
                    );
                    continue;
                }
                match stt
                    .transcribe(&attachment.data, attachment.filename.as_deref())
                    .await
                {
                    Ok(result) => {
                        tracing::info!(
                            len = result.text.len(),
                            language = ?result.language,
                            "audio transcribed"
                        );
                        transcribed_parts.push(result.text);
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "audio transcription failed");
                    }
                }
            }
            if transcribed_parts.is_empty() {
                text_base
            } else {
                let transcribed = transcribed_parts.join("\n");
                if text_base.is_empty() {
                    transcribed
                } else {
                    format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
                }
            }
        } else {
            if !audio_attachments.is_empty() {
                tracing::warn!(
                    count = audio_attachments.len(),
                    "audio attachments received but no STT provider configured, dropping"
                );
            }
            text_base
        };

        let mut image_parts = Vec::new();
        for attachment in image_attachments {
            if attachment.data.len() > MAX_IMAGE_BYTES {
                tracing::warn!(
                    size = attachment.data.len(),
                    max = MAX_IMAGE_BYTES,
                    "image attachment exceeds size limit, skipping"
                );
                continue;
            }
            let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
            image_parts.push(MessagePart::Image(Box::new(ImageData {
                data: attachment.data,
                mime_type,
            })));
        }

        (text, image_parts)
    }

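    /// Routes slash commands. Returns `None` when the input is not a
    /// recognized command (the caller then treats it as a normal message) and
    /// `Some(result)` once a handler ran. URLs found in the command text are
    /// recorded as user-provided for the exfiltration checks.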
    #[allow(clippy::too_many_lines)]
    async fn dispatch_slash_command(
        &mut self,
        trimmed: &str,
    ) -> Option<Result<(), error::AgentError>> {
        macro_rules! handled {
            ($expr:expr) => {{
                if let Err(e) = $expr {
                    return Some(Err(e));
                }
                let _ = self.channel.flush_chunks().await;
                return Some(Ok(()));
            }};
        }

        let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !slash_urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(slash_urls);
        }

        if trimmed == "/help" {
            handled!(self.handle_help_command().await);
        }

        if trimmed == "/status" {
            handled!(self.handle_status_command().await);
        }

        #[cfg(feature = "guardrail")]
        if trimmed == "/guardrail" {
            handled!(self.handle_guardrail_command().await);
        }

        if trimmed == "/skills" || trimmed.starts_with("/skills ") {
            let subcommand = trimmed.strip_prefix("/skills").unwrap_or("").trim();
            handled!(self.handle_skills_family(subcommand).await);
        }

        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed
                .strip_prefix("/skill")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_skill_command(&rest).await);
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed
                .strip_prefix("/feedback")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_feedback(&rest).await);
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
            handled!(self.handle_mcp_command(&args).await);
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed
                .strip_prefix("/image")
                .unwrap_or("")
                .trim()
                .to_owned();
            if path.is_empty() {
                handled!(
                    self.channel
                        .send("Usage: /image <path>")
                        .await
                        .map_err(Into::into)
                );
            }
            handled!(self.handle_image_command(&path).await);
        }

        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            return Some(self.dispatch_plan_command(trimmed).await);
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            handled!(self.handle_graph_command(trimmed).await);
        }

        if trimmed == "/memory" || trimmed.starts_with("/memory ") {
            handled!(self.handle_memory_command(trimmed).await);
        }

        #[cfg(feature = "compression-guidelines")]
        if trimmed == "/guidelines" {
            handled!(self.handle_guidelines_command().await);
        }

        #[cfg(feature = "scheduler")]
        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
            handled!(self.handle_scheduler_command(trimmed).await);
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            handled!(self.handle_experiment_command(trimmed).await);
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            handled!(self.handle_lsp_status_command().await);
        }

        #[cfg(feature = "policy-enforcer")]
        if trimmed == "/policy" || trimmed.starts_with("/policy ") {
            let args = trimmed
                .strip_prefix("/policy")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_policy_command(&args).await);
        }

        if trimmed == "/log" {
            handled!(self.handle_log_command().await);
        }

        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            return self.dispatch_agent_command(trimmed).await;
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/focus" {
            handled!(self.handle_focus_status_command().await);
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/sidequest" {
            handled!(self.handle_sidequest_status_command().await);
        }

        None
    }

    async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
        match crate::orchestration::PlanCommand::parse(trimmed) {
            Ok(cmd) => {
                self.handle_plan_command(cmd).await?;
            }
            Err(e) => {
                self.channel
                    .send(&e.to_string())
                    .await
                    .map_err(error::AgentError::from)?;
            }
        }
        let _ = self.channel.flush_chunks().await;
        Ok(())
    }

    async fn dispatch_agent_command(
        &mut self,
        trimmed: &str,
    ) -> Option<Result<(), error::AgentError>> {
        let known: Vec<String> = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
            .unwrap_or_default();
        match crate::subagent::AgentCommand::parse(trimmed, &known) {
            Ok(cmd) => {
                if let Some(msg) = self.handle_agent_command(cmd).await
                    && let Err(e) = self.channel.send(&msg).await
                {
                    return Some(Err(e.into()));
                }
                let _ = self.channel.flush_chunks().await;
                Some(Ok(()))
            }
            Err(e) if trimmed.starts_with('@') => {
                tracing::debug!("@mention not matched as agent: {e}");
                None
            }
            Err(e) => {
                if let Err(send_err) = self.channel.send(&e.to_string()).await {
                    return Some(Err(send_err.into()));
                }
                let _ = self.channel.flush_chunks().await;
                Some(Ok(()))
            }
        }
    }

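    /// Spawns a background task that asks the LLM feedback classifier (when
    /// configured) or the judge model whether the latest user message
    /// corrects the previous assistant response, storing any detected
    /// correction in memory. Fire-and-forget: errors are logged, never
    /// surfaced to the user.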
    #[allow(clippy::too_many_lines)]
    fn spawn_judge_correction_check(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        let assistant_snippet = self.last_assistant_response();
        let user_msg_owned = trimmed.to_owned();
        let memory_arc = self.memory_state.memory.clone();
        let skill_name = self
            .skill_state
            .active_skill_names
            .first()
            .cloned()
            .unwrap_or_default();
        let conv_id_bg = conv_id;
        let confidence_threshold = self
            .learning_engine
            .config
            .as_ref()
            .map_or(0.6, |c| c.correction_confidence_threshold);

        if let Some(llm_classifier) = self.feedback.llm_classifier.clone() {
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            let memory_arc2 = memory_arc.clone();
            let skill_name2 = skill_name.clone();
            let classifier_metrics_bg = self.metrics.classifier_metrics.clone();
            let metrics_tx_bg = self.metrics.metrics_tx.clone();
            tokio::spawn(async move {
                match llm_classifier
                    .classify_feedback(&user_msg, &assistant, confidence_threshold)
                    .await
                {
                    Ok(verdict) => {
                        if let (Some(ref cm), Some(ref tx)) = (classifier_metrics_bg, metrics_tx_bg)
                        {
                            let snap = cm.snapshot();
                            tx.send_modify(|ms| ms.classifier = snap);
                        }
                        if let Some(signal) = feedback_verdict_into_signal(&verdict, &user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "llm-classifier",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc2,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name2,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("llm-classifier failed: {e:#}");
                    }
                }
            });
        } else {
            let judge_provider = self
                .providers
                .judge_provider
                .clone()
                .unwrap_or_else(|| self.provider.clone());
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            tokio::spawn(async move {
                match feedback_detector::JudgeDetector::evaluate(
                    &judge_provider,
                    &user_msg,
                    &assistant,
                    confidence_threshold,
                )
                .await
                {
                    Ok(verdict) => {
                        if let Some(signal) = verdict.into_signal(&user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "judge",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("judge detector failed: {e:#}");
                    }
                }
            });
        }
    }

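    /// Detects implicit user corrections. A cheap regex detector runs first;
    /// when the judge's adaptive thresholds and rate limit say it should take
    /// over, the LLM judge runs in the background instead and the regex
    /// signal is discarded. Confirmed signals are recorded as skill outcomes
    /// and persisted with an embedding.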
    #[allow(clippy::too_many_lines)]
    async fn detect_and_record_corrections(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if !correction_detection_enabled {
            return;
        }

        let previous_user_messages: Vec<&str> = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .map(|m| m.content.as_str())
            .collect();

        let regex_signal = self
            .feedback
            .detector
            .detect(trimmed, &previous_user_messages);

        let judge_should_run = if self.feedback.llm_classifier.is_some() {
            let adaptive_low = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.5, |c| c.judge_adaptive_low);
            let adaptive_high = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.8, |c| c.judge_adaptive_high);
            let should_invoke = self
                .feedback
                .judge
                .get_or_insert_with(|| {
                    feedback_detector::JudgeDetector::new(adaptive_low, adaptive_high)
                })
                .should_invoke(regex_signal.as_ref());
            should_invoke
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        } else {
            self.feedback
                .judge
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        };

        let (signal, signal_source) = if judge_should_run {
            self.spawn_judge_correction_check(trimmed, conv_id);
            (None, "judge")
        } else {
            (regex_signal, "regex")
        };

        let Some(signal) = signal else { return };
        tracing::info!(
            kind = signal.kind.as_str(),
            confidence = signal.confidence,
            source = signal_source,
            "implicit correction detected"
        );
        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
        if self.is_learning_enabled()
            && signal.kind != feedback_detector::CorrectionKind::SelfCorrection
        {
            self.record_skill_outcomes(
                "user_rejection",
                Some(&feedback_text),
                Some(signal.kind.as_str()),
            )
            .await;
        }
        if let Some(memory) = &self.memory_state.memory {
            let correction_text = context::truncate_chars(trimmed, 500);
            match memory
                .sqlite()
                .store_user_correction(
                    conv_id.map(|c| c.0),
                    "",
                    &correction_text,
                    self.skill_state
                        .active_skill_names
                        .first()
                        .map(String::as_str),
                    signal.kind.as_str(),
                )
                .await
            {
                Ok(correction_id) => {
                    if let Err(e) = memory
                        .store_correction_embedding(correction_id, &correction_text)
                        .await
                    {
                        tracing::warn!("failed to store correction embedding: {e:#}");
                    }
                }
                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
            }
        }
    }

    async fn process_user_message(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
    ) -> Result<(), error::AgentError> {
        let iteration_index = self.debug_state.iteration_counter;
        self.debug_state.iteration_counter += 1;
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.begin_iteration(iteration_index, text.trim());
            self.debug_state.current_iteration_span_id =
                tc.current_iteration_span_id(iteration_index);
        }

        let result = self
            .process_user_message_inner(text, image_parts, iteration_index)
            .await;

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            let status = if result.is_ok() {
                crate::debug_dump::trace::SpanStatus::Ok
            } else {
                crate::debug_dump::trace::SpanStatus::Error {
                    message: "iteration failed".to_owned(),
                }
            };
            tc.end_iteration(iteration_index, status);
        }
        self.debug_state.current_iteration_span_id = None;

        result
    }

    #[allow(clippy::too_many_lines)]
    async fn process_user_message_inner(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
        iteration_index: usize,
    ) -> Result<(), error::AgentError> {
        let _ = iteration_index;
        self.lifecycle.cancel_token = CancellationToken::new();
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = self.lifecycle.cancel_token.clone();
        tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        });
        let trimmed = text.trim();

        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        #[cfg(feature = "guardrail")]
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.scan_user_input() {
            match self.security.sanitizer.classify_injection(trimmed).await {
                zeph_sanitizer::InjectionVerdict::Blocked => {
                    self.push_classifier_metrics();
                    let _ = self
                        .channel
                        .send("[security] Input blocked: injection detected by classifier.")
                        .await;
                    let _ = self.channel.flush_chunks().await;
                    return Ok(());
                }
                zeph_sanitizer::InjectionVerdict::Suspicious => {
                    tracing::warn!("injection_classifier soft_signal on user input");
                }
                zeph_sanitizer::InjectionVerdict::Clean => {}
            }
        }
        #[cfg(feature = "classifiers")]
        self.push_classifier_metrics();

        self.mcp.pruning_cache.reset();

        let conv_id = self.memory_state.conversation_id;
        self.rebuild_system_prompt(&text).await;

        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        #[cfg(feature = "context-compression")]
        {
            self.focus.tick();

            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.provider
            .set_memory_confidence(self.memory_state.last_recall_confidence);

        self.learning_engine.reset_reflection();

        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
        all_image_parts.extend(image_parts);
        let image_parts = all_image_parts;

        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
            parts.extend(image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !image_parts.is_empty() {
                tracing::warn!(
                    count = image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.clone(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        };
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(urls);
        }

        self.memory_state.goal_text = Some(text.clone());

        self.persist_message(Role::User, &text, &[], false).await;
        self.push_message(user_msg);

        if let Err(e) = self.process_response().await {
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        }

        Ok(())
    }

    async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
        use std::path::Component;
        use zeph_llm::provider::{ImageData, MessagePart};

        let has_parent_dir = std::path::Path::new(path)
            .components()
            .any(|c| c == Component::ParentDir);
        if has_parent_dir {
            self.channel
                .send("Invalid image path: path traversal not allowed")
                .await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        let data = match std::fs::read(path) {
            Ok(d) => d,
            Err(e) => {
                self.channel
                    .send(&format!("Cannot read image {path}: {e}"))
                    .await?;
                let _ = self.channel.flush_chunks().await;
                return Ok(());
            }
        };
        if data.len() > MAX_IMAGE_BYTES {
            self.channel
                .send(&format!(
                    "Image {path} exceeds size limit ({} MB), skipping",
                    MAX_IMAGE_BYTES / 1024 / 1024
                ))
                .await?;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }
        let mime_type = detect_image_mime(Some(path)).to_string();
        self.msg
            .pending_image_parts
            .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
        self.channel
            .send(&format!("Image loaded: {path}. Send your message."))
            .await?;
        let _ = self.channel.flush_chunks().await;
        Ok(())
    }

    async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;

        let mut out = String::from("Slash commands:\n\n");

        let categories = [
            slash_commands::SlashCategory::Info,
            slash_commands::SlashCategory::Session,
            slash_commands::SlashCategory::Model,
            slash_commands::SlashCategory::Memory,
            slash_commands::SlashCategory::Tools,
            slash_commands::SlashCategory::Planning,
            slash_commands::SlashCategory::Debug,
            slash_commands::SlashCategory::Advanced,
        ];

        for cat in &categories {
            let entries: Vec<_> = slash_commands::COMMANDS
                .iter()
                .filter(|c| &c.category == cat)
                .collect();
            if entries.is_empty() {
                continue;
            }
            let _ = writeln!(out, "{}:", cat.as_str());
            for cmd in entries {
                if cmd.args.is_empty() {
                    let _ = write!(out, " {}", cmd.name);
                } else {
                    let _ = write!(out, " {} {}", cmd.name, cmd.args);
                }
                let _ = write!(out, " — {}", cmd.description);
                if let Some(feat) = cmd.feature_gate {
                    let _ = write!(out, " [requires: {feat}]");
                }
                let _ = writeln!(out);
            }
            let _ = writeln!(out);
        }

        self.channel.send(out.trim_end()).await?;
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;

        let uptime = self.lifecycle.start_time.elapsed().as_secs();
        let msg_count = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .count();

        let (
            api_calls,
            prompt_tokens,
            completion_tokens,
            cost_cents,
            mcp_servers,
            orch_plans,
            orch_tasks,
            orch_completed,
            orch_failed,
            orch_skipped,
        ) = if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            (
                m.api_calls,
                m.prompt_tokens,
                m.completion_tokens,
                m.cost_spent_cents,
                m.mcp_server_count,
                m.orchestration.plans_total,
                m.orchestration.tasks_total,
                m.orchestration.tasks_completed,
                m.orchestration.tasks_failed,
                m.orchestration.tasks_skipped,
            )
        } else {
            (0, 0, 0, 0.0, 0, 0, 0, 0, 0, 0)
        };

        let skill_count = self
            .skill_state
            .registry
            .read()
            .map(|r| r.all_meta().len())
            .unwrap_or(0);

        let mut out = String::from("Session status:\n\n");
        let _ = writeln!(out, "Provider: {}", self.provider.name());
        let _ = writeln!(out, "Model: {}", self.runtime.model_name);
        let _ = writeln!(out, "Uptime: {uptime}s");
        let _ = writeln!(out, "Turns: {msg_count}");
        let _ = writeln!(out, "API calls: {api_calls}");
        let _ = writeln!(
            out,
            "Tokens: {prompt_tokens} prompt / {completion_tokens} completion"
        );
        let _ = writeln!(out, "Skills: {skill_count}");
        let _ = writeln!(out, "MCP: {mcp_servers} server(s)");
        if let Some(ref tf) = self.tool_schema_filter {
            let _ = writeln!(
                out,
                "Filter: enabled (top_k={}, always_on={}, {} embeddings)",
                tf.top_k(),
                tf.always_on_count(),
                tf.embedding_count(),
            );
        }
        #[cfg(feature = "policy-enforcer")]
        if let Some(ref adv) = self.runtime.adversarial_policy_info {
            let provider_display = if adv.provider.is_empty() {
                "default"
            } else {
                adv.provider.as_str()
            };
            let _ = writeln!(
                out,
                "Adv gate: enabled (provider={}, policies={}, fail_open={})",
                provider_display, adv.policy_count, adv.fail_open
            );
        }
        if cost_cents > 0.0 {
            let _ = writeln!(out, "Cost: ${:.4}", cost_cents / 100.0);
        }
        if orch_plans > 0 {
            let _ = writeln!(out);
            let _ = writeln!(out, "Orchestration:");
            let _ = writeln!(out, " Plans: {orch_plans}");
            let _ = writeln!(out, " Tasks: {orch_completed}/{orch_tasks} completed");
            if orch_failed > 0 {
                let _ = writeln!(out, " Failed: {orch_failed}");
            }
            if orch_skipped > 0 {
                let _ = writeln!(out, " Skipped: {orch_skipped}");
            }
        }

        #[cfg(feature = "context-compression")]
        {
            use crate::config::PruningStrategy;
            if matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Subgoal | PruningStrategy::SubgoalMig
            ) {
                let _ = writeln!(out);
                let _ = writeln!(
                    out,
                    "Pruning: {}",
                    match self.context_manager.compression.pruning_strategy {
                        PruningStrategy::SubgoalMig => "subgoal_mig",
                        _ => "subgoal",
                    }
                );
                let subgoal_count = self.compression.subgoal_registry.subgoals.len();
                let _ = writeln!(out, "Subgoals: {subgoal_count} tracked");
                if let Some(active) = self.compression.subgoal_registry.active_subgoal() {
                    let _ = writeln!(out, "Active: \"{}\"", active.description);
                } else {
                    let _ = writeln!(out, "Active: (none yet)");
                }
            }
        }

        let gc = &self.memory_state.graph_config;
        if gc.enabled {
            let _ = writeln!(out);
            if gc.spreading_activation.enabled {
                let _ = writeln!(
                    out,
                    "Graph recall: spreading activation (lambda={:.2}, hops={})",
                    gc.spreading_activation.decay_lambda, gc.spreading_activation.max_hops,
                );
            } else {
                let _ = writeln!(out, "Graph recall: BFS (hops={})", gc.max_hops);
            }
        }

        self.channel.send(out.trim_end()).await?;
        Ok(())
    }

    #[cfg(feature = "guardrail")]
    async fn handle_guardrail_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;

        let mut out = String::new();
        if let Some(ref guardrail) = self.security.guardrail {
            let stats = guardrail.stats();
            let _ = writeln!(out, "Guardrail: enabled");
            let _ = writeln!(out, "Action: {:?}", guardrail.action());
            let _ = writeln!(out, "Fail strategy: {:?}", guardrail.fail_strategy());
            let _ = writeln!(out, "Timeout: {}ms", guardrail.timeout_ms());
            let _ = writeln!(
                out,
                "Tool scan: {}",
                if guardrail.scan_tool_output() {
                    "enabled"
                } else {
                    "disabled"
                }
            );
            let _ = writeln!(out, "\nStats:");
            let _ = writeln!(out, " Total checks: {}", stats.total_checks);
            let _ = writeln!(out, " Flagged: {}", stats.flagged_count);
            let _ = writeln!(out, " Errors: {}", stats.error_count);
            let _ = writeln!(out, " Avg latency: {}ms", stats.avg_latency_ms());
        } else {
            out.push_str("Guardrail: disabled\n");
            out.push_str(
                "Enable with: --guardrail flag or [security.guardrail] enabled = true in config",
            );
        }

        self.channel.send(out.trim_end()).await?;
        Ok(())
    }

    async fn handle_skills_family(&mut self, subcommand: &str) -> Result<(), error::AgentError> {
        match subcommand {
            "" => self.handle_skills_command().await,
            "confusability" => self.handle_skills_confusability_command().await,
            other => {
                self.channel
                    .send(&format!(
                        "Unknown /skills subcommand: '{other}'. Available: confusability"
                    ))
                    .await?;
                Ok(())
            }
        }
    }

    async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
        use std::collections::BTreeMap;
        use std::fmt::Write;

        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect();

        let mut trust_map: std::collections::HashMap<String, String> =
            std::collections::HashMap::new();
        if let Some(memory) = &self.memory_state.memory {
            for meta in &all_meta {
                let info = memory
                    .sqlite()
                    .load_skill_trust(&meta.name)
                    .await
                    .ok()
                    .flatten()
                    .map_or_else(String::new, |r| format!(" [{}]", r.trust_level));
                trust_map.insert(meta.name.clone(), info);
            }
        }

        let mut output = String::from("Available skills:\n\n");

        let has_categories = all_meta.iter().any(|m| m.category.is_some());
        if has_categories {
            let mut by_category: BTreeMap<&str, Vec<&zeph_skills::loader::SkillMeta>> =
                BTreeMap::new();
            for meta in &all_meta {
                let cat = meta.category.as_deref().unwrap_or("other");
                by_category.entry(cat).or_default().push(meta);
            }
            for (cat, skills) in &by_category {
                let _ = writeln!(output, "[{cat}]");
                for meta in skills {
                    let trust_info = trust_map.get(&meta.name).map_or("", String::as_str);
                    let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
                }
                output.push('\n');
            }
        } else {
            for meta in &all_meta {
                let trust_info = trust_map.get(&meta.name).map_or("", String::as_str);
                let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
            }
        }

        if let Some(memory) = &self.memory_state.memory {
            match memory.sqlite().load_skill_usage().await {
                Ok(usage) if !usage.is_empty() => {
                    output.push_str("\nUsage statistics:\n\n");
                    for row in &usage {
                        let _ = writeln!(
                            output,
                            "- {}: {} invocations (last: {})",
                            row.skill_name, row.invocation_count, row.last_used_at,
                        );
                    }
                }
                Ok(_) => {}
                Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
            }
        }

        self.channel.send(&output).await?;
        Ok(())
    }

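    /// Prints the matcher's confusability report: skill pairs scoring above the
    /// configured similarity threshold and therefore at risk of being matched
    /// interchangeably.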
    async fn handle_skills_confusability_command(&mut self) -> Result<(), error::AgentError> {
        let threshold = self.skill_state.confusability_threshold;
        if threshold <= 0.0 {
            self.channel
                .send(
                    "Confusability monitoring is disabled. \
                     Set [skills] confusability_threshold in config (e.g. 0.85) to enable.",
                )
                .await?;
            return Ok(());
        }

        let Some(matcher) = &self.skill_state.matcher else {
            self.channel
                .send("Skill matcher not available (no embedding provider configured).")
                .await?;
            return Ok(());
        };

        let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect();
        let refs: Vec<&zeph_skills::loader::SkillMeta> = all_meta.iter().collect();

        let report = matcher.confusability_report(&refs, threshold).await;
        self.channel.send(&report.to_string()).await?;
        Ok(())
    }

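    /// Handles `/feedback <skill_name> <message>`: classifies the message as
    /// approval or rejection via the feedback detector, records the outcome,
    /// and (when learning is enabled) kicks off skill improvement on rejection.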
    async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
        let Some((name, rest)) = input.split_once(' ') else {
            self.channel
                .send("Usage: /feedback <skill_name> <message>")
                .await?;
            return Ok(());
        };
        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));

        if feedback.is_empty() {
            self.channel
                .send("Usage: /feedback <skill_name> <message>")
                .await?;
            return Ok(());
        }

        let Some(memory) = &self.memory_state.memory else {
            self.channel.send("Memory not available.").await?;
            return Ok(());
        };

        let outcome_type = if self.feedback.detector.detect(feedback, &[]).is_some() {
            "user_rejection"
        } else {
            "user_approval"
        };

        memory
            .sqlite()
            .record_skill_outcome(
                skill_name,
                None,
                self.memory_state.conversation_id,
                outcome_type,
                None,
                Some(feedback),
            )
            .await?;

        if self.is_learning_enabled() && outcome_type == "user_rejection" {
            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
                .await
                .ok();
        }

        self.channel
            .send(&format!("Feedback recorded for \"{skill_name}\"."))
            .await?;
        Ok(())
    }

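    /// Polls a spawned sub-agent every 500ms until it completes, fails, or is
    /// cancelled, relaying secret-approval prompts to the user and emitting
    /// turn-progress status updates along the way.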
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use crate::subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Relay any secret request from the running sub-agent to the user.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }

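    /// Resolves a user-supplied id prefix to a full sub-agent task id.
    /// Returns `None` when no manager exists; `Err` when the prefix matches
    /// zero or several agents.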
    fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        let full_ids: Vec<String> = mgr
            .statuses()
            .into_iter()
            .map(|(tid, _)| tid)
            .filter(|tid| tid.starts_with(prefix))
            .collect();
        Some(match full_ids.as_slice() {
            [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
            [fid] => Ok(fid.clone()),
            _ => Err(format!(
                "Ambiguous id prefix '{prefix}': matches {} agents",
                full_ids.len()
            )),
        })
    }

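    /// Renders the list of available sub-agent definitions, including memory
    /// scope and source annotations.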
    fn handle_agent_list(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let defs = mgr.definitions();
        if defs.is_empty() {
            return Some("No sub-agent definitions found.".into());
        }
        let mut out = String::from("Available sub-agents:\n");
        for d in defs {
            let memory_label = match d.memory {
                Some(crate::subagent::MemoryScope::User) => " [memory:user]",
                Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
                Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
                None => "",
            };
            if let Some(ref src) = d.source {
                let _ = writeln!(
                    out,
                    "  {}{} — {} ({})",
                    d.name, memory_label, d.description, src
                );
            } else {
                let _ = writeln!(out, "  {}{} — {}", d.name, memory_label, d.description);
            }
        }
        Some(out)
    }

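    /// Renders one status line per active sub-agent (short id, state, turns,
    /// elapsed time, last message) plus its memory directory when resolvable.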
    fn handle_agent_status(&self) -> Option<String> {
        use std::fmt::Write as _;
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let statuses = mgr.statuses();
        if statuses.is_empty() {
            return Some("No active sub-agents.".into());
        }
        let mut out = String::from("Active sub-agents:\n");
        for (id, s) in &statuses {
            let state = format!("{:?}", s.state).to_lowercase();
            let elapsed = s.started_at.elapsed().as_secs();
            let _ = writeln!(
                out,
                "  [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
                short = &id[..8.min(id.len())],
                t = s.turns_used,
                msg = s.last_message.as_deref().unwrap_or(""),
            );
            if let Some(def) = mgr.agents_def(id)
                && let Some(scope) = def.memory
                && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
            {
                let _ = writeln!(out, "    memory: {}", dir.display());
            }
        }
        Some(out)
    }

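    /// Approves the pending secret request for the sub-agent matching the id
    /// prefix and delivers the secret with a fixed 300s TTL. Note that the
    /// request is consumed from the queue even when it belongs to a different
    /// agent, in which case it is reported as "no pending request".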
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            let ttl = std::time::Duration::from_secs(300);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }

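    /// Denies the pending secret request for the sub-agent matching the id prefix.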
    fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        match mgr.deny_secret(&full_id) {
            Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
            Err(e) => Some(format!("Deny failed: {e}")),
        }
    }

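    /// Entry point for `/agent` commands: list, spawn (foreground or
    /// background), status, cancel, approve/deny secrets, and resume.
    /// Foreground spawns block on `poll_subagent_until_done`.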
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                // Foreground spawn: announce the id, then block until the sub-agent finishes.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Resolve the definition name in a scoped borrow so the manager
                // can be re-borrowed mutably for the resume call below.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }

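    /// Returns the skill bodies a sub-agent definition is allowed to use, or
    /// `None` when the agent is unknown, filtering fails, or no skills match.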
    fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
        let mgr = self.orchestration.subagent_manager.as_ref()?;
        let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
        let reg = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock");
        match crate::subagent::filter_skills(&reg, &def.skills) {
            Ok(skills) => {
                let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
                if bodies.is_empty() {
                    None
                } else {
                    Some(bodies)
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "skill filtering failed for sub-agent");
                None
            }
        }
    }

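    /// Re-records trust for every reloaded skill: skills under the managed
    /// (hub) directory get the configured default level, local ones the local
    /// level, and a changed content hash demotes a previously recorded skill
    /// to the hash-mismatch level.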
    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
        let Some(ref memory) = self.memory_state.memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::store::SourceKind::Hub
            } else {
                zeph_memory::store::SourceKind::Local
            };
            let initial_level = if matches!(source_kind, zeph_memory::store::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            row.trust_level.clone()
                        } else {
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }

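    /// Rebuilds or syncs the skill embedding index after a registry change:
    /// in-memory backends are rebuilt from scratch, Qdrant-backed ones are
    /// synced incrementally, and the BM25 index is refreshed when hybrid
    /// search is enabled.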
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move { p.embed(&owned).await })
        };

        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            if let Err(e) = backend
                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
        }
    }

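    /// Hot-reloads skills from disk. Skips work when the registry fingerprint
    /// is unchanged; otherwise swaps the registry, refreshes trust records and
    /// matcher indexes, and rewrites the skills section of the system prompt.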
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        if new_registry.fingerprint()
            == self
                .skill_state
                .registry
                .read()
                .expect("registry read lock")
                .fingerprint()
        {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self
            .skill_state
            .registry
            .write()
            .expect("registry write lock") = new_registry;

        let all_meta = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        let all_skills: Vec<Skill> = {
            let reg = self
                .skill_state
                .registry
                .read()
                .expect("registry read lock");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state
                .registry
                .read()
                .expect("registry read lock")
                .all_meta()
                .len()
        );
    }

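    /// Re-reads instruction files after a watcher notification and logs which
    /// sources were added or removed.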
    fn reload_instructions(&mut self) {
        // Drain any queued reload notifications so a single pass covers them all.
        if let Some(ref mut rx) = self.instructions.reload_rx {
            while rx.try_recv().is_ok() {}
        }
        let Some(ref state) = self.instructions.reload_state else {
            return;
        };
        let new_blocks = crate::instructions::load_instructions(
            &state.base_dir,
            &state.provider_kinds,
            &state.explicit_files,
            state.auto_detect,
        );
        let old_sources: std::collections::HashSet<_> =
            self.instructions.blocks.iter().map(|b| &b.source).collect();
        let new_sources: std::collections::HashSet<_> =
            new_blocks.iter().map(|b| &b.source).collect();
        for added in new_sources.difference(&old_sources) {
            tracing::info!(path = %added.display(), "instruction file added");
        }
        for removed in old_sources.difference(&new_sources) {
            tracing::info!(path = %removed.display(), "instruction file removed");
        }
        tracing::info!(
            old_count = self.instructions.blocks.len(),
            new_count = new_blocks.len(),
            "reloaded instruction files"
        );
        self.instructions.blocks = new_blocks;
    }

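    /// Applies a changed config file in place: security, timeouts, memory and
    /// skill tuning, context budget, graph/RPE settings, compaction thresholds,
    /// and the store-routing provider. Settings not assigned here keep their
    /// current values.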
    fn reload_config(&mut self) {
        let Some(ref path) = self.lifecycle.config_path else {
            return;
        };
        let config = match Config::load(path) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!("config reload failed: {e:#}");
                return;
            }
        };

        self.runtime.security = config.security;
        self.runtime.timeouts = config.timeouts;
        self.runtime.redact_credentials = config.memory.redact_credentials;
        self.memory_state.history_limit = config.memory.history_limit;
        self.memory_state.recall_limit = config.memory.semantic.recall_limit;
        self.memory_state.summarization_threshold = config.memory.summarization_threshold;
        self.skill_state.max_active_skills = config.skills.max_active_skills;
        self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
        self.skill_state.min_injection_score = config.skills.min_injection_score;
        self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
        self.skill_state.hybrid_search = config.skills.hybrid_search;
        self.skill_state.two_stage_matching = config.skills.two_stage_matching;
        self.skill_state.confusability_threshold =
            config.skills.confusability_threshold.clamp(0.0, 1.0);

        if config.memory.context_budget_tokens > 0 {
            self.context_manager.budget = Some(
                ContextBudget::new(config.memory.context_budget_tokens, 0.20)
                    .with_graph_enabled(config.memory.graph.enabled),
            );
        } else {
            self.context_manager.budget = None;
        }

        {
            let graph_cfg = &config.memory.graph;
            if graph_cfg.rpe.enabled {
                // Create the router lazily so an existing one keeps its state across reloads.
                if self.memory_state.rpe_router.is_none() {
                    self.memory_state.rpe_router =
                        Some(std::sync::Mutex::new(zeph_memory::RpeRouter::new(
                            graph_cfg.rpe.threshold,
                            graph_cfg.rpe.max_skip_turns,
                        )));
                }
            } else {
                self.memory_state.rpe_router = None;
            }
            self.memory_state.graph_config = graph_cfg.clone();
        }
        self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
        self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
        self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
        self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
        self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
        self.context_manager.compression = config.memory.compression.clone();
        self.context_manager.routing = config.memory.store_routing.clone();
        self.context_manager.store_routing_provider = if config
            .memory
            .store_routing
            .routing_classifier_provider
            .is_empty()
        {
            None
        } else {
            let resolved = self.resolve_background_provider(
                &config.memory.store_routing.routing_classifier_provider,
            );
            Some(std::sync::Arc::new(resolved))
        };
        self.memory_state.cross_session_score_threshold =
            config.memory.cross_session_score_threshold;

        self.index.repo_map_tokens = config.index.repo_map_tokens;
        self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);

        tracing::info!("config reloaded");
    }

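    /// Reports Focus Agent state: enablement, active session/scope, knowledge
    /// block count, and turns since the last focus pass.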
    #[cfg(feature = "context-compression")]
    async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;
        let mut out = String::from("Focus Agent status\n\n");
        let _ = writeln!(out, "Enabled: {}", self.focus.config.enabled);
        let _ = writeln!(out, "Active session: {}", self.focus.is_active());
        if let Some(ref scope) = self.focus.active_scope {
            let _ = writeln!(out, "Active scope: {scope}");
        }
        let _ = writeln!(
            out,
            "Knowledge blocks: {}",
            self.focus.knowledge_blocks.len()
        );
        let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
        self.channel.send(&out).await?;
        Ok(())
    }

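    /// Reports SideQuest state: enablement, pass interval, turn counter, and
    /// eviction totals.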
    #[cfg(feature = "context-compression")]
    async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;
        let mut out = String::from("SideQuest status\n\n");
        let _ = writeln!(out, "Enabled: {}", self.sidequest.config.enabled);
        let _ = writeln!(
            out,
            "Interval turns: {}",
            self.sidequest.config.interval_turns
        );
        let _ = writeln!(out, "Turn counter: {}", self.sidequest.turn_counter);
        let _ = writeln!(out, "Passes run: {}", self.sidequest.passes_run);
        let _ = writeln!(
            out,
            "Total evicted: {} tool outputs",
            self.sidequest.total_evicted
        );
        self.channel.send(&out).await?;
        Ok(())
    }

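    /// Drives SideQuest eviction as a two-phase background protocol: first
    /// harvest the scoring task spawned on an earlier turn (applying any
    /// evictions it proposed), then snapshot the current tool-output cursors
    /// and spawn a fresh task. The LLM call runs off the hot path with a 5s
    /// timeout; its JSON response is validated, deduplicated, and capped by
    /// `max_eviction_ratio` before any message is touched.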
    #[cfg(feature = "context-compression")]
    #[allow(clippy::too_many_lines)]
    fn maybe_sidequest_eviction(&mut self) {
        use zeph_llm::provider::{Message, MessageMetadata, Role};

        if self.sidequest.config.enabled {
            use crate::config::PruningStrategy;
            // Non-Reactive strategies evict on their own, so running SideQuest
            // alongside them would double-prune.
            if !matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Reactive
            ) {
                tracing::warn!(
                    strategy = ?self.context_manager.compression.pruning_strategy,
                    "sidequest is enabled alongside a non-Reactive pruning strategy; \
                     consider disabling sidequest.enabled to avoid redundant eviction"
                );
            }
        }

        // Focus sessions manage context themselves; skip and drop any pending task.
        if self.focus.is_active() {
            tracing::debug!("sidequest: skipping — focus session active");
            self.compression.pending_sidequest_result = None;
            return;
        }

        // Phase 1: harvest the scoring task spawned on an earlier turn, if any.
        if let Some(handle) = self.compression.pending_sidequest_result.take() {
            use futures::FutureExt as _;
            match handle.now_or_never() {
                Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
                    let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
                    let freed = self.sidequest.apply_eviction(
                        &mut self.msg.messages,
                        &evicted_indices,
                        &self.metrics.token_counter,
                    );
                    if freed > 0 {
                        self.recompute_prompt_tokens();
                        // Mark that context was compacted this turn.
                        self.context_manager.compaction =
                            crate::agent::context_manager::CompactionState::CompactedThisTurn {
                                cooldown: 0,
                            };
                        tracing::info!(
                            freed_tokens = freed,
                            evicted_cursors = evicted_indices.len(),
                            pass = self.sidequest.passes_run,
                            "sidequest eviction complete"
                        );
                        if let Some(ref d) = self.debug_state.debug_dumper {
                            d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
                        }
                        if let Some(ref tx) = self.session.status_tx {
                            let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
                        }
                    } else if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                Some(Ok(None | Some(_))) => {
                    tracing::debug!("sidequest: pending result: no cursors to evict");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                Some(Err(e)) => {
                    tracing::debug!("sidequest: background task panicked: {e}");
                    if let Some(ref tx) = self.session.status_tx {
                        let _ = tx.send(String::new());
                    }
                }
                None => {
                    // now_or_never() consumed the handle; a replacement task is
                    // spawned below, so the still-running task is detached and
                    // its result discarded.
                    tracing::debug!(
                        "sidequest: background LLM task not yet complete, rescheduling"
                    );
                }
            }
        }

        // Phase 2: re-scan tool outputs and spawn a fresh background scoring task.
        self.sidequest
            .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);

        if self.sidequest.tool_output_cursors.is_empty() {
            tracing::debug!("sidequest: no eligible cursors");
            return;
        }

        let prompt = self.sidequest.build_eviction_prompt();
        let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
        let n_cursors = self.sidequest.tool_output_cursors.len();
        let provider = self.summary_or_primary_provider().clone();

        // The LLM scores cursors off the hot path; the call is capped at 5s and
        // the JSON reply is validated against the current cursor count.
        let handle = tokio::spawn(async move {
            let msgs = [Message {
                role: Role::User,
                content: prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }];
            let response =
                match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
                    .await
                {
                    Ok(Ok(r)) => r,
                    Ok(Err(e)) => {
                        tracing::debug!("sidequest bg: LLM call failed: {e:#}");
                        return None;
                    }
                    Err(_) => {
                        tracing::debug!("sidequest bg: LLM call timed out");
                        return None;
                    }
                };

            let start = response.find('{')?;
            let end = response.rfind('}')?;
            if start > end {
                return None;
            }
            let json_slice = &response[start..=end];
            let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
            let mut valid: Vec<usize> = parsed
                .del_cursors
                .into_iter()
                .filter(|&c| c < n_cursors)
                .collect();
            valid.sort_unstable();
            valid.dedup();
            #[allow(
                clippy::cast_precision_loss,
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss
            )]
            let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
            valid.truncate(max_evict);
            Some(valid)
        });

        self.compression.pending_sidequest_result = Some(handle);
        tracing::debug!("sidequest: background LLM eviction task spawned");
        if let Some(ref tx) = self.session.status_tx {
            let _ = tx.send("SideQuest: scoring tool outputs...".into());
        }
    }

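    /// Detects a changed working directory, updates the environment context,
    /// and fires any configured `cwd_changed` hooks with the old and new paths
    /// exported as `ZEPH_OLD_CWD` / `ZEPH_NEW_CWD`.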
    pub(crate) async fn check_cwd_changed(&mut self) {
        let current = match std::env::current_dir() {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!("check_cwd_changed: failed to get cwd: {e}");
                return;
            }
        };
        if current == self.lifecycle.last_known_cwd {
            return;
        }
        let old_cwd = std::mem::replace(&mut self.lifecycle.last_known_cwd, current.clone());
        self.session.env_context.working_dir = current.display().to_string();

        tracing::info!(
            old = %old_cwd.display(),
            new = %current.display(),
            "working directory changed"
        );

        let _ = self
            .channel
            .send_status("Working directory changed\u{2026}")
            .await;

        let hooks = self.session.hooks_config.cwd_changed.clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert("ZEPH_OLD_CWD".to_owned(), old_cwd.display().to_string());
            env.insert("ZEPH_NEW_CWD".to_owned(), current.display().to_string());
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
                tracing::warn!(error = %e, "CwdChanged hook failed");
            }
        }

        let _ = self.channel.send_status("").await;
    }

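    /// Runs configured file-change hooks with the changed path exported as
    /// `ZEPH_CHANGED_PATH`.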
    pub(crate) async fn handle_file_changed(
        &mut self,
        event: crate::file_watcher::FileChangedEvent,
    ) {
        tracing::info!(path = %event.path.display(), "file changed");

        let _ = self
            .channel
            .send_status("Running file-change hook\u{2026}")
            .await;

        let hooks = self.session.hooks_config.file_changed_hooks.clone();
        if !hooks.is_empty() {
            let mut env = std::collections::HashMap::new();
            env.insert(
                "ZEPH_CHANGED_PATH".to_owned(),
                event.path.display().to_string(),
            );
            if let Err(e) = zeph_subagent::hooks::fire_hooks(&hooks, &env).await {
                tracing::warn!(error = %e, "FileChanged hook failed");
            }
        }

        let _ = self.channel.send_status("").await;
    }
}
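
/// Resolves once the shutdown watch channel reads `true`. If the sender is
/// dropped first, this parks forever so a `tokio::select!` branch using it
/// simply never fires. Illustrative usage (the worker future is hypothetical):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::watch::channel(false);
/// tokio::select! {
///     () = shutdown_signal(&mut rx) => { /* begin graceful shutdown */ }
///     () = run_worker() => {}
/// }
/// ```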
pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
    while !*rx.borrow_and_update() {
        if rx.changed().await.is_err() {
            std::future::pending::<()>().await;
        }
    }
}

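/// Converts an LLM classifier verdict into a `CorrectionSignal`, clamping the
/// confidence to [0.0, 1.0] and normalizing the kind string; unknown kinds are
/// logged and discarded.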
fn feedback_verdict_into_signal(
    verdict: &zeph_llm::classifier::llm::FeedbackVerdict,
    user_message: &str,
) -> Option<feedback_detector::CorrectionSignal> {
    if !verdict.is_correction {
        return None;
    }
    let confidence = verdict.confidence.clamp(0.0, 1.0);
    let kind_raw = verdict.kind.trim().to_lowercase().replace(' ', "_");
    let kind = match kind_raw.as_str() {
        "explicit_rejection" => feedback_detector::CorrectionKind::ExplicitRejection,
        "alternative_request" => feedback_detector::CorrectionKind::AlternativeRequest,
        "repetition" => feedback_detector::CorrectionKind::Repetition,
        "self_correction" => feedback_detector::CorrectionKind::SelfCorrection,
        other => {
            tracing::warn!(
                kind = other,
                "llm-classifier returned unknown correction kind, discarding"
            );
            return None;
        }
    };
    Some(feedback_detector::CorrectionSignal {
        confidence,
        kind,
        feedback_text: user_message.to_owned(),
    })
}

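/// Persists a detected user correction (truncated to 500 chars) to SQLite and
/// stores its embedding for later semantic recall; failures are logged, never
/// propagated.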
async fn store_correction_in_memory(
    memory: Option<std::sync::Arc<zeph_memory::semantic::SemanticMemory>>,
    conv_id: Option<zeph_memory::ConversationId>,
    assistant_snippet: &str,
    user_msg: &str,
    skill_name: String,
    kind_str: &str,
) {
    let Some(mem) = memory else { return };
    let correction_text = context::truncate_chars(user_msg, 500);
    match mem
        .sqlite()
        .store_user_correction(
            conv_id.map(|c| c.0),
            assistant_snippet,
            &correction_text,
            if skill_name.is_empty() {
                None
            } else {
                Some(skill_name.as_str())
            },
            kind_str,
        )
        .await
    {
        Ok(correction_id) => {
            if let Err(e) = mem
                .store_correction_embedding(correction_id, &correction_text)
                .await
            {
                tracing::warn!("failed to store correction embedding: {e:#}");
            }
        }
        Err(e) => {
            tracing::warn!("failed to store judge correction: {e:#}");
        }
    }
}

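/// Receives from an optional channel, for use in `tokio::select!`: yields the
/// next value while the receiver is open, clears the slot and parks forever
/// once it closes, and parks immediately when no receiver is configured, so a
/// disabled source can never win the select.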
pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
    match rx {
        Some(inner) => {
            if let Some(v) = inner.recv().await {
                Some(v)
            } else {
                *rx = None;
                std::future::pending().await
            }
        }
        None => std::future::pending().await,
    }
}

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::agent_tests;