// reifydb_engine/vm/volcano/scan/table.rs
use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
8 error::diagnostic,
9 interface::{catalog::dictionary::Dictionary, resolved::ResolvedTable},
10 key::{
11 EncodableKey,
12 row::{RowKey, RowKeyRange},
13 },
14 value::{
15 batch::lazy::{LazyBatch, LazyColumnMeta},
16 column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
17 },
18};
19use reifydb_transaction::transaction::Transaction;
20use reifydb_type::{error, fragment::Fragment, util::cowvec::CowVec, value::r#type::Type};
21use tracing::instrument;
22
23use super::super::decode_dictionary_columns;
24use crate::{
25 Result,
26 vm::volcano::query::{QueryContext, QueryNode},
27};
28
/// Volcano-style scan node that streams the rows of a single table in
/// row-key order, producing one batch per `next()` / `next_lazy()` call.
pub struct TableScanNode {
    // The table being scanned (definition plus resolved metadata).
    table: ResolvedTable,
    // Query context captured at construction.
    // NOTE(review): always `Some` after `new()` succeeds — the `Option`
    // appears unnecessary; confirm before simplifying.
    context: Option<Arc<QueryContext>>,
    // Pre-built output column headers (one fragment per table column).
    headers: ColumnHeaders,
    // Per-column on-disk type: `Type::DictionaryId` for dictionary-encoded
    // columns, otherwise the column's declared constraint type.
    storage_types: Vec<Type>,
    // Per-column dictionary metadata; `None` for non-dictionary columns.
    dictionaries: Vec<Option<Dictionary>>,
    // Row shape cached after the first batch (resolved lazily from the
    // first row's fingerprint).
    shape: Option<RowShape>,
    // Last storage key returned; the next batch resumes just past it.
    last_key: Option<EncodedKey>,
    // Set once the underlying range scan has been fully consumed.
    exhausted: bool,
    // Optional pushed-down limit capping the size of each scanned batch.
    scan_limit: Option<usize>,
}
43
44impl TableScanNode {
45 pub fn new(table: ResolvedTable, context: Arc<QueryContext>, rx: &mut Transaction<'_>) -> Result<Self> {
46 let mut storage_types = Vec::with_capacity(table.columns().len());
48 let mut dictionaries = Vec::with_capacity(table.columns().len());
49
50 for col in table.columns() {
51 if let Some(dict_id) = col.dictionary_id {
52 if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
53 storage_types.push(Type::DictionaryId);
54 dictionaries.push(Some(dict));
55 } else {
56 storage_types.push(col.constraint.get_type());
58 dictionaries.push(None);
59 }
60 } else {
61 storage_types.push(col.constraint.get_type());
62 dictionaries.push(None);
63 }
64 }
65
66 let headers = ColumnHeaders {
67 columns: table.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
68 };
69
70 Ok(Self {
71 table,
72 context: Some(context),
73 headers,
74 storage_types,
75 dictionaries,
76 shape: None,
77 last_key: None,
78 exhausted: false,
79 scan_limit: None,
80 })
81 }
82
83 fn get_or_load_shape<'a>(&mut self, rx: &mut Transaction<'a>, first_row: &EncodedRow) -> Result<RowShape> {
84 if let Some(shape) = &self.shape {
85 return Ok(shape.clone());
86 }
87
88 let fingerprint = first_row.fingerprint();
89
90 let stored_ctx = self.context.as_ref().expect("TableScanNode context not set");
91 let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
92 error!(diagnostic::internal::internal(format!(
93 "RowShape with fingerprint {:?} not found for table {}",
94 fingerprint,
95 self.table.def().name
96 )))
97 })?;
98
99 self.shape = Some(shape.clone());
100
101 Ok(shape)
102 }
103}
104
105impl QueryNode for TableScanNode {
106 #[instrument(level = "trace", skip_all, name = "volcano::scan::table::initialize")]
107 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
108 Ok(())
110 }
111
112 #[instrument(level = "trace", skip_all, name = "volcano::scan::table::next")]
113 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
114 debug_assert!(self.context.is_some(), "TableScanNode::next() called before initialize()");
115 let stored_ctx = self.context.as_ref().unwrap();
116
117 if self.exhausted {
118 return Ok(None);
119 }
120
121 let batch_size = match self.scan_limit {
122 Some(limit) => (limit as u64).min(stored_ctx.batch_size),
123 None => stored_ctx.batch_size,
124 };
125
126 let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
127
128 let mut batch_rows = Vec::new();
129 let mut row_numbers = Vec::new();
130 let mut new_last_key = None;
131
132 let mut stream = rx.range(range, batch_size as usize)?;
134
135 for _ in 0..batch_size {
137 match stream.next() {
138 Some(Ok(multi)) => {
139 if let Some(key) = RowKey::decode(&multi.key) {
140 batch_rows.push(multi.row);
141 row_numbers.push(key.row);
142 new_last_key = Some(multi.key);
143 }
144 }
145 Some(Err(e)) => return Err(e),
146 None => {
147 self.exhausted = true;
148 break;
149 }
150 }
151 }
152
153 drop(stream);
155
156 if batch_rows.is_empty() {
157 self.exhausted = true;
158 if self.last_key.is_none() {
159 let columns: Vec<ColumnWithName> = self
161 .table
162 .columns()
163 .iter()
164 .map(|col| ColumnWithName {
165 name: Fragment::internal(&col.name),
166 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
167 })
168 .collect();
169 return Ok(Some(Columns::new(columns)));
170 }
171 return Ok(None);
172 }
173
174 self.last_key = new_last_key;
175
176 let storage_columns: Vec<ColumnWithName> = {
178 self.table
179 .columns()
180 .iter()
181 .enumerate()
182 .map(|(idx, col)| ColumnWithName {
183 name: Fragment::internal(&col.name),
184 data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
185 })
186 .collect()
187 };
188
189 let mut columns = Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
190 {
191 let shape = self.get_or_load_shape(rx, &batch_rows[0])?;
192 columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
193 }
194 columns.row_numbers = CowVec::new(row_numbers);
196
197 decode_dictionary_columns(&mut columns, &self.dictionaries, rx)?;
198
199 Ok(Some(columns))
200 }
201
202 fn headers(&self) -> Option<ColumnHeaders> {
203 Some(self.headers.clone())
204 }
205
206 #[instrument(level = "trace", skip_all, name = "volcano::scan::table::next_lazy")]
207 fn next_lazy<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<LazyBatch>> {
208 debug_assert!(self.context.is_some(), "TableScanNode::next_lazy() called before initialize()");
209 let stored_ctx = self.context.as_ref().unwrap();
210
211 if self.exhausted {
212 return Ok(None);
213 }
214
215 let batch_size = match self.scan_limit {
216 Some(limit) => (limit as u64).min(stored_ctx.batch_size),
217 None => stored_ctx.batch_size,
218 };
219
220 let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
221
222 let mut stream = rx.range(range, batch_size as usize)?;
223
224 let mut encoded_rows = Vec::with_capacity(batch_size as usize);
225 let mut row_numbers = Vec::with_capacity(batch_size as usize);
226
227 for _ in 0..batch_size {
229 match stream.next() {
230 Some(Ok(multi)) => {
231 if let Some(key) = RowKey::decode(&multi.key) {
232 encoded_rows.push(multi.row);
233 row_numbers.push(key.row);
234 self.last_key = Some(multi.key);
235 }
236 }
237 Some(Err(e)) => return Err(e),
238 None => {
239 self.exhausted = true;
240 break;
241 }
242 }
243 }
244
245 drop(stream);
246
247 if encoded_rows.is_empty() {
248 self.exhausted = true;
249 return Ok(None);
250 }
251
252 let column_metas: Vec<LazyColumnMeta> = self
254 .table
255 .columns()
256 .iter()
257 .enumerate()
258 .map(|(idx, col)| {
259 let output_type = col.constraint.get_type();
260 LazyColumnMeta {
261 name: Fragment::internal(&col.name),
262 storage_type: self.storage_types[idx].clone(),
263 output_type,
264 dictionary: self.dictionaries[idx].clone(),
265 }
266 })
267 .collect();
268
269 let shape = self.get_or_load_shape(rx, &encoded_rows[0])?;
270 Ok(Some(LazyBatch::new(encoded_rows, row_numbers, &shape, column_metas)))
271 }
272
273 fn set_scan_limit(&mut self, limit: usize) {
274 self.scan_limit = Some(limit);
275 }
276}