1use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9 common::JoinType,
10 encoded::{key::EncodedKey, schema::RowSchema},
11 interface::{
12 catalog::flow::FlowNodeId,
13 change::{Change, ChangeOrigin, Diff},
14 },
15 internal,
16 util::encoding::keycode::serializer::KeySerializer,
17 value::column::{Column, columns::Columns},
18};
19use reifydb_engine::{
20 expression::{
21 compile::{CompiledExpr, compile_expression},
22 context::{CompileContext, EvalSession},
23 },
24 vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_routine::function::registry::Functions;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29 context::RuntimeContext,
30 hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33 Result,
34 error::Error,
35 fragment::Fragment,
36 params::Params,
37 util::cowvec::CowVec,
38 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42 column::JoinedColumnsBuilder,
43 state::{JoinSide, JoinState},
44 strategy::JoinStrategy,
45};
46use crate::{
47 operator::{
48 Operator, Operators,
49 stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50 },
51 transaction::FlowTransaction,
52};
53
54static EMPTY_PARAMS: Params = Params::None;
55static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
56
/// Incremental join operator node of a flow graph.
///
/// Receives `Change`s from its left or right parent node, hashes each row's join-key
/// expressions, and delegates matching to a [`JoinStrategy`] selected from the join type.
/// Output rows are assigned row numbers through a [`RowNumberProvider`] keyed by composite
/// keys derived from the input row numbers.
pub struct JoinOperator {
	/// Left input operator; pulled when materializing output rows.
	pub(crate) left_parent: Arc<Operators>,
	/// Right input operator; pulled for matched rows and (with no rows) for its column schema.
	pub(crate) right_parent: Arc<Operators>,
	/// This operator's own flow node id.
	node: FlowNodeId,
	/// Join-type-specific handler for insert / remove / update diffs.
	strategy: JoinStrategy,
	/// Origin node id that marks a change as coming from the left side.
	left_node: FlowNodeId,
	/// Origin node id that marks a change as coming from the right side.
	right_node: FlowNodeId,
	/// Join-key expressions for left rows, as passed to `new`.
	left_exprs: Vec<Expression>,
	/// Join-key expressions for right rows, as passed to `new`.
	pub(crate) right_exprs: Vec<Expression>,
	/// `left_exprs` compiled once at construction.
	compiled_left_exprs: Vec<CompiledExpr>,
	/// `right_exprs` compiled once at construction.
	compiled_right_exprs: Vec<CompiledExpr>,
	/// Optional alias forwarded to `JoinedColumnsBuilder` when building output columns.
	alias: Option<String>,
	/// State row schema reported by `SingleStateful::layout`.
	schema: RowSchema,
	/// Maps composite join keys to output row numbers (and back, in `pull`).
	row_number_provider: RowNumberProvider,
	/// Executor the operator was built with; `functions` and `runtime_context` are cloned from it.
	executor: Executor,
	/// Function registry used when evaluating join-key expressions.
	functions: Functions,
	/// Runtime context used when evaluating join-key expressions.
	runtime_context: RuntimeContext,
}
75
76impl JoinOperator {
77 pub fn new(
78 left_parent: Arc<Operators>,
79 right_parent: Arc<Operators>,
80 node: FlowNodeId,
81 join_type: JoinType,
82 left_node: FlowNodeId,
83 right_node: FlowNodeId,
84 left_exprs: Vec<Expression>,
85 right_exprs: Vec<Expression>,
86 alias: Option<String>,
87 executor: Executor,
88 ) -> Self {
89 let strategy = JoinStrategy::from(join_type);
90 let schema = Self::state_schema();
91 let row_number_provider = RowNumberProvider::new(node);
92
93 let compile_ctx = CompileContext {
95 functions: &executor.functions,
96 symbols: &EMPTY_SYMBOL_TABLE,
97 };
98
99 let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
101 .iter()
102 .map(|e| compile_expression(&compile_ctx, e))
103 .collect::<Result<Vec<_>>>()
104 .expect("Failed to compile left expressions");
105
106 let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
107 .iter()
108 .map(|e| compile_expression(&compile_ctx, e))
109 .collect::<Result<Vec<_>>>()
110 .expect("Failed to compile right expressions");
111
112 let functions = executor.functions.clone();
114 let runtime_context = executor.runtime_context.clone();
115
116 Self {
117 left_parent,
118 right_parent,
119 node,
120 strategy,
121 left_node,
122 right_node,
123 left_exprs,
124 right_exprs,
125 compiled_left_exprs,
126 compiled_right_exprs,
127 alias,
128 schema,
129 row_number_provider,
130 executor,
131 functions,
132 runtime_context,
133 }
134 }
135
136 fn state_schema() -> RowSchema {
137 RowSchema::testing(&[Type::Blob])
138 }
139
140 pub(crate) fn compute_join_keys(
143 &self,
144 columns: &Columns,
145 compiled_exprs: &[CompiledExpr],
146 ) -> Result<Vec<Option<Hash128>>> {
147 let row_count = columns.row_count();
148 if row_count == 0 {
149 return Ok(Vec::new());
150 }
151
152 let session = EvalSession {
153 params: &EMPTY_PARAMS,
154 symbols: &EMPTY_SYMBOL_TABLE,
155 functions: &self.functions,
156 runtime_context: &self.runtime_context,
157 arena: None,
158 identity: IdentityId::root(),
159 is_aggregate_context: false,
160 };
161 let exec_ctx = session.eval(columns.clone(), row_count);
162
163 let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
165 for compiled_expr in compiled_exprs.iter() {
166 let col = if let Some(col_name) = compiled_expr.access_column_name() {
167 columns.column(col_name)
168 .cloned()
169 .unwrap_or_else(|| Column::undefined_typed(col_name, Type::Boolean, row_count))
170 } else {
171 compiled_expr.execute(&exec_ctx)?
172 };
173 expr_columns.push(col);
174 }
175
176 let mut hashes = Vec::with_capacity(row_count);
178 for row_idx in 0..row_count {
179 let mut hasher = Vec::with_capacity(256);
180 let mut has_undefined = false;
181
182 for col in &expr_columns {
183 let value = col.data().get_value(row_idx);
184
185 if matches!(value, Value::None { .. }) {
187 has_undefined = true;
188 break;
189 }
190
191 let bytes = to_stdvec(&value)
192 .map_err(|e| Error(internal!("Failed to encode value for hash: {}", e)))?;
193 hasher.extend_from_slice(&bytes);
194 }
195
196 if has_undefined {
197 hashes.push(None);
198 } else {
199 hashes.push(Some(xxh3_128(&hasher)));
200 }
201 }
202
203 Ok(hashes)
204 }
205
206 pub(crate) fn unmatched_left_columns(
209 &self,
210 txn: &mut FlowTransaction,
211 left: &Columns,
212 left_idx: usize,
213 ) -> Result<Columns> {
214 let left_row_number = left.row_numbers[left_idx];
215
216 let mut serializer = KeySerializer::new();
218 serializer.extend_u8(b'L');
219 serializer.extend_u64(left_row_number.0);
220 let composite_key = EncodedKey::new(serializer.finish());
221
222 let (result_row_number, _is_new) =
224 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
225
226 let right_schema = self.right_parent.pull(txn, &[])?;
228
229 let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
231 Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_schema))
232 }
233
234 pub(crate) fn unmatched_left_columns_batch(
236 &self,
237 txn: &mut FlowTransaction,
238 left: &Columns,
239 left_indices: &[usize],
240 ) -> Result<Columns> {
241 if left_indices.is_empty() {
242 return Ok(Columns::empty());
243 }
244
245 let composite_keys: Vec<EncodedKey> = left_indices
247 .iter()
248 .map(|&idx| {
249 let left_row_number = left.row_numbers[idx];
250 let mut serializer = KeySerializer::new();
251 serializer.extend_u8(b'L');
252 serializer.extend_u64(left_row_number.0);
253 EncodedKey::new(serializer.finish())
254 })
255 .collect();
256
257 let row_numbers_with_flags =
259 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
260 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
261
262 let right_schema = self.right_parent.pull(txn, &[])?;
264
265 let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
267 Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_schema))
268 }
269
270 pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
273 let mut serializer = KeySerializer::new();
274 serializer.extend_u8(b'L');
275 serializer.extend_u64(left_number);
276 let prefix = serializer.finish();
277
278 self.row_number_provider.remove_by_prefix(txn, &prefix)
280 }
281
282 pub(crate) fn join_columns(
284 &self,
285 txn: &mut FlowTransaction,
286 left: &Columns,
287 left_idx: usize,
288 right: &Columns,
289 right_idx: usize,
290 ) -> Result<Columns> {
291 let left_row_number = left.row_numbers[left_idx];
292 let right_row_number = right.row_numbers[right_idx];
293
294 let composite_key = Self::make_composite_key(left_row_number, right_row_number);
295 let (result_row_number, _is_new) =
296 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
297
298 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
300 Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
301 }
302
303 fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
305 let mut serializer = KeySerializer::new();
306 serializer.extend_u8(b'L');
307 serializer.extend_u64(left_num.0);
308 serializer.extend_u64(right_num.0);
309 EncodedKey::new(serializer.finish())
310 }
311
312 fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
315 let arr: [u8; 8] =
316 [!bytes[0], !bytes[1], !bytes[2], !bytes[3], !bytes[4], !bytes[5], !bytes[6], !bytes[7]];
317 u64::from_be_bytes(arr)
318 }
319
320 fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
324 if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
326 return None;
327 }
328
329 let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
330 let right_num = if key_bytes.len() >= 17 {
331 Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
332 } else {
333 None
334 };
335
336 Some((RowNumber(left_num), right_num))
337 }
338
339 pub(crate) fn join_columns_one_to_many(
342 &self,
343 txn: &mut FlowTransaction,
344 left: &Columns,
345 left_idx: usize,
346 right: &Columns,
347 ) -> Result<Columns> {
348 let right_count = right.row_count();
349 if right_count == 0 {
350 return Ok(Columns::empty());
351 }
352
353 let left_row_number = left.row_numbers[left_idx];
354
355 let composite_keys: Vec<EncodedKey> = (0..right_count)
357 .map(|right_idx| {
358 let right_row_number = right.row_numbers[right_idx];
359 Self::make_composite_key(left_row_number, right_row_number)
360 })
361 .collect();
362
363 let row_numbers_with_flags =
365 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
366 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
367
368 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
369 Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
370 }
371
372 pub(crate) fn join_columns_many_to_one(
375 &self,
376 txn: &mut FlowTransaction,
377 left: &Columns,
378 right: &Columns,
379 right_idx: usize,
380 ) -> Result<Columns> {
381 let left_count = left.row_count();
382 if left_count == 0 {
383 return Ok(Columns::empty());
384 }
385
386 let right_row_number = right.row_numbers[right_idx];
387
388 let composite_keys: Vec<EncodedKey> = (0..left_count)
390 .map(|left_idx| {
391 let left_row_number = left.row_numbers[left_idx];
392 Self::make_composite_key(left_row_number, right_row_number)
393 })
394 .collect();
395
396 let row_numbers_with_flags =
398 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
399 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
400
401 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
402 Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
403 }
404
405 pub(crate) fn join_columns_cartesian(
408 &self,
409 txn: &mut FlowTransaction,
410 left: &Columns,
411 left_indices: &[usize],
412 right: &Columns,
413 right_indices: &[usize],
414 ) -> Result<Columns> {
415 let left_count = left_indices.len();
416 let right_count = right_indices.len();
417 if left_count == 0 || right_count == 0 {
418 return Ok(Columns::empty());
419 }
420
421 let total_results = left_count * right_count;
423 let mut composite_keys = Vec::with_capacity(total_results);
424
425 for &left_idx in left_indices {
426 let left_row_number = left.row_numbers[left_idx];
427 for &right_idx in right_indices {
428 let right_row_number = right.row_numbers[right_idx];
429 composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
430 }
431 }
432
433 let row_numbers_with_flags =
435 self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
436 let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
437
438 let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
439 Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
440 }
441
442 fn determine_side(&self, change: &Change) -> Option<JoinSide> {
443 match &change.origin {
444 ChangeOrigin::Flow(from_node) => {
445 if *from_node == self.left_node {
446 Some(JoinSide::Left)
447 } else if *from_node == self.right_node {
448 Some(JoinSide::Right)
449 } else {
450 None
451 }
452 }
453 _ => None,
454 }
455 }
456}
457
// No methods are overridden; any `RawStatefulOperator` behavior comes from the trait's defaults.
impl RawStatefulOperator for JoinOperator {}
459
460impl SingleStateful for JoinOperator {
461 fn layout(&self) -> RowSchema {
462 self.schema.clone()
463 }
464}
465
466impl Operator for JoinOperator {
467 fn id(&self) -> FlowNodeId {
468 self.node
469 }
470
471 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
472 if let ChangeOrigin::Flow(from_node) = &change.origin {
474 if *from_node == self.node {
475 return Ok(Change::from_flow(self.node, change.version, Vec::new()));
476 }
477 }
478
479 let mut state = JoinState::new(self.node);
481 let estimated_capacity = change.diffs.len() * 2;
483 let mut result = Vec::with_capacity(estimated_capacity);
484
485 let side = self
487 .determine_side(&change)
488 .ok_or_else(|| Error(internal!("Join operator received change from unknown node")))?;
489
490 let compiled_exprs = match side {
491 JoinSide::Left => &self.compiled_left_exprs,
492 JoinSide::Right => &self.compiled_right_exprs,
493 };
494
495 for diff in change.diffs {
497 match diff {
498 Diff::Insert {
499 post,
500 } => {
501 let keys = self.compute_join_keys(&post, compiled_exprs)?;
503 let row_count = post.row_count();
504
505 let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
507 let mut inserts_undefined: Vec<usize> = Vec::new();
508
509 for row_idx in 0..row_count {
510 if let Some(key_hash) = keys[row_idx] {
511 inserts_by_key.entry(key_hash).or_default().push(row_idx);
512 } else {
513 inserts_undefined.push(row_idx);
514 }
515 }
516
517 for (key_hash, indices) in inserts_by_key {
519 let diffs = self.strategy.handle_insert(
520 txn, &post, &indices, side, &key_hash, &mut state, self,
521 )?;
522 result.extend(diffs);
523 }
524
525 for idx in inserts_undefined {
527 let diffs = self.strategy.handle_insert_undefined(
528 txn, &post, idx, side, &mut state, self,
529 )?;
530 result.extend(diffs);
531 }
532 }
533 Diff::Remove {
534 pre,
535 } => {
536 let keys = self.compute_join_keys(&pre, compiled_exprs)?;
538 let row_count = pre.row_count();
539
540 let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
542 let mut removes_undefined: Vec<usize> = Vec::new();
543
544 for row_idx in 0..row_count {
545 if let Some(key_hash) = keys[row_idx] {
546 removes_by_key.entry(key_hash).or_default().push(row_idx);
547 } else {
548 removes_undefined.push(row_idx);
549 }
550 }
551
552 for (key_hash, indices) in removes_by_key {
554 let diffs = self.strategy.handle_remove(
555 txn,
556 &pre,
557 &indices,
558 side,
559 &key_hash,
560 &mut state,
561 self,
562 change.version,
563 )?;
564 result.extend(diffs);
565 }
566
567 for idx in removes_undefined {
569 let diffs = self.strategy.handle_remove_undefined(
570 txn,
571 &pre,
572 idx,
573 side,
574 &mut state,
575 self,
576 change.version,
577 )?;
578 result.extend(diffs);
579 }
580 }
581 Diff::Update {
582 pre,
583 post,
584 } => {
585 let pre_keys = self.compute_join_keys(&pre, compiled_exprs)?;
587 let post_keys = self.compute_join_keys(&post, compiled_exprs)?;
588 let row_count = post.row_count();
589
590 let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> =
593 IndexMap::new();
594 let mut updates_undefined: Vec<usize> = Vec::new();
595
596 for row_idx in 0..row_count {
597 match (pre_keys[row_idx], post_keys[row_idx]) {
598 (Some(pre_key), Some(post_key)) => {
599 updates_by_key
600 .entry((pre_key, post_key))
601 .or_default()
602 .push(row_idx);
603 }
604 _ => {
605 updates_undefined.push(row_idx);
608 }
609 }
610 }
611
612 for ((pre_key, post_key), indices) in updates_by_key {
614 let diffs = self.strategy.handle_update(
615 txn,
616 &pre,
617 &post,
618 &indices,
619 side,
620 &pre_key,
621 &post_key,
622 &mut state,
623 self,
624 change.version,
625 )?;
626 result.extend(diffs);
627 }
628
629 for row_idx in updates_undefined {
631 let diffs = self.strategy.handle_update_undefined(
632 txn,
633 &pre,
634 &post,
635 row_idx,
636 side,
637 &mut state,
638 self,
639 change.version,
640 )?;
641 result.extend(diffs);
642 }
643 }
644 }
645 }
646
647 Ok(Change::from_flow(self.node, change.version, result))
648 }
649
650 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
657 let mut found_columns: Vec<Columns> = Vec::new();
658
659 for &row_number in rows {
660 let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
662 continue;
663 };
664
665 let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
667 continue;
668 };
669
670 let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
672 if left_cols.is_empty() {
673 continue;
674 }
675
676 if let Some(right_row_num) = right_row_number {
677 let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
679 if !right_cols.is_empty() {
680 let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
682 let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
683 joined.row_numbers = CowVec::new(vec![row_number]);
685 found_columns.push(joined);
686 }
687 } else {
688 let right_schema = self.right_parent.pull(txn, &[])?;
690 let builder = JoinedColumnsBuilder::new(&left_cols, &right_schema, &self.alias);
691 let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_schema);
692 unmatched.row_numbers = CowVec::new(vec![row_number]);
694 found_columns.push(unmatched);
695 }
696 }
697
698 if found_columns.is_empty() {
700 let left_schema = self.left_parent.pull(txn, &[])?;
702 let right_schema = self.right_parent.pull(txn, &[])?;
703
704 let builder = JoinedColumnsBuilder::new(&left_schema, &right_schema, &self.alias);
706 let right_names = builder.right_column_names();
707
708 let mut all_columns: Vec<Column> = left_schema.columns.into_iter().collect();
710
711 for (col, aliased_name) in right_schema.columns.into_iter().zip(right_names.iter()) {
713 all_columns.push(Column {
714 name: Fragment::internal(aliased_name),
715 data: col.data,
716 });
717 }
718
719 Ok(Columns {
720 row_numbers: CowVec::new(Vec::new()),
721 columns: CowVec::new(all_columns),
722 })
723 } else if found_columns.len() == 1 {
724 Ok(found_columns.remove(0))
725 } else {
726 let mut result = found_columns.remove(0);
727 for cols in found_columns {
728 result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
729 for (i, col) in cols.columns.into_iter().enumerate() {
730 result.columns.make_mut()[i].extend(col).expect("schema mismatch in join pull");
731 }
732 }
733 Ok(result)
734 }
735 }
736}