Skip to main content

nodedb_graph/
sharded.rs

1//! Tenant-partitioned CSR index.
2//!
3//! The graph engine multiplexes many tenants onto one per-core `CsrIndex`.
4//! Historically that multiplexing was done by prepending `"{tid}:"` to every
5//! node key before handing it to the CSR — a lexical scheme that required
6//! every API boundary (results, traversal output, algorithm emit) to strip
7//! the prefix on the way back out. The heuristic strippers were fragile
8//! (mangled any id that happened to match the shape) and the strip was
9//! opt-in (forgetting it leaked internal keys to clients).
10//!
11//! `ShardedCsrIndex` replaces that scheme with structural partitioning:
12//! one independent `CsrIndex` per tenant, keyed in a `HashMap<TenantId, _>`.
13//! Node keys inside a partition are the raw user-visible names — no prefix,
14//! no strip, no heuristic. Cross-tenant access is not a question of auth
15//! checks; it is structurally impossible because partitions don't share
16//! key space.
17//!
18//! ## Invariants
19//!
20//! - **One tenant per partition.** Every `CsrIndex` held in `partitions`
21//!   contains only one tenant's nodes and edges. Mixing is a programming
22//!   bug, not a runtime condition.
23//! - **No key prefixing.** Callers pass user-visible names directly; the
24//!   partition never sees a scoped form.
25//! - **Partition lifecycle is explicit.** `get_or_create` constructs on
26//!   first use; `drop_partition` removes atomically. No auto-eviction at
27//!   this layer (belongs to a future pool-management concern).
//! - **Data Plane shape preserved.** Like `CsrIndex`, this type is `!Sync`:
//!   its `Cell<u32>` access counters are `Send` but not `Sync`, so it
//!   cannot be shared by reference across threads. It is owned by a
//!   single Data Plane core.
31
32use std::collections::HashMap;
33use std::collections::hash_map::{Entry, Iter, IterMut};
34
35use nodedb_types::TenantId;
36
37use crate::csr::CsrIndex;
38
39/// Per-tenant partitioned CSR index.
40///
41/// Holds one `CsrIndex` per tenant on the owning Data Plane core.
42/// Handlers resolve the caller's tenant to a partition up front; all
43/// downstream graph operations (algorithms, MATCH, traversal) run
44/// against that single partition and cannot reach any other tenant's
45/// state.
46pub struct ShardedCsrIndex {
47    partitions: HashMap<TenantId, CsrIndex>,
48}
49
50impl ShardedCsrIndex {
51    /// Construct an empty sharded index with no partitions.
52    pub fn new() -> Self {
53        Self {
54            partitions: HashMap::new(),
55        }
56    }
57
58    /// Shared access to a tenant's partition, if it exists.
59    ///
60    /// Returns `None` if the tenant has never inserted any graph data.
61    /// Callers expecting a read-only view of a possibly-empty partition
62    /// should treat `None` as "empty" rather than as an error.
63    pub fn partition(&self, tid: TenantId) -> Option<&CsrIndex> {
64        self.partitions.get(&tid)
65    }
66
67    /// Mutable access to a tenant's partition, if it exists.
68    pub fn partition_mut(&mut self, tid: TenantId) -> Option<&mut CsrIndex> {
69        self.partitions.get_mut(&tid)
70    }
71
72    /// Mutable access to a tenant's partition, creating an empty one
73    /// on first use.
74    ///
75    /// This is the canonical write-path entry point: insertion handlers
76    /// call this once to resolve the partition, then operate on the
77    /// returned `&mut CsrIndex` exactly as they would on a standalone
78    /// instance.
79    pub fn get_or_create(&mut self, tid: TenantId) -> &mut CsrIndex {
80        self.partitions.entry(tid).or_default()
81    }
82
83    /// Drop a tenant's entire graph state.
84    ///
85    /// Returns `true` if a partition existed and was removed, `false` if
86    /// the tenant had no graph state. Used by tenant-purge flows — O(1)
87    /// structural deletion replaces the former "range-scan and erase
88    /// every key with the `{tid}:` prefix" approach, which was both
89    /// slow and coupled to the lexical encoding.
90    pub fn drop_partition(&mut self, tid: TenantId) -> bool {
91        self.partitions.remove(&tid).is_some()
92    }
93
94    /// Collection-scoped in-memory reclaim.
95    ///
96    /// The CSR is collection-agnostic in memory: a tenant's partition holds
97    /// edges from **all** collections in a single adjacency structure. There
98    /// is no per-collection sub-partition to drop. As a result this method is
99    /// intentionally a no-op — collection-scoped edge removal from persistent
100    /// storage is handled by [`EdgeStore::purge_collection`], and the stale
101    /// in-memory CSR state is eliminated on the next tenant `drop_partition`
102    /// call (triggered by tenant deletion) or on server restart (CSR is
103    /// rebuilt from the now-clean EdgeStore).
104    pub fn drop_collection(&mut self, _tid: TenantId, _collection: &str) {
105        // Intentional no-op. See doc comment above.
106    }
107
108    /// Whether a partition exists for the tenant.
109    pub fn contains_partition(&self, tid: TenantId) -> bool {
110        self.partitions.contains_key(&tid)
111    }
112
113    /// Number of tenants with graph state on this core.
114    pub fn partition_count(&self) -> usize {
115        self.partitions.len()
116    }
117
118    /// Iterate all (tenant, partition) pairs. Used for checkpointing,
119    /// memory accounting, and administrative views that genuinely
120    /// need to see every tenant's state.
121    pub fn iter(&self) -> Iter<'_, TenantId, CsrIndex> {
122        self.partitions.iter()
123    }
124
125    /// Mutable iteration over all partitions. Used by `compact()` and
126    /// similar maintenance passes that apply per-partition without
127    /// needing tenant routing.
128    pub fn iter_mut(&mut self) -> IterMut<'_, TenantId, CsrIndex> {
129        self.partitions.iter_mut()
130    }
131
132    /// Compact every partition. Mirrors `CsrIndex::compact` but across
133    /// the full set — maintenance handlers call this once per core.
134    pub fn compact_all(&mut self) {
135        for (_tid, part) in self.iter_mut() {
136            part.compact();
137        }
138    }
139
140    /// Replace an existing partition (or install a new one) with the
141    /// given `CsrIndex`. Used by the rebuild path — after rebuilding a
142    /// tenant's CSR from persistent edge storage, this installs it
143    /// atomically.
144    pub fn install_partition(&mut self, tid: TenantId, csr: CsrIndex) {
145        self.partitions.insert(tid, csr);
146    }
147
148    /// Access or create a partition via the `Entry` API, for cases that
149    /// need conditional initialization with a non-default constructor.
150    pub fn entry(&mut self, tid: TenantId) -> Entry<'_, TenantId, CsrIndex> {
151        self.partitions.entry(tid)
152    }
153}
154
155impl Default for ShardedCsrIndex {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for turning a raw number into a `TenantId`.
    fn tenant(raw: u32) -> TenantId {
        TenantId::new(raw)
    }

    #[test]
    fn empty_sharded_has_no_partitions() {
        let index = ShardedCsrIndex::new();

        assert!(index.partition(tenant(1)).is_none());
        assert!(!index.contains_partition(tenant(1)));
        assert_eq!(index.partition_count(), 0);
    }

    #[test]
    fn get_or_create_installs_empty_partition() {
        let mut index = ShardedCsrIndex::new();

        // The freshly created partition starts out with no nodes.
        assert_eq!(index.get_or_create(tenant(7)).node_count(), 0);

        assert_eq!(index.partition_count(), 1);
        assert!(index.contains_partition(tenant(7)));
    }

    #[test]
    fn partitions_are_isolated_by_tenant() {
        // Both tenants insert a node called `alice`. Each must see only
        // its own graph: isolation comes from the partition itself, not
        // from any prefix on the key.
        let mut index = ShardedCsrIndex::new();

        let first = index.get_or_create(tenant(1));
        first.add_edge("alice", "knows", "bob").unwrap();
        let second = index.get_or_create(tenant(2));
        second.add_edge("alice", "knows", "carol").unwrap();

        let first = index.partition(tenant(1)).unwrap();
        let second = index.partition(tenant(2)).unwrap();

        // Tenant 1: alice→bob is visible, carol is unknown.
        assert!(first.contains_node("alice"));
        assert!(first.contains_node("bob"));
        assert!(!first.contains_node("carol"));

        // Tenant 2: alice→carol is visible, bob is unknown.
        assert!(second.contains_node("alice"));
        assert!(second.contains_node("carol"));
        assert!(!second.contains_node("bob"));

        // The two `alice` entries are distinct logical nodes that merely
        // share a name; nothing collides and no prefix is involved.
    }

    #[test]
    fn node_names_are_unprefixed() {
        let mut index = ShardedCsrIndex::new();
        let partition = index.get_or_create(tenant(42));
        partition.add_edge("alice", "knows", "bob").unwrap();
        partition.compact();

        let partition = index.partition(tenant(42)).unwrap();
        let alice = partition.node_id("alice").expect("alice must be present");

        // What comes back is the caller's own name, not `"42:alice"`:
        // the tenant id never leaks into user-visible keys.
        assert_eq!(partition.node_name(alice), "alice");
    }

    #[test]
    fn drop_partition_removes_tenant_state() {
        let mut index = ShardedCsrIndex::new();
        let partition = index.get_or_create(tenant(1));
        partition.add_edge("a", "l", "b").unwrap();
        assert!(index.contains_partition(tenant(1)));

        let removed_first_time = index.drop_partition(tenant(1));
        assert!(removed_first_time);
        assert_eq!(index.partition_count(), 0);
        assert!(!index.contains_partition(tenant(1)));

        // Dropping again finds nothing and simply reports `false`.
        let removed_second_time = index.drop_partition(tenant(1));
        assert!(!removed_second_time);
    }

    #[test]
    fn drop_partition_does_not_touch_other_tenants() {
        let mut index = ShardedCsrIndex::new();
        let doomed = index.get_or_create(tenant(1));
        doomed.add_edge("a", "l", "b").unwrap();
        let survivor = index.get_or_create(tenant(2));
        survivor.add_edge("c", "l", "d").unwrap();

        index.drop_partition(tenant(1));

        assert!(!index.contains_partition(tenant(1)));
        assert!(index.contains_partition(tenant(2)));
        let survivor = index.partition(tenant(2)).unwrap();
        assert!(survivor.contains_node("c"));
    }

    #[test]
    fn install_partition_replaces_existing() {
        let mut index = ShardedCsrIndex::new();
        let original = index.get_or_create(tenant(1));
        original.add_edge("old", "l", "value").unwrap();

        let mut rebuilt = CsrIndex::new();
        rebuilt.add_edge("new", "l", "value").unwrap();
        index.install_partition(tenant(1), rebuilt);

        let current = index.partition(tenant(1)).unwrap();
        assert!(!current.contains_node("old"));
        assert!(current.contains_node("new"));
    }

    #[test]
    fn compact_all_applies_to_every_partition() {
        let mut index = ShardedCsrIndex::new();
        for t in (1..=3).map(tenant) {
            let partition = index.get_or_create(t);
            partition.add_edge("a", "l", "b").unwrap();
        }

        // Until compaction the edges sit in the write buffer;
        // `compact_all` folds them into the dense CSR arrays of each
        // partition.
        index.compact_all();

        for t in (1..=3).map(tenant) {
            let partition = index.partition(t).unwrap();
            assert_eq!(partition.node_count(), 2);
            assert_eq!(partition.edge_count(), 1);
        }
    }
}