Skip to main content

zeph_core/agent/
agent_access_impl.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implementation of [`zeph_commands::traits::agent::AgentAccess`] for [`Agent<C>`].
5//!
6//! Each method in `AgentAccess` returns a formatted `String` result (without sending to the
7//! channel directly), so that `CommandContext::sink` does not conflict with this borrow.
8//! The one exception is methods for subsystems that are already channel-free (memory, graph).
9//!
10//! [`Agent<C>`]: super::Agent
11
12use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15
16use zeph_commands::CommandError;
17use zeph_commands::traits::agent::AgentAccess;
18use zeph_memory::{GraphExtractionConfig, MessageId, extract_and_store};
19
20use super::{Agent, error::AgentError};
21use crate::channel::Channel;
22
23impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
24    // ----- /memory -----
25
26    fn memory_tiers<'a>(
27        &'a mut self,
28    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
29        Box::pin(async move {
30            let Some(memory) = self.services.memory.persistence.memory.clone() else {
31                return Ok("Memory not configured.".to_owned());
32            };
33            match memory.sqlite().count_messages_by_tier().await {
34                Ok((episodic, semantic)) => {
35                    let mut out = String::new();
36                    let _ = writeln!(out, "Memory tiers:");
37                    let _ = writeln!(out, "  Working:  (current context window — virtual)");
38                    let _ = writeln!(out, "  Episodic: {episodic} messages");
39                    let _ = writeln!(out, "  Semantic: {semantic} facts");
40                    Ok(out.trim_end().to_owned())
41                }
42                Err(e) => Ok(format!("Failed to query tier stats: {e}")),
43            }
44        })
45    }
46
47    fn memory_promote<'a>(
48        &'a mut self,
49        ids_str: &'a str,
50    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51        Box::pin(async move {
52            let Some(memory) = self.services.memory.persistence.memory.clone() else {
53                return Ok("Memory not configured.".to_owned());
54            };
55            let ids: Vec<MessageId> = ids_str
56                .split_whitespace()
57                .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
58                .collect();
59            if ids.is_empty() {
60                return Ok(
61                    "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
62                        .to_owned(),
63                );
64            }
65            match memory.sqlite().manual_promote(&ids).await {
66                Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
67                Err(e) => Ok(format!("Promotion failed: {e}")),
68            }
69        })
70    }
71
72    // ----- /graph -----
73
74    fn graph_stats<'a>(
75        &'a mut self,
76    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
77        Box::pin(async move {
78            let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
79                return Ok("Graph memory is not enabled.".to_owned());
80            };
81            let Some(store) = memory.graph_store.as_ref() else {
82                return Ok("Graph memory is not enabled.".to_owned());
83            };
84
85            let (entities, edges, communities, distribution) = tokio::join!(
86                store.entity_count(),
87                store.active_edge_count(),
88                store.community_count(),
89                store.edge_type_distribution()
90            );
91            let mut msg = format!(
92                "Graph memory: {} entities, {} edges, {} communities",
93                entities.unwrap_or(0),
94                edges.unwrap_or(0),
95                communities.unwrap_or(0)
96            );
97            if let Ok(dist) = distribution
98                && !dist.is_empty()
99            {
100                let dist_str: Vec<String> = dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
101                write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
102            }
103            Ok(msg)
104        })
105    }
106
107    fn graph_entities<'a>(
108        &'a mut self,
109    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
110        Box::pin(async move {
111            let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
112                return Ok("Graph memory is not enabled.".to_owned());
113            };
114            let Some(store) = memory.graph_store.as_ref() else {
115                return Ok("Graph memory is not enabled.".to_owned());
116            };
117
118            let entities = store
119                .all_entities()
120                .await
121                .map_err(|e| CommandError::new(e.to_string()))?;
122            if entities.is_empty() {
123                return Ok("No entities found.".to_owned());
124            }
125
126            let total = entities.len();
127            let display: Vec<String> = entities
128                .iter()
129                .take(50)
130                .map(|e| {
131                    format!(
132                        "  {:<40}  {:<15}  {}",
133                        e.name,
134                        e.entity_type.as_str(),
135                        e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
136                    )
137                })
138                .collect();
139            let mut msg = format!(
140                "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
141                "NAME",
142                "TYPE",
143                "LAST SEEN",
144                display.join("\n")
145            );
146            if total > 50 {
147                write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
148            }
149            Ok(msg)
150        })
151    }
152
153    fn graph_facts<'a>(
154        &'a mut self,
155        name: &'a str,
156    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
157        Box::pin(async move {
158            let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
159                return Ok("Graph memory is not enabled.".to_owned());
160            };
161            let Some(store) = memory.graph_store.as_ref() else {
162                return Ok("Graph memory is not enabled.".to_owned());
163            };
164
165            let matches = store
166                .find_entity_by_name(name)
167                .await
168                .map_err(|e| CommandError::new(e.to_string()))?;
169            if matches.is_empty() {
170                return Ok(format!("No entity found matching '{name}'."));
171            }
172
173            let entity = &matches[0];
174            let edges = store
175                .edges_for_entity(entity.id)
176                .await
177                .map_err(|e| CommandError::new(e.to_string()))?;
178            if edges.is_empty() {
179                return Ok(format!("Entity '{}' has no known facts.", entity.name));
180            }
181
182            let mut entity_names: std::collections::HashMap<i64, String> =
183                std::collections::HashMap::new();
184            entity_names.insert(entity.id, entity.name.clone());
185            for edge in &edges {
186                let other_id = if edge.source_entity_id == entity.id {
187                    edge.target_entity_id
188                } else {
189                    edge.source_entity_id
190                };
191                entity_names.entry(other_id).or_default();
192            }
193            for (&id, name_val) in &mut entity_names {
194                if name_val.is_empty() {
195                    if let Ok(Some(other)) = store.find_entity_by_id(id).await {
196                        *name_val = other.name;
197                    } else {
198                        *name_val = format!("#{id}");
199                    }
200                }
201            }
202
203            let lines: Vec<String> = edges
204                .iter()
205                .map(|e| {
206                    let src = entity_names
207                        .get(&e.source_entity_id)
208                        .cloned()
209                        .unwrap_or_else(|| format!("#{}", e.source_entity_id));
210                    let tgt = entity_names
211                        .get(&e.target_entity_id)
212                        .cloned()
213                        .unwrap_or_else(|| format!("#{}", e.target_entity_id));
214                    format!(
215                        "  {} --[{}/{}]--> {}: {} (confidence: {:.2})",
216                        src, e.relation, e.edge_type, tgt, e.fact, e.confidence
217                    )
218                })
219                .collect();
220            Ok(format!(
221                "Facts for '{}':\n{}",
222                entity.name,
223                lines.join("\n")
224            ))
225        })
226    }
227
228    fn graph_history<'a>(
229        &'a mut self,
230        name: &'a str,
231    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
232        Box::pin(async move {
233            let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
234                return Ok("Graph memory is not enabled.".to_owned());
235            };
236            let Some(store) = memory.graph_store.as_ref() else {
237                return Ok("Graph memory is not enabled.".to_owned());
238            };
239
240            let matches = store
241                .find_entity_by_name(name)
242                .await
243                .map_err(|e| CommandError::new(e.to_string()))?;
244            if matches.is_empty() {
245                return Ok(format!("No entity found matching '{name}'."));
246            }
247
248            let entity = &matches[0];
249            let edges = store
250                .edge_history_for_entity(entity.id, 50)
251                .await
252                .map_err(|e| CommandError::new(e.to_string()))?;
253            if edges.is_empty() {
254                return Ok(format!("Entity '{}' has no edge history.", entity.name));
255            }
256
257            let mut entity_names: std::collections::HashMap<i64, String> =
258                std::collections::HashMap::new();
259            entity_names.insert(entity.id, entity.name.clone());
260            for edge in &edges {
261                for &id in &[edge.source_entity_id, edge.target_entity_id] {
262                    entity_names.entry(id).or_default();
263                }
264            }
265            for (&id, name_val) in &mut entity_names {
266                if name_val.is_empty() {
267                    if let Ok(Some(other)) = store.find_entity_by_id(id).await {
268                        *name_val = other.name;
269                    } else {
270                        *name_val = format!("#{id}");
271                    }
272                }
273            }
274
275            let n = edges.len();
276            let lines: Vec<String> = edges
277                .iter()
278                .map(|e| {
279                    let status = if e.valid_to.is_some() {
280                        let date = e
281                            .valid_to
282                            .as_deref()
283                            .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
284                            .unwrap_or("?");
285                        format!("[expired {date}]")
286                    } else {
287                        "[active]".to_string()
288                    };
289                    let src = entity_names
290                        .get(&e.source_entity_id)
291                        .cloned()
292                        .unwrap_or_else(|| format!("#{}", e.source_entity_id));
293                    let tgt = entity_names
294                        .get(&e.target_entity_id)
295                        .cloned()
296                        .unwrap_or_else(|| format!("#{}", e.target_entity_id));
297                    format!(
298                        "  {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
299                        src, e.relation, e.edge_type, tgt, e.fact, e.confidence
300                    )
301                })
302                .collect();
303            Ok(format!(
304                "Edge history for '{}' ({n} edges):\n{}",
305                entity.name,
306                lines.join("\n")
307            ))
308        })
309    }
310
311    fn graph_communities<'a>(
312        &'a mut self,
313    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
314        Box::pin(async move {
315            let Some(memory) = self.services.memory.persistence.memory.as_ref() else {
316                return Ok("Graph memory is not enabled.".to_owned());
317            };
318            let Some(store) = memory.graph_store.as_ref() else {
319                return Ok("Graph memory is not enabled.".to_owned());
320            };
321
322            let communities = store
323                .all_communities()
324                .await
325                .map_err(|e| CommandError::new(e.to_string()))?;
326            if communities.is_empty() {
327                return Ok("No communities detected yet. Run graph backfill first.".to_owned());
328            }
329
330            let lines: Vec<String> = communities
331                .iter()
332                .map(|c| format!("  [{}]: {}", c.name, c.summary))
333                .collect();
334            Ok(format!(
335                "Communities ({}):\n{}",
336                communities.len(),
337                lines.join("\n")
338            ))
339        })
340    }
341
342    fn graph_backfill<'a>(
343        &'a mut self,
344        limit: Option<usize>,
345        progress_cb: &'a mut (dyn FnMut(String) + Send),
346    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
347        Box::pin(async move {
348            let Some(memory) = self.services.memory.persistence.memory.clone() else {
349                return Ok("Graph memory is not enabled.".to_owned());
350            };
351            let Some(store) = memory.graph_store.clone() else {
352                return Ok("Graph memory is not enabled.".to_owned());
353            };
354
355            let total = store.unprocessed_message_count().await.unwrap_or(0);
356            let cap = limit.unwrap_or(usize::MAX);
357
358            progress_cb(format!(
359                "Starting graph backfill... ({total} unprocessed messages)"
360            ));
361
362            let batch_size = 50usize;
363            let mut processed = 0usize;
364            let mut total_entities = 0usize;
365            let mut total_edges = 0usize;
366
367            let graph_cfg = self.services.memory.extraction.graph_config.clone();
368            let provider = self.provider.clone();
369
370            loop {
371                let remaining_cap = cap.saturating_sub(processed);
372                if remaining_cap == 0 {
373                    break;
374                }
375                let batch_limit = batch_size.min(remaining_cap);
376                let messages = store
377                    .unprocessed_messages_for_backfill(batch_limit)
378                    .await
379                    .map_err(|e| CommandError::new(e.to_string()))?;
380                if messages.is_empty() {
381                    break;
382                }
383
384                let ids: Vec<zeph_memory::types::MessageId> =
385                    messages.iter().map(|(id, _)| *id).collect();
386
387                for (_id, content) in &messages {
388                    if content.trim().is_empty() {
389                        continue;
390                    }
391                    let extraction_cfg = GraphExtractionConfig {
392                        max_entities: graph_cfg.max_entities_per_message,
393                        max_edges: graph_cfg.max_edges_per_message,
394                        extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
395                        community_refresh_interval: 0,
396                        expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
397                        max_entities_cap: graph_cfg.max_entities,
398                        community_summary_max_prompt_bytes: graph_cfg
399                            .community_summary_max_prompt_bytes,
400                        community_summary_concurrency: graph_cfg.community_summary_concurrency,
401                        lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
402                        note_linking: zeph_memory::NoteLinkingConfig::default(),
403                        link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
404                        link_weight_decay_interval_secs: graph_cfg.link_weight_decay_interval_secs,
405                        belief_revision_enabled: graph_cfg.belief_revision.enabled,
406                        belief_revision_similarity_threshold: graph_cfg
407                            .belief_revision
408                            .similarity_threshold,
409                        conversation_id: None,
410                        apex_mem_enabled: graph_cfg.apex_mem.enabled,
411                    };
412                    let pool = store.pool().clone();
413                    match extract_and_store(
414                        content.clone(),
415                        vec![],
416                        provider.clone(),
417                        pool,
418                        extraction_cfg,
419                        None,
420                        None,
421                    )
422                    .await
423                    {
424                        Ok(result) => {
425                            total_entities += result.stats.entities_upserted;
426                            total_edges += result.stats.edges_inserted;
427                        }
428                        Err(e) => {
429                            tracing::warn!("backfill extraction error: {e:#}");
430                        }
431                    }
432                }
433
434                store
435                    .mark_messages_graph_processed(&ids)
436                    .await
437                    .map_err(|e| CommandError::new(e.to_string()))?;
438                processed += messages.len();
439
440                progress_cb(format!(
441                    "Backfill progress: {processed} messages processed, \
442                     {total_entities} entities, {total_edges} edges"
443                ));
444            }
445
446            Ok(format!(
447                "Backfill complete: {total_entities} entities, {total_edges} edges \
448                 extracted from {processed} messages"
449            ))
450        })
451    }
452
453    // ----- /guidelines -----
454
455    fn guidelines<'a>(
456        &'a mut self,
457    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
458        Box::pin(async move {
459            const MAX_DISPLAY_CHARS: usize = 4096;
460
461            let Some(memory) = &self.services.memory.persistence.memory else {
462                return Ok("No memory backend initialised.".to_owned());
463            };
464
465            let cid = self.services.memory.persistence.conversation_id;
466            let sqlite = memory.sqlite();
467
468            let (version, text) = sqlite
469                .load_compression_guidelines(cid)
470                .await
471                .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
472
473            if version == 0 || text.is_empty() {
474                return Ok("No compression guidelines generated yet.".to_owned());
475            }
476
477            let (_, created_at) = sqlite
478                .load_compression_guidelines_meta(cid)
479                .await
480                .unwrap_or((0, String::new()));
481
482            let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
483                let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
484                (&text[..end], true)
485            } else {
486                (text.as_str(), false)
487            };
488
489            let mut output =
490                format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
491            if truncated {
492                output.push_str("\n\n[truncated]");
493            }
494            Ok(output)
495        })
496    }
497
498    // ----- /model, /provider -----
499
500    fn handle_model<'a>(
501        &'a mut self,
502        arg: &'a str,
503    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
504        Box::pin(async move {
505            let input = if arg.is_empty() {
506                "/model".to_owned()
507            } else {
508                format!("/model {arg}")
509            };
510            self.handle_model_command_as_string(&input).await
511        })
512    }
513
514    fn handle_provider<'a>(
515        &'a mut self,
516        arg: &'a str,
517    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
518        Box::pin(async move { self.handle_provider_command_as_string(arg) })
519    }
520
521    // ----- /policy -----
522
523    fn handle_policy<'a>(
524        &'a mut self,
525        args: &'a str,
526    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
527        Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
528    }
529
530    // ----- /scheduler -----
531
532    #[cfg(feature = "scheduler")]
533    fn list_scheduled_tasks<'a>(
534        &'a mut self,
535    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
536        Box::pin(async move {
537            let result = self
538                .handle_scheduler_list_as_string()
539                .await
540                .map_err(|e| CommandError::new(e.to_string()))?;
541            Ok(Some(result))
542        })
543    }
544
545    #[cfg(not(feature = "scheduler"))]
546    fn list_scheduled_tasks<'a>(
547        &'a mut self,
548    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
549        Box::pin(async move { Ok(None) })
550    }
551
552    // ----- /lsp -----
553
554    fn lsp_status<'a>(
555        &'a mut self,
556    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
557        Box::pin(async move {
558            self.handle_lsp_status_as_string()
559                .await
560                .map_err(|e| CommandError::new(e.to_string()))
561        })
562    }
563
564    // ----- /recap -----
565
566    fn session_recap<'a>(
567        &'a mut self,
568    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
569        Box::pin(async move {
570            match self.build_recap().await {
571                Ok(text) => Ok(text),
572                Err(e) => {
573                    // /recap is an explicit user command — surface a fixed message so that
574                    // LlmError internals (URLs with embedded credentials, response excerpts)
575                    // are never forwarded to the user channel. Full detail goes to the log.
576                    tracing::warn!("session recap command: {}", e.0);
577                    Ok("Recap unavailable — see logs for details".to_string())
578                }
579            }
580        })
581    }
582
583    // ----- /compact -----
584
585    fn compact_context<'a>(
586        &'a mut self,
587    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
588        Box::pin(self.compact_context_command())
589    }
590
591    // ----- /new -----
592
593    fn reset_conversation<'a>(
594        &'a mut self,
595        keep_plan: bool,
596        no_digest: bool,
597    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
598        Box::pin(async move {
599            match self.reset_conversation(keep_plan, no_digest).await {
600                Ok((old_id, new_id)) => {
601                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
602                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
603                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
604                    Ok(format!(
605                        "New conversation started. Previous: {old} → Current: {new}{keep_note}"
606                    ))
607                }
608                Err(e) => Ok(format!("Failed to start new conversation: {e}")),
609            }
610        })
611    }
612
613    // ----- /cache-stats -----
614
615    fn cache_stats(&self) -> String {
616        self.tool_orchestrator.cache_stats()
617    }
618
619    // ----- /status -----
620
621    fn session_status<'a>(
622        &'a mut self,
623    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
624        Box::pin(async move { Ok(self.handle_status_as_string()) })
625    }
626
627    // ----- /guardrail -----
628
629    fn guardrail_status(&self) -> String {
630        self.format_guardrail_status()
631    }
632
633    // ----- /focus -----
634
635    fn focus_status(&self) -> String {
636        self.format_focus_status()
637    }
638
639    // ----- /sidequest -----
640
641    fn sidequest_status(&self) -> String {
642        self.format_sidequest_status()
643    }
644
645    // ----- /image -----
646
647    fn load_image<'a>(
648        &'a mut self,
649        path: &'a str,
650    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
651        Box::pin(async move { Ok(self.handle_image_as_string(path)) })
652    }
653
654    // ----- /mcp -----
655
656    fn handle_mcp<'a>(
657        &'a mut self,
658        args: &'a str,
659    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
660        // Extract all owned data before the async block so no &mut self reference is
661        // held across an .await point, satisfying the `for<'a>` Send bound.
662        let args_owned = args.to_owned();
663        let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
664        let sub = parts.first().cloned().unwrap_or_default();
665
666        match sub.as_str() {
667            "list" => {
668                // Read-only: clone all data before async.
669                let manager = self.services.mcp.manager.clone();
670                let tools_snapshot: Vec<(String, String)> = self
671                    .services
672                    .mcp
673                    .tools
674                    .iter()
675                    .map(|t| (t.server_id.clone(), t.name.clone()))
676                    .collect();
677                Box::pin(async move {
678                    use std::fmt::Write;
679                    let Some(manager) = manager else {
680                        return Ok("MCP is not enabled.".to_owned());
681                    };
682                    let server_ids = manager.list_servers().await;
683                    if server_ids.is_empty() {
684                        return Ok("No MCP servers connected.".to_owned());
685                    }
686                    let mut output = String::from("Connected MCP servers:\n");
687                    let mut total = 0usize;
688                    for id in &server_ids {
689                        let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
690                        total += count;
691                        let _ = writeln!(output, "- {id} ({count} tools)");
692                    }
693                    let _ = write!(output, "Total: {total} tool(s)");
694                    Ok(output)
695                })
696            }
697            "tools" => {
698                // Read-only: collect tool info before async.
699                let server_id = parts.get(1).cloned();
700                let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
701                    self.services
702                        .mcp
703                        .tools
704                        .iter()
705                        .filter(|t| &t.server_id == sid)
706                        .map(|t| (t.name.clone(), t.description.clone()))
707                        .collect()
708                } else {
709                    Vec::new()
710                };
711                Box::pin(async move {
712                    use std::fmt::Write;
713                    let Some(server_id) = server_id else {
714                        return Ok("Usage: /mcp tools <server_id>".to_owned());
715                    };
716                    if owned_tools.is_empty() {
717                        return Ok(format!("No tools found for server '{server_id}'."));
718                    }
719                    let mut output =
720                        format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
721                    for (name, desc) in &owned_tools {
722                        if desc.is_empty() {
723                            let _ = writeln!(output, "- {name}");
724                        } else {
725                            let _ = writeln!(output, "- {name} — {desc}");
726                        }
727                    }
728                    Ok(output)
729                })
730            }
731            // add/remove require mutating self after async I/O.
732            // handle_mcp_command is structured so the only .await crossing a &mut self
733            // boundary goes through a cloned Arc<McpManager> — no &self fields are held
734            // across that .await.  The subsequent state-change methods (rebuild_semantic_index,
735            // sync_mcp_registry) are also async fn(&mut self), but they only hold owned locals
736            // across their own .await points (cloned tools Vec, cloned Arcs).
737            _ => Box::pin(async move {
738                self.handle_mcp_command(&args_owned)
739                    .await
740                    .map_err(|e| CommandError::new(e.to_string()))
741            }),
742        }
743    }
744
745    // ----- /skill -----
746
747    fn handle_skill<'a>(
748        &'a mut self,
749        args: &'a str,
750    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
751        let args_owned = args.to_owned();
752        Box::pin(async move {
753            self.handle_skill_command_as_string(&args_owned)
754                .await
755                .map_err(|e| CommandError::new(e.to_string()))
756        })
757    }
758
759    // ----- /skills -----
760
761    fn handle_skills<'a>(
762        &'a mut self,
763        args: &'a str,
764    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
765        let args_owned = args.to_owned();
766        Box::pin(async move {
767            self.handle_skills_as_string(&args_owned)
768                .await
769                .map_err(|e| CommandError::new(e.to_string()))
770        })
771    }
772
773    // ----- /feedback -----
774
775    fn handle_feedback_command<'a>(
776        &'a mut self,
777        args: &'a str,
778    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
779        let args_owned = args.to_owned();
780        Box::pin(async move {
781            self.handle_feedback_as_string(&args_owned)
782                .await
783                .map_err(|e| CommandError::new(e.to_string()))
784        })
785    }
786
787    // ----- /plan -----
788
789    #[cfg(feature = "scheduler")]
790    fn handle_plan<'a>(
791        &'a mut self,
792        input: &'a str,
793    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
794        Box::pin(async move {
795            self.dispatch_plan_command_as_string(input)
796                .await
797                .map_err(|e| CommandError::new(e.to_string()))
798        })
799    }
800
801    #[cfg(not(feature = "scheduler"))]
802    fn handle_plan<'a>(
803        &'a mut self,
804        _input: &'a str,
805    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
806        Box::pin(async move { Ok(String::new()) })
807    }
808
809    // ----- /experiment -----
810
811    fn handle_experiment<'a>(
812        &'a mut self,
813        input: &'a str,
814    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
815        Box::pin(async move {
816            self.handle_experiment_command_as_string(input)
817                .await
818                .map_err(|e| CommandError::new(e.to_string()))
819        })
820    }
821
822    // ----- /agent, @mention -----
823
824    fn handle_agent_dispatch<'a>(
825        &'a mut self,
826        input: &'a str,
827    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
828        Box::pin(async move {
829            match self.dispatch_agent_command(input).await {
830                Some(Err(e)) => Err(CommandError::new(e.to_string())),
831                Some(Ok(())) | None => Ok(None),
832            }
833        })
834    }
835
836    // ----- /plugins -----
837
838    fn handle_plugins<'a>(
839        &'a mut self,
840        args: &'a str,
841    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
842        let args_owned = args.to_owned();
843        // Clone the fields needed by PluginManager before entering the async block.
844        // spawn_blocking requires 'static, so we cannot borrow &self inside the closure.
845        let managed_dir = self.services.skill.managed_dir.clone();
846        let mcp_allowed = self.services.mcp.allowed_commands.clone();
847        let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
848        Box::pin(async move {
849            // PluginManager performs synchronous filesystem I/O (copy, remove_dir_all,
850            // read_dir). Run on a blocking thread to avoid stalling the tokio worker.
851            tokio::task::spawn_blocking(move || {
852                Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
853            })
854            .await
855            .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
856        })
857    }
858
859    // ----- /acp -----
860
861    fn handle_acp<'a>(
862        &'a mut self,
863        args: &'a str,
864    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
865        Box::pin(async move {
866            self.handle_acp_as_string(args)
867                .map_err(|e| CommandError::new(e.to_string()))
868        })
869    }
870
871    // ----- /cocoon -----
872
873    #[cfg(feature = "cocoon")]
874    fn handle_cocoon<'a>(
875        &'a mut self,
876        args: &'a str,
877    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
878        Box::pin(async move {
879            self.handle_cocoon_as_string(args)
880                .await
881                .map_err(|e| CommandError::new(e.to_string()))
882        })
883    }
884
885    #[cfg(not(feature = "cocoon"))]
886    fn handle_cocoon<'a>(
887        &'a mut self,
888        _args: &'a str,
889    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
890        Box::pin(async {
891            Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
892        })
893    }
894
895    // ----- /loop -----
896
897    fn handle_loop<'a>(
898        &'a mut self,
899        args: &'a str,
900    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
901        use zeph_commands::handlers::loop_cmd::parse_loop_args;
902
903        let args_owned = args.trim().to_owned();
904        Box::pin(async move {
905            if args_owned == "stop" {
906                return Ok(self.stop_user_loop());
907            }
908            if args_owned == "status" {
909                return Ok(match &self.runtime.lifecycle.user_loop {
910                    Some(ls) => format!(
911                        "Loop active: \"{}\" (iteration {}, interval every {}s).",
912                        ls.prompt,
913                        ls.iteration,
914                        ls.interval.period().as_secs(),
915                    ),
916                    None => "No active loop.".to_owned(),
917                });
918            }
919            let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
920
921            if prompt.starts_with('/') {
922                return Err(CommandError::new(
923                    "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
924                ));
925            }
926
927            let min_secs = self.runtime.config.loop_min_interval_secs;
928            if interval_secs < min_secs {
929                return Err(CommandError::new(format!(
930                    "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
931                )));
932            }
933            if self.runtime.lifecycle.user_loop.is_some() {
934                return Err(CommandError::new(
935                    "A loop is already active. Use /loop stop first.",
936                ));
937            }
938
939            self.start_user_loop(prompt.clone(), interval_secs);
940            Ok(format!(
941                "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
942            ))
943        })
944    }
945
946    fn notify_test<'a>(
947        &'a mut self,
948    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
949        let notifier = self.runtime.lifecycle.notifier.clone();
950        Box::pin(async move {
951            let Some(notifier) = notifier else {
952                return Ok(
953                    "Notifications are disabled. Set `notifications.enabled = true` in config."
954                        .to_owned(),
955                );
956            };
957            match notifier.fire_test().await {
958                Ok(()) => Ok("Test notification sent.".to_owned()),
959                Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
960            }
961        })
962    }
963
    /// Synchronous handler that forwards `args` unchanged to
    /// `handle_trajectory_command_as_string` and returns the resulting text.
    fn handle_trajectory(&mut self, args: &str) -> String {
        self.handle_trajectory_command_as_string(args)
    }
