nodedb_cluster/
distributed_join.rs1use std::collections::HashMap;
16
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::routing::RoutingTable;
21
22pub const DEFAULT_BROADCAST_THRESHOLD_BYTES: usize = 8 * 1024 * 1024; #[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct BroadcastJoinRequest {
31 pub broadcast_data: Vec<u8>,
33 pub large_collection: String,
35 pub on_keys: Vec<(String, String)>,
37 pub join_type: String,
39 pub limit: usize,
41 pub tenant_id: u64,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ShufflePartition {
48 pub data: Vec<u8>,
50 pub side: JoinSide,
52 pub target_node: u64,
54 pub partition_id: u32,
56}
57
58#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
59pub enum JoinSide {
60 Left,
61 Right,
62}
63
64pub fn select_strategy(
70 left_estimated_bytes: usize,
71 right_estimated_bytes: usize,
72 broadcast_threshold_bytes: usize,
73) -> JoinStrategy {
74 let (smaller, _larger) = if left_estimated_bytes <= right_estimated_bytes {
75 (left_estimated_bytes, right_estimated_bytes)
76 } else {
77 (right_estimated_bytes, left_estimated_bytes)
78 };
79
80 if smaller <= broadcast_threshold_bytes {
81 JoinStrategy::Broadcast {
82 broadcast_side: if left_estimated_bytes <= right_estimated_bytes {
83 JoinSide::Left
84 } else {
85 JoinSide::Right
86 },
87 }
88 } else {
89 JoinStrategy::Shuffle
90 }
91}
92
93#[derive(Debug, Clone, Copy, PartialEq)]
95pub enum JoinStrategy {
96 Broadcast { broadcast_side: JoinSide },
98 Shuffle,
100}
101
102pub fn partition_for_key(key: &str, num_partitions: usize) -> u32 {
107 (crate::routing::fnv1a_hash(key) % num_partitions as u64) as u32
108}
109
110pub fn plan_shuffle_partitions(routing: &RoutingTable, num_partitions: usize) -> HashMap<u32, u64> {
118 use crate::metadata_group::METADATA_GROUP_ID;
119 let mut group_ids: Vec<u64> = routing
120 .group_ids()
121 .into_iter()
122 .filter(|&id| id != METADATA_GROUP_ID)
123 .collect();
124 group_ids.sort_unstable();
125 let mut partition_map = HashMap::new();
126
127 if group_ids.is_empty() {
128 return partition_map;
129 }
130
131 for p in 0..num_partitions {
132 let group_idx = p % group_ids.len();
133 let group_id = group_ids[group_idx];
134 let leader = routing.group_info(group_id).map(|g| g.leader).unwrap_or(0);
135 partition_map.insert(p as u32, leader);
136 }
137
138 debug!(
139 num_partitions,
140 num_groups = group_ids.len(),
141 "shuffle partition plan computed"
142 );
143 partition_map
144}
145
/// Estimates a collection's size in bytes as `doc_count * avg_doc_bytes`.
///
/// The product saturates at `usize::MAX` instead of overflowing. A plain multiplication
/// panics in debug builds and wraps in release builds. A wrapped, tiny estimate fed into
/// [`select_strategy`] could make a huge collection look small enough to broadcast.
pub fn estimate_collection_bytes(doc_count: usize, avg_doc_bytes: usize) -> usize {
    doc_count.saturating_mul(avg_doc_bytes)
}
153
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn broadcast_selected_for_small_side() {
        let strategy = select_strategy(1_000, 100_000_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert!(matches!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Left
            }
        ));
    }

    #[test]
    fn broadcast_selects_smaller_right_side() {
        let strategy = select_strategy(100_000_000, 1_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Right
            }
        );
    }

    #[test]
    fn broadcast_tie_prefers_left() {
        let strategy = select_strategy(1_000, 1_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Left
            }
        );
    }

    #[test]
    fn shuffle_selected_for_large_sides() {
        let strategy = select_strategy(100_000_000, 200_000_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(strategy, JoinStrategy::Shuffle);
    }

    #[test]
    fn partition_deterministic() {
        let p1 = partition_for_key("alice", 16);
        let p2 = partition_for_key("alice", 16);
        assert_eq!(p1, p2);
        assert!(p1 < 16);

        // Distinct keys may legitimately collide, so only the range is checked.
        let p3 = partition_for_key("bob", 16);
        assert!(p3 < 16);
    }

    #[test]
    fn single_partition_maps_every_key_to_zero() {
        for &key in &["", "alice", "bob", "a considerably longer join key"] {
            assert_eq!(partition_for_key(key, 1), 0);
        }
    }

    #[test]
    fn shuffle_plan_covers_all_partitions() {
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        let plan = plan_shuffle_partitions(&routing, 8);
        assert_eq!(plan.len(), 8);
        for p in 0..8u32 {
            assert!(plan.contains_key(&p));
        }
    }

    #[test]
    fn shuffle_plan_is_deterministic() {
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        assert_eq!(
            plan_shuffle_partitions(&routing, 8),
            plan_shuffle_partitions(&routing, 8)
        );
    }

    #[test]
    fn shuffle_plan_with_zero_partitions_is_empty() {
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        assert!(plan_shuffle_partitions(&routing, 0).is_empty());
    }

    #[test]
    fn broadcast_threshold() {
        let strategy = select_strategy(
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
            100_000_000,
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
        );
        assert!(matches!(strategy, JoinStrategy::Broadcast { .. }));

        let strategy = select_strategy(
            DEFAULT_BROADCAST_THRESHOLD_BYTES + 1,
            100_000_000,
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
        );
        assert_eq!(strategy, JoinStrategy::Shuffle);
    }

    #[test]
    fn estimate_is_product_for_small_inputs() {
        assert_eq!(estimate_collection_bytes(1_000, 512), 512_000);
        assert_eq!(estimate_collection_bytes(0, 512), 0);
    }
}