Skip to main content

reifydb_core/value/column/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	hash::Hash,
6	mem,
7	ops::{Index, IndexMut},
8};
9
10use indexmap::IndexMap;
11use reifydb_type::{
12	Result,
13	fragment::Fragment,
14	util::cowvec::CowVec,
15	value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
16};
17use serde::{Deserialize, Serialize};
18
19use crate::{
20	encoded::{
21		row::EncodedRow,
22		shape::{RowShape, RowShapeField},
23	},
24	interface::catalog::column::Column as CatalogColumn,
25	return_internal_error,
26	row::Row,
27	value::column::{
28		ColumnBuffer, ColumnWithName, buffer::pool::ColumnBufferPool, data::Column, headers::ColumnHeaders,
29	},
30};
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Columns {
34	pub row_numbers: CowVec<RowNumber>,
35	pub created_at: CowVec<DateTime>,
36	pub updated_at: CowVec<DateTime>,
37	pub columns: CowVec<ColumnBuffer>,
38	pub names: CowVec<Fragment>,
39}
40
41#[derive(Debug, Clone, Copy)]
42pub struct ColumnRef<'a> {
43	name: &'a Fragment,
44	data: &'a ColumnBuffer,
45}
46
47impl Index<usize> for Columns {
48	type Output = ColumnBuffer;
49
50	fn index(&self, index: usize) -> &Self::Output {
51		&self.columns[index]
52	}
53}
54
55impl IndexMut<usize> for Columns {
56	fn index_mut(&mut self, index: usize) -> &mut Self::Output {
57		&mut self.columns.make_mut()[index]
58	}
59}
60
61impl<'a> ColumnRef<'a> {
62	pub fn new(name: &'a Fragment, data: &'a ColumnBuffer) -> Self {
63		Self {
64			name,
65			data,
66		}
67	}
68
69	pub fn name(&self) -> &'a Fragment {
70		self.name
71	}
72
73	pub fn data(&self) -> &'a ColumnBuffer {
74		self.data
75	}
76
77	pub fn get_type(&self) -> Type {
78		self.data.get_type()
79	}
80
81	pub fn column(&self) -> Column {
82		Column::from_column_buffer(self.data.clone())
83	}
84
85	pub fn with_new_data(&self, data: ColumnBuffer) -> ColumnWithName {
86		ColumnWithName::new(self.name.clone(), data)
87	}
88}
89
90fn value_to_buffer(value: Value) -> ColumnBuffer {
91	match value {
92		Value::None {
93			..
94		} => ColumnBuffer::none_typed(Type::Boolean, 1),
95		Value::Boolean(v) => ColumnBuffer::bool([v]),
96		Value::Float4(v) => ColumnBuffer::float4([v.into()]),
97		Value::Float8(v) => ColumnBuffer::float8([v.into()]),
98		Value::Int1(v) => ColumnBuffer::int1([v]),
99		Value::Int2(v) => ColumnBuffer::int2([v]),
100		Value::Int4(v) => ColumnBuffer::int4([v]),
101		Value::Int8(v) => ColumnBuffer::int8([v]),
102		Value::Int16(v) => ColumnBuffer::int16([v]),
103		Value::Utf8(v) => ColumnBuffer::utf8([v]),
104		Value::Uint1(v) => ColumnBuffer::uint1([v]),
105		Value::Uint2(v) => ColumnBuffer::uint2([v]),
106		Value::Uint4(v) => ColumnBuffer::uint4([v]),
107		Value::Uint8(v) => ColumnBuffer::uint8([v]),
108		Value::Uint16(v) => ColumnBuffer::uint16([v]),
109		Value::Date(v) => ColumnBuffer::date([v]),
110		Value::DateTime(v) => ColumnBuffer::datetime([v]),
111		Value::Time(v) => ColumnBuffer::time([v]),
112		Value::Duration(v) => ColumnBuffer::duration([v]),
113		Value::IdentityId(v) => ColumnBuffer::identity_id([v]),
114		Value::Uuid4(v) => ColumnBuffer::uuid4([v]),
115		Value::Uuid7(v) => ColumnBuffer::uuid7([v]),
116		Value::Blob(v) => ColumnBuffer::blob([v]),
117		Value::Int(v) => ColumnBuffer::int(vec![v]),
118		Value::Uint(v) => ColumnBuffer::uint(vec![v]),
119		Value::Decimal(v) => ColumnBuffer::decimal(vec![v]),
120		Value::DictionaryId(v) => ColumnBuffer::dictionary_id(vec![v]),
121		Value::Any(v) => ColumnBuffer::any(vec![v]),
122		Value::Type(v) => ColumnBuffer::any(vec![Box::new(Value::Type(v))]),
123		Value::List(v) => ColumnBuffer::any(vec![Box::new(Value::List(v))]),
124		Value::Record(v) => ColumnBuffer::any(vec![Box::new(Value::Record(v))]),
125		Value::Tuple(v) => ColumnBuffer::any(vec![Box::new(Value::Tuple(v))]),
126	}
127}
128
129impl Columns {
130	pub fn scalar_value(&self) -> Value {
131		debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
132		debug_assert_eq!(
133			self.row_count(),
134			1,
135			"scalar_value() requires exactly 1 row, got {}",
136			self.row_count()
137		);
138		self.columns[0].get_value(0)
139	}
140
141	pub fn new(columns: Vec<ColumnWithName>) -> Self {
142		let n = columns.first().map_or(0, |c| c.data.len());
143		assert!(columns.iter().all(|c| c.data.len() == n));
144
145		let mut names = Vec::with_capacity(columns.len());
146		let mut buffers = Vec::with_capacity(columns.len());
147		for c in columns {
148			names.push(c.name);
149			buffers.push(c.data);
150		}
151
152		Self {
153			row_numbers: CowVec::new(Vec::new()),
154			created_at: CowVec::new(Vec::new()),
155			updated_at: CowVec::new(Vec::new()),
156			columns: CowVec::new(buffers),
157			names: CowVec::new(names),
158		}
159	}
160
161	pub fn with_system_columns(
162		columns: Vec<ColumnWithName>,
163		row_numbers: Vec<RowNumber>,
164		created_at: Vec<DateTime>,
165		updated_at: Vec<DateTime>,
166	) -> Self {
167		let n = columns.first().map_or(0, |c| c.data.len());
168		assert!(columns.iter().all(|c| c.data.len() == n));
169		assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
170		assert_eq!(created_at.len(), n, "created_at length must match column data length");
171		assert_eq!(updated_at.len(), n, "updated_at length must match column data length");
172
173		let mut names = Vec::with_capacity(columns.len());
174		let mut buffers = Vec::with_capacity(columns.len());
175		for c in columns {
176			names.push(c.name);
177			buffers.push(c.data);
178		}
179
180		Self {
181			row_numbers: CowVec::new(row_numbers),
182			created_at: CowVec::new(created_at),
183			updated_at: CowVec::new(updated_at),
184			columns: CowVec::new(buffers),
185			names: CowVec::new(names),
186		}
187	}
188
189	pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
190		let mut names = Vec::new();
191		let mut buffers = Vec::new();
192		for (name, value) in rows {
193			names.push(Fragment::internal(name));
194			buffers.push(value_to_buffer(value));
195		}
196		Self {
197			row_numbers: CowVec::new(Vec::new()),
198			created_at: CowVec::new(Vec::new()),
199			updated_at: CowVec::new(Vec::new()),
200			columns: CowVec::new(buffers),
201			names: CowVec::new(names),
202		}
203	}
204
205	pub fn with_row_numbers(mut self, row_numbers: Vec<RowNumber>) -> Self {
206		let n = row_numbers.len();
207		self.row_numbers = CowVec::new(row_numbers);
208		if self.created_at.len() != n {
209			let now = DateTime::default();
210			self.created_at = CowVec::new(vec![now; n]);
211			self.updated_at = CowVec::new(vec![now; n]);
212		}
213		self
214	}
215
216	pub fn from_catalog_columns(cols: &[CatalogColumn]) -> Self {
217		let mut names = Vec::with_capacity(cols.len());
218		let mut buffers = Vec::with_capacity(cols.len());
219		for col in cols {
220			names.push(Fragment::internal(&col.name));
221			buffers.push(ColumnBuffer::with_capacity(col.constraint.get_type(), 0));
222		}
223		Self {
224			row_numbers: CowVec::new(Vec::new()),
225			created_at: CowVec::new(Vec::new()),
226			updated_at: CowVec::new(Vec::new()),
227			columns: CowVec::new(buffers),
228			names: CowVec::new(names),
229		}
230	}
231
232	pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
233		let n = self.len();
234		let names = self.names.make_mut();
235		for (i, name) in headers.columns.iter().enumerate() {
236			if i < n {
237				names[i] = name.clone();
238			}
239		}
240	}
241}
242
243impl Columns {
244	pub fn number(&self) -> RowNumber {
245		assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
246		if self.row_numbers.is_empty() {
247			RowNumber(0)
248		} else {
249			self.row_numbers[0]
250		}
251	}
252
253	pub fn shape(&self) -> (usize, usize) {
254		let row_count = if !self.row_numbers.is_empty() {
255			self.row_numbers.len()
256		} else {
257			self.columns.first().map(|c| c.len()).unwrap_or(0)
258		};
259		(row_count, self.len())
260	}
261
262	pub fn len(&self) -> usize {
263		self.columns.len()
264	}
265
266	pub fn is_empty(&self) -> bool {
267		self.columns.is_empty()
268	}
269
270	pub fn iter(&self) -> impl Iterator<Item = ColumnRef<'_>> + '_ {
271		self.names.iter().zip(self.columns.iter()).map(|(n, d)| ColumnRef::new(n, d))
272	}
273
274	pub fn first(&self) -> Option<ColumnRef<'_>> {
275		self.get(0)
276	}
277
278	pub fn last(&self) -> Option<ColumnRef<'_>> {
279		let n = self.len();
280		if n == 0 {
281			None
282		} else {
283			self.get(n - 1)
284		}
285	}
286
287	pub fn get(&self, index: usize) -> Option<ColumnRef<'_>> {
288		if index < self.len() {
289			Some(ColumnRef::new(&self.names[index], &self.columns[index]))
290		} else {
291			None
292		}
293	}
294
295	pub fn name_at(&self, index: usize) -> &Fragment {
296		&self.names[index]
297	}
298
299	pub fn data_at(&self, index: usize) -> &ColumnBuffer {
300		&self.columns[index]
301	}
302
303	pub fn data_at_mut(&mut self, index: usize) -> &mut ColumnBuffer {
304		&mut self.columns.make_mut()[index]
305	}
306
307	pub fn row(&self, i: usize) -> Vec<Value> {
308		self.columns.iter().map(|c| c.get_value(i)).collect()
309	}
310
311	pub fn column(&self, name: &str) -> Option<ColumnRef<'_>> {
312		self.names.iter().position(|n| n.text() == name).and_then(|i| self.get(i))
313	}
314
315	pub fn row_count(&self) -> usize {
316		if !self.row_numbers.is_empty() {
317			self.row_numbers.len()
318		} else {
319			self.columns.first().map_or(0, |col| col.len())
320		}
321	}
322
323	pub fn has_rows(&self) -> bool {
324		self.row_count() > 0
325	}
326
327	pub fn is_scalar(&self) -> bool {
328		self.len() == 1 && self.row_count() == 1
329	}
330
331	pub fn get_row(&self, index: usize) -> Vec<Value> {
332		self.columns.iter().map(|col| col.get_value(index)).collect()
333	}
334
335	#[track_caller]
336	pub fn assert_invariants(&self, ctx: &str) {
337		let n = self.columns.first().map_or(0, |c| c.len());
338		for (i, col) in self.columns.iter().enumerate() {
339			assert_eq!(
340				col.len(),
341				n,
342				"{ctx}: Columns column[{i}] has length {} but columns[0] has length {n}",
343				col.len(),
344			);
345		}
346		assert!(
347			self.row_numbers.is_empty() || self.row_numbers.len() == n,
348			"{ctx}: Columns.row_numbers.len() = {} but columns[0].len() = {n}",
349			self.row_numbers.len(),
350		);
351		assert!(
352			self.created_at.is_empty() || self.created_at.len() == n,
353			"{ctx}: Columns.created_at.len() = {} but columns[0].len() = {n}",
354			self.created_at.len(),
355		);
356		assert!(
357			self.updated_at.is_empty() || self.updated_at.len() == n,
358			"{ctx}: Columns.updated_at.len() = {} but columns[0].len() = {n}",
359			self.updated_at.len(),
360		);
361	}
362}
363
364impl Columns {
365	pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
366		let column_count = names.len();
367
368		let mut name_vec: Vec<Fragment> = names.iter().map(Fragment::internal).collect();
369		let mut buffers: Vec<ColumnBuffer> =
370			(0..column_count).map(|_| ColumnBuffer::none_typed(Type::Boolean, 0)).collect();
371
372		for row in result_rows {
373			assert_eq!(row.len(), column_count, "row length does not match column count");
374			for (i, value) in row.iter().enumerate() {
375				buffers[i].push_value(value.clone());
376			}
377		}
378
379		let _ = &mut name_vec;
380		Self {
381			row_numbers: CowVec::new(Vec::new()),
382			created_at: CowVec::new(Vec::new()),
383			updated_at: CowVec::new(Vec::new()),
384			columns: CowVec::new(buffers),
385			names: CowVec::new(name_vec),
386		}
387	}
388
389	pub fn from_encoded_rows(shape: &RowShape, ids: &[RowNumber], rows: &[EncodedRow]) -> Self {
390		assert_eq!(ids.len(), rows.len(), "ids length must match rows length");
391		let fields = shape.fields();
392		let row_count = rows.len();
393
394		let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
395		for field in fields.iter() {
396			columns_vec.push(ColumnWithName {
397				name: Fragment::internal(&field.name),
398				data: ColumnBuffer::with_capacity(field.constraint.get_type(), row_count),
399			});
400		}
401
402		for encoded in rows {
403			for (i, _) in fields.iter().enumerate() {
404				columns_vec[i].data.push_value(shape.get_value(encoded, i));
405			}
406		}
407
408		let row_numbers: Vec<RowNumber> = ids.to_vec();
409		let created_at: Vec<DateTime> =
410			rows.iter().map(|r| DateTime::from_nanos(r.created_at_nanos())).collect();
411		let updated_at: Vec<DateTime> =
412			rows.iter().map(|r| DateTime::from_nanos(r.updated_at_nanos())).collect();
413
414		Self::with_system_columns(columns_vec, row_numbers, created_at, updated_at)
415	}
416}
417
418impl Columns {
419	pub fn empty() -> Self {
420		Self {
421			row_numbers: CowVec::with_capacity(1),
422			created_at: CowVec::with_capacity(1),
423			updated_at: CowVec::with_capacity(1),
424			columns: CowVec::with_capacity(16),
425			names: CowVec::with_capacity(16),
426		}
427	}
428}
429
430impl Default for Columns {
431	fn default() -> Self {
432		Self::empty()
433	}
434}
435
436impl Columns {
437	pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
438		if indices.is_empty() {
439			return Columns::empty();
440		}
441
442		let mut new_buffers: Vec<ColumnBuffer> = Vec::with_capacity(self.columns.len());
443		for col in self.columns.iter() {
444			let mut new_data = ColumnBuffer::with_capacity(col.get_type(), indices.len());
445			for &idx in indices {
446				new_data.push_value(col.get_value(idx));
447			}
448			new_buffers.push(new_data);
449		}
450
451		let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
452			Vec::new()
453		} else {
454			indices.iter().map(|&i| self.row_numbers[i]).collect()
455		};
456		let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
457			Vec::new()
458		} else {
459			indices.iter().map(|&i| self.created_at[i]).collect()
460		};
461		let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
462			Vec::new()
463		} else {
464			indices.iter().map(|&i| self.updated_at[i]).collect()
465		};
466		Columns {
467			row_numbers: CowVec::new(new_row_numbers),
468			created_at: CowVec::new(new_created_at),
469			updated_at: CowVec::new(new_updated_at),
470			columns: CowVec::new(new_buffers),
471			names: self.names.clone(),
472		}
473	}
474
475	pub fn extract_row(&self, index: usize) -> Columns {
476		self.extract_by_indices(&[index])
477	}
478
479	pub fn append_rows_by_indices(&mut self, source: &Columns, indices: &[usize]) {
480		if indices.is_empty() {
481			return;
482		}
483
484		if self.columns.is_empty() {
485			*self = source.extract_by_indices(indices);
486			return;
487		}
488
489		assert_eq!(
490			self.columns.len(),
491			source.columns.len(),
492			"append_rows: column count mismatch (self={}, source={})",
493			self.columns.len(),
494			source.columns.len(),
495		);
496
497		let self_cols = self.columns.make_mut();
498		for (i, src_col) in source.columns.iter().enumerate() {
499			for &idx in indices {
500				self_cols[i].push_value(src_col.get_value(idx));
501			}
502		}
503
504		if !source.row_numbers.is_empty() {
505			let rns = self.row_numbers.make_mut();
506			for &idx in indices {
507				rns.push(source.row_numbers[idx]);
508			}
509		}
510		if !source.created_at.is_empty() {
511			let cr = self.created_at.make_mut();
512			for &idx in indices {
513				cr.push(source.created_at[idx]);
514			}
515		}
516		if !source.updated_at.is_empty() {
517			let up = self.updated_at.make_mut();
518			for &idx in indices {
519				up.push(source.updated_at[idx]);
520			}
521		}
522	}
523
524	pub fn append_all(&mut self, source: Columns) -> Result<()> {
525		if source.row_count() == 0 {
526			return Ok(());
527		}
528
529		if self.columns.is_empty() {
530			*self = source;
531			return Ok(());
532		}
533
534		if self.columns.len() != source.columns.len() {
535			return_internal_error!(
536				"Columns::append_all: column count mismatch (self={}, source={})",
537				self.columns.len(),
538				source.columns.len()
539			);
540		}
541
542		if self.row_numbers.is_empty() != source.row_numbers.is_empty() {
543			return_internal_error!(
544				"Columns::append_all: row_numbers population mismatch (self_empty={}, source_empty={})",
545				self.row_numbers.is_empty(),
546				source.row_numbers.is_empty()
547			);
548		}
549		if self.created_at.is_empty() != source.created_at.is_empty() {
550			return_internal_error!(
551				"Columns::append_all: created_at population mismatch (self_empty={}, source_empty={})",
552				self.created_at.is_empty(),
553				source.created_at.is_empty()
554			);
555		}
556		if self.updated_at.is_empty() != source.updated_at.is_empty() {
557			return_internal_error!(
558				"Columns::append_all: updated_at population mismatch (self_empty={}, source_empty={})",
559				self.updated_at.is_empty(),
560				source.updated_at.is_empty()
561			);
562		}
563
564		let dest_cols = self.columns.make_mut();
565		let source_cols = source.columns.into_inner();
566		for (i, src_col) in source_cols.into_iter().enumerate() {
567			dest_cols[i].extend(src_col)?;
568		}
569
570		if !source.row_numbers.is_empty() {
571			self.row_numbers.extend_from_slice(source.row_numbers.as_slice());
572		}
573		if !source.created_at.is_empty() {
574			self.created_at.extend_from_slice(source.created_at.as_slice());
575		}
576		if !source.updated_at.is_empty() {
577			self.updated_at.extend_from_slice(source.updated_at.as_slice());
578		}
579
580		Ok(())
581	}
582
583	pub fn concat(batches: Vec<Columns>) -> Result<Option<Columns>> {
584		let mut iter = batches.into_iter();
585		let mut merged = match iter.next() {
586			Some(first) => first,
587			None => return Ok(None),
588		};
589		for cols in iter {
590			merged.append_all(cols)?;
591		}
592		if merged.row_count() == 0 {
593			return Ok(None);
594		}
595		Ok(Some(merged))
596	}
597
598	pub fn remove_row(&mut self, row_number: RowNumber) -> bool {
599		let pos = self.row_numbers.iter().position(|&r| r == row_number);
600		let Some(idx) = pos else {
601			return false;
602		};
603
604		let kept_indices: Vec<usize> = (0..self.row_count()).filter(|&i| i != idx).collect();
605		*self = self.extract_by_indices(&kept_indices);
606		true
607	}
608
609	pub fn project_by_names(&self, names: &[String]) -> Columns {
610		let mut new_names = Vec::new();
611		let mut new_buffers = Vec::new();
612
613		for name in names {
614			if let Some(pos) = self.names.iter().position(|n| n.text() == name.as_str()) {
615				new_names.push(self.names[pos].clone());
616				new_buffers.push(self.columns[pos].clone());
617			}
618		}
619
620		if new_buffers.is_empty() {
621			return Columns::empty();
622		}
623
624		Columns {
625			row_numbers: self.row_numbers.clone(),
626			created_at: self.created_at.clone(),
627			updated_at: self.updated_at.clone(),
628			columns: CowVec::new(new_buffers),
629			names: CowVec::new(new_names),
630		}
631	}
632
633	pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
634		assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
635
636		let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
637		for (idx, key) in keys.iter().enumerate() {
638			key_to_indices.entry(key.clone()).or_default().push(idx);
639		}
640
641		key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
642	}
643
644	pub fn from_row(row: &Row) -> Self {
645		let mut out = Columns::empty();
646		out.reset_from_row(row);
647		out
648	}
649
650	pub fn reset_from_row(&mut self, row: &Row) {
651		let field_count = row.shape.fields().len();
652
653		self.row_numbers.clear();
654		self.created_at.clear();
655		self.updated_at.clear();
656		self.columns.clear();
657		self.names.clear();
658
659		self.columns.make_mut().reserve(field_count);
660		self.names.make_mut().reserve(field_count);
661
662		self.row_numbers.push(row.number);
663		self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
664		self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
665
666		for (idx, field) in row.shape.fields().iter().enumerate() {
667			let value = row.shape.get_value(&row.encoded, idx);
668
669			let column_type = if matches!(value, Value::None { .. }) {
670				field.constraint.get_type()
671			} else {
672				value.get_type()
673			};
674
675			let mut data = if column_type.is_option() {
676				ColumnBuffer::none_typed(column_type.clone(), 0)
677			} else {
678				ColumnBuffer::with_capacity(column_type.clone(), 1)
679			};
680			data.push_value(value);
681
682			if column_type == Type::DictionaryId
683				&& let ColumnBuffer::DictionaryId(container) = &mut data
684				&& let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
685			{
686				container.set_dictionary_id(*dict_id);
687			}
688
689			let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
690
691			self.names.push(Fragment::internal(name));
692			self.columns.push(data);
693		}
694	}
695
696	pub fn reset_from_row_with_pool(&mut self, row: &Row, pool: &ColumnBufferPool) {
697		let field_count = row.shape.fields().len();
698
699		self.row_numbers.clear();
700		self.created_at.clear();
701		self.updated_at.clear();
702		self.names.clear();
703
704		self.row_numbers.push(row.number);
705		self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
706		self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
707
708		let columns_vec = self.columns.make_mut();
709		let names_vec = self.names.make_mut();
710
711		while columns_vec.len() > field_count {
712			if let Some(buf) = columns_vec.pop() {
713				pool.release(buf);
714			}
715		}
716
717		columns_vec.reserve(field_count);
718		names_vec.reserve(field_count);
719
720		for (idx, field) in row.shape.fields().iter().enumerate() {
721			let value = row.shape.get_value(&row.encoded, idx);
722
723			let column_type = if matches!(value, Value::None { .. }) {
724				field.constraint.get_type()
725			} else {
726				value.get_type()
727			};
728
729			if idx < columns_vec.len() {
730				if columns_vec[idx].get_type() == column_type {
731					columns_vec[idx].clear();
732				} else {
733					let replacement = if column_type.is_option() {
734						ColumnBuffer::none_typed(column_type.clone(), 0)
735					} else {
736						pool.acquire(&column_type, 1)
737					};
738					let old = mem::replace(&mut columns_vec[idx], replacement);
739					pool.release(old);
740				}
741			} else {
742				let fresh = if column_type.is_option() {
743					ColumnBuffer::none_typed(column_type.clone(), 0)
744				} else {
745					pool.acquire(&column_type, 1)
746				};
747				columns_vec.push(fresh);
748			}
749
750			columns_vec[idx].push_value(value);
751
752			if column_type == Type::DictionaryId
753				&& let ColumnBuffer::DictionaryId(container) = &mut columns_vec[idx]
754				&& let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
755			{
756				container.set_dictionary_id(*dict_id);
757			}
758
759			let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
760			names_vec.push(Fragment::internal(name));
761		}
762	}
763
764	pub fn to_single_row(&self) -> Row {
765		assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
766		assert_eq!(
767			self.row_numbers.len(),
768			1,
769			"to_row() requires exactly 1 row number, got {}",
770			self.row_numbers.len()
771		);
772
773		let row_number = *self.row_numbers.first().unwrap();
774
775		let fields: Vec<RowShapeField> = self
776			.names
777			.iter()
778			.zip(self.columns.iter())
779			.map(|(name, data)| RowShapeField::unconstrained(name.text().to_string(), data.get_type()))
780			.collect();
781
782		let layout = RowShape::new(fields);
783		let mut encoded = layout.allocate();
784
785		let values: Vec<Value> = self.columns.iter().map(|col| col.get_value(0)).collect();
786		layout.set_values(&mut encoded, &values);
787
788		Row {
789			number: row_number,
790			encoded,
791			shape: layout,
792		}
793	}
794}
795
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Reads row 0 of the column called `name`, panicking when the column is missing.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// `single_row` keeps date, datetime, time and duration values intact.
	#[test]
	fn test_single_row_temporal_types() {
		let date = Date::from_ymd(2025, 1, 15).unwrap();
		let datetime = DateTime::from_timestamp(1642694400).unwrap();
		let time = Time::from_hms(14, 30, 45).unwrap();
		let duration = Duration::from_days(30).unwrap();

		let columns = Columns::single_row([
			("date_col", Value::Date(date.clone())),
			("datetime_col", Value::DateTime(datetime.clone())),
			("time_col", Value::Time(time.clone())),
			("interval_col", Value::Duration(duration.clone())),
		]);

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "datetime_col"), Value::DateTime(datetime));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "interval_col"), Value::Duration(duration));
	}

	/// `single_row` accepts a mix of value kinds, including a none value.
	#[test]
	fn test_single_row_mixed_types() {
		let date = Date::from_ymd(2025, 7, 15).unwrap();
		let time = Time::from_hms(9, 15, 30).unwrap();

		let columns = Columns::single_row([
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", Value::Date(date.clone())),
			("time_col", Value::Time(time.clone())),
			("none_col", Value::none()),
		]);

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
		assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
		assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "none_col"), Value::none());
	}

	/// A plainly named column can be looked up by that name.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}
856}