Skip to main content

reifydb_core/value/column/data/
scatter.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt::Debug;
5
6use reifydb_type::{
7	storage::{DataBitVec, DataVec},
8	util::bitvec::BitVec,
9	value::{
10		Value,
11		container::{
12			bool::BoolContainer, number::NumberContainer, temporal::TemporalContainer, uuid::UuidContainer,
13		},
14		date::Date,
15		datetime::DateTime,
16		duration::Duration,
17		is::{IsNumber, IsTemporal, IsUuid},
18		time::Time,
19		uuid::{Uuid4, Uuid7},
20	},
21};
22
23use crate::value::column::ColumnData;
24
25impl ColumnData {
26	/// Merge two columns by mask: row `i` gets `self[i]` if `then_mask[i]`,
27	/// `other[i]` if `else_mask[i]`, `None` otherwise. Callers must satisfy
28	/// `self.len() >= total_len` and `other.len() >= total_len`.
29	///
30	/// Fast path: when both operands share a bare variant (Bool / numeric /
31	/// temporal / Uuid), a typed kernel writes directly into a preallocated
32	/// buffer, skipping the `Value` enum round-trip. If any row is unmapped by
33	/// both masks, the kernel returns an `Option`-wrapped result with the
34	/// validity bitmap set accordingly.
35	///
36	/// Fallback: mismatched variants, `Option`-wrapped operands, and
37	/// variable-width variants (`Utf8`, `Blob`, etc.) go through a generic
38	/// row-by-row path.
39	pub fn scatter_merge(
40		&self,
41		other: &ColumnData,
42		then_mask: &BitVec,
43		else_mask: &BitVec,
44		total_len: usize,
45	) -> ColumnData {
46		if let (
47			ColumnData::Option {
48				inner: a_inner,
49				bitvec: a_bv,
50			},
51			ColumnData::Option {
52				inner: b_inner,
53				bitvec: b_bv,
54			},
55		) = (self, other)
56		{
57			let merged_inner = a_inner.scatter_merge(b_inner, then_mask, else_mask, total_len);
58			let merged_bv = merge_validity_bitvecs(a_bv, b_bv, then_mask, else_mask, total_len);
59			return match merged_inner {
60				ColumnData::Option {
61					inner: nested_inner,
62					bitvec: nested_bv,
63				} => ColumnData::Option {
64					inner: nested_inner,
65					bitvec: merged_bv.and(&nested_bv),
66				},
67				inner => ColumnData::Option {
68					inner: Box::new(inner),
69					bitvec: merged_bv,
70				},
71			};
72		}
73
74		if let Some(result) = scatter_merge_typed(self, other, then_mask, else_mask, total_len) {
75			return result;
76		}
77
78		scatter_merge_generic(self, other, then_mask, else_mask, total_len)
79	}
80}
81
82fn merge_validity_bitvecs(
83	then_bv: &BitVec,
84	else_bv: &BitVec,
85	then_mask: &BitVec,
86	else_mask: &BitVec,
87	total_len: usize,
88) -> BitVec {
89	let mut out = BitVec::with_capacity(total_len);
90	for i in 0..total_len {
91		let bit = if DataBitVec::get(then_mask, i) {
92			i < DataBitVec::len(then_bv) && DataBitVec::get(then_bv, i)
93		} else if DataBitVec::get(else_mask, i) {
94			i < DataBitVec::len(else_bv) && DataBitVec::get(else_bv, i)
95		} else {
96			false
97		};
98		DataBitVec::push(&mut out, bit);
99	}
100	out
101}
102
103fn scatter_merge_generic(
104	self_col: &ColumnData,
105	other: &ColumnData,
106	then_mask: &BitVec,
107	else_mask: &BitVec,
108	total_len: usize,
109) -> ColumnData {
110	let result_type = self_col.get_type();
111	let mut data = ColumnData::with_capacity(result_type.clone(), total_len);
112	for i in 0..total_len {
113		if DataBitVec::get(then_mask, i) {
114			data.push_value(self_col.get_value(i));
115		} else if DataBitVec::get(else_mask, i) {
116			data.push_value(other.get_value(i));
117		} else {
118			data.push_value(Value::none_of(result_type.clone()));
119		}
120	}
121	data
122}
123
124/// Fast-path typed scatter merge. Returns `None` if the variant pair isn't
125/// supported by a typed kernel; callers fall back to the generic path.
126fn scatter_merge_typed(
127	self_col: &ColumnData,
128	other: &ColumnData,
129	then_mask: &BitVec,
130	else_mask: &BitVec,
131	total_len: usize,
132) -> Option<ColumnData> {
133	macro_rules! number_kernel {
134		($variant:ident, $t:ty) => {
135			if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
136				let (data, validity) = number_scatter::<$t>(a, b, then_mask, else_mask, total_len);
137				let inner = ColumnData::$variant(NumberContainer::new(data));
138				return Some(finalize(inner, validity));
139			}
140		};
141	}
142	macro_rules! temporal_kernel {
143		($variant:ident, $t:ty) => {
144			if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
145				let (data, validity) = temporal_scatter::<$t>(a, b, then_mask, else_mask, total_len);
146				let inner = ColumnData::$variant(TemporalContainer::new(data));
147				return Some(finalize(inner, validity));
148			}
149		};
150	}
151	macro_rules! uuid_kernel {
152		($variant:ident, $t:ty) => {
153			if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
154				let (data, validity) = uuid_scatter::<$t>(a, b, then_mask, else_mask, total_len);
155				let inner = ColumnData::$variant(UuidContainer::new(data));
156				return Some(finalize(inner, validity));
157			}
158		};
159	}
160
161	if let (ColumnData::Bool(a), ColumnData::Bool(b)) = (self_col, other) {
162		let (data, validity) = bool_scatter(a, b, then_mask, else_mask, total_len);
163		let inner = ColumnData::Bool(BoolContainer::from_parts(data));
164		return Some(finalize(inner, validity));
165	}
166
167	number_kernel!(Float4, f32);
168	number_kernel!(Float8, f64);
169	number_kernel!(Int1, i8);
170	number_kernel!(Int2, i16);
171	number_kernel!(Int4, i32);
172	number_kernel!(Int8, i64);
173	number_kernel!(Int16, i128);
174	number_kernel!(Uint1, u8);
175	number_kernel!(Uint2, u16);
176	number_kernel!(Uint4, u32);
177	number_kernel!(Uint8, u64);
178	number_kernel!(Uint16, u128);
179
180	temporal_kernel!(Date, Date);
181	temporal_kernel!(DateTime, DateTime);
182	temporal_kernel!(Time, Time);
183	temporal_kernel!(Duration, Duration);
184
185	uuid_kernel!(Uuid4, Uuid4);
186	uuid_kernel!(Uuid7, Uuid7);
187
188	None
189}
190
191fn finalize(inner: ColumnData, validity: Option<BitVec>) -> ColumnData {
192	match validity {
193		Some(bv) => ColumnData::Option {
194			inner: Box::new(inner),
195			bitvec: bv,
196		},
197		None => inner,
198	}
199}
200
201fn bool_scatter(
202	a: &BoolContainer,
203	b: &BoolContainer,
204	then_mask: &BitVec,
205	else_mask: &BitVec,
206	total_len: usize,
207) -> (BitVec, Option<BitVec>) {
208	let a_data = a.data();
209	let b_data = b.data();
210	let mut out = BitVec::with_capacity(total_len);
211	let mut validity: Option<BitVec> = None;
212	for i in 0..total_len {
213		let in_then = DataBitVec::get(then_mask, i);
214		let in_else = !in_then && DataBitVec::get(else_mask, i);
215		let bit = if in_then && i < DataBitVec::len(a_data) {
216			DataBitVec::get(a_data, i)
217		} else if in_else && i < DataBitVec::len(b_data) {
218			DataBitVec::get(b_data, i)
219		} else {
220			false
221		};
222		DataBitVec::push(&mut out, bit);
223		if !in_then && !in_else {
224			let v = validity.get_or_insert_with(|| {
225				let mut bv = BitVec::with_capacity(total_len);
226				for _ in 0..i {
227					DataBitVec::push(&mut bv, true);
228				}
229				bv
230			});
231			DataBitVec::push(v, false);
232		} else if let Some(v) = validity.as_mut() {
233			DataBitVec::push(v, true);
234		}
235	}
236	(out, validity)
237}
238
239fn number_scatter<T>(
240	a: &NumberContainer<T>,
241	b: &NumberContainer<T>,
242	then_mask: &BitVec,
243	else_mask: &BitVec,
244	total_len: usize,
245) -> (Vec<T>, Option<BitVec>)
246where
247	T: IsNumber + Clone + Default + Debug,
248{
249	let a_data = a.data();
250	let b_data = b.data();
251	let mut out: Vec<T> = Vec::with_capacity(total_len);
252	let mut validity: Option<BitVec> = None;
253	for i in 0..total_len {
254		let in_then = DataBitVec::get(then_mask, i);
255		let in_else = !in_then && DataBitVec::get(else_mask, i);
256		let value = if in_then {
257			DataVec::get(a_data, i).cloned().unwrap_or_default()
258		} else if in_else {
259			DataVec::get(b_data, i).cloned().unwrap_or_default()
260		} else {
261			T::default()
262		};
263		out.push(value);
264		if !in_then && !in_else {
265			let v = validity.get_or_insert_with(|| {
266				let mut bv = BitVec::with_capacity(total_len);
267				for _ in 0..i {
268					DataBitVec::push(&mut bv, true);
269				}
270				bv
271			});
272			DataBitVec::push(v, false);
273		} else if let Some(v) = validity.as_mut() {
274			DataBitVec::push(v, true);
275		}
276	}
277	(out, validity)
278}
279
280fn temporal_scatter<T>(
281	a: &TemporalContainer<T>,
282	b: &TemporalContainer<T>,
283	then_mask: &BitVec,
284	else_mask: &BitVec,
285	total_len: usize,
286) -> (Vec<T>, Option<BitVec>)
287where
288	T: IsTemporal + Clone + Default + Debug,
289{
290	let a_data = a.data();
291	let b_data = b.data();
292	let mut out: Vec<T> = Vec::with_capacity(total_len);
293	let mut validity: Option<BitVec> = None;
294	for i in 0..total_len {
295		let in_then = DataBitVec::get(then_mask, i);
296		let in_else = !in_then && DataBitVec::get(else_mask, i);
297		let value = if in_then {
298			DataVec::get(a_data, i).cloned().unwrap_or_default()
299		} else if in_else {
300			DataVec::get(b_data, i).cloned().unwrap_or_default()
301		} else {
302			T::default()
303		};
304		out.push(value);
305		if !in_then && !in_else {
306			let v = validity.get_or_insert_with(|| {
307				let mut bv = BitVec::with_capacity(total_len);
308				for _ in 0..i {
309					DataBitVec::push(&mut bv, true);
310				}
311				bv
312			});
313			DataBitVec::push(v, false);
314		} else if let Some(v) = validity.as_mut() {
315			DataBitVec::push(v, true);
316		}
317	}
318	(out, validity)
319}
320
321fn uuid_scatter<T>(
322	a: &UuidContainer<T>,
323	b: &UuidContainer<T>,
324	then_mask: &BitVec,
325	else_mask: &BitVec,
326	total_len: usize,
327) -> (Vec<T>, Option<BitVec>)
328where
329	T: IsUuid + Clone + Default + Debug,
330{
331	let a_data = a.data();
332	let b_data = b.data();
333	let mut out: Vec<T> = Vec::with_capacity(total_len);
334	let mut validity: Option<BitVec> = None;
335	for i in 0..total_len {
336		let in_then = DataBitVec::get(then_mask, i);
337		let in_else = !in_then && DataBitVec::get(else_mask, i);
338		let value = if in_then {
339			DataVec::get(a_data, i).cloned().unwrap_or_default()
340		} else if in_else {
341			DataVec::get(b_data, i).cloned().unwrap_or_default()
342		} else {
343			T::default()
344		};
345		out.push(value);
346		if !in_then && !in_else {
347			let v = validity.get_or_insert_with(|| {
348				let mut bv = BitVec::with_capacity(total_len);
349				for _ in 0..i {
350					DataBitVec::push(&mut bv, true);
351				}
352				bv
353			});
354			DataBitVec::push(v, false);
355		} else if let Some(v) = validity.as_mut() {
356			DataBitVec::push(v, true);
357		}
358	}
359	(out, validity)
360}
361
#[cfg(test)]
mod tests {
	use reifydb_type::{util::bitvec::BitVec, value::Value};

