1use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use crate::index::ZoneMapEntry;
9use grafeo_common::types::LogicalType;
10use grafeo_common::utils::hash::FxHashMap;
11
/// Row count a chunk is sized for when no explicit capacity is requested.
///
/// Used by [`DataChunk::with_schema`], and as the fallback capacity of
/// [`DataChunk::new`] when it is given no columns.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;
14
/// Optional zone-map metadata that can be attached to a [`DataChunk`].
///
/// NOTE(review): presumably lets downstream operators use per-column
/// min/max bounds to skip work. Confirm against the consumers of
/// `DataChunk::zone_hints`.
#[derive(Debug, Clone, Default)]
pub struct ChunkZoneHints {
    /// Zone map entries keyed by column index. Assumed to be the position
    /// in the chunk's column list (TODO confirm).
    pub column_hints: FxHashMap<usize, ZoneMapEntry>,
}
31
/// A batch of rows stored column-wise.
///
/// An optional selection vector restricts which physical rows are visible.
#[derive(Debug)]
pub struct DataChunk {
    /// One value vector per column.
    columns: Vec<ValueVector>,
    /// When set, only the physical row indices it yields are visible, and
    /// `row_count` reports its length instead of `count`.
    selection: Option<SelectionVector>,
    /// Number of physical rows, ignoring any selection.
    count: usize,
    /// Row budget for this chunk; `is_full` is true once `count` reaches it.
    capacity: usize,
    /// Optional zone-map hints; dropped by `reset` and `clear_zone_hints`.
    zone_hints: Option<ChunkZoneHints>,
}
69
70impl DataChunk {
71 #[must_use]
73 pub fn empty() -> Self {
74 Self {
75 columns: Vec::new(),
76 selection: None,
77 count: 0,
78 capacity: 0,
79 zone_hints: None,
80 }
81 }
82
83 #[must_use]
85 pub fn new(columns: Vec<ValueVector>) -> Self {
86 let count = columns.first().map_or(0, ValueVector::len);
87 let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
88 Self {
89 columns,
90 selection: None,
91 count,
92 capacity,
93 zone_hints: None,
94 }
95 }
96
97 #[must_use]
99 pub fn with_schema(column_types: &[LogicalType]) -> Self {
100 Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
101 }
102
103 #[must_use]
105 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
106 let columns = column_types
107 .iter()
108 .map(|t| ValueVector::with_capacity(t.clone(), capacity))
109 .collect();
110
111 Self {
112 columns,
113 selection: None,
114 count: 0,
115 capacity,
116 zone_hints: None,
117 }
118 }
119
120 #[must_use]
122 pub fn column_count(&self) -> usize {
123 self.columns.len()
124 }
125
126 #[must_use]
128 pub fn row_count(&self) -> usize {
129 self.selection.as_ref().map_or(self.count, |s| s.len())
130 }
131
132 #[must_use]
134 pub fn len(&self) -> usize {
135 self.row_count()
136 }
137
138 #[must_use]
140 pub fn columns(&self) -> &[ValueVector] {
141 &self.columns
142 }
143
144 #[must_use]
146 pub fn total_row_count(&self) -> usize {
147 self.count
148 }
149
150 #[must_use]
152 pub fn is_empty(&self) -> bool {
153 self.row_count() == 0
154 }
155
156 #[must_use]
158 pub fn capacity(&self) -> usize {
159 self.capacity
160 }
161
162 #[must_use]
164 pub fn is_full(&self) -> bool {
165 self.count >= self.capacity
166 }
167
168 #[must_use]
170 pub fn column(&self, index: usize) -> Option<&ValueVector> {
171 self.columns.get(index)
172 }
173
174 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
176 self.columns.get_mut(index)
177 }
178
179 #[must_use]
181 pub fn selection(&self) -> Option<&SelectionVector> {
182 self.selection.as_ref()
183 }
184
185 pub fn set_selection(&mut self, selection: SelectionVector) {
187 self.selection = Some(selection);
188 }
189
190 pub fn clear_selection(&mut self) {
192 self.selection = None;
193 }
194
195 pub fn set_zone_hints(&mut self, hints: ChunkZoneHints) {
200 self.zone_hints = Some(hints);
201 }
202
203 #[must_use]
207 pub fn zone_hints(&self) -> Option<&ChunkZoneHints> {
208 self.zone_hints.as_ref()
209 }
210
211 pub fn clear_zone_hints(&mut self) {
213 self.zone_hints = None;
214 }
215
216 pub fn set_count(&mut self, count: usize) {
218 self.count = count;
219 }
220
221 pub fn reset(&mut self) {
223 for col in &mut self.columns {
224 col.clear();
225 }
226 self.selection = None;
227 self.zone_hints = None;
228 self.count = 0;
229 }
230
231 pub fn flatten(&mut self) {
236 let selection = match self.selection.take() {
237 Some(sel) => sel,
238 None => return,
239 };
240
241 let selected_count = selection.len();
242
243 let mut new_columns = Vec::with_capacity(self.columns.len());
245
246 for col in &self.columns {
247 let mut new_col = ValueVector::with_type(col.data_type().clone());
249 for idx in selection.iter() {
250 if let Some(val) = col.get(idx) {
251 new_col.push(val);
252 }
253 }
254 new_columns.push(new_col);
255 }
256
257 self.columns = new_columns;
258 self.count = selected_count;
259 self.capacity = selected_count;
260 }
261
262 pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
264 match &self.selection {
265 Some(sel) => Box::new(sel.iter()),
266 None => Box::new(0..self.count),
267 }
268 }
269
270 pub fn concat(chunks: &[DataChunk]) -> DataChunk {
274 if chunks.is_empty() {
275 return DataChunk::empty();
276 }
277
278 if chunks.len() == 1 {
279 return DataChunk {
281 columns: chunks[0].columns.clone(),
282 selection: chunks[0].selection.clone(),
283 count: chunks[0].count,
284 capacity: chunks[0].capacity,
285 zone_hints: chunks[0].zone_hints.clone(),
286 };
287 }
288
289 let num_columns = chunks[0].column_count();
290 if num_columns == 0 {
291 return DataChunk::empty();
292 }
293
294 let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
295
296 let mut result_columns = Vec::with_capacity(num_columns);
298
299 for col_idx in 0..num_columns {
300 let mut concat_vector = ValueVector::new();
301
302 for chunk in chunks {
303 if let Some(col) = chunk.column(col_idx) {
304 for i in chunk.selected_indices() {
306 if let Some(val) = col.get(i) {
307 concat_vector.push(val);
308 }
309 }
310 }
311 }
312
313 result_columns.push(concat_vector);
314 }
315
316 DataChunk {
317 columns: result_columns,
318 selection: None,
319 count: total_rows,
320 capacity: total_rows,
321 zone_hints: None,
322 }
323 }
324
325 pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
327 let selected: Vec<usize> = predicate
329 .iter()
330 .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
331 .collect();
332
333 let mut result_columns = Vec::with_capacity(self.columns.len());
334
335 for col in &self.columns {
336 let mut new_col = ValueVector::new();
337 for &idx in &selected {
338 if let Some(val) = col.get(idx) {
339 new_col.push(val);
340 }
341 }
342 result_columns.push(new_col);
343 }
344
345 DataChunk {
346 columns: result_columns,
347 selection: None,
348 count: selected.len(),
349 capacity: selected.len(),
350 zone_hints: None,
351 }
352 }
353
354 #[must_use]
358 pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
359 if offset >= self.len() || count == 0 {
360 return DataChunk::empty();
361 }
362
363 let actual_count = count.min(self.len() - offset);
364 let mut result_columns = Vec::with_capacity(self.columns.len());
365
366 for col in &self.columns {
367 let mut new_col = ValueVector::new();
368 for i in offset..(offset + actual_count) {
369 let actual_idx = if let Some(sel) = &self.selection {
370 sel.get(i).unwrap_or(i)
371 } else {
372 i
373 };
374 if let Some(val) = col.get(actual_idx) {
375 new_col.push(val);
376 }
377 }
378 result_columns.push(new_col);
379 }
380
381 DataChunk {
382 columns: result_columns,
383 selection: None,
384 count: actual_count,
385 capacity: actual_count,
386 zone_hints: None,
387 }
388 }
389
390 #[must_use]
392 pub fn num_columns(&self) -> usize {
393 self.columns.len()
394 }
395}
396
397impl Clone for DataChunk {
398 fn clone(&self) -> Self {
399 Self {
400 columns: self.columns.clone(),
401 selection: self.selection.clone(),
402 count: self.count,
403 capacity: self.capacity,
404 zone_hints: self.zone_hints.clone(),
405 }
406 }
407}
408
409pub struct DataChunkBuilder {
411 chunk: DataChunk,
412}
413
414impl DataChunkBuilder {
415 #[must_use]
417 pub fn with_schema(column_types: &[LogicalType]) -> Self {
418 Self {
419 chunk: DataChunk::with_schema(column_types),
420 }
421 }
422
423 #[must_use]
425 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
426 Self {
427 chunk: DataChunk::with_capacity(column_types, capacity),
428 }
429 }
430
431 #[must_use]
433 pub fn new(column_types: &[LogicalType]) -> Self {
434 Self::with_schema(column_types)
435 }
436
437 #[must_use]
439 pub fn row_count(&self) -> usize {
440 self.chunk.count
441 }
442
443 #[must_use]
445 pub fn is_full(&self) -> bool {
446 self.chunk.is_full()
447 }
448
449 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
451 self.chunk.column_mut(index)
452 }
453
454 pub fn advance_row(&mut self) {
456 self.chunk.count += 1;
457 }
458
459 #[must_use]
461 pub fn finish(self) -> DataChunk {
462 self.chunk
463 }
464
465 pub fn reset(&mut self) {
467 self.chunk.reset();
468 }
469}
470
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column Int64 chunk holding the values `0..n`.
    fn int_chunk(n: i64) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for i in 0..n {
            builder.column_mut(0).unwrap().push_int64(i);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_chunk_creation() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let chunk = DataChunk::with_schema(&schema);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.advance_row();

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        for i in 0..10 {
            builder.column_mut(0).unwrap().push_int64(i);
            builder.advance_row();
        }

        let mut chunk = builder.finish();
        assert_eq!(chunk.row_count(), 10);

        let selection = SelectionVector::from_predicate(10, |i| i % 2 == 0);
        chunk.set_selection(selection);

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mut chunk = builder.finish();
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);
        chunk.set_count(5);

        let indices: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(indices, vec![0, 1, 2, 3, 4]);

        let selection = SelectionVector::from_predicate(5, |i| i == 1 || i == 3);
        chunk.set_selection(selection);

        let indices: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(indices, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let schema = [LogicalType::Int64, LogicalType::String];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        let letters = ["a", "b", "c", "d", "e"];
        for i in 0..5 {
            builder.column_mut(0).unwrap().push_int64(i);
            builder
                .column_mut(1)
                .unwrap()
                .push_string(letters[i as usize]);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        let selection = SelectionVector::from_predicate(5, |i| i % 2 == 1);
        chunk.set_selection(selection);

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(0).unwrap().get_int64(1), Some(3));
        assert_eq!(chunk.column(1).unwrap().get_string(0), Some("b"));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let schema = [LogicalType::Int64];
        let mut builder = DataChunkBuilder::with_schema(&schema);

        builder.column_mut(0).unwrap().push_int64(42);
        builder.advance_row();

        let mut chunk = builder.finish();
        let original_count = chunk.row_count();

        chunk.flatten();

        assert_eq!(chunk.row_count(), original_count);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }

    #[test]
    fn test_chunk_slice_respects_selection() {
        let mut chunk = int_chunk(6);
        // Visible physical rows: 1, 3, 5.
        chunk.set_selection(SelectionVector::from_predicate(6, |i| i % 2 == 1));

        // Skip the first visible row; the requested count exceeds what remains.
        let sliced = chunk.slice(1, 10);
        assert_eq!(sliced.row_count(), 2);
        assert!(sliced.selection().is_none());
        assert_eq!(sliced.column(0).unwrap().len(), 2);
    }

    #[test]
    fn test_chunk_slice_out_of_range_is_empty() {
        let chunk = int_chunk(3);
        assert!(chunk.slice(3, 1).is_empty());
        assert!(chunk.slice(0, 0).is_empty());
    }

    #[test]
    fn test_chunk_concat_counts_visible_rows() {
        let first = int_chunk(3);
        let mut second = int_chunk(4);
        // Only the first two rows of the second chunk are visible.
        second.set_selection(SelectionVector::from_predicate(4, |i| i < 2));

        let merged = DataChunk::concat(&[first, second]);
        assert_eq!(merged.row_count(), 5);
        assert_eq!(merged.total_row_count(), 5);
        assert_eq!(merged.column(0).unwrap().len(), 5);
        assert!(merged.selection().is_none());
    }

    #[test]
    fn test_chunk_concat_empty_input() {
        let merged = DataChunk::concat(&[]);
        assert!(merged.is_empty());
        assert_eq!(merged.column_count(), 0);
    }

    #[test]
    fn test_chunk_filter_intersects_selection() {
        let mut chunk = int_chunk(6);
        // Visible physical rows: 0, 1, 2, 3.
        chunk.set_selection(SelectionVector::from_predicate(6, |i| i < 4));

        // Predicate asks for 0, 2, 4; only 0 and 2 are visible.
        let filtered = chunk.filter(&SelectionVector::from_predicate(6, |i| i % 2 == 0));
        assert_eq!(filtered.row_count(), 2);
        assert_eq!(filtered.column(0).unwrap().len(), 2);
        assert!(filtered.selection().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_default() {
        let hints = ChunkZoneHints::default();
        assert!(hints.column_hints.is_empty());
    }

    #[test]
    fn test_chunk_zone_hints_set_and_get() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);

        assert!(chunk.zone_hints().is_none());

        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(
            0,
            crate::index::ZoneMapEntry::with_min_max(
                grafeo_common::types::Value::Int64(10),
                grafeo_common::types::Value::Int64(100),
                0,
                10,
            ),
        );
        chunk.set_zone_hints(hints);

        assert!(chunk.zone_hints().is_some());
        let retrieved = chunk.zone_hints().unwrap();
        assert_eq!(retrieved.column_hints.len(), 1);
        assert!(retrieved.column_hints.contains_key(&0));
    }

    #[test]
    fn test_chunk_zone_hints_clear() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);

        let hints = ChunkZoneHints::default();
        chunk.set_zone_hints(hints);
        assert!(chunk.zone_hints().is_some());

        chunk.clear_zone_hints();
        assert!(chunk.zone_hints().is_none());
    }

    #[test]
    fn test_chunk_zone_hints_preserved_on_clone() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);

        let mut hints = ChunkZoneHints::default();
        hints.column_hints.insert(
            0,
            crate::index::ZoneMapEntry::with_min_max(
                grafeo_common::types::Value::Int64(1),
                grafeo_common::types::Value::Int64(10),
                0,
                10,
            ),
        );
        chunk.set_zone_hints(hints);

        let cloned = chunk.clone();
        assert!(cloned.zone_hints().is_some());
        assert_eq!(cloned.zone_hints().unwrap().column_hints.len(), 1);
    }

    #[test]
    fn test_chunk_reset_clears_zone_hints() {
        let schema = [LogicalType::Int64];
        let mut chunk = DataChunk::with_schema(&schema);

        let hints = ChunkZoneHints::default();
        chunk.set_zone_hints(hints);
        assert!(chunk.zone_hints().is_some());

        chunk.reset();
        assert!(chunk.zone_hints().is_none());
    }
}