Skip to main content

zeph_core/agent/
agent_access_impl.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implementation of [`zeph_commands::traits::agent::AgentAccess`] for [`Agent<C>`].
5//!
6//! Each method in `AgentAccess` returns a formatted `String` result (without sending to the
7//! channel directly), so that `CommandContext::sink` does not conflict with this borrow.
8//! The one exception is methods for subsystems that are already channel-free (memory, graph).
9//!
10//! [`Agent<C>`]: super::Agent
11
12use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28    fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29        let Some(memory) = self.services.memory.persistence.memory.clone() else {
30            return Err("Graph memory is not enabled.".to_owned());
31        };
32        let Some(store) = memory.graph_store.clone() else {
33            if self.services.memory.extraction.graph_config.enabled {
34                return Err(
35                    "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36                        .to_owned(),
37                );
38            }
39            return Err("Graph memory is not enabled.".to_owned());
40        };
41        Ok((memory, store))
42    }
43}
44
45impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
46    // ----- /memory -----
47
48    fn memory_tiers<'a>(
49        &'a mut self,
50    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51        Box::pin(
52            async move {
53                let Some(memory) = self.services.memory.persistence.memory.clone() else {
54                    return Ok("Memory not configured.".to_owned());
55                };
56                match memory.sqlite().count_messages_by_tier().await {
57                    Ok((episodic, semantic)) => {
58                        let mut out = String::new();
59                        let _ = writeln!(out, "Memory tiers:");
60                        let _ = writeln!(out, "  Working:  (current context window — virtual)");
61                        let _ = writeln!(out, "  Episodic: {episodic} messages");
62                        let _ = writeln!(out, "  Semantic: {semantic} facts");
63                        Ok(out.trim_end().to_owned())
64                    }
65                    Err(e) => Ok(format!("Failed to query tier stats: {e}")),
66                }
67            }
68            .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
69        )
70    }
71
72    fn memory_promote<'a>(
73        &'a mut self,
74        ids_str: &'a str,
75    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
76        Box::pin(
77            async move {
78                let Some(memory) = self.services.memory.persistence.memory.clone() else {
79                    return Ok("Memory not configured.".to_owned());
80                };
81                let ids: Vec<MessageId> = ids_str
82                    .split_whitespace()
83                    .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
84                    .collect();
85                if ids.is_empty() {
86                    return Ok(
87                        "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
88                            .to_owned(),
89                    );
90                }
91                match memory.sqlite().manual_promote(&ids).await {
92                    Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
93                    Err(e) => Ok(format!("Promotion failed: {e}")),
94                }
95            }
96            .instrument(tracing::info_span!("core.agent_access.memory_promote")),
97        )
98    }
99
100    // ----- /graph -----
101
102    fn graph_stats<'a>(
103        &'a mut self,
104    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
105        Box::pin(
106            async move {
107                let (_, store) = match self.resolve_graph_store() {
108                    Ok(pair) => pair,
109                    Err(msg) => return Ok(msg),
110                };
111
112                let stats_future = async {
113                    tokio::join!(
114                        store.entity_count(),
115                        store.active_edge_count(),
116                        store.community_count(),
117                        store.edge_type_distribution()
118                    )
119                };
120                let Ok((entities, edges, communities, distribution)) =
121                    tokio::time::timeout(Duration::from_secs(5), stats_future).await
122                else {
123                    tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
124                    return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
125                };
126                let mut msg = format!(
127                    "Graph memory: {} entities, {} edges, {} communities",
128                    entities.unwrap_or(0),
129                    edges.unwrap_or(0),
130                    communities.unwrap_or(0)
131                );
132                if let Ok(dist) = distribution
133                    && !dist.is_empty()
134                {
135                    let dist_str: Vec<String> =
136                        dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
137                    write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
138                }
139                Ok(msg)
140            }
141            .instrument(tracing::info_span!("core.agent_access.graph_stats")),
142        )
143    }
144
145    fn graph_entities<'a>(
146        &'a mut self,
147    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
148        Box::pin(
149            async move {
150                let (_, store) = match self.resolve_graph_store() {
151                    Ok(pair) => pair,
152                    Err(msg) => return Ok(msg),
153                };
154
155                let entities = match tokio::time::timeout(
156                    Duration::from_secs(5),
157                    store.all_entities(),
158                )
159                .await
160                {
161                    Ok(Ok(v)) => v,
162                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
163                    Err(_) => {
164                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
165                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
166                    }
167                };
168                if entities.is_empty() {
169                    return Ok("No entities found.".to_owned());
170                }
171
172                let total = entities.len();
173                let display: Vec<String> = entities
174                    .iter()
175                    .take(50)
176                    .map(|e| {
177                        format!(
178                            "  {:<40}  {:<15}  {}",
179                            e.name,
180                            e.entity_type.as_str(),
181                            e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
182                        )
183                    })
184                    .collect();
185                let mut msg = format!(
186                    "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
187                    "NAME",
188                    "TYPE",
189                    "LAST SEEN",
190                    display.join("\n")
191                );
192                if total > 50 {
193                    write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
194                }
195                Ok(msg)
196            }
197            .instrument(tracing::info_span!("core.agent_access.graph_entities")),
198        )
199    }
200
201    fn graph_facts<'a>(
202        &'a mut self,
203        name: &'a str,
204    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
205        Box::pin(
206            async move {
207                let (_, store) = match self.resolve_graph_store() {
208                    Ok(pair) => pair,
209                    Err(msg) => return Ok(msg),
210                };
211
212                let matches = match tokio::time::timeout(
213                    Duration::from_secs(5),
214                    store.find_entity_by_name(name),
215                )
216                .await
217                {
218                    Ok(Ok(v)) => v,
219                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
220                    Err(_) => {
221                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
222                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
223                    }
224                };
225                if matches.is_empty() {
226                    return Ok(format!("No entity found matching '{name}'."));
227                }
228
229                let entity = &matches[0];
230                let edges = match tokio::time::timeout(
231                    Duration::from_secs(5),
232                    store.edges_for_entity(entity.id.0),
233                )
234                .await
235                {
236                    Ok(Ok(v)) => v,
237                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
238                    Err(_) => {
239                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
240                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
241                    }
242                };
243                if edges.is_empty() {
244                    return Ok(format!("Entity '{}' has no known facts.", entity.name));
245                }
246
247                let mut entity_names: std::collections::HashMap<i64, String> =
248                    std::collections::HashMap::new();
249                entity_names.insert(entity.id.0, entity.name.clone());
250                for edge in &edges {
251                    let other_id = if edge.source_entity_id == entity.id.0 {
252                        edge.target_entity_id
253                    } else {
254                        edge.source_entity_id
255                    };
256                    entity_names.entry(other_id).or_default();
257                }
258                for (&id, name_val) in &mut entity_names {
259                    if name_val.is_empty() {
260                        let result = tokio::time::timeout(
261                            Duration::from_secs(5),
262                            store.find_entity_by_id(id),
263                        )
264                        .await;
265                        if let Ok(Ok(Some(other))) = result {
266                            *name_val = other.name;
267                        } else {
268                            *name_val = format!("#{id}");
269                        }
270                    }
271                }
272
273                let lines: Vec<String> = edges
274                    .iter()
275                    .map(|e| {
276                        let src = entity_names
277                            .get(&e.source_entity_id)
278                            .cloned()
279                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
280                        let tgt = entity_names
281                            .get(&e.target_entity_id)
282                            .cloned()
283                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
284                        format!(
285                            "  {} --[{}/{}]--> {}: {} (confidence: {:.2})",
286                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
287                        )
288                    })
289                    .collect();
290                Ok(format!(
291                    "Facts for '{}':\n{}",
292                    entity.name,
293                    lines.join("\n")
294                ))
295            }
296            .instrument(tracing::info_span!("core.agent_access.graph_facts")),
297        )
298    }
299
300    fn graph_history<'a>(
301        &'a mut self,
302        name: &'a str,
303    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
304        Box::pin(
305            async move {
306                let (_, store) = match self.resolve_graph_store() {
307                    Ok(pair) => pair,
308                    Err(msg) => return Ok(msg),
309                };
310
311                let matches = match tokio::time::timeout(
312                    Duration::from_secs(5),
313                    store.find_entity_by_name(name),
314                )
315                .await
316                {
317                    Ok(Ok(v)) => v,
318                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
319                    Err(_) => {
320                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
321                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
322                    }
323                };
324                if matches.is_empty() {
325                    return Ok(format!("No entity found matching '{name}'."));
326                }
327
328                let entity = &matches[0];
329                let edges = match tokio::time::timeout(
330                    Duration::from_secs(5),
331                    store.edge_history_for_entity(entity.id.0, 50),
332                )
333                .await
334                {
335                    Ok(Ok(v)) => v,
336                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
337                    Err(_) => {
338                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
339                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
340                    }
341                };
342                if edges.is_empty() {
343                    return Ok(format!("Entity '{}' has no edge history.", entity.name));
344                }
345
346                let mut entity_names: std::collections::HashMap<i64, String> =
347                    std::collections::HashMap::new();
348                entity_names.insert(entity.id.0, entity.name.clone());
349                for edge in &edges {
350                    for &id in &[edge.source_entity_id, edge.target_entity_id] {
351                        entity_names.entry(id).or_default();
352                    }
353                }
354                for (&id, name_val) in &mut entity_names {
355                    if name_val.is_empty() {
356                        let result = tokio::time::timeout(
357                            Duration::from_secs(5),
358                            store.find_entity_by_id(id),
359                        )
360                        .await;
361                        if let Ok(Ok(Some(other))) = result {
362                            *name_val = other.name;
363                        } else {
364                            *name_val = format!("#{id}");
365                        }
366                    }
367                }
368
369                let n = edges.len();
370                let lines: Vec<String> = edges
371                    .iter()
372                    .map(|e| {
373                        let status = if e.valid_to.is_some() {
374                            let date = e
375                                .valid_to
376                                .as_deref()
377                                .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
378                                .unwrap_or("?");
379                            format!("[expired {date}]")
380                        } else {
381                            "[active]".to_string()
382                        };
383                        let src = entity_names
384                            .get(&e.source_entity_id)
385                            .cloned()
386                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
387                        let tgt = entity_names
388                            .get(&e.target_entity_id)
389                            .cloned()
390                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
391                        format!(
392                            "  {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
393                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
394                        )
395                    })
396                    .collect();
397                Ok(format!(
398                    "Edge history for '{}' ({n} edges):\n{}",
399                    entity.name,
400                    lines.join("\n")
401                ))
402            }
403            .instrument(tracing::info_span!("core.agent_access.graph_history")),
404        )
405    }
406
407    fn graph_communities<'a>(
408        &'a mut self,
409    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
410        Box::pin(
411            async move {
412                let (_, store) = match self.resolve_graph_store() {
413                    Ok(pair) => pair,
414                    Err(msg) => return Ok(msg),
415                };
416
417                let communities =
418                    match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
419                        .await
420                    {
421                        Ok(Ok(v)) => v,
422                        Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
423                        Err(_) => {
424                            tracing::warn!(
425                                "graph store call timed out after 5s (Qdrant unreachable)"
426                            );
427                            return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
428                        }
429                    };
430                if communities.is_empty() {
431                    return Ok("No communities detected yet. Run graph backfill first.".to_owned());
432                }
433
434                let lines: Vec<String> = communities
435                    .iter()
436                    .map(|c| format!("  [{}]: {}", c.name, c.summary))
437                    .collect();
438                Ok(format!(
439                    "Communities ({}):\n{}",
440                    communities.len(),
441                    lines.join("\n")
442                ))
443            }
444            .instrument(tracing::info_span!("core.agent_access.graph_communities")),
445        )
446    }
447
448    #[allow(clippy::too_many_lines)]
449    fn graph_backfill<'a>(
450        &'a mut self,
451        limit: Option<usize>,
452        progress_cb: &'a mut (dyn FnMut(String) + Send),
453    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
454        let store = match self.resolve_graph_store() {
455            Ok((_, s)) => s,
456            Err(msg) => return Box::pin(async move { Ok(msg) }),
457        };
458        let graph_cfg = self.services.memory.extraction.graph_config.clone();
459        let embed_timeout_secs = self
460            .services
461            .memory
462            .persistence
463            .memory
464            .as_ref()
465            .map_or(5, |m| m.embed_timeout().as_secs());
466        let provider = if graph_cfg.extract_provider.as_str().is_empty() {
467            self.provider.clone()
468        } else {
469            self.resolve_background_provider(graph_cfg.extract_provider.as_str())
470        };
471        Box::pin(
472            async move {
473                let total = store.unprocessed_message_count().await.unwrap_or(0);
474                let cap = limit.unwrap_or(usize::MAX);
475
476                progress_cb(format!(
477                    "Starting graph backfill... ({total} unprocessed messages)"
478                ));
479
480                let batch_size = 50usize;
481                let mut processed = 0usize;
482                let mut total_entities = 0usize;
483                let mut total_edges = 0usize;
484
485                loop {
486                    let remaining_cap = cap.saturating_sub(processed);
487                    if remaining_cap == 0 {
488                        break;
489                    }
490                    let batch_limit = batch_size.min(remaining_cap);
491                    let messages = store
492                        .unprocessed_messages_for_backfill(batch_limit)
493                        .await
494                        .map_err(|e| CommandError::new(e.to_string()))?;
495                    if messages.is_empty() {
496                        break;
497                    }
498
499                    let ids: Vec<zeph_memory::types::MessageId> =
500                        messages.iter().map(|(id, _)| *id).collect();
501
502                    for (_id, content) in &messages {
503                        if content.trim().is_empty() {
504                            continue;
505                        }
506                        let extraction_cfg = GraphExtractionConfig {
507                            max_entities: graph_cfg.max_entities_per_message,
508                            max_edges: graph_cfg.max_edges_per_message,
509                            extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
510                            community_refresh_interval: 0,
511                            expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
512                            max_entities_cap: graph_cfg.max_entities,
513                            community_summary_max_prompt_bytes: graph_cfg
514                                .community_summary_max_prompt_bytes,
515                            community_summary_concurrency: graph_cfg.community_summary_concurrency,
516                            lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
517                            note_linking: zeph_memory::NoteLinkingConfig::default(),
518                            link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
519                            link_weight_decay_interval_secs: graph_cfg
520                                .link_weight_decay_interval_secs,
521                            belief_revision_enabled: graph_cfg.belief_revision.enabled,
522                            belief_revision_similarity_threshold: graph_cfg
523                                .belief_revision
524                                .similarity_threshold,
525                            conversation_id: None,
526                            apex_mem_enabled: graph_cfg.apex_mem.enabled,
527                            llm_timeout_secs: graph_cfg.llm_timeout_secs,
528                            embed_timeout_secs,
529                        };
530                        let pool = store.pool().clone();
531                        match extract_and_store(
532                            content.clone(),
533                            vec![],
534                            provider.clone(),
535                            pool,
536                            extraction_cfg,
537                            None,
538                            None,
539                        )
540                        .await
541                        {
542                            Ok(result) => {
543                                total_entities += result.stats.entities_upserted;
544                                total_edges += result.stats.edges_inserted;
545                            }
546                            Err(e) => {
547                                tracing::warn!("backfill extraction error: {e:#}");
548                            }
549                        }
550                    }
551
552                    store
553                        .mark_messages_graph_processed(&ids)
554                        .await
555                        .map_err(|e| CommandError::new(e.to_string()))?;
556                    processed += messages.len();
557
558                    progress_cb(format!(
559                        "Backfill progress: {processed} messages processed, \
560                     {total_entities} entities, {total_edges} edges"
561                    ));
562                }
563
564                Ok(format!(
565                    "Backfill complete: {total_entities} entities, {total_edges} edges \
566                 extracted from {processed} messages"
567                ))
568            }
569            .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
570        )
571    }
572
573    // ----- /guidelines -----
574
575    fn guidelines<'a>(
576        &'a mut self,
577    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
578        Box::pin(
579            async move {
580                const MAX_DISPLAY_CHARS: usize = 4096;
581
582                let Some(memory) = &self.services.memory.persistence.memory else {
583                    return Ok("No memory backend initialised.".to_owned());
584                };
585
586                let cid = self.services.memory.persistence.conversation_id;
587                let sqlite = memory.sqlite();
588
589                let (version, text) = sqlite
590                    .load_compression_guidelines(cid)
591                    .await
592                    .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
593
594                if version == 0 || text.is_empty() {
595                    return Ok("No compression guidelines generated yet.".to_owned());
596                }
597
598                let (_, created_at) = sqlite
599                    .load_compression_guidelines_meta(cid)
600                    .await
601                    .unwrap_or((0, String::new()));
602
603                let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
604                    let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
605                    (&text[..end], true)
606                } else {
607                    (text.as_str(), false)
608                };
609
610                let mut output =
611                    format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
612                if truncated {
613                    output.push_str("\n\n[truncated]");
614                }
615                Ok(output)
616            }
617            .instrument(tracing::info_span!("core.agent_access.guidelines")),
618        )
619    }
620
621    // ----- /model, /provider -----
622
623    fn handle_model<'a>(
624        &'a mut self,
625        arg: &'a str,
626    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
627        Box::pin(async move {
628            let input = if arg.is_empty() {
629                "/model".to_owned()
630            } else {
631                format!("/model {arg}")
632            };
633            self.handle_model_command_as_string(&input).await
634        })
635    }
636
637    fn handle_provider<'a>(
638        &'a mut self,
639        arg: &'a str,
640    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
641        Box::pin(async move { self.handle_provider_command_as_string(arg).await })
642    }
643
644    // ----- /policy -----
645
646    fn handle_policy<'a>(
647        &'a mut self,
648        args: &'a str,
649    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
650        Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
651    }
652
653    // ----- /scheduler -----
654
655    #[cfg(feature = "scheduler")]
656    fn list_scheduled_tasks<'a>(
657        &'a mut self,
658    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
659        Box::pin(async move {
660            let result = self
661                .handle_scheduler_list_as_string()
662                .await
663                .map_err(|e| CommandError::new(e.to_string()))?;
664            Ok(Some(result))
665        })
666    }
667
668    #[cfg(not(feature = "scheduler"))]
669    fn list_scheduled_tasks<'a>(
670        &'a mut self,
671    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
672        Box::pin(async move { Ok(None) })
673    }
674
675    // ----- /lsp -----
676
677    fn lsp_status<'a>(
678        &'a mut self,
679    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
680        Box::pin(async move {
681            self.handle_lsp_status_as_string()
682                .await
683                .map_err(|e| CommandError::new(e.to_string()))
684        })
685    }
686
687    // ----- /recap -----
688
689    fn session_recap<'a>(
690        &'a mut self,
691    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
692        Box::pin(
693            async move {
694                match self.build_recap().await {
695                    Ok(text) => Ok(text),
696                    Err(e) => {
697                        // /recap is an explicit user command — surface a fixed message so that
698                        // LlmError internals (URLs with embedded credentials, response excerpts)
699                        // are never forwarded to the user channel. Full detail goes to the log.
700                        tracing::warn!("session recap command: {}", e.0);
701                        Ok("Recap unavailable — see logs for details".to_string())
702                    }
703                }
704            }
705            .instrument(tracing::info_span!("core.agent_access.session_recap")),
706        )
707    }
708
709    // ----- /compact -----
710
711    fn compact_context<'a>(
712        &'a mut self,
713    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
714        Box::pin(
715            self.compact_context_command()
716                .instrument(tracing::info_span!("core.agent_access.compact_context")),
717        )
718    }
719
720    // ----- /new -----
721
722    fn reset_conversation<'a>(
723        &'a mut self,
724        keep_plan: bool,
725        no_digest: bool,
726    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
727        Box::pin(async move {
728            match self.reset_conversation(keep_plan, no_digest).await {
729                Ok((old_id, new_id)) => {
730                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
731                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
732                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
733                    Ok(format!(
734                        "New conversation started. Previous: {old} → Current: {new}{keep_note}"
735                    ))
736                }
737                Err(e) => Ok(format!("Failed to start new conversation: {e}")),
738            }
739        })
740    }
741
742    // ----- /cache-stats -----
743
744    fn cache_stats(&self) -> String {
745        self.tool_orchestrator.cache_stats()
746    }
747
748    // ----- /status -----
749
750    fn session_status<'a>(
751        &'a mut self,
752    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
753        Box::pin(async move { Ok(self.handle_status_as_string()) })
754    }
755
756    // ----- /guardrail -----
757
758    fn guardrail_status(&self) -> String {
759        self.format_guardrail_status()
760    }
761
762    // ----- /focus -----
763
764    fn focus_status(&self) -> String {
765        self.format_focus_status()
766    }
767
768    // ----- /sidequest -----
769
770    fn sidequest_status(&self) -> String {
771        self.format_sidequest_status()
772    }
773
774    // ----- /image -----
775
776    fn load_image<'a>(
777        &'a mut self,
778        path: &'a str,
779    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
780        Box::pin(async move { Ok(self.handle_image_as_string(path)) })
781    }
782
783    // ----- /mcp -----
784
785    fn handle_mcp<'a>(
786        &'a mut self,
787        args: &'a str,
788    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
789        // Extract all owned data before the async block so no &mut self reference is
790        // held across an .await point, satisfying the `for<'a>` Send bound.
791        let args_owned = args.to_owned();
792        let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
793        let sub = parts.first().cloned().unwrap_or_default();
794
795        match sub.as_str() {
796            "list" => {
797                // Read-only: clone all data before async.
798                let manager = self.services.mcp.manager.clone();
799                let tools_snapshot: Vec<(String, String)> = self
800                    .services
801                    .mcp
802                    .tools
803                    .iter()
804                    .map(|t| (t.server_id.clone(), t.name.clone()))
805                    .collect();
806                Box::pin(async move {
807                    use std::fmt::Write;
808                    let Some(manager) = manager else {
809                        return Ok("MCP is not enabled.".to_owned());
810                    };
811                    let server_ids = manager.list_servers().await;
812                    if server_ids.is_empty() {
813                        return Ok("No MCP servers connected.".to_owned());
814                    }
815                    let mut output = String::from("Connected MCP servers:\n");
816                    let mut total = 0usize;
817                    for id in &server_ids {
818                        let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
819                        total += count;
820                        let _ = writeln!(output, "- {id} ({count} tools)");
821                    }
822                    let _ = write!(output, "Total: {total} tool(s)");
823                    Ok(output)
824                })
825            }
826            "tools" => {
827                // Read-only: collect tool info before async.
828                let server_id = parts.get(1).cloned();
829                let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
830                    self.services
831                        .mcp
832                        .tools
833                        .iter()
834                        .filter(|t| &t.server_id == sid)
835                        .map(|t| (t.name.clone(), t.description.clone()))
836                        .collect()
837                } else {
838                    Vec::new()
839                };
840                Box::pin(async move {
841                    use std::fmt::Write;
842                    let Some(server_id) = server_id else {
843                        return Ok("Usage: /mcp tools <server_id>".to_owned());
844                    };
845                    if owned_tools.is_empty() {
846                        return Ok(format!("No tools found for server '{server_id}'."));
847                    }
848                    let mut output =
849                        format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
850                    for (name, desc) in &owned_tools {
851                        if desc.is_empty() {
852                            let _ = writeln!(output, "- {name}");
853                        } else {
854                            let _ = writeln!(output, "- {name} — {desc}");
855                        }
856                    }
857                    Ok(output)
858                })
859            }
860            // add/remove require mutating self after async I/O.
861            // handle_mcp_command is structured so the only .await crossing a &mut self
862            // boundary goes through a cloned Arc<McpManager> — no &self fields are held
863            // across that .await.  The subsequent state-change methods (rebuild_semantic_index,
864            // sync_mcp_registry) are also async fn(&mut self), but they only hold owned locals
865            // across their own .await points (cloned tools Vec, cloned Arcs).
866            _ => Box::pin(async move {
867                self.handle_mcp_command(&args_owned)
868                    .await
869                    .map_err(|e| CommandError::new(e.to_string()))
870            }),
871        }
872    }
873
874    // ----- /skill -----
875
876    fn handle_skill<'a>(
877        &'a mut self,
878        args: &'a str,
879    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
880        let args_owned = args.to_owned();
881        Box::pin(async move {
882            self.handle_skill_command_as_string(&args_owned)
883                .await
884                .map_err(|e| CommandError::new(e.to_string()))
885        })
886    }
887
888    // ----- /skills -----
889
890    fn handle_skills<'a>(
891        &'a mut self,
892        args: &'a str,
893    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
894        let args_owned = args.to_owned();
895        Box::pin(async move {
896            self.handle_skills_as_string(&args_owned)
897                .await
898                .map_err(|e| CommandError::new(e.to_string()))
899        })
900    }
901
902    // ----- /feedback -----
903
904    fn handle_feedback_command<'a>(
905        &'a mut self,
906        args: &'a str,
907    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
908        let args_owned = args.to_owned();
909        Box::pin(async move {
910            self.handle_feedback_as_string(&args_owned)
911                .await
912                .map_err(|e| CommandError::new(e.to_string()))
913        })
914    }
915
916    // ----- /plan -----
917
918    #[cfg(feature = "scheduler")]
919    fn handle_plan<'a>(
920        &'a mut self,
921        input: &'a str,
922    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
923        Box::pin(async move {
924            self.dispatch_plan_command_as_string(input)
925                .await
926                .map_err(|e| CommandError::new(e.to_string()))
927        })
928    }
929
930    #[cfg(not(feature = "scheduler"))]
931    fn handle_plan<'a>(
932        &'a mut self,
933        _input: &'a str,
934    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
935        Box::pin(async move { Ok(String::new()) })
936    }
937
938    // ----- /experiment -----
939
940    fn handle_experiment<'a>(
941        &'a mut self,
942        input: &'a str,
943    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
944        Box::pin(async move {
945            self.handle_experiment_command_as_string(input)
946                .await
947                .map_err(|e| CommandError::new(e.to_string()))
948        })
949    }
950
951    // ----- /agent, @mention -----
952
953    fn handle_agent_dispatch<'a>(
954        &'a mut self,
955        input: &'a str,
956    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
957        Box::pin(async move {
958            match self.dispatch_agent_command(input).await {
959                Some(Err(e)) => Err(CommandError::new(e.to_string())),
960                Some(Ok(())) | None => Ok(None),
961            }
962        })
963    }
964
965    // ----- /plugins -----
966
967    fn handle_plugins<'a>(
968        &'a mut self,
969        args: &'a str,
970    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
971        let args_owned = args.to_owned();
972        // Clone the fields needed by PluginManager before entering the async block.
973        // spawn_blocking requires 'static, so we cannot borrow &self inside the closure.
974        let managed_dir = self.services.skill.managed_dir.clone();
975        let mcp_allowed = self.services.mcp.allowed_commands.clone();
976        let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
977        Box::pin(async move {
978            // PluginManager performs synchronous filesystem I/O (copy, remove_dir_all,
979            // read_dir). Run on a blocking thread to avoid stalling the tokio worker.
980            tokio::task::spawn_blocking(move || {
981                Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
982            })
983            .await
984            .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
985        })
986    }
987
988    // ----- /acp -----
989
990    fn handle_acp<'a>(
991        &'a mut self,
992        args: &'a str,
993    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
994        Box::pin(async move {
995            self.handle_acp_as_string(args)
996                .map_err(|e| CommandError::new(e.to_string()))
997        })
998    }
999
1000    // ----- /cocoon -----
1001
1002    #[cfg(feature = "cocoon")]
1003    fn handle_cocoon<'a>(
1004        &'a mut self,
1005        args: &'a str,
1006    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1007        Box::pin(async move {
1008            self.handle_cocoon_as_string(args)
1009                .await
1010                .map_err(|e| CommandError::new(e.to_string()))
1011        })
1012    }
1013
1014    #[cfg(not(feature = "cocoon"))]
1015    fn handle_cocoon<'a>(
1016        &'a mut self,
1017        _args: &'a str,
1018    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1019        Box::pin(async {
1020            Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1021        })
1022    }
1023
1024    // ----- /loop -----
1025
1026    fn handle_loop<'a>(
1027        &'a mut self,
1028        args: &'a str,
1029    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1030        use zeph_commands::handlers::loop_cmd::parse_loop_args;
1031
1032        let args_owned = args.trim().to_owned();
1033        Box::pin(async move {
1034            if args_owned == "stop" {
1035                return Ok(self.stop_user_loop());
1036            }
1037            if args_owned == "status" {
1038                return Ok(match &self.runtime.lifecycle.user_loop {
1039                    Some(ls) => format!(
1040                        "Loop active: \"{}\" (iteration {}, interval every {}s).",
1041                        ls.prompt,
1042                        ls.iteration,
1043                        ls.interval.period().as_secs(),
1044                    ),
1045                    None => "No active loop.".to_owned(),
1046                });
1047            }
1048            let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1049
1050            if prompt.starts_with('/') {
1051                return Err(CommandError::new(
1052                    "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1053                ));
1054            }
1055
1056            let min_secs = self.runtime.config.loop_min_interval_secs;
1057            if interval_secs < min_secs {
1058                return Err(CommandError::new(format!(
1059                    "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1060                )));
1061            }
1062            if self.runtime.lifecycle.user_loop.is_some() {
1063                return Err(CommandError::new(
1064                    "A loop is already active. Use /loop stop first.",
1065                ));
1066            }
1067
1068            self.start_user_loop(prompt.clone(), interval_secs);
1069            Ok(format!(
1070                "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1071            ))
1072        })
1073    }
1074
1075    fn notify_test<'a>(
1076        &'a mut self,
1077    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1078        let notifier = self.runtime.lifecycle.notifier.clone();
1079        Box::pin(async move {
1080            let Some(notifier) = notifier else {
1081                return Ok(
1082                    "Notifications are disabled. Set `notifications.enabled = true` in config."
1083                        .to_owned(),
1084                );
1085            };
1086            match notifier.fire_test().await {
1087                Ok(()) => Ok("Test notification sent.".to_owned()),
1088                Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1089            }
1090        })
1091    }
1092
1093    fn handle_trajectory(&mut self, args: &str) -> String {
1094        self.handle_trajectory_command_as_string(args)
1095    }
1096
1097    fn handle_scope(&self, args: &str) -> String {
1098        self.handle_scope_command_as_string(args)
1099    }
1100
1101    // ----- /goal -----
1102
1103    fn handle_goal<'a>(
1104        &'a mut self,
1105        args: &'a str,
1106    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1107        // Extract all non-Send data synchronously before entering the async block.
1108        if self.services.goal_accounting.is_none() {
1109            if !self.runtime.config.goals.enabled {
1110                return Box::pin(async {
1111                    Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1112                });
1113            }
1114            let pool = match self.services.memory.persistence.memory.as_ref() {
1115                Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1116                None => {
1117                    return Box::pin(async {
1118                        Ok("Goals require a database backend (memory not configured).".to_owned())
1119                    });
1120                }
1121            };
1122            let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1123            let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1124            self.services.goal_accounting = Some(accounting);
1125        }
1126
1127        let accounting =
1128            self.services.goal_accounting.clone().expect(
1129                "invariant: goal_accounting is always Some at this point (initialized above)",
1130            );
1131        let max_chars = self.runtime.config.goals.max_text_chars;
1132        let default_budget = self.runtime.config.goals.default_token_budget;
1133        let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1134        let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1135        let args_owned = args.to_owned();
1136
1137        // S1: `goal_create` may need to arm `AutonomousDriver` with a new session.
1138        // We capture a clone of the pending_start Arc that lives on the driver.
1139        // The async block fills it; the main agent loop (which has `&mut self`) drains it
1140        // via `AutonomousDriver::flush_pending_start()` after each command handler returns.
1141        let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1142
1143        Box::pin(async move {
1144            let _ = accounting.refresh().await;
1145            let store = accounting.get_store();
1146            let args = args_owned.as_str();
1147
1148            match args {
1149                "" | "status" => goal_status(&accounting).await,
1150                "pause" => goal_pause(&accounting, &store).await,
1151                "resume" => goal_resume(&accounting, &store).await,
1152                "complete" => goal_complete(&accounting, &store).await,
1153                "clear" => goal_clear(&accounting, &store).await,
1154                "list" => goal_list(&store).await,
1155                _ if args.starts_with("create") => {
1156                    let (msg, auto_req) = goal_create(
1157                        args,
1158                        &accounting,
1159                        &store,
1160                        max_chars,
1161                        default_budget,
1162                        autonomous_enabled,
1163                        autonomous_max_turns,
1164                    )
1165                    .await?;
1166                    if let Some(req) = auto_req {
1167                        *pending_start_arc.lock() = Some(req);
1168                    }
1169                    Ok(msg)
1170                }
1171                _ => Ok(
1172                    "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1173                        .to_owned(),
1174                ),
1175            }
1176        })
1177    }
1178
1179    fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1180        let accounting = self.services.goal_accounting.as_ref()?;
1181        let snap = accounting.snapshot()?;
1182        Some(zeph_commands::GoalSnapshot {
1183            id: snap.id,
1184            text: snap.text,
1185            status: match snap.status {
1186                crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1187                crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1188                crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1189                crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1190            },
1191            turns_used: snap.turns_used,
1192            tokens_used: snap.tokens_used,
1193            token_budget: snap.token_budget,
1194        })
1195    }
1196
1197    // ----- /agents -----
1198
1199    fn handle_agents<'a>(
1200        &'a mut self,
1201        args: &'a str,
1202    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1203        use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1204        use zeph_subagent::AgentsCommand;
1205
1206        let args_owned = args.trim().to_owned();
1207        Box::pin(async move {
1208            // Fleet view: bare `/agents` or `/agents fleet` shows autonomous sessions + definitions.
1209            let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1210
1211            let fleet_section = if show_fleet {
1212                let snapshots = self.services.autonomous_registry.list();
1213                let entries: Vec<FleetEntry> = snapshots
1214                    .into_iter()
1215                    .map(|s| FleetEntry {
1216                        goal_id: s.goal_id,
1217                        goal_text_short: s.goal_text_short,
1218                        state: s.state,
1219                        turns_executed: s.turns_executed,
1220                        max_turns: s.max_turns,
1221                        elapsed: s.elapsed,
1222                    })
1223                    .collect();
1224                format_fleet_section(&entries)
1225            } else {
1226                String::new()
1227            };
1228
1229            // Sub-agent definitions section.
1230            let definitions_section = if show_fleet || args_owned == "list" {
1231                self.handle_agents_definitions_list()
1232            } else {
1233                // CRUD subcommands: show, create, edit, delete.
1234                match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1235                    Ok(cmd) => self.handle_agents_crud(cmd),
1236                    Err(e) => e.to_string(),
1237                }
1238            };
1239
1240            let mut out = fleet_section;
1241            if !definitions_section.is_empty() {
1242                if !out.is_empty() {
1243                    out.push('\n');
1244                }
1245                out.push_str(&definitions_section);
1246            }
1247
1248            if out.is_empty() {
1249                "No active autonomous sessions or sub-agent definitions found."
1250                    .clone_into(&mut out);
1251            }
1252
1253            Ok(out)
1254        })
1255    }
1256}
1257
/// Shorthand for the store type the `/goal` helpers call `create`, `list` and `transition` on.
type GoalStore = crate::goal::GoalStore;
/// Shorthand for the accounting type the `/goal` helpers call `get_active` and `refresh` on.
type GoalAccounting = crate::goal::GoalAccounting;

