nodedb_graph/sharded.rs
1//! Tenant-partitioned CSR index.
2//!
3//! The graph engine multiplexes many tenants onto one per-core `CsrIndex`.
4//! Historically that multiplexing was done by prepending `"{tid}:"` to every
5//! node key before handing it to the CSR — a lexical scheme that required
6//! every API boundary (results, traversal output, algorithm emit) to strip
7//! the prefix on the way back out. The heuristic strippers were fragile
8//! (mangled any id that happened to match the shape) and the strip was
9//! opt-in (forgetting it leaked internal keys to clients).
10//!
11//! `ShardedCsrIndex` replaces that scheme with structural partitioning:
12//! one independent `CsrIndex` per tenant, keyed in a `HashMap<TenantId, _>`.
13//! Node keys inside a partition are the raw user-visible names — no prefix,
14//! no strip, no heuristic. Cross-tenant access is not a question of auth
15//! checks; it is structurally impossible because partitions don't share
16//! key space.
17//!
18//! ## Invariants
19//!
20//! - **One tenant per partition.** Every `CsrIndex` held in `partitions`
21//! contains only one tenant's nodes and edges. Mixing is a programming
22//! bug, not a runtime condition.
23//! - **No key prefixing.** Callers pass user-visible names directly; the
24//! partition never sees a scoped form.
25//! - **Partition lifecycle is explicit.** `get_or_create` constructs on
26//! first use; `drop_partition` removes atomically. No auto-eviction at
27//! this layer (belongs to a future pool-management concern).
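//! - **Data Plane shape preserved.** Like `CsrIndex`, this type is owned by
//!   a single Data Plane core. The `Cell<u32>` access counters inside each
//!   `CsrIndex` make it `!Sync` (`Cell` is `Send` but not `Sync`), so shared
//!   references to it cannot be handed across threads.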
31
32use std::collections::HashMap;
33use std::collections::hash_map::{Entry, Iter, IterMut};
34
35use nodedb_types::TenantId;
36
37use crate::csr::CsrIndex;
38
39/// Per-tenant partitioned CSR index.
40///
41/// Holds one `CsrIndex` per tenant on the owning Data Plane core.
42/// Handlers resolve the caller's tenant to a partition up front; all
43/// downstream graph operations (algorithms, MATCH, traversal) run
44/// against that single partition and cannot reach any other tenant's
45/// state.
46pub struct ShardedCsrIndex {
47 partitions: HashMap<TenantId, CsrIndex>,
48}
49
50impl ShardedCsrIndex {
51 /// Construct an empty sharded index with no partitions.
52 pub fn new() -> Self {
53 Self {
54 partitions: HashMap::new(),
55 }
56 }
57
58 /// Shared access to a tenant's partition, if it exists.
59 ///
60 /// Returns `None` if the tenant has never inserted any graph data.
61 /// Callers expecting a read-only view of a possibly-empty partition
62 /// should treat `None` as "empty" rather than as an error.
63 pub fn partition(&self, tid: TenantId) -> Option<&CsrIndex> {
64 self.partitions.get(&tid)
65 }
66
67 /// Mutable access to a tenant's partition, if it exists.
68 pub fn partition_mut(&mut self, tid: TenantId) -> Option<&mut CsrIndex> {
69 self.partitions.get_mut(&tid)
70 }
71
72 /// Mutable access to a tenant's partition, creating an empty one
73 /// on first use.
74 ///
75 /// This is the canonical write-path entry point: insertion handlers
76 /// call this once to resolve the partition, then operate on the
77 /// returned `&mut CsrIndex` exactly as they would on a standalone
78 /// instance.
79 pub fn get_or_create(&mut self, tid: TenantId) -> &mut CsrIndex {
80 self.partitions.entry(tid).or_default()
81 }
82
83 /// Drop a tenant's entire graph state.
84 ///
85 /// Returns `true` if a partition existed and was removed, `false` if
86 /// the tenant had no graph state. Used by tenant-purge flows — O(1)
87 /// structural deletion replaces the former "range-scan and erase
88 /// every key with the `{tid}:` prefix" approach, which was both
89 /// slow and coupled to the lexical encoding.
90 pub fn drop_partition(&mut self, tid: TenantId) -> bool {
91 self.partitions.remove(&tid).is_some()
92 }
93
94 /// Collection-scoped in-memory reclaim.
95 ///
96 /// The CSR is collection-agnostic in memory: a tenant's partition holds
97 /// edges from **all** collections in a single adjacency structure. There
98 /// is no per-collection sub-partition to drop. As a result this method is
99 /// intentionally a no-op — collection-scoped edge removal from persistent
100 /// storage is handled by [`EdgeStore::purge_collection`], and the stale
101 /// in-memory CSR state is eliminated on the next tenant `drop_partition`
102 /// call (triggered by tenant deletion) or on server restart (CSR is
103 /// rebuilt from the now-clean EdgeStore).
104 pub fn drop_collection(&mut self, _tid: TenantId, _collection: &str) {
105 // Intentional no-op. See doc comment above.
106 }
107
108 /// Whether a partition exists for the tenant.
109 pub fn contains_partition(&self, tid: TenantId) -> bool {
110 self.partitions.contains_key(&tid)
111 }
112
113 /// Number of tenants with graph state on this core.
114 pub fn partition_count(&self) -> usize {
115 self.partitions.len()
116 }
117
118 /// Iterate all (tenant, partition) pairs. Used for checkpointing,
119 /// memory accounting, and administrative views that genuinely
120 /// need to see every tenant's state.
121 pub fn iter(&self) -> Iter<'_, TenantId, CsrIndex> {
122 self.partitions.iter()
123 }
124
125 /// Mutable iteration over all partitions. Used by `compact()` and
126 /// similar maintenance passes that apply per-partition without
127 /// needing tenant routing.
128 pub fn iter_mut(&mut self) -> IterMut<'_, TenantId, CsrIndex> {
129 self.partitions.iter_mut()
130 }
131
132 /// Compact every partition. Mirrors `CsrIndex::compact` but across
133 /// the full set — maintenance handlers call this once per core.
134 pub fn compact_all(&mut self) {
135 for (_tid, part) in self.iter_mut() {
136 part.compact();
137 }
138 }
139
140 /// Replace an existing partition (or install a new one) with the
141 /// given `CsrIndex`. Used by the rebuild path — after rebuilding a
142 /// tenant's CSR from persistent edge storage, this installs it
143 /// atomically.
144 pub fn install_partition(&mut self, tid: TenantId, csr: CsrIndex) {
145 self.partitions.insert(tid, csr);
146 }
147
148 /// Access or create a partition via the `Entry` API, for cases that
149 /// need conditional initialization with a non-default constructor.
150 pub fn entry(&mut self, tid: TenantId) -> Entry<'_, TenantId, CsrIndex> {
151 self.partitions.entry(tid)
152 }
153}
154
155impl Default for ShardedCsrIndex {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a tenant id in tests.
    fn tid(n: u32) -> TenantId {
        TenantId::new(n)
    }

    #[test]
    fn empty_sharded_has_no_partitions() {
        let sharded = ShardedCsrIndex::new();
        assert_eq!(sharded.partition_count(), 0);
        assert!(!sharded.contains_partition(tid(1)));
        assert!(sharded.partition(tid(1)).is_none());
    }

    #[test]
    fn get_or_create_installs_empty_partition() {
        let mut sharded = ShardedCsrIndex::new();
        let part = sharded.get_or_create(tid(7));
        assert_eq!(part.node_count(), 0);
        assert!(sharded.contains_partition(tid(7)));
        assert_eq!(sharded.partition_count(), 1);
    }

    #[test]
    fn get_or_create_reuses_existing_partition() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(5))
            .add_edge("a", "l", "b")
            .unwrap();

        // A second call must hand back the same partition, not a fresh one.
        let part = sharded.get_or_create(tid(5));
        assert!(part.contains_node("a"));
        assert!(part.contains_node("b"));
        assert_eq!(sharded.partition_count(), 1);
    }

    #[test]
    fn partition_mut_does_not_create() {
        let mut sharded = ShardedCsrIndex::new();
        assert!(sharded.partition_mut(tid(9)).is_none());
        assert!(!sharded.contains_partition(tid(9)));

        sharded.get_or_create(tid(9));
        sharded
            .partition_mut(tid(9))
            .expect("partition was just created")
            .add_edge("x", "l", "y")
            .unwrap();
        assert!(sharded.partition(tid(9)).unwrap().contains_node("x"));
    }

    #[test]
    fn partitions_are_isolated_by_tenant() {
        // Two tenants insert identically-named nodes, and neither can see the
        // other's graph state. This encodes the core architectural guarantee:
        // isolation comes from holding a separate partition per tenant, not
        // from prefixing keys.
        let mut sharded = ShardedCsrIndex::new();

        sharded
            .get_or_create(tid(1))
            .add_edge("alice", "knows", "bob")
            .unwrap();
        sharded
            .get_or_create(tid(2))
            .add_edge("alice", "knows", "carol")
            .unwrap();

        let p1 = sharded.partition(tid(1)).unwrap();
        let p2 = sharded.partition(tid(2)).unwrap();

        // Tenant 1 sees alice→bob; no carol.
        assert!(p1.contains_node("alice"));
        assert!(p1.contains_node("bob"));
        assert!(!p1.contains_node("carol"));

        // Tenant 2 sees alice→carol; no bob.
        assert!(p2.contains_node("alice"));
        assert!(p2.contains_node("carol"));
        assert!(!p2.contains_node("bob"));

        // `alice` exists in both partitions but refers to two different
        // logical nodes: no collision, no ambiguity, no lexical prefix.
    }

    #[test]
    fn node_names_are_unprefixed() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(42))
            .add_edge("alice", "knows", "bob")
            .unwrap();
        sharded.get_or_create(tid(42)).compact();

        let part = sharded.partition(tid(42)).unwrap();
        let alice_id = part.node_id("alice").expect("alice must be present");
        // The stored name is exactly what the caller inserted, never
        // `"42:alice"`. Structural partitioning keeps shard ids out of
        // user-visible keys.
        assert_eq!(part.node_name(alice_id), "alice");
    }

    #[test]
    fn drop_partition_removes_tenant_state() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(1))
            .add_edge("a", "l", "b")
            .unwrap();
        assert!(sharded.contains_partition(tid(1)));

        assert!(sharded.drop_partition(tid(1)));
        assert!(!sharded.contains_partition(tid(1)));
        assert_eq!(sharded.partition_count(), 0);

        // Second drop is a no-op, not an error.
        assert!(!sharded.drop_partition(tid(1)));
    }

    #[test]
    fn drop_partition_does_not_touch_other_tenants() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(1))
            .add_edge("a", "l", "b")
            .unwrap();
        sharded
            .get_or_create(tid(2))
            .add_edge("c", "l", "d")
            .unwrap();

        assert!(sharded.drop_partition(tid(1)));
        assert!(!sharded.contains_partition(tid(1)));
        assert!(sharded.contains_partition(tid(2)));
        assert!(sharded.partition(tid(2)).unwrap().contains_node("c"));
    }

    #[test]
    fn drop_collection_is_a_no_op() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(1))
            .add_edge("a", "l", "b")
            .unwrap();

        sharded.drop_collection(tid(1), "people");
        // In-memory state is untouched; reclaim happens via `drop_partition`
        // or a rebuild from persistent storage.
        assert!(sharded.contains_partition(tid(1)));
        assert!(sharded.partition(tid(1)).unwrap().contains_node("a"));

        // Dropping a collection for an unknown tenant must not create a
        // partition as a side effect.
        sharded.drop_collection(tid(2), "people");
        assert!(!sharded.contains_partition(tid(2)));
    }

    #[test]
    fn install_partition_replaces_existing() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .get_or_create(tid(1))
            .add_edge("old", "l", "value")
            .unwrap();

        let mut replacement = CsrIndex::new();
        replacement.add_edge("new", "l", "value").unwrap();
        sharded.install_partition(tid(1), replacement);

        let part = sharded.partition(tid(1)).unwrap();
        assert!(part.contains_node("new"));
        assert!(!part.contains_node("old"));
    }

    #[test]
    fn entry_allows_custom_initialization() {
        let mut sharded = ShardedCsrIndex::new();
        sharded
            .entry(tid(3))
            .or_insert_with(CsrIndex::new)
            .add_edge("p", "l", "q")
            .unwrap();
        assert!(sharded.contains_partition(tid(3)));
        assert!(sharded.partition(tid(3)).unwrap().contains_node("p"));
    }

    #[test]
    fn iter_visits_every_partition() {
        let mut sharded = ShardedCsrIndex::new();
        for t in 1..=3 {
            sharded
                .get_or_create(tid(t))
                .add_edge("a", "l", "b")
                .unwrap();
        }
        assert_eq!(sharded.iter().count(), 3);
        assert!(sharded.iter().all(|(_, part)| part.contains_node("a")));
    }

    #[test]
    fn compact_all_applies_to_every_partition() {
        let mut sharded = ShardedCsrIndex::new();
        for t in 1..=3 {
            sharded
                .get_or_create(tid(t))
                .add_edge("a", "l", "b")
                .unwrap();
        }
        // Pre-compact: edges live in the write buffer. `compact_all`
        // merges them into the dense CSR arrays for every partition.
        sharded.compact_all();
        for t in 1..=3 {
            let part = sharded.partition(tid(t)).unwrap();
            assert_eq!(part.edge_count(), 1);
            assert_eq!(part.node_count(), 2);
        }
    }
}