1mod accessors;
5mod builder;
6pub(crate) mod compaction_strategy;
7#[cfg(feature = "compression-guidelines")]
8pub(super) mod compression_feedback;
9mod context;
10pub(crate) mod context_manager;
11pub mod error;
12#[cfg(feature = "experiments")]
13mod experiment_cmd;
14pub(super) mod feedback_detector;
15pub(crate) mod focus;
16mod graph_commands;
17#[cfg(feature = "compression-guidelines")]
18mod guidelines_commands;
19mod index;
20mod learning;
21pub(crate) mod learning_engine;
22mod log_commands;
23#[cfg(feature = "lsp-context")]
24mod lsp_commands;
25mod mcp;
26mod memory_commands;
27mod message_queue;
28mod persistence;
29#[cfg(feature = "policy-enforcer")]
30mod policy_commands;
31mod provider_cmd;
32pub(crate) mod rate_limiter;
33#[cfg(feature = "scheduler")]
34mod scheduler_commands;
35pub mod session_config;
36pub(crate) mod sidequest;
37mod skill_management;
38pub mod slash_commands;
39pub(crate) mod state;
40pub(crate) mod tool_execution;
41pub(crate) mod tool_orchestrator;
42mod trust_commands;
43mod utils;
44
45use std::collections::{HashMap, HashSet, VecDeque};
46use std::sync::Arc;
47use std::time::Instant;
48
49use tokio::sync::{Notify, mpsc, watch};
50use tokio_util::sync::CancellationToken;
51use zeph_llm::any::AnyProvider;
52use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
53use zeph_memory::TokenCounter;
54use zeph_memory::semantic::SemanticMemory;
55use zeph_skills::loader::Skill;
56use zeph_skills::matcher::{SkillMatcher, SkillMatcherBackend};
57use zeph_skills::prompt::format_skills_prompt;
58use zeph_skills::registry::SkillRegistry;
59use zeph_tools::executor::{ErasedToolExecutor, ToolExecutor};
60
61use crate::channel::Channel;
62use crate::config::Config;
63use crate::config::{SecurityConfig, SkillPromptMode, TimeoutConfig};
64use crate::context::{
65 ContextBudget, EnvironmentContext, build_system_prompt, build_system_prompt_with_instructions,
66};
67use zeph_sanitizer::ContentSanitizer;
68
69use message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
70#[cfg(feature = "context-compression")]
71use state::CompressionState;
72use state::{
73 DebugState, ExperimentState, FeedbackState, IndexState, InstructionState, LifecycleState,
74 McpState, MemoryState, MessageState, MetricsState, OrchestrationState, ProviderState,
75 RuntimeConfig, SecurityState, SessionState, SkillState,
76};
77
// Window of recent actions inspected for repetition.
// NOTE(review): presumably used by doom-loop detection — confirm at use site.
pub(crate) const DOOM_LOOP_WINDOW: usize = 3;
// Markers prepended to injected context messages so downstream code can
// recognize (and later strip or compact) each kind of injected block.
pub(crate) const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
pub(crate) const RECALL_PREFIX: &str = "[semantic recall]\n";
pub(crate) const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
pub(crate) const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
pub(crate) const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";
pub(crate) const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
pub(crate) const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
// Prefix for a scheduled task injected as an instruction to the model.
pub(crate) const SCHEDULED_TASK_PREFIX: &str = "Execute the following scheduled task now: ";
// Open bracket of LSP note markers (closed elsewhere — tag only, not a full prefix).
#[cfg(feature = "lsp-context")]
pub(crate) const LSP_NOTE_PREFIX: &str = "[lsp ";
// Closing code fence appended by `format_tool_output`.
pub(crate) const TOOL_OUTPUT_SUFFIX: &str = "\n```";
94
95fn format_plan_summary(graph: &crate::orchestration::TaskGraph) -> String {
96 use std::fmt::Write;
97 let mut out = String::new();
98 let _ = writeln!(out, "Plan: \"{}\"", graph.goal);
99 let _ = writeln!(out, "Tasks: {}", graph.tasks.len());
100 let _ = writeln!(out);
101 for (i, task) in graph.tasks.iter().enumerate() {
102 let deps = if task.depends_on.is_empty() {
103 String::new()
104 } else {
105 let ids: Vec<String> = task.depends_on.iter().map(ToString::to_string).collect();
106 format!(" (after: {})", ids.join(", "))
107 };
108 let agent = task.agent_hint.as_deref().unwrap_or("-");
109 let _ = writeln!(out, " {}. [{}] {}{}", i + 1, agent, task.title, deps);
110 }
111 out
112}
113
114pub(crate) fn format_tool_output(tool_name: &str, body: &str) -> String {
115 use std::fmt::Write;
116 let capacity = "[tool output: ".len()
117 + tool_name.len()
118 + "]\n```\n".len()
119 + body.len()
120 + TOOL_OUTPUT_SUFFIX.len();
121 let mut buf = String::with_capacity(capacity);
122 let _ = write!(
123 buf,
124 "[tool output: {tool_name}]\n```\n{body}{TOOL_OUTPUT_SUFFIX}"
125 );
126 buf
127}
128
/// Core agent: owns the LLM provider, the user-facing channel, the tool
/// executor, and the per-concern state bundles that this module's
/// sub-modules operate on.
pub struct Agent<C: Channel> {
    // Primary provider for chat completions.
    provider: AnyProvider,
    // Provider used for embeddings; initialized as a clone of `provider`
    // in the constructor.
    embedding_provider: AnyProvider,
    // Transport to/from the user (CLI, chat, etc. — whatever implements Channel).
    channel: C,
    pub(crate) tool_executor: Arc<dyn ErasedToolExecutor>,
    // Conversation messages, queued inbound messages, pending image parts.
    pub(super) msg: MessageState,
    // Grouped state bundles; see `state` module for field-level details.
    pub(super) memory_state: MemoryState,
    pub(super) skill_state: SkillState,
    pub(super) context_manager: context_manager::ContextManager,
    pub(super) tool_orchestrator: tool_orchestrator::ToolOrchestrator,
    pub(super) learning_engine: learning_engine::LearningEngine,
    pub(super) feedback: FeedbackState,
    pub(super) runtime: RuntimeConfig,
    pub(super) mcp: McpState,
    pub(super) index: IndexState,
    pub(super) session: SessionState,
    pub(super) debug_state: DebugState,
    pub(super) instructions: InstructionState,
    pub(super) security: SecurityState,
    pub(super) experiments: ExperimentState,
    #[cfg(feature = "context-compression")]
    pub(super) compression: CompressionState,
    pub(super) lifecycle: LifecycleState,
    pub(super) providers: ProviderState,
    pub(super) metrics: MetricsState,
    pub(super) orchestration: OrchestrationState,
    pub(super) focus: focus::FocusState,
    pub(super) sidequest: sidequest::SidequestState,
    // Optional filter limiting which tool schemas are exposed to the model,
    // plus a cache of the filtered ids (None until computed).
    pub(super) tool_schema_filter: Option<zeph_tools::ToolSchemaFilter>,
    pub(super) cached_filtered_tool_ids: Option<HashSet<String>>,
    // Optional tool dependency graph; `dependency_always_on` holds ids that
    // bypass it, `completed_tool_ids` tracks tools already run.
    pub(super) dependency_graph: Option<zeph_tools::ToolDependencyGraph>,
    pub(super) dependency_always_on: HashSet<String>,
    pub(super) completed_tool_ids: HashSet<String>,
    // Persistence bookkeeping: last message id written to the DB, plus
    // DB operations deferred until a persistence flush.
    pub(super) last_persisted_message_id: Option<i64>,
    pub(super) deferred_db_hide_ids: Vec<i64>,
    pub(super) deferred_db_summaries: Vec<String>,
}
185
186impl<C: Channel> Agent<C> {
187 #[must_use]
188 pub fn new(
189 provider: AnyProvider,
190 channel: C,
191 registry: SkillRegistry,
192 matcher: Option<SkillMatcherBackend>,
193 max_active_skills: usize,
194 tool_executor: impl ToolExecutor + 'static,
195 ) -> Self {
196 let registry = std::sync::Arc::new(std::sync::RwLock::new(registry));
197 Self::new_with_registry_arc(
198 provider,
199 channel,
200 registry,
201 matcher,
202 max_active_skills,
203 tool_executor,
204 )
205 }
206
    /// Creates an agent from a shared (`Arc<RwLock<_>>`) skill registry.
    ///
    /// Builds the initial system prompt from the registry's skills, then
    /// constructs every state bundle with its defaults. Later setup (memory,
    /// MCP, providers, etc.) is expected to mutate these bundles.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned. Debug builds assert
    /// `max_active_skills > 0`.
    #[must_use]
    #[allow(clippy::too_many_lines)] pub fn new_with_registry_arc(
        provider: AnyProvider,
        channel: C,
        registry: std::sync::Arc<std::sync::RwLock<SkillRegistry>>,
        matcher: Option<SkillMatcherBackend>,
        max_active_skills: usize,
        tool_executor: impl ToolExecutor + 'static,
    ) -> Self {
        debug_assert!(max_active_skills > 0, "max_active_skills must be > 0");
        // Materialize all skills while holding the read lock only for this block.
        let all_skills: Vec<Skill> = {
            let reg = registry.read().expect("registry read lock poisoned");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        // No trust/health data exists yet at construction time.
        let empty_trust = HashMap::new();
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &empty_trust, &empty_health);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        tracing::debug!(len = system_prompt.len(), "initial system prompt built");
        tracing::trace!(prompt = %system_prompt, "full system prompt");

        // Rough token estimate: ~4 bytes per token.
        let initial_prompt_tokens = u64::try_from(system_prompt.len()).unwrap_or(0) / 4;
        // Shutdown watch channel; the sender is dropped here, so `rx` only
        // ever observes the initial `false` until replaced later.
        let (_tx, rx) = watch::channel(false);
        let token_counter = Arc::new(TokenCounter::new());
        // Experiment notification channel exists in both cfg branches so the
        // receiver field below can be populated unconditionally.
        #[cfg(feature = "experiments")]
        let (exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        #[cfg(not(feature = "experiments"))]
        let (_exp_notify_tx, exp_notify_rx) = tokio::sync::mpsc::channel::<String>(4);
        // Embeddings default to the same provider as chat.
        let embedding_provider = provider.clone();
        Self {
            provider,
            embedding_provider,
            channel,
            tool_executor: Arc::new(tool_executor),
            // Conversation starts with only the system prompt.
            msg: MessageState {
                messages: vec![Message {
                    role: Role::System,
                    content: system_prompt,
                    parts: vec![],
                    metadata: MessageMetadata::default(),
                }],
                message_queue: VecDeque::new(),
                pending_image_parts: Vec::new(),
            },
            // Memory defaults; `memory` itself is attached later.
            memory_state: MemoryState {
                memory: None,
                conversation_id: None,
                history_limit: 50,
                recall_limit: 5,
                summarization_threshold: 50,
                cross_session_score_threshold: 0.35,
                autosave_assistant: false,
                autosave_min_length: 20,
                tool_call_cutoff: 6,
                unsummarized_count: 0,
                document_config: crate::config::DocumentConfig::default(),
                graph_config: crate::config::GraphConfig::default(),
                compression_guidelines_config: zeph_memory::CompressionGuidelinesConfig::default(),
                shutdown_summary: true,
                shutdown_summary_min_messages: 4,
                shutdown_summary_max_messages: 20,
                shutdown_summary_timeout_secs: 10,
                structured_summaries: false,
            },
            skill_state: SkillState {
                registry,
                skill_paths: Vec::new(),
                managed_dir: None,
                trust_config: crate::config::TrustConfig::default(),
                matcher,
                max_active_skills,
                disambiguation_threshold: 0.05,
                embedding_model: String::new(),
                skill_reload_rx: None,
                active_skill_names: Vec::new(),
                last_skills_prompt: skills_prompt,
                prompt_mode: SkillPromptMode::Auto,
                available_custom_secrets: HashMap::new(),
                cosine_weight: 0.7,
                hybrid_search: false,
                bm25_index: None,
            },
            context_manager: context_manager::ContextManager::new(),
            tool_orchestrator: tool_orchestrator::ToolOrchestrator::new(),
            learning_engine: learning_engine::LearningEngine::new(),
            feedback: FeedbackState {
                detector: feedback_detector::FeedbackDetector::new(0.6),
                judge: None,
                llm_classifier: None,
            },
            debug_state: DebugState {
                debug_dumper: None,
                dump_format: crate::debug_dump::DumpFormat::default(),
                trace_collector: None,
                iteration_counter: 0,
                anomaly_detector: None,
                logging_config: crate::config::LoggingConfig::default(),
                dump_dir: None,
                trace_service_name: String::new(),
                trace_redact: true,
                current_iteration_span_id: None,
            },
            runtime: RuntimeConfig {
                security: SecurityConfig::default(),
                timeouts: TimeoutConfig::default(),
                model_name: String::new(),
                active_provider_name: String::new(),
                permission_policy: zeph_tools::PermissionPolicy::default(),
                redact_credentials: true,
                rate_limiter: rate_limiter::ToolRateLimiter::new(
                    rate_limiter::RateLimitConfig::default(),
                ),
                semantic_cache_enabled: false,
                semantic_cache_threshold: 0.95,
                semantic_cache_max_candidates: 10,
                dependency_config: zeph_tools::DependencyConfig::default(),
            },
            mcp: McpState {
                tools: Vec::new(),
                registry: None,
                manager: None,
                allowed_commands: Vec::new(),
                max_dynamic: 10,
                shared_tools: None,
                tool_rx: None,
            },
            index: IndexState {
                retriever: None,
                repo_map_tokens: 0,
                cached_repo_map: None,
                repo_map_ttl: std::time::Duration::from_secs(300),
            },
            session: SessionState {
                env_context: EnvironmentContext::gather(""),
                response_cache: None,
                parent_tool_use_id: None,
                status_tx: None,
                #[cfg(feature = "lsp-context")]
                lsp_hooks: None,
                #[cfg(feature = "policy-enforcer")]
                policy_config: None,
            },
            instructions: InstructionState {
                blocks: Vec::new(),
                reload_rx: None,
                reload_state: None,
            },
            // All sanitizer/guard components start from their default configs.
            security: SecurityState {
                sanitizer: ContentSanitizer::new(&zeph_sanitizer::ContentIsolationConfig::default()),
                quarantine_summarizer: None,
                exfiltration_guard: zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
                    zeph_sanitizer::exfiltration::ExfiltrationGuardConfig::default(),
                ),
                flagged_urls: std::collections::HashSet::new(),
                user_provided_urls: std::sync::Arc::new(std::sync::RwLock::new(
                    std::collections::HashSet::new(),
                )),
                pii_filter: zeph_sanitizer::pii::PiiFilter::new(
                    zeph_sanitizer::pii::PiiFilterConfig::default(),
                ),
                #[cfg(feature = "classifiers")]
                pii_ner_backend: None,
                #[cfg(feature = "classifiers")]
                pii_ner_timeout_ms: 5000,
                memory_validator: zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
                    zeph_sanitizer::memory_validation::MemoryWriteValidationConfig::default(),
                ),
                #[cfg(feature = "guardrail")]
                guardrail: None,
                response_verifier: zeph_sanitizer::response_verifier::ResponseVerifier::new(
                    zeph_config::ResponseVerificationConfig::default(),
                ),
            },
            experiments: ExperimentState {
                #[cfg(feature = "experiments")]
                config: crate::config::ExperimentConfig::default(),
                #[cfg(feature = "experiments")]
                cancel: None,
                #[cfg(feature = "experiments")]
                baseline: crate::experiments::ConfigSnapshot::default(),
                #[cfg(feature = "experiments")]
                eval_provider: None,
                notify_rx: Some(exp_notify_rx),
                #[cfg(feature = "experiments")]
                notify_tx: exp_notify_tx,
            },
            #[cfg(feature = "context-compression")]
            compression: CompressionState {
                current_task_goal: None,
                task_goal_user_msg_hash: None,
                pending_task_goal: None,
                pending_sidequest_result: None,
                subgoal_registry: crate::agent::compaction_strategy::SubgoalRegistry::default(),
                pending_subgoal: None,
                subgoal_user_msg_hash: None,
            },
            lifecycle: LifecycleState {
                shutdown: rx,
                start_time: Instant::now(),
                cancel_signal: Arc::new(Notify::new()),
                cancel_token: CancellationToken::new(),
                config_path: None,
                config_reload_rx: None,
                warmup_ready: None,
                update_notify_rx: None,
                custom_task_rx: None,
            },
            providers: ProviderState {
                summary_provider: None,
                provider_override: None,
                judge_provider: None,
                probe_provider: None,
                cached_prompt_tokens: initial_prompt_tokens,
                server_compaction_active: false,
                stt: None,
                provider_pool: Vec::new(),
                provider_config_snapshot: None,
            },
            metrics: MetricsState {
                metrics_tx: None,
                cost_tracker: None,
                token_counter,
                extended_context: false,
            },
            orchestration: OrchestrationState {
                planner_provider: None,
                pending_graph: None,
                plan_cancel_token: None,
                subagent_manager: None,
                subagent_config: crate::config::SubAgentConfig::default(),
                orchestration_config: crate::config::OrchestrationConfig::default(),
                plan_cache: None,
                pending_goal_embedding: None,
            },
            focus: focus::FocusState::default(),
            sidequest: sidequest::SidequestState::default(),
            tool_schema_filter: None,
            cached_filtered_tool_ids: None,
            dependency_graph: None,
            dependency_always_on: HashSet::new(),
            completed_tool_ids: HashSet::new(),
            last_persisted_message_id: None,
            deferred_db_hide_ids: Vec::new(),
            deferred_db_summaries: Vec::new(),
        }
    }
466
467 pub async fn poll_subagents(&mut self) -> Vec<(String, String)> {
472 let Some(mgr) = &mut self.orchestration.subagent_manager else {
473 return vec![];
474 };
475
476 let finished: Vec<String> = mgr
477 .statuses()
478 .into_iter()
479 .filter_map(|(id, status)| {
480 if matches!(
481 status.state,
482 crate::subagent::SubAgentState::Completed
483 | crate::subagent::SubAgentState::Failed
484 | crate::subagent::SubAgentState::Canceled
485 ) {
486 Some(id)
487 } else {
488 None
489 }
490 })
491 .collect();
492
493 let mut results = vec![];
494 for task_id in finished {
495 match mgr.collect(&task_id).await {
496 Ok(result) => results.push((task_id, result)),
497 Err(e) => {
498 tracing::warn!(task_id, error = %e, "failed to collect sub-agent result");
499 }
500 }
501 }
502 results
503 }
504
505 async fn handle_plan_command(
506 &mut self,
507 cmd: crate::orchestration::PlanCommand,
508 ) -> Result<(), error::AgentError> {
509 use crate::orchestration::PlanCommand;
510
511 if !self.config_for_orchestration().enabled {
512 self.channel
513 .send(
514 "Task orchestration is disabled. Set `orchestration.enabled = true` in config.",
515 )
516 .await?;
517 return Ok(());
518 }
519
520 match cmd {
521 PlanCommand::Goal(goal) => self.handle_plan_goal(&goal).await,
522 PlanCommand::Confirm => self.handle_plan_confirm().await,
523 PlanCommand::Status(id) => self.handle_plan_status(id.as_deref()).await,
524 PlanCommand::List => self.handle_plan_list().await,
525 PlanCommand::Cancel(id) => self.handle_plan_cancel(id.as_deref()).await,
526 PlanCommand::Resume(id) => self.handle_plan_resume(id.as_deref()).await,
527 PlanCommand::Retry(id) => self.handle_plan_retry(id.as_deref()).await,
528 }
529 }
530
    /// Convenience accessor for the orchestration configuration.
    fn config_for_orchestration(&self) -> &crate::config::OrchestrationConfig {
        &self.orchestration.orchestration_config
    }
