Skip to main content

nodedb_cluster/
shard_split.rs

1//! Shard splitting: vector-aware and graph-aware partitioning.
2//!
3//! When a shard becomes overloaded, it can be split into two shards.
4//! The splitting strategy is engine-aware:
5//!
6//! - **Vector-aware**: split by collection + partition key. Each resulting
7//!   shard holds a subset of the collection's vectors with its own HNSW
8//!   index. Cross-shard k-NN queries use scatter-gather with result
9//!   merging on the Control Plane.
10//!
11//! - **Graph-aware**: balanced partitioning minimizing cross-shard edges.
12//!   Uses a greedy heuristic (BFS-based community detection) rather than
13//!   full METIS for practical performance. Cross-shard traversals use
14//!   existing scatter-gather infrastructure with ghost edges.
15//!
16//! - **Speculative prefetch**: when scatter-gather dispatches to multiple
17//!   shards, co-located vector and graph shards for the same tenant are
18//!   prefetched together to reduce round-trip latency.
19
20use std::collections::{HashMap, HashSet};
21
22use serde::{Deserialize, Serialize};
23use tracing::info;
24
25use crate::routing::RoutingTable;
26
27/// A shard split plan: which vectors/documents move to the new shard.
28#[derive(Debug, Clone)]
29pub struct SplitPlan {
30    /// Source vShard being split.
31    pub source_vshard: u16,
32    /// New vShard ID for the second half.
33    pub new_vshard: u16,
34    /// Target node for the new shard.
35    pub target_node: u64,
36    /// Document IDs that move to the new shard.
37    pub documents_to_move: Vec<String>,
38    /// Strategy used.
39    pub strategy: SplitStrategy,
40    /// Estimated data size moving (bytes).
41    pub estimated_bytes: u64,
42}
43
44/// Splitting strategy.
45#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
46pub enum SplitStrategy {
47    /// Split by partition key hash (vector-aware).
48    /// Each half gets vectors whose partition key hash falls in its range.
49    VectorPartitionKey,
50    /// Split by graph community (graph-aware).
51    /// Minimizes cross-shard edges using BFS community detection.
52    GraphCommunity,
53    /// Simple even split by document ID hash.
54    EvenHash,
55}
56
57/// Plan a vector-aware shard split.
58///
59/// Splits documents by partition key: documents whose key hashes to
60/// the lower half stay on the source shard, upper half moves to the new shard.
61/// Each resulting shard builds its own HNSW index independently.
62///
63/// Cross-shard k-NN: the Control Plane dispatches VectorSearch to both
64/// shards via scatter-gather, collects top-k from each, and merges by
65/// distance on the Control Plane.
66pub fn plan_vector_split(
67    source_vshard: u16,
68    new_vshard: u16,
69    target_node: u64,
70    document_ids: &[String],
71) -> SplitPlan {
72    let mid = document_ids.len() / 2;
73
74    // Sort by partition key hash for deterministic splitting.
75    let mut sorted: Vec<(u64, String)> = document_ids
76        .iter()
77        .map(|id| (partition_hash(id), id.clone()))
78        .collect();
79    sorted.sort_by_key(|(hash, _)| *hash);
80
81    let documents_to_move: Vec<String> = sorted[mid..].iter().map(|(_, id)| id.clone()).collect();
82
83    info!(
84        source_vshard,
85        new_vshard,
86        target_node,
87        docs_moving = documents_to_move.len(),
88        docs_staying = mid,
89        "vector-aware split planned"
90    );
91
92    SplitPlan {
93        source_vshard,
94        new_vshard,
95        target_node,
96        documents_to_move,
97        strategy: SplitStrategy::VectorPartitionKey,
98        estimated_bytes: 0, // Caller fills in from actual data.
99    }
100}
101
102/// Plan a graph-aware shard split.
103///
104/// Uses BFS-based community detection to partition nodes into two groups
105/// that minimize cross-shard edges. Starting from a seed node, BFS assigns
106/// the first half of discovered nodes to group A, the rest to group B.
107///
108/// This is a greedy heuristic — not optimal like METIS, but O(V+E) and
109/// practical for online splitting without blocking queries.
110pub fn plan_graph_split(
111    source_vshard: u16,
112    new_vshard: u16,
113    target_node: u64,
114    node_ids: &[String],
115    edges: &[(String, String)],
116) -> SplitPlan {
117    if node_ids.is_empty() {
118        return SplitPlan {
119            source_vshard,
120            new_vshard,
121            target_node,
122            documents_to_move: Vec::new(),
123            strategy: SplitStrategy::GraphCommunity,
124            estimated_bytes: 0,
125        };
126    }
127
128    // Build adjacency list for BFS.
129    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
130    for (src, dst) in edges {
131        adj.entry(src.as_str()).or_default().push(dst.as_str());
132        adj.entry(dst.as_str()).or_default().push(src.as_str());
133    }
134
135    // BFS from first node, assign first half to group A.
136    let mut visited: Vec<String> = Vec::with_capacity(node_ids.len());
137    let mut seen: HashSet<&str> = HashSet::new();
138    let mut queue: std::collections::VecDeque<&str> = std::collections::VecDeque::new();
139
140    // Start BFS from the node with the most edges (hub).
141    let start = node_ids
142        .iter()
143        .max_by_key(|id| adj.get(id.as_str()).map(|v| v.len()).unwrap_or(0))
144        .map(|s| s.as_str())
145        .unwrap_or(node_ids[0].as_str());
146
147    queue.push_back(start);
148    seen.insert(start);
149
150    while let Some(node) = queue.pop_front() {
151        visited.push(node.to_string());
152        if let Some(neighbors) = adj.get(node) {
153            for &neighbor in neighbors {
154                if seen.insert(neighbor) {
155                    queue.push_back(neighbor);
156                }
157            }
158        }
159    }
160
161    // Add any disconnected nodes not reached by BFS.
162    for id in node_ids {
163        if seen.insert(id.as_str()) {
164            visited.push(id.clone());
165        }
166    }
167
168    // Second half moves to new shard.
169    let mid = visited.len() / 2;
170    let documents_to_move: Vec<String> = visited[mid..].to_vec();
171
172    // Count cross-shard edges.
173    let group_a: HashSet<&str> = visited[..mid].iter().map(|s| s.as_str()).collect();
174    let cross_edges = edges
175        .iter()
176        .filter(|(src, dst)| {
177            let a_has_src = group_a.contains(src.as_str());
178            let a_has_dst = group_a.contains(dst.as_str());
179            a_has_src != a_has_dst
180        })
181        .count();
182
183    info!(
184        source_vshard,
185        new_vshard,
186        total_nodes = node_ids.len(),
187        cross_edges,
188        "graph-aware split planned (BFS community)"
189    );
190
191    SplitPlan {
192        source_vshard,
193        new_vshard,
194        target_node,
195        documents_to_move,
196        strategy: SplitStrategy::GraphCommunity,
197        estimated_bytes: 0,
198    }
199}
200
201/// Speculative prefetch plan for cross-shard scatter-gather queries.
202///
203/// When a query must scatter to multiple shards, this identifies which
204/// additional shards should be prefetched based on co-location hints:
205///
206/// - Vector shards for the same tenant/collection are prefetched together
207/// - Graph shards adjacent to queried shards are prefetched
208///
209/// Returns additional vShard IDs to include in the scatter batch for
210/// reduced round-trip latency.
211pub fn speculative_prefetch_shards(
212    query_vshards: &[u16],
213    _routing: &RoutingTable,
214    tenant_collections: &[(u32, String)],
215) -> Vec<u16> {
216    let mut prefetch: HashSet<u16> = HashSet::new();
217    let queried: HashSet<u16> = query_vshards.iter().copied().collect();
218
219    // For each tenant+collection, find all vShards that might hold
220    // data for the same collection (co-located shards).
221    for (_tenant_id, collection) in tenant_collections {
222        // Hash the collection to find its primary vShard.
223        let primary = crate::routing::vshard_for_collection(collection);
224
225        // Adjacent vShards (±1, ±2) are likely to hold related data
226        // due to hash distribution locality.
227        for offset in [1u16, 2] {
228            let adjacent_low = primary.wrapping_sub(offset);
229            let adjacent_high = primary.wrapping_add(offset);
230            if !queried.contains(&adjacent_low) {
231                prefetch.insert(adjacent_low);
232            }
233            if !queried.contains(&adjacent_high) {
234                prefetch.insert(adjacent_high);
235            }
236        }
237    }
238
239    // Limit prefetch to avoid excessive fan-out.
240    let max_prefetch = 8;
241    prefetch.into_iter().take(max_prefetch).collect()
242}
243
244/// Deterministic hash for partition key splitting.
245fn partition_hash(key: &str) -> u64 {
246    crate::routing::fnv1a_hash(key)
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// One hundred distinct documents are cut exactly in half.
    #[test]
    fn vector_split_even() {
        let mut docs: Vec<String> = Vec::with_capacity(100);
        for i in 0..100 {
            docs.push(format!("doc_{i}"));
        }

        let plan = plan_vector_split(10, 20, 3, &docs);

        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
        assert_eq!(plan.source_vshard, 10);
        assert_eq!(plan.new_vshard, 20);
        assert_eq!(plan.documents_to_move.len(), 50);
    }

    /// Five-node chain a–b–c–d–e: checks the size of the moved group and
    /// the strategy recorded on the plan.
    #[test]
    fn graph_split_minimizes_cross_edges() {
        let nodes: Vec<String> = ["a", "b", "c", "d", "e"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let edges: Vec<(String, String)> = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e")]
            .iter()
            .map(|&(src, dst)| (src.to_string(), dst.to_string()))
            .collect();

        let plan = plan_graph_split(10, 20, 3, &nodes, &edges);

        // Midpoint of 5 nodes is 5 / 2 = 2, so the remaining 3 move.
        assert_eq!(plan.documents_to_move.len(), 3);
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
    }

    /// Planning over no nodes and no edges moves nothing.
    #[test]
    fn graph_split_empty() {
        let plan = plan_graph_split(10, 20, 3, &[], &[]);

        assert!(plan.documents_to_move.is_empty());
    }

    /// The prefetch list never exceeds the fan-out cap of eight shards.
    #[test]
    fn speculative_prefetch_limits() {
        let routing = RoutingTable::uniform(4, &[1, 2], 1);
        let collections = [(1, "users".into())];

        let prefetch = speculative_prefetch_shards(&[0, 1], &routing, &collections);

        assert!(prefetch.len() <= 8);
    }

    /// Hashing the same key twice gives the same value.
    #[test]
    fn partition_hash_deterministic() {
        let first = partition_hash("document_42");
        let second = partition_hash("document_42");

        assert_eq!(first, second);
    }
}