	use crate::value::column::ColumnData;

	/// Fully mapped Int4 inputs stay a bare `Int4` column and interleave
	/// rows from both operands.
	#[test]
	fn scatter_merge_all_mapped_int4() {
		let lhs = ColumnData::int4([10, 20, 30, 40]);
		let rhs = ColumnData::int4([90, 80, 70, 60]);
		let take_lhs = BitVec::from_slice(&[true, false, true, false]);
		let take_rhs = BitVec::from_slice(&[false, true, false, true]);

		let merged = lhs.scatter_merge(&rhs, &take_lhs, &take_rhs, 4);

		assert!(matches!(merged, ColumnData::Int4(_)));
		let expected = [10, 80, 30, 60];
		for (row, want) in expected.iter().copied().enumerate() {
			assert_eq!(merged.get_value(row), Value::Int4(want));
		}
	}

	/// A row selected by neither mask turns the result into an `Option`
	/// column with a none value at that row.
	#[test]
	fn scatter_merge_unmapped_promotes_to_option() {
		let lhs = ColumnData::int4([10, 20, 30]);
		let rhs = ColumnData::int4([90, 80, 70]);
		// Row 1 is selected by neither mask, so it must come back as none.
		let take_lhs = BitVec::from_slice(&[true, false, true]);
		let take_rhs = BitVec::from_slice(&[false, false, false]);

		let merged = lhs.scatter_merge(&rhs, &take_lhs, &take_rhs, 3);

		assert!(matches!(merged, ColumnData::Option { .. }));
		let expected = [Value::Int4(10), Value::none(), Value::Int4(30)];
		for (row, want) in expected.into_iter().enumerate() {
			assert_eq!(merged.get_value(row), want);
		}
	}

