Skip to main content

reifydb_sub_flow/operator/join/
operator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9	common::{CommitVersion, JoinType},
10	encoded::{key::EncodedKey, shape::RowShape},
11	interface::{
12		catalog::flow::FlowNodeId,
13		change::{Change, ChangeOrigin, Diff},
14	},
15	internal,
16	util::encoding::keycode::serializer::KeySerializer,
17	value::column::{ColumnWithName, columns::Columns},
18};
19use reifydb_engine::{
20	expression::{
21		compile::{CompiledExpr, compile_expression},
22		context::{CompileContext, EvalContext},
23	},
24	vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_routine::routine::registry::Routines;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29	context::RuntimeContext,
30	hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33	Result,
34	error::Error,
35	fragment::Fragment,
36	params::Params,
37	util::cowvec::CowVec,
38	value::{Value, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42	column::JoinedColumnsBuilder,
43	state::{JoinSide, JoinState},
44	strategy::{JoinContext, JoinStrategy, UpdateKeys},
45};
46use crate::{
47	operator::{
48		Operator, Operators,
49		stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50	},
51	transaction::FlowTransaction,
52};
53
// Shared "no parameters" value handed to every `EvalContext` built by this operator.
static EMPTY_PARAMS: Params = Params::None;
// Lazily constructed empty symbol table, shared by expression compilation (`CompileContext`)
// and evaluation (`EvalContext`) in this file.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
56
/// Configuration for one side (left or right) of a join operator.
///
/// Consumed by [`JoinOperator::new`], which moves each field into the operator.
pub struct JoinSideConfig {
	/// Upstream operator for this side; the join pulls rows from it.
	pub parent: Arc<Operators>,
	/// Flow node id of this side; incoming changes are attributed to a side by comparing
	/// their origin against it.
	pub node: FlowNodeId,
	/// Join-key expressions for this side; compiled once when the operator is constructed.
	pub exprs: Vec<Expression>,
}
63
/// Flow operator that joins the changes of a left and a right upstream node.
///
/// Join keys are hashed per row from the compiled key expressions; output row numbers are
/// allocated through `row_number_provider`, keyed by a composite of the contributing left
/// (and optionally right) row numbers.
pub struct JoinOperator {
	// Upstream operator of the left side (pulled from when reconstructing joined rows).
	pub(crate) left_parent: Arc<Operators>,
	// Upstream operator of the right side (also pulled with no rows to obtain its shape).
	pub(crate) right_parent: Arc<Operators>,
	// Flow node id of this operator; returned by `Operator::id`.
	node: FlowNodeId,
	// Join behaviour derived from the `JoinType` given to `new`.
	strategy: JoinStrategy,
	// Node ids used to decide which side an incoming change belongs to.
	left_node: FlowNodeId,
	right_node: FlowNodeId,
	// Join-key expressions as supplied by the caller.
	left_exprs: Vec<Expression>,
	pub(crate) right_exprs: Vec<Expression>,
	// The same expressions, compiled once in `new`.
	compiled_left_exprs: Vec<CompiledExpr>,
	compiled_right_exprs: Vec<CompiledExpr>,
	// Optional alias forwarded to `JoinedColumnsBuilder` when building joined columns.
	alias: Option<String>,
	// Shape returned from `SingleStateful::layout`.
	shape: RowShape,
	// Maps composite join keys to output row numbers (and back).
	row_number_provider: RowNumberProvider,
	// Executor supplied at construction; `routines` and `runtime_context` are cloned from it.
	executor: Executor,
	routines: Routines,
	runtime_context: RuntimeContext,
}
82
83impl JoinOperator {
84	pub fn new(
85		left: JoinSideConfig,
86		right: JoinSideConfig,
87		node: FlowNodeId,
88		join_type: JoinType,
89		alias: Option<String>,
90		executor: Executor,
91	) -> Self {
92		let left_parent = left.parent;
93		let right_parent = right.parent;
94		let left_node = left.node;
95		let right_node = right.node;
96		let left_exprs = left.exprs;
97		let right_exprs = right.exprs;
98		let strategy = JoinStrategy::from(join_type);
99		let shape = Self::state_shape();
100		let row_number_provider = RowNumberProvider::new(node);
101
102		// Create compile context with empty symbol table
103		let compile_ctx = CompileContext {
104			symbols: &EMPTY_SYMBOL_TABLE,
105		};
106
107		// Compile expressions at construction time
108		let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
109			.iter()
110			.map(|e| compile_expression(&compile_ctx, e))
111			.collect::<Result<Vec<_>>>()
112			.expect("Failed to compile left expressions");
113
114		let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
115			.iter()
116			.map(|e| compile_expression(&compile_ctx, e))
117			.collect::<Result<Vec<_>>>()
118			.expect("Failed to compile right expressions");
119
120		// Extract Functions and RuntimeContext from executor
121		let routines = executor.routines.clone();
122		let runtime_context = executor.runtime_context.clone();
123
124		Self {
125			left_parent,
126			right_parent,
127			node,
128			strategy,
129			left_node,
130			right_node,
131			left_exprs,
132			right_exprs,
133			compiled_left_exprs,
134			compiled_right_exprs,
135			alias,
136			shape,
137			row_number_provider,
138			executor,
139			routines,
140			runtime_context,
141		}
142	}
143
144	fn state_shape() -> RowShape {
145		RowShape::testing(&[Type::Blob])
146	}
147
148	/// Compute join keys for all rows in Columns
149	/// Returns Vec<Option<Hash128>> - one per row, None for rows with undefined key values
150	pub(crate) fn compute_join_keys(
151		&self,
152		columns: &Columns,
153		compiled_exprs: &[CompiledExpr],
154	) -> Result<Vec<Option<Hash128>>> {
155		let row_count = columns.row_count();
156		if row_count == 0 {
157			return Ok(Vec::new());
158		}
159
160		let session = EvalContext {
161			params: &EMPTY_PARAMS,
162			symbols: &EMPTY_SYMBOL_TABLE,
163			routines: &self.routines,
164			runtime_context: &self.runtime_context,
165			arena: None,
166			identity: IdentityId::root(),
167			is_aggregate_context: false,
168			columns: Columns::empty(),
169			row_count: 1,
170			target: None,
171			take: None,
172		};
173		let exec_ctx = session.with_eval(columns.clone(), row_count);
174
175		// Evaluate all compiled expressions on the entire batch
176		let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
177		for compiled_expr in compiled_exprs.iter() {
178			let col: ColumnWithName = if let Some(col_name) = compiled_expr.access_column_name() {
179				columns.column(col_name)
180					.map(|c| ColumnWithName::new(c.name().clone(), c.data().clone()))
181					.unwrap_or_else(|| {
182						ColumnWithName::undefined_typed(col_name, Type::Boolean, row_count)
183					})
184			} else {
185				compiled_expr.execute(&exec_ctx)?
186			};
187			expr_columns.push(col);
188		}
189
190		// Compute hash for each row
191		let mut hashes = Vec::with_capacity(row_count);
192		for row_idx in 0..row_count {
193			let mut hasher = Vec::with_capacity(256);
194			let mut has_undefined = false;
195
196			for col in &expr_columns {
197				let value = col.data().get_value(row_idx);
198
199				// Check if the value is undefined - undefined values should never match in joins
200				if matches!(value, Value::None { .. }) {
201					has_undefined = true;
202					break;
203				}
204
205				let bytes = to_stdvec(&value).map_err(|e| {
206					Error(Box::new(internal!("Failed to encode value for hash: {}", e)))
207				})?;
208				hasher.extend_from_slice(&bytes);
209			}
210
211			if has_undefined {
212				hashes.push(None);
213			} else {
214				hashes.push(Some(xxh3_128(&hasher)));
215			}
216		}
217
218		Ok(hashes)
219	}
220
221	/// Generate columns for an unmatched left join result.
222	/// Creates combined columns with left values and Undefined values for right columns.
223	pub(crate) fn unmatched_left_columns(
224		&self,
225		txn: &mut FlowTransaction,
226		left: &Columns,
227		left_idx: usize,
228	) -> Result<Columns> {
229		let left_row_number = left.row_numbers[left_idx];
230
231		// Create composite key for this unmatched row
232		let mut serializer = KeySerializer::new();
233		serializer.extend_u8(b'L');
234		serializer.extend_u64(left_row_number.0);
235		let composite_key = EncodedKey::new(serializer.finish());
236
237		// Get or create a unique row number for this unmatched row
238		let (result_row_number, _is_new) =
239			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
240
241		// Get the right side shape
242		let right_shape = self.right_parent.pull(txn, &[])?;
243
244		// Build using JoinedColumnsBuilder
245		let builder = JoinedColumnsBuilder::new(left, &right_shape, &self.alias);
246		Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_shape))
247	}
248
249	/// Generate columns for multiple unmatched left join results.
250	pub(crate) fn unmatched_left_columns_batch(
251		&self,
252		txn: &mut FlowTransaction,
253		left: &Columns,
254		left_indices: &[usize],
255	) -> Result<Columns> {
256		if left_indices.is_empty() {
257			return Ok(Columns::empty());
258		}
259
260		// Build composite keys for all unmatched rows
261		let composite_keys: Vec<EncodedKey> = left_indices
262			.iter()
263			.map(|&idx| {
264				let left_row_number = left.row_numbers[idx];
265				let mut serializer = KeySerializer::new();
266				serializer.extend_u8(b'L');
267				serializer.extend_u64(left_row_number.0);
268				EncodedKey::new(serializer.finish())
269			})
270			.collect();
271
272		// Batch get/create row numbers
273		let row_numbers_with_flags =
274			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
275		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
276
277		// Get the right side shape
278		let right_shape = self.right_parent.pull(txn, &[])?;
279
280		// Build using JoinedColumnsBuilder
281		let builder = JoinedColumnsBuilder::new(left, &right_shape, &self.alias);
282		Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_shape))
283	}
284
285	/// Clean up all join results for a given left row
286	/// This removes both matched and unmatched join results
287	pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
288		let mut serializer = KeySerializer::new();
289		serializer.extend_u8(b'L');
290		serializer.extend_u64(left_number);
291		let prefix = serializer.finish();
292
293		// Remove all mappings with this prefix
294		self.row_number_provider.remove_by_prefix(txn, &prefix)
295	}
296
297	/// Join a single left row with a single right row, returning combined Columns.
298	pub(crate) fn join_columns(
299		&self,
300		txn: &mut FlowTransaction,
301		left: &Columns,
302		left_idx: usize,
303		right: &Columns,
304		right_idx: usize,
305	) -> Result<Columns> {
306		let left_row_number = left.row_numbers[left_idx];
307		let right_row_number = right.row_numbers[right_idx];
308
309		let composite_key = Self::make_composite_key(left_row_number, right_row_number);
310		let (result_row_number, _is_new) =
311			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
312
313		// Join directly at indices without extracting rows
314		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
315		Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
316	}
317
318	/// Create a composite key for a join result from left and right row numbers.
319	fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
320		let mut serializer = KeySerializer::new();
321		serializer.extend_u8(b'L');
322		serializer.extend_u64(left_num.0);
323		serializer.extend_u64(right_num.0);
324		EncodedKey::new(serializer.finish())
325	}
326
327	/// Decode a u64 from keycode format (big-endian with bits flipped).
328	/// The keycode format inverts all bits for proper byte-order sorting.
329	fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
330		let arr: [u8; 8] =
331			[!bytes[0], !bytes[1], !bytes[2], !bytes[3], !bytes[4], !bytes[5], !bytes[6], !bytes[7]];
332		u64::from_be_bytes(arr)
333	}
334
335	/// Parse a composite key to extract left and optional right row numbers.
336	/// Returns None if the key format is invalid.
337	/// Key format: '!L' (1 byte inverted) + left_row_number (8 bytes) + optional right_row_number (8 bytes)
338	fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
339		// Check minimum length and 'L' prefix (inverted in keycode format)
340		if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
341			return None;
342		}
343
344		let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
345		let right_num = if key_bytes.len() >= 17 {
346			Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
347		} else {
348			None
349		};
350
351		Some((RowNumber(left_num), right_num))
352	}
353
354	/// Join one left row with all right rows.
355	/// Returns combined Columns with one row per right row.
356	pub(crate) fn join_columns_one_to_many(
357		&self,
358		txn: &mut FlowTransaction,
359		left: &Columns,
360		left_idx: usize,
361		right: &Columns,
362	) -> Result<Columns> {
363		let right_count = right.row_count();
364		if right_count == 0 {
365			return Ok(Columns::empty());
366		}
367
368		let left_row_number = left.row_numbers[left_idx];
369
370		// Build all composite keys
371		let composite_keys: Vec<EncodedKey> = (0..right_count)
372			.map(|right_idx| {
373				let right_row_number = right.row_numbers[right_idx];
374				Self::make_composite_key(left_row_number, right_row_number)
375			})
376			.collect();
377
378		// Batch get/create row numbers
379		let row_numbers_with_flags =
380			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
381		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
382
383		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
384		Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
385	}
386
387	/// Join all left rows with one right row.
388	/// Returns combined Columns with one row per left row.
389	pub(crate) fn join_columns_many_to_one(
390		&self,
391		txn: &mut FlowTransaction,
392		left: &Columns,
393		right: &Columns,
394		right_idx: usize,
395	) -> Result<Columns> {
396		let left_count = left.row_count();
397		if left_count == 0 {
398			return Ok(Columns::empty());
399		}
400
401		let right_row_number = right.row_numbers[right_idx];
402
403		// Build all composite keys
404		let composite_keys: Vec<EncodedKey> = (0..left_count)
405			.map(|left_idx| {
406				let left_row_number = left.row_numbers[left_idx];
407				Self::make_composite_key(left_row_number, right_row_number)
408			})
409			.collect();
410
411		// Batch get/create row numbers
412		let row_numbers_with_flags =
413			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
414		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
415
416		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
417		Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
418	}
419
420	/// Join left rows at specified indices with right rows at specified indices (cartesian product).
421	/// Returns combined Columns with left_indices.len() * right_indices.len() rows.
422	pub(crate) fn join_columns_cartesian(
423		&self,
424		txn: &mut FlowTransaction,
425		left: &Columns,
426		left_indices: &[usize],
427		right: &Columns,
428		right_indices: &[usize],
429	) -> Result<Columns> {
430		let left_count = left_indices.len();
431		let right_count = right_indices.len();
432		if left_count == 0 || right_count == 0 {
433			return Ok(Columns::empty());
434		}
435
436		// Build all composite keys for cartesian product
437		let total_results = left_count * right_count;
438		let mut composite_keys = Vec::with_capacity(total_results);
439
440		for &left_idx in left_indices {
441			let left_row_number = left.row_numbers[left_idx];
442			for &right_idx in right_indices {
443				let right_row_number = right.row_numbers[right_idx];
444				composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
445			}
446		}
447
448		// Batch get/create row numbers
449		let row_numbers_with_flags =
450			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
451		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
452
453		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
454		Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
455	}
456
457	fn determine_side(&self, change: &Change) -> Option<JoinSide> {
458		match &change.origin {
459			ChangeOrigin::Flow(from_node) => {
460				if *from_node == self.left_node {
461					Some(JoinSide::Left)
462				} else if *from_node == self.right_node {
463					Some(JoinSide::Right)
464				} else {
465					None
466				}
467			}
468			_ => None,
469		}
470	}
471}
472
// Empty impl: `JoinOperator` overrides nothing from `RawStatefulOperator`.
impl RawStatefulOperator for JoinOperator {}
474
475impl SingleStateful for JoinOperator {
476	fn layout(&self) -> RowShape {
477		self.shape.clone()
478	}
479}
480
481impl Operator for JoinOperator {
	/// Flow node id of this join operator.
	fn id(&self) -> FlowNodeId {
		self.node
	}
485
486	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
487		if let ChangeOrigin::Flow(from_node) = &change.origin
488			&& *from_node == self.node
489		{
490			return Ok(Change::from_flow(self.node, change.version, Vec::new(), DateTime::default()));
491		}
492
493		let mut state = JoinState::new(self.node);
494		let mut result = Vec::with_capacity(change.diffs.len() * 2);
495
496		let side = self
497			.determine_side(&change)
498			.ok_or_else(|| Error(Box::new(internal!("Join operator received change from unknown node"))))?;
499
500		let compiled_exprs = match side {
501			JoinSide::Left => &self.compiled_left_exprs,
502			JoinSide::Right => &self.compiled_right_exprs,
503		};
504
505		let version = change.version;
506		for diff in change.diffs {
507			match diff {
508				Diff::Insert {
509					post,
510				} => self.apply_join_insert(
511					txn,
512					&post,
513					compiled_exprs,
514					side,
515					version,
516					&mut state,
517					&mut result,
518				)?,
519				Diff::Remove {
520					pre,
521				} => self.apply_join_remove(
522					txn,
523					&pre,
524					compiled_exprs,
525					side,
526					version,
527					&mut state,
528					&mut result,
529				)?,
530				Diff::Update {
531					pre,
532					post,
533				} => self.apply_join_update(
534					txn,
535					&pre,
536					&post,
537					compiled_exprs,
538					side,
539					version,
540					&mut state,
541					&mut result,
542				)?,
543			}
544		}
545
546		Ok(Change::from_flow(self.node, version, result, change.changed_at))
547	}
548
	// FIXME #244: To reconstruct an unmatched left row we need the right side's shape in order to
	// create the combined layout, which requires shape / layout information from the right side.
	// Fixing this should unlock the tests:
