use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::routing::RoutingTable;
/// Outcome of a split-planning pass: which document IDs should leave a
/// vshard, and where they are headed.
///
/// Produced by `plan_vector_split` and `plan_graph_split` in this file; the
/// planners only compute the plan, they do not move any data themselves.
#[derive(Debug, Clone)]
pub struct SplitPlan {
/// vshard the planner was asked to split (copied from the planner argument).
pub source_vshard: u16,
/// vshard ID passed to the planner as the split destination.
pub new_vshard: u16,
/// Copied verbatim from the planner argument.
/// NOTE(review): presumably the cluster node that will host `new_vshard` — confirm.
pub target_node: u64,
/// IDs selected by the planner to move out of `source_vshard`.
pub documents_to_move: Vec<String>,
/// Which planning strategy produced this plan.
pub strategy: SplitStrategy,
/// Both planners in this file always set this to 0.
/// NOTE(review): looks like a placeholder for a size estimate — confirm who fills it in.
pub estimated_bytes: u64,
}
/// Identifies the heuristic that produced a `SplitPlan`.
///
/// Serializable via serde, so variant names are part of any persisted or
/// wire representation.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum SplitStrategy {
/// Set by `plan_vector_split`: documents ordered by partition-key hash.
VectorPartitionKey,
/// Set by `plan_graph_split`: nodes ordered by breadth-first traversal.
GraphCommunity,
/// Not produced by any planner visible in this file.
EvenHash,
}
/// Plans a split of `source_vshard` by ordering documents on their
/// partition hash and moving the upper half to `new_vshard`.
///
/// # Arguments
/// * `source_vshard` / `new_vshard` / `target_node` - copied into the plan unchanged.
/// * `document_ids` - IDs currently considered part of the source vshard.
///
/// # Returns
/// A [`SplitPlan`] with strategy [`SplitStrategy::VectorPartitionKey`].
/// `documents_to_move` holds the `len - len / 2` IDs with the largest
/// partition hashes, in ascending hash order (for an odd count the moving
/// half gets the extra document). `estimated_bytes` is always 0.
pub fn plan_vector_split(
    source_vshard: u16,
    new_vshard: u16,
    target_node: u64,
    document_ids: &[String],
) -> SplitPlan {
    let mid = document_ids.len() / 2;

    // Order borrowed IDs so that only the half that actually moves is cloned.
    let mut by_hash: Vec<(u64, &str)> = document_ids
        .iter()
        .map(|id| (partition_hash(id), id.as_str()))
        .collect();
    // Stable sort: IDs whose hashes collide keep their input order.
    by_hash.sort_by_key(|&(hash, _)| hash);

    let documents_to_move: Vec<String> = by_hash[mid..]
        .iter()
        .map(|&(_, id)| id.to_owned())
        .collect();

    info!(
        source_vshard,
        new_vshard,
        target_node,
        docs_moving = documents_to_move.len(),
        docs_staying = mid,
        "vector-aware split planned"
    );

    SplitPlan {
        source_vshard,
        new_vshard,
        target_node,
        documents_to_move,
        strategy: SplitStrategy::VectorPartitionKey,
        estimated_bytes: 0,
    }
}
/// Plans a split of a graph vshard by ordering nodes breadth-first and
/// moving the second half of that ordering to `new_vshard`.
///
/// Nodes reached close together in the traversal land on the same side of
/// the cut, which is a heuristic for keeping connected nodes together; it is
/// not a minimum-cut algorithm.
///
/// # Arguments
/// * `source_vshard` / `new_vshard` / `target_node` - copied into the plan unchanged.
/// * `node_ids` - IDs of the nodes in the source vshard. Repeated IDs are
///   planned once.
/// * `edges` - undirected `(src, dst)` pairs. Edges with an endpoint that is
///   not listed in `node_ids` are ignored, so an ID outside the shard can
///   never appear in the plan.
///
/// # Returns
/// A [`SplitPlan`] with strategy [`SplitStrategy::GraphCommunity`] whose
/// `documents_to_move` is the second half of the traversal order (the moving
/// half gets the extra node for an odd count). `estimated_bytes` is always 0.
/// An empty `node_ids` yields an empty plan.
pub fn plan_graph_split(
    source_vshard: u16,
    new_vshard: u16,
    target_node: u64,
    node_ids: &[String],
    edges: &[(String, String)],
) -> SplitPlan {
    if node_ids.is_empty() {
        return SplitPlan {
            source_vshard,
            new_vshard,
            target_node,
            documents_to_move: Vec::new(),
            strategy: SplitStrategy::GraphCommunity,
            estimated_bytes: 0,
        };
    }

    // Only nodes that belong to this shard may be traversed or moved.
    let known: HashSet<&str> = node_ids.iter().map(String::as_str).collect();

    // Undirected adjacency restricted to in-shard endpoints.
    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
    for (src, dst) in edges {
        let (src, dst) = (src.as_str(), dst.as_str());
        if known.contains(src) && known.contains(dst) {
            adj.entry(src).or_default().push(dst);
            adj.entry(dst).or_default().push(src);
        }
    }

    // The first traversal starts at the highest-degree node; on ties
    // `max_by_key` picks the last such node in `node_ids` order.
    let degree = |id: &str| adj.get(id).map_or(0, Vec::len);
    let start = node_ids
        .iter()
        .map(String::as_str)
        .max_by_key(|id| degree(*id))
        .unwrap_or(node_ids[0].as_str());

    let mut order: Vec<&str> = Vec::with_capacity(known.len());
    let mut seen: HashSet<&str> = HashSet::with_capacity(known.len());
    let mut queue: std::collections::VecDeque<&str> = std::collections::VecDeque::new();

    // After the first component, restart the traversal from every node that
    // is still unvisited (in input order) so each connected component stays
    // contiguous in `order` and the midpoint cut crosses at most one of them.
    for seed in std::iter::once(start).chain(node_ids.iter().map(String::as_str)) {
        if !seen.insert(seed) {
            continue;
        }
        queue.push_back(seed);
        while let Some(node) = queue.pop_front() {
            order.push(node);
            if let Some(neighbors) = adj.get(node) {
                for &neighbor in neighbors {
                    if seen.insert(neighbor) {
                        queue.push_back(neighbor);
                    }
                }
            }
        }
    }

    let mid = order.len() / 2;
    let (staying, moving) = order.split_at(mid);
    let group_a: HashSet<&str> = staying.iter().copied().collect();

    // In-shard edges whose endpoints end up on different sides of the cut.
    let cross_edges = edges
        .iter()
        .filter(|(src, dst)| {
            let (src, dst) = (src.as_str(), dst.as_str());
            known.contains(src)
                && known.contains(dst)
                && group_a.contains(src) != group_a.contains(dst)
        })
        .count();

    let documents_to_move: Vec<String> = moving.iter().map(|id| (*id).to_owned()).collect();

    info!(
        source_vshard,
        new_vshard,
        target_node,
        total_nodes = node_ids.len(),
        cross_edges,
        "graph-aware split planned (BFS community)"
    );

    SplitPlan {
        source_vshard,
        new_vshard,
        target_node,
        documents_to_move,
        strategy: SplitStrategy::GraphCommunity,
        estimated_bytes: 0,
    }
}
/// Suggests vshards to prefetch: the vshards numerically adjacent (±1, ±2)
/// to each collection's primary vshard, excluding those already queried.
///
/// # Arguments
/// * `query_vshards` - vshards the current query already touches; never returned.
/// * `_routing` - currently unused.
/// * `tenant_collections` - `(tenant_id, collection)` pairs; only the
///   collection name is used, to look up its primary vshard.
///
/// # Returns
/// At most 8 distinct vshard IDs in a deterministic order: collections in
/// input order, and for each collection offset 1 before offset 2, lower
/// neighbour before higher neighbour. When more than 8 candidates exist the
/// earliest ones win, so the result is stable from call to call.
///
/// NOTE(review): neighbours are computed with wrapping `u16` arithmetic, so
/// they wrap at the `u16` range rather than at the configured vshard count —
/// confirm that every `u16` is a valid vshard ID.
pub fn speculative_prefetch_shards(
    query_vshards: &[u16],
    _routing: &RoutingTable,
    tenant_collections: &[(u32, String)],
) -> Vec<u16> {
    const MAX_PREFETCH: usize = 8;
    const NEIGHBOR_OFFSETS: [u16; 2] = [1, 2];

    // Seeding with the queried shards rejects them and de-duplicates
    // candidates with a single set.
    let mut seen: HashSet<u16> = query_vshards.iter().copied().collect();
    let mut prefetch: Vec<u16> = Vec::with_capacity(MAX_PREFETCH);

    for (_tenant_id, collection) in tenant_collections {
        let primary = crate::routing::vshard_for_collection(collection);
        for offset in NEIGHBOR_OFFSETS {
            for candidate in [primary.wrapping_sub(offset), primary.wrapping_add(offset)] {
                if seen.insert(candidate) {
                    prefetch.push(candidate);
                    if prefetch.len() == MAX_PREFETCH {
                        return prefetch;
                    }
                }
            }
        }
    }

    prefetch
}
/// Hash used by `plan_vector_split` to order document IDs.
///
/// Delegates to `crate::routing::fnv1a_hash`.
/// NOTE(review): presumably this matches the hash the routing layer uses for
/// placement — confirm before changing either side.
fn partition_hash(key: &str) -> u64 {
crate::routing::fnv1a_hash(key)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns true when `ids` contains no repeated entry.
    fn all_unique(ids: &[String]) -> bool {
        let distinct: HashSet<&str> = ids.iter().map(String::as_str).collect();
        distinct.len() == ids.len()
    }

    #[test]
    fn vector_split_even() {
        let docs: Vec<String> = (0..100).map(|i| format!("doc_{i}")).collect();
        let plan = plan_vector_split(10, 20, 3, &docs);
        assert_eq!(plan.source_vshard, 10);
        assert_eq!(plan.new_vshard, 20);
        assert_eq!(plan.target_node, 3);
        assert_eq!(plan.documents_to_move.len(), 50);
        assert_eq!(plan.strategy, SplitStrategy::VectorPartitionKey);
        // The moved half must be a duplicate-free subset of the input.
        assert!(all_unique(&plan.documents_to_move));
        assert!(plan.documents_to_move.iter().all(|id| docs.contains(id)));
    }

    #[test]
    fn vector_split_odd_and_empty() {
        // For an odd count the moving half receives the extra document.
        let docs: Vec<String> = (0..5).map(|i| format!("doc_{i}")).collect();
        let plan = plan_vector_split(1, 2, 3, &docs);
        assert_eq!(plan.documents_to_move.len(), 3);

        let plan = plan_vector_split(1, 2, 3, &[]);
        assert!(plan.documents_to_move.is_empty());
    }

    #[test]
    fn graph_split_minimizes_cross_edges() {
        let nodes: Vec<String> = vec!["a", "b", "c", "d", "e"]
            .into_iter()
            .map(|s| s.to_string())
            .collect();
        let edges: Vec<(String, String)> = vec![
            ("a".into(), "b".into()),
            ("b".into(), "c".into()),
            ("c".into(), "d".into()),
            ("d".into(), "e".into()),
        ];
        let plan = plan_graph_split(10, 20, 3, &nodes, &edges);
        assert_eq!(plan.documents_to_move.len(), 3);
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
        // Every moved node must come from the shard and appear only once.
        assert!(all_unique(&plan.documents_to_move));
        assert!(plan.documents_to_move.iter().all(|id| nodes.contains(id)));
    }

    #[test]
    fn graph_split_empty() {
        let plan = plan_graph_split(10, 20, 3, &[], &[]);
        assert!(plan.documents_to_move.is_empty());
        assert_eq!(plan.strategy, SplitStrategy::GraphCommunity);
    }

    #[test]
    fn speculative_prefetch_limits() {
        let routing = RoutingTable::uniform(4, &[1, 2], 1);
        let queried = [0u16, 1];
        let prefetch = speculative_prefetch_shards(&queried, &routing, &[(1, "users".into())]);
        assert!(prefetch.len() <= 8);
        // Shards the query already touches must never be suggested.
        assert!(prefetch.iter().all(|shard| !queried.contains(shard)));
        // Suggestions must be distinct.
        let distinct: HashSet<u16> = prefetch.iter().copied().collect();
        assert_eq!(distinct.len(), prefetch.len());
    }

    #[test]
    fn partition_hash_deterministic() {
        let h1 = partition_hash("document_42");
        let h2 = partition_hash("document_42");
        assert_eq!(h1, h2);
    }
}