	/// Fully mapped Bool inputs stay a bare `Bool` column.
	#[test]
	fn scatter_merge_bool_all_mapped() {
		let lhs = ColumnData::bool([true, true, false, false]);
		let rhs = ColumnData::bool([false, false, true, true]);
		let take_lhs = BitVec::from_slice(&[true, false, true, false]);
		let take_rhs = BitVec::from_slice(&[false, true, false, true]);

		let merged = lhs.scatter_merge(&rhs, &take_lhs, &take_rhs, 4);

		assert!(matches!(merged, ColumnData::Bool(_)));
		let expected = [true, false, false, true];
		for (row, want) in expected.iter().copied().enumerate() {
			assert_eq!(merged.get_value(row), Value::Boolean(want));
		}
	}

	/// Utf8 has no typed kernel, so this exercises the generic path.
	#[test]
	fn scatter_merge_utf8_uses_generic_fallback() {
		let lhs = ColumnData::utf8(["a", "b", "c"]);
		let rhs = ColumnData::utf8(["x", "y", "z"]);
		let take_lhs = BitVec::from_slice(&[true, false, true]);
		let take_rhs = BitVec::from_slice(&[false, true, false]);

		let merged = lhs.scatter_merge(&rhs, &take_lhs, &take_rhs, 3);

		let expected = ["a", "y", "c"];
		for (row, want) in expected.iter().copied().enumerate() {
			assert_eq!(merged.get_value(row), Value::Utf8(want.to_string()));
		}
	}
}