dbx-core 0.1.0-beta

High-performance file-based database engine with 5-Tier Hybrid Storage
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
//! HashJoin Operator — Hash-based join implementation

use crate::error::{DbxError, DbxResult};
use crate::sql::executor::operators::PhysicalOperator;
use crate::sql::planner::JoinType;
use ahash::AHashMap;
use arrow::array::*;
use arrow::compute;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use rayon::prelude::*;
use smallvec::{SmallVec, smallvec};
use std::sync::Arc;

/// Hash Join 연산자 — build from left, probe from right
pub struct HashJoinOperator {
    left: Box<dyn PhysicalOperator>,
    right: Box<dyn PhysicalOperator>,
    schema: Arc<Schema>,
    /// (left_col_idx, right_col_idx) pairs
    on: Vec<(usize, usize)>,
    #[allow(dead_code)]
    join_type: JoinType,
    /// Build phase result: hash(key) → Vec<row_index>
    build_table: Option<AHashMap<Vec<u8>, Vec<usize>>>,
    /// Materialized left side
    left_batch: Option<RecordBatch>,
    /// Materialized right side (for probe)
    right_batches: Option<Vec<RecordBatch>>,
    /// Current right batch index for probe
    right_batch_idx: usize,
    /// Whether we've finished producing output
    done: bool,
}

impl HashJoinOperator {
    pub fn new(
        left: Box<dyn PhysicalOperator>,
        right: Box<dyn PhysicalOperator>,
        schema: Arc<Schema>,
        on: Vec<(usize, usize)>,
        join_type: JoinType,
    ) -> Self {
        Self {
            left,
            right,
            schema,
            on,
            join_type,
            build_table: None,
            left_batch: None,
            right_batches: None,
            right_batch_idx: 0,
            done: false,
        }
    }

    /// Build hash table from left input.
    ///
    /// 최적화: 양쪽 크기를 먼저 확인하고, 작은 쪽을 build로 사용합니다.
    fn build_phase(&mut self) -> DbxResult<()> {
        // 양쪽 데이터를 모두 수집
        let mut left_batches: SmallVec<[RecordBatch; 8]> = smallvec![];
        while let Some(batch) = self.left.next()? {
            if batch.num_rows() > 0 {
                left_batches.push(batch);
            }
        }

        let mut right_batches: SmallVec<[RecordBatch; 8]> = smallvec![];
        while let Some(batch) = self.right.next()? {
            if batch.num_rows() > 0 {
                right_batches.push(batch);
            }
        }

        // 빈 경우 처리
        if left_batches.is_empty() || right_batches.is_empty() {
            self.build_table = Some(AHashMap::new());
            self.left_batch = None;
            self.right_batches = Some(Vec::new());
            return Ok(());
        }

        // Size-based optimization: use the smaller side as build table
        let left_rows: usize = left_batches.iter().map(|b| b.num_rows()).sum();
        let right_rows: usize = right_batches.iter().map(|b| b.num_rows()).sum();

        let (build_batches, probe_batches, build_is_left) =
            if right_rows < left_rows && matches!(self.join_type, JoinType::Inner) {
                // Swap only for INNER JOIN (LEFT/RIGHT swap requires type conversion)
                (right_batches, left_batches, false)
            } else {
                (left_batches, right_batches, true)
            };

        // Build hash table (병렬 처리)
        let schema = build_batches[0].schema();
        let merged = super::super::concat_batches(&schema, build_batches.as_slice())?;

        // 임계값: 1000 rows 이상만 병렬화
        const PARALLEL_THRESHOLD: usize = 1000;

        // JOIN 키 컬럼 인덱스: swap 시 반전
        let key_columns: Vec<usize> = if build_is_left {
            self.on.iter().map(|(left_col, _)| *left_col).collect()
        } else {
            // Swapped: build is right, so use right_col indices for build keys
            self.on.iter().map(|(_, right_col)| *right_col).collect()
        };

        let hash_table: AHashMap<Vec<u8>, Vec<usize>> = if merged.num_rows() >= PARALLEL_THRESHOLD {
            // 병렬 Build Phase
            use dashmap::DashMap;
            let parallel_table: DashMap<Vec<u8>, Vec<usize>> = DashMap::new();

            (0..merged.num_rows()).into_par_iter().for_each(|row_idx| {
                let key = extract_join_key(&merged, &key_columns, row_idx);
                parallel_table.entry(key).or_default().push(row_idx);
            });

            // DashMap을 AHashMap으로 변환
            parallel_table.into_iter().collect()
        } else {
            // 순차 Build Phase (작은 데이터셋)
            let mut hash_table: AHashMap<Vec<u8>, Vec<usize>> = AHashMap::new();
            for row_idx in 0..merged.num_rows() {
                let key = extract_join_key(&merged, &key_columns, row_idx);
                hash_table.entry(key).or_default().push(row_idx);
            }
            hash_table
        };

        // Store results: when swapped, build=right so store accordingly
        if build_is_left {
            // Normal: left is build, right is probe
            self.left_batch = Some(merged);
            self.right_batches = Some(probe_batches.into_vec());
        } else {
            // Swapped (INNER only): right was build, left was probe
            // Store build data as left_batch (output will reorder columns)
            self.left_batch = Some(merged);
            self.right_batches = Some(probe_batches.into_vec());
        }

        self.build_table = Some(hash_table);

        Ok(())
    }
}

