Skip to main content

reifydb_core/value/column/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	hash::Hash,
7	mem,
8	ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13	Result,
14	fragment::Fragment,
15	util::cowvec::CowVec,
16	value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20	encoded::shape::{RowShape, RowShapeField},
21	interface::{
22		catalog::{table::Table, view::View},
23		resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24	},
25	row::Row,
26	value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
/// A columnar batch of rows: a list of value columns plus optional per-row
/// system metadata (row numbers and timestamps).
///
/// `Columns::new` and `Columns::with_system_columns` assert that every value
/// column holds the same number of rows.
#[derive(Debug, Clone)]
pub struct Columns {
	/// Row number of each row; left empty by constructors that do not track row numbers.
	pub row_numbers: CowVec<RowNumber>,
	/// `created_at` timestamp of each row; left empty by constructors that do not track it.
	pub created_at: CowVec<DateTime>,
	/// `updated_at` timestamp of each row; left empty by constructors that do not track it.
	pub updated_at: CowVec<DateTime>,
	/// The value columns, in positional order.
	pub columns: CowVec<Column>,
}
36
37impl Deref for Columns {
38	type Target = [Column];
39
40	fn deref(&self) -> &Self::Target {
41		self.columns.deref()
42	}
43}
44
45impl Index<usize> for Columns {
46	type Output = Column;
47
48	fn index(&self, index: usize) -> &Self::Output {
49		self.columns.index(index)
50	}
51}
52
53impl IndexMut<usize> for Columns {
54	fn index_mut(&mut self, index: usize) -> &mut Self::Output {
55		&mut self.columns.make_mut()[index]
56	}
57}
58
59impl Columns {
60	/// Create a 1-column, 1-row Columns from a single Value.
61	/// Used to store scalar values inside `Variable::Scalar(Columns)`.
62	pub fn scalar(value: Value) -> Self {
63		let data = match value {
64			Value::None {
65				..
66			} => ColumnData::none_typed(Type::Boolean, 1),
67			Value::Boolean(v) => ColumnData::bool([v]),
68			Value::Float4(v) => ColumnData::float4([v.into()]),
69			Value::Float8(v) => ColumnData::float8([v.into()]),
70			Value::Int1(v) => ColumnData::int1([v]),
71			Value::Int2(v) => ColumnData::int2([v]),
72			Value::Int4(v) => ColumnData::int4([v]),
73			Value::Int8(v) => ColumnData::int8([v]),
74			Value::Int16(v) => ColumnData::int16([v]),
75			Value::Utf8(v) => ColumnData::utf8([v]),
76			Value::Uint1(v) => ColumnData::uint1([v]),
77			Value::Uint2(v) => ColumnData::uint2([v]),
78			Value::Uint4(v) => ColumnData::uint4([v]),
79			Value::Uint8(v) => ColumnData::uint8([v]),
80			Value::Uint16(v) => ColumnData::uint16([v]),
81			Value::Date(v) => ColumnData::date([v]),
82			Value::DateTime(v) => ColumnData::datetime([v]),
83			Value::Time(v) => ColumnData::time([v]),
84			Value::Duration(v) => ColumnData::duration([v]),
85			Value::IdentityId(v) => ColumnData::identity_id([v]),
86			Value::Uuid4(v) => ColumnData::uuid4([v]),
87			Value::Uuid7(v) => ColumnData::uuid7([v]),
88			Value::Blob(v) => ColumnData::blob([v]),
89			Value::Int(v) => ColumnData::int(vec![v]),
90			Value::Uint(v) => ColumnData::uint(vec![v]),
91			Value::Decimal(v) => ColumnData::decimal(vec![v]),
92			Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
93			Value::Any(v) => ColumnData::any(vec![v]),
94			Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
95			Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
96			Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
97			Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
98		};
99		let column = Column {
100			name: Fragment::internal("value"),
101			data,
102		};
103		Self {
104			row_numbers: CowVec::new(Vec::new()),
105			created_at: CowVec::new(Vec::new()),
106			updated_at: CowVec::new(Vec::new()),
107			columns: CowVec::new(vec![column]),
108		}
109	}
110
111	/// Extract the single value from a 1-column, 1-row Columns.
112	/// Panics if the Columns does not have exactly 1 column and 1 row.
113	pub fn scalar_value(&self) -> Value {
114		debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
115		debug_assert_eq!(
116			self.row_count(),
117			1,
118			"scalar_value() requires exactly 1 row, got {}",
119			self.row_count()
120		);
121		self.columns[0].data().get_value(0)
122	}
123
124	pub fn new(columns: Vec<Column>) -> Self {
125		let n = columns.first().map_or(0, |c| c.data().len());
126		assert!(columns.iter().all(|c| c.data().len() == n));
127
128		Self {
129			row_numbers: CowVec::new(Vec::new()),
130			created_at: CowVec::new(Vec::new()),
131			updated_at: CowVec::new(Vec::new()),
132			columns: CowVec::new(columns),
133		}
134	}
135
136	pub fn with_system_columns(
137		columns: Vec<Column>,
138		row_numbers: Vec<RowNumber>,
139		created_at: Vec<DateTime>,
140		updated_at: Vec<DateTime>,
141	) -> Self {
142		let n = columns.first().map_or(0, |c| c.data().len());
143		assert!(columns.iter().all(|c| c.data().len() == n));
144		assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
145		assert_eq!(created_at.len(), n, "created_at length must match column data length");
146		assert_eq!(updated_at.len(), n, "updated_at length must match column data length");
147
148		Self {
149			row_numbers: CowVec::new(row_numbers),
150			created_at: CowVec::new(created_at),
151			updated_at: CowVec::new(updated_at),
152			columns: CowVec::new(columns),
153		}
154	}
155
156	pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
157		let mut columns = Vec::new();
158		let mut index = HashMap::new();
159
160		for (idx, (name, value)) in rows.into_iter().enumerate() {
161			let data = match value {
162				Value::None {
163					..
164				} => ColumnData::none_typed(Type::Boolean, 1),
165				Value::Boolean(v) => ColumnData::bool([v]),
166				Value::Float4(v) => ColumnData::float4([v.into()]),
167				Value::Float8(v) => ColumnData::float8([v.into()]),
168				Value::Int1(v) => ColumnData::int1([v]),
169				Value::Int2(v) => ColumnData::int2([v]),
170				Value::Int4(v) => ColumnData::int4([v]),
171				Value::Int8(v) => ColumnData::int8([v]),
172				Value::Int16(v) => ColumnData::int16([v]),
173				Value::Utf8(v) => ColumnData::utf8([v.clone()]),
174				Value::Uint1(v) => ColumnData::uint1([v]),
175				Value::Uint2(v) => ColumnData::uint2([v]),
176				Value::Uint4(v) => ColumnData::uint4([v]),
177				Value::Uint8(v) => ColumnData::uint8([v]),
178				Value::Uint16(v) => ColumnData::uint16([v]),
179				Value::Date(v) => ColumnData::date([v]),
180				Value::DateTime(v) => ColumnData::datetime([v]),
181				Value::Time(v) => ColumnData::time([v]),
182				Value::Duration(v) => ColumnData::duration([v]),
183				Value::IdentityId(v) => ColumnData::identity_id([v]),
184				Value::Uuid4(v) => ColumnData::uuid4([v]),
185				Value::Uuid7(v) => ColumnData::uuid7([v]),
186				Value::Blob(v) => ColumnData::blob([v.clone()]),
187				Value::Int(v) => ColumnData::int(vec![v]),
188				Value::Uint(v) => ColumnData::uint(vec![v]),
189				Value::Decimal(v) => ColumnData::decimal(vec![v]),
190				Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
191				Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
192				Value::Any(v) => ColumnData::any(vec![v]),
193				Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
194				Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
195				Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
196			};
197
198			let column = Column {
199				name: Fragment::internal(name.to_string()),
200				data,
201			};
202			index.insert(name, idx);
203			columns.push(column);
204		}
205
206		Self {
207			row_numbers: CowVec::new(Vec::new()),
208			created_at: CowVec::new(Vec::new()),
209			updated_at: CowVec::new(Vec::new()),
210			columns: CowVec::new(columns),
211		}
212	}
213
214	pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
215		// Apply the column names from headers to this Columns instance
216		for (i, name) in headers.columns.iter().enumerate() {
217			if i < self.len() {
218				let column = &mut self[i];
219				let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
220
221				*column = Column {
222					name: name.clone(),
223					data,
224				};
225			}
226		}
227	}
228}
229
230impl Columns {
231	/// Get the row number (for single-row Columns). Panics if Columns has 0 or multiple rows.
232	pub fn number(&self) -> RowNumber {
233		assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
234		if self.row_numbers.is_empty() {
235			RowNumber(0)
236		} else {
237			self.row_numbers[0]
238		}
239	}
240
241	pub fn shape(&self) -> (usize, usize) {
242		let row_count = if !self.row_numbers.is_empty() {
243			self.row_numbers.len()
244		} else {
245			self.first().map(|c| c.data().len()).unwrap_or(0)
246		};
247		(row_count, self.len())
248	}
249
250	pub fn is_empty(&self) -> bool {
251		self.shape().0 == 0
252	}
253
254	pub fn row(&self, i: usize) -> Vec<Value> {
255		self.iter().map(|c| c.data().get_value(i)).collect()
256	}
257
258	pub fn column(&self, name: &str) -> Option<&Column> {
259		self.iter().find(|col| col.name().text() == name)
260	}
261
262	pub fn row_count(&self) -> usize {
263		if !self.row_numbers.is_empty() {
264			self.row_numbers.len()
265		} else {
266			self.first().map_or(0, |col| col.data().len())
267		}
268	}
269
270	pub fn is_scalar(&self) -> bool {
271		self.len() == 1 && self.row_count() == 1
272	}
273
274	pub fn get_row(&self, index: usize) -> Vec<Value> {
275		self.iter().map(|col| col.data().get_value(index)).collect()
276	}
277}
278
279impl IntoIterator for Columns {
280	type Item = Column;
281	type IntoIter = std::vec::IntoIter<Column>;
282
283	fn into_iter(self) -> Self::IntoIter {
284		self.columns.into_iter()
285	}
286}
287
288impl Column {
289	pub fn extend(&mut self, other: Column) -> Result<()> {
290		self.data_mut().extend(other.data().clone())
291	}
292}
293
294impl Columns {
295	pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
296		let column_count = names.len();
297
298		let mut columns: Vec<Column> = names
299			.iter()
300			.map(|name| Column {
301				name: Fragment::internal(name.to_string()),
302				data: ColumnData::none_typed(Type::Boolean, 0),
303			})
304			.collect();
305
306		for row in result_rows {
307			assert_eq!(row.len(), column_count, "row length does not match column count");
308			for (i, value) in row.iter().enumerate() {
309				columns[i].data_mut().push_value(value.clone());
310			}
311		}
312
313		Columns::new(columns)
314	}
315
316	pub fn from_rows_with_row_numbers(
317		names: &[&str],
318		result_rows: &[Vec<Value>],
319		row_numbers: Vec<RowNumber>,
320	) -> Self {
321		let column_count = names.len();
322
323		let mut columns: Vec<Column> = names
324			.iter()
325			.map(|name| Column {
326				name: Fragment::internal(name.to_string()),
327				data: ColumnData::none_typed(Type::Boolean, 0),
328			})
329			.collect();
330
331		for row in result_rows {
332			assert_eq!(row.len(), column_count, "row length does not match column count");
333			for (i, value) in row.iter().enumerate() {
334				columns[i].data_mut().push_value(value.clone());
335			}
336		}
337
338		let n = row_numbers.len();
339		let now = DateTime::default();
340		Columns::with_system_columns(columns, row_numbers, vec![now; n], vec![now; n])
341	}
342}
343
344impl Columns {
345	pub fn empty() -> Self {
346		Self {
347			row_numbers: CowVec::new(vec![]),
348			created_at: CowVec::new(vec![]),
349			updated_at: CowVec::new(vec![]),
350			columns: CowVec::new(vec![]),
351		}
352	}
353
354	pub fn from_resolved_table(table: &ResolvedTable) -> Self {
355		Self::from_table(table.def())
356	}
357
358	/// Create empty Columns (0 rows) with shape from a Table
359	pub fn from_table(table: &Table) -> Self {
360		let columns: Vec<Column> = table
361			.columns
362			.iter()
363			.map(|col| Column {
364				name: Fragment::internal(&col.name),
365				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
366			})
367			.collect();
368
369		Self {
370			row_numbers: CowVec::new(Vec::new()),
371			created_at: CowVec::new(Vec::new()),
372			updated_at: CowVec::new(Vec::new()),
373			columns: CowVec::new(columns),
374		}
375	}
376
377	/// Create empty Columns (0 rows) with shape from a View
378	pub fn from_view(view: &View) -> Self {
379		let columns: Vec<Column> = view
380			.columns()
381			.iter()
382			.map(|col| Column {
383				name: Fragment::internal(&col.name),
384				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
385			})
386			.collect();
387
388		Self {
389			row_numbers: CowVec::new(Vec::new()),
390			created_at: CowVec::new(Vec::new()),
391			updated_at: CowVec::new(Vec::new()),
392			columns: CowVec::new(columns),
393		}
394	}
395
396	pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
397		let _source = ringbuffer.clone();
398
399		let columns: Vec<Column> = ringbuffer
400			.columns()
401			.iter()
402			.map(|col| {
403				let column_ident = Fragment::internal(&col.name);
404				Column {
405					name: column_ident,
406					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
407				}
408			})
409			.collect();
410
411		Self {
412			row_numbers: CowVec::new(Vec::new()),
413			created_at: CowVec::new(Vec::new()),
414			updated_at: CowVec::new(Vec::new()),
415			columns: CowVec::new(columns),
416		}
417	}
418
419	pub fn from_resolved_view(view: &ResolvedView) -> Self {
420		Self::from_view(view.def())
421	}
422}
423
424impl Columns {
425	/// Extract a subset of rows by indices, returning a new Columns
426	pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
427		if indices.is_empty() {
428			return Columns::empty();
429		}
430
431		let new_columns: Vec<Column> = self
432			.columns
433			.iter()
434			.map(|col| {
435				let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
436				for &idx in indices {
437					new_data.push_value(col.data().get_value(idx));
438				}
439				Column {
440					name: col.name.clone(),
441					data: new_data,
442				}
443			})
444			.collect();
445
446		let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
447			Vec::new()
448		} else {
449			indices.iter().map(|&i| self.row_numbers[i]).collect()
450		};
451		let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
452			Vec::new()
453		} else {
454			indices.iter().map(|&i| self.created_at[i]).collect()
455		};
456		let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
457			Vec::new()
458		} else {
459			indices.iter().map(|&i| self.updated_at[i]).collect()
460		};
461		Columns {
462			row_numbers: CowVec::new(new_row_numbers),
463			created_at: CowVec::new(new_created_at),
464			updated_at: CowVec::new(new_updated_at),
465			columns: CowVec::new(new_columns),
466		}
467	}
468
469	/// Extract a single row by index, returning a new Columns with 1 row
470	pub fn extract_row(&self, index: usize) -> Columns {
471		self.extract_by_indices(&[index])
472	}
473
474	/// Project to a subset of columns by name, preserving the order of the provided names.
475	/// Columns not found in self are silently skipped.
476	pub fn project_by_names(&self, names: &[String]) -> Columns {
477		let new_columns: Vec<Column> = names
478			.iter()
479			.filter_map(|name| self.columns.iter().find(|c| c.name().text() == name.as_str()).cloned())
480			.collect();
481
482		if new_columns.is_empty() {
483			return Columns::empty();
484		}
485
486		Columns {
487			row_numbers: self.row_numbers.clone(),
488			created_at: self.created_at.clone(),
489			updated_at: self.updated_at.clone(),
490			columns: CowVec::new(new_columns),
491		}
492	}
493
494	/// Partition Columns into groups based on keys (one key per row).
495	/// Returns an IndexMap preserving insertion order of first occurrence.
496	pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
497		assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
498
499		// Group indices by key
500		let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
501		for (idx, key) in keys.iter().enumerate() {
502			key_to_indices.entry(key.clone()).or_default().push(idx);
503		}
504
505		// Convert to Columns
506		key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
507	}
508
509	/// Create Columns from a Row by decoding its encoded values
510	pub fn from_row(row: &Row) -> Self {
511		let mut columns = Vec::new();
512
513		for (idx, field) in row.shape.fields().iter().enumerate() {
514			let value = row.shape.get_value(&row.encoded, idx);
515
516			// Use the field type for the column data, handling undefined values
517			let column_type = if matches!(value, Value::None { .. }) {
518				field.constraint.get_type()
519			} else {
520				value.get_type()
521			};
522
523			let mut data = if column_type.is_option() {
524				ColumnData::none_typed(column_type.clone(), 0)
525			} else {
526				ColumnData::with_capacity(column_type.clone(), 1)
527			};
528			data.push_value(value);
529
530			if column_type == Type::DictionaryId
531				&& let ColumnData::DictionaryId(container) = &mut data
532				&& let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
533			{
534				container.set_dictionary_id(*dict_id);
535			}
536
537			let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
538
539			columns.push(Column {
540				name: Fragment::internal(name),
541				data,
542			});
543		}
544
545		Self {
546			row_numbers: CowVec::new(vec![row.number]),
547			created_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.created_at_nanos())]),
548			updated_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.updated_at_nanos())]),
549			columns: CowVec::new(columns),
550		}
551	}
552
553	/// Convert Columns back to a Row (assumes single row)
554	/// Panics if Columns contains more than 1 row
555	pub fn to_single_row(&self) -> Row {
556		assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
557		assert_eq!(
558			self.row_numbers.len(),
559			1,
560			"to_row() requires exactly 1 row number, got {}",
561			self.row_numbers.len()
562		);
563
564		let row_number = *self.row_numbers.first().unwrap();
565
566		// Build shape fields for the layout
567		let fields: Vec<RowShapeField> = self
568			.columns
569			.iter()
570			.map(|col| RowShapeField::unconstrained(col.name().text().to_string(), col.data().get_type()))
571			.collect();
572
573		let layout = RowShape::new(fields);
574		let mut encoded = layout.allocate();
575
576		// Get values and set them
577		let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
578		layout.set_values(&mut encoded, &values);
579
580		Row {
581			number: row_number,
582			encoded,
583			shape: layout,
584		}
585	}
586}
587
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Reads row 0 of the column called `name`; panics if no such column exists.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// Temporal values survive a round trip through `Columns::single_row`.
	#[test]
	fn test_single_row_temporal_types() {
		let date = Value::Date(Date::from_ymd(2025, 1, 15).unwrap());
		let datetime = Value::DateTime(DateTime::from_timestamp(1642694400).unwrap());
		let time = Value::Time(Time::from_hms(14, 30, 45).unwrap());
		let duration = Value::Duration(Duration::from_days(30).unwrap());

		let columns = Columns::single_row([
			("date_col", date.clone()),
			("datetime_col", datetime.clone()),
			("time_col", time.clone()),
			("interval_col", duration.clone()),
		]);

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		assert_eq!(first_value(&columns, "date_col"), date);
		assert_eq!(first_value(&columns, "datetime_col"), datetime);
		assert_eq!(first_value(&columns, "time_col"), time);
		assert_eq!(first_value(&columns, "interval_col"), duration);
	}

	/// Columns of different types, including a none value, can share one row.
	#[test]
	fn test_single_row_mixed_types() {
		let date = Value::Date(Date::from_ymd(2025, 7, 15).unwrap());
		let time = Value::Time(Time::from_hms(9, 15, 30).unwrap());

		let columns = Columns::single_row([
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", date.clone()),
			("time_col", time.clone()),
			("none_col", Value::none()),
		]);

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
		assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
		assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
		assert_eq!(first_value(&columns, "date_col"), date);
		assert_eq!(first_value(&columns, "time_col"), time);
		assert_eq!(first_value(&columns, "none_col"), Value::none());
	}

	/// An ordinary column name is accepted and can be looked up afterwards.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);

		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}