Skip to main content

reifydb_core/value/column/
columns.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	hash::Hash,
7	mem,
8	ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13	Result,
14	fragment::Fragment,
15	util::cowvec::CowVec,
16	value::{Value, constraint::Constraint, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20	encoded::schema::{Schema, SchemaField},
21	interface::{
22		catalog::{table::TableDef, view::ViewDef},
23		resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24	},
25	row::Row,
26	value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
29#[derive(Debug, Clone)]
30pub struct Columns {
31	pub row_numbers: CowVec<RowNumber>,
32	pub columns: CowVec<Column>,
33}
34
35impl Deref for Columns {
36	type Target = [Column];
37
38	fn deref(&self) -> &Self::Target {
39		self.columns.deref()
40	}
41}
42
43impl Index<usize> for Columns {
44	type Output = Column;
45
46	fn index(&self, index: usize) -> &Self::Output {
47		self.columns.index(index)
48	}
49}
50
51impl IndexMut<usize> for Columns {
52	fn index_mut(&mut self, index: usize) -> &mut Self::Output {
53		&mut self.columns.make_mut()[index]
54	}
55}
56
57impl Columns {
58	/// Create a 1-column, 1-row Columns from a single Value.
59	/// Used to store scalar values inside `Variable::Scalar(Columns)`.
60	pub fn scalar(value: Value) -> Self {
61		let data = match value {
62			Value::None {
63				..
64			} => ColumnData::none_typed(Type::Boolean, 1),
65			Value::Boolean(v) => ColumnData::bool([v]),
66			Value::Float4(v) => ColumnData::float4([v.into()]),
67			Value::Float8(v) => ColumnData::float8([v.into()]),
68			Value::Int1(v) => ColumnData::int1([v]),
69			Value::Int2(v) => ColumnData::int2([v]),
70			Value::Int4(v) => ColumnData::int4([v]),
71			Value::Int8(v) => ColumnData::int8([v]),
72			Value::Int16(v) => ColumnData::int16([v]),
73			Value::Utf8(v) => ColumnData::utf8([v]),
74			Value::Uint1(v) => ColumnData::uint1([v]),
75			Value::Uint2(v) => ColumnData::uint2([v]),
76			Value::Uint4(v) => ColumnData::uint4([v]),
77			Value::Uint8(v) => ColumnData::uint8([v]),
78			Value::Uint16(v) => ColumnData::uint16([v]),
79			Value::Date(v) => ColumnData::date([v]),
80			Value::DateTime(v) => ColumnData::datetime([v]),
81			Value::Time(v) => ColumnData::time([v]),
82			Value::Duration(v) => ColumnData::duration([v]),
83			Value::IdentityId(v) => ColumnData::identity_id([v]),
84			Value::Uuid4(v) => ColumnData::uuid4([v]),
85			Value::Uuid7(v) => ColumnData::uuid7([v]),
86			Value::Blob(v) => ColumnData::blob([v]),
87			Value::Int(v) => ColumnData::int(vec![v]),
88			Value::Uint(v) => ColumnData::uint(vec![v]),
89			Value::Decimal(v) => ColumnData::decimal(vec![v]),
90			Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
91			Value::Any(v) => ColumnData::any(vec![v]),
92			Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
93			Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
94			Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
95			Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
96		};
97		let column = Column {
98			name: Fragment::internal("value"),
99			data,
100		};
101		Self {
102			row_numbers: CowVec::new(Vec::new()),
103			columns: CowVec::new(vec![column]),
104		}
105	}
106
107	/// Extract the single value from a 1-column, 1-row Columns.
108	/// Panics if the Columns does not have exactly 1 column and 1 row.
109	pub fn scalar_value(&self) -> Value {
110		debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
111		debug_assert_eq!(
112			self.row_count(),
113			1,
114			"scalar_value() requires exactly 1 row, got {}",
115			self.row_count()
116		);
117		self.columns[0].data().get_value(0)
118	}
119
120	pub fn new(columns: Vec<Column>) -> Self {
121		let n = columns.first().map_or(0, |c| c.data().len());
122		assert!(columns.iter().all(|c| c.data().len() == n));
123
124		Self {
125			row_numbers: CowVec::new(Vec::new()),
126			columns: CowVec::new(columns),
127		}
128	}
129
130	pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
131		let n = columns.first().map_or(0, |c| c.data().len());
132		assert!(columns.iter().all(|c| c.data().len() == n));
133		assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
134
135		Self {
136			row_numbers: CowVec::new(row_numbers),
137			columns: CowVec::new(columns),
138		}
139	}
140
141	pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
142		let mut columns = Vec::new();
143		let mut index = HashMap::new();
144
145		for (idx, (name, value)) in rows.into_iter().enumerate() {
146			let data = match value {
147				Value::None {
148					..
149				} => ColumnData::none_typed(Type::Boolean, 1),
150				Value::Boolean(v) => ColumnData::bool([v]),
151				Value::Float4(v) => ColumnData::float4([v.into()]),
152				Value::Float8(v) => ColumnData::float8([v.into()]),
153				Value::Int1(v) => ColumnData::int1([v]),
154				Value::Int2(v) => ColumnData::int2([v]),
155				Value::Int4(v) => ColumnData::int4([v]),
156				Value::Int8(v) => ColumnData::int8([v]),
157				Value::Int16(v) => ColumnData::int16([v]),
158				Value::Utf8(v) => ColumnData::utf8([v.clone()]),
159				Value::Uint1(v) => ColumnData::uint1([v]),
160				Value::Uint2(v) => ColumnData::uint2([v]),
161				Value::Uint4(v) => ColumnData::uint4([v]),
162				Value::Uint8(v) => ColumnData::uint8([v]),
163				Value::Uint16(v) => ColumnData::uint16([v]),
164				Value::Date(v) => ColumnData::date([v.clone()]),
165				Value::DateTime(v) => ColumnData::datetime([v.clone()]),
166				Value::Time(v) => ColumnData::time([v.clone()]),
167				Value::Duration(v) => ColumnData::duration([v.clone()]),
168				Value::IdentityId(v) => ColumnData::identity_id([v]),
169				Value::Uuid4(v) => ColumnData::uuid4([v]),
170				Value::Uuid7(v) => ColumnData::uuid7([v]),
171				Value::Blob(v) => ColumnData::blob([v.clone()]),
172				Value::Int(v) => ColumnData::int(vec![v]),
173				Value::Uint(v) => ColumnData::uint(vec![v]),
174				Value::Decimal(v) => ColumnData::decimal(vec![v]),
175				Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
176				Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
177				Value::Any(v) => ColumnData::any(vec![v]),
178				Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
179				Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
180				Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
181			};
182
183			let column = Column {
184				name: Fragment::internal(name.to_string()),
185				data,
186			};
187			index.insert(name, idx);
188			columns.push(column);
189		}
190
191		Self {
192			row_numbers: CowVec::new(Vec::new()),
193			columns: CowVec::new(columns),
194		}
195	}
196
197	pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
198		// Apply the column names from headers to this Columns instance
199		for (i, name) in headers.columns.iter().enumerate() {
200			if i < self.len() {
201				let column = &mut self[i];
202				let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
203
204				*column = Column {
205					name: name.clone(),
206					data,
207				};
208			}
209		}
210	}
211}
212
213impl Columns {
214	/// Get the row number (for single-row Columns). Panics if Columns has 0 or multiple rows.
215	pub fn number(&self) -> RowNumber {
216		assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
217		if self.row_numbers.is_empty() {
218			RowNumber(0)
219		} else {
220			self.row_numbers[0]
221		}
222	}
223
224	pub fn shape(&self) -> (usize, usize) {
225		let row_count = if !self.row_numbers.is_empty() {
226			self.row_numbers.len()
227		} else {
228			self.get(0).map(|c| c.data().len()).unwrap_or(0)
229		};
230		(row_count, self.len())
231	}
232
233	pub fn into_iter(self) -> impl Iterator<Item = Column> {
234		self.columns.into_iter()
235	}
236
237	pub fn is_empty(&self) -> bool {
238		self.shape().0 == 0
239	}
240
241	pub fn row(&self, i: usize) -> Vec<Value> {
242		self.iter().map(|c| c.data().get_value(i)).collect()
243	}
244
245	pub fn column(&self, name: &str) -> Option<&Column> {
246		self.iter().find(|col| col.name().text() == name)
247	}
248
249	pub fn row_count(&self) -> usize {
250		if !self.row_numbers.is_empty() {
251			self.row_numbers.len()
252		} else {
253			self.first().map_or(0, |col| col.data().len())
254		}
255	}
256
257	pub fn get_row(&self, index: usize) -> Vec<Value> {
258		self.iter().map(|col| col.data().get_value(index)).collect()
259	}
260}
261
262impl Column {
263	pub fn extend(&mut self, other: Column) -> Result<()> {
264		self.data_mut().extend(other.data().clone())
265	}
266}
267
268impl Columns {
269	pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
270		let column_count = names.len();
271
272		let mut columns: Vec<Column> = names
273			.iter()
274			.map(|name| Column {
275				name: Fragment::internal(name.to_string()),
276				data: ColumnData::none_typed(Type::Boolean, 0),
277			})
278			.collect();
279
280		for row in result_rows {
281			assert_eq!(row.len(), column_count, "row length does not match column count");
282			for (i, value) in row.iter().enumerate() {
283				columns[i].data_mut().push_value(value.clone());
284			}
285		}
286
287		Columns::new(columns)
288	}
289
290	pub fn from_rows_with_row_numbers(
291		names: &[&str],
292		result_rows: &[Vec<Value>],
293		row_numbers: Vec<RowNumber>,
294	) -> Self {
295		let column_count = names.len();
296
297		let mut columns: Vec<Column> = names
298			.iter()
299			.map(|name| Column {
300				name: Fragment::internal(name.to_string()),
301				data: ColumnData::none_typed(Type::Boolean, 0),
302			})
303			.collect();
304
305		for row in result_rows {
306			assert_eq!(row.len(), column_count, "row length does not match column count");
307			for (i, value) in row.iter().enumerate() {
308				columns[i].data_mut().push_value(value.clone());
309			}
310		}
311
312		Columns::with_row_numbers(columns, row_numbers)
313	}
314}
315
316impl Columns {
317	pub fn empty() -> Self {
318		Self {
319			row_numbers: CowVec::new(vec![]),
320			columns: CowVec::new(vec![]),
321		}
322	}
323
324	pub fn from_table(table: &ResolvedTable) -> Self {
325		let _source = table.clone();
326
327		let columns: Vec<Column> = table
328			.columns()
329			.iter()
330			.map(|col| {
331				let column_ident = Fragment::internal(&col.name);
332				Column {
333					name: column_ident,
334					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
335				}
336			})
337			.collect();
338
339		Self {
340			row_numbers: CowVec::new(Vec::new()),
341			columns: CowVec::new(columns),
342		}
343	}
344
345	/// Create empty Columns (0 rows) with schema from a TableDef
346	pub fn from_table_def(table: &TableDef) -> Self {
347		let columns: Vec<Column> = table
348			.columns
349			.iter()
350			.map(|col| Column {
351				name: Fragment::internal(&col.name),
352				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
353			})
354			.collect();
355
356		Self {
357			row_numbers: CowVec::new(Vec::new()),
358			columns: CowVec::new(columns),
359		}
360	}
361
362	/// Create empty Columns (0 rows) with schema from a ViewDef
363	pub fn from_view_def(view: &ViewDef) -> Self {
364		let columns: Vec<Column> = view
365			.columns
366			.iter()
367			.map(|col| Column {
368				name: Fragment::internal(&col.name),
369				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
370			})
371			.collect();
372
373		Self {
374			row_numbers: CowVec::new(Vec::new()),
375			columns: CowVec::new(columns),
376		}
377	}
378
379	pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
380		let _source = ringbuffer.clone();
381
382		let columns: Vec<Column> = ringbuffer
383			.columns()
384			.iter()
385			.map(|col| {
386				let column_ident = Fragment::internal(&col.name);
387				Column {
388					name: column_ident,
389					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
390				}
391			})
392			.collect();
393
394		Self {
395			row_numbers: CowVec::new(Vec::new()),
396			columns: CowVec::new(columns),
397		}
398	}
399
400	pub fn from_view(view: &ResolvedView) -> Self {
401		let _source = view.clone();
402
403		let columns: Vec<Column> = view
404			.columns()
405			.iter()
406			.map(|col| {
407				let column_ident = Fragment::internal(&col.name);
408				Column {
409					name: column_ident,
410					data: ColumnData::with_capacity(col.constraint.get_type(), 0),
411				}
412			})
413			.collect();
414
415		Self {
416			row_numbers: CowVec::new(Vec::new()),
417			columns: CowVec::new(columns),
418		}
419	}
420}
421
422impl Columns {
423	/// Extract a subset of rows by indices, returning a new Columns
424	pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
425		if indices.is_empty() {
426			return Columns::empty();
427		}
428
429		let new_columns: Vec<Column> = self
430			.columns
431			.iter()
432			.map(|col| {
433				let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
434				for &idx in indices {
435					new_data.push_value(col.data().get_value(idx));
436				}
437				Column {
438					name: col.name.clone(),
439					data: new_data,
440				}
441			})
442			.collect();
443
444		if self.row_numbers.is_empty() {
445			Columns::new(new_columns)
446		} else {
447			let new_row_numbers: Vec<RowNumber> = indices.iter().map(|&i| self.row_numbers[i]).collect();
448			Columns::with_row_numbers(new_columns, new_row_numbers)
449		}
450	}
451
452	/// Extract a single row by index, returning a new Columns with 1 row
453	pub fn extract_row(&self, index: usize) -> Columns {
454		self.extract_by_indices(&[index])
455	}
456
457	/// Partition Columns into groups based on keys (one key per row).
458	/// Returns an IndexMap preserving insertion order of first occurrence.
459	pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
460		assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
461
462		// Group indices by key
463		let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
464		for (idx, key) in keys.iter().enumerate() {
465			key_to_indices.entry(key.clone()).or_default().push(idx);
466		}
467
468		// Convert to Columns
469		key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
470	}
471
472	/// Create Columns from a Row by decoding its encoded values
473	pub fn from_row(row: &Row) -> Self {
474		let mut columns = Vec::new();
475
476		for (idx, field) in row.schema.fields().iter().enumerate() {
477			let value = row.schema.get_value(&row.encoded, idx);
478
479			// Use the field type for the column data, handling undefined values
480			let column_type = if matches!(value, Value::None { .. }) {
481				field.constraint.get_type()
482			} else {
483				value.get_type()
484			};
485
486			let mut data = if column_type.is_option() {
487				ColumnData::none_typed(column_type.clone(), 0)
488			} else {
489				ColumnData::with_capacity(column_type.clone(), 1)
490			};
491			data.push_value(value);
492
493			if column_type == Type::DictionaryId {
494				if let ColumnData::DictionaryId(container) = &mut data {
495					if let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
496					{
497						container.set_dictionary_id(*dict_id);
498					}
499				}
500			}
501
502			let name = row.schema.get_field_name(idx).expect("Schema missing name for field");
503
504			columns.push(Column {
505				name: Fragment::internal(name),
506				data,
507			});
508		}
509
510		Self {
511			row_numbers: CowVec::new(vec![row.number]),
512			columns: CowVec::new(columns),
513		}
514	}
515
516	/// Convert Columns back to a Row (assumes single row)
517	/// Panics if Columns contains more than 1 row
518	pub fn to_single_row(&self) -> Row {
519		assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
520		assert_eq!(
521			self.row_numbers.len(),
522			1,
523			"to_row() requires exactly 1 row number, got {}",
524			self.row_numbers.len()
525		);
526
527		let row_number = self.row_numbers.first().unwrap().clone();
528
529		// Build schema fields for the layout
530		let fields: Vec<SchemaField> = self
531			.columns
532			.iter()
533			.map(|col| SchemaField::unconstrained(col.name().text().to_string(), col.data().get_type()))
534			.collect();
535
536		let layout = Schema::new(fields);
537		let mut encoded = layout.allocate();
538
539		// Get values and set them
540		let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
541		layout.set_values(&mut encoded, &values);
542
543		Row {
544			number: row_number,
545			encoded,
546			schema: layout,
547		}
548	}
549}
550
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Returns the value stored at row 0 of the column called `name`.
	/// Panics if no such column exists.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// Temporal values survive a round trip through `single_row`.
	#[test]
	fn test_single_row_temporal_types() {
		let expected = [
			("date_col", Value::Date(Date::from_ymd(2025, 1, 15).unwrap())),
			("datetime_col", Value::DateTime(DateTime::from_timestamp(1642694400).unwrap())),
			("time_col", Value::Time(Time::from_hms(14, 30, 45).unwrap())),
			("interval_col", Value::Duration(Duration::from_days(30))),
		];

		let columns = Columns::single_row(expected.clone());

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		// Check that the values are correctly stored
		for (name, value) in expected {
			assert_eq!(first_value(&columns, name), value);
		}
	}

	/// A mix of scalar, temporal and none values survives a round trip
	/// through `single_row`.
	#[test]
	fn test_single_row_mixed_types() {
		let expected = [
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", Value::Date(Date::from_ymd(2025, 7, 15).unwrap())),
			("time_col", Value::Time(Time::from_hms(9, 15, 30).unwrap())),
			("none_col", Value::none()),
		];

		let columns = Columns::single_row(expected.clone());

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		// Check all values are correctly stored
		for (name, value) in expected {
			assert_eq!(first_value(&columns, name), value);
		}
	}

	/// A single ordinary column name can be looked up after construction.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}
613}