1mod builder;
5#[cfg(feature = "compression-guidelines")]
6pub(super) mod compression_feedback;
7mod context;
8pub(crate) mod context_manager;
9pub mod error;
10#[cfg(feature = "experiments")]
11mod experiment_cmd;
12pub(super) mod feedback_detector;
13mod graph_commands;
14mod index;
15mod learning;
16pub(crate) mod learning_engine;
17mod log_commands;
18#[cfg(feature = "lsp-context")]
19mod lsp_commands;
20mod mcp;
21mod message_queue;
22mod persistence;
23pub(crate) mod rate_limiter;
24#[cfg(feature = "scheduler")]
25mod scheduler_commands;
26mod skill_management;
27pub mod slash_commands;
28pub(crate) mod tool_execution;
29pub(crate) mod tool_orchestrator;
30mod trust_commands;
31mod utils;
32
33use std::collections::VecDeque;
34use std::path::PathBuf;
35use std::time::Instant;
36
37use std::sync::Arc;
38
39use tokio::sync::{Notify, mpsc, watch};
40use tokio_util::sync::CancellationToken;
41use zeph_llm::any::AnyProvider;
42use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
43use zeph_llm::stt::SpeechToText;
44
45use crate::metrics::MetricsSnapshot;
46use std::collections::HashMap;
47use zeph_memory::TokenCounter;
48use zeph_memory::semantic::SemanticMemory;
49use zeph_skills::loader::Skill;
50use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
51use zeph_skills::prompt::format_skills_prompt;
52use zeph_skills::registry::SkillRegistry;
53use zeph_skills::watcher::SkillEvent;
54use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
55
56use crate::channel::Channel;
57use crate::config::Config;
58use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
59use crate::config_watcher::ConfigEvent;
60use crate::context::{
61 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
62};
63use crate::cost::CostTracker;
64use crate::instructions::{InstructionBlock, InstructionEvent, InstructionReloadState};
65use crate::sanitizer::ContentSanitizer;
66use crate::sanitizer::quarantine::QuarantinedSummarizer;
67use crate::vault::Secret;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, QueuedMessage, detect_image_mime};
70
// Window size (in recent tool calls) used by repeated-action ("doom loop")
// detection. NOTE(review): consumed outside this file; confirm exact semantics
// at the usage site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Markers prepended to synthetic context messages so they can be recognized
// (and stripped or compacted) later. Consumers match on these exact strings,
// so changing any of them is a breaking change for persisted conversations.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
// Prefix only; the full marker is "[lsp <kind>]" — left open-ended on purpose.
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
// Closing fence appended by `format_tool_output`.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
86
87fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
88 use std::fmt::Write;
89 let mut out = String::new();
90 let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
91 let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
92 let _ = writeln!(out);
93 for (i, task) in graph.tasks.iter().enumerate() {
94 let deps = if task.depends_on.is_empty() {
95 String::new()
96 } else {
97 let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
98 format!(" (after: {})", ids.join(", "))
99 };
100 let agent = task.agent_hint.as_deref().unwrap_or("-");
101 let _ = writeln!(out, " {}. [{}] {}{}", i + 1, agent, task.title, deps);
102 }
103 out
104}
105
106pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
107 use std::fmt::Write;
108 let capacity = "[tool output: ".len()
109 + tool_name.len()
110 + "]\n```\n".len()
111 + body.len()
112 + TOOL_OUTPUT_SUFFIX.len();
113 let mut buf = String::with_capacity(capacity);
114 let _ = write!(
115 buf,
116 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
117 );
118 buf
119}
120
/// Configuration and runtime counters for the semantic-memory subsystem.
pub(super) struct MemoryState {
    // Backing store; `None` when memory is disabled.
    pub(super) memory: Option<Arc<SemanticMemory>>,
    // Id of the active conversation, once one has been established.
    pub(super) conversation_id: Option<zeph_memory::ConversationId>,
    pub(super) history_limit: u32,
    pub(super) recall_limit: usize,
    // NOTE(review): thresholds below are enforced outside this file; confirm
    // exact semantics (units, inclusivity) at the usage sites.
    pub(super) summarization_threshold: usize,
    pub(super) cross_session_score_threshold: f32,
    // Whether assistant replies are persisted automatically.
    pub(super) autosave_assistant: bool,
    pub(super) autosave_min_length: usize,
    pub(super) tool_call_cutoff: usize,
    // Messages accumulated since the last summarization pass.
    pub(super) unsummarized_count: usize,
    pub(super) document_config: crate::config::DocumentConfig,
    pub(super) graph_config: crate::config::GraphConfig,
    pub(super) compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig,
}
136
/// Skill registry, matcher backend, and skill-prompt state.
pub(super) struct SkillState {
    // Shared, lock-protected registry of loaded skills.
    pub(super) registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
    pub(super) skill_paths: Vec<PathBuf>,
    pub(super) managed_dir: Option<PathBuf>,
    pub(super) trust_config: crate::config::TrustConfig,
    // Optional semantic matcher; `None` falls back to non-matched behavior.
    pub(super) matcher: Option<SkillMatcherBackend>,
    pub(super) max_active_skills: usize,
    pub(super) disambiguation_threshold: f32,
    pub(super) embedding_model: String,
    // Receives reload events from the skill file watcher, when enabled.
    pub(super) skill_reload_rx: Option<mpsc::Receiver<SkillEvent>>,
    pub(super) active_skill_names: Vec<String>,
    // Cached copy of the most recently built skills prompt.
    pub(super) last_skills_prompt: String,
    pub(super) prompt_mode: SkillPromptMode,
    pub(super) available_custom_secrets: HashMap<String, Secret>,
    // Hybrid (cosine + BM25) search knobs; bm25_index is lazily built.
    pub(super) cosine_weight: f32,
    pub(super) hybrid_search: bool,
    pub(super) bm25_index: Option<zeph_skills::bm25::Bm25Index>,
}
156
/// State for MCP server tools and their registry.
pub(super) struct McpState {
    // Snapshot of currently known MCP tools.
    pub(super) tools: Vec<zeph_mcp::McpTool>,
    pub(super) registry: Option<zeph_mcp::McpToolRegistry>,
    pub(super) manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
    pub(super) allowed_commands: Vec<String>,
    pub(super) max_dynamic: usize,
    // Tool list shared with other components; updates also arrive via tool_rx.
    pub(super) shared_tools: Option<std::sync::Arc<std::sync::RwLock<Vec<zeph_mcp::McpTool>>>>,
    pub(super) tool_rx: Option<tokio::sync::watch::Receiver<Vec<zeph_mcp::McpTool>>>,
}
168
/// Code-index retrieval state, including a TTL-cached repo map.
pub(super) struct IndexState {
    pub(super) retriever: Option<std::sync::Arc<zeph_index::retriever::CodeRetriever>>,
    pub(super) repo_map_tokens: usize,
    // Cached repo map plus the instant it was generated; invalidated after
    // `repo_map_ttl` elapses.
    pub(super) cached_repo_map: Option<(String, std::time::Instant)>,
    pub(super) repo_map_ttl: std::time::Duration,
}
175
/// Runtime configuration snapshot applied to the agent loop.
pub(super) struct RuntimeConfig {
    pub(super) security: SecurityConfig,
    pub(super) timeouts: TimeoutConfig,
    pub(super) model_name: String,
    pub(super) permission_policy: zeph_tools::PermissionPolicy,
    pub(super) redact_credentials: bool,
}
183
/// Input/output sanitization and security guards.
pub(super) struct SecurityState {
    pub(super) sanitizer: ContentSanitizer,
    // Optional summarizer for quarantined (untrusted) content.
    pub(super) quarantine_summarizer: Option<QuarantinedSummarizer>,
    pub(super) exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard,
    // URLs previously flagged by the guards; used as a deny/flag set.
    pub(super) flagged_urls: std::collections::HashSet<String>,
    pub(super) pii_filter: crate::sanitizer::pii::PiiFilter,
    pub(super) memory_validator: crate::sanitizer::memory_validation::MemoryWriteValidator,
}
193
/// Debug dumping, tracing, and anomaly-detection state.
pub(super) struct DebugState {
    pub(super) debug_dumper: Option<crate::debug_dump::DebugDumper>,
    pub(super) dump_format: crate::debug_dump::DumpFormat,
    pub(super) trace_collector: Option<crate::debug_dump::trace::TracingCollector>,
    // Monotonic counter of agent-loop iterations (for dump/trace labeling).
    pub(super) iteration_counter: usize,
    pub(super) anomaly_detector: Option<zeph_tools::AnomalyDetector>,
    pub(super) logging_config: crate::config::LoggingConfig,
    pub(super) dump_dir: Option<PathBuf>,
    pub(super) trace_service_name: String,
    pub(super) trace_redact: bool,
    // Span id of the in-flight iteration, if tracing is active.
    pub(super) current_iteration_span_id: Option<[u8; 8]>,
}
214
/// Shutdown, cancellation, and hot-reload plumbing for the agent lifecycle.
pub(super) struct LifecycleState {
    // Flips to `true` when shutdown is requested.
    pub(super) shutdown: watch::Receiver<bool>,
    pub(super) start_time: Instant,
    // Two cancellation mechanisms coexist: a Notify-based signal and a token.
    // NOTE(review): confirm which consumers rely on which.
    pub(super) cancel_signal: Arc<Notify>,
    pub(super) cancel_token: CancellationToken,
    pub(super) config_path: Option<PathBuf>,
    pub(super) config_reload_rx: Option<mpsc::Receiver<ConfigEvent>>,
    // Becomes `true` once warmup completes, when a warmup phase exists.
    pub(super) warmup_ready: Option<watch::Receiver<bool>>,
    pub(super) update_notify_rx: Option<mpsc::Receiver<String>>,
    pub(super) custom_task_rx: Option<mpsc::Receiver<String>>,
}
227
/// Auxiliary LLM providers and provider-related runtime state.
pub(super) struct ProviderState {
    // Dedicated provider for summarization, when configured.
    pub(super) summary_provider: Option<AnyProvider>,
    // Shared slot allowing the active provider to be swapped at runtime.
    pub(super) provider_override: Option<Arc<std::sync::RwLock<Option<AnyProvider>>>>,
    pub(super) judge_provider: Option<AnyProvider>,
    // Estimated token count of the cached system prompt.
    pub(super) cached_prompt_tokens: u64,
    pub(super) server_compaction_active: bool,
    // Speech-to-text backend, when audio input is enabled.
    pub(super) stt: Option<Box<dyn SpeechToText>>,
}
240
/// Metrics publishing and cost/token accounting.
pub(super) struct MetricsState {
    // Publishes snapshots to observers; `None` disables metrics.
    pub(super) metrics_tx: Option<watch::Sender<MetricsSnapshot>>,
    pub(super) cost_tracker: Option<CostTracker>,
    pub(super) token_counter: Arc<TokenCounter>,
    pub(super) extended_context: bool,
}
250
/// Task-graph orchestration and sub-agent state.
pub(super) struct OrchestrationState {
    // Plan awaiting `/plan confirm`; at most one at a time.
    pub(super) pending_graph: Option<crate::orchestration::TaskGraph>,
    // Set while a plan is executing; cancelling it aborts the run.
    pub(super) plan_cancel_token: Option<CancellationToken>,
    pub(super) subagent_manager: Option<crate::subagent::SubAgentManager>,
    pub(super) subagent_config: crate::config::SubAgentConfig,
    pub(super) orchestration_config: crate::config::OrchestrationConfig,
}
269
/// The core agent: an LLM provider plus a communication channel, with all
/// optional subsystems (memory, skills, MCP tools, indexing, orchestration,
/// security, metrics, …) grouped into dedicated state structs.
pub struct Agent<C: Channel> {
    provider: AnyProvider,
    channel: C,
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    // Conversation history; index 0 is the system prompt (see `new_with_registry_arc`).
    messages: Vec<Message>,
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback_detector: feedback_detector::FeedbackDetector,
    pub(super) judge_detector: Option<feedback_detector::JudgeDetector>,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    // Messages received while busy, waiting to be processed.
    message_queue: VecDeque<QueuedMessage>,
    env_context: EnvironmentContext,
    pub(super) response_cache: Option<std::sync::Arc<zeph_memory::ResponseCache>>,
    pub(crate) parent_tool_use_id: Option<String>,
    pub(super) debug_state: DebugState,
    pub(super) instruction_blocks: Vec<InstructionBlock>,
    pub(super) instruction_reload_rx: Option<mpsc::Receiver<InstructionEvent>>,
    pub(super) instruction_reload_state: Option<InstructionReloadState>,
    pub(super) security: SecurityState,
    // Image attachments staged for the next outgoing message.
    pending_image_parts: Vec<zeph_llm::provider::MessagePart>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_config: crate::config::ExperimentConfig,
    #[cfg(feature = "experiments")]
    pub(super) experiment_cancel: Option<tokio_util::sync::CancellationToken>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_baseline: crate::experiments::ConfigSnapshot,
    // Present regardless of the `experiments` feature; its sender half is
    // feature-gated below.
    pub(super) experiment_notify_rx: Option<tokio::sync::mpsc::Receiver<String>>,
    #[cfg(feature = "experiments")]
    pub(super) experiment_notify_tx: tokio::sync::mpsc::Sender<String>,
    #[cfg(feature = "lsp-context")]
    pub(super) lsp_hooks: Option<crate::lsp_hooks::LspHookRunner>,
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    pub(super) orchestration: OrchestrationState,
    pub(super) rate_limiter: rate_limiter::ToolRateLimiter,
}
329
330impl<C: Channel> Agent<C> {
331 #[must_use]
332 pub fn new(
333 provider: AnyProvider,
334 channel: C,
335 registry: SkillRegistry,
336 matcher: Option<SkillMatcherBackend>,
337 max_active_skills: usize,
338 tool_executor: impl ToolExecutor + 'static,
339 ) -> Self {
340 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
341 Self::new_with_registry_arc(
342 provider,
343 channel,
344 registry,
345 matcher,
346 max_active_skills,
347 tool_executor,
348 )
349 }
350
    /// Build an [`Agent`] from an already-shared skill registry.
    ///
    /// Constructs the initial system prompt from all registered skills, then
    /// initializes every subsystem to its disabled/default state —
    /// NOTE(review): presumably a builder fills these in afterwards; confirm.
    ///
    /// # Panics
    /// Panics if the registry `RwLock` is poisoned.
    #[must_use]
    #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        // Materialize full skill bodies so the initial prompt can list them;
        // skills that fail to load are silently skipped.
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust or health data exists yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough token estimate: ~4 bytes per token.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // Placeholder shutdown channel; the sender is dropped immediately —
        // NOTE(review): presumably replaced with a real one later; confirm.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // The notify receiver exists in both builds; only the `experiments`
        // build keeps the sender half alive on the struct.
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        Self {
            provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // History starts with the system prompt as message 0.
            messages: vec![Message {
                role: Role::System,
                content: system_prompt,
                parts: vec![],
                metadata: MessageMetadata::default(),
            }],
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.05,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback_detector: feedback_detector::FeedbackDetector::new(0.6),
            judge_detector: None,
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                shared_tools: None,
                tool_rx: None,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            message_queue: VecDeque::new(),
            env_context: EnvironmentContext::gather(""),
            response_cache: None,
            parent_tool_use_id: None,
            instruction_blocks: Vec::new(),
            instruction_reload_rx: None,
            instruction_reload_state: None,
            security: SecurityState {
                sanitizer: ContentSanitizer::new(
                    &crate::sanitizer::ContentIsolationConfig::default(),
                ),
                quarantine_summarizer: None,
                exfiltration_guard: crate::sanitizer::exfiltration::ExfiltrationGuard::new(
                    crate::sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                pii_filter: crate::sanitizer::pii::PiiFilter::new(
                    crate::sanitizer::pii::PiiFilterConfig::default(),
                ),
                memory_validator: crate::sanitizer::memory_validation::MemoryWriteValidator::new(
                    crate::sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
            },
            pending_image_parts: Vec::new(),
            #[cfg(feature = "experiments")]
            experiment_config: crate::config::ExperimentConfig::default(),
            #[cfg(feature = "experiments")]
            experiment_baseline: crate::experiments::ConfigSnapshot::default(),
            experiment_notify_rx: Some(exp_notify_rx),
            #[cfg(feature = "experiments")]
            experiment_notify_tx: exp_notify_tx,
            #[cfg(feature = "lsp-context")]
            lsp_hooks: None,
            #[cfg(feature = "experiments")]
            experiment_cancel: None,
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
            },
            orchestration: OrchestrationState {
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
            },
            rate_limiter: rate_limiter::ToolRateLimiter::new(
                crate::agent::rate_limiter::RateLimitConfig::default(),
            ),
        }
    }
546
547 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
552 let Some(mgr) = &mut self.orchestration.subagent_manager else {
553 return vec![];
554 };
555
556 let finished: Vec<String> = mgr
557 .statuses()
558 .into_iter()
559 .filter_map(|(id, status)| {
560 if matches!(
561 status.state,
562 crate::subagent::SubAgentState::Completed
563 | crate::subagent::SubAgentState::Failed
564 | crate::subagent::SubAgentState::Canceled
565 ) {
566 Some(id)
567 } else {
568 None
569 }
570 })
571 .collect();
572
573 let mut results = vec![];
574 for task_id in finished {
575 match mgr.collect(&task_id).await {
576 Ok(result) => results.push((task_id, result)),
577 Err(e) => {
578 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
579 }
580 }
581 }
582 results
583 }
584
585 async fn handle_plan_command(
586 &mut self,
587 cmd: crate::orchestration::PlanCommand,
588 ) -> Result<(), error::AgentError> {
589 use crate::orchestration::PlanCommand;
590
591 if !self.config_for_orchestration().enabled {
592 self.channel
593 .send(
594 "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
595 )
596 .await?;
597 return Ok(());
598 }
599
600 match cmd {
601 PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
602 PlanCommand::Confirm => self.handle_plan_confirm().await,
603 PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
604 PlanCommand::List => self.handle_plan_list().await,
605 PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
606 PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
607 PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
608 }
609 }
610
    /// Convenience accessor for the orchestration configuration.
    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration.orchestration_config
    }
