Skip to main content

reifydb_sub_flow/operator/join/
operator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use indexmap::IndexMap;
7use postcard::to_stdvec;
8use reifydb_core::{
9	common::JoinType,
10	encoded::{key::EncodedKey, schema::RowSchema},
11	interface::{
12		catalog::flow::FlowNodeId,
13		change::{Change, ChangeOrigin, Diff},
14	},
15	internal,
16	util::encoding::keycode::serializer::KeySerializer,
17	value::column::{Column, columns::Columns},
18};
19use reifydb_engine::{
20	expression::{
21		compile::{CompiledExpr, compile_expression},
22		context::{CompileContext, EvalSession},
23	},
24	vm::{executor::Executor, stack::SymbolTable},
25};
26use reifydb_routine::function::registry::Functions;
27use reifydb_rql::expression::Expression;
28use reifydb_runtime::{
29	context::RuntimeContext,
30	hash::{Hash128, xxh3_128},
31};
32use reifydb_type::{
33	Result,
34	error::Error,
35	fragment::Fragment,
36	params::Params,
37	util::cowvec::CowVec,
38	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
39};
40
41use super::{
42	column::JoinedColumnsBuilder,
43	state::{JoinSide, JoinState},
44	strategy::JoinStrategy,
45};
46use crate::{
47	operator::{
48		Operator, Operators,
49		stateful::{raw::RawStatefulOperator, row::RowNumberProvider, single::SingleStateful},
50	},
51	transaction::FlowTransaction,
52};
53
54static EMPTY_PARAMS: Params = Params::None;
55static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
56
57pub struct JoinOperator {
58	pub(crate) left_parent: Arc<Operators>,
59	pub(crate) right_parent: Arc<Operators>,
60	node: FlowNodeId,
61	strategy: JoinStrategy,
62	left_node: FlowNodeId,
63	right_node: FlowNodeId,
64	left_exprs: Vec<Expression>,
65	pub(crate) right_exprs: Vec<Expression>,
66	compiled_left_exprs: Vec<CompiledExpr>,
67	compiled_right_exprs: Vec<CompiledExpr>,
68	alias: Option<String>,
69	schema: RowSchema,
70	row_number_provider: RowNumberProvider,
71	executor: Executor,
72	functions: Functions,
73	runtime_context: RuntimeContext,
74}
75
76impl JoinOperator {
77	pub fn new(
78		left_parent: Arc<Operators>,
79		right_parent: Arc<Operators>,
80		node: FlowNodeId,
81		join_type: JoinType,
82		left_node: FlowNodeId,
83		right_node: FlowNodeId,
84		left_exprs: Vec<Expression>,
85		right_exprs: Vec<Expression>,
86		alias: Option<String>,
87		executor: Executor,
88	) -> Self {
89		let strategy = JoinStrategy::from(join_type);
90		let schema = Self::state_schema();
91		let row_number_provider = RowNumberProvider::new(node);
92
93		// Create compile context with empty symbol table
94		let compile_ctx = CompileContext {
95			functions: &executor.functions,
96			symbols: &EMPTY_SYMBOL_TABLE,
97		};
98
99		// Compile expressions at construction time
100		let compiled_left_exprs: Vec<CompiledExpr> = left_exprs
101			.iter()
102			.map(|e| compile_expression(&compile_ctx, e))
103			.collect::<Result<Vec<_>>>()
104			.expect("Failed to compile left expressions");
105
106		let compiled_right_exprs: Vec<CompiledExpr> = right_exprs
107			.iter()
108			.map(|e| compile_expression(&compile_ctx, e))
109			.collect::<Result<Vec<_>>>()
110			.expect("Failed to compile right expressions");
111
112		// Extract Functions and RuntimeContext from executor
113		let functions = executor.functions.clone();
114		let runtime_context = executor.runtime_context.clone();
115
116		Self {
117			left_parent,
118			right_parent,
119			node,
120			strategy,
121			left_node,
122			right_node,
123			left_exprs,
124			right_exprs,
125			compiled_left_exprs,
126			compiled_right_exprs,
127			alias,
128			schema,
129			row_number_provider,
130			executor,
131			functions,
132			runtime_context,
133		}
134	}
135
136	fn state_schema() -> RowSchema {
137		RowSchema::testing(&[Type::Blob])
138	}
139
140	/// Compute join keys for all rows in Columns
141	/// Returns Vec<Option<Hash128>> - one per row, None for rows with undefined key values
142	pub(crate) fn compute_join_keys(
143		&self,
144		columns: &Columns,
145		compiled_exprs: &[CompiledExpr],
146	) -> Result<Vec<Option<Hash128>>> {
147		let row_count = columns.row_count();
148		if row_count == 0 {
149			return Ok(Vec::new());
150		}
151
152		let session = EvalSession {
153			params: &EMPTY_PARAMS,
154			symbols: &EMPTY_SYMBOL_TABLE,
155			functions: &self.functions,
156			runtime_context: &self.runtime_context,
157			arena: None,
158			identity: IdentityId::root(),
159			is_aggregate_context: false,
160		};
161		let exec_ctx = session.eval(columns.clone(), row_count);
162
163		// Evaluate all compiled expressions on the entire batch
164		let mut expr_columns = Vec::with_capacity(compiled_exprs.len());
165		for compiled_expr in compiled_exprs.iter() {
166			let col = if let Some(col_name) = compiled_expr.access_column_name() {
167				columns.column(col_name)
168					.cloned()
169					.unwrap_or_else(|| Column::undefined_typed(col_name, Type::Boolean, row_count))
170			} else {
171				compiled_expr.execute(&exec_ctx)?
172			};
173			expr_columns.push(col);
174		}
175
176		// Compute hash for each row
177		let mut hashes = Vec::with_capacity(row_count);
178		for row_idx in 0..row_count {
179			let mut hasher = Vec::with_capacity(256);
180			let mut has_undefined = false;
181
182			for col in &expr_columns {
183				let value = col.data().get_value(row_idx);
184
185				// Check if the value is undefined - undefined values should never match in joins
186				if matches!(value, Value::None { .. }) {
187					has_undefined = true;
188					break;
189				}
190
191				let bytes = to_stdvec(&value)
192					.map_err(|e| Error(internal!("Failed to encode value for hash: {}", e)))?;
193				hasher.extend_from_slice(&bytes);
194			}
195
196			if has_undefined {
197				hashes.push(None);
198			} else {
199				hashes.push(Some(xxh3_128(&hasher)));
200			}
201		}
202
203		Ok(hashes)
204	}
205
206	/// Generate columns for an unmatched left join result.
207	/// Creates combined columns with left values and Undefined values for right columns.
208	pub(crate) fn unmatched_left_columns(
209		&self,
210		txn: &mut FlowTransaction,
211		left: &Columns,
212		left_idx: usize,
213	) -> Result<Columns> {
214		let left_row_number = left.row_numbers[left_idx];
215
216		// Create composite key for this unmatched row
217		let mut serializer = KeySerializer::new();
218		serializer.extend_u8(b'L');
219		serializer.extend_u64(left_row_number.0);
220		let composite_key = EncodedKey::new(serializer.finish());
221
222		// Get or create a unique row number for this unmatched row
223		let (result_row_number, _is_new) =
224			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
225
226		// Get the right side schema
227		let right_schema = self.right_parent.pull(txn, &[])?;
228
229		// Build using JoinedColumnsBuilder
230		let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
231		Ok(builder.unmatched_left(result_row_number, left, left_idx, &right_schema))
232	}
233
234	/// Generate columns for multiple unmatched left join results.
235	pub(crate) fn unmatched_left_columns_batch(
236		&self,
237		txn: &mut FlowTransaction,
238		left: &Columns,
239		left_indices: &[usize],
240	) -> Result<Columns> {
241		if left_indices.is_empty() {
242			return Ok(Columns::empty());
243		}
244
245		// Build composite keys for all unmatched rows
246		let composite_keys: Vec<EncodedKey> = left_indices
247			.iter()
248			.map(|&idx| {
249				let left_row_number = left.row_numbers[idx];
250				let mut serializer = KeySerializer::new();
251				serializer.extend_u8(b'L');
252				serializer.extend_u64(left_row_number.0);
253				EncodedKey::new(serializer.finish())
254			})
255			.collect();
256
257		// Batch get/create row numbers
258		let row_numbers_with_flags =
259			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
260		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
261
262		// Get the right side schema
263		let right_schema = self.right_parent.pull(txn, &[])?;
264
265		// Build using JoinedColumnsBuilder
266		let builder = JoinedColumnsBuilder::new(left, &right_schema, &self.alias);
267		Ok(builder.unmatched_left_batch(&row_numbers, left, left_indices, &right_schema))
268	}
269
270	/// Clean up all join results for a given left row
271	/// This removes both matched and unmatched join results
272	pub(crate) fn cleanup_left_row_joins(&self, txn: &mut FlowTransaction, left_number: u64) -> Result<()> {
273		let mut serializer = KeySerializer::new();
274		serializer.extend_u8(b'L');
275		serializer.extend_u64(left_number);
276		let prefix = serializer.finish();
277
278		// Remove all mappings with this prefix
279		self.row_number_provider.remove_by_prefix(txn, &prefix)
280	}
281
282	/// Join a single left row with a single right row, returning combined Columns.
283	pub(crate) fn join_columns(
284		&self,
285		txn: &mut FlowTransaction,
286		left: &Columns,
287		left_idx: usize,
288		right: &Columns,
289		right_idx: usize,
290	) -> Result<Columns> {
291		let left_row_number = left.row_numbers[left_idx];
292		let right_row_number = right.row_numbers[right_idx];
293
294		let composite_key = Self::make_composite_key(left_row_number, right_row_number);
295		let (result_row_number, _is_new) =
296			self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
297
298		// Join directly at indices without extracting rows
299		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
300		Ok(builder.join_at_indices(result_row_number, left, left_idx, right, right_idx))
301	}
302
303	/// Create a composite key for a join result from left and right row numbers.
304	fn make_composite_key(left_num: RowNumber, right_num: RowNumber) -> EncodedKey {
305		let mut serializer = KeySerializer::new();
306		serializer.extend_u8(b'L');
307		serializer.extend_u64(left_num.0);
308		serializer.extend_u64(right_num.0);
309		EncodedKey::new(serializer.finish())
310	}
311
312	/// Decode a u64 from keycode format (big-endian with bits flipped).
313	/// The keycode format inverts all bits for proper byte-order sorting.
314	fn decode_row_number_from_keycode(bytes: &[u8]) -> u64 {
315		let arr: [u8; 8] =
316			[!bytes[0], !bytes[1], !bytes[2], !bytes[3], !bytes[4], !bytes[5], !bytes[6], !bytes[7]];
317		u64::from_be_bytes(arr)
318	}
319
320	/// Parse a composite key to extract left and optional right row numbers.
321	/// Returns None if the key format is invalid.
322	/// Key format: '!L' (1 byte inverted) + left_row_number (8 bytes) + optional right_row_number (8 bytes)
323	fn parse_composite_key(key_bytes: &[u8]) -> Option<(RowNumber, Option<RowNumber>)> {
324		// Check minimum length and 'L' prefix (inverted in keycode format)
325		if key_bytes.len() < 9 || key_bytes[0] != !b'L' {
326			return None;
327		}
328
329		let left_num = Self::decode_row_number_from_keycode(&key_bytes[1..9]);
330		let right_num = if key_bytes.len() >= 17 {
331			Some(RowNumber(Self::decode_row_number_from_keycode(&key_bytes[9..17])))
332		} else {
333			None
334		};
335
336		Some((RowNumber(left_num), right_num))
337	}
338
339	/// Join one left row with all right rows.
340	/// Returns combined Columns with one row per right row.
341	pub(crate) fn join_columns_one_to_many(
342		&self,
343		txn: &mut FlowTransaction,
344		left: &Columns,
345		left_idx: usize,
346		right: &Columns,
347	) -> Result<Columns> {
348		let right_count = right.row_count();
349		if right_count == 0 {
350			return Ok(Columns::empty());
351		}
352
353		let left_row_number = left.row_numbers[left_idx];
354
355		// Build all composite keys
356		let composite_keys: Vec<EncodedKey> = (0..right_count)
357			.map(|right_idx| {
358				let right_row_number = right.row_numbers[right_idx];
359				Self::make_composite_key(left_row_number, right_row_number)
360			})
361			.collect();
362
363		// Batch get/create row numbers
364		let row_numbers_with_flags =
365			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
366		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
367
368		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
369		Ok(builder.join_one_to_many(&row_numbers, left, left_idx, right))
370	}
371
372	/// Join all left rows with one right row.
373	/// Returns combined Columns with one row per left row.
374	pub(crate) fn join_columns_many_to_one(
375		&self,
376		txn: &mut FlowTransaction,
377		left: &Columns,
378		right: &Columns,
379		right_idx: usize,
380	) -> Result<Columns> {
381		let left_count = left.row_count();
382		if left_count == 0 {
383			return Ok(Columns::empty());
384		}
385
386		let right_row_number = right.row_numbers[right_idx];
387
388		// Build all composite keys
389		let composite_keys: Vec<EncodedKey> = (0..left_count)
390			.map(|left_idx| {
391				let left_row_number = left.row_numbers[left_idx];
392				Self::make_composite_key(left_row_number, right_row_number)
393			})
394			.collect();
395
396		// Batch get/create row numbers
397		let row_numbers_with_flags =
398			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
399		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
400
401		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
402		Ok(builder.join_many_to_one(&row_numbers, left, right, right_idx))
403	}
404
405	/// Join left rows at specified indices with right rows at specified indices (cartesian product).
406	/// Returns combined Columns with left_indices.len() * right_indices.len() rows.
407	pub(crate) fn join_columns_cartesian(
408		&self,
409		txn: &mut FlowTransaction,
410		left: &Columns,
411		left_indices: &[usize],
412		right: &Columns,
413		right_indices: &[usize],
414	) -> Result<Columns> {
415		let left_count = left_indices.len();
416		let right_count = right_indices.len();
417		if left_count == 0 || right_count == 0 {
418			return Ok(Columns::empty());
419		}
420
421		// Build all composite keys for cartesian product
422		let total_results = left_count * right_count;
423		let mut composite_keys = Vec::with_capacity(total_results);
424
425		for &left_idx in left_indices {
426			let left_row_number = left.row_numbers[left_idx];
427			for &right_idx in right_indices {
428				let right_row_number = right.row_numbers[right_idx];
429				composite_keys.push(Self::make_composite_key(left_row_number, right_row_number));
430			}
431		}
432
433		// Batch get/create row numbers
434		let row_numbers_with_flags =
435			self.row_number_provider.get_or_create_row_numbers(txn, composite_keys.iter())?;
436		let row_numbers: Vec<RowNumber> = row_numbers_with_flags.iter().map(|(rn, _)| *rn).collect();
437
438		let builder = JoinedColumnsBuilder::new(left, right, &self.alias);
439		Ok(builder.join_cartesian(&row_numbers, left, left_indices, right, right_indices))
440	}
441
442	fn determine_side(&self, change: &Change) -> Option<JoinSide> {
443		match &change.origin {
444			ChangeOrigin::Flow(from_node) => {
445				if *from_node == self.left_node {
446					Some(JoinSide::Left)
447				} else if *from_node == self.right_node {
448					Some(JoinSide::Right)
449				} else {
450					None
451				}
452			}
453			_ => None,
454		}
455	}
456}
457
458impl RawStatefulOperator for JoinOperator {}
459
460impl SingleStateful for JoinOperator {
461	fn layout(&self) -> RowSchema {
462		self.schema.clone()
463	}
464}
465
466impl Operator for JoinOperator {
467	fn id(&self) -> FlowNodeId {
468		self.node
469	}
470
471	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
472		// Check for self-referential calls (should never happen)
473		if let ChangeOrigin::Flow(from_node) = &change.origin {
474			if *from_node == self.node {
475				return Ok(Change::from_flow(self.node, change.version, Vec::new()));
476			}
477		}
478
479		// Create the state
480		let mut state = JoinState::new(self.node);
481		// Pre-allocate result vector with estimated capacity
482		let estimated_capacity = change.diffs.len() * 2;
483		let mut result = Vec::with_capacity(estimated_capacity);
484
485		// Determine which side this change is from
486		let side = self
487			.determine_side(&change)
488			.ok_or_else(|| Error(internal!("Join operator received change from unknown node")))?;
489
490		let compiled_exprs = match side {
491			JoinSide::Left => &self.compiled_left_exprs,
492			JoinSide::Right => &self.compiled_right_exprs,
493		};
494
495		// Process each diff inline, grouping by key within each diff
496		for diff in change.diffs {
497			match diff {
498				Diff::Insert {
499					post,
500				} => {
501					// Compute keys for all rows in this Columns batch
502					let keys = self.compute_join_keys(&post, compiled_exprs)?;
503					let row_count = post.row_count();
504
505					// Group indices by key hash
506					let mut inserts_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
507					let mut inserts_undefined: Vec<usize> = Vec::new();
508
509					for row_idx in 0..row_count {
510						if let Some(key_hash) = keys[row_idx] {
511							inserts_by_key.entry(key_hash).or_default().push(row_idx);
512						} else {
513							inserts_undefined.push(row_idx);
514						}
515					}
516
517					// Process inserts with defined keys (batched by key)
518					for (key_hash, indices) in inserts_by_key {
519						let diffs = self.strategy.handle_insert(
520							txn, &post, &indices, side, &key_hash, &mut state, self,
521						)?;
522						result.extend(diffs);
523					}
524
525					// Process inserts with undefined keys individually
526					for idx in inserts_undefined {
527						let diffs = self.strategy.handle_insert_undefined(
528							txn, &post, idx, side, &mut state, self,
529						)?;
530						result.extend(diffs);
531					}
532				}
533				Diff::Remove {
534					pre,
535				} => {
536					// Compute keys for all rows
537					let keys = self.compute_join_keys(&pre, compiled_exprs)?;
538					let row_count = pre.row_count();
539
540					// Group indices by key hash
541					let mut removes_by_key: IndexMap<Hash128, Vec<usize>> = IndexMap::new();
542					let mut removes_undefined: Vec<usize> = Vec::new();
543
544					for row_idx in 0..row_count {
545						if let Some(key_hash) = keys[row_idx] {
546							removes_by_key.entry(key_hash).or_default().push(row_idx);
547						} else {
548							removes_undefined.push(row_idx);
549						}
550					}
551
552					// Process removes with defined keys (batched by key)
553					for (key_hash, indices) in removes_by_key {
554						let diffs = self.strategy.handle_remove(
555							txn,
556							&pre,
557							&indices,
558							side,
559							&key_hash,
560							&mut state,
561							self,
562							change.version,
563						)?;
564						result.extend(diffs);
565					}
566
567					// Process removes with undefined keys individually
568					for idx in removes_undefined {
569						let diffs = self.strategy.handle_remove_undefined(
570							txn,
571							&pre,
572							idx,
573							side,
574							&mut state,
575							self,
576							change.version,
577						)?;
578						result.extend(diffs);
579					}
580				}
581				Diff::Update {
582					pre,
583					post,
584				} => {
585					// Compute keys for pre and post
586					let pre_keys = self.compute_join_keys(&pre, compiled_exprs)?;
587					let post_keys = self.compute_join_keys(&post, compiled_exprs)?;
588					let row_count = post.row_count();
589
590					// Group updates by (pre_key, post_key) pair
591					// Only updates with same key pair can be batched
592					let mut updates_by_key: IndexMap<(Hash128, Hash128), Vec<usize>> =
593						IndexMap::new();
594					let mut updates_undefined: Vec<usize> = Vec::new();
595
596					for row_idx in 0..row_count {
597						match (pre_keys[row_idx], post_keys[row_idx]) {
598							(Some(pre_key), Some(post_key)) => {
599								updates_by_key
600									.entry((pre_key, post_key))
601									.or_default()
602									.push(row_idx);
603							}
604							_ => {
605								// Any undefined key (pre or post) is processed
606								// individually
607								updates_undefined.push(row_idx);
608							}
609						}
610					}
611
612					// Process updates with defined keys (batched by key pair)
613					for ((pre_key, post_key), indices) in updates_by_key {
614						let diffs = self.strategy.handle_update(
615							txn,
616							&pre,
617							&post,
618							&indices,
619							side,
620							&pre_key,
621							&post_key,
622							&mut state,
623							self,
624							change.version,
625						)?;
626						result.extend(diffs);
627					}
628
629					// Process updates with undefined keys individually
630					for row_idx in updates_undefined {
631						let diffs = self.strategy.handle_update_undefined(
632							txn,
633							&pre,
634							&post,
635							row_idx,
636							side,
637							&mut state,
638							self,
639							change.version,
640						)?;
641						result.extend(diffs);
642					}
643				}
644			}
645		}
646
647		Ok(Change::from_flow(self.node, change.version, result))
648	}
649
650	// FIXME #244 The issue is that when we need to reconstruct an unmatched left row, we need the right side's
651	// schema to create the combined layout To make that work it requires schema / layout information of the right
652	// side this should unlock the test:
653	// testsuite/flow/tests/scripts/backfill/18_multiple_joins_same_table.skip
654	// testsuite/flow/tests/scripts/backfill/19_complex_multi_table.skip
655	// testsuite/flow/tests/scripts/backfill/21_backfill_with_distinct.skip
656	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
657		let mut found_columns: Vec<Columns> = Vec::new();
658
659		for &row_number in rows {
660			// Get the composite key for this row number (reverse lookup)
661			let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
662				continue;
663			};
664
665			// Parse the composite key to extract left and optional right row numbers
666			let Some((left_row_number, right_row_number)) = Self::parse_composite_key(key.as_ref()) else {
667				continue;
668			};
669
670			// Get left columns from parent (no Row conversion)
671			let left_cols = self.left_parent.pull(txn, &[left_row_number])?;
672			if left_cols.is_empty() {
673				continue;
674			}
675
676			if let Some(right_row_num) = right_row_number {
677				// Matched join - has right row number
678				let right_cols = self.right_parent.pull(txn, &[right_row_num])?;
679				if !right_cols.is_empty() {
680					// Use JoinedColumnsBuilder to create joined columns
681					let builder = JoinedColumnsBuilder::new(&left_cols, &right_cols, &self.alias);
682					let mut joined = builder.join_single(row_number, &left_cols, &right_cols);
683					// Override the row number to match what was requested
684					joined.row_numbers = CowVec::new(vec![row_number]);
685					found_columns.push(joined);
686				}
687			} else {
688				// Unmatched left row - use builder.unmatched_left
689				let right_schema = self.right_parent.pull(txn, &[])?;
690				let builder = JoinedColumnsBuilder::new(&left_cols, &right_schema, &self.alias);
691				let mut unmatched = builder.unmatched_left(row_number, &left_cols, 0, &right_schema);
692				// Override the row number to match what was requested
693				unmatched.row_numbers = CowVec::new(vec![row_number]);
694				found_columns.push(unmatched);
695			}
696		}
697
698		// Combine found rows
699		if found_columns.is_empty() {
700			// Get schema from both parents and combine them
701			let left_schema = self.left_parent.pull(txn, &[])?;
702			let right_schema = self.right_parent.pull(txn, &[])?;
703
704			// Use JoinedColumnsBuilder to get properly aliased names
705			let builder = JoinedColumnsBuilder::new(&left_schema, &right_schema, &self.alias);
706			let right_names = builder.right_column_names();
707
708			// Add left columns as-is
709			let mut all_columns: Vec<Column> = left_schema.columns.into_iter().collect();
710
711			// Add right columns with pre-computed aliased names
712			for (col, aliased_name) in right_schema.columns.into_iter().zip(right_names.iter()) {
713				all_columns.push(Column {
714					name: Fragment::internal(aliased_name),
715					data: col.data,
716				});
717			}
718
719			Ok(Columns {
720				row_numbers: CowVec::new(Vec::new()),
721				columns: CowVec::new(all_columns),
722			})
723		} else if found_columns.len() == 1 {
724			Ok(found_columns.remove(0))
725		} else {
726			let mut result = found_columns.remove(0);
727			for cols in found_columns {
728				result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
729				for (i, col) in cols.columns.into_iter().enumerate() {
730					result.columns.make_mut()[i].extend(col).expect("schema mismatch in join pull");
731				}
732			}
733			Ok(result)
734		}
735	}
736}