1use std::collections::{HashMap, HashSet};
23
24use serde::{Deserialize, Serialize};
25use tracing::info;
26
27use crate::routing::RoutingTable;
28
29#[derive(Debug, Clone)]
31pub struct SplitPlan {
32 pub source_vshard: u32,
34 pub new_vshard: u32,
36 pub target_node: u64,
38 pub documents_to_move: Vec<String>,
40 pub strategy: SplitStrategy,
42 pub estimated_bytes: u64,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
48pub enum SplitStrategy {
49 VectorPartitionKey,
52 GraphCommunity,
55 EvenHash,
57}
58
59pub fn plan_vector_split(
69 source_vshard: u32,
70 new_vshard: u32,
71 target_node: u64,
72 document_ids: &[String],
73) -> SplitPlan {
74 let mid = document_ids.len() / 2;
75
76 let mut sorted: Vec<(u64, String)> = document_ids
78 .iter()
79 .map(|id| (partition_hash(id), id.clone()))
80 .collect();
81 sorted.sort_by_key(|(hash, _)| *hash);
82
83 let documents_to_move: Vec<String> = sorted[mid..].iter().map(|(_, id)| id.clone()).collect();
84
85 info!(
86 source_vshard,
87 new_vshard,
88 target_node,
89 docs_moving = documents_to_move.len(),
90 docs_staying = mid,
91 "vector-aware split planned"
92 );
93
94 SplitPlan {
95 source_vshard,
96 new_vshard,
97 target_node,
98 documents_to_move,
99 strategy: SplitStrategy::VectorPartitionKey,
100 estimated_bytes: 0, }
102}
103
104pub fn plan_graph_split(
113 source_vshard: u32,
114 new_vshard: u32,
115 target_node: u64,
116 node_ids: &[String],
117 edges: &[(String, String)],
118) -> SplitPlan {
119 if node_ids.is_empty() {
120 return SplitPlan {
121 source_vshard,
122 new_vshard,
123 target_node,
124 documents_to_move: Vec::new(),
125 strategy: SplitStrategy::GraphCommunity,
126 estimated_bytes: 0,
127 };
128 }
129
130 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
132 for (src, dst) in edges {
133 adj.entry(src.as_str()).or_default().push(dst.as_str());
134 adj.entry(dst.as_str()).or_default().push(src.as_str());
135 }
136
137 let mut visited: Vec<String> = Vec::with_capacity(node_ids.len());
139 let mut seen: HashSet<&str> = HashSet::new();
140 let mut queue: std::collections::VecDeque<&str> = std::collections::VecDeque::new();
141
142 let start = node_ids
144 .iter()
145 .max_by_key(|id| adj.get(id.as_str()).map(|v| v.len()).unwrap_or(0))
146 .map(|s| s.as_str())
147 .unwrap_or(node_ids[0].as_str());
148
149 queue.push_back(start);
150 seen.insert(start);
151
152 while let Some(node) = queue.pop_front() {
153 visited.push(node.to_string());
154 if let Some(neighbors) = adj.get(node) {
155 for &neighbor in neighbors {
156 if seen.insert(neighbor) {
157 queue.push_back(neighbor);
158 }
159 }
160 }
161 }
162
163 for id in node_ids {
165 if seen.insert(id.as_str()) {
166 visited.push(id.clone());
167 }
168 }
169
170 let mid = visited.len() / 2;
172 let documents_to_move: Vec<String> = visited[mid..].to_vec();
173
174 let group_a: HashSet<&str> = visited[..mid].iter().map(|s| s.as_str()).collect();
176 let cross_edges = edges
177 .iter()
178 .filter(|(src, dst)| {
179 let a_has_src = group_a.contains(src.as_str());
180 let a_has_dst = group_a.contains(dst.as_str());
181 a_has_src != a_has_dst
182 })
183 .count();
184
185 info!(
186 source_vshard,
187 new_vshard,
188 total_nodes = node_ids.len(),
189 cross_edges,
190 "graph-aware split planned (BFS community)"
191 );
192
193 SplitPlan {
194 source_vshard,
195 new_vshard,
196 target_node,
197 documents_to_move,
198 strategy: SplitStrategy::GraphCommunity,
199 estimated_bytes: 0,
200 }
201}
202
203pub fn speculative_prefetch_shards(
214 query_vshards: &[u32],
215 _routing: &RoutingTable,
216 tenant_collections: &[(nodedb_types::id::DatabaseId, u32, String)],
217) -> Vec<u32> {
218 let mut prefetch: HashSet<u32> = HashSet::new();
219 let queried: HashSet<u32> = query_vshards.iter().copied().collect();
220
221 for (database_id, _tenant_id, collection) in tenant_collections {
224 let primary = crate::routing::vshard_for_collection(*database_id, collection);
226
227 for offset in [1u32, 2] {
230 let adjacent_low = primary.wrapping_sub(offset);
231 let adjacent_high = primary.wrapping_add(offset);
232 if !queried.contains(&adjacent_low) {
233 prefetch.insert(adjacent_low);
234 }
235 if !queried.contains(&adjacent_high) {
236 prefetch.insert(adjacent_high);
237 }
238 }
239 }
240
241 let max_prefetch = 8;
243 prefetch.into_iter().take(max_prefetch).collect()
244}
245
/// Hashes a document id for split ordering by delegating to
/// `crate::routing::fnv1a_hash`.
/// NOTE(review): presumably the same hash the routing layer uses — confirm.
fn partition_hash(key: &str) -> u64 {
    crate::routing::fnv1a_hash(key)
}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` distinct document ids: `doc_0`, `doc_1`, ...
    fn doc_ids(n: usize) -> Vec<String> {
        (0..n).map(|i| format!("doc_{i}")).collect()
    }

    /// Asserts that `moved` contains no duplicates and only ids from `universe`.
    fn assert_unique_subset(moved: &[String], universe: &[String]) {
        let universe: HashSet<&str> = universe.iter().map(String::as_str).collect();
        let mut unique: HashSet<&str> = HashSet::new();
        for id in moved {
            assert!(universe.contains(id.as_str()), "unexpected id {id}");
            assert!(unique.insert(id.as_str()), "duplicate id {id}");
        }
    }

    #[test]
    fn vector_split_even() {
        let docs = doc_ids(100);
        let plan = plan_vector_split(10, 20, 3, &docs);
        assert_eq!(plan.source_vshard, 10);
        assert_eq!(plan.new_vshard, 20);
        assert_eq!(plan.target_node, 3);
        assert_eq!(plan.documents_to_move.len(), 50);
        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
        assert_unique_subset(&plan.documents_to_move, &docs);
    }

    #[test]
    fn vector_split_odd_moves_extra_document() {
        let docs = doc_ids(5);
        let plan = plan_vector_split(10, 20, 3, &docs);
        // 5 / 2 = 2 documents stay, the remaining 3 move.
        assert_eq!(plan.documents_to_move.len(), 3);
        assert_unique_subset(&plan.documents_to_move, &docs);
    }

    #[test]
    fn vector_split_empty() {
        let plan = plan_vector_split(10, 20, 3, &[]);
        assert!(plan.documents_to_move.is_empty());
        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
    }

    #[test]
    fn graph_split_minimizes_cross_edges() {
        // Path graph: a - b - c - d - e.
        let nodes: Vec<String> = ["a", "b", "c", "d", "e"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        let edges: Vec<(String, String)> = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e")]
            .iter()
            .map(|(src, dst)| (src.to_string(), dst.to_string()))
            .collect();

        let plan = plan_graph_split(10, 20, 3, &nodes, &edges);
        assert_eq!(plan.documents_to_move.len(), 3);
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
        assert_unique_subset(&plan.documents_to_move, &nodes);
    }

    #[test]
    fn graph_split_empty() {
        let plan = plan_graph_split(10, 20, 3, &[], &[]);
        assert!(plan.documents_to_move.is_empty());
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
    }

    #[test]
    fn speculative_prefetch_limits() {
        let routing = RoutingTable::uniform(4, &[1, 2], 1);
        let queried = [0u32, 1];
        let prefetch = speculative_prefetch_shards(
            &queried,
            &routing,
            &[(nodedb_types::id::DatabaseId::DEFAULT, 1, "users".into())],
        );
        assert!(prefetch.len() <= 8);
        // Shards the query already touches must never be suggested.
        assert!(prefetch.iter().all(|shard| !queried.contains(shard)));
        // Suggestions are distinct.
        let distinct: HashSet<u32> = prefetch.iter().copied().collect();
        assert_eq!(distinct.len(), prefetch.len());
    }

    #[test]
    fn partition_hash_deterministic() {
        let h1 = partition_hash("document_42");
        let h2 = partition_hash("document_42");
        assert_eq!(h1, h2);
    }
}