zeph_core/agent/
graph_commands.rs1use std::fmt::Write as _;
5
6use zeph_memory::{GraphExtractionConfig, extract_and_store};
7
8use super::{Agent, error::AgentError};
9use crate::channel::Channel;
10
11impl<C: Channel> Agent<C> {
12 pub async fn handle_graph_command(&mut self, input: &str) -> Result<(), AgentError> {
18 let args = input.strip_prefix("/graph").unwrap_or("").trim();
19
20 if args.is_empty() {
21 return self.handle_graph_stats().await;
22 }
23 if args == "entities" || args.starts_with("entities ") {
24 return self.handle_graph_entities().await;
25 }
26 if let Some(name) = args.strip_prefix("facts ") {
27 return self.handle_graph_facts(name.trim()).await;
28 }
29 if args == "communities" {
30 return self.handle_graph_communities().await;
31 }
32 if args == "backfill" || args.starts_with("backfill ") {
33 let limit = parse_backfill_limit(args);
34 return self.handle_graph_backfill(limit).await;
35 }
36
37 self.channel
38 .send(
39 "Unknown /graph subcommand. Available: /graph, /graph entities, \
40 /graph facts <name>, /graph communities, /graph backfill [--limit N]",
41 )
42 .await?;
43 Ok(())
44 }
45
46 async fn handle_graph_stats(&mut self) -> Result<(), AgentError> {
47 let Some(memory) = self.memory_state.memory.as_ref() else {
48 self.channel.send("Graph memory is not enabled.").await?;
49 return Ok(());
50 };
51 let Some(store) = memory.graph_store.as_ref() else {
52 self.channel.send("Graph memory is not enabled.").await?;
53 return Ok(());
54 };
55
56 let (entities, edges, communities) = tokio::join!(
57 store.entity_count(),
58 store.active_edge_count(),
59 store.community_count()
60 );
61 let msg = format!(
62 "Graph memory: {} entities, {} edges, {} communities",
63 entities.unwrap_or(0),
64 edges.unwrap_or(0),
65 communities.unwrap_or(0)
66 );
67 self.channel.send(&msg).await?;
68 Ok(())
69 }
70
71 async fn handle_graph_entities(&mut self) -> Result<(), AgentError> {
72 let Some(memory) = self.memory_state.memory.as_ref() else {
73 self.channel.send("Graph memory is not enabled.").await?;
74 return Ok(());
75 };
76 let Some(store) = memory.graph_store.as_ref() else {
77 self.channel.send("Graph memory is not enabled.").await?;
78 return Ok(());
79 };
80
81 self.channel.send("Loading graph entities...").await?;
82 let entities = store.all_entities().await?;
83 if entities.is_empty() {
84 self.channel.send("No entities found.").await?;
85 return Ok(());
86 }
87
88 let total = entities.len();
89 let display: Vec<String> = entities
90 .iter()
91 .take(50)
92 .map(|e| {
93 format!(
94 " {:<40} {:<15} {}",
95 e.name,
96 e.entity_type.as_str(),
97 e.last_seen_at.split('T').next().unwrap_or(&e.last_seen_at)
98 )
99 })
100 .collect();
101 let mut msg = format!(
102 "Entities ({total} total):\n {:<40} {:<15} {}\n{}",
103 "NAME",
104 "TYPE",
105 "LAST SEEN",
106 display.join("\n")
107 );
108 if total > 50 {
109 write!(msg, "\n ...and {} more", total - 50).unwrap_or(());
110 }
111 self.channel.send(&msg).await?;
112 Ok(())
113 }
114
115 async fn handle_graph_facts(&mut self, name: &str) -> Result<(), AgentError> {
116 let Some(memory) = self.memory_state.memory.as_ref() else {
117 self.channel.send("Graph memory is not enabled.").await?;
118 return Ok(());
119 };
120 let Some(store) = memory.graph_store.as_ref() else {
121 self.channel.send("Graph memory is not enabled.").await?;
122 return Ok(());
123 };
124
125 let matches = store.find_entity_by_name(name).await?;
126 if matches.is_empty() {
127 self.channel
128 .send(&format!("No entity found matching '{name}'."))
129 .await?;
130 return Ok(());
131 }
132
133 let entity = &matches[0];
134 let edges = store.edges_for_entity(entity.id).await?;
135 if edges.is_empty() {
136 self.channel
137 .send(&format!("Entity '{}' has no known facts.", entity.name))
138 .await?;
139 return Ok(());
140 }
141
142 let mut entity_names: std::collections::HashMap<i64, String> =
144 std::collections::HashMap::new();
145 entity_names.insert(entity.id, entity.name.clone());
146 for edge in &edges {
147 let other_id = if edge.source_entity_id == entity.id {
148 edge.target_entity_id
149 } else {
150 edge.source_entity_id
151 };
152 entity_names.entry(other_id).or_insert_with(|| {
153 String::new()
156 });
157 }
158 for (&id, name_val) in &mut entity_names {
160 if name_val.is_empty() {
161 if let Ok(Some(other)) = store.find_entity_by_id(id).await {
162 *name_val = other.name;
163 } else {
164 *name_val = format!("#{id}");
165 }
166 }
167 }
168
169 let lines: Vec<String> = edges
170 .iter()
171 .map(|e| {
172 let src = entity_names
173 .get(&e.source_entity_id)
174 .cloned()
175 .unwrap_or_else(|| format!("#{}", e.source_entity_id));
176 let tgt = entity_names
177 .get(&e.target_entity_id)
178 .cloned()
179 .unwrap_or_else(|| format!("#{}", e.target_entity_id));
180 format!(
181 " {} --[{}]--> {}: {} (confidence: {:.2})",
182 src, e.relation, tgt, e.fact, e.confidence
183 )
184 })
185 .collect();
186 let msg = format!("Facts for '{}':\n{}", entity.name, lines.join("\n"));
187 self.channel.send(&msg).await?;
188 Ok(())
189 }
190
191 async fn handle_graph_communities(&mut self) -> Result<(), AgentError> {
192 let Some(memory) = self.memory_state.memory.as_ref() else {
193 self.channel.send("Graph memory is not enabled.").await?;
194 return Ok(());
195 };
196 let Some(store) = memory.graph_store.as_ref() else {
197 self.channel.send("Graph memory is not enabled.").await?;
198 return Ok(());
199 };
200
201 self.channel.send("Loading graph communities...").await?;
202 let communities = store.all_communities().await?;
203 if communities.is_empty() {
204 self.channel
205 .send("No communities detected yet. Run graph backfill first.")
206 .await?;
207 return Ok(());
208 }
209
210 let lines: Vec<String> = communities
211 .iter()
212 .map(|c| format!(" [{}]: {}", c.name, c.summary))
213 .collect();
214 let msg = format!("Communities ({}):\n{}", communities.len(), lines.join("\n"));
215 self.channel.send(&msg).await?;
216 Ok(())
217 }
218
219 async fn handle_graph_backfill(&mut self, limit: Option<usize>) -> Result<(), AgentError> {
220 let Some(memory) = self.memory_state.memory.clone() else {
221 self.channel.send("Graph memory is not enabled.").await?;
222 return Ok(());
223 };
224 let Some(store) = memory.graph_store.clone() else {
225 self.channel.send("Graph memory is not enabled.").await?;
226 return Ok(());
227 };
228
229 let total = store.unprocessed_message_count().await.unwrap_or(0);
230 let cap = limit.unwrap_or(usize::MAX);
231
232 self.channel
233 .send(&format!(
234 "Starting graph backfill... ({total} unprocessed messages)"
235 ))
236 .await?;
237
238 let batch_size = 50usize;
239 let mut processed = 0usize;
240 let mut total_entities = 0usize;
241 let mut total_edges = 0usize;
242
243 let graph_cfg = self.memory_state.graph_config.clone();
244 let provider = self.provider.clone();
245
246 loop {
247 let remaining_cap = cap.saturating_sub(processed);
248 if remaining_cap == 0 {
249 break;
250 }
251 let batch_limit = batch_size.min(remaining_cap);
252 let messages = store.unprocessed_messages_for_backfill(batch_limit).await?;
253 if messages.is_empty() {
254 break;
255 }
256
257 let ids: Vec<zeph_memory::types::MessageId> =
258 messages.iter().map(|(id, _)| *id).collect();
259
260 for (_id, content) in &messages {
261 if content.trim().is_empty() {
262 continue;
263 }
264 let extraction_cfg = GraphExtractionConfig {
265 max_entities: graph_cfg.max_entities_per_message,
266 max_edges: graph_cfg.max_edges_per_message,
267 extraction_timeout_secs: graph_cfg.extraction_timeout_secs,
268 community_refresh_interval: 0,
269 expired_edge_retention_days: graph_cfg.expired_edge_retention_days,
270 max_entities_cap: graph_cfg.max_entities,
271 community_summary_max_prompt_bytes: graph_cfg
272 .community_summary_max_prompt_bytes,
273 community_summary_concurrency: graph_cfg.community_summary_concurrency,
274 };
275 let pool = store.pool().clone();
276 match extract_and_store(
277 content.clone(),
278 vec![],
279 provider.clone(),
280 pool,
281 extraction_cfg,
282 )
283 .await
284 {
285 Ok(stats) => {
286 total_entities += stats.entities_upserted;
287 total_edges += stats.edges_inserted;
288 }
289 Err(e) => {
290 tracing::warn!("backfill extraction error: {e:#}");
291 }
292 }
293 }
294
295 store.mark_messages_graph_processed(&ids).await?;
296 processed += messages.len();
297
298 self.channel
299 .send(&format!(
300 "Backfill progress: {processed} messages processed, \
301 {total_entities} entities, {total_edges} edges"
302 ))
303 .await?;
304 }
305
306 self.channel
307 .send(&format!(
308 "Backfill complete: {total_entities} entities, {total_edges} edges \
309 extracted from {processed} messages"
310 ))
311 .await?;
312 Ok(())
313 }
314}
315
/// Extracts the value of the `--limit` option from a `/graph backfill` argument string.
///
/// Arguments are split on whitespace and the first `--limit` occurrence wins.
/// Both `--limit N` and `--limit=N` are accepted. Returns `None` when the option
/// is absent, has no value, or the value is not a valid `usize`.
///
/// Only a whole `--limit` token (or a `--limit=` prefix) is recognised, so
/// look-alike tokens such as `--limits` are not treated as the option.
fn parse_backfill_limit(args: &str) -> Option<usize> {
    let mut tokens = args.split_whitespace();
    // `while let` rather than `for`: the value may live in the *next* token,
    // which has to be pulled from the same iterator.
    while let Some(token) = tokens.next() {
        if token == "--limit" {
            return tokens.next()?.parse().ok();
        }
        if let Some(value) = token.strip_prefix("--limit=") {
            return value.parse().ok();
        }
    }
    None
}
323
#[cfg(test)]
mod tests {
    use super::parse_backfill_limit;

    /// Checks `--limit` extraction for the present, absent, valueless and zero cases.
    #[test]
    fn handle_graph_backfill_limit_parsing() {
        let cases: [(&str, Option<usize>); 4] = [
            ("backfill --limit 100", Some(100)),
            ("backfill", None),
            ("backfill --limit", None),
            ("backfill --limit 0", Some(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_backfill_limit(input), expected, "input: {input:?}");
        }
    }
}
335}