967
    /// Synchronous handler that forwards `args` unchanged to
    /// `handle_scope_command_as_string` and returns the resulting text.
    fn handle_scope(&self, args: &str) -> String {
        self.handle_scope_command_as_string(args)
    }
971
972    // ----- /goal -----
973
974    fn handle_goal<'a>(
975        &'a mut self,
976        args: &'a str,
977    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
978        // Extract all non-Send data synchronously before entering the async block.
979        if self.services.goal_accounting.is_none() {
980            if !self.runtime.config.goals.enabled {
981                return Box::pin(async {
982                    Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
983                });
984            }
985            let pool = match self.services.memory.persistence.memory.as_ref() {
986                Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
987                None => {
988                    return Box::pin(async {
989                        Ok("Goals require a database backend (memory not configured).".to_owned())
990                    });
991                }
992            };
993            let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
994            let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
995            self.services.goal_accounting = Some(accounting);
996        }
997
998        let accounting =
999            self.services.goal_accounting.clone().expect(
1000                "invariant: goal_accounting is always Some at this point (initialized above)",
1001            );
1002        let max_chars = self.runtime.config.goals.max_text_chars;
1003        let default_budget = self.runtime.config.goals.default_token_budget;
1004
1005        Box::pin(async move {
1006            let _ = accounting.refresh().await;
1007            let store = accounting.get_store();
1008
1009            match args {
1010                "" | "status" => goal_status(&accounting).await,
1011                "pause" => goal_pause(&accounting, &store).await,
1012                "resume" => goal_resume(&accounting, &store).await,
1013                "complete" => goal_complete(&accounting, &store).await,
1014                "clear" => goal_clear(&accounting, &store).await,
1015                "list" => goal_list(&store).await,
1016                _ if args.starts_with("create") => {
1017                    goal_create(args, &accounting, &store, max_chars, default_budget).await
1018                }
1019                _ => Ok(
1020                    "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1021                        .to_owned(),
1022                ),
1023            }
1024        })
1025    }
1026
1027    fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1028        let accounting = self.services.goal_accounting.as_ref()?;
1029        let snap = accounting.snapshot()?;
1030        Some(zeph_commands::GoalSnapshot {
1031            id: snap.id,
1032            text: snap.text,
1033            status: match snap.status {
1034                crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1035                crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1036                crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1037                crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1038            },
1039            turns_used: snap.turns_used,
1040            tokens_used: snap.tokens_used,
1041            token_budget: snap.token_budget,
1042        })
1043    }
1044}
1045
// Module-local shorthands for the goal types used in the `/goal` helper signatures below.
type GoalStore = crate::goal::GoalStore;
type GoalAccounting = crate::goal::GoalAccounting;
1048
1049async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1050    match accounting.get_active().await {
1051        Ok(Some(g)) => {
1052            let budget_line = g.token_budget.map_or_else(
1053                || format!("  tokens used: {}", g.tokens_used),
1054                |b| format!("  budget: {}/{b}", g.tokens_used),
1055            );
1056            Ok(format!(
1057                "Active goal [{}]: {}\n  status: {}\n  turns: {}\n{}",
1058                &g.id[..8],
1059                g.text,
1060                g.status,
1061                g.turns_used,
1062                budget_line
1063            ))
1064        }
1065        Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1066        Err(e) => Ok(format!("Goal lookup failed: {e}")),
1067    }
1068}
1069
1070async fn goal_create(
1071    args: &str,
1072    accounting: &GoalAccounting,
1073    store: &GoalStore,
1074    max_chars: usize,
1075    default_budget: u64,
1076) -> Result<String, CommandError> {
1077    let rest = args.strip_prefix("create").unwrap_or("").trim();
1078    let (text, explicit_budget) = parse_goal_create_args(rest);
1079    if text.is_empty() {
1080        return Ok("Usage: /goal create <text> [--budget N]".to_owned());
1081    }
1082    let budget = explicit_budget.or(if default_budget > 0 {
1083        Some(default_budget)
1084    } else {
1085        None
1086    });
1087    match store.create(text, budget, max_chars).await {
1088        Ok(g) => {
1089            let _ = accounting.refresh().await;
1090            Ok(format!("Goal created [{}]: {}", &g.id[..8], g.text))
1091        }
1092        Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok(format!(
1093            "Goal text exceeds {max} characters. Please shorten it."
1094        )),
1095        Err(e) => Ok(format!("Failed to create goal: {e}")),
1096    }
1097}
1098
1099async fn goal_pause(
1100    accounting: &GoalAccounting,
1101    store: &GoalStore,
1102) -> Result<String, CommandError> {
1103    match accounting.get_active().await {
1104        Ok(Some(g)) => {
1105            match store
1106                .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1107                .await
1108            {
1109                Ok(_) => {
1110                    let _ = accounting.refresh().await;
1111                    Ok(format!("Goal [{}] paused.", &g.id[..8]))
1112                }
1113                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1114                    let current = accounting.get_active().await.ok().flatten();
1115                    Ok(format!(
1116                        "Goal state changed concurrently. Current: {}",
1117                        current.map_or_else(|| "none".into(), |g| g.status.to_string())
1118                    ))
1119                }
1120                Err(e) => Ok(format!("Pause failed: {e}")),
1121            }
1122        }
1123        Ok(None) => Ok("No active goal to pause.".to_owned()),
1124        Err(e) => Ok(format!("Failed: {e}")),
1125    }
1126}
1127
1128async fn goal_resume(
1129    accounting: &GoalAccounting,
1130    store: &GoalStore,
1131) -> Result<String, CommandError> {
1132    let goals = store.list(10).await.unwrap_or_default();
1133    let paused = goals
1134        .into_iter()
1135        .find(|g| g.status == crate::goal::GoalStatus::Paused);
1136    match paused {
1137        Some(g) => {
1138            match store
1139                .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1140                .await
1141            {
1142                Ok(_) => {
1143                    let _ = accounting.refresh().await;
1144                    Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1145                }
1146                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1147                    Ok("Goal state changed concurrently — please retry.".to_owned())
1148                }
1149                Err(e) => Ok(format!("Resume failed: {e}")),
1150            }
1151        }
1152        None => Ok("No paused goal to resume.".to_owned()),
1153    }
1154}
1155
1156async fn goal_complete(
1157    accounting: &GoalAccounting,
1158    store: &GoalStore,
1159) -> Result<String, CommandError> {
1160    match accounting.get_active().await {
1161        Ok(Some(g)) => {
1162            match store
1163                .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1164                .await
1165            {
1166                Ok(_) => {
1167                    let _ = accounting.refresh().await;
1168                    Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1169                }
1170                Err(e) => Ok(format!("Complete failed: {e}")),
1171            }
1172        }
1173        Ok(None) => Ok("No active goal.".to_owned()),
1174        Err(e) => Ok(format!("Failed: {e}")),
1175    }
1176}
1177
1178async fn goal_clear(
1179    accounting: &GoalAccounting,
1180    store: &GoalStore,
1181) -> Result<String, CommandError> {
1182    let goals = store.list(10).await.unwrap_or_default();
1183    let target = goals.into_iter().find(|g| {
1184        g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1185    });
1186    match target {
1187        Some(g) => {
1188            match store
1189                .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1190                .await
1191            {
1192                Ok(_) => {
1193                    let _ = accounting.refresh().await;
1194                    Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1195                }
1196                Err(e) => Ok(format!("Clear failed: {e}")),
1197            }
1198        }
1199        None => Ok("No active or paused goal to clear.".to_owned()),
1200    }
1201}
1202
1203async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1204    let goals = store.list(20).await.unwrap_or_default();
1205    if goals.is_empty() {
1206        return Ok("No goals recorded.".to_owned());
1207    }
1208    let mut out = String::from("Goals:\n");
1209    for g in goals {
1210        let _ = std::fmt::Write::write_fmt(
1211            &mut out,
1212            format_args!(
1213                "  {} [{}] {} — {} turns\n",
1214                g.status.badge_symbol(),
1215                &g.id[..8],
1216                g.text,
1217                g.turns_used
1218            ),
1219        );
1220    }
1221    Ok(out.trim_end().to_owned())
1222}
1223
/// Splits `/goal create` arguments into the goal text and an optional token budget.
///
/// The budget flag is recognised as `--budget N` or `--budget=N`, and only when
/// `--budget` is a standalone token (at the start or after whitespace, and followed by
/// whitespace, `=`, or the end of input). Text such as `--budgetary` is therefore left
/// in the goal text instead of truncating it.
///
/// Returns the trimmed text before the flag (or the whole trimmed input when there is
/// no flag) and the parsed budget. The budget is `None` when the flag is absent or its
/// value is not a valid `u64`. Anything after the budget value is ignored.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // Find the first occurrence of the flag that forms a whole token.
    let flag_at = args.match_indices(FLAG).map(|(pos, _)| pos).find(|&pos| {
        let before = &args[..pos];
        let after = &args[pos + FLAG.len()..];
        let starts_token = before.is_empty() || before.ends_with(char::is_whitespace);
        let ends_token =
            after.is_empty() || after.starts_with(|c: char| c == '=' || c.is_whitespace());
        starts_token && ends_token
    });

    let Some(pos) = flag_at else {
        return (args.trim(), None);
    };

    let text = args[..pos].trim();
    let value = args[pos + FLAG.len()..].trim_start();
    // Accept the `--budget=N` spelling as well as `--budget N`.
    let value = value.strip_prefix('=').unwrap_or(value);
    let budget = value
        .split_whitespace()
        .next()
        .and_then(|token| token.parse::<u64>().ok());
    (text, budget)
}
1237
1238/// Convert `AgentError` to `CommandError` for the trait boundary.
1239impl From<AgentError> for CommandError {
1240    fn from(e: AgentError) -> Self {
1241        Self(e.to_string())
1242    }
1243}