1use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28 fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29 let Some(memory) = self.services.memory.persistence.memory.clone() else {
30 return Err("Graph memory is not enabled.".to_owned());
31 };
32 let Some(store) = memory.graph_store.clone() else {
33 if self.services.memory.extraction.graph_config.enabled {
34 return Err(
35 "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36 .to_owned(),
37 );
38 }
39 return Err("Graph memory is not enabled.".to_owned());
40 };
41 Ok((memory, store))
42 }
43}
44
45async fn semantic_scan_plugin_add(
57 scanner: &zeph_skills::semantic_scanner::SkillSemanticScanner,
58 source: &str,
59 managed_dir: Option<std::path::PathBuf>,
60 mcp_allowed: Vec<String>,
61 base_shell_allowed: Vec<String>,
62) -> Result<Option<String>, CommandError> {
63 use futures::stream::StreamExt as _;
64 use zeph_skills::semantic_scanner::ScanVerdict;
65
66 let plugins_dir = zeph_plugins::PluginManager::default_plugins_dir();
67 let mgr_dir =
68 managed_dir.unwrap_or_else(|| zeph_config::defaults::default_vault_dir().join("skills"));
69 let mgr =
70 zeph_plugins::PluginManager::new(plugins_dir, mgr_dir, mcp_allowed, base_shell_allowed);
71
72 let source_owned = source.to_owned();
73 let scan_inputs = tokio::task::spawn_blocking(move || mgr.scan_targets(&source_owned))
74 .await
75 .map_err(|e| CommandError(format!("plugin scan_targets panicked: {e}")))?
76 .map_err(|e| CommandError(format!("plugin add failed: {e}")))?;
77
78 tracing::info!(
79 plugin.source = %source,
80 skills_count = scan_inputs.len(),
81 "plugins.add: running Stage-2 semantic scan"
82 );
83
84 let scan_futs: Vec<_> = scan_inputs
89 .iter()
90 .map(|input| {
91 let name = input.skill_name.clone();
92 let purpose = input.declared_purpose.clone();
93 let md = input.skill_md.clone();
94 async move {
95 let verdict = scanner.scan(&name, &purpose, &md).await;
96 (name, verdict)
97 }
98 })
99 .collect();
100
101 let verdicts: Vec<_> = tokio::time::timeout(
102 std::time::Duration::from_mins(5),
103 futures::stream::iter(scan_futs)
104 .buffer_unordered(4)
105 .collect::<Vec<_>>(),
106 )
107 .await
108 .map_err(|_| CommandError("plugin scan timed out after 300s".to_owned()))?;
109
110 for (skill_name, verdict_result) in verdicts {
111 let verdict = verdict_result.map_err(|e| {
112 CommandError(format!(
113 "plugin add failed: semantic scan error for skill {skill_name:?}: {e}"
114 ))
115 })?;
116 match verdict {
117 ScanVerdict::Allow => {
118 tracing::debug!(
119 skill = %skill_name,
120 "plugins.add: skill passed semantic scan"
121 );
122 }
123 ScanVerdict::Warn(ref reason) => {
124 tracing::warn!(
125 skill = %skill_name,
126 reason = %reason,
127 "plugins.add: skill passed with warning"
128 );
129 }
130 ScanVerdict::Block(reason) => {
131 return Ok(Some(format!(
132 "plugin add failed: skill {skill_name:?} rejected by semantic scan: {reason}"
133 )));
134 }
135 _ => {}
136 }
137 }
138 Ok(None)
139}
140
141impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
142 fn memory_tiers<'a>(
145 &'a mut self,
146 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
147 Box::pin(
148 async move {
149 let Some(memory) = self.services.memory.persistence.memory.clone() else {
150 return Ok("Memory not configured.".to_owned());
151 };
152 match memory.sqlite().count_messages_by_tier().await {
153 Ok((episodic, semantic)) => {
154 let mut out = String::new();
155 let _ = writeln!(out, "Memory tiers:");
156 let _ = writeln!(out, " Working: (current context window — virtual)");
157 let _ = writeln!(out, " Episodic: {episodic} messages");
158 let _ = writeln!(out, " Semantic: {semantic} facts");
159 Ok(out.trim_end().to_owned())
160 }
161 Err(e) => Ok(format!("Failed to query tier stats: {e}")),
162 }
163 }
164 .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
165 )
166 }
167
168 fn memory_promote<'a>(
169 &'a mut self,
170 ids_str: &'a str,
171 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
172 Box::pin(
173 async move {
174 let Some(memory) = self.services.memory.persistence.memory.clone() else {
175 return Ok("Memory not configured.".to_owned());
176 };
177 let ids: Vec<MessageId> = ids_str
178 .split_whitespace()
179 .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
180 .collect();
181 if ids.is_empty() {
182 return Ok(
183 "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
184 .to_owned(),
185 );
186 }
187 match memory.sqlite().manual_promote(&ids).await {
188 Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
189 Err(e) => Ok(format!("Promotion failed: {e}")),
190 }
191 }
192 .instrument(tracing::info_span!("core.agent_access.memory_promote")),
193 )
194 }
195
196 fn graph_stats<'a>(
199 &'a mut self,
200 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
201 Box::pin(
202 async move {
203 let (_, store) = match self.resolve_graph_store() {
204 Ok(pair) => pair,
205 Err(msg) => return Ok(msg),
206 };
207
208 let stats_future = async {
209 tokio::join!(
210 store.entity_count(),
211 store.active_edge_count(),
212 store.community_count(),
213 store.edge_type_distribution()
214 )
215 };
216 let Ok((entities, edges, communities, distribution)) =
217 tokio::time::timeout(Duration::from_secs(5), stats_future).await
218 else {
219 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
220 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
221 };
222 let mut msg = format!(
223 "Graph memory: {} entities, {} edges, {} communities",
224 entities.unwrap_or(0),
225 edges.unwrap_or(0),
226 communities.unwrap_or(0)
227 );
228 if let Ok(dist) = distribution
229 && !dist.is_empty()
230 {
231 let dist_str: Vec<String> =
232 dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
233 write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
234 }
235 Ok(msg)
236 }
237 .instrument(tracing::info_span!("core.agent_access.graph_stats")),
238 )
239 }
240
241 fn graph_entities<'a>(
242 &'a mut self,
243 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
244 Box::pin(
245 async move {
246 let (_, store) = match self.resolve_graph_store() {
247 Ok(pair) => pair,
248 Err(msg) => return Ok(msg),
249 };
250
251 let entities = match tokio::time::timeout(
252 Duration::from_secs(5),
253 store.all_entities(),
254 )
255 .await
256 {
257 Ok(Ok(v)) => v,
258 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
259 Err(_) => {
260 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
261 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
262 }
263 };
264 if entities.is_empty() {
265 return Ok("No entities found.".to_owned());
266 }
267
268 let total = entities.len();
269 let display: Vec<String> = entities
270 .iter()
271 .take(50)
272 .map(|e| {
273 format!(
274 " {:<40} {:<15} {}",
275 e.name,
276 e.entity_type.as_str(),
277 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
278 )
279 })
280 .collect();
281 let mut msg = format!(
282 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
283 "NAME",
284 "TYPE",
285 "LAST SEEN",
286 display.join("\n")
287 );
288 if total > 50 {
289 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
290 }
291 Ok(msg)
292 }
293 .instrument(tracing::info_span!("core.agent_access.graph_entities")),
294 )
295 }
296
297 fn graph_facts<'a>(
298 &'a mut self,
299 name: &'a str,
300 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
301 Box::pin(
302 async move {
303 let (_, store) = match self.resolve_graph_store() {
304 Ok(pair) => pair,
305 Err(msg) => return Ok(msg),
306 };
307
308 let matches = match tokio::time::timeout(
309 Duration::from_secs(5),
310 store.find_entity_by_name(name),
311 )
312 .await
313 {
314 Ok(Ok(v)) => v,
315 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
316 Err(_) => {
317 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
318 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
319 }
320 };
321 if matches.is_empty() {
322 return Ok(format!("No entity found matching '{name}'."));
323 }
324
325 let entity = &matches[0];
326 let edges = match tokio::time::timeout(
327 Duration::from_secs(5),
328 store.edges_for_entity(entity.id.0),
329 )
330 .await
331 {
332 Ok(Ok(v)) => v,
333 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
334 Err(_) => {
335 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
336 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
337 }
338 };
339 if edges.is_empty() {
340 return Ok(format!("Entity '{}' has no known facts.", entity.name));
341 }
342
343 let mut entity_names: std::collections::HashMap<i64, String> =
344 std::collections::HashMap::new();
345 entity_names.insert(entity.id.0, entity.name.clone());
346 for edge in &edges {
347 let other_id = if edge.source_entity_id == entity.id.0 {
348 edge.target_entity_id
349 } else {
350 edge.source_entity_id
351 };
352 entity_names.entry(other_id).or_default();
353 }
354 for (&id, name_val) in &mut entity_names {
355 if name_val.is_empty() {
356 let result = tokio::time::timeout(
357 Duration::from_secs(5),
358 store.find_entity_by_id(id),
359 )
360 .await;
361 if let Ok(Ok(Some(other))) = result {
362 *name_val = other.name;
363 } else {
364 *name_val = format!("#{id}");
365 }
366 }
367 }
368
369 let lines: Vec<String> = edges
370 .iter()
371 .map(|e| {
372 let src = entity_names
373 .get(&e.source_entity_id)
374 .cloned()
375 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
376 let tgt = entity_names
377 .get(&e.target_entity_id)
378 .cloned()
379 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
380 format!(
381 " {} --[{}/{}]--> {}: {} (confidence: {:.2})",
382 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
383 )
384 })
385 .collect();
386 Ok(format!(
387 "Facts for '{}':\n{}",
388 entity.name,
389 lines.join("\n")
390 ))
391 }
392 .instrument(tracing::info_span!("core.agent_access.graph_facts")),
393 )
394 }
395
396 fn graph_history<'a>(
397 &'a mut self,
398 name: &'a str,
399 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
400 Box::pin(
401 async move {
402 let (_, store) = match self.resolve_graph_store() {
403 Ok(pair) => pair,
404 Err(msg) => return Ok(msg),
405 };
406
407 let matches = match tokio::time::timeout(
408 Duration::from_secs(5),
409 store.find_entity_by_name(name),
410 )
411 .await
412 {
413 Ok(Ok(v)) => v,
414 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
415 Err(_) => {
416 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
417 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
418 }
419 };
420 if matches.is_empty() {
421 return Ok(format!("No entity found matching '{name}'."));
422 }
423
424 let entity = &matches[0];
425 let edges = match tokio::time::timeout(
426 Duration::from_secs(5),
427 store.edge_history_for_entity(entity.id.0, 50),
428 )
429 .await
430 {
431 Ok(Ok(v)) => v,
432 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
433 Err(_) => {
434 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
435 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
436 }
437 };
438 if edges.is_empty() {
439 return Ok(format!("Entity '{}' has no edge history.", entity.name));
440 }
441
442 let mut entity_names: std::collections::HashMap<i64, String> =
443 std::collections::HashMap::new();
444 entity_names.insert(entity.id.0, entity.name.clone());
445 for edge in &edges {
446 for &id in &[edge.source_entity_id, edge.target_entity_id] {
447 entity_names.entry(id).or_default();
448 }
449 }
450 for (&id, name_val) in &mut entity_names {
451 if name_val.is_empty() {
452 let result = tokio::time::timeout(
453 Duration::from_secs(5),
454 store.find_entity_by_id(id),
455 )
456 .await;
457 if let Ok(Ok(Some(other))) = result {
458 *name_val = other.name;
459 } else {
460 *name_val = format!("#{id}");
461 }
462 }
463 }
464
465 let n = edges.len();
466 let lines: Vec<String> = edges
467 .iter()
468 .map(|e| {
469 let status = if e.valid_to.is_some() {
470 let date = e
471 .valid_to
472 .as_deref()
473 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
474 .unwrap_or("?");
475 format!("[expired {date}]")
476 } else {
477 "[active]".to_string()
478 };
479 let src = entity_names
480 .get(&e.source_entity_id)
481 .cloned()
482 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
483 let tgt = entity_names
484 .get(&e.target_entity_id)
485 .cloned()
486 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
487 format!(
488 " {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
489 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
490 )
491 })
492 .collect();
493 Ok(format!(
494 "Edge history for '{}' ({n} edges):\n{}",
495 entity.name,
496 lines.join("\n")
497 ))
498 }
499 .instrument(tracing::info_span!("core.agent_access.graph_history")),
500 )
501 }
502
503 fn graph_communities<'a>(
504 &'a mut self,
505 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
506 Box::pin(
507 async move {
508 let (_, store) = match self.resolve_graph_store() {
509 Ok(pair) => pair,
510 Err(msg) => return Ok(msg),
511 };
512
513 let communities =
514 match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
515 .await
516 {
517 Ok(Ok(v)) => v,
518 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
519 Err(_) => {
520 tracing::warn!(
521 "graph store call timed out after 5s (Qdrant unreachable)"
522 );
523 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
524 }
525 };
526 if communities.is_empty() {
527 return Ok("No communities detected yet. Run graph backfill first.".to_owned());
528 }
529
530 let lines: Vec<String> = communities
531 .iter()
532 .map(|c| format!(" [{}]: {}", c.name, c.summary))
533 .collect();
534 Ok(format!(
535 "Communities ({}):\n{}",
536 communities.len(),
537 lines.join("\n")
538 ))
539 }
540 .instrument(tracing::info_span!("core.agent_access.graph_communities")),
541 )
542 }
543
/// Runs graph extraction over messages that have not been graph-processed yet.
///
/// Messages are fetched in batches of up to fifty until the store returns an
/// empty batch or `limit` messages have been handled (`None` means no cap).
/// `progress_cb` is called once before the first batch and once after each
/// batch with a human-readable status line.
///
/// # Errors
///
/// Returns a [`CommandError`] when fetching a batch or marking it as
/// processed fails. A failed extraction for a single message is only logged.
#[allow(clippy::too_many_lines)]
fn graph_backfill<'a>(
    &'a mut self,
    limit: Option<usize>,
    progress_cb: &'a mut (dyn FnMut(String) + Send),
) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
    // Everything needed from `self` is resolved up front, before the future is built.
    let store = match self.resolve_graph_store() {
        Ok((_, s)) => s,
        Err(msg) => return Box::pin(async move { Ok(msg) }),
    };
    let graph_cfg = self.services.memory.extraction.graph_config.clone();
    // Falls back to 5 when no memory backend is attached.
    let embed_timeout_secs = self
        .services
        .memory
        .persistence
        .memory
        .as_ref()
        .map_or(5, |m| m.embed_timeout().as_secs());
    // An empty `extract_provider` name selects the agent's own provider.
    let provider = if graph_cfg.extract_provider.as_str().is_empty() {
        self.provider.clone()
    } else {
        self.resolve_background_provider(graph_cfg.extract_provider.as_str())
    };
    Box::pin(
        async move {
            // The count is informational only; a failed query is reported as 0.
            let total = store.unprocessed_message_count().await.unwrap_or(0);
            let cap = limit.unwrap_or(usize::MAX);

            progress_cb(format!(
                "Starting graph backfill... ({total} unprocessed messages)"
            ));

            let batch_size = 50usize;
            let mut processed = 0usize;
            let mut total_entities = 0usize;
            let mut total_edges = 0usize;

            loop {
                // Stop once the caller-supplied cap has been reached.
                let remaining_cap = cap.saturating_sub(processed);
                if remaining_cap == 0 {
                    break;
                }
                let batch_limit = batch_size.min(remaining_cap);
                let messages = store
                    .unprocessed_messages_for_backfill(batch_limit)
                    .await
                    .map_err(|e| CommandError::new(e.to_string()))?;
                if messages.is_empty() {
                    break;
                }

                // Every id in the batch is collected, including messages skipped below.
                let ids: Vec<zeph_memory::types::MessageId> =
                    messages.iter().map(|(id, _)| *id).collect();

                for (_id, content) in &messages {
                    // Whitespace-only messages are not sent to extraction.
                    if content.trim().is_empty() {
                        continue;
                    }
                    // A fresh config value is built for every message because
                    // `extract_and_store` takes it by value.
                    let extraction_cfg = GraphExtractionConfig {
                        max_entities: graph_cfg.max_entities_per_message,
                        max_edges: graph_cfg.max_edges_per_message,
                        extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
                        community_refresh_interval: 0,
                        expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
                        max_entities_cap: graph_cfg.max_entities,
                        community_summary_max_prompt_bytes: graph_cfg
                            .community_summary_max_prompt_bytes,
                        community_summary_concurrency: graph_cfg.community_summary_concurrency,
                        lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
                        note_linking: zeph_memory::NoteLinkingConfig::default(),
                        link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
                        link_weight_decay_interval_secs: graph_cfg
                            .link_weight_decay_interval_secs,
                        belief_revision_enabled: graph_cfg.belief_revision.enabled,
                        belief_revision_similarity_threshold: graph_cfg
                            .belief_revision
                            .similarity_threshold,
                        conversation_id: None,
                        apex_mem_enabled: graph_cfg.apex_mem.enabled,
                        llm_timeout_secs: graph_cfg.llm_timeout_secs,
                        embed_timeout_secs,
                        turn_index: None,
                        // `Some(threshold)` only when the write gate is enabled.
                        write_gate_min_relevance: graph_cfg
                            .write_gate
                            .enabled
                            .then_some(graph_cfg.write_gate.min_edge_relevance),
                        benna_fast_rate: graph_cfg.spreading_activation.benna_fast_rate,
                        benna_slow_rate: graph_cfg.spreading_activation.benna_slow_rate,
                    };
                    let pool = store.pool().clone();
                    match extract_and_store(
                        content.clone(),
                        vec![],
                        provider.clone(),
                        pool,
                        extraction_cfg,
                        None,
                        None,
                    )
                    .await
                    {
                        Ok(result) => {
                            total_entities += result.stats.entities_upserted;
                            total_edges += result.stats.edges_inserted;
                        }
                        // Best effort: a failed extraction is logged and the batch continues.
                        Err(e) => {
                            tracing::warn!("backfill extraction error: {e:#}");
                        }
                    }
                }

                // The whole batch is marked processed, including messages whose
                // extraction failed or was skipped.
                store
                    .mark_messages_graph_processed(&ids)
                    .await
                    .map_err(|e| CommandError::new(e.to_string()))?;
                processed += messages.len();

                progress_cb(format!(
                    "Backfill progress: {processed} messages processed, \
                     {total_entities} entities, {total_edges} edges"
                ));
            }

            Ok(format!(
                "Backfill complete: {total_entities} entities, {total_edges} edges \
                 extracted from {processed} messages"
            ))
        }
        .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
    )
}
675
676 fn guidelines<'a>(
679 &'a mut self,
680 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
681 Box::pin(
682 async move {
683 const MAX_DISPLAY_CHARS: usize = 4096;
684
685 let Some(memory) = &self.services.memory.persistence.memory else {
686 return Ok("No memory backend initialised.".to_owned());
687 };
688
689 let cid = self.services.memory.persistence.conversation_id;
690 let sqlite = memory.sqlite();
691
692 let (version, text) = sqlite
693 .load_compression_guidelines(cid)
694 .await
695 .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
696
697 if version == 0 || text.is_empty() {
698 return Ok("No compression guidelines generated yet.".to_owned());
699 }
700
701 let (_, created_at) = sqlite
702 .load_compression_guidelines_meta(cid)
703 .await
704 .unwrap_or((0, String::new()));
705
706 let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
707 let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
708 (&text[..end], true)
709 } else {
710 (text.as_str(), false)
711 };
712
713 let mut output =
714 format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
715 if truncated {
716 output.push_str("\n\n[truncated]");
717 }
718 Ok(output)
719 }
720 .instrument(tracing::info_span!("core.agent_access.guidelines")),
721 )
722 }
723
724 fn handle_model<'a>(
727 &'a mut self,
728 arg: &'a str,
729 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
730 Box::pin(async move {
731 let input = if arg.is_empty() {
732 "/model".to_owned()
733 } else {
734 format!("/model {arg}")
735 };
736 self.handle_model_command_as_string(&input).await
737 })
738 }
739
740 fn handle_provider<'a>(
741 &'a mut self,
742 arg: &'a str,
743 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
744 Box::pin(async move { self.handle_provider_command_as_string(arg).await })
745 }
746
747 fn handle_policy<'a>(
750 &'a mut self,
751 args: &'a str,
752 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
753 Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
754 }
755
/// Returns the scheduler task listing wrapped in `Some`.
///
/// # Errors
///
/// Returns a [`CommandError`] carrying the scheduler handler's error text.
#[cfg(feature = "scheduler")]
fn list_scheduled_tasks<'a>(
    &'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
    Box::pin(async move {
        match self.handle_scheduler_list_as_string().await {
            Ok(listing) => Ok(Some(listing)),
            Err(e) => Err(CommandError::new(e.to_string())),
        }
    })
}
770
771 #[cfg(not(feature = "scheduler"))]
772 fn list_scheduled_tasks<'a>(
773 &'a mut self,
774 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
775 Box::pin(async move { Ok(None) })
776 }
777
778 fn lsp_status<'a>(
781 &'a mut self,
782 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
783 Box::pin(async move {
784 self.handle_lsp_status_as_string()
785 .await
786 .map_err(|e| CommandError::new(e.to_string()))
787 })
788 }
789
790 fn session_recap<'a>(
793 &'a mut self,
794 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
795 Box::pin(
796 async move {
797 match self.build_recap().await {
798 Ok(text) => Ok(text),
799 Err(e) => {
800 tracing::warn!("session recap command: {}", e.0);
804 Ok("Recap unavailable — see logs for details".to_string())
805 }
806 }
807 }
808 .instrument(tracing::info_span!("core.agent_access.session_recap")),
809 )
810 }
811
812 fn compact_context<'a>(
815 &'a mut self,
816 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
817 Box::pin(
818 self.compact_context_command()
819 .instrument(tracing::info_span!("core.agent_access.compact_context")),
820 )
821 }
822
823 fn reset_conversation<'a>(
826 &'a mut self,
827 keep_plan: bool,
828 no_digest: bool,
829 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
830 Box::pin(async move {
831 match self.reset_conversation(keep_plan, no_digest).await {
832 Ok((old_id, new_id)) => {
833 let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
834 let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
835 let keep_note = if keep_plan { " (plan preserved)" } else { "" };
836 Ok(format!(
837 "New conversation started. Previous: {old} → Current: {new}{keep_note}"
838 ))
839 }
840 Err(e) => Ok(format!("Failed to start new conversation: {e}")),
841 }
842 })
843 }
844
845 fn cache_stats(&self) -> String {
848 self.tool_orchestrator.cache_stats()
849 }
850
851 fn session_status<'a>(
854 &'a mut self,
855 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
856 Box::pin(async move { Ok(self.handle_status_as_string()) })
857 }
858
859 fn guardrail_status(&self) -> String {
862 self.format_guardrail_status()
863 }
864
865 fn focus_status(&self) -> String {
868 self.format_focus_status()
869 }
870
871 fn sidequest_status(&self) -> String {
874 self.format_sidequest_status()
875 }
876
877 fn load_image<'a>(
880 &'a mut self,
881 path: &'a str,
882 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
883 Box::pin(async move { Ok(self.handle_image_as_string(path)) })
884 }
885
886 fn handle_mcp<'a>(
889 &'a mut self,
890 args: &'a str,
891 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
892 let args_owned = args.to_owned();
895 let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
896 let sub = parts.first().cloned().unwrap_or_default();
897
898 match sub.as_str() {
899 "list" => {
900 let manager = self.services.mcp.manager.clone();
902 let tools_snapshot: Vec<(String, String)> = self
903 .services
904 .mcp
905 .tools
906 .iter()
907 .map(|t| (t.server_id.clone(), t.name.clone()))
908 .collect();
909 Box::pin(async move {
910 use std::fmt::Write;
911 let Some(manager) = manager else {
912 return Ok("MCP is not enabled.".to_owned());
913 };
914 let server_ids = manager.list_servers().await;
915 if server_ids.is_empty() {
916 return Ok("No MCP servers connected.".to_owned());
917 }
918 let mut output = String::from("Connected MCP servers:\n");
919 let mut total = 0usize;
920 for id in &server_ids {
921 let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
922 total += count;
923 let _ = writeln!(output, "- {id} ({count} tools)");
924 }
925 let _ = write!(output, "Total: {total} tool(s)");
926 Ok(output)
927 })
928 }
929 "tools" => {
930 let server_id = parts.get(1).cloned();
932 let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
933 self.services
934 .mcp
935 .tools
936 .iter()
937 .filter(|t| &t.server_id == sid)
938 .map(|t| (t.name.clone(), t.description.clone()))
939 .collect()
940 } else {
941 Vec::new()
942 };
943 Box::pin(async move {
944 use std::fmt::Write;
945 let Some(server_id) = server_id else {
946 return Ok("Usage: /mcp tools <server_id>".to_owned());
947 };
948 if owned_tools.is_empty() {
949 return Ok(format!("No tools found for server '{server_id}'."));
950 }
951 let mut output =
952 format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
953 for (name, desc) in &owned_tools {
954 if desc.is_empty() {
955 let _ = writeln!(output, "- {name}");
956 } else {
957 let _ = writeln!(output, "- {name} — {desc}");
958 }
959 }
960 Ok(output)
961 })
962 }
963 _ => Box::pin(async move {
970 self.handle_mcp_command(&args_owned)
971 .await
972 .map_err(|e| CommandError::new(e.to_string()))
973 }),
974 }
975 }
976
977 fn handle_skill<'a>(
980 &'a mut self,
981 args: &'a str,
982 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
983 let args_owned = args.to_owned();
984 Box::pin(async move {
985 self.handle_skill_command_as_string(&args_owned)
986 .await
987 .map_err(|e| CommandError::new(e.to_string()))
988 })
989 }
990
991 fn handle_skills<'a>(
994 &'a mut self,
995 args: &'a str,
996 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
997 let args_owned = args.to_owned();
998 Box::pin(async move {
999 self.handle_skills_as_string(&args_owned)
1000 .await
1001 .map_err(|e| CommandError::new(e.to_string()))
1002 })
1003 }
1004
1005 fn handle_feedback_command<'a>(
1008 &'a mut self,
1009 args: &'a str,
1010 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1011 let args_owned = args.to_owned();
1012 Box::pin(async move {
1013 self.handle_feedback_as_string(&args_owned)
1014 .await
1015 .map_err(|e| CommandError::new(e.to_string()))
1016 })
1017 }
1018
/// Forwards `input` to the plan command dispatcher.
///
/// # Errors
///
/// Returns a [`CommandError`] carrying the dispatcher's error text.
#[cfg(feature = "scheduler")]
fn handle_plan<'a>(
    &'a mut self,
    input: &'a str,
) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
    Box::pin(async move {
        match self.dispatch_plan_command_as_string(input).await {
            Ok(reply) => Ok(reply),
            Err(e) => Err(CommandError::new(e.to_string())),
        }
    })
}
1032
1033 #[cfg(not(feature = "scheduler"))]
1034 fn handle_plan<'a>(
1035 &'a mut self,
1036 _input: &'a str,
1037 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1038 Box::pin(async move { Ok(String::new()) })
1039 }
1040
1041 fn handle_experiment<'a>(
1044 &'a mut self,
1045 input: &'a str,
1046 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1047 Box::pin(async move {
1048 self.handle_experiment_command_as_string(input)
1049 .await
1050 .map_err(|e| CommandError::new(e.to_string()))
1051 })
1052 }
1053
1054 fn handle_agent_dispatch<'a>(
1057 &'a mut self,
1058 input: &'a str,
1059 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
1060 Box::pin(async move {
1061 match self.dispatch_agent_command(input).await {
1062 Some(Err(e)) => Err(CommandError::new(e.to_string())),
1063 Some(Ok(())) | None => Ok(None),
1064 }
1065 })
1066 }
1067
1068 fn handle_plugins<'a>(
1071 &'a mut self,
1072 args: &'a str,
1073 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1074 let args_owned = args.to_owned();
1075 let managed_dir = self.services.skill.managed_dir.clone();
1078 let mcp_allowed = self.services.mcp.allowed_commands.clone();
1079 let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
1080 let ephemeral_names: Vec<String> = self
1082 .runtime
1083 .ephemeral_plugins
1084 .iter()
1085 .filter_map(|tmp| {
1086 let manifest_path = tmp.path().join("plugin.toml");
1087 std::fs::read_to_string(manifest_path)
1088 .ok()
1089 .and_then(|s| toml::from_str::<zeph_plugins::PluginManifest>(&s).ok())
1090 .map(|m| m.plugin.name)
1091 })
1092 .collect();
1093
1094 let semantic_scan_enabled = self.services.skill.semantic_scan;
1098 let maybe_scanner: Option<zeph_skills::semantic_scanner::SkillSemanticScanner> =
1099 if semantic_scan_enabled {
1100 let provider_name = self.services.skill.semantic_scan_provider.as_str();
1101 if provider_name.trim().is_empty() {
1102 return Box::pin(async move {
1103 Err(CommandError::new(
1104 "semantic_scan is enabled but semantic_scan_provider is not set; \
1105 refusing plugin add to maintain fail-closed security posture",
1106 ))
1107 });
1108 }
1109 let provider_known = self
1110 .runtime
1111 .providers
1112 .provider_pool
1113 .iter()
1114 .any(|e| e.effective_name().eq_ignore_ascii_case(provider_name));
1115 if !provider_known {
1116 let name = provider_name.to_owned();
1117 return Box::pin(async move {
1118 Err(CommandError::new(format!(
1119 "semantic_scan is enabled but semantic_scan_provider '{name}' \
1120 is not configured in [[llm.providers]]; \
1121 refusing plugin add to maintain fail-closed security posture",
1122 )))
1123 });
1124 }
1125 let provider = self.resolve_background_provider(provider_name);
1126 Some(zeph_skills::semantic_scanner::SkillSemanticScanner::new(
1127 provider,
1128 ))
1129 } else {
1130 None
1131 };
1132
1133 Box::pin(async move {
1134 let (subcmd, source) = args_owned
1135 .trim()
1136 .split_once(' ')
1137 .unwrap_or((args_owned.trim(), ""));
1138
1139 if subcmd == "add"
1141 && !source.trim().is_empty()
1142 && let Some(ref scanner) = maybe_scanner
1143 && let Some(err) = semantic_scan_plugin_add(
1144 scanner,
1145 source.trim(),
1146 managed_dir.clone(),
1147 mcp_allowed.clone(),
1148 base_shell_allowed.clone(),
1149 )
1150 .instrument(tracing::info_span!("core.agent.scan_plugin", plugin = %source.trim()))
1151 .await?
1152 {
1153 return Ok(err);
1154 }
1155
1156 tokio::task::spawn_blocking(move || {
1159 Self::run_plugin_command(
1160 &args_owned,
1161 managed_dir,
1162 mcp_allowed,
1163 base_shell_allowed,
1164 ephemeral_names,
1165 )
1166 })
1167 .await
1168 .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
1169 })
1170 }
1171
1172 fn handle_acp<'a>(
1175 &'a mut self,
1176 args: &'a str,
1177 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1178 Box::pin(async move {
1179 self.handle_acp_as_string(args)
1180 .map_err(|e| CommandError::new(e.to_string()))
1181 })
1182 }
1183
/// Forwards `args` to the cocoon command handler.
///
/// # Errors
///
/// Returns a [`CommandError`] carrying the handler's error text.
#[cfg(feature = "cocoon")]
fn handle_cocoon<'a>(
    &'a mut self,
    args: &'a str,
) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
    Box::pin(async move {
        match self.handle_cocoon_as_string(args).await {
            Ok(reply) => Ok(reply),
            Err(e) => Err(CommandError::new(e.to_string())),
        }
    })
}
1197
1198 #[cfg(not(feature = "cocoon"))]
1199 fn handle_cocoon<'a>(
1200 &'a mut self,
1201 _args: &'a str,
1202 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1203 Box::pin(async {
1204 Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1205 })
1206 }
1207
1208 fn handle_loop<'a>(
1211 &'a mut self,
1212 args: &'a str,
1213 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1214 use zeph_commands::handlers::loop_cmd::parse_loop_args;
1215
1216 let args_owned = args.trim().to_owned();
1217 Box::pin(async move {
1218 if args_owned == "stop" {
1219 return Ok(self.stop_user_loop());
1220 }
1221 if args_owned == "status" {
1222 return Ok(match &self.runtime.lifecycle.user_loop {
1223 Some(ls) => format!(
1224 "Loop active: \"{}\" (iteration {}, interval every {}s).",
1225 ls.prompt,
1226 ls.iteration,
1227 ls.interval.period().as_secs(),
1228 ),
1229 None => "No active loop.".to_owned(),
1230 });
1231 }
1232 let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1233
1234 if prompt.starts_with('/') {
1235 return Err(CommandError::new(
1236 "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1237 ));
1238 }
1239
1240 let min_secs = self.runtime.config.loop_min_interval_secs;
1241 if interval_secs < min_secs {
1242 return Err(CommandError::new(format!(
1243 "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1244 )));
1245 }
1246 if self.runtime.lifecycle.user_loop.is_some() {
1247 return Err(CommandError::new(
1248 "A loop is already active. Use /loop stop first.",
1249 ));
1250 }
1251
1252 self.start_user_loop(prompt.clone(), interval_secs);
1253 Ok(format!(
1254 "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1255 ))
1256 })
1257 }
1258
1259 fn notify_test<'a>(
1260 &'a mut self,
1261 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1262 let notifier = self.runtime.lifecycle.notifier.clone();
1263 Box::pin(async move {
1264 let Some(notifier) = notifier else {
1265 return Ok(
1266 "Notifications are disabled. Set `notifications.enabled = true` in config."
1267 .to_owned(),
1268 );
1269 };
1270 match notifier.fire_test().await {
1271 Ok(()) => Ok("Test notification sent.".to_owned()),
1272 Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1273 }
1274 })
1275 }
1276
1277 fn handle_trajectory(&mut self, args: &str) -> String {
1278 self.handle_trajectory_command_as_string(args)
1279 }
1280
1281 fn handle_scope(&self, args: &str) -> String {
1282 self.handle_scope_command_as_string(args)
1283 }
1284
1285 fn handle_goal<'a>(
1288 &'a mut self,
1289 args: &'a str,
1290 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1291 if self.services.goal_accounting.is_none() {
1293 if !self.runtime.config.goals.enabled {
1294 return Box::pin(async {
1295 Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1296 });
1297 }
1298 let pool = match self.services.memory.persistence.memory.as_ref() {
1299 Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1300 None => {
1301 return Box::pin(async {
1302 Ok("Goals require a database backend (memory not configured).".to_owned())
1303 });
1304 }
1305 };
1306 let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1307 let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1308 self.services.goal_accounting = Some(accounting);
1309 }
1310
1311 let accounting =
1312 self.services.goal_accounting.clone().expect(
1313 "invariant: goal_accounting is always Some at this point (initialized above)",
1314 );
1315 let max_chars = self.runtime.config.goals.max_text_chars;
1316 let default_budget = self.runtime.config.goals.default_token_budget;
1317 let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1318 let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1319 let args_owned = args.to_owned();
1320
1321 let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1326
1327 Box::pin(async move {
1328 let _ = accounting.refresh().await;
1329 let store = accounting.get_store();
1330 let args = args_owned.as_str();
1331
1332 match args {
1333 "" | "status" => goal_status(&accounting).await,
1334 "pause" => goal_pause(&accounting, &store).await,
1335 "resume" => goal_resume(&accounting, &store).await,
1336 "complete" => goal_complete(&accounting, &store).await,
1337 "clear" => goal_clear(&accounting, &store).await,
1338 "list" => goal_list(&store).await,
1339 _ if args.starts_with("create") => {
1340 let (msg, auto_req) = goal_create(
1341 args,
1342 &accounting,
1343 &store,
1344 max_chars,
1345 default_budget,
1346 autonomous_enabled,
1347 autonomous_max_turns,
1348 )
1349 .await?;
1350 if let Some(req) = auto_req {
1351 *pending_start_arc.lock() = Some(req);
1352 }
1353 Ok(msg)
1354 }
1355 _ => Ok(
1356 "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1357 .to_owned(),
1358 ),
1359 }
1360 })
1361 }
1362
1363 fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1364 let accounting = self.services.goal_accounting.as_ref()?;
1365 let snap = accounting.snapshot()?;
1366 Some(zeph_commands::GoalSnapshot {
1367 id: snap.id,
1368 text: snap.text,
1369 status: match snap.status {
1370 crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1371 crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1372 crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1373 crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1374 },
1375 turns_used: snap.turns_used,
1376 tokens_used: snap.tokens_used,
1377 token_budget: snap.token_budget,
1378 })
1379 }
1380
1381 fn handle_agents<'a>(
1384 &'a mut self,
1385 args: &'a str,
1386 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1387 use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1388 use zeph_subagent::AgentsCommand;
1389
1390 let args_owned = args.trim().to_owned();
1391 Box::pin(async move {
1392 let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1394
1395 let fleet_section = if show_fleet {
1396 let snapshots = self.services.autonomous_registry.list();
1397 let entries: Vec<FleetEntry> = snapshots
1398 .into_iter()
1399 .map(|s| FleetEntry {
1400 goal_id: s.goal_id,
1401 goal_text_short: s.goal_text_short,
1402 state: s.state,
1403 turns_executed: s.turns_executed,
1404 max_turns: s.max_turns,
1405 elapsed: s.elapsed,
1406 })
1407 .collect();
1408 format_fleet_section(&entries)
1409 } else {
1410 String::new()
1411 };
1412
1413 let definitions_section = if show_fleet || args_owned == "list" {
1415 self.handle_agents_definitions_list()
1416 } else {
1417 match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1419 Ok(cmd) => self.handle_agents_crud(cmd),
1420 Err(e) => e.to_string(),
1421 }
1422 };
1423
1424 let mut out = fleet_section;
1425 if !definitions_section.is_empty() {
1426 if !out.is_empty() {
1427 out.push('\n');
1428 }
1429 out.push_str(&definitions_section);
1430 }
1431
1432 if out.is_empty() {
1433 "No active autonomous sessions or sub-agent definitions found."
1434 .clone_into(&mut out);
1435 }
1436
1437 Ok(out)
1438 })
1439 }
1440}
1441
1442type GoalStore = crate::goal::GoalStore;
1443type GoalAccounting = crate::goal::GoalAccounting;
1444
/// Hard upper bound on the number of autonomous turns; `goal_create` clamps
/// both the configured default and any explicit `--turns` value to it.
const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1_000;
1447
1448async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1449 match accounting.get_active().await {
1450 Ok(Some(g)) => {
1451 let budget_line = g.token_budget.map_or_else(
1452 || format!(" tokens used: {}", g.tokens_used),
1453 |b| format!(" budget: {}/{b}", g.tokens_used),
1454 );
1455 Ok(format!(
1456 "Active goal [{}]: {}\n status: {}\n turns: {}\n{}",
1457 &g.id[..8],
1458 g.text,
1459 g.status,
1460 g.turns_used,
1461 budget_line
1462 ))
1463 }
1464 Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1465 Err(e) => Ok(format!("Goal lookup failed: {e}")),
1466 }
1467}
1468
1469async fn goal_create(
1475 args: &str,
1476 accounting: &GoalAccounting,
1477 store: &GoalStore,
1478 max_chars: usize,
1479 default_budget: Option<u64>,
1480 autonomous_enabled: bool,
1481 autonomous_max_turns: u32,
1482) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1483 let rest = args.strip_prefix("create").unwrap_or("").trim();
1484
1485 let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1487 let (text, explicit_budget) = parse_goal_create_args(&stripped);
1488
1489 if text.is_empty() {
1490 return Ok((
1491 "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1492 None,
1493 ));
1494 }
1495 if is_auto && !autonomous_enabled {
1496 return Ok((
1497 "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1498 .to_owned(),
1499 None,
1500 ));
1501 }
1502 let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1503
1504 let max_turns = explicit_turns
1505 .unwrap_or(autonomous_max_turns)
1506 .min(AUTONOMOUS_MAX_TURNS_CAP);
1507 if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1508 tracing::warn!(
1509 requested = explicit_turns,
1510 capped = AUTONOMOUS_MAX_TURNS_CAP,
1511 "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1512 );
1513 }
1514
1515 match store.create(text, budget, max_chars).await {
1516 Ok(g) => {
1517 let _ = accounting.refresh().await;
1518 let auto_start = if is_auto {
1519 Some((g.id.clone(), g.text.clone(), max_turns))
1520 } else {
1521 None
1522 };
1523 let auto_note = if is_auto {
1524 " Autonomous mode enabled — use `/goal clear` to stop."
1525 } else {
1526 ""
1527 };
1528 Ok((
1529 format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1530 auto_start,
1531 ))
1532 }
1533 Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1534 format!("Goal text exceeds {max} characters. Please shorten it."),
1535 None,
1536 )),
1537 Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1538 }
1539}
1540
1541async fn goal_pause(
1542 accounting: &GoalAccounting,
1543 store: &GoalStore,
1544) -> Result<String, CommandError> {
1545 match accounting.get_active().await {
1546 Ok(Some(g)) => {
1547 match store
1548 .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1549 .await
1550 {
1551 Ok(_) => {
1552 let _ = accounting.refresh().await;
1553 Ok(format!("Goal [{}] paused.", &g.id[..8]))
1554 }
1555 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1556 let current = accounting.get_active().await.ok().flatten();
1557 Ok(format!(
1558 "Goal state changed concurrently. Current: {}",
1559 current.map_or_else(|| "none".into(), |g| g.status.to_string())
1560 ))
1561 }
1562 Err(e) => Ok(format!("Pause failed: {e}")),
1563 }
1564 }
1565 Ok(None) => Ok("No active goal to pause.".to_owned()),
1566 Err(e) => Ok(format!("Failed: {e}")),
1567 }
1568}
1569
1570async fn goal_resume(
1571 accounting: &GoalAccounting,
1572 store: &GoalStore,
1573) -> Result<String, CommandError> {
1574 let goals = store.list(10).await.unwrap_or_default();
1575 let paused = goals
1576 .into_iter()
1577 .find(|g| g.status == crate::goal::GoalStatus::Paused);
1578 match paused {
1579 Some(g) => {
1580 match store
1581 .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1582 .await
1583 {
1584 Ok(_) => {
1585 let _ = accounting.refresh().await;
1586 Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1587 }
1588 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1589 Ok("Goal state changed concurrently — please retry.".to_owned())
1590 }
1591 Err(e) => Ok(format!("Resume failed: {e}")),
1592 }
1593 }
1594 None => Ok("No paused goal to resume.".to_owned()),
1595 }
1596}
1597
1598async fn goal_complete(
1599 accounting: &GoalAccounting,
1600 store: &GoalStore,
1601) -> Result<String, CommandError> {
1602 match accounting.get_active().await {
1603 Ok(Some(g)) => {
1604 match store
1605 .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1606 .await
1607 {
1608 Ok(_) => {
1609 let _ = accounting.refresh().await;
1610 Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1611 }
1612 Err(e) => Ok(format!("Complete failed: {e}")),
1613 }
1614 }
1615 Ok(None) => Ok("No active goal.".to_owned()),
1616 Err(e) => Ok(format!("Failed: {e}")),
1617 }
1618}
1619
1620async fn goal_clear(
1621 accounting: &GoalAccounting,
1622 store: &GoalStore,
1623) -> Result<String, CommandError> {
1624 let goals = store.list(10).await.unwrap_or_default();
1625 let target = goals.into_iter().find(|g| {
1626 g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1627 });
1628 match target {
1629 Some(g) => {
1630 match store
1631 .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1632 .await
1633 {
1634 Ok(_) => {
1635 let _ = accounting.refresh().await;
1636 Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1637 }
1638 Err(e) => Ok(format!("Clear failed: {e}")),
1639 }
1640 }
1641 None => Ok("No active or paused goal to clear.".to_owned()),
1642 }
1643}
1644
1645async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1646 let goals = store.list(20).await.unwrap_or_default();
1647 if goals.is_empty() {
1648 return Ok("No goals recorded.".to_owned());
1649 }
1650 let mut out = String::from("Goals:\n");
1651 for g in goals {
1652 let _ = std::fmt::Write::write_fmt(
1653 &mut out,
1654 format_args!(
1655 " {} [{}] {} — {} turns\n",
1656 g.status.badge_symbol(),
1657 &g.id[..8],
1658 g.text,
1659 g.turns_used
1660 ),
1661 );
1662 }
1663 Ok(out.trim_end().to_owned())
1664}
1665
/// Splits `/goal create` arguments into the goal text and an optional
/// `--budget N` value.
///
/// `--budget` is only recognised as a whole whitespace-delimited word, so
/// goal text that merely contains it (for example `--budgeting`) is left
/// intact. The text is everything before the flag, trimmed; the budget is the
/// first word after the flag when it parses as `u64`, otherwise `None`.
/// Anything after the budget value is discarded.
///
/// Without a flag the whole input is returned, trimmed, with `None`.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // A neighbouring character counts as a word boundary when it is absent
    // (start/end of input) or whitespace.
    let is_boundary = |neighbour: Option<char>| neighbour.map_or(true, char::is_whitespace);

    let flag_pos = args.match_indices(FLAG).map(|(pos, _)| pos).find(|&pos| {
        is_boundary(args[..pos].chars().next_back())
            && is_boundary(args[pos + FLAG.len()..].chars().next())
    });

    match flag_pos {
        Some(pos) => {
            let text = args[..pos].trim();
            let budget = args[pos + FLAG.len()..]
                .split_whitespace()
                .next()
                .and_then(|value| value.parse::<u64>().ok());
            (text, budget)
        }
        None => (args.trim(), None),
    }
}
1679
/// Extracts `--auto` and `--turns N` from whitespace-separated arguments.
///
/// Returns the remaining words joined by single spaces, whether `--auto` was
/// present, and the parsed turn count.
///
/// The word following `--turns` is consumed as its value unless that word is
/// itself a flag (starts with `--`); in that case it is left in place so it
/// can still be recognised. A value that does not parse as `u32` is dropped
/// and yields `None`. When `--turns` appears more than once, the last
/// occurrence wins.
fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
    let mut is_auto = false;
    let mut turns: Option<u32> = None;
    let mut text_words: Vec<&str> = Vec::new();
    let mut words = args.split_whitespace().peekable();

    while let Some(word) = words.next() {
        match word {
            "--auto" => is_auto = true,
            "--turns" => {
                // Never swallow another flag as the turn count.
                turns = words
                    .next_if(|next| !next.starts_with("--"))
                    .and_then(|value| value.parse::<u32>().ok());
            }
            other => text_words.push(other),
        }
    }

    (text_words.join(" "), is_auto, turns)
}
1701
1702impl From<AgentError> for CommandError {
1704 fn from(e: AgentError) -> Self {
1705 Self(e.to_string())
1706 }
1707}
1708
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use zeph_commands::traits::agent::AgentAccess;
    use zeph_memory::semantic::SemanticMemory;

    /// Builds an in-memory [`SemanticMemory`] whose vector-store URL is
    /// `http://127.0.0.1:1`.
    async fn memory_without_qdrant() -> SemanticMemory {
        let provider = zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default());
        SemanticMemory::new(
            ":memory:",
            "http://127.0.0.1:1",
            None,
            provider,
            "test-model",
        )
        .await
        .unwrap()
    }

    /// Agent wired with mock provider, channel, registry and tool executor.
    fn bare_agent() -> Agent<MockChannel> {
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
    }

    /// Graph config with everything at its default except `enabled`.
    fn graph_config(enabled: bool) -> crate::config::GraphConfig {
        crate::config::GraphConfig {
            enabled,
            ..Default::default()
        }
    }

    /// Agent with `memory` attached to a fresh conversation and `cfg` applied.
    async fn agent_with_graph(
        cfg: crate::config::GraphConfig,
        memory: SemanticMemory,
    ) -> Agent<MockChannel> {
        let cid = memory.sqlite().create_conversation().await.unwrap();
        bare_agent()
            .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
            .with_graph_config(cfg)
    }

    /// Writes a plugin directory with a manifest and one `SKILL.md` per skill.
    fn write_plugin_fixture(plugin_name: &str, skills: [&str; 2]) -> tempfile::TempDir {
        let tmp = tempfile::tempdir().unwrap();
        let [first, second] = skills;
        let manifest = format!(
            "\n[plugin]\nname = \"{plugin_name}\"\nversion = \"0.1.0\"\ndescription = \"test\"\n\n[[skills]]\npath = \"{first}\"\n\n[[skills]]\npath = \"{second}\"\n"
        );
        std::fs::write(tmp.path().join("plugin.toml"), manifest).unwrap();
        for name in skills {
            let skill_dir = tmp.path().join(name);
            std::fs::create_dir_all(&skill_dir).unwrap();
            std::fs::write(
                skill_dir.join("SKILL.md"),
                format!("# {name}\n\n## Purpose\nTest skill.\n"),
            )
            .unwrap();
        }
        tmp
    }

    #[tokio::test]
    async fn graph_stats_enabled_but_no_store_reports_unavailable() {
        let mut agent = agent_with_graph(graph_config(true), memory_without_qdrant().await).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
        assert!(
            !result.contains("not enabled"),
            "must not report 'not enabled' when graph is enabled: {result}"
        );
    }

    #[tokio::test]
    async fn graph_stats_disabled_reports_not_enabled() {
        let mut agent = agent_with_graph(graph_config(false), memory_without_qdrant().await).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("not enabled"),
            "expected 'not enabled' but got: {result}"
        );
    }

    #[tokio::test]
    async fn graph_backfill_with_extract_provider_resolves_without_panic() {
        let cfg = crate::config::GraphConfig {
            enabled: true,
            extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
            ..Default::default()
        };
        let mut memory = memory_without_qdrant().await;
        // Attach a graph store backed by the same SQLite pool as the memory.
        let pool = memory.sqlite().pool().clone();
        memory.graph_store = Some(std::sync::Arc::new(zeph_memory::GraphStore::new(pool)));
        let mut agent = agent_with_graph(cfg, memory).await;

        let mut progress = vec![];
        let result = agent
            .graph_backfill(Some(10), &mut |msg| progress.push(msg))
            .await
            .unwrap();

        assert!(
            result.contains("Backfill complete"),
            "expected 'Backfill complete' but got: {result}"
        );
    }

    #[tokio::test]
    async fn graph_entities_enabled_but_no_store_reports_unavailable() {
        let mut agent = agent_with_graph(graph_config(true), memory_without_qdrant().await).await;

        let result = agent.graph_entities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    #[tokio::test]
    async fn graph_communities_enabled_but_no_store_reports_unavailable() {
        let mut agent = agent_with_graph(graph_config(true), memory_without_qdrant().await).await;

        let result = agent.graph_communities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    #[tokio::test]
    async fn graph_store_timeout_pattern_fires_on_pending_future() {
        use std::future;

        let never = future::pending::<Result<Vec<()>, String>>();
        let result = tokio::time::timeout(Duration::from_millis(10), never).await;
        assert!(
            result.is_err(),
            "timeout must fire on a never-resolving future"
        );
    }

    #[tokio::test]
    async fn plugin_add_semantic_scan_enabled_empty_provider_returns_error() {
        let mut agent = bare_agent().with_semantic_scan(true, "");

        let result = agent.handle_plugins("add some-plugin").await;
        assert!(
            result.is_err(),
            "expected CommandError for missing semantic_scan_provider, got: {result:?}"
        );
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("semantic_scan_provider"),
            "error message must mention semantic_scan_provider, got: {msg}"
        );
    }

    #[tokio::test]
    async fn plugin_list_semantic_scan_disabled_succeeds() {
        let mut agent = bare_agent().with_semantic_scan(false, "");

        let result = agent.handle_plugins("list").await;
        assert!(
            result.is_ok(),
            "plugin list must succeed when semantic_scan is disabled, got: {result:?}"
        );
    }

    #[tokio::test]
    async fn plugin_add_semantic_scan_disabled_no_scan_error() {
        let mut agent = bare_agent().with_semantic_scan(false, "");

        let result = agent.handle_plugins("add some-plugin").await;
        // The add itself may fail; it just must not fail because of the scanner.
        if let Err(ref e) = result {
            assert!(
                !e.to_string().contains("semantic_scan_provider"),
                "must not fail with scan error when semantic_scan is disabled, got: {e}"
            );
        }
    }

    #[tokio::test]
    async fn semantic_scan_plugin_add_concurrent_all_allow_returns_none() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;
        use zeph_skills::semantic_scanner::SkillSemanticScanner;

        let allow_json = r#"{"verdict":"allow","reason":"ok"}"#.to_owned();
        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
            allow_json.clone(),
            allow_json,
        ]));
        let scanner = SkillSemanticScanner::new(provider);

        let tmp = write_plugin_fixture("test-plugin", ["skill-a", "skill-b"]);

        let result =
            semantic_scan_plugin_add(&scanner, tmp.path().to_str().unwrap(), None, vec![], vec![])
                .await;

        assert!(result.is_ok(), "expected Ok, got: {result:?}");
        assert!(
            result.unwrap().is_none(),
            "expected None (all passed) but got Some(err)"
        );
    }

    #[tokio::test]
    async fn semantic_scan_plugin_add_block_names_correct_skill() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;
        use zeph_skills::semantic_scanner::SkillSemanticScanner;

        let allow_json = r#"{"verdict":"allow","reason":"ok"}"#.to_owned();
        let block_json = r#"{"verdict":"block","reason":"malicious"}"#.to_owned();
        let provider =
            AnyProvider::Mock(MockProvider::with_responses(vec![allow_json, block_json]));
        let scanner = SkillSemanticScanner::new(provider);

        let tmp = write_plugin_fixture("test-plugin-block", ["skill-first", "skill-second"]);

        let result =
            semantic_scan_plugin_add(&scanner, tmp.path().to_str().unwrap(), None, vec![], vec![])
                .await;

        assert!(result.is_ok(), "expected Ok(_), got: {result:?}");
        let msg = result
            .unwrap()
            .expect("expected Some(err) for blocked skill");
        assert!(
            msg.contains("skill-second"),
            "rejection must name the blocked skill 'skill-second', got: {msg}"
        );
        assert!(
            !msg.contains("skill-first"),
            "rejection must NOT name the allowed skill 'skill-first', got: {msg}"
        );
    }

    #[tokio::test]
    async fn plugin_add_semantic_scan_unknown_provider_returns_error() {
        let mut agent = bare_agent().with_semantic_scan(true, "nonexistent_provider");

        let result = agent.handle_plugins("add some-plugin").await;
        assert!(
            result.is_err(),
            "expected CommandError for unknown semantic_scan_provider, got: {result:?}"
        );
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("semantic_scan_provider"),
            "error message must mention semantic_scan_provider, got: {msg}"
        );
    }
}