534
535 async fn init_plan_cache_if_needed(&mut self) {
539 let plan_cache_config = self.orchestration.orchestration_config.plan_cache.clone();
540 if !plan_cache_config.enabled || self.orchestration.plan_cache.is_some() {
541 return;
542 }
543 if let Some(ref memory) = self.memory_state.memory {
544 let pool = memory.sqlite().pool().clone();
545 let embed_model = self.skill_state.embedding_model.clone();
546 match crate::orchestration::PlanCache::new(pool, plan_cache_config, &embed_model).await
547 {
548 Ok(cache) => self.orchestration.plan_cache = Some(cache),
549 Err(e) => {
550 tracing::warn!(error = %e, "plan cache: init failed, proceeding without cache");
551 }
552 }
553 } else {
554 tracing::warn!("plan cache: memory not configured, proceeding without cache");
555 }
556 }
557
558 async fn goal_embedding_for_cache(&self, goal: &str) -> Option<Vec<f32>> {
563 use crate::orchestration::normalize_goal;
564
565 self.orchestration.plan_cache.as_ref()?;
566 let normalized = normalize_goal(goal);
567 match self.embedding_provider.embed(&normalized).await {
568 Ok(emb) => Some(emb),
569 Err(zeph_llm::LlmError::EmbedUnsupported { .. }) => {
570 tracing::debug!(
571 "plan cache: provider does not support embeddings, skipping cache lookup"
572 );
573 None
574 }
575 Err(e) => {
576 tracing::warn!(error = %e, "plan cache: goal embedding failed, skipping cache");
577 None
578 }
579 }
580 }
581
    /// Plans a task decomposition for `goal` and either queues it for
    /// confirmation or reports it immediately, depending on config.
    ///
    /// Refuses to plan while another plan is pending confirmation. Planner
    /// token usage is folded into metrics/cost tracking either way.
    async fn handle_plan_goal(&mut self, goal: &str) -> Result<(), error::AgentError> {
        use crate::orchestration::{LlmPlanner, plan_with_cache};

        if self.orchestration.pending_graph.is_some() {
            self.channel
                .send(
                    "A plan is already pending confirmation. \
                     Use /plan confirm to execute it or /plan cancel to discard.",
                )
                .await?;
            return Ok(());
        }

        self.channel.send("Planning task decomposition...").await?;

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let confirm_before_execute = self
            .orchestration
            .orchestration_config
            .confirm_before_execute;

        // Cache init is lazy; the embedding is None when cache/embeddings
        // are unavailable, which plan_with_cache tolerates.
        self.init_plan_cache_if_needed().await;
        let goal_embedding = self.goal_embedding_for_cache(goal).await;

        tracing::debug!(
            cache_enabled = self.orchestration.orchestration_config.plan_cache.enabled,
            has_embedding = goal_embedding.is_some(),
            "plan cache state for goal"
        );

        // Planning may use a dedicated provider, falling back to the main one.
        let planner_provider = self
            .orchestration
            .planner_provider
            .as_ref()
            .unwrap_or(&self.provider)
            .clone();
        let planner = LlmPlanner::new(planner_provider, &self.orchestration.orchestration_config);
        let embed_model = self.skill_state.embedding_model.clone();
        let (graph, planner_usage) = plan_with_cache(
            &planner,
            self.orchestration.plan_cache.as_ref(),
            &self.provider,
            goal_embedding.as_deref(),
            &embed_model,
            goal,
            &available_agents,
            self.orchestration.orchestration_config.max_tasks,
        )
        .await
        .map_err(|e| error::AgentError::Other(e.to_string()))?;

        // Kept so the confirm path can store the plan in the cache later.
        self.orchestration.pending_goal_embedding = goal_embedding;

        let task_count = graph.tasks.len() as u64;
        let snapshot = crate::metrics::TaskGraphSnapshot::from(&graph);
        // usage is None on a cache hit; count zero tokens then.
        let (planner_prompt, planner_completion) = planner_usage.unwrap_or((0, 0));
        self.update_metrics(|m| {
            m.api_calls += 1;
            m.prompt_tokens += planner_prompt;
            m.completion_tokens += planner_completion;
            m.total_tokens = m.prompt_tokens + m.completion_tokens;
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += task_count;
            m.orchestration_graph = Some(snapshot);
        });
        self.record_cost(planner_prompt, planner_completion);
        self.record_cache_usage();

        if confirm_before_execute {
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Type `/plan confirm` to execute, or `/plan cancel` to abort.")
                .await?;
            self.orchestration.pending_graph = Some(graph);
        } else {
            // No confirmation required: report the plan and mark the graph
            // snapshot completed immediately.
            let summary = format_plan_summary(&graph);
            self.channel.send(&summary).await?;
            self.channel
                .send("Plan ready. Full execution will be available in a future phase.")
                .await?;
            let now = std::time::Instant::now();
            self.update_metrics(|m| {
                if let Some(ref mut s) = m.orchestration_graph {
                    "completed".clone_into(&mut s.status);
                    s.completed_at = Some(now);
                }
            });
        }

        Ok(())
    }
685
    /// Validates a graph taken out of `pending_graph` before execution.
    ///
    /// On any rejection (no sub-agents, empty graph, already
    /// completed/canceled) the user is notified, the graph is put BACK into
    /// `pending_graph`, and `Err(())` is returned so the caller simply stops.
    /// Send failures during rejection are intentionally ignored.
    async fn validate_pending_graph(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<crate::orchestration::TaskGraph, ()> {
        use crate::orchestration::GraphStatus;

        if self.orchestration.subagent_manager.is_none() {
            let _ = self
                .channel
                .send(
                    "No sub-agents configured. Add sub-agent definitions to config \
                     to enable plan execution.",
                )
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        if graph.tasks.is_empty() {
            let _ = self.channel.send("Plan has no tasks.").await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        if matches!(graph.status, GraphStatus::Completed | GraphStatus::Canceled) {
            let _ = self
                .channel
                .send(&format!(
                    "Cannot re-execute a {} plan. Use `/plan <goal>` to create a new one.",
                    graph.status
                ))
                .await;
            self.orchestration.pending_graph = Some(graph);
            return Err(());
        }

        Ok(graph)
    }
732
    /// Builds (or resumes) a DAG scheduler for `graph` and reserves
    /// sub-agent slots for it.
    ///
    /// Returns the scheduler plus the number of reserved slots; the CALLER
    /// is responsible for releasing that reservation when execution ends.
    /// On any error path inside this function the reservation is released
    /// before the error propagates.
    fn build_dag_scheduler(
        &mut self,
        graph: crate::orchestration::TaskGraph,
    ) -> Result<(crate::orchestration::DagScheduler, usize), error::AgentError> {
        use crate::orchestration::{DagScheduler, GraphStatus, RuleBasedRouter};

        let available_agents = self
            .orchestration
            .subagent_manager
            .as_ref()
            .map(|m| m.definitions().to_vec())
            .unwrap_or_default();

        let max_concurrent = self.orchestration.subagent_config.max_concurrent;
        let max_parallel = self.orchestration.orchestration_config.max_parallel as usize;
        // Warn (but proceed) when the global concurrency cap cannot fit the
        // plan's parallelism plus one slot for other work.
        if max_concurrent < max_parallel + 1 {
            tracing::warn!(
                max_concurrent,
                max_parallel,
                "max_concurrent < max_parallel + 1: orchestration tasks may be starved by \
                 planning-phase sub-agents; recommend setting max_concurrent >= {}",
                max_parallel + 1
            );
        }

        // Reserve at most max_parallel slots while always leaving one free.
        let reserved = max_parallel.min(max_concurrent.saturating_sub(1));
        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.reserve_slots(reserved);
        }

        // A freshly created graph starts a new scheduler; any other status
        // resumes from the graph's recorded progress.
        let scheduler = if graph.status == GraphStatus::Created {
            DagScheduler::new(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        } else {
            DagScheduler::resume_from(
                graph,
                &self.orchestration.orchestration_config,
                Box::new(RuleBasedRouter),
                available_agents,
            )
        }
        .map_err(|e| {
            // Roll back the slot reservation on construction failure.
            if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                mgr.release_reservation(reserved);
            }
            error::AgentError::Other(e.to_string())
        })?;

        let provider_names: Vec<&str> = self
            .providers
            .provider_pool
            .iter()
            .filter_map(|e| e.name.as_deref())
            .collect();
        scheduler
            .validate_verify_config(&provider_names)
            .map_err(|e| {
                // Same rollback on verify-config failure.
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    mgr.release_reservation(reserved);
                }
                error::AgentError::Other(e.to_string())
            })?;

        Ok((scheduler, reserved))
    }
815
    /// Executes the pending plan after `/plan confirm`.
    ///
    /// Takes the pending graph, validates it, runs the scheduler loop under
    /// a fresh cancellation token, then finalizes and records metrics.
    /// The slot reservation from `build_dag_scheduler` is released before
    /// the scheduler result's `?` so it is not leaked on error.
    async fn handle_plan_confirm(&mut self) -> Result<(), error::AgentError> {
        let Some(graph) = self.orchestration.pending_graph.take() else {
            self.channel
                .send("No pending plan to confirm. Use `/plan <goal>` to create one.")
                .await?;
            return Ok(());
        };

        // validate_pending_graph restores pending_graph on rejection.
        let Ok(graph) = self.validate_pending_graph(graph).await else {
            return Ok(());
        };

        let (mut scheduler, reserved) = self.build_dag_scheduler(graph)?;

        let task_count = scheduler.graph().tasks.len();
        self.channel
            .send(&format!(
                "Confirmed. Executing plan ({task_count} tasks)..."
            ))
            .await?;

        // Token exposed via plan_cancel_token so `/plan cancel` can stop us.
        let plan_token = CancellationToken::new();
        self.orchestration.plan_cancel_token = Some(plan_token.clone());

        let scheduler_result = self
            .run_scheduler_loop(&mut scheduler, task_count, plan_token)
            .await;
        self.orchestration.plan_cancel_token = None;

        // Release reserved slots BEFORE propagating any scheduler error.
        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
            mgr.release_reservation(reserved);
        }

        let final_status = scheduler_result?;
        let completed_graph = scheduler.into_graph();

        let snapshot = crate::metrics::TaskGraphSnapshot::from(&completed_graph);
        self.update_metrics(|m| {
            m.orchestration_graph = Some(snapshot);
        });

        let result_label = self
            .finalize_plan_execution(completed_graph, final_status)
            .await?;

        let now = std::time::Instant::now();
        self.update_metrics(|m| {
            if let Some(ref mut s) = m.orchestration_graph {
                result_label.clone_into(&mut s.status);
                s.completed_at = Some(now);
            }
        });
        Ok(())
    }
874
875 fn cancel_agents_from_actions(
879 &mut self,
880 cancel_actions: Vec<crate::orchestration::SchedulerAction>,
881 ) -> Option<crate::orchestration::GraphStatus> {
882 use crate::orchestration::SchedulerAction;
883 for action in cancel_actions {
884 match action {
885 SchedulerAction::Cancel { agent_handle_id } => {
886 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
887 let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
888 tracing::trace!(error = %e, "cancel: agent already gone");
889 });
890 }
891 }
892 SchedulerAction::Done { status } => return Some(status),
893 SchedulerAction::Spawn { .. }
894 | SchedulerAction::RunInline { .. }
895 | SchedulerAction::Verify { .. } => {}
896 }
897 }
898 None
899 }
900
    /// Handles a scheduler `Spawn` action: launches a sub-agent for one task.
    ///
    /// Returns `(spawn_succeeded, concurrency_limit_hit, done_status)`:
    /// the third element is `Some` when the spawn failure caused the
    /// scheduler to emit a terminal `Done` action. On success the spawn is
    /// recorded with the scheduler and a progress status is sent to the user.
    async fn handle_scheduler_spawn_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        agent_def_name: String,
        prompt: String,
        spawn_counter: &mut usize,
        task_count: usize,
    ) -> (bool, bool, Option<crate::orchestration::GraphStatus>) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());

        // Clone everything the sub-agent needs before mutably borrowing the manager.
        let provider = self.provider.clone();
        let tool_executor = Arc::clone(&self.tool_executor);
        let skills = self.filtered_skills_for(&agent_def_name);
        let cfg = self.orchestration.subagent_config.clone();
        let event_tx = scheduler.event_sender();

        let mgr = self
            .orchestration
            .subagent_manager
            .as_mut()
            .expect("subagent_manager checked above");

        // Completion callback: converts the sub-agent result into a
        // TaskEvent and forwards it to the scheduler from a spawned task.
        let on_done = {
            use crate::orchestration::{TaskEvent, TaskOutcome};
            move |handle_id: String, result: Result<String, crate::subagent::SubAgentError>| {
                let outcome = match &result {
                    Ok(output) => TaskOutcome::Completed {
                        output: output.clone(),
                        artifacts: vec![],
                    },
                    Err(e) => TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                };
                let tx = event_tx;
                tokio::spawn(async move {
                    if let Err(e) = tx
                        .send(TaskEvent {
                            task_id,
                            agent_handle_id: handle_id,
                            outcome,
                        })
                        .await
                    {
                        tracing::warn!(
                            error = %e,
                            "failed to send TaskEvent: scheduler may have been dropped"
                        );
                    }
                });
            }
        };

        match mgr.spawn_for_task(
            &agent_def_name,
            &prompt,
            provider,
            tool_executor,
            skills,
            &cfg,
            on_done,
        ) {
            Ok(handle_id) => {
                *spawn_counter += 1;
                // Status send failures are non-fatal; execution continues.
                let _ = self
                    .channel
                    .send_status(&format!(
                        "Executing task {spawn_counter}/{task_count}: {task_title}..."
                    ))
                    .await;
                scheduler.record_spawn(task_id, handle_id, agent_def_name);
                (true, false, None)
            }
            Err(e) => {
                tracing::error!(error = %e, %task_id, "spawn_for_task failed");
                let concurrency_fail =
                    matches!(e, crate::subagent::SubAgentError::ConcurrencyLimit { .. });
                // Recording the failure may emit follow-up cancel/done actions.
                let extra = scheduler.record_spawn_failure(task_id, &e);
                let done_status = self.cancel_agents_from_actions(extra);
                (false, concurrency_fail, done_status)
            }
        }
    }
993
    /// Handles a scheduler `RunInline` action: runs one task in the main
    /// agent's own tool loop instead of spawning a sub-agent.
    ///
    /// Uses a synthetic `__inline_<task_id>__` handle and the `__main__`
    /// agent name so the scheduler tracks it like any spawned task. The run
    /// races against `cancel_token`; cancellation is reported as a `Failed`
    /// outcome with error "canceled". The outcome is delivered via the
    /// scheduler's event channel either way.
    async fn handle_run_inline_action(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_id: crate::orchestration::TaskId,
        prompt: String,
        spawn_counter: usize,
        task_count: usize,
        cancel_token: &CancellationToken,
    ) {
        let task_title = scheduler
            .graph()
            .tasks
            .get(task_id.index())
            .map_or("unknown", |t| t.title.as_str());
        let _ = self
            .channel
            .send_status(&format!(
                "Executing task {spawn_counter}/{task_count} (inline): {task_title}..."
            ))
            .await;

        // Record the spawn BEFORE running so the scheduler sees the task as active.
        let handle_id = format!("__inline_{task_id}__");
        scheduler.record_spawn(task_id, handle_id.clone(), "__main__".to_string());

        let event_tx = scheduler.event_sender();
        let max_iter = self.tool_orchestrator.max_iterations;
        let outcome = tokio::select! {
            result = self.run_inline_tool_loop(&prompt, max_iter) => {
                match result {
                    Ok(output) => crate::orchestration::TaskOutcome::Completed {
                        output,
                        artifacts: vec![],
                    },
                    Err(e) => crate::orchestration::TaskOutcome::Failed {
                        error: e.to_string(),
                    },
                }
            }
            () = cancel_token.cancelled() => {
                crate::orchestration::TaskOutcome::Failed {
                    error: "canceled".to_string(),
                }
            }
        };
        let event = crate::orchestration::TaskEvent {
            task_id,
            agent_handle_id: handle_id,
            outcome,
        };
        if let Err(e) = event_tx.send(event).await {
            tracing::warn!(%task_id, error = %e, "inline task event send failed");
        }
    }
