// SPDX-License-Identifier: Apache-2.0

//! Tenant-partitioned CSR index.
//!
//! The graph engine multiplexes many tenants onto one per-core `CsrIndex`.
//! Historically that multiplexing was done by prepending `"{tid}:"` to every
//! node key before handing it to the CSR — a lexical scheme that required
//! every API boundary (results, traversal output, algorithm emit) to strip
//! the prefix on the way back out. The heuristic strippers were fragile
//! (mangled any id that happened to match the shape) and the strip was
//! opt-in (forgetting it leaked internal keys to clients).
//!
//! `ShardedCsrIndex` replaces that scheme with structural partitioning:
//! one independent `CsrIndex` per tenant, keyed in a `HashMap<TenantId, _>`.
//! Node keys inside a partition are the raw user-visible names — no prefix,
//! no strip, no heuristic. Cross-tenant access is not a question of auth
//! checks; it is structurally impossible because partitions don't share
//! key space.
//!
//! ## Invariants
//!
//! - **One tenant per partition.** Every `CsrIndex` held in `partitions`
//!   contains only one tenant's nodes and edges. Mixing is a programming
//!   bug, not a runtime condition.
//! - **No key prefixing.** Callers pass user-visible names directly; the
//!   partition never sees a scoped form.
//! - **Partition lifecycle is explicit.** `get_or_create` constructs on
//!   first use; `drop_partition` removes atomically. No auto-eviction at
//!   this layer (belongs to a future pool-management concern).
//! - **Data Plane shape preserved.** This type is `!Send` in the same way
//!   `CsrIndex` is (via `Cell<u32>` access counters). It is owned by a
//!   single Data Plane core.

use std::collections::HashMap;
use std::collections::hash_map::{Entry, Iter, IterMut};

use nodedb_types::TenantId;

use crate::GraphError;
use crate::csr::CsrIndex;

