1use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15
16use zeph_commands::CommandError;
17use zeph_commands::traits::agent::AgentAccess;
18use zeph_memory::{GraphExtractionConfig, MessageId, extract_and_store};
19
20use super::{Agent, error::AgentError};
21use crate::channel::Channel;
22
23impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
24 fn memory_tiers<'a>(
27 &'a mut self,
28 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
29 Box::pin(async move {
30 let Some(memory) = self.services.memory.persistence.memory.clone() else {
31 return Ok("Memory not configured.".to_owned());
32 };
33 match memory.sqlite().count_messages_by_tier().await {
34 Ok((episodic, semantic)) => {
35 let mut out = String::new();
36 let _ = writeln!(out, "Memory tiers:");
37 let _ = writeln!(out, " Working: (current context window — virtual)");
38 let _ = writeln!(out, " Episodic: {episodic} messages");
39 let _ = writeln!(out, " Semantic: {semantic} facts");
40 Ok(out.trim_end().to_owned())
41 }
42 Err(e) => Ok(format!("Failed to query tier stats: {e}")),
43 }
44 })
45 }
46
47 fn memory_promote<'a>(
48 &'a mut self,
49 ids_str: &'a str,
50 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51 Box::pin(async move {
52 let Some(memory) = self.services.memory.persistence.memory.clone() else {
53 return Ok("Memory not configured.".to_owned());
54 };
55 let ids: Vec<MessageId> = ids_str
56 .split_whitespace()
57 .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
58 .collect();
59 if ids.is_empty() {
60 return Ok(
61 "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
62 .to_owned(),
63 );
64 }
65 match memory.sqlite().manual_promote(&ids).await {
66 Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
67 Err(e) => Ok(format!("Promotion failed: {e}")),
68 }
69 })
70 }
71
72 fn graph_stats<'a>(
75 &'a mut self,
76 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
77 Box::pin(async move {
78 let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
79 return Ok("Graph memory is not enabled.".to_owned());
80 };
81 let Some(store) = memory.graph_store.as_ref() else {
82 return Ok("Graph memory is not enabled.".to_owned());
83 };
84
85 let (entities, edges, communities, distribution) = tokio::join!(
86 store.entity_count(),
87 store.active_edge_count(),
88 store.community_count(),
89 store.edge_type_distribution()
90 );
91 let mut msg = format!(
92 "Graph memory: {} entities, {} edges, {} communities",
93 entities.unwrap_or(0),
94 edges.unwrap_or(0),
95 communities.unwrap_or(0)
96 );
97 if let Ok(dist) = distribution
98 && !dist.is_empty()
99 {
100 let dist_str: Vec<String> = dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
101 write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
102 }
103 Ok(msg)
104 })
105 }
106
107 fn graph_entities<'a>(
108 &'a mut self,
109 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
110 Box::pin(async move {
111 let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
112 return Ok("Graph memory is not enabled.".to_owned());
113 };
114 let Some(store) = memory.graph_store.as_ref() else {
115 return Ok("Graph memory is not enabled.".to_owned());
116 };
117
118 let entities = store
119 .all_entities()
120 .await
121 .map_err(|e| CommandError::new(e.to_string()))?;
122 if entities.is_empty() {
123 return Ok("No entities found.".to_owned());
124 }
125
126 let total = entities.len();
127 let display: Vec<String> = entities
128 .iter()
129 .take(50)
130 .map(|e| {
131 format!(
132 " {:<40} {:<15} {}",
133 e.name,
134 e.entity_type.as_str(),
135 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
136 )
137 })
138 .collect();
139 let mut msg = format!(
140 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
141 "NAME",
142 "TYPE",
143 "LAST SEEN",
144 display.join("\n")
145 );
146 if total > 50 {
147 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
148 }
149 Ok(msg)
150 })
151 }
152
153 fn graph_facts<'a>(
154 &'a mut self,
155 name: &'a str,
156 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
157 Box::pin(async move {
158 let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
159 return Ok("Graph memory is not enabled.".to_owned());
160 };
161 let Some(store) = memory.graph_store.as_ref() else {
162 return Ok("Graph memory is not enabled.".to_owned());
163 };
164
165 let matches = store
166 .find_entity_by_name(name)
167 .await
168 .map_err(|e| CommandError::new(e.to_string()))?;
169 if matches.is_empty() {
170 return Ok(format!("No entity found matching '{name}'."));
171 }
172
173 let entity = &matches[0];
174 let edges = store
175 .edges_for_entity(entity.id)
176 .await
177 .map_err(|e| CommandError::new(e.to_string()))?;
178 if edges.is_empty() {
179 return Ok(format!("Entity '{}' has no known facts.", entity.name));
180 }
181
182 let mut entity_names: std::collections::HashMap<i64, String> =
183 std::collections::HashMap::new();
184 entity_names.insert(entity.id, entity.name.clone());
185 for edge in &edges {
186 let other_id = if edge.source_entity_id == entity.id {
187 edge.target_entity_id
188 } else {
189 edge.source_entity_id
190 };
191 entity_names.entry(other_id).or_default();
192 }
193 for (&id, name_val) in &mut entity_names {
194 if name_val.is_empty() {
195 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
196 *name_val = other.name;
197 } else {
198 *name_val = format!("#{id}");
199 }
200 }
201 }
202
203 let lines: Vec<String> = edges
204 .iter()
205 .map(|e| {
206 let src = entity_names
207 .get(&e.source_entity_id)
208 .cloned()
209 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
210 let tgt = entity_names
211 .get(&e.target_entity_id)
212 .cloned()
213 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
214 format!(
215 " {} --[{}/{}]--> {}: {} (confidence: {:.2})",
216 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
217 )
218 })
219 .collect();
220 Ok(format!(
221 "Facts for '{}':\n{}",
222 entity.name,
223 lines.join("\n")
224 ))
225 })
226 }
227
228 fn graph_history<'a>(
229 &'a mut self,
230 name: &'a str,
231 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
232 Box::pin(async move {
233 let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
234 return Ok("Graph memory is not enabled.".to_owned());
235 };
236 let Some(store) = memory.graph_store.as_ref() else {
237 return Ok("Graph memory is not enabled.".to_owned());
238 };
239
240 let matches = store
241 .find_entity_by_name(name)
242 .await
243 .map_err(|e| CommandError::new(e.to_string()))?;
244 if matches.is_empty() {
245 return Ok(format!("No entity found matching '{name}'."));
246 }
247
248 let entity = &matches[0];
249 let edges = store
250 .edge_history_for_entity(entity.id, 50)
251 .await
252 .map_err(|e| CommandError::new(e.to_string()))?;
253 if edges.is_empty() {
254 return Ok(format!("Entity '{}' has no edge history.", entity.name));
255 }
256
257 let mut entity_names: std::collections::HashMap<i64, String> =
258 std::collections::HashMap::new();
259 entity_names.insert(entity.id, entity.name.clone());
260 for edge in &edges {
261 for &id in &[edge.source_entity_id, edge.target_entity_id] {
262 entity_names.entry(id).or_default();
263 }
264 }
265 for (&id, name_val) in &mut entity_names {
266 if name_val.is_empty() {
267 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
268 *name_val = other.name;
269 } else {
270 *name_val = format!("#{id}");
271 }
272 }
273 }
274
275 let n = edges.len();
276 let lines: Vec<String> = edges
277 .iter()
278 .map(|e| {
279 let status = if e.valid_to.is_some() {
280 let date = e
281 .valid_to
282 .as_deref()
283 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
284 .unwrap_or("?");
285 format!("[expired {date}]")
286 } else {
287 "[active]".to_string()
288 };
289 let src = entity_names
290 .get(&e.source_entity_id)
291 .cloned()
292 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
293 let tgt = entity_names
294 .get(&e.target_entity_id)
295 .cloned()
296 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
297 format!(
298 " {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
299 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
300 )
301 })
302 .collect();
303 Ok(format!(
304 "Edge history for '{}' ({n} edges):\n{}",
305 entity.name,
306 lines.join("\n")
307 ))
308 })
309 }
310
311 fn graph_communities<'a>(
312 &'a mut self,
313 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
314 Box::pin(async move {
315 let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
316 return Ok("Graph memory is not enabled.".to_owned());
317 };
318 let Some(store) = memory.graph_store.as_ref() else {
319 return Ok("Graph memory is not enabled.".to_owned());
320 };
321
322 let communities = store
323 .all_communities()
324 .await
325 .map_err(|e| CommandError::new(e.to_string()))?;
326 if communities.is_empty() {
327 return Ok("No communities detected yet. Run graph backfill first.".to_owned());
328 }
329
330 let lines: Vec<String> = communities
331 .iter()
332 .map(|c| format!(" [{}]: {}", c.name, c.summary))
333 .collect();
334 Ok(format!(
335 "Communities ({}):\n{}",
336 communities.len(),
337 lines.join("\n")
338 ))
339 })
340 }
341
342 fn graph_backfill<'a>(
343 &'a mut self,
344 limit: Option<usize>,
345 progress_cb: &'a mut (dyn FnMut(String) + Send),
346 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
347 Box::pin(async move {
348 let Some(memory) = self.services.memory.persistence.memory.clone() else {
349 return Ok("Graph memory is not enabled.".to_owned());
350 };
351 let Some(store) = memory.graph_store.clone() else {
352 return Ok("Graph memory is not enabled.".to_owned());
353 };
354
355 let total = store.unprocessed_message_count().await.unwrap_or(0);
356 let cap = limit.unwrap_or(usize::MAX);
357
358 progress_cb(format!(
359 "Starting graph backfill... ({total} unprocessed messages)"
360 ));
361
362 let batch_size = 50usize;
363 let mut processed = 0usize;
364 let mut total_entities = 0usize;
365 let mut total_edges = 0usize;
366
367 let graph_cfg = self.services.memory.extraction.graph_config.clone();
368 let provider = self.provider.clone();
369
370 loop {
371 let remaining_cap = cap.saturating_sub(processed);
372 if remaining_cap == 0 {
373 break;
374 }
375 let batch_limit = batch_size.min(remaining_cap);
376 let messages = store
377 .unprocessed_messages_for_backfill(batch_limit)
378 .await
379 .map_err(|e| CommandError::new(e.to_string()))?;
380 if messages.is_empty() {
381 break;
382 }
383
384 let ids: Vec<zeph_memory::types::MessageId> =
385 messages.iter().map(|(id, _)| *id).collect();
386
387 for (_id, content) in &messages {
388 if content.trim().is_empty() {
389 continue;
390 }
391 let extraction_cfg = GraphExtractionConfig {
392 max_entities: graph_cfg.max_entities_per_message,
393 max_edges: graph_cfg.max_edges_per_message,
394 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
395 community_refresh_interval: 0,
396 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
397 max_entities_cap: graph_cfg.max_entities,
398 community_summary_max_prompt_bytes: graph_cfg
399 .community_summary_max_prompt_bytes,
400 community_summary_concurrency: graph_cfg.community_summary_concurrency,
401 lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
402 note_linking: zeph_memory::NoteLinkingConfig::default(),
403 link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
404 link_weight_decay_interval_secs: graph_cfg.link_weight_decay_interval_secs,
405 belief_revision_enabled: graph_cfg.belief_revision.enabled,
406 belief_revision_similarity_threshold: graph_cfg
407 .belief_revision
408 .similarity_threshold,
409 conversation_id: None,
410 apex_mem_enabled: graph_cfg.apex_mem.enabled,
411 };
412 let pool = store.pool().clone();
413 match extract_and_store(
414 content.clone(),
415 vec![],
416 provider.clone(),
417 pool,
418 extraction_cfg,
419 None,
420 None,
421 )
422 .await
423 {
424 Ok(result) => {
425 total_entities += result.stats.entities_upserted;
426 total_edges += result.stats.edges_inserted;
427 }
428 Err(e) => {
429 tracing::warn!("backfill extraction error: {e:#}");
430 }
431 }
432 }
433
434 store
435 .mark_messages_graph_processed(&ids)
436 .await
437 .map_err(|e| CommandError::new(e.to_string()))?;
438 processed += messages.len();
439
440 progress_cb(format!(
441 "Backfill progress: {processed} messages processed, \
442 {total_entities} entities, {total_edges} edges"
443 ));
444 }
445
446 Ok(format!(
447 "Backfill complete: {total_entities} entities, {total_edges} edges \
448 extracted from {processed} messages"
449 ))
450 })
451 }
452
453 fn guidelines<'a>(
456 &'a mut self,
457 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
458 Box::pin(async move {
459 const MAX_DISPLAY_CHARS: usize = 4096;
460
461 let Some(memory) = &self.services.memory.persistence.memory else {
462 return Ok("No memory backend initialised.".to_owned());
463 };
464
465 let cid = self.services.memory.persistence.conversation_id;
466 let sqlite = memory.sqlite();
467
468 let (version, text) = sqlite
469 .load_compression_guidelines(cid)
470 .await
471 .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
472
473 if version == 0 || text.is_empty() {
474 return Ok("No compression guidelines generated yet.".to_owned());
475 }
476
477 let (_, created_at) = sqlite
478 .load_compression_guidelines_meta(cid)
479 .await
480 .unwrap_or((0, String::new()));
481
482 let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
483 let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
484 (&text[..end], true)
485 } else {
486 (text.as_str(), false)
487 };
488
489 let mut output =
490 format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
491 if truncated {
492 output.push_str("\n\n[truncated]");
493 }
494 Ok(output)
495 })
496 }
497
498 fn handle_model<'a>(
501 &'a mut self,
502 arg: &'a str,
503 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
504 Box::pin(async move {
505 let input = if arg.is_empty() {
506 "/model".to_owned()
507 } else {
508 format!("/model {arg}")
509 };
510 self.handle_model_command_as_string(&input).await
511 })
512 }
513
514 fn handle_provider<'a>(
515 &'a mut self,
516 arg: &'a str,
517 ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
518 Box::pin(async move { self.handle_provider_command_as_string(arg) })
519 }
520
521 fn handle_policy<'a>(
524 &'a mut self,
525 args: &'a str,
526 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
527 Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
528 }
529
530 #[cfg(feature = "scheduler")]
533 fn list_scheduled_tasks<'a>(
534 &'a mut self,
535 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
536 Box::pin(async move {
537 let result = self
538 .handle_scheduler_list_as_string()
539 .await
540 .map_err(|e| CommandError::new(e.to_string()))?;
541 Ok(Some(result))
542 })
543 }
544
545 #[cfg(not(feature = "scheduler"))]
546 fn list_scheduled_tasks<'a>(
547 &'a mut self,
548 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
549 Box::pin(async move { Ok(None) })
550 }
551
552 fn lsp_status<'a>(
555 &'a mut self,
556 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
557 Box::pin(async move {
558 self.handle_lsp_status_as_string()
559 .await
560 .map_err(|e| CommandError::new(e.to_string()))
561 })
562 }
563
564 fn session_recap<'a>(
567 &'a mut self,
568 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
569 Box::pin(async move {
570 match self.build_recap().await {
571 Ok(text) => Ok(text),
572 Err(e) => {
573 tracing::warn!("session recap command: {}", e.0);
577 Ok("Recap unavailable — see logs for details".to_string())
578 }
579 }
580 })
581 }
582
583 fn compact_context<'a>(
586 &'a mut self,
587 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
588 Box::pin(self.compact_context_command())
589 }
590
591 fn reset_conversation<'a>(
594 &'a mut self,
595 keep_plan: bool,
596 no_digest: bool,
597 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
598 Box::pin(async move {
599 match self.reset_conversation(keep_plan, no_digest).await {
600 Ok((old_id, new_id)) => {
601 let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
602 let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
603 let keep_note = if keep_plan { " (plan preserved)" } else { "" };
604 Ok(format!(
605 "New conversation started. Previous: {old} → Current: {new}{keep_note}"
606 ))
607 }
608 Err(e) => Ok(format!("Failed to start new conversation: {e}")),
609 }
610 })
611 }
612
613 fn cache_stats(&self) -> String {
616 self.tool_orchestrator.cache_stats()
617 }
618
619 fn session_status<'a>(
622 &'a mut self,
623 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
624 Box::pin(async move { Ok(self.handle_status_as_string()) })
625 }
626
627 fn guardrail_status(&self) -> String {
630 self.format_guardrail_status()
631 }
632
633 fn focus_status(&self) -> String {
636 self.format_focus_status()
637 }
638
639 fn sidequest_status(&self) -> String {
642 self.format_sidequest_status()
643 }
644
645 fn load_image<'a>(
648 &'a mut self,
649 path: &'a str,
650 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
651 Box::pin(async move { Ok(self.handle_image_as_string(path)) })
652 }
653
654 fn handle_mcp<'a>(
657 &'a mut self,
658 args: &'a str,
659 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
660 let args_owned = args.to_owned();
663 let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
664 let sub = parts.first().cloned().unwrap_or_default();
665
666 match sub.as_str() {
667 "list" => {
668 let manager = self.services.mcp.manager.clone();
670 let tools_snapshot: Vec<(String, String)> = self
671 .services
672 .mcp
673 .tools
674 .iter()
675 .map(|t| (t.server_id.clone(), t.name.clone()))
676 .collect();
677 Box::pin(async move {
678 use std::fmt::Write;
679 let Some(manager) = manager else {
680 return Ok("MCP is not enabled.".to_owned());
681 };
682 let server_ids = manager.list_servers().await;
683 if server_ids.is_empty() {
684 return Ok("No MCP servers connected.".to_owned());
685 }
686 let mut output = String::from("Connected MCP servers:\n");
687 let mut total = 0usize;
688 for id in &server_ids {
689 let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
690 total += count;
691 let _ = writeln!(output, "- {id} ({count} tools)");
692 }
693 let _ = write!(output, "Total: {total} tool(s)");
694 Ok(output)
695 })
696 }
697 "tools" => {
698 let server_id = parts.get(1).cloned();
700 let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
701 self.services
702 .mcp
703 .tools
704 .iter()
705 .filter(|t| &t.server_id == sid)
706 .map(|t| (t.name.clone(), t.description.clone()))
707 .collect()
708 } else {
709 Vec::new()
710 };
711 Box::pin(async move {
712 use std::fmt::Write;
713 let Some(server_id) = server_id else {
714 return Ok("Usage: /mcp tools <server_id>".to_owned());
715 };
716 if owned_tools.is_empty() {
717 return Ok(format!("No tools found for server '{server_id}'."));
718 }
719 let mut output =
720 format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
721 for (name, desc) in &owned_tools {
722 if desc.is_empty() {
723 let _ = writeln!(output, "- {name}");
724 } else {
725 let _ = writeln!(output, "- {name} — {desc}");
726 }
727 }
728 Ok(output)
729 })
730 }
731 _ => Box::pin(async move {
738 self.handle_mcp_command(&args_owned)
739 .await
740 .map_err(|e| CommandError::new(e.to_string()))
741 }),
742 }
743 }
744
745 fn handle_skill<'a>(
748 &'a mut self,
749 args: &'a str,
750 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
751 let args_owned = args.to_owned();
752 Box::pin(async move {
753 self.handle_skill_command_as_string(&args_owned)
754 .await
755 .map_err(|e| CommandError::new(e.to_string()))
756 })
757 }
758
759 fn handle_skills<'a>(
762 &'a mut self,
763 args: &'a str,
764 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
765 let args_owned = args.to_owned();
766 Box::pin(async move {
767 self.handle_skills_as_string(&args_owned)
768 .await
769 .map_err(|e| CommandError::new(e.to_string()))
770 })
771 }
772
773 fn handle_feedback_command<'a>(
776 &'a mut self,
777 args: &'a str,
778 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
779 let args_owned = args.to_owned();
780 Box::pin(async move {
781 self.handle_feedback_as_string(&args_owned)
782 .await
783 .map_err(|e| CommandError::new(e.to_string()))
784 })
785 }
786
787 #[cfg(feature = "scheduler")]
790 fn handle_plan<'a>(
791 &'a mut self,
792 input: &'a str,
793 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
794 Box::pin(async move {
795 self.dispatch_plan_command_as_string(input)
796 .await
797 .map_err(|e| CommandError::new(e.to_string()))
798 })
799 }
800
801 #[cfg(not(feature = "scheduler"))]
802 fn handle_plan<'a>(
803 &'a mut self,
804 _input: &'a str,
805 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
806 Box::pin(async move { Ok(String::new()) })
807 }
808
809 fn handle_experiment<'a>(
812 &'a mut self,
813 input: &'a str,
814 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
815 Box::pin(async move {
816 self.handle_experiment_command_as_string(input)
817 .await
818 .map_err(|e| CommandError::new(e.to_string()))
819 })
820 }
821
822 fn handle_agent_dispatch<'a>(
825 &'a mut self,
826 input: &'a str,
827 ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
828 Box::pin(async move {
829 match self.dispatch_agent_command(input).await {
830 Some(Err(e)) => Err(CommandError::new(e.to_string())),
831 Some(Ok(())) | None => Ok(None),
832 }
833 })
834 }
835
836 fn handle_plugins<'a>(
839 &'a mut self,
840 args: &'a str,
841 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
842 let args_owned = args.to_owned();
843 let managed_dir = self.services.skill.managed_dir.clone();
846 let mcp_allowed = self.services.mcp.allowed_commands.clone();
847 let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
848 Box::pin(async move {
849 tokio::task::spawn_blocking(move || {
852 Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
853 })
854 .await
855 .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
856 })
857 }
858
859 fn handle_acp<'a>(
862 &'a mut self,
863 args: &'a str,
864 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
865 Box::pin(async move {
866 self.handle_acp_as_string(args)
867 .map_err(|e| CommandError::new(e.to_string()))
868 })
869 }
870
871 #[cfg(feature = "cocoon")]
874 fn handle_cocoon<'a>(
875 &'a mut self,
876 args: &'a str,
877 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
878 Box::pin(async move {
879 self.handle_cocoon_as_string(args)
880 .await
881 .map_err(|e| CommandError::new(e.to_string()))
882 })
883 }
884
885 #[cfg(not(feature = "cocoon"))]
886 fn handle_cocoon<'a>(
887 &'a mut self,
888 _args: &'a str,
889 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
890 Box::pin(async {
891 Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
892 })
893 }
894
895 fn handle_loop<'a>(
898 &'a mut self,
899 args: &'a str,
900 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
901 use zeph_commands::handlers::loop_cmd::parse_loop_args;
902
903 let args_owned = args.trim().to_owned();
904 Box::pin(async move {
905 if args_owned == "stop" {
906 return Ok(self.stop_user_loop());
907 }
908 if args_owned == "status" {
909 return Ok(match &self.runtime.lifecycle.user_loop {
910 Some(ls) => format!(
911 "Loop active: \"{}\" (iteration {}, interval every {}s).",
912 ls.prompt,
913 ls.iteration,
914 ls.interval.period().as_secs(),
915 ),
916 None => "No active loop.".to_owned(),
917 });
918 }
919 let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
920
921 if prompt.starts_with('/') {
922 return Err(CommandError::new(
923 "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
924 ));
925 }
926
927 let min_secs = self.runtime.config.loop_min_interval_secs;
928 if interval_secs < min_secs {
929 return Err(CommandError::new(format!(
930 "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
931 )));
932 }
933 if self.runtime.lifecycle.user_loop.is_some() {
934 return Err(CommandError::new(
935 "A loop is already active. Use /loop stop first.",
936 ));
937 }
938
939 self.start_user_loop(prompt.clone(), interval_secs);
940 Ok(format!(
941 "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
942 ))
943 })
944 }
945
946 fn notify_test<'a>(
947 &'a mut self,
948 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
949 let notifier = self.runtime.lifecycle.notifier.clone();
950 Box::pin(async move {
951 let Some(notifier) = notifier else {
952 return Ok(
953 "Notifications are disabled. Set `notifications.enabled = true` in config."
954 .to_owned(),
955 );
956 };
957 match notifier.fire_test().await {
958 Ok(()) => Ok("Test notification sent.".to_owned()),
959 Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
960 }
961 })
962 }
963
964 fn handle_trajectory(&mut self, args: &str) -> String {
965 self.handle_trajectory_command_as_string(args)
966 }
967
968 fn handle_scope(&self, args: &str) -> String {
969 self.handle_scope_command_as_string(args)
970 }
971
972 fn handle_goal<'a>(
975 &'a mut self,
976 args: &'a str,
977 ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
978 if self.services.goal_accounting.is_none() {
980 if !self.runtime.config.goals.enabled {
981 return Box::pin(async {
982 Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
983 });
984 }
985 let pool = match self.services.memory.persistence.memory.as_ref() {
986 Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
987 None => {
988 return Box::pin(async {
989 Ok("Goals require a database backend (memory not configured).".to_owned())
990 });
991 }
992 };
993 let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
994 let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
995 self.services.goal_accounting = Some(accounting);
996 }
997
998 let accounting =
999 self.services.goal_accounting.clone().expect(
1000 "invariant: goal_accounting is always Some at this point (initialized above)",
1001 );
1002 let max_chars = self.runtime.config.goals.max_text_chars;
1003 let default_budget = self.runtime.config.goals.default_token_budget;
1004
1005 Box::pin(async move {
1006 let _ = accounting.refresh().await;
1007 let store = accounting.get_store();
1008
1009 match args {
1010 "" | "status" => goal_status(&accounting).await,
1011 "pause" => goal_pause(&accounting, &store).await,
1012 "resume" => goal_resume(&accounting, &store).await,
1013 "complete" => goal_complete(&accounting, &store).await,
1014 "clear" => goal_clear(&accounting, &store).await,
1015 "list" => goal_list(&store).await,
1016 _ if args.starts_with("create") => {
1017 goal_create(args, &accounting, &store, max_chars, default_budget).await
1018 }
1019 _ => Ok(
1020 "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1021 .to_owned(),
1022 ),
1023 }
1024 })
1025 }
1026
1027 fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1028 let accounting = self.services.goal_accounting.as_ref()?;
1029 let snap = accounting.snapshot()?;
1030 Some(zeph_commands::GoalSnapshot {
1031 id: snap.id,
1032 text: snap.text,
1033 status: match snap.status {
1034 crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1035 crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1036 crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1037 crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1038 },
1039 turns_used: snap.turns_used,
1040 tokens_used: snap.tokens_used,
1041 token_budget: snap.token_budget,
1042 })
1043 }
1044}
1045
1046type GoalStore = crate::goal::GoalStore;
1047type GoalAccounting = crate::goal::GoalAccounting;
1048
1049async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1050 match accounting.get_active().await {
1051 Ok(Some(g)) => {
1052 let budget_line = g.token_budget.map_or_else(
1053 || format!(" tokens used: {}", g.tokens_used),
1054 |b| format!(" budget: {}/{b}", g.tokens_used),
1055 );
1056 Ok(format!(
1057 "Active goal [{}]: {}\n status: {}\n turns: {}\n{}",
1058 &g.id[..8],
1059 g.text,
1060 g.status,
1061 g.turns_used,
1062 budget_line
1063 ))
1064 }
1065 Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1066 Err(e) => Ok(format!("Goal lookup failed: {e}")),
1067 }
1068}
1069
1070async fn goal_create(
1071 args: &str,
1072 accounting: &GoalAccounting,
1073 store: &GoalStore,
1074 max_chars: usize,
1075 default_budget: u64,
1076) -> Result<String, CommandError> {
1077 let rest = args.strip_prefix("create").unwrap_or("").trim();
1078 let (text, explicit_budget) = parse_goal_create_args(rest);
1079 if text.is_empty() {
1080 return Ok("Usage: /goal create <text> [--budget N]".to_owned());
1081 }
1082 let budget = explicit_budget.or(if default_budget > 0 {
1083 Some(default_budget)
1084 } else {
1085 None
1086 });
1087 match store.create(text, budget, max_chars).await {
1088 Ok(g) => {
1089 let _ = accounting.refresh().await;
1090 Ok(format!("Goal created [{}]: {}", &g.id[..8], g.text))
1091 }
1092 Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok(format!(
1093 "Goal text exceeds {max} characters. Please shorten it."
1094 )),
1095 Err(e) => Ok(format!("Failed to create goal: {e}")),
1096 }
1097}
1098
1099async fn goal_pause(
1100 accounting: &GoalAccounting,
1101 store: &GoalStore,
1102) -> Result<String, CommandError> {
1103 match accounting.get_active().await {
1104 Ok(Some(g)) => {
1105 match store
1106 .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1107 .await
1108 {
1109 Ok(_) => {
1110 let _ = accounting.refresh().await;
1111 Ok(format!("Goal [{}] paused.", &g.id[..8]))
1112 }
1113 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1114 let current = accounting.get_active().await.ok().flatten();
1115 Ok(format!(
1116 "Goal state changed concurrently. Current: {}",
1117 current.map_or_else(|| "none".into(), |g| g.status.to_string())
1118 ))
1119 }
1120 Err(e) => Ok(format!("Pause failed: {e}")),
1121 }
1122 }
1123 Ok(None) => Ok("No active goal to pause.".to_owned()),
1124 Err(e) => Ok(format!("Failed: {e}")),
1125 }
1126}
1127
1128async fn goal_resume(
1129 accounting: &GoalAccounting,
1130 store: &GoalStore,
1131) -> Result<String, CommandError> {
1132 let goals = store.list(10).await.unwrap_or_default();
1133 let paused = goals
1134 .into_iter()
1135 .find(|g| g.status == crate::goal::GoalStatus::Paused);
1136 match paused {
1137 Some(g) => {
1138 match store
1139 .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1140 .await
1141 {
1142 Ok(_) => {
1143 let _ = accounting.refresh().await;
1144 Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1145 }
1146 Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1147 Ok("Goal state changed concurrently — please retry.".to_owned())
1148 }
1149 Err(e) => Ok(format!("Resume failed: {e}")),
1150 }
1151 }
1152 None => Ok("No paused goal to resume.".to_owned()),
1153 }
1154}
1155
1156async fn goal_complete(
1157 accounting: &GoalAccounting,
1158 store: &GoalStore,
1159) -> Result<String, CommandError> {
1160 match accounting.get_active().await {
1161 Ok(Some(g)) => {
1162 match store
1163 .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1164 .await
1165 {
1166 Ok(_) => {
1167 let _ = accounting.refresh().await;
1168 Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1169 }
1170 Err(e) => Ok(format!("Complete failed: {e}")),
1171 }
1172 }
1173 Ok(None) => Ok("No active goal.".to_owned()),
1174 Err(e) => Ok(format!("Failed: {e}")),
1175 }
1176}
1177
1178async fn goal_clear(
1179 accounting: &GoalAccounting,
1180 store: &GoalStore,
1181) -> Result<String, CommandError> {
1182 let goals = store.list(10).await.unwrap_or_default();
1183 let target = goals.into_iter().find(|g| {
1184 g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1185 });
1186 match target {
1187 Some(g) => {
1188 match store
1189 .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1190 .await
1191 {
1192 Ok(_) => {
1193 let _ = accounting.refresh().await;
1194 Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1195 }
1196 Err(e) => Ok(format!("Clear failed: {e}")),
1197 }
1198 }
1199 None => Ok("No active or paused goal to clear.".to_owned()),
1200 }
1201}
1202
1203async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1204 let goals = store.list(20).await.unwrap_or_default();
1205 if goals.is_empty() {
1206 return Ok("No goals recorded.".to_owned());
1207 }
1208 let mut out = String::from("Goals:\n");
1209 for g in goals {
1210 let _ = std::fmt::Write::write_fmt(
1211 &mut out,
1212 format_args!(
1213 " {} [{}] {} — {} turns\n",
1214 g.status.badge_symbol(),
1215 &g.id[..8],
1216 g.text,
1217 g.turns_used
1218 ),
1219 );
1220 }
1221 Ok(out.trim_end().to_owned())
1222}
1223
1224fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
1225 if let Some(pos) = args.find("--budget") {
1226 let text = args[..pos].trim();
1227 let rest = args[pos + "--budget".len()..].trim();
1228 let budget = rest
1229 .split_whitespace()
1230 .next()
1231 .and_then(|s| s.parse::<u64>().ok());
1232 (text, budget)
1233 } else {
1234 (args, None)
1235 }
1236}
1237
1238impl From<AgentError> for CommandError {
1240 fn from(e: AgentError) -> Self {
1241 Self(e.to_string())
1242 }
1243}