1use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28 fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29 let Some(memory) = self.services.memory.persistence.memory.clone() else {
30 return Err("Graph memory is not enabled.".to_owned());
31 };
32 let Some(store) = memory.graph_store.clone() else {
33 if self.services.memory.extraction.graph_config.enabled {
34 return Err(
35 "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36 .to_owned(),
37 );
38 }
39 return Err("Graph memory is not enabled.".to_owned());
40 };
41 Ok((memory, store))
42 }
43}
44
45impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
46 fn memory_tiers<'a>(
49 &'a mut self,
50 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51 Box::pin(
52 async move {
53 let Some(memory) = self.services.memory.persistence.memory.clone() else {
54 return Ok("Memory not configured.".to_owned());
55 };
56 match memory.sqlite().count_messages_by_tier().await {
57 Ok((episodic, semantic)) => {
58 let mut out = String::new();
59 let _ = writeln!(out, "Memory tiers:");
60 let _ = writeln!(out, " Working: (current context window — virtual)");
61 let _ = writeln!(out, " Episodic: {episodic} messages");
62 let _ = writeln!(out, " Semantic: {semantic} facts");
63 Ok(out.trim_end().to_owned())
64 }
65 Err(e) => Ok(format!("Failed to query tier stats: {e}")),
66 }
67 }
68 .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
69 )
70 }
71
72 fn memory_promote<'a>(
73 &'a mut self,
74 ids_str: &'a str,
75 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
76 Box::pin(
77 async move {
78 let Some(memory) = self.services.memory.persistence.memory.clone() else {
79 return Ok("Memory not configured.".to_owned());
80 };
81 let ids: Vec<MessageId> = ids_str
82 .split_whitespace()
83 .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
84 .collect();
85 if ids.is_empty() {
86 return Ok(
87 "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
88 .to_owned(),
89 );
90 }
91 match memory.sqlite().manual_promote(&ids).await {
92 Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
93 Err(e) => Ok(format!("Promotion failed: {e}")),
94 }
95 }
96 .instrument(tracing::info_span!("core.agent_access.memory_promote")),
97 )
98 }
99
100 fn graph_stats<'a>(
103 &'a mut self,
104 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
105 Box::pin(
106 async move {
107 let (_, store) = match self.resolve_graph_store() {
108 Ok(pair) => pair,
109 Err(msg) => return Ok(msg),
110 };
111
112 let stats_future = async {
113 tokio::join!(
114 store.entity_count(),
115 store.active_edge_count(),
116 store.community_count(),
117 store.edge_type_distribution()
118 )
119 };
120 let Ok((entities, edges, communities, distribution)) =
121 tokio::time::timeout(Duration::from_secs(5), stats_future).await
122 else {
123 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
124 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
125 };
126 let mut msg = format!(
127 "Graph memory: {} entities, {} edges, {} communities",
128 entities.unwrap_or(0),
129 edges.unwrap_or(0),
130 communities.unwrap_or(0)
131 );
132 if let Ok(dist) = distribution
133 && !dist.is_empty()
134 {
135 let dist_str: Vec<String> =
136 dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
137 write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
138 }
139 Ok(msg)
140 }
141 .instrument(tracing::info_span!("core.agent_access.graph_stats")),
142 )
143 }
144
145 fn graph_entities<'a>(
146 &'a mut self,
147 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
148 Box::pin(
149 async move {
150 let (_, store) = match self.resolve_graph_store() {
151 Ok(pair) => pair,
152 Err(msg) => return Ok(msg),
153 };
154
155 let entities = match tokio::time::timeout(
156 Duration::from_secs(5),
157 store.all_entities(),
158 )
159 .await
160 {
161 Ok(Ok(v)) => v,
162 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
163 Err(_) => {
164 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
165 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
166 }
167 };
168 if entities.is_empty() {
169 return Ok("No entities found.".to_owned());
170 }
171
172 let total = entities.len();
173 let display: Vec<String> = entities
174 .iter()
175 .take(50)
176 .map(|e| {
177 format!(
178 " {:<40} {:<15} {}",
179 e.name,
180 e.entity_type.as_str(),
181 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
182 )
183 })
184 .collect();
185 let mut msg = format!(
186 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
187 "NAME",
188 "TYPE",
189 "LAST SEEN",
190 display.join("\n")
191 );
192 if total > 50 {
193 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
194 }
195 Ok(msg)
196 }
197 .instrument(tracing::info_span!("core.agent_access.graph_entities")),
198 )
199 }
200
201 fn graph_facts<'a>(
202 &'a mut self,
203 name: &'a str,
204 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
205 Box::pin(
206 async move {
207 let (_, store) = match self.resolve_graph_store() {
208 Ok(pair) => pair,
209 Err(msg) => return Ok(msg),
210 };
211
212 let matches = match tokio::time::timeout(
213 Duration::from_secs(5),
214 store.find_entity_by_name(name),
215 )
216 .await
217 {
218 Ok(Ok(v)) => v,
219 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
220 Err(_) => {
221 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
222 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
223 }
224 };
225 if matches.is_empty() {
226 return Ok(format!("No entity found matching '{name}'."));
227 }
228
229 let entity = &matches[0];
230 let edges = match tokio::time::timeout(
231 Duration::from_secs(5),
232 store.edges_for_entity(entity.id.0),
233 )
234 .await
235 {
236 Ok(Ok(v)) => v,
237 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
238 Err(_) => {
239 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
240 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
241 }
242 };
243 if edges.is_empty() {
244 return Ok(format!("Entity '{}' has no known facts.", entity.name));
245 }
246
247 let mut entity_names: std::collections::HashMap<i64, String> =
248 std::collections::HashMap::new();
249 entity_names.insert(entity.id.0, entity.name.clone());
250 for edge in &edges {
251 let other_id = if edge.source_entity_id == entity.id.0 {
252 edge.target_entity_id
253 } else {
254 edge.source_entity_id
255 };
256 entity_names.entry(other_id).or_default();
257 }
258 for (&id, name_val) in &mut entity_names {
259 if name_val.is_empty() {
260 let result = tokio::time::timeout(
261 Duration::from_secs(5),
262 store.find_entity_by_id(id),
263 )
264 .await;
265 if let Ok(Ok(Some(other))) = result {
266 *name_val = other.name;
267 } else {
268 *name_val = format!("#{id}");
269 }
270 }
271 }
272
273 let lines: Vec<String> = edges
274 .iter()
275 .map(|e| {
276 let src = entity_names
277 .get(&e.source_entity_id)
278 .cloned()
279 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
280 let tgt = entity_names
281 .get(&e.target_entity_id)
282 .cloned()
283 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
284 format!(
285 " {} --[{}/{}]--> {}: {} (confidence: {:.2})",
286 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
287 )
288 })
289 .collect();
290 Ok(format!(
291 "Facts for '{}':\n{}",
292 entity.name,
293 lines.join("\n")
294 ))
295 }
296 .instrument(tracing::info_span!("core.agent_access.graph_facts")),
297 )
298 }
299
300 fn graph_history<'a>(
301 &'a mut self,
302 name: &'a str,
303 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
304 Box::pin(
305 async move {
306 let (_, store) = match self.resolve_graph_store() {
307 Ok(pair) => pair,
308 Err(msg) => return Ok(msg),
309 };
310
311 let matches = match tokio::time::timeout(
312 Duration::from_secs(5),
313 store.find_entity_by_name(name),
314 )
315 .await
316 {
317 Ok(Ok(v)) => v,
318 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
319 Err(_) => {
320 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
321 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
322 }
323 };
324 if matches.is_empty() {
325 return Ok(format!("No entity found matching '{name}'."));
326 }
327
328 let entity = &matches[0];
329 let edges = match tokio::time::timeout(
330 Duration::from_secs(5),
331 store.edge_history_for_entity(entity.id.0, 50),
332 )
333 .await
334 {
335 Ok(Ok(v)) => v,
336 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
337 Err(_) => {
338 tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
339 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
340 }
341 };
342 if edges.is_empty() {
343 return Ok(format!("Entity '{}' has no edge history.", entity.name));
344 }
345
346 let mut entity_names: std::collections::HashMap<i64, String> =
347 std::collections::HashMap::new();
348 entity_names.insert(entity.id.0, entity.name.clone());
349 for edge in &edges {
350 for &id in &[edge.source_entity_id, edge.target_entity_id] {
351 entity_names.entry(id).or_default();
352 }
353 }
354 for (&id, name_val) in &mut entity_names {
355 if name_val.is_empty() {
356 let result = tokio::time::timeout(
357 Duration::from_secs(5),
358 store.find_entity_by_id(id),
359 )
360 .await;
361 if let Ok(Ok(Some(other))) = result {
362 *name_val = other.name;
363 } else {
364 *name_val = format!("#{id}");
365 }
366 }
367 }
368
369 let n = edges.len();
370 let lines: Vec<String> = edges
371 .iter()
372 .map(|e| {
373 let status = if e.valid_to.is_some() {
374 let date = e
375 .valid_to
376 .as_deref()
377 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
378 .unwrap_or("?");
379 format!("[expired {date}]")
380 } else {
381 "[active]".to_string()
382 };
383 let src = entity_names
384 .get(&e.source_entity_id)
385 .cloned()
386 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
387 let tgt = entity_names
388 .get(&e.target_entity_id)
389 .cloned()
390 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
391 format!(
392 " {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
393 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
394 )
395 })
396 .collect();
397 Ok(format!(
398 "Edge history for '{}' ({n} edges):\n{}",
399 entity.name,
400 lines.join("\n")
401 ))
402 }
403 .instrument(tracing::info_span!("core.agent_access.graph_history")),
404 )
405 }
406
407 fn graph_communities<'a>(
408 &'a mut self,
409 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
410 Box::pin(
411 async move {
412 let (_, store) = match self.resolve_graph_store() {
413 Ok(pair) => pair,
414 Err(msg) => return Ok(msg),
415 };
416
417 let communities =
418 match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
419 .await
420 {
421 Ok(Ok(v)) => v,
422 Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
423 Err(_) => {
424 tracing::warn!(
425 "graph store call timed out after 5s (Qdrant unreachable)"
426 );
427 return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
428 }
429 };
430 if communities.is_empty() {
431 return Ok("No communities detected yet. Run graph backfill first.".to_owned());
432 }
433
434 let lines: Vec<String> = communities
435 .iter()
436 .map(|c| format!(" [{}]: {}", c.name, c.summary))
437 .collect();
438 Ok(format!(
439 "Communities ({}):\n{}",
440 communities.len(),
441 lines.join("\n")
442 ))
443 }
444 .instrument(tracing::info_span!("core.agent_access.graph_communities")),
445 )
446 }
447
448 #[allow(clippy::too_many_lines)]
449 fn graph_backfill<'a>(
450 &'a mut self,
451 limit: Option<usize>,
452 progress_cb: &'a mut (dyn FnMut(String) + Send),
453 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
454 let store = match self.resolve_graph_store() {
455 Ok((_, s)) => s,
456 Err(msg) => return Box::pin(async move { Ok(msg) }),
457 };
458 let graph_cfg = self.services.memory.extraction.graph_config.clone();
459 let embed_timeout_secs = self
460 .services
461 .memory
462 .persistence
463 .memory
464 .as_ref()
465 .map_or(5, |m| m.embed_timeout().as_secs());
466 let provider = if graph_cfg.extract_provider.as_str().is_empty() {
467 self.provider.clone()
468 } else {
469 self.resolve_background_provider(graph_cfg.extract_provider.as_str())
470 };
471 Box::pin(
472 async move {
473 let total = store.unprocessed_message_count().await.unwrap_or(0);
474 let cap = limit.unwrap_or(usize::MAX);
475
476 progress_cb(format!(
477 "Starting graph backfill... ({total} unprocessed messages)"
478 ));
479
480 let batch_size = 50usize;
481 let mut processed = 0usize;
482 let mut total_entities = 0usize;
483 let mut total_edges = 0usize;
484
485 loop {
486 let remaining_cap = cap.saturating_sub(processed);
487 if remaining_cap == 0 {
488 break;
489 }
490 let batch_limit = batch_size.min(remaining_cap);
491 let messages = store
492 .unprocessed_messages_for_backfill(batch_limit)
493 .await
494 .map_err(|e| CommandError::new(e.to_string()))?;
495 if messages.is_empty() {
496 break;
497 }
498
499 let ids: Vec<zeph_memory::types::MessageId> =
500 messages.iter().map(|(id, _)| *id).collect();
501
502 for (_id, content) in &messages {
503 if content.trim().is_empty() {
504 continue;
505 }
506 let extraction_cfg = GraphExtractionConfig {
507 max_entities: graph_cfg.max_entities_per_message,
508 max_edges: graph_cfg.max_edges_per_message,
509 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
510 community_refresh_interval: 0,
511 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
512 max_entities_cap: graph_cfg.max_entities,
513 community_summary_max_prompt_bytes: graph_cfg
514 .community_summary_max_prompt_bytes,
515 community_summary_concurrency: graph_cfg.community_summary_concurrency,
516 lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
517 note_linking: zeph_memory::NoteLinkingConfig::default(),
518 link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
519 link_weight_decay_interval_secs: graph_cfg
520 .link_weight_decay_interval_secs,
521 belief_revision_enabled: graph_cfg.belief_revision.enabled,
522 belief_revision_similarity_threshold: graph_cfg
523 .belief_revision
524 .similarity_threshold,
525 conversation_id: None,
526 apex_mem_enabled: graph_cfg.apex_mem.enabled,
527 llm_timeout_secs: graph_cfg.llm_timeout_secs,
528 embed_timeout_secs,
529 };
530 let pool = store.pool().clone();
531 match extract_and_store(
532 content.clone(),
533 vec![],
534 provider.clone(),
535 pool,
536 extraction_cfg,
537 None,
538 None,
539 )
540 .await
541 {
542 Ok(result) => {
543 total_entities += result.stats.entities_upserted;
544 total_edges += result.stats.edges_inserted;
545 }
546 Err(e) => {
547 tracing::warn!("backfill extraction error: {e:#}");
548 }
549 }
550 }
551
552 store
553 .mark_messages_graph_processed(&ids)
554 .await
555 .map_err(|e| CommandError::new(e.to_string()))?;
556 processed += messages.len();
557
558 progress_cb(format!(
559 "Backfill progress: {processed} messages processed, \
560 {total_entities} entities, {total_edges} edges"
561 ));
562 }
563
564 Ok(format!(
565 "Backfill complete: {total_entities} entities, {total_edges} edges \
566 extracted from {processed} messages"
567 ))
568 }
569 .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
570 )
571 }
572
573 fn guidelines<'a>(
576 &'a mut self,
577 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
578 Box::pin(
579 async move {
580 const MAX_DISPLAY_CHARS: usize = 4096;
581
582 let Some(memory) = &self.services.memory.persistence.memory else {
583 return Ok("No memory backend initialised.".to_owned());
584 };
585
586 let cid = self.services.memory.persistence.conversation_id;
587 let sqlite = memory.sqlite();
588
589 let (version, text) = sqlite
590 .load_compression_guidelines(cid)
591 .await
592 .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
593
594 if version == 0 || text.is_empty() {
595 return Ok("No compression guidelines generated yet.".to_owned());
596 }
597
598 let (_, created_at) = sqlite
599 .load_compression_guidelines_meta(cid)
600 .await
601 .unwrap_or((0, String::new()));
602
603 let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
604 let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
605 (&text[..end], true)
606 } else {
607 (text.as_str(), false)
608 };
609
610 let mut output =
611 format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
612 if truncated {
613 output.push_str("\n\n[truncated]");
614 }
615 Ok(output)
616 }
617 .instrument(tracing::info_span!("core.agent_access.guidelines")),
618 )
619 }
620
621 fn handle_model<'a>(
624 &'a mut self,
625 arg: &'a str,
626 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
627 Box::pin(async move {
628 let input = if arg.is_empty() {
629 "/model".to_owned()
630 } else {
631 format!("/model {arg}")
632 };
633 self.handle_model_command_as_string(&input).await
634 })
635 }
636
637 fn handle_provider<'a>(
638 &'a mut self,
639 arg: &'a str,
640 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
641 Box::pin(async move { self.handle_provider_command_as_string(arg).await })
642 }
643
644 fn handle_policy<'a>(
647 &'a mut self,
648 args: &'a str,
649 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
650 Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
651 }
652
653 #[cfg(feature = "scheduler")]
656 fn list_scheduled_tasks<'a>(
657 &'a mut self,
658 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
659 Box::pin(async move {
660 let result = self
661 .handle_scheduler_list_as_string()
662 .await
663 .map_err(|e| CommandError::new(e.to_string()))?;
664 Ok(Some(result))
665 })
666 }
667
668 #[cfg(not(feature = "scheduler"))]
669 fn list_scheduled_tasks<'a>(
670 &'a mut self,
671 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
672 Box::pin(async move { Ok(None) })
673 }
674
675 fn lsp_status<'a>(
678 &'a mut self,
679 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
680 Box::pin(async move {
681 self.handle_lsp_status_as_string()
682 .await
683 .map_err(|e| CommandError::new(e.to_string()))
684 })
685 }
686
687 fn session_recap<'a>(
690 &'a mut self,
691 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
692 Box::pin(
693 async move {
694 match self.build_recap().await {
695 Ok(text) => Ok(text),
696 Err(e) => {
697 tracing::warn!("session recap command: {}", e.0);
701 Ok("Recap unavailable — see logs for details".to_string())
702 }
703 }
704 }
705 .instrument(tracing::info_span!("core.agent_access.session_recap")),
706 )
707 }
708
709 fn compact_context<'a>(
712 &'a mut self,
713 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
714 Box::pin(
715 self.compact_context_command()
716 .instrument(tracing::info_span!("core.agent_access.compact_context")),
717 )
718 }
719
720 fn reset_conversation<'a>(
723 &'a mut self,
724 keep_plan: bool,
725 no_digest: bool,
726 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
727 Box::pin(async move {
728 match self.reset_conversation(keep_plan, no_digest).await {
729 Ok((old_id, new_id)) => {
730 let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
731 let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
732 let keep_note = if keep_plan { " (plan preserved)" } else { "" };
733 Ok(format!(
734 "New conversation started. Previous: {old} → Current: {new}{keep_note}"
735 ))
736 }
737 Err(e) => Ok(format!("Failed to start new conversation: {e}")),
738 }
739 })
740 }
741
742 fn cache_stats(&self) -> String {
745 self.tool_orchestrator.cache_stats()
746 }
747
748 fn session_status<'a>(
751 &'a mut self,
752 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
753 Box::pin(async move { Ok(self.handle_status_as_string()) })
754 }
755
756 fn guardrail_status(&self) -> String {
759 self.format_guardrail_status()
760 }
761
762 fn focus_status(&self) -> String {
765 self.format_focus_status()
766 }
767
768 fn sidequest_status(&self) -> String {
771 self.format_sidequest_status()
772 }
773
774 fn load_image<'a>(
777 &'a mut self,
778 path: &'a str,
779 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
780 Box::pin(async move { Ok(self.handle_image_as_string(path)) })
781 }
782
783 fn handle_mcp<'a>(
786 &'a mut self,
787 args: &'a str,
788 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
789 let args_owned = args.to_owned();
792 let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
793 let sub = parts.first().cloned().unwrap_or_default();
794
795 match sub.as_str() {
796 "list" => {
797 let manager = self.services.mcp.manager.clone();
799 let tools_snapshot: Vec<(String, String)> = self
800 .services
801 .mcp
802 .tools
803 .iter()
804 .map(|t| (t.server_id.clone(), t.name.clone()))
805 .collect();
806 Box::pin(async move {
807 use std::fmt::Write;
808 let Some(manager) = manager else {
809 return Ok("MCP is not enabled.".to_owned());
810 };
811 let server_ids = manager.list_servers().await;
812 if server_ids.is_empty() {
813 return Ok("No MCP servers connected.".to_owned());
814 }
815 let mut output = String::from("Connected MCP servers:\n");
816 let mut total = 0usize;
817 for id in &server_ids {
818 let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
819 total += count;
820 let _ = writeln!(output, "- {id} ({count} tools)");
821 }
822 let _ = write!(output, "Total: {total} tool(s)");
823 Ok(output)
824 })
825 }
826 "tools" => {
827 let server_id = parts.get(1).cloned();
829 let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
830 self.services
831 .mcp
832 .tools
833 .iter()
834 .filter(|t| &t.server_id == sid)
835 .map(|t| (t.name.clone(), t.description.clone()))
836 .collect()
837 } else {
838 Vec::new()
839 };
840 Box::pin(async move {
841 use std::fmt::Write;
842 let Some(server_id) = server_id else {
843 return Ok("Usage: /mcp tools <server_id>".to_owned());
844 };
845 if owned_tools.is_empty() {
846 return Ok(format!("No tools found for server '{server_id}'."));
847 }
848 let mut output =
849 format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
850 for (name, desc) in &owned_tools {
851 if desc.is_empty() {
852 let _ = writeln!(output, "- {name}");
853 } else {
854 let _ = writeln!(output, "- {name} — {desc}");
855 }
856 }
857 Ok(output)
858 })
859 }
860 _ => Box::pin(async move {
867 self.handle_mcp_command(&args_owned)
868 .await
869 .map_err(|e| CommandError::new(e.to_string()))
870 }),
871 }
872 }
873
874 fn handle_skill<'a>(
877 &'a mut self,
878 args: &'a str,
879 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
880 let args_owned = args.to_owned();
881 Box::pin(async move {
882 self.handle_skill_command_as_string(&args_owned)
883 .await
884 .map_err(|e| CommandError::new(e.to_string()))
885 })
886 }
887
888 fn handle_skills<'a>(
891 &'a mut self,
892 args: &'a str,
893 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
894 let args_owned = args.to_owned();
895 Box::pin(async move {
896 self.handle_skills_as_string(&args_owned)
897 .await
898 .map_err(|e| CommandError::new(e.to_string()))
899 })
900 }
901
902 fn handle_feedback_command<'a>(
905 &'a mut self,
906 args: &'a str,
907 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
908 let args_owned = args.to_owned();
909 Box::pin(async move {
910 self.handle_feedback_as_string(&args_owned)
911 .await
912 .map_err(|e| CommandError::new(e.to_string()))
913 })
914 }
915
916 #[cfg(feature = "scheduler")]
919 fn handle_plan<'a>(
920 &'a mut self,
921 input: &'a str,
922 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
923 Box::pin(async move {
924 self.dispatch_plan_command_as_string(input)
925 .await
926 .map_err(|e| CommandError::new(e.to_string()))
927 })
928 }
929
930 #[cfg(not(feature = "scheduler"))]
931 fn handle_plan<'a>(
932 &'a mut self,
933 _input: &'a str,
934 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
935 Box::pin(async move { Ok(String::new()) })
936 }
937
938 fn handle_experiment<'a>(
941 &'a mut self,
942 input: &'a str,
943 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
944 Box::pin(async move {
945 self.handle_experiment_command_as_string(input)
946 .await
947 .map_err(|e| CommandError::new(e.to_string()))
948 })
949 }
950
951 fn handle_agent_dispatch<'a>(
954 &'a mut self,
955 input: &'a str,
956 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
957 Box::pin(async move {
958 match self.dispatch_agent_command(input).await {
959 Some(Err(e)) => Err(CommandError::new(e.to_string())),
960 Some(Ok(())) | None => Ok(None),
961 }
962 })
963 }
964
965 fn handle_plugins<'a>(
968 &'a mut self,
969 args: &'a str,
970 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
971 let args_owned = args.to_owned();
972 let managed_dir = self.services.skill.managed_dir.clone();
975 let mcp_allowed = self.services.mcp.allowed_commands.clone();
976 let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
977 Box::pin(async move {
978 tokio::task::spawn_blocking(move || {
981 Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
982 })
983 .await
984 .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
985 })
986 }
987
988 fn handle_acp<'a>(
991 &'a mut self,
992 args: &'a str,
993 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
994 Box::pin(async move {
995 self.handle_acp_as_string(args)
996 .map_err(|e| CommandError::new(e.to_string()))
997 })
998 }
999
1000 #[cfg(feature = "cocoon")]
1003 fn handle_cocoon<'a>(
1004 &'a mut self,
1005 args: &'a str,
1006 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1007 Box::pin(async move {
1008 self.handle_cocoon_as_string(args)
1009 .await
1010 .map_err(|e| CommandError::new(e.to_string()))
1011 })
1012 }
1013
1014 #[cfg(not(feature = "cocoon"))]
1015 fn handle_cocoon<'a>(
1016 &'a mut self,
1017 _args: &'a str,
1018 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1019 Box::pin(async {
1020 Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1021 })
1022 }
1023
1024 fn handle_loop<'a>(
1027 &'a mut self,
1028 args: &'a str,
1029 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1030 use zeph_commands::handlers::loop_cmd::parse_loop_args;
1031
1032 let args_owned = args.trim().to_owned();
1033 Box::pin(async move {
1034 if args_owned == "stop" {
1035 return Ok(self.stop_user_loop());
1036 }
1037 if args_owned == "status" {
1038 return Ok(match &self.runtime.lifecycle.user_loop {
1039 Some(ls) => format!(
1040 "Loop active: \"{}\" (iteration {}, interval every {}s).",
1041 ls.prompt,
1042 ls.iteration,
1043 ls.interval.period().as_secs(),
1044 ),
1045 None => "No active loop.".to_owned(),
1046 });
1047 }
1048 let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1049
1050 if prompt.starts_with('/') {
1051 return Err(CommandError::new(
1052 "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1053 ));
1054 }
1055
1056 let min_secs = self.runtime.config.loop_min_interval_secs;
1057 if interval_secs < min_secs {
1058 return Err(CommandError::new(format!(
1059 "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1060 )));
1061 }
1062 if self.runtime.lifecycle.user_loop.is_some() {
1063 return Err(CommandError::new(
1064 "A loop is already active. Use /loop stop first.",
1065 ));
1066 }
1067
1068 self.start_user_loop(prompt.clone(), interval_secs);
1069 Ok(format!(
1070 "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1071 ))
1072 })
1073 }
1074
1075 fn notify_test<'a>(
1076 &'a mut self,
1077 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1078 let notifier = self.runtime.lifecycle.notifier.clone();
1079 Box::pin(async move {
1080 let Some(notifier) = notifier else {
1081 return Ok(
1082 "Notifications are disabled. Set `notifications.enabled = true` in config."
1083 .to_owned(),
1084 );
1085 };
1086 match notifier.fire_test().await {
1087 Ok(()) => Ok("Test notification sent.".to_owned()),
1088 Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1089 }
1090 })
1091 }
1092
1093 fn handle_trajectory(&mut self, args: &str) -> String {
1094 self.handle_trajectory_command_as_string(args)
1095 }
1096
1097 fn handle_scope(&self, args: &str) -> String {
1098 self.handle_scope_command_as_string(args)
1099 }
1100
1101 fn handle_goal<'a>(
1104 &'a mut self,
1105 args: &'a str,
1106 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1107 if self.services.goal_accounting.is_none() {
1109 if !self.runtime.config.goals.enabled {
1110 return Box::pin(async {
1111 Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1112 });
1113 }
1114 let pool = match self.services.memory.persistence.memory.as_ref() {
1115 Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1116 None => {
1117 return Box::pin(async {
1118 Ok("Goals require a database backend (memory not configured).".to_owned())
1119 });
1120 }
1121 };
1122 let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1123 let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1124 self.services.goal_accounting = Some(accounting);
1125 }
1126
1127 let accounting =
1128 self.services.goal_accounting.clone().expect(
1129 "invariant: goal_accounting is always Some at this point (initialized above)",
1130 );
1131 let max_chars = self.runtime.config.goals.max_text_chars;
1132 let default_budget = self.runtime.config.goals.default_token_budget;
1133 let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1134 let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1135 let args_owned = args.to_owned();
1136
1137 let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1142
1143 Box::pin(async move {
1144 let _ = accounting.refresh().await;
1145 let store = accounting.get_store();
1146 let args = args_owned.as_str();
1147
1148 match args {
1149 "" | "status" => goal_status(&accounting).await,
1150 "pause" => goal_pause(&accounting, &store).await,
1151 "resume" => goal_resume(&accounting, &store).await,
1152 "complete" => goal_complete(&accounting, &store).await,
1153 "clear" => goal_clear(&accounting, &store).await,
1154 "list" => goal_list(&store).await,
1155 _ if args.starts_with("create") => {
1156 let (msg, auto_req) = goal_create(
1157 args,
1158 &accounting,
1159 &store,
1160 max_chars,
1161 default_budget,
1162 autonomous_enabled,
1163 autonomous_max_turns,
1164 )
1165 .await?;
1166 if let Some(req) = auto_req {
1167 *pending_start_arc.lock() = Some(req);
1168 }
1169 Ok(msg)
1170 }
1171 _ => Ok(
1172 "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1173 .to_owned(),
1174 ),
1175 }
1176 })
1177 }
1178
1179 fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1180 let accounting = self.services.goal_accounting.as_ref()?;
1181 let snap = accounting.snapshot()?;
1182 Some(zeph_commands::GoalSnapshot {
1183 id: snap.id,
1184 text: snap.text,
1185 status: match snap.status {
1186 crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1187 crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1188 crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1189 crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1190 },
1191 turns_used: snap.turns_used,
1192 tokens_used: snap.tokens_used,
1193 token_budget: snap.token_budget,
1194 })
1195 }
1196
1197 fn handle_agents<'a>(
1200 &'a mut self,
1201 args: &'a str,
1202 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1203 use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1204 use zeph_subagent::AgentsCommand;
1205
1206 let args_owned = args.trim().to_owned();
1207 Box::pin(async move {
1208 let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1210
1211 let fleet_section = if show_fleet {
1212 let snapshots = self.services.autonomous_registry.list();
1213 let entries: Vec<FleetEntry> = snapshots
1214 .into_iter()
1215 .map(|s| FleetEntry {
1216 goal_id: s.goal_id,
1217 goal_text_short: s.goal_text_short,
1218 state: s.state,
1219 turns_executed: s.turns_executed,
1220 max_turns: s.max_turns,
1221 elapsed: s.elapsed,
1222 })
1223 .collect();
1224 format_fleet_section(&entries)
1225 } else {
1226 String::new()
1227 };
1228
1229 let definitions_section = if show_fleet || args_owned == "list" {
1231 self.handle_agents_definitions_list()
1232 } else {
1233 match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1235 Ok(cmd) => self.handle_agents_crud(cmd),
1236 Err(e) => e.to_string(),
1237 }
1238 };
1239
1240 let mut out = fleet_section;
1241 if !definitions_section.is_empty() {
1242 if !out.is_empty() {
1243 out.push('\n');
1244 }
1245 out.push_str(&definitions_section);
1246 }
1247
1248 if out.is_empty() {
1249 "No active autonomous sessions or sub-agent definitions found."
1250 .clone_into(&mut out);
1251 }
1252
1253 Ok(out)
1254 })
1255 }
1256}
1257
1258type GoalStore = crate::goal::GoalStore;
1259type GoalAccounting = crate::goal::GoalAccounting;
1260
/// Hard upper bound on the turn limit of an autonomous goal session.
///
/// `goal_create` clamps both the configured default and an explicit `--turns N`
/// value to this cap.
const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1_000;
1263
1264async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1265 match accounting.get_active().await {
1266 Ok(Some(g)) => {
1267 let budget_line = g.token_budget.map_or_else(
1268 || format!(" tokens used: {}", g.tokens_used),
1269 |b| format!(" budget: {}/{b}", g.tokens_used),
1270 );
1271 Ok(format!(
1272 "Active goal [{}]: {}\n status: {}\n turns: {}\n{}",
1273 &g.id[..8],
1274 g.text,
1275 g.status,
1276 g.turns_used,
1277 budget_line
1278 ))
1279 }
1280 Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1281 Err(e) => Ok(format!("Goal lookup failed: {e}")),
1282 }
1283}
1284
1285async fn goal_create(
1291 args: &str,
1292 accounting: &GoalAccounting,
1293 store: &GoalStore,
1294 max_chars: usize,
1295 default_budget: Option<u64>,
1296 autonomous_enabled: bool,
1297 autonomous_max_turns: u32,
1298) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1299 let rest = args.strip_prefix("create").unwrap_or("").trim();
1300
1301 let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1303 let (text, explicit_budget) = parse_goal_create_args(&stripped);
1304
1305 if text.is_empty() {
1306 return Ok((
1307 "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1308 None,
1309 ));
1310 }
1311 if is_auto && !autonomous_enabled {
1312 return Ok((
1313 "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1314 .to_owned(),
1315 None,
1316 ));
1317 }
1318 let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1319
1320 let max_turns = explicit_turns
1321 .unwrap_or(autonomous_max_turns)
1322 .min(AUTONOMOUS_MAX_TURNS_CAP);
1323 if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1324 tracing::warn!(
1325 requested = explicit_turns,
1326 capped = AUTONOMOUS_MAX_TURNS_CAP,
1327 "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1328 );
1329 }
1330
1331 match store.create(text, budget, max_chars).await {
1332 Ok(g) => {
1333 let _ = accounting.refresh().await;
1334 let auto_start = if is_auto {
1335 Some((g.id.clone(), g.text.clone(), max_turns))
1336 } else {
1337 None
1338 };
1339 let auto_note = if is_auto {
1340 " Autonomous mode enabled — use `/goal clear` to stop."
1341 } else {
1342 ""
1343 };
1344 Ok((
1345 format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1346 auto_start,
1347 ))
1348 }
1349 Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1350 format!("Goal text exceeds {max} characters. Please shorten it."),
1351 None,
1352 )),
1353 Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1354 }
1355}
1356
1357async fn goal_pause(
1358 accounting: &GoalAccounting,
1359 store: &GoalStore,
1360) -> Result<String, CommandError> {
1361 match accounting.get_active().await {
1362 Ok(Some(g)) => {
1363 match store
1364 .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1365 .await
1366 {
1367 Ok(_) => {
1368 let _ = accounting.refresh().await;
1369 Ok(format!("Goal [{}] paused.", &g.id[..8]))
1370 }
1371 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1372 let current = accounting.get_active().await.ok().flatten();
1373 Ok(format!(
1374 "Goal state changed concurrently. Current: {}",
1375 current.map_or_else(|| "none".into(), |g| g.status.to_string())
1376 ))
1377 }
1378 Err(e) => Ok(format!("Pause failed: {e}")),
1379 }
1380 }
1381 Ok(None) => Ok("No active goal to pause.".to_owned()),
1382 Err(e) => Ok(format!("Failed: {e}")),
1383 }
1384}
1385
1386async fn goal_resume(
1387 accounting: &GoalAccounting,
1388 store: &GoalStore,
1389) -> Result<String, CommandError> {
1390 let goals = store.list(10).await.unwrap_or_default();
1391 let paused = goals
1392 .into_iter()
1393 .find(|g| g.status == crate::goal::GoalStatus::Paused);
1394 match paused {
1395 Some(g) => {
1396 match store
1397 .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1398 .await
1399 {
1400 Ok(_) => {
1401 let _ = accounting.refresh().await;
1402 Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1403 }
1404 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1405 Ok("Goal state changed concurrently — please retry.".to_owned())
1406 }
1407 Err(e) => Ok(format!("Resume failed: {e}")),
1408 }
1409 }
1410 None => Ok("No paused goal to resume.".to_owned()),
1411 }
1412}
1413
1414async fn goal_complete(
1415 accounting: &GoalAccounting,
1416 store: &GoalStore,
1417) -> Result<String, CommandError> {
1418 match accounting.get_active().await {
1419 Ok(Some(g)) => {
1420 match store
1421 .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1422 .await
1423 {
1424 Ok(_) => {
1425 let _ = accounting.refresh().await;
1426 Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1427 }
1428 Err(e) => Ok(format!("Complete failed: {e}")),
1429 }
1430 }
1431 Ok(None) => Ok("No active goal.".to_owned()),
1432 Err(e) => Ok(format!("Failed: {e}")),
1433 }
1434}
1435
1436async fn goal_clear(
1437 accounting: &GoalAccounting,
1438 store: &GoalStore,
1439) -> Result<String, CommandError> {
1440 let goals = store.list(10).await.unwrap_or_default();
1441 let target = goals.into_iter().find(|g| {
1442 g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1443 });
1444 match target {
1445 Some(g) => {
1446 match store
1447 .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1448 .await
1449 {
1450 Ok(_) => {
1451 let _ = accounting.refresh().await;
1452 Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1453 }
1454 Err(e) => Ok(format!("Clear failed: {e}")),
1455 }
1456 }
1457 None => Ok("No active or paused goal to clear.".to_owned()),
1458 }
1459}
1460
1461async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1462 let goals = store.list(20).await.unwrap_or_default();
1463 if goals.is_empty() {
1464 return Ok("No goals recorded.".to_owned());
1465 }
1466 let mut out = String::from("Goals:\n");
1467 for g in goals {
1468 let _ = std::fmt::Write::write_fmt(
1469 &mut out,
1470 format_args!(
1471 " {} [{}] {} — {} turns\n",
1472 g.status.badge_symbol(),
1473 &g.id[..8],
1474 g.text,
1475 g.turns_used
1476 ),
1477 );
1478 }
1479 Ok(out.trim_end().to_owned())
1480}
1481
/// Splits `/goal create` arguments into the goal text and an optional `--budget N`.
///
/// `--budget` is recognised only as a standalone, whitespace-delimited token, so
/// text that merely contains it (for example `--budgeting`) is left untouched.
/// The first such token wins.
///
/// # Returns
/// - Without a `--budget` token: `(args, None)`, with `args` returned as given.
/// - With one: the trimmed text before the flag, and the first token after it parsed
///   as `u64` (`None` when the value is missing or not a number). Anything after the
///   flag is not part of the returned text.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // Find the first occurrence that is a whole token, not a substring of a longer word.
    let flag_start = args.match_indices(FLAG).map(|(idx, _)| idx).find(|&idx| {
        let tail = &args[idx + FLAG.len()..];
        let starts_token = idx == 0 || args[..idx].ends_with(char::is_whitespace);
        let ends_token = tail.is_empty() || tail.starts_with(char::is_whitespace);
        starts_token && ends_token
    });

    let Some(flag_start) = flag_start else {
        return (args, None);
    };

    let text = args[..flag_start].trim();
    let budget = args[flag_start + FLAG.len()..]
        .split_whitespace()
        .next()
        .and_then(|value| value.parse::<u64>().ok());
    (text, budget)
}
1495
/// Extracts the autonomous-mode flags (`--auto`, `--turns N`) from `args`.
///
/// # Returns
/// `(text, is_auto, turns)` where `text` is the remaining words re-joined with single
/// spaces, `is_auto` is whether `--auto` appeared, and `turns` is the value of the
/// last `--turns N` whose `N` parsed as `u32`.
///
/// `--turns` consumes the following token only when it is a valid number. A dangling
/// or malformed `--turns` is dropped on its own, so `--turns --auto` still enables
/// autonomous mode and `--turns word` keeps `word` in the text.
fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
    let mut is_auto = false;
    let mut turns: Option<u32> = None;
    let mut text_words: Vec<&str> = Vec::new();
    let mut words = args.split_whitespace().peekable();

    while let Some(word) = words.next() {
        match word {
            "--auto" => is_auto = true,
            "--turns" => {
                // Peek first: only a numeric value belongs to the flag.
                if let Some(n) = words.peek().and_then(|next| next.parse::<u32>().ok()) {
                    turns = Some(n);
                    words.next();
                }
            }
            _ => text_words.push(word),
        }
    }

    (text_words.join(" "), is_auto, turns)
}
1517
1518impl From<AgentError> for CommandError {
1520 fn from(e: AgentError) -> Self {
1521 Self(e.to_string())
1522 }
1523}
1524
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use zeph_commands::traits::agent::AgentAccess;
    use zeph_memory::semantic::SemanticMemory;

    /// Builds a `SemanticMemory` on an in-memory SQLite database with the vector
    /// store URL pointed at `127.0.0.1:1` (presumably unreachable, so no graph store
    /// is attached — the "unavailable" tests below rely on that).
    async fn memory_without_qdrant() -> SemanticMemory {
        let provider = zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default());
        SemanticMemory::new(
            ":memory:",
            "http://127.0.0.1:1",
            None,
            provider,
            "test-model",
        )
        .await
        .unwrap()
    }

    /// Default graph configuration with only the `enabled` switch overridden.
    fn graph_config(enabled: bool) -> crate::config::GraphConfig {
        crate::config::GraphConfig {
            enabled,
            ..Default::default()
        }
    }

    /// Creates a conversation in `memory` and wires a mock-backed agent to it with
    /// the given graph configuration.
    async fn agent_with_graph_config(
        memory: SemanticMemory,
        cfg: crate::config::GraphConfig,
    ) -> Agent<MockChannel> {
        let cid = memory.sqlite().create_conversation().await.unwrap();
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
        .with_graph_config(cfg)
    }

    /// Graph enabled in config but no store attached: the stats command must say
    /// "unavailable", not "not enabled".
    #[tokio::test]
    async fn graph_stats_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph_config(memory, graph_config(true)).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
        assert!(
            !result.contains("not enabled"),
            "must not report 'not enabled' when graph is enabled: {result}"
        );
    }

    /// Graph disabled in config: the stats command must say "not enabled".
    #[tokio::test]
    async fn graph_stats_disabled_reports_not_enabled() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph_config(memory, graph_config(false)).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("not enabled"),
            "expected 'not enabled' but got: {result}"
        );
    }

    /// Backfill with an `extract_provider` name that does not exist must still run
    /// to completion instead of panicking.
    #[tokio::test]
    async fn graph_backfill_with_extract_provider_resolves_without_panic() {
        let cfg = crate::config::GraphConfig {
            extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
            ..graph_config(true)
        };
        let mut memory = memory_without_qdrant().await;
        // Attach a graph store by hand so the backfill path is actually reached.
        let pool = memory.sqlite().pool().clone();
        memory.graph_store = Some(std::sync::Arc::new(zeph_memory::GraphStore::new(pool)));
        let mut agent = agent_with_graph_config(memory, cfg).await;

        let mut progress = vec![];
        let result = agent
            .graph_backfill(Some(10), &mut |msg| progress.push(msg))
            .await
            .unwrap();

        assert!(
            result.contains("Backfill complete"),
            "expected 'Backfill complete' but got: {result}"
        );
    }

    /// Graph enabled but no store attached: the entities command reports "unavailable".
    #[tokio::test]
    async fn graph_entities_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph_config(memory, graph_config(true)).await;

        let result = agent.graph_entities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    /// Graph enabled but no store attached: the communities command reports "unavailable".
    #[tokio::test]
    async fn graph_communities_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph_config(memory, graph_config(true)).await;

        let result = agent.graph_communities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    /// Sanity check of the timeout pattern itself: wrapping a future that never
    /// resolves in `tokio::time::timeout` must yield an error.
    #[tokio::test]
    async fn graph_store_timeout_pattern_fires_on_pending_future() {
        let never = std::future::pending::<Result<Vec<()>, String>>();
        let result = tokio::time::timeout(Duration::from_millis(10), never).await;
        assert!(
            result.is_err(),
            "timeout must fire on a never-resolving future"
        );
    }
}