Skip to main content

nodedb_cluster/
distributed_join.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Distributed join execution: broadcast and shuffle joins across cluster nodes.
4//!
//! **Broadcast join**: Control Plane serializes the small side (at most the
//! broadcast threshold, 8 MiB by default), sends it to all relevant nodes via
//! QUIC transport. Each node performs a local hash join with its local
//! large-side data.
8//!
9//! **Shuffle join**: each node scans its local data, hashes on the join key,
10//! routes rows to the owning node via QUIC transport. The target node
11//! performs a local hash join on the repartitioned data.
12//!
13//! Both strategies use Arrow IPC for zero-copy batched data movement.
14
15use std::collections::HashMap;
16
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::routing::RoutingTable;
21
/// Fallback size cutoff used when choosing between broadcast and shuffle joins.
///
/// When the smaller join side is estimated to be larger than this many bytes,
/// a shuffle join is chosen instead of a broadcast join.
/// Corresponds to `ClusterTransportTuning::broadcast_threshold_bytes`.
pub const DEFAULT_BROADCAST_THRESHOLD_BYTES: usize = 8 << 20; // 8 MiB
27
/// A broadcast join request sent to each participating node.
///
/// Bundles the serialized small side with the parameters describing the join:
/// the large-side collection name, the join key pairs, the join type, a
/// per-node result limit and the tenant scope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BroadcastJoinRequest {
    /// The small-side data serialized as MessagePack `Vec<(doc_id, doc_bytes)>`.
    // NOTE(review): the module-level docs state that both strategies use
    // Arrow IPC — confirm which encoding is authoritative for this field.
    pub broadcast_data: Vec<u8>,
    /// The large-side collection to scan locally.
    pub large_collection: String,
    /// Join keys: `[(large_field, small_field)]`.
    pub on_keys: Vec<(String, String)>,
    /// Join type.
    // NOTE(review): carried as a free-form string; the set of accepted values
    // is not defined in this file — verify against the consumer.
    pub join_type: String,
    /// Result limit per node.
    pub limit: usize,
    /// Tenant scope.
    pub tenant_id: u64,
}
44
/// A shuffle join partition assignment.
///
/// One batch of repartitioned rows together with the join side it came from
/// and the routing information (partition id and target node) attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShufflePartition {
    /// Rows assigned to this partition, serialized as Arrow IPC bytes.
    pub data: Vec<u8>,
    /// Which join side this partition belongs to.
    pub side: JoinSide,
    /// Target node that owns this partition.
    pub target_node: u64,
    /// Join key hash that determined this partition.
    // NOTE(review): presumably the partition id as returned by
    // `partition_for_key` (hash modulo partition count) rather than the raw
    // hash — confirm against the producer.
    pub partition_id: u32,
}
57
58#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
59pub enum JoinSide {
60    Left,
61    Right,
62}
63
64/// Determine the join strategy based on estimated data sizes.
65///
66/// Returns `Broadcast` if the smaller side fits within `broadcast_threshold_bytes`,
67/// otherwise `Shuffle`. Pass `DEFAULT_BROADCAST_THRESHOLD_BYTES` when no runtime
68/// config is available.
69pub fn select_strategy(
70    left_estimated_bytes: usize,
71    right_estimated_bytes: usize,
72    broadcast_threshold_bytes: usize,
73) -> JoinStrategy {
74    let (smaller, _larger) = if left_estimated_bytes <= right_estimated_bytes {
75        (left_estimated_bytes, right_estimated_bytes)
76    } else {
77        (right_estimated_bytes, left_estimated_bytes)
78    };
79
80    if smaller <= broadcast_threshold_bytes {
81        JoinStrategy::Broadcast {
82            broadcast_side: if left_estimated_bytes <= right_estimated_bytes {
83                JoinSide::Left
84            } else {
85                JoinSide::Right
86            },
87        }
88    } else {
89        JoinStrategy::Shuffle
90    }
91}
92
/// Selected distributed join strategy.
///
/// This is the value returned by [`select_strategy`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinStrategy {
    /// Broadcast the small side to all nodes.
    ///
    /// `broadcast_side` names which join input (left or right) is broadcast.
    Broadcast { broadcast_side: JoinSide },
    /// Shuffle both sides by join key hash.
    Shuffle,
}
101
102/// Compute which node owns a given partition (based on join key hash).
103///
104/// Uses consistent hashing: `partition = hash(key) % num_nodes`.
105/// The target node is selected from the routing table's active leaders.
106pub fn partition_for_key(key: &str, num_partitions: usize) -> u32 {
107    (crate::routing::fnv1a_hash(key) % num_partitions as u64) as u32
108}
109
110/// Plan the node assignments for a shuffle join.
111///
112/// Returns `(partition_id → target_node_id)` mapping.
113///
114/// Only data groups (group_id > METADATA_GROUP_ID) are used as shuffle
115/// targets. The metadata group (0) is excluded — it does not own any vShards
116/// and its leader should not receive data-plane shuffle traffic.
117pub fn plan_shuffle_partitions(routing: &RoutingTable, num_partitions: usize) -> HashMap<u32, u64> {
118    use crate::metadata_group::METADATA_GROUP_ID;
119    let mut group_ids: Vec<u64> = routing
120        .group_ids()
121        .into_iter()
122        .filter(|&id| id != METADATA_GROUP_ID)
123        .collect();
124    group_ids.sort_unstable();
125    let mut partition_map = HashMap::new();
126
127    if group_ids.is_empty() {
128        return partition_map;
129    }
130
131    for p in 0..num_partitions {
132        let group_idx = p % group_ids.len();
133        let group_id = group_ids[group_idx];
134        let leader = routing.group_info(group_id).map(|g| g.leader).unwrap_or(0);
135        partition_map.insert(p as u32, leader);
136    }
137
138    debug!(
139        num_partitions,
140        num_groups = group_ids.len(),
141        "shuffle partition plan computed"
142    );
143    partition_map
144}
145
/// Estimate the serialized size of a collection's data.
///
/// This is a rough estimate based on document count × average document size.
/// Used for broadcast/shuffle strategy selection.
///
/// The multiplication saturates at `usize::MAX` instead of overflowing. A
/// wrapped product would look small and could make an enormous collection
/// appear eligible for a broadcast join.
pub fn estimate_collection_bytes(doc_count: usize, avg_doc_bytes: usize) -> usize {
    doc_count.saturating_mul(avg_doc_bytes)
}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// A tiny left side against a huge right side broadcasts the left side.
    #[test]
    fn broadcast_selected_for_small_side() {
        let strategy = select_strategy(1_000, 100_000_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Left
            }
        );
    }

    /// The mirror case: when the right side is the small one, it is broadcast.
    #[test]
    fn broadcast_selected_for_small_right_side() {
        let strategy = select_strategy(100_000_000, 1_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Right
            }
        );
    }

    /// Equal estimates resolve to the left side.
    #[test]
    fn broadcast_tie_prefers_left() {
        let strategy = select_strategy(1_000, 1_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Left
            }
        );
    }

    /// Both sides above the threshold fall back to a shuffle join.
    #[test]
    fn shuffle_selected_for_large_sides() {
        let strategy = select_strategy(100_000_000, 200_000_000, DEFAULT_BROADCAST_THRESHOLD_BYTES);
        assert_eq!(strategy, JoinStrategy::Shuffle);
    }

    /// The same key always maps to the same partition, and every partition id
    /// lies inside `0..num_partitions`.
    #[test]
    fn partition_deterministic() {
        let p1 = partition_for_key("alice", 16);
        let p2 = partition_for_key("alice", 16);
        assert_eq!(p1, p2);
        assert!(p1 < 16);

        // A different key is not guaranteed a different partition, but it must
        // still land inside the valid range.
        let p3 = partition_for_key("bob", 16);
        assert!(p3 < 16);
    }

    /// Every partition id in `0..8` receives an assignment.
    #[test]
    fn shuffle_plan_covers_all_partitions() {
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        let plan = plan_shuffle_partitions(&routing, 8);
        assert_eq!(plan.len(), 8);
        // All partitions should be assigned.
        for p in 0..8u32 {
            assert!(plan.contains_key(&p));
        }
    }

    /// The threshold itself is inclusive: equal → broadcast, one byte over → shuffle.
    #[test]
    fn broadcast_threshold() {
        // Exactly at threshold → broadcast.
        let strategy = select_strategy(
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
            100_000_000,
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
        );
        assert_eq!(
            strategy,
            JoinStrategy::Broadcast {
                broadcast_side: JoinSide::Left
            }
        );

        // Over threshold → shuffle.
        let strategy = select_strategy(
            DEFAULT_BROADCAST_THRESHOLD_BYTES + 1,
            100_000_000,
            DEFAULT_BROADCAST_THRESHOLD_BYTES,
        );
        assert_eq!(strategy, JoinStrategy::Shuffle);
    }
}