llkv_join/
lib.rs

//! High-level join planning API that wraps hash join execution.
//!
//! This crate exposes shared types (`JoinKey`, `JoinType`, `JoinOptions`) used by the
//! planner and runtime to negotiate join configuration. Execution currently routes
//! through the hash join implementation in [`hash_join_stream`]; requesting
//! [`JoinAlgorithm::SortMerge`] returns an error until that algorithm is implemented.
7#![forbid(unsafe_code)]
8
9mod hash_join;
10
11use arrow::array::RecordBatch;
12use llkv_result::{Error, Result as LlkvResult};
13use llkv_storage::pager::Pager;
14use llkv_table::table::Table;
15use llkv_table::types::FieldId;
16use simd_r_drive_entry_handle::EntryHandle;
17use std::fmt;
18
19pub use hash_join::hash_join_stream;
20
/// The kind of relational join to execute.
///
/// `Semi` and `Anti` are filtering joins: they only ever emit left-side columns.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum JoinType {
    /// Only row pairs whose keys match.
    Inner,
    /// Every left row; right columns are NULL where no match exists.
    Left,
    /// Every right row; left columns are NULL where no match exists.
    Right,
    /// Every row from both inputs, NULL-padded on whichever side did not match.
    Full,
    /// Left rows with at least one match; no right columns are produced.
    Semi,
    /// Left rows with no match at all; no right columns are produced.
    Anti,
}

impl fmt::Display for JoinType {
    /// Writes the upper-case label for the join type (e.g. `LEFT`, `ANTI`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Inner => "INNER",
            Self::Left => "LEFT",
            Self::Right => "RIGHT",
            Self::Full => "FULL",
            Self::Semi => "SEMI",
            Self::Anti => "ANTI",
        };
        f.write_str(label)
    }
}
50
51/// Join key pair describing which columns to equate.
52#[derive(Clone, Debug, PartialEq, Eq)]
53pub struct JoinKey {
54    /// Field ID from the left table.
55    pub left_field: FieldId,
56    /// Field ID from the right table.
57    pub right_field: FieldId,
58    /// If true, NULL == NULL for this key (SQL-style NULL-safe equality).
59    /// If false, NULL != NULL (Arrow default).
60    pub null_equals_null: bool,
61}
62
63impl JoinKey {
64    /// Create a join key with standard Arrow null semantics (NULL != NULL).
65    pub fn new(left_field: FieldId, right_field: FieldId) -> Self {
66        Self {
67            left_field,
68            right_field,
69            null_equals_null: false,
70        }
71    }
72
73    /// Create a join key with SQL-style NULL-safe equality (NULL == NULL).
74    pub fn null_safe(left_field: FieldId, right_field: FieldId) -> Self {
75        Self {
76            left_field,
77            right_field,
78            null_equals_null: true,
79        }
80    }
81}
82
/// Physical strategy used to evaluate an equality join.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub enum JoinAlgorithm {
    /// Builds a hash table over one input and probes it with the other.
    ///
    /// This is the default and the only strategy `join_stream` can execute today.
    #[default]
    Hash,
    /// Sorts both inputs and then merges them.
    ///
    /// Intended for pre-sorted inputs or memory-constrained plans. Not implemented
    /// yet: selecting it makes `join_stream` return an error.
    SortMerge,
}