614
    /// Handle `/plan <goal>`: run the LLM planner and either queue the plan
    /// for confirmation or (when confirmation is disabled) just report it.
    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
        use crate::orchestration::{LlmPlanner, Planner};

        // Only one plan may be pending at a time.
        if self.orchestration.pending_graph.is_some() {
            self.channel
                .send(
                    "A plan is already pending confirmation. \
                     Use /plan confirm to execute it or /plan cancel to discard.",
                )
                .await?;
            return Ok(());
        }

        self.channel.send("Planning task decomposition...").await?;

        // Agent definitions available for routing; empty when no sub-agent
        // manager is configured.
        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let confirm_before_execute = self
            .orchestration
            .orchestration_config
            .confirm_before_execute;
        let graph = LlmPlanner::new(
            self.provider.clone(),
            &self.orchestration.orchestration_config,
        )
        .plan(goal, &available_agents)
        .await
        .map_err(|e| error::AgentError::Other(e.to_string()))?;

        // Record the new plan in metrics before deciding what to do with it.
        let task_count = graph.tasks.len() as u64;
        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
        self.update_metrics(|m| {
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += task_count;
            m.orchestration_graph = Some(snapshot);
        });

        if confirm_before_execute {
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
                .await?;
            // Held until /plan confirm (executes) or /plan cancel (discards).
            self.orchestration.pending_graph = Some(graph);
        } else {
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Plan ready. Full execution will be available in a future phase.")
                .await?;
            // No auto-execution yet: mark the graph completed in metrics
            // immediately.
            let now = std::time::Instant::now();
            self.update_metrics(|m| {
                if let Some(ref mut s) = m.orchestration_graph {
                    "completed".clone_into(&mut s.status);
                    s.completed_at = Some(now);
                }
            });
        }

        Ok(())
    }
684
    /// Check that a taken-out pending graph can actually be executed.
    ///
    /// On any failure the graph is put back into `pending_graph` (so the user
    /// can still cancel it explicitly) and `Err(())` is returned. The error
    /// carries no payload because the user has already been notified via the
    /// channel; channel send errors here are deliberately ignored.
    async fn validate_pending_graph(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<crate::orchestration::TaskGraph, ()> {
        use crate::orchestration::GraphStatus;

        // Execution requires at least one configured sub-agent.
        if self.orchestration.subagent_manager.is_none() {
            let _ = self
                .channel
                .send(
                    "No sub-agents configured. Add sub-agent definitions to config \
                     to enable plan execution.",
                )
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        if graph.tasks.is_empty() {
            let _ = self.channel.send("Plan has no tasks.").await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        // Finished or canceled graphs must not be re-run; a new plan is needed.
        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
            let _ = self
                .channel
                .send(&format!(
                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
                    graph.status
                ))
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        Ok(graph)
    }
731
    /// Construct (or resume) a DAG scheduler for `graph` and reserve
    /// sub-agent slots for it.
    ///
    /// Returns the scheduler together with the number of reserved slots; the
    /// caller is responsible for releasing that reservation once execution
    /// finishes. If scheduler construction fails, the reservation is rolled
    /// back here before the error is returned.
    fn build_dag_scheduler(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        // Warn when the global sub-agent budget cannot cover the plan's
        // parallelism plus one extra slot for planning-phase agents.
        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
        if max_concurrent < max_parallel + 1 {
            tracing::warn!(
                max_concurrent,
                max_parallel,
                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
                max_parallel + 1
            );
        }

        // Never reserve the entire budget: always leave at least one slot free.
        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.reserve_slots(reserved);
        }

        // Fresh graphs start a new scheduler; anything else resumes in place.
        let scheduler = if graph.status == GraphStatus::Created {
            DagScheduler::new(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        } else {
            DagScheduler::resume_from(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        }
        .map_err(|e| {
            // Roll back the slot reservation on construction failure.
            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                mgr.release_reservation(reserved);
            }
            error::AgentError::Other(e.to_string())
        })?;

        Ok((scheduler, reserved))
    }
798
    /// Handle `/plan confirm`: validate the pending graph, build a scheduler,
    /// run the plan to completion (or cancellation), and publish the outcome.
    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
        let Some(graph) = self.orchestration.pending_graph.take() else {
            self.channel
                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
                .await?;
            return Ok(());
        };

        // Validation failure restores `pending_graph` and notifies the user.
        let Ok(graph) = self.validate_pending_graph(graph).await else {
            return Ok(());
        };

        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;

        let task_count = scheduler.graph().tasks.len();
        self.channel
            .send(&format!(
                "Confirmed. Executing plan ({task_count} tasks)..."
            ))
            .await?;

        // Expose a cancellation token so `/plan cancel` can abort the run.
        let plan_token = CancellationToken::new();
        self.orchestration.plan_cancel_token = Some(plan_token.clone());

        let scheduler_result = self
            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
            .await;
        self.orchestration.plan_cancel_token = None;

        // Release the slot reservation before propagating any scheduler error,
        // so reserved slots are never leaked.
        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.release_reservation(reserved);
        }

        let final_status = scheduler_result?;
        let completed_graph = scheduler.into_graph();

        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
        self.update_metrics(|m| {
            m.orchestration_graph = Some(snapshot);
        });

        let result_label = self
            .finalize_plan_execution(completed_graph, final_status)
            .await?;

        let now = std::time::Instant::now();
        self.update_metrics(|m| {
            if let Some(ref mut s) = m.orchestration_graph {
                result_label.clone_into(&mut s.status);
                s.completed_at = Some(now);
            }
        });
        Ok(())
    }
857
858 fn cancel_agents_from_actions(
862 &mut self,
863 cancel_actions: Vec<crate::orchestration::SchedulerAction>,
864 ) -> Option<crate::orchestration::GraphStatus> {
865 use crate::orchestration::SchedulerAction;
866 for action in cancel_actions {
867 match action {
868 SchedulerAction::Cancel { agent_handle_id } => {
869 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
870 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
871 tracing::trace!(error = %e, "cancel: agent already gone");
872 });
873 }
874 }
875 SchedulerAction::Done { status } => return Some(status),
876 SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
877 }
878 }
879 None
880 }
881
    /// Spawn a sub-agent for a scheduler `Spawn` action.
    ///
    /// Returns `(spawned, concurrency_failure, done_status)`:
    /// - `spawned`: the sub-agent started and was recorded on the scheduler;
    /// - `concurrency_failure`: the spawn failed specifically on the
    ///   concurrency limit (fed into batch backoff by the caller);
    /// - `done_status`: set when recording the failure produced follow-up
    ///   actions that finished the whole graph.
    ///
    /// # Panics
    /// Panics if `subagent_manager` is `None`; callers must have checked it.
    async fn handle_scheduler_spawn_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        agent_def_name: String,
        prompt: String,
        spawn_counter: &mut usize,
        task_count: usize,
    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());

        // Clone everything the sub-agent needs before mutably borrowing the
        // manager below.
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(&agent_def_name);
        let cfg = self.orchestration.subagent_config.clone();
        let event_tx = scheduler.event_sender();

        let mgr = self
            .orchestration
            .subagent_manager
            .as_mut()
            .expect("subagent_manager checked above");

        match mgr.spawn_for_task(
            &agent_def_name,
            &prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            task_id,
            event_tx,
        ) {
            Ok(handle_id) => {
                *spawn_counter += 1;
                // Status send failures are non-fatal; execution continues.
                let _ = self
                    .channel
                    .send_status(&format!(
                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
                    ))
                    .await;
                scheduler.record_spawn(task_id, handle_id, agent_def_name);
                (true, false, None)
            }
            Err(e) => {
                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
                let concurrency_fail =
                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
                // Recording the failure may emit cascade cancel/done actions.
                let extra = scheduler.record_spawn_failure(task_id, &e);
                let done_status = self.cancel_agents_from_actions(extra);
                (false, concurrency_fail, done_status)
            }
        }
    }
