Skip to main content

zeph_core/agent/
graph_commands.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::fmt::Write as _;
5
6use zeph_memory::{GraphExtractionConfig, extract_and_store};
7
8use super::{Agent, error::AgentError};
9use crate::channel::Channel;
10
11impl<C: Channel> Agent<C> {
12    /// Dispatch `/graph [subcommand]` slash command.
13    ///
14    /// # Errors
15    ///
16    /// Returns an error if the channel send fails or graph store query fails.
17    pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18        let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20        if args.is_empty() {
21            return self.handle_graph_stats().await;
22        }
23        if args == "entities" || args.starts_with("entities ") {
24            return self.handle_graph_entities().await;
25        }
26        if let Some(name) = args.strip_prefix("facts ") {
27            return self.handle_graph_facts(name.trim()).await;
28        }
29        if args == "communities" {
30            return self.handle_graph_communities().await;
31        }
32        if args == "backfill" || args.starts_with("backfill ") {
33            let limit = parse_backfill_limit(args);
34            return self.handle_graph_backfill(limit).await;
35        }
36        if let Some(name) = args.strip_prefix("history ") {
37            return self.handle_graph_history(name.trim()).await;
38        }
39
40        self.channel
41            .send(
42                "Unknown /graph subcommand. Available: /graph, /graph entities, \
43                 /graph facts <name>, /graph history <name>, /graph communities, \
44                 /graph backfill [--limit N]",
45            )
46            .await?;
47        Ok(())
48    }
49
50    async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
51        let Some(memory) = self.memory_state.memory.as_ref() else {
52            self.channel.send("Graph memory is not enabled.").await?;
53            return Ok(());
54        };
55        let Some(store) = memory.graph_store.as_ref() else {
56            self.channel.send("Graph memory is not enabled.").await?;
57            return Ok(());
58        };
59
60        let (entities, edges, communities, distribution) = tokio::join!(
61            store.entity_count(),
62            store.active_edge_count(),
63            store.community_count(),
64            store.edge_type_distribution()
65        );
66        let mut msg = format!(
67            "Graph memory: {} entities, {} edges, {} communities",
68            entities.unwrap_or(0),
69            edges.unwrap_or(0),
70            communities.unwrap_or(0)
71        );
72        if let Ok(dist) = distribution
73            && !dist.is_empty()
74        {
75            let dist_str: Vec<String> = dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
76            write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
77        }
78        self.channel.send(&msg).await?;
79        Ok(())
80    }
81
82    async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
83        let Some(memory) = self.memory_state.memory.as_ref() else {
84            self.channel.send("Graph memory is not enabled.").await?;
85            return Ok(());
86        };
87        let Some(store) = memory.graph_store.as_ref() else {
88            self.channel.send("Graph memory is not enabled.").await?;
89            return Ok(());
90        };
91
92        self.channel.send("Loading graph entities...").await?;
93        let entities = store.all_entities().await?;
94        if entities.is_empty() {
95            self.channel.send("No entities found.").await?;
96            return Ok(());
97        }
98
99        let total = entities.len();
100        let display: Vec<String> = entities
101            .iter()
102            .take(50)
103            .map(|e| {
104                format!(
105                    "  {:<40}  {:<15}  {}",
106                    e.name,
107                    e.entity_type.as_str(),
108                    e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
109                )
110            })
111            .collect();
112        let mut msg = format!(
113            "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
114            "NAME",
115            "TYPE",
116            "LAST SEEN",
117            display.join("\n")
118        );
119        if total > 50 {
120            write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
121        }
122        self.channel.send(&msg).await?;
123        Ok(())
124    }
125
126    async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
127        let Some(memory) = self.memory_state.memory.as_ref() else {
128            self.channel.send("Graph memory is not enabled.").await?;
129            return Ok(());
130        };
131        let Some(store) = memory.graph_store.as_ref() else {
132            self.channel.send("Graph memory is not enabled.").await?;
133            return Ok(());
134        };
135
136        let matches = store.find_entity_by_name(name).await?;
137        if matches.is_empty() {
138            self.channel
139                .send(&format!("No entity found matching '{name}'."))
140                .await?;
141            return Ok(());
142        }
143
144        let entity = &matches[0];
145        let edges = store.edges_for_entity(entity.id).await?;
146        if edges.is_empty() {
147            self.channel
148                .send(&format!("Entity '{}' has no known facts.", entity.name))
149                .await?;
150            return Ok(());
151        }
152
153        // Build entity id → name lookup for display
154        let mut entity_names: std::collections::HashMap<i64, String> =
155            std::collections::HashMap::new();
156        entity_names.insert(entity.id, entity.name.clone());
157        for edge in &edges {
158            let other_id = if edge.source_entity_id == entity.id {
159                edge.target_entity_id
160            } else {
161                edge.source_entity_id
162            };
163            entity_names.entry(other_id).or_insert_with(|| {
164                // We'll fill these lazily; for simplicity use a placeholder here
165                // and fetch below.
166                String::new()
167            });
168        }
169        // Fetch names for any entries we inserted as empty placeholder
170        for (&id, name_val) in &mut entity_names {
171            if name_val.is_empty() {
172                if let Ok(Some(other)) = store.find_entity_by_id(id).await {
173                    *name_val = other.name;
174                } else {
175                    *name_val = format!("#{id}");
176                }
177            }
178        }
179
180        let lines: Vec<String> = edges
181            .iter()
182            .map(|e| {
183                let src = entity_names
184                    .get(&e.source_entity_id)
185                    .cloned()
186                    .unwrap_or_else(|| format!("#{}", e.source_entity_id));
187                let tgt = entity_names
188                    .get(&e.target_entity_id)
189                    .cloned()
190                    .unwrap_or_else(|| format!("#{}", e.target_entity_id));
191                format!(
192                    "  {} --[{}/{}]--> {}: {} (confidence: {:.2})",
193                    src, e.relation, e.edge_type, tgt, e.fact, e.confidence
194                )
195            })
196            .collect();
197        let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
198        self.channel.send(&msg).await?;
199        Ok(())
200    }
201
202    async fn handle_graph_history(&mut self, name: &str) -> Result<(), AgentError> {
203        let Some(memory) = self.memory_state.memory.as_ref() else {
204            self.channel.send("Graph memory is not enabled.").await?;
205            return Ok(());
206        };
207        let Some(store) = memory.graph_store.as_ref() else {
208            self.channel.send("Graph memory is not enabled.").await?;
209            return Ok(());
210        };
211
212        let matches = store.find_entity_by_name(name).await?;
213        if matches.is_empty() {
214            self.channel
215                .send(&format!("No entity found matching '{name}'."))
216                .await?;
217            return Ok(());
218        }
219
220        let entity = &matches[0];
221        let edges = store.edge_history_for_entity(entity.id, 50).await?;
222        if edges.is_empty() {
223            self.channel
224                .send(&format!("Entity '{}' has no edge history.", entity.name))
225                .await?;
226            return Ok(());
227        }
228
229        // Build entity id → name lookup for display
230        let mut entity_names: std::collections::HashMap<i64, String> =
231            std::collections::HashMap::new();
232        entity_names.insert(entity.id, entity.name.clone());
233        for edge in &edges {
234            for &id in &[edge.source_entity_id, edge.target_entity_id] {
235                entity_names.entry(id).or_default();
236            }
237        }
238        for (&id, name_val) in &mut entity_names {
239            if name_val.is_empty() {
240                if let Ok(Some(other)) = store.find_entity_by_id(id).await {
241                    *name_val = other.name;
242                } else {
243                    *name_val = format!("#{id}");
244                }
245            }
246        }
247
248        let n = edges.len();
249        let lines: Vec<String> = edges
250            .iter()
251            .map(|e| {
252                let status = if e.valid_to.is_some() {
253                    let date = e
254                        .valid_to
255                        .as_deref()
256                        .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
257                        .unwrap_or("?");
258                    format!("[expired {date}]")
259                } else {
260                    "[active]".to_string()
261                };
262                let src = entity_names
263                    .get(&e.source_entity_id)
264                    .cloned()
265                    .unwrap_or_else(|| format!("#{}", e.source_entity_id));
266                let tgt = entity_names
267                    .get(&e.target_entity_id)
268                    .cloned()
269                    .unwrap_or_else(|| format!("#{}", e.target_entity_id));
270                format!(
271                    "  {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
272                    src, e.relation, e.edge_type, tgt, e.fact, e.confidence
273                )
274            })
275            .collect();
276        let msg = format!(
277            "Edge history for '{}' ({n} edges):\n{}",
278            entity.name,
279            lines.join("\n")
280        );
281        self.channel.send(&msg).await?;
282        Ok(())
283    }
284
285    async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
286        let Some(memory) = self.memory_state.memory.as_ref() else {
287            self.channel.send("Graph memory is not enabled.").await?;
288            return Ok(());
289        };
290        let Some(store) = memory.graph_store.as_ref() else {
291            self.channel.send("Graph memory is not enabled.").await?;
292            return Ok(());
293        };
294
295        self.channel.send("Loading graph communities...").await?;
296        let communities = store.all_communities().await?;
297        if communities.is_empty() {
298            self.channel
299                .send("No communities detected yet. Run graph backfill first.")
300                .await?;
301            return Ok(());
302        }
303
304        let lines: Vec<String> = communities
305            .iter()
306            .map(|c| format!("  [{}]: {}", c.name, c.summary))
307            .collect();
308        let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
309        self.channel.send(&msg).await?;
310        Ok(())
311    }
312
313    async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
314        let Some(memory) = self.memory_state.memory.clone() else {
315            self.channel.send("Graph memory is not enabled.").await?;
316            return Ok(());
317        };
318        let Some(store) = memory.graph_store.clone() else {
319            self.channel.send("Graph memory is not enabled.").await?;
320            return Ok(());
321        };
322
323        let total = store.unprocessed_message_count().await.unwrap_or(0);
324        let cap = limit.unwrap_or(usize::MAX);
325
326        self.channel
327            .send(&format!(
328                "Starting graph backfill... ({total} unprocessed messages)"
329            ))
330            .await?;
331
332        let batch_size = 50usize;
333        let mut processed = 0usize;
334        let mut total_entities = 0usize;
335        let mut total_edges = 0usize;
336
337        let graph_cfg = self.memory_state.graph_config.clone();
338        let provider = self.provider.clone();
339
340        loop {
341            let remaining_cap = cap.saturating_sub(processed);
342            if remaining_cap == 0 {
343                break;
344            }
345            let batch_limit = batch_size.min(remaining_cap);
346            let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
347            if messages.is_empty() {
348                break;
349            }
350
351            let ids: Vec<zeph_memory::types::MessageId> =
352                messages.iter().map(|(id, _)| *id).collect();
353
354            for (_id, content) in &messages {
355                if content.trim().is_empty() {
356                    continue;
357                }
358                let extraction_cfg = GraphExtractionConfig {
359                    max_entities: graph_cfg.max_entities_per_message,
360                    max_edges: graph_cfg.max_edges_per_message,
361                    extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
362                    community_refresh_interval: 0,
363                    expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
364                    max_entities_cap: graph_cfg.max_entities,
365                    community_summary_max_prompt_bytes: graph_cfg
366                        .community_summary_max_prompt_bytes,
367                    community_summary_concurrency: graph_cfg.community_summary_concurrency,
368                    lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
369                    // Note linking is disabled for backfill — backfill doesn't have an
370                    // embedding store reference in this context.
371                    note_linking: zeph_memory::NoteLinkingConfig::default(),
372                    link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
373                    link_weight_decay_interval_secs: graph_cfg.link_weight_decay_interval_secs,
374                };
375                let pool = store.pool().clone();
376                match extract_and_store(
377                    content.clone(),
378                    vec![],
379                    provider.clone(),
380                    pool,
381                    extraction_cfg,
382                    None,
383                    None,
384                )
385                .await
386                {
387                    Ok(result) => {
388                        total_entities += result.stats.entities_upserted;
389                        total_edges += result.stats.edges_inserted;
390                    }
391                    Err(e) => {
392                        tracing::warn!("backfill extraction error: {e:#}");
393                    }
394                }
395            }
396
397            store.mark_messages_graph_processed(&ids).await?;
398            processed += messages.len();
399
400            self.channel
401                .send(&format!(
402                    "Backfill progress: {processed} messages processed, \
403                     {total_entities} entities, {total_edges} edges"
404                ))
405                .await?;
406        }
407
408        self.channel
409            .send(&format!(
410                "Backfill complete: {total_entities} entities, {total_edges} edges \
411                 extracted from {processed} messages"
412            ))
413            .await?;
414        Ok(())
415    }
416}
417
/// Extract the value of the `--limit` flag from `/graph backfill` arguments.
///
/// Both `--limit N` and `--limit=N` are accepted. Returns `None` when the flag is
/// absent, has no value, or the value does not parse as a `usize`.
fn parse_backfill_limit(args: &str) -> Option<usize> {
    let (_, after_flag) = args.split_once("--limit")?;
    // Accept the `--limit=N` spelling too; without this the value failed to parse
    // and the caller silently ran with no limit at all.
    let value = after_flag.strip_prefix('=').unwrap_or(after_flag);
    value
        .split_whitespace()
        .next()
        .and_then(|token| token.parse::<usize>().ok())
}
425
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// `--limit` is parsed when present with a numeric value, otherwise `None`.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases: [(&str, Option<usize>); 4] = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_backfill_limit(input), expected);
        }
    }

    /// The `history ` prefix is stripped and the remaining name is trimmed.
    #[test]
    fn parse_graph_history_subcommand() {
        /// Return the trimmed text after the `history ` prefix, if it is present.
        fn history_name(args: &str) -> Option<&str> {
            args.strip_prefix("history ").map(str::trim)
        }

        assert_eq!(history_name("history Rust"), Some("Rust"));
        assert_eq!(history_name("history  Alice "), Some("Alice"));
        assert_eq!(history_name("entities"), None);
    }
}
452}