1use std::fmt::Write as _;
5
6use zeph_memory::{GraphExtractionConfig, extract_and_store};
7
8use super::{Agent, error::AgentError};
9use crate::channel::Channel;
10
11impl<C: Channel> Agent<C> {
12 pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18 let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20 if args.is_empty() {
21 return self.handle_graph_stats().await;
22 }
23 if args == "entities" || args.starts_with("entities ") {
24 return self.handle_graph_entities().await;
25 }
26 if let Some(name) = args.strip_prefix("facts ") {
27 return self.handle_graph_facts(name.trim()).await;
28 }
29 if args == "communities" {
30 return self.handle_graph_communities().await;
31 }
32 if args == "backfill" || args.starts_with("backfill ") {
33 let limit = parse_backfill_limit(args);
34 return self.handle_graph_backfill(limit).await;
35 }
36 if let Some(name) = args.strip_prefix("history ") {
37 return self.handle_graph_history(name.trim()).await;
38 }
39
40 self.channel
41 .send(
42 "Unknown /graph subcommand. Available: /graph, /graph entities, \
43 /graph facts <name>, /graph history <name>, /graph communities, \
44 /graph backfill [--limit N]",
45 )
46 .await?;
47 Ok(())
48 }
49
50 async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
51 let Some(memory) = self.memory_state.memory.as_ref() else {
52 self.channel.send("Graph memory is not enabled.").await?;
53 return Ok(());
54 };
55 let Some(store) = memory.graph_store.as_ref() else {
56 self.channel.send("Graph memory is not enabled.").await?;
57 return Ok(());
58 };
59
60 let (entities, edges, communities, distribution) = tokio::join!(
61 store.entity_count(),
62 store.active_edge_count(),
63 store.community_count(),
64 store.edge_type_distribution()
65 );
66 let mut msg = format!(
67 "Graph memory: {} entities, {} edges, {} communities",
68 entities.unwrap_or(0),
69 edges.unwrap_or(0),
70 communities.unwrap_or(0)
71 );
72 if let Ok(dist) = distribution
73 && !dist.is_empty()
74 {
75 let dist_str: Vec<String> = dist.iter().map(|(t, c)| format!("{t}={c}")).collect();
76 write!(msg, "\nEdge types: {}", dist_str.join(", ")).unwrap_or(());
77 }
78 self.channel.send(&msg).await?;
79 Ok(())
80 }
81
82 async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
83 let Some(memory) = self.memory_state.memory.as_ref() else {
84 self.channel.send("Graph memory is not enabled.").await?;
85 return Ok(());
86 };
87 let Some(store) = memory.graph_store.as_ref() else {
88 self.channel.send("Graph memory is not enabled.").await?;
89 return Ok(());
90 };
91
92 self.channel.send("Loading graph entities...").await?;
93 let entities = store.all_entities().await?;
94 if entities.is_empty() {
95 self.channel.send("No entities found.").await?;
96 return Ok(());
97 }
98
99 let total = entities.len();
100 let display: Vec<String> = entities
101 .iter()
102 .take(50)
103 .map(|e| {
104 format!(
105 " {:<40} {:<15} {}",
106 e.name,
107 e.entity_type.as_str(),
108 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
109 )
110 })
111 .collect();
112 let mut msg = format!(
113 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
114 "NAME",
115 "TYPE",
116 "LAST SEEN",
117 display.join("\n")
118 );
119 if total > 50 {
120 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
121 }
122 self.channel.send(&msg).await?;
123 Ok(())
124 }
125
126 async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
127 let Some(memory) = self.memory_state.memory.as_ref() else {
128 self.channel.send("Graph memory is not enabled.").await?;
129 return Ok(());
130 };
131 let Some(store) = memory.graph_store.as_ref() else {
132 self.channel.send("Graph memory is not enabled.").await?;
133 return Ok(());
134 };
135
136 let matches = store.find_entity_by_name(name).await?;
137 if matches.is_empty() {
138 self.channel
139 .send(&format!("No entity found matching '{name}'."))
140 .await?;
141 return Ok(());
142 }
143
144 let entity = &matches[0];
145 let edges = store.edges_for_entity(entity.id).await?;
146 if edges.is_empty() {
147 self.channel
148 .send(&format!("Entity '{}' has no known facts.", entity.name))
149 .await?;
150 return Ok(());
151 }
152
153 let mut entity_names: std::collections::HashMap<i64, String> =
155 std::collections::HashMap::new();
156 entity_names.insert(entity.id, entity.name.clone());
157 for edge in &edges {
158 let other_id = if edge.source_entity_id == entity.id {
159 edge.target_entity_id
160 } else {
161 edge.source_entity_id
162 };
163 entity_names.entry(other_id).or_insert_with(|| {
164 String::new()
167 });
168 }
169 for (&id, name_val) in &mut entity_names {
171 if name_val.is_empty() {
172 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
173 *name_val = other.name;
174 } else {
175 *name_val = format!("#{id}");
176 }
177 }
178 }
179
180 let lines: Vec<String> = edges
181 .iter()
182 .map(|e| {
183 let src = entity_names
184 .get(&e.source_entity_id)
185 .cloned()
186 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
187 let tgt = entity_names
188 .get(&e.target_entity_id)
189 .cloned()
190 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
191 format!(
192 " {} --[{}/{}]--> {}: {} (confidence: {:.2})",
193 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
194 )
195 })
196 .collect();
197 let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
198 self.channel.send(&msg).await?;
199 Ok(())
200 }
201
202 async fn handle_graph_history(&mut self, name: &str) -> Result<(), AgentError> {
203 let Some(memory) = self.memory_state.memory.as_ref() else {
204 self.channel.send("Graph memory is not enabled.").await?;
205 return Ok(());
206 };
207 let Some(store) = memory.graph_store.as_ref() else {
208 self.channel.send("Graph memory is not enabled.").await?;
209 return Ok(());
210 };
211
212 let matches = store.find_entity_by_name(name).await?;
213 if matches.is_empty() {
214 self.channel
215 .send(&format!("No entity found matching '{name}'."))
216 .await?;
217 return Ok(());
218 }
219
220 let entity = &matches[0];
221 let edges = store.edge_history_for_entity(entity.id, 50).await?;
222 if edges.is_empty() {
223 self.channel
224 .send(&format!("Entity '{}' has no edge history.", entity.name))
225 .await?;
226 return Ok(());
227 }
228
229 let mut entity_names: std::collections::HashMap<i64, String> =
231 std::collections::HashMap::new();
232 entity_names.insert(entity.id, entity.name.clone());
233 for edge in &edges {
234 for &id in &[edge.source_entity_id, edge.target_entity_id] {
235 entity_names.entry(id).or_default();
236 }
237 }
238 for (&id, name_val) in &mut entity_names {
239 if name_val.is_empty() {
240 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
241 *name_val = other.name;
242 } else {
243 *name_val = format!("#{id}");
244 }
245 }
246 }
247
248 let n = edges.len();
249 let lines: Vec<String> = edges
250 .iter()
251 .map(|e| {
252 let status = if e.valid_to.is_some() {
253 let date = e
254 .valid_to
255 .as_deref()
256 .and_then(|s| s.split('T').next().or_else(|| s.split(' ').next()))
257 .unwrap_or("?");
258 format!("[expired {date}]")
259 } else {
260 "[active]".to_string()
261 };
262 let src = entity_names
263 .get(&e.source_entity_id)
264 .cloned()
265 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
266 let tgt = entity_names
267 .get(&e.target_entity_id)
268 .cloned()
269 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
270 format!(
271 " {status} {} --[{}/{}]--> {}: {} (confidence: {:.2})",
272 src, e.relation, e.edge_type, tgt, e.fact, e.confidence
273 )
274 })
275 .collect();
276 let msg = format!(
277 "Edge history for '{}' ({n} edges):\n{}",
278 entity.name,
279 lines.join("\n")
280 );
281 self.channel.send(&msg).await?;
282 Ok(())
283 }
284
285 async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
286 let Some(memory) = self.memory_state.memory.as_ref() else {
287 self.channel.send("Graph memory is not enabled.").await?;
288 return Ok(());
289 };
290 let Some(store) = memory.graph_store.as_ref() else {
291 self.channel.send("Graph memory is not enabled.").await?;
292 return Ok(());
293 };
294
295 self.channel.send("Loading graph communities...").await?;
296 let communities = store.all_communities().await?;
297 if communities.is_empty() {
298 self.channel
299 .send("No communities detected yet. Run graph backfill first.")
300 .await?;
301 return Ok(());
302 }
303
304 let lines: Vec<String> = communities
305 .iter()
306 .map(|c| format!(" [{}]: {}", c.name, c.summary))
307 .collect();
308 let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
309 self.channel.send(&msg).await?;
310 Ok(())
311 }
312
313 async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
314 let Some(memory) = self.memory_state.memory.clone() else {
315 self.channel.send("Graph memory is not enabled.").await?;
316 return Ok(());
317 };
318 let Some(store) = memory.graph_store.clone() else {
319 self.channel.send("Graph memory is not enabled.").await?;
320 return Ok(());
321 };
322
323 let total = store.unprocessed_message_count().await.unwrap_or(0);
324 let cap = limit.unwrap_or(usize::MAX);
325
326 self.channel
327 .send(&format!(
328 "Starting graph backfill... ({total} unprocessed messages)"
329 ))
330 .await?;
331
332 let batch_size = 50usize;
333 let mut processed = 0usize;
334 let mut total_entities = 0usize;
335 let mut total_edges = 0usize;
336
337 let graph_cfg = self.memory_state.graph_config.clone();
338 let provider = self.provider.clone();
339
340 loop {
341 let remaining_cap = cap.saturating_sub(processed);
342 if remaining_cap == 0 {
343 break;
344 }
345 let batch_limit = batch_size.min(remaining_cap);
346 let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
347 if messages.is_empty() {
348 break;
349 }
350
351 let ids: Vec<zeph_memory::types::MessageId> =
352 messages.iter().map(|(id, _)| *id).collect();
353
354 for (_id, content) in &messages {
355 if content.trim().is_empty() {
356 continue;
357 }
358 let extraction_cfg = GraphExtractionConfig {
359 max_entities: graph_cfg.max_entities_per_message,
360 max_edges: graph_cfg.max_edges_per_message,
361 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
362 community_refresh_interval: 0,
363 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
364 max_entities_cap: graph_cfg.max_entities,
365 community_summary_max_prompt_bytes: graph_cfg
366 .community_summary_max_prompt_bytes,
367 community_summary_concurrency: graph_cfg.community_summary_concurrency,
368 lpa_edge_chunk_size: graph_cfg.lpa_edge_chunk_size,
369 note_linking: zeph_memory::NoteLinkingConfig::default(),
372 link_weight_decay_lambda: graph_cfg.link_weight_decay_lambda,
373 link_weight_decay_interval_secs: graph_cfg.link_weight_decay_interval_secs,
374 };
375 let pool = store.pool().clone();
376 match extract_and_store(
377 content.clone(),
378 vec![],
379 provider.clone(),
380 pool,
381 extraction_cfg,
382 None,
383 None,
384 )
385 .await
386 {
387 Ok(result) => {
388 total_entities += result.stats.entities_upserted;
389 total_edges += result.stats.edges_inserted;
390 }
391 Err(e) => {
392 tracing::warn!("backfill extraction error: {e:#}");
393 }
394 }
395 }
396
397 store.mark_messages_graph_processed(&ids).await?;
398 processed += messages.len();
399
400 self.channel
401 .send(&format!(
402 "Backfill progress: {processed} messages processed, \
403 {total_entities} entities, {total_edges} edges"
404 ))
405 .await?;
406 }
407
408 self.channel
409 .send(&format!(
410 "Backfill complete: {total_entities} entities, {total_edges} edges \
411 extracted from {processed} messages"
412 ))
413 .await?;
414 Ok(())
415 }
416}
417
/// Extracts the value of the `--limit` flag from a `/graph backfill`
/// argument string.
///
/// Both `--limit N` and `--limit=N` are accepted. Only the first occurrence
/// of `--limit` is considered, and the value is the first
/// whitespace-delimited token that follows it (after an optional `=`).
///
/// Returns `None` when the flag is absent, has no value, or the value is not
/// a valid `usize`.
fn parse_backfill_limit(args: &str) -> Option<usize> {
    const FLAG: &str = "--limit";

    let (_, after_flag) = args.split_once(FLAG)?;
    // Accept the `--limit=N` spelling. Without this, `=N` fails to parse and
    // the caller treats the resulting `None` as "no cap".
    let value = after_flag.strip_prefix('=').unwrap_or(after_flag);
    value.split_whitespace().next()?.parse().ok()
}
425
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// `--limit` parsing: a present numeric value is returned, a missing flag
    /// or missing value yields `None`, and zero is a valid limit.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases: [(&str, Option<usize>); 4] = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_backfill_limit(input), expected);
        }
    }

    /// Mirrors how the `history <name>` subcommand extracts its argument.
    #[test]
    fn parse_graph_history_subcommand() {
        fn history_name(args: &str) -> Option<&str> {
            args.strip_prefix("history ").map(str::trim)
        }

        assert_eq!(history_name("history Rust"), Some("Rust"));
        assert_eq!(history_name("history Alice "), Some("Alice"));
        // A different subcommand does not match the prefix at all.
        assert_eq!("entities".strip_prefix("history "), None);
    }
}