// reifydb_engine/vm/volcano/scan/ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::{row::EncodedRow, shape::RowShape},
8	interface::{
9		catalog::{dictionary::Dictionary, ringbuffer::PartitionedMetadata},
10		resolved::ResolvedRingBuffer,
11	},
12	internal_error,
13	key::row::RowKey,
14	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::{
18	fragment::Fragment,
19	util::cowvec::CowVec,
20	value::{Value, row_number::RowNumber, r#type::Type},
21};
22use tracing::instrument;
23
24use super::super::decode_dictionary_columns;
25use crate::{
26	Result,
27	vm::volcano::query::{QueryContext, QueryNode},
28};
29
/// Volcano-style scan node that iterates a ringbuffer's rows in batches.
///
/// Supports both global ringbuffers (a single partition) and partitioned
/// ones; rows are read partition by partition, starting at each partition's
/// head position.
pub struct RingBufferScan {
	/// Resolved ringbuffer being scanned (definition + column schema).
	ringbuffer: ResolvedRingBuffer,
	/// All partitions for this ringbuffer (global = 1-element, partitioned = N-element)
	partitions: Vec<PartitionedMetadata>,
	/// Index into `partitions` of the partition currently being read.
	current_partition_index: usize,
	/// Column headers derived from the ringbuffer's column names.
	headers: ColumnHeaders,
	/// Cached row shape, lazily loaded from the first row's fingerprint.
	shape: Option<RowShape>,
	/// Storage types for each column (Type::DictionaryId for dictionary columns)
	storage_types: Vec<Type>,
	/// Dictionary definitions for columns that need decoding (None for non-dictionary columns)
	dictionaries: Vec<Option<Dictionary>>,
	/// Column indices for partition_by columns (empty for global ringbuffers)
	partition_col_indices: Vec<usize>,
	/// Row-number cursor within the current partition.
	current_position: u64,
	/// Number of rows emitted from the current partition so far.
	rows_returned_in_partition: u64,
	/// Query context captured at construction; used for catalog access.
	context: Option<Arc<QueryContext>>,
	/// Whether `initialize` has loaded partition metadata yet.
	initialized: bool,
	/// Optional cap on rows per batch, set via `set_scan_limit`.
	scan_limit: Option<usize>,
}
49
50impl RingBufferScan {
51	pub fn new(
52		ringbuffer: ResolvedRingBuffer,
53		context: Arc<QueryContext>,
54		rx: &mut Transaction<'_>,
55	) -> Result<Self> {
56		// Build storage types and dictionaries
57		let mut storage_types = Vec::with_capacity(ringbuffer.columns().len());
58		let mut dictionaries = Vec::with_capacity(ringbuffer.columns().len());
59
60		for col in ringbuffer.columns() {
61			if let Some(dict_id) = col.dictionary_id {
62				if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
63					storage_types.push(Type::DictionaryId);
64					dictionaries.push(Some(dict));
65				} else {
66					// Dictionary not found, fall back to constraint type
67					storage_types.push(col.constraint.get_type());
68					dictionaries.push(None);
69				}
70			} else {
71				storage_types.push(col.constraint.get_type());
72				dictionaries.push(None);
73			}
74		}
75
76		// Resolve partition column indices
77		let partition_col_indices: Vec<usize> = ringbuffer
78			.def()
79			.partition_by
80			.iter()
81			.map(|pb_col| ringbuffer.columns().iter().position(|c| c.name == *pb_col).unwrap())
82			.collect();
83
84		// Create columns headers
85		let headers = ColumnHeaders {
86			columns: ringbuffer.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
87		};
88
89		Ok(Self {
90			ringbuffer,
91			partitions: Vec::new(),
92			current_partition_index: 0,
93			headers,
94			shape: None,
95			storage_types,
96			dictionaries,
97			partition_col_indices,
98			current_position: 0,
99			rows_returned_in_partition: 0,
100			context: Some(context),
101			initialized: false,
102			scan_limit: None,
103		})
104	}
105
106	fn get_or_load_shape(&mut self, rx: &mut Transaction, first_row: &EncodedRow) -> Result<RowShape> {
107		if let Some(shape) = &self.shape {
108			return Ok(shape.clone());
109		}
110
111		let fingerprint = first_row.fingerprint();
112
113		let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
114		let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
115			internal_error!(
116				"RowShape with fingerprint {:?} not found for ringbuffer {}",
117				fingerprint,
118				self.ringbuffer.def().name
119			)
120		})?;
121
122		self.shape = Some(shape.clone());
123
124		Ok(shape)
125	}
126
127	/// Advance to next non-empty partition if current is exhausted. Returns false if all done.
128	fn advance_to_next_partition(&mut self) -> bool {
129		loop {
130			self.current_partition_index += 1;
131			if self.current_partition_index >= self.partitions.len() {
132				return false;
133			}
134			let partition = &self.partitions[self.current_partition_index].metadata;
135			if !partition.is_empty() {
136				self.current_position = partition.head;
137				self.rows_returned_in_partition = 0;
138				return true;
139			}
140		}
141	}
142}
143
144impl QueryNode for RingBufferScan {
145	#[instrument(name = "volcano::scan::ringbuffer::initialize", level = "trace", skip_all)]
146	fn initialize<'a>(&mut self, txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
147		if !self.initialized {
148			// Load all partitions (global = 1-element vec, partitioned = N-element vec)
149			self.partitions =
150				ctx.services.catalog.list_ringbuffer_partitions(txn, self.ringbuffer.def())?;
151
152			// Start scanning from the first non-empty partition's head
153			if let Some(partition) = self.partitions.first() {
154				self.current_position = partition.metadata.head;
155			}
156
157			self.initialized = true;
158		}
159		Ok(())
160	}
161
162	#[instrument(name = "volcano::scan::ringbuffer::next", level = "trace", skip_all)]
163	fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
164		let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
165
166		// If no partitions, return empty shape
167		if self.partitions.is_empty() {
168			if self.current_partition_index == 0 {
169				self.current_partition_index = 1; // prevent re-entry
170				let columns: Vec<ColumnWithName> = self
171					.ringbuffer
172					.columns()
173					.iter()
174					.map(|col| ColumnWithName {
175						name: Fragment::internal(&col.name),
176						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
177					})
178					.collect();
179				return Ok(Some(Columns::new(columns)));
180			}
181			return Ok(None);
182		}
183
184		// Check if we're past all partitions
185		if self.current_partition_index >= self.partitions.len() {
186			return Ok(None);
187		}
188
189		let batch_size = match self.scan_limit {
190			Some(limit) => limit.min(stored_ctx.batch_size as usize),
191			None => stored_ctx.batch_size as usize,
192		};
193
194		// Collect rows for this batch, spanning partitions if needed
195		let mut batch_rows = Vec::new();
196		let mut row_numbers = Vec::new();
197
198		loop {
199			if self.current_partition_index >= self.partitions.len() {
200				break;
201			}
202
203			// Copy partition fields to avoid holding a borrow on self
204			let partition_empty = self.partitions[self.current_partition_index].metadata.is_empty();
205			if partition_empty {
206				if !self.advance_to_next_partition() {
207					break;
208				}
209				continue;
210			}
211
212			let max_row_num = self.partitions[self.current_partition_index].metadata.tail;
213			let partition_count = self.partitions[self.current_partition_index].metadata.count;
214			let partition_values = self.partitions[self.current_partition_index].partition_values.clone();
215			let partition_col_indices = self.partition_col_indices.clone();
216
217			while batch_rows.len() < batch_size
218				&& self.rows_returned_in_partition < partition_count
219				&& self.current_position < max_row_num
220			{
221				let row_num = RowNumber(self.current_position);
222				let key = RowKey::encoded(self.ringbuffer.def().id, row_num);
223
224				if let Some(multi) = txn.get(&key)? {
225					// For partitioned ringbuffers, check if this row belongs to the current
226					// partition
227					if !partition_col_indices.is_empty() {
228						let shape = self.get_or_load_shape(txn, &multi.row)?;
229						if !row_matches_partition(
230							&shape,
231							&multi.row,
232							&partition_col_indices,
233							&partition_values,
234						) {
235							self.current_position += 1;
236							continue;
237						}
238					}
239					batch_rows.push(multi.row);
240					row_numbers.push(row_num);
241					self.rows_returned_in_partition += 1;
242				}
243
244				self.current_position += 1;
245			}
246
247			// If we've exhausted this partition, move to next
248			if (self.rows_returned_in_partition >= partition_count || self.current_position >= max_row_num)
249				&& !self.advance_to_next_partition()
250			{
251				break;
252			}
253
254			// If we've filled a batch, stop
255			if batch_rows.len() >= batch_size {
256				break;
257			}
258		}
259
260		if batch_rows.is_empty() {
261			// If we never returned any rows at all, return empty shape
262			if self.partitions.iter().all(|p| p.metadata.is_empty()) {
263				let columns: Vec<ColumnWithName> = self
264					.ringbuffer
265					.columns()
266					.iter()
267					.map(|col| ColumnWithName {
268						name: Fragment::internal(&col.name),
269						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
270					})
271					.collect();
272				return Ok(Some(Columns::new(columns)));
273			}
274			Ok(None)
275		} else {
276			// Create columns with storage types (Type::DictionaryId for dictionary columns)
277			let storage_columns: Vec<ColumnWithName> = self
278				.ringbuffer
279				.columns()
280				.iter()
281				.enumerate()
282				.map(|(idx, col)| ColumnWithName {
283					name: Fragment::internal(&col.name),
284					data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
285				})
286				.collect();
287
288			let mut columns =
289				Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
290			let shape = self.get_or_load_shape(txn, &batch_rows[0])?;
291			columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
292
293			// Restore row numbers
294			columns.row_numbers = CowVec::new(row_numbers);
295
296			decode_dictionary_columns(&mut columns, &self.dictionaries, txn)?;
297
298			Ok(Some(columns))
299		}
300	}
301
302	fn headers(&self) -> Option<ColumnHeaders> {
303		Some(self.headers.clone())
304	}
305
306	fn set_scan_limit(&mut self, limit: usize) {
307		self.scan_limit = Some(limit);
308	}
309}
310
311fn row_matches_partition(
312	shape: &RowShape,
313	row: &EncodedRow,
314	partition_col_indices: &[usize],
315	expected_values: &[Value],
316) -> bool {
317	partition_col_indices.iter().zip(expected_values).all(|(&idx, expected)| shape.get_value(row, idx) == *expected)
318}