Skip to main content

zeph_core/agent/
agent_access_impl.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implementation of [`zeph_commands::traits::agent::AgentAccess`] for [`Agent<C>`].
5//!
6//! Each method in `AgentAccess` returns a formatted `String` result (without sending to the
7//! channel directly), so that `CommandContext::sink` does not conflict with this borrow.
8//! The one exception is methods for subsystems that are already channel-free (memory, graph).
9//!
10//! [`Agent<C>`]: super::Agent
11
12use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28    fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29        let Some(memory) = self.services.memory.persistence.memory.clone() else {
30            return Err("Graph memory is not enabled.".to_owned());
31        };
32        let Some(store) = memory.graph_store.clone() else {
33            if self.services.memory.extraction.graph_config.enabled {
34                return Err(
35                    "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36                        .to_owned(),
37                );
38            }
39            return Err("Graph memory is not enabled.".to_owned());
40        };
41        Ok((memory, store))
42    }
43}
44
45impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
46    // ----- /memory -----
47
48    fn memory_tiers<'a>(
49        &'a mut self,
50    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
51        Box::pin(
52            async move {
53                let Some(memory) = self.services.memory.persistence.memory.clone() else {
54                    return Ok("Memory not configured.".to_owned());
55                };
56                match memory.sqlite().count_messages_by_tier().await {
57                    Ok((episodic, semantic)) => {
58                        let mut out = String::new();
59                        let _ = writeln!(out, "Memory tiers:");
60                        let _ = writeln!(out, "  Working:  (current context window — virtual)");
61                        let _ = writeln!(out, "  Episodic: {episodic} messages");
62                        let _ = writeln!(out, "  Semantic: {semantic} facts");
63                        Ok(out.trim_end().to_owned())
64                    }
65                    Err(e) => Ok(format!("Failed to query tier stats: {e}")),
66                }
67            }
68            .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
69        )
70    }
71
72    fn memory_promote<'a>(
73        &'a mut self,
74        ids_str: &'a str,
75    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
76        Box::pin(
77            async move {
78                let Some(memory) = self.services.memory.persistence.memory.clone() else {
79                    return Ok("Memory not configured.".to_owned());
80                };
81                let ids: Vec<MessageId> = ids_str
82                    .split_whitespace()
83                    .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
84                    .collect();
85                if ids.is_empty() {
86                    return Ok(
87                        "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
88                            .to_owned(),
89                    );
90                }
91                match memory.sqlite().manual_promote(&ids).await {
92                    Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
93                    Err(e) => Ok(format!("Promotion failed: {e}")),
94                }
95            }
96            .instrument(tracing::info_span!("core.agent_access.memory_promote")),
97        )
98    }
99
100    // ----- /graph -----
101
102    fn graph_stats<'a>(
103        &'a mut self,
104    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
105        Box::pin(
106            async move {
107                let (_, store) = match self.resolve_graph_store() {
108                    Ok(pair) => pair,
109                    Err(msg) => return Ok(msg),
110                };
111
112                let stats_future = async {
113                    tokio::join!(
114                        store.entity_count(),
115                        store.active_edge_count(),
116                        store.community_count(),
117                        store.edge_type_distribution()
118                    )
119                };
120                let Ok((entities, edges, communities, distribution)) =
121                    tokio::time::timeout(Duration::from_secs(5), stats_future).await
122                else {
123                    tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
124                    return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
125                };
126                let mut msg = format!(
127                    "Graph memory: {} entities, {} edges, {} communities",
128                    entities.unwrap_or(0),
129                    edges.unwrap_or(0),
130                    communities.unwrap_or(0)
131                );
132                if let Ok(dist) = distribution
133                    && !dist.is_empty()
134                {
135                    let dist_str: Vec<String> =
136                        dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
137                    write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
138                }
139                Ok(msg)
140            }
141            .instrument(tracing::info_span!("core.agent_access.graph_stats")),
142        )
143    }
144
145    fn graph_entities<'a>(
146        &'a mut self,
147    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
148        Box::pin(
149            async move {
150                let (_, store) = match self.resolve_graph_store() {
151                    Ok(pair) => pair,
152                    Err(msg) => return Ok(msg),
153                };
154
155                let entities = match tokio::time::timeout(
156                    Duration::from_secs(5),
157                    store.all_entities(),
158                )
159                .await
160                {
161                    Ok(Ok(v)) => v,
162                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
163                    Err(_) => {
164                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
165                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
166                    }
167                };
168                if entities.is_empty() {
169                    return Ok("No entities found.".to_owned());
170                }
171
172                let total = entities.len();
173                let display: Vec<String> = entities
174                    .iter()
175                    .take(50)
176                    .map(|e| {
177                        format!(
178                            "  {:<40}  {:<15}  {}",
179                            e.name,
180                            e.entity_type.as_str(),
181                            e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
182                        )
183                    })
184                    .collect();
185                let mut msg = format!(
186                    "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
187                    "NAME",
188                    "TYPE",
189                    "LAST SEEN",
190                    display.join("\n")
191                );
192                if total > 50 {
193                    write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
194                }
195                Ok(msg)
196            }
197            .instrument(tracing::info_span!("core.agent_access.graph_entities")),
198        )
199    }
200
201    fn graph_facts<'a>(
202        &'a mut self,
203        name: &'a str,
204    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
205        Box::pin(
206            async move {
207                let (_, store) = match self.resolve_graph_store() {
208                    Ok(pair) => pair,
209                    Err(msg) => return Ok(msg),
210                };
211
212                let matches = match tokio::time::timeout(
213                    Duration::from_secs(5),
214                    store.find_entity_by_name(name),
215                )
216                .await
217                {
218                    Ok(Ok(v)) => v,
219                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
220                    Err(_) => {
221                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
222                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
223                    }
224                };
225                if matches.is_empty() {
226                    return Ok(format!("No entity found matching '{name}'."));
227                }
228
229                let entity = &matches[0];
230                let edges = match tokio::time::timeout(
231                    Duration::from_secs(5),
232                    store.edges_for_entity(entity.id.0),
233                )
234                .await
235                {
236                    Ok(Ok(v)) => v,
237                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
238                    Err(_) => {
239                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
240                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
241                    }
242                };
243                if edges.is_empty() {
244                    return Ok(format!("Entity '{}' has no known facts.", entity.name));
245                }
246
247                let mut entity_names: std::collections::HashMap<i64, String> =
248                    std::collections::HashMap::new();
249                entity_names.insert(entity.id.0, entity.name.clone());
250                for edge in &edges {
251                    let other_id = if edge.source_entity_id == entity.id.0 {
252                        edge.target_entity_id
253                    } else {
254                        edge.source_entity_id
255                    };
256                    entity_names.entry(other_id).or_default();
257                }
258                for (&id, name_val) in &mut entity_names {
259                    if name_val.is_empty() {
260                        let result = tokio::time::timeout(
261                            Duration::from_secs(5),
262                            store.find_entity_by_id(id),
263                        )
264                        .await;
265                        if let Ok(Ok(Some(other))) = result {
266                            *name_val = other.name;
267                        } else {
268                            *name_val = format!("#{id}");
269                        }
270                    }
271                }
272
273                let lines: Vec<String> = edges
274                    .iter()
275                    .map(|e| {
276                        let src = entity_names
277                            .get(&e.source_entity_id)
278                            .cloned()
279                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
280                        let tgt = entity_names
281                            .get(&e.target_entity_id)
282                            .cloned()
283                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
284                        format!(
285                            "  {} --[{}/{}]--> {}: {} (confidence: {:.2})",
286                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
287                        )
288                    })
289                    .collect();
290                Ok(format!(
291                    "Facts for '{}':\n{}",
292                    entity.name,
293                    lines.join("\n")
294                ))
295            }
296            .instrument(tracing::info_span!("core.agent_access.graph_facts")),
297        )
298    }
299
300    fn graph_history<'a>(
301        &'a mut self,
302        name: &'a str,
303    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
304        Box::pin(
305            async move {
306                let (_, store) = match self.resolve_graph_store() {
307                    Ok(pair) => pair,
308                    Err(msg) => return Ok(msg),
309                };
310
311                let matches = match tokio::time::timeout(
312                    Duration::from_secs(5),
313                    store.find_entity_by_name(name),
314                )
315                .await
316                {
317                    Ok(Ok(v)) => v,
318                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
319                    Err(_) => {
320                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
321                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
322                    }
323                };
324                if matches.is_empty() {
325                    return Ok(format!("No entity found matching '{name}'."));
326                }
327
328                let entity = &matches[0];
329                let edges = match tokio::time::timeout(
330                    Duration::from_secs(5),
331                    store.edge_history_for_entity(entity.id.0, 50),
332                )
333                .await
334                {
335                    Ok(Ok(v)) => v,
336                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
337                    Err(_) => {
338                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
339                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
340                    }
341                };
342                if edges.is_empty() {
343                    return Ok(format!("Entity '{}' has no edge history.", entity.name));
344                }
345
346                let mut entity_names: std::collections::HashMap<i64, String> =
347                    std::collections::HashMap::new();
348                entity_names.insert(entity.id.0, entity.name.clone());
349                for edge in &edges {
350                    for &id in &[edge.source_entity_id, edge.target_entity_id] {
351                        entity_names.entry(id).or_default();
352                    }
353                }
354                for (&id, name_val) in &mut entity_names {
355                    if name_val.is_empty() {
356                        let result = tokio::time::timeout(
357                            Duration::from_secs(5),
358                            store.find_entity_by_id(id),
359                        )
360                        .await;
361                        if let Ok(Ok(Some(other))) = result {
362                            *name_val = other.name;
363                        } else {
364                            *name_val = format!("#{id}");
365                        }
366                    }
367                }
368
369                let n = edges.len();
370                let lines: Vec<String> = edges
371                    .iter()
372                    .map(|e| {
373                        let status = if e.valid_to.is_some() {
374                            let date = e
375                                .valid_to
376                                .as_deref()
377                                .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
378                                .unwrap_or("?");
379                            format!("[expired {date}]")
380                        } else {
381                            "[active]".to_string()
382                        };
383                        let src = entity_names
384                            .get(&e.source_entity_id)
385                            .cloned()
386                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
387                        let tgt = entity_names
388                            .get(&e.target_entity_id)
389                            .cloned()
390                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
391                        format!(
392                            "  {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
393                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
394                        )
395                    })
396                    .collect();
397                Ok(format!(
398                    "Edge history for '{}' ({n} edges):\n{}",
399                    entity.name,
400                    lines.join("\n")
401                ))
402            }
403            .instrument(tracing::info_span!("core.agent_access.graph_history")),
404        )
405    }
406
407    fn graph_communities<'a>(
408        &'a mut self,
409    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
410        Box::pin(
411            async move {
412                let (_, store) = match self.resolve_graph_store() {
413                    Ok(pair) => pair,
414                    Err(msg) => return Ok(msg),
415                };
416
417                let communities =
418                    match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
419                        .await
420                    {
421                        Ok(Ok(v)) => v,
422                        Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
423                        Err(_) => {
424                            tracing::warn!(
425                                "graph store call timed out after 5s (Qdrant unreachable)"
426                            );
427                            return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
428                        }
429                    };
430                if communities.is_empty() {
431                    return Ok("No communities detected yet. Run graph backfill first.".to_owned());
432                }
433
434                let lines: Vec<String> = communities
435                    .iter()
436                    .map(|c| format!("  [{}]: {}", c.name, c.summary))
437                    .collect();
438                Ok(format!(
439                    "Communities ({}):\n{}",
440                    communities.len(),
441                    lines.join("\n")
442                ))
443            }
444            .instrument(tracing::info_span!("core.agent_access.graph_communities")),
445        )
446    }
447
448    #[allow(clippy::too_many_lines)]
449    fn graph_backfill<'a>(
450        &'a mut self,
451        limit: Option<usize>,
452        progress_cb: &'a mut (dyn FnMut(String) + Send),
453    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
454        let store = match self.resolve_graph_store() {
455            Ok((_, s)) => s,
456            Err(msg) => return Box::pin(async move { Ok(msg) }),
457        };
458        let graph_cfg = self.services.memory.extraction.graph_config.clone();
459        let provider = if graph_cfg.extract_provider.as_str().is_empty() {
460            self.provider.clone()
461        } else {
462            self.resolve_background_provider(graph_cfg.extract_provider.as_str())
463        };
464        Box::pin(
465            async move {
466                let total = store.unprocessed_message_count().await.unwrap_or(0);
467                let cap = limit.unwrap_or(usize::MAX);
468
469                progress_cb(format!(
470                    "Starting graph backfill... ({total} unprocessed messages)"
471                ));
472
473                let batch_size = 50usize;
474                let mut processed = 0usize;
475                let mut total_entities = 0usize;
476                let mut total_edges = 0usize;
477
478                loop {
479                    let remaining_cap = cap.saturating_sub(processed);
480                    if remaining_cap == 0 {
481                        break;
482                    }
483                    let batch_limit = batch_size.min(remaining_cap);
484                    let messages = store
485                        .unprocessed_messages_for_backfill(batch_limit)
486                        .await
487                        .map_err(|e| CommandError::new(e.to_string()))?;
488                    if messages.is_empty() {
489                        break;
490                    }
491
492                    let ids: Vec<zeph_memory::types::MessageId> =
493                        messages.iter().map(|(id, _)| *id).collect();
494
495                    for (_id, content) in &messages {
496                        if content.trim().is_empty() {
497                            continue;
498                        }
499                        let extraction_cfg = GraphExtractionConfig {
500                            max_entities: graph_cfg.max_entities_per_message,
501                            max_edges: graph_cfg.max_edges_per_message,
502                            extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
503                            community_refresh_interval: 0,
504                            expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
505                            max_entities_cap: graph_cfg.max_entities,
506                            community_summary_max_prompt_bytes: graph_cfg
507                                .community_summary_max_prompt_bytes,
508                            community_summary_concurrency: graph_cfg.community_summary_concurrency,
509                            lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
510                            note_linking: zeph_memory::NoteLinkingConfig::default(),
511                            link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
512                            link_weight_decay_interval_secs: graph_cfg
513                                .link_weight_decay_interval_secs,
514                            belief_revision_enabled: graph_cfg.belief_revision.enabled,
515                            belief_revision_similarity_threshold: graph_cfg
516                                .belief_revision
517                                .similarity_threshold,
518                            conversation_id: None,
519                            apex_mem_enabled: graph_cfg.apex_mem.enabled,
520                            llm_timeout_secs: graph_cfg.llm_timeout_secs,
521                        };
522                        let pool = store.pool().clone();
523                        match extract_and_store(
524                            content.clone(),
525                            vec![],
526                            provider.clone(),
527                            pool,
528                            extraction_cfg,
529                            None,
530                            None,
531                        )
532                        .await
533                        {
534                            Ok(result) => {
535                                total_entities += result.stats.entities_upserted;
536                                total_edges += result.stats.edges_inserted;
537                            }
538                            Err(e) => {
539                                tracing::warn!("backfill extraction error: {e:#}");
540                            }
541                        }
542                    }
543
544                    store
545                        .mark_messages_graph_processed(&ids)
546                        .await
547                        .map_err(|e| CommandError::new(e.to_string()))?;
548                    processed += messages.len();
549
550                    progress_cb(format!(
551                        "Backfill progress: {processed} messages processed, \
552                     {total_entities} entities, {total_edges} edges"
553                    ));
554                }
555
556                Ok(format!(
557                    "Backfill complete: {total_entities} entities, {total_edges} edges \
558                 extracted from {processed} messages"
559                ))
560            }
561            .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
562        )
563    }
564
565    // ----- /guidelines -----
566
567    fn guidelines<'a>(
568        &'a mut self,
569    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
570        Box::pin(
571            async move {
572                const MAX_DISPLAY_CHARS: usize = 4096;
573
574                let Some(memory) = &self.services.memory.persistence.memory else {
575                    return Ok("No memory backend initialised.".to_owned());
576                };
577
578                let cid = self.services.memory.persistence.conversation_id;
579                let sqlite = memory.sqlite();
580
581                let (version, text) = sqlite
582                    .load_compression_guidelines(cid)
583                    .await
584                    .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
585
586                if version == 0 || text.is_empty() {
587                    return Ok("No compression guidelines generated yet.".to_owned());
588                }
589
590                let (_, created_at) = sqlite
591                    .load_compression_guidelines_meta(cid)
592                    .await
593                    .unwrap_or((0, String::new()));
594
595                let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
596                    let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
597                    (&text[..end], true)
598                } else {
599                    (text.as_str(), false)
600                };
601
602                let mut output =
603                    format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
604                if truncated {
605                    output.push_str("\n\n[truncated]");
606                }
607                Ok(output)
608            }
609            .instrument(tracing::info_span!("core.agent_access.guidelines")),
610        )
611    }
612
613    // ----- /model, /provider -----
614
615    fn handle_model<'a>(
616        &'a mut self,
617        arg: &'a str,
618    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
619        Box::pin(async move {
620            let input = if arg.is_empty() {
621                "/model".to_owned()
622            } else {
623                format!("/model {arg}")
624            };
625            self.handle_model_command_as_string(&input).await
626        })
627    }
628
629    fn handle_provider<'a>(
630        &'a mut self,
631        arg: &'a str,
632    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
633        Box::pin(async move { self.handle_provider_command_as_string(arg).await })
634    }
635
636    // ----- /policy -----
637
638    fn handle_policy<'a>(
639        &'a mut self,
640        args: &'a str,
641    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
642        Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
643    }
644
645    // ----- /scheduler -----
646
647    #[cfg(feature = "scheduler")]
648    fn list_scheduled_tasks<'a>(
649        &'a mut self,
650    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
651        Box::pin(async move {
652            let result = self
653                .handle_scheduler_list_as_string()
654                .await
655                .map_err(|e| CommandError::new(e.to_string()))?;
656            Ok(Some(result))
657        })
658    }
659
660    #[cfg(not(feature = "scheduler"))]
661    fn list_scheduled_tasks<'a>(
662        &'a mut self,
663    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
664        Box::pin(async move { Ok(None) })
665    }
666
667    // ----- /lsp -----
668
669    fn lsp_status<'a>(
670        &'a mut self,
671    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
672        Box::pin(async move {
673            self.handle_lsp_status_as_string()
674                .await
675                .map_err(|e| CommandError::new(e.to_string()))
676        })
677    }
678
679    // ----- /recap -----
680
681    fn session_recap<'a>(
682        &'a mut self,
683    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
684        Box::pin(
685            async move {
686                match self.build_recap().await {
687                    Ok(text) => Ok(text),
688                    Err(e) => {
689                        // /recap is an explicit user command — surface a fixed message so that
690                        // LlmError internals (URLs with embedded credentials, response excerpts)
691                        // are never forwarded to the user channel. Full detail goes to the log.
692                        tracing::warn!("session recap command: {}", e.0);
693                        Ok("Recap unavailable — see logs for details".to_string())
694                    }
695                }
696            }
697            .instrument(tracing::info_span!("core.agent_access.session_recap")),
698        )
699    }
700
701    // ----- /compact -----
702
703    fn compact_context<'a>(
704        &'a mut self,
705    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
706        Box::pin(
707            self.compact_context_command()
708                .instrument(tracing::info_span!("core.agent_access.compact_context")),
709        )
710    }
711
712    // ----- /new -----
713
714    fn reset_conversation<'a>(
715        &'a mut self,
716        keep_plan: bool,
717        no_digest: bool,
718    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
719        Box::pin(async move {
720            match self.reset_conversation(keep_plan, no_digest).await {
721                Ok((old_id, new_id)) => {
722                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
723                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
724                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
725                    Ok(format!(
726                        "New conversation started. Previous: {old} → Current: {new}{keep_note}"
727                    ))
728                }
729                Err(e) => Ok(format!("Failed to start new conversation: {e}")),
730            }
731        })
732    }
733
734    // ----- /cache-stats -----
735
736    fn cache_stats(&self) -> String {
737        self.tool_orchestrator.cache_stats()
738    }
739
740    // ----- /status -----
741
742    fn session_status<'a>(
743        &'a mut self,
744    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
745        Box::pin(async move { Ok(self.handle_status_as_string()) })
746    }
747
748    // ----- /guardrail -----
749
    /// Returns the guardrail status text built by `format_guardrail_status`.
    fn guardrail_status(&self) -> String {
        self.format_guardrail_status()
    }