944
    /// Execute a scheduler `RunInline` action on the main agent itself rather
    /// than spawning a sub-agent, then report the outcome through the
    /// scheduler's event channel.
    ///
    /// The task is recorded under a synthetic `__inline_<id>__` handle so the
    /// scheduler can track it like any spawned agent. Cancellation of the
    /// surrounding plan converts the task into a `Failed { "canceled" }`
    /// outcome.
    async fn handle_run_inline_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        prompt: String,
        spawn_counter: usize,
        task_count: usize,
        cancel_token: &CancellationToken,
    ) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());
        // Status send failures are non-fatal.
        let _ = self
            .channel
            .send_status(&format!(
                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
            ))
            .await;

        // Synthetic handle id: there is no real sub-agent behind this task.
        let handle_id = format!("__inline_{task_id}__");
        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());

        let event_tx = scheduler.event_sender();
        let max_iter = self.tool_orchestrator.max_iterations;
        // Race the inline tool loop against plan cancellation.
        let outcome = tokio::select! {
            result = self.run_inline_tool_loop(&prompt, max_iter) => {
                match result {
                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
                        output,
                        artifacts: vec![],
                    },
                    Err(e) => crate::orchestration::TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                }
            }
            () = cancel_token.cancelled() => {
                crate::orchestration::TaskOutcome::Failed {
                    error: "canceled".to_string(),
                }
            }
        };
        let event = crate::orchestration::TaskEvent {
            task_id,
            agent_handle_id: handle_id,
            outcome,
        };
        if let Err(e) = event_tx.send(event).await {
            tracing::warn!(%task_id, error = %e, "inline task event send failed");
        }
    }
1005
1006 #[allow(clippy::too_many_lines)]
1011 async fn run_scheduler_loop(
1027 &mut self,
1028 scheduler: &mut crate::orchestration::DagScheduler,
1029 task_count: usize,
1030 cancel_token: CancellationToken,
1031 ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
1032 use crate::orchestration::SchedulerAction;
1033
1034 let mut spawn_counter: usize = 0;
1038
1039 let mut denied_secrets: std::collections::HashSet<(String, String)> =
1042 std::collections::HashSet::new();
1043
1044 let final_status = 'tick: loop {
1045 let actions = scheduler.tick();
1046
1047 let mut any_spawn_success = false;
1049 let mut any_concurrency_failure = false;
1050
1051 for action in actions {
1052 match action {
1053 SchedulerAction::Spawn {
1054 task_id,
1055 agent_def_name,
1056 prompt,
1057 } => {
1058 let (success, fail, done) = self
1059 .handle_scheduler_spawn_action(
1060 scheduler,
1061 task_id,
1062 agent_def_name,
1063 prompt,
1064 &mut spawn_counter,
1065 task_count,
1066 )
1067 .await;
1068 any_spawn_success |= success;
1069 any_concurrency_failure |= fail;
1070 if let Some(s) = done {
1071 break 'tick s;
1072 }
1073 }
1074 SchedulerAction::Cancel { agent_handle_id } => {
1075 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1076 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1078 tracing::trace!(error = %e, "cancel: agent already gone");
1079 });
1080 }
1081 }
1082 SchedulerAction::RunInline { task_id, prompt } => {
1093 spawn_counter += 1;
1094 self.handle_run_inline_action(
1095 scheduler,
1096 task_id,
1097 prompt,
1098 spawn_counter,
1099 task_count,
1100 &cancel_token,
1101 )
1102 .await;
1103 }
1104 SchedulerAction::Done { status } => {
1105 break 'tick status;
1106 }
1107 }
1108 }
1109
1110 scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);
1112
1113 self.process_pending_secret_requests(&mut denied_secrets)
1115 .await;
1116
1117 let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
1119 self.update_metrics(|m| {
1120 m.orchestration_graph = Some(snapshot);
1121 });
1122
1123 tokio::select! {
1134 biased;
1136 () = cancel_token.cancelled() => {
1137 let cancel_actions = scheduler.cancel_all();
1138 for action in cancel_actions {
1139 match action {
1140 SchedulerAction::Cancel { agent_handle_id } => {
1141 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1142 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1143 tracing::trace!(
1144 error = %e,
1145 "cancel during plan cancellation: agent already gone"
1146 );
1147 });
1148 }
1149 }
1150 SchedulerAction::Done { status } => {
1151 break 'tick status;
1152 }
1153 SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1154 }
1155 }
1156 break 'tick crate::orchestration::GraphStatus::Canceled;
1159 }
1160 () = scheduler.wait_event() => {}
1161 result = self.channel.recv() => {
1162 if let Ok(Some(msg)) = result {
1163 if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
1164 let _ = self.channel.send_status("Canceling plan...").await;
1165 let cancel_actions = scheduler.cancel_all();
1166 for ca in cancel_actions {
1167 match ca {
1168 SchedulerAction::Cancel { agent_handle_id } => {
1169 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1170 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1172 tracing::trace!(error = %e, "cancel on user request: agent already gone");
1173 });
1174 }
1175 }
1176 SchedulerAction::Done { status } => {
1177 break 'tick status;
1178 }
1179 SchedulerAction::Spawn { .. }
1180 | SchedulerAction::RunInline { .. } => {}
1181 }
1182 }
1183 break 'tick crate::orchestration::GraphStatus::Canceled;
1186 }
1187 self.enqueue_or_merge(msg.text, vec![], msg.attachments);
1188 } else {
1189 let cancel_actions = scheduler.cancel_all();
1193 let n = cancel_actions
1194 .iter()
1195 .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1196 .count();
1197 tracing::warn!(sub_agents = n, "scheduler channel closed, canceling running sub-agents");
1198 for action in cancel_actions {
1199 match action {
1200 SchedulerAction::Cancel { agent_handle_id } => {
1201 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1202 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1203 tracing::trace!(
1204 error = %e,
1205 "cancel on channel close: agent already gone"
1206 );
1207 });
1208 }
1209 }
1210 SchedulerAction::Done { .. }
1212 | SchedulerAction::Spawn { .. }
1213 | SchedulerAction::RunInline { .. } => {}
1214 }
1215 }
1216 break 'tick crate::orchestration::GraphStatus::Failed;
1217 }
1218 }
1219 () = shutdown_signal(&mut self.lifecycle.shutdown) => {
1221 let cancel_actions = scheduler.cancel_all();
1222 let n = cancel_actions
1223 .iter()
1224 .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1225 .count();
1226 tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
1227 for action in cancel_actions {
1228 match action {
1229 SchedulerAction::Cancel { agent_handle_id } => {
1230 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1231 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
1232 tracing::trace!(
1233 error = %e,
1234 "cancel on shutdown: agent already gone"
1235 );
1236 });
1237 }
1238 }
1239 SchedulerAction::Done { status } => {
1240 break 'tick status;
1241 }
1242 SchedulerAction::Spawn { .. } | SchedulerAction::RunInline { .. } => {}
1243 }
1244 }
1245 break 'tick crate::orchestration::GraphStatus::Canceled;
1248 }
1249 }
1250 };
1251
1252 self.process_pending_secret_requests(&mut std::collections::HashSet::new())
1255 .await;
1256
1257 Ok(final_status)
1258 }
1259
1260 async fn run_inline_tool_loop(
1267 &self,
1268 prompt: &str,
1269 max_iterations: usize,
1270 ) -> Result<String, zeph_llm::LlmError> {
1271 use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
1272 use zeph_tools::executor::ToolCall;
1273
1274 let tool_defs: Vec<ToolDefinition> = self
1275 .tool_executor
1276 .tool_definitions_erased()
1277 .iter()
1278 .map(tool_execution::tool_def_to_definition)
1279 .collect();
1280
1281 tracing::debug!(
1282 prompt_len = prompt.len(),
1283 max_iterations,
1284 tool_count = tool_defs.len(),
1285 "inline tool loop: starting"
1286 );
1287
1288 let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
1289 let mut last_text = String::new();
1290
1291 for iteration in 0..max_iterations {
1292 let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;
1293
1294 match response {
1295 ChatResponse::Text(text) => {
1296 tracing::debug!(iteration, "inline tool loop: text response, returning");
1297 return Ok(text);
1298 }
1299 ChatResponse::ToolUse {
1300 text, tool_calls, ..
1301 } => {
1302 tracing::debug!(
1303 iteration,
1304 tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
1305 "inline tool loop: tool use"
1306 );
1307
1308 if let Some(ref t) = text {
1309 last_text.clone_from(t);
1310 }
1311
1312 let mut parts: Vec<MessagePart> = Vec::new();
1314 if let Some(ref t) = text
1315 && !t.is_empty()
1316 {
1317 parts.push(MessagePart::Text { text: t.clone() });
1318 }
1319 for tc in &tool_calls {
1320 parts.push(MessagePart::ToolUse {
1321 id: tc.id.clone(),
1322 name: tc.name.clone(),
1323 input: tc.input.clone(),
1324 });
1325 }
1326 messages.push(Message::from_parts(Role::Assistant, parts));
1327
1328 let mut result_parts: Vec<MessagePart> = Vec::new();
1330 for tc in &tool_calls {
1331 let call = ToolCall {
1332 tool_id: tc.name.clone(),
1333 params: match &tc.input {
1334 serde_json::Value::Object(map) => map.clone(),
1335 _ => serde_json::Map::new(),
1336 },
1337 };
1338 let output = match self.tool_executor.execute_tool_call_erased(&call).await
1339 {
1340 Ok(Some(out)) => out.summary,
1341 Ok(None) => "(no output)".to_owned(),
1342 Err(e) => format!("[error] {e}"),
1343 };
1344 let is_error = output.starts_with("[error]");
1345 result_parts.push(MessagePart::ToolResult {
1346 tool_use_id: tc.id.clone(),
1347 content: output,
1348 is_error,
1349 });
1350 }
1351 messages.push(Message::from_parts(Role::User, result_parts));
1352 }
1353 }
1354 }
1355
1356 tracing::debug!(
1357 max_iterations,
1358 last_text_empty = last_text.is_empty(),
1359 "inline tool loop: iteration limit reached"
1360 );
1361 Ok(last_text)
1362 }
1363
1364 async fn process_pending_secret_requests(
1372 &mut self,
1373 denied: &mut std::collections::HashSet<(String, String)>,
1374 ) {
1375 loop {
1376 let pending = self
1377 .orchestration
1378 .subagent_manager
1379 .as_mut()
1380 .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1381 let Some((req_handle_id, req)) = pending else {
1382 break;
1383 };
1384 let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1385 if denied.contains(&deny_key) {
1386 tracing::debug!(
1387 handle_id = %req_handle_id,
1388 secret_key = %req.secret_key,
1389 "skipping duplicate secret prompt for already-denied key"
1390 );
1391 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1392 let _ = mgr.deny_secret(&req_handle_id);
1393 }
1394 continue;
1395 }
1396 let prompt = format!(
1397 "Sub-agent requests secret '{}'. Allow?{}",
1398 crate::text::truncate_to_chars(&req.secret_key, 100),
1399 req.reason
1400 .as_deref()
1401 .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1402 .unwrap_or_default()
1403 );
1404 let approved = tokio::select! {
1406 result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1407 () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1408 let _ = self.channel.send("Secret request timed out.").await;
1409 false
1410 }
1411 };
1412 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1413 if approved {
1414 let ttl = std::time::Duration::from_secs(300);
1415 let key = req.secret_key.clone();
1416 if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1417 let _ = mgr.deliver_secret(&req_handle_id, key);
1418 }
1419 } else {
1420 denied.insert(deny_key);
1421 let _ = mgr.deny_secret(&req_handle_id);
1422 }
1423 }
1424 }
1425 }
1426
    /// Reports the outcome of a finished plan graph to the user,
    /// updates orchestration metrics, and returns a short static label
    /// ("completed", "failed", "paused", "canceled", or "unknown").
    ///
    /// Failed and Paused graphs are stashed back into `pending_graph`
    /// so `/plan retry` / `/plan resume` can act on them later.
    async fn finalize_plan_execution(
        &mut self,
        completed_graph: crate::orchestration::TaskGraph,
        final_status: crate::orchestration::GraphStatus,
    ) -> Result<&'static str, error::AgentError> {
        use std::fmt::Write;

        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};

        let result_label = match final_status {
            GraphStatus::Completed => {
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                self.update_metrics(|m| m.orchestration.tasks_completed += completed_count);

                // Ask the LLM to synthesize one combined answer from all
                // task results; fall back to a notice if that fails.
                let aggregator = LlmAggregator::new(
                    self.provider.clone(),
                    &self.orchestration.orchestration_config,
                );
                match aggregator.aggregate(&completed_graph).await {
                    Ok(synthesis) => {
                        self.channel.send(&synthesis).await?;
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "aggregation failed");
                        self.channel
                            .send(
                                "Plan completed but aggregation failed. \
                                 Check individual task results.",
                            )
                            .await?;
                    }
                }
                "completed"
            }
            GraphStatus::Failed => {
                let failed_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
                    .collect();
                self.update_metrics(|m| {
                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
                });
                let mut msg = format!(
                    "Plan failed. {}/{} tasks failed:\n",
                    failed_tasks.len(),
                    completed_graph.tasks.len()
                );
                for t in &failed_tasks {
                    // Borrow the error text when short; truncate to 500
                    // chars (allocating a new String) when long.
                    let err: std::borrow::Cow<str> =
                        t.result.as_ref().map_or("unknown error".into(), |r| {
                            if r.output.len() > 500 {
                                r.output.chars().take(500).collect::<String>().into()
                            } else {
                                r.output.as_str().into()
                            }
                        });
                    let _ = writeln!(msg, "  - {}: {err}", t.title);
                }
                msg.push_str("\nUse `/plan retry` to retry failed tasks.");
                self.channel.send(&msg).await?;
                // Keep the graph around so `/plan retry` can reset it.
                self.orchestration.pending_graph = Some(completed_graph);
                "failed"
            }
            GraphStatus::Paused => {
                self.channel
                    .send(
                        "Plan paused due to a task failure (ask strategy). \
                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
                    )
                    .await?;
                // Keep the graph so `/plan resume` / `/plan retry` work.
                self.orchestration.pending_graph = Some(completed_graph);
                "paused"
            }
            GraphStatus::Canceled => {
                let done_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count();
                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
                let total = completed_graph.tasks.len();
                self.channel
                    .send(&format!(
                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
                    ))
                    .await?;
                "canceled"
            }
            other => {
                tracing::warn!(%other, "unexpected graph status after Done");
                self.channel
                    .send(&format!("Plan ended with status: {other}"))
                    .await?;
                "unknown"
            }
        };
        Ok(result_label)
    }
