Skip to main content

zeph_core/agent/
graph_commands.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::fmt::Write as _;
5
6use zeph_memory::{GraphExtractionConfig, extract_and_store};
7
8use super::{Agent, error::AgentError};
9use crate::channel::Channel;
10
11impl<C: Channel> Agent<C> {
12    /// Dispatch `/graph [subcommand]` slash command.
13    ///
14    /// # Errors
15    ///
16    /// Returns an error if the channel send fails or graph store query fails.
17    pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18        let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20        if args.is_empty() {
21            return self.handle_graph_stats().await;
22        }
23        if args == "entities" || args.starts_with("entities ") {
24            return self.handle_graph_entities().await;
25        }
26        if let Some(name) = args.strip_prefix("facts ") {
27            return self.handle_graph_facts(name.trim()).await;
28        }
29        if args == "communities" {
30            return self.handle_graph_communities().await;
31        }
32        if args == "backfill" || args.starts_with("backfill ") {
33            let limit = parse_backfill_limit(args);
34            return self.handle_graph_backfill(limit).await;
35        }
36        if let Some(name) = args.strip_prefix("history ") {
37            return self.handle_graph_history(name.trim()).await;
38        }
39
40        self.channel
41            .send(
42                "Unknown /graph subcommand. Available: /graph, /graph entities, \
43                 /graph facts <name>, /graph history <name>, /graph communities, \
44                 /graph backfill [--limit N]",
45            )
46            .await?;
47        Ok(())
48    }
49
50    async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
51        let Some(memory) = self.memory_state.memory.as_ref() else {
52            self.channel.send("Graph memory is not enabled.").await?;
53            return Ok(());
54        };
55        let Some(store) = memory.graph_store.as_ref() else {
56            self.channel.send("Graph memory is not enabled.").await?;
57            return Ok(());
58        };
59
60        let (entities, edges, communities) = tokio::join!(
61            store.entity_count(),
62            store.active_edge_count(),
63            store.community_count()
64        );
65        let msg = format!(
66            "Graph memory: {} entities, {} edges, {} communities",
67            entities.unwrap_or(0),
68            edges.unwrap_or(0),
69            communities.unwrap_or(0)
70        );
71        self.channel.send(&msg).await?;
72        Ok(())
73    }
74
75    async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
76        let Some(memory) = self.memory_state.memory.as_ref() else {
77            self.channel.send("Graph memory is not enabled.").await?;
78            return Ok(());
79        };
80        let Some(store) = memory.graph_store.as_ref() else {
81            self.channel.send("Graph memory is not enabled.").await?;
82            return Ok(());
83        };
84
85        self.channel.send("Loading graph entities...").await?;
86        let entities = store.all_entities().await?;
87        if entities.is_empty() {
88            self.channel.send("No entities found.").await?;
89            return Ok(());
90        }
91
92        let total = entities.len();
93        let display: Vec<String> = entities
94            .iter()
95            .take(50)
96            .map(|e| {
97                format!(
98                    "  {:<40}  {:<15}  {}",
99                    e.name,
100                    e.entity_type.as_str(),
101                    e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
102                )
103            })
104            .collect();
105        let mut msg = format!(
106            "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
107            "NAME",
108            "TYPE",
109            "LAST SEEN",
110            display.join("\n")
111        );
112        if total > 50 {
113            write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
114        }
115        self.channel.send(&msg).await?;
116        Ok(())
117    }
118
119    async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
120        let Some(memory) = self.memory_state.memory.as_ref() else {
121            self.channel.send("Graph memory is not enabled.").await?;
122            return Ok(());
123        };
124        let Some(store) = memory.graph_store.as_ref() else {
125            self.channel.send("Graph memory is not enabled.").await?;
126            return Ok(());
127        };
128
129        let matches = store.find_entity_by_name(name).await?;
130        if matches.is_empty() {
131            self.channel
132                .send(&format!("No entity found matching '{name}'."))
133                .await?;
134            return Ok(());
135        }
136
137        let entity = &matches[0];
138        let edges = store.edges_for_entity(entity.id).await?;
139        if edges.is_empty() {
140            self.channel
141                .send(&format!("Entity '{}' has no known facts.", entity.name))
142                .await?;
143            return Ok(());
144        }
145
146        // Build entity id → name lookup for display
147        let mut entity_names: std::collections::HashMap<i64, String> =
148            std::collections::HashMap::new();
149        entity_names.insert(entity.id, entity.name.clone());
150        for edge in &edges {
151            let other_id = if edge.source_entity_id == entity.id {
152                edge.target_entity_id
153            } else {
154                edge.source_entity_id
155            };
156            entity_names.entry(other_id).or_insert_with(|| {
157                // We'll fill these lazily; for simplicity use a placeholder here
158                // and fetch below.
159                String::new()
160            });
161        }
162        // Fetch names for any entries we inserted as empty placeholder
163        for (&id, name_val) in &mut entity_names {
164            if name_val.is_empty() {
165                if let Ok(Some(other)) = store.find_entity_by_id(id).await {
166                    *name_val = other.name;
167                } else {
168                    *name_val = format!("#{id}");
169                }
170            }
171        }
172
173        let lines: Vec<String> = edges
174            .iter()
175            .map(|e| {
176                let src = entity_names
177                    .get(&e.source_entity_id)
178                    .cloned()
179                    .unwrap_or_else(|| format!("#{}", e.source_entity_id));
180                let tgt = entity_names
181                    .get(&e.target_entity_id)
182                    .cloned()
183                    .unwrap_or_else(|| format!("#{}", e.target_entity_id));
184                format!(
185                    "  {} --[{}]--> {}: {} (confidence: {:.2})",
186                    src, e.relation, tgt, e.fact, e.confidence
187                )
188            })
189            .collect();
190        let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
191        self.channel.send(&msg).await?;
192        Ok(())
193    }
194
195    async fn handle_graph_history(&mut self, name: &str) -> Result<(), AgentError> {
196        let Some(memory) = self.memory_state.memory.as_ref() else {
197            self.channel.send("Graph memory is not enabled.").await?;
198            return Ok(());
199        };
200        let Some(store) = memory.graph_store.as_ref() else {
201            self.channel.send("Graph memory is not enabled.").await?;
202            return Ok(());
203        };
204
205        let matches = store.find_entity_by_name(name).await?;
206        if matches.is_empty() {
207            self.channel
208                .send(&format!("No entity found matching '{name}'."))
209                .await?;
210            return Ok(());
211        }
212
213        let entity = &matches[0];
214        let edges = store.edge_history_for_entity(entity.id, 50).await?;
215        if edges.is_empty() {
216            self.channel
217                .send(&format!("Entity '{}' has no edge history.", entity.name))
218                .await?;
219            return Ok(());
220        }
221
222        // Build entity id → name lookup for display
223        let mut entity_names: std::collections::HashMap<i64, String> =
224            std::collections::HashMap::new();
225        entity_names.insert(entity.id, entity.name.clone());
226        for edge in &edges {
227            for &id in &[edge.source_entity_id, edge.target_entity_id] {
228                entity_names.entry(id).or_default();
229            }
230        }
231        for (&id, name_val) in &mut entity_names {
232            if name_val.is_empty() {
233                if let Ok(Some(other)) = store.find_entity_by_id(id).await {
234                    *name_val = other.name;
235                } else {
236                    *name_val = format!("#{id}");
237                }
238            }
239        }
240
241        let n = edges.len();
242        let lines: Vec<String> = edges
243            .iter()
244            .map(|e| {
245                let status = if e.valid_to.is_some() {
246                    let date = e
247                        .valid_to
248                        .as_deref()
249                        .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
250                        .unwrap_or("?");
251                    format!("[expired {date}]")
252                } else {
253                    "[active]".to_string()
254                };
255                let src = entity_names
256                    .get(&e.source_entity_id)
257                    .cloned()
258                    .unwrap_or_else(|| format!("#{}", e.source_entity_id));
259                let tgt = entity_names
260                    .get(&e.target_entity_id)
261                    .cloned()
262                    .unwrap_or_else(|| format!("#{}", e.target_entity_id));
263                format!(
264                    "  {status} {} --[{}]--> {}: {} (confidence: {:.2})",
265                    src, e.relation, tgt, e.fact, e.confidence
266                )
267            })
268            .collect();
269        let msg = format!(
270            "Edge history for '{}' ({n} edges):\n{}",
271            entity.name,
272            lines.join("\n")
273        );
274        self.channel.send(&msg).await?;
275        Ok(())
276    }
277
278    async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
279        let Some(memory) = self.memory_state.memory.as_ref() else {
280            self.channel.send("Graph memory is not enabled.").await?;
281            return Ok(());
282        };
283        let Some(store) = memory.graph_store.as_ref() else {
284            self.channel.send("Graph memory is not enabled.").await?;
285            return Ok(());
286        };
287
288        self.channel.send("Loading graph communities...").await?;
289        let communities = store.all_communities().await?;
290        if communities.is_empty() {
291            self.channel
292                .send("No communities detected yet. Run graph backfill first.")
293                .await?;
294            return Ok(());
295        }
296
297        let lines: Vec<String> = communities
298            .iter()
299            .map(|c| format!("  [{}]: {}", c.name, c.summary))
300            .collect();
301        let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
302        self.channel.send(&msg).await?;
303        Ok(())
304    }
305
306    async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
307        let Some(memory) = self.memory_state.memory.clone() else {
308            self.channel.send("Graph memory is not enabled.").await?;
309            return Ok(());
310        };
311        let Some(store) = memory.graph_store.clone() else {
312            self.channel.send("Graph memory is not enabled.").await?;
313            return Ok(());
314        };
315
316        let total = store.unprocessed_message_count().await.unwrap_or(0);
317        let cap = limit.unwrap_or(usize::MAX);
318
319        self.channel
320            .send(&format!(
321                "Starting graph backfill... ({total} unprocessed messages)"
322            ))
323            .await?;
324
325        let batch_size = 50usize;
326        let mut processed = 0usize;
327        let mut total_entities = 0usize;
328        let mut total_edges = 0usize;
329
330        let graph_cfg = self.memory_state.graph_config.clone();
331        let provider = self.provider.clone();
332
333        loop {
334            let remaining_cap = cap.saturating_sub(processed);
335            if remaining_cap == 0 {
336                break;
337            }
338            let batch_limit = batch_size.min(remaining_cap);
339            let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
340            if messages.is_empty() {
341                break;
342            }
343
344            let ids: Vec<zeph_memory::types::MessageId> =
345                messages.iter().map(|(id, _)| *id).collect();
346
347            for (_id, content) in &messages {
348                if content.trim().is_empty() {
349                    continue;
350                }
351                let extraction_cfg = GraphExtractionConfig {
352                    max_entities: graph_cfg.max_entities_per_message,
353                    max_edges: graph_cfg.max_edges_per_message,
354                    extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
355                    community_refresh_interval: 0,
356                    expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
357                    max_entities_cap: graph_cfg.max_entities,
358                    community_summary_max_prompt_bytes: graph_cfg
359                        .community_summary_max_prompt_bytes,
360                    community_summary_concurrency: graph_cfg.community_summary_concurrency,
361                    lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
362                    // Note linking is disabled for backfill — backfill doesn't have an
363                    // embedding store reference in this context.
364                    note_linking: zeph_memory::NoteLinkingConfig::default(),
365                };
366                let pool = store.pool().clone();
367                match extract_and_store(
368                    content.clone(),
369                    vec![],
370                    provider.clone(),
371                    pool,
372                    extraction_cfg,
373                    None,
374                )
375                .await
376                {
377                    Ok(result) => {
378                        total_entities += result.stats.entities_upserted;
379                        total_edges += result.stats.edges_inserted;
380                    }
381                    Err(e) => {
382                        tracing::warn!("backfill extraction error: {e:#}");
383                    }
384                }
385            }
386
387            store.mark_messages_graph_processed(&ids).await?;
388            processed += messages.len();
389
390            self.channel
391                .send(&format!(
392                    "Backfill progress: {processed} messages processed, \
393                     {total_entities} entities, {total_edges} edges"
394                ))
395                .await?;
396        }
397
398        self.channel
399            .send(&format!(
400                "Backfill complete: {total_entities} entities, {total_edges} edges \
401                 extracted from {processed} messages"
402            ))
403            .await?;
404        Ok(())
405    }
406}
407
/// Extract the value of the `--limit` flag from the `/graph backfill` argument string.
///
/// Arguments are examined token by token, so `--limit` is only recognised at the start
/// of a whitespace-separated token. The first such token decides the result. Accepted
/// forms are `--limit N`, `--limit=N` and `--limitN`.
///
/// Returns `None` when the flag is absent, has no value, or the value does not parse
/// as a `usize`.
fn parse_backfill_limit(args: &str) -> Option<usize> {
    const FLAG: &str = "--limit";

    let mut tokens = args.split_whitespace();
    // Text glued to the flag inside the same token; empty for a bare `--limit`.
    // `find_map` leaves `tokens` positioned just after the flag token.
    let inline = tokens.find_map(|token| token.strip_prefix(FLAG))?;
    let value = if inline.is_empty() {
        tokens.next()?
    } else {
        inline.strip_prefix('=').unwrap_or(inline)
    };
    value.parse().ok()
}
415
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// `--limit` yields its numeric value; a missing flag or missing value yields `None`.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases: [(&str, Option<usize>); 4] = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (args, expected) in cases {
            assert_eq!(parse_backfill_limit(args), expected, "args: {args:?}");
        }
    }

    /// Mirrors how the dispatcher peels the `history ` prefix and trims the entity name.
    #[test]
    fn parse_graph_history_subcommand() {
        let history_name =
            |args: &'static str| args.strip_prefix("history ").map(str::trim);

        assert_eq!(history_name("history Rust"), Some("Rust"));
        assert_eq!(history_name("history  Alice "), Some("Alice"));
        assert_eq!(history_name("entities"), None);
    }
}