1use kyu_types::TypedValue;
7
8use crate::value_vector::{SelectionVector, ValueVector};
9
10#[derive(Clone, Debug)]
16pub struct DataChunk {
17 columns: Vec<ValueVector>,
18 selection: SelectionVector,
19}
20
21impl DataChunk {
22 pub fn from_vectors(columns: Vec<ValueVector>, selection: SelectionVector) -> Self {
24 Self { columns, selection }
25 }
26
27 pub fn new(columns: Vec<Vec<TypedValue>>) -> Self {
29 let num_rows = columns.first().map_or(0, |c| c.len());
30 debug_assert!(columns.iter().all(|c| c.len() == num_rows));
31 let vectors = columns.into_iter().map(ValueVector::Owned).collect();
32 Self {
33 columns: vectors,
34 selection: SelectionVector::identity(num_rows),
35 }
36 }
37
38 pub fn new_with_row_count(columns: Vec<Vec<TypedValue>>, num_rows: usize) -> Self {
40 let vectors = columns.into_iter().map(ValueVector::Owned).collect();
41 Self {
42 columns: vectors,
43 selection: SelectionVector::identity(num_rows),
44 }
45 }
46
47 pub fn empty(num_columns: usize) -> Self {
49 Self {
50 columns: (0..num_columns)
51 .map(|_| ValueVector::Owned(Vec::new()))
52 .collect(),
53 selection: SelectionVector::identity(0),
54 }
55 }
56
57 pub fn with_capacity(num_columns: usize, row_capacity: usize) -> Self {
59 Self {
60 columns: (0..num_columns)
61 .map(|_| ValueVector::Owned(Vec::with_capacity(row_capacity)))
62 .collect(),
63 selection: SelectionVector::identity(0),
64 }
65 }
66
67 pub fn single_empty_row(num_columns: usize) -> Self {
69 Self {
70 columns: (0..num_columns)
71 .map(|_| ValueVector::Owned(vec![TypedValue::Null]))
72 .collect(),
73 selection: SelectionVector::identity(1),
74 }
75 }
76
77 pub fn from_rows(rows: &[Vec<TypedValue>], num_columns: usize) -> Self {
79 let mut columns: Vec<Vec<TypedValue>> = (0..num_columns).map(|_| Vec::new()).collect();
80 for row in rows {
81 for (col_idx, val) in row.iter().enumerate() {
82 if col_idx < num_columns {
83 columns[col_idx].push(val.clone());
84 }
85 }
86 }
87 let num_rows = rows.len();
88 Self {
89 columns: columns.into_iter().map(ValueVector::Owned).collect(),
90 selection: SelectionVector::identity(num_rows),
91 }
92 }
93
94 #[inline]
95 pub fn num_columns(&self) -> usize {
96 self.columns.len()
97 }
98
99 #[inline]
100 pub fn num_rows(&self) -> usize {
101 self.selection.len()
102 }
103
104 #[inline]
105 pub fn is_empty(&self) -> bool {
106 self.selection.is_empty()
107 }
108
109 #[inline]
111 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> TypedValue {
112 let physical = self.selection.get(row_idx);
113 self.columns[col_idx].get_value(physical)
114 }
115
116 pub fn get_row(&self, row_idx: usize) -> Vec<TypedValue> {
118 (0..self.columns.len())
119 .map(|col| self.get_value(row_idx, col))
120 .collect()
121 }
122
123 pub fn append_row(&mut self, row: &[TypedValue]) {
125 debug_assert_eq!(row.len(), self.columns.len());
126 for (col_idx, val) in row.iter().enumerate() {
127 self.columns[col_idx].push(val.clone());
128 }
129 self.selection = SelectionVector::identity(self.selection.len() + 1);
130 }
131
132 pub fn append(&mut self, other: &DataChunk) {
136 debug_assert_eq!(self.num_columns(), other.num_columns());
137 self.ensure_owned();
138 for row_idx in 0..other.num_rows() {
139 self.append_row_from_chunk(other, row_idx);
140 }
141 }
142
143 fn ensure_owned(&mut self) {
145 let needs_materialize = self.columns.iter().any(|c| !matches!(c, ValueVector::Owned(_)));
146 if !needs_materialize {
147 return;
148 }
149 let num_rows = self.num_rows();
150 let num_cols = self.num_columns();
151 let owned_columns: Vec<Vec<TypedValue>> = (0..num_cols)
152 .map(|col| {
153 (0..num_rows)
154 .map(|row| self.get_value(row, col))
155 .collect()
156 })
157 .collect();
158 self.columns = owned_columns.into_iter().map(ValueVector::Owned).collect();
159 self.selection = SelectionVector::identity(num_rows);
160 }
161
162 #[inline]
164 pub fn selection(&self) -> &SelectionVector {
165 &self.selection
166 }
167
168 #[inline]
170 pub fn column(&self, col_idx: usize) -> &ValueVector {
171 &self.columns[col_idx]
172 }
173
174 pub fn compact_column(&self, col_idx: usize) -> ValueVector {
178 if self.selection.is_identity() {
179 self.columns[col_idx].clone()
180 } else {
181 let n = self.selection.len();
182 let mut result = Vec::with_capacity(n);
183 for i in 0..n {
184 result.push(self.columns[col_idx].get_value(self.selection.get(i)));
185 }
186 ValueVector::Owned(result)
187 }
188 }
189
190 pub fn select_columns(mut self, indices: &[usize]) -> Self {
192 let columns = indices
193 .iter()
194 .map(|&i| std::mem::replace(&mut self.columns[i], ValueVector::Owned(Vec::new())))
195 .collect();
196 Self {
197 columns,
198 selection: self.selection,
199 }
200 }
201
202 pub fn take_column(&mut self, col_idx: usize) -> ValueVector {
206 if self.selection.is_identity() {
207 std::mem::replace(
208 &mut self.columns[col_idx],
209 ValueVector::Owned(Vec::new()),
210 )
211 } else {
212 let n = self.selection.len();
213 let mut result = Vec::with_capacity(n);
214 for i in 0..n {
215 result.push(self.columns[col_idx].get_value(self.selection.get(i)));
216 }
217 ValueVector::Owned(result)
218 }
219 }
220
221 #[inline]
223 pub fn with_selection(self, selection: SelectionVector) -> Self {
224 Self {
225 columns: self.columns,
226 selection,
227 }
228 }
229
230 #[inline]
232 pub fn row_ref(&self, row_idx: usize) -> RowRef<'_> {
233 RowRef {
234 chunk: self,
235 row_idx,
236 }
237 }
238
239 pub fn append_row_from_chunk(&mut self, source: &DataChunk, row_idx: usize) {
241 debug_assert_eq!(self.num_columns(), source.num_columns());
242 for col_idx in 0..self.columns.len() {
243 let val = source.get_value(row_idx, col_idx);
244 self.columns[col_idx].push(val);
245 }
246 self.selection = SelectionVector::identity(self.selection.len() + 1);
247 }
248
249 pub fn append_row_from_chunk_with_extra(
251 &mut self,
252 source: &DataChunk,
253 row_idx: usize,
254 extra: TypedValue,
255 ) {
256 debug_assert_eq!(self.num_columns(), source.num_columns() + 1);
257 for col_idx in 0..source.num_columns() {
258 let val = source.get_value(row_idx, col_idx);
259 self.columns[col_idx].push(val);
260 }
261 self.columns[source.num_columns()].push(extra);
262 self.selection = SelectionVector::identity(self.selection.len() + 1);
263 }
264}
265
266pub struct RowRef<'a> {
271 chunk: &'a DataChunk,
272 row_idx: usize,
273}
274
275impl kyu_expression::Tuple for RowRef<'_> {
276 #[inline]
277 fn value_at(&self, col_idx: usize) -> Option<TypedValue> {
278 if col_idx < self.chunk.num_columns() {
279 Some(self.chunk.get_value(self.row_idx, col_idx))
280 } else {
281 None
282 }
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use smol_str::SmolStr;
290
291 #[test]
292 fn new_chunk() {
293 let chunk = DataChunk::new(vec![
294 vec![TypedValue::Int64(1), TypedValue::Int64(2)],
295 vec![
296 TypedValue::String(SmolStr::new("a")),
297 TypedValue::String(SmolStr::new("b")),
298 ],
299 ]);
300 assert_eq!(chunk.num_rows(), 2);
301 assert_eq!(chunk.num_columns(), 2);
302 }
303
304 #[test]
305 fn empty_chunk() {
306 let chunk = DataChunk::empty(3);
307 assert_eq!(chunk.num_rows(), 0);
308 assert_eq!(chunk.num_columns(), 3);
309 assert!(chunk.is_empty());
310 }
311
312 #[test]
313 fn single_empty_row() {
314 let chunk = DataChunk::single_empty_row(2);
315 assert_eq!(chunk.num_rows(), 1);
316 assert_eq!(chunk.get_value(0, 0), TypedValue::Null);
317 assert_eq!(chunk.get_value(0, 1), TypedValue::Null);
318 }
319
320 #[test]
321 fn from_rows() {
322 let rows = vec![
323 vec![TypedValue::Int64(1), TypedValue::Int64(10)],
324 vec![TypedValue::Int64(2), TypedValue::Int64(20)],
325 ];
326 let chunk = DataChunk::from_rows(&rows, 2);
327 assert_eq!(chunk.num_rows(), 2);
328 assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
329 assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(2));
330 assert_eq!(chunk.get_value(0, 1), TypedValue::Int64(10));
331 assert_eq!(chunk.get_value(1, 1), TypedValue::Int64(20));
332 }
333
334 #[test]
335 fn get_row() {
336 let chunk = DataChunk::new(vec![
337 vec![TypedValue::Int64(1), TypedValue::Int64(2)],
338 vec![TypedValue::Int64(10), TypedValue::Int64(20)],
339 ]);
340 assert_eq!(chunk.get_row(0), vec![TypedValue::Int64(1), TypedValue::Int64(10)]);
341 assert_eq!(chunk.get_row(1), vec![TypedValue::Int64(2), TypedValue::Int64(20)]);
342 }
343
344 #[test]
345 fn append_row() {
346 let mut chunk = DataChunk::empty(2);
347 chunk.append_row(&[TypedValue::Int64(1), TypedValue::Int64(2)]);
348 assert_eq!(chunk.num_rows(), 1);
349 chunk.append_row(&[TypedValue::Int64(3), TypedValue::Int64(4)]);
350 assert_eq!(chunk.num_rows(), 2);
351 assert_eq!(chunk.get_row(1), vec![TypedValue::Int64(3), TypedValue::Int64(4)]);
352 }
353
354 #[test]
355 fn row_ref_value_at() {
356 use kyu_expression::Tuple;
357 let chunk = DataChunk::new(vec![
358 vec![TypedValue::Int64(1), TypedValue::Int64(2)],
359 vec![TypedValue::Int64(10), TypedValue::Int64(20)],
360 ]);
361 let row = chunk.row_ref(0);
362 assert_eq!(row.value_at(0), Some(TypedValue::Int64(1)));
363 assert_eq!(row.value_at(1), Some(TypedValue::Int64(10)));
364 assert_eq!(row.value_at(2), None);
365
366 let row1 = chunk.row_ref(1);
367 assert_eq!(row1.value_at(0), Some(TypedValue::Int64(2)));
368 assert_eq!(row1.value_at(1), Some(TypedValue::Int64(20)));
369 }
370
371 #[test]
372 fn append_row_from_chunk() {
373 let src = DataChunk::new(vec![
374 vec![TypedValue::Int64(1), TypedValue::Int64(2)],
375 vec![TypedValue::Int64(10), TypedValue::Int64(20)],
376 ]);
377 let mut dst = DataChunk::with_capacity(2, 2);
378 dst.append_row_from_chunk(&src, 1);
379 assert_eq!(dst.num_rows(), 1);
380 assert_eq!(dst.get_row(0), vec![TypedValue::Int64(2), TypedValue::Int64(20)]);
381 }
382
383 #[test]
384 fn append_chunks() {
385 let mut chunk1 = DataChunk::new(vec![vec![TypedValue::Int64(1)]]);
386 let chunk2 = DataChunk::new(vec![vec![TypedValue::Int64(2), TypedValue::Int64(3)]]);
387 chunk1.append(&chunk2);
388 assert_eq!(chunk1.num_rows(), 3);
389 assert_eq!(chunk1.get_value(0, 0), TypedValue::Int64(1));
390 assert_eq!(chunk1.get_value(1, 0), TypedValue::Int64(2));
391 assert_eq!(chunk1.get_value(2, 0), TypedValue::Int64(3));
392 }
393
394 #[test]
395 fn with_selection_filters() {
396 let chunk = DataChunk::new(vec![
397 vec![TypedValue::Int64(10), TypedValue::Int64(20), TypedValue::Int64(30)],
398 ]);
399 let filtered = chunk.with_selection(SelectionVector::from_indices(vec![0, 2]));
400 assert_eq!(filtered.num_rows(), 2);
401 assert_eq!(filtered.get_value(0, 0), TypedValue::Int64(10));
402 assert_eq!(filtered.get_value(1, 0), TypedValue::Int64(30));
403 }
404}