Skip to main content

reifydb_core/value/column/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	hash::Hash,
6	mem,
7	ops::{Index, IndexMut},
8};
9
10use indexmap::IndexMap;
11use reifydb_type::{
12	fragment::Fragment,
13	util::cowvec::CowVec,
14	value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
15};
16use serde::{Deserialize, Serialize};
17
18use crate::{
19	encoded::shape::{RowShape, RowShapeField},
20	interface::catalog::column::Column as CatalogColumn,
21	row::Row,
22	value::column::{
23		ColumnBuffer, ColumnWithName, array::Column, buffer::pool::ColumnBufferPool, headers::ColumnHeaders,
24	},
25};
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Columns {
29	pub row_numbers: CowVec<RowNumber>,
30	pub created_at: CowVec<DateTime>,
31	pub updated_at: CowVec<DateTime>,
32	pub columns: CowVec<ColumnBuffer>,
33	pub names: CowVec<Fragment>,
34}
35
36#[derive(Debug, Clone, Copy)]
37pub struct ColumnRef<'a> {
38	name: &'a Fragment,
39	data: &'a ColumnBuffer,
40}
41
42impl Index<usize> for Columns {
43	type Output = ColumnBuffer;
44
45	fn index(&self, index: usize) -> &Self::Output {
46		&self.columns[index]
47	}
48}
49
50impl IndexMut<usize> for Columns {
51	fn index_mut(&mut self, index: usize) -> &mut Self::Output {
52		&mut self.columns.make_mut()[index]
53	}
54}
55
56impl<'a> ColumnRef<'a> {
57	pub fn new(name: &'a Fragment, data: &'a ColumnBuffer) -> Self {
58		Self {
59			name,
60			data,
61		}
62	}
63
64	pub fn name(&self) -> &'a Fragment {
65		self.name
66	}
67
68	pub fn data(&self) -> &'a ColumnBuffer {
69		self.data
70	}
71
72	pub fn get_type(&self) -> Type {
73		self.data.get_type()
74	}
75
76	pub fn column(&self) -> Column {
77		Column::from_column_buffer(self.data.clone())
78	}
79
80	pub fn with_new_data(&self, data: ColumnBuffer) -> ColumnWithName {
81		ColumnWithName::new(self.name.clone(), data)
82	}
83}
84
85fn value_to_buffer(value: Value) -> ColumnBuffer {
86	match value {
87		Value::None {
88			..
89		} => ColumnBuffer::none_typed(Type::Boolean, 1),
90		Value::Boolean(v) => ColumnBuffer::bool([v]),
91		Value::Float4(v) => ColumnBuffer::float4([v.into()]),
92		Value::Float8(v) => ColumnBuffer::float8([v.into()]),
93		Value::Int1(v) => ColumnBuffer::int1([v]),
94		Value::Int2(v) => ColumnBuffer::int2([v]),
95		Value::Int4(v) => ColumnBuffer::int4([v]),
96		Value::Int8(v) => ColumnBuffer::int8([v]),
97		Value::Int16(v) => ColumnBuffer::int16([v]),
98		Value::Utf8(v) => ColumnBuffer::utf8([v]),
99		Value::Uint1(v) => ColumnBuffer::uint1([v]),
100		Value::Uint2(v) => ColumnBuffer::uint2([v]),
101		Value::Uint4(v) => ColumnBuffer::uint4([v]),
102		Value::Uint8(v) => ColumnBuffer::uint8([v]),
103		Value::Uint16(v) => ColumnBuffer::uint16([v]),
104		Value::Date(v) => ColumnBuffer::date([v]),
105		Value::DateTime(v) => ColumnBuffer::datetime([v]),
106		Value::Time(v) => ColumnBuffer::time([v]),
107		Value::Duration(v) => ColumnBuffer::duration([v]),
108		Value::IdentityId(v) => ColumnBuffer::identity_id([v]),
109		Value::Uuid4(v) => ColumnBuffer::uuid4([v]),
110		Value::Uuid7(v) => ColumnBuffer::uuid7([v]),
111		Value::Blob(v) => ColumnBuffer::blob([v]),
112		Value::Int(v) => ColumnBuffer::int(vec![v]),
113		Value::Uint(v) => ColumnBuffer::uint(vec![v]),
114		Value::Decimal(v) => ColumnBuffer::decimal(vec![v]),
115		Value::DictionaryId(v) => ColumnBuffer::dictionary_id(vec![v]),
116		Value::Any(v) => ColumnBuffer::any(vec![v]),
117		Value::Type(v) => ColumnBuffer::any(vec![Box::new(Value::Type(v))]),
118		Value::List(v) => ColumnBuffer::any(vec![Box::new(Value::List(v))]),
119		Value::Record(v) => ColumnBuffer::any(vec![Box::new(Value::Record(v))]),
120		Value::Tuple(v) => ColumnBuffer::any(vec![Box::new(Value::Tuple(v))]),
121	}
122}
123
124impl Columns {
125	/// Extract the single value from a 1-column, 1-row Columns.
126	/// Panics if the Columns does not have exactly 1 column and 1 row.
127	pub fn scalar_value(&self) -> Value {
128		debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
129		debug_assert_eq!(
130			self.row_count(),
131			1,
132			"scalar_value() requires exactly 1 row, got {}",
133			self.row_count()
134		);
135		self.columns[0].get_value(0)
136	}
137
138	pub fn new(columns: Vec<ColumnWithName>) -> Self {
139		let n = columns.first().map_or(0, |c| c.data.len());
140		assert!(columns.iter().all(|c| c.data.len() == n));
141
142		let mut names = Vec::with_capacity(columns.len());
143		let mut buffers = Vec::with_capacity(columns.len());
144		for c in columns {
145			names.push(c.name);
146			buffers.push(c.data);
147		}
148
149		Self {
150			row_numbers: CowVec::new(Vec::new()),
151			created_at: CowVec::new(Vec::new()),
152			updated_at: CowVec::new(Vec::new()),
153			columns: CowVec::new(buffers),
154			names: CowVec::new(names),
155		}
156	}
157
158	pub fn with_system_columns(
159		columns: Vec<ColumnWithName>,
160		row_numbers: Vec<RowNumber>,
161		created_at: Vec<DateTime>,
162		updated_at: Vec<DateTime>,
163	) -> Self {
164		let n = columns.first().map_or(0, |c| c.data.len());
165		assert!(columns.iter().all(|c| c.data.len() == n));
166		assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
167		assert_eq!(created_at.len(), n, "created_at length must match column data length");
168		assert_eq!(updated_at.len(), n, "updated_at length must match column data length");
169
170		let mut names = Vec::with_capacity(columns.len());
171		let mut buffers = Vec::with_capacity(columns.len());
172		for c in columns {
173			names.push(c.name);
174			buffers.push(c.data);
175		}
176
177		Self {
178			row_numbers: CowVec::new(row_numbers),
179			created_at: CowVec::new(created_at),
180			updated_at: CowVec::new(updated_at),
181			columns: CowVec::new(buffers),
182			names: CowVec::new(names),
183		}
184	}
185
186	pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
187		let mut names = Vec::new();
188		let mut buffers = Vec::new();
189		for (name, value) in rows {
190			names.push(Fragment::internal(name));
191			buffers.push(value_to_buffer(value));
192		}
193		Self {
194			row_numbers: CowVec::new(Vec::new()),
195			created_at: CowVec::new(Vec::new()),
196			updated_at: CowVec::new(Vec::new()),
197			columns: CowVec::new(buffers),
198			names: CowVec::new(names),
199		}
200	}
201
202	pub fn with_row_numbers(mut self, row_numbers: Vec<RowNumber>) -> Self {
203		let n = row_numbers.len();
204		self.row_numbers = CowVec::new(row_numbers);
205		if self.created_at.len() != n {
206			let now = DateTime::default();
207			self.created_at = CowVec::new(vec![now; n]);
208			self.updated_at = CowVec::new(vec![now; n]);
209		}
210		self
211	}
212
213	pub fn from_catalog_columns(cols: &[CatalogColumn]) -> Self {
214		let mut names = Vec::with_capacity(cols.len());
215		let mut buffers = Vec::with_capacity(cols.len());
216		for col in cols {
217			names.push(Fragment::internal(&col.name));
218			buffers.push(ColumnBuffer::with_capacity(col.constraint.get_type(), 0));
219		}
220		Self {
221			row_numbers: CowVec::new(Vec::new()),
222			created_at: CowVec::new(Vec::new()),
223			updated_at: CowVec::new(Vec::new()),
224			columns: CowVec::new(buffers),
225			names: CowVec::new(names),
226		}
227	}
228
229	pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
230		let n = self.len();
231		let names = self.names.make_mut();
232		for (i, name) in headers.columns.iter().enumerate() {
233			if i < n {
234				names[i] = name.clone();
235			}
236		}
237	}
238}
239
240impl Columns {
241	/// Get the row number (for single-row Columns). Panics if Columns has 0 or multiple rows.
242	pub fn number(&self) -> RowNumber {
243		assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
244		if self.row_numbers.is_empty() {
245			RowNumber(0)
246		} else {
247			self.row_numbers[0]
248		}
249	}
250
251	pub fn shape(&self) -> (usize, usize) {
252		let row_count = if !self.row_numbers.is_empty() {
253			self.row_numbers.len()
254		} else {
255			self.columns.first().map(|c| c.len()).unwrap_or(0)
256		};
257		(row_count, self.len())
258	}
259
260	pub fn len(&self) -> usize {
261		self.columns.len()
262	}
263
264	pub fn is_empty(&self) -> bool {
265		self.columns.is_empty()
266	}
267
268	pub fn iter(&self) -> impl Iterator<Item = ColumnRef<'_>> + '_ {
269		self.names.iter().zip(self.columns.iter()).map(|(n, d)| ColumnRef::new(n, d))
270	}
271
272	pub fn first(&self) -> Option<ColumnRef<'_>> {
273		self.get(0)
274	}
275
276	pub fn last(&self) -> Option<ColumnRef<'_>> {
277		let n = self.len();
278		if n == 0 {
279			None
280		} else {
281			self.get(n - 1)
282		}
283	}
284
285	pub fn get(&self, index: usize) -> Option<ColumnRef<'_>> {
286		if index < self.len() {
287			Some(ColumnRef::new(&self.names[index], &self.columns[index]))
288		} else {
289			None
290		}
291	}
292
293	pub fn name_at(&self, index: usize) -> &Fragment {
294		&self.names[index]
295	}
296
297	pub fn data_at(&self, index: usize) -> &ColumnBuffer {
298		&self.columns[index]
299	}
300
301	pub fn data_at_mut(&mut self, index: usize) -> &mut ColumnBuffer {
302		&mut self.columns.make_mut()[index]
303	}
304
305	pub fn row(&self, i: usize) -> Vec<Value> {
306		self.columns.iter().map(|c| c.get_value(i)).collect()
307	}
308
309	pub fn column(&self, name: &str) -> Option<ColumnRef<'_>> {
310		self.names.iter().position(|n| n.text() == name).and_then(|i| self.get(i))
311	}
312
313	pub fn row_count(&self) -> usize {
314		if !self.row_numbers.is_empty() {
315			self.row_numbers.len()
316		} else {
317			self.columns.first().map_or(0, |col| col.len())
318		}
319	}
320
321	pub fn has_rows(&self) -> bool {
322		self.row_count() > 0
323	}
324
325	pub fn is_scalar(&self) -> bool {
326		self.len() == 1 && self.row_count() == 1
327	}
328
329	pub fn get_row(&self, index: usize) -> Vec<Value> {
330		self.columns.iter().map(|col| col.get_value(index)).collect()
331	}
332}
333
334impl Columns {
335	pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
336		let column_count = names.len();
337
338		let mut name_vec: Vec<Fragment> = names.iter().map(Fragment::internal).collect();
339		let mut buffers: Vec<ColumnBuffer> =
340			(0..column_count).map(|_| ColumnBuffer::none_typed(Type::Boolean, 0)).collect();
341
342		for row in result_rows {
343			assert_eq!(row.len(), column_count, "row length does not match column count");
344			for (i, value) in row.iter().enumerate() {
345				buffers[i].push_value(value.clone());
346			}
347		}
348
349		let _ = &mut name_vec;
350		Self {
351			row_numbers: CowVec::new(Vec::new()),
352			created_at: CowVec::new(Vec::new()),
353			updated_at: CowVec::new(Vec::new()),
354			columns: CowVec::new(buffers),
355			names: CowVec::new(name_vec),
356		}
357	}
358}
359
360impl Columns {
361	pub fn empty() -> Self {
362		Self {
363			row_numbers: CowVec::with_capacity(1),
364			created_at: CowVec::with_capacity(1),
365			updated_at: CowVec::with_capacity(1),
366			columns: CowVec::with_capacity(16),
367			names: CowVec::with_capacity(16),
368		}
369	}
370}
371
372impl Default for Columns {
373	/// Equivalent to [`Columns::empty`]. Required by `core::util::slab::Slab<T>`
374	/// which mints fresh slabs via `T::default()`.
375	fn default() -> Self {
376		Self::empty()
377	}
378}
379
380impl Columns {
381	/// Extract a subset of rows by indices, returning a new Columns
382	pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
383		if indices.is_empty() {
384			return Columns::empty();
385		}
386
387		let mut new_buffers: Vec<ColumnBuffer> = Vec::with_capacity(self.columns.len());
388		for col in self.columns.iter() {
389			let mut new_data = ColumnBuffer::with_capacity(col.get_type(), indices.len());
390			for &idx in indices {
391				new_data.push_value(col.get_value(idx));
392			}
393			new_buffers.push(new_data);
394		}
395
396		let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
397			Vec::new()
398		} else {
399			indices.iter().map(|&i| self.row_numbers[i]).collect()
400		};
401		let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
402			Vec::new()
403		} else {
404			indices.iter().map(|&i| self.created_at[i]).collect()
405		};
406		let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
407			Vec::new()
408		} else {
409			indices.iter().map(|&i| self.updated_at[i]).collect()
410		};
411		Columns {
412			row_numbers: CowVec::new(new_row_numbers),
413			created_at: CowVec::new(new_created_at),
414			updated_at: CowVec::new(new_updated_at),
415			columns: CowVec::new(new_buffers),
416			names: self.names.clone(),
417		}
418	}
419
420	/// Extract a single row by index, returning a new Columns with 1 row
421	pub fn extract_row(&self, index: usize) -> Columns {
422		self.extract_by_indices(&[index])
423	}
424
425	/// Append rows from `source` at the given `indices` to `self`.
426	///
427	/// If `self` is empty (no columns), it is initialized to match the shape
428	/// of `source` and populated from the selected indices. Otherwise the
429	/// per-column data, row_numbers, created_at, and updated_at are extended
430	/// in place.
431	///
432	/// Panics if `self` and `source` have different column counts (only
433	/// checked when `self` is non-empty).
434	pub fn append_rows_by_indices(&mut self, source: &Columns, indices: &[usize]) {
435		if indices.is_empty() {
436			return;
437		}
438
439		if self.columns.is_empty() {
440			*self = source.extract_by_indices(indices);
441			return;
442		}
443
444		assert_eq!(
445			self.columns.len(),
446			source.columns.len(),
447			"append_rows: column count mismatch (self={}, source={})",
448			self.columns.len(),
449			source.columns.len(),
450		);
451
452		let self_cols = self.columns.make_mut();
453		for (i, src_col) in source.columns.iter().enumerate() {
454			for &idx in indices {
455				self_cols[i].push_value(src_col.get_value(idx));
456			}
457		}
458
459		if !source.row_numbers.is_empty() {
460			let rns = self.row_numbers.make_mut();
461			for &idx in indices {
462				rns.push(source.row_numbers[idx]);
463			}
464		}
465		if !source.created_at.is_empty() {
466			let cr = self.created_at.make_mut();
467			for &idx in indices {
468				cr.push(source.created_at[idx]);
469			}
470		}
471		if !source.updated_at.is_empty() {
472			let up = self.updated_at.make_mut();
473			for &idx in indices {
474				up.push(source.updated_at[idx]);
475			}
476		}
477	}
478
479	/// Remove the row whose row_number equals `row_number`, if present.
480	/// Returns true if a row was removed.
481	pub fn remove_row(&mut self, row_number: RowNumber) -> bool {
482		let pos = self.row_numbers.iter().position(|&r| r == row_number);
483		let Some(idx) = pos else {
484			return false;
485		};
486
487		let kept_indices: Vec<usize> = (0..self.row_count()).filter(|&i| i != idx).collect();
488		*self = self.extract_by_indices(&kept_indices);
489		true
490	}
491
492	/// Project to a subset of columns by name, preserving the order of the provided names.
493	/// Columns not found in self are silently skipped.
494	pub fn project_by_names(&self, names: &[String]) -> Columns {
495		let mut new_names = Vec::new();
496		let mut new_buffers = Vec::new();
497
498		for name in names {
499			if let Some(pos) = self.names.iter().position(|n| n.text() == name.as_str()) {
500				new_names.push(self.names[pos].clone());
501				new_buffers.push(self.columns[pos].clone());
502			}
503		}
504
505		if new_buffers.is_empty() {
506			return Columns::empty();
507		}
508
509		Columns {
510			row_numbers: self.row_numbers.clone(),
511			created_at: self.created_at.clone(),
512			updated_at: self.updated_at.clone(),
513			columns: CowVec::new(new_buffers),
514			names: CowVec::new(new_names),
515		}
516	}
517
518	/// Partition Columns into groups based on keys (one key per row).
519	/// Returns an IndexMap preserving insertion order of first occurrence.
520	pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
521		assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
522
523		let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
524		for (idx, key) in keys.iter().enumerate() {
525			key_to_indices.entry(key.clone()).or_default().push(idx);
526		}
527
528		key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
529	}
530
531	/// Create Columns from a Row by decoding its encoded values.
532	///
533	/// Allocates fresh `CowVec` storage. For hot paths that repeatedly
534	/// rebuild a `Columns` per row (e.g. `CdcProducerActor`), prefer
535	/// `reset_from_row` on a reusable `Columns` slab to retain the inner
536	/// `Vec` capacities across calls.
537	pub fn from_row(row: &Row) -> Self {
538		let mut out = Columns::empty();
539		out.reset_from_row(row);
540		out
541	}
542
543	/// Refill this `Columns` from a single row, retaining inner `Vec`
544	/// capacities when this slab is uniquely owned. When shared (e.g. a
545	/// previous `Diff` is still in flight referencing one of the inner
546	/// `CowVec`s), `Arc::make_mut` forks to a fresh capacity-preserving
547	/// copy.
548	pub fn reset_from_row(&mut self, row: &Row) {
549		let field_count = row.shape.fields().len();
550
551		self.row_numbers.clear();
552		self.created_at.clear();
553		self.updated_at.clear();
554		self.columns.clear();
555		self.names.clear();
556
557		// Reserve up-front so per-field push() does not re-trigger
558		// RawVec::grow_one when the slab was created empty.
559		self.columns.make_mut().reserve(field_count);
560		self.names.make_mut().reserve(field_count);
561
562		self.row_numbers.push(row.number);
563		self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
564		self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
565
566		for (idx, field) in row.shape.fields().iter().enumerate() {
567			let value = row.shape.get_value(&row.encoded, idx);
568
569			let column_type = if matches!(value, Value::None { .. }) {
570				field.constraint.get_type()
571			} else {
572				value.get_type()
573			};
574
575			let mut data = if column_type.is_option() {
576				ColumnBuffer::none_typed(column_type.clone(), 0)
577			} else {
578				ColumnBuffer::with_capacity(column_type.clone(), 1)
579			};
580			data.push_value(value);
581
582			if column_type == Type::DictionaryId
583				&& let ColumnBuffer::DictionaryId(container) = &mut data
584				&& let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
585			{
586				container.set_dictionary_id(*dict_id);
587			}
588
589			let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
590
591			self.names.push(Fragment::internal(name));
592			self.columns.push(data);
593		}
594	}
595
596	/// Refill from a row, sourcing inner column buffers from a shared
597	/// `ColumnBufferPool`. Existing buffers whose `get_type()` matches
598	/// the field type are cleared in place; mismatches are released to
599	/// the pool and replaced with `pool.acquire(...)`. New columns
600	/// (slab last held a narrower row) come from `pool.acquire`. Excess
601	/// columns (slab last held a wider row) are released to the pool.
602	pub fn reset_from_row_with_pool(&mut self, row: &Row, pool: &ColumnBufferPool) {
603		let field_count = row.shape.fields().len();
604
605		self.row_numbers.clear();
606		self.created_at.clear();
607		self.updated_at.clear();
608		self.names.clear();
609
610		self.row_numbers.push(row.number);
611		self.created_at.push(DateTime::from_nanos(row.encoded.created_at_nanos()));
612		self.updated_at.push(DateTime::from_nanos(row.encoded.updated_at_nanos()));
613
614		let columns_vec = self.columns.make_mut();
615		let names_vec = self.names.make_mut();
616
617		// Drain extra columns back to the pool (slab last held a wider row).
618		while columns_vec.len() > field_count {
619			if let Some(buf) = columns_vec.pop() {
620				pool.release(buf);
621			}
622		}
623
624		columns_vec.reserve(field_count);
625		names_vec.reserve(field_count);
626
627		for (idx, field) in row.shape.fields().iter().enumerate() {
628			let value = row.shape.get_value(&row.encoded, idx);
629
630			let column_type = if matches!(value, Value::None { .. }) {
631				field.constraint.get_type()
632			} else {
633				value.get_type()
634			};
635
636			if idx < columns_vec.len() {
637				if columns_vec[idx].get_type() == column_type {
638					columns_vec[idx].clear();
639				} else {
640					let replacement = if column_type.is_option() {
641						// Option-wrapped buffers are never pooled; allocate fresh.
642						ColumnBuffer::none_typed(column_type.clone(), 0)
643					} else {
644						pool.acquire(&column_type, 1)
645					};
646					let old = mem::replace(&mut columns_vec[idx], replacement);
647					pool.release(old);
648				}
649			} else {
650				let fresh = if column_type.is_option() {
651					ColumnBuffer::none_typed(column_type.clone(), 0)
652				} else {
653					pool.acquire(&column_type, 1)
654				};
655				columns_vec.push(fresh);
656			}
657
658			columns_vec[idx].push_value(value);
659
660			if column_type == Type::DictionaryId
661				&& let ColumnBuffer::DictionaryId(container) = &mut columns_vec[idx]
662				&& let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
663			{
664				container.set_dictionary_id(*dict_id);
665			}
666
667			let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
668			names_vec.push(Fragment::internal(name));
669		}
670	}
671
672	/// Convert Columns back to a Row (assumes single row)
673	/// Panics if Columns contains more than 1 row
674	pub fn to_single_row(&self) -> Row {
675		assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
676		assert_eq!(
677			self.row_numbers.len(),
678			1,
679			"to_row() requires exactly 1 row number, got {}",
680			self.row_numbers.len()
681		);
682
683		let row_number = *self.row_numbers.first().unwrap();
684
685		let fields: Vec<RowShapeField> = self
686			.names
687			.iter()
688			.zip(self.columns.iter())
689			.map(|(name, data)| RowShapeField::unconstrained(name.text().to_string(), data.get_type()))
690			.collect();
691
692		let layout = RowShape::new(fields);
693		let mut encoded = layout.allocate();
694
695		let values: Vec<Value> = self.columns.iter().map(|col| col.get_value(0)).collect();
696		layout.set_values(&mut encoded, &values);
697
698		Row {
699			number: row_number,
700			encoded,
701			shape: layout,
702		}
703	}
704}
705
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Reads row 0 of the column called `name`; panics if no such column exists.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// `single_row` keeps date, datetime, time and duration values intact.
	#[test]
	fn test_single_row_temporal_types() {
		let date = Date::from_ymd(2025, 1, 15).unwrap();
		let datetime = DateTime::from_timestamp(1642694400).unwrap();
		let time = Time::from_hms(14, 30, 45).unwrap();
		let duration = Duration::from_days(30).unwrap();

		let columns = Columns::single_row([
			("date_col", Value::Date(date.clone())),
			("datetime_col", Value::DateTime(datetime.clone())),
			("time_col", Value::Time(time.clone())),
			("interval_col", Value::Duration(duration.clone())),
		]);

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "datetime_col"), Value::DateTime(datetime));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "interval_col"), Value::Duration(duration));
	}

	/// `single_row` handles a mix of scalar, text, temporal and none values.
	#[test]
	fn test_single_row_mixed_types() {
		let date = Date::from_ymd(2025, 7, 15).unwrap();
		let time = Time::from_hms(9, 15, 30).unwrap();

		let columns = Columns::single_row([
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", Value::Date(date.clone())),
			("time_col", Value::Time(time.clone())),
			("none_col", Value::none()),
		]);

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
		assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
		assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "none_col"), Value::none());
	}

	/// A plain column name can be looked up after `single_row`.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}