1use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28 fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29 let Some(memory) = self.services.memory.persistence.memory.clone() else {
30 return Err("Graph memory is not enabled.".to_owned());
31 };
32 let Some(store) = memory.graph_store.clone() else {
33 if self.services.memory.extraction.graph_config.enabled {
34 return Err(
35 "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36 .to_owned(),
37 );
38 }
39 return Err("Graph memory is not enabled.".to_owned());
40 };
41 Ok((memory, store))
42 }
43}
44
45impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
46 fn memory_tiers<'a>(
49 &'a mut self,
50 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51 Box::pin(
52 async move {
53 let Some(memory) = self.services.memory.persistence.memory.clone() else {
54 return Ok("Memory not configured.".to_owned());
55 };
56 match memory.sqlite().count_messages_by_tier().await {
57 Ok((episodic, semantic)) => {
58 let mut out = String::new();
59 let _ = writeln!(out, "Memory tiers:");
60 let _ = writeln!(out, " Working: (current context window — virtual)");
61 let _ = writeln!(out, " Episodic: {episodic} messages");
62 let _ = writeln!(out, " Semantic: {semantic} facts");
63 Ok(out.trim_end().to_owned())
64 }
65 Err(e) => Ok(format!("Failed to query tier stats: {e}")),
66 }
67 }
68 .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
69 )
70 }
71
72 fn memory_promote<'a>(
73 &'a mut self,
74 ids_str: &'a str,
75 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
76 Box::pin(
77 async move {
78 let Some(memory) = self.services.memory.persistence.memory.clone() else {
79 return Ok("Memory not configured.".to_owned());
80 };
81 let ids: Vec<MessageId> = ids_str
82 .split_whitespace()
83 .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
84 .collect();
85 if ids.is_empty() {
86 return Ok(
87 "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
88 .to_owned(),
89 );
90 }
91 match memory.sqlite().manual_promote(&ids).await {
92 Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
93 Err(e) => Ok(format!("Promotion failed: {e}")),
94 }
95 }
96 .instrument(tracing::info_span!("core.agent_access.memory_promote")),
97 )
98 }
99
100 fn graph_stats<'a>(
103 &'a mut self,
104 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
105 Box::pin(
106 async move {
107 let (_, store) = match self.resolve_graph_store() {
108 Ok(pair) => pair,
109 Err(msg) => return Ok(msg),
110 };
111
112 let stats_future = async {
113 tokio::join!(
114 store.entity_count(),
115 store.active_edge_count(),
116 store.community_count(),
117 store.edge_type_distribution()
118 )
119 };
120 let Ok((entities, edges, communities, distribution)) =
121 tokio::time::timeout(Duration::from_secs(5), stats_future).await
122 else {
123 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
124 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
125 };
126 let mut msg = format!(
127 "Graph memory: {} entities, {} edges, {} communities",
128 entities.unwrap_or(0),
129 edges.unwrap_or(0),
130 communities.unwrap_or(0)
131 );
132 if let Ok(dist) = distribution
133 && !dist.is_empty()
134 {
135 let dist_str: Vec<String> =
136 dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
137 write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
138 }
139 Ok(msg)
140 }
141 .instrument(tracing::info_span!("core.agent_access.graph_stats")),
142 )
143 }
144
145 fn graph_entities<'a>(
146 &'a mut self,
147 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
148 Box::pin(
149 async move {
150 let (_, store) = match self.resolve_graph_store() {
151 Ok(pair) => pair,
152 Err(msg) => return Ok(msg),
153 };
154
155 let entities = match tokio::time::timeout(
156 Duration::from_secs(5),
157 store.all_entities(),
158 )
159 .await
160 {
161 Ok(Ok(v)) => v,
162 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
163 Err(_) => {
164 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
165 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
166 }
167 };
168 if entities.is_empty() {
169 return Ok("No entities found.".to_owned());
170 }
171
172 let total = entities.len();
173 let display: Vec<String> = entities
174 .iter()
175 .take(50)
176 .map(|e| {
177 format!(
178 " {:<40} {:<15} {}",
179 e.name,
180 e.entity_type.as_str(),
181 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
182 )
183 })
184 .collect();
185 let mut msg = format!(
186 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
187 "NAME",
188 "TYPE",
189 "LAST SEEN",
190 display.join("\n")
191 );
192 if total > 50 {
193 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
194 }
195 Ok(msg)
196 }
197 .instrument(tracing::info_span!("core.agent_access.graph_entities")),
198 )
199 }
200
201 fn graph_facts<'a>(
202 &'a mut self,
203 name: &'a str,
204 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
205 Box::pin(
206 async move {
207 let (_, store) = match self.resolve_graph_store() {
208 Ok(pair) => pair,
209 Err(msg) => return Ok(msg),
210 };
211
212 let matches = match tokio::time::timeout(
213 Duration::from_secs(5),
214 store.find_entity_by_name(name),
215 )
216 .await
217 {
218 Ok(Ok(v)) => v,
219 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
220 Err(_) => {
221 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
222 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
223 }
224 };
225 if matches.is_empty() {
226 return Ok(format!("No entity found matching '{name}'."));
227 }
228
229 let entity = &matches[0];
230 let edges = match tokio::time::timeout(
231 Duration::from_secs(5),
232 store.edges_for_entity(entity.id.0),
233 )
234 .await
235 {
236 Ok(Ok(v)) => v,
237 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
238 Err(_) => {
239 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
240 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
241 }
242 };
243 if edges.is_empty() {
244 return Ok(format!("Entity '{}' has no known facts.", entity.name));
245 }
246
247 let mut entity_names: std::collections::HashMap<i64, String> =
248 std::collections::HashMap::new();
249 entity_names.insert(entity.id.0, entity.name.clone());
250 for edge in &edges {
251 let other_id = if edge.source_entity_id == entity.id.0 {
252 edge.target_entity_id
253 } else {
254 edge.source_entity_id
255 };
256 entity_names.entry(other_id).or_default();
257 }
258 for (&id, name_val) in &mut entity_names {
259 if name_val.is_empty() {
260 let result = tokio::time::timeout(
261 Duration::from_secs(5),
262 store.find_entity_by_id(id),
263 )
264 .await;
265 if let Ok(Ok(Some(other))) = result {
266 *name_val = other.name;
267 } else {
268 *name_val = format!("#{id}");
269 }
270 }
271 }
272
273 let lines: Vec<String> = edges
274 .iter()
275 .map(|e| {
276 let src = entity_names
277 .get(&e.source_entity_id)
278 .cloned()
279 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
280 let tgt = entity_names
281 .get(&e.target_entity_id)
282 .cloned()
283 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
284 format!(
285 " {} --[{}/{}]--> {}: {} (confidence: {:.2})",
286 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
287 )
288 })
289 .collect();
290 Ok(format!(
291 "Facts for '{}':\n{}",
292 entity.name,
293 lines.join("\n")
294 ))
295 }
296 .instrument(tracing::info_span!("core.agent_access.graph_facts")),
297 )
298 }
299
300 fn graph_history<'a>(
301 &'a mut self,
302 name: &'a str,
303 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
304 Box::pin(
305 async move {
306 let (_, store) = match self.resolve_graph_store() {
307 Ok(pair) => pair,
308 Err(msg) => return Ok(msg),
309 };
310
311 let matches = match tokio::time::timeout(
312 Duration::from_secs(5),
313 store.find_entity_by_name(name),
314 )
315 .await
316 {
317 Ok(Ok(v)) => v,
318 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
319 Err(_) => {
320 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
321 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
322 }
323 };
324 if matches.is_empty() {
325 return Ok(format!("No entity found matching '{name}'."));
326 }
327
328 let entity = &matches[0];
329 let edges = match tokio::time::timeout(
330 Duration::from_secs(5),
331 store.edge_history_for_entity(entity.id.0, 50),
332 )
333 .await
334 {
335 Ok(Ok(v)) => v,
336 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
337 Err(_) => {
338 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
339 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
340 }
341 };
342 if edges.is_empty() {
343 return Ok(format!("Entity '{}' has no edge history.", entity.name));
344 }
345
346 let mut entity_names: std::collections::HashMap<i64, String> =
347 std::collections::HashMap::new();
348 entity_names.insert(entity.id.0, entity.name.clone());
349 for edge in &edges {
350 for &id in &[edge.source_entity_id, edge.target_entity_id] {
351 entity_names.entry(id).or_default();
352 }
353 }
354 for (&id, name_val) in &mut entity_names {
355 if name_val.is_empty() {
356 let result = tokio::time::timeout(
357 Duration::from_secs(5),
358 store.find_entity_by_id(id),
359 )
360 .await;
361 if let Ok(Ok(Some(other))) = result {
362 *name_val = other.name;
363 } else {
364 *name_val = format!("#{id}");
365 }
366 }
367 }
368
369 let n = edges.len();
370 let lines: Vec<String> = edges
371 .iter()
372 .map(|e| {
373 let status = if e.valid_to.is_some() {
374 let date = e
375 .valid_to
376 .as_deref()
377 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
378 .unwrap_or("?");
379 format!("[expired {date}]")
380 } else {
381 "[active]".to_string()
382 };
383 let src = entity_names
384 .get(&e.source_entity_id)
385 .cloned()
386 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
387 let tgt = entity_names
388 .get(&e.target_entity_id)
389 .cloned()
390 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
391 format!(
392 " {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
393 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
394 )
395 })
396 .collect();
397 Ok(format!(
398 "Edge history for '{}' ({n} edges):\n{}",
399 entity.name,
400 lines.join("\n")
401 ))
402 }
403 .instrument(tracing::info_span!("core.agent_access.graph_history")),
404 )
405 }
406
407 fn graph_communities<'a>(
408 &'a mut self,
409 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
410 Box::pin(
411 async move {
412 let (_, store) = match self.resolve_graph_store() {
413 Ok(pair) => pair,
414 Err(msg) => return Ok(msg),
415 };
416
417 let communities =
418 match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
419 .await
420 {
421 Ok(Ok(v)) => v,
422 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
423 Err(_) => {
424 tracing::warn!(
425 "graph store call timed out after 5s (Qdrant unreachable)"
426 );
427 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
428 }
429 };
430 if communities.is_empty() {
431 return Ok("No communities detected yet. Run graph backfill first.".to_owned());
432 }
433
434 let lines: Vec<String> = communities
435 .iter()
436 .map(|c| format!(" [{}]: {}", c.name, c.summary))
437 .collect();
438 Ok(format!(
439 "Communities ({}):\n{}",
440 communities.len(),
441 lines.join("\n")
442 ))
443 }
444 .instrument(tracing::info_span!("core.agent_access.graph_communities")),
445 )
446 }
447
448 #[allow(clippy::too_many_lines)]
449 fn graph_backfill<'a>(
450 &'a mut self,
451 limit: Option<usize>,
452 progress_cb: &'a mut (dyn FnMut(String) + Send),
453 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
454 let store = match self.resolve_graph_store() {
455 Ok((_, s)) => s,
456 Err(msg) => return Box::pin(async move { Ok(msg) }),
457 };
458 let graph_cfg = self.services.memory.extraction.graph_config.clone();
459 let provider = if graph_cfg.extract_provider.as_str().is_empty() {
460 self.provider.clone()
461 } else {
462 self.resolve_background_provider(graph_cfg.extract_provider.as_str())
463 };
464 Box::pin(
465 async move {
466 let total = store.unprocessed_message_count().await.unwrap_or(0);
467 let cap = limit.unwrap_or(usize::MAX);
468
469 progress_cb(format!(
470 "Starting graph backfill... ({total} unprocessed messages)"
471 ));
472
473 let batch_size = 50usize;
474 let mut processed = 0usize;
475 let mut total_entities = 0usize;
476 let mut total_edges = 0usize;
477
478 loop {
479 let remaining_cap = cap.saturating_sub(processed);
480 if remaining_cap == 0 {
481 break;
482 }
483 let batch_limit = batch_size.min(remaining_cap);
484 let messages = store
485 .unprocessed_messages_for_backfill(batch_limit)
486 .await
487 .map_err(|e| CommandError::new(e.to_string()))?;
488 if messages.is_empty() {
489 break;
490 }
491
492 let ids: Vec<zeph_memory::types::MessageId> =
493 messages.iter().map(|(id, _)| *id).collect();
494
495 for (_id, content) in &messages {
496 if content.trim().is_empty() {
497 continue;
498 }
499 let extraction_cfg = GraphExtractionConfig {
500 max_entities: graph_cfg.max_entities_per_message,
501 max_edges: graph_cfg.max_edges_per_message,
502 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
503 community_refresh_interval: 0,
504 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
505 max_entities_cap: graph_cfg.max_entities,
506 community_summary_max_prompt_bytes: graph_cfg
507 .community_summary_max_prompt_bytes,
508 community_summary_concurrency: graph_cfg.community_summary_concurrency,
509 lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
510 note_linking: zeph_memory::NoteLinkingConfig::default(),
511 link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
512 link_weight_decay_interval_secs: graph_cfg
513 .link_weight_decay_interval_secs,
514 belief_revision_enabled: graph_cfg.belief_revision.enabled,
515 belief_revision_similarity_threshold: graph_cfg
516 .belief_revision
517 .similarity_threshold,
518 conversation_id: None,
519 apex_mem_enabled: graph_cfg.apex_mem.enabled,
520 llm_timeout_secs: graph_cfg.llm_timeout_secs,
521 };
522 let pool = store.pool().clone();
523 match extract_and_store(
524 content.clone(),
525 vec![],
526 provider.clone(),
527 pool,
528 extraction_cfg,
529 None,
530 None,
531 )
532 .await
533 {
534 Ok(result) => {
535 total_entities += result.stats.entities_upserted;
536 total_edges += result.stats.edges_inserted;
537 }
538 Err(e) => {
539 tracing::warn!("backfill extraction error: {e:#}");
540 }
541 }
542 }
543
544 store
545 .mark_messages_graph_processed(&ids)
546 .await
547 .map_err(|e| CommandError::new(e.to_string()))?;
548 processed += messages.len();
549
550 progress_cb(format!(
551 "Backfill progress: {processed} messages processed, \
552 {total_entities} entities, {total_edges} edges"
553 ));
554 }
555
556 Ok(format!(
557 "Backfill complete: {total_entities} entities, {total_edges} edges \
558 extracted from {processed} messages"
559 ))
560 }
561 .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
562 )
563 }
564
565 fn guidelines<'a>(
568 &'a mut self,
569 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
570 Box::pin(
571 async move {
572 const MAX_DISPLAY_CHARS: usize = 4096;
573
574 let Some(memory) = &self.services.memory.persistence.memory else {
575 return Ok("No memory backend initialised.".to_owned());
576 };
577
578 let cid = self.services.memory.persistence.conversation_id;
579 let sqlite = memory.sqlite();
580
581 let (version, text) = sqlite
582 .load_compression_guidelines(cid)
583 .await
584 .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
585
586 if version == 0 || text.is_empty() {
587 return Ok("No compression guidelines generated yet.".to_owned());
588 }
589
590 let (_, created_at) = sqlite
591 .load_compression_guidelines_meta(cid)
592 .await
593 .unwrap_or((0, String::new()));
594
595 let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
596 let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
597 (&text[..end], true)
598 } else {
599 (text.as_str(), false)
600 };
601
602 let mut output =
603 format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
604 if truncated {
605 output.push_str("\n\n[truncated]");
606 }
607 Ok(output)
608 }
609 .instrument(tracing::info_span!("core.agent_access.guidelines")),
610 )
611 }
612
613 fn handle_model<'a>(
616 &'a mut self,
617 arg: &'a str,
618 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
619 Box::pin(async move {
620 let input = if arg.is_empty() {
621 "/model".to_owned()
622 } else {
623 format!("/model {arg}")
624 };
625 self.handle_model_command_as_string(&input).await
626 })
627 }
628
629 fn handle_provider<'a>(
630 &'a mut self,
631 arg: &'a str,
632 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
633 Box::pin(async move { self.handle_provider_command_as_string(arg).await })
634 }
635
636 fn handle_policy<'a>(
639 &'a mut self,
640 args: &'a str,
641 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
642 Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
643 }
644
645 #[cfg(feature = "scheduler")]
648 fn list_scheduled_tasks<'a>(
649 &'a mut self,
650 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
651 Box::pin(async move {
652 let result = self
653 .handle_scheduler_list_as_string()
654 .await
655 .map_err(|e| CommandError::new(e.to_string()))?;
656 Ok(Some(result))
657 })
658 }
659
660 #[cfg(not(feature = "scheduler"))]
661 fn list_scheduled_tasks<'a>(
662 &'a mut self,
663 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
664 Box::pin(async move { Ok(None) })
665 }
666
667 fn lsp_status<'a>(
670 &'a mut self,
671 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
672 Box::pin(async move {
673 self.handle_lsp_status_as_string()
674 .await
675 .map_err(|e| CommandError::new(e.to_string()))
676 })
677 }
678
679 fn session_recap<'a>(
682 &'a mut self,
683 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
684 Box::pin(
685 async move {
686 match self.build_recap().await {
687 Ok(text) => Ok(text),
688 Err(e) => {
689 tracing::warn!("session recap command: {}", e.0);
693 Ok("Recap unavailable — see logs for details".to_string())
694 }
695 }
696 }
697 .instrument(tracing::info_span!("core.agent_access.session_recap")),
698 )
699 }
700
701 fn compact_context<'a>(
704 &'a mut self,
705 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
706 Box::pin(
707 self.compact_context_command()
708 .instrument(tracing::info_span!("core.agent_access.compact_context")),
709 )
710 }
711
712 fn reset_conversation<'a>(
715 &'a mut self,
716 keep_plan: bool,
717 no_digest: bool,
718 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
719 Box::pin(async move {
720 match self.reset_conversation(keep_plan, no_digest).await {
721 Ok((old_id, new_id)) => {
722 let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
723 let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
724 let keep_note = if keep_plan { " (plan preserved)" } else { "" };
725 Ok(format!(
726 "New conversation started. Previous: {old} → Current: {new}{keep_note}"
727 ))
728 }
729 Err(e) => Ok(format!("Failed to start new conversation: {e}")),
730 }
731 })
732 }
733
734 fn cache_stats(&self) -> String {
737 self.tool_orchestrator.cache_stats()
738 }
739
740 fn session_status<'a>(
743 &'a mut self,
744 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
745 Box::pin(async move { Ok(self.handle_status_as_string()) })
746 }
747
748 fn guardrail_status(&self) -> String {
751 self.format_guardrail_status()
752 }
753
754 fn focus_status(&self) -> String {
757 self.format_focus_status()
758 }
759
760 fn sidequest_status(&self) -> String {
763 self.format_sidequest_status()
764 }
765
766 fn load_image<'a>(
769 &'a mut self,
770 path: &'a str,
771 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
772 Box::pin(async move { Ok(self.handle_image_as_string(path)) })
773 }
774
775 fn handle_mcp<'a>(
778 &'a mut self,
779 args: &'a str,
780 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
781 let args_owned = args.to_owned();
784 let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
785 let sub = parts.first().cloned().unwrap_or_default();
786
787 match sub.as_str() {
788 "list" => {
789 let manager = self.services.mcp.manager.clone();
791 let tools_snapshot: Vec<(String, String)> = self
792 .services
793 .mcp
794 .tools
795 .iter()
796 .map(|t| (t.server_id.clone(), t.name.clone()))
797 .collect();
798 Box::pin(async move {
799 use std::fmt::Write;
800 let Some(manager) = manager else {
801 return Ok("MCP is not enabled.".to_owned());
802 };
803 let server_ids = manager.list_servers().await;
804 if server_ids.is_empty() {
805 return Ok("No MCP servers connected.".to_owned());
806 }
807 let mut output = String::from("Connected MCP servers:\n");
808 let mut total = 0usize;
809 for id in &server_ids {
810 let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
811 total += count;
812 let _ = writeln!(output, "- {id} ({count} tools)");
813 }
814 let _ = write!(output, "Total: {total} tool(s)");
815 Ok(output)
816 })
817 }
818 "tools" => {
819 let server_id = parts.get(1).cloned();
821 let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
822 self.services
823 .mcp
824 .tools
825 .iter()
826 .filter(|t| &t.server_id == sid)
827 .map(|t| (t.name.clone(), t.description.clone()))
828 .collect()
829 } else {
830 Vec::new()
831 };
832 Box::pin(async move {
833 use std::fmt::Write;
834 let Some(server_id) = server_id else {
835 return Ok("Usage: /mcp tools <server_id>".to_owned());
836 };
837 if owned_tools.is_empty() {
838 return Ok(format!("No tools found for server '{server_id}'."));
839 }
840 let mut output =
841 format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
842 for (name, desc) in &owned_tools {
843 if desc.is_empty() {
844 let _ = writeln!(output, "- {name}");
845 } else {
846 let _ = writeln!(output, "- {name} — {desc}");
847 }
848 }
849 Ok(output)
850 })
851 }
852 _ => Box::pin(async move {
859 self.handle_mcp_command(&args_owned)
860 .await
861 .map_err(|e| CommandError::new(e.to_string()))
862 }),
863 }
864 }
865
866 fn handle_skill<'a>(
869 &'a mut self,
870 args: &'a str,
871 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
872 let args_owned = args.to_owned();
873 Box::pin(async move {
874 self.handle_skill_command_as_string(&args_owned)
875 .await
876 .map_err(|e| CommandError::new(e.to_string()))
877 })
878 }
879
880 fn handle_skills<'a>(
883 &'a mut self,
884 args: &'a str,
885 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
886 let args_owned = args.to_owned();
887 Box::pin(async move {
888 self.handle_skills_as_string(&args_owned)
889 .await
890 .map_err(|e| CommandError::new(e.to_string()))
891 })
892 }
893
894 fn handle_feedback_command<'a>(
897 &'a mut self,
898 args: &'a str,
899 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
900 let args_owned = args.to_owned();
901 Box::pin(async move {
902 self.handle_feedback_as_string(&args_owned)
903 .await
904 .map_err(|e| CommandError::new(e.to_string()))
905 })
906 }
907
908 #[cfg(feature = "scheduler")]
911 fn handle_plan<'a>(
912 &'a mut self,
913 input: &'a str,
914 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
915 Box::pin(async move {
916 self.dispatch_plan_command_as_string(input)
917 .await
918 .map_err(|e| CommandError::new(e.to_string()))
919 })
920 }
921
922 #[cfg(not(feature = "scheduler"))]
923 fn handle_plan<'a>(
924 &'a mut self,
925 _input: &'a str,
926 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
927 Box::pin(async move { Ok(String::new()) })
928 }
929
930 fn handle_experiment<'a>(
933 &'a mut self,
934 input: &'a str,
935 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
936 Box::pin(async move {
937 self.handle_experiment_command_as_string(input)
938 .await
939 .map_err(|e| CommandError::new(e.to_string()))
940 })
941 }
942
943 fn handle_agent_dispatch<'a>(
946 &'a mut self,
947 input: &'a str,
948 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
949 Box::pin(async move {
950 match self.dispatch_agent_command(input).await {
951 Some(Err(e)) => Err(CommandError::new(e.to_string())),
952 Some(Ok(())) | None => Ok(None),
953 }
954 })
955 }
956
957 fn handle_plugins<'a>(
960 &'a mut self,
961 args: &'a str,
962 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
963 let args_owned = args.to_owned();
964 let managed_dir = self.services.skill.managed_dir.clone();
967 let mcp_allowed = self.services.mcp.allowed_commands.clone();
968 let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
969 Box::pin(async move {
970 tokio::task::spawn_blocking(move || {
973 Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
974 })
975 .await
976 .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
977 })
978 }
979
980 fn handle_acp<'a>(
983 &'a mut self,
984 args: &'a str,
985 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
986 Box::pin(async move {
987 self.handle_acp_as_string(args)
988 .map_err(|e| CommandError::new(e.to_string()))
989 })
990 }
991
992 #[cfg(feature = "cocoon")]
995 fn handle_cocoon<'a>(
996 &'a mut self,
997 args: &'a str,
998 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
999 Box::pin(async move {
1000 self.handle_cocoon_as_string(args)
1001 .await
1002 .map_err(|e| CommandError::new(e.to_string()))
1003 })
1004 }
1005
1006 #[cfg(not(feature = "cocoon"))]
1007 fn handle_cocoon<'a>(
1008 &'a mut self,
1009 _args: &'a str,
1010 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1011 Box::pin(async {
1012 Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1013 })
1014 }
1015
1016 fn handle_loop<'a>(
1019 &'a mut self,
1020 args: &'a str,
1021 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1022 use zeph_commands::handlers::loop_cmd::parse_loop_args;
1023
1024 let args_owned = args.trim().to_owned();
1025 Box::pin(async move {
1026 if args_owned == "stop" {
1027 return Ok(self.stop_user_loop());
1028 }
1029 if args_owned == "status" {
1030 return Ok(match &self.runtime.lifecycle.user_loop {
1031 Some(ls) => format!(
1032 "Loop active: \"{}\" (iteration {}, interval every {}s).",
1033 ls.prompt,
1034 ls.iteration,
1035 ls.interval.period().as_secs(),
1036 ),
1037 None => "No active loop.".to_owned(),
1038 });
1039 }
1040 let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1041
1042 if prompt.starts_with('/') {
1043 return Err(CommandError::new(
1044 "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1045 ));
1046 }
1047
1048 let min_secs = self.runtime.config.loop_min_interval_secs;
1049 if interval_secs < min_secs {
1050 return Err(CommandError::new(format!(
1051 "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1052 )));
1053 }
1054 if self.runtime.lifecycle.user_loop.is_some() {
1055 return Err(CommandError::new(
1056 "A loop is already active. Use /loop stop first.",
1057 ));
1058 }
1059
1060 self.start_user_loop(prompt.clone(), interval_secs);
1061 Ok(format!(
1062 "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1063 ))
1064 })
1065 }
1066
1067 fn notify_test<'a>(
1068 &'a mut self,
1069 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1070 let notifier = self.runtime.lifecycle.notifier.clone();
1071 Box::pin(async move {
1072 let Some(notifier) = notifier else {
1073 return Ok(
1074 "Notifications are disabled. Set `notifications.enabled = true` in config."
1075 .to_owned(),
1076 );
1077 };
1078 match notifier.fire_test().await {
1079 Ok(()) => Ok("Test notification sent.".to_owned()),
1080 Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1081 }
1082 })
1083 }
1084
1085 fn handle_trajectory(&mut self, args: &str) -> String {
1086 self.handle_trajectory_command_as_string(args)
1087 }
1088
1089 fn handle_scope(&self, args: &str) -> String {
1090 self.handle_scope_command_as_string(args)
1091 }
1092
1093 fn handle_goal<'a>(
1096 &'a mut self,
1097 args: &'a str,
1098 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1099 if self.services.goal_accounting.is_none() {
1101 if !self.runtime.config.goals.enabled {
1102 return Box::pin(async {
1103 Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1104 });
1105 }
1106 let pool = match self.services.memory.persistence.memory.as_ref() {
1107 Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1108 None => {
1109 return Box::pin(async {
1110 Ok("Goals require a database backend (memory not configured).".to_owned())
1111 });
1112 }
1113 };
1114 let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1115 let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1116 self.services.goal_accounting = Some(accounting);
1117 }
1118
1119 let accounting =
1120 self.services.goal_accounting.clone().expect(
1121 "invariant: goal_accounting is always Some at this point (initialized above)",
1122 );
1123 let max_chars = self.runtime.config.goals.max_text_chars;
1124 let default_budget = self.runtime.config.goals.default_token_budget;
1125 let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1126 let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1127 let args_owned = args.to_owned();
1128
1129 let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1134
1135 Box::pin(async move {
1136 let _ = accounting.refresh().await;
1137 let store = accounting.get_store();
1138 let args = args_owned.as_str();
1139
1140 match args {
1141 "" | "status" => goal_status(&accounting).await,
1142 "pause" => goal_pause(&accounting, &store).await,
1143 "resume" => goal_resume(&accounting, &store).await,
1144 "complete" => goal_complete(&accounting, &store).await,
1145 "clear" => goal_clear(&accounting, &store).await,
1146 "list" => goal_list(&store).await,
1147 _ if args.starts_with("create") => {
1148 let (msg, auto_req) = goal_create(
1149 args,
1150 &accounting,
1151 &store,
1152 max_chars,
1153 default_budget,
1154 autonomous_enabled,
1155 autonomous_max_turns,
1156 )
1157 .await?;
1158 if let Some(req) = auto_req {
1159 *pending_start_arc.lock() = Some(req);
1160 }
1161 Ok(msg)
1162 }
1163 _ => Ok(
1164 "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1165 .to_owned(),
1166 ),
1167 }
1168 })
1169 }
1170
1171 fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1172 let accounting = self.services.goal_accounting.as_ref()?;
1173 let snap = accounting.snapshot()?;
1174 Some(zeph_commands::GoalSnapshot {
1175 id: snap.id,
1176 text: snap.text,
1177 status: match snap.status {
1178 crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1179 crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1180 crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1181 crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1182 },
1183 turns_used: snap.turns_used,
1184 tokens_used: snap.tokens_used,
1185 token_budget: snap.token_budget,
1186 })
1187 }
1188
1189 fn handle_agents<'a>(
1192 &'a mut self,
1193 args: &'a str,
1194 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1195 use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1196 use zeph_subagent::AgentsCommand;
1197
1198 let args_owned = args.trim().to_owned();
1199 Box::pin(async move {
1200 let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1202
1203 let fleet_section = if show_fleet {
1204 let snapshots = self.services.autonomous_registry.list();
1205 let entries: Vec<FleetEntry> = snapshots
1206 .into_iter()
1207 .map(|s| FleetEntry {
1208 goal_id: s.goal_id,
1209 goal_text_short: s.goal_text_short,
1210 state: s.state,
1211 turns_executed: s.turns_executed,
1212 max_turns: s.max_turns,
1213 elapsed: s.elapsed,
1214 })
1215 .collect();
1216 format_fleet_section(&entries)
1217 } else {
1218 String::new()
1219 };
1220
1221 let definitions_section = if show_fleet || args_owned == "list" {
1223 self.handle_agents_definitions_list()
1224 } else {
1225 match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1227 Ok(cmd) => self.handle_agents_crud(cmd),
1228 Err(e) => e.to_string(),
1229 }
1230 };
1231
1232 let mut out = fleet_section;
1233 if !definitions_section.is_empty() {
1234 if !out.is_empty() {
1235 out.push('\n');
1236 }
1237 out.push_str(&definitions_section);
1238 }
1239
1240 if out.is_empty() {
1241 "No active autonomous sessions or sub-agent definitions found."
1242 .clone_into(&mut out);
1243 }
1244
1245 Ok(out)
1246 })
1247 }
1248}
1249
1250type GoalStore = crate::goal::GoalStore;
1251type GoalAccounting = crate::goal::GoalAccounting;
1252
1253const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1000;
1255
1256async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1257 match accounting.get_active().await {
1258 Ok(Some(g)) => {
1259 let budget_line = g.token_budget.map_or_else(
1260 || format!(" tokens used: {}", g.tokens_used),
1261 |b| format!(" budget: {}/{b}", g.tokens_used),
1262 );
1263 Ok(format!(
1264 "Active goal [{}]: {}\n status: {}\n turns: {}\n{}",
1265 &g.id[..8],
1266 g.text,
1267 g.status,
1268 g.turns_used,
1269 budget_line
1270 ))
1271 }
1272 Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1273 Err(e) => Ok(format!("Goal lookup failed: {e}")),
1274 }
1275}
1276
1277async fn goal_create(
1283 args: &str,
1284 accounting: &GoalAccounting,
1285 store: &GoalStore,
1286 max_chars: usize,
1287 default_budget: Option<u64>,
1288 autonomous_enabled: bool,
1289 autonomous_max_turns: u32,
1290) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1291 let rest = args.strip_prefix("create").unwrap_or("").trim();
1292
1293 let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1295 let (text, explicit_budget) = parse_goal_create_args(&stripped);
1296
1297 if text.is_empty() {
1298 return Ok((
1299 "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1300 None,
1301 ));
1302 }
1303 if is_auto && !autonomous_enabled {
1304 return Ok((
1305 "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1306 .to_owned(),
1307 None,
1308 ));
1309 }
1310 let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1311
1312 let max_turns = explicit_turns
1313 .unwrap_or(autonomous_max_turns)
1314 .min(AUTONOMOUS_MAX_TURNS_CAP);
1315 if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1316 tracing::warn!(
1317 requested = explicit_turns,
1318 capped = AUTONOMOUS_MAX_TURNS_CAP,
1319 "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1320 );
1321 }
1322
1323 match store.create(text, budget, max_chars).await {
1324 Ok(g) => {
1325 let _ = accounting.refresh().await;
1326 let auto_start = if is_auto {
1327 Some((g.id.clone(), g.text.clone(), max_turns))
1328 } else {
1329 None
1330 };
1331 let auto_note = if is_auto {
1332 " Autonomous mode enabled — use `/goal clear` to stop."
1333 } else {
1334 ""
1335 };
1336 Ok((
1337 format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1338 auto_start,
1339 ))
1340 }
1341 Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1342 format!("Goal text exceeds {max} characters. Please shorten it."),
1343 None,
1344 )),
1345 Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1346 }
1347}
1348
1349async fn goal_pause(
1350 accounting: &GoalAccounting,
1351 store: &GoalStore,
1352) -> Result<String, CommandError> {
1353 match accounting.get_active().await {
1354 Ok(Some(g)) => {
1355 match store
1356 .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1357 .await
1358 {
1359 Ok(_) => {
1360 let _ = accounting.refresh().await;
1361 Ok(format!("Goal [{}] paused.", &g.id[..8]))
1362 }
1363 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1364 let current = accounting.get_active().await.ok().flatten();
1365 Ok(format!(
1366 "Goal state changed concurrently. Current: {}",
1367 current.map_or_else(|| "none".into(), |g| g.status.to_string())
1368 ))
1369 }
1370 Err(e) => Ok(format!("Pause failed: {e}")),
1371 }
1372 }
1373 Ok(None) => Ok("No active goal to pause.".to_owned()),
1374 Err(e) => Ok(format!("Failed: {e}")),
1375 }
1376}
1377
1378async fn goal_resume(
1379 accounting: &GoalAccounting,
1380 store: &GoalStore,
1381) -> Result<String, CommandError> {
1382 let goals = store.list(10).await.unwrap_or_default();
1383 let paused = goals
1384 .into_iter()
1385 .find(|g| g.status == crate::goal::GoalStatus::Paused);
1386 match paused {
1387 Some(g) => {
1388 match store
1389 .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1390 .await
1391 {
1392 Ok(_) => {
1393 let _ = accounting.refresh().await;
1394 Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1395 }
1396 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1397 Ok("Goal state changed concurrently — please retry.".to_owned())
1398 }
1399 Err(e) => Ok(format!("Resume failed: {e}")),
1400 }
1401 }
1402 None => Ok("No paused goal to resume.".to_owned()),
1403 }
1404}
1405
1406async fn goal_complete(
1407 accounting: &GoalAccounting,
1408 store: &GoalStore,
1409) -> Result<String, CommandError> {
1410 match accounting.get_active().await {
1411 Ok(Some(g)) => {
1412 match store
1413 .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1414 .await
1415 {
1416 Ok(_) => {
1417 let _ = accounting.refresh().await;
1418 Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1419 }
1420 Err(e) => Ok(format!("Complete failed: {e}")),
1421 }
1422 }
1423 Ok(None) => Ok("No active goal.".to_owned()),
1424 Err(e) => Ok(format!("Failed: {e}")),
1425 }
1426}
1427
1428async fn goal_clear(
1429 accounting: &GoalAccounting,
1430 store: &GoalStore,
1431) -> Result<String, CommandError> {
1432 let goals = store.list(10).await.unwrap_or_default();
1433 let target = goals.into_iter().find(|g| {
1434 g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1435 });
1436 match target {
1437 Some(g) => {
1438 match store
1439 .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1440 .await
1441 {
1442 Ok(_) => {
1443 let _ = accounting.refresh().await;
1444 Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1445 }
1446 Err(e) => Ok(format!("Clear failed: {e}")),
1447 }
1448 }
1449 None => Ok("No active or paused goal to clear.".to_owned()),
1450 }
1451}
1452
1453async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1454 let goals = store.list(20).await.unwrap_or_default();
1455 if goals.is_empty() {
1456 return Ok("No goals recorded.".to_owned());
1457 }
1458 let mut out = String::from("Goals:\n");
1459 for g in goals {
1460 let _ = std::fmt::Write::write_fmt(
1461 &mut out,
1462 format_args!(
1463 " {} [{}] {} — {} turns\n",
1464 g.status.badge_symbol(),
1465 &g.id[..8],
1466 g.text,
1467 g.turns_used
1468 ),
1469 );
1470 }
1471 Ok(out.trim_end().to_owned())
1472}
1473
1474fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
1475 if let Some(pos) = args.find("--budget") {
1476 let text = args[..pos].trim();
1477 let rest = args[pos + "--budget".len()..].trim();
1478 let budget = rest
1479 .split_whitespace()
1480 .next()
1481 .and_then(|s| s.parse::<u64>().ok());
1482 (text, budget)
1483 } else {
1484 (args, None)
1485 }
1486}
1487
1488fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
1492 let mut is_auto = false;
1493 let mut turns: Option<u32> = None;
1494 let mut text_words: Vec<&str> = Vec::new();
1495 let mut words = args.split_whitespace();
1496
1497 while let Some(w) = words.next() {
1498 if w == "--auto" {
1499 is_auto = true;
1500 } else if w == "--turns" {
1501 turns = words.next().and_then(|n| n.parse::<u32>().ok());
1502 } else {
1503 text_words.push(w);
1504 }
1505 }
1506
1507 (text_words.join(" "), is_auto, turns)
1508}
1509
1510impl From<AgentError> for CommandError {
1512 fn from(e: AgentError) -> Self {
1513 Self(e.to_string())
1514 }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519 use super::super::agent_tests::{
1520 MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1521 };
1522 use super::*;
1523 use zeph_commands::traits::agent::AgentAccess;
1524 use zeph_memory::semantic::SemanticMemory;
1525
1526 async fn memory_without_qdrant() -> SemanticMemory {
1527 SemanticMemory::new(
1528 ":memory:",
1529 "http://127.0.0.1:1",
1530 None,
1531 zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default()),
1532 "test-model",
1533 )
1534 .await
1535 .unwrap()
1536 }
1537
1538 #[tokio::test]
1542 async fn graph_stats_enabled_but_no_store_reports_unavailable() {
1543 let cfg = crate::config::GraphConfig {
1544 enabled: true,
1545 ..Default::default()
1546 };
1547 let memory = memory_without_qdrant().await;
1548 let cid = memory.sqlite().create_conversation().await.unwrap();
1549 let mut agent = Agent::new(
1550 mock_provider(vec![]),
1551 MockChannel::new(vec![]),
1552 create_test_registry(),
1553 None,
1554 5,
1555 MockToolExecutor::no_tools(),
1556 )
1557 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1558 .with_graph_config(cfg);
1559
1560 let result = agent.graph_stats().await.unwrap();
1561 assert!(
1562 result.contains("unavailable"),
1563 "expected 'unavailable' but got: {result}"
1564 );
1565 assert!(
1566 !result.contains("not enabled"),
1567 "must not report 'not enabled' when graph is enabled: {result}"
1568 );
1569 }
1570
1571 #[tokio::test]
1572 async fn graph_stats_disabled_reports_not_enabled() {
1573 let cfg = crate::config::GraphConfig {
1574 enabled: false,
1575 ..Default::default()
1576 };
1577 let memory = memory_without_qdrant().await;
1578 let cid = memory.sqlite().create_conversation().await.unwrap();
1579 let mut agent = Agent::new(
1580 mock_provider(vec![]),
1581 MockChannel::new(vec![]),
1582 create_test_registry(),
1583 None,
1584 5,
1585 MockToolExecutor::no_tools(),
1586 )
1587 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1588 .with_graph_config(cfg);
1589
1590 let result = agent.graph_stats().await.unwrap();
1591 assert!(
1592 result.contains("not enabled"),
1593 "expected 'not enabled' but got: {result}"
1594 );
1595 }
1596
1597 #[tokio::test]
1603 async fn graph_backfill_with_extract_provider_resolves_without_panic() {
1604 let cfg = crate::config::GraphConfig {
1605 enabled: true,
1606 extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
1607 ..Default::default()
1608 };
1609 let mut memory = memory_without_qdrant().await;
1610 let pool = memory.sqlite().pool().clone();
1612 memory.graph_store = Some(std::sync::Arc::new(zeph_memory::GraphStore::new(pool)));
1613 let cid = memory.sqlite().create_conversation().await.unwrap();
1614 let mut agent = Agent::new(
1615 mock_provider(vec![]),
1616 MockChannel::new(vec![]),
1617 create_test_registry(),
1618 None,
1619 5,
1620 MockToolExecutor::no_tools(),
1621 )
1622 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1623 .with_graph_config(cfg);
1624
1625 let mut progress = vec![];
1626 let result = agent
1627 .graph_backfill(Some(10), &mut |msg| progress.push(msg))
1628 .await
1629 .unwrap();
1630
1631 assert!(
1633 result.contains("Backfill complete"),
1634 "expected 'Backfill complete' but got: {result}"
1635 );
1636 }
1637
1638 #[tokio::test]
1641 async fn graph_entities_enabled_but_no_store_reports_unavailable() {
1642 let cfg = crate::config::GraphConfig {
1643 enabled: true,
1644 ..Default::default()
1645 };
1646 let memory = memory_without_qdrant().await;
1647 let cid = memory.sqlite().create_conversation().await.unwrap();
1648 let mut agent = Agent::new(
1649 mock_provider(vec![]),
1650 MockChannel::new(vec![]),
1651 create_test_registry(),
1652 None,
1653 5,
1654 MockToolExecutor::no_tools(),
1655 )
1656 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1657 .with_graph_config(cfg);
1658
1659 let result = agent.graph_entities().await.unwrap();
1660 assert!(
1661 result.contains("unavailable"),
1662 "expected 'unavailable' but got: {result}"
1663 );
1664 }
1665
1666 #[tokio::test]
1668 async fn graph_communities_enabled_but_no_store_reports_unavailable() {
1669 let cfg = crate::config::GraphConfig {
1670 enabled: true,
1671 ..Default::default()
1672 };
1673 let memory = memory_without_qdrant().await;
1674 let cid = memory.sqlite().create_conversation().await.unwrap();
1675 let mut agent = Agent::new(
1676 mock_provider(vec![]),
1677 MockChannel::new(vec![]),
1678 create_test_registry(),
1679 None,
1680 5,
1681 MockToolExecutor::no_tools(),
1682 )
1683 .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1684 .with_graph_config(cfg);
1685
1686 let result = agent.graph_communities().await.unwrap();
1687 assert!(
1688 result.contains("unavailable"),
1689 "expected 'unavailable' but got: {result}"
1690 );
1691 }
1692
1693 #[tokio::test]
1698 async fn graph_store_timeout_pattern_fires_on_pending_future() {
1699 use std::future;
1700 let result = tokio::time::timeout(
1701 Duration::from_millis(10),
1702 future::pending::<Result<Vec<()>, String>>(),
1703 )
1704 .await;
1705 assert!(
1706 result.is_err(),
1707 "timeout must fire on a never-resolving future"
1708 );
1709 }
1710}