1use std::{
5 collections::HashMap,
6 hash::Hash,
7 mem,
8 ops::{Deref, Index, IndexMut},
9};
10
11use indexmap::IndexMap;
12use reifydb_type::{
13 Result,
14 fragment::Fragment,
15 util::cowvec::CowVec,
16 value::{Value, constraint::Constraint, row_number::RowNumber, r#type::Type},
17};
18
19use crate::{
20 encoded::schema::{Schema, SchemaField},
21 interface::{
22 catalog::{table::TableDef, view::ViewDef},
23 resolved::{ResolvedRingBuffer, ResolvedTable, ResolvedView},
24 },
25 row::Row,
26 value::column::{Column, ColumnData, headers::ColumnHeaders},
27};
28
/// A set of [`Column`]s together with the row numbers of the rows they hold.
///
/// `row_numbers` may be empty: several constructors in this file build it that
/// way, and `row_count()` then falls back to the length of the first column.
#[derive(Debug, Clone)]
pub struct Columns {
	/// One row number per row, or empty when no row numbers were supplied.
	pub row_numbers: CowVec<RowNumber>,
	/// The columns themselves.
	pub columns: CowVec<Column>,
}
34
35impl Deref for Columns {
36 type Target = [Column];
37
38 fn deref(&self) -> &Self::Target {
39 self.columns.deref()
40 }
41}
42
43impl Index<usize> for Columns {
44 type Output = Column;
45
46 fn index(&self, index: usize) -> &Self::Output {
47 self.columns.index(index)
48 }
49}
50
51impl IndexMut<usize> for Columns {
52 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
53 &mut self.columns.make_mut()[index]
54 }
55}
56
57impl Columns {
58 pub fn scalar(value: Value) -> Self {
61 let data = match value {
62 Value::None {
63 ..
64 } => ColumnData::none_typed(Type::Boolean, 1),
65 Value::Boolean(v) => ColumnData::bool([v]),
66 Value::Float4(v) => ColumnData::float4([v.into()]),
67 Value::Float8(v) => ColumnData::float8([v.into()]),
68 Value::Int1(v) => ColumnData::int1([v]),
69 Value::Int2(v) => ColumnData::int2([v]),
70 Value::Int4(v) => ColumnData::int4([v]),
71 Value::Int8(v) => ColumnData::int8([v]),
72 Value::Int16(v) => ColumnData::int16([v]),
73 Value::Utf8(v) => ColumnData::utf8([v]),
74 Value::Uint1(v) => ColumnData::uint1([v]),
75 Value::Uint2(v) => ColumnData::uint2([v]),
76 Value::Uint4(v) => ColumnData::uint4([v]),
77 Value::Uint8(v) => ColumnData::uint8([v]),
78 Value::Uint16(v) => ColumnData::uint16([v]),
79 Value::Date(v) => ColumnData::date([v]),
80 Value::DateTime(v) => ColumnData::datetime([v]),
81 Value::Time(v) => ColumnData::time([v]),
82 Value::Duration(v) => ColumnData::duration([v]),
83 Value::IdentityId(v) => ColumnData::identity_id([v]),
84 Value::Uuid4(v) => ColumnData::uuid4([v]),
85 Value::Uuid7(v) => ColumnData::uuid7([v]),
86 Value::Blob(v) => ColumnData::blob([v]),
87 Value::Int(v) => ColumnData::int(vec![v]),
88 Value::Uint(v) => ColumnData::uint(vec![v]),
89 Value::Decimal(v) => ColumnData::decimal(vec![v]),
90 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
91 Value::Any(v) => ColumnData::any(vec![v]),
92 Value::Type(v) => ColumnData::any(vec![Box::new(Value::Type(v))]),
93 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
94 };
95 let column = Column {
96 name: Fragment::internal("value"),
97 data,
98 };
99 Self {
100 row_numbers: CowVec::new(Vec::new()),
101 columns: CowVec::new(vec![column]),
102 }
103 }
104
105 pub fn scalar_value(&self) -> Value {
108 debug_assert_eq!(self.len(), 1, "scalar_value() requires exactly 1 column, got {}", self.len());
109 debug_assert_eq!(
110 self.row_count(),
111 1,
112 "scalar_value() requires exactly 1 row, got {}",
113 self.row_count()
114 );
115 self.columns[0].data().get_value(0)
116 }
117
118 pub fn new(columns: Vec<Column>) -> Self {
119 let n = columns.first().map_or(0, |c| c.data().len());
120 assert!(columns.iter().all(|c| c.data().len() == n));
121
122 Self {
123 row_numbers: CowVec::new(Vec::new()),
124 columns: CowVec::new(columns),
125 }
126 }
127
128 pub fn with_row_numbers(columns: Vec<Column>, row_numbers: Vec<RowNumber>) -> Self {
129 let n = columns.first().map_or(0, |c| c.data().len());
130 assert!(columns.iter().all(|c| c.data().len() == n));
131 assert_eq!(row_numbers.len(), n, "row_numbers length must match column data length");
132
133 Self {
134 row_numbers: CowVec::new(row_numbers),
135 columns: CowVec::new(columns),
136 }
137 }
138
139 pub fn single_row<'b>(rows: impl IntoIterator<Item = (&'b str, Value)>) -> Columns {
140 let mut columns = Vec::new();
141 let mut index = HashMap::new();
142
143 for (idx, (name, value)) in rows.into_iter().enumerate() {
144 let data = match value {
145 Value::None {
146 ..
147 } => ColumnData::none_typed(Type::Boolean, 1),
148 Value::Boolean(v) => ColumnData::bool([v]),
149 Value::Float4(v) => ColumnData::float4([v.into()]),
150 Value::Float8(v) => ColumnData::float8([v.into()]),
151 Value::Int1(v) => ColumnData::int1([v]),
152 Value::Int2(v) => ColumnData::int2([v]),
153 Value::Int4(v) => ColumnData::int4([v]),
154 Value::Int8(v) => ColumnData::int8([v]),
155 Value::Int16(v) => ColumnData::int16([v]),
156 Value::Utf8(v) => ColumnData::utf8([v.clone()]),
157 Value::Uint1(v) => ColumnData::uint1([v]),
158 Value::Uint2(v) => ColumnData::uint2([v]),
159 Value::Uint4(v) => ColumnData::uint4([v]),
160 Value::Uint8(v) => ColumnData::uint8([v]),
161 Value::Uint16(v) => ColumnData::uint16([v]),
162 Value::Date(v) => ColumnData::date([v.clone()]),
163 Value::DateTime(v) => ColumnData::datetime([v.clone()]),
164 Value::Time(v) => ColumnData::time([v.clone()]),
165 Value::Duration(v) => ColumnData::duration([v.clone()]),
166 Value::IdentityId(v) => ColumnData::identity_id([v]),
167 Value::Uuid4(v) => ColumnData::uuid4([v]),
168 Value::Uuid7(v) => ColumnData::uuid7([v]),
169 Value::Blob(v) => ColumnData::blob([v.clone()]),
170 Value::Int(v) => ColumnData::int(vec![v]),
171 Value::Uint(v) => ColumnData::uint(vec![v]),
172 Value::Decimal(v) => ColumnData::decimal(vec![v]),
173 Value::DictionaryId(v) => ColumnData::dictionary_id(vec![v]),
174 Value::Type(t) => ColumnData::any(vec![Box::new(Value::Type(t))]),
175 Value::Any(v) => ColumnData::any(vec![v]),
176 Value::List(v) => ColumnData::any(vec![Box::new(Value::List(v))]),
177 };
178
179 let column = Column {
180 name: Fragment::internal(name.to_string()),
181 data,
182 };
183 index.insert(name, idx);
184 columns.push(column);
185 }
186
187 Self {
188 row_numbers: CowVec::new(Vec::new()),
189 columns: CowVec::new(columns),
190 }
191 }
192
193 pub fn apply_headers(&mut self, headers: &ColumnHeaders) {
194 for (i, name) in headers.columns.iter().enumerate() {
196 if i < self.len() {
197 let column = &mut self[i];
198 let data = mem::replace(column.data_mut(), ColumnData::none_typed(Type::Boolean, 0));
199
200 *column = Column {
201 name: name.clone(),
202 data,
203 };
204 }
205 }
206 }
207}
208
209impl Columns {
210 pub fn number(&self) -> RowNumber {
212 assert_eq!(self.row_count(), 1, "number() requires exactly 1 row, got {}", self.row_count());
213 if self.row_numbers.is_empty() {
214 RowNumber(0)
215 } else {
216 self.row_numbers[0]
217 }
218 }
219
220 pub fn shape(&self) -> (usize, usize) {
221 let row_count = if !self.row_numbers.is_empty() {
222 self.row_numbers.len()
223 } else {
224 self.get(0).map(|c| c.data().len()).unwrap_or(0)
225 };
226 (row_count, self.len())
227 }
228
229 pub fn into_iter(self) -> impl Iterator<Item = Column> {
230 self.columns.into_iter()
231 }
232
233 pub fn is_empty(&self) -> bool {
234 self.shape().0 == 0
235 }
236
237 pub fn row(&self, i: usize) -> Vec<Value> {
238 self.iter().map(|c| c.data().get_value(i)).collect()
239 }
240
241 pub fn column(&self, name: &str) -> Option<&Column> {
242 self.iter().find(|col| col.name().text() == name)
243 }
244
245 pub fn row_count(&self) -> usize {
246 if !self.row_numbers.is_empty() {
247 self.row_numbers.len()
248 } else {
249 self.first().map_or(0, |col| col.data().len())
250 }
251 }
252
253 pub fn get_row(&self, index: usize) -> Vec<Value> {
254 self.iter().map(|col| col.data().get_value(index)).collect()
255 }
256}
257
258impl Column {
259 pub fn extend(&mut self, other: Column) -> Result<()> {
260 self.data_mut().extend(other.data().clone())
261 }
262}
263
264impl Columns {
265 pub fn from_rows(names: &[&str], result_rows: &[Vec<Value>]) -> Self {
266 let column_count = names.len();
267
268 let mut columns: Vec<Column> = names
269 .iter()
270 .map(|name| Column {
271 name: Fragment::internal(name.to_string()),
272 data: ColumnData::none_typed(Type::Boolean, 0),
273 })
274 .collect();
275
276 for row in result_rows {
277 assert_eq!(row.len(), column_count, "row length does not match column count");
278 for (i, value) in row.iter().enumerate() {
279 columns[i].data_mut().push_value(value.clone());
280 }
281 }
282
283 Columns::new(columns)
284 }
285
286 pub fn from_rows_with_row_numbers(
287 names: &[&str],
288 result_rows: &[Vec<Value>],
289 row_numbers: Vec<RowNumber>,
290 ) -> Self {
291 let column_count = names.len();
292
293 let mut columns: Vec<Column> = names
294 .iter()
295 .map(|name| Column {
296 name: Fragment::internal(name.to_string()),
297 data: ColumnData::none_typed(Type::Boolean, 0),
298 })
299 .collect();
300
301 for row in result_rows {
302 assert_eq!(row.len(), column_count, "row length does not match column count");
303 for (i, value) in row.iter().enumerate() {
304 columns[i].data_mut().push_value(value.clone());
305 }
306 }
307
308 Columns::with_row_numbers(columns, row_numbers)
309 }
310}
311
312impl Columns {
313 pub fn empty() -> Self {
314 Self {
315 row_numbers: CowVec::new(vec![]),
316 columns: CowVec::new(vec![]),
317 }
318 }
319
320 pub fn from_table(table: &ResolvedTable) -> Self {
321 let _source = table.clone();
322
323 let columns: Vec<Column> = table
324 .columns()
325 .iter()
326 .map(|col| {
327 let column_ident = Fragment::internal(&col.name);
328 Column {
329 name: column_ident,
330 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
331 }
332 })
333 .collect();
334
335 Self {
336 row_numbers: CowVec::new(Vec::new()),
337 columns: CowVec::new(columns),
338 }
339 }
340
341 pub fn from_table_def(table: &TableDef) -> Self {
343 let columns: Vec<Column> = table
344 .columns
345 .iter()
346 .map(|col| Column {
347 name: Fragment::internal(&col.name),
348 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
349 })
350 .collect();
351
352 Self {
353 row_numbers: CowVec::new(Vec::new()),
354 columns: CowVec::new(columns),
355 }
356 }
357
358 pub fn from_view_def(view: &ViewDef) -> Self {
360 let columns: Vec<Column> = view
361 .columns
362 .iter()
363 .map(|col| Column {
364 name: Fragment::internal(&col.name),
365 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
366 })
367 .collect();
368
369 Self {
370 row_numbers: CowVec::new(Vec::new()),
371 columns: CowVec::new(columns),
372 }
373 }
374
375 pub fn from_ringbuffer(ringbuffer: &ResolvedRingBuffer) -> Self {
376 let _source = ringbuffer.clone();
377
378 let columns: Vec<Column> = ringbuffer
379 .columns()
380 .iter()
381 .map(|col| {
382 let column_ident = Fragment::internal(&col.name);
383 Column {
384 name: column_ident,
385 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
386 }
387 })
388 .collect();
389
390 Self {
391 row_numbers: CowVec::new(Vec::new()),
392 columns: CowVec::new(columns),
393 }
394 }
395
396 pub fn from_view(view: &ResolvedView) -> Self {
397 let _source = view.clone();
398
399 let columns: Vec<Column> = view
400 .columns()
401 .iter()
402 .map(|col| {
403 let column_ident = Fragment::internal(&col.name);
404 Column {
405 name: column_ident,
406 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
407 }
408 })
409 .collect();
410
411 Self {
412 row_numbers: CowVec::new(Vec::new()),
413 columns: CowVec::new(columns),
414 }
415 }
416}
417
418impl Columns {
419 pub fn extract_by_indices(&self, indices: &[usize]) -> Columns {
421 if indices.is_empty() {
422 return Columns::empty();
423 }
424
425 let new_columns: Vec<Column> = self
426 .columns
427 .iter()
428 .map(|col| {
429 let mut new_data = ColumnData::with_capacity(col.data().get_type(), indices.len());
430 for &idx in indices {
431 new_data.push_value(col.data().get_value(idx));
432 }
433 Column {
434 name: col.name.clone(),
435 data: new_data,
436 }
437 })
438 .collect();
439
440 if self.row_numbers.is_empty() {
441 Columns::new(new_columns)
442 } else {
443 let new_row_numbers: Vec<RowNumber> = indices.iter().map(|&i| self.row_numbers[i]).collect();
444 Columns::with_row_numbers(new_columns, new_row_numbers)
445 }
446 }
447
448 pub fn extract_row(&self, index: usize) -> Columns {
450 self.extract_by_indices(&[index])
451 }
452
453 pub fn partition_by_keys<K: Hash + Eq + Clone>(&self, keys: &[K]) -> IndexMap<K, Columns> {
456 assert_eq!(keys.len(), self.row_count(), "keys length must match row count");
457
458 let mut key_to_indices: IndexMap<K, Vec<usize>> = IndexMap::new();
460 for (idx, key) in keys.iter().enumerate() {
461 key_to_indices.entry(key.clone()).or_default().push(idx);
462 }
463
464 key_to_indices.into_iter().map(|(key, indices)| (key, self.extract_by_indices(&indices))).collect()
466 }
467
468 pub fn from_row(row: &Row) -> Self {
470 let mut columns = Vec::new();
471
472 for (idx, field) in row.schema.fields().iter().enumerate() {
473 let value = row.schema.get_value(&row.encoded, idx);
474
475 let column_type = if matches!(value, Value::None { .. }) {
477 field.constraint.get_type()
478 } else {
479 value.get_type()
480 };
481
482 let mut data = if column_type.is_option() {
483 ColumnData::none_typed(column_type.clone(), 0)
484 } else {
485 ColumnData::with_capacity(column_type.clone(), 1)
486 };
487 data.push_value(value);
488
489 if column_type == Type::DictionaryId {
490 if let ColumnData::DictionaryId(container) = &mut data {
491 if let Some(Constraint::Dictionary(dict_id, _)) = field.constraint.constraint()
492 {
493 container.set_dictionary_id(*dict_id);
494 }
495 }
496 }
497
498 let name = row.schema.get_field_name(idx).expect("Schema missing name for field");
499
500 columns.push(Column {
501 name: Fragment::internal(name),
502 data,
503 });
504 }
505
506 Self {
507 row_numbers: CowVec::new(vec![row.number]),
508 columns: CowVec::new(columns),
509 }
510 }
511
512 pub fn to_single_row(&self) -> Row {
515 assert_eq!(self.row_count(), 1, "to_row() requires exactly 1 row, got {}", self.row_count());
516 assert_eq!(
517 self.row_numbers.len(),
518 1,
519 "to_row() requires exactly 1 row number, got {}",
520 self.row_numbers.len()
521 );
522
523 let row_number = self.row_numbers.first().unwrap().clone();
524
525 let fields: Vec<SchemaField> = self
527 .columns
528 .iter()
529 .map(|col| SchemaField::unconstrained(col.name().text().to_string(), col.data().get_type()))
530 .collect();
531
532 let layout = Schema::new(fields);
533 let mut encoded = layout.allocate();
534
535 let values: Vec<Value> = self.columns.iter().map(|col| col.data().get_value(0)).collect();
537 layout.set_values(&mut encoded, &values);
538
539 Row {
540 number: row_number,
541 encoded,
542 schema: layout,
543 }
544 }
545}
546
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{date::Date, datetime::DateTime, duration::Duration, time::Time};

	use super::*;

	/// Reads the value at row 0 of the column called `name`.
	///
	/// Panics if no such column exists.
	fn first_value(columns: &Columns, name: &str) -> Value {
		columns.column(name).unwrap().data().get_value(0)
	}

	/// `single_row` must round-trip date, datetime, time and duration values.
	#[test]
	fn test_single_row_temporal_types() {
		let expected = [
			("date_col", Value::Date(Date::from_ymd(2025, 1, 15).unwrap())),
			("datetime_col", Value::DateTime(DateTime::from_timestamp(1642694400).unwrap())),
			("time_col", Value::Time(Time::from_hms(14, 30, 45).unwrap())),
			("interval_col", Value::Duration(Duration::from_days(30))),
		];

		let columns = Columns::single_row(expected.clone());

		assert_eq!(columns.len(), 4);
		assert_eq!(columns.shape(), (1, 4));

		for (name, value) in expected {
			assert_eq!(first_value(&columns, name), value);
		}
	}

	/// `single_row` must round-trip a mix of scalar, temporal and none values.
	#[test]
	fn test_single_row_mixed_types() {
		let expected = [
			("bool_col", Value::Boolean(true)),
			("int_col", Value::Int4(42)),
			("str_col", Value::Utf8("hello".to_string())),
			("date_col", Value::Date(Date::from_ymd(2025, 7, 15).unwrap())),
			("time_col", Value::Time(Time::from_hms(9, 15, 30).unwrap())),
			("none_col", Value::none()),
		];

		let columns = Columns::single_row(expected.clone());

		assert_eq!(columns.len(), 6);
		assert_eq!(columns.shape(), (1, 6));

		for (name, value) in expected {
			assert_eq!(first_value(&columns, name), value);
		}
	}

	/// A plain column name can be looked up again after construction.
	#[test]
	fn test_single_row_normal_column_names_work() {
		let columns = Columns::single_row([("normal_column", Value::Int4(42))]);

		assert_eq!(columns.len(), 1);
		assert_eq!(first_value(&columns, "normal_column"), Value::Int4(42));
	}
}