1use std::fmt::Debug;
5
6use reifydb_type::{
7 storage::{DataBitVec, DataVec},
8 util::bitvec::BitVec,
9 value::{
10 Value,
11 container::{
12 bool::BoolContainer, number::NumberContainer, temporal::TemporalContainer, uuid::UuidContainer,
13 },
14 date::Date,
15 datetime::DateTime,
16 duration::Duration,
17 is::{IsNumber, IsTemporal, IsUuid},
18 time::Time,
19 uuid::{Uuid4, Uuid7},
20 },
21};
22
23use crate::value::column::ColumnBuffer;
24
25impl ColumnBuffer {
26 pub fn scatter_merge(
27 &self,
28 other: &ColumnBuffer,
29 then_mask: &BitVec,
30 else_mask: &BitVec,
31 total_len: usize,
32 ) -> ColumnBuffer {
33 if let (
34 ColumnBuffer::Option {
35 inner: a_inner,
36 bitvec: a_bv,
37 },
38 ColumnBuffer::Option {
39 inner: b_inner,
40 bitvec: b_bv,
41 },
42 ) = (self, other)
43 {
44 let merged_inner = a_inner.scatter_merge(b_inner, then_mask, else_mask, total_len);
45 let merged_bv = merge_validity_bitvecs(a_bv, b_bv, then_mask, else_mask, total_len);
46 return match merged_inner {
47 ColumnBuffer::Option {
48 inner: nested_inner,
49 bitvec: nested_bv,
50 } => ColumnBuffer::Option {
51 inner: nested_inner,
52 bitvec: merged_bv.and(&nested_bv),
53 },
54 inner => ColumnBuffer::Option {
55 inner: Box::new(inner),
56 bitvec: merged_bv,
57 },
58 };
59 }
60
61 if let Some(result) = scatter_merge_typed(self, other, then_mask, else_mask, total_len) {
62 return result;
63 }
64
65 scatter_merge_generic(self, other, then_mask, else_mask, total_len)
66 }
67}
68
69fn merge_validity_bitvecs(
70 then_bv: &BitVec,
71 else_bv: &BitVec,
72 then_mask: &BitVec,
73 else_mask: &BitVec,
74 total_len: usize,
75) -> BitVec {
76 let mut out = BitVec::with_capacity(total_len);
77 for i in 0..total_len {
78 let bit = if DataBitVec::get(then_mask, i) {
79 i < DataBitVec::len(then_bv) && DataBitVec::get(then_bv, i)
80 } else if DataBitVec::get(else_mask, i) {
81 i < DataBitVec::len(else_bv) && DataBitVec::get(else_bv, i)
82 } else {
83 false
84 };
85 DataBitVec::push(&mut out, bit);
86 }
87 out
88}
89
90fn scatter_merge_generic(
91 self_col: &ColumnBuffer,
92 other: &ColumnBuffer,
93 then_mask: &BitVec,
94 else_mask: &BitVec,
95 total_len: usize,
96) -> ColumnBuffer {
97 let result_type = self_col.get_type();
98 let mut data = ColumnBuffer::with_capacity(result_type.clone(), total_len);
99 for i in 0..total_len {
100 if DataBitVec::get(then_mask, i) {
101 data.push_value(self_col.get_value(i));
102 } else if DataBitVec::get(else_mask, i) {
103 data.push_value(other.get_value(i));
104 } else {
105 data.push_value(Value::none_of(result_type.clone()));
106 }
107 }
108 data
109}
110
111fn scatter_merge_typed(
112 self_col: &ColumnBuffer,
113 other: &ColumnBuffer,
114 then_mask: &BitVec,
115 else_mask: &BitVec,
116 total_len: usize,
117) -> Option<ColumnBuffer> {
118 macro_rules! number_kernel {
119 ($variant:ident, $t:ty) => {
120 if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
121 let (data, validity) = number_scatter::<$t>(a, b, then_mask, else_mask, total_len);
122 let inner = ColumnBuffer::$variant(NumberContainer::new(data));
123 return Some(finalize(inner, validity));
124 }
125 };
126 }
127 macro_rules! temporal_kernel {
128 ($variant:ident, $t:ty) => {
129 if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
130 let (data, validity) = temporal_scatter::<$t>(a, b, then_mask, else_mask, total_len);
131 let inner = ColumnBuffer::$variant(TemporalContainer::new(data));
132 return Some(finalize(inner, validity));
133 }
134 };
135 }
136 macro_rules! uuid_kernel {
137 ($variant:ident, $t:ty) => {
138 if let (ColumnBuffer::$variant(a), ColumnBuffer::$variant(b)) = (self_col, other) {
139 let (data, validity) = uuid_scatter::<$t>(a, b, then_mask, else_mask, total_len);
140 let inner = ColumnBuffer::$variant(UuidContainer::new(data));
141 return Some(finalize(inner, validity));
142 }
143 };
144 }
145
146 if let (ColumnBuffer::Bool(a), ColumnBuffer::Bool(b)) = (self_col, other) {
147 let (data, validity) = bool_scatter(a, b, then_mask, else_mask, total_len);
148 let inner = ColumnBuffer::Bool(BoolContainer::from_parts(data));
149 return Some(finalize(inner, validity));
150 }
151
152 number_kernel!(Float4, f32);
153 number_kernel!(Float8, f64);
154 number_kernel!(Int1, i8);
155 number_kernel!(Int2, i16);
156 number_kernel!(Int4, i32);
157 number_kernel!(Int8, i64);
158 number_kernel!(Int16, i128);
159 number_kernel!(Uint1, u8);
160 number_kernel!(Uint2, u16);
161 number_kernel!(Uint4, u32);
162 number_kernel!(Uint8, u64);
163 number_kernel!(Uint16, u128);
164
165 temporal_kernel!(Date, Date);
166 temporal_kernel!(DateTime, DateTime);
167 temporal_kernel!(Time, Time);
168 temporal_kernel!(Duration, Duration);
169
170 uuid_kernel!(Uuid4, Uuid4);
171 uuid_kernel!(Uuid7, Uuid7);
172
173 None
174}
175
176fn finalize(inner: ColumnBuffer, validity: Option<BitVec>) -> ColumnBuffer {
177 match validity {
178 Some(bv) => ColumnBuffer::Option {
179 inner: Box::new(inner),
180 bitvec: bv,
181 },
182 None => inner,
183 }
184}
185
186fn bool_scatter(
187 a: &BoolContainer,
188 b: &BoolContainer,
189 then_mask: &BitVec,
190 else_mask: &BitVec,
191 total_len: usize,
192) -> (BitVec, Option<BitVec>) {
193 let a_data = a.data();
194 let b_data = b.data();
195 let mut out = BitVec::with_capacity(total_len);
196 let mut validity: Option<BitVec> = None;
197 for i in 0..total_len {
198 let in_then = DataBitVec::get(then_mask, i);
199 let in_else = !in_then && DataBitVec::get(else_mask, i);
200 let bit = if in_then && i < DataBitVec::len(a_data) {
201 DataBitVec::get(a_data, i)
202 } else if in_else && i < DataBitVec::len(b_data) {
203 DataBitVec::get(b_data, i)
204 } else {
205 false
206 };
207 DataBitVec::push(&mut out, bit);
208 if !in_then && !in_else {
209 let v = validity.get_or_insert_with(|| {
210 let mut bv = BitVec::with_capacity(total_len);
211 for _ in 0..i {
212 DataBitVec::push(&mut bv, true);
213 }
214 bv
215 });
216 DataBitVec::push(v, false);
217 } else if let Some(v) = validity.as_mut() {
218 DataBitVec::push(v, true);
219 }
220 }
221 (out, validity)
222}
223
224fn number_scatter<T>(
225 a: &NumberContainer<T>,
226 b: &NumberContainer<T>,
227 then_mask: &BitVec,
228 else_mask: &BitVec,
229 total_len: usize,
230) -> (Vec<T>, Option<BitVec>)
231where
232 T: IsNumber + Clone + Default + Debug,
233{
234 let a_data = a.data();
235 let b_data = b.data();
236 let mut out: Vec<T> = Vec::with_capacity(total_len);
237 let mut validity: Option<BitVec> = None;
238 for i in 0..total_len {
239 let in_then = DataBitVec::get(then_mask, i);
240 let in_else = !in_then && DataBitVec::get(else_mask, i);
241 let value = if in_then {
242 DataVec::get(a_data, i).cloned().unwrap_or_default()
243 } else if in_else {
244 DataVec::get(b_data, i).cloned().unwrap_or_default()
245 } else {
246 T::default()
247 };
248 out.push(value);
249 if !in_then && !in_else {
250 let v = validity.get_or_insert_with(|| {
251 let mut bv = BitVec::with_capacity(total_len);
252 for _ in 0..i {
253 DataBitVec::push(&mut bv, true);
254 }
255 bv
256 });
257 DataBitVec::push(v, false);
258 } else if let Some(v) = validity.as_mut() {
259 DataBitVec::push(v, true);
260 }
261 }
262 (out, validity)
263}
264
265fn temporal_scatter<T>(
266 a: &TemporalContainer<T>,
267 b: &TemporalContainer<T>,
268 then_mask: &BitVec,
269 else_mask: &BitVec,
270 total_len: usize,
271) -> (Vec<T>, Option<BitVec>)
272where
273 T: IsTemporal + Clone + Default + Debug,
274{
275 let a_data = a.data();
276 let b_data = b.data();
277 let mut out: Vec<T> = Vec::with_capacity(total_len);
278 let mut validity: Option<BitVec> = None;
279 for i in 0..total_len {
280 let in_then = DataBitVec::get(then_mask, i);
281 let in_else = !in_then && DataBitVec::get(else_mask, i);
282 let value = if in_then {
283 DataVec::get(a_data, i).cloned().unwrap_or_default()
284 } else if in_else {
285 DataVec::get(b_data, i).cloned().unwrap_or_default()
286 } else {
287 T::default()
288 };
289 out.push(value);
290 if !in_then && !in_else {
291 let v = validity.get_or_insert_with(|| {
292 let mut bv = BitVec::with_capacity(total_len);
293 for _ in 0..i {
294 DataBitVec::push(&mut bv, true);
295 }
296 bv
297 });
298 DataBitVec::push(v, false);
299 } else if let Some(v) = validity.as_mut() {
300 DataBitVec::push(v, true);
301 }
302 }
303 (out, validity)
304}
305
306fn uuid_scatter<T>(
307 a: &UuidContainer<T>,
308 b: &UuidContainer<T>,
309 then_mask: &BitVec,
310 else_mask: &BitVec,
311 total_len: usize,
312) -> (Vec<T>, Option<BitVec>)
313where
314 T: IsUuid + Clone + Default + Debug,
315{
316 let a_data = a.data();
317 let b_data = b.data();
318 let mut out: Vec<T> = Vec::with_capacity(total_len);
319 let mut validity: Option<BitVec> = None;
320 for i in 0..total_len {
321 let in_then = DataBitVec::get(then_mask, i);
322 let in_else = !in_then && DataBitVec::get(else_mask, i);
323 let value = if in_then {
324 DataVec::get(a_data, i).cloned().unwrap_or_default()
325 } else if in_else {
326 DataVec::get(b_data, i).cloned().unwrap_or_default()
327 } else {
328 T::default()
329 };
330 out.push(value);
331 if !in_then && !in_else {
332 let v = validity.get_or_insert_with(|| {
333 let mut bv = BitVec::with_capacity(total_len);
334 for _ in 0..i {
335 DataBitVec::push(&mut bv, true);
336 }
337 bv
338 });
339 DataBitVec::push(v, false);
340 } else if let Some(v) = validity.as_mut() {
341 DataBitVec::push(v, true);
342 }
343 }
344 (out, validity)
345}
346
#[cfg(test)]
mod tests {
    use reifydb_type::{util::bitvec::BitVec, value::Value};

