1use std::{
5 collections::HashMap,
6 hash::Hash,
7 mem,
8 ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13 Result,
14 fragment::Fragment,
15 util::cowvec::CowVec,
16 value::{Value, constraint::Constraint, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20 encoded::schema::{Schema, SchemaField},
21 interface::{
22 catalog::{table::TableDef, view::ViewDef},
23 resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24 },
25 row::Row,
26 value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
/// A positional collection of [`Column`]s with optional per-row [`RowNumber`]s.
///
/// The `new` and `with_row_numbers` constructors assert that every column holds the same
/// number of values. Both fields are public, so nothing enforces that after construction.
#[derive(Debug, Clone)]
pub struct Columns {
    /// Row numbers, one per row when present. Several constructors leave this empty, in
    /// which case `row_count` falls back to the length of the first column.
    pub row_numbers: CowVec<RowNumber>,
    /// The columns, in positional order.
    pub columns: CowVec<Column>,
}
34
35impl Deref for Columns {
36 type Target = [Column];
37
38 fn deref(&self) -> &Self::Target {
39 self.columns.deref()
40 }
41}
42
43impl Index<usize> for Columns {
44 type Output = Column;
45
46 fn index(&self, index: usize) -> &Self::Output {
47 self.columns.index(index)
48 }
49}
50
51impl IndexMut<usize> for Columns {
52 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
53 &mut self.columns.make_mut()[index]
54 }
55}
56
57impl Columns {
58 pub fn scalar(value: Value) -> Self {
61 let data = match value {
62 Value::None {
63 ..
64 } => ColumnData::none_typed(Type::Boolean, 1),
65 Value::Boolean(v) => ColumnData::bool([v]),
66 Value::Float4(v) => ColumnData::float4([v.into()]),
67 Value::Float8(v) => ColumnData::float8([v.into()]),
68 Value::Int1(v) => ColumnData::int1([v]),
69 Value::Int2(v) => ColumnData::int2([v]),
70 Value::Int4(v) => ColumnData::int4([v]),
71 Value::Int8(v) => ColumnData::int8([v]),
72 Value::Int16(v) => ColumnData::int16([v]),
73 Value::Utf8(v) => ColumnData::utf8([v]),
74 Value::Uint1(v) => ColumnData::uint1([v]),
75 Value::Uint2(v) => ColumnData::uint2([v]),
76 Value::Uint4(v) => ColumnData::uint4([v]),
77 Value::Uint8(v) => ColumnData::uint8([v]),
78 Value::Uint16(v) => ColumnData::uint16([v]),
79 Value::Date(v) => ColumnData::date([v]),
80 Value::DateTime(v) => ColumnData::datetime([v]),
81 Value::Time(v) => ColumnData::time([v]),
82 Value::Duration(v) => ColumnData::duration([v]),
83 Value::IdentityId(v) => ColumnData::identity_id([v]),
84 Value::Uuid4(v) => ColumnData::uuid4([v]),
85 Value::Uuid7(v) => ColumnData::uuid7([v]),
86 Value::Blob(v) => ColumnData::blob([v]),
87 Value::Int(v) => ColumnData::int(vec![v]),
88 Value::Uint(v) => ColumnData::uint(vec![v]),
89 Value::Decimal(v) => ColumnData::decimal(vec![v]),
90 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
91 Value::Any(v) => ColumnData::any(vec![v]),
92 Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
93 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
94 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
95 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
96 };
97 let column = Column {
98 name: Fragment::internal("value"),
99 data,
100 };
101 Self {
102 row_numbers: CowVec::new(Vec::new()),
103 columns: CowVec::new(vec![column]),
104 }
105 }
106
107 pub fn scalar_value(&self) -> Value {
110 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
111 debug_assert_eq!(
112 self.row_count(),
113 1,
114 "scalar_value() requires exactly 1 row, got {}",
115 self.row_count()
116 );
117 self.columns[0].data().get_value(0)
118 }
119
120 pub fn new(columns: Vec<Column>) -> Self {
121 let n = columns.first().map_or(0, |c| c.data().len());
122 assert!(columns.iter().all(|c| c.data().len() == n));
123
124 Self {
125 row_numbers: CowVec::new(Vec::new()),
126 columns: CowVec::new(columns),
127 }
128 }
129
130 pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
131 let n = columns.first().map_or(0, |c| c.data().len());
132 assert!(columns.iter().all(|c| c.data().len() == n));
133 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
134
135 Self {
136 row_numbers: CowVec::new(row_numbers),
137 columns: CowVec::new(columns),
138 }
139 }
140
141 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
142 let mut columns = Vec::new();
143 let mut index = HashMap::new();
144
145 for (idx, (name, value)) in rows.into_iter().enumerate() {
146 let data = match value {
147 Value::None {
148 ..
149 } => ColumnData::none_typed(Type::Boolean, 1),
150 Value::Boolean(v) => ColumnData::bool([v]),
151 Value::Float4(v) => ColumnData::float4([v.into()]),
152 Value::Float8(v) => ColumnData::float8([v.into()]),
153 Value::Int1(v) => ColumnData::int1([v]),
154 Value::Int2(v) => ColumnData::int2([v]),
155 Value::Int4(v) => ColumnData::int4([v]),
156 Value::Int8(v) => ColumnData::int8([v]),
157 Value::Int16(v) => ColumnData::int16([v]),
158 Value::Utf8(v) => ColumnData::utf8([v.clone()]),
159 Value::Uint1(v) => ColumnData::uint1([v]),
160 Value::Uint2(v) => ColumnData::uint2([v]),
161 Value::Uint4(v) => ColumnData::uint4([v]),
162 Value::Uint8(v) => ColumnData::uint8([v]),
163 Value::Uint16(v) => ColumnData::uint16([v]),
164 Value::Date(v) => ColumnData::date([v.clone()]),
165 Value::DateTime(v) => ColumnData::datetime([v.clone()]),
166 Value::Time(v) => ColumnData::time([v.clone()]),
167 Value::Duration(v) => ColumnData::duration([v.clone()]),
168 Value::IdentityId(v) => ColumnData::identity_id([v]),
169 Value::Uuid4(v) => ColumnData::uuid4([v]),
170 Value::Uuid7(v) => ColumnData::uuid7([v]),
171 Value::Blob(v) => ColumnData::blob([v.clone()]),
172 Value::Int(v) => ColumnData::int(vec![v]),
173 Value::Uint(v) => ColumnData::uint(vec![v]),
174 Value::Decimal(v) => ColumnData::decimal(vec![v]),
175 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
176 Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
177 Value::Any(v) => ColumnData::any(vec![v]),
178 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
179 Value::Record(v) => ColumnData::any(vec![Box::new(Value::Record(v))]),
180 Value::Tuple(v) => ColumnData::any(vec![Box::new(Value::Tuple(v))]),
181 };
182
183 let column = Column {
184 name: Fragment::internal(name.to_string()),
185 data,
186 };
187 index.insert(name, idx);
188 columns.push(column);
189 }
190
191 Self {
192 row_numbers: CowVec::new(Vec::new()),
193 columns: CowVec::new(columns),
194 }
195 }
196
197 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
198 for (i, name) in headers.columns.iter().enumerate() {
200 if i < self.len() {
201 let column = &mut self[i];
202 let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
203
204 *column = Column {
205 name: name.clone(),
206 data,
207 };
208 }
209 }
210 }
211}
212
213impl Columns {
214 pub fn number(&self) -> RowNumber {
216 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
217 if self.row_numbers.is_empty() {
218 RowNumber(0)
219 } else {
220 self.row_numbers[0]
221 }
222 }
223
224 pub fn shape(&self) -> (usize, usize) {
225 let row_count = if !self.row_numbers.is_empty() {
226 self.row_numbers.len()
227 } else {
228 self.get(0).map(|c| c.data().len()).unwrap_or(0)
229 };
230 (row_count, self.len())
231 }
232
233 pub fn into_iter(self) -> impl Iterator<Item = Column> {
234 self.columns.into_iter()
235 }
236
237 pub fn is_empty(&self) -> bool {
238 self.shape().0 == 0
239 }
240
241 pub fn row(&self, i: usize) -> Vec<Value> {
242 self.iter().map(|c| c.data().get_value(i)).collect()
243 }
244
245 pub fn column(&self, name: &str) -> Option<&Column> {
246 self.iter().find(|col| col.name().text() == name)
247 }
248
249 pub fn row_count(&self) -> usize {
250 if !self.row_numbers.is_empty() {
251 self.row_numbers.len()
252 } else {
253 self.first().map_or(0, |col| col.data().len())
254 }
255 }
256
257 pub fn get_row(&self, index: usize) -> Vec<Value> {
258 self.iter().map(|col| col.data().get_value(index)).collect()
259 }
260}
261
262impl Column {
263 pub fn extend(&mut self, other: Column) -> Result<()> {
264 self.data_mut().extend(other.data().clone())
265 }
266}
267
268impl Columns {
269 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
270 let column_count = names.len();
271
272 let mut columns: Vec<Column> = names
273 .iter()
274 .map(|name| Column {
275 name: Fragment::internal(name.to_string()),
276 data: ColumnData::none_typed(Type::Boolean, 0),
277 })
278 .collect();
279
280 for row in result_rows {
281 assert_eq!(row.len(), column_count, "row length does not match column count");
282 for (i, value) in row.iter().enumerate() {
283 columns[i].data_mut().push_value(value.clone());
284 }
285 }
286
287 Columns::new(columns)
288 }
289
290 pub fn from_rows_with_row_numbers(
291 names: &[&str],
292 result_rows: &[Vec<Value>],
293 row_numbers: Vec<RowNumber>,
294 ) -> Self {
295 let column_count = names.len();
296
297 let mut columns: Vec<Column> = names
298 .iter()
299 .map(|name| Column {
300 name: Fragment::internal(name.to_string()),
301 data: ColumnData::none_typed(Type::Boolean, 0),
302 })
303 .collect();
304
305 for row in result_rows {
306 assert_eq!(row.len(), column_count, "row length does not match column count");
307 for (i, value) in row.iter().enumerate() {
308 columns[i].data_mut().push_value(value.clone());
309 }
310 }
311
312 Columns::with_row_numbers(columns, row_numbers)
313 }
314}
315
316impl Columns {
317 pub fn empty() -> Self {
318 Self {
319 row_numbers: CowVec::new(vec![]),
320 columns: CowVec::new(vec![]),
321 }
322 }
323
324 pub fn from_table(table: &ResolvedTable) -> Self {
325 let _source = table.clone();
326
327 let columns: Vec<Column> = table
328 .columns()
329 .iter()
330 .map(|col| {
331 let column_ident = Fragment::internal(&col.name);
332 Column {
333 name: column_ident,
334 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
335 }
336 })
337 .collect();
338
339 Self {
340 row_numbers: CowVec::new(Vec::new()),
341 columns: CowVec::new(columns),
342 }
343 }
344
345 pub fn from_table_def(table: &TableDef) -> Self {
347 let columns: Vec<Column> = table
348 .columns
349 .iter()
350 .map(|col| Column {
351 name: Fragment::internal(&col.name),
352 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
353 })
354 .collect();
355
356 Self {
357 row_numbers: CowVec::new(Vec::new()),
358 columns: CowVec::new(columns),
359 }
360 }
361
362 pub fn from_view_def(view: &ViewDef) -> Self {
364 let columns: Vec<Column> = view
365 .columns
366 .iter()
367 .map(|col| Column {
368 name: Fragment::internal(&col.name),
369 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
370 })
371 .collect();
372
373 Self {
374 row_numbers: CowVec::new(Vec::new()),
375 columns: CowVec::new(columns),
376 }
377 }
378
379 pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
380 let _source = ringbuffer.clone();
381
382 let columns: Vec<Column> = ringbuffer
383 .columns()
384 .iter()
385 .map(|col| {
386 let column_ident = Fragment::internal(&col.name);
387 Column {
388 name: column_ident,
389 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
390 }
391 })
392 .collect();
393
394 Self {
395 row_numbers: CowVec::new(Vec::new()),
396 columns: CowVec::new(columns),
397 }
398 }
399
400 pub fn from_view(view: &ResolvedView) -> Self {
401 let _source = view.clone();
402
403 let columns: Vec<Column> = view
404 .columns()
405 .iter()
406 .map(|col| {
407 let column_ident = Fragment::internal(&col.name);
408 Column {
409 name: column_ident,
410 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
411 }
412 })
413 .collect();
414
415 Self {
416 row_numbers: CowVec::new(Vec::new()),
417 columns: CowVec::new(columns),
418 }
419 }
420}
421
422impl Columns {
423 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
425 if indices.is_empty() {
426 return Columns::empty();
427 }
428
429 let new_columns: Vec<Column> = self
430 .columns
431 .iter()
432 .map(|col| {
433 let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
434 for &idx in indices {
435 new_data.push_value(col.data().get_value(idx));
436 }
437 Column {
438 name: col.name.clone(),
439 data: new_data,
440 }
441 })
442 .collect();
443
444 if self.row_numbers.is_empty() {
445 Columns::new(new_columns)
446 } else {
447 let new_row_numbers: Vec<RowNumber> = indices.iter().map(|&i| self.row_numbers[i]).collect();
448 Columns::with_row_numbers(new_columns, new_row_numbers)
449 }
450 }
451
452 pub fn extract_row(&self, index: usize) -> Columns {
454 self.extract_by_indices(&[index])
455 }
456
457 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
460 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
461
462 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
464 for (idx, key) in keys.iter().enumerate() {
465 key_to_indices.entry(key.clone()).or_default().push(idx);
466 }
467
468 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
470 }
471
472 pub fn from_row(row: &Row) -> Self {
474 let mut columns = Vec::new();
475
476 for (idx, field) in row.schema.fields().iter().enumerate() {
477 let value = row.schema.get_value(&row.encoded, idx);
478
479 let column_type = if matches!(value, Value::None { .. }) {
481 field.constraint.get_type()
482 } else {
483 value.get_type()
484 };
485
486 let mut data = if column_type.is_option() {
487 ColumnData::none_typed(column_type.clone(), 0)
488 } else {
489 ColumnData::with_capacity(column_type.clone(), 1)
490 };
491 data.push_value(value);
492
493 if column_type == Type::DictionaryId {
494 if let ColumnData::DictionaryId(container) = &mut data {
495 if let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
496 {
497 container.set_dictionary_id(*dict_id);
498 }
499 }
500 }
501
502 let name = row.schema.get_field_name(idx).expect("Schema missing name for field");
503
504 columns.push(Column {
505 name: Fragment::internal(name),
506 data,
507 });
508 }
509
510 Self {
511 row_numbers: CowVec::new(vec![row.number]),
512 columns: CowVec::new(columns),
513 }
514 }
515
516 pub fn to_single_row(&self) -> Row {
519 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
520 assert_eq!(
521 self.row_numbers.len(),
522 1,
523 "to_row() requires exactly 1 row number, got {}",
524 self.row_numbers.len()
525 );
526
527 let row_number = self.row_numbers.first().unwrap().clone();
528
529 let fields: Vec<SchemaField> = self
531 .columns
532 .iter()
533 .map(|col| SchemaField::unconstrained(col.name().text().to_string(), col.data().get_type()))
534 .collect();
535
536 let layout = Schema::new(fields);
537 let mut encoded = layout.allocate();
538
539 let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
541 layout.set_values(&mut encoded, &values);
542
543 Row {
544 number: row_number,
545 encoded,
546 schema: layout,
547 }
548 }
549}
550
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

    use super::*;

    /// Reads row 0 of the column called `name`, panicking if no such column exists.
    fn first_value(columns: &Columns, name: &str) -> Value {
        columns.column(name).unwrap().data().get_value(0)
    }

    /// Temporal values survive a round trip through `Columns::single_row`.
    #[test]
    fn test_single_row_temporal_types() {
        let date = Date::from_ymd(2025, 1, 15).unwrap();
        let datetime = DateTime::from_timestamp(1642694400).unwrap();
        let time = Time::from_hms(14, 30, 45).unwrap();
        let duration = Duration::from_days(30);

        let columns = Columns::single_row([
            ("date_col", Value::Date(date.clone())),
            ("datetime_col", Value::DateTime(datetime.clone())),
            ("time_col", Value::Time(time.clone())),
            ("interval_col", Value::Duration(duration.clone())),
        ]);

        assert_eq!(columns.len(), 4);
        assert_eq!(columns.shape(), (1, 4));

        assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
        assert_eq!(first_value(&columns, "datetime_col"), Value::DateTime(datetime));
        assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
        assert_eq!(first_value(&columns, "interval_col"), Value::Duration(duration));
    }

    /// A row mixing scalar, text, temporal and none values keeps every value intact.
    #[test]
    fn test_single_row_mixed_types() {
        let date = Date::from_ymd(2025, 7, 15).unwrap();
        let time = Time::from_hms(9, 15, 30).unwrap();

        let columns = Columns::single_row([
            ("bool_col", Value::Boolean(true)),
            ("int_col", Value::Int4(42)),
            ("str_col", Value::Utf8("hello".to_string())),
            ("date_col", Value::Date(date.clone())),
            ("time_col", Value::Time(time.clone())),
            ("none_col", Value::none()),
        ]);

        assert_eq!(columns.len(), 6);
        assert_eq!(columns.shape(), (1, 6));

        assert_eq!(first_value(&columns, "bool_col"), Value::Boolean(true));
        assert_eq!(first_value(&columns, "int_col"), Value::Int4(42));
        assert_eq!(first_value(&columns, "str_col"), Value::Utf8("hello".to_string()));
        assert_eq!(first_value(&columns, "date_col"), Value::Date(date));
        assert_eq!(first_value(&columns, "time_col"), Value::Time(time));
        assert_eq!(first_value(&columns, "none_col"), Value::none());
    }

    /// A plain column name can be looked up after construction.
    #[test]
    fn test_single_row_normal_column_names_work() {
        let columns = Columns::single_row([("normal_column", Value::Int4(42))]);
        assert_eq!(columns.len(), 1);
        assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
    }
}