Skip to main content

reifydb_core/value/column/buffer/
scatter.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt::Debug;
5
6use reifydb_type::{
7	storage::{DataBitVec, DataVec},
8	util::bitvec::BitVec,
9	value::{
10		Value,
11		container::{
12			bool::BoolContainer, number::NumberContainer, temporal::TemporalContainer, uuid::UuidContainer,
13		},
14		date::Date,
15		datetime::DateTime,
16		duration::Duration,
17		is::{IsNumber, IsTemporal, IsUuid},
18		time::Time,
19		uuid::{Uuid4, Uuid7},
20	},
21};
22
23use crate::value::column::ColumnBuffer;
24
25impl ColumnBuffer {
26	pub fn scatter_merge(
27		&self,
28		other: &ColumnBuffer,
29		then_mask: &BitVec,
30		else_mask: &BitVec,
31		total_len: usize,
32	) -> ColumnBuffer {
33		if let (
34			ColumnBuffer::Option {
35				inner: a_inner,
36				bitvec: a_bv,
37			},
38			ColumnBuffer::Option {
39				inner: b_inner,
40				bitvec: b_bv,
41			},
42		) = (self, other)
43		{
44			let merged_inner = a_inner.scatter_merge(b_inner, then_mask, else_mask, total_len);
45			let merged_bv = merge_validity_bitvecs(a_bv, b_bv, then_mask, else_mask, total_len);
46			return match merged_inner {
47				ColumnBuffer::Option {
48					inner: nested_inner,
49					bitvec: nested_bv,
50				} => ColumnBuffer::Option {
51					inner: nested_inner,
52					bitvec: merged_bv.and(&nested_bv),
53				},
54				inner => ColumnBuffer::Option {
55					inner: Box::new(inner),
56					bitvec: merged_bv,
57				},
58			};
59		}
60
61		if let Some(result) = scatter_merge_typed(self, other, then_mask, else_mask, total_len) {
62			return result;
63		}
64
65		scatter_merge_generic(self, other, then_mask, else_mask, total_len)
66	}
67}
68
69fn merge_validity_bitvecs(
70	then_bv: &BitVec,
71	else_bv: &BitVec,
72	then_mask: &BitVec,
73	else_mask: &BitVec,
74	total_len: usize,
75) -> BitVec {
76	let mut out = BitVec::with_capacity(total_len);
77	for i in 0..total_len {
78		let bit = if DataBitVec::get(then_mask, i) {
79			i < DataBitVec::len(then_bv) && DataBitVec::get(then_bv, i)
80		} else if DataBitVec::get(else_mask, i) {
81			i < DataBitVec::len(else_bv) && DataBitVec::get(else_bv, i)
82		} else {
83			false
84		};
85		DataBitVec::push(&mut out, bit);
86	}
87	out
88}
89
90fn scatter_merge_generic(
91	self_col: &ColumnBuffer,
92	other: &ColumnBuffer,
93	then_mask: &BitVec,
94	else_mask: &BitVec,
95	total_len: usize,
96) -> ColumnBuffer {
97	let result_type = self_col.get_type();
98	let mut data = ColumnBuffer::with_capacity(result_type.clone(), total_len);
99	for i in 0..total_len {
100		if DataBitVec::get(then_mask, i) {
101			data.push_value(self_col.get_value(i));
102		} else if DataBitVec::get(else_mask, i) {
103			data.push_value(other.get_value(i));
104		} else {
105			data.push_value(Value::none_of(result_type.clone()));
106		}
107	}
108	data
109}
110
111fn scatter_merge_typed(
112	self_col: &ColumnBuffer,
113	other: &ColumnBuffer,
114	then_mask: &BitVec,
115	else_mask: &BitVec,
116	total_len: usize,
117) -> Option<ColumnBuffer> {
118	macro_rules! number_kernel {
119		($variant:ident, $t:ty) => {
120			if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
121				let (data, validity) = number_scatter::<$t>(a, b, then_mask, else_mask, total_len);
122				let inner = ColumnBuffer::$variant(NumberContainer::new(data));
123				return Some(finalize(inner, validity));
124			}
125		};
126	}
127	macro_rules! temporal_kernel {
128		($variant:ident, $t:ty) => {
129			if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
130				let (data, validity) = temporal_scatter::<$t>(a, b, then_mask, else_mask, total_len);
131				let inner = ColumnBuffer::$variant(TemporalContainer::new(data));
132				return Some(finalize(inner, validity));
133			}
134		};
135	}
136	macro_rules! uuid_kernel {
137		($variant:ident, $t:ty) => {
138			if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
139				let (data, validity) = uuid_scatter::<$t>(a, b, then_mask, else_mask, total_len);
140				let inner = ColumnBuffer::$variant(UuidContainer::new(data));
141				return Some(finalize(inner, validity));
142			}
143		};
144	}
145
146	if let (ColumnBuffer::Bool(a), ColumnBuffer::Bool(b)) = (self_col, other) {
147		let (data, validity) = bool_scatter(a, b, then_mask, else_mask, total_len);
148		let inner = ColumnBuffer::Bool(BoolContainer::from_parts(data));
149		return Some(finalize(inner, validity));
150	}
151
152	number_kernel!(Float4, f32);
153	number_kernel!(Float8, f64);
154	number_kernel!(Int1, i8);
155	number_kernel!(Int2, i16);
156	number_kernel!(Int4, i32);
157	number_kernel!(Int8, i64);
158	number_kernel!(Int16, i128);
159	number_kernel!(Uint1, u8);
160	number_kernel!(Uint2, u16);
161	number_kernel!(Uint4, u32);
162	number_kernel!(Uint8, u64);
163	number_kernel!(Uint16, u128);
164
165	temporal_kernel!(Date, Date);
166	temporal_kernel!(DateTime, DateTime);
167	temporal_kernel!(Time, Time);
168	temporal_kernel!(Duration, Duration);
169
170	uuid_kernel!(Uuid4, Uuid4);
171	uuid_kernel!(Uuid7, Uuid7);
172
173	None
174}
175
176fn finalize(inner: ColumnBuffer, validity: Option<BitVec>) -> ColumnBuffer {
177	match validity {
178		Some(bv) => ColumnBuffer::Option {
179			inner: Box::new(inner),
180			bitvec: bv,
181		},
182		None => inner,
183	}
184}
185
186fn bool_scatter(
187	a: &BoolContainer,
188	b: &BoolContainer,
189	then_mask: &BitVec,
190	else_mask: &BitVec,
191	total_len: usize,
192) -> (BitVec, Option<BitVec>) {
193	let a_data = a.data();
194	let b_data = b.data();
195	let mut out = BitVec::with_capacity(total_len);
196	let mut validity: Option<BitVec> = None;
197	for i in 0..total_len {
198		let in_then = DataBitVec::get(then_mask, i);
199		let in_else = !in_then && DataBitVec::get(else_mask, i);
200		let bit = if in_then && i < DataBitVec::len(a_data) {
201			DataBitVec::get(a_data, i)
202		} else if in_else && i < DataBitVec::len(b_data) {
203			DataBitVec::get(b_data, i)
204		} else {
205			false
206		};
207		DataBitVec::push(&mut out, bit);
208		if !in_then && !in_else {
209			let v = validity.get_or_insert_with(|| {
210				let mut bv = BitVec::with_capacity(total_len);
211				for _ in 0..i {
212					DataBitVec::push(&mut bv, true);
213				}
214				bv
215			});
216			DataBitVec::push(v, false);
217		} else if let Some(v) = validity.as_mut() {
218			DataBitVec::push(v, true);
219		}
220	}
221	(out, validity)
222}
223
224fn number_scatter<T>(
225	a: &NumberContainer<T>,
226	b: &NumberContainer<T>,
227	then_mask: &BitVec,
228	else_mask: &BitVec,
229	total_len: usize,
230) -> (Vec<T>, Option<BitVec>)
231where
232	T: IsNumber + Clone + Default + Debug,
233{
234	let a_data = a.data();
235	let b_data = b.data();
236	let mut out: Vec<T> = Vec::with_capacity(total_len);
237	let mut validity: Option<BitVec> = None;
238	for i in 0..total_len {
239		let in_then = DataBitVec::get(then_mask, i);
240		let in_else = !in_then && DataBitVec::get(else_mask, i);
241		let value = if in_then {
242			DataVec::get(a_data, i).cloned().unwrap_or_default()
243		} else if in_else {
244			DataVec::get(b_data, i).cloned().unwrap_or_default()
245		} else {
246			T::default()
247		};
248		out.push(value);
249		if !in_then && !in_else {
250			let v = validity.get_or_insert_with(|| {
251				let mut bv = BitVec::with_capacity(total_len);
252				for _ in 0..i {
253					DataBitVec::push(&mut bv, true);
254				}
255				bv
256			});
257			DataBitVec::push(v, false);
258		} else if let Some(v) = validity.as_mut() {
259			DataBitVec::push(v, true);
260		}
261	}
262	(out, validity)
263}
264
265fn temporal_scatter<T>(
266	a: &TemporalContainer<T>,
267	b: &TemporalContainer<T>,
268	then_mask: &BitVec,
269	else_mask: &BitVec,
270	total_len: usize,
271) -> (Vec<T>, Option<BitVec>)
272where
273	T: IsTemporal + Clone + Default + Debug,
274{
275	let a_data = a.data();
276	let b_data = b.data();
277	let mut out: Vec<T> = Vec::with_capacity(total_len);
278	let mut validity: Option<BitVec> = None;
279	for i in 0..total_len {
280		let in_then = DataBitVec::get(then_mask, i);
281		let in_else = !in_then && DataBitVec::get(else_mask, i);
282		let value = if in_then {
283			DataVec::get(a_data, i).cloned().unwrap_or_default()
284		} else if in_else {
285			DataVec::get(b_data, i).cloned().unwrap_or_default()
286		} else {
287			T::default()
288		};
289		out.push(value);
290		if !in_then && !in_else {
291			let v = validity.get_or_insert_with(|| {
292				let mut bv = BitVec::with_capacity(total_len);
293				for _ in 0..i {
294					DataBitVec::push(&mut bv, true);
295				}
296				bv
297			});
298			DataBitVec::push(v, false);
299		} else if let Some(v) = validity.as_mut() {
300			DataBitVec::push(v, true);
301		}
302	}
303	(out, validity)
304}
305
306fn uuid_scatter<T>(
307	a: &UuidContainer<T>,
308	b: &UuidContainer<T>,
309	then_mask: &BitVec,
310	else_mask: &BitVec,
311	total_len: usize,
312) -> (Vec<T>, Option<BitVec>)
313where
314	T: IsUuid + Clone + Default + Debug,
315{
316	let a_data = a.data();
317	let b_data = b.data();
318	let mut out: Vec<T> = Vec::with_capacity(total_len);
319	let mut validity: Option<BitVec> = None;
320	for i in 0..total_len {
321		let in_then = DataBitVec::get(then_mask, i);
322		let in_else = !in_then && DataBitVec::get(else_mask, i);
323		let value = if in_then {
324			DataVec::get(a_data, i).cloned().unwrap_or_default()
325		} else if in_else {
326			DataVec::get(b_data, i).cloned().unwrap_or_default()
327		} else {
328			T::default()
329		};
330		out.push(value);
331		if !in_then && !in_else {
332			let v = validity.get_or_insert_with(|| {
333				let mut bv = BitVec::with_capacity(total_len);
334				for _ in 0..i {
335					DataBitVec::push(&mut bv, true);
336				}
337				bv
338			});
339			DataBitVec::push(v, false);
340		} else if let Some(v) = validity.as_mut() {
341			DataBitVec::push(v, true);
342		}
343	}
344	(out, validity)
345}
346
#[cfg(test)]
mod tests {
	use reifydb_type::{util::bitvec::BitVec, value::Value};

