1use super::selection::SelectionVector;
4use super::vector::ValueVector;
5use graphos_common::types::LogicalType;
6
/// Row capacity used when a chunk is created without an explicit capacity
/// (see `DataChunk::with_schema`, and `DataChunk::new` when given no columns).
pub const DEFAULT_CHUNK_SIZE: usize = 2 * 1024;
9
10#[derive(Debug)]
16pub struct DataChunk {
17 columns: Vec<ValueVector>,
19 selection: Option<SelectionVector>,
21 count: usize,
23 capacity: usize,
25}
26
27impl DataChunk {
28 #[must_use]
30 pub fn empty() -> Self {
31 Self {
32 columns: Vec::new(),
33 selection: None,
34 count: 0,
35 capacity: 0,
36 }
37 }
38
39 #[must_use]
41 pub fn new(columns: Vec<ValueVector>) -> Self {
42 let count = columns.first().map_or(0, ValueVector::len);
43 let capacity = columns.first().map_or(DEFAULT_CHUNK_SIZE, |c| c.len());
44 Self {
45 columns,
46 selection: None,
47 count,
48 capacity,
49 }
50 }
51
52 #[must_use]
54 pub fn with_schema(column_types: &[LogicalType]) -> Self {
55 Self::with_capacity(column_types, DEFAULT_CHUNK_SIZE)
56 }
57
58 #[must_use]
60 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
61 let columns = column_types
62 .iter()
63 .map(|t| ValueVector::with_capacity(t.clone(), capacity))
64 .collect();
65
66 Self {
67 columns,
68 selection: None,
69 count: 0,
70 capacity,
71 }
72 }
73
74 #[must_use]
76 pub fn column_count(&self) -> usize {
77 self.columns.len()
78 }
79
80 #[must_use]
82 pub fn row_count(&self) -> usize {
83 self.selection.as_ref().map_or(self.count, |s| s.len())
84 }
85
86 #[must_use]
88 pub fn len(&self) -> usize {
89 self.row_count()
90 }
91
92 #[must_use]
94 pub fn columns(&self) -> &[ValueVector] {
95 &self.columns
96 }
97
98 #[must_use]
100 pub fn total_row_count(&self) -> usize {
101 self.count
102 }
103
104 #[must_use]
106 pub fn is_empty(&self) -> bool {
107 self.row_count() == 0
108 }
109
110 #[must_use]
112 pub fn capacity(&self) -> usize {
113 self.capacity
114 }
115
116 #[must_use]
118 pub fn is_full(&self) -> bool {
119 self.count >= self.capacity
120 }
121
122 #[must_use]
124 pub fn column(&self, index: usize) -> Option<&ValueVector> {
125 self.columns.get(index)
126 }
127
128 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
130 self.columns.get_mut(index)
131 }
132
133 #[must_use]
135 pub fn selection(&self) -> Option<&SelectionVector> {
136 self.selection.as_ref()
137 }
138
139 pub fn set_selection(&mut self, selection: SelectionVector) {
141 self.selection = Some(selection);
142 }
143
144 pub fn clear_selection(&mut self) {
146 self.selection = None;
147 }
148
149 pub fn set_count(&mut self, count: usize) {
151 self.count = count;
152 }
153
154 pub fn reset(&mut self) {
156 for col in &mut self.columns {
157 col.clear();
158 }
159 self.selection = None;
160 self.count = 0;
161 }
162
163 pub fn flatten(&mut self) {
168 let selection = match self.selection.take() {
169 Some(sel) => sel,
170 None => return,
171 };
172
173 let selected_count = selection.len();
174
175 let mut new_columns = Vec::with_capacity(self.columns.len());
177
178 for col in &self.columns {
179 let mut new_col = ValueVector::with_type(col.data_type().clone());
181 for idx in selection.iter() {
182 if let Some(val) = col.get(idx) {
183 new_col.push(val);
184 }
185 }
186 new_columns.push(new_col);
187 }
188
189 self.columns = new_columns;
190 self.count = selected_count;
191 self.capacity = selected_count;
192 }
193
194 pub fn selected_indices(&self) -> Box<dyn Iterator<Item = usize> + '_> {
196 match &self.selection {
197 Some(sel) => Box::new(sel.iter()),
198 None => Box::new(0..self.count),
199 }
200 }
201
202 pub fn concat(chunks: &[DataChunk]) -> DataChunk {
206 if chunks.is_empty() {
207 return DataChunk::empty();
208 }
209
210 if chunks.len() == 1 {
211 return DataChunk {
213 columns: chunks[0].columns.clone(),
214 selection: chunks[0].selection.clone(),
215 count: chunks[0].count,
216 capacity: chunks[0].capacity,
217 };
218 }
219
220 let num_columns = chunks[0].column_count();
221 if num_columns == 0 {
222 return DataChunk::empty();
223 }
224
225 let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
226
227 let mut result_columns = Vec::with_capacity(num_columns);
229
230 for col_idx in 0..num_columns {
231 let mut concat_vector = ValueVector::new();
232
233 for chunk in chunks {
234 if let Some(col) = chunk.column(col_idx) {
235 for i in chunk.selected_indices() {
237 if let Some(val) = col.get(i) {
238 concat_vector.push(val);
239 }
240 }
241 }
242 }
243
244 result_columns.push(concat_vector);
245 }
246
247 DataChunk {
248 columns: result_columns,
249 selection: None,
250 count: total_rows,
251 capacity: total_rows,
252 }
253 }
254
255 pub fn filter(&self, predicate: &SelectionVector) -> DataChunk {
257 let selected: Vec<usize> = predicate
259 .iter()
260 .filter(|&idx| self.selection.as_ref().map_or(true, |s| s.contains(idx)))
261 .collect();
262
263 let mut result_columns = Vec::with_capacity(self.columns.len());
264
265 for col in &self.columns {
266 let mut new_col = ValueVector::new();
267 for &idx in &selected {
268 if let Some(val) = col.get(idx) {
269 new_col.push(val);
270 }
271 }
272 result_columns.push(new_col);
273 }
274
275 DataChunk {
276 columns: result_columns,
277 selection: None,
278 count: selected.len(),
279 capacity: selected.len(),
280 }
281 }
282
283 #[must_use]
287 pub fn slice(&self, offset: usize, count: usize) -> DataChunk {
288 if offset >= self.len() || count == 0 {
289 return DataChunk::empty();
290 }
291
292 let actual_count = count.min(self.len() - offset);
293 let mut result_columns = Vec::with_capacity(self.columns.len());
294
295 for col in &self.columns {
296 let mut new_col = ValueVector::new();
297 for i in offset..(offset + actual_count) {
298 let actual_idx = if let Some(sel) = &self.selection {
299 sel.get(i).unwrap_or(i)
300 } else {
301 i
302 };
303 if let Some(val) = col.get(actual_idx) {
304 new_col.push(val);
305 }
306 }
307 result_columns.push(new_col);
308 }
309
310 DataChunk {
311 columns: result_columns,
312 selection: None,
313 count: actual_count,
314 capacity: actual_count,
315 }
316 }
317
318 #[must_use]
320 pub fn num_columns(&self) -> usize {
321 self.columns.len()
322 }
323}
324
325impl Clone for DataChunk {
326 fn clone(&self) -> Self {
327 Self {
328 columns: self.columns.clone(),
329 selection: self.selection.clone(),
330 count: self.count,
331 capacity: self.capacity,
332 }
333 }
334}
335
336pub struct DataChunkBuilder {
338 chunk: DataChunk,
339}
340
341impl DataChunkBuilder {
342 #[must_use]
344 pub fn with_schema(column_types: &[LogicalType]) -> Self {
345 Self {
346 chunk: DataChunk::with_schema(column_types),
347 }
348 }
349
350 #[must_use]
352 pub fn with_capacity(column_types: &[LogicalType], capacity: usize) -> Self {
353 Self {
354 chunk: DataChunk::with_capacity(column_types, capacity),
355 }
356 }
357
358 #[must_use]
360 pub fn new(column_types: &[LogicalType]) -> Self {
361 Self::with_schema(column_types)
362 }
363
364 #[must_use]
366 pub fn row_count(&self) -> usize {
367 self.chunk.count
368 }
369
370 #[must_use]
372 pub fn is_full(&self) -> bool {
373 self.chunk.is_full()
374 }
375
376 pub fn column_mut(&mut self, index: usize) -> Option<&mut ValueVector> {
378 self.chunk.column_mut(index)
379 }
380
381 pub fn advance_row(&mut self) {
383 self.chunk.count += 1;
384 }
385
386 #[must_use]
388 pub fn finish(self) -> DataChunk {
389 self.chunk
390 }
391
392 pub fn reset(&mut self) {
394 self.chunk.reset();
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-column `Int64` chunk holding `values`, one per row.
    fn int64_chunk(values: impl IntoIterator<Item = i64>) -> DataChunk {
        let mut builder = DataChunkBuilder::with_schema(&[LogicalType::Int64]);
        for value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_chunk_creation() {
        let chunk = DataChunk::with_schema(&[LogicalType::Int64, LogicalType::String]);

        assert_eq!(chunk.column_count(), 2);
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_chunk_builder() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        for &(number, word) in &[(1_i64, "hello"), (2_i64, "world")] {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(word);
            builder.advance_row();
        }

        let chunk = builder.finish();

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(chunk.column(1).unwrap().get_string(1), Some("world"));
    }

    #[test]
    fn test_chunk_selection() {
        let mut chunk = int64_chunk(0..10);
        assert_eq!(chunk.row_count(), 10);

        // Keep the even rows only.
        chunk.set_selection(SelectionVector::from_predicate(10, |i| i % 2 == 0));

        assert_eq!(chunk.row_count(), 5);
        assert_eq!(chunk.total_row_count(), 10);
    }

    #[test]
    fn test_chunk_reset() {
        let mut chunk = int64_chunk(std::iter::once(1));
        assert_eq!(chunk.row_count(), 1);

        chunk.reset();
        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }

    #[test]
    fn test_selected_indices() {
        let mut chunk = DataChunk::with_schema(&[LogicalType::Int64]);
        chunk.set_count(5);

        // Without a selection every index below the count is reported.
        let all: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(all, vec![0, 1, 2, 3, 4]);

        chunk.set_selection(SelectionVector::from_predicate(5, |i| i == 1 || i == 3));

        // With a selection only the selected indices are reported.
        let picked: Vec<_> = chunk.selected_indices().collect();
        assert_eq!(picked, vec![1, 3]);
    }

    #[test]
    fn test_chunk_flatten() {
        let mut builder =
            DataChunkBuilder::with_schema(&[LogicalType::Int64, LogicalType::String]);

        let letters = ["a", "b", "c", "d", "e"];
        for (number, letter) in (0_i64..).zip(letters.iter().copied()) {
            builder.column_mut(0).unwrap().push_int64(number);
            builder.column_mut(1).unwrap().push_string(letter);
            builder.advance_row();
        }

        let mut chunk = builder.finish();

        // Select the odd rows: 1 and 3.
        chunk.set_selection(SelectionVector::from_predicate(5, |i| i % 2 == 1));

        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 5);

        chunk.flatten();

        // Flattening keeps the visible rows and drops the selection.
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(chunk.total_row_count(), 2);
        assert!(chunk.selection().is_none());

        let numbers = chunk.column(0).unwrap();
        assert_eq!(numbers.get_int64(0), Some(1));
        assert_eq!(numbers.get_int64(1), Some(3));

        let strings = chunk.column(1).unwrap();
        assert_eq!(strings.get_string(0), Some("b"));
        assert_eq!(strings.get_string(1), Some("d"));
    }

    #[test]
    fn test_chunk_flatten_no_selection() {
        let mut chunk = int64_chunk(std::iter::once(42));
        let original_count = chunk.row_count();

        // Flattening without a selection must leave the chunk untouched.
        chunk.flatten();

        assert_eq!(chunk.row_count(), original_count);
        assert_eq!(chunk.column(0).unwrap().get_int64(0), Some(42));
    }
}