1054
    /// Drives `scheduler` to a terminal [`crate::orchestration::GraphStatus`].
    ///
    /// Each pass of the `'tick` loop asks the scheduler for a batch of
    /// actions (spawn, cancel, run-inline, verify, done), executes them, then
    /// waits on whichever fires first: plan cancellation, a scheduler event,
    /// a user channel message, or the process shutdown signal. Every exit
    /// path goes through a labeled `break 'tick <status>`.
    ///
    /// # Errors
    /// Propagates [`error::AgentError`] from `handle_scheduler_spawn_action`
    /// via the `?`-free tuple protocol (the `done` slot) only; this function
    /// itself currently always returns `Ok(final_status)`.
    #[allow(clippy::too_many_lines)]
    async fn run_scheduler_loop(
        &mut self,
        scheduler: &mut crate::orchestration::DagScheduler,
        task_count: usize,
        cancel_token: CancellationToken,
    ) -> Result<crate::orchestration::GraphStatus, error::AgentError> {
        use crate::orchestration::SchedulerAction;

        let mut spawn_counter: usize = 0;

        // (handle_id, secret_key) pairs the user has already denied, so
        // repeated requests are auto-refused without re-prompting.
        let mut denied_secrets: std::collections::HashSet<(String, String)> =
            std::collections::HashSet::new();

        let final_status = 'tick: loop {
            let actions = scheduler.tick();

            // Batch-level flags fed back into the scheduler's backoff logic.
            let mut any_spawn_success = false;
            let mut any_concurrency_failure = false;

            for action in actions {
                match action {
                    SchedulerAction::Spawn {
                        task_id,
                        agent_def_name,
                        prompt,
                    } => {
                        let (success, fail, done) = self
                            .handle_scheduler_spawn_action(
                                scheduler,
                                task_id,
                                agent_def_name,
                                prompt,
                                &mut spawn_counter,
                                task_count,
                            )
                            .await;
                        any_spawn_success |= success;
                        any_concurrency_failure |= fail;
                        // Spawning may itself terminate the plan.
                        if let Some(s) = done {
                            break 'tick s;
                        }
                    }
                    SchedulerAction::Cancel { agent_handle_id } => {
                        // Best-effort: the agent may already have exited.
                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                tracing::trace!(error = %e, "cancel: agent already gone");
                            });
                        }
                    }
                    SchedulerAction::RunInline { task_id, prompt } => {
                        spawn_counter += 1;
                        self.handle_run_inline_action(
                            scheduler,
                            task_id,
                            prompt,
                            spawn_counter,
                            task_count,
                            &cancel_token,
                        )
                        .await;
                    }
                    SchedulerAction::Done { status } => {
                        break 'tick status;
                    }
                    SchedulerAction::Verify { .. } => {
                        // Verify actions are intentionally ignored by this loop.
                    }
                }
            }

            scheduler.record_batch_backoff(any_spawn_success, any_concurrency_failure);

            // Handle any sub-agent secret prompts that queued up this tick.
            self.process_pending_secret_requests(&mut denied_secrets)
                .await;

            // Publish a fresh graph snapshot for metrics consumers.
            let snapshot = crate::metrics::TaskGraphSnapshot::from(scheduler.graph());
            self.update_metrics(|m| {
                m.orchestration_graph = Some(snapshot);
            });

            tokio::select! {
                // `biased` makes cancellation win over other ready branches.
                biased;
                () = cancel_token.cancelled() => {
                    // Plan-level cancel: cancel every running sub-agent, but
                    // honor a Done that surfaces during the sweep.
                    let cancel_actions = scheduler.cancel_all();
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel during plan cancellation: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. }
                            | SchedulerAction::RunInline { .. }
                            | SchedulerAction::Verify { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
                () = scheduler.wait_event() => {}
                result = self.channel.recv() => {
                    if let Ok(Some(msg)) = result {
                        // `/plan cancel` typed mid-run cancels like the token path.
                        if msg.text.trim().eq_ignore_ascii_case("/plan cancel") {
                            let _ = self.channel.send_status("Canceling plan...").await;
                            let cancel_actions = scheduler.cancel_all();
                            for ca in cancel_actions {
                                match ca {
                                    SchedulerAction::Cancel { agent_handle_id } => {
                                        if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                            let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                                tracing::trace!(error = %e, "cancel on user request: agent already gone");
                                            });
                                        }
                                    }
                                    SchedulerAction::Done { status } => {
                                        break 'tick status;
                                    }
                                    SchedulerAction::Spawn { .. }
                                    | SchedulerAction::RunInline { .. }
                                    | SchedulerAction::Verify { .. } => {}
                                }
                            }
                            break 'tick crate::orchestration::GraphStatus::Canceled;
                        }
                        // Any other message is queued for after the plan.
                        self.enqueue_or_merge(msg.text, vec![], msg.attachments);
                    } else {
                        // Channel closed: give the scheduler one last tick so a
                        // naturally-completed plan is reported as such before
                        // we force-cancel everything.
                        let drain_actions = scheduler.tick();
                        let mut natural_done: Option<crate::orchestration::GraphStatus> = None;
                        for action in drain_actions {
                            match action {
                                SchedulerAction::Cancel { agent_handle_id } => {
                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                            tracing::trace!(
                                                error = %e,
                                                "cancel during drain on channel close: agent already gone"
                                            );
                                        });
                                    }
                                }
                                SchedulerAction::Done { status } => {
                                    natural_done = Some(status);
                                }
                                SchedulerAction::Spawn { .. }
                                | SchedulerAction::RunInline { .. }
                                | SchedulerAction::Verify { .. } => {}
                            }
                        }

                        if let Some(status) = natural_done {
                            break 'tick status;
                        }

                        let cancel_actions = scheduler.cancel_all();
                        let n = cancel_actions
                            .iter()
                            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                            .count();
                        // Channels that support a clean exit report Canceled;
                        // an unexpected close elsewhere is treated as Failed.
                        let shutdown_status = if self.channel.supports_exit() {
                            crate::orchestration::GraphStatus::Canceled
                        } else {
                            crate::orchestration::GraphStatus::Failed
                        };
                        tracing::warn!(
                            sub_agents = n,
                            supports_exit = self.channel.supports_exit(),
                            status = ?shutdown_status,
                            "scheduler channel closed, canceling running sub-agents"
                        );
                        for action in cancel_actions {
                            match action {
                                SchedulerAction::Cancel { agent_handle_id } => {
                                    if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                        let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                            tracing::trace!(
                                                error = %e,
                                                "cancel on channel close: agent already gone"
                                            );
                                        });
                                    }
                                }
                                SchedulerAction::Done { .. }
                                | SchedulerAction::Spawn { .. }
                                | SchedulerAction::RunInline { .. }
                                | SchedulerAction::Verify { .. } => {}
                            }
                        }
                        break 'tick shutdown_status;
                    }
                }
                () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                    // Process shutdown: sweep-cancel all running sub-agents.
                    let cancel_actions = scheduler.cancel_all();
                    let n = cancel_actions
                        .iter()
                        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
                        .count();
                    tracing::warn!(sub_agents = n, "shutdown signal received, canceling running sub-agents");
                    for action in cancel_actions {
                        match action {
                            SchedulerAction::Cancel { agent_handle_id } => {
                                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                                    let _ = mgr.cancel(&agent_handle_id).inspect_err(|e| {
                                        tracing::trace!(
                                            error = %e,
                                            "cancel on shutdown: agent already gone"
                                        );
                                    });
                                }
                            }
                            SchedulerAction::Done { status } => {
                                break 'tick status;
                            }
                            SchedulerAction::Spawn { .. }
                            | SchedulerAction::RunInline { .. }
                            | SchedulerAction::Verify { .. } => {}
                        }
                    }
                    break 'tick crate::orchestration::GraphStatus::Canceled;
                }
            }
        };

        // Final drain with a fresh (empty) denial set, so late-arriving
        // requests still receive an explicit response.
        self.process_pending_secret_requests(&mut std::collections::HashSet::new())
            .await;

        Ok(final_status)
    }
1365
1366 async fn run_inline_tool_loop(
1373 &self,
1374 prompt: &str,
1375 max_iterations: usize,
1376 ) -> Result<String, zeph_llm::LlmError> {
1377 use zeph_llm::provider::{ChatResponse, Message, MessagePart, Role, ToolDefinition};
1378 use zeph_tools::executor::ToolCall;
1379
1380 let tool_defs: Vec<ToolDefinition> = self
1385 .tool_executor
1386 .tool_definitions_erased()
1387 .iter()
1388 .map(tool_execution::tool_def_to_definition)
1389 .collect();
1390
1391 tracing::debug!(
1392 prompt_len = prompt.len(),
1393 max_iterations,
1394 tool_count = tool_defs.len(),
1395 "inline tool loop: starting"
1396 );
1397
1398 let mut messages: Vec<Message> = vec![Message::from_legacy(Role::User, prompt)];
1399 let mut last_text = String::new();
1400
1401 for iteration in 0..max_iterations {
1402 let response = self.provider.chat_with_tools(&messages, &tool_defs).await?;
1403
1404 match response {
1405 ChatResponse::Text(text) => {
1406 tracing::debug!(iteration, "inline tool loop: text response, returning");
1407 return Ok(text);
1408 }
1409 ChatResponse::ToolUse {
1410 text, tool_calls, ..
1411 } => {
1412 tracing::debug!(
1413 iteration,
1414 tools = ?tool_calls.iter().map(|tc| &tc.name).collect::<Vec<_>>(),
1415 "inline tool loop: tool use"
1416 );
1417
1418 if let Some(ref t) = text {
1419 last_text.clone_from(t);
1420 }
1421
1422 let mut parts: Vec<MessagePart> = Vec::new();
1424 if let Some(ref t) = text
1425 && !t.is_empty()
1426 {
1427 parts.push(MessagePart::Text { text: t.clone() });
1428 }
1429 for tc in &tool_calls {
1430 parts.push(MessagePart::ToolUse {
1431 id: tc.id.clone(),
1432 name: tc.name.clone(),
1433 input: tc.input.clone(),
1434 });
1435 }
1436 messages.push(Message::from_parts(Role::Assistant, parts));
1437
1438 let mut result_parts: Vec<MessagePart> = Vec::new();
1440 for tc in &tool_calls {
1441 let call = ToolCall {
1442 tool_id: tc.name.clone(),
1443 params: match &tc.input {
1444 serde_json::Value::Object(map) => map.clone(),
1445 _ => serde_json::Map::new(),
1446 },
1447 };
1448 let output = match self.tool_executor.execute_tool_call_erased(&call).await
1449 {
1450 Ok(Some(out)) => out.summary,
1451 Ok(None) => "(no output)".to_owned(),
1452 Err(e) => format!("[error] {e}"),
1453 };
1454 let is_error = output.starts_with("[error]");
1455 result_parts.push(MessagePart::ToolResult {
1456 tool_use_id: tc.id.clone(),
1457 content: output,
1458 is_error,
1459 });
1460 }
1461 messages.push(Message::from_parts(Role::User, result_parts));
1462 }
1463 }
1464 }
1465
1466 tracing::debug!(
1467 max_iterations,
1468 last_text_empty = last_text.is_empty(),
1469 "inline tool loop: iteration limit reached"
1470 );
1471 Ok(last_text)
1472 }
1473
1474 async fn process_pending_secret_requests(
1482 &mut self,
1483 denied: &mut std::collections::HashSet<(String, String)>,
1484 ) {
1485 loop {
1486 let pending = self
1487 .orchestration
1488 .subagent_manager
1489 .as_mut()
1490 .and_then(crate::subagent::SubAgentManager::try_recv_secret_request);
1491 let Some((req_handle_id, req)) = pending else {
1492 break;
1493 };
1494 let deny_key = (req_handle_id.clone(), req.secret_key.clone());
1495 if denied.contains(&deny_key) {
1496 tracing::debug!(
1497 handle_id = %req_handle_id,
1498 secret_key = %req.secret_key,
1499 "skipping duplicate secret prompt for already-denied key"
1500 );
1501 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1502 let _ = mgr.deny_secret(&req_handle_id);
1503 }
1504 continue;
1505 }
1506 let prompt = format!(
1507 "Sub-agent requests secret '{}'. Allow?{}",
1508 crate::text::truncate_to_chars(&req.secret_key, 100),
1509 req.reason
1510 .as_deref()
1511 .map(|r| format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)))
1512 .unwrap_or_default()
1513 );
1514 let approved = tokio::select! {
1516 result = self.channel.confirm(&prompt) => result.unwrap_or(false),
1517 () = tokio::time::sleep(std::time::Duration::from_secs(120)) => {
1518 let _ = self.channel.send("Secret request timed out.").await;
1519 false
1520 }
1521 };
1522 if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
1523 if approved {
1524 let ttl = std::time::Duration::from_secs(300);
1525 let key = req.secret_key.clone();
1526 if mgr.approve_secret(&req_handle_id, &key, ttl).is_ok() {
1527 let _ = mgr.deliver_secret(&req_handle_id, key);
1528 }
1529 } else {
1530 denied.insert(deny_key);
1531 let _ = mgr.deny_secret(&req_handle_id);
1532 }
1533 }
1534 }
1535 }
1536
    /// Reports a finished plan to the user, updates metrics, and returns a
    /// short label ("completed" / "failed" / "paused" / "canceled" /
    /// "unknown") describing the outcome.
    ///
    /// On success the per-task results are synthesized through the LLM
    /// aggregator; on failure/pause the graph is kept in `pending_graph` so
    /// `/plan retry` or `/plan resume` can pick it up.
    ///
    /// # Errors
    /// Returns [`error::AgentError`] if sending to the channel fails.
    #[allow(clippy::too_many_lines)]
    async fn finalize_plan_execution(
        &mut self,
        completed_graph: crate::orchestration::TaskGraph,
        final_status: crate::orchestration::GraphStatus,
    ) -> Result<&'static str, error::AgentError> {
        use std::fmt::Write;

        use crate::orchestration::{Aggregator, GraphStatus, LlmAggregator};

        let result_label = match final_status {
            GraphStatus::Completed => {
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                let skipped_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
                    .count() as u64;
                self.update_metrics(|m| {
                    m.orchestration.tasks_completed += completed_count;
                    m.orchestration.tasks_skipped += skipped_count;
                });

                // Synthesize one answer from all task outputs via the LLM.
                let aggregator = LlmAggregator::new(
                    self.provider.clone(),
                    &self.orchestration.orchestration_config,
                );
                match aggregator.aggregate(&completed_graph).await {
                    Ok((synthesis, aggregator_usage)) => {
                        // Account the aggregator's token usage as one API call.
                        let (aggr_prompt, aggr_completion) = aggregator_usage.unwrap_or((0, 0));
                        self.update_metrics(|m| {
                            m.api_calls += 1;
                            m.prompt_tokens += aggr_prompt;
                            m.completion_tokens += aggr_completion;
                            m.total_tokens = m.prompt_tokens + m.completion_tokens;
                        });
                        self.record_cost(aggr_prompt, aggr_completion);
                        self.record_cache_usage();
                        self.channel.send(&synthesis).await?;
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "aggregation failed");
                        self.channel
                            .send(
                                "Plan completed but aggregation failed. \
                                 Check individual task results.",
                            )
                            .await?;
                    }
                }

                // Cache the completed plan keyed by the goal embedding, if
                // both the cache and a pending embedding exist.
                if let Some(ref cache) = self.orchestration.plan_cache
                    && let Some(embedding) = self.orchestration.pending_goal_embedding.take()
                {
                    let embed_model = self.skill_state.embedding_model.clone();
                    if let Err(e) = cache
                        .cache_plan(&completed_graph, &embedding, &embed_model)
                        .await
                    {
                        tracing::warn!(error = %e, "plan cache: failed to cache completed plan");
                    }
                }

                "completed"
            }
            GraphStatus::Failed => {
                let failed_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
                    .collect();
                let cancelled_tasks: Vec<_> = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Canceled)
                    .collect();
                let completed_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count() as u64;
                let skipped_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Skipped)
                    .count() as u64;
                self.update_metrics(|m| {
                    m.orchestration.tasks_failed += failed_tasks.len() as u64;
                    m.orchestration.tasks_completed += completed_count;
                    m.orchestration.tasks_skipped += skipped_count;
                });
                let total = completed_graph.tasks.len();
                // Three shapes: cancel-only, nothing recorded, or failures
                // (with an optional canceled count).
                let msg = if failed_tasks.is_empty() && !cancelled_tasks.is_empty() {
                    format!(
                        "Plan canceled. {}/{} tasks did not run.\n\
                         Use `/plan retry` to retry or check logs for details.",
                        cancelled_tasks.len(),
                        total
                    )
                } else if failed_tasks.is_empty() && cancelled_tasks.is_empty() {
                    tracing::warn!(
                        "plan finished with GraphStatus::Failed but no failed or canceled tasks"
                    );
                    "Plan failed. No task errors recorded; check logs for details.".to_string()
                } else {
                    let mut m = if cancelled_tasks.is_empty() {
                        format!(
                            "Plan failed. {}/{} tasks failed:\n",
                            failed_tasks.len(),
                            total
                        )
                    } else {
                        format!(
                            "Plan failed. {}/{} tasks failed, {} canceled:\n",
                            failed_tasks.len(),
                            total,
                            cancelled_tasks.len()
                        )
                    };
                    for t in &failed_tasks {
                        // NOTE(review): the length check is in bytes but the
                        // truncation is in chars, so multibyte output can stay
                        // above 500 bytes — confirm this is acceptable.
                        let err: std::borrow::Cow<str> =
                            t.result.as_ref().map_or("unknown error".into(), |r| {
                                if r.output.len() > 500 {
                                    r.output.chars().take(500).collect::<String>().into()
                                } else {
                                    r.output.as_str().into()
                                }
                            });
                        let _ = writeln!(m, " - {}: {err}", t.title);
                    }
                    m.push_str("\nUse `/plan retry` to retry failed tasks.");
                    m
                };
                self.channel.send(&msg).await?;
                // Keep the graph around so `/plan retry` can rerun it.
                self.orchestration.pending_graph = Some(completed_graph);
                "failed"
            }
            GraphStatus::Paused => {
                self.channel
                    .send(
                        "Plan paused due to a task failure (ask strategy). \
                         Use `/plan resume` to continue or `/plan retry` to retry failed tasks.",
                    )
                    .await?;
                // Keep the graph around for resume/retry.
                self.orchestration.pending_graph = Some(completed_graph);
                "paused"
            }
            GraphStatus::Canceled => {
                let done_count = completed_graph
                    .tasks
                    .iter()
                    .filter(|t| t.status == crate::orchestration::TaskStatus::Completed)
                    .count();
                self.update_metrics(|m| m.orchestration.tasks_completed += done_count as u64);
                let total = completed_graph.tasks.len();
                self.channel
                    .send(&format!(
                        "Plan canceled. {done_count}/{total} tasks completed before cancellation."
                    ))
                    .await?;
                // Drop the stale goal embedding; the plan won't be cached.
                self.orchestration.pending_goal_embedding.take();
                "canceled"
            }
            other => {
                // Non-exhaustive on purpose: any unexpected status is
                // reported verbatim.
                tracing::warn!(%other, "unexpected graph status after Done");
                self.channel
                    .send(&format!("Plan ended with status: {other}"))
                    .await?;
                self.orchestration.pending_goal_embedding.take();
                "unknown"
            }
        };
        Ok(result_label)
    }
