Skip to main content

reifydb_core/value/column/
columns.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	hash::Hash,
7	ops::{Deref, Index, IndexMut},
8};
9
10use indexmap::IndexMap;
11use reifydb_type::{
12	fragment::Fragment,
13	util::cowvec::CowVec,
14	value::{Value, constraint::Constraint, row_number::RowNumber, r#type::Type},
15};
16
17use crate::{
18	encoded::schema::{Schema, SchemaField},
19	interface::{
20		catalog::{table::TableDef, view::ViewDef},
21		resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
22	},
23	row::Row,
24	value::column::{Column, ColumnData, headers::ColumnHeaders},
25};
26
/// A set of named columns together with optional per-row numbers.
///
/// The constructors in this file (`new`, `with_row_numbers`) assert that every
/// column holds the same number of values.
#[derive(Debug, Clone)]
pub struct Columns {
	/// Row numbers, one per row. May be empty; `row_count()` then falls back to
	/// the length of the first column.
	pub row_numbers: CowVec<RowNumber>,
	/// The columns themselves; `Columns` dereferences to this as a `[Column]` slice.
	pub columns: CowVec<Column>,
}
32
33impl Deref for Columns {
34	type Target = [Column];
35
36	fn deref(&self) -> &Self::Target {
37		self.columns.deref()
38	}
39}
40
41impl Index<usize> for Columns {
42	type Output = Column;
43
44	fn index(&self, index: usize) -> &Self::Output {
45		self.columns.index(index)
46	}
47}
48
49impl IndexMut<usize> for Columns {
50	fn index_mut(&mut self, index: usize) -> &mut Self::Output {
51		&mut self.columns.make_mut()[index]
52	}
53}
54
55impl Columns {
56	/// Create a 1-column, 1-row Columns from a single Value.
57	/// Used to store scalar values inside `Variable::Scalar(Columns)`.
58	pub fn scalar(value: Value) -> Self {
59		let data = match value {
60			Value::None {
61				..
62			} => ColumnData::none_typed(Type::Boolean, 1),
63			Value::Boolean(v) => ColumnData::bool([v]),
64			Value::Float4(v) => ColumnData::float4([v.into()]),
65			Value::Float8(v) => ColumnData::float8([v.into()]),
66			Value::Int1(v) => ColumnData::int1([v]),
67			Value::Int2(v) => ColumnData::int2([v]),
68			Value::Int4(v) => ColumnData::int4([v]),
69			Value::Int8(v) => ColumnData::int8([v]),
70			Value::Int16(v) => ColumnData::int16([v]),
71			Value::Utf8(v) => ColumnData::utf8([v]),
72			Value::Uint1(v) => ColumnData::uint1([v]),
73			Value::Uint2(v) => ColumnData::uint2([v]),
74			Value::Uint4(v) => ColumnData::uint4([v]),
75			Value::Uint8(v) => ColumnData::uint8([v]),
76			Value::Uint16(v) => ColumnData::uint16([v]),
77			Value::Date(v) => ColumnData::date([v]),
78			Value::DateTime(v) => ColumnData::datetime([v]),
79			Value::Time(v) => ColumnData::time([v]),
80			Value::Duration(v) => ColumnData::duration([v]),
81			Value::IdentityId(v) => ColumnData::identity_id([v]),
82			Value::Uuid4(v) => ColumnData::uuid4([v]),
83			Value::Uuid7(v) => ColumnData::uuid7([v]),
84			Value::Blob(v) => ColumnData::blob([v]),
85			Value::Int(v) => ColumnData::int(vec![v]),
86			Value::Uint(v) => ColumnData::uint(vec![v]),
87			Value::Decimal(v) => ColumnData::decimal(vec![v]),
88			Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
89			Value::Any(v) => ColumnData::any(vec![v]),
90			Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
91		};
92		let column = Column {
93			name: Fragment::internal("value"),
94			data,
95		};
96		Self {
97			row_numbers: CowVec::new(Vec::new()),
98			columns: CowVec::new(vec![column]),
99		}
100	}
101
102	/// Extract the single value from a 1-column, 1-row Columns.
103	/// Panics if the Columns does not have exactly 1 column and 1 row.
104	pub fn scalar_value(&self) -> Value {
105		debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
106		debug_assert_eq!(
107			self.row_count(),
108			1,
109			"scalar_value() requires exactly 1 row, got {}",
110			self.row_count()
111		);
112		self.columns[0].data().get_value(0)
113	}
114
115	pub fn new(columns: Vec<Column>) -> Self {
116		let n = columns.first().map_or(0, |c| c.data().len());
117		assert!(columns.iter().all(|c| c.data().len() == n));
118
119		Self {
120			row_numbers: CowVec::new(Vec::new()),
121			columns: CowVec::new(columns),
122		}
123	}
124
125	pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
126		let n = columns.first().map_or(0, |c| c.data().len());
127		assert!(columns.iter().all(|c| c.data().len() == n));
128		assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
129
130		Self {
131			row_numbers: CowVec::new(row_numbers),
132			columns: CowVec::new(columns),
133		}
134	}
135
136	pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
137		let mut columns = Vec::new();
138		let mut index = HashMap::new();
139
140		for (idx, (name, value)) in rows.into_iter().enumerate() {
141			let data = match value {
142				Value::None {
143					..
144				} => ColumnData::none_typed(Type::Boolean, 1),
145				Value::Boolean(v) => ColumnData::bool([v]),
146				Value::Float4(v) => ColumnData::float4([v.into()]),
147				Value::Float8(v) => ColumnData::float8([v.into()]),
148				Value::Int1(v) => ColumnData::int1([v]),
149				Value::Int2(v) => ColumnData::int2([v]),
150				Value::Int4(v) => ColumnData::int4([v]),
151				Value::Int8(v) => ColumnData::int8([v]),
152				Value::Int16(v) => ColumnData::int16([v]),
153				Value::Utf8(v) => ColumnData::utf8([v.clone()]),
154				Value::Uint1(v) => ColumnData::uint1([v]),
155				Value::Uint2(v) => ColumnData::uint2([v]),
156				Value::Uint4(v) => ColumnData::uint4([v]),
157				Value::Uint8(v) => ColumnData::uint8([v]),
158				Value::Uint16(v) => ColumnData::uint16([v]),
159				Value::Date(v) => ColumnData::date([v.clone()]),
160				Value::DateTime(v) => ColumnData::datetime([v.clone()]),
161				Value::Time(v) => ColumnData::time([v.clone()]),
162				Value::Duration(v) => ColumnData::duration([v.clone()]),
163				Value::IdentityId(v) => ColumnData::identity_id([v]),
164				Value::Uuid4(v) => ColumnData::uuid4([v]),
165				Value::Uuid7(v) => ColumnData::uuid7([v]),
166				Value::Blob(v) => ColumnData::blob([v.clone()]),
167				Value::Int(v) => ColumnData::int(vec![v]),
168				Value::Uint(v) => ColumnData::uint(vec![v]),
169				Value::Decimal(v) => ColumnData::decimal(vec![v]),
170				Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
171				Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
172				Value::Any(v) => ColumnData::any(vec![v]),
173			};
174
175			let column = Column {
176				name: Fragment::internal(name.to_string()),
177				data,
178			};
179			index.insert(name, idx);
180			columns.push(column);
181		}
182
183		Self {
184			row_numbers: CowVec::new(Vec::new()),
185			columns: CowVec::new(columns),
186		}
187	}
188
189	pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
190		// Apply the column names from headers to this Columns instance
191		for (i, name) in headers.columns.iter().enumerate() {
192			if i < self.len() {
193				let column = &mut self[i];
194				let data =
195					std::mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
196
197				*column = Column {
198					name: name.clone(),
199					data,
200				};
201			}
202		}
203	}
204}
205
206impl Columns {
207	/// Get the row number (for single-row Columns). Panics if Columns has 0 or multiple rows.
208	pub fn number(&self) -> RowNumber {
209		assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
210		if self.row_numbers.is_empty() {
211			RowNumber(0)
212		} else {
213			self.row_numbers[0]
214		}
215	}
216
217	pub fn shape(&self) -> (usize, usize) {
218		let row_count = if !self.row_numbers.is_empty() {
219			self.row_numbers.len()
220		} else {
221			self.get(0).map(|c| c.data().len()).unwrap_or(0)
222		};
223		(row_count, self.len())
224	}
225
226	pub fn into_iter(self) -> impl Iterator<Item = Column> {
227		self.columns.into_iter()
228	}
229
230	pub fn is_empty(&self) -> bool {
231		self.shape().0 == 0
232	}
233
234	pub fn row(&self, i: usize) -> Vec<Value> {
235		self.iter().map(|c| c.data().get_value(i)).collect()
236	}
237
238	pub fn column(&self, name: &str) -> Option<&Column> {
239		self.iter().find(|col| col.name().text() == name)
240	}
241
242	pub fn row_count(&self) -> usize {
243		if !self.row_numbers.is_empty() {
244			self.row_numbers.len()
245		} else {
246			self.first().map_or(0, |col| col.data().len())
247		}
248	}
249
250	pub fn get_row(&self, index: usize) -> Vec<Value> {
251		self.iter().map(|col| col.data().get_value(index)).collect()
252	}
253}
254
255impl Column {
256	pub fn extend(&mut self, other: Column) -> reifydb_type::Result<()> {
257		self.data_mut().extend(other.data().clone())
258	}
259}
260
261impl Columns {
262	pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
263		let column_count = names.len();
264
265		let mut columns: Vec<Column> = names
266			.iter()
267			.map(|name| Column {
268				name: Fragment::internal(name.to_string()),
269				data: ColumnData::none_typed(Type::Boolean, 0),
270			})
271			.collect();
272
273		for row in result_rows {
274			assert_eq!(row.len(), column_count, "row length does not match column count");
275			for (i, value) in row.iter().enumerate() {
276				columns[i].data_mut().push_value(value.clone());
277			}
278		}
279
280		Columns::new(columns)
281	}
282
283	pub fn from_rows_with_row_numbers(
284		names: &[&str],
285		result_rows: &[Vec<Value>],
286		row_numbers: Vec<RowNumber>,
287	) -> Self {
288		let column_count = names.len();
289
290		let mut columns: Vec<Column> = names
291			.iter()
292			.map(|name| Column {
293				name: Fragment::internal(name.to_string()),
294				data: ColumnData::none_typed(Type::Boolean, 0),
295			})
296			.collect();
297
298		for row in result_rows {
299			assert_eq!(row.len(), column_count, "row length does not match column count");
300			for (i, value) in row.iter().enumerate() {
301				columns[i].data_mut().push_value(value.clone());
302			}
303		}
304
305		Columns::with_row_numbers(columns, row_numbers)
306	}
307}
308
309impl Columns {
310	pub fn empty() -> Self {
311		Self {
312			row_numbers: CowVec::new(vec![]),
313			columns: CowVec::new(vec![]),
314		}
315	}
316
317	pub fn from_table(table: &ResolvedTable) -> Self {
318		let _source = table.clone();
319
320		let columns: Vec<Column> = table
321			.columns()
322			.iter()
323			.map(|col| {
324				let column_ident = Fragment::internal(&col.name);
325				Column {
326					name: column_ident,
327					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
328				}
329			})
330			.collect();
331
332		Self {
333			row_numbers: CowVec::new(Vec::new()),
334			columns: CowVec::new(columns),
335		}
336	}
337
338	/// Create empty Columns (0 rows) with schema from a TableDef
339	pub fn from_table_def(table: &TableDef) -> Self {
340		let columns: Vec<Column> = table
341			.columns
342			.iter()
343			.map(|col| Column {
344				name: Fragment::internal(&col.name),
345				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
346			})
347			.collect();
348
349		Self {
350			row_numbers: CowVec::new(Vec::new()),
351			columns: CowVec::new(columns),
352		}
353	}
354
355	/// Create empty Columns (0 rows) with schema from a ViewDef
356	pub fn from_view_def(view: &ViewDef) -> Self {
357		let columns: Vec<Column> = view
358			.columns
359			.iter()
360			.map(|col| Column {
361				name: Fragment::internal(&col.name),
362				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
363			})
364			.collect();
365
366		Self {
367			row_numbers: CowVec::new(Vec::new()),
368			columns: CowVec::new(columns),
369		}
370	}
371
372	pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
373		let _source = ringbuffer.clone();
374
375		let columns: Vec<Column> = ringbuffer
376			.columns()
377			.iter()
378			.map(|col| {
379				let column_ident = Fragment::internal(&col.name);
380				Column {
381					name: column_ident,
382					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
383				}
384			})
385			.collect();
386
387		Self {
388			row_numbers: CowVec::new(Vec::new()),
389			columns: CowVec::new(columns),
390		}
391	}
392
393	pub fn from_view(view: &ResolvedView) -> Self {
394		let _source = view.clone();
395
396		let columns: Vec<Column> = view
397			.columns()
398			.iter()
399			.map(|col| {
400				let column_ident = Fragment::internal(&col.name);
401				Column {
402					name: column_ident,
403					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
404				}
405			})
406			.collect();
407
408		Self {
409			row_numbers: CowVec::new(Vec::new()),
410			columns: CowVec::new(columns),
411		}
412	}
413}
414
415impl Columns {
416	/// Extract a subset of rows by indices, returning a new Columns
417	pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
418		if indices.is_empty() {
419			return Columns::empty();
420		}
421
422		let new_columns: Vec<Column> = self
423			.columns
424			.iter()
425			.map(|col| {
426				let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
427				for &idx in indices {
428					new_data.push_value(col.data().get_value(idx));
429				}
430				Column {
431					name: col.name.clone(),
432					data: new_data,
433				}
434			})
435			.collect();
436
437		if self.row_numbers.is_empty() {
438			Columns::new(new_columns)
439		} else {
440			let new_row_numbers: Vec<RowNumber> = indices.iter().map(|&i| self.row_numbers[i]).collect();
441			Columns::with_row_numbers(new_columns, new_row_numbers)
442		}
443	}
444
445	/// Extract a single row by index, returning a new Columns with 1 row
446	pub fn extract_row(&self, index: usize) -> Columns {
447		self.extract_by_indices(&[index])
448	}
449
450	/// Partition Columns into groups based on keys (one key per row).
451	/// Returns an IndexMap preserving insertion order of first occurrence.
452	pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
453		assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
454
455		// Group indices by key
456		let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
457		for (idx, key) in keys.iter().enumerate() {
458			key_to_indices.entry(key.clone()).or_default().push(idx);
459		}
460
461		// Convert to Columns
462		key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
463	}
464
465	/// Create Columns from a Row by decoding its encoded values
466	pub fn from_row(row: &Row) -> Self {
467		let mut columns = Vec::new();
468
469		for (idx, field) in row.schema.fields().iter().enumerate() {
470			let value = row.schema.get_value(&row.encoded, idx);
471
472			// Use the field type for the column data, handling undefined values
473			let column_type = if matches!(value, Value::None { .. }) {
474				field.constraint.get_type()
475			} else {
476				value.get_type()
477			};
478
479			let mut data = if column_type.is_option() {
480				ColumnData::none_typed(column_type.clone(), 0)
481			} else {
482				ColumnData::with_capacity(column_type.clone(), 1)
483			};
484			data.push_value(value);
485
486			if column_type == Type::DictionaryId {
487				if let ColumnData::DictionaryId(container) = &mut data {
488					if let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
489					{
490						container.set_dictionary_id(*dict_id);
491					}
492				}
493			}
494
495			let name = row.schema.get_field_name(idx).expect("Schema missing name for field");
496
497			columns.push(Column {
498				name: Fragment::internal(name),
499				data,
500			});
501		}
502
503		Self {
504			row_numbers: CowVec::new(vec![row.number]),
505			columns: CowVec::new(columns),
506		}
507	}
508
509	/// Convert Columns back to a Row (assumes single row)
510	/// Panics if Columns contains more than 1 row
511	pub fn to_single_row(&self) -> Row {
512		assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
513		assert_eq!(
514			self.row_numbers.len(),
515			1,
516			"to_row() requires exactly 1 row number, got {}",
517			self.row_numbers.len()
518		);
519
520		let row_number = self.row_numbers.first().unwrap().clone();
521
522		// Build schema fields for the layout
523		let fields: Vec<SchemaField> = self
524			.columns
525			.iter()
526			.map(|col| SchemaField::unconstrained(col.name().text().to_string(), col.data().get_type()))
527			.collect();
528
529		let layout = Schema::new(fields);
530		let mut encoded = layout.allocate();
531
532		// Get values and set them
533		let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
534		layout.set_values(&mut encoded, &values);
535
536		Row {
537			number: row_number,
538			encoded,
539			schema: layout,
540		}
541	}
542}
543
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Look up the column called `name` and return the value in its first row.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// Temporal values survive a round trip through `single_row`.
	#[test]
	fn test_single_row_temporal_types() {
		let date = Date::from_ymd(2025, 1, 15).unwrap();
		let datetime = DateTime::from_timestamp(1642694400).unwrap();
		let time = Time::from_hms(14, 30, 45).unwrap();
		let duration = Duration::from_days(30);

		let input = [
			("date_col", Value::Date(date.clone())),
			("datetime_col", Value::DateTime(datetime.clone())),
			("time_col", Value::Time(time.clone())),
			("interval_col", Value::Duration(duration.clone())),
		];
		let columns = Columns::single_row(input);

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		// Check that the values are correctly stored
		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "datetime_col"), Value::DateTime(datetime));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "interval_col"), Value::Duration(duration));
	}

	/// A mix of scalar, text, temporal and none values is stored column by column.
	#[test]
	fn test_single_row_mixed_types() {
		let date = Date::from_ymd(2025, 7, 15).unwrap();
		let time = Time::from_hms(9, 15, 30).unwrap();

		let input = [
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", Value::Date(date.clone())),
			("time_col", Value::Time(time.clone())),
			("none_col", Value::none()),
		];
		let columns = Columns::single_row(input);

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		// Check all values are correctly stored
		assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
		assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
		assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
		assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
		assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
		assert_eq!(first_value(&columns, "none_col"), Value::none());
	}

	/// A plain column name can be looked up after construction.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);

		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}
606}