1use std::{
5 hash::Hash,
6 mem,
7 ops::{Index, IndexMut},
8};
9
10use indexmap::IndexMap;
11use reifydb_type::{
12 Result,
13 fragment::Fragment,
14 util::cowvec::CowVec,
15 value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
16};
17use serde::{Deserialize, Serialize};
18
19use crate::{
20 encoded::{
21 row::EncodedRow,
22 shape::{RowShape, RowShapeField},
23 },
24 interface::catalog::column::Column as CatalogColumn,
25 return_internal_error,
26 row::Row,
27 value::column::{
28 ColumnBuffer, ColumnWithName, buffer::pool::ColumnBufferPool, data::Column, headers::ColumnHeaders,
29 },
30};
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Columns {
34 pub row_numbers: CowVec<RowNumber>,
35 pub created_at: CowVec<DateTime>,
36 pub updated_at: CowVec<DateTime>,
37 pub columns: CowVec<ColumnBuffer>,
38 pub names: CowVec<Fragment>,
39}
40
41#[derive(Debug, Clone, Copy)]
42pub struct ColumnRef<'a> {
43 name: &'a Fragment,
44 data: &'a ColumnBuffer,
45}
46
47impl Index<usize> for Columns {
48 type Output = ColumnBuffer;
49
50 fn index(&self, index: usize) -> &Self::Output {
51 &self.columns[index]
52 }
53}
54
55impl IndexMut<usize> for Columns {
56 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
57 &mut self.columns.make_mut()[index]
58 }
59}
60
61impl<'a> ColumnRef<'a> {
62 pub fn new(name: &'a Fragment, data: &'a ColumnBuffer) -> Self {
63 Self {
64 name,
65 data,
66 }
67 }
68
69 pub fn name(&self) -> &'a Fragment {
70 self.name
71 }
72
73 pub fn data(&self) -> &'a ColumnBuffer {
74 self.data
75 }
76
77 pub fn get_type(&self) -> Type {
78 self.data.get_type()
79 }
80
81 pub fn column(&self) -> Column {
82 Column::from_column_buffer(self.data.clone())
83 }
84
85 pub fn with_new_data(&self, data: ColumnBuffer) -> ColumnWithName {
86 ColumnWithName::new(self.name.clone(), data)
87 }
88}
89
90fn value_to_buffer(value: Value) -> ColumnBuffer {
91 match value {
92 Value::None {
93 ..
94 } => ColumnBuffer::none_typed(Type::Boolean, 1),
95 Value::Boolean(v) => ColumnBuffer::bool([v]),
96 Value::Float4(v) => ColumnBuffer::float4([v.into()]),
97 Value::Float8(v) => ColumnBuffer::float8([v.into()]),
98 Value::Int1(v) => ColumnBuffer::int1([v]),
99 Value::Int2(v) => ColumnBuffer::int2([v]),
100 Value::Int4(v) => ColumnBuffer::int4([v]),
101 Value::Int8(v) => ColumnBuffer::int8([v]),
102 Value::Int16(v) => ColumnBuffer::int16([v]),
103 Value::Utf8(v) => ColumnBuffer::utf8([v]),
104 Value::Uint1(v) => ColumnBuffer::uint1([v]),
105 Value::Uint2(v) => ColumnBuffer::uint2([v]),
106 Value::Uint4(v) => ColumnBuffer::uint4([v]),
107 Value::Uint8(v) => ColumnBuffer::uint8([v]),
108 Value::Uint16(v) => ColumnBuffer::uint16([v]),
109 Value::Date(v) => ColumnBuffer::date([v]),
110 Value::DateTime(v) => ColumnBuffer::datetime([v]),
111 Value::Time(v) => ColumnBuffer::time([v]),
112 Value::Duration(v) => ColumnBuffer::duration([v]),
113 Value::IdentityId(v) => ColumnBuffer::identity_id([v]),
114 Value::Uuid4(v) => ColumnBuffer::uuid4([v]),
115 Value::Uuid7(v) => ColumnBuffer::uuid7([v]),
116 Value::Blob(v) => ColumnBuffer::blob([v]),
117 Value::Int(v) => ColumnBuffer::int(vec![v]),
118 Value::Uint(v) => ColumnBuffer::uint(vec![v]),
119 Value::Decimal(v) => ColumnBuffer::decimal(vec![v]),
120 Value::DictionaryId(v) => ColumnBuffer::dictionary_id(vec![v]),
121 Value::Any(v) => ColumnBuffer::any(vec![v]),
122 Value::Type(v) => ColumnBuffer::any(vec![Box::new(Value::Type(v))]),
123 Value::List(v) => ColumnBuffer::any(vec![Box::new(Value::List(v))]),
124 Value::Record(v) => ColumnBuffer::any(vec![Box::new(Value::Record(v))]),
125 Value::Tuple(v) => ColumnBuffer::any(vec![Box::new(Value::Tuple(v))]),
126 }
127}
128
129impl Columns {
130 pub fn scalar_value(&self) -> Value {
131 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
132 debug_assert_eq!(
133 self.row_count(),
134 1,
135 "scalar_value() requires exactly 1 row, got {}",
136 self.row_count()
137 );
138 self.columns[0].get_value(0)
139 }
140
141 pub fn new(columns: Vec<ColumnWithName>) -> Self {
142 let n = columns.first().map_or(0, |c| c.data.len());
143 assert!(columns.iter().all(|c| c.data.len() == n));
144
145 let mut names = Vec::with_capacity(columns.len());
146 let mut buffers = Vec::with_capacity(columns.len());
147 for c in columns {
148 names.push(c.name);
149 buffers.push(c.data);
150 }
151
152 Self {
153 row_numbers: CowVec::new(Vec::new()),
154 created_at: CowVec::new(Vec::new()),
155 updated_at: CowVec::new(Vec::new()),
156 columns: CowVec::new(buffers),
157 names: CowVec::new(names),
158 }
159 }
160
161 pub fn with_system_columns(
162 columns: Vec<ColumnWithName>,
163 row_numbers: Vec<RowNumber>,
164 created_at: Vec<DateTime>,
165 updated_at: Vec<DateTime>,
166 ) -> Self {
167 let n = columns.first().map_or(0, |c| c.data.len());
168 assert!(columns.iter().all(|c| c.data.len() == n));
169 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
170 assert_eq!(created_at.len(), n, "created_at length must match column data length");
171 assert_eq!(updated_at.len(), n, "updated_at length must match column data length");
172
173 let mut names = Vec::with_capacity(columns.len());
174 let mut buffers = Vec::with_capacity(columns.len());
175 for c in columns {
176 names.push(c.name);
177 buffers.push(c.data);
178 }
179
180 Self {
181 row_numbers: CowVec::new(row_numbers),
182 created_at: CowVec::new(created_at),
183 updated_at: CowVec::new(updated_at),
184 columns: CowVec::new(buffers),
185 names: CowVec::new(names),
186 }
187 }
188
189 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
190 let mut names = Vec::new();
191 let mut buffers = Vec::new();
192 for (name, value) in rows {
193 names.push(Fragment::internal(name));
194 buffers.push(value_to_buffer(value));
195 }
196 Self {
197 row_numbers: CowVec::new(Vec::new()),
198 created_at: CowVec::new(Vec::new()),
199 updated_at: CowVec::new(Vec::new()),
200 columns: CowVec::new(buffers),
201 names: CowVec::new(names),
202 }
203 }
204
205 pub fn with_row_numbers(mut self, row_numbers: Vec<RowNumber>) -> Self {
206 let n = row_numbers.len();
207 self.row_numbers = CowVec::new(row_numbers);
208 if self.created_at.len() != n {
209 let now = DateTime::default();
210 self.created_at = CowVec::new(vec![now; n]);
211 self.updated_at = CowVec::new(vec![now; n]);
212 }
213 self
214 }
215
216 pub fn from_catalog_columns(cols: &[CatalogColumn]) -> Self {
217 let mut names = Vec::with_capacity(cols.len());
218 let mut buffers = Vec::with_capacity(cols.len());
219 for col in cols {
220 names.push(Fragment::internal(&col.name));
221 buffers.push(ColumnBuffer::with_capacity(col.constraint.get_type(), 0));
222 }
223 Self {
224 row_numbers: CowVec::new(Vec::new()),
225 created_at: CowVec::new(Vec::new()),
226 updated_at: CowVec::new(Vec::new()),
227 columns: CowVec::new(buffers),
228 names: CowVec::new(names),
229 }
230 }
231
232 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
233 let n = self.len();
234 let names = self.names.make_mut();
235 for (i, name) in headers.columns.iter().enumerate() {
236 if i < n {
237 names[i] = name.clone();
238 }
239 }
240 }
241}
242
243impl Columns {
244 pub fn number(&self) -> RowNumber {
245 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
246 if self.row_numbers.is_empty() {
247 RowNumber(0)
248 } else {
249 self.row_numbers[0]
250 }
251 }
252
253 pub fn shape(&self) -> (usize, usize) {
254 let row_count = if !self.row_numbers.is_empty() {
255 self.row_numbers.len()
256 } else {
257 self.columns.first().map(|c| c.len()).unwrap_or(0)
258 };
259 (row_count, self.len())
260 }
261
262 pub fn len(&self) -> usize {
263 self.columns.len()
264 }
265
266 pub fn is_empty(&self) -> bool {
267 self.columns.is_empty()
268 }
269
270 pub fn iter(&self) -> impl Iterator<Item = ColumnRef<'_>> + '_ {
271 self.names.iter().zip(self.columns.iter()).map(|(n, d)| ColumnRef::new(n, d))
272 }
273
274 pub fn first(&self) -> Option<ColumnRef<'_>> {
275 self.get(0)
276 }
277
278 pub fn last(&self) -> Option<ColumnRef<'_>> {
279 let n = self.len();
280 if n == 0 {
281 None
282 } else {
283 self.get(n - 1)
284 }
285 }
286
287 pub fn get(&self, index: usize) -> Option<ColumnRef<'_>> {
288 if index < self.len() {
289 Some(ColumnRef::new(&self.names[index], &self.columns[index]))
290 } else {
291 None
292 }
293 }
294
295 pub fn name_at(&self, index: usize) -> &Fragment {
296 &self.names[index]
297 }
298
299 pub fn data_at(&self, index: usize) -> &ColumnBuffer {
300 &self.columns[index]
301 }
302
303 pub fn data_at_mut(&mut self, index: usize) -> &mut ColumnBuffer {
304 &mut self.columns.make_mut()[index]
305 }
306
307 pub fn row(&self, i: usize) -> Vec<Value> {
308 self.columns.iter().map(|c| c.get_value(i)).collect()
309 }
310
311 pub fn column(&self, name: &str) -> Option<ColumnRef<'_>> {
312 self.names.iter().position(|n| n.text() == name).and_then(|i| self.get(i))
313 }
314
315 pub fn row_count(&self) -> usize {
316 if !self.row_numbers.is_empty() {
317 self.row_numbers.len()
318 } else {
319 self.columns.first().map_or(0, |col| col.len())
320 }
321 }
322
323 pub fn has_rows(&self) -> bool {
324 self.row_count() > 0
325 }
326
327 pub fn is_scalar(&self) -> bool {
328 self.len() == 1 && self.row_count() == 1
329 }
330
331 pub fn get_row(&self, index: usize) -> Vec<Value> {
332 self.columns.iter().map(|col| col.get_value(index)).collect()
333 }
334
335 #[track_caller]
336 pub fn assert_invariants(&self, ctx: &str) {
337 let n = self.columns.first().map_or(0, |c| c.len());
338 for (i, col) in self.columns.iter().enumerate() {
339 assert_eq!(
340 col.len(),
341 n,
342 "{ctx}: Columns column[{i}] has length {} but columns[0] has length {n}",
343 col.len(),
344 );
345 }
346 assert!(
347 self.row_numbers.is_empty() || self.row_numbers.len() == n,
348 "{ctx}: Columns.row_numbers.len() = {} but columns[0].len() = {n}",
349 self.row_numbers.len(),
350 );
351 assert!(
352 self.created_at.is_empty() || self.created_at.len() == n,
353 "{ctx}: Columns.created_at.len() = {} but columns[0].len() = {n}",
354 self.created_at.len(),
355 );
356 assert!(
357 self.updated_at.is_empty() || self.updated_at.len() == n,
358 "{ctx}: Columns.updated_at.len() = {} but columns[0].len() = {n}",
359 self.updated_at.len(),
360 );
361 }
362}
363
364impl Columns {
365 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
366 let column_count = names.len();
367
368 let mut name_vec: Vec<Fragment> = names.iter().map(Fragment::internal).collect();
369 let mut buffers: Vec<ColumnBuffer> =
370 (0..column_count).map(|_| ColumnBuffer::none_typed(Type::Boolean, 0)).collect();
371
372 for row in result_rows {
373 assert_eq!(row.len(), column_count, "row length does not match column count");
374 for (i, value) in row.iter().enumerate() {
375 buffers[i].push_value(value.clone());
376 }
377 }
378
379 let _ = &mut name_vec;
380 Self {
381 row_numbers: CowVec::new(Vec::new()),
382 created_at: CowVec::new(Vec::new()),
383 updated_at: CowVec::new(Vec::new()),
384 columns: CowVec::new(buffers),
385 names: CowVec::new(name_vec),
386 }
387 }
388
389 pub fn from_encoded_rows(shape: &RowShape, ids: &[RowNumber], rows: &[EncodedRow]) -> Self {
390 assert_eq!(ids.len(), rows.len(), "ids length must match rows length");
391 let fields = shape.fields();
392 let row_count = rows.len();
393
394 let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
395 for field in fields.iter() {
396 columns_vec.push(ColumnWithName {
397 name: Fragment::internal(&field.name),
398 data: ColumnBuffer::with_capacity(field.constraint.get_type(), row_count),
399 });
400 }
401
402 for encoded in rows {
403 for (i, _) in fields.iter().enumerate() {
404 columns_vec[i].data.push_value(shape.get_value(encoded, i));
405 }
406 }
407
408 let row_numbers: Vec<RowNumber> = ids.to_vec();
409 let created_at: Vec<DateTime> =
410 rows.iter().map(|r| DateTime::from_nanos(r.created_at_nanos())).collect();
411 let updated_at: Vec<DateTime> =
412 rows.iter().map(|r| DateTime::from_nanos(r.updated_at_nanos())).collect();
413
414 Self::with_system_columns(columns_vec, row_numbers, created_at, updated_at)
415 }
416}
417
418impl Columns {
419 pub fn empty() -> Self {
420 Self {
421 row_numbers: CowVec::with_capacity(1),
422 created_at: CowVec::with_capacity(1),
423 updated_at: CowVec::with_capacity(1),
424 columns: CowVec::with_capacity(16),
425 names: CowVec::with_capacity(16),
426 }
427 }
428}
429
430impl Default for Columns {
431 fn default() -> Self {
432 Self::empty()
433 }
434}
435
436impl Columns {
437 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
438 if indices.is_empty() {
439 return Columns::empty();
440 }
441
442 let mut new_buffers: Vec<ColumnBuffer> = Vec::with_capacity(self.columns.len());
443 for col in self.columns.iter() {
444 let mut new_data = ColumnBuffer::with_capacity(col.get_type(), indices.len());
445 for &idx in indices {
446 new_data.push_value(col.get_value(idx));
447 }
448 new_buffers.push(new_data);
449 }
450
451 let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
452 Vec::new()
453 } else {
454 indices.iter().map(|&i| self.row_numbers[i]).collect()
455 };
456 let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
457 Vec::new()
458 } else {
459 indices.iter().map(|&i| self.created_at[i]).collect()
460 };
461 let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
462 Vec::new()
463 } else {
464 indices.iter().map(|&i| self.updated_at[i]).collect()
465 };
466 Columns {
467 row_numbers: CowVec::new(new_row_numbers),
468 created_at: CowVec::new(new_created_at),
469 updated_at: CowVec::new(new_updated_at),
470 columns: CowVec::new(new_buffers),
471 names: self.names.clone(),
472 }
473 }
474
475 pub fn extract_row(&self, index: usize) -> Columns {
476 self.extract_by_indices(&[index])
477 }
478
479 pub fn append_rows_by_indices(&mut self, source: &Columns, indices: &[usize]) {
480 if indices.is_empty() {
481 return;
482 }
483
484 if self.columns.is_empty() {
485 *self = source.extract_by_indices(indices);
486 return;
487 }
488
489 assert_eq!(
490 self.columns.len(),
491 source.columns.len(),
492 "append_rows: column count mismatch (self={}, source={})",
493 self.columns.len(),
494 source.columns.len(),
495 );
496
497 let self_cols = self.columns.make_mut();
498 for (i, src_col) in source.columns.iter().enumerate() {
499 for &idx in indices {
500 self_cols[i].push_value(src_col.get_value(idx));
501 }
502 }
503
504 if !source.row_numbers.is_empty() {
505 let rns = self.row_numbers.make_mut();
506 for &idx in indices {
507 rns.push(source.row_numbers[idx]);
508 }
509 }
510 if !source.created_at.is_empty() {
511 let cr = self.created_at.make_mut();
512 for &idx in indices {
513 cr.push(source.created_at[idx]);
514 }
515 }
516 if !source.updated_at.is_empty() {
517 let up = self.updated_at.make_mut();
518 for &idx in indices {
519 up.push(source.updated_at[idx]);
520 }
521 }
522 }
523
524 pub fn append_all(&mut self, source: Columns) -> Result<()> {
525 if source.row_count() == 0 {
526 return Ok(());
527 }
528
529 if self.columns.is_empty() {
530 *self = source;
531 return Ok(());
532 }
533
534 if self.columns.len() != source.columns.len() {
535 return_internal_error!(
536 "Columns::append_all: column count mismatch (self={}, source={})",
537 self.columns.len(),
538 source.columns.len()
539 );
540 }
541
542 if self.row_numbers.is_empty() != source.row_numbers.is_empty() {
543 return_internal_error!(
544 "Columns::append_all: row_numbers population mismatch (self_empty={}, source_empty={})",
545 self.row_numbers.is_empty(),
546 source.row_numbers.is_empty()
547 );
548 }
549 if self.created_at.is_empty() != source.created_at.is_empty() {
550 return_internal_error!(
551 "Columns::append_all: created_at population mismatch (self_empty={}, source_empty={})",
552 self.created_at.is_empty(),
553 source.created_at.is_empty()
554 );
555 }
556 if self.updated_at.is_empty() != source.updated_at.is_empty() {
557 return_internal_error!(
558 "Columns::append_all: updated_at population mismatch (self_empty={}, source_empty={})",
559 self.updated_at.is_empty(),
560 source.updated_at.is_empty()
561 );
562 }
563
564 let dest_cols = self.columns.make_mut();
565 let source_cols = source.columns.into_inner();
566 for (i, src_col) in source_cols.into_iter().enumerate() {
567 dest_cols[i].extend(src_col)?;
568 }
569
570 if !source.row_numbers.is_empty() {
571 self.row_numbers.extend_from_slice(source.row_numbers.as_slice());
572 }
573 if !source.created_at.is_empty() {
574 self.created_at.extend_from_slice(source.created_at.as_slice());
575 }
576 if !source.updated_at.is_empty() {
577 self.updated_at.extend_from_slice(source.updated_at.as_slice());
578 }
579
580 Ok(())
581 }
582
583 pub fn concat(batches: Vec<Columns>) -> Result<Option<Columns>> {
584 let mut iter = batches.into_iter();
585 let mut merged = match iter.next() {
586 Some(first) => first,
587 None => return Ok(None),
588 };
589 for cols in iter {
590 merged.append_all(cols)?;
591 }
592 if merged.row_count() == 0 {
593 return Ok(None);
594 }
595 Ok(Some(merged))
596 }
597
598 pub fn remove_row(&mut self, row_number: RowNumber) -> bool {
599 let pos = self.row_numbers.iter().position(|&r| r == row_number);
600 let Some(idx) = pos else {
601 return false;
602 };
603
604 let kept_indices: Vec<usize> = (0..self.row_count()).filter(|&i| i != idx).collect();
605 *self = self.extract_by_indices(&kept_indices);
606 true
607 }
608
609 pub fn project_by_names(&self, names: &[String]) -> Columns {
610 let mut new_names = Vec::new();
611 let mut new_buffers = Vec::new();
612
613 for name in names {
614 if let Some(pos) = self.names.iter().position(|n| n.text() == name.as_str()) {
615 new_names.push(self.names[pos].clone());
616 new_buffers.push(self.columns[pos].clone());
617 }
618 }
619
620 if new_buffers.is_empty() {
621 return Columns::empty();
622 }
623
624 Columns {
625 row_numbers: self.row_numbers.clone(),
626 created_at: self.created_at.clone(),
627 updated_at: self.updated_at.clone(),
628 columns: CowVec::new(new_buffers),
629 names: CowVec::new(new_names),
630 }
631 }
632
633 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
634 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
635
636 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
637 for (idx, key) in keys.iter().enumerate() {
638 key_to_indices.entry(key.clone()).or_default().push(idx);
639 }
640
641 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
642 }
643
644 pub fn from_row(row: &Row) -> Self {
645 let mut out = Columns::empty();
646 out.reset_from_row(row);
647 out
648 }
649
650 pub fn reset_from_row(&mut self, row: &Row) {
651 let field_count = row.shape.fields().len();
652
653 self.row_numbers.clear();
654 self.created_at.clear();
655 self.updated_at.clear();
656 self.columns.clear();
657 self.names.clear();
658
659 self.columns.make_mut().reserve(field_count);
660 self.names.make_mut().reserve(field_count);
661
662 self.row_numbers.push(row.number);
663 self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
664 self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
665
666 for (idx, field) in row.shape.fields().iter().enumerate() {
667 let value = row.shape.get_value(&row.encoded, idx);
668
669 let column_type = if matches!(value, Value::None { .. }) {
670 field.constraint.get_type()
671 } else {
672 value.get_type()
673 };
674
675 let mut data = if column_type.is_option() {
676 ColumnBuffer::none_typed(column_type.clone(), 0)
677 } else {
678 ColumnBuffer::with_capacity(column_type.clone(), 1)
679 };
680 data.push_value(value);
681
682 if column_type == Type::DictionaryId
683 && let ColumnBuffer::DictionaryId(container) = &mut data
684 && let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
685 {
686 container.set_dictionary_id(*dict_id);
687 }
688
689 let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
690
691 self.names.push(Fragment::internal(name));
692 self.columns.push(data);
693 }
694 }
695
696 pub fn reset_from_row_with_pool(&mut self, row: &Row, pool: &ColumnBufferPool) {
697 let field_count = row.shape.fields().len();
698
699 self.row_numbers.clear();
700 self.created_at.clear();
701 self.updated_at.clear();
702 self.names.clear();
703
704 self.row_numbers.push(row.number);
705 self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
706 self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
707
708 let columns_vec = self.columns.make_mut();
709 let names_vec = self.names.make_mut();
710
711 while columns_vec.len() > field_count {
712 if let Some(buf) = columns_vec.pop() {
713 pool.release(buf);
714 }
715 }
716
717 columns_vec.reserve(field_count);
718 names_vec.reserve(field_count);
719
720 for (idx, field) in row.shape.fields().iter().enumerate() {
721 let value = row.shape.get_value(&row.encoded, idx);
722
723 let column_type = if matches!(value, Value::None { .. }) {
724 field.constraint.get_type()
725 } else {
726 value.get_type()
727 };
728
729 if idx < columns_vec.len() {
730 if columns_vec[idx].get_type() == column_type {
731 columns_vec[idx].clear();
732 } else {
733 let replacement = if column_type.is_option() {
734 ColumnBuffer::none_typed(column_type.clone(), 0)
735 } else {
736 pool.acquire(&column_type, 1)
737 };
738 let old = mem::replace(&mut columns_vec[idx], replacement);
739 pool.release(old);
740 }
741 } else {
742 let fresh = if column_type.is_option() {
743 ColumnBuffer::none_typed(column_type.clone(), 0)
744 } else {
745 pool.acquire(&column_type, 1)
746 };
747 columns_vec.push(fresh);
748 }
749
750 columns_vec[idx].push_value(value);
751
752 if column_type == Type::DictionaryId
753 && let ColumnBuffer::DictionaryId(container) = &mut columns_vec[idx]
754 && let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
755 {
756 container.set_dictionary_id(*dict_id);
757 }
758
759 let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
760 names_vec.push(Fragment::internal(name));
761 }
762 }
763
764 pub fn to_single_row(&self) -> Row {
765 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
766 assert_eq!(
767 self.row_numbers.len(),
768 1,
769 "to_row() requires exactly 1 row number, got {}",
770 self.row_numbers.len()
771 );
772
773 let row_number = *self.row_numbers.first().unwrap();
774
775 let fields: Vec<RowShapeField> = self
776 .names
777 .iter()
778 .zip(self.columns.iter())
779 .map(|(name, data)| RowShapeField::unconstrained(name.text().to_string(), data.get_type()))
780 .collect();
781
782 let layout = RowShape::new(fields);
783 let mut encoded = layout.allocate();
784
785 let values: Vec<Value> = self.columns.iter().map(|col| col.get_value(0)).collect();
786 layout.set_values(&mut encoded, &values);
787
788 Row {
789 number: row_number,
790 encoded,
791 shape: layout,
792 }
793 }
794}
795
796#[cfg(test)]
797pub mod tests {
798 use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};
799
800 use super::*;
801
802 #[test]
803 fn test_single_row_temporal_types() {
804 let date = Date::from_ymd(2025, 1, 15).unwrap();
805 let datetime = DateTime::from_timestamp(1642694400).unwrap();
806 let time = Time::from_hms(14, 30, 45).unwrap();
807 let duration = Duration::from_days(30).unwrap();
808
809 let columns = Columns::single_row([
810 ("date_col", Value::Date(date.clone())),
811 ("datetime_col", Value::DateTime(datetime.clone())),
812 ("time_col", Value::Time(time.clone())),
813 ("interval_col", Value::Duration(duration.clone())),
814 ]);
815
816 assert_eq!(columns.len(), 4);
817 assert_eq!(columns.shape(), (1, 4));
818
819 assert_eq!(columns.column("date_col").unwrap().data().get_value(0), Value::Date(date));
820 assert_eq!(columns.column("datetime_col").unwrap().data().get_value(0), Value::DateTime(datetime));
821 assert_eq!(columns.column("time_col").unwrap().data().get_value(0), Value::Time(time));
822 assert_eq!(columns.column("interval_col").unwrap().data().get_value(0), Value::Duration(duration));
823 }
824
825 #[test]
826 fn test_single_row_mixed_types() {
827 let date = Date::from_ymd(2025, 7, 15).unwrap();
828 let time = Time::from_hms(9, 15, 30).unwrap();
829
830 let columns = Columns::single_row([
831 ("bool_col", Value::Boolean(true)),
832 ("int_col", Value::Int4(42)),
833 ("str_col", Value::Utf8("hello".to_string())),
834 ("date_col", Value::Date(date.clone())),
835 ("time_col", Value::Time(time.clone())),
836 ("none_col", Value::none()),
837 ]);
838
839 assert_eq!(columns.len(), 6);
840 assert_eq!(columns.shape(), (1, 6));
841
842 assert_eq!(columns.column("bool_col").unwrap().data().get_value(0), Value::Boolean(true));
843 assert_eq!(columns.column("int_col").unwrap().data().get_value(0), Value::Int4(42));
844 assert_eq!(columns.column("str_col").unwrap().data().get_value(0), Value::Utf8("hello".to_string()));
845 assert_eq!(columns.column("date_col").unwrap().data().get_value(0), Value::Date(date));
846 assert_eq!(columns.column("time_col").unwrap().data().get_value(0), Value::Time(time));
847 assert_eq!(columns.column("none_col").unwrap().data().get_value(0), Value::none());
848 }
849
850 #[test]
851 fn test_single_row_normal_column_names_work() {
852 let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
853 assert_eq!(columns.len(), 1);
854 assert_eq!(columns.column("normal_column").unwrap().data().get_value(0), Value::Int4(42));
855 }
856}