1536
1537 async fn handle_plan_status(
1538 &mut self,
1539 _graph_id: Option<&str>,
1540 ) -> Result<(), error::AgentError> {
1541 use crate::orchestration::GraphStatus;
1542 let Some(ref graph) = self.orchestration.pending_graph else {
1543 self.channel.send("No active plan.").await?;
1544 return Ok(());
1545 };
1546 let msg = match graph.status {
1547 GraphStatus::Created => {
1548 "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1549 }
1550 GraphStatus::Running => "Plan is currently running.",
1551 GraphStatus::Paused => {
1552 "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1553 }
1554 GraphStatus::Failed => {
1555 "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1556 }
1557 GraphStatus::Completed => "Plan completed successfully.",
1558 GraphStatus::Canceled => "Plan was canceled.",
1559 };
1560 self.channel.send(msg).await?;
1561 Ok(())
1562 }
1563
1564 async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1565 if let Some(ref graph) = self.orchestration.pending_graph {
1566 let summary = format_plan_summary(graph);
1567 let status_label = match graph.status {
1568 crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1569 crate::orchestration::GraphStatus::Running => "running",
1570 crate::orchestration::GraphStatus::Paused => "paused",
1571 crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1572 _ => "unknown",
1573 };
1574 self.channel
1575 .send(&format!("{summary}\nStatus: {status_label}"))
1576 .await?;
1577 } else {
1578 self.channel.send("No recent plans.").await?;
1579 }
1580 Ok(())
1581 }
1582
1583 async fn handle_plan_cancel(
1584 &mut self,
1585 _graph_id: Option<&str>,
1586 ) -> Result<(), error::AgentError> {
1587 if let Some(token) = self.orchestration.plan_cancel_token.take() {
1588 token.cancel();
1595 self.channel.send("Canceling plan execution...").await?;
1596 } else if self.orchestration.pending_graph.take().is_some() {
1597 let now = std::time::Instant::now();
1598 self.update_metrics(|m| {
1599 if let Some(ref mut s) = m.orchestration_graph {
1600 "canceled".clone_into(&mut s.status);
1601 s.completed_at = Some(now);
1602 }
1603 });
1604 self.channel.send("Plan canceled.").await?;
1605 } else {
1606 self.channel.send("No active plan to cancel.").await?;
1607 }
1608 Ok(())
1609 }
1610
1611 async fn handle_plan_resume(
1616 &mut self,
1617 graph_id: Option<&str>,
1618 ) -> Result<(), error::AgentError> {
1619 use crate::orchestration::GraphStatus;
1620
1621 let Some(ref graph) = self.orchestration.pending_graph else {
1622 self.channel
1623 .send("No paused plan to resume. Use `/plan status` to check the current state.")
1624 .await?;
1625 return Ok(());
1626 };
1627
1628 if let Some(id) = graph_id
1630 && graph.id.to_string() != id
1631 {
1632 self.channel
1633 .send(&format!(
1634 "Graph id '{id}' does not match the active plan ({}). \
1635 Use `/plan status` to see the active plan id.",
1636 graph.id
1637 ))
1638 .await?;
1639 return Ok(());
1640 }
1641
1642 if graph.status != GraphStatus::Paused {
1643 self.channel
1644 .send(&format!(
1645 "The active plan is in '{}' status and cannot be resumed. \
1646 Only Paused plans can be resumed.",
1647 graph.status
1648 ))
1649 .await?;
1650 return Ok(());
1651 }
1652
1653 let graph = self.orchestration.pending_graph.take().unwrap();
1654
1655 tracing::info!(
1656 graph_id = %graph.id,
1657 "resuming paused graph"
1658 );
1659
1660 self.channel
1661 .send(&format!(
1662 "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1663 graph.goal
1664 ))
1665 .await?;
1666
1667 self.orchestration.pending_graph = Some(graph);
1669 Ok(())
1670 }
1671
1672 async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1678 use crate::orchestration::{GraphStatus, dag};
1679
1680 let Some(ref graph) = self.orchestration.pending_graph else {
1681 self.channel
1682 .send("No active plan to retry. Use `/plan status` to check the current state.")
1683 .await?;
1684 return Ok(());
1685 };
1686
1687 if let Some(id) = graph_id
1689 && graph.id.to_string() != id
1690 {
1691 self.channel
1692 .send(&format!(
1693 "Graph id '{id}' does not match the active plan ({}). \
1694 Use `/plan status` to see the active plan id.",
1695 graph.id
1696 ))
1697 .await?;
1698 return Ok(());
1699 }
1700
1701 if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1702 self.channel
1703 .send(&format!(
1704 "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1705 graph.status
1706 ))
1707 .await?;
1708 return Ok(());
1709 }
1710
1711 let mut graph = self.orchestration.pending_graph.take().unwrap();
1712
1713 let failed_count = graph
1715 .tasks
1716 .iter()
1717 .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1718 .count();
1719
1720 dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1721
1722 for task in &mut graph.tasks {
1727 if task.status == crate::orchestration::TaskStatus::Running {
1728 task.status = crate::orchestration::TaskStatus::Ready;
1729 task.assigned_agent = None;
1730 }
1731 }
1732
1733 tracing::info!(
1734 graph_id = %graph.id,
1735 failed_count,
1736 "retrying failed tasks in graph"
1737 );
1738
1739 self.channel
1740 .send(&format!(
1741 "Retrying {failed_count} failed task(s) in plan: {}\n\
1742 Use `/plan confirm` to execute.",
1743 graph.goal
1744 ))
1745 .await?;
1746
1747 self.orchestration.pending_graph = Some(graph);
1749 Ok(())
1750 }
1751
    /// Gracefully shuts the agent down: notifies the user, persists
    /// provider router state, stops sub-agents and MCP servers, then
    /// flushes final compaction/filter metrics into the log.
    pub async fn shutdown(&mut self) {
        // Best-effort user notice; failures are ignored during teardown.
        self.channel.send("Shutting down...").await.ok();

        self.provider.save_router_state();

        if let Some(ref mut mgr) = self.orchestration.subagent_manager {
            mgr.shutdown_all();
        }

        if let Some(ref manager) = self.mcp.manager {
            manager.shutdown_all_shared().await;
        }

        // Record the trailing turn count since the last hard compaction
        // so the final trajectory datapoint is not lost on exit.
        if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
            self.update_metrics(|m| {
                m.compaction_turns_after_hard.push(turns);
            });
            self.context_manager.turns_since_last_hard_compaction = None;
        }

        // Summarize token-filter savings and compaction history.
        if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            if m.filter_applications > 0 {
                #[allow(clippy::cast_precision_loss)]
                let pct = if m.filter_raw_tokens > 0 {
                    m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
                } else {
                    0.0
                };
                tracing::info!(
                    raw_tokens = m.filter_raw_tokens,
                    saved_tokens = m.filter_saved_tokens,
                    applications = m.filter_applications,
                    "tool output filtering saved ~{} tokens ({pct:.0}%)",
                    m.filter_saved_tokens,
                );
            }
            if m.compaction_hard_count > 0 {
                tracing::info!(
                    hard_compactions = m.compaction_hard_count,
                    turns_after_hard = ?m.compaction_turns_after_hard,
                    "hard compaction trajectory"
                );
            }
        }
        tracing::info!("agent shutdown complete");
    }
