1use std::collections::{HashMap, HashSet};
21
22use serde::{Deserialize, Serialize};
23use tracing::info;
24
25use crate::routing::RoutingTable;
26
27#[derive(Debug, Clone)]
29pub struct SplitPlan {
30 pub source_vshard: u16,
32 pub new_vshard: u16,
34 pub target_node: u64,
36 pub documents_to_move: Vec<String>,
38 pub strategy: SplitStrategy,
40 pub estimated_bytes: u64,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
46pub enum SplitStrategy {
47 VectorPartitionKey,
50 GraphCommunity,
53 EvenHash,
55}
56
57pub fn plan_vector_split(
67 source_vshard: u16,
68 new_vshard: u16,
69 target_node: u64,
70 document_ids: &[String],
71) -> SplitPlan {
72 let mid = document_ids.len() / 2;
73
74 let mut sorted: Vec<(u64, String)> = document_ids
76 .iter()
77 .map(|id| (partition_hash(id), id.clone()))
78 .collect();
79 sorted.sort_by_key(|(hash, _)| *hash);
80
81 let documents_to_move: Vec<String> = sorted[mid..].iter().map(|(_, id)| id.clone()).collect();
82
83 info!(
84 source_vshard,
85 new_vshard,
86 target_node,
87 docs_moving = documents_to_move.len(),
88 docs_staying = mid,
89 "vector-aware split planned"
90 );
91
92 SplitPlan {
93 source_vshard,
94 new_vshard,
95 target_node,
96 documents_to_move,
97 strategy: SplitStrategy::VectorPartitionKey,
98 estimated_bytes: 0, }
100}
101
102pub fn plan_graph_split(
111 source_vshard: u16,
112 new_vshard: u16,
113 target_node: u64,
114 node_ids: &[String],
115 edges: &[(String, String)],
116) -> SplitPlan {
117 if node_ids.is_empty() {
118 return SplitPlan {
119 source_vshard,
120 new_vshard,
121 target_node,
122 documents_to_move: Vec::new(),
123 strategy: SplitStrategy::GraphCommunity,
124 estimated_bytes: 0,
125 };
126 }
127
128 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
130 for (src, dst) in edges {
131 adj.entry(src.as_str()).or_default().push(dst.as_str());
132 adj.entry(dst.as_str()).or_default().push(src.as_str());
133 }
134
135 let mut visited: Vec<String> = Vec::with_capacity(node_ids.len());
137 let mut seen: HashSet<&str> = HashSet::new();
138 let mut queue: std::collections::VecDeque<&str> = std::collections::VecDeque::new();
139
140 let start = node_ids
142 .iter()
143 .max_by_key(|id| adj.get(id.as_str()).map(|v| v.len()).unwrap_or(0))
144 .map(|s| s.as_str())
145 .unwrap_or(node_ids[0].as_str());
146
147 queue.push_back(start);
148 seen.insert(start);
149
150 while let Some(node) = queue.pop_front() {
151 visited.push(node.to_string());
152 if let Some(neighbors) = adj.get(node) {
153 for &neighbor in neighbors {
154 if seen.insert(neighbor) {
155 queue.push_back(neighbor);
156 }
157 }
158 }
159 }
160
161 for id in node_ids {
163 if seen.insert(id.as_str()) {
164 visited.push(id.clone());
165 }
166 }
167
168 let mid = visited.len() / 2;
170 let documents_to_move: Vec<String> = visited[mid..].to_vec();
171
172 let group_a: HashSet<&str> = visited[..mid].iter().map(|s| s.as_str()).collect();
174 let cross_edges = edges
175 .iter()
176 .filter(|(src, dst)| {
177 let a_has_src = group_a.contains(src.as_str());
178 let a_has_dst = group_a.contains(dst.as_str());
179 a_has_src != a_has_dst
180 })
181 .count();
182
183 info!(
184 source_vshard,
185 new_vshard,
186 total_nodes = node_ids.len(),
187 cross_edges,
188 "graph-aware split planned (BFS community)"
189 );
190
191 SplitPlan {
192 source_vshard,
193 new_vshard,
194 target_node,
195 documents_to_move,
196 strategy: SplitStrategy::GraphCommunity,
197 estimated_bytes: 0,
198 }
199}
200
201pub fn speculative_prefetch_shards(
212 query_vshards: &[u16],
213 _routing: &RoutingTable,
214 tenant_collections: &[(u32, String)],
215) -> Vec<u16> {
216 let mut prefetch: HashSet<u16> = HashSet::new();
217 let queried: HashSet<u16> = query_vshards.iter().copied().collect();
218
219 for (_tenant_id, collection) in tenant_collections {
222 let primary = crate::routing::vshard_for_collection(collection);
224
225 for offset in [1u16, 2] {
228 let adjacent_low = primary.wrapping_sub(offset);
229 let adjacent_high = primary.wrapping_add(offset);
230 if !queried.contains(&adjacent_low) {
231 prefetch.insert(adjacent_low);
232 }
233 if !queried.contains(&adjacent_high) {
234 prefetch.insert(adjacent_high);
235 }
236 }
237 }
238
239 let max_prefetch = 8;
241 prefetch.into_iter().take(max_prefetch).collect()
242}
243
244fn partition_hash(key: &str) -> u64 {
246 crate::routing::fnv1a_hash(key)
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    // Splitting 100 ids moves exactly half and tags the plan correctly.
    #[test]
    fn vector_split_even() {
        let mut docs: Vec<String> = Vec::with_capacity(100);
        for i in 0..100 {
            docs.push(format!("doc_{i}"));
        }

        let plan = plan_vector_split(10, 20, 3, &docs);

        assert_eq!(plan.source_vshard, 10);
        assert_eq!(plan.new_vshard, 20);
        assert_eq!(plan.documents_to_move.len(), 50);
        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
    }

    // A five-node chain a-b-c-d-e: the moved side is the larger half (3).
    #[test]
    fn graph_split_minimizes_cross_edges() {
        let nodes: Vec<String> = ["a", "b", "c", "d", "e"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let edges: Vec<(String, String)> = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e")]
            .iter()
            .map(|(src, dst)| (src.to_string(), dst.to_string()))
            .collect();

        let plan = plan_graph_split(10, 20, 3, &nodes, &edges);

        assert_eq!(plan.documents_to_move.len(), 3);
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
    }

    // No nodes means nothing to move.
    #[test]
    fn graph_split_empty() {
        let plan = plan_graph_split(10, 20, 3, &[], &[]);
        assert!(plan.documents_to_move.is_empty());
    }

    // The prefetch list never exceeds the cap of 8 vshards.
    #[test]
    fn speculative_prefetch_limits() {
        let routing = RoutingTable::uniform(4, &[1, 2], 1);
        let collections = [(1, "users".into())];

        let prefetch = speculative_prefetch_shards(&[0, 1], &routing, &collections);

        assert!(prefetch.len() <= 8);
    }

    // Hashing the same key twice yields the same value.
    #[test]
    fn partition_hash_deterministic() {
        let key = "document_42";
        assert_eq!(partition_hash(key), partition_hash(key));
    }
}