1#![allow(clippy::needless_range_loop)]
7
8use std::sync::Arc;
9
10use ahash::AHashSet;
11use vibesql_storage::Row;
12use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
13
14use super::types::{ColumnArray, ColumnarBatch};
15use crate::errors::ExecutorError;
16
17impl ColumnarBatch {
18 pub fn row_count(&self) -> usize {
20 self.row_count
21 }
22
23 pub fn column_count(&self) -> usize {
25 self.columns.len()
26 }
27
28 pub fn column(&self, index: usize) -> Option<&ColumnArray> {
30 self.columns.get(index)
31 }
32
33 pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
35 self.columns.get_mut(index)
36 }
37
38 pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
40 let col_len = column.len();
42 if self.row_count > 0 && col_len != self.row_count {
43 return Err(ExecutorError::ColumnarLengthMismatch {
44 context: "add_column".to_string(),
45 expected: self.row_count,
46 actual: col_len,
47 });
48 }
49
50 if self.row_count == 0 {
51 self.row_count = col_len;
52 }
53
54 self.columns.push(column);
55 Ok(())
56 }
57
58 pub fn set_column_names(&mut self, names: Vec<String>) {
60 self.column_names = Some(names);
61 }
62
63 pub fn column_names(&self) -> Option<&[String]> {
65 self.column_names.as_deref()
66 }
67
68 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
70 self.column_names.as_ref()?.iter().position(|n| n == name)
71 }
72
73 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
75 let column = self.column(col_idx).ok_or(ExecutorError::ColumnarColumnNotFound {
76 column_index: col_idx,
77 batch_columns: self.columns.len(),
78 })?;
79 column.get_value(row_idx)
80 }
81
82 pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
100 if self.row_count == 0 {
101 return Ok(Vec::new());
102 }
103
104 let num_cols = self.columns.len();
106 let mut row_values: Vec<Vec<SqlValue>> =
107 (0..self.row_count).map(|_| Vec::with_capacity(num_cols)).collect();
108
109 for column in &self.columns {
111 column.append_values_to_rows(&mut row_values)?;
112 }
113
114 Ok(row_values.into_iter().map(Row::new).collect())
116 }
117
118 pub fn deduplicate(&self) -> Result<Self, ExecutorError> {
139 if self.row_count == 0 {
140 return Ok(self.clone());
141 }
142
143 let mut seen: AHashSet<Vec<SqlValue>> = AHashSet::with_capacity(self.row_count);
145 let mut keep_indices: Vec<usize> = Vec::with_capacity(self.row_count);
146
147 for row_idx in 0..self.row_count {
148 let mut row_key = Vec::with_capacity(self.columns.len());
150 for col in &self.columns {
151 let value = col.get_value(row_idx)?;
152 row_key.push(value);
153 }
154
155 if seen.insert(row_key) {
157 keep_indices.push(row_idx);
158 }
159 }
160
161 if keep_indices.len() == self.row_count {
163 return Ok(self.clone());
164 }
165
166 log::debug!(
167 "Columnar deduplicate: {} rows -> {} unique rows",
168 self.row_count,
169 keep_indices.len()
170 );
171
172 self.select_rows(&keep_indices)
174 }
175
176 pub fn select_rows(&self, indices: &[usize]) -> Result<Self, ExecutorError> {
186 if indices.is_empty() {
187 return Self::empty(self.columns.len());
188 }
189
190 let new_row_count = indices.len();
191 let mut new_columns = Vec::with_capacity(self.columns.len());
192
193 for column in &self.columns {
194 let new_column = column.select_rows(indices)?;
195 new_columns.push(new_column);
196 }
197
198 Ok(Self {
199 row_count: new_row_count,
200 columns: new_columns,
201 column_names: self.column_names.clone(),
202 })
203 }
204}
205
206impl ColumnArray {
207 fn select_rows(&self, indices: &[usize]) -> Result<Self, ExecutorError> {
209 match self {
210 Self::Int64(values, nulls) => {
211 let new_values: Vec<i64> = indices.iter().map(|&i| values[i]).collect();
212 let new_nulls = nulls
213 .as_ref()
214 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
215 Ok(Self::Int64(Arc::new(new_values), new_nulls))
216 }
217 Self::Int32(values, nulls) => {
218 let new_values: Vec<i32> = indices.iter().map(|&i| values[i]).collect();
219 let new_nulls = nulls
220 .as_ref()
221 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
222 Ok(Self::Int32(Arc::new(new_values), new_nulls))
223 }
224 Self::Float64(values, nulls) => {
225 let new_values: Vec<f64> = indices.iter().map(|&i| values[i]).collect();
226 let new_nulls = nulls
227 .as_ref()
228 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
229 Ok(Self::Float64(Arc::new(new_values), new_nulls))
230 }
231 Self::Float32(values, nulls) => {
232 let new_values: Vec<f32> = indices.iter().map(|&i| values[i]).collect();
233 let new_nulls = nulls
234 .as_ref()
235 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
236 Ok(Self::Float32(Arc::new(new_values), new_nulls))
237 }
238 Self::String(values, nulls) => {
239 let new_values: Vec<Arc<str>> =
240 indices.iter().map(|&i| values[i].clone()).collect();
241 let new_nulls = nulls
242 .as_ref()
243 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
244 Ok(Self::String(Arc::new(new_values), new_nulls))
245 }
246 Self::FixedString(values, nulls) => {
247 let new_values: Vec<Arc<str>> =
248 indices.iter().map(|&i| values[i].clone()).collect();
249 let new_nulls = nulls
250 .as_ref()
251 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
252 Ok(Self::FixedString(Arc::new(new_values), new_nulls))
253 }
254 Self::Date(values, nulls) => {
255 let new_values: Vec<i32> = indices.iter().map(|&i| values[i]).collect();
256 let new_nulls = nulls
257 .as_ref()
258 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
259 Ok(Self::Date(Arc::new(new_values), new_nulls))
260 }
261 Self::Timestamp(values, nulls) => {
262 let new_values: Vec<i64> = indices.iter().map(|&i| values[i]).collect();
263 let new_nulls = nulls
264 .as_ref()
265 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
266 Ok(Self::Timestamp(Arc::new(new_values), new_nulls))
267 }
268 Self::Boolean(values, nulls) => {
269 let new_values: Vec<u8> = indices.iter().map(|&i| values[i]).collect();
270 let new_nulls = nulls
271 .as_ref()
272 .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
273 Ok(Self::Boolean(Arc::new(new_values), new_nulls))
274 }
275 Self::Mixed(values) => {
276 let new_values: Vec<SqlValue> =
277 indices.iter().map(|&i| values[i].clone()).collect();
278 Ok(Self::Mixed(Arc::new(new_values)))
279 }
280 }
281 }
282
283 pub fn len(&self) -> usize {
285 match self {
286 Self::Int64(v, _) => v.len(),
287 Self::Int32(v, _) => v.len(),
288 Self::Float64(v, _) => v.len(),
289 Self::Float32(v, _) => v.len(),
290 Self::String(v, _) => v.len(),
291 Self::FixedString(v, _) => v.len(),
292 Self::Date(v, _) => v.len(),
293 Self::Timestamp(v, _) => v.len(),
294 Self::Boolean(v, _) => v.len(),
295 Self::Mixed(v) => v.len(),
296 }
297 }
298
299 pub fn is_empty(&self) -> bool {
301 self.len() == 0
302 }
303
304 pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
306 match self {
307 Self::Int64(values, nulls) => {
308 if let Some(null_mask) = nulls {
309 if null_mask.get(index).copied().unwrap_or(false) {
310 return Ok(SqlValue::Null);
311 }
312 }
313 values
314 .get(index)
315 .map(|v| SqlValue::Integer(*v))
316 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
317 }
318
319 Self::Float64(values, nulls) => {
320 if let Some(null_mask) = nulls {
321 if null_mask.get(index).copied().unwrap_or(false) {
322 return Ok(SqlValue::Null);
323 }
324 }
325 values
326 .get(index)
327 .map(|v| SqlValue::Double(*v))
328 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
329 }
330
331 Self::String(values, nulls) => {
332 if let Some(null_mask) = nulls {
333 if null_mask.get(index).copied().unwrap_or(false) {
334 return Ok(SqlValue::Null);
335 }
336 }
337 values
338 .get(index)
339 .map(|v| SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref())))
340 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
341 }
342
343 Self::Boolean(values, nulls) => {
344 if let Some(null_mask) = nulls {
345 if null_mask.get(index).copied().unwrap_or(false) {
346 return Ok(SqlValue::Null);
347 }
348 }
349 values
350 .get(index)
351 .map(|v| SqlValue::Boolean(*v != 0))
352 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
353 }
354
355 Self::Mixed(values) => {
356 values.get(index).cloned().ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
357 }
358
359 Self::Int32(values, nulls) => {
360 if let Some(null_mask) = nulls {
361 if null_mask.get(index).copied().unwrap_or(false) {
362 return Ok(SqlValue::Null);
363 }
364 }
365 values
366 .get(index)
367 .map(|v| SqlValue::Integer(*v as i64))
368 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
369 }
370
371 Self::Float32(values, nulls) => {
372 if let Some(null_mask) = nulls {
373 if null_mask.get(index).copied().unwrap_or(false) {
374 return Ok(SqlValue::Null);
375 }
376 }
377 values
378 .get(index)
379 .map(|v| SqlValue::Float(*v)) .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
381 }
382
383 Self::FixedString(values, nulls) => {
384 if let Some(null_mask) = nulls {
385 if null_mask.get(index).copied().unwrap_or(false) {
386 return Ok(SqlValue::Null);
387 }
388 }
389 values
390 .get(index)
391 .map(|v| SqlValue::Character(arcstr::ArcStr::from(v.as_ref())))
392 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
393 }
394
395 Self::Date(values, nulls) => {
396 if let Some(null_mask) = nulls {
397 if null_mask.get(index).copied().unwrap_or(false) {
398 return Ok(SqlValue::Null);
399 }
400 }
401 values
402 .get(index)
403 .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
404 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
405 }
406
407 Self::Timestamp(values, nulls) => {
408 if let Some(null_mask) = nulls {
409 if null_mask.get(index).copied().unwrap_or(false) {
410 return Ok(SqlValue::Null);
411 }
412 }
413 values
414 .get(index)
415 .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
416 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
417 }
418 }
419 }
420
421 pub(crate) fn append_values_to_rows(
438 &self,
439 row_values: &mut [Vec<SqlValue>],
440 ) -> Result<(), ExecutorError> {
441 match self {
442 Self::Int64(values, nulls) => {
443 if let Some(null_mask) = nulls {
444 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
445 row_values[i].push(if is_null {
446 SqlValue::Null
447 } else {
448 SqlValue::Integer(v)
449 });
450 }
451 } else {
452 for (i, &v) in values.iter().enumerate() {
453 row_values[i].push(SqlValue::Integer(v));
454 }
455 }
456 }
457
458 Self::Int32(values, nulls) => {
459 if let Some(null_mask) = nulls {
460 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
461 row_values[i].push(if is_null {
462 SqlValue::Null
463 } else {
464 SqlValue::Integer(v as i64)
465 });
466 }
467 } else {
468 for (i, &v) in values.iter().enumerate() {
469 row_values[i].push(SqlValue::Integer(v as i64));
470 }
471 }
472 }
473
474 Self::Float64(values, nulls) => {
475 if let Some(null_mask) = nulls {
476 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
477 row_values[i].push(if is_null {
478 SqlValue::Null
479 } else {
480 SqlValue::Double(v)
481 });
482 }
483 } else {
484 for (i, &v) in values.iter().enumerate() {
485 row_values[i].push(SqlValue::Double(v));
486 }
487 }
488 }
489
490 Self::Float32(values, nulls) => {
491 if let Some(null_mask) = nulls {
492 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
493 row_values[i].push(if is_null {
494 SqlValue::Null
495 } else {
496 SqlValue::Float(v) });
498 }
499 } else {
500 for (i, &v) in values.iter().enumerate() {
501 row_values[i].push(SqlValue::Float(v)); }
503 }
504 }
505
506 Self::String(values, nulls) => {
507 if let Some(null_mask) = nulls {
508 for (i, (v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
509 row_values[i].push(if is_null {
510 SqlValue::Null
511 } else {
512 SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref()))
513 });
514 }
515 } else {
516 for (i, v) in values.iter().enumerate() {
517 row_values[i].push(SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref())));
518 }
519 }
520 }
521
522 Self::FixedString(values, nulls) => {
523 if let Some(null_mask) = nulls {
524 for (i, (v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
525 row_values[i].push(if is_null {
526 SqlValue::Null
527 } else {
528 SqlValue::Character(arcstr::ArcStr::from(v.as_ref()))
529 });
530 }
531 } else {
532 for (i, v) in values.iter().enumerate() {
533 row_values[i].push(SqlValue::Character(arcstr::ArcStr::from(v.as_ref())));
534 }
535 }
536 }
537
538 Self::Date(values, nulls) => {
539 if let Some(null_mask) = nulls {
540 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
541 row_values[i].push(if is_null {
542 SqlValue::Null
543 } else {
544 SqlValue::Date(days_since_epoch_to_date(v))
545 });
546 }
547 } else {
548 for (i, &v) in values.iter().enumerate() {
549 row_values[i].push(SqlValue::Date(days_since_epoch_to_date(v)));
550 }
551 }
552 }
553
554 Self::Timestamp(values, nulls) => {
555 if let Some(null_mask) = nulls {
556 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
557 row_values[i].push(if is_null {
558 SqlValue::Null
559 } else {
560 SqlValue::Timestamp(microseconds_to_timestamp(v))
561 });
562 }
563 } else {
564 for (i, &v) in values.iter().enumerate() {
565 row_values[i].push(SqlValue::Timestamp(microseconds_to_timestamp(v)));
566 }
567 }
568 }
569
570 Self::Boolean(values, nulls) => {
571 if let Some(null_mask) = nulls {
572 for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
573 row_values[i].push(if is_null {
574 SqlValue::Null
575 } else {
576 SqlValue::Boolean(v != 0)
577 });
578 }
579 } else {
580 for (i, &v) in values.iter().enumerate() {
581 row_values[i].push(SqlValue::Boolean(v != 0));
582 }
583 }
584 }
585
586 Self::Mixed(values) => {
587 for (i, v) in values.iter().enumerate() {
588 row_values[i].push(v.clone());
589 }
590 }
591 }
592
593 Ok(())
594 }
595
596 pub fn data_type(&self) -> DataType {
598 match self {
599 Self::Int64(_, _) => DataType::Integer,
600 Self::Int32(_, _) => DataType::Integer,
601 Self::Float64(_, _) => DataType::DoublePrecision,
602 Self::Float32(_, _) => DataType::Real,
603 Self::String(_, _) => DataType::Varchar { max_length: None },
604 Self::FixedString(_, _) => DataType::Character { length: 255 },
605 Self::Date(_, _) => DataType::Date,
606 Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
607 Self::Boolean(_, _) => DataType::Boolean,
608 Self::Mixed(_) => DataType::Varchar { max_length: None }, }
610 }
611
612 pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
614 match self {
615 Self::Int64(values, nulls) => {
616 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
617 }
618 _ => None,
619 }
620 }
621
622 pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
624 match self {
625 Self::Float64(values, nulls) => {
626 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
627 }
628 _ => None,
629 }
630 }
631}
632
633fn days_since_epoch_to_date(days: i32) -> Date {
635 let mut year = 1970;
637 let mut remaining_days = days;
638
639 loop {
641 let year_days =
642 if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) { 366 } else { 365 };
643 if remaining_days < year_days {
644 break;
645 }
646 remaining_days -= year_days;
647 year += 1;
648 }
649
650 let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
652 let month_lengths = if is_leap {
653 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
654 } else {
655 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
656 };
657
658 let mut month = 1;
659 for &days_in_month in &month_lengths {
660 if remaining_days < days_in_month {
661 break;
662 }
663 remaining_days -= days_in_month;
664 month += 1;
665 }
666
667 let day = remaining_days + 1;
668
669 Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
670}
671
672fn microseconds_to_timestamp(micros: i64) -> Timestamp {
674 let days = (micros / 86_400_000_000) as i32;
675 let remaining_micros = micros % 86_400_000_000;
676
677 let date = days_since_epoch_to_date(days);
678
679 let hours = (remaining_micros / 3_600_000_000) as u8;
680 let remaining_micros = remaining_micros % 3_600_000_000;
681 let minutes = (remaining_micros / 60_000_000) as u8;
682 let remaining_micros = remaining_micros % 60_000_000;
683 let seconds = (remaining_micros / 1_000_000) as u8;
684 let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
685
686 let time = Time::new(hours, minutes, seconds, nanoseconds)
687 .unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
688
689 Timestamp::new(date, time)
690}
691
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `VARCHAR` value.
    fn varchar(s: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(s))
    }

    /// Builds an `(INTEGER, DOUBLE)` row.
    fn int_double_row(id: i64, x: f64) -> Row {
        Row::new(vec![SqlValue::Integer(id), SqlValue::Double(x)])
    }

    /// Builds an `(INTEGER, VARCHAR)` row.
    fn int_str_row(id: i64, s: &str) -> Row {
        Row::new(vec![SqlValue::Integer(id), varchar(s)])
    }

    #[test]
    fn test_batch_to_rows_roundtrip() {
        let original_rows = vec![int_double_row(1, 10.5), int_double_row(2, 20.5)];

        let batch = ColumnarBatch::from_rows(&original_rows).unwrap();
        let converted_rows = batch.to_rows().unwrap();

        assert_eq!(converted_rows.len(), original_rows.len());
        for (original, converted) in original_rows.iter().zip(&converted_rows) {
            assert_eq!(original.len(), converted.len());
            for idx in 0..original.len() {
                assert_eq!(original.get(idx), converted.get(idx), "column {idx} differs");
            }
        }
    }

    #[test]
    fn test_simd_column_access() {
        let rows = vec![
            int_double_row(1, 10.5),
            int_double_row(2, 20.5),
            int_double_row(3, 30.5),
        ];
        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        let (ints, int_nulls) = batch.column(0).unwrap().as_i64().expect("Expected i64 slice");
        assert_eq!(ints, &[1, 2, 3]);
        assert!(int_nulls.is_none());

        let (doubles, double_nulls) =
            batch.column(1).unwrap().as_f64().expect("Expected f64 slice");
        assert_eq!(doubles, &[10.5, 20.5, 30.5]);
        assert!(double_nulls.is_none());
    }

    #[test]
    fn test_deduplicate_with_duplicates() {
        let rows: Vec<Row> = [(1, "A"), (2, "B"), (1, "A"), (3, "C"), (2, "B")]
            .iter()
            .map(|&(id, s)| int_str_row(id, s))
            .collect();

        let batch = ColumnarBatch::from_rows(&rows).unwrap();
        assert_eq!(batch.row_count(), 5);

        let deduped = batch.deduplicate().unwrap();
        assert_eq!(deduped.row_count(), 3);

        // First occurrences survive, in their original order.
        let result_rows = deduped.to_rows().unwrap();
        let expected = [(1, "A"), (2, "B"), (3, "C")];
        assert_eq!(result_rows.len(), expected.len());
        for (row, &(id, s)) in result_rows.iter().zip(expected.iter()) {
            assert_eq!(row.get(0), Some(&SqlValue::Integer(id)));
            assert_eq!(row.get(1), Some(&varchar(s)));
        }
    }

    #[test]
    fn test_deduplicate_no_duplicates() {
        let rows: Vec<Row> = (1..=3).map(|i| Row::new(vec![SqlValue::Integer(i)])).collect();

        let deduped = ColumnarBatch::from_rows(&rows).unwrap().deduplicate().unwrap();

        assert_eq!(deduped.row_count(), 3);
    }

    #[test]
    fn test_deduplicate_with_nulls() {
        let null_a = || Row::new(vec![SqlValue::Null, varchar("A")]);
        let rows = vec![null_a(), int_str_row(1, "B"), null_a()];

        let deduped = ColumnarBatch::from_rows(&rows).unwrap().deduplicate().unwrap();

        assert_eq!(deduped.row_count(), 2);
    }

    #[test]
    fn test_deduplicate_empty_batch() {
        let deduped = ColumnarBatch::new(2).deduplicate().unwrap();
        assert_eq!(deduped.row_count(), 0);
    }

    #[test]
    fn test_select_rows() {
        let rows: Vec<Row> = [(1, "A"), (2, "B"), (3, "C"), (4, "D")]
            .iter()
            .map(|&(id, s)| int_str_row(id, s))
            .collect();
        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        let selected = batch.select_rows(&[0, 2]).unwrap();
        assert_eq!(selected.row_count(), 2);

        let result_rows = selected.to_rows().unwrap();
        let first_column: Vec<Option<&SqlValue>> =
            result_rows.iter().map(|row| row.get(0)).collect();
        assert_eq!(first_column, vec![Some(&SqlValue::Integer(1)), Some(&SqlValue::Integer(3))]);
    }
}