1803
1804 fn refresh_subagent_metrics(&mut self) {
1811 let Some(ref mgr) = self.orchestration.subagent_manager else {
1812 return;
1813 };
1814 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
1815 .statuses()
1816 .into_iter()
1817 .map(|(id, s)| {
1818 let def = mgr.agents_def(&id);
1819 crate::metrics::SubAgentMetrics {
1820 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
1821 id: id.clone(),
1822 state: format!("{:?}", s.state).to_lowercase(),
1823 turns_used: s.turns_used,
1824 max_turns: def.map_or(20, |d| d.permissions.max_turns),
1825 background: def.is_some_and(|d| d.permissions.background),
1826 elapsed_secs: s.started_at.elapsed().as_secs(),
1827 permission_mode: def.map_or_else(String::new, |d| {
1828 use crate::subagent::def::PermissionMode;
1829 match d.permissions.permission_mode {
1830 PermissionMode::Default => String::new(),
1831 PermissionMode::AcceptEdits => "accept_edits".into(),
1832 PermissionMode::DontAsk => "dont_ask".into(),
1833 PermissionMode::BypassPermissions => "bypass_permissions".into(),
1834 PermissionMode::Plan => "plan".into(),
1835 }
1836 }),
1837 }
1838 })
1839 .collect();
1840 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
1841 }
1842
1843 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
1845 let completed = self.poll_subagents().await;
1846 for (task_id, result) in completed {
1847 let notice = if result.is_empty() {
1848 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
1849 } else {
1850 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
1851 };
1852 if let Err(e) = self.channel.send(¬ice).await {
1853 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
1854 }
1855 }
1856 Ok(())
1857 }
1858
    /// Main agent loop: waits for model warmup, then repeatedly pulls
    /// the next input (queued message, channel message, or an internal
    /// event such as a reload or scheduled task), routes built-in slash
    /// commands, and processes user messages until shutdown or channel
    /// close.
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // Block until the warmup watch flips to true (or its sender drops).
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        loop {
            // Apply a provider override installed by the ACP layer, if any.
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            self.check_tool_refresh().await;

            self.refresh_subagent_metrics();

            self.notify_completed_subagents().await?;

            self.drain_channel();

            // Prefer queued messages over new channel input.
            let (text, image_parts) = if let Some(queued) = self.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolving (audio
                    // transcription / image conversion).
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    // Internal events: handle and restart the loop pass.
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instruction_reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiment_notify_rx) => {
                        #[cfg(feature = "experiments")]
                        { self.experiment_cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    // Scheduled tasks are injected as a synthetic user turn.
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("[Scheduled task] {prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                };
                // `None` from recv means the channel closed: exit.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            // Some(true) = exit, Some(false) = consumed, None = fall through.
            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        // Flush any in-flight trace data before returning.
        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
1972
1973 async fn handle_builtin_command(
1979 &mut self,
1980 trimmed: &str,
1981 ) -> Result<Option<bool>, error::AgentError> {
1982 if trimmed == "/clear-queue" {
1983 let n = self.clear_queue();
1984 self.notify_queue_count().await;
1985 self.channel
1986 .send(&format!("Cleared {n} queued messages."))
1987 .await?;
1988 let _ = self.channel.flush_chunks().await;
1989 return Ok(Some(false));
1990 }
1991
1992 if trimmed == "/compact" {
1993 if self.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
1994 match self.compact_context().await {
1995 Ok(()) => {
1996 let _ = self.channel.send("Context compacted successfully.").await;
1997 }
1998 Err(e) => {
1999 let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
2000 }
2001 }
2002 } else {
2003 let _ = self.channel.send("Nothing to compact.").await;
2004 }
2005 let _ = self.channel.flush_chunks().await;
2006 return Ok(Some(false));
2007 }
2008
2009 if trimmed == "/clear" {
2010 self.clear_history();
2011 let _ = self.channel.flush_chunks().await;
2012 return Ok(Some(false));
2013 }
2014
2015 if trimmed == "/model" || trimmed.starts_with("/model ") {
2016 self.handle_model_command(trimmed).await;
2017 let _ = self.channel.flush_chunks().await;
2018 return Ok(Some(false));
2019 }
2020
2021 if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
2022 self.handle_debug_dump_command(trimmed).await;
2023 let _ = self.channel.flush_chunks().await;
2024 return Ok(Some(false));
2025 }
2026
2027 if trimmed.starts_with("/dump-format") {
2028 self.handle_dump_format_command(trimmed).await;
2029 let _ = self.channel.flush_chunks().await;
2030 return Ok(Some(false));
2031 }
2032
2033 if trimmed == "/exit" || trimmed == "/quit" {
2034 if self.channel.supports_exit() {
2035 return Ok(Some(true));
2036 }
2037 let _ = self
2038 .channel
2039 .send("/exit is not supported in this channel.")
2040 .await;
2041 return Ok(Some(false));
2042 }
2043
2044 Ok(None)
2045 }
2046
2047 pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
2055 if model_id.is_empty() {
2056 return Err("model id must not be empty".to_string());
2057 }
2058 if model_id.len() > 256 {
2059 return Err("model id exceeds maximum length of 256 characters".to_string());
2060 }
2061 if !model_id
2062 .chars()
2063 .all(|c| c.is_ascii() && !c.is_ascii_control())
2064 {
2065 return Err("model id must contain only printable ASCII characters".to_string());
2066 }
2067 self.runtime.model_name = model_id.to_string();
2068 tracing::info!(model = model_id, "set_model called");
2069 Ok(())
2070 }
2071
2072 async fn handle_model_refresh(&mut self) {
2073 if let Some(cache_dir) = dirs::cache_dir() {
2075 let models_dir = cache_dir.join("zeph").join("models");
2076 if let Ok(entries) = std::fs::read_dir(&models_dir) {
2077 for entry in entries.flatten() {
2078 let path = entry.path();
2079 if path.extension().and_then(|e| e.to_str()) == Some("json") {
2080 let _ = std::fs::remove_file(&path);
2081 }
2082 }
2083 }
2084 }
2085 match self.provider.list_models_remote().await {
2086 Ok(models) => {
2087 let _ = self
2088 .channel
2089 .send(&format!("Fetched {} models.", models.len()))
2090 .await;
2091 }
2092 Err(e) => {
2093 let _ = self
2094 .channel
2095 .send(&format!("Error fetching models: {e}"))
2096 .await;
2097 }
2098 }
2099 }
2100
2101 async fn handle_model_list(&mut self) {
2102 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2103 let cached = if cache.is_stale() {
2104 None
2105 } else {
2106 cache.load().unwrap_or(None)
2107 };
2108 let models = if let Some(m) = cached {
2109 m
2110 } else {
2111 match self.provider.list_models_remote().await {
2112 Ok(m) => m,
2113 Err(e) => {
2114 let _ = self
2115 .channel
2116 .send(&format!("Error fetching models: {e}"))
2117 .await;
2118 return;
2119 }
2120 }
2121 };
2122 if models.is_empty() {
2123 let _ = self.channel.send("No models available.").await;
2124 return;
2125 }
2126 let mut lines = vec!["Available models:".to_string()];
2127 for (i, m) in models.iter().enumerate() {
2128 lines.push(format!(" {}. {} ({})", i + 1, m.display_name, m.id));
2129 }
2130 let _ = self.channel.send(&lines.join("\n")).await;
2131 }
2132
2133 async fn handle_model_switch(&mut self, model_id: &str) {
2134 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2137 let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
2138 {
2139 match self.provider.list_models_remote().await {
2140 Ok(m) if !m.is_empty() => Some(m),
2141 _ => None,
2142 }
2143 } else {
2144 cache.load().unwrap_or(None)
2145 };
2146 if let Some(models) = known_models {
2147 if !models.iter().any(|m| m.id == model_id) {
2148 let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
2149 for m in &models {
2150 lines.push(format!(" • {} ({})", m.display_name, m.id));
2151 }
2152 let _ = self.channel.send(&lines.join("\n")).await;
2153 return;
2154 }
2155 } else {
2156 let _ = self
2157 .channel
2158 .send(
2159 "Model list unavailable, switching anyway — verify your model name is correct.",
2160 )
2161 .await;
2162 }
2163 match self.set_model(model_id) {
2164 Ok(()) => {
2165 let _ = self
2166 .channel
2167 .send(&format!("Switched to model: {model_id}"))
2168 .await;
2169 }
2170 Err(e) => {
2171 let _ = self.channel.send(&format!("Error: {e}")).await;
2172 }
2173 }
2174 }
2175
2176 async fn handle_model_command(&mut self, trimmed: &str) {
2178 let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
2179 if arg == "refresh" {
2180 self.handle_model_refresh().await;
2181 } else if arg.is_empty() {
2182 self.handle_model_list().await;
2183 } else {
2184 self.handle_model_switch(arg).await;
2185 }
2186 }
2187
2188 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
2190 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
2191 if arg.is_empty() {
2192 match &self.debug_state.debug_dumper {
2193 Some(d) => {
2194 let _ = self
2195 .channel
2196 .send(&format!("Debug dump active: {}", d.dir().display()))
2197 .await;
2198 }
2199 None => {
2200 let _ = self
2201 .channel
2202 .send(
2203 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
2204 or start with `--debug-dump [dir]`.",
2205 )
2206 .await;
2207 }
2208 }
2209 return;
2210 }
2211 let dir = std::path::PathBuf::from(arg);
2212 match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
2213 Ok(dumper) => {
2214 let path = dumper.dir().display().to_string();
2215 self.debug_state.debug_dumper = Some(dumper);
2216 let _ = self
2217 .channel
2218 .send(&format!("Debug dump enabled: {path}"))
2219 .await;
2220 }
2221 Err(e) => {
2222 let _ = self
2223 .channel
2224 .send(&format!("Failed to enable debug dump: {e}"))
2225 .await;
2226 }
2227 }
2228 }
2229
2230 async fn handle_dump_format_command(&mut self, trimmed: &str) {
2232 let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
2233 if arg.is_empty() {
2234 let _ = self
2235 .channel
2236 .send(&format!(
2237 "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
2238 self.debug_state.dump_format
2239 ))
2240 .await;
2241 return;
2242 }
2243 let new_format = match arg {
2244 "json" => crate::debug_dump::DumpFormat::Json,
2245 "raw" => crate::debug_dump::DumpFormat::Raw,
2246 "trace" => crate::debug_dump::DumpFormat::Trace,
2247 other => {
2248 let _ = self
2249 .channel
2250 .send(&format!(
2251 "Unknown format '{other}'. Valid values: json, raw, trace."
2252 ))
2253 .await;
2254 return;
2255 }
2256 };
2257 let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
2258 let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
2259
2260 if now_trace
2262 && !was_trace
2263 && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
2264 {
2265 let service_name = self.debug_state.trace_service_name.clone();
2266 let redact = self.debug_state.trace_redact;
2267 match crate::debug_dump::trace::TracingCollector::new(
2268 dump_dir.as_path(),
2269 &service_name,
2270 redact,
2271 None,
2272 ) {
2273 Ok(collector) => {
2274 self.debug_state.trace_collector = Some(collector);
2275 }
2276 Err(e) => {
2277 tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
2278 }
2279 }
2280 }
2281 if was_trace
2283 && !now_trace
2284 && let Some(mut tc) = self.debug_state.trace_collector.take()
2285 {
2286 tc.finish();
2287 }
2288
2289 self.debug_state.dump_format = new_format;
2290 let _ = self
2291 .channel
2292 .send(&format!("Debug dump format set to: {arg}"))
2293 .await;
2294 }
2295
2296 async fn resolve_message(
2297 &self,
2298 msg: crate::channel::ChannelMessage,
2299 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2300 use crate::channel::{Attachment, AttachmentKind};
2301 use zeph_llm::provider::{ImageData, MessagePart};
2302
2303 let text_base = msg.text.clone();
2304
2305 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2306 .attachments
2307 .into_iter()
2308 .partition(|a| a.kind == AttachmentKind::Audio);
2309
2310 tracing::debug!(
2311 audio = audio_attachments.len(),
2312 has_stt = self.providers.stt.is_some(),
2313 "resolve_message attachments"
2314 );
2315
2316 let text = if !audio_attachments.is_empty()
2317 && let Some(stt) = self.providers.stt.as_ref()
2318 {
2319 let mut transcribed_parts = Vec::new();
2320 for attachment in &audio_attachments {
2321 if attachment.data.len() > MAX_AUDIO_BYTES {
2322 tracing::warn!(
2323 size = attachment.data.len(),
2324 max = MAX_AUDIO_BYTES,
2325 "audio attachment exceeds size limit, skipping"
2326 );
2327 continue;
2328 }
2329 match stt
2330 .transcribe(&attachment.data, attachment.filename.as_deref())
2331 .await
2332 {
2333 Ok(result) => {
2334 tracing::info!(
2335 len = result.text.len(),
2336 language = ?result.language,
2337 "audio transcribed"
2338 );
2339 transcribed_parts.push(result.text);
2340 }
2341 Err(e) => {
2342 tracing::error!(error = %e, "audio transcription failed");
2343 }
2344 }
2345 }
2346 if transcribed_parts.is_empty() {
2347 text_base
2348 } else {
2349 let transcribed = transcribed_parts.join("\n");
2350 if text_base.is_empty() {
2351 transcribed
2352 } else {
2353 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2354 }
2355 }
2356 } else {
2357 if !audio_attachments.is_empty() {
2358 tracing::warn!(
2359 count = audio_attachments.len(),
2360 "audio attachments received but no STT provider configured, dropping"
2361 );
2362 }
2363 text_base
2364 };
2365
2366 let mut image_parts = Vec::new();
2367 for attachment in image_attachments {
2368 if attachment.data.len() > MAX_IMAGE_BYTES {
2369 tracing::warn!(
2370 size = attachment.data.len(),
2371 max = MAX_IMAGE_BYTES,
2372 "image attachment exceeds size limit, skipping"
2373 );
2374 continue;
2375 }
2376 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2377 image_parts.push(MessagePart::Image(Box::new(ImageData {
2378 data: attachment.data,
2379 mime_type,
2380 })));
2381 }
2382
2383 (text, image_parts)
2384 }
2385
    /// Routes a leading-slash (or `@mention`) input line to its handler.
    ///
    /// Returns `Some(result)` when the input was consumed as a command, and
    /// `None` when it is ordinary chat text that should go to the LLM.
    /// Ordering matters: prefix checks run top to bottom, and the
    /// `/agent`-or-`@` check is last so other commands are never shadowed.
    async fn dispatch_slash_command(
        &mut self,
        trimmed: &str,
    ) -> Option<Result<(), error::AgentError>> {
        // Runs a handler, propagates its error, flushes buffered output, and
        // reports the input as handled.
        macro_rules! handled {
            ($expr:expr) => {{
                if let Err(e) = $expr {
                    return Some(Err(e));
                }
                let _ = self.channel.flush_chunks().await;
                return Some(Ok(()));
            }};
        }

        if trimmed == "/help" {
            handled!(self.handle_help_command().await);
        }

        if trimmed == "/status" {
            handled!(self.handle_status_command().await);
        }

        if trimmed == "/skills" {
            handled!(self.handle_skills_command().await);
        }

        // Commands below accept an optional argument; match either the bare
        // command or the command followed by a space.
        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed
                .strip_prefix("/skill")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_skill_command(&rest).await);
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed
                .strip_prefix("/feedback")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_feedback(&rest).await);
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
            handled!(self.handle_mcp_command(&args).await);
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed
                .strip_prefix("/image")
                .unwrap_or("")
                .trim()
                .to_owned();
            if path.is_empty() {
                handled!(
                    self.channel
                        .send("Usage: /image <path>")
                        .await
                        .map_err(Into::into)
                );
            }
            handled!(self.handle_image_command(&path).await);
        }

        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            return Some(self.dispatch_plan_command(trimmed).await);
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            handled!(self.handle_graph_command(trimmed).await);
        }

        // Feature-gated commands compile out when the feature is disabled,
        // in which case the input falls through to the LLM.
        #[cfg(feature = "scheduler")]
        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
            handled!(self.handle_scheduler_command(trimmed).await);
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            handled!(self.handle_experiment_command(trimmed).await);
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            handled!(self.handle_lsp_status_command().await);
        }

        if trimmed == "/log" {
            handled!(self.handle_log_command().await);
        }

        // Agent commands may legitimately return None (an @mention that did
        // not match a known agent is treated as ordinary text).
        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            return self.dispatch_agent_command(trimmed).await;
        }

        None
    }