impl fmt::Display for JoinAlgorithm {
    /// Writes the variant name verbatim (`Hash` or `SortMerge`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Hash => "Hash",
            Self::SortMerge => "SortMerge",
        })
    }
}
105
106/// Options controlling join execution.
107#[derive(Clone, Debug)]
108pub struct JoinOptions {
109    /// Type of join to perform.
110    pub join_type: JoinType,
111    /// Algorithm to use. Planner may override this based on table sizes.
112    pub algorithm: JoinAlgorithm,
113    /// Target number of probe rows per output `RecordBatch`.
114    /// Larger batches reduce per-batch overhead (fewer Arrow gathers) at the
115    /// cost of increased peak memory; smaller batches improve latency.
116    pub batch_size: usize,
117    /// Memory limit in bytes for hash table (hash join only).
118    /// When exceeded, algorithm will partition and spill to disk.
119    pub memory_limit_bytes: Option<usize>,
120    /// Concurrency hint: number of threads for parallel partitions.
121    pub concurrency: usize,
122}
123
124impl Default for JoinOptions {
125    fn default() -> Self {
126        Self {
127            join_type: JoinType::Inner,
128            algorithm: JoinAlgorithm::Hash,
129            batch_size: 8192,
130            memory_limit_bytes: None,
131            concurrency: 1,
132        }
133    }
134}
135
136impl JoinOptions {
137    /// Create options for an inner join with default settings.
138    pub fn inner() -> Self {
139        Self {
140            join_type: JoinType::Inner,
141            ..Default::default()
142        }
143    }
144
145    /// Create options for a left outer join with default settings.
146    pub fn left() -> Self {
147        Self {
148            join_type: JoinType::Left,
149            ..Default::default()
150        }
151    }
152
153    /// Create options for a right outer join with default settings.
154    pub fn right() -> Self {
155        Self {
156            join_type: JoinType::Right,
157            ..Default::default()
158        }
159    }
160
161    /// Create options for a full outer join with default settings.
162    pub fn full() -> Self {
163        Self {
164            join_type: JoinType::Full,
165            ..Default::default()
166        }
167    }
168
169    /// Create options for a semi join with default settings.
170    pub fn semi() -> Self {
171        Self {
172            join_type: JoinType::Semi,
173            ..Default::default()
174        }
175    }
176
177    /// Create options for an anti join with default settings.
178    pub fn anti() -> Self {
179        Self {
180            join_type: JoinType::Anti,
181            ..Default::default()
182        }
183    }
184
185    /// Set the join algorithm.
186    pub fn with_algorithm(mut self, algorithm: JoinAlgorithm) -> Self {
187        self.algorithm = algorithm;
188        self
189    }
190
191    /// Set the output batch size.
192    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
193        self.batch_size = batch_size;
194        self
195    }
196
197    /// Set the memory limit for hash joins.
198    pub fn with_memory_limit(mut self, limit_bytes: usize) -> Self {
199        self.memory_limit_bytes = Some(limit_bytes);
200        self
201    }
202
203    /// Set the concurrency level.
204    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
205        self.concurrency = concurrency;
206        self
207    }
208}
209
210// TODO: Build out more fully or remove
211// NOTE: Validation presently only asserts that zero keys implies a Cartesian
212// join. Extend this once the planner provides richer metadata about key
213// compatibility (equality types, null semantics, etc.).
214/// Validate join keys before execution.
215///
216/// Note: Empty keys = cross product (Cartesian product).
217pub fn validate_join_keys(_keys: &[JoinKey]) -> LlkvResult<()> {
218    // Empty keys is valid for cross product
219    Ok(())
220}
221
222/// Validate join options before execution.
223pub fn validate_join_options(options: &JoinOptions) -> LlkvResult<()> {
224    if options.batch_size == 0 {
225        return Err(Error::InvalidArgumentError(
226            "join batch_size must be > 0".to_string(),
227        ));
228    }
229    if options.concurrency == 0 {
230        return Err(Error::InvalidArgumentError(
231            "join concurrency must be > 0".to_string(),
232        ));
233    }
234    Ok(())
235}
236
237/// Extension trait adding join operations to `Table`.
238pub trait TableJoinExt<P>
239where
240    P: Pager<Blob = EntryHandle> + Send + Sync,
241{
242    /// Join this table with another table based on equality predicates.
243    fn join_stream<F>(
244        &self,
245        right: &Table<P>,
246        keys: &[JoinKey],
247        options: &JoinOptions,
248        on_batch: F,
249    ) -> LlkvResult<()>
250    where
251        F: FnMut(RecordBatch);
252}
253
254impl<P> TableJoinExt<P> for Table<P>
255where
256    P: Pager<Blob = EntryHandle> + Send + Sync,
257{
258    fn join_stream<F>(
259        &self,
260        right: &Table<P>,
261        keys: &[JoinKey],
262        options: &JoinOptions,
263        on_batch: F,
264    ) -> LlkvResult<()>
265    where
266        F: FnMut(RecordBatch),
267    {
268        validate_join_keys(keys)?;
269        validate_join_options(options)?;
270
271        match options.algorithm {
272            JoinAlgorithm::Hash => {
273                hash_join::hash_join_stream(self, right, keys, options, on_batch)
274            }
275            JoinAlgorithm::SortMerge => Err(Error::Internal(
276                "Sort-merge join not yet implemented; use JoinAlgorithm::Hash".to_string(),
277            )),
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_join_key_constructors() {
        let key = JoinKey::new(10, 20);
        assert_eq!(key.left_field, 10);
        assert_eq!(key.right_field, 20);
        assert!(!key.null_equals_null);

        let key_null_safe = JoinKey::null_safe(10, 20);
        assert_eq!(key_null_safe.left_field, 10);
        assert_eq!(key_null_safe.right_field, 20);
        assert!(key_null_safe.null_equals_null);
    }

    #[test]
    fn test_join_options_default() {
        let defaults = JoinOptions::default();
        assert_eq!(defaults.join_type, JoinType::Inner);
        assert_eq!(defaults.algorithm, JoinAlgorithm::Hash);
        assert_eq!(JoinAlgorithm::default(), JoinAlgorithm::Hash);
        assert_eq!(defaults.batch_size, 8192);
        assert_eq!(defaults.memory_limit_bytes, None);
        assert_eq!(defaults.concurrency, 1);
    }

    #[test]
    fn test_join_options_type_constructors() {
        // Each named constructor must change only the join type.
        let defaults = JoinOptions::default();
        let cases = vec![
            (JoinOptions::inner(), JoinType::Inner),
            (JoinOptions::left(), JoinType::Left),
            (JoinOptions::right(), JoinType::Right),
            (JoinOptions::full(), JoinType::Full),
            (JoinOptions::semi(), JoinType::Semi),
            (JoinOptions::anti(), JoinType::Anti),
        ];
        for (options, expected) in cases {
            assert_eq!(options.join_type, expected);
            assert_eq!(options.algorithm, defaults.algorithm);
            assert_eq!(options.batch_size, defaults.batch_size);
            assert_eq!(options.memory_limit_bytes, defaults.memory_limit_bytes);
            assert_eq!(options.concurrency, defaults.concurrency);
        }
    }

    #[test]
    fn test_join_options_builders() {
        let inner = JoinOptions::inner();
        assert_eq!(inner.join_type, JoinType::Inner);

        let left = JoinOptions::left()
            .with_algorithm(JoinAlgorithm::Hash)
            .with_batch_size(1024)
            .with_memory_limit(1_000_000)
            .with_concurrency(4);
        assert_eq!(left.join_type, JoinType::Left);
        assert_eq!(left.algorithm, JoinAlgorithm::Hash);
        assert_eq!(left.batch_size, 1024);
        assert_eq!(left.memory_limit_bytes, Some(1_000_000));
        assert_eq!(left.concurrency, 4);

        let sort_merge = JoinOptions::full().with_algorithm(JoinAlgorithm::SortMerge);
        assert_eq!(sort_merge.algorithm, JoinAlgorithm::SortMerge);
    }

    #[test]
    fn test_validate_join_keys() {
        // Empty keys are valid (cross product)
        let empty: Vec<JoinKey> = vec![];
        assert!(validate_join_keys(&empty).is_ok());

        let keys = vec![JoinKey::new(1, 2), JoinKey::null_safe(3, 4)];
        assert!(validate_join_keys(&keys).is_ok());
    }

    #[test]
    fn test_validate_join_options() {
        let bad_batch = JoinOptions {
            batch_size: 0,
            ..Default::default()
        };
        assert!(matches!(
            validate_join_options(&bad_batch),
            Err(Error::InvalidArgumentError(_))
        ));

        let bad_concurrency = JoinOptions {
            concurrency: 0,
            ..Default::default()
        };
        assert!(matches!(
            validate_join_options(&bad_concurrency),
            Err(Error::InvalidArgumentError(_))
        ));

        let good = JoinOptions::default();
        assert!(validate_join_options(&good).is_ok());
    }

    #[test]
    fn test_join_type_display() {
        assert_eq!(JoinType::Inner.to_string(), "INNER");
        assert_eq!(JoinType::Left.to_string(), "LEFT");
        assert_eq!(JoinType::Right.to_string(), "RIGHT");
        assert_eq!(JoinType::Full.to_string(), "FULL");
        assert_eq!(JoinType::Semi.to_string(), "SEMI");
        assert_eq!(JoinType::Anti.to_string(), "ANTI");
    }

    #[test]
    fn test_join_algorithm_display() {
        assert_eq!(JoinAlgorithm::Hash.to_string(), "Hash");
        assert_eq!(JoinAlgorithm::SortMerge.to_string(), "SortMerge");
    }
}