1use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use crate::index::ZoneMapEntry;
9use grafeo_common::types::LogicalType;
10use grafeo_common::utils::hash::FxHashMap;
11
/// Row capacity used for a chunk when the caller does not supply one
/// (see `DataChunk::with_schema` and the empty-columns case of `DataChunk::new`).
pub const DEFAULT_CHUNK_SIZE: usize = 2048;
14
/// Zone map entries that can be attached to a [`DataChunk`].
///
/// The default value carries no entries.
#[derive(Debug, Clone, Default)]
pub struct ChunkZoneHints {
    /// One zone map entry per key; the key is presumably the column index
    /// within the chunk (inferred from the field name — confirm with callers).
    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
}
31
/// A batch of rows stored column-wise, with an optional selection vector
/// that restricts which rows are visible.
#[derive(Debug)]
pub struct DataChunk {
    /// Column data; every column is expected to hold the same rows.
    columns: Vec<ValueVector>,
    /// When present, only the row indices it yields are visible.
    selection: Option<SelectionVector>,
    /// Total number of rows, ignoring any selection.
    count: usize,
    /// Row capacity; `is_full` compares `count` against it.
    capacity: usize,
    /// Optional zone map entries attached to this chunk.
    zone_hints: Option<ChunkZoneHints>,
}
69
70impl DataChunk {
71 #[must_use]
73 pub fn empty() -> Self {
74 Self {
75 columns: Vec::new(),
76 selection: None,
77 count: 0,
78 capacity: 0,
79 zone_hints: None,
80 }
81 }
82
83 #[must_use]
85 pub fn new(columns: Vec<ValueVector>) -> Self {
86 let count = columns.first().map_or(0, ValueVector::len);
87 let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
88 Self {
89 columns,
90 selection: None,
91 count,
92 capacity,
93 zone_hints: None,
94 }
95 }
96
97 #[must_use]
99 pub fn with_schema(column_types: &[LogicalType]) -> Self {
100 Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
101 }
102
103 #[must_use]
105 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
106 let columns = column_types
107 .iter()
108 .map(|t| ValueVector::with_capacity(t.clone(), capacity))
109 .collect();
110
111 Self {
112 columns,
113 selection: None,
114 count: 0,
115 capacity,
116 zone_hints: None,
117 }
118 }
119
120 #[must_use]
122 pub fn column_count(&self) -> usize {
123 self.columns.len()
124 }
125
126 #[must_use]
128 pub fn row_count(&self) -> usize {
129 self.selection.as_ref().map_or(self.count, |s| s.len())
130 }
131
132 #[must_use]
134 pub fn len(&self) -> usize {
135 self.row_count()
136 }
137
138 #[must_use]
140 pub fn columns(&self) -> &[ValueVector] {
141 &self.columns
142 }
143
144 #[must_use]
146 pub fn total_row_count(&self) -> usize {
147 self.count
148 }
149
150 #[must_use]
152 pub fn is_empty(&self) -> bool {
153 self.row_count() == 0
154 }
155
156 #[must_use]
158 pub fn capacity(&self) -> usize {
159 self.capacity
160 }
161
162 #[must_use]
164 pub fn is_full(&self) -> bool {
165 self.count >= self.capacity
166 }
167
168 #[must_use]
170 pub fn column(&self, index: usize) -> Option<&ValueVector> {
171 self.columns.get(index)
172 }
173
174 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
176 self.columns.get_mut(index)
177 }
178
179 #[must_use]
181 pub fn selection(&self) -> Option<&SelectionVector> {
182 self.selection.as_ref()
183 }
184
185 pub fn set_selection(&mut self, selection: SelectionVector) {
187 self.selection = Some(selection);
188 }
189
190 pub fn clear_selection(&mut self) {
192 self.selection = None;
193 }
194
195 pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
200 self.zone_hints = Some(hints);
201 }
202
203 #[must_use]
207 pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
208 self.zone_hints.as_ref()
209 }
210
211 pub fn clear_zone_hints(&mut self) {
213 self.zone_hints = None;
214 }
215
216 pub fn set_count(&mut self, count: usize) {
218 self.count = count;
219 }
220
221 pub fn reset(&mut self) {
223 for col in &mut self.columns {
224 col.clear();
225 }
226 self.selection = None;
227 self.zone_hints = None;
228 self.count = 0;
229 }
230
231 pub fn flatten(&mut self) {
236 let Some(selection) = self.selection.take() else {
237 return;
238 };
239
240 let selected_count = selection.len();
241
242 let mut new_columns = Vec::with_capacity(self.columns.len());
244
245 for col in &self.columns {
246 let mut new_col = ValueVector::with_type(col.data_type().clone());
248 for idx in selection.iter() {
249 if let Some(val) = col.get(idx) {
250 new_col.push(val);
251 }
252 }
253 new_columns.push(new_col);
254 }
255
256 self.columns = new_columns;
257 self.count = selected_count;
258 self.capacity = selected_count;
259 }
260
261 #[must_use]
271 pub fn sort_by_column(&self, col_idx: usize) -> DataChunk {
272 let row_count = self.len();
273 if row_count <= 1 {
274 return self.clone();
275 }
276
277 let Some(sort_col) = self.column(col_idx) else {
278 return self.clone();
279 };
280
281 let mut indices: Vec<usize> = self.selected_indices().collect();
285 indices.sort_by_key(|&i| {
286 sort_col
289 .get_node_id(i)
290 .map(|id| (0u8, id.as_u64()))
291 .or_else(|| {
292 sort_col.get_value(i).and_then(|v| match v {
293 grafeo_common::types::Value::Int64(n) => {
297 #[allow(clippy::cast_sign_loss)]
299 Some((0u8, (n as u64) ^ (1u64 << 63)))
300 }
301 _ => None,
302 })
303 })
304 .unwrap_or((1u8, 0u64))
305 });
306
307 let mut result_columns = Vec::with_capacity(self.columns.len());
309 for col in &self.columns {
310 let mut new_col = ValueVector::with_type(col.data_type().clone());
311 for &idx in &indices {
312 if let Some(val) = col.get(idx) {
313 new_col.push(val);
314 }
315 }
316 result_columns.push(new_col);
317 }
318
319 DataChunk {
320 columns: result_columns,
321 selection: None,
322 count: row_count,
323 capacity: row_count,
324 zone_hints: None,
325 }
326 }
327
328 pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
330 match &self.selection {
331 Some(sel) => Box::new(sel.iter()),
332 None => Box::new(0..self.count),
333 }
334 }
335
336 pub fn concat(chunks: &[DataChunk]) -> DataChunk {
340 if chunks.is_empty() {
341 return DataChunk::empty();
342 }
343
344 if chunks.len() == 1 {
345 return DataChunk {
347 columns: chunks[0].columns.clone(),
348 selection: chunks[0].selection.clone(),
349 count: chunks[0].count,
350 capacity: chunks[0].capacity,
351 zone_hints: chunks[0].zone_hints.clone(),
352 };
353 }
354
355 let num_columns = chunks[0].column_count();
356 if num_columns == 0 {
357 return DataChunk::empty();
358 }
359
360 let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
361
362 let mut result_columns = Vec::with_capacity(num_columns);
364
365 for col_idx in 0..num_columns {
366 let mut concat_vector = ValueVector::new();
367
368 for chunk in chunks {
369 if let Some(col) = chunk.column(col_idx) {
370 for i in chunk.selected_indices() {
372 if let Some(val) = col.get(i) {
373 concat_vector.push(val);
374 }
375 }
376 }
377 }
378
379 result_columns.push(concat_vector);
380 }
381
382 DataChunk {
383 columns: result_columns,
384 selection: None,
385 count: total_rows,
386 capacity: total_rows,
387 zone_hints: None,
388 }
389 }
390
391 pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
393 let selected: Vec<usize> = predicate
395 .iter()
396 .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
397 .collect();
398
399 let mut result_columns = Vec::with_capacity(self.columns.len());
400
401 for col in &self.columns {
402 let mut new_col = ValueVector::new();
403 for &idx in &selected {
404 if let Some(val) = col.get(idx) {
405 new_col.push(val);
406 }
407 }
408 result_columns.push(new_col);
409 }
410
411 DataChunk {
412 columns: result_columns,
413 selection: None,
414 count: selected.len(),
415 capacity: selected.len(),
416 zone_hints: None,
417 }
418 }
419
420 #[must_use]
424 pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
425 if offset >= self.len() || count == 0 {
426 return DataChunk::empty();
427 }
428
429 let actual_count = count.min(self.len() - offset);
430 let mut result_columns = Vec::with_capacity(self.columns.len());
431
432 for col in &self.columns {
433 let mut new_col = ValueVector::new();
434 for i in offset..(offset + actual_count) {
435 let actual_idx = if let Some(sel) = &self.selection {
436 sel.get(i).unwrap_or(i)
437 } else {
438 i
439 };
440 if let Some(val) = col.get(actual_idx) {
441 new_col.push(val);
442 }
443 }
444 result_columns.push(new_col);
445 }
446
447 DataChunk {
448 columns: result_columns,
449 selection: None,
450 count: actual_count,
451 capacity: actual_count,
452 zone_hints: None,
453 }
454 }
455
456 #[must_use]
458 pub fn num_columns(&self) -> usize {
459 self.columns.len()
460 }
461}
462
463impl Clone for DataChunk {
464 fn clone(&self) -> Self {
465 Self {
466 columns: self.columns.clone(),
467 selection: self.selection.clone(),
468 count: self.count,
469 capacity: self.capacity,
470 zone_hints: self.zone_hints.clone(),
471 }
472 }
473}
474
/// Incrementally fills a [`DataChunk`]: values are pushed into the columns
/// via `column_mut` and each completed row is recorded with `advance_row`.
pub struct DataChunkBuilder {
    /// The chunk under construction; handed out by `finish`.
    chunk: DataChunk,
}
479
480impl DataChunkBuilder {
481 #[must_use]
483 pub fn with_schema(column_types: &[LogicalType]) -> Self {
484 Self {
485 chunk: DataChunk::with_schema(column_types),
486 }
487 }
488
489 #[must_use]
491 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
492 Self {
493 chunk: DataChunk::with_capacity(column_types, capacity),
494 }
495 }
496
497 #[must_use]
499 pub fn new(column_types: &[LogicalType]) -> Self {
500 Self::with_schema(column_types)
501 }
502
503 #[must_use]
505 pub fn row_count(&self) -> usize {
506 self.chunk.count
507 }
508
509 #[must_use]
511 pub fn is_full(&self) -> bool {
512 self.chunk.is_full()
513 }
514
515 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
517 self.chunk.column_mut(index)
518 }
519
520 pub fn advance_row(&mut self) {
522 self.chunk.count += 1;
523 }
524
525 #[must_use]
527 pub fn finish(self) -> DataChunk {
528 self.chunk
529 }
530
531 pub fn reset(&mut self) {
533 self.chunk.reset();
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::Value;

    /// Builds a one-column Int64 chunk through the builder, one row per value.
    fn int64_chunk(values: impl IntoIterator<Item = i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds hints carrying a single Int64 min/max entry under key 0.
    fn int64_hints(min: i64, max: i64) -> ChunkZoneHints {
        let entry = crate::index::ZoneMapEntry::with_min_max(
            Value::Int64(min),
            Value::Int64(max),
            0,
            10,
        );
        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(0, entry);
        hints
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for (number, text) in (1i64..).zip(["hello", "world"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(0..10);
        assert_eq!(chunk.row_count(), 10);

        // Keep the even rows only.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk([1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_count(5);

        // Without a selection every row index is produced.
        let all = chunk.selected_indices().collect::<Vec<_>>();
        assert_eq!(all, vec![0, 1, 2, 3, 4]);

        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        let picked = chunk.selected_indices().collect::<Vec<_>>();
        assert_eq!(picked, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for (number, letter) in (0i64..).zip(["a", "b", "c", "d", "e"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Select the odd rows: 1 and 3.
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        // After flattening the selected rows are the only physical rows.
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));

        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk([42]);
        let rows_before = chunk.row_count();

        // Flattening without a selection must leave the chunk untouched.
        chunk.flatten();

        assert_eq!(chunk.row_count(), rows_before);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        assert!(ChunkZoneHints::default().column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        // A fresh chunk carries no hints.
        assert!(chunk.zone_hints().is_none());

        chunk.set_zone_hints(int64_hints(10, 100));

        assert!(chunk.zone_hints().is_some());
        let stored = chunk.zone_hints().unwrap();
        assert_eq!(stored.column_hints.len(), 1);
        assert!(stored.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_zone_hints(int64_hints(1, 10));

        let copy = chunk.clone();

        assert!(copy.zone_hints().is_some());
        assert_eq!(copy.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.reset();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_sort_by_column_int64() {
        let ages = ValueVector::from_values(&[
            Value::Int64(30),
            Value::Int64(10),
            Value::Int64(20),
        ]);
        let names = ValueVector::from_values(&[
            Value::from("Alix"),
            Value::from("Gus"),
            Value::from("Jules"),
        ]);

        let sorted = DataChunk::new(vec![ages, names]).sort_by_column(0);
        assert_eq!(sorted.len(), 3);

        // The sort column comes back in ascending order.
        let sorted_ages = sorted.column(0).unwrap();
        for (row, age) in (0..).zip([10i64, 20, 30]) {
            assert_eq!(sorted_ages.get_value(row), Some(Value::Int64(age)));
        }

        // The other column follows the same permutation.
        let sorted_names = sorted.column(1).unwrap();
        for (row, name) in (0..).zip(["Gus", "Jules", "Alix"]) {
            assert_eq!(sorted_names.get_value(row), Some(Value::from(name)));
        }
    }

    #[test]
    fn test_sort_by_column_node_ids() {
        use grafeo_common::types::NodeId;

        let mut id_col = ValueVector::new();
        for raw in [100, 5, 50] {
            id_col.push_node_id(NodeId::new(raw));
        }

        let data_col = ValueVector::from_values(&[
            Value::from("hundred"),
            Value::from("five"),
            Value::from("fifty"),
        ]);

        let sorted = DataChunk::new(vec![id_col, data_col]).sort_by_column(0);
        assert_eq!(sorted.len(), 3);

        // Node ids come back in ascending order.
        let ids = sorted.column(0).unwrap();
        for (row, raw) in (0..).zip([5, 50, 100]) {
            assert_eq!(ids.get_node_id(row), Some(NodeId::new(raw)));
        }

        // The payload column moved along with its id.
        assert_eq!(
            sorted.column(1).unwrap().get_value(0),
            Some(Value::from("five"))
        );
    }

    #[test]
    fn test_sort_by_column_empty() {
        let sorted = DataChunk::empty().sort_by_column(0);
        assert_eq!(sorted.len(), 0);
    }

    #[test]
    fn test_sort_by_column_single_row() {
        let only = ValueVector::from_values(&[Value::Int64(42)]);
        let sorted = DataChunk::new(vec![only]).sort_by_column(0);

        assert_eq!(sorted.len(), 1);
        assert_eq!(
            sorted.column(0).unwrap().get_value(0),
            Some(Value::Int64(42))
        );
    }
}