Skip to main content

zeph_core/agent/
agent_access_impl.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Implementation of [`zeph_commands::traits::agent::AgentAccess`] for [`Agent<C>`].
5//!
6//! Each method in `AgentAccess` returns a formatted `String` result (without sending to the
7//! channel directly), so that `CommandContext::sink` does not conflict with this borrow.
8//! The one exception is methods for subsystems that are already channel-free (memory, graph).
9//!
10//! [`Agent<C>`]: super::Agent
11
12use std::fmt::Write as _;
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tracing::Instrument as _;
19use zeph_commands::CommandError;
20use zeph_commands::traits::agent::AgentAccess;
21use zeph_memory::semantic::SemanticMemory;
22use zeph_memory::{GraphExtractionConfig, GraphStore, MessageId, extract_and_store};
23
24use super::{Agent, error::AgentError};
25use crate::channel::Channel;
26
27impl<C: Channel + Send + 'static> Agent<C> {
28    fn resolve_graph_store(&self) -> Result<(Arc<SemanticMemory>, Arc<GraphStore>), String> {
29        let Some(memory) = self.services.memory.persistence.memory.clone() else {
30            return Err("Graph memory is not enabled.".to_owned());
31        };
32        let Some(store) = memory.graph_store.clone() else {
33            if self.services.memory.extraction.graph_config.enabled {
34                return Err(
35                    "Graph memory enabled but vector store unavailable (Qdrant unreachable)."
36                        .to_owned(),
37                );
38            }
39            return Err("Graph memory is not enabled.".to_owned());
40        };
41        Ok((memory, store))
42    }
43}
44
45/// Run Stage-2 LLM semantic scan for all skills in the plugin at `source`.
46///
47/// Skills are scanned concurrently with up to 4 in-flight at a time
48/// (`buffer_unordered(4)`). An aggregate 5-minute timeout wraps the whole
49/// batch; each individual scan is already bounded by `SCAN_TIMEOUT` (30 s) in
50/// `SkillSemanticScanner`. Returns `Some(err_msg)` when any skill is blocked,
51/// `None` when all skills pass.
52///
53/// Each future carries its own `skill_name` so that the rejection message names
54/// the correct skill regardless of completion order (which differs from input
55/// order when futures complete out-of-order with `buffer_unordered`).
56async fn semantic_scan_plugin_add(
57    scanner: &zeph_skills::semantic_scanner::SkillSemanticScanner,
58    source: &str,
59    managed_dir: Option<std::path::PathBuf>,
60    mcp_allowed: Vec<String>,
61    base_shell_allowed: Vec<String>,
62) -> Result<Option<String>, CommandError> {
63    use futures::stream::StreamExt as _;
64    use zeph_skills::semantic_scanner::ScanVerdict;
65
66    let plugins_dir = zeph_plugins::PluginManager::default_plugins_dir();
67    let mgr_dir =
68        managed_dir.unwrap_or_else(|| zeph_config::defaults::default_vault_dir().join("skills"));
69    let mgr =
70        zeph_plugins::PluginManager::new(plugins_dir, mgr_dir, mcp_allowed, base_shell_allowed);
71
72    let source_owned = source.to_owned();
73    let scan_inputs = tokio::task::spawn_blocking(move || mgr.scan_targets(&source_owned))
74        .await
75        .map_err(|e| CommandError(format!("plugin scan_targets panicked: {e}")))?
76        .map_err(|e| CommandError(format!("plugin add failed: {e}")))?;
77
78    tracing::info!(
79        plugin.source = %source,
80        skills_count = scan_inputs.len(),
81        "plugins.add: running Stage-2 semantic scan"
82    );
83
84    // Scan all skills concurrently with up to 4 in-flight. Each individual scan is
85    // already bounded by SCAN_TIMEOUT (30 s); the outer 5-min cap guards the batch.
86    // Each future owns its skill_name so verdicts carry the correct name regardless
87    // of buffer_unordered completion order (which is not the same as input order).
88    let scan_futs: Vec<_> = scan_inputs
89        .iter()
90        .map(|input| {
91            let name = input.skill_name.clone();
92            let purpose = input.declared_purpose.clone();
93            let md = input.skill_md.clone();
94            async move {
95                let verdict = scanner.scan(&name, &purpose, &md).await;
96                (name, verdict)
97            }
98        })
99        .collect();
100
101    let verdicts: Vec<_> = tokio::time::timeout(
102        std::time::Duration::from_mins(5),
103        futures::stream::iter(scan_futs)
104            .buffer_unordered(4)
105            .collect::<Vec<_>>(),
106    )
107    .await
108    .map_err(|_| CommandError("plugin scan timed out after 300s".to_owned()))?;
109
110    for (skill_name, verdict_result) in verdicts {
111        let verdict = verdict_result.map_err(|e| {
112            CommandError(format!(
113                "plugin add failed: semantic scan error for skill {skill_name:?}: {e}"
114            ))
115        })?;
116        match verdict {
117            ScanVerdict::Allow => {
118                tracing::debug!(
119                    skill = %skill_name,
120                    "plugins.add: skill passed semantic scan"
121                );
122            }
123            ScanVerdict::Warn(ref reason) => {
124                tracing::warn!(
125                    skill = %skill_name,
126                    reason = %reason,
127                    "plugins.add: skill passed with warning"
128                );
129            }
130            ScanVerdict::Block(reason) => {
131                return Ok(Some(format!(
132                    "plugin add failed: skill {skill_name:?} rejected by semantic scan: {reason}"
133                )));
134            }
135            _ => {}
136        }
137    }
138    Ok(None)
139}
140
141impl<C: Channel + Send + 'static> AgentAccess for Agent<C> {
142    // ----- /memory -----
143
144    fn memory_tiers<'a>(
145        &'a mut self,
146    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
147        Box::pin(
148            async move {
149                let Some(memory) = self.services.memory.persistence.memory.clone() else {
150                    return Ok("Memory not configured.".to_owned());
151                };
152                match memory.sqlite().count_messages_by_tier().await {
153                    Ok((episodic, semantic)) => {
154                        let mut out = String::new();
155                        let _ = writeln!(out, "Memory tiers:");
156                        let _ = writeln!(out, "  Working:  (current context window — virtual)");
157                        let _ = writeln!(out, "  Episodic: {episodic} messages");
158                        let _ = writeln!(out, "  Semantic: {semantic} facts");
159                        Ok(out.trim_end().to_owned())
160                    }
161                    Err(e) => Ok(format!("Failed to query tier stats: {e}")),
162                }
163            }
164            .instrument(tracing::info_span!("core.agent_access.memory_tiers")),
165        )
166    }
167
168    fn memory_promote<'a>(
169        &'a mut self,
170        ids_str: &'a str,
171    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
172        Box::pin(
173            async move {
174                let Some(memory) = self.services.memory.persistence.memory.clone() else {
175                    return Ok("Memory not configured.".to_owned());
176                };
177                let ids: Vec<MessageId> = ids_str
178                    .split_whitespace()
179                    .filter_map(|s| s.parse::<i64>().ok().map(MessageId))
180                    .collect();
181                if ids.is_empty() {
182                    return Ok(
183                        "Usage: /memory promote <id> [id...]\nExample: /memory promote 42 43 44"
184                            .to_owned(),
185                    );
186                }
187                match memory.sqlite().manual_promote(&ids).await {
188                    Ok(count) => Ok(format!("Promoted {count} message(s) to semantic tier.")),
189                    Err(e) => Ok(format!("Promotion failed: {e}")),
190                }
191            }
192            .instrument(tracing::info_span!("core.agent_access.memory_promote")),
193        )
194    }
195
196    // ----- /graph -----
197
198    fn graph_stats<'a>(
199        &'a mut self,
200    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
201        Box::pin(
202            async move {
203                let (_, store) = match self.resolve_graph_store() {
204                    Ok(pair) => pair,
205                    Err(msg) => return Ok(msg),
206                };
207
208                let stats_future = async {
209                    tokio::join!(
210                        store.entity_count(),
211                        store.active_edge_count(),
212                        store.community_count(),
213                        store.edge_type_distribution()
214                    )
215                };
216                let Ok((entities, edges, communities, distribution)) =
217                    tokio::time::timeout(Duration::from_secs(5), stats_future).await
218                else {
219                    tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
220                    return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
221                };
222                let mut msg = format!(
223                    "Graph memory: {} entities, {} edges, {} communities",
224                    entities.unwrap_or(0),
225                    edges.unwrap_or(0),
226                    communities.unwrap_or(0)
227                );
228                if let Ok(dist) = distribution
229                    && !dist.is_empty()
230                {
231                    let dist_str: Vec<String> =
232                        dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
233                    write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
234                }
235                Ok(msg)
236            }
237            .instrument(tracing::info_span!("core.agent_access.graph_stats")),
238        )
239    }
240
241    fn graph_entities<'a>(
242        &'a mut self,
243    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
244        Box::pin(
245            async move {
246                let (_, store) = match self.resolve_graph_store() {
247                    Ok(pair) => pair,
248                    Err(msg) => return Ok(msg),
249                };
250
251                let entities = match tokio::time::timeout(
252                    Duration::from_secs(5),
253                    store.all_entities(),
254                )
255                .await
256                {
257                    Ok(Ok(v)) => v,
258                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
259                    Err(_) => {
260                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
261                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
262                    }
263                };
264                if entities.is_empty() {
265                    return Ok("No entities found.".to_owned());
266                }
267
268                let total = entities.len();
269                let display: Vec<String> = entities
270                    .iter()
271                    .take(50)
272                    .map(|e| {
273                        format!(
274                            "  {:<40}  {:<15}  {}",
275                            e.name,
276                            e.entity_type.as_str(),
277                            e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
278                        )
279                    })
280                    .collect();
281                let mut msg = format!(
282                    "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
283                    "NAME",
284                    "TYPE",
285                    "LAST SEEN",
286                    display.join("\n")
287                );
288                if total > 50 {
289                    write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
290                }
291                Ok(msg)
292            }
293            .instrument(tracing::info_span!("core.agent_access.graph_entities")),
294        )
295    }
296
297    fn graph_facts<'a>(
298        &'a mut self,
299        name: &'a str,
300    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
301        Box::pin(
302            async move {
303                let (_, store) = match self.resolve_graph_store() {
304                    Ok(pair) => pair,
305                    Err(msg) => return Ok(msg),
306                };
307
308                let matches = match tokio::time::timeout(
309                    Duration::from_secs(5),
310                    store.find_entity_by_name(name),
311                )
312                .await
313                {
314                    Ok(Ok(v)) => v,
315                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
316                    Err(_) => {
317                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
318                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
319                    }
320                };
321                if matches.is_empty() {
322                    return Ok(format!("No entity found matching '{name}'."));
323                }
324
325                let entity = &matches[0];
326                let edges = match tokio::time::timeout(
327                    Duration::from_secs(5),
328                    store.edges_for_entity(entity.id.0),
329                )
330                .await
331                {
332                    Ok(Ok(v)) => v,
333                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
334                    Err(_) => {
335                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
336                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
337                    }
338                };
339                if edges.is_empty() {
340                    return Ok(format!("Entity '{}' has no known facts.", entity.name));
341                }
342
343                let mut entity_names: std::collections::HashMap<i64, String> =
344                    std::collections::HashMap::new();
345                entity_names.insert(entity.id.0, entity.name.clone());
346                for edge in &edges {
347                    let other_id = if edge.source_entity_id == entity.id.0 {
348                        edge.target_entity_id
349                    } else {
350                        edge.source_entity_id
351                    };
352                    entity_names.entry(other_id).or_default();
353                }
354                for (&id, name_val) in &mut entity_names {
355                    if name_val.is_empty() {
356                        let result = tokio::time::timeout(
357                            Duration::from_secs(5),
358                            store.find_entity_by_id(id),
359                        )
360                        .await;
361                        if let Ok(Ok(Some(other))) = result {
362                            *name_val = other.name;
363                        } else {
364                            *name_val = format!("#{id}");
365                        }
366                    }
367                }
368
369                let lines: Vec<String> = edges
370                    .iter()
371                    .map(|e| {
372                        let src = entity_names
373                            .get(&e.source_entity_id)
374                            .cloned()
375                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
376                        let tgt = entity_names
377                            .get(&e.target_entity_id)
378                            .cloned()
379                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
380                        format!(
381                            "  {} --[{}/{}]--> {}: {} (confidence: {:.2})",
382                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
383                        )
384                    })
385                    .collect();
386                Ok(format!(
387                    "Facts for '{}':\n{}",
388                    entity.name,
389                    lines.join("\n")
390                ))
391            }
392            .instrument(tracing::info_span!("core.agent_access.graph_facts")),
393        )
394    }
395
396    fn graph_history<'a>(
397        &'a mut self,
398        name: &'a str,
399    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
400        Box::pin(
401            async move {
402                let (_, store) = match self.resolve_graph_store() {
403                    Ok(pair) => pair,
404                    Err(msg) => return Ok(msg),
405                };
406
407                let matches = match tokio::time::timeout(
408                    Duration::from_secs(5),
409                    store.find_entity_by_name(name),
410                )
411                .await
412                {
413                    Ok(Ok(v)) => v,
414                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
415                    Err(_) => {
416                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
417                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
418                    }
419                };
420                if matches.is_empty() {
421                    return Ok(format!("No entity found matching '{name}'."));
422                }
423
424                let entity = &matches[0];
425                let edges = match tokio::time::timeout(
426                    Duration::from_secs(5),
427                    store.edge_history_for_entity(entity.id.0, 50),
428                )
429                .await
430                {
431                    Ok(Ok(v)) => v,
432                    Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
433                    Err(_) => {
434                        tracing::warn!("graph store call timed out after 5s (Qdrant unreachable)");
435                        return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
436                    }
437                };
438                if edges.is_empty() {
439                    return Ok(format!("Entity '{}' has no edge history.", entity.name));
440                }
441
442                let mut entity_names: std::collections::HashMap<i64, String> =
443                    std::collections::HashMap::new();
444                entity_names.insert(entity.id.0, entity.name.clone());
445                for edge in &edges {
446                    for &id in &[edge.source_entity_id, edge.target_entity_id] {
447                        entity_names.entry(id).or_default();
448                    }
449                }
450                for (&id, name_val) in &mut entity_names {
451                    if name_val.is_empty() {
452                        let result = tokio::time::timeout(
453                            Duration::from_secs(5),
454                            store.find_entity_by_id(id),
455                        )
456                        .await;
457                        if let Ok(Ok(Some(other))) = result {
458                            *name_val = other.name;
459                        } else {
460                            *name_val = format!("#{id}");
461                        }
462                    }
463                }
464
465                let n = edges.len();
466                let lines: Vec<String> = edges
467                    .iter()
468                    .map(|e| {
469                        let status = if e.valid_to.is_some() {
470                            let date = e
471                                .valid_to
472                                .as_deref()
473                                .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
474                                .unwrap_or("?");
475                            format!("[expired {date}]")
476                        } else {
477                            "[active]".to_string()
478                        };
479                        let src = entity_names
480                            .get(&e.source_entity_id)
481                            .cloned()
482                            .unwrap_or_else(|| format!("#{}", e.source_entity_id));
483                        let tgt = entity_names
484                            .get(&e.target_entity_id)
485                            .cloned()
486                            .unwrap_or_else(|| format!("#{}", e.target_entity_id));
487                        format!(
488                            "  {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
489                            src, e.relation, e.edge_type, tgt, e.fact, e.confidence
490                        )
491                    })
492                    .collect();
493                Ok(format!(
494                    "Edge history for '{}' ({n} edges):\n{}",
495                    entity.name,
496                    lines.join("\n")
497                ))
498            }
499            .instrument(tracing::info_span!("core.agent_access.graph_history")),
500        )
501    }
502
503    fn graph_communities<'a>(
504        &'a mut self,
505    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
506        Box::pin(
507            async move {
508                let (_, store) = match self.resolve_graph_store() {
509                    Ok(pair) => pair,
510                    Err(msg) => return Ok(msg),
511                };
512
513                let communities =
514                    match tokio::time::timeout(Duration::from_secs(5), store.all_communities())
515                        .await
516                    {
517                        Ok(Ok(v)) => v,
518                        Ok(Err(e)) => return Err(CommandError::new(e.to_string())),
519                        Err(_) => {
520                            tracing::warn!(
521                                "graph store call timed out after 5s (Qdrant unreachable)"
522                            );
523                            return Ok("Graph store unavailable (Qdrant unreachable).".to_owned());
524                        }
525                    };
526                if communities.is_empty() {
527                    return Ok("No communities detected yet. Run graph backfill first.".to_owned());
528                }
529
530                let lines: Vec<String> = communities
531                    .iter()
532                    .map(|c| format!("  [{}]: {}", c.name, c.summary))
533                    .collect();
534                Ok(format!(
535                    "Communities ({}):\n{}",
536                    communities.len(),
537                    lines.join("\n")
538                ))
539            }
540            .instrument(tracing::info_span!("core.agent_access.graph_communities")),
541        )
542    }
543
544    #[allow(clippy::too_many_lines)]
545    fn graph_backfill<'a>(
546        &'a mut self,
547        limit: Option<usize>,
548        progress_cb: &'a mut (dyn FnMut(String) + Send),
549    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
550        let store = match self.resolve_graph_store() {
551            Ok((_, s)) => s,
552            Err(msg) => return Box::pin(async move { Ok(msg) }),
553        };
554        let graph_cfg = self.services.memory.extraction.graph_config.clone();
555        let embed_timeout_secs = self
556            .services
557            .memory
558            .persistence
559            .memory
560            .as_ref()
561            .map_or(5, |m| m.embed_timeout().as_secs());
562        let provider = if graph_cfg.extract_provider.as_str().is_empty() {
563            self.provider.clone()
564        } else {
565            self.resolve_background_provider(graph_cfg.extract_provider.as_str())
566        };
567        Box::pin(
568            async move {
569                let total = store.unprocessed_message_count().await.unwrap_or(0);
570                let cap = limit.unwrap_or(usize::MAX);
571
572                progress_cb(format!(
573                    "Starting graph backfill... ({total} unprocessed messages)"
574                ));
575
576                let batch_size = 50usize;
577                let mut processed = 0usize;
578                let mut total_entities = 0usize;
579                let mut total_edges = 0usize;
580
581                loop {
582                    let remaining_cap = cap.saturating_sub(processed);
583                    if remaining_cap == 0 {
584                        break;
585                    }
586                    let batch_limit = batch_size.min(remaining_cap);
587                    let messages = store
588                        .unprocessed_messages_for_backfill(batch_limit)
589                        .await
590                        .map_err(|e| CommandError::new(e.to_string()))?;
591                    if messages.is_empty() {
592                        break;
593                    }
594
595                    let ids: Vec<zeph_memory::types::MessageId> =
596                        messages.iter().map(|(id, _)| *id).collect();
597
598                    for (_id, content) in &messages {
599                        if content.trim().is_empty() {
600                            continue;
601                        }
602                        let extraction_cfg = GraphExtractionConfig {
603                            max_entities: graph_cfg.max_entities_per_message,
604                            max_edges: graph_cfg.max_edges_per_message,
605                            extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
606                            community_refresh_interval: 0,
607                            expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
608                            max_entities_cap: graph_cfg.max_entities,
609                            community_summary_max_prompt_bytes: graph_cfg
610                                .community_summary_max_prompt_bytes,
611                            community_summary_concurrency: graph_cfg.community_summary_concurrency,
612                            lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
613                            note_linking: zeph_memory::NoteLinkingConfig::default(),
614                            link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
615                            link_weight_decay_interval_secs: graph_cfg
616                                .link_weight_decay_interval_secs,
617                            belief_revision_enabled: graph_cfg.belief_revision.enabled,
618                            belief_revision_similarity_threshold: graph_cfg
619                                .belief_revision
620                                .similarity_threshold,
621                            conversation_id: None,
622                            apex_mem_enabled: graph_cfg.apex_mem.enabled,
623                            llm_timeout_secs: graph_cfg.llm_timeout_secs,
624                            embed_timeout_secs,
625                            turn_index: None,
626                            write_gate_min_relevance: graph_cfg
627                                .write_gate
628                                .enabled
629                                .then_some(graph_cfg.write_gate.min_edge_relevance),
630                            benna_fast_rate: graph_cfg.spreading_activation.benna_fast_rate,
631                            benna_slow_rate: graph_cfg.spreading_activation.benna_slow_rate,
632                        };
633                        let pool = store.pool().clone();
634                        match extract_and_store(
635                            content.clone(),
636                            vec![],
637                            provider.clone(),
638                            pool,
639                            extraction_cfg,
640                            None,
641                            None,
642                        )
643                        .await
644                        {
645                            Ok(result) => {
646                                total_entities += result.stats.entities_upserted;
647                                total_edges += result.stats.edges_inserted;
648                            }
649                            Err(e) => {
650                                tracing::warn!("backfill extraction error: {e:#}");
651                            }
652                        }
653                    }
654
655                    store
656                        .mark_messages_graph_processed(&ids)
657                        .await
658                        .map_err(|e| CommandError::new(e.to_string()))?;
659                    processed += messages.len();
660
661                    progress_cb(format!(
662                        "Backfill progress: {processed} messages processed, \
663                     {total_entities} entities, {total_edges} edges"
664                    ));
665                }
666
667                Ok(format!(
668                    "Backfill complete: {total_entities} entities, {total_edges} edges \
669                 extracted from {processed} messages"
670                ))
671            }
672            .instrument(tracing::info_span!("core.agent_access.graph_backfill")),
673        )
674    }
675
676    // ----- /guidelines -----
677
678    fn guidelines<'a>(
679        &'a mut self,
680    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
681        Box::pin(
682            async move {
683                const MAX_DISPLAY_CHARS: usize = 4096;
684
685                let Some(memory) = &self.services.memory.persistence.memory else {
686                    return Ok("No memory backend initialised.".to_owned());
687                };
688
689                let cid = self.services.memory.persistence.conversation_id;
690                let sqlite = memory.sqlite();
691
692                let (version, text) = sqlite
693                    .load_compression_guidelines(cid)
694                    .await
695                    .map_err(|e: zeph_memory::MemoryError| CommandError::new(e.to_string()))?;
696
697                if version == 0 || text.is_empty() {
698                    return Ok("No compression guidelines generated yet.".to_owned());
699                }
700
701                let (_, created_at) = sqlite
702                    .load_compression_guidelines_meta(cid)
703                    .await
704                    .unwrap_or((0, String::new()));
705
706                let (body, truncated) = if text.len() > MAX_DISPLAY_CHARS {
707                    let end = text.floor_char_boundary(MAX_DISPLAY_CHARS);
708                    (&text[..end], true)
709                } else {
710                    (text.as_str(), false)
711                };
712
713                let mut output =
714                    format!("Compression Guidelines (v{version}, updated {created_at}):\n\n{body}");
715                if truncated {
716                    output.push_str("\n\n[truncated]");
717                }
718                Ok(output)
719            }
720            .instrument(tracing::info_span!("core.agent_access.guidelines")),
721        )
722    }
723
724    // ----- /model, /provider -----
725
726    fn handle_model<'a>(
727        &'a mut self,
728        arg: &'a str,
729    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
730        Box::pin(async move {
731            let input = if arg.is_empty() {
732                "/model".to_owned()
733            } else {
734                format!("/model {arg}")
735            };
736            self.handle_model_command_as_string(&input).await
737        })
738    }
739
740    fn handle_provider<'a>(
741        &'a mut self,
742        arg: &'a str,
743    ) -> Pin<Box<dyn Future<Output = String> + Send + 'a>> {
744        Box::pin(async move { self.handle_provider_command_as_string(arg).await })
745    }
746
747    // ----- /policy -----
748
749    fn handle_policy<'a>(
750        &'a mut self,
751        args: &'a str,
752    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
753        Box::pin(async move { Ok(self.handle_policy_command_as_string(args)) })
754    }
755
756    // ----- /scheduler -----
757
758    #[cfg(feature = "scheduler")]
759    fn list_scheduled_tasks<'a>(
760        &'a mut self,
761    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
762        Box::pin(async move {
763            let result = self
764                .handle_scheduler_list_as_string()
765                .await
766                .map_err(|e| CommandError::new(e.to_string()))?;
767            Ok(Some(result))
768        })
769    }
770
771    #[cfg(not(feature = "scheduler"))]
772    fn list_scheduled_tasks<'a>(
773        &'a mut self,
774    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
775        Box::pin(async move { Ok(None) })
776    }
777
778    // ----- /lsp -----
779
780    fn lsp_status<'a>(
781        &'a mut self,
782    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
783        Box::pin(async move {
784            self.handle_lsp_status_as_string()
785                .await
786                .map_err(|e| CommandError::new(e.to_string()))
787        })
788    }
789
790    // ----- /recap -----
791
792    fn session_recap<'a>(
793        &'a mut self,
794    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
795        Box::pin(
796            async move {
797                match self.build_recap().await {
798                    Ok(text) => Ok(text),
799                    Err(e) => {
800                        // /recap is an explicit user command — surface a fixed message so that
801                        // LlmError internals (URLs with embedded credentials, response excerpts)
802                        // are never forwarded to the user channel. Full detail goes to the log.
803                        tracing::warn!("session recap command: {}", e.0);
804                        Ok("Recap unavailable — see logs for details".to_string())
805                    }
806                }
807            }
808            .instrument(tracing::info_span!("core.agent_access.session_recap")),
809        )
810    }
811
812    // ----- /compact -----
813
814    fn compact_context<'a>(
815        &'a mut self,
816    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
817        Box::pin(
818            self.compact_context_command()
819                .instrument(tracing::info_span!("core.agent_access.compact_context")),
820        )
821    }
822
823    // ----- /new -----
824
825    fn reset_conversation<'a>(
826        &'a mut self,
827        keep_plan: bool,
828        no_digest: bool,
829    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
830        Box::pin(async move {
831            match self.reset_conversation(keep_plan, no_digest).await {
832                Ok((old_id, new_id)) => {
833                    let old = old_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
834                    let new = new_id.map_or_else(|| "none".to_string(), |id| id.0.to_string());
835                    let keep_note = if keep_plan { " (plan preserved)" } else { "" };
836                    Ok(format!(
837                        "New conversation started. Previous: {old} → Current: {new}{keep_note}"
838                    ))
839                }
840                Err(e) => Ok(format!("Failed to start new conversation: {e}")),
841            }
842        })
843    }
844
845    // ----- /cache-stats -----
846
847    fn cache_stats(&self) -> String {
848        self.tool_orchestrator.cache_stats()
849    }
850
851    // ----- /status -----
852
853    fn session_status<'a>(
854        &'a mut self,
855    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
856        Box::pin(async move { Ok(self.handle_status_as_string()) })
857    }
858
859    // ----- /guardrail -----
860
861    fn guardrail_status(&self) -> String {
862        self.format_guardrail_status()
863    }
864
865    // ----- /focus -----
866
867    fn focus_status(&self) -> String {
868        self.format_focus_status()
869    }
870
871    // ----- /sidequest -----
872
873    fn sidequest_status(&self) -> String {
874        self.format_sidequest_status()
875    }
876
877    // ----- /image -----
878
879    fn load_image<'a>(
880        &'a mut self,
881        path: &'a str,
882    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
883        Box::pin(async move { Ok(self.handle_image_as_string(path)) })
884    }
885
886    // ----- /mcp -----
887
888    fn handle_mcp<'a>(
889        &'a mut self,
890        args: &'a str,
891    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
892        // Extract all owned data before the async block so no &mut self reference is
893        // held across an .await point, satisfying the `for<'a>` Send bound.
894        let args_owned = args.to_owned();
895        let parts: Vec<String> = args_owned.split_whitespace().map(str::to_owned).collect();
896        let sub = parts.first().cloned().unwrap_or_default();
897
898        match sub.as_str() {
899            "list" => {
900                // Read-only: clone all data before async.
901                let manager = self.services.mcp.manager.clone();
902                let tools_snapshot: Vec<(String, String)> = self
903                    .services
904                    .mcp
905                    .tools
906                    .iter()
907                    .map(|t| (t.server_id.clone(), t.name.clone()))
908                    .collect();
909                Box::pin(async move {
910                    use std::fmt::Write;
911                    let Some(manager) = manager else {
912                        return Ok("MCP is not enabled.".to_owned());
913                    };
914                    let server_ids = manager.list_servers().await;
915                    if server_ids.is_empty() {
916                        return Ok("No MCP servers connected.".to_owned());
917                    }
918                    let mut output = String::from("Connected MCP servers:\n");
919                    let mut total = 0usize;
920                    for id in &server_ids {
921                        let count = tools_snapshot.iter().filter(|(sid, _)| sid == id).count();
922                        total += count;
923                        let _ = writeln!(output, "- {id} ({count} tools)");
924                    }
925                    let _ = write!(output, "Total: {total} tool(s)");
926                    Ok(output)
927                })
928            }
929            "tools" => {
930                // Read-only: collect tool info before async.
931                let server_id = parts.get(1).cloned();
932                let owned_tools: Vec<(String, String)> = if let Some(ref sid) = server_id {
933                    self.services
934                        .mcp
935                        .tools
936                        .iter()
937                        .filter(|t| &t.server_id == sid)
938                        .map(|t| (t.name.clone(), t.description.clone()))
939                        .collect()
940                } else {
941                    Vec::new()
942                };
943                Box::pin(async move {
944                    use std::fmt::Write;
945                    let Some(server_id) = server_id else {
946                        return Ok("Usage: /mcp tools <server_id>".to_owned());
947                    };
948                    if owned_tools.is_empty() {
949                        return Ok(format!("No tools found for server '{server_id}'."));
950                    }
951                    let mut output =
952                        format!("Tools for '{server_id}' ({} total):\n", owned_tools.len());
953                    for (name, desc) in &owned_tools {
954                        if desc.is_empty() {
955                            let _ = writeln!(output, "- {name}");
956                        } else {
957                            let _ = writeln!(output, "- {name} — {desc}");
958                        }
959                    }
960                    Ok(output)
961                })
962            }
963            // add/remove require mutating self after async I/O.
964            // handle_mcp_command is structured so the only .await crossing a &mut self
965            // boundary goes through a cloned Arc<McpManager> — no &self fields are held
966            // across that .await.  The subsequent state-change methods (rebuild_semantic_index,
967            // sync_mcp_registry) are also async fn(&mut self), but they only hold owned locals
968            // across their own .await points (cloned tools Vec, cloned Arcs).
969            _ => Box::pin(async move {
970                self.handle_mcp_command(&args_owned)
971                    .await
972                    .map_err(|e| CommandError::new(e.to_string()))
973            }),
974        }
975    }
976
977    // ----- /skill -----
978
979    fn handle_skill<'a>(
980        &'a mut self,
981        args: &'a str,
982    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
983        let args_owned = args.to_owned();
984        Box::pin(async move {
985            self.handle_skill_command_as_string(&args_owned)
986                .await
987                .map_err(|e| CommandError::new(e.to_string()))
988        })
989    }
990
991    // ----- /skills -----
992
993    fn handle_skills<'a>(
994        &'a mut self,
995        args: &'a str,
996    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
997        let args_owned = args.to_owned();
998        Box::pin(async move {
999            self.handle_skills_as_string(&args_owned)
1000                .await
1001                .map_err(|e| CommandError::new(e.to_string()))
1002        })
1003    }
1004
1005    // ----- /feedback -----
1006
1007    fn handle_feedback_command<'a>(
1008        &'a mut self,
1009        args: &'a str,
1010    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1011        let args_owned = args.to_owned();
1012        Box::pin(async move {
1013            self.handle_feedback_as_string(&args_owned)
1014                .await
1015                .map_err(|e| CommandError::new(e.to_string()))
1016        })
1017    }
1018
1019    // ----- /plan -----
1020
1021    #[cfg(feature = "scheduler")]
1022    fn handle_plan<'a>(
1023        &'a mut self,
1024        input: &'a str,
1025    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1026        Box::pin(async move {
1027            self.dispatch_plan_command_as_string(input)
1028                .await
1029                .map_err(|e| CommandError::new(e.to_string()))
1030        })
1031    }
1032
1033    #[cfg(not(feature = "scheduler"))]
1034    fn handle_plan<'a>(
1035        &'a mut self,
1036        _input: &'a str,
1037    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1038        Box::pin(async move { Ok(String::new()) })
1039    }
1040
1041    // ----- /experiment -----
1042
1043    fn handle_experiment<'a>(
1044        &'a mut self,
1045        input: &'a str,
1046    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1047        Box::pin(async move {
1048            self.handle_experiment_command_as_string(input)
1049                .await
1050                .map_err(|e| CommandError::new(e.to_string()))
1051        })
1052    }
1053
1054    // ----- /agent, @mention -----
1055
1056    fn handle_agent_dispatch<'a>(
1057        &'a mut self,
1058        input: &'a str,
1059    ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CommandError>> + Send + 'a>> {
1060        Box::pin(async move {
1061            match self.dispatch_agent_command(input).await {
1062                Some(Err(e)) => Err(CommandError::new(e.to_string())),
1063                Some(Ok(())) | None => Ok(None),
1064            }
1065        })
1066    }
1067
1068    // ----- /plugins -----
1069
1070    fn handle_plugins<'a>(
1071        &'a mut self,
1072        args: &'a str,
1073    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1074        let args_owned = args.to_owned();
1075        // Clone the fields needed by PluginManager before entering the async block.
1076        // spawn_blocking requires 'static, so we cannot borrow &self inside the closure.
1077        let managed_dir = self.services.skill.managed_dir.clone();
1078        let mcp_allowed = self.services.mcp.allowed_commands.clone();
1079        let base_shell_allowed = self.runtime.lifecycle.startup_shell_overlay.allowed.clone();
1080        // Collect ephemeral plugin names for display in the list subcommand.
1081        let ephemeral_names: Vec<String> = self
1082            .runtime
1083            .ephemeral_plugins
1084            .iter()
1085            .filter_map(|tmp| {
1086                let manifest_path = tmp.path().join("plugin.toml");
1087                std::fs::read_to_string(manifest_path)
1088                    .ok()
1089                    .and_then(|s| toml::from_str::<zeph_plugins::PluginManifest>(&s).ok())
1090                    .map(|m| m.plugin.name)
1091            })
1092            .collect();
1093
1094        // Resolve scanner once, before the async block captures `self`.
1095        // Fail-closed: if semantic_scan is enabled but no provider is configured, refuse
1096        // to proceed rather than silently falling back to the primary provider (#4706, #4709).
1097        let semantic_scan_enabled = self.services.skill.semantic_scan;
1098        let maybe_scanner: Option<zeph_skills::semantic_scanner::SkillSemanticScanner> =
1099            if semantic_scan_enabled {
1100                let provider_name = self.services.skill.semantic_scan_provider.as_str();
1101                if provider_name.trim().is_empty() {
1102                    return Box::pin(async move {
1103                        Err(CommandError::new(
1104                            "semantic_scan is enabled but semantic_scan_provider is not set; \
1105                             refusing plugin add to maintain fail-closed security posture",
1106                        ))
1107                    });
1108                }
1109                let provider_known = self
1110                    .runtime
1111                    .providers
1112                    .provider_pool
1113                    .iter()
1114                    .any(|e| e.effective_name().eq_ignore_ascii_case(provider_name));
1115                if !provider_known {
1116                    let name = provider_name.to_owned();
1117                    return Box::pin(async move {
1118                        Err(CommandError::new(format!(
1119                            "semantic_scan is enabled but semantic_scan_provider '{name}' \
1120                             is not configured in [[llm.providers]]; \
1121                             refusing plugin add to maintain fail-closed security posture",
1122                        )))
1123                    });
1124                }
1125                let provider = self.resolve_background_provider(provider_name);
1126                Some(zeph_skills::semantic_scanner::SkillSemanticScanner::new(
1127                    provider,
1128                ))
1129            } else {
1130                None
1131            };
1132
1133        Box::pin(async move {
1134            let (subcmd, source) = args_owned
1135                .trim()
1136                .split_once(' ')
1137                .unwrap_or((args_owned.trim(), ""));
1138
1139            // Stage-2 LLM semantic scan runs before the blocking add(), fail-closed.
1140            if subcmd == "add"
1141                && !source.trim().is_empty()
1142                && let Some(ref scanner) = maybe_scanner
1143                && let Some(err) = semantic_scan_plugin_add(
1144                    scanner,
1145                    source.trim(),
1146                    managed_dir.clone(),
1147                    mcp_allowed.clone(),
1148                    base_shell_allowed.clone(),
1149                )
1150                .instrument(tracing::info_span!("core.agent.scan_plugin", plugin = %source.trim()))
1151                .await?
1152            {
1153                return Ok(err);
1154            }
1155
1156            // PluginManager performs synchronous filesystem I/O (copy, remove_dir_all,
1157            // read_dir). Run on a blocking thread to avoid stalling the tokio worker.
1158            tokio::task::spawn_blocking(move || {
1159                Self::run_plugin_command(
1160                    &args_owned,
1161                    managed_dir,
1162                    mcp_allowed,
1163                    base_shell_allowed,
1164                    ephemeral_names,
1165                )
1166            })
1167            .await
1168            .map_err(|e| CommandError(format!("plugin task panicked: {e}")))
1169        })
1170    }
1171
1172    // ----- /acp -----
1173
1174    fn handle_acp<'a>(
1175        &'a mut self,
1176        args: &'a str,
1177    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1178        Box::pin(async move {
1179            self.handle_acp_as_string(args)
1180                .map_err(|e| CommandError::new(e.to_string()))
1181        })
1182    }
1183
1184    // ----- /cocoon -----
1185
1186    #[cfg(feature = "cocoon")]
1187    fn handle_cocoon<'a>(
1188        &'a mut self,
1189        args: &'a str,
1190    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1191        Box::pin(async move {
1192            self.handle_cocoon_as_string(args)
1193                .await
1194                .map_err(|e| CommandError::new(e.to_string()))
1195        })
1196    }
1197
1198    #[cfg(not(feature = "cocoon"))]
1199    fn handle_cocoon<'a>(
1200        &'a mut self,
1201        _args: &'a str,
1202    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1203        Box::pin(async {
1204            Ok("Cocoon support is not compiled in. Rebuild with `--features cocoon`.".to_owned())
1205        })
1206    }
1207
1208    // ----- /loop -----
1209
1210    fn handle_loop<'a>(
1211        &'a mut self,
1212        args: &'a str,
1213    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1214        use zeph_commands::handlers::loop_cmd::parse_loop_args;
1215
1216        let args_owned = args.trim().to_owned();
1217        Box::pin(async move {
1218            if args_owned == "stop" {
1219                return Ok(self.stop_user_loop());
1220            }
1221            if args_owned == "status" {
1222                return Ok(match &self.runtime.lifecycle.user_loop {
1223                    Some(ls) => format!(
1224                        "Loop active: \"{}\" (iteration {}, interval every {}s).",
1225                        ls.prompt,
1226                        ls.iteration,
1227                        ls.interval.period().as_secs(),
1228                    ),
1229                    None => "No active loop.".to_owned(),
1230                });
1231            }
1232            let (prompt, interval_secs) = parse_loop_args(&args_owned)?;
1233
1234            if prompt.starts_with('/') {
1235                return Err(CommandError::new(
1236                    "Loop prompt must not start with '/'. Slash commands cannot be used as loop prompts.",
1237                ));
1238            }
1239
1240            let min_secs = self.runtime.config.loop_min_interval_secs;
1241            if interval_secs < min_secs {
1242                return Err(CommandError::new(format!(
1243                    "Minimum loop interval is {min_secs}s. Got {interval_secs}s."
1244                )));
1245            }
1246            if self.runtime.lifecycle.user_loop.is_some() {
1247                return Err(CommandError::new(
1248                    "A loop is already active. Use /loop stop first.",
1249                ));
1250            }
1251
1252            self.start_user_loop(prompt.clone(), interval_secs);
1253            Ok(format!(
1254                "Loop started: \"{prompt}\" every {interval_secs}s. Use /loop stop to cancel."
1255            ))
1256        })
1257    }
1258
1259    fn notify_test<'a>(
1260        &'a mut self,
1261    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1262        let notifier = self.runtime.lifecycle.notifier.clone();
1263        Box::pin(async move {
1264            let Some(notifier) = notifier else {
1265                return Ok(
1266                    "Notifications are disabled. Set `notifications.enabled = true` in config."
1267                        .to_owned(),
1268                );
1269            };
1270            match notifier.fire_test().await {
1271                Ok(()) => Ok("Test notification sent.".to_owned()),
1272                Err(e) => Err(CommandError::new(format!("notification test failed: {e}"))),
1273            }
1274        })
1275    }
1276
1277    fn handle_trajectory(&mut self, args: &str) -> String {
1278        self.handle_trajectory_command_as_string(args)
1279    }
1280
1281    fn handle_scope(&self, args: &str) -> String {
1282        self.handle_scope_command_as_string(args)
1283    }
1284
1285    // ----- /goal -----
1286
1287    fn handle_goal<'a>(
1288        &'a mut self,
1289        args: &'a str,
1290    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1291        // Extract all non-Send data synchronously before entering the async block.
1292        if self.services.goal_accounting.is_none() {
1293            if !self.runtime.config.goals.enabled {
1294                return Box::pin(async {
1295                    Ok("Goals are disabled. Set `[goals] enabled = true` in config.".to_owned())
1296                });
1297            }
1298            let pool = match self.services.memory.persistence.memory.as_ref() {
1299                Some(m) => std::sync::Arc::new(m.sqlite().pool().clone()),
1300                None => {
1301                    return Box::pin(async {
1302                        Ok("Goals require a database backend (memory not configured).".to_owned())
1303                    });
1304                }
1305            };
1306            let store = std::sync::Arc::new(crate::goal::GoalStore::new(pool));
1307            let accounting = std::sync::Arc::new(crate::goal::GoalAccounting::new(store));
1308            self.services.goal_accounting = Some(accounting);
1309        }
1310
1311        let accounting =
1312            self.services.goal_accounting.clone().expect(
1313                "invariant: goal_accounting is always Some at this point (initialized above)",
1314            );
1315        let max_chars = self.runtime.config.goals.max_text_chars;
1316        let default_budget = self.runtime.config.goals.default_token_budget;
1317        let autonomous_enabled = self.runtime.config.goals.autonomous_enabled;
1318        let autonomous_max_turns = self.runtime.config.goals.autonomous_max_turns;
1319        let args_owned = args.to_owned();
1320
1321        // S1: `goal_create` may need to arm `AutonomousDriver` with a new session.
1322        // We capture a clone of the pending_start Arc that lives on the driver.
1323        // The async block fills it; the main agent loop (which has `&mut self`) drains it
1324        // via `AutonomousDriver::flush_pending_start()` after each command handler returns.
1325        let pending_start_arc = std::sync::Arc::clone(&self.services.autonomous.pending_start_arc);
1326
1327        Box::pin(async move {
1328            let _ = accounting.refresh().await;
1329            let store = accounting.get_store();
1330            let args = args_owned.as_str();
1331
1332            match args {
1333                "" | "status" => goal_status(&accounting).await,
1334                "pause" => goal_pause(&accounting, &store).await,
1335                "resume" => goal_resume(&accounting, &store).await,
1336                "complete" => goal_complete(&accounting, &store).await,
1337                "clear" => goal_clear(&accounting, &store).await,
1338                "list" => goal_list(&store).await,
1339                _ if args.starts_with("create") => {
1340                    let (msg, auto_req) = goal_create(
1341                        args,
1342                        &accounting,
1343                        &store,
1344                        max_chars,
1345                        default_budget,
1346                        autonomous_enabled,
1347                        autonomous_max_turns,
1348                    )
1349                    .await?;
1350                    if let Some(req) = auto_req {
1351                        *pending_start_arc.lock() = Some(req);
1352                    }
1353                    Ok(msg)
1354                }
1355                _ => Ok(
1356                    "Unknown /goal subcommand. Try: create, pause, resume, complete, clear, status, list."
1357                        .to_owned(),
1358                ),
1359            }
1360        })
1361    }
1362
1363    fn active_goal_snapshot(&self) -> Option<zeph_commands::GoalSnapshot> {
1364        let accounting = self.services.goal_accounting.as_ref()?;
1365        let snap = accounting.snapshot()?;
1366        Some(zeph_commands::GoalSnapshot {
1367            id: snap.id,
1368            text: snap.text,
1369            status: match snap.status {
1370                crate::goal::GoalStatus::Active => zeph_commands::GoalStatusView::Active,
1371                crate::goal::GoalStatus::Paused => zeph_commands::GoalStatusView::Paused,
1372                crate::goal::GoalStatus::Completed => zeph_commands::GoalStatusView::Completed,
1373                crate::goal::GoalStatus::Cleared => zeph_commands::GoalStatusView::Cleared,
1374            },
1375            turns_used: snap.turns_used,
1376            tokens_used: snap.tokens_used,
1377            token_budget: snap.token_budget,
1378        })
1379    }
1380
1381    // ----- /agents -----
1382
1383    fn handle_agents<'a>(
1384        &'a mut self,
1385        args: &'a str,
1386    ) -> Pin<Box<dyn Future<Output = Result<String, CommandError>> + Send + 'a>> {
1387        use zeph_commands::handlers::agents_fleet::{FleetEntry, format_fleet_section};
1388        use zeph_subagent::AgentsCommand;
1389
1390        let args_owned = args.trim().to_owned();
1391        Box::pin(async move {
1392            // Fleet view: bare `/agents` or `/agents fleet` shows autonomous sessions + definitions.
1393            let show_fleet = args_owned.is_empty() || args_owned == "fleet";
1394
1395            let fleet_section = if show_fleet {
1396                let snapshots = self.services.autonomous_registry.list();
1397                let entries: Vec<FleetEntry> = snapshots
1398                    .into_iter()
1399                    .map(|s| FleetEntry {
1400                        goal_id: s.goal_id,
1401                        goal_text_short: s.goal_text_short,
1402                        state: s.state,
1403                        turns_executed: s.turns_executed,
1404                        max_turns: s.max_turns,
1405                        elapsed: s.elapsed,
1406                    })
1407                    .collect();
1408                format_fleet_section(&entries)
1409            } else {
1410                String::new()
1411            };
1412
1413            // Sub-agent definitions section.
1414            let definitions_section = if show_fleet || args_owned == "list" {
1415                self.handle_agents_definitions_list()
1416            } else {
1417                // CRUD subcommands: show, create, edit, delete.
1418                match AgentsCommand::parse(&format!("/agents {args_owned}")) {
1419                    Ok(cmd) => self.handle_agents_crud(cmd),
1420                    Err(e) => e.to_string(),
1421                }
1422            };
1423
1424            let mut out = fleet_section;
1425            if !definitions_section.is_empty() {
1426                if !out.is_empty() {
1427                    out.push('\n');
1428                }
1429                out.push_str(&definitions_section);
1430            }
1431
1432            if out.is_empty() {
1433                "No active autonomous sessions or sub-agent definitions found."
1434                    .clone_into(&mut out);
1435            }
1436
1437            Ok(out)
1438        })
1439    }
1440}
1441
1442type GoalStore = crate::goal::GoalStore;
1443type GoalAccounting = crate::goal::GoalAccounting;
1444
/// Upper bound applied to the autonomous turn count, whether it comes from
/// `--turns` or from configuration, to prevent runaway autonomous loops
/// (Security Low).
const AUTONOMOUS_MAX_TURNS_CAP: u32 = 1_000;
1447
1448async fn goal_status(accounting: &GoalAccounting) -> Result<String, CommandError> {
1449    match accounting.get_active().await {
1450        Ok(Some(g)) => {
1451            let budget_line = g.token_budget.map_or_else(
1452                || format!("  tokens used: {}", g.tokens_used),
1453                |b| format!("  budget: {}/{b}", g.tokens_used),
1454            );
1455            Ok(format!(
1456                "Active goal [{}]: {}\n  status: {}\n  turns: {}\n{}",
1457                &g.id[..8],
1458                g.text,
1459                g.status,
1460                g.turns_used,
1461                budget_line
1462            ))
1463        }
1464        Ok(None) => Ok("No active goal. Use `/goal create <text>` to set one.".to_owned()),
1465        Err(e) => Ok(format!("Goal lookup failed: {e}")),
1466    }
1467}
1468
1469/// Returns `(display_message, auto_start_request)`.
1470///
1471/// `auto_start_request` is `Some((goal_id, goal_text, max_turns))` when `--auto` was passed and
1472/// the goal was successfully created. The caller must relay this to `AutonomousDriver` via the
1473/// `pending_start_arc` side-channel before the future resolves.
1474async fn goal_create(
1475    args: &str,
1476    accounting: &GoalAccounting,
1477    store: &GoalStore,
1478    max_chars: usize,
1479    default_budget: Option<u64>,
1480    autonomous_enabled: bool,
1481    autonomous_max_turns: u32,
1482) -> Result<(String, Option<(String, String, u32)>), CommandError> {
1483    let rest = args.strip_prefix("create").unwrap_or("").trim();
1484
1485    // Strip --auto / --turns before passing text to the budget parser.
1486    let (stripped, is_auto, explicit_turns) = parse_auto_flags(rest);
1487    let (text, explicit_budget) = parse_goal_create_args(&stripped);
1488
1489    if text.is_empty() {
1490        return Ok((
1491            "Usage: /goal create <text> [--budget N] [--auto [--turns N]]".to_owned(),
1492            None,
1493        ));
1494    }
1495    if is_auto && !autonomous_enabled {
1496        return Ok((
1497            "Autonomous mode is disabled. Set `[goals] autonomous_enabled = true` in config."
1498                .to_owned(),
1499            None,
1500        ));
1501    }
1502    let budget = explicit_budget.or(default_budget.filter(|&b| b > 0));
1503
1504    let max_turns = explicit_turns
1505        .unwrap_or(autonomous_max_turns)
1506        .min(AUTONOMOUS_MAX_TURNS_CAP);
1507    if explicit_turns.is_some_and(|t| t > AUTONOMOUS_MAX_TURNS_CAP) {
1508        tracing::warn!(
1509            requested = explicit_turns,
1510            capped = AUTONOMOUS_MAX_TURNS_CAP,
1511            "autonomous max_turns capped to {AUTONOMOUS_MAX_TURNS_CAP}"
1512        );
1513    }
1514
1515    match store.create(text, budget, max_chars).await {
1516        Ok(g) => {
1517            let _ = accounting.refresh().await;
1518            let auto_start = if is_auto {
1519                Some((g.id.clone(), g.text.clone(), max_turns))
1520            } else {
1521                None
1522            };
1523            let auto_note = if is_auto {
1524                " Autonomous mode enabled — use `/goal clear` to stop."
1525            } else {
1526                ""
1527            };
1528            Ok((
1529                format!("Goal created [{}]: {}{auto_note}", &g.id[..8], g.text),
1530                auto_start,
1531            ))
1532        }
1533        Err(crate::goal::store::GoalError::TextTooLong { max }) => Ok((
1534            format!("Goal text exceeds {max} characters. Please shorten it."),
1535            None,
1536        )),
1537        Err(e) => Ok((format!("Failed to create goal: {e}"), None)),
1538    }
1539}
1540
1541async fn goal_pause(
1542    accounting: &GoalAccounting,
1543    store: &GoalStore,
1544) -> Result<String, CommandError> {
1545    match accounting.get_active().await {
1546        Ok(Some(g)) => {
1547            match store
1548                .transition(&g.id, crate::goal::GoalStatus::Paused, g.updated_at)
1549                .await
1550            {
1551                Ok(_) => {
1552                    let _ = accounting.refresh().await;
1553                    Ok(format!("Goal [{}] paused.", &g.id[..8]))
1554                }
1555                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1556                    let current = accounting.get_active().await.ok().flatten();
1557                    Ok(format!(
1558                        "Goal state changed concurrently. Current: {}",
1559                        current.map_or_else(|| "none".into(), |g| g.status.to_string())
1560                    ))
1561                }
1562                Err(e) => Ok(format!("Pause failed: {e}")),
1563            }
1564        }
1565        Ok(None) => Ok("No active goal to pause.".to_owned()),
1566        Err(e) => Ok(format!("Failed: {e}")),
1567    }
1568}
1569
1570async fn goal_resume(
1571    accounting: &GoalAccounting,
1572    store: &GoalStore,
1573) -> Result<String, CommandError> {
1574    let goals = store.list(10).await.unwrap_or_default();
1575    let paused = goals
1576        .into_iter()
1577        .find(|g| g.status == crate::goal::GoalStatus::Paused);
1578    match paused {
1579        Some(g) => {
1580            match store
1581                .transition(&g.id, crate::goal::GoalStatus::Active, g.updated_at)
1582                .await
1583            {
1584                Ok(_) => {
1585                    let _ = accounting.refresh().await;
1586                    Ok(format!("Goal [{}] resumed: {}", &g.id[..8], g.text))
1587                }
1588                Err(crate::goal::store::GoalError::StaleUpdate(_)) => {
1589                    Ok("Goal state changed concurrently — please retry.".to_owned())
1590                }
1591                Err(e) => Ok(format!("Resume failed: {e}")),
1592            }
1593        }
1594        None => Ok("No paused goal to resume.".to_owned()),
1595    }
1596}
1597
1598async fn goal_complete(
1599    accounting: &GoalAccounting,
1600    store: &GoalStore,
1601) -> Result<String, CommandError> {
1602    match accounting.get_active().await {
1603        Ok(Some(g)) => {
1604            match store
1605                .transition(&g.id, crate::goal::GoalStatus::Completed, g.updated_at)
1606                .await
1607            {
1608                Ok(_) => {
1609                    let _ = accounting.refresh().await;
1610                    Ok(format!("Goal [{}] marked complete.", &g.id[..8]))
1611                }
1612                Err(e) => Ok(format!("Complete failed: {e}")),
1613            }
1614        }
1615        Ok(None) => Ok("No active goal.".to_owned()),
1616        Err(e) => Ok(format!("Failed: {e}")),
1617    }
1618}
1619
1620async fn goal_clear(
1621    accounting: &GoalAccounting,
1622    store: &GoalStore,
1623) -> Result<String, CommandError> {
1624    let goals = store.list(10).await.unwrap_or_default();
1625    let target = goals.into_iter().find(|g| {
1626        g.status == crate::goal::GoalStatus::Active || g.status == crate::goal::GoalStatus::Paused
1627    });
1628    match target {
1629        Some(g) => {
1630            match store
1631                .transition(&g.id, crate::goal::GoalStatus::Cleared, g.updated_at)
1632                .await
1633            {
1634                Ok(_) => {
1635                    let _ = accounting.refresh().await;
1636                    Ok(format!("Goal [{}] cleared.", &g.id[..8]))
1637                }
1638                Err(e) => Ok(format!("Clear failed: {e}")),
1639            }
1640        }
1641        None => Ok("No active or paused goal to clear.".to_owned()),
1642    }
1643}
1644
1645async fn goal_list(store: &GoalStore) -> Result<String, CommandError> {
1646    let goals = store.list(20).await.unwrap_or_default();
1647    if goals.is_empty() {
1648        return Ok("No goals recorded.".to_owned());
1649    }
1650    let mut out = String::from("Goals:\n");
1651    for g in goals {
1652        let _ = std::fmt::Write::write_fmt(
1653            &mut out,
1654            format_args!(
1655                "  {} [{}] {} — {} turns\n",
1656                g.status.badge_symbol(),
1657                &g.id[..8],
1658                g.text,
1659                g.turns_used
1660            ),
1661        );
1662    }
1663    Ok(out.trim_end().to_owned())
1664}
1665
/// Split a `/goal create` argument string into the goal text and an optional `--budget` value.
///
/// Returns `(text, budget)`:
/// - With a `--budget` flag, `text` is everything before the flag (trimmed) and `budget` is the
///   first whitespace-delimited word after it, parsed as `u64` (`None` when it is missing or not
///   a number). Anything after the flag is not part of `text`.
/// - Without the flag, `args` is returned unchanged together with `None`.
///
/// `--budget` is only recognised as a standalone token, i.e. delimited by whitespace or the
/// string boundaries. A bare substring match would truncate goal text that merely contains the
/// characters, such as `trim --budgets now`.
fn parse_goal_create_args(args: &str) -> (&str, Option<u64>) {
    const FLAG: &str = "--budget";

    // True when the match at `pos` is a whole token rather than part of a longer word.
    let is_standalone = |pos: usize| {
        let before = &args[..pos];
        let after = &args[pos + FLAG.len()..];
        (before.is_empty() || before.ends_with(char::is_whitespace))
            && (after.is_empty() || after.starts_with(char::is_whitespace))
    };

    let flag_pos = args
        .match_indices(FLAG)
        .map(|(pos, _)| pos)
        .find(|&pos| is_standalone(pos));
    let Some(pos) = flag_pos else {
        return (args, None);
    };

    let text = args[..pos].trim();
    let budget = args[pos + FLAG.len()..]
        .split_whitespace()
        .next()
        .and_then(|value| value.parse::<u64>().ok());
    (text, budget)
}
1679
/// Extract the `--auto` and `--turns N` flags from the remainder of a `/goal create` argument
/// string.
///
/// Returns `(text_without_auto_flags, is_auto, explicit_turns)`. The text is the remaining words
/// re-joined with single spaces. The word following `--turns` is always consumed; when it is
/// missing or does not parse as `u32`, `explicit_turns` is `None`.
fn parse_auto_flags(args: &str) -> (String, bool, Option<u32>) {
    let mut tokens = args.split_whitespace();
    let mut kept: Vec<&str> = Vec::new();
    let mut auto_requested = false;
    let mut turn_limit: Option<u32> = None;

    // `while let` rather than `for`: the `--turns` arm pulls its value from the same iterator.
    while let Some(token) = tokens.next() {
        match token {
            "--auto" => auto_requested = true,
            "--turns" => {
                turn_limit = tokens.next().and_then(|raw| raw.parse::<u32>().ok());
            }
            other => kept.push(other),
        }
    }

    (kept.join(" "), auto_requested, turn_limit)
}
1701
1702/// Convert `AgentError` to `CommandError` for the trait boundary.
1703impl From<AgentError> for CommandError {
1704    fn from(e: AgentError) -> Self {
1705        Self(e.to_string())
1706    }
1707}
1708
1709#[cfg(test)]
1710mod tests {
1711    use super::super::agent_tests::{
1712        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
1713    };
1714    use super::*;
1715    use zeph_commands::traits::agent::AgentAccess;
1716    use zeph_memory::semantic::SemanticMemory;
1717
1718    async fn memory_without_qdrant() -> SemanticMemory {
1719        SemanticMemory::new(
1720            ":memory:",
1721            "http://127.0.0.1:1",
1722            None,
1723            zeph_llm::any::AnyProvider::Mock(zeph_llm::mock::MockProvider::default()),
1724            "test-model",
1725        )
1726        .await
1727        .unwrap()
1728    }
1729
1730    // R-CRIT-4111: when graph is enabled in config but graph_store is None
1731    // (Qdrant unreachable), graph command handlers must report
1732    // "unavailable" rather than "not enabled".
1733    #[tokio::test]
1734    async fn graph_stats_enabled_but_no_store_reports_unavailable() {
1735        let cfg = crate::config::GraphConfig {
1736            enabled: true,
1737            ..Default::default()
1738        };
1739        let memory = memory_without_qdrant().await;
1740        let cid = memory.sqlite().create_conversation().await.unwrap();
1741        let mut agent = Agent::new(
1742            mock_provider(vec![]),
1743            MockChannel::new(vec![]),
1744            create_test_registry(),
1745            None,
1746            5,
1747            MockToolExecutor::no_tools(),
1748        )
1749        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1750        .with_graph_config(cfg);
1751
1752        let result = agent.graph_stats().await.unwrap();
1753        assert!(
1754            result.contains("unavailable"),
1755            "expected 'unavailable' but got: {result}"
1756        );
1757        assert!(
1758            !result.contains("not enabled"),
1759            "must not report 'not enabled' when graph is enabled: {result}"
1760        );
1761    }
1762
1763    #[tokio::test]
1764    async fn graph_stats_disabled_reports_not_enabled() {
1765        let cfg = crate::config::GraphConfig {
1766            enabled: false,
1767            ..Default::default()
1768        };
1769        let memory = memory_without_qdrant().await;
1770        let cid = memory.sqlite().create_conversation().await.unwrap();
1771        let mut agent = Agent::new(
1772            mock_provider(vec![]),
1773            MockChannel::new(vec![]),
1774            create_test_registry(),
1775            None,
1776            5,
1777            MockToolExecutor::no_tools(),
1778        )
1779        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1780        .with_graph_config(cfg);
1781
1782        let result = agent.graph_stats().await.unwrap();
1783        assert!(
1784            result.contains("not enabled"),
1785            "expected 'not enabled' but got: {result}"
1786        );
1787    }
1788
1789    // R-CRIT-4136: graph_backfill must resolve extract_provider before entering the async block.
1790    // When extract_provider is set to an unknown name, resolve_background_provider falls back to
1791    // the primary provider — the backfill still completes (no messages to process).
1792    // This test confirms that the provider-resolution code path executes without panic or borrow
1793    // errors, which would occur if the old code tried to access `&mut self` inside `async move`.
1794    #[tokio::test]
1795    async fn graph_backfill_with_extract_provider_resolves_without_panic() {
1796        let cfg = crate::config::GraphConfig {
1797            enabled: true,
1798            extract_provider: zeph_config::providers::ProviderName::new("nonexistent-provider"),
1799            ..Default::default()
1800        };
1801        let mut memory = memory_without_qdrant().await;
1802        // Install a real SQLite-backed GraphStore so resolve_graph_store succeeds.
1803        let pool = memory.sqlite().pool().clone();
1804        memory.graph_store = Some(std::sync::Arc::new(zeph_memory::GraphStore::new(pool)));
1805        let cid = memory.sqlite().create_conversation().await.unwrap();
1806        let mut agent = Agent::new(
1807            mock_provider(vec![]),
1808            MockChannel::new(vec![]),
1809            create_test_registry(),
1810            None,
1811            5,
1812            MockToolExecutor::no_tools(),
1813        )
1814        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1815        .with_graph_config(cfg);
1816
1817        let mut progress = vec![];
1818        let result = agent
1819            .graph_backfill(Some(10), &mut |msg| progress.push(msg))
1820            .await
1821            .unwrap();
1822
1823        // With an empty store there are zero unprocessed messages → backfill completes immediately.
1824        assert!(
1825            result.contains("Backfill complete"),
1826            "expected 'Backfill complete' but got: {result}"
1827        );
1828    }
1829
1830    // R-4139: graph_entities with enabled graph but no store (Qdrant unreachable) must
1831    // report unavailable, not panic or hang.
1832    #[tokio::test]
1833    async fn graph_entities_enabled_but_no_store_reports_unavailable() {
1834        let cfg = crate::config::GraphConfig {
1835            enabled: true,
1836            ..Default::default()
1837        };
1838        let memory = memory_without_qdrant().await;
1839        let cid = memory.sqlite().create_conversation().await.unwrap();
1840        let mut agent = Agent::new(
1841            mock_provider(vec![]),
1842            MockChannel::new(vec![]),
1843            create_test_registry(),
1844            None,
1845            5,
1846            MockToolExecutor::no_tools(),
1847        )
1848        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1849        .with_graph_config(cfg);
1850
1851        let result = agent.graph_entities().await.unwrap();
1852        assert!(
1853            result.contains("unavailable"),
1854            "expected 'unavailable' but got: {result}"
1855        );
1856    }
1857
1858    // R-4139: graph_communities with enabled graph but no store must report unavailable.
1859    #[tokio::test]
1860    async fn graph_communities_enabled_but_no_store_reports_unavailable() {
1861        let cfg = crate::config::GraphConfig {
1862            enabled: true,
1863            ..Default::default()
1864        };
1865        let memory = memory_without_qdrant().await;
1866        let cid = memory.sqlite().create_conversation().await.unwrap();
1867        let mut agent = Agent::new(
1868            mock_provider(vec![]),
1869            MockChannel::new(vec![]),
1870            create_test_registry(),
1871            None,
1872            5,
1873            MockToolExecutor::no_tools(),
1874        )
1875        .with_memory(std::sync::Arc::new(memory), cid, 50, 5, 100)
1876        .with_graph_config(cfg);
1877
1878        let result = agent.graph_communities().await.unwrap();
1879        assert!(
1880            result.contains("unavailable"),
1881            "expected 'unavailable' but got: {result}"
1882        );
1883    }
1884
1885    // R-4139: verify that the tokio::time::timeout pattern used in graph handlers
1886    // correctly returns Err on a never-resolving future. This is a direct regression
1887    // guard for the fix introduced in #4139: before the fix, these calls had no
1888    // timeout guard and would block indefinitely when Qdrant was unreachable.
1889    #[tokio::test]
1890    async fn graph_store_timeout_pattern_fires_on_pending_future() {
1891        use std::future;
1892        let result = tokio::time::timeout(
1893            Duration::from_millis(10),
1894            future::pending::<Result<Vec<()>, String>>(),
1895        )
1896        .await;
1897        assert!(
1898            result.is_err(),
1899            "timeout must fire on a never-resolving future"
1900        );
1901    }
1902
1903    // R-4706/R-4709: when semantic_scan is enabled but semantic_scan_provider is empty,
1904    // `plugin add` must return a CommandError immediately (fail-closed). Before this fix
1905    // the code fell through to resolve_background_provider which silently used the primary
1906    // provider, bypassing the intent that an unconfigured scanner means "do not proceed".
1907    #[tokio::test]
1908    async fn plugin_add_semantic_scan_enabled_empty_provider_returns_error() {
1909        let mut agent = Agent::new(
1910            mock_provider(vec![]),
1911            MockChannel::new(vec![]),
1912            create_test_registry(),
1913            None,
1914            5,
1915            MockToolExecutor::no_tools(),
1916        )
1917        .with_semantic_scan(true, "");
1918
1919        let result = agent.handle_plugins("add some-plugin").await;
1920        assert!(
1921            result.is_err(),
1922            "expected CommandError for missing semantic_scan_provider, got: {result:?}"
1923        );
1924        let msg = result.unwrap_err().to_string();
1925        assert!(
1926            msg.contains("semantic_scan_provider"),
1927            "error message must mention semantic_scan_provider, got: {msg}"
1928        );
1929    }
1930
1931    // R-4706/R-4709: when semantic_scan is disabled, plugin subcommands must proceed
1932    // normally regardless of whether semantic_scan_provider is set.
1933    #[tokio::test]
1934    async fn plugin_list_semantic_scan_disabled_succeeds() {
1935        let mut agent = Agent::new(
1936            mock_provider(vec![]),
1937            MockChannel::new(vec![]),
1938            create_test_registry(),
1939            None,
1940            5,
1941            MockToolExecutor::no_tools(),
1942        )
1943        .with_semantic_scan(false, "");
1944
1945        // "list" does not trigger scan logic; it should succeed without error.
1946        let result = agent.handle_plugins("list").await;
1947        assert!(
1948            result.is_ok(),
1949            "plugin list must succeed when semantic_scan is disabled, got: {result:?}"
1950        );
1951    }
1952
1953    // R-4706/R-4709: "plugin add" with semantic_scan disabled must reach the install path
1954    // rather than return a scan-related error. The install itself may fail (no real plugin
1955    // source), but it must NOT fail with the fail-closed error message.
1956    #[tokio::test]
1957    async fn plugin_add_semantic_scan_disabled_no_scan_error() {
1958        let mut agent = Agent::new(
1959            mock_provider(vec![]),
1960            MockChannel::new(vec![]),
1961            create_test_registry(),
1962            None,
1963            5,
1964            MockToolExecutor::no_tools(),
1965        )
1966        .with_semantic_scan(false, "");
1967
1968        let result = agent.handle_plugins("add some-plugin").await;
1969        // The call may succeed or fail for unrelated reasons (no real plugin source),
1970        // but must NOT fail with the fail-closed error about semantic_scan_provider.
1971        if let Err(ref e) = result {
1972            assert!(
1973                !e.to_string().contains("semantic_scan_provider"),
1974                "must not fail with scan error when semantic_scan is disabled, got: {e}"
1975            );
1976        }
1977    }
1978
1979    // R-4705: semantic_scan_plugin_add must scan all skills concurrently and return
1980    // None when every scanner call returns Allow. Verifies buffer_unordered path
1981    // processes N inputs without sequential bottleneck.
1982    #[tokio::test]
1983    async fn semantic_scan_plugin_add_concurrent_all_allow_returns_none() {
1984        use zeph_llm::any::AnyProvider;
1985        use zeph_llm::mock::MockProvider;
1986        use zeph_skills::semantic_scanner::SkillSemanticScanner;
1987
1988        // MockProvider returns `{"verdict":"allow","reason":"ok"}` for every call.
1989        let allow_json = r#"{"verdict":"allow","reason":"ok"}"#.to_owned();
1990        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
1991            allow_json.clone(),
1992            allow_json,
1993        ]));
1994        let scanner = SkillSemanticScanner::new(provider);
1995
1996        // Build a minimal plugin layout with two skills so scan_targets returns
1997        // two SkillScanInput entries.
1998        let tmp = tempfile::tempdir().unwrap();
1999        let plugin_toml = r#"
2000[plugin]
2001name = "test-plugin"
2002version = "0.1.0"
2003description = "test"
2004
2005[[skills]]
2006path = "skill-a"
2007
2008[[skills]]
2009path = "skill-b"
2010"#;
2011        std::fs::write(tmp.path().join("plugin.toml"), plugin_toml).unwrap();
2012        for name in ["skill-a", "skill-b"] {
2013            let skill_dir = tmp.path().join(name);
2014            std::fs::create_dir_all(&skill_dir).unwrap();
2015            std::fs::write(
2016                skill_dir.join("SKILL.md"),
2017                format!("# {name}\n\n## Purpose\nTest skill.\n"),
2018            )
2019            .unwrap();
2020        }
2021
2022        let result =
2023            semantic_scan_plugin_add(&scanner, tmp.path().to_str().unwrap(), None, vec![], vec![])
2024                .await;
2025
2026        // All skills allowed → no error message returned.
2027        assert!(result.is_ok(), "expected Ok, got: {result:?}");
2028        assert!(
2029            result.unwrap().is_none(),
2030            "expected None (all passed) but got Some(err)"
2031        );
2032    }
2033
2034    // R-4705 regression: buffer_unordered yields in completion order, not input order.
2035    // A Block verdict on the *second* skill (index 1) must name that second skill, not the
2036    // first. Before the fix, the code zipped verdicts against scan_inputs by position and
2037    // discarded the tuple's skill_name, so the wrong skill was reported.
2038    #[tokio::test]
2039    async fn semantic_scan_plugin_add_block_names_correct_skill() {
2040        use zeph_llm::any::AnyProvider;
2041        use zeph_llm::mock::MockProvider;
2042        use zeph_skills::semantic_scanner::SkillSemanticScanner;
2043
2044        // First call returns Allow, second returns Block — only the second skill is rejected.
2045        let allow_json = r#"{"verdict":"allow","reason":"ok"}"#.to_owned();
2046        let block_json = r#"{"verdict":"block","reason":"malicious"}"#.to_owned();
2047        let provider =
2048            AnyProvider::Mock(MockProvider::with_responses(vec![allow_json, block_json]));
2049        let scanner = SkillSemanticScanner::new(provider);
2050
2051        let tmp = tempfile::tempdir().unwrap();
2052        let plugin_toml = r#"
2053[plugin]
2054name = "test-plugin-block"
2055version = "0.1.0"
2056description = "test"
2057
2058[[skills]]
2059path = "skill-first"
2060
2061[[skills]]
2062path = "skill-second"
2063"#;
2064        std::fs::write(tmp.path().join("plugin.toml"), plugin_toml).unwrap();
2065        for name in ["skill-first", "skill-second"] {
2066            let skill_dir = tmp.path().join(name);
2067            std::fs::create_dir_all(&skill_dir).unwrap();
2068            std::fs::write(
2069                skill_dir.join("SKILL.md"),
2070                format!("# {name}\n\n## Purpose\nTest skill.\n"),
2071            )
2072            .unwrap();
2073        }
2074
2075        let result =
2076            semantic_scan_plugin_add(&scanner, tmp.path().to_str().unwrap(), None, vec![], vec![])
2077                .await;
2078
2079        assert!(result.is_ok(), "expected Ok(_), got: {result:?}");
2080        let msg = result
2081            .unwrap()
2082            .expect("expected Some(err) for blocked skill");
2083        assert!(
2084            msg.contains("skill-second"),
2085            "rejection must name the blocked skill 'skill-second', got: {msg}"
2086        );
2087        assert!(
2088            !msg.contains("skill-first"),
2089            "rejection must NOT name the allowed skill 'skill-first', got: {msg}"
2090        );
2091    }
2092
2093    // R-4706/R-4709: unknown provider name must also fail-closed rather than silently
2094    // falling back to the primary provider via resolve_background_provider.
2095    #[tokio::test]
2096    async fn plugin_add_semantic_scan_unknown_provider_returns_error() {
2097        let mut agent = Agent::new(
2098            mock_provider(vec![]),
2099            MockChannel::new(vec![]),
2100            create_test_registry(),
2101            None,
2102            5,
2103            MockToolExecutor::no_tools(),
2104        )
2105        .with_semantic_scan(true, "nonexistent_provider");
2106
2107        let result = agent.handle_plugins("add some-plugin").await;
2108        assert!(
2109            result.is_err(),
2110            "expected CommandError for unknown semantic_scan_provider, got: {result:?}"
2111        );
2112        let msg = result.unwrap_err().to_string();
2113        assert!(
2114            msg.contains("semantic_scan_provider"),
2115            "error message must mention semantic_scan_provider, got: {msg}"
2116        );
2117    }
2118}