1728
1729 async fn handle_plan_status(
1730 &mut self,
1731 _graph_id: Option<&str>,
1732 ) -> Result<(), error::AgentError> {
1733 use crate::orchestration::GraphStatus;
1734 let Some(ref graph) = self.orchestration.pending_graph else {
1735 self.channel.send("No active plan.").await?;
1736 return Ok(());
1737 };
1738 let msg = match graph.status {
1739 GraphStatus::Created => {
1740 "A plan is awaiting confirmation. Type `/plan confirm` to execute or `/plan cancel` to abort."
1741 }
1742 GraphStatus::Running => "Plan is currently running.",
1743 GraphStatus::Paused => {
1744 "Plan is paused. Use `/plan resume` to continue or `/plan cancel` to abort."
1745 }
1746 GraphStatus::Failed => {
1747 "Plan failed. Use `/plan retry` to retry or `/plan cancel` to discard."
1748 }
1749 GraphStatus::Completed => "Plan completed successfully.",
1750 GraphStatus::Canceled => "Plan was canceled.",
1751 };
1752 self.channel.send(msg).await?;
1753 Ok(())
1754 }
1755
1756 async fn handle_plan_list(&mut self) -> Result<(), error::AgentError> {
1757 if let Some(ref graph) = self.orchestration.pending_graph {
1758 let summary = format_plan_summary(graph);
1759 let status_label = match graph.status {
1760 crate::orchestration::GraphStatus::Created => "awaiting confirmation",
1761 crate::orchestration::GraphStatus::Running => "running",
1762 crate::orchestration::GraphStatus::Paused => "paused",
1763 crate::orchestration::GraphStatus::Failed => "failed (retryable)",
1764 _ => "unknown",
1765 };
1766 self.channel
1767 .send(&format!("{summary}\nStatus: {status_label}"))
1768 .await?;
1769 } else {
1770 self.channel.send("No recent plans.").await?;
1771 }
1772 Ok(())
1773 }
1774
1775 async fn handle_plan_cancel(
1776 &mut self,
1777 _graph_id: Option<&str>,
1778 ) -> Result<(), error::AgentError> {
1779 if let Some(token) = self.orchestration.plan_cancel_token.take() {
1780 token.cancel();
1787 self.channel.send("Canceling plan execution...").await?;
1788 } else if self.orchestration.pending_graph.take().is_some() {
1789 let now = std::time::Instant::now();
1790 self.update_metrics(|m| {
1791 if let Some(ref mut s) = m.orchestration_graph {
1792 "canceled".clone_into(&mut s.status);
1793 s.completed_at = Some(now);
1794 }
1795 });
1796 self.orchestration.pending_goal_embedding = None;
1797 self.channel.send("Plan canceled.").await?;
1798 } else {
1799 self.channel.send("No active plan to cancel.").await?;
1800 }
1801 Ok(())
1802 }
1803
1804 async fn handle_plan_resume(
1809 &mut self,
1810 graph_id: Option<&str>,
1811 ) -> Result<(), error::AgentError> {
1812 use crate::orchestration::GraphStatus;
1813
1814 let Some(ref graph) = self.orchestration.pending_graph else {
1815 self.channel
1816 .send("No paused plan to resume. Use `/plan status` to check the current state.")
1817 .await?;
1818 return Ok(());
1819 };
1820
1821 if let Some(id) = graph_id
1823 && graph.id.to_string() != id
1824 {
1825 self.channel
1826 .send(&format!(
1827 "Graph id '{id}' does not match the active plan ({}). \
1828 Use `/plan status` to see the active plan id.",
1829 graph.id
1830 ))
1831 .await?;
1832 return Ok(());
1833 }
1834
1835 if graph.status != GraphStatus::Paused {
1836 self.channel
1837 .send(&format!(
1838 "The active plan is in '{}' status and cannot be resumed. \
1839 Only Paused plans can be resumed.",
1840 graph.status
1841 ))
1842 .await?;
1843 return Ok(());
1844 }
1845
1846 let graph = self.orchestration.pending_graph.take().unwrap();
1847
1848 tracing::info!(
1849 graph_id = %graph.id,
1850 "resuming paused graph"
1851 );
1852
1853 self.channel
1854 .send(&format!(
1855 "Resuming plan: {}\nUse `/plan confirm` to continue execution.",
1856 graph.goal
1857 ))
1858 .await?;
1859
1860 self.orchestration.pending_graph = Some(graph);
1862 Ok(())
1863 }
1864
1865 async fn handle_plan_retry(&mut self, graph_id: Option<&str>) -> Result<(), error::AgentError> {
1871 use crate::orchestration::{GraphStatus, dag};
1872
1873 let Some(ref graph) = self.orchestration.pending_graph else {
1874 self.channel
1875 .send("No active plan to retry. Use `/plan status` to check the current state.")
1876 .await?;
1877 return Ok(());
1878 };
1879
1880 if let Some(id) = graph_id
1882 && graph.id.to_string() != id
1883 {
1884 self.channel
1885 .send(&format!(
1886 "Graph id '{id}' does not match the active plan ({}). \
1887 Use `/plan status` to see the active plan id.",
1888 graph.id
1889 ))
1890 .await?;
1891 return Ok(());
1892 }
1893
1894 if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
1895 self.channel
1896 .send(&format!(
1897 "The active plan is in '{}' status. Only Failed or Paused plans can be retried.",
1898 graph.status
1899 ))
1900 .await?;
1901 return Ok(());
1902 }
1903
1904 let mut graph = self.orchestration.pending_graph.take().unwrap();
1905
1906 let failed_count = graph
1908 .tasks
1909 .iter()
1910 .filter(|t| t.status == crate::orchestration::TaskStatus::Failed)
1911 .count();
1912
1913 dag::reset_for_retry(&mut graph).map_err(|e| error::AgentError::Other(e.to_string()))?;
1914
1915 for task in &mut graph.tasks {
1920 if task.status == crate::orchestration::TaskStatus::Running {
1921 task.status = crate::orchestration::TaskStatus::Ready;
1922 task.assigned_agent = None;
1923 }
1924 }
1925
1926 tracing::info!(
1927 graph_id = %graph.id,
1928 failed_count,
1929 "retrying failed tasks in graph"
1930 );
1931
1932 self.channel
1933 .send(&format!(
1934 "Retrying {failed_count} failed task(s) in plan: {}\n\
1935 Use `/plan confirm` to execute.",
1936 graph.goal
1937 ))
1938 .await?;
1939
1940 self.orchestration.pending_graph = Some(graph);
1942 Ok(())
1943 }
1944
1945 async fn call_llm_for_session_summary(
1954 &self,
1955 chat_messages: &[Message],
1956 ) -> Option<zeph_memory::StructuredSummary> {
1957 let timeout_dur =
1958 std::time::Duration::from_secs(self.memory_state.shutdown_summary_timeout_secs);
1959 match tokio::time::timeout(
1960 timeout_dur,
1961 self.provider
1962 .chat_typed_erased::<zeph_memory::StructuredSummary>(chat_messages),
1963 )
1964 .await
1965 {
1966 Ok(Ok(s)) => Some(s),
1967 Ok(Err(e)) => {
1968 tracing::warn!(
1969 "shutdown summary: structured LLM call failed, falling back to plain: {e:#}"
1970 );
1971 self.plain_text_summary_fallback(chat_messages, timeout_dur)
1972 .await
1973 }
1974 Err(_) => {
1975 tracing::warn!(
1976 "shutdown summary: structured LLM call timed out after {}s, falling back to plain",
1977 self.memory_state.shutdown_summary_timeout_secs
1978 );
1979 self.plain_text_summary_fallback(chat_messages, timeout_dur)
1980 .await
1981 }
1982 }
1983 }
1984
1985 async fn plain_text_summary_fallback(
1986 &self,
1987 chat_messages: &[Message],
1988 timeout_dur: std::time::Duration,
1989 ) -> Option<zeph_memory::StructuredSummary> {
1990 match tokio::time::timeout(timeout_dur, self.provider.chat(chat_messages)).await {
1991 Ok(Ok(plain)) => Some(zeph_memory::StructuredSummary {
1992 summary: plain,
1993 key_facts: vec![],
1994 entities: vec![],
1995 }),
1996 Ok(Err(e)) => {
1997 tracing::warn!("shutdown summary: plain LLM fallback failed: {e:#}");
1998 None
1999 }
2000 Err(_) => {
2001 tracing::warn!("shutdown summary: plain LLM fallback timed out");
2002 None
2003 }
2004 }
2005 }
2006
    /// Generates and persists a session summary at shutdown, if enabled.
    ///
    /// Bails out early when: the feature is disabled, there is no memory
    /// backend or conversation id, the session already has a summary, there
    /// are too few user messages, or `max_messages` is configured to 0.
    /// All failures are logged, never propagated.
    async fn maybe_store_shutdown_summary(&mut self) {
        if !self.memory_state.shutdown_summary {
            return;
        }
        let Some(memory) = self.memory_state.memory.clone() else {
            return;
        };
        let Some(conversation_id) = self.memory_state.conversation_id else {
            return;
        };

        // Don't overwrite a summary stored earlier in this session.
        match memory.has_session_summary(conversation_id).await {
            Ok(true) => {
                tracing::debug!("shutdown summary: session already has a summary, skipping");
                return;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!("shutdown summary: failed to check existing summary: {e:#}");
                return;
            }
        }

        // `skip(1)` drops the leading message (presumably the system prompt —
        // the same skip is applied below when collecting non-system messages).
        let user_count = self
            .msg
            .messages
            .iter()
            .skip(1)
            .filter(|m| m.role == Role::User)
            .count();
        if user_count < self.memory_state.shutdown_summary_min_messages {
            tracing::debug!(
                user_count,
                min = self.memory_state.shutdown_summary_min_messages,
                "shutdown summary: too few user messages, skipping"
            );
            return;
        }

        let _ = self.channel.send_status("Saving session summary...").await;

        let max = self.memory_state.shutdown_summary_max_messages;
        if max == 0 {
            tracing::debug!("shutdown summary: max_messages=0, skipping");
            return;
        }
        // Summarize at most the last `max` non-system messages.
        let non_system: Vec<_> = self.msg.messages.iter().skip(1).collect();
        let slice = if non_system.len() > max {
            &non_system[non_system.len() - max..]
        } else {
            &non_system[..]
        };

        // MessageId(0) is a placeholder; only role and content feed the prompt.
        let msgs_for_prompt: Vec<(zeph_memory::MessageId, String, String)> = slice
            .iter()
            .map(|m| {
                let role = match m.role {
                    Role::User => "user".to_owned(),
                    Role::Assistant => "assistant".to_owned(),
                    Role::System => "system".to_owned(),
                };
                (zeph_memory::MessageId(0), role, m.content.clone())
            })
            .collect();

        let prompt = zeph_memory::build_summarization_prompt(&msgs_for_prompt);
        let chat_messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];

        let Some(structured) = self.call_llm_for_session_summary(&chat_messages).await else {
            // Clear the "Saving..." status line before giving up.
            let _ = self.channel.send_status("").await;
            return;
        };

        if let Err(e) = memory
            .store_shutdown_summary(conversation_id, &structured.summary, &structured.key_facts)
            .await
        {
            tracing::warn!("shutdown summary: storage failed: {e:#}");
        } else {
            tracing::info!(
                conversation_id = conversation_id.0,
                "shutdown summary stored"
            );
        }

        // Clear the status line in the success/failure paths too.
        let _ = self.channel.send_status("").await;
    }
2113
2114 pub async fn shutdown(&mut self) {
2115 self.channel.send("Shutting down...").await.ok();
2116
2117 self.provider.save_router_state();
2119
2120 if let Some(ref mut mgr) = self.orchestration.subagent_manager {
2121 mgr.shutdown_all();
2122 }
2123
2124 if let Some(ref manager) = self.mcp.manager {
2125 manager.shutdown_all_shared().await;
2126 }
2127
2128 if let Some(turns) = self.context_manager.turns_since_last_hard_compaction {
2132 self.update_metrics(|m| {
2133 m.compaction_turns_after_hard.push(turns);
2134 });
2135 self.context_manager.turns_since_last_hard_compaction = None;
2136 }
2137
2138 if let Some(ref tx) = self.metrics.metrics_tx {
2139 let m = tx.borrow();
2140 if m.filter_applications > 0 {
2141 #[allow(clippy::cast_precision_loss)]
2142 let pct = if m.filter_raw_tokens > 0 {
2143 m.filter_saved_tokens as f64 / m.filter_raw_tokens as f64 * 100.0
2144 } else {
2145 0.0
2146 };
2147 tracing::info!(
2148 raw_tokens = m.filter_raw_tokens,
2149 saved_tokens = m.filter_saved_tokens,
2150 applications = m.filter_applications,
2151 "tool output filtering saved ~{} tokens ({pct:.0}%)",
2152 m.filter_saved_tokens,
2153 );
2154 }
2155 if m.compaction_hard_count > 0 {
2156 tracing::info!(
2157 hard_compactions = m.compaction_hard_count,
2158 turns_after_hard = ?m.compaction_turns_after_hard,
2159 "hard compaction trajectory"
2160 );
2161 }
2162 }
2163
2164 self.maybe_store_shutdown_summary().await;
2165
2166 tracing::info!("agent shutdown complete");
2167 }
2168
2169 fn refresh_subagent_metrics(&mut self) {
2176 let Some(ref mgr) = self.orchestration.subagent_manager else {
2177 return;
2178 };
2179 let sub_agent_metrics: Vec<crate::metrics::SubAgentMetrics> = mgr
2180 .statuses()
2181 .into_iter()
2182 .map(|(id, s)| {
2183 let def = mgr.agents_def(&id);
2184 crate::metrics::SubAgentMetrics {
2185 name: def.map_or_else(|| id[..8.min(id.len())].to_owned(), |d| d.name.clone()),
2186 id: id.clone(),
2187 state: format!("{:?}", s.state).to_lowercase(),
2188 turns_used: s.turns_used,
2189 max_turns: def.map_or(20, |d| d.permissions.max_turns),
2190 background: def.is_some_and(|d| d.permissions.background),
2191 elapsed_secs: s.started_at.elapsed().as_secs(),
2192 permission_mode: def.map_or_else(String::new, |d| {
2193 use crate::subagent::def::PermissionMode;
2194 match d.permissions.permission_mode {
2195 PermissionMode::Default => String::new(),
2196 PermissionMode::AcceptEdits => "accept_edits".into(),
2197 PermissionMode::DontAsk => "dont_ask".into(),
2198 PermissionMode::BypassPermissions => "bypass_permissions".into(),
2199 PermissionMode::Plan => "plan".into(),
2200 }
2201 }),
2202 }
2203 })
2204 .collect();
2205 self.update_metrics(|m| m.sub_agents = sub_agent_metrics);
2206 }
2207
2208 async fn notify_completed_subagents(&mut self) -> Result<(), error::AgentError> {
2210 let completed = self.poll_subagents().await;
2211 for (task_id, result) in completed {
2212 let notice = if result.is_empty() {
2213 format!("[sub-agent {id}] completed (no output)", id = &task_id[..8])
2214 } else {
2215 format!("[sub-agent {id}] completed:\n{result}", id = &task_id[..8])
2216 };
2217 if let Err(e) = self.channel.send(¬ice).await {
2218 tracing::warn!(error = %e, "failed to send sub-agent completion notice");
2219 }
2220 }
2221 Ok(())
2222 }
2223
    /// Main agent event loop.
    ///
    /// Waits for model warmup (if armed), then loops: applies any pending
    /// provider override, refreshes tools and sub-agent state, and obtains
    /// the next message — either from the internal queue or from a
    /// `select!` over the channel, shutdown signal, reload notifications,
    /// and scheduled-task injection. Built-in commands short-circuit;
    /// everything else goes through `process_user_message`.
    ///
    /// # Errors
    /// Propagates [`error::AgentError`] from channel receive, sub-agent
    /// notification, built-in command handling, or message processing.
    pub async fn run(&mut self) -> Result<(), error::AgentError> {
        // Block until warmup reports readiness (or the watch channel closes).
        if let Some(mut rx) = self.lifecycle.warmup_ready.take()
            && !*rx.borrow()
        {
            let _ = rx.changed().await;
            if !*rx.borrow() {
                tracing::warn!("model warmup did not complete successfully");
            }
        }

        loop {
            // Apply a provider override handed over by ACP, if one is queued.
            if let Some(ref slot) = self.providers.provider_override
                && let Some(new_provider) = slot
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .take()
            {
                tracing::debug!(provider = new_provider.name(), "ACP model override applied");
                self.provider = new_provider;
            }

            self.check_tool_refresh().await;

            self.refresh_subagent_metrics();

            self.notify_completed_subagents().await?;

            self.drain_channel();

            // Prefer queued messages; otherwise wait for the next event.
            let (text, image_parts) = if let Some(queued) = self.msg.message_queue.pop_front() {
                self.notify_queue_count().await;
                if queued.raw_attachments.is_empty() {
                    (queued.text, queued.image_parts)
                } else {
                    // Raw attachments still need resolution into parts.
                    let msg = crate::channel::ChannelMessage {
                        text: queued.text,
                        attachments: queued.raw_attachments,
                    };
                    self.resolve_message(msg).await
                }
            } else {
                let incoming = tokio::select! {
                    result = self.channel.recv() => result?,
                    () = shutdown_signal(&mut self.lifecycle.shutdown) => {
                        tracing::info!("shutting down");
                        break;
                    }
                    Some(_) = recv_optional(&mut self.skill_state.skill_reload_rx) => {
                        self.reload_skills().await;
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.instructions.reload_rx) => {
                        self.reload_instructions();
                        continue;
                    }
                    Some(_) = recv_optional(&mut self.lifecycle.config_reload_rx) => {
                        self.reload_config();
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.lifecycle.update_notify_rx) => {
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send update notification: {e}");
                        }
                        continue;
                    }
                    Some(msg) = recv_optional(&mut self.experiments.notify_rx) => {
                        // Experiment finished: drop its cancel handle (the
                        // field only exists with the feature enabled).
                        #[cfg(feature = "experiments")]
                        { self.experiments.cancel = None; }
                        if let Err(e) = self.channel.send(&msg).await {
                            tracing::warn!("failed to send experiment completion: {e}");
                        }
                        continue;
                    }
                    Some(prompt) = recv_optional(&mut self.lifecycle.custom_task_rx) => {
                        // Scheduled tasks are injected as prefixed user turns.
                        tracing::info!("scheduler: injecting custom task as agent turn");
                        let text = format!("{SCHEDULED_TASK_PREFIX}{prompt}");
                        Some(crate::channel::ChannelMessage { text, attachments: Vec::new() })
                    }
                };
                // A closed channel ends the loop.
                let Some(msg) = incoming else { break };
                self.drain_channel();
                self.resolve_message(msg).await
            };

            let trimmed = text.trim();

            // Some(true) => exit requested; Some(false) => command handled;
            // None => not a built-in, process as a normal message.
            match self.handle_builtin_command(trimmed).await? {
                Some(true) => break,
                Some(false) => continue,
                None => {}
            }

            self.process_user_message(text, image_parts).await?;
        }

        if let Some(ref mut tc) = self.debug_state.trace_collector {
            tc.finish();
        }

        Ok(())
    }