2487
2488 async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
2489 match crate::orchestration::PlanCommand::parse(trimmed) {
2490 Ok(cmd) => {
2491 self.handle_plan_command(cmd).await?;
2492 }
2493 Err(e) => {
2494 self.channel
2495 .send(&e.to_string())
2496 .await
2497 .map_err(error::AgentError::from)?;
2498 }
2499 }
2500 let _ = self.channel.flush_chunks().await;
2501 Ok(())
2502 }
2503
2504 async fn dispatch_agent_command(
2505 &mut self,
2506 trimmed: &str,
2507 ) -> Option<Result<(), error::AgentError>> {
2508 let known: Vec<String> = self
2509 .orchestration
2510 .subagent_manager
2511 .as_ref()
2512 .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
2513 .unwrap_or_default();
2514 match crate::subagent::AgentCommand::parse(trimmed, &known) {
2515 Ok(cmd) => {
2516 if let Some(msg) = self.handle_agent_command(cmd).await
2517 && let Err(e) = self.channel.send(&msg).await
2518 {
2519 return Some(Err(e.into()));
2520 }
2521 let _ = self.channel.flush_chunks().await;
2522 Some(Ok(()))
2523 }
2524 Err(e) if trimmed.starts_with('@') => {
2525 tracing::debug!("@mention not matched as agent: {e}");
2527 None
2528 }
2529 Err(e) => {
2530 if let Err(send_err) = self.channel.send(&e.to_string()).await {
2531 return Some(Err(send_err.into()));
2532 }
2533 let _ = self.channel.flush_chunks().await;
2534 Some(Ok(()))
2535 }
2536 }
2537 }
2538
    /// Spawns a detached background task that asks the judge provider whether
    /// `trimmed` corrects the last assistant response and, if so, persists the
    /// correction (plus its embedding) to memory.
    ///
    /// Everything the task needs is cloned up front so it is fully detached
    /// from `self`; results are reported only via tracing.
    fn spawn_judge_correction_check(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Fall back to the main provider when no dedicated judge is configured.
        let judge_provider = self
            .providers
            .judge_provider
            .clone()
            .unwrap_or_else(|| self.provider.clone());
        let assistant_snippet = self.last_assistant_response();
        let user_msg_owned = trimmed.to_owned();
        let memory_arc = self.memory_state.memory.clone();
        // Attribute the correction to the first active skill, if any.
        let skill_name = self
            .skill_state
            .active_skill_names
            .first()
            .cloned()
            .unwrap_or_default();
        let conv_id_bg = conv_id;
        // Threshold defaults to 0.6 when no learning config is present.
        let confidence_threshold = self
            .learning_engine
            .config
            .as_ref()
            .map_or(0.6, |c| c.correction_confidence_threshold);

        tokio::spawn(async move {
            match feedback_detector::JudgeDetector::evaluate(
                &judge_provider,
                &user_msg_owned,
                &assistant_snippet,
                confidence_threshold,
            )
            .await
            {
                Ok(verdict) => {
                    // A verdict below threshold yields no signal; nothing to store.
                    if let Some(signal) = verdict.into_signal(&user_msg_owned) {
                        let is_self_correction =
                            signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                        tracing::info!(
                            kind = signal.kind.as_str(),
                            confidence = signal.confidence,
                            source = "judge",
                            is_self_correction,
                            "correction signal detected"
                        );
                        if let Some(memory) = memory_arc {
                            // Cap stored text at 500 chars to bound row size.
                            let correction_text = context::truncate_chars(&user_msg_owned, 500);
                            match memory
                                .sqlite()
                                .store_user_correction(
                                    conv_id_bg.map(|c| c.0),
                                    &assistant_snippet,
                                    &correction_text,
                                    if skill_name.is_empty() {
                                        None
                                    } else {
                                        Some(skill_name.as_str())
                                    },
                                    signal.kind.as_str(),
                                )
                                .await
                            {
                                Ok(correction_id) => {
                                    // Embedding failure is non-fatal; the SQLite
                                    // row already exists at this point.
                                    if let Err(e) = memory
                                        .store_correction_embedding(correction_id, &correction_text)
                                        .await
                                    {
                                        tracing::warn!(
                                            "failed to store correction embedding: {e:#}"
                                        );
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!("failed to store judge correction: {e:#}");
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("judge detector failed: {e:#}");
                }
            }
        });
    }
2636
    /// Detects implicit user corrections in `trimmed` and records them.
    ///
    /// The cheap regex detector runs first; when the judge detector decides
    /// it should weigh in (and its rate limit allows), evaluation is handed
    /// off to a background judge task and the regex signal is discarded for
    /// this turn. No-op when learning or correction detection is disabled.
    async fn detect_and_record_corrections(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Correction detection defaults to enabled when no config is present.
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if !self.is_learning_enabled() || !correction_detection_enabled {
            return;
        }

        // Prior user turns give the regex detector context for back-references.
        let previous_user_messages: Vec<&str> = self
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .map(|m| m.content.as_str())
            .collect();
        let regex_signal = self
            .feedback_detector
            .detect(trimmed, &previous_user_messages);

        // Two-step check: `should_invoke` (immutable) first, then the mutable
        // rate-limit check — so the limiter only ticks when invocation is wanted.
        let judge_should_run = self
            .judge_detector
            .as_ref()
            .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
            && self
                .judge_detector
                .as_mut()
                .is_some_and(feedback_detector::JudgeDetector::check_rate_limit);

        let signal = if judge_should_run {
            // Judge path is asynchronous; nothing more to record this turn.
            self.spawn_judge_correction_check(trimmed, conv_id);
            None
        } else {
            regex_signal
        };

        let Some(signal) = signal else { return };
        tracing::info!(
            kind = signal.kind.as_str(),
            confidence = signal.confidence,
            source = "regex",
            "implicit correction detected"
        );
        // Cap stored text at 500 chars to bound row size.
        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
        // A self-correction is the user fixing themselves, not rejecting the
        // assistant, so it does not count against skill outcomes.
        if signal.kind != feedback_detector::CorrectionKind::SelfCorrection {
            self.record_skill_outcomes(
                "user_rejection",
                Some(&feedback_text),
                Some(signal.kind.as_str()),
            )
            .await;
        }
        if let Some(memory) = &self.memory_state.memory {
            let correction_text = context::truncate_chars(trimmed, 500);
            match memory
                .sqlite()
                .store_user_correction(
                    conv_id.map(|c| c.0),
                    "",
                    &correction_text,
                    self.skill_state
                        .active_skill_names
                        .first()
                        .map(String::as_str),
                    signal.kind.as_str(),
                )
                .await
            {
                Ok(correction_id) => {
                    // Embedding failure is non-fatal; the row already exists.
                    if let Err(e) = memory
                        .store_correction_embedding(correction_id, &correction_text)
                        .await
                    {
                        tracing::warn!("failed to store correction embedding: {e:#}");
                    }
                }
                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
            }
        }
    }
2744
2745 async fn process_user_message(
2746 &mut self,
2747 text: String,
2748 image_parts: Vec<zeph_llm::provider::MessagePart>,
2749 ) -> Result<(), error::AgentError> {
2750 let iteration_index = self.debug_state.iteration_counter;
2752 self.debug_state.iteration_counter += 1;
2753 if let Some(ref mut tc) = self.debug_state.trace_collector {
2754 tc.begin_iteration(iteration_index, text.trim());
2755 self.debug_state.current_iteration_span_id =
2757 tc.current_iteration_span_id(iteration_index);
2758 }
2759
2760 let result = self
2761 .process_user_message_inner(text, image_parts, iteration_index)
2762 .await;
2763
2764 if let Some(ref mut tc) = self.debug_state.trace_collector {
2766 let status = if result.is_ok() {
2767 crate::debug_dump::trace::SpanStatus::Ok
2768 } else {
2769 crate::debug_dump::trace::SpanStatus::Error {
2770 message: "iteration failed".to_owned(),
2771 }
2772 };
2773 tc.end_iteration(iteration_index, status);
2774 }
2775 self.debug_state.current_iteration_span_id = None;
2776
2777 result
2778 }
2779
2780 async fn process_user_message_inner(
2781 &mut self,
2782 text: String,
2783 image_parts: Vec<zeph_llm::provider::MessagePart>,
2784 iteration_index: usize,
2785 ) -> Result<(), error::AgentError> {
2786 let _ = iteration_index; self.lifecycle.cancel_token = CancellationToken::new();
2788 let signal = Arc::clone(&self.lifecycle.cancel_signal);
2789 let token = self.lifecycle.cancel_token.clone();
2790 tokio::spawn(async move {
2791 signal.notified().await;
2792 token.cancel();
2793 });
2794 let trimmed = text.trim();
2795
2796 if let Some(result) = self.dispatch_slash_command(trimmed).await {
2797 return result;
2798 }
2799
2800 self.check_pending_rollbacks().await;
2801 let conv_id = self.memory_state.conversation_id;
2804 self.rebuild_system_prompt(&text).await;
2805
2806 self.detect_and_record_corrections(trimmed, conv_id).await;
2807
2808 self.context_manager.compacted_this_turn = false;
2810
2811 self.maybe_apply_deferred_summaries();
2816
2817 if let Err(e) = self.maybe_proactive_compress().await {
2819 tracing::warn!("proactive compression failed: {e:#}");
2820 }
2821
2822 if let Err(e) = self.maybe_compact().await {
2823 tracing::warn!("context compaction failed: {e:#}");
2824 }
2825
2826 if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
2827 tracing::warn!("context preparation failed: {e:#}");
2828 }
2829
2830 self.learning_engine.reset_reflection();
2831
2832 let mut all_image_parts = std::mem::take(&mut self.pending_image_parts);
2833 all_image_parts.extend(image_parts);
2834 let image_parts = all_image_parts;
2835
2836 let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
2837 let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
2838 parts.extend(image_parts);
2839 Message::from_parts(Role::User, parts)
2840 } else {
2841 if !image_parts.is_empty() {
2842 tracing::warn!(
2843 count = image_parts.len(),
2844 "image attachments dropped: provider does not support vision"
2845 );
2846 }
2847 Message {
2848 role: Role::User,
2849 content: text.clone(),
2850 parts: vec![],
2851 metadata: MessageMetadata::default(),
2852 }
2853 };
2854 self.persist_message(Role::User, &text, &[], false).await;
2856 self.push_message(user_msg);
2857
2858 if let Err(e) = self.process_response().await {
2859 tracing::error!("Response processing failed: {e:#}");
2860 let user_msg = format!("Error: {e:#}");
2861 self.channel.send(&user_msg).await?;
2862 self.messages.pop();
2863 self.recompute_prompt_tokens();
2864 self.channel.flush_chunks().await?;
2865 }
2866
2867 Ok(())
2868 }
2869
2870 async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
2871 use std::path::Component;
2872 use zeph_llm::provider::{ImageData, MessagePart};
2873
2874 let has_parent_dir = std::path::Path::new(path)
2876 .components()
2877 .any(|c| c == Component::ParentDir);
2878 if has_parent_dir {
2879 self.channel
2880 .send("Invalid image path: path traversal not allowed")
2881 .await?;
2882 let _ = self.channel.flush_chunks().await;
2883 return Ok(());
2884 }
2885
2886 let data = match std::fs::read(path) {
2887 Ok(d) => d,
2888 Err(e) => {
2889 self.channel
2890 .send(&format!("Cannot read image {path}: {e}"))
2891 .await?;
2892 let _ = self.channel.flush_chunks().await;
2893 return Ok(());
2894 }
2895 };
2896 if data.len() > MAX_IMAGE_BYTES {
2897 self.channel
2898 .send(&format!(
2899 "Image {path} exceeds size limit ({} MB), skipping",
2900 MAX_IMAGE_BYTES / 1024 / 1024
2901 ))
2902 .await?;
2903 let _ = self.channel.flush_chunks().await;
2904 return Ok(());
2905 }
2906 let mime_type = detect_image_mime(Some(path)).to_string();
2907 self.pending_image_parts
2908 .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
2909 self.channel
2910 .send(&format!("Image loaded: {path}. Send your message."))
2911 .await?;
2912 let _ = self.channel.flush_chunks().await;
2913 Ok(())
2914 }
2915
2916 async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
2917 use std::fmt::Write;
2918
2919 let mut out = String::from("Slash commands:\n\n");
2920
2921 let categories = [
2922 slash_commands::SlashCategory::Info,
2923 slash_commands::SlashCategory::Session,
2924 slash_commands::SlashCategory::Model,
2925 slash_commands::SlashCategory::Memory,
2926 slash_commands::SlashCategory::Tools,
2927 slash_commands::SlashCategory::Planning,
2928 slash_commands::SlashCategory::Debug,
2929 slash_commands::SlashCategory::Advanced,
2930 ];
2931
2932 for cat in &categories {
2933 let entries: Vec<_> = slash_commands::COMMANDS
2934 .iter()
2935 .filter(|c| &c.category == cat)
2936 .collect();
2937 if entries.is_empty() {
2938 continue;
2939 }
2940 let _ = writeln!(out, "{}:", cat.as_str());
2941 for cmd in entries {
2942 if cmd.args.is_empty() {
2943 let _ = write!(out, " {}", cmd.name);
2944 } else {
2945 let _ = write!(out, " {} {}", cmd.name, cmd.args);
2946 }
2947 let _ = write!(out, " — {}", cmd.description);
2948 if let Some(feat) = cmd.feature_gate {
2949 let _ = write!(out, " [requires: {feat}]");
2950 }
2951 let _ = writeln!(out);
2952 }
2953 let _ = writeln!(out);
2954 }
2955
2956 self.channel.send(out.trim_end()).await?;
2957 Ok(())
2958 }
2959
2960 async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
2961 use std::fmt::Write;
2962
2963 let uptime = self.lifecycle.start_time.elapsed().as_secs();
2964 let msg_count = self
2965 .messages
2966 .iter()
2967 .filter(|m| m.role == Role::User)
2968 .count();
2969
2970 let (api_calls, prompt_tokens, completion_tokens, cost_cents, mcp_servers) =
2971 if let Some(ref tx) = self.metrics.metrics_tx {
2972 let m = tx.borrow();
2973 (
2974 m.api_calls,
2975 m.prompt_tokens,
2976 m.completion_tokens,
2977 m.cost_spent_cents,
2978 m.mcp_server_count,
2979 )
2980 } else {
2981 (0, 0, 0, 0.0, 0)
2982 };
2983
2984 let skill_count = self
2985 .skill_state
2986 .registry
2987 .read()
2988 .map(|r| r.all_meta().len())
2989 .unwrap_or(0);
2990
2991 let mut out = String::from("Session status:\n\n");
2992 let _ = writeln!(out, "Provider: {}", self.provider.name());
2993 let _ = writeln!(out, "Model: {}", self.runtime.model_name);
2994 let _ = writeln!(out, "Uptime: {uptime}s");
2995 let _ = writeln!(out, "Turns: {msg_count}");
2996 let _ = writeln!(out, "API calls: {api_calls}");
2997 let _ = writeln!(
2998 out,
2999 "Tokens: {prompt_tokens} prompt / {completion_tokens} completion"
3000 );
3001 let _ = writeln!(out, "Skills: {skill_count}");
3002 let _ = writeln!(out, "MCP: {mcp_servers} server(s)");
3003 if cost_cents > 0.0 {
3004 let _ = writeln!(out, "Cost: ${:.4}", cost_cents / 100.0);
3005 }
3006
3007 self.channel.send(out.trim_end()).await?;
3008 Ok(())
3009 }
3010
3011 async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
3012 use std::fmt::Write;
3013
3014 let mut output = String::from("Available skills:\n\n");
3015
3016 let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
3017 .skill_state
3018 .registry
3019 .read()
3020 .expect("registry read lock")
3021 .all_meta()
3022 .into_iter()
3023 .cloned()
3024 .collect();
3025
3026 for meta in &all_meta {
3027 let trust_info = if let Some(memory) = &self.memory_state.memory {
3028 memory
3029 .sqlite()
3030 .load_skill_trust(&meta.name)
3031 .await
3032 .ok()
3033 .flatten()
3034 .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
3035 } else {
3036 String::new()
3037 };
3038 let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
3039 }
3040
3041 if let Some(memory) = &self.memory_state.memory {
3042 match memory.sqlite().load_skill_usage().await {
3043 Ok(usage) if !usage.is_empty() => {
3044 output.push_str("\nUsage statistics:\n\n");
3045 for row in &usage {
3046 let _ = writeln!(
3047 output,
3048 "- {}: {} invocations (last: {})",
3049 row.skill_name, row.invocation_count, row.last_used_at,
3050 );
3051 }
3052 }
3053 Ok(_) => {}
3054 Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
3055 }
3056 }
3057
3058 self.channel.send(&output).await?;
3059 Ok(())
3060 }
3061
3062 async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
3063 let Some((name, rest)) = input.split_once(' ') else {
3064 self.channel
3065 .send("Usage: /feedback <skill_name> <message>")
3066 .await?;
3067 return Ok(());
3068 };
3069 let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
3070
3071 if feedback.is_empty() {
3072 self.channel
3073 .send("Usage: /feedback <skill_name> <message>")
3074 .await?;
3075 return Ok(());
3076 }
3077
3078 let Some(memory) = &self.memory_state.memory else {
3079 self.channel.send("Memory not available.").await?;
3080 return Ok(());
3081 };
3082
3083 let outcome_type = if self.feedback_detector.detect(feedback, &[]).is_some() {
3084 "user_rejection"
3085 } else {
3086 "user_approval"
3087 };
3088
3089 memory
3090 .sqlite()
3091 .record_skill_outcome(
3092 skill_name,
3093 None,
3094 self.memory_state.conversation_id,
3095 outcome_type,
3096 None,
3097 Some(feedback),
3098 )
3099 .await?;
3100
3101 if self.is_learning_enabled() && outcome_type == "user_rejection" {
3102 self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
3103 .await
3104 .ok();
3105 }
3106
3107 self.channel
3108 .send(&format!("Feedback recorded for \"{skill_name}\"."))
3109 .await?;
3110 Ok(())
3111 }
3112
    /// Polls a spawned sub-agent every 500 ms until it reaches a terminal
    /// state, servicing secret-access requests interactively along the way.
    ///
    /// Returns a user-facing completion message, or `None` when no sub-agent
    /// manager is available.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use crate::subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Surface any pending secret request to the user for approval.
            // NOTE(review): this pops requests for *any* task, not just
            // `task_id` — presumably intentional while this poll loop is the
            // only consumer; confirm.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A failed confirm prompt is treated as a denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets are held for a 5-minute TTL.
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // A vanished status entry is treated as completion.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: emit a progress line with the turn budget
                    // (falls back to 20 when the definition is unavailable).
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
3188
3189 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
3192 let mgr = self.orchestration.subagent_manager.as_mut()?;
3193 let full_ids: Vec<String> = mgr
3194 .statuses()
3195 .into_iter()
3196 .map(|(tid, _)| tid)
3197 .filter(|tid| tid.starts_with(prefix))
3198 .collect();
3199 Some(match full_ids.as_slice() {
3200 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
3201 [fid] => Ok(fid.clone()),
3202 _ => Err(format!(
3203 "Ambiguous id prefix '{prefix}': matches {} agents",
3204 full_ids.len()
3205 )),
3206 })
3207 }
3208
3209 fn handle_agent_list(&self) -> Option<String> {
3210 use std::fmt::Write as _;
3211 let mgr = self.orchestration.subagent_manager.as_ref()?;
3212 let defs = mgr.definitions();
3213 if defs.is_empty() {
3214 return Some("No sub-agent definitions found.".into());
3215 }
3216 let mut out = String::from("Available sub-agents:\n");
3217 for d in defs {
3218 let memory_label = match d.memory {
3219 Some(crate::subagent::MemoryScope::User) => " [memory:user]",
3220 Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
3221 Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
3222 None => "",
3223 };
3224 if let Some(ref src) = d.source {
3225 let _ = writeln!(
3226 out,
3227 " {}{} — {} ({})",
3228 d.name, memory_label, d.description, src
3229 );
3230 } else {
3231 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
3232 }
3233 }
3234 Some(out)
3235 }
3236
3237 fn handle_agent_status(&self) -> Option<String> {
3238 use std::fmt::Write as _;
3239 let mgr = self.orchestration.subagent_manager.as_ref()?;
3240 let statuses = mgr.statuses();
3241 if statuses.is_empty() {
3242 return Some("No active sub-agents.".into());
3243 }
3244 let mut out = String::from("Active sub-agents:\n");
3245 for (id, s) in &statuses {
3246 let state = format!("{:?}", s.state).to_lowercase();
3247 let elapsed = s.started_at.elapsed().as_secs();
3248 let _ = writeln!(
3249 out,
3250 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
3251 short = &id[..8.min(id.len())],
3252 t = s.turns_used,
3253 msg = s.last_message.as_deref().unwrap_or(""),
3254 );
3255 if let Some(def) = mgr.agents_def(id)
3257 && let Some(scope) = def.memory
3258 && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
3259 {
3260 let _ = writeln!(out, " memory: {}", dir.display());
3261 }
3262 }
3263 Some(out)
3264 }
3265
    /// Approves the pending secret request for the sub-agent matching the id
    /// prefix `id`: grants a 5-minute TTL and delivers the secret.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        // NOTE(review): `try_recv_secret_request` pops a request; when the
        // popped request belongs to a task other than `full_id`, it looks
        // like it is dropped rather than re-queued — confirm the manager
        // re-delivers unmatched requests.
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            // Approved secrets are held for a 5-minute TTL.
            let ttl = std::time::Duration::from_secs(300);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
