1use std::collections::HashMap;
23
24use super::{GraphError, GraphParam, GraphRow, GraphStore};
25
/// Row limit labelled as the default for an inspection.
///
/// NOTE(review): not referenced in this file; presumably callers pass it to
/// `inspect_scope` when no explicit limit is requested — confirm at call sites.
pub const DEFAULT_INSPECTION_LIMIT: usize = 500;

/// Largest row limit `inspect_scope` will honour; larger requests are clamped
/// down to this value before the queries are issued.
pub const MAX_INSPECTION_LIMIT: usize = 5_000;
39
/// One `Entity` node as returned by an inspection query.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GraphNode {
    /// Value of the node's `name` column; rows without it are not turned into nodes.
    pub name: String,
    /// Ids decoded from the node's `memory_pids` column (a JSON array of
    /// strings); empty when the column is absent or cannot be decoded.
    pub memory_pids: Vec<String>,
    /// Raw `first_seen_at` column text, or `None` when it is absent, empty,
    /// or the literal `null`.
    pub first_seen_at: Option<String>,
}
54
/// One directed relationship between two `Entity` nodes as returned by an
/// inspection query.
#[derive(Clone, Debug, PartialEq)]
pub struct GraphEdge {
    /// `name` of the node the relationship starts from.
    pub subject: String,
    /// Value of the relationship's `relation` property.
    pub relation: String,
    /// `name` of the node the relationship points to.
    pub object: String,
    /// Value of the relationship's `confidence` property; `1.0` when the
    /// column is absent or does not parse as a float.
    pub confidence: f32,
    /// Raw `valid_from` column text, or `None` when absent, empty, or `null`.
    pub valid_from: Option<String>,
    /// Raw `valid_to` column text. `None` (absent, empty, or `null`) marks an
    /// edge that is still current; a value marks a superseded edge.
    pub valid_to: Option<String>,
    /// Ids decoded from the relationship's `memory_pids` column (a JSON array
    /// of strings); empty when the column is absent or cannot be decoded.
    pub memory_pids: Vec<String>,
}
79
80#[derive(Debug, Clone, Default, PartialEq)]
86pub struct GraphSnapshot {
87 pub nodes: Vec<GraphNode>,
89 pub edges: Vec<GraphEdge>,
91 pub truncated: bool,
93}
94
95impl GraphSnapshot {
96 pub fn is_empty(&self) -> bool {
98 self.nodes.is_empty() && self.edges.is_empty()
99 }
100}
101
102pub(super) async fn inspect_scope<G: GraphStore + ?Sized>(
108 store: &G,
109 agent_id: Option<&str>,
110 org_id: Option<&str>,
111 user_id: Option<&str>,
112 limit: usize,
113) -> Result<GraphSnapshot, GraphError> {
114 let limit = limit.clamp(1, MAX_INSPECTION_LIMIT);
115
116 let mut params = HashMap::new();
117 let mut node_terms: Vec<&str> = Vec::new();
118 let mut edge_terms: Vec<&str> = Vec::new();
119 if let Some(agent_id) = agent_id {
120 params.insert("agent_id".to_string(), agent_id.into());
121 node_terms.push("n.agent_id = $agent_id");
122 edge_terms.push("s.agent_id = $agent_id");
123 }
124 if let Some(org_id) = org_id {
125 params.insert("org_id".to_string(), org_id.into());
126 node_terms.push("n.org_id = $org_id");
127 edge_terms.push("s.org_id = $org_id");
128 }
129 if let Some(user_id) = user_id {
130 params.insert("user_id".to_string(), user_id.into());
131 node_terms.push("n.user_id = $user_id");
132 edge_terms.push("s.user_id = $user_id");
133 }
134 params.insert("lim".to_string(), GraphParam::Int(limit as i64));
135
136 let node_where = where_clause(&node_terms);
137 let node_cypher = format!(
138 "MATCH (n:Entity){node_where} \
139 RETURN n.name AS name, n.memory_pids AS memory_pids, n.first_seen_at AS first_seen_at \
140 ORDER BY n.first_seen_at, n.name \
141 LIMIT $lim"
142 );
143
144 let edge_where = where_clause(&edge_terms);
145 let edge_cypher = format!(
146 "MATCH (s:Entity)-[r]->(o:Entity){edge_where} \
147 RETURN s.name AS subject, r.relation AS relation, o.name AS object, \
148 r.confidence AS confidence, r.valid_from AS valid_from, r.valid_to AS valid_to, \
149 r.memory_pids AS memory_pids \
150 ORDER BY r.valid_from \
151 LIMIT $lim"
152 );
153
154 let node_rows = store.query(&node_cypher, ¶ms).await?;
155 let edge_rows = store.query(&edge_cypher, ¶ms).await?;
156
157 let nodes: Vec<GraphNode> = node_rows.iter().filter_map(node_from_row).collect();
158 let edges: Vec<GraphEdge> = edge_rows.iter().filter_map(edge_from_row).collect();
159 let truncated = nodes.len() >= limit || edges.len() >= limit;
160
161 Ok(GraphSnapshot { nodes, edges, truncated })
162}
163
/// Renders filter predicates as a Cypher `WHERE` fragment.
///
/// Returns an empty string when there are no predicates; otherwise a
/// space-prefixed `WHERE` followed by the predicates joined with ` AND `.
fn where_clause(terms: &[&str]) -> String {
    match terms {
        [] => String::new(),
        predicates => {
            let joined = predicates.join(" AND ");
            format!(" WHERE {}", joined)
        }
    }
}
172
173fn node_from_row(row: &GraphRow) -> Option<GraphNode> {
179 let name = column(row, "name")?.to_string();
180 Some(GraphNode {
181 name,
182 memory_pids: parse_pids(column(row, "memory_pids")),
183 first_seen_at: present(column(row, "first_seen_at")),
184 })
185}
186
187fn edge_from_row(row: &GraphRow) -> Option<GraphEdge> {
193 let subject = column(row, "subject")?.to_string();
194 let relation = column(row, "relation")?.to_string();
195 let object = column(row, "object")?.to_string();
196 let confidence = column(row, "confidence").and_then(|c| c.parse().ok()).unwrap_or(1.0);
197 Some(GraphEdge {
198 subject,
199 relation,
200 object,
201 confidence,
202 valid_from: present(column(row, "valid_from")),
203 valid_to: present(column(row, "valid_to")),
204 memory_pids: parse_pids(column(row, "memory_pids")),
205 })
206}
207
208fn parse_pids(value: Option<&str>) -> Vec<String> {
210 value.and_then(|v| serde_json::from_str(v).ok()).unwrap_or_default()
211}
212
/// Normalises an optional column value into owned text.
///
/// An absent column, an empty string, and the literal text `null` all map to
/// `None`; any other text is returned unchanged as an owned `String`.
fn present(value: Option<&str>) -> Option<String> {
    value
        .filter(|text| !text.is_empty() && *text != "null")
        .map(String::from)
}
221
222fn column<'a>(row: &'a GraphRow, name: &str) -> Option<&'a str> {
224 row.iter()
225 .find(|(column, _)| column == name)
226 .map(|(_, value)| value.as_str())
227}
228
#[cfg(test)]
mod tests {
    // Unit tests for `inspect_scope`, driven by a scripted in-memory store
    // that records the Cypher text and parameters of every call.
    use std::collections::VecDeque;
    use std::sync::Mutex;