/// Per-tenant partitioned CSR index.
///
/// Holds one `CsrIndex` per tenant on the owning Data Plane core.
/// Handlers resolve the caller's tenant to a partition up front; all
/// downstream graph operations (algorithms, MATCH, traversal) run
/// against that single partition and cannot reach any other tenant's
/// state.
pub struct ShardedCsrIndex {
    // One fully independent CsrIndex per tenant. Node keys inside each
    // partition are raw user-visible names; isolation comes from the map
    // structure itself, never from key prefixing.
    partitions: HashMap<TenantId, CsrIndex>,
}
52
53impl ShardedCsrIndex {
54 /// Construct an empty sharded index with no partitions.
55 pub fn new() -> Self {
56 Self {
57 partitions: HashMap::new(),
58 }
59 }
60
61 /// Shared access to a tenant's partition, if it exists.
62 ///
63 /// Returns `None` if the tenant has never inserted any graph data.
64 /// Callers expecting a read-only view of a possibly-empty partition
65 /// should treat `None` as "empty" rather than as an error.
66 pub fn partition(&self, tid: TenantId) -> Option<&CsrIndex> {
67 self.partitions.get(&tid)
68 }
69
70 /// Mutable access to a tenant's partition, if it exists.
71 pub fn partition_mut(&mut self, tid: TenantId) -> Option<&mut CsrIndex> {
72 self.partitions.get_mut(&tid)
73 }
74
75 /// Mutable access to a tenant's partition, creating an empty one
76 /// on first use.
77 ///
78 /// This is the canonical write-path entry point: insertion handlers
79 /// call this once to resolve the partition, then operate on the
80 /// returned `&mut CsrIndex` exactly as they would on a standalone
81 /// instance.
82 pub fn get_or_create(&mut self, tid: TenantId) -> &mut CsrIndex {
83 self.partitions.entry(tid).or_default()
84 }
85
86 /// Drop a tenant's entire graph state.
87 ///
88 /// Returns `true` if a partition existed and was removed, `false` if
89 /// the tenant had no graph state. Used by tenant-purge flows — O(1)
90 /// structural deletion replaces the former "range-scan and erase
91 /// every key with the `{tid}:` prefix" approach, which was both
92 /// slow and coupled to the lexical encoding.
93 pub fn drop_partition(&mut self, tid: TenantId) -> bool {
94 self.partitions.remove(&tid).is_some()
95 }
96
97 /// Collection-scoped in-memory reclaim.
98 ///
99 /// The CSR is collection-agnostic in memory: a tenant's partition holds
100 /// edges from **all** collections in a single adjacency structure. There
101 /// is no per-collection sub-partition to drop. As a result this method is
102 /// intentionally a no-op — collection-scoped edge removal from persistent
103 /// storage is handled by [`EdgeStore::purge_collection`], and the stale
104 /// in-memory CSR state is eliminated on the next tenant `drop_partition`
105 /// call (triggered by tenant deletion) or on server restart (CSR is
106 /// rebuilt from the now-clean EdgeStore).
107 pub fn drop_collection(&mut self, _tid: TenantId, _collection: &str) {
108 // Intentional no-op. See doc comment above.
109 }
110
111 /// Whether a partition exists for the tenant.
112 pub fn contains_partition(&self, tid: TenantId) -> bool {
113 self.partitions.contains_key(&tid)
114 }
115
116 /// Number of tenants with graph state on this core.
117 pub fn partition_count(&self) -> usize {
118 self.partitions.len()
119 }
120
121 /// Iterate all (tenant, partition) pairs. Used for checkpointing,
122 /// memory accounting, and administrative views that genuinely
123 /// need to see every tenant's state.
124 pub fn iter(&self) -> Iter<'_, TenantId, CsrIndex> {
125 self.partitions.iter()
126 }
127
128 /// Mutable iteration over all partitions. Used by `compact()` and
129 /// similar maintenance passes that apply per-partition without
130 /// needing tenant routing.
131 pub fn iter_mut(&mut self) -> IterMut<'_, TenantId, CsrIndex> {
132 self.partitions.iter_mut()
133 }
134
135 /// Compact every partition. Mirrors `CsrIndex::compact` but across
136 /// the full set — maintenance handlers call this once per core.
137 ///
138 /// # Errors
139 ///
140 /// Returns the first [`GraphError::MemoryBudget`] encountered if any
141 /// partition has a memory governor installed and the budget is exhausted.
142 /// Partitions already compacted before the error are not rolled back.
143 pub fn compact_all(&mut self) -> Result<(), GraphError> {
144 for (_tid, part) in self.iter_mut() {
145 part.compact()?;
146 }
147 Ok(())
148 }
149
150 /// Replace an existing partition (or install a new one) with the
151 /// given `CsrIndex`. Used by the rebuild path — after rebuilding a
152 /// tenant's CSR from persistent edge storage, this installs it
153 /// atomically.
154 pub fn install_partition(&mut self, tid: TenantId, csr: CsrIndex) {
155 self.partitions.insert(tid, csr);
156 }
157
158 /// Access or create a partition via the `Entry` API, for cases that
159 /// need conditional initialization with a non-default constructor.
160 pub fn entry(&mut self, tid: TenantId) -> Entry<'_, TenantId, CsrIndex> {
161 self.partitions.entry(tid)
162 }
163}
164
165impl Default for ShardedCsrIndex {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for the tenant ids used by the tests.
    fn tenant(n: u64) -> TenantId {
        TenantId::new(n)
    }

    #[test]
    fn empty_sharded_has_no_partitions() {
        let index = ShardedCsrIndex::new();
        assert_eq!(index.partition_count(), 0);
        assert!(!index.contains_partition(tenant(1)));
        assert!(index.partition(tenant(1)).is_none());
    }

    #[test]
    fn get_or_create_installs_empty_partition() {
        let mut index = ShardedCsrIndex::new();
        assert_eq!(index.get_or_create(tenant(7)).node_count(), 0);
        assert!(index.contains_partition(tenant(7)));
        assert_eq!(index.partition_count(), 1);
    }

    #[test]
    fn partitions_are_isolated_by_tenant() {
        // Two tenants insert identically-named nodes; neither can observe
        // the other's graph state. This pins the core architectural
        // guarantee: isolation by partition structure, not by key prefix.
        let mut index = ShardedCsrIndex::new();

        index
            .get_or_create(tenant(1))
            .add_edge("alice", "knows", "bob")
            .unwrap();
        index
            .get_or_create(tenant(2))
            .add_edge("alice", "knows", "carol")
            .unwrap();

        let first = index.partition(tenant(1)).unwrap();
        let second = index.partition(tenant(2)).unwrap();

        // Tenant 1 sees alice→bob; no carol.
        assert!(first.contains_node("alice"));
        assert!(first.contains_node("bob"));
        assert!(!first.contains_node("carol"));

        // Tenant 2 sees alice→carol; no bob.
        assert!(second.contains_node("alice"));
        assert!(second.contains_node("carol"));
        assert!(!second.contains_node("bob"));

        // `alice` exists in both partitions yet names two different
        // logical nodes — no collision, no ambiguity, no lexical prefix.
    }

    #[test]
    fn node_names_are_unprefixed() {
        let mut index = ShardedCsrIndex::new();
        index
            .get_or_create(tenant(42))
            .add_edge("alice", "knows", "bob")
            .unwrap();
        index
            .get_or_create(tenant(42))
            .compact()
            .expect("no governor, cannot fail");

        let part = index.partition(tenant(42)).unwrap();
        let alice = part.node_id("alice").expect("alice must be present");
        // The stored name is exactly what the caller inserted — never
        // `"42:alice"`. Structural partitioning keeps shard ids out of
        // user-visible keys.
        assert_eq!(part.node_name(alice), "alice");
    }

    #[test]
    fn drop_partition_removes_tenant_state() {
        let mut index = ShardedCsrIndex::new();
        index
            .get_or_create(tenant(1))
            .add_edge("a", "l", "b")
            .unwrap();
        assert!(index.contains_partition(tenant(1)));

        assert!(index.drop_partition(tenant(1)));
        assert!(!index.contains_partition(tenant(1)));
        assert_eq!(index.partition_count(), 0);

        // Dropping again is a quiet no-op, not an error.
        assert!(!index.drop_partition(tenant(1)));
    }

    #[test]
    fn drop_partition_does_not_touch_other_tenants() {
        let mut index = ShardedCsrIndex::new();
        index
            .get_or_create(tenant(1))
            .add_edge("a", "l", "b")
            .unwrap();
        index
            .get_or_create(tenant(2))
            .add_edge("c", "l", "d")
            .unwrap();

        index.drop_partition(tenant(1));
        assert!(!index.contains_partition(tenant(1)));
        assert!(index.contains_partition(tenant(2)));
        assert!(index.partition(tenant(2)).unwrap().contains_node("c"));
    }

    #[test]
    fn install_partition_replaces_existing() {
        let mut index = ShardedCsrIndex::new();
        index
            .get_or_create(tenant(1))
            .add_edge("old", "l", "value")
            .unwrap();

        let mut rebuilt = CsrIndex::new();
        rebuilt.add_edge("new", "l", "value").unwrap();
        index.install_partition(tenant(1), rebuilt);

        let part = index.partition(tenant(1)).unwrap();
        assert!(part.contains_node("new"));
        assert!(!part.contains_node("old"));
    }

    #[test]
    fn compact_all_applies_to_every_partition() {
        let mut index = ShardedCsrIndex::new();
        for t in 1..=3 {
            index
                .get_or_create(tenant(t))
                .add_edge("a", "l", "b")
                .unwrap();
        }
        // Before compaction the edges live in the write buffer;
        // `compact_all` merges them into the dense CSR arrays of every
        // partition.
        index.compact_all().expect("no governor, cannot fail");
        for t in 1..=3 {
            let part = index.partition(tenant(t)).unwrap();
            assert_eq!(part.edge_count(), 1);
            assert_eq!(part.node_count(), 2);
        }
    }
}