// nodedb_graph/sharded.rs
// SPDX-License-Identifier: Apache-2.0

//! Tenant-partitioned CSR index.
//!
//! The graph engine multiplexes many tenants onto one per-core `CsrIndex`.
//! Historically that multiplexing was done by prepending `"{tid}:"` to every
//! node key before handing it to the CSR — a lexical scheme that required
//! every API boundary (results, traversal output, algorithm emit) to strip
//! the prefix on the way back out. The heuristic strippers were fragile
//! (mangled any id that happened to match the shape) and the strip was
//! opt-in (forgetting it leaked internal keys to clients).
//!
//! `ShardedCsrIndex` replaces that scheme with structural partitioning:
//! one independent `CsrIndex` per tenant, keyed in a `HashMap<TenantId, _>`.
//! Node keys inside a partition are the raw user-visible names — no prefix,
//! no strip, no heuristic. Cross-tenant access is not a question of auth
//! checks; it is structurally impossible because partitions don't share
//! key space.
//!
//! ## Invariants
//!
//! - **One tenant per partition.** Every `CsrIndex` held in `partitions`
//!   contains only one tenant's nodes and edges. Mixing is a programming
//!   bug, not a runtime condition.
//! - **No key prefixing.** Callers pass user-visible names directly; the
//!   partition never sees a scoped form.
//! - **Partition lifecycle is explicit.** `get_or_create` constructs on
//!   first use; `drop_partition` removes atomically. No auto-eviction at
//!   this layer (belongs to a future pool-management concern).
//! - **Data Plane shape preserved.** This type is `!Send` in the same way
//!   `CsrIndex` is (via `Cell<u32>` access counters). It is owned by a
//!   single Data Plane core.
34use std::collections::HashMap;
35use std::collections::hash_map::{Entry, Iter, IterMut};
36
37use nodedb_types::TenantId;
38
39use crate::GraphError;
40use crate::csr::CsrIndex;
41
/// Per-tenant partitioned CSR index.
///
/// Holds one `CsrIndex` per tenant on the owning Data Plane core.
/// Handlers resolve the caller's tenant to a partition up front; all
/// downstream graph operations (algorithms, MATCH, traversal) run
/// against that single partition and cannot reach any other tenant's
/// state.
pub struct ShardedCsrIndex {
    // One independent CSR per tenant. Partitions never share key space,
    // so node names need no tenant prefix and cross-tenant access is
    // structurally impossible rather than merely auth-checked.
    partitions: HashMap<TenantId, CsrIndex>,
}
52
53impl ShardedCsrIndex {
54    /// Construct an empty sharded index with no partitions.
55    pub fn new() -> Self {
56        Self {
57            partitions: HashMap::new(),
58        }
59    }
60
61    /// Shared access to a tenant's partition, if it exists.
62    ///
63    /// Returns `None` if the tenant has never inserted any graph data.
64    /// Callers expecting a read-only view of a possibly-empty partition
65    /// should treat `None` as "empty" rather than as an error.
66    pub fn partition(&self, tid: TenantId) -> Option<&CsrIndex> {
67        self.partitions.get(&tid)
68    }
69
70    /// Mutable access to a tenant's partition, if it exists.
71    pub fn partition_mut(&mut self, tid: TenantId) -> Option<&mut CsrIndex> {
72        self.partitions.get_mut(&tid)
73    }
74
75    /// Mutable access to a tenant's partition, creating an empty one
76    /// on first use.
77    ///
78    /// This is the canonical write-path entry point: insertion handlers
79    /// call this once to resolve the partition, then operate on the
80    /// returned `&mut CsrIndex` exactly as they would on a standalone
81    /// instance.
82    pub fn get_or_create(&mut self, tid: TenantId) -> &mut CsrIndex {
83        self.partitions.entry(tid).or_default()
84    }
85
86    /// Drop a tenant's entire graph state.
87    ///
88    /// Returns `true` if a partition existed and was removed, `false` if
89    /// the tenant had no graph state. Used by tenant-purge flows — O(1)
90    /// structural deletion replaces the former "range-scan and erase
91    /// every key with the `{tid}:` prefix" approach, which was both
92    /// slow and coupled to the lexical encoding.
93    pub fn drop_partition(&mut self, tid: TenantId) -> bool {
94        self.partitions.remove(&tid).is_some()
95    }
96
97    /// Collection-scoped in-memory reclaim.
98    ///
99    /// The CSR is collection-agnostic in memory: a tenant's partition holds
100    /// edges from **all** collections in a single adjacency structure. There
101    /// is no per-collection sub-partition to drop. As a result this method is
102    /// intentionally a no-op — collection-scoped edge removal from persistent
103    /// storage is handled by [`EdgeStore::purge_collection`], and the stale
104    /// in-memory CSR state is eliminated on the next tenant `drop_partition`
105    /// call (triggered by tenant deletion) or on server restart (CSR is
106    /// rebuilt from the now-clean EdgeStore).
107    pub fn drop_collection(&mut self, _tid: TenantId, _collection: &str) {
108        // Intentional no-op. See doc comment above.
109    }
110
111    /// Whether a partition exists for the tenant.
112    pub fn contains_partition(&self, tid: TenantId) -> bool {
113        self.partitions.contains_key(&tid)
114    }
115
116    /// Number of tenants with graph state on this core.
117    pub fn partition_count(&self) -> usize {
118        self.partitions.len()
119    }
120
121    /// Iterate all (tenant, partition) pairs. Used for checkpointing,
122    /// memory accounting, and administrative views that genuinely
123    /// need to see every tenant's state.
124    pub fn iter(&self) -> Iter<'_, TenantId, CsrIndex> {
125        self.partitions.iter()
126    }
127
128    /// Mutable iteration over all partitions. Used by `compact()` and
129    /// similar maintenance passes that apply per-partition without
130    /// needing tenant routing.
131    pub fn iter_mut(&mut self) -> IterMut<'_, TenantId, CsrIndex> {
132        self.partitions.iter_mut()
133    }
134
135    /// Compact every partition. Mirrors `CsrIndex::compact` but across
136    /// the full set — maintenance handlers call this once per core.
137    ///
138    /// # Errors
139    ///
140    /// Returns the first [`GraphError::MemoryBudget`] encountered if any
141    /// partition has a memory governor installed and the budget is exhausted.
142    /// Partitions already compacted before the error are not rolled back.
143    pub fn compact_all(&mut self) -> Result<(), GraphError> {
144        for (_tid, part) in self.iter_mut() {
145            part.compact()?;
146        }
147        Ok(())
148    }
149
150    /// Replace an existing partition (or install a new one) with the
151    /// given `CsrIndex`. Used by the rebuild path — after rebuilding a
152    /// tenant's CSR from persistent edge storage, this installs it
153    /// atomically.
154    pub fn install_partition(&mut self, tid: TenantId, csr: CsrIndex) {
155        self.partitions.insert(tid, csr);
156    }
157
158    /// Access or create a partition via the `Entry` API, for cases that
159    /// need conditional initialization with a non-default constructor.
160    pub fn entry(&mut self, tid: TenantId) -> Entry<'_, TenantId, CsrIndex> {
161        self.partitions.entry(tid)
162    }
163}
164
165impl Default for ShardedCsrIndex {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for constructing a `TenantId` in tests.
    fn t(n: u64) -> TenantId {
        TenantId::new(n)
    }

    #[test]
    fn empty_sharded_has_no_partitions() {
        let idx = ShardedCsrIndex::new();
        assert!(idx.partition(t(1)).is_none());
        assert!(!idx.contains_partition(t(1)));
        assert_eq!(idx.partition_count(), 0);
    }

    #[test]
    fn get_or_create_installs_empty_partition() {
        let mut idx = ShardedCsrIndex::new();
        assert_eq!(idx.get_or_create(t(7)).node_count(), 0);
        assert_eq!(idx.partition_count(), 1);
        assert!(idx.contains_partition(t(7)));
    }

    #[test]
    fn partitions_are_isolated_by_tenant() {
        // Two tenants insert identically-named nodes; each partition
        // must only ever expose its own tenant's graph. This encodes
        // the core architectural guarantee of Option C: isolation by
        // partition, not by key prefix.
        let mut idx = ShardedCsrIndex::new();

        idx.get_or_create(t(1))
            .add_edge("alice", "knows", "bob")
            .unwrap();
        idx.get_or_create(t(2))
            .add_edge("alice", "knows", "carol")
            .unwrap();

        let first = idx.partition(t(1)).unwrap();
        let second = idx.partition(t(2)).unwrap();

        // Tenant 1: alice→bob only, no carol.
        assert!(first.contains_node("alice"));
        assert!(first.contains_node("bob"));
        assert!(!first.contains_node("carol"));

        // Tenant 2: alice→carol only, no bob.
        assert!(second.contains_node("alice"));
        assert!(second.contains_node("carol"));
        assert!(!second.contains_node("bob"));

        // "alice" names two distinct logical nodes, one per partition —
        // no collision, no ambiguity, no lexical prefix.
    }

    #[test]
    fn node_names_are_unprefixed() {
        let mut idx = ShardedCsrIndex::new();
        let part = idx.get_or_create(t(42));
        part.add_edge("alice", "knows", "bob").unwrap();
        part.compact().expect("no governor, cannot fail");

        let part = idx.partition(t(42)).unwrap();
        let alice = part.node_id("alice").expect("alice must be present");
        // The stored name is exactly what the caller inserted — never
        // `"42:alice"`. Structural partitioning keeps shard ids out of
        // user-visible keys.
        assert_eq!(part.node_name(alice), "alice");
    }

    #[test]
    fn drop_partition_removes_tenant_state() {
        let mut idx = ShardedCsrIndex::new();
        idx.get_or_create(t(1)).add_edge("a", "l", "b").unwrap();
        assert!(idx.contains_partition(t(1)));

        // First drop removes the live partition…
        assert!(idx.drop_partition(t(1)));
        assert_eq!(idx.partition_count(), 0);
        assert!(!idx.contains_partition(t(1)));

        // …and a second drop is a harmless no-op, not an error.
        assert!(!idx.drop_partition(t(1)));
    }

    #[test]
    fn drop_partition_does_not_touch_other_tenants() {
        let mut idx = ShardedCsrIndex::new();
        idx.get_or_create(t(1)).add_edge("a", "l", "b").unwrap();
        idx.get_or_create(t(2)).add_edge("c", "l", "d").unwrap();

        idx.drop_partition(t(1));

        assert!(!idx.contains_partition(t(1)));
        assert!(idx.contains_partition(t(2)));
        assert!(idx.partition(t(2)).unwrap().contains_node("c"));
    }

    #[test]
    fn install_partition_replaces_existing() {
        let mut idx = ShardedCsrIndex::new();
        idx.get_or_create(t(1)).add_edge("old", "l", "value").unwrap();

        // Rebuild path: construct a fresh CSR out-of-band, then swap it in.
        let mut rebuilt = CsrIndex::new();
        rebuilt.add_edge("new", "l", "value").unwrap();
        idx.install_partition(t(1), rebuilt);

        let part = idx.partition(t(1)).unwrap();
        assert!(part.contains_node("new"));
        assert!(!part.contains_node("old"));
    }

    #[test]
    fn compact_all_applies_to_every_partition() {
        let mut idx = ShardedCsrIndex::new();
        for n in 1..=3 {
            idx.get_or_create(t(n)).add_edge("a", "l", "b").unwrap();
        }
        // Pre-compact, the edges live in the write buffer; `compact_all`
        // merges them into the dense CSR arrays of every partition.
        idx.compact_all().expect("no governor, cannot fail");
        for n in 1..=3 {
            let part = idx.partition(t(n)).unwrap();
            assert_eq!(part.node_count(), 2);
            assert_eq!(part.edge_count(), 1);
        }
    }
}