reifydb_engine/vm/volcano/scan/
table.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
8 error::diagnostic,
9 interface::{catalog::dictionary::Dictionary, resolved::ResolvedTable},
10 key::{
11 EncodableKey,
12 row::{RowKey, RowKeyRange},
13 },
14 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::{error, fragment::Fragment, util::cowvec::CowVec, value::r#type::Type};
18use tracing::instrument;
19
20use super::super::decode_dictionary_columns;
21use crate::{
22 Result,
23 vm::volcano::query::{QueryContext, QueryNode},
24};
25
26pub struct TableScanNode {
27 table: ResolvedTable,
28 context: Option<Arc<QueryContext>>,
29 headers: ColumnHeaders,
30
31 storage_types: Vec<Type>,
32
33 dictionaries: Vec<Option<Dictionary>>,
34
35 shape: Option<RowShape>,
36 last_key: Option<EncodedKey>,
37 exhausted: bool,
38}
39
40impl TableScanNode {
41 pub fn new(table: ResolvedTable, context: Arc<QueryContext>, rx: &mut Transaction<'_>) -> Result<Self> {
42 let mut storage_types = Vec::with_capacity(table.columns().len());
43 let mut dictionaries = Vec::with_capacity(table.columns().len());
44
45 for col in table.columns() {
46 if let Some(dict_id) = col.dictionary_id {
47 if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
48 storage_types.push(Type::DictionaryId);
49 dictionaries.push(Some(dict));
50 } else {
51 storage_types.push(col.constraint.get_type());
52 dictionaries.push(None);
53 }
54 } else {
55 storage_types.push(col.constraint.get_type());
56 dictionaries.push(None);
57 }
58 }
59
60 let headers = ColumnHeaders {
61 columns: table.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
62 };
63
64 Ok(Self {
65 table,
66 context: Some(context),
67 headers,
68 storage_types,
69 dictionaries,
70 shape: None,
71 last_key: None,
72 exhausted: false,
73 })
74 }
75
76 fn get_or_load_shape<'a>(&mut self, rx: &mut Transaction<'a>, first_row: &EncodedRow) -> Result<RowShape> {
77 if let Some(shape) = &self.shape {
78 return Ok(shape.clone());
79 }
80
81 let fingerprint = first_row.fingerprint();
82
83 let stored_ctx = self.context.as_ref().expect("TableScanNode context not set");
84 let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
85 error!(diagnostic::internal::internal(format!(
86 "RowShape with fingerprint {:?} not found for table {}",
87 fingerprint,
88 self.table.def().name
89 )))
90 })?;
91
92 self.shape = Some(shape.clone());
93
94 Ok(shape)
95 }
96}
97
98impl QueryNode for TableScanNode {
99 #[instrument(level = "trace", skip_all, name = "volcano::scan::table::initialize")]
100 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
101 Ok(())
102 }
103
104 #[instrument(level = "trace", skip_all, name = "volcano::scan::table::next")]
105 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
106 debug_assert!(self.context.is_some(), "TableScanNode::next() called before initialize()");
107 let stored_ctx = self.context.as_ref().unwrap();
108
109 if self.exhausted {
110 return Ok(None);
111 }
112
113 let batch_size = stored_ctx.batch_size;
114
115 let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
116
117 let mut batch_rows = Vec::new();
118 let mut row_numbers = Vec::new();
119 let mut new_last_key = None;
120
121 let mut stream = rx.range(range, batch_size as usize)?;
122
123 for _ in 0..batch_size {
124 match stream.next() {
125 Some(Ok(multi)) => {
126 if let Some(key) = RowKey::decode(&multi.key) {
127 batch_rows.push(multi.row);
128 row_numbers.push(key.row);
129 new_last_key = Some(multi.key);
130 }
131 }
132 Some(Err(e)) => return Err(e),
133 None => {
134 self.exhausted = true;
135 break;
136 }
137 }
138 }
139
140 drop(stream);
141
142 if batch_rows.is_empty() {
143 self.exhausted = true;
144 if self.last_key.is_none() {
145 let columns: Vec<ColumnWithName> = self
146 .table
147 .columns()
148 .iter()
149 .map(|col| ColumnWithName {
150 name: Fragment::internal(&col.name),
151 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
152 })
153 .collect();
154 return Ok(Some(Columns::new(columns)));
155 }
156 return Ok(None);
157 }
158
159 self.last_key = new_last_key;
160
161 let storage_columns: Vec<ColumnWithName> = {
162 self.table
163 .columns()
164 .iter()
165 .enumerate()
166 .map(|(idx, col)| ColumnWithName {
167 name: Fragment::internal(&col.name),
168 data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
169 })
170 .collect()
171 };
172
173 let mut columns = Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
174 {
175 let shape = self.get_or_load_shape(rx, &batch_rows[0])?;
176 columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
177 }
178
179 columns.row_numbers = CowVec::new(row_numbers);
180
181 decode_dictionary_columns(&mut columns, &self.dictionaries, rx)?;
182
183 Ok(Some(columns))
184 }
185
186 fn headers(&self) -> Option<ColumnHeaders> {
187 Some(self.headers.clone())
188 }
189}