Skip to main content

reifydb_sub_flow/operator/join/
operator.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9	common::JoinType,
10	encoded::{key::EncodedKey, schema::Schema},
11	interface::{
12		catalog::flow::FlowNodeId,
13		change::{Change, ChangeOrigin, Diff},
14	},
15	internal,
16	util::encoding::keycode::serializer::KeySerializer,
17	value::column::{Column, columns::Columns},
18};
19use reifydb_engine::{
20	expression::{
21		compile::{CompiledExpr, compile_expression},
22		context::{CompileContext, EvalContext},
23	},
24	vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_function::registry::Functions;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29	clock::Clock,
30	hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33	Result,
34	error::Error,
35	fragment::Fragment,
36	params::Params,
37	util::cowvec::CowVec,
38	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42	column::JoinedColumnsBuilder,
43	state::{JoinSide, JoinState},
44	strategy::JoinStrategy,
45};
46use crate::{
47	operator::{
48		Operator, Operators,
49		stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50	},
51	transaction::FlowTransaction,
52};
53
54static EMPTY_PARAMS: Params = Params::None;
55static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
56
/// Flow operator that joins the rows of two upstream operators.
///
/// Incoming changes are attributed to the left or right input by their origin node, join keys are
/// computed from the compiled key expressions, and the actual join behaviour is delegated to the
/// configured `JoinStrategy`.
pub struct JoinOperator {
	/// Upstream operator of the left input; left rows are read back through its `pull`.
	pub(crate) left_parent: Arc<Operators>,
	/// Upstream operator of the right input; right rows (and, via an empty pull, the right-side
	/// columns) are read back through its `pull`.
	pub(crate) right_parent: Arc<Operators>,
	/// Flow node id of this operator; returned by `id` and used as the origin of emitted changes.
	node: FlowNodeId,
	/// Strategy derived from the `JoinType` given to `new`; handles inserts, removes and updates.
	strategy: JoinStrategy,
	/// Changes originating from this node are treated as left-side changes.
	left_node: FlowNodeId,
	/// Changes originating from this node are treated as right-side changes.
	right_node: FlowNodeId,
	/// Left-side key expressions exactly as passed to `new`.
	left_exprs: Vec<Expression>,
	/// Right-side key expressions exactly as passed to `new`.
	pub(crate) right_exprs: Vec<Expression>,
	/// `left_exprs` compiled once at construction time.
	compiled_left_exprs: Vec<CompiledExpr>,
	/// `right_exprs` compiled once at construction time.
	compiled_right_exprs: Vec<CompiledExpr>,
	/// Optional alias forwarded to every `JoinedColumnsBuilder` (presumably used to name the
	/// right-side columns — confirm in `column.rs`).
	alias: Option<String>,
	/// Schema returned by `SingleStateful::layout`; built by `state_schema`.
	schema: Schema,
	/// Maps composite join keys to output row numbers and back.
	row_number_provider: RowNumberProvider,
	/// Executor supplied at construction; `functions` and `clock` below are cloned from it.
	executor: Executor,
	/// Function registry handed to compile and evaluation contexts.
	functions: Functions,
	/// Clock handed to evaluation contexts.
	clock: Clock,
}
75
76impl JoinOperator {
77	pub fn new(
78		left_parent: Arc<Operators>,
79		right_parent: Arc<Operators>,
80		node: FlowNodeId,
81		join_type: JoinType,
82		left_node: FlowNodeId,
83		right_node: FlowNodeId,
84		left_exprs: Vec<Expression>,
85		right_exprs: Vec<Expression>,
86		alias: Option<String>,
87		executor: Executor,
88	) -> Self {
89		let strategy = JoinStrategy::from(join_type);
90		let schema = Self::state_schema();
91		let row_number_provider = RowNumberProvider::new(node);
92
93		// Create compile context with empty symbol table
94		let compile_ctx = CompileContext {
95			functions: &executor.functions,
96			symbol_table: &EMPTY_SYMBOL_TABLE,
97		};
98
99		// Compile expressions at construction time
100		let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
101			.iter()
102			.map(|e| compile_expression(&compile_ctx, e))
103			.collect::<Result<Vec<_>>>()
104			.expect("Failed to compile left expressions");
105
106		let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
107			.iter()
108			.map(|e| compile_expression(&compile_ctx, e))
109			.collect::<Result<Vec<_>>>()
110			.expect("Failed to compile right expressions");
111
112		// Extract Functions and Clock from executor
113		let functions = executor.functions.clone();
114		let clock = executor.clock.clone();
115
116		Self {
117			left_parent,
118			right_parent,
119			node,
120			strategy,
121			left_node,
122			right_node,
123			left_exprs,
124			right_exprs,
125			compiled_left_exprs,
126			compiled_right_exprs,
127			alias,
128			schema,
129			row_number_provider,
130			executor,
131			functions,
132			clock,
133		}
134	}
135
	/// Builds the schema stored in `self.schema` and returned by `layout`.
	///
	/// It is created through `Schema::testing` from a single `Type::Blob` entry.
	/// NOTE(review): `Schema::testing` looks like a test-oriented constructor — confirm it is the
	/// intended way to build this schema in non-test code.
	fn state_schema() -> Schema {
		Schema::testing(&[Type::Blob])
	}
139
140	/// Compute join keys for all rows in Columns
141	/// Returns Vec<Option<Hash128>> - one per row, None for rows with undefined key values
142	pub(crate) fn compute_join_keys(
143		&self,
144		columns: &Columns,
145		compiled_exprs: &[CompiledExpr],
146	) -> Result<Vec<Option<Hash128>>> {
147		let row_count = columns.row_count();
148		if row_count == 0 {
149			return Ok(Vec::new());
150		}
151
152		let exec_ctx = EvalContext {
153			target: None,
154			columns: columns.clone(),
155			row_count,
156			take: None,
157			params: &EMPTY_PARAMS,
158			symbol_table: &EMPTY_SYMBOL_TABLE,
159			is_aggregate_context: false,
160			functions: &self.functions,
161			clock: &self.clock,
162			arena: None,
163			identity: IdentityId::root(),
164		};
165
166		// Evaluate all compiled expressions on the entire batch
167		let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
168		for compiled_expr in compiled_exprs.iter() {
169			let col = if let Some(col_name) = compiled_expr.access_column_name() {
170				columns.column(col_name)
171					.cloned()
172					.unwrap_or_else(|| Column::undefined_typed(col_name, Type::Boolean, row_count))
173			} else {
174				compiled_expr.execute(&exec_ctx)?
175			};
176			expr_columns.push(col);
177		}
178
179		// Compute hash for each row
180		let mut hashes = Vec::with_capacity(row_count);
181		for row_idx in 0..row_count {
182			let mut hasher = Vec::with_capacity(256);
183			let mut has_undefined = false;
184
185			for col in &expr_columns {
186				let value = col.data().get_value(row_idx);
187
188				// Check if the value is undefined - undefined values should never match in joins
189				if matches!(value, Value::None { .. }) {
190					has_undefined = true;
191					break;
192				}
193
194				let bytes = to_stdvec(&value)
195					.map_err(|e| Error(internal!("Failed to encode value for hash: {}", e)))?;
196				hasher.extend_from_slice(&bytes);
197			}
198
199			if has_undefined {
200				hashes.push(None);
201			} else {
202				hashes.push(Some(xxh3_128(&hasher)));
203			}
204		}
205
206		Ok(hashes)
207	}
208
209	/// Generate columns for an unmatched left join result.
210	/// Creates combined columns with left values and Undefined values for right columns.
211	pub(crate) fn unmatched_left_columns(
212		&self,
213		txn: &mut FlowTransaction,
214		left: &Columns,
215		left_idx: usize,
216	) -> Result<Columns> {
217		let left_row_number = left.row_numbers[left_idx];
218
219		// Create composite key for this unmatched row
220		let mut serializer = KeySerializer::new();
221		serializer.extend_u8(b'L');
222		serializer.extend_u64(left_row_number.0);
223		let composite_key = EncodedKey::new(serializer.finish());
224
225		// Get or create a unique row number for this unmatched row
226		let (result_row_number, _is_new) =
227			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
228
229		// Get the right side schema
230		let right_schema = self.right_parent.pull(txn, &[])?;
231
232		// Build using JoinedColumnsBuilder
233		let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
234		Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_schema))
235	}
236
237	/// Generate columns for multiple unmatched left join results.
238	pub(crate) fn unmatched_left_columns_batch(
239		&self,
240		txn: &mut FlowTransaction,
241		left: &Columns,
242		left_indices: &[usize],
243	) -> Result<Columns> {
244		if left_indices.is_empty() {
245			return Ok(Columns::empty());
246		}
247
248		// Build composite keys for all unmatched rows
249		let composite_keys: Vec<EncodedKey> = left_indices
250			.iter()
251			.map(|&idx| {
252				let left_row_number = left.row_numbers[idx];
253				let mut serializer = KeySerializer::new();
254				serializer.extend_u8(b'L');
255				serializer.extend_u64(left_row_number.0);
256				EncodedKey::new(serializer.finish())
257			})
258			.collect();
259
260		// Batch get/create row numbers
261		let row_numbers_with_flags =
262			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
263		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
264
265		// Get the right side schema
266		let right_schema = self.right_parent.pull(txn, &[])?;
267
268		// Build using JoinedColumnsBuilder
269		let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
270		Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_schema))
271	}
272
273	/// Clean up all join results for a given left row
274	/// This removes both matched and unmatched join results
275	pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
276		let mut serializer = KeySerializer::new();
277		serializer.extend_u8(b'L');
278		serializer.extend_u64(left_number);
279		let prefix = serializer.finish();
280
281		// Remove all mappings with this prefix
282		self.row_number_provider.remove_by_prefix(txn, &prefix)
283	}
284
285	/// Join a single left row with a single right row, returning combined Columns.
286	pub(crate) fn join_columns(
287		&self,
288		txn: &mut FlowTransaction,
289		left: &Columns,
290		left_idx: usize,
291		right: &Columns,
292		right_idx: usize,
293	) -> Result<Columns> {
294		let left_row_number = left.row_numbers[left_idx];
295		let right_row_number = right.row_numbers[right_idx];
296
297		let composite_key = Self::make_composite_key(left_row_number, right_row_number);
298		let (result_row_number, _is_new) =
299			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
300
301		// Join directly at indices without extracting rows
302		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
303		Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
304	}
305
306	/// Create a composite key for a join result from left and right row numbers.
307	fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
308		let mut serializer = KeySerializer::new();
309		serializer.extend_u8(b'L');
310		serializer.extend_u64(left_num.0);
311		serializer.extend_u64(right_num.0);
312		EncodedKey::new(serializer.finish())
313	}
314
315	/// Decode a u64 from keycode format (big-endian with bits flipped).
316	/// The keycode format inverts all bits for proper byte-order sorting.
317	fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
318		let arr: [u8; 8] =
319			[!bytes[0], !bytes[1], !bytes[2], !bytes[3], !bytes[4], !bytes[5], !bytes[6], !bytes[7]];
320		u64::from_be_bytes(arr)
321	}
322
323	/// Parse a composite key to extract left and optional right row numbers.
324	/// Returns None if the key format is invalid.
325	/// Key format: '!L' (1 byte inverted) + left_row_number (8 bytes) + optional right_row_number (8 bytes)
326	fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
327		// Check minimum length and 'L' prefix (inverted in keycode format)
328		if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
329			return None;
330		}
331
332		let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
333		let right_num = if key_bytes.len() >= 17 {
334			Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
335		} else {
336			None
337		};
338
339		Some((RowNumber(left_num), right_num))
340	}
341
342	/// Join one left row with all right rows.
343	/// Returns combined Columns with one row per right row.
344	pub(crate) fn join_columns_one_to_many(
345		&self,
346		txn: &mut FlowTransaction,
347		left: &Columns,
348		left_idx: usize,
349		right: &Columns,
350	) -> Result<Columns> {
351		let right_count = right.row_count();
352		if right_count == 0 {
353			return Ok(Columns::empty());
354		}
355
356		let left_row_number = left.row_numbers[left_idx];
357
358		// Build all composite keys
359		let composite_keys: Vec<EncodedKey> = (0..right_count)
360			.map(|right_idx| {
361				let right_row_number = right.row_numbers[right_idx];
362				Self::make_composite_key(left_row_number, right_row_number)
363			})
364			.collect();
365
366		// Batch get/create row numbers
367		let row_numbers_with_flags =
368			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
369		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
370
371		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
372		Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
373	}
374
375	/// Join all left rows with one right row.
376	/// Returns combined Columns with one row per left row.
377	pub(crate) fn join_columns_many_to_one(
378		&self,
379		txn: &mut FlowTransaction,
380		left: &Columns,
381		right: &Columns,
382		right_idx: usize,
383	) -> Result<Columns> {
384		let left_count = left.row_count();
385		if left_count == 0 {
386			return Ok(Columns::empty());
387		}
388
389		let right_row_number = right.row_numbers[right_idx];
390
391		// Build all composite keys
392		let composite_keys: Vec<EncodedKey> = (0..left_count)
393			.map(|left_idx| {
394				let left_row_number = left.row_numbers[left_idx];
395				Self::make_composite_key(left_row_number, right_row_number)
396			})
397			.collect();
398
399		// Batch get/create row numbers
400		let row_numbers_with_flags =
401			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
402		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
403
404		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
405		Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
406	}
407
408	/// Join left rows at specified indices with right rows at specified indices (cartesian product).
409	/// Returns combined Columns with left_indices.len() * right_indices.len() rows.
410	pub(crate) fn join_columns_cartesian(
411		&self,
412		txn: &mut FlowTransaction,
413		left: &Columns,
414		left_indices: &[usize],
415		right: &Columns,
416		right_indices: &[usize],
417	) -> Result<Columns> {
418		let left_count = left_indices.len();
419		let right_count = right_indices.len();
420		if left_count == 0 || right_count == 0 {
421			return Ok(Columns::empty());
422		}
423
424		// Build all composite keys for cartesian product
425		let total_results = left_count * right_count;
426		let mut composite_keys = Vec::with_capacity(total_results);
427
428		for &left_idx in left_indices {
429			let left_row_number = left.row_numbers[left_idx];
430			for &right_idx in right_indices {
431				let right_row_number = right.row_numbers[right_idx];
432				composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
433			}
434		}
435
436		// Batch get/create row numbers
437		let row_numbers_with_flags =
438			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
439		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
440
441		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
442		Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
443	}
444
445	fn determine_side(&self, change: &Change) -> Option<JoinSide> {
446		match &change.origin {
447			ChangeOrigin::Flow(from_node) => {
448				if *from_node == self.left_node {
449					Some(JoinSide::Left)
450				} else if *from_node == self.right_node {
451					Some(JoinSide::Right)
452				} else {
453					None
454				}
455			}
456			_ => None,
457		}
458	}
459}
460
// Empty impl: `JoinOperator` defines no items of its own for `RawStatefulOperator`.
impl RawStatefulOperator for JoinOperator {}
462
impl SingleStateful for JoinOperator {
	/// Returns a clone of the schema created by `state_schema` when the operator was constructed.
	fn layout(&self) -> Schema {
		self.schema.clone()
	}
}
468
469impl Operator for JoinOperator {
	/// Returns the flow node id this operator was constructed with.
	fn id(&self) -> FlowNodeId {
		self.node
	}
473
474	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
475		// Check for self-referential calls (should never happen)
476		if let ChangeOrigin::Flow(from_node) = &change.origin {
477			if *from_node == self.node {
478				return Ok(Change::from_flow(self.node, change.version, Vec::new()));
479			}
480		}
481
482		// Create the state
483		let mut state = JoinState::new(self.node);
484		// Pre-allocate result vector with estimated capacity
485		let estimated_capacity = change.diffs.len() * 2;
486		let mut result = Vec::with_capacity(estimated_capacity);
487
488		// Determine which side this change is from
489		let side = self
490			.determine_side(&change)
491			.ok_or_else(|| Error(internal!("Join operator received change from unknown node")))?;
492
493		let compiled_exprs = match side {
494			JoinSide::Left => &self.compiled_left_exprs,
495			JoinSide::Right => &self.compiled_right_exprs,
496		};
497
498		// Process each diff inline, grouping by key within each diff
499		for diff in change.diffs {
500			match diff {
501				Diff::Insert {
502					post,
503				} => {
504					// Compute keys for all rows in this Columns batch
505					let keys = self.compute_join_keys(&post, compiled_exprs)?;
506					let row_count = post.row_count();
507
508					// Group indices by key hash
509					let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
510					let mut inserts_undefined: Vec<usize> = Vec::new();
511
512					for row_idx in 0..row_count {
513						if let Some(key_hash) = keys[row_idx] {
514							inserts_by_key.entry(key_hash).or_default().push(row_idx);
515						} else {
516							inserts_undefined.push(row_idx);
517						}
518					}
519
520					// Process inserts with defined keys (batched by key)
521					for (key_hash, indices) in inserts_by_key {
522						let diffs = self.strategy.handle_insert(
523							txn, &post, &indices, side, &key_hash, &mut state, self,
524						)?;
525						result.extend(diffs);
526					}
527
528					// Process inserts with undefined keys individually
529					for idx in inserts_undefined {
530						let diffs = self.strategy.handle_insert_undefined(
531							txn, &post, idx, side, &mut state, self,
532						)?;
533						result.extend(diffs);
534					}
535				}
536				Diff::Remove {
537					pre,
538				} => {
539					// Compute keys for all rows
540					let keys = self.compute_join_keys(&pre, compiled_exprs)?;
541					let row_count = pre.row_count();
542
543					// Group indices by key hash
544					let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
545					let mut removes_undefined: Vec<usize> = Vec::new();
546
547					for row_idx in 0..row_count {
548						if let Some(key_hash) = keys[row_idx] {
549							removes_by_key.entry(key_hash).or_default().push(row_idx);
550						} else {
551							removes_undefined.push(row_idx);
552						}
553					}
554
555					// Process removes with defined keys (batched by key)
556					for (key_hash, indices) in removes_by_key {
557						let diffs = self.strategy.handle_remove(
558							txn,
559							&pre,
560							&indices,
561							side,
562							&key_hash,
563							&mut state,
564							self,
565							change.version,
566						)?;
567						result.extend(diffs);
568					}
569
570					// Process removes with undefined keys individually
571					for idx in removes_undefined {
572						let diffs = self.strategy.handle_remove_undefined(
573							txn,
574							&pre,
575							idx,
576							side,
577							&mut state,
578							self,
579							change.version,
580						)?;
581						result.extend(diffs);
582					}
583				}
584				Diff::Update {
585					pre,
586					post,
587				} => {
588					// Compute keys for pre and post
589					let old_keys = self.compute_join_keys(&pre, compiled_exprs)?;
590					let new_keys = self.compute_join_keys(&post, compiled_exprs)?;
591					let row_count = post.row_count();
592
593					// Group updates by (old_key, new_key) pair
594					// Only updates with same key pair can be batched
595					let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> =
596						IndexMap::new();
597					let mut updates_undefined: Vec<usize> = Vec::new();
598
599					for row_idx in 0..row_count {
600						match (old_keys[row_idx], new_keys[row_idx]) {
601							(Some(old_key), Some(new_key)) => {
602								updates_by_key
603									.entry((old_key, new_key))
604									.or_default()
605									.push(row_idx);
606							}
607							_ => {
608								// Any undefined key (old or new) is processed
609								// individually
610								updates_undefined.push(row_idx);
611							}
612						}
613					}
614
615					// Process updates with defined keys (batched by key pair)
616					for ((old_key, new_key), indices) in updates_by_key {
617						let diffs = self.strategy.handle_update(
618							txn,
619							&pre,
620							&post,
621							&indices,
622							side,
623							&old_key,
624							&new_key,
625							&mut state,
626							self,
627							change.version,
628						)?;
629						result.extend(diffs);
630					}
631
632					// Process updates with undefined keys individually
633					for row_idx in updates_undefined {
634						let diffs = self.strategy.handle_update_undefined(
635							txn,
636							&pre,
637							&post,
638							row_idx,
639							side,
640							&mut state,
641							self,
642							change.version,
643						)?;
644						result.extend(diffs);
645					}
646				}
647			}
648		}
649
650		Ok(Change::from_flow(self.node, change.version, result))
651	}
652
653	// FIXME #244 The issue is that when we need to reconstruct an unmatched left row, we need the right side's
654	// schema to create the combined layout To make that work it requires schema / layout information of the right
655	// side this should unlock the test:
656	// testsuite/flow/tests/scripts/backfill/18_multiple_joins_same_table.skip
657	// testsuite/flow/tests/scripts/backfill/19_complex_multi_table.skip
658	// testsuite/flow/tests/scripts/backfill/21_backfill_with_distinct.skip
659	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
660		let mut found_columns: Vec<Columns> = Vec::new();
661
662		for &row_number in rows {
663			// Get the composite key for this row number (reverse lookup)
664			let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
665				continue;
666			};
667
668			// Parse the composite key to extract left and optional right row numbers
669			let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
670				continue;
671			};
672
673			// Get left columns from parent (no Row conversion)
674			let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
675			if left_cols.is_empty() {
676				continue;
677			}
678
679			if let Some(right_row_num) = right_row_number {
680				// Matched join - has right row number
681				let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
682				if !right_cols.is_empty() {
683					// Use JoinedColumnsBuilder to create joined columns
684					let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
685					let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
686					// Override the row number to match what was requested
687					joined.row_numbers = CowVec::new(vec![row_number]);
688					found_columns.push(joined);
689				}
690			} else {
691				// Unmatched left row - use builder.unmatched_left
692				let right_schema = self.right_parent.pull(txn, &[])?;
693				let builder = JoinedColumnsBuilder::new(&left_cols, &right_schema, &self.alias);
694				let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_schema);
695				// Override the row number to match what was requested
696				unmatched.row_numbers = CowVec::new(vec![row_number]);
697				found_columns.push(unmatched);
698			}
699		}
700
701		// Combine found rows
702		if found_columns.is_empty() {
703			// Get schema from both parents and combine them
704			let left_schema = self.left_parent.pull(txn, &[])?;
705			let right_schema = self.right_parent.pull(txn, &[])?;
706
707			// Use JoinedColumnsBuilder to get properly aliased names
708			let builder = JoinedColumnsBuilder::new(&left_schema, &right_schema, &self.alias);
709			let right_names = builder.right_column_names();
710
711			// Add left columns as-is
712			let mut all_columns: Vec<Column> = left_schema.columns.into_iter().collect();
713
714			// Add right columns with pre-computed aliased names
715			for (col, aliased_name) in right_schema.columns.into_iter().zip(right_names.iter()) {
716				all_columns.push(Column {
717					name: Fragment::internal(aliased_name),
718					data: col.data,
719				});
720			}
721
722			Ok(Columns {
723				row_numbers: CowVec::new(Vec::new()),
724				columns: CowVec::new(all_columns),
725			})
726		} else if found_columns.len() == 1 {
727			Ok(found_columns.remove(0))
728		} else {
729			let mut result = found_columns.remove(0);
730			for cols in found_columns {
731				result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
732				for (i, col) in cols.columns.into_iter().enumerate() {
733					result.columns.make_mut()[i].extend(col).expect("schema mismatch in join pull");
734				}
735			}
736			Ok(result)
737		}
738	}
739}