Skip to main content

memoir_core/graph/
inspect.rs

1//! Admin read-path: a whole-scope snapshot of the graph.
2//!
3//! The admin "Knowledge graph view" (`.tasks/README.md:587`): every entity and
4//! relationship in a scope, for an operator to inspect or render. Unlike
5//! read-path enrichment ([`super::enrich`]), this is *scope-anchored*, not
6//! *seed-anchored* — there are no seed memories, the whole (possibly partial)
7//! scope is dumped — and it returns **both** current and superseded edges, each
8//! flagged by `valid_to`, so the admin UI can render the temporal history.
9//!
10//! Scope is *partial*: any of `agent_id` / `org_id` / `user_id` may be absent,
11//! and an absent dimension imposes no filter. This is the one cross-scope read
12//! in memoir (an admin can view across agents/users/orgs); it is read-only and
13//! gated by the caller's auth layer (memoir-service's `require_admin`). The
14//! write, forget, and enrichment paths keep full-scope-tuple isolation.
15//!
16//! The snapshot carries richer per-element provenance than the flat enrichment
17//! [`GraphContext`](super::GraphContext): nodes carry `memory_pids` and
18//! `first_seen_at`, edges carry `valid_from`, `valid_to`, and `memory_pids`.
19//! Source memory *content* is not hydrated here — the consumer resolves
20//! `memory_pids` against Postgres if it needs the underlying utterances.
21
22use std::collections::HashMap;
23
24use super::{GraphError, GraphParam, GraphRow, GraphStore};
25
/// Default cap on the nodes and on the edges a single inspection returns.
///
/// A scope's full graph can be large; an unbounded dump risks an enormous
/// payload and a heavy backend scan. The cap applies independently to nodes and
/// to edges (each limited to this many), and the snapshot flags when either was
/// truncated so the UI knows the view is partial. This is a suggested default
/// for callers; the inspection itself takes an explicit `limit`.
pub const DEFAULT_INSPECTION_LIMIT: usize = 500;
33
/// Hard upper bound on a caller-supplied inspection limit.
///
/// The inspection clamps any requested limit into `1..=MAX_INSPECTION_LIMIT`,
/// so an admin cannot ask for an unbounded scan; mirrors the failed-jobs limit
/// discipline (`services/admin.rs`).
pub const MAX_INSPECTION_LIMIT: usize = 5_000;
39
/// An entity node in an admin graph snapshot.
///
/// Untyped in v1 (`:Entity`, ticket 0005) — carries the canonical `name` for
/// identity, plus the provenance the admin view wants: which memories
/// contributed it and when it first appeared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphNode {
    /// The entity node's canonical name (its identity within a scope).
    pub name: String,
    /// Public ids of the memories that contributed this entity.
    ///
    /// Empty when the stored value is missing or is not a JSON string array.
    pub memory_pids: Vec<String>,
    /// When this entity was first seen, RFC 3339 (the commit's `first_seen_at`).
    ///
    /// `None` when the backend returned no value for it.
    pub first_seen_at: Option<String>,
}
54
/// A relationship edge in an admin graph snapshot.
///
/// Carries the full temporal state: `valid_from` (when the fact became true) and
/// `valid_to` (`None` = current, `Some` = superseded at that time). Both current
/// and closed edges are returned so the admin UI can render history — the reason
/// this type carries `valid_to` where the enrichment
/// [`GraphRelationship`](super::GraphRelationship) does not.
///
/// Derives only `PartialEq` (not `Eq`) because `confidence` is an `f32`.
#[derive(Debug, Clone, PartialEq)]
pub struct GraphEdge {
    /// The subject entity's name.
    pub subject: String,
    /// The relation label (open vocabulary, the original extracted string).
    pub relation: String,
    /// The object entity's name.
    pub object: String,
    /// The extractor's confidence in this relationship, expected 0.0-1.0 (the
    /// range is not enforced on read). Defaults to `1.0` when the stored value
    /// is missing or not a number.
    pub confidence: f32,
    /// When the fact became true, RFC 3339.
    pub valid_from: Option<String>,
    /// When the fact was superseded, RFC 3339; `None` for a current edge.
    pub valid_to: Option<String>,
    /// Public ids of the memories that contributed this relationship.
    pub memory_pids: Vec<String>,
}
79
/// A whole-scope snapshot of the graph for admin inspection.
///
/// Every node and edge in the (possibly partial) scope, up to the inspection
/// limit. `truncated` is set when either list hit the cap, so the consumer knows
/// the view is incomplete rather than the scope being small.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct GraphSnapshot {
    /// Every entity in scope, ordered by `first_seen_at` then `name`.
    pub nodes: Vec<GraphNode>,
    /// Every relationship in scope — current and superseded — ordered primarily
    /// by `valid_from`.
    pub edges: Vec<GraphEdge>,
    /// Whether the node or edge list was capped at the inspection limit.
    ///
    /// Conservative: a scope holding exactly `limit` nodes (or edges) is also
    /// flagged, since it cannot be told apart from a capped one.
    pub truncated: bool,
}
94
95impl GraphSnapshot {
96    /// Returns whether the snapshot holds no nodes or edges.
97    pub fn is_empty(&self) -> bool {
98        self.nodes.is_empty() && self.edges.is_empty()
99    }
100}
101
102/// Backs [`GraphStore::inspect_scope`]; see that method for semantics.
103///
104/// Nodes and edges are read in two separate queries rather than one path-match:
105/// an edge-anchored traversal would drop isolated entities, which are real graph
106/// data the admin view must show.
107pub(super) async fn inspect_scope<G: GraphStore + ?Sized>(
108    store: &G,
109    agent_id: Option<&str>,
110    org_id: Option<&str>,
111    user_id: Option<&str>,
112    limit: usize,
113) -> Result<GraphSnapshot, GraphError> {
114    let limit = limit.clamp(1, MAX_INSPECTION_LIMIT);
115
116    let mut params = HashMap::new();
117    let mut node_terms: Vec<&str> = Vec::new();
118    let mut edge_terms: Vec<&str> = Vec::new();
119    if let Some(agent_id) = agent_id {
120        params.insert("agent_id".to_string(), agent_id.into());
121        node_terms.push("n.agent_id = $agent_id");
122        edge_terms.push("s.agent_id = $agent_id");
123    }
124    if let Some(org_id) = org_id {
125        params.insert("org_id".to_string(), org_id.into());
126        node_terms.push("n.org_id = $org_id");
127        edge_terms.push("s.org_id = $org_id");
128    }
129    if let Some(user_id) = user_id {
130        params.insert("user_id".to_string(), user_id.into());
131        node_terms.push("n.user_id = $user_id");
132        edge_terms.push("s.user_id = $user_id");
133    }
134    params.insert("lim".to_string(), GraphParam::Int(limit as i64));
135
136    let node_where = where_clause(&node_terms);
137    let node_cypher = format!(
138        "MATCH (n:Entity){node_where} \
139         RETURN n.name AS name, n.memory_pids AS memory_pids, n.first_seen_at AS first_seen_at \
140         ORDER BY n.first_seen_at, n.name \
141         LIMIT $lim"
142    );
143
144    let edge_where = where_clause(&edge_terms);
145    let edge_cypher = format!(
146        "MATCH (s:Entity)-[r]->(o:Entity){edge_where} \
147         RETURN s.name AS subject, r.relation AS relation, o.name AS object, \
148                r.confidence AS confidence, r.valid_from AS valid_from, r.valid_to AS valid_to, \
149                r.memory_pids AS memory_pids \
150         ORDER BY r.valid_from \
151         LIMIT $lim"
152    );
153
154    let node_rows = store.query(&node_cypher, &params).await?;
155    let edge_rows = store.query(&edge_cypher, &params).await?;
156
157    let nodes: Vec<GraphNode> = node_rows.iter().filter_map(node_from_row).collect();
158    let edges: Vec<GraphEdge> = edge_rows.iter().filter_map(edge_from_row).collect();
159    let truncated = nodes.len() >= limit || edges.len() >= limit;
160
161    Ok(GraphSnapshot { nodes, edges, truncated })
162}
163
/// Builds the ` WHERE …` suffix for a `MATCH` pattern from scope predicates.
///
/// Predicates are AND-ed in the order given. The result carries a leading space
/// so it can be spliced directly after the pattern. With no predicates the
/// result is empty, leaving the match unconstrained.
fn where_clause(terms: &[&str]) -> String {
    match terms {
        [] => String::new(),
        _ => format!(" WHERE {}", terms.join(" AND ")),
    }
}
172
173/// Parses a [`GraphNode`] from a node result row.
174///
175/// A row missing its `name` is skipped — one malformed node should not break the
176/// whole snapshot. `memory_pids` parses from the JSON array the commit writes;
177/// an unparseable value yields an empty list rather than dropping the node.
178fn node_from_row(row: &GraphRow) -> Option<GraphNode> {
179    let name = column(row, "name")?.to_string();
180    Some(GraphNode {
181        name,
182        memory_pids: parse_pids(column(row, "memory_pids")),
183        first_seen_at: present(column(row, "first_seen_at")),
184    })
185}
186
187/// Parses a [`GraphEdge`] from an edge result row.
188///
189/// A row missing subject, relation, or object is skipped. `valid_to` carries
190/// through as `None` for a current edge (the backend renders null as the absent
191/// sentinel), `Some` for a superseded one.
192fn edge_from_row(row: &GraphRow) -> Option<GraphEdge> {
193    let subject = column(row, "subject")?.to_string();
194    let relation = column(row, "relation")?.to_string();
195    let object = column(row, "object")?.to_string();
196    let confidence = column(row, "confidence").and_then(|c| c.parse().ok()).unwrap_or(1.0);
197    Some(GraphEdge {
198        subject,
199        relation,
200        object,
201        confidence,
202        valid_from: present(column(row, "valid_from")),
203        valid_to: present(column(row, "valid_to")),
204        memory_pids: parse_pids(column(row, "memory_pids")),
205    })
206}
207
208/// Parses the `memory_pids` JSON array, defaulting to empty on any other shape.
209fn parse_pids(value: Option<&str>) -> Vec<String> {
210    value.and_then(|v| serde_json::from_str(v).ok()).unwrap_or_default()
211}
212
/// Normalizes an optional backend value into an owned string.
///
/// An absent column, an empty string, and the literal null sentinel `"null"`
/// all map to `None`; any other value is returned as-is.
fn present(value: Option<&str>) -> Option<String> {
    value
        .filter(|v| !v.is_empty() && *v != "null")
        .map(str::to_owned)
}
221
222/// Returns the value of the column named `name` in a result row.
223fn column<'a>(row: &'a GraphRow, name: &str) -> Option<&'a str> {
224    row.iter()
225        .find(|(column, _)| column == name)
226        .map(|(_, value)| value.as_str())
227}
228
/// Unit tests for `inspect_scope`, driven by a staged in-memory [`GraphStore`].
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;
    use crate::graph::GraphRows;

    /// Builds a result row from `(column, value)` string pairs, in order.
    fn row(pairs: &[(&str, &str)]) -> GraphRow {
        pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
    }

    /// Returns staged node/edge rows in turn, recording each (cypher, params) call.
    ///
    /// `inspect_scope` issues the node query first, then the edge query, so the
    /// staged responses are drained in that order.
    struct StagedStore {
        responses: Mutex<Vec<GraphRows>>,
        calls: Mutex<Vec<(String, HashMap<String, GraphParam>)>>,
    }

    impl StagedStore {
        /// Stages `responses` to be returned one per `query` call, front first.
        fn new(responses: Vec<GraphRows>) -> Self {
            Self {
                responses: Mutex::new(responses),
                calls: Mutex::default(),
            }
        }

        /// Stages empty node and edge results, for tests that only inspect the
        /// issued queries.
        fn empty() -> Self {
            Self::new(vec![vec![], vec![]])
        }

        /// Returns a copy of every recorded `(cypher, params)` call, in order.
        fn calls(&self) -> Vec<(String, HashMap<String, GraphParam>)> {
            self.calls.lock().unwrap().clone()
        }
    }

    impl GraphStore for StagedStore {
        async fn ensure_graph(&self) -> Result<(), GraphError> {
            Ok(())
        }

        /// Records the call, then pops the next staged response; once the
        /// queue is exhausted, returns an empty result set.
        async fn query(&self, cypher: &str, params: &HashMap<String, GraphParam>) -> Result<GraphRows, GraphError> {
            self.calls.lock().unwrap().push((cypher.to_string(), params.clone()));
            let mut responses = self.responses.lock().unwrap();
            Ok(if responses.is_empty() {
                Vec::new()
            } else {
                responses.remove(0)
            })
        }
    }

    /// Every scope dimension is bound as a parameter, not spliced into the text.
    #[tokio::test(flavor = "current_thread")]
    async fn should_bind_full_scope_as_params() {
        let store = StagedStore::empty();
        inspect_scope(&store, Some("a"), Some("o"), Some("u"), 100).await.unwrap();

        let (node_cypher, params) = &store.calls()[0];
        assert!(!node_cypher.contains("\"a\""), "scope must not be interpolated");
        assert_eq!(params.get("agent_id"), Some(&GraphParam::Str("a".to_string())));
        assert_eq!(params.get("org_id"), Some(&GraphParam::Str("o".to_string())));
        assert_eq!(params.get("user_id"), Some(&GraphParam::Str("u".to_string())));
        assert!(node_cypher.contains("n.agent_id = $agent_id"));
    }

    /// A `None` dimension adds neither a predicate nor a parameter.
    #[tokio::test(flavor = "current_thread")]
    async fn should_omit_absent_scope_dimensions() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, Some("o"), None, 100).await.unwrap();

        let (node_cypher, params) = &store.calls()[0];
        assert!(node_cypher.contains("n.org_id = $org_id"));
        assert!(!node_cypher.contains("agent_id"), "absent dimension imposes no filter");
        assert!(!node_cypher.contains("user_id"));
        assert!(!params.contains_key("agent_id"));
    }

    /// With no scope at all, the node query has no `WHERE` clause.
    #[tokio::test(flavor = "current_thread")]
    async fn should_emit_no_where_clause_for_empty_scope() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, None, None, 100).await.unwrap();

        let (node_cypher, _) = &store.calls()[0];
        assert!(!node_cypher.contains("WHERE"), "no scope -> whole-graph dump");
    }

    /// The edge query (second call) returns `valid_to` and does not filter on it.
    #[tokio::test(flavor = "current_thread")]
    async fn should_read_both_current_and_superseded_edges() {
        let store = StagedStore::empty();
        inspect_scope(&store, Some("a"), Some("o"), Some("u"), 100).await.unwrap();

        let edge_cypher = &store.calls()[1].0;
        assert!(
            !edge_cypher.contains("valid_to IS NULL"),
            "admin view must include superseded edges, not filter to current",
        );
        assert!(edge_cypher.contains("r.valid_to AS valid_to"));
    }

    /// An over-large limit is bound as `MAX_INSPECTION_LIMIT`.
    #[tokio::test(flavor = "current_thread")]
    async fn should_clamp_limit_to_max() {
        let store = StagedStore::empty();
        inspect_scope(&store, None, None, None, MAX_INSPECTION_LIMIT * 10)
            .await
            .unwrap();
        assert_eq!(
            store.calls()[0].1.get("lim"),
            Some(&GraphParam::Int(MAX_INSPECTION_LIMIT as i64)),
        );
    }

    /// Node and edge rows parse into the snapshot; a `"null"` `valid_to` means current.
    #[tokio::test(flavor = "current_thread")]
    async fn should_build_snapshot_from_node_and_edge_rows() {
        let store = StagedStore::new(vec![
            vec![row(&[
                ("name", "Alice"),
                ("memory_pids", "[\"mem1\",\"mem2\"]"),
                ("first_seen_at", "2026-06-01T00:00:00+00:00"),
            ])],
            vec![row(&[
                ("subject", "Alice"),
                ("relation", "works at"),
                ("object", "Acme"),
                ("confidence", "0.9"),
                ("valid_from", "2026-06-01T00:00:00+00:00"),
                ("valid_to", "null"),
                ("memory_pids", "[\"mem1\"]"),
            ])],
        ]);

        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();

        assert_eq!(snapshot.nodes.len(), 1);
        assert_eq!(snapshot.nodes[0].name, "Alice");
        assert_eq!(snapshot.nodes[0].memory_pids, vec!["mem1", "mem2"]);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.edges[0].object, "Acme");
        assert!(snapshot.edges[0].valid_to.is_none(), "null valid_to -> current edge");
        assert!(!snapshot.truncated);
    }

    /// A superseded edge keeps its `valid_to` timestamp.
    #[tokio::test(flavor = "current_thread")]
    async fn should_surface_superseded_edge_valid_to() {
        let store = StagedStore::new(vec![
            vec![],
            vec![row(&[
                ("subject", "Alice"),
                ("relation", "works at"),
                ("object", "Globex"),
                ("confidence", "0.8"),
                ("valid_from", "2026-05-01T00:00:00+00:00"),
                ("valid_to", "2026-06-01T00:00:00+00:00"),
                ("memory_pids", "[\"mem0\"]"),
            ])],
        ]);

        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert_eq!(snapshot.edges[0].valid_to.as_deref(), Some("2026-06-01T00:00:00+00:00"));
    }

    /// Returning as many node rows as the limit flags the snapshot as truncated.
    #[tokio::test(flavor = "current_thread")]
    async fn should_flag_truncated_when_limit_reached() {
        let store = StagedStore::new(vec![vec![row(&[("name", "Alice")]), row(&[("name", "Bob")])], vec![]]);

        let snapshot = inspect_scope(&store, None, None, None, 2).await.unwrap();
        assert!(snapshot.truncated, "node count == limit -> truncated");
    }

    /// A node row without a `name` column is dropped rather than failing the call.
    #[tokio::test(flavor = "current_thread")]
    async fn should_skip_node_missing_name() {
        let store = StagedStore::new(vec![vec![row(&[("memory_pids", "[\"mem1\"]")])], vec![]]);
        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert!(snapshot.nodes.is_empty());
    }

    /// Non-JSON `memory_pids` keeps the node but with an empty pid list.
    #[tokio::test(flavor = "current_thread")]
    async fn should_default_pids_empty_when_unparseable() {
        let store = StagedStore::new(vec![vec![row(&[("name", "Alice"), ("memory_pids", "not json")])], vec![]]);
        let snapshot = inspect_scope(&store, None, None, None, 100).await.unwrap();
        assert!(snapshot.nodes[0].memory_pids.is_empty());
    }
}