1use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use crate::index::ZoneMapEntry;
9use grafeo_common::types::LogicalType;
10use grafeo_common::utils::hash::FxHashMap;
11
/// Default row capacity of a chunk.
///
/// Used by [`DataChunk::with_schema`] (and therefore by
/// [`DataChunkBuilder::with_schema`] / [`DataChunkBuilder::new`]), and as the
/// capacity assigned by [`DataChunk::new`] when it is given no columns.
pub const DEFAULT_CHUNK_SIZE: usize = 2048;
14
/// Zone map hints that can be attached to a [`DataChunk`].
///
/// The default value holds no hints.
#[derive(Debug, Clone, Default)]
pub struct ChunkZoneHints {
    /// Zone map entries keyed by a `usize`.
    ///
    /// NOTE(review): the key is presumably the column index within the chunk
    /// (the unit tests insert an entry under key `0`) — confirm against the
    /// code that produces these hints.
    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
}
31
/// A batch of rows stored column-wise, with an optional selection vector and
/// optional zone map hints.
#[derive(Debug)]
pub struct DataChunk {
    /// Column vectors that make up the chunk.
    columns: Vec<ValueVector>,
    /// Optional selection; when set, `row_count()` and `selected_indices()`
    /// report only the selected rows.
    selection: Option<SelectionVector>,
    /// Total number of rows, independent of any selection.
    count: usize,
    /// Row capacity; `is_full()` compares `count` against it.
    capacity: usize,
    /// Optional zone map hints; dropped by `reset()` and `clear_zone_hints()`.
    zone_hints: Option<ChunkZoneHints>,
}
69
70impl DataChunk {
71 #[must_use]
73 pub fn empty() -> Self {
74 Self {
75 columns: Vec::new(),
76 selection: None,
77 count: 0,
78 capacity: 0,
79 zone_hints: None,
80 }
81 }
82
83 #[must_use]
85 pub fn new(columns: Vec<ValueVector>) -> Self {
86 let count = columns.first().map_or(0, ValueVector::len);
87 let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
88 Self {
89 columns,
90 selection: None,
91 count,
92 capacity,
93 zone_hints: None,
94 }
95 }
96
97 #[must_use]
99 pub fn with_schema(column_types: &[LogicalType]) -> Self {
100 Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
101 }
102
103 #[must_use]
105 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
106 let columns = column_types
107 .iter()
108 .map(|t| ValueVector::with_capacity(t.clone(), capacity))
109 .collect();
110
111 Self {
112 columns,
113 selection: None,
114 count: 0,
115 capacity,
116 zone_hints: None,
117 }
118 }
119
120 #[must_use]
122 pub fn column_count(&self) -> usize {
123 self.columns.len()
124 }
125
126 #[must_use]
128 pub fn row_count(&self) -> usize {
129 self.selection.as_ref().map_or(self.count, |s| s.len())
130 }
131
132 #[must_use]
134 pub fn len(&self) -> usize {
135 self.row_count()
136 }
137
138 #[must_use]
140 pub fn columns(&self) -> &[ValueVector] {
141 &self.columns
142 }
143
144 #[must_use]
146 pub fn total_row_count(&self) -> usize {
147 self.count
148 }
149
150 #[must_use]
152 pub fn is_empty(&self) -> bool {
153 self.row_count() == 0
154 }
155
156 #[must_use]
158 pub fn capacity(&self) -> usize {
159 self.capacity
160 }
161
162 #[must_use]
164 pub fn is_full(&self) -> bool {
165 self.count >= self.capacity
166 }
167
168 #[must_use]
170 pub fn column(&self, index: usize) -> Option<&ValueVector> {
171 self.columns.get(index)
172 }
173
174 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
176 self.columns.get_mut(index)
177 }
178
179 #[must_use]
181 pub fn selection(&self) -> Option<&SelectionVector> {
182 self.selection.as_ref()
183 }
184
185 pub fn set_selection(&mut self, selection: SelectionVector) {
187 self.selection = Some(selection);
188 }
189
190 pub fn clear_selection(&mut self) {
192 self.selection = None;
193 }
194
195 pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
200 self.zone_hints = Some(hints);
201 }
202
203 #[must_use]
207 pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
208 self.zone_hints.as_ref()
209 }
210
211 pub fn clear_zone_hints(&mut self) {
213 self.zone_hints = None;
214 }
215
216 pub fn set_count(&mut self, count: usize) {
218 self.count = count;
219 }
220
221 pub fn reset(&mut self) {
223 for col in &mut self.columns {
224 col.clear();
225 }
226 self.selection = None;
227 self.zone_hints = None;
228 self.count = 0;
229 }
230
231 pub fn flatten(&mut self) {
236 let Some(selection) = self.selection.take() else {
237 return;
238 };
239
240 let selected_count = selection.len();
241
242 let mut new_columns = Vec::with_capacity(self.columns.len());
244
245 for col in &self.columns {
246 let mut new_col = ValueVector::with_type(col.data_type().clone());
248 for idx in selection.iter() {
249 if let Some(val) = col.get(idx) {
250 new_col.push(val);
251 }
252 }
253 new_columns.push(new_col);
254 }
255
256 self.columns = new_columns;
257 self.count = selected_count;
258 self.capacity = selected_count;
259 }
260
261 pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
263 match &self.selection {
264 Some(sel) => Box::new(sel.iter()),
265 None => Box::new(0..self.count),
266 }
267 }
268
269 pub fn concat(chunks: &[DataChunk]) -> DataChunk {
273 if chunks.is_empty() {
274 return DataChunk::empty();
275 }
276
277 if chunks.len() == 1 {
278 return DataChunk {
280 columns: chunks[0].columns.clone(),
281 selection: chunks[0].selection.clone(),
282 count: chunks[0].count,
283 capacity: chunks[0].capacity,
284 zone_hints: chunks[0].zone_hints.clone(),
285 };
286 }
287
288 let num_columns = chunks[0].column_count();
289 if num_columns == 0 {
290 return DataChunk::empty();
291 }
292
293 let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
294
295 let mut result_columns = Vec::with_capacity(num_columns);
297
298 for col_idx in 0..num_columns {
299 let mut concat_vector = ValueVector::new();
300
301 for chunk in chunks {
302 if let Some(col) = chunk.column(col_idx) {
303 for i in chunk.selected_indices() {
305 if let Some(val) = col.get(i) {
306 concat_vector.push(val);
307 }
308 }
309 }
310 }
311
312 result_columns.push(concat_vector);
313 }
314
315 DataChunk {
316 columns: result_columns,
317 selection: None,
318 count: total_rows,
319 capacity: total_rows,
320 zone_hints: None,
321 }
322 }
323
324 pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
326 let selected: Vec<usize> = predicate
328 .iter()
329 .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
330 .collect();
331
332 let mut result_columns = Vec::with_capacity(self.columns.len());
333
334 for col in &self.columns {
335 let mut new_col = ValueVector::new();
336 for &idx in &selected {
337 if let Some(val) = col.get(idx) {
338 new_col.push(val);
339 }
340 }
341 result_columns.push(new_col);
342 }
343
344 DataChunk {
345 columns: result_columns,
346 selection: None,
347 count: selected.len(),
348 capacity: selected.len(),
349 zone_hints: None,
350 }
351 }
352
353 #[must_use]
357 pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
358 if offset >= self.len() || count == 0 {
359 return DataChunk::empty();
360 }
361
362 let actual_count = count.min(self.len() - offset);
363 let mut result_columns = Vec::with_capacity(self.columns.len());
364
365 for col in &self.columns {
366 let mut new_col = ValueVector::new();
367 for i in offset..(offset + actual_count) {
368 let actual_idx = if let Some(sel) = &self.selection {
369 sel.get(i).unwrap_or(i)
370 } else {
371 i
372 };
373 if let Some(val) = col.get(actual_idx) {
374 new_col.push(val);
375 }
376 }
377 result_columns.push(new_col);
378 }
379
380 DataChunk {
381 columns: result_columns,
382 selection: None,
383 count: actual_count,
384 capacity: actual_count,
385 zone_hints: None,
386 }
387 }
388
389 #[must_use]
391 pub fn num_columns(&self) -> usize {
392 self.columns.len()
393 }
394}
395
396impl Clone for DataChunk {
397 fn clone(&self) -> Self {
398 Self {
399 columns: self.columns.clone(),
400 selection: self.selection.clone(),
401 count: self.count,
402 capacity: self.capacity,
403 zone_hints: self.zone_hints.clone(),
404 }
405 }
406}
407
/// Incrementally fills a [`DataChunk`]: values are pushed into the columns via
/// `column_mut`, and `advance_row` bumps the row count.
pub struct DataChunkBuilder {
    /// The chunk under construction; handed out by `finish()`.
    chunk: DataChunk,
}
412
413impl DataChunkBuilder {
414 #[must_use]
416 pub fn with_schema(column_types: &[LogicalType]) -> Self {
417 Self {
418 chunk: DataChunk::with_schema(column_types),
419 }
420 }
421
422 #[must_use]
424 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
425 Self {
426 chunk: DataChunk::with_capacity(column_types, capacity),
427 }
428 }
429
430 #[must_use]
432 pub fn new(column_types: &[LogicalType]) -> Self {
433 Self::with_schema(column_types)
434 }
435
436 #[must_use]
438 pub fn row_count(&self) -> usize {
439 self.chunk.count
440 }
441
442 #[must_use]
444 pub fn is_full(&self) -> bool {
445 self.chunk.is_full()
446 }
447
448 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
450 self.chunk.column_mut(index)
451 }
452
453 pub fn advance_row(&mut self) {
455 self.chunk.count += 1;
456 }
457
458 #[must_use]
460 pub fn finish(self) -> DataChunk {
461 self.chunk
462 }
463
464 pub fn reset(&mut self) {
466 self.chunk.reset();
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column `Int64` chunk with one row per value.
    fn int64_chunk(values: impl IntoIterator<Item = i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds hints holding one min/max entry stored under key `0`.
    fn hints_with_entry(min: i64, max: i64) -> ChunkZoneHints {
        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(
            0,
            crate::index::ZoneMapEntry::with_min_max(
                grafeo_common::types::Value::Int64(min),
                grafeo_common::types::Value::Int64(max),
                0,
                10,
            ),
        );
        hints
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for (number, word) in [(1, "hello"), (2, "world")] {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(word);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(0..10);
        assert_eq!(chunk.row_count(), 10);

        // Keep the even row indices only.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk([1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_count(5);

        // Without a selection every index below the count is reported.
        let all_rows: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all_rows, vec![0, 1, 2, 3, 4]);

        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        // With a selection only the selected indices are reported.
        let picked: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(picked, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for (number, letter) in (0_i64..).zip(["a", "b", "c", "d", "e"]) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Select the odd row indices (1 and 3).
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        // After flattening the chunk is dense and carries no selection.
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));

        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk([42]);
        let rows_before = chunk.row_count();

        // Flattening without a selection must leave the chunk unchanged.
        chunk.flatten();

        assert_eq!(chunk.row_count(), rows_before);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        assert!(ChunkZoneHints::default().column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        // A fresh chunk carries no hints.
        assert!(chunk.zone_hints().is_none());

        chunk.set_zone_hints(hints_with_entry(10, 100));

        assert!(chunk.zone_hints().is_some());
        let stored = chunk.zone_hints().unwrap();
        assert_eq!(stored.column_hints.len(), 1);
        assert!(stored.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_zone_hints(hints_with_entry(1, 10));

        let copy = chunk.clone();

        assert!(copy.zone_hints().is_some());
        assert_eq!(copy.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);

        chunk.set_zone_hints(ChunkZoneHints::default());
        assert!(chunk.zone_hints().is_some());

        chunk.reset();
        assert!(chunk.zone_hints().is_none());
    }
}