2337
2338 async fn handle_builtin_command(
2344 &mut self,
2345 trimmed: &str,
2346 ) -> Result<Option<bool>, error::AgentError> {
2347 if trimmed == "/clear-queue" {
2348 let n = self.clear_queue();
2349 self.notify_queue_count().await;
2350 self.channel
2351 .send(&format!("Cleared {n} queued messages."))
2352 .await?;
2353 let _ = self.channel.flush_chunks().await;
2354 return Ok(Some(false));
2355 }
2356
2357 if trimmed == "/compact" {
2358 if self.msg.messages.len() > self.context_manager.compaction_preserve_tail + 1 {
2359 match self.compact_context().await {
2360 Ok(
2361 context::CompactionOutcome::Compacted
2362 | context::CompactionOutcome::NoChange,
2363 ) => {
2364 let _ = self.channel.send("Context compacted successfully.").await;
2365 }
2366 Ok(context::CompactionOutcome::ProbeRejected) => {
2367 let _ = self
2368 .channel
2369 .send(
2370 "Compaction rejected: summary quality below threshold. \
2371 Original context preserved.",
2372 )
2373 .await;
2374 }
2375 Err(e) => {
2376 let _ = self.channel.send(&format!("Compaction failed: {e}")).await;
2377 }
2378 }
2379 } else {
2380 let _ = self.channel.send("Nothing to compact.").await;
2381 }
2382 let _ = self.channel.flush_chunks().await;
2383 return Ok(Some(false));
2384 }
2385
2386 if trimmed == "/clear" {
2387 self.clear_history();
2388 self.tool_orchestrator.clear_cache();
2389 if let Ok(mut urls) = self.security.user_provided_urls.write() {
2390 urls.clear();
2391 }
2392 let _ = self.channel.flush_chunks().await;
2393 return Ok(Some(false));
2394 }
2395
2396 if trimmed == "/cache-stats" {
2397 let stats = self.tool_orchestrator.cache_stats();
2398 self.channel.send(&stats).await?;
2399 let _ = self.channel.flush_chunks().await;
2400 return Ok(Some(false));
2401 }
2402
2403 if trimmed == "/model" || trimmed.starts_with("/model ") {
2404 self.handle_model_command(trimmed).await;
2405 let _ = self.channel.flush_chunks().await;
2406 return Ok(Some(false));
2407 }
2408
2409 if trimmed == "/provider" || trimmed.starts_with("/provider ") {
2410 self.handle_provider_command(trimmed).await;
2411 let _ = self.channel.flush_chunks().await;
2412 return Ok(Some(false));
2413 }
2414
2415 if trimmed == "/debug-dump" || trimmed.starts_with("/debug-dump ") {
2416 self.handle_debug_dump_command(trimmed).await;
2417 let _ = self.channel.flush_chunks().await;
2418 return Ok(Some(false));
2419 }
2420
2421 if trimmed.starts_with("/dump-format") {
2422 self.handle_dump_format_command(trimmed).await;
2423 let _ = self.channel.flush_chunks().await;
2424 return Ok(Some(false));
2425 }
2426
2427 if trimmed == "/exit" || trimmed == "/quit" {
2428 if self.channel.supports_exit() {
2429 return Ok(Some(true));
2430 }
2431 let _ = self
2432 .channel
2433 .send("/exit is not supported in this channel.")
2434 .await;
2435 return Ok(Some(false));
2436 }
2437
2438 Ok(None)
2439 }
2440
2441 pub fn set_model(&mut self, model_id: &str) -> Result<(), String> {
2449 if model_id.is_empty() {
2450 return Err("model id must not be empty".to_string());
2451 }
2452 if model_id.len() > 256 {
2453 return Err("model id exceeds maximum length of 256 characters".to_string());
2454 }
2455 if !model_id
2456 .chars()
2457 .all(|c| c.is_ascii() && !c.is_ascii_control())
2458 {
2459 return Err("model id must contain only printable ASCII characters".to_string());
2460 }
2461 self.runtime.model_name = model_id.to_string();
2462 tracing::info!(model = model_id, "set_model called");
2463 Ok(())
2464 }
2465
2466 async fn handle_model_refresh(&mut self) {
2467 if let Some(cache_dir) = dirs::cache_dir() {
2469 let models_dir = cache_dir.join("zeph").join("models");
2470 if let Ok(entries) = std::fs::read_dir(&models_dir) {
2471 for entry in entries.flatten() {
2472 let path = entry.path();
2473 if path.extension().and_then(|e| e.to_str()) == Some("json") {
2474 let _ = std::fs::remove_file(&path);
2475 }
2476 }
2477 }
2478 }
2479 match self.provider.list_models_remote().await {
2480 Ok(models) => {
2481 let _ = self
2482 .channel
2483 .send(&format!("Fetched {} models.", models.len()))
2484 .await;
2485 }
2486 Err(e) => {
2487 let _ = self
2488 .channel
2489 .send(&format!("Error fetching models: {e}"))
2490 .await;
2491 }
2492 }
2493 }
2494
2495 async fn handle_model_list(&mut self) {
2496 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2497 let cached = if cache.is_stale() {
2498 None
2499 } else {
2500 cache.load().unwrap_or(None)
2501 };
2502 let models = if let Some(m) = cached {
2503 m
2504 } else {
2505 match self.provider.list_models_remote().await {
2506 Ok(m) => m,
2507 Err(e) => {
2508 let _ = self
2509 .channel
2510 .send(&format!("Error fetching models: {e}"))
2511 .await;
2512 return;
2513 }
2514 }
2515 };
2516 if models.is_empty() {
2517 let _ = self.channel.send("No models available.").await;
2518 return;
2519 }
2520 let mut lines = vec!["Available models:".to_string()];
2521 for (i, m) in models.iter().enumerate() {
2522 lines.push(format!(" {}. {} ({})", i + 1, m.display_name, m.id));
2523 }
2524 let _ = self.channel.send(&lines.join("\n")).await;
2525 }
2526
2527 async fn handle_model_switch(&mut self, model_id: &str) {
2528 let cache = zeph_llm::model_cache::ModelCache::for_slug(self.provider.name());
2531 let known_models: Option<Vec<zeph_llm::model_cache::RemoteModelInfo>> = if cache.is_stale()
2532 {
2533 match self.provider.list_models_remote().await {
2534 Ok(m) if !m.is_empty() => Some(m),
2535 _ => None,
2536 }
2537 } else {
2538 cache.load().unwrap_or(None)
2539 };
2540 if let Some(models) = known_models {
2541 if !models.iter().any(|m| m.id == model_id) {
2542 let mut lines = vec![format!("Unknown model '{model_id}'. Available models:")];
2543 for m in &models {
2544 lines.push(format!(" • {} ({})", m.display_name, m.id));
2545 }
2546 let _ = self.channel.send(&lines.join("\n")).await;
2547 return;
2548 }
2549 } else {
2550 let _ = self
2551 .channel
2552 .send(
2553 "Model list unavailable, switching anyway — verify your model name is correct.",
2554 )
2555 .await;
2556 }
2557 match self.set_model(model_id) {
2558 Ok(()) => {
2559 let _ = self
2560 .channel
2561 .send(&format!("Switched to model: {model_id}"))
2562 .await;
2563 }
2564 Err(e) => {
2565 let _ = self.channel.send(&format!("Error: {e}")).await;
2566 }
2567 }
2568 }
2569
2570 async fn handle_model_command(&mut self, trimmed: &str) {
2572 let arg = trimmed.strip_prefix("/model").map_or("", str::trim);
2573 if arg == "refresh" {
2574 self.handle_model_refresh().await;
2575 } else if arg.is_empty() {
2576 self.handle_model_list().await;
2577 } else {
2578 self.handle_model_switch(arg).await;
2579 }
2580 }
2581
2582 async fn handle_debug_dump_command(&mut self, trimmed: &str) {
2584 let arg = trimmed.strip_prefix("/debug-dump").map_or("", str::trim);
2585 if arg.is_empty() {
2586 match &self.debug_state.debug_dumper {
2587 Some(d) => {
2588 let _ = self
2589 .channel
2590 .send(&format!("Debug dump active: {}", d.dir().display()))
2591 .await;
2592 }
2593 None => {
2594 let _ = self
2595 .channel
2596 .send(
2597 "Debug dump is inactive. Use `/debug-dump <path>` to enable, \
2598 or start with `--debug-dump [dir]`.",
2599 )
2600 .await;
2601 }
2602 }
2603 return;
2604 }
2605 let dir = std::path::PathBuf::from(arg);
2606 match crate::debug_dump::DebugDumper::new(&dir, self.debug_state.dump_format) {
2607 Ok(dumper) => {
2608 let path = dumper.dir().display().to_string();
2609 self.debug_state.debug_dumper = Some(dumper);
2610 let _ = self
2611 .channel
2612 .send(&format!("Debug dump enabled: {path}"))
2613 .await;
2614 }
2615 Err(e) => {
2616 let _ = self
2617 .channel
2618 .send(&format!("Failed to enable debug dump: {e}"))
2619 .await;
2620 }
2621 }
2622 }
2623
2624 async fn handle_dump_format_command(&mut self, trimmed: &str) {
2626 let arg = trimmed.strip_prefix("/dump-format").map_or("", str::trim);
2627 if arg.is_empty() {
2628 let _ = self
2629 .channel
2630 .send(&format!(
2631 "Current dump format: {:?}. Use `/dump-format json|raw|trace` to change.",
2632 self.debug_state.dump_format
2633 ))
2634 .await;
2635 return;
2636 }
2637 let new_format = match arg {
2638 "json" => crate::debug_dump::DumpFormat::Json,
2639 "raw" => crate::debug_dump::DumpFormat::Raw,
2640 "trace" => crate::debug_dump::DumpFormat::Trace,
2641 other => {
2642 let _ = self
2643 .channel
2644 .send(&format!(
2645 "Unknown format '{other}'. Valid values: json, raw, trace."
2646 ))
2647 .await;
2648 return;
2649 }
2650 };
2651 let was_trace = self.debug_state.dump_format == crate::debug_dump::DumpFormat::Trace;
2652 let now_trace = new_format == crate::debug_dump::DumpFormat::Trace;
2653
2654 if now_trace
2656 && !was_trace
2657 && let Some(ref dump_dir) = self.debug_state.dump_dir.clone()
2658 {
2659 let service_name = self.debug_state.trace_service_name.clone();
2660 let redact = self.debug_state.trace_redact;
2661 match crate::debug_dump::trace::TracingCollector::new(
2662 dump_dir.as_path(),
2663 &service_name,
2664 redact,
2665 None,
2666 ) {
2667 Ok(collector) => {
2668 self.debug_state.trace_collector = Some(collector);
2669 }
2670 Err(e) => {
2671 tracing::warn!(error = %e, "failed to create TracingCollector on format switch");
2672 }
2673 }
2674 }
2675 if was_trace
2677 && !now_trace
2678 && let Some(mut tc) = self.debug_state.trace_collector.take()
2679 {
2680 tc.finish();
2681 }
2682
2683 self.debug_state.dump_format = new_format;
2684 let _ = self
2685 .channel
2686 .send(&format!("Debug dump format set to: {arg}"))
2687 .await;
2688 }
2689
2690 async fn resolve_message(
2691 &self,
2692 msg: crate::channel::ChannelMessage,
2693 ) -> (String, Vec<zeph_llm::provider::MessagePart>) {
2694 use crate::channel::{Attachment, AttachmentKind};
2695 use zeph_llm::provider::{ImageData, MessagePart};
2696
2697 let text_base = msg.text.clone();
2698
2699 let (audio_attachments, image_attachments): (Vec<Attachment>, Vec<Attachment>) = msg
2700 .attachments
2701 .into_iter()
2702 .partition(|a| a.kind == AttachmentKind::Audio);
2703
2704 tracing::debug!(
2705 audio = audio_attachments.len(),
2706 has_stt = self.providers.stt.is_some(),
2707 "resolve_message attachments"
2708 );
2709
2710 let text = if !audio_attachments.is_empty()
2711 && let Some(stt) = self.providers.stt.as_ref()
2712 {
2713 let mut transcribed_parts = Vec::new();
2714 for attachment in &audio_attachments {
2715 if attachment.data.len() > MAX_AUDIO_BYTES {
2716 tracing::warn!(
2717 size = attachment.data.len(),
2718 max = MAX_AUDIO_BYTES,
2719 "audio attachment exceeds size limit, skipping"
2720 );
2721 continue;
2722 }
2723 match stt
2724 .transcribe(&attachment.data, attachment.filename.as_deref())
2725 .await
2726 {
2727 Ok(result) => {
2728 tracing::info!(
2729 len = result.text.len(),
2730 language = ?result.language,
2731 "audio transcribed"
2732 );
2733 transcribed_parts.push(result.text);
2734 }
2735 Err(e) => {
2736 tracing::error!(error = %e, "audio transcription failed");
2737 }
2738 }
2739 }
2740 if transcribed_parts.is_empty() {
2741 text_base
2742 } else {
2743 let transcribed = transcribed_parts.join("\n");
2744 if text_base.is_empty() {
2745 transcribed
2746 } else {
2747 format!("[transcribed audio]\n{transcribed}\n\n{text_base}")
2748 }
2749 }
2750 } else {
2751 if !audio_attachments.is_empty() {
2752 tracing::warn!(
2753 count = audio_attachments.len(),
2754 "audio attachments received but no STT provider configured, dropping"
2755 );
2756 }
2757 text_base
2758 };
2759
2760 let mut image_parts = Vec::new();
2761 for attachment in image_attachments {
2762 if attachment.data.len() > MAX_IMAGE_BYTES {
2763 tracing::warn!(
2764 size = attachment.data.len(),
2765 max = MAX_IMAGE_BYTES,
2766 "image attachment exceeds size limit, skipping"
2767 );
2768 continue;
2769 }
2770 let mime_type = detect_image_mime(attachment.filename.as_deref()).to_string();
2771 image_parts.push(MessagePart::Image(Box::new(ImageData {
2772 data: attachment.data,
2773 mime_type,
2774 })));
2775 }
2776
2777 (text, image_parts)
2778 }
2779
    /// Dispatches feature-level slash commands (`/help`, `/skills`, `/mcp`,
    /// `/plan`, …) that are not handled by `handle_builtin_command`.
    ///
    /// Returns `Some(result)` when `trimmed` was consumed as a command
    /// (carrying any channel send error), or `None` when the input is not a
    /// recognized slash command and should be processed as a normal message.
    #[allow(clippy::too_many_lines)]
    async fn dispatch_slash_command(
        &mut self,
        trimmed: &str,
    ) -> Option<Result<(), error::AgentError>> {
        // Runs a handler, propagates its error, flushes buffered channel
        // output, and reports the command as consumed.
        macro_rules! handled {
            ($expr:expr) => {{
                if let Err(e) = $expr {
                    return Some(Err(e));
                }
                let _ = self.channel.flush_chunks().await;
                return Some(Ok(()));
            }};
        }

        // Record any URLs typed inside the command as user-provided, so the
        // exfiltration checks won't flag them later.
        let slash_urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !slash_urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(slash_urls);
        }

        if trimmed == "/help" {
            handled!(self.handle_help_command().await);
        }

        if trimmed == "/status" {
            handled!(self.handle_status_command().await);
        }

        #[cfg(feature = "guardrail")]
        if trimmed == "/guardrail" {
            handled!(self.handle_guardrail_command().await);
        }

        // NOTE: "/skills" must be checked before the "/skill " prefix form.
        if trimmed == "/skills" {
            handled!(self.handle_skills_command().await);
        }

        if trimmed == "/skill" || trimmed.starts_with("/skill ") {
            let rest = trimmed
                .strip_prefix("/skill")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_skill_command(&rest).await);
        }

        if trimmed == "/feedback" || trimmed.starts_with("/feedback ") {
            let rest = trimmed
                .strip_prefix("/feedback")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_feedback(&rest).await);
        }

        if trimmed == "/mcp" || trimmed.starts_with("/mcp ") {
            let args = trimmed.strip_prefix("/mcp").unwrap_or("").trim().to_owned();
            handled!(self.handle_mcp_command(&args).await);
        }

        if trimmed == "/image" || trimmed.starts_with("/image ") {
            let path = trimmed
                .strip_prefix("/image")
                .unwrap_or("")
                .trim()
                .to_owned();
            if path.is_empty() {
                handled!(
                    self.channel
                        .send("Usage: /image <path>")
                        .await
                        .map_err(Into::into)
                );
            }
            handled!(self.handle_image_command(&path).await);
        }

        if trimmed == "/plan" || trimmed.starts_with("/plan ") {
            return Some(self.dispatch_plan_command(trimmed).await);
        }

        if trimmed == "/graph" || trimmed.starts_with("/graph ") {
            handled!(self.handle_graph_command(trimmed).await);
        }

        if trimmed == "/memory" || trimmed.starts_with("/memory ") {
            handled!(self.handle_memory_command(trimmed).await);
        }

        #[cfg(feature = "compression-guidelines")]
        if trimmed == "/guidelines" {
            handled!(self.handle_guidelines_command().await);
        }

        #[cfg(feature = "scheduler")]
        if trimmed == "/scheduler" || trimmed.starts_with("/scheduler ") {
            handled!(self.handle_scheduler_command(trimmed).await);
        }

        #[cfg(feature = "experiments")]
        if trimmed == "/experiment" || trimmed.starts_with("/experiment ") {
            handled!(self.handle_experiment_command(trimmed).await);
        }

        #[cfg(feature = "lsp-context")]
        if trimmed == "/lsp" {
            handled!(self.handle_lsp_status_command().await);
        }

        #[cfg(feature = "policy-enforcer")]
        if trimmed == "/policy" || trimmed.starts_with("/policy ") {
            let args = trimmed
                .strip_prefix("/policy")
                .unwrap_or("")
                .trim()
                .to_owned();
            handled!(self.handle_policy_command(&args).await);
        }

        if trimmed == "/log" {
            handled!(self.handle_log_command().await);
        }

        // `/agent …` and `@mention` dispatch may itself return None when an
        // @mention does not match a known agent (falls through to normal
        // message handling).
        if trimmed.starts_with("/agent") || trimmed.starts_with('@') {
            return self.dispatch_agent_command(trimmed).await;
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/focus" {
            handled!(self.handle_focus_status_command().await);
        }

        #[cfg(feature = "context-compression")]
        if trimmed == "/sidequest" {
            handled!(self.handle_sidequest_status_command().await);
        }

        // Not a slash command we recognize.
        None
    }
