1use std::fmt::Debug;
5
6use reifydb_type::{
7 storage::{DataBitVec, DataVec},
8 util::bitvec::BitVec,
9 value::{
10 Value,
11 container::{
12 bool::BoolContainer, number::NumberContainer, temporal::TemporalContainer, uuid::UuidContainer,
13 },
14 date::Date,
15 datetime::DateTime,
16 duration::Duration,
17 is::{IsNumber, IsTemporal, IsUuid},
18 time::Time,
19 uuid::{Uuid4, Uuid7},
20 },
21};
22
23use crate::value::column::ColumnData;
24
25impl ColumnData {
26 pub fn scatter_merge(
40 &self,
41 other: &ColumnData,
42 then_mask: &BitVec,
43 else_mask: &BitVec,
44 total_len: usize,
45 ) -> ColumnData {
46 if let (
47 ColumnData::Option {
48 inner: a_inner,
49 bitvec: a_bv,
50 },
51 ColumnData::Option {
52 inner: b_inner,
53 bitvec: b_bv,
54 },
55 ) = (self, other)
56 {
57 let merged_inner = a_inner.scatter_merge(b_inner, then_mask, else_mask, total_len);
58 let merged_bv = merge_validity_bitvecs(a_bv, b_bv, then_mask, else_mask, total_len);
59 return match merged_inner {
60 ColumnData::Option {
61 inner: nested_inner,
62 bitvec: nested_bv,
63 } => ColumnData::Option {
64 inner: nested_inner,
65 bitvec: merged_bv.and(&nested_bv),
66 },
67 inner => ColumnData::Option {
68 inner: Box::new(inner),
69 bitvec: merged_bv,
70 },
71 };
72 }
73
74 if let Some(result) = scatter_merge_typed(self, other, then_mask, else_mask, total_len) {
75 return result;
76 }
77
78 scatter_merge_generic(self, other, then_mask, else_mask, total_len)
79 }
80}
81
82fn merge_validity_bitvecs(
83 then_bv: &BitVec,
84 else_bv: &BitVec,
85 then_mask: &BitVec,
86 else_mask: &BitVec,
87 total_len: usize,
88) -> BitVec {
89 let mut out = BitVec::with_capacity(total_len);
90 for i in 0..total_len {
91 let bit = if DataBitVec::get(then_mask, i) {
92 i < DataBitVec::len(then_bv) && DataBitVec::get(then_bv, i)
93 } else if DataBitVec::get(else_mask, i) {
94 i < DataBitVec::len(else_bv) && DataBitVec::get(else_bv, i)
95 } else {
96 false
97 };
98 DataBitVec::push(&mut out, bit);
99 }
100 out
101}
102
103fn scatter_merge_generic(
104 self_col: &ColumnData,
105 other: &ColumnData,
106 then_mask: &BitVec,
107 else_mask: &BitVec,
108 total_len: usize,
109) -> ColumnData {
110 let result_type = self_col.get_type();
111 let mut data = ColumnData::with_capacity(result_type.clone(), total_len);
112 for i in 0..total_len {
113 if DataBitVec::get(then_mask, i) {
114 data.push_value(self_col.get_value(i));
115 } else if DataBitVec::get(else_mask, i) {
116 data.push_value(other.get_value(i));
117 } else {
118 data.push_value(Value::none_of(result_type.clone()));
119 }
120 }
121 data
122}
123
124fn scatter_merge_typed(
127 self_col: &ColumnData,
128 other: &ColumnData,
129 then_mask: &BitVec,
130 else_mask: &BitVec,
131 total_len: usize,
132) -> Option<ColumnData> {
133 macro_rules! number_kernel {
134 ($variant:ident, $t:ty) => {
135 if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
136 let (data, validity) = number_scatter::<$t>(a, b, then_mask, else_mask, total_len);
137 let inner = ColumnData::$variant(NumberContainer::new(data));
138 return Some(finalize(inner, validity));
139 }
140 };
141 }
142 macro_rules! temporal_kernel {
143 ($variant:ident, $t:ty) => {
144 if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
145 let (data, validity) = temporal_scatter::<$t>(a, b, then_mask, else_mask, total_len);
146 let inner = ColumnData::$variant(TemporalContainer::new(data));
147 return Some(finalize(inner, validity));
148 }
149 };
150 }
151 macro_rules! uuid_kernel {
152 ($variant:ident, $t:ty) => {
153 if let (ColumnData::$variant(a), ColumnData::$variant(b)) = (self_col, other) {
154 let (data, validity) = uuid_scatter::<$t>(a, b, then_mask, else_mask, total_len);
155 let inner = ColumnData::$variant(UuidContainer::new(data));
156 return Some(finalize(inner, validity));
157 }
158 };
159 }
160
161 if let (ColumnData::Bool(a), ColumnData::Bool(b)) = (self_col, other) {
162 let (data, validity) = bool_scatter(a, b, then_mask, else_mask, total_len);
163 let inner = ColumnData::Bool(BoolContainer::from_parts(data));
164 return Some(finalize(inner, validity));
165 }
166
167 number_kernel!(Float4, f32);
168 number_kernel!(Float8, f64);
169 number_kernel!(Int1, i8);
170 number_kernel!(Int2, i16);
171 number_kernel!(Int4, i32);
172 number_kernel!(Int8, i64);
173 number_kernel!(Int16, i128);
174 number_kernel!(Uint1, u8);
175 number_kernel!(Uint2, u16);
176 number_kernel!(Uint4, u32);
177 number_kernel!(Uint8, u64);
178 number_kernel!(Uint16, u128);
179
180 temporal_kernel!(Date, Date);
181 temporal_kernel!(DateTime, DateTime);
182 temporal_kernel!(Time, Time);
183 temporal_kernel!(Duration, Duration);
184
185 uuid_kernel!(Uuid4, Uuid4);
186 uuid_kernel!(Uuid7, Uuid7);
187
188 None
189}
190
191fn finalize(inner: ColumnData, validity: Option<BitVec>) -> ColumnData {
192 match validity {
193 Some(bv) => ColumnData::Option {
194 inner: Box::new(inner),
195 bitvec: bv,
196 },
197 None => inner,
198 }
199}
200
201fn bool_scatter(
202 a: &BoolContainer,
203 b: &BoolContainer,
204 then_mask: &BitVec,
205 else_mask: &BitVec,
206 total_len: usize,
207) -> (BitVec, Option<BitVec>) {
208 let a_data = a.data();
209 let b_data = b.data();
210 let mut out = BitVec::with_capacity(total_len);
211 let mut validity: Option<BitVec> = None;
212 for i in 0..total_len {
213 let in_then = DataBitVec::get(then_mask, i);
214 let in_else = !in_then && DataBitVec::get(else_mask, i);
215 let bit = if in_then && i < DataBitVec::len(a_data) {
216 DataBitVec::get(a_data, i)
217 } else if in_else && i < DataBitVec::len(b_data) {
218 DataBitVec::get(b_data, i)
219 } else {
220 false
221 };
222 DataBitVec::push(&mut out, bit);
223 if !in_then && !in_else {
224 let v = validity.get_or_insert_with(|| {
225 let mut bv = BitVec::with_capacity(total_len);
226 for _ in 0..i {
227 DataBitVec::push(&mut bv, true);
228 }
229 bv
230 });
231 DataBitVec::push(v, false);
232 } else if let Some(v) = validity.as_mut() {
233 DataBitVec::push(v, true);
234 }
235 }
236 (out, validity)
237}
238
239fn number_scatter<T>(
240 a: &NumberContainer<T>,
241 b: &NumberContainer<T>,
242 then_mask: &BitVec,
243 else_mask: &BitVec,
244 total_len: usize,
245) -> (Vec<T>, Option<BitVec>)
246where
247 T: IsNumber + Clone + Default + Debug,
248{
249 let a_data = a.data();
250 let b_data = b.data();
251 let mut out: Vec<T> = Vec::with_capacity(total_len);
252 let mut validity: Option<BitVec> = None;
253 for i in 0..total_len {
254 let in_then = DataBitVec::get(then_mask, i);
255 let in_else = !in_then && DataBitVec::get(else_mask, i);
256 let value = if in_then {
257 DataVec::get(a_data, i).cloned().unwrap_or_default()
258 } else if in_else {
259 DataVec::get(b_data, i).cloned().unwrap_or_default()
260 } else {
261 T::default()
262 };
263 out.push(value);
264 if !in_then && !in_else {
265 let v = validity.get_or_insert_with(|| {
266 let mut bv = BitVec::with_capacity(total_len);
267 for _ in 0..i {
268 DataBitVec::push(&mut bv, true);
269 }
270 bv
271 });
272 DataBitVec::push(v, false);
273 } else if let Some(v) = validity.as_mut() {
274 DataBitVec::push(v, true);
275 }
276 }
277 (out, validity)
278}
279
280fn temporal_scatter<T>(
281 a: &TemporalContainer<T>,
282 b: &TemporalContainer<T>,
283 then_mask: &BitVec,
284 else_mask: &BitVec,
285 total_len: usize,
286) -> (Vec<T>, Option<BitVec>)
287where
288 T: IsTemporal + Clone + Default + Debug,
289{
290 let a_data = a.data();
291 let b_data = b.data();
292 let mut out: Vec<T> = Vec::with_capacity(total_len);
293 let mut validity: Option<BitVec> = None;
294 for i in 0..total_len {
295 let in_then = DataBitVec::get(then_mask, i);
296 let in_else = !in_then && DataBitVec::get(else_mask, i);
297 let value = if in_then {
298 DataVec::get(a_data, i).cloned().unwrap_or_default()
299 } else if in_else {
300 DataVec::get(b_data, i).cloned().unwrap_or_default()
301 } else {
302 T::default()
303 };
304 out.push(value);
305 if !in_then && !in_else {
306 let v = validity.get_or_insert_with(|| {
307 let mut bv = BitVec::with_capacity(total_len);
308 for _ in 0..i {
309 DataBitVec::push(&mut bv, true);
310 }
311 bv
312 });
313 DataBitVec::push(v, false);
314 } else if let Some(v) = validity.as_mut() {
315 DataBitVec::push(v, true);
316 }
317 }
318 (out, validity)
319}
320
321fn uuid_scatter<T>(
322 a: &UuidContainer<T>,
323 b: &UuidContainer<T>,
324 then_mask: &BitVec,
325 else_mask: &BitVec,
326 total_len: usize,
327) -> (Vec<T>, Option<BitVec>)
328where
329 T: IsUuid + Clone + Default + Debug,
330{
331 let a_data = a.data();
332 let b_data = b.data();
333 let mut out: Vec<T> = Vec::with_capacity(total_len);
334 let mut validity: Option<BitVec> = None;
335 for i in 0..total_len {
336 let in_then = DataBitVec::get(then_mask, i);
337 let in_else = !in_then && DataBitVec::get(else_mask, i);
338 let value = if in_then {
339 DataVec::get(a_data, i).cloned().unwrap_or_default()
340 } else if in_else {
341 DataVec::get(b_data, i).cloned().unwrap_or_default()
342 } else {
343 T::default()
344 };
345 out.push(value);
346 if !in_then && !in_else {
347 let v = validity.get_or_insert_with(|| {
348 let mut bv = BitVec::with_capacity(total_len);
349 for _ in 0..i {
350 DataBitVec::push(&mut bv, true);
351 }
352 bv
353 });
354 DataBitVec::push(v, false);
355 } else if let Some(v) = validity.as_mut() {
356 DataBitVec::push(v, true);
357 }
358 }
359 (out, validity)
360}
361
#[cfg(test)]
mod tests {
    use reifydb_type::{util::bitvec::BitVec, value::Value};