753
754    // ----- /focus -----
755
    /// Returns the focus status text built by `format_focus_status`.
    fn focus_status(&self) -> String {
        self.format_focus_status()
    }
759
760    // ----- /sidequest -----
761
    /// Returns the sidequest status text built by `format_sidequest_status`.
    fn sidequest_status(&self) -> String {
        self.format_sidequest_status()
    }
765
766    // ----- /image -----
767
768    fn load_image<'a>(
769        &'a mut self,
770        path: &'a str,
771    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
772        Box::pin(async move { Ok(self.handle_image_as_string(path)) })
773    }
774
775    // ----- /mcp -----
776
777    fn handle_mcp<'a>(
778        &'a mut self,
779        args: &'a str,
780    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
781        // Extract all owned data before the async block so no &mut self reference is
782        // held across an .await point, satisfying the `for<'a>` Send bound.
783        let args_owned = args.to_owned();
784        let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
785        let sub = parts.first().cloned().unwrap_or_default();
786
787        match sub.as_str() {
788            "list" => {
789                // Read-only: clone all data before async.
790                let manager = self.services.mcp.manager.clone();
791                let tools_snapshot: Vec<(String, String)> = self
792                    .services
793                    .mcp
794                    .tools
795                    .iter()
796                    .map(|t| (t.server_id.clone(), t.name.clone()))
797                    .collect();
798                Box::pin(async move {
799                    use std::fmt::Write;
800                    let Some(manager) = manager else {
801                        return Ok("MCP is not enabled.".to_owned());
802                    };
803                    let server_ids = manager.list_servers().await;
804                    if server_ids.is_empty() {
805                        return Ok("No MCP servers connected.".to_owned());
806                    }
807                    let mut output = String::from("Connected MCP servers:\n");
808                    let mut total = 0usize;
809                    for id in &server_ids {
810                        let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
811                        total += count;
812                        let _ = writeln!(output, "- {id} ({count} tools)");
813                    }
814                    let _ = write!(output, "Total: {total} tool(s)");
815                    Ok(output)
816                })
817            }
818            "tools" => {
819                // Read-only: collect tool info before async.
820                let server_id = parts.get(1).cloned();
821                let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
822                    self.services
823                        .mcp
824                        .tools
825                        .iter()
826                        .filter(|t| &t.server_id == sid)
827                        .map(|t| (t.name.clone(), t.description.clone()))
828                        .collect()
829                } else {
830                    Vec::new()
831                };
832                Box::pin(async move {
833                    use std::fmt::Write;
834                    let Some(server_id) = server_id else {
835                        return Ok("Usage: /mcp tools <server_id>".to_owned());
836                    };
837                    if owned_tools.is_empty() {
838                        return Ok(format!("No tools found for server '{server_id}'."));
839                    }
840                    let mut output =
841                        format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
842                    for (name, desc) in &owned_tools {
843                        if desc.is_empty() {
844                            let _ = writeln!(output, "- {name}");
845                        } else {
846                            let _ = writeln!(output, "- {name} — {desc}");
847                        }
848                    }
849                    Ok(output)
850                })
851            }
852            // add/remove require mutating self after async I/O.
853            // handle_mcp_command is structured so the only .await crossing a &mut self
854            // boundary goes through a cloned Arc<McpManager> — no &self fields are held
855            // across that .await.  The subsequent state-change methods (rebuild_semantic_index,
856            // sync_mcp_registry) are also async fn(&mut self), but they only hold owned locals
857            // across their own .await points (cloned tools Vec, cloned Arcs).
858            _ => Box::pin(async move {
859                self.handle_mcp_command(&args_owned)
860                    .await
861                    .map_err(|e| CommandError::new(e.to_string()))
862            }),
863        }
864    }
865
866    // ----- /skill -----
867
868    fn handle_skill<'a>(
869        &'a mut self,
870        args: &'a str,
871    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
872        let args_owned = args.to_owned();
873        Box::pin(async move {
874            self.handle_skill_command_as_string(&args_owned)
875                .await
876                .map_err(|e| CommandError::new(e.to_string()))
877        })
878    }
879
880    // ----- /skills -----
881
882    fn handle_skills<'a>(
883        &'a mut self,
884        args: &'a str,
885    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
886        let args_owned = args.to_owned();
887        Box::pin(async move {
888            self.handle_skills_as_string(&args_owned)
889                .await
890                .map_err(|e| CommandError::new(e.to_string()))
891        })
892    }
893
894    // ----- /feedback -----
895
896    fn handle_feedback_command<'a>(
897        &'a mut self,
898        args: &'a str,
899    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
900        let args_owned = args.to_owned();
901        Box::pin(async move {
902            self.handle_feedback_as_string(&args_owned)
903                .await
904                .map_err(|e| CommandError::new(e.to_string()))
905        })
906    }
907
908    // ----- /plan -----
909
910    #[cfg(feature = "scheduler")]
911    fn handle_plan<'a>(
912        &'a mut self,
913        input: &'a str,
914    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
915        Box::pin(async move {
916            self.dispatch_plan_command_as_string(input)
917                .await
918                .map_err(|e| CommandError::new(e.to_string()))
919        })
920    }
921
922    #[cfg(not(feature = "scheduler"))]
923    fn handle_plan<'a>(
924        &'a mut self,
925        _input: &'a str,
926    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
927        Box::pin(async move { Ok(String::new()) })
928    }
929
930    // ----- /experiment -----
931
932    fn handle_experiment<'a>(
933        &'a mut self,
934        input: &'a str,
935    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
936        Box::pin(async move {
937            self.handle_experiment_command_as_string(input)
938                .await
939                .map_err(|e| CommandError::new(e.to_string()))
940        })
941    }
942
943    // ----- /agent, @mention -----
944
945    fn handle_agent_dispatch<'a>(
946        &'a mut self,
947        input: &'a str,
948    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
949        Box::pin(async move {
950            match self.dispatch_agent_command(input).await {
951                Some(Err(e)) => Err(CommandError::new(e.to_string())),
952                Some(Ok(())) | None => Ok(None),
953            }
954        })
955    }
956
957    // ----- /plugins -----
958
959    fn handle_plugins<'a>(
960        &'a mut self,
961        args: &'a str,
962    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
963        let args_owned = args.to_owned();
964        // Clone the fields needed by PluginManager before entering the async block.
965        // spawn_blocking requires 'static, so we cannot borrow &self inside the closure.
966        let managed_dir = self.services.skill.managed_dir.clone();
967        let mcp_allowed = self.services.mcp.allowed_commands.clone();
968        let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
969        Box::pin(async move {
970            // PluginManager performs synchronous filesystem I/O (copy, remove_dir_all,
971            // read_dir). Run on a blocking thread to avoid stalling the tokio worker.
972            tokio::task::spawn_blocking(move || {
973                Self::run_plugin_command(&args_owned, managed_dir, mcp_allowed, base_shell_allowed)
974            })
975            .await
976            .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
977        })
978    }
979
980    // ----- /acp -----
981
982    fn handle_acp<'a>(
983        &'a mut self,
984        args: &'a str,
985    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
986        Box::pin(async move {
987            self.handle_acp_as_string(args)
988                .map_err(|e| CommandError::new(e.to_string()))
989        })
990    }
991
992    // ----- /cocoon -----
993
994    #[cfg(feature = "cocoon")]
995    fn handle_cocoon<'a>(
996        &'a mut self,
997        args: &'a str,
998    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
999        Box::pin(async move {
1000            self.handle_cocoon_as_string(args)
1001                .await
1002                .map_err(|e| CommandError::new(e.to_string()))
1003        })
1004    }
1005
1006    #[cfg(not(feature = "cocoon"))]
1007    fn handle_cocoon<'a>(
1008        &'a mut self,
1009        _args: &'a str,
1010    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1011        Box::pin(async {
1012            Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1013        })
1014    }
1015
1016    // ----- /loop -----
1017
1018    fn handle_loop<'a>(
1019        &'a mut self,
1020        args: &'a str,
1021    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1022        use zeph_commands::handlers::loop_cmd::parse_loop_args;
1023
1024        let args_owned = args.trim().to_owned();
1025        Box::pin(async move {
1026            if args_owned == "stop" {
1027                return Ok(self.stop_user_loop());
1028            }
1029            if args_owned == "status" {
1030                return Ok(match &self.runtime.lifecycle.user_loop {
1031                    Some(ls) => format!(
1032                        "Loop active: \"{}\" (iteration {}, interval every {}s).",
1033                        ls.prompt,
1034                        ls.iteration,
1035                        ls.interval.period().as_secs(),
1036                    ),
1037                    None => "No active loop.".to_owned(),
1038                });
1039            }
1040            let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1041
1042            if prompt.starts_with('/') {
1043                return Err(CommandError::new(
1044                    "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1045                ));
1046            }
1047
1048            let min_secs = self.runtime.config.loop_min_interval_secs;
1049            if interval_secs < min_secs {
1050                return Err(CommandError::new(format!(
1051                    "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1052                )));
1053            }
1054            if self.runtime.lifecycle.user_loop.is_some() {
1055                return Err(CommandError::new(
1056                    "A loop is already active. Use /loop stop first.",
1057                ));
1058            }
1059
1060            self.start_user_loop(prompt.clone(), interval_secs);
1061            Ok(format!(
1062                "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1063            ))
1064        })
1065    }
1066
1067    fn notify_test<'a>(
1068        &'a mut self,
1069    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1070        let notifier = self.runtime.lifecycle.notifier.clone();
1071        Box::pin(async move {
1072            let Some(notifier) = notifier else {
1073                return Ok(
1074                    "Notifications are disabled. Set `notifications.enabled = true` in config."
1075                        .to_owned(),
1076                );
1077            };
1078            match notifier.fire_test().await {
1079                Ok(()) => Ok("Test notification sent.".to_owned()),
1080                Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1081            }
1082        })
1083    }
1084
    /// Forwards `args` to `handle_trajectory_command_as_string` and returns its output.
    fn handle_trajectory(&mut self, args: &str) -> String {
        self.handle_trajectory_command_as_string(args)
    }
1088
    /// Forwards `args` to `handle_scope_command_as_string` and returns its output.
    fn handle_scope(&self, args: &str) -> String {
        self.handle_scope_command_as_string(args)
    }
1092
1093    // ----- /goal -----
1094
1095    fn handle_goal<'a>(
1096        &'a mut self,
1097        args: &'a str,
1098    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1099        // Extract all non-Send data synchronously before entering the async block.
1100        if self.services.goal_accounting.is_none() {
1101            if !self.runtime.config.goals.enabled {
1102                return Box::pin(async {
1103                    Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1104                });
1105            }
1106            let pool = match self.services.memory.persistence.memory.as_ref() {
1107                Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1108                None => {
1109                    return Box::pin(async {
1110                        Ok("Goals require a database backend (memory not configured).".to_owned())
1111                    });
1112                }
1113            };
1114            let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1115            let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1116            self.services.goal_accounting = Some(accounting);
1117        }
1118
1119        let accounting =
1120            self.services.goal_accounting.clone().expect(
1121                "invariant: goal_accounting is always Some at this point (initialized above)",
1122            );
1123        let max_chars = self.runtime.config.goals.max_text_chars;
1124        let default_budget = self.runtime.config.goals.default_token_budget;
1125        let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1126        let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1127        let args_owned = args.to_owned();
1128
1129        // S1: `goal_create` may need to arm `AutonomousDriver` with a new session.
1130        // We capture a clone of the pending_start Arc that lives on the driver.
1131        // The async block fills it; the main agent loop (which has `&mut self`) drains it
1132        // via `AutonomousDriver::flush_pending_start()` after each command handler returns.
1133        let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1134
1135        Box::pin(async move {
1136            let _ = accounting.refresh().await;
1137            let store = accounting.get_store();
1138            let args = args_owned.as_str();
1139
1140            match args {
1141                "" | "status" => goal_status(&accounting).await,
1142                "pause" => goal_pause(&accounting, &store).await,
1143                "resume" => goal_resume(&accounting, &store).await,
1144                "complete" => goal_complete(&accounting, &store).await,
1145                "clear" => goal_clear(&accounting, &store).await,
1146                "list" => goal_list(&store).await,
1147                _ if args.starts_with("create") => {
1148                    let (msg, auto_req) = goal_create(
1149                        args,
1150                        &accounting,
1151                        &store,
1152                        max_chars,
1153                        default_budget,
1154                        autonomous_enabled,
1155                        autonomous_max_turns,
1156                    )
1157                    .await?;
1158                    if let Some(req) = auto_req {
1159                        *pending_start_arc.lock() = Some(req);
1160                    }
1161                    Ok(msg)
1162                }
1163                _ => Ok(
1164                    "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1165                        .to_owned(),
1166                ),
1167            }
1168        })
1169    }
1170
1171    fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1172        let accounting = self.services.goal_accounting.as_ref()?;
1173        let snap = accounting.snapshot()?;
1174        Some(zeph_commands::GoalSnapshot {
1175            id: snap.id,
1176            text: snap.text,
1177            status: match snap.status {
1178                crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1179                crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1180                crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1181                crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1182            },
1183            turns_used: snap.turns_used,
1184            tokens_used: snap.tokens_used,
1185            token_budget: snap.token_budget,
1186        })
1187    }
1188
1189    // ----- /agents -----
1190
1191    fn handle_agents<'a>(
1192        &'a mut self,
1193        args: &'a str,
1194    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1195        use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1196        use zeph_subagent::AgentsCommand;
1197
1198        let args_owned = args.trim().to_owned();
1199        Box::pin(async move {
1200            // Fleet view: bare `/agents` or `/agents fleet` shows autonomous sessions + definitions.
1201            let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1202
1203            let fleet_section = if show_fleet {
1204                let snapshots = self.services.autonomous_registry.list();
1205                let entries: Vec<FleetEntry> = snapshots
1206                    .into_iter()
1207                    .map(|s| FleetEntry {
1208                        goal_id: s.goal_id,
1209                        goal_text_short: s.goal_text_short,
1210                        state: s.state,
1211                        turns_executed: s.turns_executed,
1212                        max_turns: s.max_turns,
1213                        elapsed: s.elapsed,
1214                    })
1215                    .collect();
1216                format_fleet_section(&entries)
1217            } else {
1218                String::new()
1219            };
1220
1221            // Sub-agent definitions section.
1222            let definitions_section = if show_fleet || args_owned == "list" {
1223                self.handle_agents_definitions_list()
1224            } else {
1225                // CRUD subcommands: show, create, edit, delete.
1226                match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1227                    Ok(cmd) => self.handle_agents_crud(cmd),
1228                    Err(e) => e.to_string(),
1229                }
1230            };
1231
1232            let mut out = fleet_section;
1233            if !definitions_section.is_empty() {
1234                if !out.is_empty() {
1235                    out.push('\n');
1236                }
1237                out.push_str(&definitions_section);
1238            }
1239
1240            if out.is_empty() {
1241                "No active autonomous sessions or sub-agent definitions found."
1242                    .clone_into(&mut out);
1243            }
1244
1245            Ok(out)
1246        })
1247    }
1248}
1249
// Short local names for the goal store and accounting types used by the `goal_*` helpers.
type GoalStore = crate::goal::GoalStore;
type GoalAccounting = crate::goal::GoalAccounting;

