1use std::{
5 collections::HashMap,
6 hash::Hash,
7 ops::{Deref, Index, IndexMut},
8};
9
10use indexmap::IndexMap;
11use reifydb_type::{
12 fragment::Fragment,
13 util::cowvec::CowVec,
14 value::{Value, constraint::Constraint, row_number::RowNumber, r#type::Type},
15};
16
17use crate::{
18 encoded::schema::{Schema, SchemaField},
19 interface::{
20 catalog::{table::TableDef, view::ViewDef},
21 resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
22 },
23 row::Row,
24 value::column::{Column, ColumnData, headers::ColumnHeaders},
25};
26
/// A collection of named columns, optionally paired with one row number per row.
#[derive(Debug, Clone)]
pub struct Columns {
    /// Row numbers of the contained rows. Several constructors in this file
    /// leave this empty; `row_count` then falls back to the length of the
    /// first column.
    pub row_numbers: CowVec<RowNumber>,
    /// The columns themselves. `new` and `with_row_numbers` assert that every
    /// column holds the same number of values.
    pub columns: CowVec<Column>,
}
32
33impl Deref for Columns {
34 type Target = [Column];
35
36 fn deref(&self) -> &Self::Target {
37 self.columns.deref()
38 }
39}
40
41impl Index<usize> for Columns {
42 type Output = Column;
43
44 fn index(&self, index: usize) -> &Self::Output {
45 self.columns.index(index)
46 }
47}
48
49impl IndexMut<usize> for Columns {
50 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
51 &mut self.columns.make_mut()[index]
52 }
53}
54
55impl Columns {
56 pub fn scalar(value: Value) -> Self {
59 let data = match value {
60 Value::None {
61 ..
62 } => ColumnData::none_typed(Type::Boolean, 1),
63 Value::Boolean(v) => ColumnData::bool([v]),
64 Value::Float4(v) => ColumnData::float4([v.into()]),
65 Value::Float8(v) => ColumnData::float8([v.into()]),
66 Value::Int1(v) => ColumnData::int1([v]),
67 Value::Int2(v) => ColumnData::int2([v]),
68 Value::Int4(v) => ColumnData::int4([v]),
69 Value::Int8(v) => ColumnData::int8([v]),
70 Value::Int16(v) => ColumnData::int16([v]),
71 Value::Utf8(v) => ColumnData::utf8([v]),
72 Value::Uint1(v) => ColumnData::uint1([v]),
73 Value::Uint2(v) => ColumnData::uint2([v]),
74 Value::Uint4(v) => ColumnData::uint4([v]),
75 Value::Uint8(v) => ColumnData::uint8([v]),
76 Value::Uint16(v) => ColumnData::uint16([v]),
77 Value::Date(v) => ColumnData::date([v]),
78 Value::DateTime(v) => ColumnData::datetime([v]),
79 Value::Time(v) => ColumnData::time([v]),
80 Value::Duration(v) => ColumnData::duration([v]),
81 Value::IdentityId(v) => ColumnData::identity_id([v]),
82 Value::Uuid4(v) => ColumnData::uuid4([v]),
83 Value::Uuid7(v) => ColumnData::uuid7([v]),
84 Value::Blob(v) => ColumnData::blob([v]),
85 Value::Int(v) => ColumnData::int(vec![v]),
86 Value::Uint(v) => ColumnData::uint(vec![v]),
87 Value::Decimal(v) => ColumnData::decimal(vec![v]),
88 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
89 Value::Any(v) => ColumnData::any(vec![v]),
90 Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
91 };
92 let column = Column {
93 name: Fragment::internal("value"),
94 data,
95 };
96 Self {
97 row_numbers: CowVec::new(Vec::new()),
98 columns: CowVec::new(vec![column]),
99 }
100 }
101
102 pub fn scalar_value(&self) -> Value {
105 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
106 debug_assert_eq!(
107 self.row_count(),
108 1,
109 "scalar_value() requires exactly 1 row, got {}",
110 self.row_count()
111 );
112 self.columns[0].data().get_value(0)
113 }
114
115 pub fn new(columns: Vec<Column>) -> Self {
116 let n = columns.first().map_or(0, |c| c.data().len());
117 assert!(columns.iter().all(|c| c.data().len() == n));
118
119 Self {
120 row_numbers: CowVec::new(Vec::new()),
121 columns: CowVec::new(columns),
122 }
123 }
124
125 pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
126 let n = columns.first().map_or(0, |c| c.data().len());
127 assert!(columns.iter().all(|c| c.data().len() == n));
128 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
129
130 Self {
131 row_numbers: CowVec::new(row_numbers),
132 columns: CowVec::new(columns),
133 }
134 }
135
136 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
137 let mut columns = Vec::new();
138 let mut index = HashMap::new();
139
140 for (idx, (name, value)) in rows.into_iter().enumerate() {
141 let data = match value {
142 Value::None {
143 ..
144 } => ColumnData::none_typed(Type::Boolean, 1),
145 Value::Boolean(v) => ColumnData::bool([v]),
146 Value::Float4(v) => ColumnData::float4([v.into()]),
147 Value::Float8(v) => ColumnData::float8([v.into()]),
148 Value::Int1(v) => ColumnData::int1([v]),
149 Value::Int2(v) => ColumnData::int2([v]),
150 Value::Int4(v) => ColumnData::int4([v]),
151 Value::Int8(v) => ColumnData::int8([v]),
152 Value::Int16(v) => ColumnData::int16([v]),
153 Value::Utf8(v) => ColumnData::utf8([v.clone()]),
154 Value::Uint1(v) => ColumnData::uint1([v]),
155 Value::Uint2(v) => ColumnData::uint2([v]),
156 Value::Uint4(v) => ColumnData::uint4([v]),
157 Value::Uint8(v) => ColumnData::uint8([v]),
158 Value::Uint16(v) => ColumnData::uint16([v]),
159 Value::Date(v) => ColumnData::date([v.clone()]),
160 Value::DateTime(v) => ColumnData::datetime([v.clone()]),
161 Value::Time(v) => ColumnData::time([v.clone()]),
162 Value::Duration(v) => ColumnData::duration([v.clone()]),
163 Value::IdentityId(v) => ColumnData::identity_id([v]),
164 Value::Uuid4(v) => ColumnData::uuid4([v]),
165 Value::Uuid7(v) => ColumnData::uuid7([v]),
166 Value::Blob(v) => ColumnData::blob([v.clone()]),
167 Value::Int(v) => ColumnData::int(vec![v]),
168 Value::Uint(v) => ColumnData::uint(vec![v]),
169 Value::Decimal(v) => ColumnData::decimal(vec![v]),
170 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
171 Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
172 Value::Any(v) => ColumnData::any(vec![v]),
173 };
174
175 let column = Column {
176 name: Fragment::internal(name.to_string()),
177 data,
178 };
179 index.insert(name, idx);
180 columns.push(column);
181 }
182
183 Self {
184 row_numbers: CowVec::new(Vec::new()),
185 columns: CowVec::new(columns),
186 }
187 }
188
189 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
190 for (i, name) in headers.columns.iter().enumerate() {
192 if i < self.len() {
193 let column = &mut self[i];
194 let data =
195 std::mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
196
197 *column = Column {
198 name: name.clone(),
199 data,
200 };
201 }
202 }
203 }
204}
205
206impl Columns {
207 pub fn number(&self) -> RowNumber {
209 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
210 if self.row_numbers.is_empty() {
211 RowNumber(0)
212 } else {
213 self.row_numbers[0]
214 }
215 }
216
217 pub fn shape(&self) -> (usize, usize) {
218 let row_count = if !self.row_numbers.is_empty() {
219 self.row_numbers.len()
220 } else {
221 self.get(0).map(|c| c.data().len()).unwrap_or(0)
222 };
223 (row_count, self.len())
224 }
225
226 pub fn into_iter(self) -> impl Iterator<Item = Column> {
227 self.columns.into_iter()
228 }
229
230 pub fn is_empty(&self) -> bool {
231 self.shape().0 == 0
232 }
233
234 pub fn row(&self, i: usize) -> Vec<Value> {
235 self.iter().map(|c| c.data().get_value(i)).collect()
236 }
237
238 pub fn column(&self, name: &str) -> Option<&Column> {
239 self.iter().find(|col| col.name().text() == name)
240 }
241
242 pub fn row_count(&self) -> usize {
243 if !self.row_numbers.is_empty() {
244 self.row_numbers.len()
245 } else {
246 self.first().map_or(0, |col| col.data().len())
247 }
248 }
249
250 pub fn get_row(&self, index: usize) -> Vec<Value> {
251 self.iter().map(|col| col.data().get_value(index)).collect()
252 }
253}
254
255impl Column {
256 pub fn extend(&mut self, other: Column) -> reifydb_type::Result<()> {
257 self.data_mut().extend(other.data().clone())
258 }
259}
260
261impl Columns {
262 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
263 let column_count = names.len();
264
265 let mut columns: Vec<Column> = names
266 .iter()
267 .map(|name| Column {
268 name: Fragment::internal(name.to_string()),
269 data: ColumnData::none_typed(Type::Boolean, 0),
270 })
271 .collect();
272
273 for row in result_rows {
274 assert_eq!(row.len(), column_count, "row length does not match column count");
275 for (i, value) in row.iter().enumerate() {
276 columns[i].data_mut().push_value(value.clone());
277 }
278 }
279
280 Columns::new(columns)
281 }
282
283 pub fn from_rows_with_row_numbers(
284 names: &[&str],
285 result_rows: &[Vec<Value>],
286 row_numbers: Vec<RowNumber>,
287 ) -> Self {
288 let column_count = names.len();
289
290 let mut columns: Vec<Column> = names
291 .iter()
292 .map(|name| Column {
293 name: Fragment::internal(name.to_string()),
294 data: ColumnData::none_typed(Type::Boolean, 0),
295 })
296 .collect();
297
298 for row in result_rows {
299 assert_eq!(row.len(), column_count, "row length does not match column count");
300 for (i, value) in row.iter().enumerate() {
301 columns[i].data_mut().push_value(value.clone());
302 }
303 }
304
305 Columns::with_row_numbers(columns, row_numbers)
306 }
307}
308
309impl Columns {
310 pub fn empty() -> Self {
311 Self {
312 row_numbers: CowVec::new(vec![]),
313 columns: CowVec::new(vec![]),
314 }
315 }
316
317 pub fn from_table(table: &ResolvedTable) -> Self {
318 let _source = table.clone();
319
320 let columns: Vec<Column> = table
321 .columns()
322 .iter()
323 .map(|col| {
324 let column_ident = Fragment::internal(&col.name);
325 Column {
326 name: column_ident,
327 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
328 }
329 })
330 .collect();
331
332 Self {
333 row_numbers: CowVec::new(Vec::new()),
334 columns: CowVec::new(columns),
335 }
336 }
337
338 pub fn from_table_def(table: &TableDef) -> Self {
340 let columns: Vec<Column> = table
341 .columns
342 .iter()
343 .map(|col| Column {
344 name: Fragment::internal(&col.name),
345 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
346 })
347 .collect();
348
349 Self {
350 row_numbers: CowVec::new(Vec::new()),
351 columns: CowVec::new(columns),
352 }
353 }
354
355 pub fn from_view_def(view: &ViewDef) -> Self {
357 let columns: Vec<Column> = view
358 .columns
359 .iter()
360 .map(|col| Column {
361 name: Fragment::internal(&col.name),
362 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
363 })
364 .collect();
365
366 Self {
367 row_numbers: CowVec::new(Vec::new()),
368 columns: CowVec::new(columns),
369 }
370 }
371
372 pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
373 let _source = ringbuffer.clone();
374
375 let columns: Vec<Column> = ringbuffer
376 .columns()
377 .iter()
378 .map(|col| {
379 let column_ident = Fragment::internal(&col.name);
380 Column {
381 name: column_ident,
382 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
383 }
384 })
385 .collect();
386
387 Self {
388 row_numbers: CowVec::new(Vec::new()),
389 columns: CowVec::new(columns),
390 }
391 }
392
393 pub fn from_view(view: &ResolvedView) -> Self {
394 let _source = view.clone();
395
396 let columns: Vec<Column> = view
397 .columns()
398 .iter()
399 .map(|col| {
400 let column_ident = Fragment::internal(&col.name);
401 Column {
402 name: column_ident,
403 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
404 }
405 })
406 .collect();
407
408 Self {
409 row_numbers: CowVec::new(Vec::new()),
410 columns: CowVec::new(columns),
411 }
412 }
413}
414
415impl Columns {
416 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
418 if indices.is_empty() {
419 return Columns::empty();
420 }
421
422 let new_columns: Vec<Column> = self
423 .columns
424 .iter()
425 .map(|col| {
426 let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
427 for &idx in indices {
428 new_data.push_value(col.data().get_value(idx));
429 }
430 Column {
431 name: col.name.clone(),
432 data: new_data,
433 }
434 })
435 .collect();
436
437 if self.row_numbers.is_empty() {
438 Columns::new(new_columns)
439 } else {
440 let new_row_numbers: Vec<RowNumber> = indices.iter().map(|&i| self.row_numbers[i]).collect();
441 Columns::with_row_numbers(new_columns, new_row_numbers)
442 }
443 }
444
445 pub fn extract_row(&self, index: usize) -> Columns {
447 self.extract_by_indices(&[index])
448 }
449
450 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
453 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
454
455 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
457 for (idx, key) in keys.iter().enumerate() {
458 key_to_indices.entry(key.clone()).or_default().push(idx);
459 }
460
461 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
463 }
464
465 pub fn from_row(row: &Row) -> Self {
467 let mut columns = Vec::new();
468
469 for (idx, field) in row.schema.fields().iter().enumerate() {
470 let value = row.schema.get_value(&row.encoded, idx);
471
472 let column_type = if matches!(value, Value::None { .. }) {
474 field.constraint.get_type()
475 } else {
476 value.get_type()
477 };
478
479 let mut data = if column_type.is_option() {
480 ColumnData::none_typed(column_type.clone(), 0)
481 } else {
482 ColumnData::with_capacity(column_type.clone(), 1)
483 };
484 data.push_value(value);
485
486 if column_type == Type::DictionaryId {
487 if let ColumnData::DictionaryId(container) = &mut data {
488 if let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
489 {
490 container.set_dictionary_id(*dict_id);
491 }
492 }
493 }
494
495 let name = row.schema.get_field_name(idx).expect("Schema missing name for field");
496
497 columns.push(Column {
498 name: Fragment::internal(name),
499 data,
500 });
501 }
502
503 Self {
504 row_numbers: CowVec::new(vec![row.number]),
505 columns: CowVec::new(columns),
506 }
507 }
508
509 pub fn to_single_row(&self) -> Row {
512 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
513 assert_eq!(
514 self.row_numbers.len(),
515 1,
516 "to_row() requires exactly 1 row number, got {}",
517 self.row_numbers.len()
518 );
519
520 let row_number = self.row_numbers.first().unwrap().clone();
521
522 let fields: Vec<SchemaField> = self
524 .columns
525 .iter()
526 .map(|col| SchemaField::unconstrained(col.name().text().to_string(), col.data().get_type()))
527 .collect();
528
529 let layout = Schema::new(fields);
530 let mut encoded = layout.allocate();
531
532 let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
534 layout.set_values(&mut encoded, &values);
535
536 Row {
537 number: row_number,
538 encoded,
539 schema: layout,
540 }
541 }
542}
543
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

    use super::*;

    /// Reads row 0 of the column called `name`; panics if no such column exists.
    fn first_value(columns: &Columns, name: &str) -> Value {
        columns.column(name).unwrap().data().get_value(0)
    }

    /// Temporal values must round-trip through `single_row` unchanged.
    #[test]
    fn test_single_row_temporal_types() {
        let expected = [
            ("date_col", Value::Date(Date::from_ymd(2025, 1, 15).unwrap())),
            ("datetime_col", Value::DateTime(DateTime::from_timestamp(1642694400).unwrap())),
            ("time_col", Value::Time(Time::from_hms(14, 30, 45).unwrap())),
            ("interval_col", Value::Duration(Duration::from_days(30))),
        ];

        let columns = Columns::single_row(expected.clone());

        assert_eq!(columns.len(), 4);
        assert_eq!(columns.shape(), (1, 4));

        for (name, value) in expected {
            assert_eq!(first_value(&columns, name), value);
        }
    }

    /// A row mixing scalar, text, temporal and none values keeps every value.
    #[test]
    fn test_single_row_mixed_types() {
        let expected = [
            ("bool_col", Value::Boolean(true)),
            ("int_col", Value::Int4(42)),
            ("str_col", Value::Utf8("hello".to_string())),
            ("date_col", Value::Date(Date::from_ymd(2025, 7, 15).unwrap())),
            ("time_col", Value::Time(Time::from_hms(9, 15, 30).unwrap())),
            ("none_col", Value::none()),
        ];

        let columns = Columns::single_row(expected.clone());

        assert_eq!(columns.len(), 6);
        assert_eq!(columns.shape(), (1, 6));

        for (name, value) in expected {
            assert_eq!(first_value(&columns, name), value);
        }
    }

    /// An ordinary column name can be looked up after construction.
    #[test]
    fn test_single_row_normal_column_names_work() {
        let columns = Columns::single_row([("normal_column", Value::Int4(42))]);

        assert_eq!(columns.len(), 1);
        assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
    }
}