3289
3290 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
3291 let full_id = match self.resolve_agent_id_prefix(id)? {
3292 Ok(fid) => fid,
3293 Err(msg) => return Some(msg),
3294 };
3295 let mgr = self.orchestration.subagent_manager.as_mut()?;
3296 match mgr.deny_secret(&full_id) {
3297 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
3298 Err(e) => Some(format!("Deny failed: {e}")),
3299 }
3300 }
3301
    /// Executes a parsed sub-agent command and returns a user-facing reply.
    ///
    /// `None` means there is nothing to report (typically: no sub-agent
    /// manager is configured).
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Fire-and-forget spawn: report the short id and return
                // immediately without polling for completion.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Explicit spawn and @mention share the same synchronous flow:
            // spawn, announce, then poll until the task finishes.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Resolve the id prefix against currently tracked tasks.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Look up the definition name in its own scope (immutable
                // borrow) so the mutable resume call below can proceed.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
3402
3403 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
3404 let mgr = self.orchestration.subagent_manager.as_ref()?;
3405 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
3406 let reg = self
3407 .skill_state
3408 .registry
3409 .read()
3410 .expect("registry read lock");
3411 match crate::subagent::filter_skills(®, &def.skills) {
3412 Ok(skills) => {
3413 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
3414 if bodies.is_empty() {
3415 None
3416 } else {
3417 Some(bodies)
3418 }
3419 }
3420 Err(e) => {
3421 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
3422 None
3423 }
3424 }
3425 }
3426
    /// Refreshes the persisted trust record for every reloaded skill.
    ///
    /// Skills under the managed (hub) directory get the configured default
    /// trust level, local skills the local level. When a skill's content
    /// hash changed since it was last recorded, its level is replaced with
    /// the configured hash-mismatch level; otherwise the stored level is
    /// preserved. No-op without persistent memory.
    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
        // Trust tracking requires persistent memory.
        let Some(ref memory) = self.memory_state.memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            // Skills living under the managed dir came from the hub.
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::sqlite::SourceKind::Hub
            } else {
                zeph_memory::sqlite::SourceKind::Local
            };
            let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    // Keep the stored level when content is unchanged;
                    // downgrade on hash mismatch; otherwise start fresh at
                    // the source-appropriate initial level.
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            row.trust_level.clone()
                        } else {
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }
3487
3488 async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
3490 let provider = self.provider.clone();
3491 let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
3492 let owned = text.to_owned();
3493 let p = provider.clone();
3494 Box::pin(async move { p.embed(&owned).await })
3495 };
3496
3497 let needs_inmemory_rebuild = !self
3498 .skill_state
3499 .matcher
3500 .as_ref()
3501 .is_some_and(SkillMatcherBackend::is_qdrant);
3502
3503 if needs_inmemory_rebuild {
3504 self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
3505 .await
3506 .map(SkillMatcherBackend::InMemory);
3507 } else if let Some(ref mut backend) = self.skill_state.matcher {
3508 let _ = self.channel.send_status("syncing skill index...").await;
3509 if let Err(e) = backend
3510 .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
3511 .await
3512 {
3513 tracing::warn!("failed to sync skill embeddings: {e:#}");
3514 }
3515 }
3516
3517 if self.skill_state.hybrid_search {
3518 let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
3519 let _ = self.channel.send_status("rebuilding search index...").await;
3520 self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
3521 }
3522 }
3523
    /// Reloads the skill registry from disk when its fingerprint changed, then
    /// refreshes trust records, the matcher/BM25 indexes, and the system prompt.
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        // Fast path: identical fingerprint means nothing on disk changed.
        if new_registry.fingerprint()
            == self
                .skill_state
                .registry
                .read()
                .expect("registry read lock")
                .fingerprint()
        {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self
            .skill_state
            .registry
            .write()
            .expect("registry write lock") = new_registry;

        // Clone the metadata out of the registry so no std lock guard is held
        // across the `.await` points below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Load full skill bodies for the prompt; skills that fail to load are
        // silently skipped (filter_map over `.ok()`).
        let all_skills: Vec<Skill> = {
            let reg = self
                .skill_state
                .registry
                .read()
                .expect("registry read lock");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // Health stats are omitted on reload; an empty map makes the prompt
        // formatter skip that section.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        // NOTE(review): rebuilds the system prompt with no environment context
        // or instruction blocks — confirm that is intended on hot reload.
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        // The first message, when present, is the system-prompt slot.
        if let Some(msg) = self.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state
                .registry
                .read()
                .expect("registry read lock")
                .all_meta()
                .len()
        );
    }
