1use super::selection::SelectionVector;
7use super::vector::ValueVector;
8use grafeo_common::types::LogicalType;
9
/// Default row capacity: used by `DataChunk::with_schema`, and as the fallback capacity in
/// `DataChunk::new` when no columns are supplied.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;
12
13#[derive(Debug)]
38pub struct DataChunk {
39 columns: Vec<ValueVector>,
41 selection: Option<SelectionVector>,
43 count: usize,
45 capacity: usize,
47}
48
49impl DataChunk {
50 #[must_use]
52 pub fn empty() -> Self {
53 Self {
54 columns: Vec::new(),
55 selection: None,
56 count: 0,
57 capacity: 0,
58 }
59 }
60
61 #[must_use]
63 pub fn new(columns: Vec<ValueVector>) -> Self {
64 let count = columns.first().map_or(0, ValueVector::len);
65 let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
66 Self {
67 columns,
68 selection: None,
69 count,
70 capacity,
71 }
72 }
73
74 #[must_use]
76 pub fn with_schema(column_types: &[LogicalType]) -> Self {
77 Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
78 }
79
80 #[must_use]
82 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
83 let columns = column_types
84 .iter()
85 .map(|t| ValueVector::with_capacity(t.clone(), capacity))
86 .collect();
87
88 Self {
89 columns,
90 selection: None,
91 count: 0,
92 capacity,
93 }
94 }
95
96 #[must_use]
98 pub fn column_count(&self) -> usize {
99 self.columns.len()
100 }
101
102 #[must_use]
104 pub fn row_count(&self) -> usize {
105 self.selection.as_ref().map_or(self.count, |s| s.len())
106 }
107
108 #[must_use]
110 pub fn len(&self) -> usize {
111 self.row_count()
112 }
113
114 #[must_use]
116 pub fn columns(&self) -> &[ValueVector] {
117 &self.columns
118 }
119
120 #[must_use]
122 pub fn total_row_count(&self) -> usize {
123 self.count
124 }
125
126 #[must_use]
128 pub fn is_empty(&self) -> bool {
129 self.row_count() == 0
130 }
131
132 #[must_use]
134 pub fn capacity(&self) -> usize {
135 self.capacity
136 }
137
138 #[must_use]
140 pub fn is_full(&self) -> bool {
141 self.count >= self.capacity
142 }
143
144 #[must_use]
146 pub fn column(&self, index: usize) -> Option<&ValueVector> {
147 self.columns.get(index)
148 }
149
150 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
152 self.columns.get_mut(index)
153 }
154
155 #[must_use]
157 pub fn selection(&self) -> Option<&SelectionVector> {
158 self.selection.as_ref()
159 }
160
161 pub fn set_selection(&mut self, selection: SelectionVector) {
163 self.selection = Some(selection);
164 }
165
166 pub fn clear_selection(&mut self) {
168 self.selection = None;
169 }
170
171 pub fn set_count(&mut self, count: usize) {
173 self.count = count;
174 }
175
176 pub fn reset(&mut self) {
178 for col in &mut self.columns {
179 col.clear();
180 }
181 self.selection = None;
182 self.count = 0;
183 }
184
185 pub fn flatten(&mut self) {
190 let selection = match self.selection.take() {
191 Some(sel) => sel,
192 None => return,
193 };
194
195 let selected_count = selection.len();
196
197 let mut new_columns = Vec::with_capacity(self.columns.len());
199
200 for col in &self.columns {
201 let mut new_col = ValueVector::with_type(col.data_type().clone());
203 for idx in selection.iter() {
204 if let Some(val) = col.get(idx) {
205 new_col.push(val);
206 }
207 }
208 new_columns.push(new_col);
209 }
210
211 self.columns = new_columns;
212 self.count = selected_count;
213 self.capacity = selected_count;
214 }
215
216 pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
218 match &self.selection {
219 Some(sel) => Box::new(sel.iter()),
220 None => Box::new(0..self.count),
221 }
222 }
223
224 pub fn concat(chunks: &[DataChunk]) -> DataChunk {
228 if chunks.is_empty() {
229 return DataChunk::empty();
230 }
231
232 if chunks.len() == 1 {
233 return DataChunk {
235 columns: chunks[0].columns.clone(),
236 selection: chunks[0].selection.clone(),
237 count: chunks[0].count,
238 capacity: chunks[0].capacity,
239 };
240 }
241
242 let num_columns = chunks[0].column_count();
243 if num_columns == 0 {
244 return DataChunk::empty();
245 }
246
247 let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
248
249 let mut result_columns = Vec::with_capacity(num_columns);
251
252 for col_idx in 0..num_columns {
253 let mut concat_vector = ValueVector::new();
254
255 for chunk in chunks {
256 if let Some(col) = chunk.column(col_idx) {
257 for i in chunk.selected_indices() {
259 if let Some(val) = col.get(i) {
260 concat_vector.push(val);
261 }
262 }
263 }
264 }
265
266 result_columns.push(concat_vector);
267 }
268
269 DataChunk {
270 columns: result_columns,
271 selection: None,
272 count: total_rows,
273 capacity: total_rows,
274 }
275 }
276
277 pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
279 let selected: Vec<usize> = predicate
281 .iter()
282 .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
283 .collect();
284
285 let mut result_columns = Vec::with_capacity(self.columns.len());
286
287 for col in &self.columns {
288 let mut new_col = ValueVector::new();
289 for &idx in &selected {
290 if let Some(val) = col.get(idx) {
291 new_col.push(val);
292 }
293 }
294 result_columns.push(new_col);
295 }
296
297 DataChunk {
298 columns: result_columns,
299 selection: None,
300 count: selected.len(),
301 capacity: selected.len(),
302 }
303 }
304
305 #[must_use]
309 pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
310 if offset >= self.len() || count == 0 {
311 return DataChunk::empty();
312 }
313
314 let actual_count = count.min(self.len() - offset);
315 let mut result_columns = Vec::with_capacity(self.columns.len());
316
317 for col in &self.columns {
318 let mut new_col = ValueVector::new();
319 for i in offset..(offset + actual_count) {
320 let actual_idx = if let Some(sel) = &self.selection {
321 sel.get(i).unwrap_or(i)
322 } else {
323 i
324 };
325 if let Some(val) = col.get(actual_idx) {
326 new_col.push(val);
327 }
328 }
329 result_columns.push(new_col);
330 }
331
332 DataChunk {
333 columns: result_columns,
334 selection: None,
335 count: actual_count,
336 capacity: actual_count,
337 }
338 }
339
340 #[must_use]
342 pub fn num_columns(&self) -> usize {
343 self.columns.len()
344 }
345}
346
347impl Clone for DataChunk {
348 fn clone(&self) -> Self {
349 Self {
350 columns: self.columns.clone(),
351 selection: self.selection.clone(),
352 count: self.count,
353 capacity: self.capacity,
354 }
355 }
356}
357
358pub struct DataChunkBuilder {
360 chunk: DataChunk,
361}
362
363impl DataChunkBuilder {
364 #[must_use]
366 pub fn with_schema(column_types: &[LogicalType]) -> Self {
367 Self {
368 chunk: DataChunk::with_schema(column_types),
369 }
370 }
371
372 #[must_use]
374 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
375 Self {
376 chunk: DataChunk::with_capacity(column_types, capacity),
377 }
378 }
379
380 #[must_use]
382 pub fn new(column_types: &[LogicalType]) -> Self {
383 Self::with_schema(column_types)
384 }
385
386 #[must_use]
388 pub fn row_count(&self) -> usize {
389 self.chunk.count
390 }
391
392 #[must_use]
394 pub fn is_full(&self) -> bool {
395 self.chunk.is_full()
396 }
397
398 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
400 self.chunk.column_mut(index)
401 }
402
403 pub fn advance_row(&mut self) {
405 self.chunk.count += 1;
406 }
407
408 #[must_use]
410 pub fn finish(self) -> DataChunk {
411 self.chunk
412 }
413
414 pub fn reset(&mut self) {
416 self.chunk.reset();
417 }
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a chunk with a single `Int64` column holding `values`, one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        // One (int, string) pair per row.
        for &(number, word) in &[(1, "hello"), (2, "world")] {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(word);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(chunk.row_count(), 10);

        // Keep only the even row indices.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        // The visible count shrinks while the physical count stays the same.
        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk(&[1]);
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_count(5);

        // Without a selection every physical index is yielded.
        let all_rows: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all_rows, vec![0, 1, 2, 3, 4]);

        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        // With a selection only the selected indices are yielded.
        let picked_rows: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(picked_rows, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        let rows = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")];
        for &(number, letter) in &rows {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Select the odd row indices: 1 and 3.
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        // After flattening, the selection is gone and only the selected rows remain.
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));

        let letters = chunk.column(1).unwrap();
        assert_eq!(letters.get_string(0), Some("b"));
        assert_eq!(letters.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk(&[42]);
        let rows_before = chunk.row_count();

        // Flattening without a selection must leave the chunk unchanged.
        chunk.flatten();

        assert_eq!(chunk.row_count(), rows_before);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }
}