1use kyu_types::TypedValue;
7
8use crate::value_vector::{SelectionVector, ValueVector};
9
/// A batch of rows stored column-wise.
///
/// Each entry of `columns` holds the values of one column. `selection`
/// decides how many rows are visible and which stored position each visible
/// row reads from (see `get_value`).
#[derive(Clone, Debug)]
pub struct DataChunk {
    /// One value vector per column.
    columns: Vec<ValueVector>,
    /// Maps a visible row index to a position inside the column vectors.
    selection: SelectionVector,
}
20
21impl DataChunk {
22 pub fn from_vectors(columns: Vec<ValueVector>, selection: SelectionVector) -> Self {
24 Self { columns, selection }
25 }
26
27 pub fn new(columns: Vec<Vec<TypedValue>>) -> Self {
29 let num_rows = columns.first().map_or(0, |c| c.len());
30 debug_assert!(columns.iter().all(|c| c.len() == num_rows));
31 let vectors = columns.into_iter().map(ValueVector::Owned).collect();
32 Self {
33 columns: vectors,
34 selection: SelectionVector::identity(num_rows),
35 }
36 }
37
38 pub fn new_with_row_count(columns: Vec<Vec<TypedValue>>, num_rows: usize) -> Self {
40 let vectors = columns.into_iter().map(ValueVector::Owned).collect();
41 Self {
42 columns: vectors,
43 selection: SelectionVector::identity(num_rows),
44 }
45 }
46
47 pub fn empty(num_columns: usize) -> Self {
49 Self {
50 columns: (0..num_columns)
51 .map(|_| ValueVector::Owned(Vec::new()))
52 .collect(),
53 selection: SelectionVector::identity(0),
54 }
55 }
56
57 pub fn with_capacity(num_columns: usize, row_capacity: usize) -> Self {
59 Self {
60 columns: (0..num_columns)
61 .map(|_| ValueVector::Owned(Vec::with_capacity(row_capacity)))
62 .collect(),
63 selection: SelectionVector::identity(0),
64 }
65 }
66
67 pub fn single_empty_row(num_columns: usize) -> Self {
69 Self {
70 columns: (0..num_columns)
71 .map(|_| ValueVector::Owned(vec![TypedValue::Null]))
72 .collect(),
73 selection: SelectionVector::identity(1),
74 }
75 }
76
77 pub fn from_rows(rows: &[Vec<TypedValue>], num_columns: usize) -> Self {
79 let mut columns: Vec<Vec<TypedValue>> = (0..num_columns).map(|_| Vec::new()).collect();
80 for row in rows {
81 for (col_idx, val) in row.iter().enumerate() {
82 if col_idx < num_columns {
83 columns[col_idx].push(val.clone());
84 }
85 }
86 }
87 let num_rows = rows.len();
88 Self {
89 columns: columns.into_iter().map(ValueVector::Owned).collect(),
90 selection: SelectionVector::identity(num_rows),
91 }
92 }
93
94 #[inline]
95 pub fn num_columns(&self) -> usize {
96 self.columns.len()
97 }
98
99 #[inline]
100 pub fn num_rows(&self) -> usize {
101 self.selection.len()
102 }
103
104 #[inline]
105 pub fn is_empty(&self) -> bool {
106 self.selection.is_empty()
107 }
108
109 #[inline]
111 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> TypedValue {
112 let physical = self.selection.get(row_idx);
113 self.columns[col_idx].get_value(physical)
114 }
115
116 pub fn get_row(&self, row_idx: usize) -> Vec<TypedValue> {
118 (0..self.columns.len())
119 .map(|col| self.get_value(row_idx, col))
120 .collect()
121 }
122
123 pub fn append_row(&mut self, row: &[TypedValue]) {
125 debug_assert_eq!(row.len(), self.columns.len());
126 for (col_idx, val) in row.iter().enumerate() {
127 self.columns[col_idx].push(val.clone());
128 }
129 self.selection = SelectionVector::identity(self.selection.len() + 1);
130 }
131
132 pub fn append(&mut self, other: &DataChunk) {
136 debug_assert_eq!(self.num_columns(), other.num_columns());
137 self.ensure_owned();
138 for row_idx in 0..other.num_rows() {
139 self.append_row_from_chunk(other, row_idx);
140 }
141 }
142
143 fn ensure_owned(&mut self) {
145 let needs_materialize = self
146 .columns
147 .iter()
148 .any(|c| !matches!(c, ValueVector::Owned(_)));
149 if !needs_materialize {
150 return;
151 }
152 let num_rows = self.num_rows();
153 let num_cols = self.num_columns();
154 let owned_columns: Vec<Vec<TypedValue>> = (0..num_cols)
155 .map(|col| (0..num_rows).map(|row| self.get_value(row, col)).collect())
156 .collect();
157 self.columns = owned_columns.into_iter().map(ValueVector::Owned).collect();
158 self.selection = SelectionVector::identity(num_rows);
159 }
160
161 #[inline]
163 pub fn selection(&self) -> &SelectionVector {
164 &self.selection
165 }
166
167 #[inline]
169 pub fn column(&self, col_idx: usize) -> &ValueVector {
170 &self.columns[col_idx]
171 }
172
173 pub fn compact_column(&self, col_idx: usize) -> ValueVector {
177 if self.selection.is_identity() {
178 self.columns[col_idx].clone()
179 } else {
180 let n = self.selection.len();
181 let mut result = Vec::with_capacity(n);
182 for i in 0..n {
183 result.push(self.columns[col_idx].get_value(self.selection.get(i)));
184 }
185 ValueVector::Owned(result)
186 }
187 }
188
189 pub fn select_columns(mut self, indices: &[usize]) -> Self {
191 let columns = indices
192 .iter()
193 .map(|&i| std::mem::replace(&mut self.columns[i], ValueVector::Owned(Vec::new())))
194 .collect();
195 Self {
196 columns,
197 selection: self.selection,
198 }
199 }
200
201 pub fn take_column(&mut self, col_idx: usize) -> ValueVector {
205 if self.selection.is_identity() {
206 std::mem::replace(&mut self.columns[col_idx], ValueVector::Owned(Vec::new()))
207 } else {
208 let n = self.selection.len();
209 let mut result = Vec::with_capacity(n);
210 for i in 0..n {
211 result.push(self.columns[col_idx].get_value(self.selection.get(i)));
212 }
213 ValueVector::Owned(result)
214 }
215 }
216
217 #[inline]
219 pub fn with_selection(self, selection: SelectionVector) -> Self {
220 Self {
221 columns: self.columns,
222 selection,
223 }
224 }
225
226 #[inline]
228 pub fn row_ref(&self, row_idx: usize) -> RowRef<'_> {
229 RowRef {
230 chunk: self,
231 row_idx,
232 }
233 }
234
235 pub fn append_row_from_chunk(&mut self, source: &DataChunk, row_idx: usize) {
237 debug_assert_eq!(self.num_columns(), source.num_columns());
238 for col_idx in 0..self.columns.len() {
239 let val = source.get_value(row_idx, col_idx);
240 self.columns[col_idx].push(val);
241 }
242 self.selection = SelectionVector::identity(self.selection.len() + 1);
243 }
244
245 pub fn append_row_from_chunk_with_extra(
247 &mut self,
248 source: &DataChunk,
249 row_idx: usize,
250 extra: TypedValue,
251 ) {
252 debug_assert_eq!(self.num_columns(), source.num_columns() + 1);
253 for col_idx in 0..source.num_columns() {
254 let val = source.get_value(row_idx, col_idx);
255 self.columns[col_idx].push(val);
256 }
257 self.columns[source.num_columns()].push(extra);
258 self.selection = SelectionVector::identity(self.selection.len() + 1);
259 }
260}
261
/// A borrowed view of a single row of a [`DataChunk`].
///
/// Implements `kyu_expression::Tuple`, so the row's values can be looked up
/// by column index.
pub struct RowRef<'a> {
    /// Chunk the row belongs to.
    chunk: &'a DataChunk,
    /// Row index passed to `DataChunk::get_value` on each lookup.
    row_idx: usize,
}
270
271impl kyu_expression::Tuple for RowRef<'_> {
272 #[inline]
273 fn value_at(&self, col_idx: usize) -> Option<TypedValue> {
274 if col_idx < self.chunk.num_columns() {
275 Some(self.chunk.get_value(self.row_idx, col_idx))
276 } else {
277 None
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Shorthand for an `Int64` test value.
    fn int(v: i64) -> TypedValue {
        TypedValue::Int64(v)
    }

    /// Shorthand for a `String` test value.
    fn text(s: &str) -> TypedValue {
        TypedValue::String(SmolStr::new(s))
    }

    /// Two-column fixture whose rows are (1, 10) and (2, 20).
    fn two_by_two() -> DataChunk {
        DataChunk::new(vec![vec![int(1), int(2)], vec![int(10), int(20)]])
    }

    #[test]
    fn new_chunk() {
        let chunk = DataChunk::new(vec![
            vec![int(1), int(2)],
            vec![text("a"), text("b")],
        ]);
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.num_columns(), 2);
    }

    #[test]
    fn empty_chunk() {
        let chunk = DataChunk::empty(3);
        assert_eq!(chunk.num_rows(), 0);
        assert_eq!(chunk.num_columns(), 3);
        assert!(chunk.is_empty());
    }

    #[test]
    fn single_empty_row() {
        let chunk = DataChunk::single_empty_row(2);
        assert_eq!(chunk.num_rows(), 1);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Null);
        assert_eq!(chunk.get_value(0, 1), TypedValue::Null);
    }

    #[test]
    fn from_rows() {
        let rows = vec![vec![int(1), int(10)], vec![int(2), int(20)]];
        let chunk = DataChunk::from_rows(&rows, 2);
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), int(1));
        assert_eq!(chunk.get_value(1, 0), int(2));
        assert_eq!(chunk.get_value(0, 1), int(10));
        assert_eq!(chunk.get_value(1, 1), int(20));
    }

    #[test]
    fn get_row() {
        let chunk = two_by_two();
        assert_eq!(chunk.get_row(0), vec![int(1), int(10)]);
        assert_eq!(chunk.get_row(1), vec![int(2), int(20)]);
    }

    #[test]
    fn append_row() {
        let mut chunk = DataChunk::empty(2);
        chunk.append_row(&[int(1), int(2)]);
        assert_eq!(chunk.num_rows(), 1);
        chunk.append_row(&[int(3), int(4)]);
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_row(1), vec![int(3), int(4)]);
    }

    #[test]
    fn row_ref_value_at() {
        use kyu_expression::Tuple;
        let chunk = two_by_two();

        let first = chunk.row_ref(0);
        assert_eq!(first.value_at(0), Some(int(1)));
        assert_eq!(first.value_at(1), Some(int(10)));
        assert_eq!(first.value_at(2), None);

        let second = chunk.row_ref(1);
        assert_eq!(second.value_at(0), Some(int(2)));
        assert_eq!(second.value_at(1), Some(int(20)));
    }

    #[test]
    fn append_row_from_chunk() {
        let src = two_by_two();
        let mut dst = DataChunk::with_capacity(2, 2);
        dst.append_row_from_chunk(&src, 1);
        assert_eq!(dst.num_rows(), 1);
        assert_eq!(dst.get_row(0), vec![int(2), int(20)]);
    }

    #[test]
    fn append_chunks() {
        let mut head = DataChunk::new(vec![vec![int(1)]]);
        let tail = DataChunk::new(vec![vec![int(2), int(3)]]);
        head.append(&tail);
        assert_eq!(head.num_rows(), 3);
        assert_eq!(head.get_value(0, 0), int(1));
        assert_eq!(head.get_value(1, 0), int(2));
        assert_eq!(head.get_value(2, 0), int(3));
    }

    #[test]
    fn with_selection_filters() {
        let chunk = DataChunk::new(vec![vec![int(10), int(20), int(30)]]);
        let filtered = chunk.with_selection(SelectionVector::from_indices(vec![0, 2]));
        assert_eq!(filtered.num_rows(), 2);
        assert_eq!(filtered.get_value(0, 0), int(10));
        assert_eq!(filtered.get_value(1, 0), int(30));
    }
}