3591
3592 fn reload_instructions(&mut self) {
3593 if let Some(ref mut rx) = self.instruction_reload_rx {
3595 while rx.try_recv().is_ok() {}
3596 }
3597 let Some(ref state) = self.instruction_reload_state else {
3598 return;
3599 };
3600 let new_blocks = crate::instructions::load_instructions(
3601 &state.base_dir,
3602 &state.provider_kinds,
3603 &state.explicit_files,
3604 state.auto_detect,
3605 );
3606 let old_sources: std::collections::HashSet<_> =
3607 self.instruction_blocks.iter().map(|b| &b.source).collect();
3608 let new_sources: std::collections::HashSet<_> =
3609 new_blocks.iter().map(|b| &b.source).collect();
3610 for added in new_sources.difference(&old_sources) {
3611 tracing::info!(path = %added.display(), "instruction file added");
3612 }
3613 for removed in old_sources.difference(&new_sources) {
3614 tracing::info!(path = %removed.display(), "instruction file removed");
3615 }
3616 tracing::info!(
3617 old_count = self.instruction_blocks.len(),
3618 new_count = new_blocks.len(),
3619 "reloaded instruction files"
3620 );
3621 self.instruction_blocks = new_blocks;
3622 }
3623
3624 fn reload_config(&mut self) {
3625 let Some(ref path) = self.lifecycle.config_path else {
3626 return;
3627 };
3628 let config = match Config::load(path) {
3629 Ok(c) => c,
3630 Err(e) => {
3631 tracing::warn!("config reload failed: {e:#}");
3632 return;
3633 }
3634 };
3635
3636 self.runtime.security = config.security;
3637 self.runtime.timeouts = config.timeouts;
3638 self.runtime.redact_credentials = config.memory.redact_credentials;
3639 self.memory_state.history_limit = config.memory.history_limit;
3640 self.memory_state.recall_limit = config.memory.semantic.recall_limit;
3641 self.memory_state.summarization_threshold = config.memory.summarization_threshold;
3642 self.skill_state.max_active_skills = config.skills.max_active_skills;
3643 self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
3644 self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
3645 self.skill_state.hybrid_search = config.skills.hybrid_search;
3646
3647 if config.memory.context_budget_tokens > 0 {
3648 self.context_manager.budget = Some(
3649 ContextBudget::new(config.memory.context_budget_tokens, 0.20)
3650 .with_graph_enabled(config.memory.graph.enabled),
3651 );
3652 } else {
3653 self.context_manager.budget = None;
3654 }
3655
3656 {
3657 self.memory_state.graph_config = config.memory.graph.clone();
3658 }
3659 self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
3660 self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
3661 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
3662 self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
3663 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
3664 self.context_manager.compression = config.memory.compression.clone();
3665 self.context_manager.routing = config.memory.routing.clone();
3666 self.memory_state.cross_session_score_threshold =
3667 config.memory.cross_session_score_threshold;
3668
3669 self.index.repo_map_tokens = config.index.repo_map_tokens;
3670 self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
3671
3672 tracing::info!("config reloaded");
3673 }
3674}
3675pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
3676 while !*rx.borrow_and_update() {
3677 if rx.changed().await.is_err() {
3678 std::future::pending::<()>().await;
3679 }
3680 }
3681}
3682
3683pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
3684 match rx {
3685 Some(inner) => {
3686 if let Some(v) = inner.recv().await {
3687 Some(v)
3688 } else {
3689 *rx = None;
3690 std::future::pending().await
3691 }
3692 }
3693 None => std::future::pending().await,
3694 }
3695}
3696
3697#[cfg(test)]
3698mod tests;
3699
3700#[cfg(test)]
3701pub(crate) use tests::agent_tests;