2925
2926 async fn dispatch_plan_command(&mut self, trimmed: &str) -> Result<(), error::AgentError> {
2927 match crate::orchestration::PlanCommand::parse(trimmed) {
2928 Ok(cmd) => {
2929 self.handle_plan_command(cmd).await?;
2930 }
2931 Err(e) => {
2932 self.channel
2933 .send(&e.to_string())
2934 .await
2935 .map_err(error::AgentError::from)?;
2936 }
2937 }
2938 let _ = self.channel.flush_chunks().await;
2939 Ok(())
2940 }
2941
2942 async fn dispatch_agent_command(
2943 &mut self,
2944 trimmed: &str,
2945 ) -> Option<Result<(), error::AgentError>> {
2946 let known: Vec<String> = self
2947 .orchestration
2948 .subagent_manager
2949 .as_ref()
2950 .map(|m| m.definitions().iter().map(|d| d.name.clone()).collect())
2951 .unwrap_or_default();
2952 match crate::subagent::AgentCommand::parse(trimmed, &known) {
2953 Ok(cmd) => {
2954 if let Some(msg) = self.handle_agent_command(cmd).await
2955 && let Err(e) = self.channel.send(&msg).await
2956 {
2957 return Some(Err(e.into()));
2958 }
2959 let _ = self.channel.flush_chunks().await;
2960 Some(Ok(()))
2961 }
2962 Err(e) if trimmed.starts_with('@') => {
2963 tracing::debug!("@mention not matched as agent: {e}");
2965 None
2966 }
2967 Err(e) => {
2968 if let Err(send_err) = self.channel.send(&e.to_string()).await {
2969 return Some(Err(send_err.into()));
2970 }
2971 let _ = self.channel.flush_chunks().await;
2972 Some(Ok(()))
2973 }
2974 }
2975 }
2976
    /// Spawns a background task that decides whether the user's latest
    /// message corrects the previous assistant response, and stores any
    /// detected correction in memory.
    ///
    /// Uses the dedicated LLM feedback classifier when configured, otherwise
    /// falls back to a judge evaluation on the judge provider (or the main
    /// provider). Fire-and-forget: the task owns clones of everything it
    /// needs and errors are only logged.
    fn spawn_judge_correction_check(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Snapshot everything the spawned task needs — it cannot borrow
        // `self` across the `tokio::spawn` boundary.
        let assistant_snippet = self.last_assistant_response();
        let user_msg_owned = trimmed.to_owned();
        let memory_arc = self.memory_state.memory.clone();
        let skill_name = self
            .skill_state
            .active_skill_names
            .first()
            .cloned()
            .unwrap_or_default();
        let conv_id_bg = conv_id;
        // 0.6 is the fallback threshold when no learning config is present.
        let confidence_threshold = self
            .learning_engine
            .config
            .as_ref()
            .map_or(0.6, |c| c.correction_confidence_threshold);

        if let Some(llm_classifier) = self.feedback.llm_classifier.clone() {
            // Path 1: dedicated LLM feedback classifier.
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            let memory_arc2 = memory_arc.clone();
            let skill_name2 = skill_name.clone();
            tokio::spawn(async move {
                match llm_classifier
                    .classify_feedback(&user_msg, &assistant, confidence_threshold)
                    .await
                {
                    Ok(verdict) => {
                        if let Some(signal) = feedback_verdict_into_signal(&verdict, &user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "llm-classifier",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc2,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name2,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("llm-classifier failed: {e:#}");
                    }
                }
            });
        } else {
            // Path 2: judge evaluation — uses the dedicated judge provider
            // when configured, otherwise the main LLM provider.
            let judge_provider = self
                .providers
                .judge_provider
                .clone()
                .unwrap_or_else(|| self.provider.clone());
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            tokio::spawn(async move {
                match feedback_detector::JudgeDetector::evaluate(
                    &judge_provider,
                    &user_msg,
                    &assistant,
                    confidence_threshold,
                )
                .await
                {
                    Ok(verdict) => {
                        if let Some(signal) = verdict.into_signal(&user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "judge",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("judge detector failed: {e:#}");
                    }
                }
            });
        }
    }
3095
    /// Detects implicit user corrections of the previous assistant turn and
    /// records them (skill outcomes + correction memory).
    ///
    /// A fast regex detector runs first; when its result is ambiguous (per
    /// the adaptive judge thresholds) and the rate limit allows, detection is
    /// delegated to a background judge task instead, and no synchronous
    /// signal is recorded here.
    #[allow(clippy::too_many_lines)]
    async fn detect_and_record_corrections(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Correction detection defaults to enabled when no config exists.
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if !correction_detection_enabled {
            return;
        }

        let previous_user_messages: Vec<&str> = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .map(|m| m.content.as_str())
            .collect();

        // Cheap synchronous pass: regex-based correction detector.
        let regex_signal = self
            .feedback
            .detector
            .detect(trimmed, &previous_user_messages);

        let judge_should_run = if self.feedback.llm_classifier.is_some() {
            // Classifier configured: lazily create the judge with adaptive
            // low/high thresholds (defaults 0.5 / 0.8 without config), then
            // gate on its rate limiter. `check_rate_limit` mutates state, so
            // it is queried through `as_mut` after `should_invoke`.
            let adaptive_low = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.5, |c| c.judge_adaptive_low);
            let adaptive_high = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.8, |c| c.judge_adaptive_high);
            let should_invoke = self
                .feedback
                .judge
                .get_or_insert_with(|| {
                    feedback_detector::JudgeDetector::new(adaptive_low, adaptive_high)
                })
                .should_invoke(regex_signal.as_ref());
            should_invoke
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        } else {
            // No classifier: only consult a judge that already exists.
            self.feedback
                .judge
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .feedback
                    .judge
                    .as_mut() .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        };

        // When the judge runs, it does so in the background; the regex
        // signal is discarded to avoid double-recording.
        let (signal, signal_source) = if judge_should_run {
            self.spawn_judge_correction_check(trimmed, conv_id);
            (None, "judge")
        } else {
            (regex_signal, "regex")
        };

        let Some(signal) = signal else { return };
        tracing::info!(
            kind = signal.kind.as_str(),
            confidence = signal.confidence,
            source = signal_source,
            "implicit correction detected"
        );
        // Cap stored feedback at 500 chars.
        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
        // Self-corrections are not penalized against skills.
        if self.is_learning_enabled()
            && signal.kind != feedback_detector::CorrectionKind::SelfCorrection
        {
            self.record_skill_outcomes(
                "user_rejection",
                Some(&feedback_text),
                Some(signal.kind.as_str()),
            )
            .await;
        }
        if let Some(memory) = &self.memory_state.memory {
            let correction_text = context::truncate_chars(trimmed, 500);
            // NOTE(review): the assistant-message argument is stored as ""
            // here, while the background judge path stores a real snippet —
            // confirm this asymmetry is intentional.
            match memory
                .sqlite()
                .store_user_correction(
                    conv_id.map(|c| c.0),
                    "",
                    &correction_text,
                    self.skill_state
                        .active_skill_names
                        .first()
                        .map(String::as_str),
                    signal.kind.as_str(),
                )
                .await
            {
                Ok(correction_id) => {
                    if let Err(e) = memory
                        .store_correction_embedding(correction_id, &correction_text)
                        .await
                    {
                        tracing::warn!("failed to store correction embedding: {e:#}");
                    }
                }
                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
            }
        }
    }
