1use std::{
5 collections::HashMap,
6 hash::Hash,
7 mem,
8 ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13 Result,
14 fragment::Fragment,
15 util::cowvec::CowVec,
16 value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20 encoded::shape::{RowShape, RowShapeField},
21 interface::{
22 catalog::{table::Table, view::View},
23 resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24 },
25 row::Row,
26 value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
29#[derive(Debug, Clone)]
30pub struct Columns {
31 pub row_numbers: CowVec<RowNumber>,
32 pub created_at: CowVec<DateTime>,
33 pub updated_at: CowVec<DateTime>,
34 pub columns: CowVec<Column>,
35}
36
37impl Deref for Columns {
38 type Target = [Column];
39
40 fn deref(&self) -> &Self::Target {
41 self.columns.deref()
42 }
43}
44
45impl Index<usize> for Columns {
46 type Output = Column;
47
48 fn index(&self, index: usize) -> &Self::Output {
49 self.columns.index(index)
50 }
51}
52
53impl IndexMut<usize> for Columns {
54 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
55 &mut self.columns.make_mut()[index]
56 }
57}
58
59impl Columns {
60 pub fn scalar(value: Value) -> Self {
63 let data = match value {
64 Value::None {
65 ..
66 } => ColumnData::none_typed(Type::Boolean, 1),
67 Value::Boolean(v) => ColumnData::bool([v]),
68 Value::Float4(v) => ColumnData::float4([v.into()]),
69 Value::Float8(v) => ColumnData::float8([v.into()]),
70 Value::Int1(v) => ColumnData::int1([v]),
71 Value::Int2(v) => ColumnData::int2([v]),
72 Value::Int4(v) => ColumnData::int4([v]),
73 Value::Int8(v) => ColumnData::int8([v]),
74 Value::Int16(v) => ColumnData::int16([v]),
75 Value::Utf8(v) => ColumnData::utf8([v]),
76 Value::Uint1(v) => ColumnData::uint1([v]),
77 Value::Uint2(v) => ColumnData::uint2([v]),
78 Value::Uint4(v) => ColumnData::uint4([v]),
79 Value::Uint8(v) => ColumnData::uint8([v]),
80 Value::Uint16(v) => ColumnData::uint16([v]),
81 Value::Date(v) => ColumnData::date([v]),
82 Value::DateTime(v) => ColumnData::datetime([v]),
83 Value::Time(v) => ColumnData::time([v]),
84 Value::Duration(v) => ColumnData::duration([v]),
85 Value::IdentityId(v) => ColumnData::identity_id([v]),
86 Value::Uuid4(v) => ColumnData::uuid4([v]),
87 Value::Uuid7(v) => ColumnData::uuid7([v]),
88 Value::Blob(v) => ColumnData::blob([v]),
89 Value::Int(v) => ColumnData::int(vec![v]),
90 Value::Uint(v) => ColumnData::uint(vec![v]),
91 Value::Decimal(v) => ColumnData::decimal(vec![v]),
92 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
93 Value::Any(v) => ColumnData::any(vec![v]),
94 Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
95 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
96 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
97 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
98 };
99 let column = Column {
100 name: Fragment::internal("value"),
101 data,
102 };
103 Self {
104 row_numbers: CowVec::new(Vec::new()),
105 created_at: CowVec::new(Vec::new()),
106 updated_at: CowVec::new(Vec::new()),
107 columns: CowVec::new(vec![column]),
108 }
109 }
110
111 pub fn scalar_value(&self) -> Value {
114 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
115 debug_assert_eq!(
116 self.row_count(),
117 1,
118 "scalar_value() requires exactly 1 row, got {}",
119 self.row_count()
120 );
121 self.columns[0].data().get_value(0)
122 }
123
124 pub fn new(columns: Vec<Column>) -> Self {
125 let n = columns.first().map_or(0, |c| c.data().len());
126 assert!(columns.iter().all(|c| c.data().len() == n));
127
128 Self {
129 row_numbers: CowVec::new(Vec::new()),
130 created_at: CowVec::new(Vec::new()),
131 updated_at: CowVec::new(Vec::new()),
132 columns: CowVec::new(columns),
133 }
134 }
135
136 pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
137 let n = columns.first().map_or(0, |c| c.data().len());
138 assert!(columns.iter().all(|c| c.data().len() == n));
139 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
140
141 Self {
142 row_numbers: CowVec::new(row_numbers),
143 created_at: CowVec::new(Vec::new()),
144 updated_at: CowVec::new(Vec::new()),
145 columns: CowVec::new(columns),
146 }
147 }
148
149 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
150 let mut columns = Vec::new();
151 let mut index = HashMap::new();
152
153 for (idx, (name, value)) in rows.into_iter().enumerate() {
154 let data = match value {
155 Value::None {
156 ..
157 } => ColumnData::none_typed(Type::Boolean, 1),
158 Value::Boolean(v) => ColumnData::bool([v]),
159 Value::Float4(v) => ColumnData::float4([v.into()]),
160 Value::Float8(v) => ColumnData::float8([v.into()]),
161 Value::Int1(v) => ColumnData::int1([v]),
162 Value::Int2(v) => ColumnData::int2([v]),
163 Value::Int4(v) => ColumnData::int4([v]),
164 Value::Int8(v) => ColumnData::int8([v]),
165 Value::Int16(v) => ColumnData::int16([v]),
166 Value::Utf8(v) => ColumnData::utf8([v.clone()]),
167 Value::Uint1(v) => ColumnData::uint1([v]),
168 Value::Uint2(v) => ColumnData::uint2([v]),
169 Value::Uint4(v) => ColumnData::uint4([v]),
170 Value::Uint8(v) => ColumnData::uint8([v]),
171 Value::Uint16(v) => ColumnData::uint16([v]),
172 Value::Date(v) => ColumnData::date([v]),
173 Value::DateTime(v) => ColumnData::datetime([v]),
174 Value::Time(v) => ColumnData::time([v]),
175 Value::Duration(v) => ColumnData::duration([v]),
176 Value::IdentityId(v) => ColumnData::identity_id([v]),
177 Value::Uuid4(v) => ColumnData::uuid4([v]),
178 Value::Uuid7(v) => ColumnData::uuid7([v]),
179 Value::Blob(v) => ColumnData::blob([v.clone()]),
180 Value::Int(v) => ColumnData::int(vec![v]),
181 Value::Uint(v) => ColumnData::uint(vec![v]),
182 Value::Decimal(v) => ColumnData::decimal(vec![v]),
183 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
184 Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
185 Value::Any(v) => ColumnData::any(vec![v]),
186 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
187 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
188 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
189 };
190
191 let column = Column {
192 name: Fragment::internal(name.to_string()),
193 data,
194 };
195 index.insert(name, idx);
196 columns.push(column);
197 }
198
199 Self {
200 row_numbers: CowVec::new(Vec::new()),
201 created_at: CowVec::new(Vec::new()),
202 updated_at: CowVec::new(Vec::new()),
203 columns: CowVec::new(columns),
204 }
205 }
206
207 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
208 for (i, name) in headers.columns.iter().enumerate() {
210 if i < self.len() {
211 let column = &mut self[i];
212 let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
213
214 *column = Column {
215 name: name.clone(),
216 data,
217 };
218 }
219 }
220 }
221}
222
223impl Columns {
224 pub fn number(&self) -> RowNumber {
226 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
227 if self.row_numbers.is_empty() {
228 RowNumber(0)
229 } else {
230 self.row_numbers[0]
231 }
232 }
233
234 pub fn shape(&self) -> (usize, usize) {
235 let row_count = if !self.row_numbers.is_empty() {
236 self.row_numbers.len()
237 } else {
238 self.first().map(|c| c.data().len()).unwrap_or(0)
239 };
240 (row_count, self.len())
241 }
242
243 pub fn is_empty(&self) -> bool {
244 self.shape().0 == 0
245 }
246
247 pub fn row(&self, i: usize) -> Vec<Value> {
248 self.iter().map(|c| c.data().get_value(i)).collect()
249 }
250
251 pub fn column(&self, name: &str) -> Option<&Column> {
252 self.iter().find(|col| col.name().text() == name)
253 }
254
255 pub fn row_count(&self) -> usize {
256 if !self.row_numbers.is_empty() {
257 self.row_numbers.len()
258 } else {
259 self.first().map_or(0, |col| col.data().len())
260 }
261 }
262
263 pub fn get_row(&self, index: usize) -> Vec<Value> {
264 self.iter().map(|col| col.data().get_value(index)).collect()
265 }
266}
267
268impl IntoIterator for Columns {
269 type Item = Column;
270 type IntoIter = std::vec::IntoIter<Column>;
271
272 fn into_iter(self) -> Self::IntoIter {
273 self.columns.into_iter()
274 }
275}
276
277impl Column {
278 pub fn extend(&mut self, other: Column) -> Result<()> {
279 self.data_mut().extend(other.data().clone())
280 }
281}
282
283impl Columns {
284 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
285 let column_count = names.len();
286
287 let mut columns: Vec<Column> = names
288 .iter()
289 .map(|name| Column {
290 name: Fragment::internal(name.to_string()),
291 data: ColumnData::none_typed(Type::Boolean, 0),
292 })
293 .collect();
294
295 for row in result_rows {
296 assert_eq!(row.len(), column_count, "row length does not match column count");
297 for (i, value) in row.iter().enumerate() {
298 columns[i].data_mut().push_value(value.clone());
299 }
300 }
301
302 Columns::new(columns)
303 }
304
305 pub fn from_rows_with_row_numbers(
306 names: &[&str],
307 result_rows: &[Vec<Value>],
308 row_numbers: Vec<RowNumber>,
309 ) -> Self {
310 let column_count = names.len();
311
312 let mut columns: Vec<Column> = names
313 .iter()
314 .map(|name| Column {
315 name: Fragment::internal(name.to_string()),
316 data: ColumnData::none_typed(Type::Boolean, 0),
317 })
318 .collect();
319
320 for row in result_rows {
321 assert_eq!(row.len(), column_count, "row length does not match column count");
322 for (i, value) in row.iter().enumerate() {
323 columns[i].data_mut().push_value(value.clone());
324 }
325 }
326
327 Columns::with_row_numbers(columns, row_numbers)
328 }
329}
330
331impl Columns {
332 pub fn empty() -> Self {
333 Self {
334 row_numbers: CowVec::new(vec![]),
335 created_at: CowVec::new(vec![]),
336 updated_at: CowVec::new(vec![]),
337 columns: CowVec::new(vec![]),
338 }
339 }
340
341 pub fn from_resolved_table(table: &ResolvedTable) -> Self {
342 Self::from_table(table.def())
343 }
344
345 pub fn from_table(table: &Table) -> Self {
347 let columns: Vec<Column> = table
348 .columns
349 .iter()
350 .map(|col| Column {
351 name: Fragment::internal(&col.name),
352 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
353 })
354 .collect();
355
356 Self {
357 row_numbers: CowVec::new(Vec::new()),
358 created_at: CowVec::new(Vec::new()),
359 updated_at: CowVec::new(Vec::new()),
360 columns: CowVec::new(columns),
361 }
362 }
363
364 pub fn from_view(view: &View) -> Self {
366 let columns: Vec<Column> = view
367 .columns()
368 .iter()
369 .map(|col| Column {
370 name: Fragment::internal(&col.name),
371 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
372 })
373 .collect();
374
375 Self {
376 row_numbers: CowVec::new(Vec::new()),
377 created_at: CowVec::new(Vec::new()),
378 updated_at: CowVec::new(Vec::new()),
379 columns: CowVec::new(columns),
380 }
381 }
382
383 pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
384 let _source = ringbuffer.clone();
385
386 let columns: Vec<Column> = ringbuffer
387 .columns()
388 .iter()
389 .map(|col| {
390 let column_ident = Fragment::internal(&col.name);
391 Column {
392 name: column_ident,
393 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
394 }
395 })
396 .collect();
397
398 Self {
399 row_numbers: CowVec::new(Vec::new()),
400 created_at: CowVec::new(Vec::new()),
401 updated_at: CowVec::new(Vec::new()),
402 columns: CowVec::new(columns),
403 }
404 }
405
406 pub fn from_resolved_view(view: &ResolvedView) -> Self {
407 Self::from_view(view.def())
408 }
409}
410
411impl Columns {
412 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
414 if indices.is_empty() {
415 return Columns::empty();
416 }
417
418 let new_columns: Vec<Column> = self
419 .columns
420 .iter()
421 .map(|col| {
422 let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
423 for &idx in indices {
424 new_data.push_value(col.data().get_value(idx));
425 }
426 Column {
427 name: col.name.clone(),
428 data: new_data,
429 }
430 })
431 .collect();
432
433 let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
434 Vec::new()
435 } else {
436 indices.iter().map(|&i| self.row_numbers[i]).collect()
437 };
438 let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
439 Vec::new()
440 } else {
441 indices.iter().map(|&i| self.created_at[i]).collect()
442 };
443 let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
444 Vec::new()
445 } else {
446 indices.iter().map(|&i| self.updated_at[i]).collect()
447 };
448 Columns {
449 row_numbers: CowVec::new(new_row_numbers),
450 created_at: CowVec::new(new_created_at),
451 updated_at: CowVec::new(new_updated_at),
452 columns: CowVec::new(new_columns),
453 }
454 }
455
456 pub fn extract_row(&self, index: usize) -> Columns {
458 self.extract_by_indices(&[index])
459 }
460
461 pub fn project_by_names(&self, names: &[String]) -> Columns {
464 let new_columns: Vec<Column> = names
465 .iter()
466 .filter_map(|name| self.columns.iter().find(|c| c.name().text() == name.as_str()).cloned())
467 .collect();
468
469 if new_columns.is_empty() {
470 return Columns::empty();
471 }
472
473 Columns {
474 row_numbers: self.row_numbers.clone(),
475 created_at: self.created_at.clone(),
476 updated_at: self.updated_at.clone(),
477 columns: CowVec::new(new_columns),
478 }
479 }
480
481 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
484 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
485
486 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
488 for (idx, key) in keys.iter().enumerate() {
489 key_to_indices.entry(key.clone()).or_default().push(idx);
490 }
491
492 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
494 }
495
496 pub fn from_row(row: &Row) -> Self {
498 let mut columns = Vec::new();
499
500 for (idx, field) in row.shape.fields().iter().enumerate() {
501 let value = row.shape.get_value(&row.encoded, idx);
502
503 let column_type = if matches!(value, Value::None { .. }) {
505 field.constraint.get_type()
506 } else {
507 value.get_type()
508 };
509
510 let mut data = if column_type.is_option() {
511 ColumnData::none_typed(column_type.clone(), 0)
512 } else {
513 ColumnData::with_capacity(column_type.clone(), 1)
514 };
515 data.push_value(value);
516
517 if column_type == Type::DictionaryId
518 && let ColumnData::DictionaryId(container) = &mut data
519 && let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
520 {
521 container.set_dictionary_id(*dict_id);
522 }
523
524 let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
525
526 columns.push(Column {
527 name: Fragment::internal(name),
528 data,
529 });
530 }
531
532 Self {
533 row_numbers: CowVec::new(vec![row.number]),
534 created_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.created_at_nanos())]),
535 updated_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.updated_at_nanos())]),
536 columns: CowVec::new(columns),
537 }
538 }
539
540 pub fn to_single_row(&self) -> Row {
543 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
544 assert_eq!(
545 self.row_numbers.len(),
546 1,
547 "to_row() requires exactly 1 row number, got {}",
548 self.row_numbers.len()
549 );
550
551 let row_number = *self.row_numbers.first().unwrap();
552
553 let fields: Vec<RowShapeField> = self
555 .columns
556 .iter()
557 .map(|col| RowShapeField::unconstrained(col.name().text().to_string(), col.data().get_type()))
558 .collect();
559
560 let layout = RowShape::new(fields);
561 let mut encoded = layout.allocate();
562
563 let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
565 layout.set_values(&mut encoded, &values);
566
567 Row {
568 number: row_number,
569 encoded,
570 shape: layout,
571 }
572 }
573}
574
575#[cfg(test)]
576pub mod tests {
577 use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};
578
579 use super::*;
580
581 #[test]
582 fn test_single_row_temporal_types() {
583 let date = Date::from_ymd(2025, 1, 15).unwrap();
584 let datetime = DateTime::from_timestamp(1642694400).unwrap();
585 let time = Time::from_hms(14, 30, 45).unwrap();
586 let duration = Duration::from_days(30).unwrap();
587
588 let columns = Columns::single_row([
589 ("date_col", Value::Date(date.clone())),
590 ("datetime_col", Value::DateTime(datetime.clone())),
591 ("time_col", Value::Time(time.clone())),
592 ("interval_col", Value::Duration(duration.clone())),
593 ]);
594
595 assert_eq!(columns.len(), 4);
596 assert_eq!(columns.shape(), (1, 4));
597
598 assert_eq!(columns.column("date_col").unwrap().data().get_value(0), Value::Date(date));
600 assert_eq!(columns.column("datetime_col").unwrap().data().get_value(0), Value::DateTime(datetime));
601 assert_eq!(columns.column("time_col").unwrap().data().get_value(0), Value::Time(time));
602 assert_eq!(columns.column("interval_col").unwrap().data().get_value(0), Value::Duration(duration));
603 }
604
605 #[test]
606 fn test_single_row_mixed_types() {
607 let date = Date::from_ymd(2025, 7, 15).unwrap();
608 let time = Time::from_hms(9, 15, 30).unwrap();
609
610 let columns = Columns::single_row([
611 ("bool_col", Value::Boolean(true)),
612 ("int_col", Value::Int4(42)),
613 ("str_col", Value::Utf8("hello".to_string())),
614 ("date_col", Value::Date(date.clone())),
615 ("time_col", Value::Time(time.clone())),
616 ("none_col", Value::none()),
617 ]);
618
619 assert_eq!(columns.len(), 6);
620 assert_eq!(columns.shape(), (1, 6));
621
622 assert_eq!(columns.column("bool_col").unwrap().data().get_value(0), Value::Boolean(true));
624 assert_eq!(columns.column("int_col").unwrap().data().get_value(0), Value::Int4(42));
625 assert_eq!(columns.column("str_col").unwrap().data().get_value(0), Value::Utf8("hello".to_string()));
626 assert_eq!(columns.column("date_col").unwrap().data().get_value(0), Value::Date(date));
627 assert_eq!(columns.column("time_col").unwrap().data().get_value(0), Value::Time(time));
628 assert_eq!(columns.column("none_col").unwrap().data().get_value(0), Value::none());
629 }
630
631 #[test]
632 fn test_single_row_normal_column_names_work() {
633 let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
634 assert_eq!(columns.len(), 1);
635 assert_eq!(columns.column("normal_column").unwrap().data().get_value(0), Value::Int4(42));
636 }
637}