//! Table scan node for the volcano executor (reifydb_engine/vm/volcano/scan/table.rs).
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
8	error::diagnostic,
9	interface::{catalog::dictionary::Dictionary, resolved::ResolvedTable},
10	key::{
11		EncodableKey,
12		row::{RowKey, RowKeyRange},
13	},
14	value::{
15		batch::lazy::{LazyBatch, LazyColumnMeta},
16		column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
17	},
18};
19use reifydb_transaction::transaction::Transaction;
20use reifydb_type::{error, fragment::Fragment, util::cowvec::CowVec, value::r#type::Type};
21use tracing::instrument;
22
23use super::super::decode_dictionary_columns;
24use crate::{
25	Result,
26	vm::volcano::query::{QueryContext, QueryNode},
27};
28
/// Volcano-style scan node that streams the rows of a single table in
/// storage-key order, yielding one batch (`Columns` or `LazyBatch`) per
/// call to `next()` / `next_lazy()`.
pub struct TableScanNode {
	/// The resolved table being scanned (provides column defs and table id).
	table: ResolvedTable,
	/// Query context captured at construction; `next()` debug-asserts it is set.
	context: Option<Arc<QueryContext>>,
	/// Pre-built output column headers (one fragment per table column).
	headers: ColumnHeaders,
	/// Storage types for each column (dictionary ID types for dictionary columns)
	storage_types: Vec<Type>,
	/// Dictionary definitions for columns that need decoding (None for non-dictionary columns)
	dictionaries: Vec<Option<Dictionary>>,
	/// Cached shape loaded from the first batch
	shape: Option<RowShape>,
	/// Last storage key returned; used as the resume point for the next batch.
	last_key: Option<EncodedKey>,
	/// Set once the underlying range stream has been fully drained.
	exhausted: bool,
	/// Optional row-count limit pushed down via `set_scan_limit`; bounds how
	/// many rows a batch may fetch (see `next()`).
	scan_limit: Option<usize>,
}
43
44impl TableScanNode {
45	pub fn new(table: ResolvedTable, context: Arc<QueryContext>, rx: &mut Transaction<'_>) -> Result<Self> {
46		// Look up dictionaries and build storage types
47		let mut storage_types = Vec::with_capacity(table.columns().len());
48		let mut dictionaries = Vec::with_capacity(table.columns().len());
49
50		for col in table.columns() {
51			if let Some(dict_id) = col.dictionary_id {
52				if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
53					storage_types.push(Type::DictionaryId);
54					dictionaries.push(Some(dict));
55				} else {
56					// Dictionary not found, fall back to constraint type
57					storage_types.push(col.constraint.get_type());
58					dictionaries.push(None);
59				}
60			} else {
61				storage_types.push(col.constraint.get_type());
62				dictionaries.push(None);
63			}
64		}
65
66		let headers = ColumnHeaders {
67			columns: table.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
68		};
69
70		Ok(Self {
71			table,
72			context: Some(context),
73			headers,
74			storage_types,
75			dictionaries,
76			shape: None,
77			last_key: None,
78			exhausted: false,
79			scan_limit: None,
80		})
81	}
82
83	fn get_or_load_shape<'a>(&mut self, rx: &mut Transaction<'a>, first_row: &EncodedRow) -> Result<RowShape> {
84		if let Some(shape) = &self.shape {
85			return Ok(shape.clone());
86		}
87
88		let fingerprint = first_row.fingerprint();
89
90		let stored_ctx = self.context.as_ref().expect("TableScanNode context not set");
91		let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
92			error!(diagnostic::internal::internal(format!(
93				"RowShape with fingerprint {:?} not found for table {}",
94				fingerprint,
95				self.table.def().name
96			)))
97		})?;
98
99		self.shape = Some(shape.clone());
100
101		Ok(shape)
102	}
103}
104
105impl QueryNode for TableScanNode {
106	#[instrument(level = "trace", skip_all, name = "volcano::scan::table::initialize")]
107	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
108		// Already has context from constructor
109		Ok(())
110	}
111
112	#[instrument(level = "trace", skip_all, name = "volcano::scan::table::next")]
113	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
114		debug_assert!(self.context.is_some(), "TableScanNode::next() called before initialize()");
115		let stored_ctx = self.context.as_ref().unwrap();
116
117		if self.exhausted {
118			return Ok(None);
119		}
120
121		let batch_size = match self.scan_limit {
122			Some(limit) => (limit as u64).min(stored_ctx.batch_size),
123			None => stored_ctx.batch_size,
124		};
125
126		let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
127
128		let mut batch_rows = Vec::new();
129		let mut row_numbers = Vec::new();
130		let mut new_last_key = None;
131
132		// Use streaming API which properly handles version density at storage level
133		let mut stream = rx.range(range, batch_size as usize)?;
134
135		// Consume up to batch_size items from the stream
136		for _ in 0..batch_size {
137			match stream.next() {
138				Some(Ok(multi)) => {
139					if let Some(key) = RowKey::decode(&multi.key) {
140						batch_rows.push(multi.row);
141						row_numbers.push(key.row);
142						new_last_key = Some(multi.key);
143					}
144				}
145				Some(Err(e)) => return Err(e),
146				None => {
147					self.exhausted = true;
148					break;
149				}
150			}
151		}
152
153		// Drop the stream to release the borrow on rx before dictionary decoding
154		drop(stream);
155
156		if batch_rows.is_empty() {
157			self.exhausted = true;
158			if self.last_key.is_none() {
159				// Empty table: return empty columns with correct types to preserve shape
160				let columns: Vec<ColumnWithName> = self
161					.table
162					.columns()
163					.iter()
164					.map(|col| ColumnWithName {
165						name: Fragment::internal(&col.name),
166						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
167					})
168					.collect();
169				return Ok(Some(Columns::new(columns)));
170			}
171			return Ok(None);
172		}
173
174		self.last_key = new_last_key;
175
176		// Create columns with storage types (dictionary ID types for dictionary columns)
177		let storage_columns: Vec<ColumnWithName> = {
178			self.table
179				.columns()
180				.iter()
181				.enumerate()
182				.map(|(idx, col)| ColumnWithName {
183					name: Fragment::internal(&col.name),
184					data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
185				})
186				.collect()
187		};
188
189		let mut columns = Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
190		{
191			let shape = self.get_or_load_shape(rx, &batch_rows[0])?;
192			columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
193		}
194		// Restore row numbers (they get cleared during column transformation)
195		columns.row_numbers = CowVec::new(row_numbers);
196
197		decode_dictionary_columns(&mut columns, &self.dictionaries, rx)?;
198
199		Ok(Some(columns))
200	}
201
202	fn headers(&self) -> Option<ColumnHeaders> {
203		Some(self.headers.clone())
204	}
205
206	#[instrument(level = "trace", skip_all, name = "volcano::scan::table::next_lazy")]
207	fn next_lazy<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<LazyBatch>> {
208		debug_assert!(self.context.is_some(), "TableScanNode::next_lazy() called before initialize()");
209		let stored_ctx = self.context.as_ref().unwrap();
210
211		if self.exhausted {
212			return Ok(None);
213		}
214
215		let batch_size = match self.scan_limit {
216			Some(limit) => (limit as u64).min(stored_ctx.batch_size),
217			None => stored_ctx.batch_size,
218		};
219
220		let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
221
222		let mut stream = rx.range(range, batch_size as usize)?;
223
224		let mut encoded_rows = Vec::with_capacity(batch_size as usize);
225		let mut row_numbers = Vec::with_capacity(batch_size as usize);
226
227		// Consume up to batch_size items from the stream
228		for _ in 0..batch_size {
229			match stream.next() {
230				Some(Ok(multi)) => {
231					if let Some(key) = RowKey::decode(&multi.key) {
232						encoded_rows.push(multi.row);
233						row_numbers.push(key.row);
234						self.last_key = Some(multi.key);
235					}
236				}
237				Some(Err(e)) => return Err(e),
238				None => {
239					self.exhausted = true;
240					break;
241				}
242			}
243		}
244
245		drop(stream);
246
247		if encoded_rows.is_empty() {
248			self.exhausted = true;
249			return Ok(None);
250		}
251
252		// Build column metas
253		let column_metas: Vec<LazyColumnMeta> = self
254			.table
255			.columns()
256			.iter()
257			.enumerate()
258			.map(|(idx, col)| {
259				let output_type = col.constraint.get_type();
260				LazyColumnMeta {
261					name: Fragment::internal(&col.name),
262					storage_type: self.storage_types[idx].clone(),
263					output_type,
264					dictionary: self.dictionaries[idx].clone(),
265				}
266			})
267			.collect();
268
269		let shape = self.get_or_load_shape(rx, &encoded_rows[0])?;
270		Ok(Some(LazyBatch::new(encoded_rows, row_numbers, &shape, column_metas)))
271	}
272
273	fn set_scan_limit(&mut self, limit: usize) {
274		self.scan_limit = Some(limit);
275	}
276}