// zeph_core/agent/graph_commands.rs
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::fmt::Write as _;

use zeph_memory::{GraphExtractionConfig, extract_and_store};

use super::{Agent, error::AgentError};
use crate::channel::Channel;
11impl<C: Channel> Agent<C> {
12    /// Dispatch `/graph [subcommand]` slash command.
13    ///
14    /// # Errors
15    ///
16    /// Returns an error if the channel send fails or graph store query fails.
17    pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18        let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20        if args.is_empty() {
21            return self.handle_graph_stats().await;
22        }
23        if args == "entities" || args.starts_with("entities ") {
24            return self.handle_graph_entities().await;
25        }
26        if let Some(name) = args.strip_prefix("facts ") {
27            return self.handle_graph_facts(name.trim()).await;
28        }
29        if args == "communities" {
30            return self.handle_graph_communities().await;
31        }
32        if args == "backfill" || args.starts_with("backfill ") {
33            let limit = parse_backfill_limit(args);
34            return self.handle_graph_backfill(limit).await;
35        }
36
37        self.channel
38            .send(
39                "Unknown /graph subcommand. Available: /graph, /graph entities, \
40                 /graph facts <name>, /graph communities, /graph backfill [--limit N]",
41            )
42            .await?;
43        Ok(())
44    }
45
46    async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
47        let Some(memory) = self.memory_state.memory.as_ref() else {
48            self.channel.send("Graph memory is not enabled.").await?;
49            return Ok(());
50        };
51        let Some(store) = memory.graph_store.as_ref() else {
52            self.channel.send("Graph memory is not enabled.").await?;
53            return Ok(());
54        };
55
56        let (entities, edges, communities) = tokio::join!(
57            store.entity_count(),
58            store.active_edge_count(),
59            store.community_count()
60        );
61        let msg = format!(
62            "Graph memory: {} entities, {} edges, {} communities",
63            entities.unwrap_or(0),
64            edges.unwrap_or(0),
65            communities.unwrap_or(0)
66        );
67        self.channel.send(&msg).await?;
68        Ok(())
69    }
70
71    async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
72        let Some(memory) = self.memory_state.memory.as_ref() else {
73            self.channel.send("Graph memory is not enabled.").await?;
74            return Ok(());
75        };
76        let Some(store) = memory.graph_store.as_ref() else {
77            self.channel.send("Graph memory is not enabled.").await?;
78            return Ok(());
79        };
80
81        self.channel.send("Loading graph entities...").await?;
82        let entities = store.all_entities().await?;
83        if entities.is_empty() {
84            self.channel.send("No entities found.").await?;
85            return Ok(());
86        }
87
88        let total = entities.len();
89        let display: Vec<String> = entities
90            .iter()
91            .take(50)
92            .map(|e| {
93                format!(
94                    "  {:<40}  {:<15}  {}",
95                    e.name,
96                    e.entity_type.as_str(),
97                    e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
98                )
99            })
100            .collect();
101        let mut msg = format!(
102            "Entities ({total} total):\n  {:<40}  {:<15}  {}\n{}",
103            "NAME",
104            "TYPE",
105            "LAST SEEN",
106            display.join("\n")
107        );
108        if total > 50 {
109            write!(msg, "\n  ...and {} more", total - 50).unwrap_or(());
110        }
111        self.channel.send(&msg).await?;
112        Ok(())
113    }
114
115    async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
116        let Some(memory) = self.memory_state.memory.as_ref() else {
117            self.channel.send("Graph memory is not enabled.").await?;
118            return Ok(());
119        };
120        let Some(store) = memory.graph_store.as_ref() else {
121            self.channel.send("Graph memory is not enabled.").await?;
122            return Ok(());
123        };
124
125        let matches = store.find_entity_by_name(name).await?;
126        if matches.is_empty() {
127            self.channel
128                .send(&format!("No entity found matching '{name}'."))
129                .await?;
130            return Ok(());
131        }
132
133        let entity = &matches[0];
134        let edges = store.edges_for_entity(entity.id).await?;
135        if edges.is_empty() {
136            self.channel
137                .send(&format!("Entity '{}' has no known facts.", entity.name))
138                .await?;
139            return Ok(());
140        }
141
142        // Build entity id → name lookup for display
143        let mut entity_names: std::collections::HashMap<i64, String> =
144            std::collections::HashMap::new();
145        entity_names.insert(entity.id, entity.name.clone());
146        for edge in &edges {
147            let other_id = if edge.source_entity_id == entity.id {
148                edge.target_entity_id
149            } else {
150                edge.source_entity_id
151            };
152            entity_names.entry(other_id).or_insert_with(|| {
153                // We'll fill these lazily; for simplicity use a placeholder here
154                // and fetch below.
155                String::new()
156            });
157        }
158        // Fetch names for any entries we inserted as empty placeholder
159        for (&id, name_val) in &mut entity_names {
160            if name_val.is_empty() {
161                if let Ok(Some(other)) = store.find_entity_by_id(id).await {
162                    *name_val = other.name;
163                } else {
164                    *name_val = format!("#{id}");
165                }
166            }
167        }
168
169        let lines: Vec<String> = edges
170            .iter()
171            .map(|e| {
172                let src = entity_names
173                    .get(&e.source_entity_id)
174                    .cloned()
175                    .unwrap_or_else(|| format!("#{}", e.source_entity_id));
176                let tgt = entity_names
177                    .get(&e.target_entity_id)
178                    .cloned()
179                    .unwrap_or_else(|| format!("#{}", e.target_entity_id));
180                format!(
181                    "  {} --[{}]--> {}: {} (confidence: {:.2})",
182                    src, e.relation, tgt, e.fact, e.confidence
183                )
184            })
185            .collect();
186        let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
187        self.channel.send(&msg).await?;
188        Ok(())
189    }
190
191    async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
192        let Some(memory) = self.memory_state.memory.as_ref() else {
193            self.channel.send("Graph memory is not enabled.").await?;
194            return Ok(());
195        };
196        let Some(store) = memory.graph_store.as_ref() else {
197            self.channel.send("Graph memory is not enabled.").await?;
198            return Ok(());
199        };
200
201        self.channel.send("Loading graph communities...").await?;
202        let communities = store.all_communities().await?;
203        if communities.is_empty() {
204            self.channel
205                .send("No communities detected yet. Run graph backfill first.")
206                .await?;
207            return Ok(());
208        }
209
210        let lines: Vec<String> = communities
211            .iter()
212            .map(|c| format!("  [{}]: {}", c.name, c.summary))
213            .collect();
214        let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
215        self.channel.send(&msg).await?;
216        Ok(())
217    }
218
219    async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
220        let Some(memory) = self.memory_state.memory.clone() else {
221            self.channel.send("Graph memory is not enabled.").await?;
222            return Ok(());
223        };
224        let Some(store) = memory.graph_store.clone() else {
225            self.channel.send("Graph memory is not enabled.").await?;
226            return Ok(());
227        };
228
229        let total = store.unprocessed_message_count().await.unwrap_or(0);
230        let cap = limit.unwrap_or(usize::MAX);
231
232        self.channel
233            .send(&format!(
234                "Starting graph backfill... ({total} unprocessed messages)"
235            ))
236            .await?;
237
238        let batch_size = 50usize;
239        let mut processed = 0usize;
240        let mut total_entities = 0usize;
241        let mut total_edges = 0usize;
242
243        let graph_cfg = self.memory_state.graph_config.clone();
244        let provider = self.provider.clone();
245
246        loop {
247            let remaining_cap = cap.saturating_sub(processed);
248            if remaining_cap == 0 {
249                break;
250            }
251            let batch_limit = batch_size.min(remaining_cap);
252            let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
253            if messages.is_empty() {
254                break;
255            }
256
257            let ids: Vec<zeph_memory::types::MessageId> =
258                messages.iter().map(|(id, _)| *id).collect();
259
260            for (_id, content) in &messages {
261                if content.trim().is_empty() {
262                    continue;
263                }
264                let extraction_cfg = GraphExtractionConfig {
265                    max_entities: graph_cfg.max_entities_per_message,
266                    max_edges: graph_cfg.max_edges_per_message,
267                    extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
268                    community_refresh_interval: 0,
269                    expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
270                    max_entities_cap: graph_cfg.max_entities,
271                    community_summary_max_prompt_bytes: graph_cfg
272                        .community_summary_max_prompt_bytes,
273                    community_summary_concurrency: graph_cfg.community_summary_concurrency,
274                    lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
275                };
276                let pool = store.pool().clone();
277                match extract_and_store(
278                    content.clone(),
279                    vec![],
280                    provider.clone(),
281                    pool,
282                    extraction_cfg,
283                )
284                .await
285                {
286                    Ok(stats) => {
287                        total_entities += stats.entities_upserted;
288                        total_edges += stats.edges_inserted;
289                    }
290                    Err(e) => {
291                        tracing::warn!("backfill extraction error: {e:#}");
292                    }
293                }
294            }
295
296            store.mark_messages_graph_processed(&ids).await?;
297            processed += messages.len();
298
299            self.channel
300                .send(&format!(
301                    "Backfill progress: {processed} messages processed, \
302                     {total_entities} entities, {total_edges} edges"
303                ))
304                .await?;
305        }
306
307        self.channel
308            .send(&format!(
309                "Backfill complete: {total_entities} entities, {total_edges} edges \
310                 extracted from {processed} messages"
311            ))
312            .await?;
313        Ok(())
314    }
315}
/// Parse a `--limit N` (or `--limit=N`) flag from a `/graph backfill`
/// argument string.
///
/// The flag must appear as its own whitespace-separated token; a missing or
/// unparsable value yields `None`, meaning "no limit". (The previous
/// substring search also matched the flag glued to other text, e.g.
/// `"xx--limit 5"`.)
fn parse_backfill_limit(args: &str) -> Option<usize> {
    let mut tokens = args.split_whitespace();
    while let Some(token) = tokens.next() {
        if token == "--limit" {
            // Separated form: the value is the next token (`--limit 100`).
            return tokens.next().and_then(|v| v.parse().ok());
        }
        if let Some(value) = token.strip_prefix("--limit=") {
            // Inline form: `--limit=100`.
            return value.parse().ok();
        }
    }
    None
}
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// Table-driven check of `--limit` parsing for `/graph backfill`.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_backfill_limit(input), expected, "input: {input}");
        }
    }
}