552	// testsuite/flow/tests/scripts/backfill/18_multiple_joins_same_table.skip
553	// testsuite/flow/tests/scripts/backfill/19_complex_multi_table.skip
554	// testsuite/flow/tests/scripts/backfill/21_backfill_with_distinct.skip
555	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
556		let mut found_columns: Vec<Columns> = Vec::new();
557
558		for &row_number in rows {
559			if let Some(joined) = self.pull_one_joined_row(txn, row_number)? {
560				found_columns.push(joined);
561			}
562		}
563
564		if found_columns.is_empty() {
565			return self.empty_joined_shape(txn);
566		}
567		if found_columns.len() == 1 {
568			return Ok(found_columns.remove(0));
569		}
570
571		let mut result = found_columns.remove(0);
572		for cols in found_columns {
573			result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
574			for (i, col) in cols.columns.into_iter().enumerate() {
575				result.columns.make_mut()[i].extend(col).expect("shape mismatch in join pull");
576			}
577		}
578		Ok(result)
579	}
580}
581
582impl JoinOperator {
583	#[inline]
584	fn pull_one_joined_row(&self, txn: &mut FlowTransaction, row_number: RowNumber) -> Result<Option<Columns>> {
585		let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
586			return Ok(None);
587		};
588		let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
589			return Ok(None);
590		};
591
592		let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
593		if left_cols.is_empty() {
594			return Ok(None);
595		}
596
597		if let Some(right_row_num) = right_row_number {
598			let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
599			if right_cols.is_empty() {
600				return Ok(None);
601			}
602			let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
603			let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
604			joined.row_numbers = CowVec::new(vec![row_number]);
605			Ok(Some(joined))
606		} else {
607			let right_shape = self.right_parent.pull(txn, &[])?;
608			let builder = JoinedColumnsBuilder::new(&left_cols, &right_shape, &self.alias);
609			let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_shape);
610			unmatched.row_numbers = CowVec::new(vec![row_number]);
611			Ok(Some(unmatched))
612		}
613	}
614
615	#[inline]
616	fn empty_joined_shape(&self, txn: &mut FlowTransaction) -> Result<Columns> {
617		let left_shape = self.left_parent.pull(txn, &[])?;
618		let right_shape = self.right_parent.pull(txn, &[])?;
619		let builder = JoinedColumnsBuilder::new(&left_shape, &right_shape, &self.alias);
620		let right_names = builder.right_column_names();
621
622		let mut all_columns: Vec<ColumnWithName> = left_shape
623			.names
624			.iter()
625			.zip(left_shape.columns.iter())
626			.map(|(name, data)| ColumnWithName::new(name.clone(), data.clone()))
627			.collect();
628
629		for (col, aliased_name) in right_shape.columns.into_iter().zip(right_names.iter()) {
630			all_columns.push(ColumnWithName::new(Fragment::internal(aliased_name), col));
631		}
632
633		Ok(Columns::new(all_columns))
634	}
635}
636
637impl JoinOperator {
638	#[inline]
639	#[allow(clippy::too_many_arguments)]
640	fn apply_join_insert(
641		&self,
642		txn: &mut FlowTransaction,
643		post: &Columns,
644		compiled_exprs: &[CompiledExpr],
645		side: JoinSide,
646		version: CommitVersion,
647		state: &mut JoinState,
648		result: &mut Vec<Diff>,
649	) -> Result<()> {
650		let keys = self.compute_join_keys(post, compiled_exprs)?;
651		let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
652		let mut inserts_undefined: Vec<usize> = Vec::new();
653
654		for (row_idx, key) in keys.iter().enumerate() {
655			if let Some(key_hash) = key {
656				inserts_by_key.entry(*key_hash).or_default().push(row_idx);
657			} else {
658				inserts_undefined.push(row_idx);
659			}
660		}
661
662		for (key_hash, indices) in inserts_by_key {
663			let mut ctx = JoinContext {
664				side,
665				state,
666				operator: self,
667				version,
668			};
669			let diffs = self.strategy.handle_insert(txn, post, &indices, &key_hash, &mut ctx)?;
670			result.extend(diffs);
671		}
672
673		for idx in inserts_undefined {
674			let mut ctx = JoinContext {
675				side,
676				state,
677				operator: self,
678				version,
679			};
680			let diffs = self.strategy.handle_insert_undefined(txn, post, idx, &mut ctx)?;
681			result.extend(diffs);
682		}
683
684		Ok(())
685	}
686
687	#[inline]
688	#[allow(clippy::too_many_arguments)]
689	fn apply_join_remove(
690		&self,
691		txn: &mut FlowTransaction,
692		pre: &Columns,
693		compiled_exprs: &[CompiledExpr],
694		side: JoinSide,
695		version: CommitVersion,
696		state: &mut JoinState,
697		result: &mut Vec<Diff>,
698	) -> Result<()> {
699		let keys = self.compute_join_keys(pre, compiled_exprs)?;
700		let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
701		let mut removes_undefined: Vec<usize> = Vec::new();
702
703		for (row_idx, key) in keys.iter().enumerate() {
704			if let Some(key_hash) = key {
705				removes_by_key.entry(*key_hash).or_default().push(row_idx);
706			} else {
707				removes_undefined.push(row_idx);
708			}
709		}
710
711		for (key_hash, indices) in removes_by_key {
712			let mut ctx = JoinContext {
713				side,
714				state,
715				operator: self,
716				version,
717			};
718			let diffs = self.strategy.handle_remove(txn, pre, &indices, &key_hash, &mut ctx)?;
719			result.extend(diffs);
720		}
721
722		for idx in removes_undefined {
723			let mut ctx = JoinContext {
724				side,
725				state,
726				operator: self,
727				version,
728			};
729			let diffs = self.strategy.handle_remove_undefined(txn, pre, idx, &mut ctx)?;
730			result.extend(diffs);
731		}
732
733		Ok(())
734	}
735
736	#[inline]
737	#[allow(clippy::too_many_arguments)]
738	fn apply_join_update(
739		&self,
740		txn: &mut FlowTransaction,
741		pre: &Columns,
742		post: &Columns,
743		compiled_exprs: &[CompiledExpr],
744		side: JoinSide,
745		version: CommitVersion,
746		state: &mut JoinState,
747		result: &mut Vec<Diff>,
748	) -> Result<()> {
749		let pre_keys = self.compute_join_keys(pre, compiled_exprs)?;
750		let post_keys = self.compute_join_keys(post, compiled_exprs)?;
751		let row_count = post.row_count();
752
753		let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> = IndexMap::new();
754		let mut updates_undefined: Vec<usize> = Vec::new();
755
756		for row_idx in 0..row_count {
757			match (pre_keys[row_idx], post_keys[row_idx]) {
758				(Some(pre_key), Some(post_key)) => {
759					updates_by_key.entry((pre_key, post_key)).or_default().push(row_idx);
760				}
761				_ => {
762					updates_undefined.push(row_idx);
763				}
764			}
765		}
766
767		for ((pre_key, post_key), indices) in updates_by_key {
768			let mut ctx = JoinContext {
769				side,
770				state,
771				operator: self,
772				version,
773			};
774			let keys = UpdateKeys {
775				pre: &pre_key,
776				post: &post_key,
777			};
778			let diffs = self.strategy.handle_update(txn, pre, post, &indices, keys, &mut ctx)?;
779			result.extend(diffs);
780		}
781
782		for row_idx in updates_undefined {
783			let mut ctx = JoinContext {
784				side,
785				state,
786				operator: self,
787				version,
788			};
789			let diffs = self.strategy.handle_update_undefined(txn, pre, post, row_idx, &mut ctx)?;
790			result.extend(diffs);
791		}
792
793		Ok(())
794	}
795}