// nodedb_cluster/shard_split.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Shard splitting: vector-aware and graph-aware partitioning.
4//!
5//! When a shard becomes overloaded, it can be split into two shards.
6//! The splitting strategy is engine-aware:
7//!
8//! - **Vector-aware**: split by collection + partition key. Each resulting
9//!   shard holds a subset of the collection's vectors with its own HNSW
10//!   index. Cross-shard k-NN queries use scatter-gather with result
11//!   merging on the Control Plane.
12//!
13//! - **Graph-aware**: balanced partitioning minimizing cross-shard edges.
14//!   Uses a greedy heuristic (BFS-based community detection) rather than
15//!   full METIS for practical performance. Cross-shard traversals use
16//!   existing scatter-gather infrastructure with ghost edges.
17//!
18//! - **Speculative prefetch**: when scatter-gather dispatches to multiple
19//!   shards, co-located vector and graph shards for the same tenant are
20//!   prefetched together to reduce round-trip latency.
21
22use std::collections::{HashMap, HashSet};
23
24use serde::{Deserialize, Serialize};
25use tracing::info;
26
27use crate::routing::RoutingTable;
28
29/// A shard split plan: which vectors/documents move to the new shard.
30#[derive(Debug, Clone)]
31pub struct SplitPlan {
32    /// Source vShard being split.
33    pub source_vshard: u32,
34    /// New vShard ID for the second half.
35    pub new_vshard: u32,
36    /// Target node for the new shard.
37    pub target_node: u64,
38    /// Document IDs that move to the new shard.
39    pub documents_to_move: Vec<String>,
40    /// Strategy used.
41    pub strategy: SplitStrategy,
42    /// Estimated data size moving (bytes).
43    pub estimated_bytes: u64,
44}
45
46/// Splitting strategy.
47#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
48pub enum SplitStrategy {
49    /// Split by partition key hash (vector-aware).
50    /// Each half gets vectors whose partition key hash falls in its range.
51    VectorPartitionKey,
52    /// Split by graph community (graph-aware).
53    /// Minimizes cross-shard edges using BFS community detection.
54    GraphCommunity,
55    /// Simple even split by document ID hash.
56    EvenHash,
57}
58
59/// Plan a vector-aware shard split.
60///
61/// Splits documents by partition key: documents whose key hashes to
62/// the lower half stay on the source shard, upper half moves to the new shard.
63/// Each resulting shard builds its own HNSW index independently.
64///
65/// Cross-shard k-NN: the Control Plane dispatches VectorSearch to both
66/// shards via scatter-gather, collects top-k from each, and merges by
67/// distance on the Control Plane.
68pub fn plan_vector_split(
69    source_vshard: u32,
70    new_vshard: u32,
71    target_node: u64,
72    document_ids: &[String],
73) -> SplitPlan {
74    let mid = document_ids.len() / 2;
75
76    // Sort by partition key hash for deterministic splitting.
77    let mut sorted: Vec<(u64, String)> = document_ids
78        .iter()
79        .map(|id| (partition_hash(id), id.clone()))
80        .collect();
81    sorted.sort_by_key(|(hash, _)| *hash);
82
83    let documents_to_move: Vec<String> = sorted[mid..].iter().map(|(_, id)| id.clone()).collect();
84
85    info!(
86        source_vshard,
87        new_vshard,
88        target_node,
89        docs_moving = documents_to_move.len(),
90        docs_staying = mid,
91        "vector-aware split planned"
92    );
93
94    SplitPlan {
95        source_vshard,
96        new_vshard,
97        target_node,
98        documents_to_move,
99        strategy: SplitStrategy::VectorPartitionKey,
100        estimated_bytes: 0, // Caller fills in from actual data.
101    }
102}
103
104/// Plan a graph-aware shard split.
105///
106/// Uses BFS-based community detection to partition nodes into two groups
107/// that minimize cross-shard edges. Starting from a seed node, BFS assigns
108/// the first half of discovered nodes to group A, the rest to group B.
109///
110/// This is a greedy heuristic — not optimal like METIS, but O(V+E) and
111/// practical for online splitting without blocking queries.
112pub fn plan_graph_split(
113    source_vshard: u32,
114    new_vshard: u32,
115    target_node: u64,
116    node_ids: &[String],
117    edges: &[(String, String)],
118) -> SplitPlan {
119    if node_ids.is_empty() {
120        return SplitPlan {
121            source_vshard,
122            new_vshard,
123            target_node,
124            documents_to_move: Vec::new(),
125            strategy: SplitStrategy::GraphCommunity,
126            estimated_bytes: 0,
127        };
128    }
129
130    // Build adjacency list for BFS.
131    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
132    for (src, dst) in edges {
133        adj.entry(src.as_str()).or_default().push(dst.as_str());
134        adj.entry(dst.as_str()).or_default().push(src.as_str());
135    }
136
137    // BFS from first node, assign first half to group A.
138    let mut visited: Vec<String> = Vec::with_capacity(node_ids.len());
139    let mut seen: HashSet<&str> = HashSet::new();
140    let mut queue: std::collections::VecDeque<&str> = std::collections::VecDeque::new();
141
142    // Start BFS from the node with the most edges (hub).
143    let start = node_ids
144        .iter()
145        .max_by_key(|id| adj.get(id.as_str()).map(|v| v.len()).unwrap_or(0))
146        .map(|s| s.as_str())
147        .unwrap_or(node_ids[0].as_str());
148
149    queue.push_back(start);
150    seen.insert(start);
151
152    while let Some(node) = queue.pop_front() {
153        visited.push(node.to_string());
154        if let Some(neighbors) = adj.get(node) {
155            for &neighbor in neighbors {
156                if seen.insert(neighbor) {
157                    queue.push_back(neighbor);
158                }
159            }
160        }
161    }
162
163    // Add any disconnected nodes not reached by BFS.
164    for id in node_ids {
165        if seen.insert(id.as_str()) {
166            visited.push(id.clone());
167        }
168    }
169
170    // Second half moves to new shard.
171    let mid = visited.len() / 2;
172    let documents_to_move: Vec<String> = visited[mid..].to_vec();
173
174    // Count cross-shard edges.
175    let group_a: HashSet<&str> = visited[..mid].iter().map(|s| s.as_str()).collect();
176    let cross_edges = edges
177        .iter()
178        .filter(|(src, dst)| {
179            let a_has_src = group_a.contains(src.as_str());
180            let a_has_dst = group_a.contains(dst.as_str());
181            a_has_src != a_has_dst
182        })
183        .count();
184
185    info!(
186        source_vshard,
187        new_vshard,
188        total_nodes = node_ids.len(),
189        cross_edges,
190        "graph-aware split planned (BFS community)"
191    );
192
193    SplitPlan {
194        source_vshard,
195        new_vshard,
196        target_node,
197        documents_to_move,
198        strategy: SplitStrategy::GraphCommunity,
199        estimated_bytes: 0,
200    }
201}
202
203/// Speculative prefetch plan for cross-shard scatter-gather queries.
204///
205/// When a query must scatter to multiple shards, this identifies which
206/// additional shards should be prefetched based on co-location hints:
207///
208/// - Vector shards for the same tenant/collection are prefetched together
209/// - Graph shards adjacent to queried shards are prefetched
210///
211/// Returns additional vShard IDs to include in the scatter batch for
212/// reduced round-trip latency.
213pub fn speculative_prefetch_shards(
214    query_vshards: &[u32],
215    _routing: &RoutingTable,
216    tenant_collections: &[(nodedb_types::id::DatabaseId, u32, String)],
217) -> Vec<u32> {
218    let mut prefetch: HashSet<u32> = HashSet::new();
219    let queried: HashSet<u32> = query_vshards.iter().copied().collect();
220
221    // For each (database, tenant, collection), find all vShards that might hold
222    // data for the same collection (co-located shards).
223    for (database_id, _tenant_id, collection) in tenant_collections {
224        // Hash the (database, collection) pair to find its primary vShard.
225        let primary = crate::routing::vshard_for_collection(*database_id, collection);
226
227        // Adjacent vShards (±1, ±2) are likely to hold related data
228        // due to hash distribution locality.
229        for offset in [1u32, 2] {
230            let adjacent_low = primary.wrapping_sub(offset);
231            let adjacent_high = primary.wrapping_add(offset);
232            if !queried.contains(&adjacent_low) {
233                prefetch.insert(adjacent_low);
234            }
235            if !queried.contains(&adjacent_high) {
236                prefetch.insert(adjacent_high);
237            }
238        }
239    }
240
241    // Limit prefetch to avoid excessive fan-out.
242    let max_prefetch = 8;
243    prefetch.into_iter().take(max_prefetch).collect()
244}
245
/// Deterministic hash for partition key splitting.
///
/// Delegates to the same FNV-1a hash used by the routing module, so split
/// planning and request routing agree on where a given key lands.
fn partition_hash(key: &str) -> u64 {
    crate::routing::fnv1a_hash(key)
}
250
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn vector_split_even() {
        // 100 docs split evenly: exactly half should move.
        let ids: Vec<String> = (0..100).map(|i| format!("doc_{i}")).collect();
        let plan = plan_vector_split(10, 20, 3, &ids);
        assert_eq!(plan.source_vshard, 10);
        assert_eq!(plan.new_vshard, 20);
        assert_eq!(plan.documents_to_move.len(), 50);
        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
    }

    #[test]
    fn graph_split_minimizes_cross_edges() {
        // Path graph a—b—c—d—e: edges are consecutive pairs.
        let nodes: Vec<String> = ["a", "b", "c", "d", "e"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        let edges: Vec<(String, String)> = nodes
            .windows(2)
            .map(|pair| (pair[0].clone(), pair[1].clone()))
            .collect();
        let plan = plan_graph_split(10, 20, 3, &nodes, &edges);
        // 5 nodes: mid = 5 / 2 = 2 stay, so 3 move.
        assert_eq!(plan.documents_to_move.len(), 3);
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
    }

    #[test]
    fn graph_split_empty() {
        // Splitting an empty shard yields an empty move list.
        let plan = plan_graph_split(10, 20, 3, &[], &[]);
        assert!(plan.documents_to_move.is_empty());
    }

    #[test]
    fn speculative_prefetch_limits() {
        let routing = RoutingTable::uniform(4, &[1, 2], 1);
        let hints = [(nodedb_types::id::DatabaseId::DEFAULT, 1, String::from("users"))];
        let prefetch = speculative_prefetch_shards(&[0, 1], &routing, &hints);
        // Fan-out must respect the max prefetch cap.
        assert!(prefetch.len() <= 8);
    }

    #[test]
    fn partition_hash_deterministic() {
        // Same key must always produce the same hash.
        assert_eq!(partition_hash("document_42"), partition_hash("document_42"));
    }
}