    use crate::value::column::ColumnData;

    /// Asserts that `column` yields exactly the `expected` values, row by row.
    fn assert_rows(column: &ColumnData, expected: &[Value]) {
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(&column.get_value(row), want, "row {}", row);
        }
    }

    /// Every row is covered by one of the masks, so the result stays a plain
    /// `Int4` column.
    #[test]
    fn scatter_merge_all_mapped_int4() {
        let then_col = ColumnData::int4([10, 20, 30, 40]);
        let else_col = ColumnData::int4([90, 80, 70, 60]);
        let then_mask = BitVec::from_slice(&[true, false, true, false]);
        let else_mask = BitVec::from_slice(&[false, true, false, true]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

        assert!(matches!(merged, ColumnData::Int4(_)));
        assert_rows(&merged, &[Value::Int4(10), Value::Int4(80), Value::Int4(30), Value::Int4(60)]);
    }

    /// Row 1 is covered by neither mask, so the result is promoted to an
    /// `Option` column with a none value in that row.
    #[test]
    fn scatter_merge_unmapped_promotes_to_option() {
        let then_col = ColumnData::int4([10, 20, 30]);
        let else_col = ColumnData::int4([90, 80, 70]);
        let then_mask = BitVec::from_slice(&[true, false, true]);
        let else_mask = BitVec::from_slice(&[false, false, false]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

        assert!(matches!(merged, ColumnData::Option { .. }));
        assert_rows(&merged, &[Value::Int4(10), Value::none(), Value::Int4(30)]);
    }

    /// Fully mapped boolean columns stay a plain `Bool` column.
    #[test]
    fn scatter_merge_bool_all_mapped() {
        let then_col = ColumnData::bool([true, true, false, false]);
        let else_col = ColumnData::bool([false, false, true, true]);
        let then_mask = BitVec::from_slice(&[true, false, true, false]);
        let else_mask = BitVec::from_slice(&[false, true, false, true]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 4);

        assert!(matches!(merged, ColumnData::Bool(_)));
        assert_rows(
            &merged,
            &[Value::Boolean(true), Value::Boolean(false), Value::Boolean(false), Value::Boolean(true)],
        );
    }

    /// Utf8 has no typed kernel, so this exercises the value-by-value fallback.
    #[test]
    fn scatter_merge_utf8_uses_generic_fallback() {
        let then_col = ColumnData::utf8(["a", "b", "c"]);
        let else_col = ColumnData::utf8(["x", "y", "z"]);
        let then_mask = BitVec::from_slice(&[true, false, true]);
        let else_mask = BitVec::from_slice(&[false, true, false]);

        let merged = then_col.scatter_merge(&else_col, &then_mask, &else_mask, 3);

        assert_rows(
            &merged,
            &[Value::Utf8("a".to_string()), Value::Utf8("y".to_string()), Value::Utf8("c".to_string())],
        );
    }
}