/// Hard cap on `--turns` to prevent runaway autonomous loops (Security Low).
///
/// `goal_create` clamps the effective turn limit to this value and logs a warning when an
/// explicit `--turns` request exceeds it.
const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1000;
1255
1256async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1257    match accounting.get_active().await {
1258        Ok(Some(g)) => {
1259            let budget_line = g.token_budget.map_or_else(
1260                || format!("  tokens used: {}", g.tokens_used),
1261                |b| format!("  budget: {}/{b}", g.tokens_used),
1262            );
1263            Ok(format!(
1264                "Active goal [{}]: {}\n  status: {}\n  turns: {}\n{}",
1265                &g.id[..8],
1266                g.text,
1267                g.status,
1268                g.turns_used,
1269                budget_line
1270            ))
1271        }
1272        Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1273        Err(e) => Ok(format!("Goal lookup failed: {e}")),
1274    }
1275}
1276
1277/// Returns `(display_message, auto_start_request)`.
1278///
1279/// `auto_start_request` is `Some((goal_id, goal_text, max_turns))` when `--auto` was passed and
1280/// the goal was successfully created. The caller must relay this to `AutonomousDriver` via the
1281/// `pending_start_arc` side-channel before the future resolves.
1282async fn goal_create(
1283    args: &str,
1284    accounting: &GoalAccounting,
1285    store: &GoalStore,
1286    max_chars: usize,
1287    default_budget: Option<u64>,
1288    autonomous_enabled: bool,
1289    autonomous_max_turns: u32,
1290) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1291    let rest = args.strip_prefix("create").unwrap_or("").trim();
1292
1293    // Strip --auto / --turns before passing text to the budget parser.
1294    let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1295    let (text, explicit_budget) = parse_goal_create_args(&stripped);
1296
1297    if text.is_empty() {
1298        return Ok((
1299            "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1300            None,
1301        ));
1302    }
1303    if is_auto && !autonomous_enabled {
1304        return Ok((
1305            "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1306                .to_owned(),
1307            None,
1308        ));
1309    }
1310    let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1311
1312    let max_turns = explicit_turns
1313        .unwrap_or(autonomous_max_turns)
1314        .min(AUTONOMOUS_MAX_TURNS_CAP);
1315    if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1316        tracing::warn!(
1317            requested = explicit_turns,
1318            capped = AUTONOMOUS_MAX_TURNS_CAP,
1319            "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1320        );
1321    }
1322
1323    match store.create(text, budget, max_chars).await {
1324        Ok(g) => {
1325            let _ = accounting.refresh().await;
1326            let auto_start = if is_auto {
1327                Some((g.id.clone(), g.text.clone(), max_turns))
1328            } else {
1329                None
1330            };
1331            let auto_note = if is_auto {
1332                " Autonomous mode enabled — use `/goal clear` to stop."
1333            } else {
1334                ""
1335            };
1336            Ok((
1337                format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1338                auto_start,
1339            ))
1340        }
1341        Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1342            format!("Goal text exceeds {max} characters. Please shorten it."),
1343            None,
1344        )),
1345        Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1346    }
1347}
1348
1349async fn goal_pause(
1350    accounting: &GoalAccounting,
1351    store: &GoalStore,
1352) -> Result<String, CommandError> {
1353    match accounting.get_active().await {
1354        Ok(Some(g)) => {
1355            match store
1356                .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1357                .await
1358            {
1359                Ok(_) => {
1360                    let _ = accounting.refresh().await;
1361                    Ok(format!("Goal [{}] paused.", &g.id[..8]))
1362                }
1363                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1364                    let current = accounting.get_active().await.ok().flatten();
1365                    Ok(format!(
1366                        "Goal state changed concurrently. Current: {}",
1367                        current.map_or_else(|| "none".into(), |g| g.status.to_string())
1368                    ))
1369                }
1370                Err(e) => Ok(format!("Pause failed: {e}")),
1371            }
1372        }
1373        Ok(None) => Ok("No active goal to pause.".to_owned()),
1374        Err(e) => Ok(format!("Failed: {e}")),
1375    }
1376}
1377
1378async fn goal_resume(
1379    accounting: &GoalAccounting,
1380    store: &GoalStore,
1381) -> Result<String, CommandError> {
1382    let goals = store.list(10).await.unwrap_or_default();
1383    let paused = goals
1384        .into_iter()
1385        .find(|g| g.status == crate::goal::GoalStatus::Paused);
1386    match paused {
1387        Some(g) => {
1388            match store
1389                .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1390                .await
1391            {
1392                Ok(_) => {
1393                    let _ = accounting.refresh().await;
1394                    Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1395                }
1396                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1397                    Ok("Goal state changed concurrently — please retry.".to_owned())
1398                }
1399                Err(e) => Ok(format!("Resume failed: {e}")),
1400            }
1401        }
1402        None => Ok("No paused goal to resume.".to_owned()),
1403    }
1404}
1405
1406async fn goal_complete(
1407    accounting: &GoalAccounting,
1408    store: &GoalStore,
1409) -> Result<String, CommandError> {
1410    match accounting.get_active().await {
1411        Ok(Some(g)) => {
1412            match store
1413                .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1414                .await
1415            {
1416                Ok(_) => {
1417                    let _ = accounting.refresh().await;
1418                    Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1419                }
1420                Err(e) => Ok(format!("Complete failed: {e}")),
1421            }
1422        }
1423        Ok(None) => Ok("No active goal.".to_owned()),
1424        Err(e) => Ok(format!("Failed: {e}")),
1425    }
1426}
1427
1428async fn goal_clear(
1429    accounting: &GoalAccounting,
1430    store: &GoalStore,
1431) -> Result<String, CommandError> {
1432    let goals = store.list(10).await.unwrap_or_default();
1433    let target = goals.into_iter().find(|g| {
1434        g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1435    });
1436    match target {
1437        Some(g) => {
1438            match store
1439                .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1440                .await
1441            {
1442                Ok(_) => {
1443                    let _ = accounting.refresh().await;
1444                    Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1445                }
1446                Err(e) => Ok(format!("Clear failed: {e}")),
1447            }
1448        }
1449        None => Ok("No active or paused goal to clear.".to_owned()),
1450    }
1451}
1452
1453async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1454    let goals = store.list(20).await.unwrap_or_default();
1455    if goals.is_empty() {
1456        return Ok("No goals recorded.".to_owned());
1457    }
1458    let mut out = String::from("Goals:\n");
1459    for g in goals {
1460        let _ = std::fmt::Write::write_fmt(
1461            &mut out,
1462            format_args!(
1463                "  {} [{}] {} — {} turns\n",
1464                g.status.badge_symbol(),
1465                &g.id[..8],
1466                g.text,
1467                g.turns_used
1468            ),
1469        );
1470    }
1471    Ok(out.trim_end().to_owned())
1472}
1473
/// Splits the arguments of `/goal create` into the goal text and an optional `--budget` value.
///
/// `--budget` is only recognised as a standalone flag: it must start the string or follow
/// whitespace, and must be followed by whitespace, `=` or the end of the string. Words that
/// merely contain it (for example `--budgeting`) stay part of the goal text. Both
/// `--budget N` and `--budget=N` are accepted.
///
/// Returns `(text, budget)`. With a flag present, `text` is the trimmed part before it and
/// `budget` is `None` when the value is missing or not a valid `u64`. Without a flag, `args`
/// is returned unchanged.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // First occurrence of the flag that sits on token boundaries.
    let flag_pos = args
        .match_indices(FLAG)
        .map(|(pos, _)| pos)
        .find(|&pos| {
            let tail = &args[pos + FLAG.len()..];
            let starts_token = pos == 0 || args[..pos].ends_with(char::is_whitespace);
            let ends_token =
                tail.is_empty() || tail.starts_with(|c: char| c == '=' || c.is_whitespace());
            starts_token && ends_token
        });

    let Some(pos) = flag_pos else {
        return (args, None);
    };

    let text = args[..pos].trim();
    let tail = &args[pos + FLAG.len()..];
    let budget = tail
        .strip_prefix('=')
        .unwrap_or(tail)
        .split_whitespace()
        .next()
        .and_then(|value| value.parse::<u64>().ok());
    (text, budget)
}
1487
/// Extracts `--auto` and `--turns N` from the remainder of a `/goal create` argument string.
///
/// Returns `(text_without_auto_flags, is_auto, explicit_turns)`. The remaining words are
/// re-joined with single spaces. The word following `--turns` is always consumed; when it is
/// missing or not a valid `u32`, `explicit_turns` is `None`.
fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
    let mut auto_requested = false;
    let mut turn_limit: Option<u32> = None;
    let mut kept: Vec<&str> = Vec::new();

    // `while let` rather than `for`: the `--turns` arm pulls its value from the same iterator.
    let mut tokens = args.split_whitespace();
    while let Some(token) = tokens.next() {
        match token {
            "--auto" => auto_requested = true,
            "--turns" => turn_limit = tokens.next().and_then(|raw| raw.parse::<u32>().ok()),
            other => kept.push(other),
        }
    }

    (kept.join(" "), auto_requested, turn_limit)
}
1509
1510/// Convert `AgentError` to `CommandError` for the trait boundary.
1511impl From<AgentError> for CommandError {
1512    fn from(e: AgentError) -> Self {
1513        Self(e.to_string())
1514    }
1515}
1516
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use zeph_commands::traits::agent::AgentAccess;
    use zeph_memory::semantic::SemanticMemory;

    /// Build a `SemanticMemory` over the `":memory:"` SQLite path with a mock LLM provider.
    ///
    /// The vector-store URL points at `127.0.0.1:1`; the tests below rely on the resulting
    /// memory having no `graph_store` unless one is installed explicitly.
    async fn memory_without_qdrant() -> SemanticMemory {
        let provider = zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default());
        SemanticMemory::new(
            ":memory:",
            "http://127.0.0.1:1",
            None,
            provider,
            "test-model",
        )
        .await
        .unwrap()
    }

    /// A default `GraphConfig` with only the `enabled` switch overridden.
    fn graph_config(enabled: bool) -> crate::config::GraphConfig {
        crate::config::GraphConfig {
            enabled,
            ..Default::default()
        }
    }

    /// Open a fresh conversation in `memory`, then assemble an agent from the shared mocks with
    /// that memory and the given graph configuration attached.
    async fn agent_with_graph(
        memory: SemanticMemory,
        cfg: crate::config::GraphConfig,
    ) -> Agent<MockChannel> {
        let cid = memory.sqlite().create_conversation().await.unwrap();
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
        .with_graph_config(cfg)
    }

    // R-CRIT-4111: graph enabled in config + no graph_store (Qdrant unreachable) means the
    // graph command handlers have to say "unavailable", never "not enabled".
    #[tokio::test]
    async fn graph_stats_enabled_but_no_store_reports_unavailable() {
        let cfg = graph_config(true);
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, cfg).await;

        let result = agent.graph_stats().await.unwrap();

        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
        assert!(
            !result.contains("not enabled"),
            "must not report 'not enabled' when graph is enabled: {result}"
        );
    }

    // Counterpart of the test above: with the graph switched off in config the reply is the
    // plain "not enabled" message.
    #[tokio::test]
    async fn graph_stats_disabled_reports_not_enabled() {
        let cfg = graph_config(false);
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, cfg).await;

        let result = agent.graph_stats().await.unwrap();

        assert!(
            result.contains("not enabled"),
            "expected 'not enabled' but got: {result}"
        );
    }

    // R-CRIT-4136: graph_backfill has to resolve extract_provider *before* it enters the async
    // block. An unknown extract_provider name makes resolve_background_provider fall back to the
    // primary provider, and the backfill still finishes because there is nothing to process.
    // The point of the test is that the provider-resolution path runs without a panic or borrow
    // error — the old code would hit one by touching `&mut self` inside `async move`.
    #[tokio::test]
    async fn graph_backfill_with_extract_provider_resolves_without_panic() {
        let cfg = crate::config::GraphConfig {
            extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
            ..graph_config(true)
        };
        let mut memory = memory_without_qdrant().await;
        // resolve_graph_store only succeeds with a store present, so attach a real
        // SQLite-backed GraphStore that shares the memory's pool.
        let pool = memory.sqlite().pool().clone();
        let store = zeph_memory::GraphStore::new(pool);
        memory.graph_store = Some(std::sync::Arc::new(store));
        let mut agent = agent_with_graph(memory, cfg).await;

        let mut progress = Vec::new();
        let result = agent
            .graph_backfill(Some(10), &mut |msg| progress.push(msg))
            .await
            .unwrap();

        // Empty store → zero unprocessed messages → the backfill is done straight away.
        assert!(
            result.contains("Backfill complete"),
            "expected 'Backfill complete' but got: {result}"
        );
    }

    // R-4139: graph_entities with the graph enabled but no store (Qdrant unreachable) has to
    // answer "unavailable" instead of panicking or hanging.
    #[tokio::test]
    async fn graph_entities_enabled_but_no_store_reports_unavailable() {
        let cfg = graph_config(true);
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, cfg).await;

        let result = agent.graph_entities().await.unwrap();

        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    // R-4139: same expectation for graph_communities — enabled graph, no store, "unavailable".
    #[tokio::test]
    async fn graph_communities_enabled_but_no_store_reports_unavailable() {
        let cfg = graph_config(true);
        let memory = memory_without_qdrant().await;
        let mut agent = agent_with_graph(memory, cfg).await;

        let result = agent.graph_communities().await.unwrap();

        assert!(
            result.contains("unavailable"),
            "expected 'unavailable' but got: {result}"
        );
    }

    // R-4139: regression guard for the fix from #4139. Before it, the graph handlers awaited
    // the store with no timeout and blocked forever when Qdrant was unreachable. This checks
    // the building block itself: tokio::time::timeout yields Err for a future that never
    // resolves.
    #[tokio::test]
    async fn graph_store_timeout_pattern_fires_on_pending_future() {
        let never_ready = std::future::pending::<Result<Vec<()>, String>>();

        let result = tokio::time::timeout(Duration::from_millis(10), never_ready).await;

        assert!(
            result.is_err(),
            "timeout must fire on a never-resolving future"
        );
    }
}