3244
3245 async fn process_user_message(
3246 &mut self,
3247 text: String,
3248 image_parts: Vec<zeph_llm::provider::MessagePart>,
3249 ) -> Result<(), error::AgentError> {
3250 let iteration_index = self.debug_state.iteration_counter;
3252 self.debug_state.iteration_counter += 1;
3253 if let Some(ref mut tc) = self.debug_state.trace_collector {
3254 tc.begin_iteration(iteration_index, text.trim());
3255 self.debug_state.current_iteration_span_id =
3257 tc.current_iteration_span_id(iteration_index);
3258 }
3259
3260 let result = self
3261 .process_user_message_inner(text, image_parts, iteration_index)
3262 .await;
3263
3264 if let Some(ref mut tc) = self.debug_state.trace_collector {
3266 let status = if result.is_ok() {
3267 crate::debug_dump::trace::SpanStatus::Ok
3268 } else {
3269 crate::debug_dump::trace::SpanStatus::Error {
3270 message: "iteration failed".to_owned(),
3271 }
3272 };
3273 tc.end_iteration(iteration_index, status);
3274 }
3275 self.debug_state.current_iteration_span_id = None;
3276
3277 result
3278 }
3279
    /// Core per-turn pipeline: command dispatch, security screening,
    /// correction detection, context maintenance (compaction, focus,
    /// deferred summaries), message construction, and response processing.
    ///
    /// Step order matters — e.g. compaction state is advanced before the
    /// sidequest/focus ticks, and corrections are detected before the
    /// context is rebuilt for this turn.
    #[allow(clippy::too_many_lines)]
    async fn process_user_message_inner(
        &mut self,
        text: String,
        image_parts: Vec<zeph_llm::provider::MessagePart>,
        iteration_index: usize,
    ) -> Result<(), error::AgentError> {
        // Fresh cancellation token per turn; a watcher task bridges the
        // shared Notify-based cancel signal to this turn's token.
        let _ = iteration_index; self.lifecycle.cancel_token = CancellationToken::new();
        let signal = Arc::clone(&self.lifecycle.cancel_signal);
        let token = self.lifecycle.cancel_token.clone();
        tokio::spawn(async move {
            signal.notified().await;
            token.cancel();
        });
        let trimmed = text.trim();

        // Slash commands short-circuit the whole pipeline.
        if let Some(result) = self.dispatch_slash_command(trimmed).await {
            return result;
        }

        self.check_pending_rollbacks().await;

        // Guardrail screening: may warn (continue) or block (return Ok).
        #[cfg(feature = "guardrail")]
        if let Some(ref guardrail) = self.security.guardrail {
            use zeph_sanitizer::guardrail::GuardrailVerdict;
            let verdict = guardrail.check(trimmed).await;
            match &verdict {
                GuardrailVerdict::Flagged { reason, .. } => {
                    tracing::warn!(
                        reason = %reason,
                        should_block = verdict.should_block(),
                        "guardrail flagged user input"
                    );
                    if verdict.should_block() {
                        let msg = format!("[guardrail] Input blocked: {reason}");
                        let _ = self.channel.send(&msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    let _ = self
                        .channel
                        .send(&format!("[guardrail] Warning: {reason}"))
                        .await;
                }
                GuardrailVerdict::Error { error } => {
                    // fail_strategy decides whether guardrail errors block.
                    if guardrail.error_should_block() {
                        tracing::warn!(%error, "guardrail check failed (fail_strategy=closed), blocking input");
                        let msg = "[guardrail] Input blocked: check failed (see logs for details)";
                        let _ = self.channel.send(msg).await;
                        let _ = self.channel.flush_chunks().await;
                        return Ok(());
                    }
                    tracing::warn!(%error, "guardrail check failed (fail_strategy=open), allowing input");
                }
                GuardrailVerdict::Safe => {}
            }
        }

        // Prompt-injection classifier: hard block on detection.
        #[cfg(feature = "classifiers")]
        if self.security.sanitizer.classify_injection(trimmed).await {
            let _ = self
                .channel
                .send("[security] Input blocked: injection detected by classifier.")
                .await;
            let _ = self.channel.flush_chunks().await;
            return Ok(());
        }

        let conv_id = self.memory_state.conversation_id;
        self.rebuild_system_prompt(&text).await;

        // Learning/feedback bookkeeping for this turn.
        self.detect_and_record_corrections(trimmed, conv_id).await;
        self.learning_engine.tick();
        self.analyze_and_learn().await;
        self.sync_graph_counts().await;

        self.context_manager.compaction = self.context_manager.compaction.advance_turn();

        #[cfg(feature = "context-compression")]
        {
            self.focus.tick();

            // Sidequest eviction is skipped on turns that already compacted.
            let sidequest_should_fire = self.sidequest.tick();
            if sidequest_should_fire && !self.context_manager.compaction.is_compacted_this_turn() {
                self.maybe_sidequest_eviction();
            }
        }

        self.maybe_apply_deferred_summaries();
        self.flush_deferred_summaries().await;

        // Context-size maintenance is best-effort: failures are logged,
        // never fatal to the turn.
        if let Err(e) = self.maybe_proactive_compress().await {
            tracing::warn!("proactive compression failed: {e:#}");
        }

        if let Err(e) = self.maybe_compact().await {
            tracing::warn!("context compaction failed: {e:#}");
        }

        if let Err(e) = Box::pin(self.prepare_context(trimmed)).await {
            tracing::warn!("context preparation failed: {e:#}");
        }

        self.learning_engine.reset_reflection();

        // Merge images queued via /image with those attached to this message.
        let mut all_image_parts = std::mem::take(&mut self.msg.pending_image_parts);
        all_image_parts.extend(image_parts);
        let image_parts = all_image_parts;

        // Multimodal message only when the provider supports vision;
        // otherwise images are dropped with a warning.
        let user_msg = if !image_parts.is_empty() && self.provider.supports_vision() {
            let mut parts = vec![zeph_llm::provider::MessagePart::Text { text: text.clone() }];
            parts.extend(image_parts);
            Message::from_parts(Role::User, parts)
        } else {
            if !image_parts.is_empty() {
                tracing::warn!(
                    count = image_parts.len(),
                    "image attachments dropped: provider does not support vision"
                );
            }
            Message {
                role: Role::User,
                content: text.clone(),
                parts: vec![],
                metadata: MessageMetadata::default(),
            }
        };
        // Track user-typed URLs so exfiltration checks allow them later.
        let urls = zeph_sanitizer::exfiltration::extract_flagged_urls(trimmed);
        if !urls.is_empty()
            && let Ok(mut set) = self.security.user_provided_urls.write()
        {
            set.extend(urls);
        }

        self.persist_message(Role::User, &text, &[], false).await;
        self.push_message(user_msg);

        if let Err(e) = self.process_response().await {
            tracing::error!("Response processing failed: {e:#}");
            let user_msg = format!("Error: {e:#}");
            self.channel.send(&user_msg).await?;
            // Roll the failed user message back out of the history so the
            // token accounting stays consistent.
            self.msg.messages.pop();
            self.recompute_prompt_tokens();
            self.channel.flush_chunks().await?;
        }

        Ok(())
    }
3450
3451 async fn handle_image_command(&mut self, path: &str) -> Result<(), error::AgentError> {
3452 use std::path::Component;
3453 use zeph_llm::provider::{ImageData, MessagePart};
3454
3455 let has_parent_dir = std::path::Path::new(path)
3457 .components()
3458 .any(|c| c == Component::ParentDir);
3459 if has_parent_dir {
3460 self.channel
3461 .send("Invalid image path: path traversal not allowed")
3462 .await?;
3463 let _ = self.channel.flush_chunks().await;
3464 return Ok(());
3465 }
3466
3467 let data = match std::fs::read(path) {
3468 Ok(d) => d,
3469 Err(e) => {
3470 self.channel
3471 .send(&format!("Cannot read image {path}: {e}"))
3472 .await?;
3473 let _ = self.channel.flush_chunks().await;
3474 return Ok(());
3475 }
3476 };
3477 if data.len() > MAX_IMAGE_BYTES {
3478 self.channel
3479 .send(&format!(
3480 "Image {path} exceeds size limit ({} MB), skipping",
3481 MAX_IMAGE_BYTES / 1024 / 1024
3482 ))
3483 .await?;
3484 let _ = self.channel.flush_chunks().await;
3485 return Ok(());
3486 }
3487 let mime_type = detect_image_mime(Some(path)).to_string();
3488 self.msg
3489 .pending_image_parts
3490 .push(MessagePart::Image(Box::new(ImageData { data, mime_type })));
3491 self.channel
3492 .send(&format!("Image loaded: {path}. Send your message."))
3493 .await?;
3494 let _ = self.channel.flush_chunks().await;
3495 Ok(())
3496 }
3497
3498 async fn handle_help_command(&mut self) -> Result<(), error::AgentError> {
3499 use std::fmt::Write;
3500
3501 let mut out = String::from("Slash commands:\n\n");
3502
3503 let categories = [
3504 slash_commands::SlashCategory::Info,
3505 slash_commands::SlashCategory::Session,
3506 slash_commands::SlashCategory::Model,
3507 slash_commands::SlashCategory::Memory,
3508 slash_commands::SlashCategory::Tools,
3509 slash_commands::SlashCategory::Planning,
3510 slash_commands::SlashCategory::Debug,
3511 slash_commands::SlashCategory::Advanced,
3512 ];
3513
3514 for cat in &categories {
3515 let entries: Vec<_> = slash_commands::COMMANDS
3516 .iter()
3517 .filter(|c| &c.category == cat)
3518 .collect();
3519 if entries.is_empty() {
3520 continue;
3521 }
3522 let _ = writeln!(out, "{}:", cat.as_str());
3523 for cmd in entries {
3524 if cmd.args.is_empty() {
3525 let _ = write!(out, " {}", cmd.name);
3526 } else {
3527 let _ = write!(out, " {} {}", cmd.name, cmd.args);
3528 }
3529 let _ = write!(out, " — {}", cmd.description);
3530 if let Some(feat) = cmd.feature_gate {
3531 let _ = write!(out, " [requires: {feat}]");
3532 }
3533 let _ = writeln!(out);
3534 }
3535 let _ = writeln!(out);
3536 }
3537
3538 self.channel.send(out.trim_end()).await?;
3539 Ok(())
3540 }
3541
    /// Handles `/status`: reports uptime, turn and API-call counters, token and
    /// cost usage, skill/MCP counts, optional tool-schema-filter and
    /// orchestration details, and (feature-gated) pruning and graph-recall
    /// configuration.
    #[allow(clippy::too_many_lines)]
    async fn handle_status_command(&mut self) -> Result<(), error::AgentError> {
        use std::fmt::Write;

        let uptime = self.lifecycle.start_time.elapsed().as_secs();
        // A "turn" is counted as one user-role message in the transcript.
        let msg_count = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .count();

        // Snapshot all metrics in one borrow when the metrics channel exists;
        // otherwise fall back to zeros so the report still renders.
        let (
            api_calls,
            prompt_tokens,
            completion_tokens,
            cost_cents,
            mcp_servers,
            orch_plans,
            orch_tasks,
            orch_completed,
            orch_failed,
            orch_skipped,
        ) = if let Some(ref tx) = self.metrics.metrics_tx {
            let m = tx.borrow();
            (
                m.api_calls,
                m.prompt_tokens,
                m.completion_tokens,
                m.cost_spent_cents,
                m.mcp_server_count,
                m.orchestration.plans_total,
                m.orchestration.tasks_total,
                m.orchestration.tasks_completed,
                m.orchestration.tasks_failed,
                m.orchestration.tasks_skipped,
            )
        } else {
            (0, 0, 0, 0.0, 0, 0, 0, 0, 0, 0)
        };

        // A poisoned registry lock degrades to a zero count rather than panicking.
        let skill_count = self
            .skill_state
            .registry
            .read()
            .map(|r| r.all_meta().len())
            .unwrap_or(0);

        let mut out = String::from("Session status:\n\n");
        let _ = writeln!(out, "Provider: {}", self.provider.name());
        let _ = writeln!(out, "Model: {}", self.runtime.model_name);
        let _ = writeln!(out, "Uptime: {uptime}s");
        let _ = writeln!(out, "Turns: {msg_count}");
        let _ = writeln!(out, "API calls: {api_calls}");
        let _ = writeln!(
            out,
            "Tokens: {prompt_tokens} prompt / {completion_tokens} completion"
        );
        let _ = writeln!(out, "Skills: {skill_count}");
        let _ = writeln!(out, "MCP: {mcp_servers} server(s)");
        // Tool schema filter details only appear when the filter is configured.
        if let Some(ref tf) = self.tool_schema_filter {
            let _ = writeln!(
                out,
                "Filter: enabled (top_k={}, always_on={}, {} embeddings)",
                tf.top_k(),
                tf.always_on_count(),
                tf.embedding_count(),
            );
        }
        // Cost is tracked in cents; render as dollars only when non-zero.
        if cost_cents > 0.0 {
            let _ = writeln!(out, "Cost: ${:.4}", cost_cents / 100.0);
        }
        if orch_plans > 0 {
            let _ = writeln!(out);
            let _ = writeln!(out, "Orchestration:");
            let _ = writeln!(out, " Plans: {orch_plans}");
            let _ = writeln!(out, " Tasks: {orch_completed}/{orch_tasks} completed");
            if orch_failed > 0 {
                let _ = writeln!(out, " Failed: {orch_failed}");
            }
            if orch_skipped > 0 {
                let _ = writeln!(out, " Skipped: {orch_skipped}");
            }
        }

        // Pruning details are reported only for the subgoal-based strategies.
        #[cfg(feature = "context-compression")]
        {
            use crate::config::PruningStrategy;
            if matches!(
                self.context_manager.compression.pruning_strategy,
                PruningStrategy::Subgoal | PruningStrategy::SubgoalMig
            ) {
                let _ = writeln!(out);
                let _ = writeln!(
                    out,
                    "Pruning: {}",
                    match self.context_manager.compression.pruning_strategy {
                        PruningStrategy::SubgoalMig => "subgoal_mig",
                        _ => "subgoal",
                    }
                );
                let subgoal_count = self.compression.subgoal_registry.subgoals.len();
                let _ = writeln!(out, "Subgoals: {subgoal_count} tracked");
                if let Some(active) = self.compression.subgoal_registry.active_subgoal() {
                    let _ = writeln!(out, "Active: \"{}\"", active.description);
                } else {
                    let _ = writeln!(out, "Active: (none yet)");
                }
            }
        }

        // Graph recall mode: spreading activation vs plain BFS traversal.
        let gc = &self.memory_state.graph_config;
        if gc.enabled {
            let _ = writeln!(out);
            if gc.spreading_activation.enabled {
                let _ = writeln!(
                    out,
                    "Graph recall: spreading activation (lambda={:.2}, hops={})",
                    gc.spreading_activation.decay_lambda, gc.spreading_activation.max_hops,
                );
            } else {
                let _ = writeln!(out, "Graph recall: BFS (hops={})", gc.max_hops,);
            }
        }

        self.channel.send(out.trim_end()).await?;
        Ok(())
    }
3672
3673 #[cfg(feature = "guardrail")]
3674 async fn handle_guardrail_command(&mut self) -> Result<(), error::AgentError> {
3675 use std::fmt::Write;
3676
3677 let mut out = String::new();
3678 if let Some(ref guardrail) = self.security.guardrail {
3679 let stats = guardrail.stats();
3680 let _ = writeln!(out, "Guardrail: enabled");
3681 let _ = writeln!(out, "Action: {:?}", guardrail.action());
3682 let _ = writeln!(out, "Fail strategy: {:?}", guardrail.fail_strategy());
3683 let _ = writeln!(out, "Timeout: {}ms", guardrail.timeout_ms());
3684 let _ = writeln!(
3685 out,
3686 "Tool scan: {}",
3687 if guardrail.scan_tool_output() {
3688 "enabled"
3689 } else {
3690 "disabled"
3691 }
3692 );
3693 let _ = writeln!(out, "\nStats:");
3694 let _ = writeln!(out, " Total checks: {}", stats.total_checks);
3695 let _ = writeln!(out, " Flagged: {}", stats.flagged_count);
3696 let _ = writeln!(out, " Errors: {}", stats.error_count);
3697 let _ = writeln!(out, " Avg latency: {}ms", stats.avg_latency_ms());
3698 } else {
3699 out.push_str("Guardrail: disabled\n");
3700 out.push_str(
3701 "Enable with: --guardrail flag or [security.guardrail] enabled = true in config",
3702 );
3703 }
3704
3705 self.channel.send(out.trim_end()).await?;
3706 Ok(())
3707 }
3708
3709 async fn handle_skills_command(&mut self) -> Result<(), error::AgentError> {
3710 use std::fmt::Write;
3711
3712 let mut output = String::from("Available skills:\n\n");
3713
3714 let all_meta: Vec<zeph_skills::loader::SkillMeta> = self
3715 .skill_state
3716 .registry
3717 .read()
3718 .expect("registry read lock")
3719 .all_meta()
3720 .into_iter()
3721 .cloned()
3722 .collect();
3723
3724 for meta in &all_meta {
3725 let trust_info = if let Some(memory) = &self.memory_state.memory {
3726 memory
3727 .sqlite()
3728 .load_skill_trust(&meta.name)
3729 .await
3730 .ok()
3731 .flatten()
3732 .map_or_else(String::new, |r| format!(" [{}]", r.trust_level))
3733 } else {
3734 String::new()
3735 };
3736 let _ = writeln!(output, "- {} — {}{trust_info}", meta.name, meta.description);
3737 }
3738
3739 if let Some(memory) = &self.memory_state.memory {
3740 match memory.sqlite().load_skill_usage().await {
3741 Ok(usage) if !usage.is_empty() => {
3742 output.push_str("\nUsage statistics:\n\n");
3743 for row in &usage {
3744 let _ = writeln!(
3745 output,
3746 "- {}: {} invocations (last: {})",
3747 row.skill_name, row.invocation_count, row.last_used_at,
3748 );
3749 }
3750 }
3751 Ok(_) => {}
3752 Err(e) => tracing::warn!("failed to load skill usage: {e:#}"),
3753 }
3754 }
3755
3756 self.channel.send(&output).await?;
3757 Ok(())
3758 }
3759
3760 async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
3761 let Some((name, rest)) = input.split_once(' ') else {
3762 self.channel
3763 .send("Usage: /feedback <skill_name> <message>")
3764 .await?;
3765 return Ok(());
3766 };
3767 let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
3768
3769 if feedback.is_empty() {
3770 self.channel
3771 .send("Usage: /feedback <skill_name> <message>")
3772 .await?;
3773 return Ok(());
3774 }
3775
3776 let Some(memory) = &self.memory_state.memory else {
3777 self.channel.send("Memory not available.").await?;
3778 return Ok(());
3779 };
3780
3781 let outcome_type = if self.feedback.detector.detect(feedback, &[]).is_some() {
3782 "user_rejection"
3783 } else {
3784 "user_approval"
3785 };
3786
3787 memory
3788 .sqlite()
3789 .record_skill_outcome(
3790 skill_name,
3791 None,
3792 self.memory_state.conversation_id,
3793 outcome_type,
3794 None,
3795 Some(feedback),
3796 )
3797 .await?;
3798
3799 if self.is_learning_enabled() && outcome_type == "user_rejection" {
3800 self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
3801 .await
3802 .ok();
3803 }
3804
3805 self.channel
3806 .send(&format!("Feedback recorded for \"{skill_name}\"."))
3807 .await?;
3808 Ok(())
3809 }
3810
    /// Polls a spawned sub-agent every 500ms until it reaches a terminal state,
    /// servicing any secret-access requests it raises along the way.
    ///
    /// Returns a human-readable completion/failure/cancellation summary, or
    /// `None` when no sub-agent manager is configured.
    async fn poll_subagent_until_done(&mut self, task_id: &str, label: &str) -> Option<String> {
        use crate::subagent::SubAgentState;
        let result = loop {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;

            // Service a pending secret request (if any) before checking status.
            #[allow(clippy::redundant_closure_for_method_calls)]
            let pending = self
                .orchestration
                .subagent_manager
                .as_mut()
                .and_then(|m| m.try_recv_secret_request());
            if let Some((req_task_id, req)) = pending {
                let confirm_prompt = format!(
                    "Sub-agent requests secret '{}'. Allow?",
                    crate::text::truncate_to_chars(&req.secret_key, 100)
                );
                // A channel error during confirmation is treated as denial.
                let approved = self.channel.confirm(&confirm_prompt).await.unwrap_or(false);
                if let Some(mgr) = self.orchestration.subagent_manager.as_mut() {
                    if approved {
                        // Approved secrets expire after 5 minutes.
                        let ttl = std::time::Duration::from_secs(300);
                        let key = req.secret_key.clone();
                        if mgr.approve_secret(&req_task_id, &key, ttl).is_ok() {
                            let _ = mgr.deliver_secret(&req_task_id, key);
                        }
                    } else {
                        let _ = mgr.deny_secret(&req_task_id);
                    }
                }
            }

            let mgr = self.orchestration.subagent_manager.as_ref()?;
            let statuses = mgr.statuses();
            // No status record means the task is gone; report it as completed.
            let Some((_, status)) = statuses.iter().find(|(id, _)| id == task_id) else {
                break format!("{label} completed (no status available).");
            };
            match status.state {
                SubAgentState::Completed => {
                    let msg = status.last_message.clone().unwrap_or_else(|| "done".into());
                    break format!("{label} completed: {msg}");
                }
                SubAgentState::Failed => {
                    let msg = status
                        .last_message
                        .clone()
                        .unwrap_or_else(|| "unknown error".into());
                    break format!("{label} failed: {msg}");
                }
                SubAgentState::Canceled => {
                    break format!("{label} was cancelled.");
                }
                _ => {
                    // Still running: surface progress as "turn used/max",
                    // falling back to 20 when the definition is unavailable.
                    let _ = self
                        .channel
                        .send_status(&format!(
                            "{label}: turn {}/{}",
                            status.turns_used,
                            self.orchestration
                                .subagent_manager
                                .as_ref()
                                .and_then(|m| m.agents_def(task_id))
                                .map_or(20, |d| d.permissions.max_turns)
                        ))
                        .await;
                }
            }
        };
        Some(result)
    }
3886
3887 fn resolve_agent_id_prefix(&mut self, prefix: &str) -> Option<Result<String, String>> {
3890 let mgr = self.orchestration.subagent_manager.as_mut()?;
3891 let full_ids: Vec<String> = mgr
3892 .statuses()
3893 .into_iter()
3894 .map(|(tid, _)| tid)
3895 .filter(|tid| tid.starts_with(prefix))
3896 .collect();
3897 Some(match full_ids.as_slice() {
3898 [] => Err(format!("No sub-agent with id prefix '{prefix}'")),
3899 [fid] => Ok(fid.clone()),
3900 _ => Err(format!(
3901 "Ambiguous id prefix '{prefix}': matches {} agents",
3902 full_ids.len()
3903 )),
3904 })
3905 }
3906
3907 fn handle_agent_list(&self) -> Option<String> {
3908 use std::fmt::Write as _;
3909 let mgr = self.orchestration.subagent_manager.as_ref()?;
3910 let defs = mgr.definitions();
3911 if defs.is_empty() {
3912 return Some("No sub-agent definitions found.".into());
3913 }
3914 let mut out = String::from("Available sub-agents:\n");
3915 for d in defs {
3916 let memory_label = match d.memory {
3917 Some(crate::subagent::MemoryScope::User) => " [memory:user]",
3918 Some(crate::subagent::MemoryScope::Project) => " [memory:project]",
3919 Some(crate::subagent::MemoryScope::Local) => " [memory:local]",
3920 None => "",
3921 };
3922 if let Some(ref src) = d.source {
3923 let _ = writeln!(
3924 out,
3925 " {}{} — {} ({})",
3926 d.name, memory_label, d.description, src
3927 );
3928 } else {
3929 let _ = writeln!(out, " {}{} — {}", d.name, memory_label, d.description);
3930 }
3931 }
3932 Some(out)
3933 }
3934
3935 fn handle_agent_status(&self) -> Option<String> {
3936 use std::fmt::Write as _;
3937 let mgr = self.orchestration.subagent_manager.as_ref()?;
3938 let statuses = mgr.statuses();
3939 if statuses.is_empty() {
3940 return Some("No active sub-agents.".into());
3941 }
3942 let mut out = String::from("Active sub-agents:\n");
3943 for (id, s) in &statuses {
3944 let state = format!("{:?}", s.state).to_lowercase();
3945 let elapsed = s.started_at.elapsed().as_secs();
3946 let _ = writeln!(
3947 out,
3948 " [{short}] {state} turns={t} elapsed={elapsed}s {msg}",
3949 short = &id[..8.min(id.len())],
3950 t = s.turns_used,
3951 msg = s.last_message.as_deref().unwrap_or(""),
3952 );
3953 if let Some(def) = mgr.agents_def(id)
3955 && let Some(scope) = def.memory
3956 && let Ok(dir) = crate::subagent::memory::resolve_memory_dir(scope, &def.name)
3957 {
3958 let _ = writeln!(out, " memory: {}", dir.display());
3959 }
3960 }
3961 Some(out)
3962 }
3963
    /// Approves the pending secret request for the sub-agent matching the given
    /// id prefix: grants a 5-minute TTL and delivers the secret.
    ///
    /// NOTE(review): `try_recv_secret_request` pops a single queued request; if
    /// the popped request belongs to a *different* agent than `full_id`, it is
    /// dropped here without being approved or denied — confirm the manager
    /// re-queues unmatched requests.
    fn handle_agent_approve(&mut self, id: &str) -> Option<String> {
        let full_id = match self.resolve_agent_id_prefix(id)? {
            Ok(fid) => fid,
            Err(msg) => return Some(msg),
        };
        let mgr = self.orchestration.subagent_manager.as_mut()?;
        if let Some((tid, req)) = mgr.try_recv_secret_request()
            && tid == full_id
        {
            let key = req.secret_key.clone();
            // Approved secrets expire after 5 minutes.
            let ttl = std::time::Duration::from_secs(300);
            if let Err(e) = mgr.approve_secret(&full_id, &key, ttl) {
                return Some(format!("Approve failed: {e}"));
            }
            if let Err(e) = mgr.deliver_secret(&full_id, key.clone()) {
                return Some(format!("Secret delivery failed: {e}"));
            }
            return Some(format!("Secret '{key}' approved for sub-agent {full_id}."));
        }
        Some(format!(
            "No pending secret request for sub-agent '{full_id}'."
        ))
    }
3987
3988 fn handle_agent_deny(&mut self, id: &str) -> Option<String> {
3989 let full_id = match self.resolve_agent_id_prefix(id)? {
3990 Ok(fid) => fid,
3991 Err(msg) => return Some(msg),
3992 };
3993 let mgr = self.orchestration.subagent_manager.as_mut()?;
3994 match mgr.deny_secret(&full_id) {
3995 Ok(()) => Some(format!("Secret request denied for sub-agent '{full_id}'.")),
3996 Err(e) => Some(format!("Deny failed: {e}")),
3997 }
3998 }
3999
    /// Dispatches a parsed `/agent` command: list, background/foreground spawn,
    /// @mention, status, cancel, secret approve/deny, and resume.
    ///
    /// Returns the user-facing response text, or `None` when no sub-agent
    /// manager is configured.
    async fn handle_agent_command(&mut self, cmd: crate::subagent::AgentCommand) -> Option<String> {
        use crate::subagent::AgentCommand;

        match cmd {
            AgentCommand::List => self.handle_agent_list(),
            AgentCommand::Background { name, prompt } => {
                // Clone shared handles before taking the mutable manager borrow.
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg) {
                    Ok(id) => Some(format!(
                        "Sub-agent '{name}' started in background (id: {short})",
                        short = &id[..8.min(id.len())]
                    )),
                    Err(e) => Some(format!("Failed to spawn sub-agent: {e}")),
                }
            }
            // Spawn and @mention share the same foreground flow: spawn, then
            // poll until the sub-agent reaches a terminal state.
            AgentCommand::Spawn { name, prompt }
            | AgentCommand::Mention {
                agent: name,
                prompt,
            } => {
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let skills = self.filtered_skills_for(&name);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let cfg = self.orchestration.subagent_config.clone();
                let task_id = match mgr.spawn(&name, &prompt, provider, tool_executor, skills, &cfg)
                {
                    Ok(id) => id,
                    Err(e) => return Some(format!("Failed to spawn sub-agent: {e}")),
                };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Sub-agent '{name}' running... (id: {short})"))
                    .await;
                let label = format!("Sub-agent '{name}'");
                self.poll_subagent_until_done(&task_id, &label).await
            }
            AgentCommand::Status => self.handle_agent_status(),
            AgentCommand::Cancel { id } => {
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                // Resolve the (possibly partial) id against running agents.
                let ids: Vec<String> = mgr
                    .statuses()
                    .into_iter()
                    .map(|(task_id, _)| task_id)
                    .filter(|task_id| task_id.starts_with(&id))
                    .collect();
                match ids.as_slice() {
                    [] => Some(format!("No sub-agent with id prefix '{id}'")),
                    [full_id] => {
                        let full_id = full_id.clone();
                        match mgr.cancel(&full_id) {
                            Ok(()) => Some(format!("Cancelled sub-agent {full_id}.")),
                            Err(e) => Some(format!("Cancel failed: {e}")),
                        }
                    }
                    _ => Some(format!(
                        "Ambiguous id prefix '{id}': matches {} agents",
                        ids.len()
                    )),
                }
            }
            AgentCommand::Approve { id } => self.handle_agent_approve(&id),
            AgentCommand::Deny { id } => self.handle_agent_deny(&id),
            AgentCommand::Resume { id, prompt } => {
                let cfg = self.orchestration.subagent_config.clone();
                // Look up the definition name in a scoped block so the shared
                // borrow ends before `filtered_skills_for` and the mutable
                // resume call below.
                let def_name = {
                    let mgr = self.orchestration.subagent_manager.as_ref()?;
                    match mgr.def_name_for_resume(&id, &cfg) {
                        Ok(name) => name,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    }
                };
                let skills = self.filtered_skills_for(&def_name);
                let provider = self.provider.clone();
                let tool_executor = Arc::clone(&self.tool_executor);
                let mgr = self.orchestration.subagent_manager.as_mut()?;
                let (task_id, _) =
                    match mgr.resume(&id, &prompt, provider, tool_executor, skills, &cfg) {
                        Ok(pair) => pair,
                        Err(e) => return Some(format!("Failed to resume sub-agent: {e}")),
                    };
                let short = task_id[..8.min(task_id.len())].to_owned();
                let _ = self
                    .channel
                    .send(&format!("Resuming sub-agent '{id}'... (new id: {short})"))
                    .await;
                self.poll_subagent_until_done(&task_id, "Resumed sub-agent")
                    .await
            }
        }
    }