    use super::*;
    use crate::graph::GraphRows;

    /// Builds a result row from `(column, value)` string pairs.
    fn row(pairs: &[(&str, &str)]) -> GraphRow {
        pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
    }

    /// Test double that replays canned results in order and records each call.
    struct StagedStore {
        /// Scripted results, handed out front to back, one per `query` call.
        responses: Mutex<VecDeque<GraphRows>>,
        /// Every `(cypher, params)` pair received, in call order.
        calls: Mutex<Vec<(String, HashMap<String, GraphParam>)>>,
    }

    impl StagedStore {
        /// Creates a store that answers successive queries with `responses`.
        fn new(responses: Vec<GraphRows>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
                calls: Mutex::default(),
            }
        }

        /// Creates a store whose node and edge queries both return no rows.
        fn empty() -> Self {
            Self::new(vec![vec![], vec![]])
        }

        /// Returns a copy of the calls recorded so far.
        fn calls(&self) -> Vec<(String, HashMap<String, GraphParam>)> {
            self.calls.lock().unwrap().clone()
        }
    }

    impl GraphStore for StagedStore {
        async fn ensure_graph(&self) -> Result<(), GraphError> {
            Ok(())
        }

        async fn query(&self, cypher: &str, params: &HashMap<String, GraphParam>) -> Result<GraphRows, GraphError> {
            self.calls.lock().unwrap().push((cypher.to_string(), params.clone()));
            // An exhausted script answers with an empty result, not an error.
            let next = self.responses.lock().unwrap().pop_front();
            Ok(next.unwrap_or_default())
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_bind_full_scope_as_params() {
        let store = StagedStore::empty();
        inspect_scope(&store, Some("a"), Some("o"), Some("u"), 100).await.unwrap();

        let calls = store.calls();
        let (node_cypher, params) = &calls[0];
        assert!(!node_cypher.contains("\"a\""), "scope must not be interpolated");
        assert_eq!(params.get("agent_id"), Some(&GraphParam::Str("a".to_string())));
        assert_eq!(params.get("org_id"), Some(&GraphParam::Str("o".to_string())));
        assert_eq!(params.get("user_id"), Some(&GraphParam::Str("u".to_string())));
        assert!(node_cypher.contains("n.agent_id = $agent_id"));

        // The edge query must be scoped through the same bound parameters.
        let (edge_cypher, edge_params) = &calls[1];
        assert!(!edge_cypher.contains("\"a\""), "scope must not be interpolated");
        assert!(edge_cypher.contains("s.agent_id = $agent_id"));
        assert!(edge_cypher.contains("s.org_id = $org_id"));
        assert!(edge_cypher.contains("s.user_id = $user_id"));
        assert_eq!(edge_params, params);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_omit_absent_scope_dimensions() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, Some("o"), None, 100).await.unwrap();

        let calls = store.calls();
        let (node_cypher, params) = &calls[0];
        assert!(node_cypher.contains("n.org_id = $org_id"));
        assert!(!node_cypher.contains("agent_id"), "absent dimension imposes no filter");
        assert!(!node_cypher.contains("user_id"));
        assert!(!params.contains_key("agent_id"));

        let edge_cypher = &calls[1].0;
        assert!(edge_cypher.contains("s.org_id = $org_id"));
        assert!(!edge_cypher.contains("agent_id"), "absent dimension imposes no filter");
        assert!(!edge_cypher.contains("user_id"));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_emit_no_where_clause_for_empty_scope() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, None, None, 100).await.unwrap();

        let calls = store.calls();
        assert!(!calls[0].0.contains("WHERE"), "no scope -> whole-graph dump");
        assert!(!calls[1].0.contains("WHERE"), "no scope -> whole-graph dump");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_read_both_current_and_superseded_edges() {
        let store = StagedStore::empty();
        inspect_scope(&store, Some("a"), Some("o"), Some("u"), 100).await.unwrap();

        let edge_cypher = &store.calls()[1].0;
        assert!(
            !edge_cypher.contains("valid_to IS NULL"),
            "admin view must include superseded edges, not filter to current",
        );
        assert!(edge_cypher.contains("r.valid_to AS valid_to"));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_clamp_limit_to_max() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, None, None, MAX_INSPECTION_LIMIT * 10)
            .await
            .unwrap();
        assert_eq!(
            store.calls()[0].1.get("lim"),
            Some(&GraphParam::Int(MAX_INSPECTION_LIMIT as i64)),
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_raise_zero_limit_to_one() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, None, None, 0).await.unwrap();
        assert_eq!(store.calls()[0].1.get("lim"), Some(&GraphParam::Int(1)));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_build_snapshot_from_node_and_edge_rows() {
        let store = StagedStore::new(vec![
            vec![row(&[
                ("name", "Alice"),
                ("memory_pids", "[\"mem1\",\"mem2\"]"),
                ("first_seen_at", "2026-06-01T00:00:00+00:00"),
            ])],
            vec![row(&[
                ("subject", "Alice"),
                ("relation", "works at"),
                ("object", "Acme"),
                ("confidence", "0.9"),
                ("valid_from", "2026-06-01T00:00:00+00:00"),
                ("valid_to", "null"),
                ("memory_pids", "[\"mem1\"]"),
            ])],
        ]);

        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();

        assert_eq!(snapshot.nodes.len(), 1);
        assert_eq!(snapshot.nodes[0].name, "Alice");
        assert_eq!(snapshot.nodes[0].memory_pids, vec!["mem1", "mem2"]);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.edges[0].object, "Acme");
        assert!(snapshot.edges[0].valid_to.is_none(), "null valid_to -> current edge");
        assert!(!snapshot.truncated);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_surface_superseded_edge_valid_to() {
        let store = StagedStore::new(vec![
            vec![],
            vec![row(&[
                ("subject", "Alice"),
                ("relation", "works at"),
                ("object", "Globex"),
                ("confidence", "0.8"),
                ("valid_from", "2026-05-01T00:00:00+00:00"),
                ("valid_to", "2026-06-01T00:00:00+00:00"),
                ("memory_pids", "[\"mem0\"]"),
            ])],
        ]);

        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert_eq!(snapshot.edges[0].valid_to.as_deref(), Some("2026-06-01T00:00:00+00:00"));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_flag_truncated_when_limit_reached() {
        let store = StagedStore::new(vec![vec![row(&[("name", "Alice")]), row(&[("name", "Bob")])], vec![]]);

        let snapshot = inspect_scope(&store, None, None, None, 2).await.unwrap();
        assert!(snapshot.truncated, "node count == limit -> truncated");
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_skip_node_missing_name() {
        let store = StagedStore::new(vec![vec![row(&[("memory_pids", "[\"mem1\"]")])], vec![]]);
        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert!(snapshot.nodes.is_empty());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn should_default_pids_empty_when_unparseable() {
        let store = StagedStore::new(vec![vec![row(&[("name", "Alice"), ("memory_pids", "not json")])], vec![]]);
        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert!(snapshot.nodes[0].memory_pids.is_empty());
    }
}