Skip to main content

entelix_memory/
graph.rs

1//! `GraphMemory<N, E>` — relationship-aware long-term memory.
2//!
3//! Where `EntityMemory` records flat `entity → fact` pairs and
4//! `SemanticMemory` records embeddings, `GraphMemory` records
5//! **typed nodes** plus **typed, timestamped edges** between them.
6//! That delivers the actual "graph-based long-term memory" surface
7//! the user-level mental model expects: knowledge stored as an
8//! evolving entity-relationship graph, queryable by traversal.
9//!
10//! Trait + reference impl pattern: the trait is the
11//! contract any backend honours (Neo4j, ArangoDB, in-Postgres
12//! recursive CTE, …), and [`InMemoryGraphMemory`] is the embedded
13//! reference impl — hand-rolled with `BTreeMap` adjacency lists, no
14//! external graph library, parallel to [`crate::InMemoryStore`].
15//!
16//! ## Surface
17//!
18//! - `add_node` / `add_edge` — insertion. Backends mint stable
19//!   [`NodeId`] / [`EdgeId`] values and return them so callers can
20//!   reference nodes/edges later without name lookups.
21//! - `get_node` / `get_edge` — single-id lookup. `get_node`
22//!   returns the payload `Option<N>`; `get_edge` returns the full
23//!   [`GraphHop<E>`] (`from`, `to`, `edge`, `timestamp`).
24//! - `neighbors(id, direction)` — outgoing or incoming edges from a
25//!   single node.
26//! - `traverse(start, max_depth)` — BFS up to `max_depth` hops.
27//! - `find_path(from, to, max_depth)` — shortest unweighted path
28//!   between two nodes (BFS-based).
29//! - `temporal_filter(time_range)` — edges whose timestamp falls in
30//!   `[from, to)`. Returns `(EdgeId, NodeId, NodeId, E)` tuples.
31
32use std::collections::{BTreeMap, HashSet, VecDeque};
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use chrono::{DateTime, Utc};
37use dashmap::DashMap;
38use entelix_core::{Error, ExecutionContext, Result};
39use parking_lot::RwLock;
40use serde::{Deserialize, Serialize};
41use uuid::Uuid;
42
43use crate::namespace::Namespace;
44
45/// Stable, opaque node identifier minted by the backend.
46#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
47pub struct NodeId(String);
48
49impl NodeId {
50    /// Build a fresh id (UUID v7 — sortable by creation time).
51    #[must_use]
52    pub fn new() -> Self {
53        Self(Uuid::now_v7().to_string())
54    }
55
56    /// Adopt an externally-minted id (e.g. from a graph DB primary
57    /// key). Caller is responsible for uniqueness within a
58    /// namespace.
59    #[must_use]
60    pub fn from_string(s: impl Into<String>) -> Self {
61        Self(s.into())
62    }
63
64    /// Borrow the underlying string.
65    #[must_use]
66    pub fn as_str(&self) -> &str {
67        &self.0
68    }
69}
70
71impl Default for NodeId {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl std::fmt::Display for NodeId {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.write_str(&self.0)
80    }
81}
82
83/// Stable, opaque edge identifier minted by the backend.
84#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
85pub struct EdgeId(String);
86
87impl EdgeId {
88    /// Build a fresh id.
89    #[must_use]
90    pub fn new() -> Self {
91        Self(Uuid::now_v7().to_string())
92    }
93
94    /// Adopt an externally-minted id.
95    #[must_use]
96    pub fn from_string(s: impl Into<String>) -> Self {
97        Self(s.into())
98    }
99
100    /// Borrow the underlying string.
101    #[must_use]
102    pub fn as_str(&self) -> &str {
103        &self.0
104    }
105}
106
107impl Default for EdgeId {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl std::fmt::Display for EdgeId {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.write_str(&self.0)
116    }
117}
118
119/// Direction of edge traversal — outgoing edges leave a node,
120/// incoming edges arrive at it. `Both` returns the union.
121#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
122#[non_exhaustive]
123pub enum Direction {
124    /// Edges where the queried node is the source.
125    Outgoing,
126    /// Edges where the queried node is the target.
127    Incoming,
128    /// Either direction.
129    Both,
130}
131
132/// One traversal hop produced by [`GraphMemory::traverse`] or
133/// [`GraphMemory::find_path`].
134#[derive(Clone, Debug)]
135#[non_exhaustive]
136pub struct GraphHop<E> {
137    /// Edge that connects the previous node to `node`.
138    pub edge_id: EdgeId,
139    /// Source of the edge (the previously-visited node).
140    pub from: NodeId,
141    /// Destination of the edge — the freshly-reached node.
142    pub to: NodeId,
143    /// Edge payload.
144    pub edge: E,
145    /// Wall-clock timestamp recorded when the edge was inserted.
146    /// Carried on the hop so callers don't have to issue a second
147    /// backend lookup just to access temporal data.
148    pub timestamp: DateTime<Utc>,
149}
150
151impl<E> GraphHop<E> {
152    /// Construct a hop from the five fields every backend supplies.
153    /// Backends use this constructor (rather than struct-literal
154    /// syntax) so the `#[non_exhaustive]` evolvability guarantee
155    /// holds across the workspace boundary.
156    #[must_use]
157    pub const fn new(
158        edge_id: EdgeId,
159        from: NodeId,
160        to: NodeId,
161        edge: E,
162        timestamp: DateTime<Utc>,
163    ) -> Self {
164        Self {
165            edge_id,
166            from,
167            to,
168            edge,
169            timestamp,
170        }
171    }
172}
173
174/// Generic graph-of-knowledge memory. Trait so backends (Neo4j,
175/// ArangoDB, Postgres-with-recursive-CTE) can plug in without
176/// touching the consumer code; reference in-process impl is
177/// [`InMemoryGraphMemory`].
178#[async_trait]
179pub trait GraphMemory<N, E>: Send + Sync + 'static
180where
181    N: Clone + Send + Sync + 'static,
182    E: Clone + Send + Sync + 'static,
183{
184    /// Insert `node` and return its assigned id.
185    async fn add_node(&self, ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId>;
186
187    /// Insert an edge from `from` to `to` carrying `edge`.
188    /// `timestamp` is supplied by the caller so re-inserting after
189    /// a replay produces deterministic edges.
190    async fn add_edge(
191        &self,
192        ctx: &ExecutionContext,
193        ns: &Namespace,
194        from: &NodeId,
195        to: &NodeId,
196        edge: E,
197        timestamp: DateTime<Utc>,
198    ) -> Result<EdgeId>;
199
200    /// Insert a batch of edges atomically. Each tuple is
201    /// `(from, to, edge, timestamp)`; endpoints must already exist
202    /// (same contract as [`Self::add_edge`]). Returns the assigned
203    /// [`EdgeId`]s in input order.
204    ///
205    /// Backends with native bulk-insert support (e.g.
206    /// `PgGraphMemory`'s `INSERT … SELECT FROM UNNEST(…)`) override
207    /// this to fold N round-trips into one. The default impl loops
208    /// over [`Self::add_edge`] — correct for every backend, fast
209    /// for none. Knowledge-graph batch ingest is the operator
210    /// hot path that motivates the override.
211    async fn add_edges_batch(
212        &self,
213        ctx: &ExecutionContext,
214        ns: &Namespace,
215        edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
216    ) -> Result<Vec<EdgeId>> {
217        let mut ids = Vec::with_capacity(edges.len());
218        for (from, to, edge, timestamp) in edges {
219            ids.push(self.add_edge(ctx, ns, &from, &to, edge, timestamp).await?);
220        }
221        Ok(ids)
222    }
223
224    /// Look up a node by id (verb-family `get` per
225    /// `.claude/rules/naming.md` — single-item primary-key
226    /// lookup).
227    async fn get_node(
228        &self,
229        ctx: &ExecutionContext,
230        ns: &Namespace,
231        id: &NodeId,
232    ) -> Result<Option<N>>;
233
234    /// Look up an edge by id and return the full structural body
235    /// ([`GraphHop<E>`] — `from`, `to`, `edge`, `timestamp`).
236    /// Operators rarely want the payload alone for edges; the
237    /// endpoints and timestamp are usually load-bearing for any
238    /// follow-up decision (audit context, freshness check,
239    /// neighbour navigation). Returning the full hop saves a
240    /// second lookup.
241    ///
242    /// Asymmetric with [`Self::get_node`] (which returns
243    /// `Option<N>` because nodes have no separate structural
244    /// body) — the shape difference is intentional, not an
245    /// oversight.
246    ///
247    /// Default impl returns `None`.
248    async fn get_edge(
249        &self,
250        _ctx: &ExecutionContext,
251        _ns: &Namespace,
252        _edge_id: &EdgeId,
253    ) -> Result<Option<GraphHop<E>>> {
254        Ok(None)
255    }
256
257    /// Edges incident to `node` in the requested direction. Each
258    /// triple is `(EdgeId, neighbour NodeId, edge payload)`.
259    async fn neighbors(
260        &self,
261        ctx: &ExecutionContext,
262        ns: &Namespace,
263        node: &NodeId,
264        direction: Direction,
265    ) -> Result<Vec<(EdgeId, NodeId, E)>>;
266
267    /// Breadth-first traversal starting at `start`, expanding up to
268    /// `max_depth` hops along edges in the requested `direction`.
269    /// Returns the visited hops in BFS order (excluding the seed
270    /// node, which has no inbound edge in this traversal). Use
271    /// [`Direction::Both`] for relationship-graph queries that
272    /// don't care about edge polarity (knowledge graphs typically
273    /// want this).
274    async fn traverse(
275        &self,
276        ctx: &ExecutionContext,
277        ns: &Namespace,
278        start: &NodeId,
279        direction: Direction,
280        max_depth: usize,
281    ) -> Result<Vec<GraphHop<E>>>;
282
283    /// Shortest unweighted path from `from` to `to` (BFS) along
284    /// edges in the requested `direction`. Returns the sequence of
285    /// hops; `Some(vec![])` means `from == to` (already at
286    /// destination — no edges traversed); `None` means no path
287    /// exists within `max_depth` hops.
288    async fn find_path(
289        &self,
290        ctx: &ExecutionContext,
291        ns: &Namespace,
292        from: &NodeId,
293        to: &NodeId,
294        direction: Direction,
295        max_depth: usize,
296    ) -> Result<Option<Vec<GraphHop<E>>>>;
297
298    /// Edges whose timestamp falls in `[from, to)`. Useful for
299    /// audit-log style queries ("what relationships did the agent
300    /// learn last week").
301    async fn temporal_filter(
302        &self,
303        ctx: &ExecutionContext,
304        ns: &Namespace,
305        from: DateTime<Utc>,
306        to: DateTime<Utc>,
307    ) -> Result<Vec<GraphHop<E>>>;
308
309    /// Count nodes in `ns`. Cheap operator metric for
310    /// size-based decisions (paginate vs stream, fast-fail
311    /// empty-namespace check, audit / dashboard surface).
312    /// Default impl returns `0`.
313    async fn node_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
314        Ok(0)
315    }
316
317    /// Count edges in `ns`. Cheap operator metric — same
318    /// rationale as [`Self::node_count`]. Default impl returns
319    /// `0`.
320    async fn edge_count(&self, _ctx: &ExecutionContext, _ns: &Namespace) -> Result<usize> {
321        Ok(0)
322    }
323
324    /// Drop one edge by id. Idempotent — deleting an absent
325    /// edge succeeds. Required — backends that don't support
326    /// edge deletion are degenerate; closed the
327    /// CRUD-completeness gap.
328    async fn delete_edge(
329        &self,
330        ctx: &ExecutionContext,
331        ns: &Namespace,
332        edge_id: &EdgeId,
333    ) -> Result<()>;
334
335    /// Drop one node by id and every edge incident to it.
336    /// Cascades — operators that don't want cascading delete
337    /// every incident edge first via [`Self::delete_edge`] and
338    /// then call this. Returns the count of removed edges so
339    /// callers can log or expose cleanup metrics; `0` when the
340    /// node had no edges (or was absent — the operation is
341    /// idempotent).
342    ///
343    /// Cascade is the right default because the alternative
344    /// (leaving dangling edges that point at a deleted node)
345    /// would break the invariant "every edge endpoint is a
346    /// resolvable node id" that traversal relies on. Refusing
347    /// when edges exist (the SQL `RESTRICT` shape) would force
348    /// every operator into a manual edge-delete loop.
349    async fn delete_node(
350        &self,
351        ctx: &ExecutionContext,
352        ns: &Namespace,
353        node_id: &NodeId,
354    ) -> Result<usize>;
355
356    /// Drop every edge in `ns` whose timestamp is older than
357    /// `ttl` ago. Returns the count of removed edges so callers
358    /// can log or expose pruning metrics.
359    ///
360    /// Edge-only by design — nodes have no timestamp on the
361    /// trait surface, so a TTL sweep cannot reason about them
362    /// directly. Nodes left orphaned by edge removal stay in
363    /// place until the operator drops them explicitly via a
364    /// future operation. This mirrors
365    /// [`crate::EntityMemory::prune_older_than`] and
366    /// [`crate::EpisodicMemory::prune_older_than`] (single
367    /// timestamp axis, no cascading semantics).
368    ///
369    /// Default impl returns `Ok(0)` — only backends that own a
370    /// timestamp index implement this. Operators schedule it on
371    /// a timer (or trigger from a periodic graph) to bound
372    /// edge-table growth in long-running deployments.
373    async fn prune_older_than(
374        &self,
375        _ctx: &ExecutionContext,
376        _ns: &Namespace,
377        _ttl: std::time::Duration,
378    ) -> Result<usize> {
379        Ok(0)
380    }
381}
382
383// ── reference in-memory impl ─────────────────────────────────────────────
384
385/// Edge metadata kept by [`InMemoryGraphMemory`] alongside the
386/// caller-supplied payload.
387#[derive(Clone, Debug)]
388struct StoredEdge<E> {
389    id: EdgeId,
390    from: NodeId,
391    to: NodeId,
392    payload: E,
393    timestamp: DateTime<Utc>,
394}
395
396/// Per-namespace in-process graph table.
397#[derive(Default)]
398struct GraphTable<N, E> {
399    nodes: BTreeMap<NodeId, N>,
400    edges: BTreeMap<EdgeId, StoredEdge<E>>,
401    out_adj: BTreeMap<NodeId, Vec<EdgeId>>,
402    in_adj: BTreeMap<NodeId, Vec<EdgeId>>,
403}
404
405impl<N, E> GraphTable<N, E> {
406    const fn new() -> Self {
407        Self {
408            nodes: BTreeMap::new(),
409            edges: BTreeMap::new(),
410            out_adj: BTreeMap::new(),
411            in_adj: BTreeMap::new(),
412        }
413    }
414}
415
416/// One per-namespace table behind a `RwLock`. Writes against one
417/// namespace block writes to that namespace only; reads share the
418/// lock's read side. Type alias keeps signatures readable.
419type NamespaceTable<N, E> = Arc<RwLock<GraphTable<N, E>>>;
420
421/// Sharded namespace map. `DashMap` partitions across internal
422/// stripes so concurrent writes against distinct namespaces never
423/// serialise on a single mutex.
424type ShardedNamespaceMap<N, E> = Arc<DashMap<String, NamespaceTable<N, E>>>;
425
426/// In-process [`GraphMemory`] backed by `BTreeMap` adjacency lists,
427/// sharded per namespace.
428///
429/// Locking model:
430/// - The outer `DashMap` keys per-namespace tables. DashMap shards
431///   the map across N internal stripes so insertions and lookups
432///   across distinct namespaces never serialise on a single mutex.
433/// - Each table sits behind its own `RwLock`. Writes to one
434///   namespace block writes to that namespace only; concurrent
435///   writes against distinct namespaces run in parallel.
436/// - Reads against a namespace share the read side of the per-table
437///   `RwLock`, so dashboards / agents querying the same graph in
438///   parallel scale linearly until contention on a single tenant's
439///   write rate.
440///
441/// Cheap to clone — internal state is `Arc<DashMap<...>>`-shared,
442/// so every clone observes the same graph.
443pub struct InMemoryGraphMemory<N, E>
444where
445    N: Clone + Send + Sync + 'static,
446    E: Clone + Send + Sync + 'static,
447{
448    inner: ShardedNamespaceMap<N, E>,
449}
450
451impl<N, E> InMemoryGraphMemory<N, E>
452where
453    N: Clone + Send + Sync + 'static,
454    E: Clone + Send + Sync + 'static,
455{
456    /// Empty graph. Cheap to clone.
457    #[must_use]
458    pub fn new() -> Self {
459        Self {
460            inner: Arc::new(DashMap::new()),
461        }
462    }
463
464    /// Total node count across all namespaces — useful for tests.
465    /// Iterates DashMap entries (each acquired via its own shard
466    /// lock) and reads each per-namespace lock independently.
467    #[must_use]
468    pub fn total_nodes(&self) -> usize {
469        self.inner
470            .iter()
471            .map(|entry| entry.value().read().nodes.len())
472            .sum()
473    }
474
475    /// Total edge count across all namespaces — useful for tests.
476    #[must_use]
477    pub fn total_edges(&self) -> usize {
478        self.inner
479            .iter()
480            .map(|entry| entry.value().read().edges.len())
481            .sum()
482    }
483
484    /// Look up the per-namespace table without creating one. Returns
485    /// `None` for namespaces never written to.
486    fn table_for(&self, key: &str) -> Option<NamespaceTable<N, E>> {
487        self.inner.get(key).map(|r| Arc::clone(r.value()))
488    }
489
490    /// Get-or-create the per-namespace table. Used on the write
491    /// path only — the read path bails out early when the namespace
492    /// is absent rather than allocating an empty table.
493    fn table_for_write(&self, key: String) -> NamespaceTable<N, E> {
494        self.inner
495            .entry(key)
496            .or_insert_with(|| Arc::new(RwLock::new(GraphTable::new())))
497            .clone()
498    }
499}
500
501impl<N, E> Default for InMemoryGraphMemory<N, E>
502where
503    N: Clone + Send + Sync + 'static,
504    E: Clone + Send + Sync + 'static,
505{
506    fn default() -> Self {
507        Self::new()
508    }
509}
510
511impl<N, E> Clone for InMemoryGraphMemory<N, E>
512where
513    N: Clone + Send + Sync + 'static,
514    E: Clone + Send + Sync + 'static,
515{
516    fn clone(&self) -> Self {
517        Self {
518            inner: Arc::clone(&self.inner),
519        }
520    }
521}
522
523#[async_trait]
524impl<N, E> GraphMemory<N, E> for InMemoryGraphMemory<N, E>
525where
526    N: Clone + Send + Sync + 'static,
527    E: Clone + Send + Sync + 'static,
528{
529    async fn add_node(&self, _ctx: &ExecutionContext, ns: &Namespace, node: N) -> Result<NodeId> {
530        let id = NodeId::new();
531        let table = self.table_for_write(ns.render());
532        table.write().nodes.insert(id.clone(), node);
533        Ok(id)
534    }
535
536    async fn add_edge(
537        &self,
538        _ctx: &ExecutionContext,
539        ns: &Namespace,
540        from: &NodeId,
541        to: &NodeId,
542        edge: E,
543        timestamp: DateTime<Utc>,
544    ) -> Result<EdgeId> {
545        let id = EdgeId::new();
546        let table = self.table_for_write(ns.render());
547        let mut guard = table.write();
548        if !guard.nodes.contains_key(from) {
549            return Err(entelix_core::Error::invalid_request(format!(
550                "GraphMemory::add_edge: source node {from} does not exist"
551            )));
552        }
553        if !guard.nodes.contains_key(to) {
554            return Err(entelix_core::Error::invalid_request(format!(
555                "GraphMemory::add_edge: target node {to} does not exist"
556            )));
557        }
558        let stored = StoredEdge {
559            id: id.clone(),
560            from: from.clone(),
561            to: to.clone(),
562            payload: edge,
563            timestamp,
564        };
565        guard.edges.insert(id.clone(), stored);
566        guard
567            .out_adj
568            .entry(from.clone())
569            .or_default()
570            .push(id.clone());
571        guard.in_adj.entry(to.clone()).or_default().push(id.clone());
572        Ok(id)
573    }
574
575    async fn add_edges_batch(
576        &self,
577        _ctx: &ExecutionContext,
578        ns: &Namespace,
579        edges: Vec<(NodeId, NodeId, E, DateTime<Utc>)>,
580    ) -> Result<Vec<EdgeId>> {
581        if edges.is_empty() {
582            return Ok(Vec::new());
583        }
584        let table = self.table_for_write(ns.render());
585        let mut guard = table.write();
586        // First pass: validate every endpoint up front — fail fast
587        // before any insert so a partially-applied batch can't leave
588        // the namespace in a half-state.
589        for (from, to, _, _) in &edges {
590            if !guard.nodes.contains_key(from) {
591                return Err(entelix_core::Error::invalid_request(format!(
592                    "GraphMemory::add_edges_batch: source node {from} does not exist"
593                )));
594            }
595            if !guard.nodes.contains_key(to) {
596                return Err(entelix_core::Error::invalid_request(format!(
597                    "GraphMemory::add_edges_batch: target node {to} does not exist"
598                )));
599            }
600        }
601        // Second pass: every endpoint is known, every insert is safe.
602        let mut ids = Vec::with_capacity(edges.len());
603        for (from, to, payload, timestamp) in edges {
604            let id = EdgeId::new();
605            let stored = StoredEdge {
606                id: id.clone(),
607                from: from.clone(),
608                to: to.clone(),
609                payload,
610                timestamp,
611            };
612            guard.edges.insert(id.clone(), stored);
613            guard.out_adj.entry(from).or_default().push(id.clone());
614            guard.in_adj.entry(to).or_default().push(id.clone());
615            ids.push(id);
616        }
617        Ok(ids)
618    }
619
620    async fn get_node(
621        &self,
622        _ctx: &ExecutionContext,
623        ns: &Namespace,
624        id: &NodeId,
625    ) -> Result<Option<N>> {
626        let Some(table) = self.table_for(&ns.render()) else {
627            return Ok(None);
628        };
629        Ok(table.read().nodes.get(id).cloned())
630    }
631
632    async fn get_edge(
633        &self,
634        _ctx: &ExecutionContext,
635        ns: &Namespace,
636        edge_id: &EdgeId,
637    ) -> Result<Option<GraphHop<E>>> {
638        let Some(table) = self.table_for(&ns.render()) else {
639            return Ok(None);
640        };
641        Ok(table.read().edges.get(edge_id).map(|e| GraphHop {
642            edge_id: e.id.clone(),
643            from: e.from.clone(),
644            to: e.to.clone(),
645            edge: e.payload.clone(),
646            timestamp: e.timestamp,
647        }))
648    }
649
650    async fn neighbors(
651        &self,
652        _ctx: &ExecutionContext,
653        ns: &Namespace,
654        node: &NodeId,
655        direction: Direction,
656    ) -> Result<Vec<(EdgeId, NodeId, E)>> {
657        let Some(table) = self.table_for(&ns.render()) else {
658            return Ok(Vec::new());
659        };
660        let guard = table.read();
661        let mut out = Vec::new();
662        let mut collect = |edge_ids: &[EdgeId], pick_far: fn(&StoredEdge<E>) -> &NodeId| {
663            for eid in edge_ids {
664                if let Some(stored) = guard.edges.get(eid) {
665                    out.push((
666                        eid.clone(),
667                        pick_far(stored).clone(),
668                        stored.payload.clone(),
669                    ));
670                }
671            }
672        };
673        if matches!(direction, Direction::Outgoing | Direction::Both)
674            && let Some(ids) = guard.out_adj.get(node)
675        {
676            collect(ids, |s| &s.to);
677        }
678        if matches!(direction, Direction::Incoming | Direction::Both)
679            && let Some(ids) = guard.in_adj.get(node)
680        {
681            collect(ids, |s| &s.from);
682        }
683        Ok(out)
684    }
685
686    async fn traverse(
687        &self,
688        ctx: &ExecutionContext,
689        ns: &Namespace,
690        start: &NodeId,
691        direction: Direction,
692        max_depth: usize,
693    ) -> Result<Vec<GraphHop<E>>> {
694        if max_depth == 0 {
695            return Ok(Vec::new());
696        }
697        let Some(table) = self.table_for(&ns.render()) else {
698            return Ok(Vec::new());
699        };
700        let guard = table.read();
701        let mut visited: HashSet<NodeId> = HashSet::new();
702        visited.insert(start.clone());
703        let mut frontier: VecDeque<(NodeId, usize)> = VecDeque::new();
704        frontier.push_back((start.clone(), 0));
705        let mut out = Vec::new();
706        while let Some((current, depth)) = frontier.pop_front() {
707            if ctx.is_cancelled() {
708                return Err(Error::Cancelled);
709            }
710            if depth >= max_depth {
711                continue;
712            }
713            for stored in directional_edges(&guard, &current, direction) {
714                let neighbour = stored
715                    .other_endpoint_of(&current)
716                    .cloned()
717                    .unwrap_or_else(|| stored.to.clone());
718                if visited.insert(neighbour.clone()) {
719                    out.push(GraphHop {
720                        edge_id: stored.id.clone(),
721                        from: stored.from.clone(),
722                        to: stored.to.clone(),
723                        edge: stored.payload.clone(),
724                        timestamp: stored.timestamp,
725                    });
726                    frontier.push_back((neighbour, depth + 1));
727                }
728            }
729        }
730        Ok(out)
731    }
732
733    async fn find_path(
734        &self,
735        ctx: &ExecutionContext,
736        ns: &Namespace,
737        from: &NodeId,
738        to: &NodeId,
739        direction: Direction,
740        max_depth: usize,
741    ) -> Result<Option<Vec<GraphHop<E>>>> {
742        if from == to {
743            return Ok(Some(Vec::new()));
744        }
745        if max_depth == 0 {
746            return Ok(None);
747        }
748        let Some(table) = self.table_for(&ns.render()) else {
749            return Ok(None);
750        };
751        let guard = table.read();
752        let mut parents: BTreeMap<NodeId, (EdgeId, NodeId)> = BTreeMap::new();
753        let mut depths: BTreeMap<NodeId, usize> = BTreeMap::new();
754        depths.insert(from.clone(), 0);
755        let mut frontier: VecDeque<NodeId> = VecDeque::new();
756        frontier.push_back(from.clone());
757        while let Some(current) = frontier.pop_front() {
758            if ctx.is_cancelled() {
759                return Err(Error::Cancelled);
760            }
761            let depth = *depths.get(&current).unwrap_or(&0);
762            if depth >= max_depth {
763                continue;
764            }
765            for stored in directional_edges(&guard, &current, direction) {
766                let neighbour = stored
767                    .other_endpoint_of(&current)
768                    .cloned()
769                    .unwrap_or_else(|| stored.to.clone());
770                if depths.contains_key(&neighbour) {
771                    continue;
772                }
773                depths.insert(neighbour.clone(), depth + 1);
774                parents.insert(neighbour.clone(), (stored.id.clone(), current.clone()));
775                if &neighbour == to {
776                    let mut hops: Vec<GraphHop<E>> = Vec::new();
777                    let mut cursor = to.clone();
778                    while let Some((eid, prev)) = parents.get(&cursor).cloned() {
779                        if let Some(stored) = guard.edges.get(&eid) {
780                            hops.push(GraphHop {
781                                edge_id: stored.id.clone(),
782                                from: stored.from.clone(),
783                                to: stored.to.clone(),
784                                edge: stored.payload.clone(),
785                                timestamp: stored.timestamp,
786                            });
787                        }
788                        cursor = prev;
789                    }
790                    hops.reverse();
791                    return Ok(Some(hops));
792                }
793                frontier.push_back(neighbour);
794            }
795        }
796        Ok(None)
797    }
798
799    async fn temporal_filter(
800        &self,
801        _ctx: &ExecutionContext,
802        ns: &Namespace,
803        from: DateTime<Utc>,
804        to: DateTime<Utc>,
805    ) -> Result<Vec<GraphHop<E>>> {
806        let Some(table) = self.table_for(&ns.render()) else {
807            return Ok(Vec::new());
808        };
809        let guard = table.read();
810        let mut out: Vec<GraphHop<E>> = guard
811            .edges
812            .values()
813            .filter(|e| e.timestamp >= from && e.timestamp < to)
814            .map(|e| GraphHop {
815                edge_id: e.id.clone(),
816                from: e.from.clone(),
817                to: e.to.clone(),
818                edge: e.payload.clone(),
819                timestamp: e.timestamp,
820            })
821            .collect();
822        out.sort_by_key(|hop| hop.timestamp);
823        Ok(out)
824    }
825
826    async fn node_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
827        let Some(table) = self.table_for(&ns.render()) else {
828            return Ok(0);
829        };
830        Ok(table.read().nodes.len())
831    }
832
833    async fn edge_count(&self, _ctx: &ExecutionContext, ns: &Namespace) -> Result<usize> {
834        let Some(table) = self.table_for(&ns.render()) else {
835            return Ok(0);
836        };
837        Ok(table.read().edges.len())
838    }
839
840    async fn delete_edge(
841        &self,
842        _ctx: &ExecutionContext,
843        ns: &Namespace,
844        edge_id: &EdgeId,
845    ) -> Result<()> {
846        let Some(table) = self.table_for(&ns.render()) else {
847            return Ok(());
848        };
849        let mut guard = table.write();
850        if let Some(edge) = guard.edges.remove(edge_id) {
851            if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
852                out_list.retain(|e| e != &edge.id);
853            }
854            if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
855                in_list.retain(|e| e != &edge.id);
856            }
857        }
858        Ok(())
859    }
860
861    async fn delete_node(
862        &self,
863        _ctx: &ExecutionContext,
864        ns: &Namespace,
865        node_id: &NodeId,
866    ) -> Result<usize> {
867        let Some(table) = self.table_for(&ns.render()) else {
868            return Ok(0);
869        };
870        let mut guard = table.write();
871        // Snapshot incident edge ids before mutating — `out_adj`
872        // and `in_adj` may overlap when the node has self-loops,
873        // so dedup via a HashSet.
874        let mut incident: HashSet<EdgeId> = HashSet::new();
875        if let Some(out_list) = guard.out_adj.get(node_id) {
876            for id in out_list {
877                incident.insert(id.clone());
878            }
879        }
880        if let Some(in_list) = guard.in_adj.get(node_id) {
881            for id in in_list {
882                incident.insert(id.clone());
883            }
884        }
885        let removed = incident.len();
886        for edge_id in incident {
887            if let Some(edge) = guard.edges.remove(&edge_id) {
888                if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
889                    out_list.retain(|e| e != &edge.id);
890                }
891                if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
892                    in_list.retain(|e| e != &edge.id);
893                }
894            }
895        }
896        guard.nodes.remove(node_id);
897        guard.out_adj.remove(node_id);
898        guard.in_adj.remove(node_id);
899        Ok(removed)
900    }
901
902    async fn prune_older_than(
903        &self,
904        _ctx: &ExecutionContext,
905        ns: &Namespace,
906        ttl: std::time::Duration,
907    ) -> Result<usize> {
908        let Some(table) = self.table_for(&ns.render()) else {
909            return Ok(0);
910        };
911        // chrono::Duration is signed and uses i64 nanoseconds; for
912        // pathological ttls (above i64::MAX seconds) saturate to
913        // chrono::Duration::MAX so the cutoff stays in the past.
914        let cutoff = Utc::now() - chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
915        let mut guard = table.write();
916        let stale: Vec<EdgeId> = guard
917            .edges
918            .iter()
919            .filter(|(_, e)| e.timestamp < cutoff)
920            .map(|(id, _)| id.clone())
921            .collect();
922        let removed = stale.len();
923        for id in stale {
924            if let Some(edge) = guard.edges.remove(&id) {
925                if let Some(out_list) = guard.out_adj.get_mut(&edge.from) {
926                    out_list.retain(|e| e != &edge.id);
927                }
928                if let Some(in_list) = guard.in_adj.get_mut(&edge.to) {
929                    in_list.retain(|e| e != &edge.id);
930                }
931            }
932        }
933        Ok(removed)
934    }
935}
936
937impl<E> StoredEdge<E> {
938    fn other_endpoint_of(&self, node: &NodeId) -> Option<&NodeId> {
939        if &self.from == node {
940            Some(&self.to)
941        } else if &self.to == node {
942            Some(&self.from)
943        } else {
944            None
945        }
946    }
947}
948
949fn directional_edges<'a, N, E>(
950    table: &'a GraphTable<N, E>,
951    node: &NodeId,
952    direction: Direction,
953) -> Vec<&'a StoredEdge<E>> {
954    let mut out: Vec<&'a StoredEdge<E>> = Vec::new();
955    if matches!(direction, Direction::Outgoing | Direction::Both)
956        && let Some(ids) = table.out_adj.get(node)
957    {
958        for eid in ids {
959            if let Some(stored) = table.edges.get(eid) {
960                out.push(stored);
961            }
962        }
963    }
964    if matches!(direction, Direction::Incoming | Direction::Both)
965        && let Some(ids) = table.in_adj.get(node)
966    {
967        for eid in ids {
968            if let Some(stored) = table.edges.get(eid) {
969                out.push(stored);
970            }
971        }
972    }
973    out
974}
975
976#[cfg(test)]
977#[allow(
978    clippy::unwrap_used,
979    clippy::indexing_slicing,
980    clippy::many_single_char_names
981)]
982mod tests {
983    use super::*;
984    use entelix_core::TenantId;
985
986    fn ns() -> Namespace {
987        Namespace::new(TenantId::new("tenant")).with_scope("graph")
988    }
989
990    #[tokio::test]
991    async fn add_and_lookup_node() {
992        let g = InMemoryGraphMemory::<&str, &str>::new();
993        let ctx = ExecutionContext::new();
994        let id = g.add_node(&ctx, &ns(), "alice").await.unwrap();
995        let fetched = g.get_node(&ctx, &ns(), &id).await.unwrap();
996        assert_eq!(fetched, Some("alice"));
997    }
998
999    #[tokio::test]
1000    async fn add_edges_batch_inserts_all_atomically() {
1001        let g = InMemoryGraphMemory::<&str, &str>::new();
1002        let ctx = ExecutionContext::new();
1003        let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1004        let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1005        let carol = g.add_node(&ctx, &ns(), "carol").await.unwrap();
1006        let now = Utc::now();
1007        let ids = g
1008            .add_edges_batch(
1009                &ctx,
1010                &ns(),
1011                vec![
1012                    (alice.clone(), bob.clone(), "knows", now),
1013                    (bob.clone(), carol.clone(), "knows", now),
1014                    (alice.clone(), carol.clone(), "knows", now),
1015                ],
1016            )
1017            .await
1018            .unwrap();
1019        assert_eq!(ids.len(), 3, "returns one EdgeId per input");
1020        // Every id must resolve to the corresponding hop.
1021        for id in &ids {
1022            let hop = g.get_edge(&ctx, &ns(), id).await.unwrap();
1023            assert!(hop.is_some(), "edge {id} must be retrievable");
1024        }
1025    }
1026
1027    #[tokio::test]
1028    async fn add_edges_batch_rejects_unknown_endpoint_without_partial_writes() {
1029        // Validation happens before any insert — a single bad edge
1030        // in the batch must leave the namespace untouched.
1031        let g = InMemoryGraphMemory::<&str, &str>::new();
1032        let ctx = ExecutionContext::new();
1033        let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1034        let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1035        let ghost = NodeId::new();
1036        let now = Utc::now();
1037        let err = g
1038            .add_edges_batch(
1039                &ctx,
1040                &ns(),
1041                vec![
1042                    (alice.clone(), bob.clone(), "knows", now),
1043                    (alice.clone(), ghost, "knows", now), // bad
1044                ],
1045            )
1046            .await;
1047        assert!(err.is_err(), "batch with unknown endpoint must fail");
1048        // No edge should have been written — the good first entry
1049        // must NOT have leaked through.
1050        assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1051    }
1052
1053    #[tokio::test]
1054    async fn add_edges_batch_empty_input_is_a_noop() {
1055        let g = InMemoryGraphMemory::<&str, &str>::new();
1056        let ctx = ExecutionContext::new();
1057        let ids = g.add_edges_batch(&ctx, &ns(), Vec::new()).await.unwrap();
1058        assert!(ids.is_empty());
1059    }
1060
1061    #[tokio::test]
1062    async fn add_edge_requires_existing_endpoints() {
1063        let g = InMemoryGraphMemory::<&str, &str>::new();
1064        let ctx = ExecutionContext::new();
1065        let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1066        let ghost = NodeId::new();
1067        let err = g
1068            .add_edge(&ctx, &ns(), &alice, &ghost, "knows", Utc::now())
1069            .await;
1070        assert!(err.is_err());
1071    }
1072
1073    #[tokio::test]
1074    async fn neighbors_split_by_direction() {
1075        let g = InMemoryGraphMemory::<&str, &str>::new();
1076        let ctx = ExecutionContext::new();
1077        let alice = g.add_node(&ctx, &ns(), "alice").await.unwrap();
1078        let bob = g.add_node(&ctx, &ns(), "bob").await.unwrap();
1079        let _eid = g
1080            .add_edge(&ctx, &ns(), &alice, &bob, "knows", Utc::now())
1081            .await
1082            .unwrap();
1083        let outgoing = g
1084            .neighbors(&ctx, &ns(), &alice, Direction::Outgoing)
1085            .await
1086            .unwrap();
1087        assert_eq!(outgoing.len(), 1);
1088        let incoming = g
1089            .neighbors(&ctx, &ns(), &alice, Direction::Incoming)
1090            .await
1091            .unwrap();
1092        assert!(incoming.is_empty());
1093    }
1094
1095    #[tokio::test]
1096    async fn traverse_respects_max_depth() {
1097        let g = InMemoryGraphMemory::<&str, &str>::new();
1098        let ctx = ExecutionContext::new();
1099        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1100        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1101        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1102        let d = g.add_node(&ctx, &ns(), "d").await.unwrap();
1103        let now = Utc::now();
1104        g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
1105        g.add_edge(&ctx, &ns(), &b, &c, "->", now).await.unwrap();
1106        g.add_edge(&ctx, &ns(), &c, &d, "->", now).await.unwrap();
1107        let two = g
1108            .traverse(&ctx, &ns(), &a, Direction::Outgoing, 2)
1109            .await
1110            .unwrap();
1111        assert_eq!(two.len(), 2);
1112        let three = g
1113            .traverse(&ctx, &ns(), &a, Direction::Outgoing, 3)
1114            .await
1115            .unwrap();
1116        assert_eq!(three.len(), 3);
1117    }
1118
1119    #[tokio::test]
1120    async fn traverse_with_direction_both_walks_inverse_edges() {
1121        let g = InMemoryGraphMemory::<&str, &str>::new();
1122        let ctx = ExecutionContext::new();
1123        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1124        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1125        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1126        let now = Utc::now();
1127        // a -> b <- c — walking from b in `Both` direction should
1128        // reach both a (incoming) and c (incoming).
1129        g.add_edge(&ctx, &ns(), &a, &b, "->", now).await.unwrap();
1130        g.add_edge(&ctx, &ns(), &c, &b, "->", now).await.unwrap();
1131        let from_b = g
1132            .traverse(&ctx, &ns(), &b, Direction::Both, 1)
1133            .await
1134            .unwrap();
1135        assert_eq!(from_b.len(), 2);
1136    }
1137
1138    #[tokio::test]
1139    async fn find_path_returns_shortest() {
1140        let g = InMemoryGraphMemory::<&str, &str>::new();
1141        let ctx = ExecutionContext::new();
1142        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1143        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1144        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1145        let now = Utc::now();
1146        g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1147        g.add_edge(&ctx, &ns(), &b, &c, "bc", now).await.unwrap();
1148        let path = g
1149            .find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
1150            .await
1151            .unwrap();
1152        let hops = path.unwrap();
1153        assert_eq!(hops.len(), 2);
1154        assert_eq!(hops[0].from, a);
1155        assert_eq!(hops[1].to, c);
1156    }
1157
1158    #[tokio::test]
1159    async fn temporal_filter_picks_window() {
1160        let g = InMemoryGraphMemory::<&str, &str>::new();
1161        let ctx = ExecutionContext::new();
1162        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1163        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1164        let early = Utc::now() - chrono::Duration::hours(2);
1165        let late = Utc::now();
1166        g.add_edge(&ctx, &ns(), &a, &b, "early", early)
1167            .await
1168            .unwrap();
1169        g.add_edge(&ctx, &ns(), &a, &b, "late", late).await.unwrap();
1170        let window = g
1171            .temporal_filter(
1172                &ctx,
1173                &ns(),
1174                Utc::now() - chrono::Duration::hours(1),
1175                Utc::now() + chrono::Duration::hours(1),
1176            )
1177            .await
1178            .unwrap();
1179        assert_eq!(window.len(), 1);
1180        assert_eq!(window[0].edge, "late");
1181    }
1182
1183    #[tokio::test]
1184    async fn node_count_and_edge_count_track_inserts() {
1185        let g = InMemoryGraphMemory::<&str, &str>::new();
1186        let ctx = ExecutionContext::new();
1187        // Empty namespace.
1188        assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 0);
1189        assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1190        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1191        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1192        assert_eq!(g.node_count(&ctx, &ns()).await.unwrap(), 2);
1193        assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 0);
1194        let _ = g
1195            .add_edge(&ctx, &ns(), &a, &b, "ab", Utc::now())
1196            .await
1197            .unwrap();
1198        assert_eq!(g.edge_count(&ctx, &ns()).await.unwrap(), 1);
1199    }
1200
1201    #[tokio::test]
1202    async fn count_methods_respect_namespace_isolation() {
1203        let g = InMemoryGraphMemory::<&str, &str>::new();
1204        let ctx = ExecutionContext::new();
1205        let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
1206        let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
1207        let _ = g.add_node(&ctx, &alpha, "n").await.unwrap();
1208        assert_eq!(g.node_count(&ctx, &alpha).await.unwrap(), 1);
1209        assert_eq!(g.node_count(&ctx, &beta).await.unwrap(), 0);
1210    }
1211
1212    #[tokio::test]
1213    async fn delete_edge_is_idempotent_and_dedups_adjacency() {
1214        let g = InMemoryGraphMemory::<&str, &str>::new();
1215        let ctx = ExecutionContext::new();
1216        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1217        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1218        let now = Utc::now();
1219        let id = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1220        // Removing twice succeeds — idempotent.
1221        g.delete_edge(&ctx, &ns(), &id).await.unwrap();
1222        g.delete_edge(&ctx, &ns(), &id).await.unwrap();
1223        // Adjacency lists no longer carry the deleted id.
1224        let outgoing = g
1225            .neighbors(&ctx, &ns(), &a, Direction::Outgoing)
1226            .await
1227            .unwrap();
1228        assert!(outgoing.is_empty());
1229        let incoming = g
1230            .neighbors(&ctx, &ns(), &b, Direction::Incoming)
1231            .await
1232            .unwrap();
1233        assert!(incoming.is_empty());
1234    }
1235
1236    #[tokio::test]
1237    async fn delete_node_cascades_to_incident_edges() {
1238        let g = InMemoryGraphMemory::<&str, &str>::new();
1239        let ctx = ExecutionContext::new();
1240        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1241        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1242        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1243        let now = Utc::now();
1244        // Three edges incident to `a`: out to b, out to c, in from b.
1245        let _ = g.add_edge(&ctx, &ns(), &a, &b, "ab", now).await.unwrap();
1246        let _ = g.add_edge(&ctx, &ns(), &a, &c, "ac", now).await.unwrap();
1247        let _ = g.add_edge(&ctx, &ns(), &b, &a, "ba", now).await.unwrap();
1248        let removed = g.delete_node(&ctx, &ns(), &a).await.unwrap();
1249        assert_eq!(removed, 3);
1250        assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_none());
1251        // `b` and `c` survive (cascade is node-scoped, not graph-wide).
1252        assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
1253        assert!(g.get_node(&ctx, &ns(), &c).await.unwrap().is_some());
1254        // No dangling adjacency entries — `b`'s incoming edge from
1255        // `a` is gone too.
1256        let b_in = g
1257            .neighbors(&ctx, &ns(), &b, Direction::Incoming)
1258            .await
1259            .unwrap();
1260        assert!(b_in.is_empty());
1261    }
1262
1263    #[tokio::test]
1264    async fn delete_node_with_self_loop_dedups_count() {
1265        let g = InMemoryGraphMemory::<&str, &str>::new();
1266        let ctx = ExecutionContext::new();
1267        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1268        // Self-loop appears in both out_adj and in_adj — the snapshot
1269        // collection must dedup so the returned count is 1, not 2.
1270        let _ = g
1271            .add_edge(&ctx, &ns(), &a, &a, "self", Utc::now())
1272            .await
1273            .unwrap();
1274        assert_eq!(g.delete_node(&ctx, &ns(), &a).await.unwrap(), 1);
1275    }
1276
1277    #[tokio::test]
1278    async fn delete_node_on_absent_node_is_zero_noop() {
1279        let g = InMemoryGraphMemory::<&str, &str>::new();
1280        let ctx = ExecutionContext::new();
1281        let phantom = NodeId::from_string("does-not-exist");
1282        assert_eq!(g.delete_node(&ctx, &ns(), &phantom).await.unwrap(), 0);
1283    }
1284
1285    #[tokio::test]
1286    async fn prune_older_than_drops_stale_edges_only() {
1287        let g = InMemoryGraphMemory::<&str, &str>::new();
1288        let ctx = ExecutionContext::new();
1289        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1290        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1291        let now = Utc::now();
1292        let _old = g
1293            .add_edge(
1294                &ctx,
1295                &ns(),
1296                &a,
1297                &b,
1298                "old",
1299                now - chrono::Duration::seconds(120),
1300            )
1301            .await
1302            .unwrap();
1303        let _fresh = g
1304            .add_edge(
1305                &ctx,
1306                &ns(),
1307                &a,
1308                &b,
1309                "fresh",
1310                now - chrono::Duration::seconds(5),
1311            )
1312            .await
1313            .unwrap();
1314        let removed = g
1315            .prune_older_than(&ctx, &ns(), std::time::Duration::from_mins(1))
1316            .await
1317            .unwrap();
1318        assert_eq!(removed, 1);
1319        // Both nodes survive — edge-only sweep, no orphan cleanup.
1320        assert!(g.get_node(&ctx, &ns(), &a).await.unwrap().is_some());
1321        assert!(g.get_node(&ctx, &ns(), &b).await.unwrap().is_some());
1322        // Adjacency lists deduplicated — only the fresh edge remains.
1323        let outgoing = g
1324            .neighbors(&ctx, &ns(), &a, Direction::Outgoing)
1325            .await
1326            .unwrap();
1327        assert_eq!(outgoing.len(), 1);
1328        assert_eq!(outgoing[0].2, "fresh");
1329    }
1330
1331    #[tokio::test]
1332    async fn prune_older_than_on_empty_namespace_is_noop() {
1333        let g = InMemoryGraphMemory::<&str, &str>::new();
1334        let ctx = ExecutionContext::new();
1335        let removed = g
1336            .prune_older_than(&ctx, &ns(), std::time::Duration::from_secs(0))
1337            .await
1338            .unwrap();
1339        assert_eq!(removed, 0);
1340    }
1341
1342    #[tokio::test]
1343    async fn namespaces_are_isolated() {
1344        let g = InMemoryGraphMemory::<&str, &str>::new();
1345        let ctx = ExecutionContext::new();
1346        let alpha = Namespace::new(TenantId::new("tenant")).with_scope("alpha");
1347        let beta = Namespace::new(TenantId::new("tenant")).with_scope("beta");
1348        let _ = g.add_node(&ctx, &alpha, "a-node").await.unwrap();
1349        let _ = g.add_node(&ctx, &beta, "b-node").await.unwrap();
1350        // Total bookkeeping shows both, but per-namespace traverse
1351        // never crosses.
1352        assert_eq!(g.total_nodes(), 2);
1353    }
1354
1355    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1356    async fn distinct_namespaces_write_concurrently() {
1357        // Lock the architectural promise that per-namespace sharding
1358        // delivers: 8 tasks, each writing into its own namespace,
1359        // run in parallel without contending on a single global
1360        // lock. The test does not measure wall time (flaky on CI)
1361        // but verifies the structural property: every per-namespace
1362        // table sees exactly the writes it owns, none of its
1363        // siblings'.
1364        let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
1365        let mut handles = Vec::new();
1366        for tenant in 0..8 {
1367            let g = g.clone();
1368            handles.push(tokio::spawn(async move {
1369                let ctx = ExecutionContext::new();
1370                let ns = Namespace::new(TenantId::new(format!("tenant-{tenant}")));
1371                let mut ids = Vec::new();
1372                for i in 0..50 {
1373                    let id = g
1374                        .add_node(&ctx, &ns, format!("t{tenant}-n{i}"))
1375                        .await
1376                        .unwrap();
1377                    ids.push(id);
1378                }
1379                // Edge between every consecutive pair — exercises the
1380                // write path beyond simple node insertion.
1381                let now = Utc::now();
1382                for window in ids.windows(2) {
1383                    g.add_edge(
1384                        &ctx,
1385                        &ns,
1386                        &window[0],
1387                        &window[1],
1388                        format!("t{tenant}-edge"),
1389                        now,
1390                    )
1391                    .await
1392                    .unwrap();
1393                }
1394            }));
1395        }
1396        for h in handles {
1397            h.await.unwrap();
1398        }
1399        // Each tenant inserted 50 nodes + 49 edges in its own
1400        // namespace; cross-pollution would change these counts.
1401        assert_eq!(g.total_nodes(), 8 * 50);
1402        assert_eq!(g.total_edges(), 8 * 49);
1403    }
1404
1405    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1406    async fn read_during_write_on_other_namespace_does_not_block() {
1407        // Reads of namespace A must not be serialised behind a
1408        // long-running write to namespace B. We can't prove
1409        // wall-clock independence portably, but we can verify the
1410        // structural property: a holding write lock on namespace A
1411        // does not even acquire the per-namespace lock for B, so
1412        // reads of B succeed regardless. Drive this through the
1413        // public surface: spawn N reads on `beta` while a write on
1414        // `alpha` is in flight, every read returns its expected
1415        // value.
1416        let g: InMemoryGraphMemory<String, String> = InMemoryGraphMemory::new();
1417        let alpha = Namespace::new(TenantId::new("alpha"));
1418        let beta = Namespace::new(TenantId::new("beta"));
1419        let ctx = ExecutionContext::new();
1420        let beta_node_id = g
1421            .add_node(&ctx, &beta, "beta-fixture".to_owned())
1422            .await
1423            .unwrap();
1424
1425        // Hold a write lock on alpha by inserting many nodes in a
1426        // loop. Concurrently, drive reads on beta and assert each
1427        // succeeds.
1428        let g_writer = g.clone();
1429        let alpha_writer = alpha.clone();
1430        let writer = tokio::spawn(async move {
1431            let ctx = ExecutionContext::new();
1432            for i in 0..200 {
1433                g_writer
1434                    .add_node(&ctx, &alpha_writer, format!("alpha-{i}"))
1435                    .await
1436                    .unwrap();
1437            }
1438        });
1439        let mut reads = Vec::new();
1440        for _ in 0..200 {
1441            let g_reader = g.clone();
1442            let beta_reader = beta.clone();
1443            let id_reader = beta_node_id.clone();
1444            reads.push(tokio::spawn(async move {
1445                let ctx = ExecutionContext::new();
1446                g_reader
1447                    .get_node(&ctx, &beta_reader, &id_reader)
1448                    .await
1449                    .unwrap()
1450            }));
1451        }
1452        for r in reads {
1453            assert_eq!(r.await.unwrap().as_deref(), Some("beta-fixture"));
1454        }
1455        writer.await.unwrap();
1456        assert_eq!(g.total_nodes(), 1 + 200);
1457    }
1458
1459    #[tokio::test]
1460    async fn traverse_short_circuits_on_cancellation() {
1461        let g = InMemoryGraphMemory::<&str, &str>::new();
1462        let ctx = ExecutionContext::new();
1463        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1464        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1465        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1466        let now = Utc::now();
1467        g.add_edge(&ctx, &ns(), &a, &b, "knows", now).await.unwrap();
1468        g.add_edge(&ctx, &ns(), &b, &c, "knows", now).await.unwrap();
1469        ctx.cancellation().cancel();
1470        let err = g
1471            .traverse(&ctx, &ns(), &a, Direction::Outgoing, 5)
1472            .await
1473            .unwrap_err();
1474        assert!(matches!(err, Error::Cancelled), "got {err:?}");
1475    }
1476
1477    #[tokio::test]
1478    async fn find_path_short_circuits_on_cancellation() {
1479        let g = InMemoryGraphMemory::<&str, &str>::new();
1480        let ctx = ExecutionContext::new();
1481        let a = g.add_node(&ctx, &ns(), "a").await.unwrap();
1482        let b = g.add_node(&ctx, &ns(), "b").await.unwrap();
1483        let c = g.add_node(&ctx, &ns(), "c").await.unwrap();
1484        let now = Utc::now();
1485        g.add_edge(&ctx, &ns(), &a, &b, "e", now).await.unwrap();
1486        g.add_edge(&ctx, &ns(), &b, &c, "e", now).await.unwrap();
1487        ctx.cancellation().cancel();
1488        let err = g
1489            .find_path(&ctx, &ns(), &a, &c, Direction::Outgoing, 5)
1490            .await
1491            .unwrap_err();
1492        assert!(matches!(err, Error::Cancelled), "got {err:?}");
1493    }
1494}