    use crate::value::column::ColumnBuffer;

    /// Asserts that `column` yields `expected[i]` at every row `i`.
    fn assert_rows(column: &ColumnBuffer, expected: &[Value]) {
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(&column.get_value(row), want);
        }
    }

    /// Every row is selected by exactly one mask, so the result stays a plain Int4 column.
    #[test]
    fn scatter_merge_all_mapped_int4() {
        let then_col = ColumnBuffer::int4([10, 20, 30, 40]);
        let else_col = ColumnBuffer::int4([90, 80, 70, 60]);
        let then_mask = BitVec::from_slice(&[true, false, true, false]);
        let else_mask = BitVec::from_slice(&[false, true, false, true]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

        assert!(matches!(merged, ColumnBuffer::Int4(_)));
        assert_rows(&merged, &[Value::Int4(10), Value::Int4(80), Value::Int4(30), Value::Int4(60)]);
    }

    /// A row selected by neither mask turns the result into an Option column with a none entry.
    #[test]
    fn scatter_merge_unmapped_promotes_to_option() {
        let then_col = ColumnBuffer::int4([10, 20, 30]);
        let else_col = ColumnBuffer::int4([90, 80, 70]);
        let then_mask = BitVec::from_slice(&[true, false, true]);
        let else_mask = BitVec::from_slice(&[false, false, false]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

        assert!(matches!(merged, ColumnBuffer::Option { .. }));
        assert_rows(&merged, &[Value::Int4(10), Value::none(), Value::Int4(30)]);
    }

    /// Boolean columns with full mask coverage stay a plain Bool column.
    #[test]
    fn scatter_merge_bool_all_mapped() {
        let then_col = ColumnBuffer::bool([true, true, false, false]);
        let else_col = ColumnBuffer::bool([false, false, true, true]);
        let then_mask = BitVec::from_slice(&[true, false, true, false]);
        let else_mask = BitVec::from_slice(&[false, true, false, true]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

        assert!(matches!(merged, ColumnBuffer::Bool(_)));
        assert_rows(
            &merged,
            &[Value::Boolean(true), Value::Boolean(false), Value::Boolean(false), Value::Boolean(true)],
        );
    }

    /// Utf8 has no typed kernel, so this exercises the value-by-value fallback.
    #[test]
    fn scatter_merge_utf8_uses_generic_fallback() {
        let then_col = ColumnBuffer::utf8(["a", "b", "c"]);
        let else_col = ColumnBuffer::utf8(["x", "y", "z"]);
        let then_mask = BitVec::from_slice(&[true, false, true]);
        let else_mask = BitVec::from_slice(&[false, true, false]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

        assert_rows(
            &merged,
            &[Value::Utf8("a".to_string()), Value::Utf8("y".to_string()), Value::Utf8("c".to_string())],
        );
    }
}