/// Extract join key bytes from a batch for given column indices.
fn extract_join_key(batch: &RecordBatch, key_columns: &[usize], row_idx: usize) -> Vec<u8> {
    let mut key = Vec::new();
    for &col_idx in key_columns {
        append_value_to_key(&mut key, batch.column(col_idx), row_idx);
    }
    key
}

/// Append a cell value to a byte key for hashing.
fn append_value_to_key(key: &mut Vec<u8>, col: &ArrayRef, row_idx: usize) {
    if col.is_null(row_idx) {
        key.push(0); // null marker
        return;
    }
    key.push(1); // non-null marker
    match col.data_type() {
        DataType::Int32 => {
            let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
        }
        DataType::Int64 => {
            let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
        }
        DataType::Float64 => {
            let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
            key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
        }
        DataType::Utf8 => {
            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
            let s = arr.value(row_idx);
            key.extend_from_slice(&(s.len() as u32).to_le_bytes());
            key.extend_from_slice(s.as_bytes());
        }
        _ => {
            // Fallback: use debug format
            key.extend_from_slice(format!("{:?}", col).as_bytes());
        }
    }
}

/// Create a column with NULLs for sentinel values
fn create_column_with_nulls(
    source_col: &ArrayRef,
    indices: &[u32],
    null_sentinel: u32,
) -> DbxResult<ArrayRef> {
    let num_rows = indices.len();

    match source_col.data_type() {
        DataType::Int32 => {
            let source = source_col.as_any().downcast_ref::<Int32Array>().unwrap();
            let mut builder = Int32Builder::with_capacity(num_rows);
            for &idx in indices {
                if idx == null_sentinel {
                    builder.append_null();
                } else {
                    builder.append_value(source.value(idx as usize));
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Int64 => {
            let source = source_col.as_any().downcast_ref::<Int64Array>().unwrap();
            let mut builder = Int64Builder::with_capacity(num_rows);
            for &idx in indices {
                if idx == null_sentinel {
                    builder.append_null();
                } else {
                    builder.append_value(source.value(idx as usize));
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Float64 => {
            let source = source_col.as_any().downcast_ref::<Float64Array>().unwrap();
            let mut builder = Float64Builder::with_capacity(num_rows);
            for &idx in indices {
                if idx == null_sentinel {
                    builder.append_null();
                } else {
                    builder.append_value(source.value(idx as usize));
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Utf8 => {
            let source = source_col.as_any().downcast_ref::<StringArray>().unwrap();
            let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 10);
            for &idx in indices {
                if idx == null_sentinel {
                    builder.append_null();
                } else {
                    builder.append_value(source.value(idx as usize));
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        DataType::Boolean => {
            let source = source_col.as_any().downcast_ref::<BooleanArray>().unwrap();
            let mut builder = BooleanBuilder::with_capacity(num_rows);
            for &idx in indices {
                if idx == null_sentinel {
                    builder.append_null();
                } else {
                    builder.append_value(source.value(idx as usize));
                }
            }
            Ok(Arc::new(builder.finish()))
        }
        _ => Err(DbxError::SqlExecution {
            message: format!(
                "Unsupported data type for NULL handling: {:?}",
                source_col.data_type()
            ),
            context: "create_column_with_nulls".to_string(),
        }),
    }
}

impl PhysicalOperator for HashJoinOperator {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn next(&mut self) -> DbxResult<Option<RecordBatch>> {
        if self.done {
            return Ok(None);
        }

        // Build phase (once)
        if self.build_table.is_none() {
            self.build_phase()?;
        }

        let build_table = self.build_table.as_ref().unwrap();
        let left_batch = match &self.left_batch {
            Some(b) => b.clone(),
            None => {
                self.done = true;
                return Ok(None);
            }
        };

        // Probe phase: iterate right batches
        let right_batches = self.right_batches.as_ref().unwrap();

        while self.right_batch_idx < right_batches.len() {
            let right_batch = &right_batches[self.right_batch_idx];
            self.right_batch_idx += 1;

            if right_batch.num_rows() == 0 {
                continue;
            }

            let mut left_indices = Vec::new();
            let mut right_indices = Vec::new();

            // For LEFT JOIN: track which left rows were matched
            let mut matched_left_rows = if matches!(self.join_type, JoinType::Left) {
                Some(std::collections::HashSet::new())
            } else {
                None
            };

            // For RIGHT JOIN: track which right rows were matched
            let mut matched_right_rows = if matches!(self.join_type, JoinType::Right) {
                Some(vec![false; right_batch.num_rows()])
            } else {
                None
            };

            // Probe Phase: 병렬 처리
            let right_key_columns: Vec<usize> =
                self.on.iter().map(|(_, right_col)| *right_col).collect();

            // 임계값: 1000 rows 이상만 병렬화
            const PARALLEL_THRESHOLD: usize = 1000;

            if right_batch.num_rows() >= PARALLEL_THRESHOLD {
                // 병렬 Probe Phase
                use dashmap::DashMap;
                let parallel_matches: DashMap<usize, Vec<usize>> = DashMap::new();

                (0..right_batch.num_rows())
                    .into_par_iter()
                    .for_each(|right_row| {
                        let key = extract_join_key(right_batch, &right_key_columns, right_row);
                        if let Some(left_rows) = build_table.get(&key) {
                            parallel_matches.insert(right_row, left_rows.clone());
                        }
                    });

                // 결과 수집
                for (right_row, left_rows) in parallel_matches.into_iter() {
                    for &left_row in &left_rows {
                        left_indices.push(left_row as u32);
                        right_indices.push(right_row as u32);

                        if let Some(ref mut matched) = matched_left_rows {
                            matched.insert(left_row);
                        }
                        if let Some(ref mut matched) = matched_right_rows {
                            matched[right_row] = true;
                        }
                    }
                }
            } else {
                // 순차 Probe Phase (작은 데이터셋)
                for right_row in 0..right_batch.num_rows() {
                    let key = extract_join_key(right_batch, &right_key_columns, right_row);
                    if let Some(left_rows) = build_table.get(&key) {
                        for &left_row in left_rows {
                            left_indices.push(left_row as u32);
                            right_indices.push(right_row as u32);

                            if let Some(ref mut matched) = matched_left_rows {
                                matched.insert(left_row);
                            }
                            if let Some(ref mut matched) = matched_right_rows {
                                matched[right_row] = true;
                            }
                        }
                    } else if matches!(self.join_type, JoinType::Right) {
                        // RIGHT JOIN: include unmatched right rows
                        // Will be handled after the loop
                    }
                }
            }

            // Handle LEFT JOIN: add unmatched left rows
            if let Some(matched) = matched_left_rows {
                for left_row in 0..left_batch.num_rows() {
                    if !matched.contains(&left_row) {
                        left_indices.push(left_row as u32);
                        // Use a sentinel value for NULL right side
                        right_indices.push(u32::MAX);
                    }
                }
            }

            // Handle RIGHT JOIN: add unmatched right rows
            if let Some(matched) = matched_right_rows {
                for (right_row, &was_matched) in matched.iter().enumerate() {
                    if !was_matched {
                        // Use a sentinel value for NULL left side
                        left_indices.push(u32::MAX);
                        right_indices.push(right_row as u32);
                    }
                }
            }

            if left_indices.is_empty() {
                continue;
            }

            // Build output: left columns + right columns
            let mut output_columns: Vec<ArrayRef> = Vec::new();

            // Process left columns
            for col in left_batch.columns() {
                let filtered_indices: Vec<u32> = left_indices
                    .iter()
                    .filter(|&&idx| idx != u32::MAX)
                    .copied()
                    .collect();

                if filtered_indices.len() == left_indices.len() {
                    // No NULLs needed
                    let left_idx_arr = UInt32Array::from(left_indices.clone());
                    output_columns.push(compute::take(col.as_ref(), &left_idx_arr, None)?);
                } else {
                    // Need to handle NULLs (RIGHT JOIN case)
                    output_columns.push(create_column_with_nulls(col, &left_indices, u32::MAX)?);
                }
            }

            // Process right columns
            for col in right_batch.columns() {
                let filtered_indices: Vec<u32> = right_indices
                    .iter()
                    .filter(|&&idx| idx != u32::MAX)
                    .copied()
                    .collect();

                if filtered_indices.len() == right_indices.len() {
                    // No NULLs needed
                    let right_idx_arr = UInt32Array::from(right_indices.clone());
                    output_columns.push(compute::take(col.as_ref(), &right_idx_arr, None)?);
                } else {
                    // Need to handle NULLs (LEFT JOIN case)
                    output_columns.push(create_column_with_nulls(col, &right_indices, u32::MAX)?);
                }
            }

            let result = RecordBatch::try_new(Arc::clone(&self.schema), output_columns)?;

            if result.num_rows() > 0 {
                return Ok(Some(result));
            }
        }

        self.done = true;
        Ok(None)
    }

    fn reset(&mut self) -> DbxResult<()> {
        self.build_table = None;
        self.left_batch = None;
        self.done = false;
        self.left.reset()?;
        self.right.reset()
    }
}