	use crate::value::column::ColumnBuffer;

	/// Asserts that the leading rows of `column` equal `expected`, in order.
	fn assert_rows(column: &ColumnBuffer, expected: &[Value]) {
		for (row, want) in expected.iter().enumerate() {
			assert_eq!(column.get_value(row), *want);
		}
	}

	#[test]
	fn scatter_merge_all_mapped_int4() {
		let then_col = ColumnBuffer::int4([10, 20, 30, 40]);
		let else_col = ColumnBuffer::int4([90, 80, 70, 60]);
		let then_mask = BitVec::from_slice(&[true, false, true, false]);
		let else_mask = BitVec::from_slice(&[false, true, false, true]);

		let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

		// Every row is covered by a mask, so the column stays a plain Int4.
		assert!(matches!(merged, ColumnBuffer::Int4(_)));
		assert_rows(&merged, &[Value::Int4(10), Value::Int4(80), Value::Int4(30), Value::Int4(60)]);
	}

	#[test]
	fn scatter_merge_unmapped_promotes_to_option() {
		let then_col = ColumnBuffer::int4([10, 20, 30]);
		let else_col = ColumnBuffer::int4([90, 80, 70]);
		// Row 1 is covered by neither mask and must come out as none.
		let then_mask = BitVec::from_slice(&[true, false, true]);
		let else_mask = BitVec::from_slice(&[false, false, false]);

		let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

		assert!(matches!(merged, ColumnBuffer::Option { .. }));
		assert_rows(&merged, &[Value::Int4(10), Value::none(), Value::Int4(30)]);
	}

	#[test]
	fn scatter_merge_bool_all_mapped() {
		let then_col = ColumnBuffer::bool([true, true, false, false]);
		let else_col = ColumnBuffer::bool([false, false, true, true]);
		let then_mask = BitVec::from_slice(&[true, false, true, false]);
		let else_mask = BitVec::from_slice(&[false, true, false, true]);

		let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

		assert!(matches!(merged, ColumnBuffer::Bool(_)));
		assert_rows(
			&merged,
			&[Value::Boolean(true), Value::Boolean(false), Value::Boolean(false), Value::Boolean(true)],
		);
	}

	#[test]
	fn scatter_merge_utf8_uses_generic_fallback() {
		let then_col = ColumnBuffer::utf8(["a", "b", "c"]);
		let else_col = ColumnBuffer::utf8(["x", "y", "z"]);
		let then_mask = BitVec::from_slice(&[true, false, true]);
		let else_mask = BitVec::from_slice(&[false, true, false]);

		let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

		assert_rows(
			&merged,
			&[Value::Utf8("a".to_string()), Value::Utf8("y".to_string()), Value::Utf8("c".to_string())],
		);
	}
}