4100
4101 fn filtered_skills_for(&self, agent_name: &str) -> Option<Vec<String>> {
4102 let mgr = self.orchestration.subagent_manager.as_ref()?;
4103 let def = mgr.definitions().iter().find(|d| d.name == agent_name)?;
4104 let reg = self
4105 .skill_state
4106 .registry
4107 .read()
4108 .expect("registry read lock");
4109 match crate::subagent::filter_skills(®, &def.skills) {
4110 Ok(skills) => {
4111 let bodies: Vec<String> = skills.into_iter().map(|s| s.body.clone()).collect();
4112 if bodies.is_empty() {
4113 None
4114 } else {
4115 Some(bodies)
4116 }
4117 }
4118 Err(e) => {
4119 tracing::warn!(error = %e, "skill filtering failed for sub-agent");
4120 None
4121 }
4122 }
4123 }
4124
    /// Upserts a trust record for each reloaded skill: skills under the managed
    /// directory count as hub-sourced and get the configured default level,
    /// other skills get the local level, and a skill whose content hash changed
    /// since its stored record is demoted to the hash-mismatch level.
    async fn update_trust_for_reloaded_skills(&self, all_meta: &[zeph_skills::loader::SkillMeta]) {
        let Some(ref memory) = self.memory_state.memory else {
            return;
        };
        let trust_cfg = self.skill_state.trust_config.clone();
        let managed_dir = self.skill_state.managed_dir.clone();
        for meta in all_meta {
            // Skills living under the managed directory originate from the hub.
            let source_kind = if managed_dir
                .as_ref()
                .is_some_and(|d| meta.skill_dir.starts_with(d))
            {
                zeph_memory::sqlite::SourceKind::Hub
            } else {
                zeph_memory::sqlite::SourceKind::Local
            };
            let initial_level = if matches!(source_kind, zeph_memory::sqlite::SourceKind::Hub) {
                &trust_cfg.default_level
            } else {
                &trust_cfg.local_level
            };
            match zeph_skills::compute_skill_hash(&meta.skill_dir) {
                Ok(current_hash) => {
                    let existing = memory
                        .sqlite()
                        .load_skill_trust(&meta.name)
                        .await
                        .ok()
                        .flatten();
                    // Keep the stored level only while the content hash matches;
                    // otherwise demote. New skills get the source-kind default.
                    let trust_level_str = if let Some(ref row) = existing {
                        if row.blake3_hash == current_hash {
                            row.trust_level.clone()
                        } else {
                            trust_cfg.hash_mismatch_level.to_string()
                        }
                    } else {
                        initial_level.to_string()
                    };
                    let source_path = meta.skill_dir.to_str();
                    if let Err(e) = memory
                        .sqlite()
                        .upsert_skill_trust(
                            &meta.name,
                            &trust_level_str,
                            source_kind,
                            None,
                            source_path,
                            &current_hash,
                        )
                        .await
                    {
                        tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to compute hash for '{}': {e:#}", meta.name);
                }
            }
        }
    }
4185
    /// Rebuilds or syncs the semantic skill matcher for the given metadata set
    /// and, when hybrid search is enabled, rebuilds the BM25 description index.
    async fn rebuild_skill_matcher(&mut self, all_meta: &[&zeph_skills::loader::SkillMeta]) {
        let provider = self.embedding_provider.clone();
        // Each embed call captures an owned copy of the text and a cloned
        // provider handle so the returned future is self-contained.
        let embed_fn = |text: &str| -> zeph_skills::matcher::EmbedFuture {
            let owned = text.to_owned();
            let p = provider.clone();
            Box::pin(async move { p.embed(&owned).await })
        };

        // Qdrant-backed matchers support incremental sync; anything else
        // (including a missing matcher) is rebuilt in memory from scratch.
        let needs_inmemory_rebuild = !self
            .skill_state
            .matcher
            .as_ref()
            .is_some_and(SkillMatcherBackend::is_qdrant);

        if needs_inmemory_rebuild {
            self.skill_state.matcher = SkillMatcher::new(all_meta, embed_fn)
                .await
                .map(SkillMatcherBackend::InMemory);
        } else if let Some(ref mut backend) = self.skill_state.matcher {
            let _ = self.channel.send_status("syncing skill index...").await;
            if let Err(e) = backend
                .sync(all_meta, &self.skill_state.embedding_model, embed_fn)
                .await
            {
                tracing::warn!("failed to sync skill embeddings: {e:#}");
            }
        }

        if self.skill_state.hybrid_search {
            let descs: Vec<&str> = all_meta.iter().map(|m| m.description.as_str()).collect();
            let _ = self.channel.send_status("rebuilding search index...").await;
            self.skill_state.bm25_index = Some(zeph_skills::bm25::Bm25Index::build(&descs));
        }
    }
4221
    /// Reloads skills from disk if the registry fingerprint changed, then
    /// refreshes trust records, the matcher/BM25 indexes, and the system prompt
    /// (first transcript message) with the new skills listing.
    async fn reload_skills(&mut self) {
        let new_registry = SkillRegistry::load(&self.skill_state.skill_paths);
        // Fast path: nothing changed on disk.
        if new_registry.fingerprint()
            == self
                .skill_state
                .registry
                .read()
                .expect("registry read lock")
                .fingerprint()
        {
            return;
        }
        let _ = self.channel.send_status("reloading skills...").await;
        *self
            .skill_state
            .registry
            .write()
            .expect("registry write lock") = new_registry;

        // Clone metadata out so the read lock is not held across the awaits below.
        let all_meta = self
            .skill_state
            .registry
            .read()
            .expect("registry read lock")
            .all_meta()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();

        self.update_trust_for_reloaded_skills(&all_meta).await;

        let all_meta_refs = all_meta.iter().collect::<Vec<_>>();
        self.rebuild_skill_matcher(&all_meta_refs).await;

        // Materialize full skills inside a scoped read lock.
        let all_skills: Vec<Skill> = {
            let reg = self
                .skill_state
                .registry
                .read()
                .expect("registry read lock");
            reg.all_meta()
                .iter()
                .filter_map(|m| reg.get_skill(&m.name).ok())
                .collect()
        };
        let trust_map = self.build_skill_trust_map().await;
        // Health stats start empty on reload; the prompt omits them for now.
        let empty_health: HashMap<String, (f64, u32)> = HashMap::new();
        let skills_prompt = format_skills_prompt(&all_skills, &trust_map, &empty_health);
        self.skill_state
            .last_skills_prompt
            .clone_from(&skills_prompt);
        let system_prompt = build_system_prompt(&skills_prompt, None, None, false);
        // The first transcript message carries the system prompt.
        if let Some(msg) = self.msg.messages.first_mut() {
            msg.content = system_prompt;
        }

        // Clear the transient status line.
        let _ = self.channel.send_status("").await;
        tracing::info!(
            "reloaded {} skill(s)",
            self.skill_state
                .registry
                .read()
                .expect("registry read lock")
                .all_meta()
                .len()
        );
    }
4289
4290 fn reload_instructions(&mut self) {
4291 if let Some(ref mut rx) = self.instructions.reload_rx {
4293 while rx.try_recv().is_ok() {}
4294 }
4295 let Some(ref state) = self.instructions.reload_state else {
4296 return;
4297 };
4298 let new_blocks = crate::instructions::load_instructions(
4299 &state.base_dir,
4300 &state.provider_kinds,
4301 &state.explicit_files,
4302 state.auto_detect,
4303 );
4304 let old_sources: std::collections::HashSet<_> =
4305 self.instructions.blocks.iter().map(|b| &b.source).collect();
4306 let new_sources: std::collections::HashSet<_> =
4307 new_blocks.iter().map(|b| &b.source).collect();
4308 for added in new_sources.difference(&old_sources) {
4309 tracing::info!(path = %added.display(), "instruction file added");
4310 }
4311 for removed in old_sources.difference(&new_sources) {
4312 tracing::info!(path = %removed.display(), "instruction file removed");
4313 }
4314 tracing::info!(
4315 old_count = self.instructions.blocks.len(),
4316 new_count = new_blocks.len(),
4317 "reloaded instruction files"
4318 );
4319 self.instructions.blocks = new_blocks;
4320 }
4321
4322 fn reload_config(&mut self) {
4323 let Some(ref path) = self.lifecycle.config_path else {
4324 return;
4325 };
4326 let config = match Config::load(path) {
4327 Ok(c) => c,
4328 Err(e) => {
4329 tracing::warn!("config reload failed: {e:#}");
4330 return;
4331 }
4332 };
4333
4334 self.runtime.security = config.security;
4335 self.runtime.timeouts = config.timeouts;
4336 self.runtime.redact_credentials = config.memory.redact_credentials;
4337 self.memory_state.history_limit = config.memory.history_limit;
4338 self.memory_state.recall_limit = config.memory.semantic.recall_limit;
4339 self.memory_state.summarization_threshold = config.memory.summarization_threshold;
4340 self.skill_state.max_active_skills = config.skills.max_active_skills;
4341 self.skill_state.disambiguation_threshold = config.skills.disambiguation_threshold;
4342 self.skill_state.cosine_weight = config.skills.cosine_weight.clamp(0.0, 1.0);
4343 self.skill_state.hybrid_search = config.skills.hybrid_search;
4344
4345 if config.memory.context_budget_tokens > 0 {
4346 self.context_manager.budget = Some(
4347 ContextBudget::new(config.memory.context_budget_tokens, 0.20)
4348 .with_graph_enabled(config.memory.graph.enabled),
4349 );
4350 } else {
4351 self.context_manager.budget = None;
4352 }
4353
4354 {
4355 self.memory_state.graph_config = config.memory.graph.clone();
4356 }
4357 self.context_manager.soft_compaction_threshold = config.memory.soft_compaction_threshold;
4358 self.context_manager.hard_compaction_threshold = config.memory.hard_compaction_threshold;
4359 self.context_manager.compaction_preserve_tail = config.memory.compaction_preserve_tail;
4360 self.context_manager.compaction_cooldown_turns = config.memory.compaction_cooldown_turns;
4361 self.context_manager.prune_protect_tokens = config.memory.prune_protect_tokens;
4362 self.context_manager.compression = config.memory.compression.clone();
4363 self.context_manager.routing = config.memory.routing.clone();
4364 self.memory_state.cross_session_score_threshold =
4365 config.memory.cross_session_score_threshold;
4366
4367 self.index.repo_map_tokens = config.index.repo_map_tokens;
4368 self.index.repo_map_ttl = std::time::Duration::from_secs(config.index.repo_map_ttl_secs);
4369
4370 tracing::info!("config reloaded");
4371 }
4372
4373 #[cfg(feature = "context-compression")]
4375 async fn handle_focus_status_command(&mut self) -> Result<(), error::AgentError> {
4376 use std::fmt::Write;
4377 let mut out = String::from("Focus Agent status\n\n");
4378 let _ = writeln!(out, "Enabled: {}", self.focus.config.enabled);
4379 let _ = writeln!(out, "Active session: {}", self.focus.is_active());
4380 if let Some(ref scope) = self.focus.active_scope {
4381 let _ = writeln!(out, "Active scope: {scope}");
4382 }
4383 let _ = writeln!(
4384 out,
4385 "Knowledge blocks: {}",
4386 self.focus.knowledge_blocks.len()
4387 );
4388 let _ = writeln!(out, "Turns since focus: {}", self.focus.turns_since_focus);
4389 self.channel.send(&out).await?;
4390 Ok(())
4391 }
4392
4393 #[cfg(feature = "context-compression")]
4395 async fn handle_sidequest_status_command(&mut self) -> Result<(), error::AgentError> {
4396 use std::fmt::Write;
4397 let mut out = String::from("SideQuest status\n\n");
4398 let _ = writeln!(out, "Enabled: {}", self.sidequest.config.enabled);
4399 let _ = writeln!(
4400 out,
4401 "Interval turns: {}",
4402 self.sidequest.config.interval_turns
4403 );
4404 let _ = writeln!(out, "Turn counter: {}", self.sidequest.turn_counter);
4405 let _ = writeln!(out, "Passes run: {}", self.sidequest.passes_run);
4406 let _ = writeln!(
4407 out,
4408 "Total evicted: {} tool outputs",
4409 self.sidequest.total_evicted
4410 );
4411 self.channel.send(&out).await?;
4412 Ok(())
4413 }
4414
4415 #[cfg(feature = "context-compression")]
4426 #[allow(clippy::too_many_lines)]
4427 fn maybe_sidequest_eviction(&mut self) {
4428 use zeph_llm::provider::{Message, MessageMetadata, Role};
4429
4430 if self.sidequest.config.enabled {
4434 use crate::config::PruningStrategy;
4435 if !matches!(
4436 self.context_manager.compression.pruning_strategy,
4437 PruningStrategy::Reactive
4438 ) {
4439 tracing::warn!(
4440 strategy = ?self.context_manager.compression.pruning_strategy,
4441 "sidequest is enabled alongside a non-Reactive pruning strategy; \
4442 consider disabling sidequest.enabled to avoid redundant eviction"
4443 );
4444 }
4445 }
4446
4447 if self.focus.is_active() {
4449 tracing::debug!("sidequest: skipping — focus session active");
4450 self.compression.pending_sidequest_result = None;
4452 return;
4453 }
4454
4455 if let Some(handle) = self.compression.pending_sidequest_result.take() {
4457 use futures::FutureExt as _;
4459 match handle.now_or_never() {
4460 Some(Ok(Some(evicted_indices))) if !evicted_indices.is_empty() => {
4461 let cursors_snapshot = self.sidequest.tool_output_cursors.clone();
4462 let freed = self.sidequest.apply_eviction(
4463 &mut self.msg.messages,
4464 &evicted_indices,
4465 &self.metrics.token_counter,
4466 );
4467 if freed > 0 {
4468 self.recompute_prompt_tokens();
4469 self.context_manager.compaction =
4472 crate::agent::context_manager::CompactionState::CompactedThisTurn {
4473 cooldown: 0,
4474 };
4475 tracing::info!(
4476 freed_tokens = freed,
4477 evicted_cursors = evicted_indices.len(),
4478 pass = self.sidequest.passes_run,
4479 "sidequest eviction complete"
4480 );
4481 if let Some(ref d) = self.debug_state.debug_dumper {
4482 d.dump_sidequest_eviction(&cursors_snapshot, &evicted_indices, freed);
4483 }
4484 if let Some(ref tx) = self.session.status_tx {
4485 let _ = tx.send(format!("SideQuest evicted {freed} tokens"));
4486 }
4487 } else {
4488 if let Some(ref tx) = self.session.status_tx {
4490 let _ = tx.send(String::new());
4491 }
4492 }
4493 }
4494 Some(Ok(None | Some(_))) => {
4495 tracing::debug!("sidequest: pending result: no cursors to evict");
4496 if let Some(ref tx) = self.session.status_tx {
4497 let _ = tx.send(String::new());
4498 }
4499 }
4500 Some(Err(e)) => {
4501 tracing::debug!("sidequest: background task panicked: {e}");
4502 if let Some(ref tx) = self.session.status_tx {
4503 let _ = tx.send(String::new());
4504 }
4505 }
4506 None => {
4507 tracing::debug!(
4511 "sidequest: background LLM task not yet complete, rescheduling"
4512 );
4513 }
4514 }
4515 }
4516
4517 self.sidequest
4519 .rebuild_cursors(&self.msg.messages, &self.metrics.token_counter);
4520
4521 if self.sidequest.tool_output_cursors.is_empty() {
4522 tracing::debug!("sidequest: no eligible cursors");
4523 return;
4524 }
4525
4526 let prompt = self.sidequest.build_eviction_prompt();
4527 let max_eviction_ratio = self.sidequest.config.max_eviction_ratio;
4528 let n_cursors = self.sidequest.tool_output_cursors.len();
4529 let provider = self.summary_or_primary_provider().clone();
4531
4532 let handle = tokio::spawn(async move {
4534 let msgs = [Message {
4535 role: Role::User,
4536 content: prompt,
4537 parts: vec![],
4538 metadata: MessageMetadata::default(),
4539 }];
4540 let response =
4541 match tokio::time::timeout(std::time::Duration::from_secs(5), provider.chat(&msgs))
4542 .await
4543 {
4544 Ok(Ok(r)) => r,
4545 Ok(Err(e)) => {
4546 tracing::debug!("sidequest bg: LLM call failed: {e:#}");
4547 return None;
4548 }
4549 Err(_) => {
4550 tracing::debug!("sidequest bg: LLM call timed out");
4551 return None;
4552 }
4553 };
4554
4555 let start = response.find('{')?;
4556 let end = response.rfind('}')?;
4557 if start > end {
4558 return None;
4559 }
4560 let json_slice = &response[start..=end];
4561 let parsed: sidequest::EvictionResponse = serde_json::from_str(json_slice).ok()?;
4562 let mut valid: Vec<usize> = parsed
4563 .del_cursors
4564 .into_iter()
4565 .filter(|&c| c < n_cursors)
4566 .collect();
4567 valid.sort_unstable();
4568 valid.dedup();
4569 #[allow(
4570 clippy::cast_precision_loss,
4571 clippy::cast_possible_truncation,
4572 clippy::cast_sign_loss
4573 )]
4574 let max_evict = ((n_cursors as f32) * max_eviction_ratio).ceil() as usize;
4575 valid.truncate(max_evict);
4576 Some(valid)
4577 });
4578
4579 self.compression.pending_sidequest_result = Some(handle);
4580 tracing::debug!("sidequest: background LLM eviction task spawned");
4581 if let Some(ref tx) = self.session.status_tx {
4582 let _ = tx.send("SideQuest: scoring tool outputs...".into());
4583 }
4584 }
4585}
4586pub(crate) async fn shutdown_signal(rx: &mut watch::Receiver<bool>) {
4587 while !*rx.borrow_and_update() {
4588 if rx.changed().await.is_err() {
4589 std::future::pending::<()>().await;
4590 }
4591 }
4592}
4593
4594fn feedback_verdict_into_signal(
4598 verdict: &zeph_llm::classifier::llm::FeedbackVerdict,
4599 user_message: &str,
4600) -> Option<feedback_detector::CorrectionSignal> {
4601 if !verdict.is_correction {
4602 return None;
4603 }
4604 let confidence = verdict.confidence.clamp(0.0, 1.0);
4605 let kind_raw = verdict.kind.trim().to_lowercase().replace(' ', "_");
4606 let kind = match kind_raw.as_str() {
4607 "explicit_rejection" => feedback_detector::CorrectionKind::ExplicitRejection,
4608 "alternative_request" => feedback_detector::CorrectionKind::AlternativeRequest,
4609 "repetition" => feedback_detector::CorrectionKind::Repetition,
4610 "self_correction" => feedback_detector::CorrectionKind::SelfCorrection,
4611 other => {
4612 tracing::warn!(
4613 kind = other,
4614 "llm-classifier returned unknown correction kind, discarding"
4615 );
4616 return None;
4617 }
4618 };
4619 Some(feedback_detector::CorrectionSignal {
4620 confidence,
4621 kind,
4622 feedback_text: user_message.to_owned(),
4623 })
4624}
4625
4626async fn store_correction_in_memory(
4628 memory: Option<std::sync::Arc<zeph_memory::semantic::SemanticMemory>>,
4629 conv_id: Option<zeph_memory::ConversationId>,
4630 assistant_snippet: &str,
4631 user_msg: &str,
4632 skill_name: String,
4633 kind_str: &str,
4634) {
4635 let Some(mem) = memory else { return };
4636 let correction_text = context::truncate_chars(user_msg, 500);
4637 match mem
4638 .sqlite()
4639 .store_user_correction(
4640 conv_id.map(|c| c.0),
4641 assistant_snippet,
4642 &correction_text,
4643 if skill_name.is_empty() {
4644 None
4645 } else {
4646 Some(skill_name.as_str())
4647 },
4648 kind_str,
4649 )
4650 .await
4651 {
4652 Ok(correction_id) => {
4653 if let Err(e) = mem
4654 .store_correction_embedding(correction_id, &correction_text)
4655 .await
4656 {
4657 tracing::warn!("failed to store correction embedding: {e:#}");
4658 }
4659 }
4660 Err(e) => {
4661 tracing::warn!("failed to store judge correction: {e:#}");
4662 }
4663 }
4664}
4665
4666pub(crate) async fn recv_optional<T>(rx: &mut Option<mpsc::Receiver<T>>) -> Option<T> {
4667 match rx {
4668 Some(inner) => {
4669 if let Some(v) = inner.recv().await {
4670 Some(v)
4671 } else {
4672 *rx = None;
4673 std::future::pending().await
4674 }
4675 }
4676 None => std::future::pending().await,
4677 }
4678}
4679
4680#[cfg(test)]
4681mod tests;
4682
4683#[cfg(test)]
4684pub(crate) use tests::agent_tests;