1use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9 common::JoinType,
10 encoded::{key::EncodedKey, schema::Schema},
11 interface::{
12 catalog::flow::FlowNodeId,
13 change::{Change, ChangeOrigin, Diff},
14 },
15 internal,
16 util::encoding::keycode::serializer::KeySerializer,
17 value::column::{Column, columns::Columns},
18};
19use reifydb_engine::{
20 expression::{
21 compile::{CompiledExpr, compile_expression},
22 context::{CompileContext, EvalContext},
23 },
24 vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_function::registry::Functions;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29 clock::Clock,
30 hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33 Result,
34 error::Error,
35 fragment::Fragment,
36 params::Params,
37 util::cowvec::CowVec,
38 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42 column::JoinedColumnsBuilder,
43 state::{JoinSide, JoinState},
44 strategy::JoinStrategy,
45};
46use crate::{
47 operator::{
48 Operator, Operators,
49 stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50 },
51 transaction::FlowTransaction,
52};
53
54static EMPTY_PARAMS: Params = Params::None;
55static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
56
/// Flow operator that joins the output of two parent operators.
///
/// Incoming changes are attributed to the left or right input by comparing their origin
/// against `left_node` / `right_node`; join keys are computed from the compiled key
/// expressions of that side and the work is delegated to the configured `JoinStrategy`.
pub struct JoinOperator {
    /// Parent operator pulled for left-hand rows.
    pub(crate) left_parent: Arc<Operators>,
    /// Parent operator pulled for right-hand rows (and, with an empty row list, for the
    /// right-hand column layout of unmatched left rows).
    pub(crate) right_parent: Arc<Operators>,
    /// Flow node id of this operator; returned by `id()` and used as the origin of emitted changes.
    node: FlowNodeId,
    /// Join behaviour, derived from the `JoinType` passed to `new`.
    strategy: JoinStrategy,
    /// Origin node id identifying changes that belong to the left input.
    left_node: FlowNodeId,
    /// Origin node id identifying changes that belong to the right input.
    right_node: FlowNodeId,
    /// Uncompiled key expressions for the left input, kept as passed to `new`.
    left_exprs: Vec<Expression>,
    /// Uncompiled key expressions for the right input, kept as passed to `new`.
    pub(crate) right_exprs: Vec<Expression>,
    /// `left_exprs` compiled once in `new`; evaluated to build left join keys.
    compiled_left_exprs: Vec<CompiledExpr>,
    /// `right_exprs` compiled once in `new`; evaluated to build right join keys.
    compiled_right_exprs: Vec<CompiledExpr>,
    /// Optional alias forwarded to every `JoinedColumnsBuilder` this operator creates.
    alias: Option<String>,
    /// Schema returned by `SingleStateful::layout`.
    schema: Schema,
    /// Maps composite join keys to output row numbers (and back again in `pull`).
    row_number_provider: RowNumberProvider,
    /// Executor supplied at construction; `functions` and `clock` below are cloned from it.
    executor: Executor,
    /// Function registry used when evaluating key expressions.
    functions: Functions,
    /// Clock handed to the evaluation context.
    clock: Clock,
}
75
76impl JoinOperator {
77 pub fn new(
78 left_parent: Arc<Operators>,
79 right_parent: Arc<Operators>,
80 node: FlowNodeId,
81 join_type: JoinType,
82 left_node: FlowNodeId,
83 right_node: FlowNodeId,
84 left_exprs: Vec<Expression>,
85 right_exprs: Vec<Expression>,
86 alias: Option<String>,
87 executor: Executor,
88 ) -> Self {
89 let strategy = JoinStrategy::from(join_type);
90 let schema = Self::state_schema();
91 let row_number_provider = RowNumberProvider::new(node);
92
93 let compile_ctx = CompileContext {
95 functions: &executor.functions,
96 symbol_table: &EMPTY_SYMBOL_TABLE,
97 };
98
99 let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
101 .iter()
102 .map(|e| compile_expression(&compile_ctx, e))
103 .collect::<Result<Vec<_>>>()
104 .expect("Failed to compile left expressions");
105
106 let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
107 .iter()
108 .map(|e| compile_expression(&compile_ctx, e))
109 .collect::<Result<Vec<_>>>()
110 .expect("Failed to compile right expressions");
111
112 let functions = executor.functions.clone();
114 let clock = executor.clock.clone();
115
116 Self {
117 left_parent,
118 right_parent,
119 node,
120 strategy,
121 left_node,
122 right_node,
123 left_exprs,
124 right_exprs,
125 compiled_left_exprs,
126 compiled_right_exprs,
127 alias,
128 schema,
129 row_number_provider,
130 executor,
131 functions,
132 clock,
133 }
134 }
135
136 fn state_schema() -> Schema {
137 Schema::testing(&[Type::Blob])
138 }
139
140 pub(crate) fn compute_join_keys(
143 &self,
144 columns: &Columns,
145 compiled_exprs: &[CompiledExpr],
146 ) -> Result<Vec<Option<Hash128>>> {
147 let row_count = columns.row_count();
148 if row_count == 0 {
149 return Ok(Vec::new());
150 }
151
152 let exec_ctx = EvalContext {
153 target: None,
154 columns: columns.clone(),
155 row_count,
156 take: None,
157 params: &EMPTY_PARAMS,
158 symbol_table: &EMPTY_SYMBOL_TABLE,
159 is_aggregate_context: false,
160 functions: &self.functions,
161 clock: &self.clock,
162 arena: None,
163 identity: IdentityId::root(),
164 };
165
166 let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
168 for compiled_expr in compiled_exprs.iter() {
169 let col = if let Some(col_name) = compiled_expr.access_column_name() {
170 columns.column(col_name)
171 .cloned()
172 .unwrap_or_else(|| Column::undefined_typed(col_name, Type::Boolean, row_count))
173 } else {
174 compiled_expr.execute(&exec_ctx)?
175 };
176 expr_columns.push(col);
177 }
178
179 let mut hashes = Vec::with_capacity(row_count);
181 for row_idx in 0..row_count {
182 let mut hasher = Vec::with_capacity(256);
183 let mut has_undefined = false;
184
185 for col in &expr_columns {
186 let value = col.data().get_value(row_idx);
187
188 if matches!(value, Value::None { .. }) {
190 has_undefined = true;
191 break;
192 }
193
194 let bytes = to_stdvec(&value)
195 .map_err(|e| Error(internal!("Failed to encode value for hash: {}", e)))?;
196 hasher.extend_from_slice(&bytes);
197 }
198
199 if has_undefined {
200 hashes.push(None);
201 } else {
202 hashes.push(Some(xxh3_128(&hasher)));
203 }
204 }
205
206 Ok(hashes)
207 }
208
209 pub(crate) fn unmatched_left_columns(
212 &self,
213 txn: &mut FlowTransaction,
214 left: &Columns,
215 left_idx: usize,
216 ) -> Result<Columns> {
217 let left_row_number = left.row_numbers[left_idx];
218
219 let mut serializer = KeySerializer::new();
221 serializer.extend_u8(b'L');
222 serializer.extend_u64(left_row_number.0);
223 let composite_key = EncodedKey::new(serializer.finish());
224
225 let (result_row_number, _is_new) =
227 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
228
229 let right_schema = self.right_parent.pull(txn, &[])?;
231
232 let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
234 Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_schema))
235 }
236
237 pub(crate) fn unmatched_left_columns_batch(
239 &self,
240 txn: &mut FlowTransaction,
241 left: &Columns,
242 left_indices: &[usize],
243 ) -> Result<Columns> {
244 if left_indices.is_empty() {
245 return Ok(Columns::empty());
246 }
247
248 let composite_keys: Vec<EncodedKey> = left_indices
250 .iter()
251 .map(|&idx| {
252 let left_row_number = left.row_numbers[idx];
253 let mut serializer = KeySerializer::new();
254 serializer.extend_u8(b'L');
255 serializer.extend_u64(left_row_number.0);
256 EncodedKey::new(serializer.finish())
257 })
258 .collect();
259
260 let row_numbers_with_flags =
262 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
263 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
264
265 let right_schema = self.right_parent.pull(txn, &[])?;
267
268 let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
270 Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_schema))
271 }
272
273 pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
276 let mut serializer = KeySerializer::new();
277 serializer.extend_u8(b'L');
278 serializer.extend_u64(left_number);
279 let prefix = serializer.finish();
280
281 self.row_number_provider.remove_by_prefix(txn, &prefix)
283 }
284
285 pub(crate) fn join_columns(
287 &self,
288 txn: &mut FlowTransaction,
289 left: &Columns,
290 left_idx: usize,
291 right: &Columns,
292 right_idx: usize,
293 ) -> Result<Columns> {
294 let left_row_number = left.row_numbers[left_idx];
295 let right_row_number = right.row_numbers[right_idx];
296
297 let composite_key = Self::make_composite_key(left_row_number, right_row_number);
298 let (result_row_number, _is_new) =
299 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
300
301 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
303 Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
304 }
305
306 fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
308 let mut serializer = KeySerializer::new();
309 serializer.extend_u8(b'L');
310 serializer.extend_u64(left_num.0);
311 serializer.extend_u64(right_num.0);
312 EncodedKey::new(serializer.finish())
313 }
314
/// Decodes a `u64` from the first eight bytes of `bytes`, which hold the value as
/// bitwise-inverted big-endian bytes. Any bytes past the eighth are ignored.
///
/// # Panics
///
/// Panics if `bytes` is shorter than eight bytes.
fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
    bytes[..8]
        .iter()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(!byte))
}
322
323 fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
327 if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
329 return None;
330 }
331
332 let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
333 let right_num = if key_bytes.len() >= 17 {
334 Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
335 } else {
336 None
337 };
338
339 Some((RowNumber(left_num), right_num))
340 }
341
342 pub(crate) fn join_columns_one_to_many(
345 &self,
346 txn: &mut FlowTransaction,
347 left: &Columns,
348 left_idx: usize,
349 right: &Columns,
350 ) -> Result<Columns> {
351 let right_count = right.row_count();
352 if right_count == 0 {
353 return Ok(Columns::empty());
354 }
355
356 let left_row_number = left.row_numbers[left_idx];
357
358 let composite_keys: Vec<EncodedKey> = (0..right_count)
360 .map(|right_idx| {
361 let right_row_number = right.row_numbers[right_idx];
362 Self::make_composite_key(left_row_number, right_row_number)
363 })
364 .collect();
365
366 let row_numbers_with_flags =
368 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
369 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
370
371 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
372 Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
373 }
374
375 pub(crate) fn join_columns_many_to_one(
378 &self,
379 txn: &mut FlowTransaction,
380 left: &Columns,
381 right: &Columns,
382 right_idx: usize,
383 ) -> Result<Columns> {
384 let left_count = left.row_count();
385 if left_count == 0 {
386 return Ok(Columns::empty());
387 }
388
389 let right_row_number = right.row_numbers[right_idx];
390
391 let composite_keys: Vec<EncodedKey> = (0..left_count)
393 .map(|left_idx| {
394 let left_row_number = left.row_numbers[left_idx];
395 Self::make_composite_key(left_row_number, right_row_number)
396 })
397 .collect();
398
399 let row_numbers_with_flags =
401 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
402 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
403
404 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
405 Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
406 }
407
408 pub(crate) fn join_columns_cartesian(
411 &self,
412 txn: &mut FlowTransaction,
413 left: &Columns,
414 left_indices: &[usize],
415 right: &Columns,
416 right_indices: &[usize],
417 ) -> Result<Columns> {
418 let left_count = left_indices.len();
419 let right_count = right_indices.len();
420 if left_count == 0 || right_count == 0 {
421 return Ok(Columns::empty());
422 }
423
424 let total_results = left_count * right_count;
426 let mut composite_keys = Vec::with_capacity(total_results);
427
428 for &left_idx in left_indices {
429 let left_row_number = left.row_numbers[left_idx];
430 for &right_idx in right_indices {
431 let right_row_number = right.row_numbers[right_idx];
432 composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
433 }
434 }
435
436 let row_numbers_with_flags =
438 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
439 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
440
441 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
442 Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
443 }
444
445 fn determine_side(&self, change: &Change) -> Option<JoinSide> {
446 match &change.origin {
447 ChangeOrigin::Flow(from_node) => {
448 if *from_node == self.left_node {
449 Some(JoinSide::Left)
450 } else if *from_node == self.right_node {
451 Some(JoinSide::Right)
452 } else {
453 None
454 }
455 }
456 _ => None,
457 }
458 }
459}
460
// Empty impl: `JoinOperator` defines no items of its own for `RawStatefulOperator`.
impl RawStatefulOperator for JoinOperator {}
462
463impl SingleStateful for JoinOperator {
464 fn layout(&self) -> Schema {
465 self.schema.clone()
466 }
467}
468
469impl Operator for JoinOperator {
/// Returns the flow node id this operator was constructed with.
fn id(&self) -> FlowNodeId {
    self.node
}
473
474 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
475 if let ChangeOrigin::Flow(from_node) = &change.origin {
477 if *from_node == self.node {
478 return Ok(Change::from_flow(self.node, change.version, Vec::new()));
479 }
480 }
481
482 let mut state = JoinState::new(self.node);
484 let estimated_capacity = change.diffs.len() * 2;
486 let mut result = Vec::with_capacity(estimated_capacity);
487
488 let side = self
490 .determine_side(&change)
491 .ok_or_else(|| Error(internal!("Join operator received change from unknown node")))?;
492
493 let compiled_exprs = match side {
494 JoinSide::Left => &self.compiled_left_exprs,
495 JoinSide::Right => &self.compiled_right_exprs,
496 };
497
498 for diff in change.diffs {
500 match diff {
501 Diff::Insert {
502 post,
503 } => {
504 let keys = self.compute_join_keys(&post, compiled_exprs)?;
506 let row_count = post.row_count();
507
508 let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
510 let mut inserts_undefined: Vec<usize> = Vec::new();
511
512 for row_idx in 0..row_count {
513 if let Some(key_hash) = keys[row_idx] {
514 inserts_by_key.entry(key_hash).or_default().push(row_idx);
515 } else {
516 inserts_undefined.push(row_idx);
517 }
518 }
519
520 for (key_hash, indices) in inserts_by_key {
522 let diffs = self.strategy.handle_insert(
523 txn, &post, &indices, side, &key_hash, &mut state, self,
524 )?;
525 result.extend(diffs);
526 }
527
528 for idx in inserts_undefined {
530 let diffs = self.strategy.handle_insert_undefined(
531 txn, &post, idx, side, &mut state, self,
532 )?;
533 result.extend(diffs);
534 }
535 }
536 Diff::Remove {
537 pre,
538 } => {
539 let keys = self.compute_join_keys(&pre, compiled_exprs)?;
541 let row_count = pre.row_count();
542
543 let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
545 let mut removes_undefined: Vec<usize> = Vec::new();
546
547 for row_idx in 0..row_count {
548 if let Some(key_hash) = keys[row_idx] {
549 removes_by_key.entry(key_hash).or_default().push(row_idx);
550 } else {
551 removes_undefined.push(row_idx);
552 }
553 }
554
555 for (key_hash, indices) in removes_by_key {
557 let diffs = self.strategy.handle_remove(
558 txn,
559 &pre,
560 &indices,
561 side,
562 &key_hash,
563 &mut state,
564 self,
565 change.version,
566 )?;
567 result.extend(diffs);
568 }
569
570 for idx in removes_undefined {
572 let diffs = self.strategy.handle_remove_undefined(
573 txn,
574 &pre,
575 idx,
576 side,
577 &mut state,
578 self,
579 change.version,
580 )?;
581 result.extend(diffs);
582 }
583 }
584 Diff::Update {
585 pre,
586 post,
587 } => {
588 let old_keys = self.compute_join_keys(&pre, compiled_exprs)?;
590 let new_keys = self.compute_join_keys(&post, compiled_exprs)?;
591 let row_count = post.row_count();
592
593 let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> =
596 IndexMap::new();
597 let mut updates_undefined: Vec<usize> = Vec::new();
598
599 for row_idx in 0..row_count {
600 match (old_keys[row_idx], new_keys[row_idx]) {
601 (Some(old_key), Some(new_key)) => {
602 updates_by_key
603 .entry((old_key, new_key))
604 .or_default()
605 .push(row_idx);
606 }
607 _ => {
608 updates_undefined.push(row_idx);
611 }
612 }
613 }
614
615 for ((old_key, new_key), indices) in updates_by_key {
617 let diffs = self.strategy.handle_update(
618 txn,
619 &pre,
620 &post,
621 &indices,
622 side,
623 &old_key,
624 &new_key,
625 &mut state,
626 self,
627 change.version,
628 )?;
629 result.extend(diffs);
630 }
631
632 for row_idx in updates_undefined {
634 let diffs = self.strategy.handle_update_undefined(
635 txn,
636 &pre,
637 &post,
638 row_idx,
639 side,
640 &mut state,
641 self,
642 change.version,
643 )?;
644 result.extend(diffs);
645 }
646 }
647 }
648 }
649
650 Ok(Change::from_flow(self.node, change.version, result))
651 }
652
653 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
660 let mut found_columns: Vec<Columns> = Vec::new();
661
662 for &row_number in rows {
663 let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
665 continue;
666 };
667
668 let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
670 continue;
671 };
672
673 let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
675 if left_cols.is_empty() {
676 continue;
677 }
678
679 if let Some(right_row_num) = right_row_number {
680 let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
682 if !right_cols.is_empty() {
683 let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
685 let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
686 joined.row_numbers = CowVec::new(vec![row_number]);
688 found_columns.push(joined);
689 }
690 } else {
691 let right_schema = self.right_parent.pull(txn, &[])?;
693 let builder = JoinedColumnsBuilder::new(&left_cols, &right_schema, &self.alias);
694 let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_schema);
695 unmatched.row_numbers = CowVec::new(vec![row_number]);
697 found_columns.push(unmatched);
698 }
699 }
700
701 if found_columns.is_empty() {
703 let left_schema = self.left_parent.pull(txn, &[])?;
705 let right_schema = self.right_parent.pull(txn, &[])?;
706
707 let builder = JoinedColumnsBuilder::new(&left_schema, &right_schema, &self.alias);
709 let right_names = builder.right_column_names();
710
711 let mut all_columns: Vec<Column> = left_schema.columns.into_iter().collect();
713
714 for (col, aliased_name) in right_schema.columns.into_iter().zip(right_names.iter()) {
716 all_columns.push(Column {
717 name: Fragment::internal(aliased_name),
718 data: col.data,
719 });
720 }
721
722 Ok(Columns {
723 row_numbers: CowVec::new(Vec::new()),
724 columns: CowVec::new(all_columns),
725 })
726 } else if found_columns.len() == 1 {
727 Ok(found_columns.remove(0))
728 } else {
729 let mut result = found_columns.remove(0);
730 for cols in found_columns {
731 result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
732 for (i, col) in cols.columns.into_iter().enumerate() {
733 result.columns.make_mut()[i].extend(col).expect("schema mismatch in join pull");
734 }
735 }
736 Ok(result)
737 }
738 }
739}