// reifydb_engine/vm/volcano/scan/table.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::sync::Arc;

use reifydb_core::{
	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
	error::diagnostic,
	interface::{catalog::dictionary::Dictionary, resolved::ResolvedTable},
	key::{
		EncodableKey,
		row::{RowKey, RowKeyRange},
	},
	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{error, fragment::Fragment, util::cowvec::CowVec, value::r#type::Type};
use tracing::instrument;

use super::super::decode_dictionary_columns;
use crate::{
	Result,
	vm::volcano::query::{QueryContext, QueryNode},
};
25
26pub struct TableScanNode {
27	table: ResolvedTable,
28	context: Option<Arc<QueryContext>>,
29	headers: ColumnHeaders,
30
31	storage_types: Vec<Type>,
32
33	dictionaries: Vec<Option<Dictionary>>,
34
35	shape: Option<RowShape>,
36	last_key: Option<EncodedKey>,
37	exhausted: bool,
38}
39
40impl TableScanNode {
41	pub fn new(table: ResolvedTable, context: Arc<QueryContext>, rx: &mut Transaction<'_>) -> Result<Self> {
42		let mut storage_types = Vec::with_capacity(table.columns().len());
43		let mut dictionaries = Vec::with_capacity(table.columns().len());
44
45		for col in table.columns() {
46			if let Some(dict_id) = col.dictionary_id {
47				if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
48					storage_types.push(Type::DictionaryId);
49					dictionaries.push(Some(dict));
50				} else {
51					storage_types.push(col.constraint.get_type());
52					dictionaries.push(None);
53				}
54			} else {
55				storage_types.push(col.constraint.get_type());
56				dictionaries.push(None);
57			}
58		}
59
60		let headers = ColumnHeaders {
61			columns: table.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
62		};
63
64		Ok(Self {
65			table,
66			context: Some(context),
67			headers,
68			storage_types,
69			dictionaries,
70			shape: None,
71			last_key: None,
72			exhausted: false,
73		})
74	}
75
76	fn get_or_load_shape<'a>(&mut self, rx: &mut Transaction<'a>, first_row: &EncodedRow) -> Result<RowShape> {
77		if let Some(shape) = &self.shape {
78			return Ok(shape.clone());
79		}
80
81		let fingerprint = first_row.fingerprint();
82
83		let stored_ctx = self.context.as_ref().expect("TableScanNode context not set");
84		let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
85			error!(diagnostic::internal::internal(format!(
86				"RowShape with fingerprint {:?} not found for table {}",
87				fingerprint,
88				self.table.def().name
89			)))
90		})?;
91
92		self.shape = Some(shape.clone());
93
94		Ok(shape)
95	}
96}
97
98impl QueryNode for TableScanNode {
99	#[instrument(level = "trace", skip_all, name = "volcano::scan::table::initialize")]
100	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
101		Ok(())
102	}
103
104	#[instrument(level = "trace", skip_all, name = "volcano::scan::table::next")]
105	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
106		debug_assert!(self.context.is_some(), "TableScanNode::next() called before initialize()");
107		let stored_ctx = self.context.as_ref().unwrap();
108
109		if self.exhausted {
110			return Ok(None);
111		}
112
113		let batch_size = stored_ctx.batch_size;
114
115		let range = RowKeyRange::scan_range(self.table.def().id.into(), self.last_key.as_ref());
116
117		let mut batch_rows = Vec::new();
118		let mut row_numbers = Vec::new();
119		let mut new_last_key = None;
120
121		let mut stream = rx.range(range, batch_size as usize)?;
122
123		for _ in 0..batch_size {
124			match stream.next() {
125				Some(Ok(multi)) => {
126					if let Some(key) = RowKey::decode(&multi.key) {
127						batch_rows.push(multi.row);
128						row_numbers.push(key.row);
129						new_last_key = Some(multi.key);
130					}
131				}
132				Some(Err(e)) => return Err(e),
133				None => {
134					self.exhausted = true;
135					break;
136				}
137			}
138		}
139
140		drop(stream);
141
142		if batch_rows.is_empty() {
143			self.exhausted = true;
144			if self.last_key.is_none() {
145				let columns: Vec<ColumnWithName> = self
146					.table
147					.columns()
148					.iter()
149					.map(|col| ColumnWithName {
150						name: Fragment::internal(&col.name),
151						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
152					})
153					.collect();
154				return Ok(Some(Columns::new(columns)));
155			}
156			return Ok(None);
157		}
158
159		self.last_key = new_last_key;
160
161		let storage_columns: Vec<ColumnWithName> = {
162			self.table
163				.columns()
164				.iter()
165				.enumerate()
166				.map(|(idx, col)| ColumnWithName {
167					name: Fragment::internal(&col.name),
168					data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
169				})
170				.collect()
171		};
172
173		let mut columns = Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
174		{
175			let shape = self.get_or_load_shape(rx, &batch_rows[0])?;
176			columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
177		}
178
179		columns.row_numbers = CowVec::new(row_numbers);
180
181		decode_dictionary_columns(&mut columns, &self.dictionaries, rx)?;
182
183		Ok(Some(columns))
184	}
185
186	fn headers(&self) -> Option<ColumnHeaders> {
187		Some(self.headers.clone())
188	}
189}