/// Hard cap on `--turns` to prevent runaway autonomous loops (Security Low).
const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1000;
1263
1264async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1265    match accounting.get_active().await {
1266        Ok(Some(g)) => {
1267            let budget_line = g.token_budget.map_or_else(
1268                || format!("  tokens used: {}", g.tokens_used),
1269                |b| format!("  budget: {}/{b}", g.tokens_used),
1270            );
1271            Ok(format!(
1272                "Active goal [{}]: {}\n  status: {}\n  turns: {}\n{}",
1273                &g.id[..8],
1274                g.text,
1275                g.status,
1276                g.turns_used,
1277                budget_line
1278            ))
1279        }
1280        Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1281        Err(e) => Ok(format!("Goal lookup failed: {e}")),
1282    }
1283}
1284
1285/// Returns `(display_message, auto_start_request)`.
1286///
1287/// `auto_start_request` is `Some((goal_id, goal_text, max_turns))` when `--auto` was passed and
1288/// the goal was successfully created. The caller must relay this to `AutonomousDriver` via the
1289/// `pending_start_arc` side-channel before the future resolves.
1290async fn goal_create(
1291    args: &str,
1292    accounting: &GoalAccounting,
1293    store: &GoalStore,
1294    max_chars: usize,
1295    default_budget: Option<u64>,
1296    autonomous_enabled: bool,
1297    autonomous_max_turns: u32,
1298) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1299    let rest = args.strip_prefix("create").unwrap_or("").trim();
1300
1301    // Strip --auto / --turns before passing text to the budget parser.
1302    let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1303    let (text, explicit_budget) = parse_goal_create_args(&stripped);
1304
1305    if text.is_empty() {
1306        return Ok((
1307            "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1308            None,
1309        ));
1310    }
1311    if is_auto && !autonomous_enabled {
1312        return Ok((
1313            "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1314                .to_owned(),
1315            None,
1316        ));
1317    }
1318    let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1319
1320    let max_turns = explicit_turns
1321        .unwrap_or(autonomous_max_turns)
1322        .min(AUTONOMOUS_MAX_TURNS_CAP);
1323    if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1324        tracing::warn!(
1325            requested = explicit_turns,
1326            capped = AUTONOMOUS_MAX_TURNS_CAP,
1327            "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1328        );
1329    }
1330
1331    match store.create(text, budget, max_chars).await {
1332        Ok(g) => {
1333            let _ = accounting.refresh().await;
1334            let auto_start = if is_auto {
1335                Some((g.id.clone(), g.text.clone(), max_turns))
1336            } else {
1337                None
1338            };
1339            let auto_note = if is_auto {
1340                " Autonomous mode enabled — use `/goal clear` to stop."
1341            } else {
1342                ""
1343            };
1344            Ok((
1345                format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1346                auto_start,
1347            ))
1348        }
1349        Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1350            format!("Goal text exceeds {max} characters. Please shorten it."),
1351            None,
1352        )),
1353        Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1354    }
1355}
1356
1357async fn goal_pause(
1358    accounting: &GoalAccounting,
1359    store: &GoalStore,
1360) -> Result<String, CommandError> {
1361    match accounting.get_active().await {
1362        Ok(Some(g)) => {
1363            match store
1364                .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1365                .await
1366            {
1367                Ok(_) => {
1368                    let _ = accounting.refresh().await;
1369                    Ok(format!("Goal [{}] paused.", &g.id[..8]))
1370                }
1371                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1372                    let current = accounting.get_active().await.ok().flatten();
1373                    Ok(format!(
1374                        "Goal state changed concurrently. Current: {}",
1375                        current.map_or_else(|| "none".into(), |g| g.status.to_string())
1376                    ))
1377                }
1378                Err(e) => Ok(format!("Pause failed: {e}")),
1379            }
1380        }
1381        Ok(None) => Ok("No active goal to pause.".to_owned()),
1382        Err(e) => Ok(format!("Failed: {e}")),
1383    }
1384}
1385
1386async fn goal_resume(
1387    accounting: &GoalAccounting,
1388    store: &GoalStore,
1389) -> Result<String, CommandError> {
1390    let goals = store.list(10).await.unwrap_or_default();
1391    let paused = goals
1392        .into_iter()
1393        .find(|g| g.status == crate::goal::GoalStatus::Paused);
1394    match paused {
1395        Some(g) => {
1396            match store
1397                .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1398                .await
1399            {
1400                Ok(_) => {
1401                    let _ = accounting.refresh().await;
1402                    Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1403                }
1404                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1405                    Ok("Goal state changed concurrently — please retry.".to_owned())
1406                }
1407                Err(e) => Ok(format!("Resume failed: {e}")),
1408            }
1409        }
1410        None => Ok("No paused goal to resume.".to_owned()),
1411    }
1412}
1413
1414async fn goal_complete(
1415    accounting: &GoalAccounting,
1416    store: &GoalStore,
1417) -> Result<String, CommandError> {
1418    match accounting.get_active().await {
1419        Ok(Some(g)) => {
1420            match store
1421                .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1422                .await
1423            {
1424                Ok(_) => {
1425                    let _ = accounting.refresh().await;
1426                    Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1427                }
1428                Err(e) => Ok(format!("Complete failed: {e}")),
1429            }
1430        }
1431        Ok(None) => Ok("No active goal.".to_owned()),
1432        Err(e) => Ok(format!("Failed: {e}")),
1433    }
1434}
1435
1436async fn goal_clear(
1437    accounting: &GoalAccounting,
1438    store: &GoalStore,
1439) -> Result<String, CommandError> {
1440    let goals = store.list(10).await.unwrap_or_default();
1441    let target = goals.into_iter().find(|g| {
1442        g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1443    });
1444    match target {
1445        Some(g) => {
1446            match store
1447                .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1448                .await
1449            {
1450                Ok(_) => {
1451                    let _ = accounting.refresh().await;
1452                    Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1453                }
1454                Err(e) => Ok(format!("Clear failed: {e}")),
1455            }
1456        }
1457        None => Ok("No active or paused goal to clear.".to_owned()),
1458    }
1459}
1460
1461async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1462    let goals = store.list(20).await.unwrap_or_default();
1463    if goals.is_empty() {
1464        return Ok("No goals recorded.".to_owned());
1465    }
1466    let mut out = String::from("Goals:\n");
1467    for g in goals {
1468        let _ = std::fmt::Write::write_fmt(
1469            &mut out,
1470            format_args!(
1471                "  {} [{}] {} — {} turns\n",
1472                g.status.badge_symbol(),
1473                &g.id[..8],
1474                g.text,
1475                g.turns_used
1476            ),
1477        );
1478    }
1479    Ok(out.trim_end().to_owned())
1480}
1481
/// Splits a `/goal create` argument string into the goal text and an optional `--budget N`.
///
/// Returns `(text, budget)`. `text` is everything before the first stand-alone `--budget`
/// flag, trimmed; when no such flag is present the input is returned unchanged. `budget` is
/// the first whitespace-separated token after the flag, parsed as `u64`; it is `None` when
/// that token is missing or not a number.
///
/// The flag only counts when it is a whole word, so text such as `--budgeting` is left alone
/// rather than being cut off.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // Find the first occurrence delimited by whitespace (or the string edges) on both sides.
    let flag_pos = args.match_indices(FLAG).map(|(i, _)| i).find(|&i| {
        let tail = &args[i + FLAG.len()..];
        let starts_word = i == 0 || args[..i].ends_with(char::is_whitespace);
        let ends_word = tail.is_empty() || tail.starts_with(char::is_whitespace);
        starts_word && ends_word
    });

    match flag_pos {
        Some(pos) => {
            let text = args[..pos].trim();
            let budget = args[pos + FLAG.len()..]
                .split_whitespace()
                .next()
                .and_then(|s| s.parse::<u64>().ok());
            (text, budget)
        }
        None => (args, None),
    }
}
1495
/// Parse `--auto` and `--turns N` flags from the remainder of a `/goal create` argument string.
///
/// Returns `(text_without_auto_flags, is_auto, explicit_turns)`. The remaining words are
/// re-joined with single spaces.
///
/// `--turns` only consumes the following word when that word parses as a `u32`. A bare or
/// malformed `--turns` is dropped on its own, so a following flag (`--turns --auto`) or goal
/// word is never swallowed.
fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
    let mut is_auto = false;
    let mut turns: Option<u32> = None;
    let mut text_words: Vec<&str> = Vec::new();
    let mut words = args.split_whitespace().peekable();

    // `while let` rather than `for`: the `--turns` arm advances the iterator itself.
    while let Some(word) = words.next() {
        match word {
            "--auto" => is_auto = true,
            "--turns" => {
                // Look ahead first; only take the next word if it really is the turn count.
                if let Some(n) = words.peek().and_then(|w| w.parse::<u32>().ok()) {
                    turns = Some(n);
                    words.next();
                }
            }
            other => text_words.push(other),
        }
    }

    (text_words.join(" "), is_auto, turns)
}
1517
1518/// Convert `AgentError` to `CommandError` for the trait boundary.
1519impl From<AgentError> for CommandError {
1520    fn from(e: AgentError) -> Self {
1521        Self(e.to_string())
1522    }
1523}
1524
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use zeph_commands::traits::agent::AgentAccess;
    use zeph_memory::semantic::SemanticMemory;

    /// Builds the `SemanticMemory` fixture shared by the tests in this module.
    ///
    /// NOTE(review): presumably `":memory:"` selects an in-memory SQLite database and nothing
    /// listens on `127.0.0.1:1`, which would leave `graph_store` as `None` — confirm against
    /// `SemanticMemory::new`.
    async fn memory_without_qdrant() -> SemanticMemory {
        SemanticMemory::new(
            ":memory:",
            "http://127.0.0.1:1",
            None,
            zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default()),
            "test-model",
        )
        .await
        .unwrap()
    }

    /// Returns a default `GraphConfig` with only the `enabled` flag overridden.
    fn graph_config(enabled: bool) -> crate::config::GraphConfig {
        crate::config::GraphConfig {
            enabled,
            ..Default::default()
        }
    }

    /// Builds an agent over mock collaborators with `memory` attached and `cfg` applied as the
    /// graph configuration.
    ///
    /// A fresh conversation is created in `memory` first because `with_memory` takes its id.
    /// The literal arguments are the fixed values shared by every test in this module.
    async fn agent_with_graph(
        memory: SemanticMemory,
        cfg: crate::config::GraphConfig,
    ) -> Agent<MockChannel> {
        let cid = memory.sqlite().create_conversation().await.unwrap();
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
        .with_graph_config(cfg)
    }

    // R-CRIT-4111: when graph is enabled in config but graph_store is None
    // (Qdrant unreachable), graph command handlers must report
    // "unavailable" rather than "not enabled".
    #[tokio::test]
    async fn graph_stats_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, graph_config(true)).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
        assert!(
            !result.contains("not enabled"),
            "must not report 'not enabled' when graph is enabled: {result}"
        );
    }

    #[tokio::test]
    async fn graph_stats_disabled_reports_not_enabled() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, graph_config(false)).await;

        let result = agent.graph_stats().await.unwrap();
        assert!(
            result.contains("not enabled"),
            "expected 'not enabled' but got: {result}"
        );
    }

    // R-CRIT-4136: graph_backfill must resolve extract_provider before entering the async block.
    // When extract_provider is set to an unknown name, resolve_background_provider falls back to
    // the primary provider — the backfill still completes (no messages to process).
    // This test confirms that the provider-resolution code path executes without panic or borrow
    // errors, which would occur if the old code tried to access `&mut self` inside `async move`.
    #[tokio::test]
    async fn graph_backfill_with_extract_provider_resolves_without_panic() {
        let cfg = crate::config::GraphConfig {
            extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
            ..graph_config(true)
        };
        let mut memory = memory_without_qdrant().await;
        // Install a real SQLite-backed GraphStore so resolve_graph_store succeeds.
        let pool = memory.sqlite().pool().clone();
        memory.graph_store = Some(std::sync::Arc::new(zeph_memory::GraphStore::new(pool)));
        let mut agent = agent_with_graph(memory, cfg).await;

        let mut progress = vec![];
        let result = agent
            .graph_backfill(Some(10), &mut |msg| progress.push(msg))
            .await
            .unwrap();

        // With an empty store there are zero unprocessed messages → backfill completes immediately.
        assert!(
            result.contains("Backfill complete"),
            "expected 'Backfill complete' but got: {result}"
        );
    }

    // R-4139: graph_entities with enabled graph but no store (Qdrant unreachable) must
    // report unavailable, not panic or hang.
    #[tokio::test]
    async fn graph_entities_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, graph_config(true)).await;

        let result = agent.graph_entities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    // R-4139: graph_communities with enabled graph but no store must report unavailable.
    #[tokio::test]
    async fn graph_communities_enabled_but_no_store_reports_unavailable() {
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, graph_config(true)).await;

        let result = agent.graph_communities().await.unwrap();
        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    // R-4139: verify that the tokio::time::timeout pattern used in graph handlers
    // correctly returns Err on a never-resolving future. This is a direct regression
    // guard for the fix introduced in #4139: before the fix, these calls had no
    // timeout guard and would block indefinitely when Qdrant was unreachable.
    #[tokio::test]
    async fn graph_store_timeout_pattern_fires_on_pending_future() {
        use std::future;
        let result = tokio::time::timeout(
            Duration::from_millis(10),
            future::pending::<Result<Vec<()>, String>>(),
        )
        .await;
        assert!(
            result.is_err(),
            "timeout must fire on a never-resolving future"
        );
    }
}