1use std::{
5 collections::HashMap,
6 hash::Hash,
7 mem,
8 ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13 Result,
14 fragment::Fragment,
15 util::cowvec::CowVec,
16 value::{Value, constraint::Constraint, datetime::DateTime, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20 encoded::shape::{RowShape, RowShapeField},
21 interface::{
22 catalog::{table::Table, view::View},
23 resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24 },
25 row::Row,
26 value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
/// A batch of named data columns plus optional per-row system metadata.
///
/// The constructors in this file either leave `row_numbers`, `created_at`
/// and `updated_at` empty, or fill them with one entry per row
/// (see `with_system_columns`, which asserts the lengths match).
#[derive(Debug, Clone)]
pub struct Columns {
    /// Row number of each row; empty when the batch carries no row numbers.
    pub row_numbers: CowVec<RowNumber>,
    /// Creation timestamp of each row; empty when the batch carries none.
    pub created_at: CowVec<DateTime>,
    /// Last-update timestamp of each row; empty when the batch carries none.
    pub updated_at: CowVec<DateTime>,
    /// The data columns, in positional order.
    pub columns: CowVec<Column>,
}
36
37impl Deref for Columns {
38 type Target = [Column];
39
40 fn deref(&self) -> &Self::Target {
41 self.columns.deref()
42 }
43}
44
45impl Index<usize> for Columns {
46 type Output = Column;
47
48 fn index(&self, index: usize) -> &Self::Output {
49 self.columns.index(index)
50 }
51}
52
53impl IndexMut<usize> for Columns {
54 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
55 &mut self.columns.make_mut()[index]
56 }
57}
58
59impl Columns {
60 pub fn scalar(value: Value) -> Self {
63 let data = match value {
64 Value::None {
65 ..
66 } => ColumnData::none_typed(Type::Boolean, 1),
67 Value::Boolean(v) => ColumnData::bool([v]),
68 Value::Float4(v) => ColumnData::float4([v.into()]),
69 Value::Float8(v) => ColumnData::float8([v.into()]),
70 Value::Int1(v) => ColumnData::int1([v]),
71 Value::Int2(v) => ColumnData::int2([v]),
72 Value::Int4(v) => ColumnData::int4([v]),
73 Value::Int8(v) => ColumnData::int8([v]),
74 Value::Int16(v) => ColumnData::int16([v]),
75 Value::Utf8(v) => ColumnData::utf8([v]),
76 Value::Uint1(v) => ColumnData::uint1([v]),
77 Value::Uint2(v) => ColumnData::uint2([v]),
78 Value::Uint4(v) => ColumnData::uint4([v]),
79 Value::Uint8(v) => ColumnData::uint8([v]),
80 Value::Uint16(v) => ColumnData::uint16([v]),
81 Value::Date(v) => ColumnData::date([v]),
82 Value::DateTime(v) => ColumnData::datetime([v]),
83 Value::Time(v) => ColumnData::time([v]),
84 Value::Duration(v) => ColumnData::duration([v]),
85 Value::IdentityId(v) => ColumnData::identity_id([v]),
86 Value::Uuid4(v) => ColumnData::uuid4([v]),
87 Value::Uuid7(v) => ColumnData::uuid7([v]),
88 Value::Blob(v) => ColumnData::blob([v]),
89 Value::Int(v) => ColumnData::int(vec![v]),
90 Value::Uint(v) => ColumnData::uint(vec![v]),
91 Value::Decimal(v) => ColumnData::decimal(vec![v]),
92 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
93 Value::Any(v) => ColumnData::any(vec![v]),
94 Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
95 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
96 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
97 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
98 };
99 let column = Column {
100 name: Fragment::internal("value"),
101 data,
102 };
103 Self {
104 row_numbers: CowVec::new(Vec::new()),
105 created_at: CowVec::new(Vec::new()),
106 updated_at: CowVec::new(Vec::new()),
107 columns: CowVec::new(vec![column]),
108 }
109 }
110
111 pub fn scalar_value(&self) -> Value {
114 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
115 debug_assert_eq!(
116 self.row_count(),
117 1,
118 "scalar_value() requires exactly 1 row, got {}",
119 self.row_count()
120 );
121 self.columns[0].data().get_value(0)
122 }
123
124 pub fn new(columns: Vec<Column>) -> Self {
125 let n = columns.first().map_or(0, |c| c.data().len());
126 assert!(columns.iter().all(|c| c.data().len() == n));
127
128 Self {
129 row_numbers: CowVec::new(Vec::new()),
130 created_at: CowVec::new(Vec::new()),
131 updated_at: CowVec::new(Vec::new()),
132 columns: CowVec::new(columns),
133 }
134 }
135
136 pub fn with_system_columns(
137 columns: Vec<Column>,
138 row_numbers: Vec<RowNumber>,
139 created_at: Vec<DateTime>,
140 updated_at: Vec<DateTime>,
141 ) -> Self {
142 let n = columns.first().map_or(0, |c| c.data().len());
143 assert!(columns.iter().all(|c| c.data().len() == n));
144 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
145 assert_eq!(created_at.len(), n, "created_at length must match column data length");
146 assert_eq!(updated_at.len(), n, "updated_at length must match column data length");
147
148 Self {
149 row_numbers: CowVec::new(row_numbers),
150 created_at: CowVec::new(created_at),
151 updated_at: CowVec::new(updated_at),
152 columns: CowVec::new(columns),
153 }
154 }
155
156 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
157 let mut columns = Vec::new();
158 let mut index = HashMap::new();
159
160 for (idx, (name, value)) in rows.into_iter().enumerate() {
161 let data = match value {
162 Value::None {
163 ..
164 } => ColumnData::none_typed(Type::Boolean, 1),
165 Value::Boolean(v) => ColumnData::bool([v]),
166 Value::Float4(v) => ColumnData::float4([v.into()]),
167 Value::Float8(v) => ColumnData::float8([v.into()]),
168 Value::Int1(v) => ColumnData::int1([v]),
169 Value::Int2(v) => ColumnData::int2([v]),
170 Value::Int4(v) => ColumnData::int4([v]),
171 Value::Int8(v) => ColumnData::int8([v]),
172 Value::Int16(v) => ColumnData::int16([v]),
173 Value::Utf8(v) => ColumnData::utf8([v.clone()]),
174 Value::Uint1(v) => ColumnData::uint1([v]),
175 Value::Uint2(v) => ColumnData::uint2([v]),
176 Value::Uint4(v) => ColumnData::uint4([v]),
177 Value::Uint8(v) => ColumnData::uint8([v]),
178 Value::Uint16(v) => ColumnData::uint16([v]),
179 Value::Date(v) => ColumnData::date([v]),
180 Value::DateTime(v) => ColumnData::datetime([v]),
181 Value::Time(v) => ColumnData::time([v]),
182 Value::Duration(v) => ColumnData::duration([v]),
183 Value::IdentityId(v) => ColumnData::identity_id([v]),
184 Value::Uuid4(v) => ColumnData::uuid4([v]),
185 Value::Uuid7(v) => ColumnData::uuid7([v]),
186 Value::Blob(v) => ColumnData::blob([v.clone()]),
187 Value::Int(v) => ColumnData::int(vec![v]),
188 Value::Uint(v) => ColumnData::uint(vec![v]),
189 Value::Decimal(v) => ColumnData::decimal(vec![v]),
190 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
191 Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
192 Value::Any(v) => ColumnData::any(vec![v]),
193 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
194 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
195 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
196 };
197
198 let column = Column {
199 name: Fragment::internal(name.to_string()),
200 data,
201 };
202 index.insert(name, idx);
203 columns.push(column);
204 }
205
206 Self {
207 row_numbers: CowVec::new(Vec::new()),
208 created_at: CowVec::new(Vec::new()),
209 updated_at: CowVec::new(Vec::new()),
210 columns: CowVec::new(columns),
211 }
212 }
213
214 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
215 for (i, name) in headers.columns.iter().enumerate() {
217 if i < self.len() {
218 let column = &mut self[i];
219 let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
220
221 *column = Column {
222 name: name.clone(),
223 data,
224 };
225 }
226 }
227 }
228}
229
230impl Columns {
231 pub fn number(&self) -> RowNumber {
233 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
234 if self.row_numbers.is_empty() {
235 RowNumber(0)
236 } else {
237 self.row_numbers[0]
238 }
239 }
240
241 pub fn shape(&self) -> (usize, usize) {
242 let row_count = if !self.row_numbers.is_empty() {
243 self.row_numbers.len()
244 } else {
245 self.first().map(|c| c.data().len()).unwrap_or(0)
246 };
247 (row_count, self.len())
248 }
249
250 pub fn is_empty(&self) -> bool {
251 self.shape().0 == 0
252 }
253
254 pub fn row(&self, i: usize) -> Vec<Value> {
255 self.iter().map(|c| c.data().get_value(i)).collect()
256 }
257
258 pub fn column(&self, name: &str) -> Option<&Column> {
259 self.iter().find(|col| col.name().text() == name)
260 }
261
262 pub fn row_count(&self) -> usize {
263 if !self.row_numbers.is_empty() {
264 self.row_numbers.len()
265 } else {
266 self.first().map_or(0, |col| col.data().len())
267 }
268 }
269
270 pub fn is_scalar(&self) -> bool {
271 self.len() == 1 && self.row_count() == 1
272 }
273
274 pub fn get_row(&self, index: usize) -> Vec<Value> {
275 self.iter().map(|col| col.data().get_value(index)).collect()
276 }
277}
278
279impl IntoIterator for Columns {
280 type Item = Column;
281 type IntoIter = std::vec::IntoIter<Column>;
282
283 fn into_iter(self) -> Self::IntoIter {
284 self.columns.into_iter()
285 }
286}
287
288impl Column {
289 pub fn extend(&mut self, other: Column) -> Result<()> {
290 self.data_mut().extend(other.data().clone())
291 }
292}
293
294impl Columns {
295 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
296 let column_count = names.len();
297
298 let mut columns: Vec<Column> = names
299 .iter()
300 .map(|name| Column {
301 name: Fragment::internal(name.to_string()),
302 data: ColumnData::none_typed(Type::Boolean, 0),
303 })
304 .collect();
305
306 for row in result_rows {
307 assert_eq!(row.len(), column_count, "row length does not match column count");
308 for (i, value) in row.iter().enumerate() {
309 columns[i].data_mut().push_value(value.clone());
310 }
311 }
312
313 Columns::new(columns)
314 }
315
316 pub fn from_rows_with_row_numbers(
317 names: &[&str],
318 result_rows: &[Vec<Value>],
319 row_numbers: Vec<RowNumber>,
320 ) -> Self {
321 let column_count = names.len();
322
323 let mut columns: Vec<Column> = names
324 .iter()
325 .map(|name| Column {
326 name: Fragment::internal(name.to_string()),
327 data: ColumnData::none_typed(Type::Boolean, 0),
328 })
329 .collect();
330
331 for row in result_rows {
332 assert_eq!(row.len(), column_count, "row length does not match column count");
333 for (i, value) in row.iter().enumerate() {
334 columns[i].data_mut().push_value(value.clone());
335 }
336 }
337
338 let n = row_numbers.len();
339 let now = DateTime::default();
340 Columns::with_system_columns(columns, row_numbers, vec![now; n], vec![now; n])
341 }
342}
343
344impl Columns {
345 pub fn empty() -> Self {
346 Self {
347 row_numbers: CowVec::new(vec![]),
348 created_at: CowVec::new(vec![]),
349 updated_at: CowVec::new(vec![]),
350 columns: CowVec::new(vec![]),
351 }
352 }
353
354 pub fn from_resolved_table(table: &ResolvedTable) -> Self {
355 Self::from_table(table.def())
356 }
357
358 pub fn from_table(table: &Table) -> Self {
360 let columns: Vec<Column> = table
361 .columns
362 .iter()
363 .map(|col| Column {
364 name: Fragment::internal(&col.name),
365 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
366 })
367 .collect();
368
369 Self {
370 row_numbers: CowVec::new(Vec::new()),
371 created_at: CowVec::new(Vec::new()),
372 updated_at: CowVec::new(Vec::new()),
373 columns: CowVec::new(columns),
374 }
375 }
376
377 pub fn from_view(view: &View) -> Self {
379 let columns: Vec<Column> = view
380 .columns()
381 .iter()
382 .map(|col| Column {
383 name: Fragment::internal(&col.name),
384 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
385 })
386 .collect();
387
388 Self {
389 row_numbers: CowVec::new(Vec::new()),
390 created_at: CowVec::new(Vec::new()),
391 updated_at: CowVec::new(Vec::new()),
392 columns: CowVec::new(columns),
393 }
394 }
395
396 pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
397 let _source = ringbuffer.clone();
398
399 let columns: Vec<Column> = ringbuffer
400 .columns()
401 .iter()
402 .map(|col| {
403 let column_ident = Fragment::internal(&col.name);
404 Column {
405 name: column_ident,
406 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
407 }
408 })
409 .collect();
410
411 Self {
412 row_numbers: CowVec::new(Vec::new()),
413 created_at: CowVec::new(Vec::new()),
414 updated_at: CowVec::new(Vec::new()),
415 columns: CowVec::new(columns),
416 }
417 }
418
419 pub fn from_resolved_view(view: &ResolvedView) -> Self {
420 Self::from_view(view.def())
421 }
422}
423
424impl Columns {
425 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
427 if indices.is_empty() {
428 return Columns::empty();
429 }
430
431 let new_columns: Vec<Column> = self
432 .columns
433 .iter()
434 .map(|col| {
435 let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
436 for &idx in indices {
437 new_data.push_value(col.data().get_value(idx));
438 }
439 Column {
440 name: col.name.clone(),
441 data: new_data,
442 }
443 })
444 .collect();
445
446 let new_row_numbers: Vec<RowNumber> = if self.row_numbers.is_empty() {
447 Vec::new()
448 } else {
449 indices.iter().map(|&i| self.row_numbers[i]).collect()
450 };
451 let new_created_at: Vec<DateTime> = if self.created_at.is_empty() {
452 Vec::new()
453 } else {
454 indices.iter().map(|&i| self.created_at[i]).collect()
455 };
456 let new_updated_at: Vec<DateTime> = if self.updated_at.is_empty() {
457 Vec::new()
458 } else {
459 indices.iter().map(|&i| self.updated_at[i]).collect()
460 };
461 Columns {
462 row_numbers: CowVec::new(new_row_numbers),
463 created_at: CowVec::new(new_created_at),
464 updated_at: CowVec::new(new_updated_at),
465 columns: CowVec::new(new_columns),
466 }
467 }
468
469 pub fn extract_row(&self, index: usize) -> Columns {
471 self.extract_by_indices(&[index])
472 }
473
474 pub fn append_rows_by_indices(&mut self, source: &Columns, indices: &[usize]) {
484 if indices.is_empty() {
485 return;
486 }
487
488 if self.columns.is_empty() {
489 *self = source.extract_by_indices(indices);
490 return;
491 }
492
493 assert_eq!(
494 self.columns.len(),
495 source.columns.len(),
496 "append_rows: column count mismatch (self={}, source={})",
497 self.columns.len(),
498 source.columns.len(),
499 );
500
501 let self_cols = self.columns.make_mut();
502 for (i, src_col) in source.columns.iter().enumerate() {
503 for &idx in indices {
504 self_cols[i].data_mut().push_value(src_col.data().get_value(idx));
505 }
506 }
507
508 if !source.row_numbers.is_empty() {
509 let rns = self.row_numbers.make_mut();
510 for &idx in indices {
511 rns.push(source.row_numbers[idx]);
512 }
513 }
514 if !source.created_at.is_empty() {
515 let cr = self.created_at.make_mut();
516 for &idx in indices {
517 cr.push(source.created_at[idx]);
518 }
519 }
520 if !source.updated_at.is_empty() {
521 let up = self.updated_at.make_mut();
522 for &idx in indices {
523 up.push(source.updated_at[idx]);
524 }
525 }
526 }
527
528 pub fn remove_row(&mut self, row_number: RowNumber) -> bool {
531 let pos = self.row_numbers.iter().position(|&r| r == row_number);
532 let Some(idx) = pos else {
533 return false;
534 };
535
536 let kept_indices: Vec<usize> = (0..self.row_count()).filter(|&i| i != idx).collect();
537 *self = self.extract_by_indices(&kept_indices);
538 true
539 }
540
541 pub fn project_by_names(&self, names: &[String]) -> Columns {
544 let new_columns: Vec<Column> = names
545 .iter()
546 .filter_map(|name| self.columns.iter().find(|c| c.name().text() == name.as_str()).cloned())
547 .collect();
548
549 if new_columns.is_empty() {
550 return Columns::empty();
551 }
552
553 Columns {
554 row_numbers: self.row_numbers.clone(),
555 created_at: self.created_at.clone(),
556 updated_at: self.updated_at.clone(),
557 columns: CowVec::new(new_columns),
558 }
559 }
560
561 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
564 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
565
566 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
568 for (idx, key) in keys.iter().enumerate() {
569 key_to_indices.entry(key.clone()).or_default().push(idx);
570 }
571
572 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
574 }
575
576 pub fn from_row(row: &Row) -> Self {
578 let mut columns = Vec::new();
579
580 for (idx, field) in row.shape.fields().iter().enumerate() {
581 let value = row.shape.get_value(&row.encoded, idx);
582
583 let column_type = if matches!(value, Value::None { .. }) {
585 field.constraint.get_type()
586 } else {
587 value.get_type()
588 };
589
590 let mut data = if column_type.is_option() {
591 ColumnData::none_typed(column_type.clone(), 0)
592 } else {
593 ColumnData::with_capacity(column_type.clone(), 1)
594 };
595 data.push_value(value);
596
597 if column_type == Type::DictionaryId
598 && let ColumnData::DictionaryId(container) = &mut data
599 && let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
600 {
601 container.set_dictionary_id(*dict_id);
602 }
603
604 let name = row.shape.get_field_name(idx).expect("RowShape missing name for field");
605
606 columns.push(Column {
607 name: Fragment::internal(name),
608 data,
609 });
610 }
611
612 Self {
613 row_numbers: CowVec::new(vec![row.number]),
614 created_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.created_at_nanos())]),
615 updated_at: CowVec::new(vec![DateTime::from_nanos(row.encoded.updated_at_nanos())]),
616 columns: CowVec::new(columns),
617 }
618 }
619
620 pub fn to_single_row(&self) -> Row {
623 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
624 assert_eq!(
625 self.row_numbers.len(),
626 1,
627 "to_row() requires exactly 1 row number, got {}",
628 self.row_numbers.len()
629 );
630
631 let row_number = *self.row_numbers.first().unwrap();
632
633 let fields: Vec<RowShapeField> = self
635 .columns
636 .iter()
637 .map(|col| RowShapeField::unconstrained(col.name().text().to_string(), col.data().get_type()))
638 .collect();
639
640 let layout = RowShape::new(fields);
641 let mut encoded = layout.allocate();
642
643 let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
645 layout.set_values(&mut encoded, &values);
646
647 Row {
648 number: row_number,
649 encoded,
650 shape: layout,
651 }
652 }
653}
654
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

    use super::*;

    /// Reads row 0 of the column called `name`, panicking if it does not exist.
    fn first_value(columns: &Columns, name: &str) -> Value {
        columns.column(name).unwrap().data().get_value(0)
    }

    /// Temporal values survive a round trip through `single_row`.
    #[test]
    fn test_single_row_temporal_types() {
        let date = Date::from_ymd(2025, 1, 15).unwrap();
        let datetime = DateTime::from_timestamp(1642694400).unwrap();
        let time = Time::from_hms(14, 30, 45).unwrap();
        let duration = Duration::from_days(30).unwrap();

        let columns = Columns::single_row([
            ("date_col", Value::Date(date.clone())),
            ("datetime_col", Value::DateTime(datetime.clone())),
            ("time_col", Value::Time(time.clone())),
            ("interval_col", Value::Duration(duration.clone())),
        ]);

        assert_eq!(columns.len(), 4);
        assert_eq!(columns.shape(), (1, 4));

        assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
        assert_eq!(first_value(&columns, "datetime_col"), Value::DateTime(datetime));
        assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
        assert_eq!(first_value(&columns, "interval_col"), Value::Duration(duration));
    }

    /// Columns of different types, including a none value, can share one row.
    #[test]
    fn test_single_row_mixed_types() {
        let date = Date::from_ymd(2025, 7, 15).unwrap();
        let time = Time::from_hms(9, 15, 30).unwrap();

        let columns = Columns::single_row([
            ("bool_col", Value::Boolean(true)),
            ("int_col", Value::Int4(42)),
            ("str_col", Value::Utf8("hello".to_string())),
            ("date_col", Value::Date(date.clone())),
            ("time_col", Value::Time(time.clone())),
            ("none_col", Value::none()),
        ]);

        assert_eq!(columns.len(), 6);
        assert_eq!(columns.shape(), (1, 6));

        assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
        assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
        assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
        assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
        assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
        assert_eq!(first_value(&columns, "none_col"), Value::none());
    }

    /// A plain column name is accepted and can be looked up afterwards.
    #[test]
    fn test_single_row_normal_column_names_work() {
        let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
        assert_eq!(columns.len(), 1);
        assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
    }
}