1use std::fmt::Write as _;
5
6use zeph_memory::{GraphExtractionConfig, extract_and_store};
7
8use super::{Agent, error::AgentError};
9use crate::channel::Channel;
10
11impl<C: Channel> Agent<C> {
12 pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18 let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20 if args.is_empty() {
21 return self.handle_graph_stats().await;
22 }
23 if args == "entities" || args.starts_with("entities ") {
24 return self.handle_graph_entities().await;
25 }
26 if let Some(name) = args.strip_prefix("facts ") {
27 return self.handle_graph_facts(name.trim()).await;
28 }
29 if args == "communities" {
30 return self.handle_graph_communities().await;
31 }
32 if args == "backfill" || args.starts_with("backfill ") {
33 let limit = parse_backfill_limit(args);
34 return self.handle_graph_backfill(limit).await;
35 }
36 if let Some(name) = args.strip_prefix("history ") {
37 return self.handle_graph_history(name.trim()).await;
38 }
39
40 self.channel
41 .send(
42 "Unknown /graph subcommand. Available: /graph, /graph entities, \
43 /graph facts <name>, /graph history <name>, /graph communities, \
44 /graph backfill [--limit N]",
45 )
46 .await?;
47 Ok(())
48 }
49
50 async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
51 let Some(memory) = self.memory_state.memory.as_ref() else {
52 self.channel.send("Graph memory is not enabled.").await?;
53 return Ok(());
54 };
55 let Some(store) = memory.graph_store.as_ref() else {
56 self.channel.send("Graph memory is not enabled.").await?;
57 return Ok(());
58 };
59
60 let (entities, edges, communities) = tokio::join!(
61 store.entity_count(),
62 store.active_edge_count(),
63 store.community_count()
64 );
65 let msg = format!(
66 "Graph memory: {} entities, {} edges, {} communities",
67 entities.unwrap_or(0),
68 edges.unwrap_or(0),
69 communities.unwrap_or(0)
70 );
71 self.channel.send(&msg).await?;
72 Ok(())
73 }
74
75 async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
76 let Some(memory) = self.memory_state.memory.as_ref() else {
77 self.channel.send("Graph memory is not enabled.").await?;
78 return Ok(());
79 };
80 let Some(store) = memory.graph_store.as_ref() else {
81 self.channel.send("Graph memory is not enabled.").await?;
82 return Ok(());
83 };
84
85 self.channel.send("Loading graph entities...").await?;
86 let entities = store.all_entities().await?;
87 if entities.is_empty() {
88 self.channel.send("No entities found.").await?;
89 return Ok(());
90 }
91
92 let total = entities.len();
93 let display: Vec<String> = entities
94 .iter()
95 .take(50)
96 .map(|e| {
97 format!(
98 " {:<40} {:<15} {}",
99 e.name,
100 e.entity_type.as_str(),
101 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
102 )
103 })
104 .collect();
105 let mut msg = format!(
106 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
107 "NAME",
108 "TYPE",
109 "LAST SEEN",
110 display.join("\n")
111 );
112 if total > 50 {
113 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
114 }
115 self.channel.send(&msg).await?;
116 Ok(())
117 }
118
119 async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
120 let Some(memory) = self.memory_state.memory.as_ref() else {
121 self.channel.send("Graph memory is not enabled.").await?;
122 return Ok(());
123 };
124 let Some(store) = memory.graph_store.as_ref() else {
125 self.channel.send("Graph memory is not enabled.").await?;
126 return Ok(());
127 };
128
129 let matches = store.find_entity_by_name(name).await?;
130 if matches.is_empty() {
131 self.channel
132 .send(&format!("No entity found matching '{name}'."))
133 .await?;
134 return Ok(());
135 }
136
137 let entity = &matches[0];
138 let edges = store.edges_for_entity(entity.id).await?;
139 if edges.is_empty() {
140 self.channel
141 .send(&format!("Entity '{}' has no known facts.", entity.name))
142 .await?;
143 return Ok(());
144 }
145
146 let mut entity_names: std::collections::HashMap<i64, String> =
148 std::collections::HashMap::new();
149 entity_names.insert(entity.id, entity.name.clone());
150 for edge in &edges {
151 let other_id = if edge.source_entity_id == entity.id {
152 edge.target_entity_id
153 } else {
154 edge.source_entity_id
155 };
156 entity_names.entry(other_id).or_insert_with(|| {
157 String::new()
160 });
161 }
162 for (&id, name_val) in &mut entity_names {
164 if name_val.is_empty() {
165 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
166 *name_val = other.name;
167 } else {
168 *name_val = format!("#{id}");
169 }
170 }
171 }
172
173 let lines: Vec<String> = edges
174 .iter()
175 .map(|e| {
176 let src = entity_names
177 .get(&e.source_entity_id)
178 .cloned()
179 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
180 let tgt = entity_names
181 .get(&e.target_entity_id)
182 .cloned()
183 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
184 format!(
185 " {} --[{}]--> {}: {} (confidence: {:.2})",
186 src, e.relation, tgt, e.fact, e.confidence
187 )
188 })
189 .collect();
190 let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
191 self.channel.send(&msg).await?;
192 Ok(())
193 }
194
195 async fn handle_graph_history(&mut self, name: &str) -> Result<(), AgentError> {
196 let Some(memory) = self.memory_state.memory.as_ref() else {
197 self.channel.send("Graph memory is not enabled.").await?;
198 return Ok(());
199 };
200 let Some(store) = memory.graph_store.as_ref() else {
201 self.channel.send("Graph memory is not enabled.").await?;
202 return Ok(());
203 };
204
205 let matches = store.find_entity_by_name(name).await?;
206 if matches.is_empty() {
207 self.channel
208 .send(&format!("No entity found matching '{name}'."))
209 .await?;
210 return Ok(());
211 }
212
213 let entity = &matches[0];
214 let edges = store.edge_history_for_entity(entity.id, 50).await?;
215 if edges.is_empty() {
216 self.channel
217 .send(&format!("Entity '{}' has no edge history.", entity.name))
218 .await?;
219 return Ok(());
220 }
221
222 let mut entity_names: std::collections::HashMap<i64, String> =
224 std::collections::HashMap::new();
225 entity_names.insert(entity.id, entity.name.clone());
226 for edge in &edges {
227 for &id in &[edge.source_entity_id, edge.target_entity_id] {
228 entity_names.entry(id).or_default();
229 }
230 }
231 for (&id, name_val) in &mut entity_names {
232 if name_val.is_empty() {
233 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
234 *name_val = other.name;
235 } else {
236 *name_val = format!("#{id}");
237 }
238 }
239 }
240
241 let n = edges.len();
242 let lines: Vec<String> = edges
243 .iter()
244 .map(|e| {
245 let status = if e.valid_to.is_some() {
246 let date = e
247 .valid_to
248 .as_deref()
249 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
250 .unwrap_or("?");
251 format!("[expired {date}]")
252 } else {
253 "[active]".to_string()
254 };
255 let src = entity_names
256 .get(&e.source_entity_id)
257 .cloned()
258 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
259 let tgt = entity_names
260 .get(&e.target_entity_id)
261 .cloned()
262 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
263 format!(
264 " {status} {} --[{}]--> {}: {} (confidence: {:.2})",
265 src, e.relation, tgt, e.fact, e.confidence
266 )
267 })
268 .collect();
269 let msg = format!(
270 "Edge history for '{}' ({n} edges):\n{}",
271 entity.name,
272 lines.join("\n")
273 );
274 self.channel.send(&msg).await?;
275 Ok(())
276 }
277
278 async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
279 let Some(memory) = self.memory_state.memory.as_ref() else {
280 self.channel.send("Graph memory is not enabled.").await?;
281 return Ok(());
282 };
283 let Some(store) = memory.graph_store.as_ref() else {
284 self.channel.send("Graph memory is not enabled.").await?;
285 return Ok(());
286 };
287
288 self.channel.send("Loading graph communities...").await?;
289 let communities = store.all_communities().await?;
290 if communities.is_empty() {
291 self.channel
292 .send("No communities detected yet. Run graph backfill first.")
293 .await?;
294 return Ok(());
295 }
296
297 let lines: Vec<String> = communities
298 .iter()
299 .map(|c| format!(" [{}]: {}", c.name, c.summary))
300 .collect();
301 let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
302 self.channel.send(&msg).await?;
303 Ok(())
304 }
305
306 async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
307 let Some(memory) = self.memory_state.memory.clone() else {
308 self.channel.send("Graph memory is not enabled.").await?;
309 return Ok(());
310 };
311 let Some(store) = memory.graph_store.clone() else {
312 self.channel.send("Graph memory is not enabled.").await?;
313 return Ok(());
314 };
315
316 let total = store.unprocessed_message_count().await.unwrap_or(0);
317 let cap = limit.unwrap_or(usize::MAX);
318
319 self.channel
320 .send(&format!(
321 "Starting graph backfill... ({total} unprocessed messages)"
322 ))
323 .await?;
324
325 let batch_size = 50usize;
326 let mut processed = 0usize;
327 let mut total_entities = 0usize;
328 let mut total_edges = 0usize;
329
330 let graph_cfg = self.memory_state.graph_config.clone();
331 let provider = self.provider.clone();
332
333 loop {
334 let remaining_cap = cap.saturating_sub(processed);
335 if remaining_cap == 0 {
336 break;
337 }
338 let batch_limit = batch_size.min(remaining_cap);
339 let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
340 if messages.is_empty() {
341 break;
342 }
343
344 let ids: Vec<zeph_memory::types::MessageId> =
345 messages.iter().map(|(id, _)| *id).collect();
346
347 for (_id, content) in &messages {
348 if content.trim().is_empty() {
349 continue;
350 }
351 let extraction_cfg = GraphExtractionConfig {
352 max_entities: graph_cfg.max_entities_per_message,
353 max_edges: graph_cfg.max_edges_per_message,
354 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
355 community_refresh_interval: 0,
356 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
357 max_entities_cap: graph_cfg.max_entities,
358 community_summary_max_prompt_bytes: graph_cfg
359 .community_summary_max_prompt_bytes,
360 community_summary_concurrency: graph_cfg.community_summary_concurrency,
361 lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
362 note_linking: zeph_memory::NoteLinkingConfig::default(),
365 };
366 let pool = store.pool().clone();
367 match extract_and_store(
368 content.clone(),
369 vec![],
370 provider.clone(),
371 pool,
372 extraction_cfg,
373 None,
374 )
375 .await
376 {
377 Ok(result) => {
378 total_entities += result.stats.entities_upserted;
379 total_edges += result.stats.edges_inserted;
380 }
381 Err(e) => {
382 tracing::warn!("backfill extraction error: {e:#}");
383 }
384 }
385 }
386
387 store.mark_messages_graph_processed(&ids).await?;
388 processed += messages.len();
389
390 self.channel
391 .send(&format!(
392 "Backfill progress: {processed} messages processed, \
393 {total_entities} entities, {total_edges} edges"
394 ))
395 .await?;
396 }
397
398 self.channel
399 .send(&format!(
400 "Backfill complete: {total_entities} entities, {total_edges} edges \
401 extracted from {processed} messages"
402 ))
403 .await?;
404 Ok(())
405 }
406}
407
/// Extracts the value of the `--limit` flag from a `/graph backfill` argument string.
///
/// Both `--limit N` and `--limit=N` are accepted. The value is the first
/// whitespace-delimited token after the first occurrence of `--limit` (and an optional `=`).
///
/// Returns `None` when the flag is absent, has no value, or the value is not a valid
/// `usize`. Note that the backfill handler treats `None` as "no limit".
fn parse_backfill_limit(args: &str) -> Option<usize> {
    const FLAG: &str = "--limit";

    let (_, after_flag) = args.split_once(FLAG)?;
    // Tolerate the `--limit=N` spelling; without this the token "=N" fails to parse and
    // the caller silently falls back to an unbounded backfill.
    let value = after_flag.strip_prefix('=').unwrap_or(after_flag);
    value.split_whitespace().next()?.parse().ok()
}
415
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// `--limit` parsing: present with a value, absent, present without a value, and zero.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases: [(&str, Option<usize>); 4] = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_backfill_limit(input), expected);
        }
    }

    /// Mirrors the `history <name>` prefix handling used by the command dispatcher.
    #[test]
    fn parse_graph_history_subcommand() {
        let history_name = |args: &'static str| args.strip_prefix("history ").map(str::trim);

        assert_eq!(history_name("history Rust"), Some("Rust"));
        assert_eq!(history_name("history Alice "), Some("Alice"));
        assert_eq!("entities".strip_prefix("history "), None);
    }
}
442}