Skip to main content

reifydb_engine/vm/volcano/scan/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::{row::EncodedRow, shape::RowShape},
8	interface::{
9		catalog::{dictionary::Dictionary, ringbuffer::PartitionedMetadata},
10		resolved::ResolvedRingBuffer,
11	},
12	internal_error,
13	key::row::RowKey,
14	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::{
18	fragment::Fragment,
19	util::cowvec::CowVec,
20	value::{Value, row_number::RowNumber, r#type::Type},
21};
22use tracing::instrument;
23
24use super::super::decode_dictionary_columns;
25use crate::{
26	Result,
27	vm::volcano::query::{QueryContext, QueryNode},
28};
29
30pub struct RingBufferScan {
31	ringbuffer: ResolvedRingBuffer,
32
33	partitions: Vec<PartitionedMetadata>,
34	current_partition_index: usize,
35	headers: ColumnHeaders,
36	shape: Option<RowShape>,
37
38	storage_types: Vec<Type>,
39
40	dictionaries: Vec<Option<Dictionary>>,
41
42	partition_col_indices: Vec<usize>,
43	current_position: u64,
44	rows_returned_in_partition: u64,
45	context: Option<Arc<QueryContext>>,
46	initialized: bool,
47}
48
49impl RingBufferScan {
50	pub fn new(
51		ringbuffer: ResolvedRingBuffer,
52		context: Arc<QueryContext>,
53		rx: &mut Transaction<'_>,
54	) -> Result<Self> {
55		let mut storage_types = Vec::with_capacity(ringbuffer.columns().len());
56		let mut dictionaries = Vec::with_capacity(ringbuffer.columns().len());
57
58		for col in ringbuffer.columns() {
59			if let Some(dict_id) = col.dictionary_id {
60				if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
61					storage_types.push(Type::DictionaryId);
62					dictionaries.push(Some(dict));
63				} else {
64					storage_types.push(col.constraint.get_type());
65					dictionaries.push(None);
66				}
67			} else {
68				storage_types.push(col.constraint.get_type());
69				dictionaries.push(None);
70			}
71		}
72
73		let partition_col_indices: Vec<usize> = ringbuffer
74			.def()
75			.partition_by
76			.iter()
77			.map(|pb_col| ringbuffer.columns().iter().position(|c| c.name == *pb_col).unwrap())
78			.collect();
79
80		let headers = ColumnHeaders {
81			columns: ringbuffer.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
82		};
83
84		Ok(Self {
85			ringbuffer,
86			partitions: Vec::new(),
87			current_partition_index: 0,
88			headers,
89			shape: None,
90			storage_types,
91			dictionaries,
92			partition_col_indices,
93			current_position: 0,
94			rows_returned_in_partition: 0,
95			context: Some(context),
96			initialized: false,
97		})
98	}
99
100	fn get_or_load_shape(&mut self, rx: &mut Transaction, first_row: &EncodedRow) -> Result<RowShape> {
101		if let Some(shape) = &self.shape {
102			return Ok(shape.clone());
103		}
104
105		let fingerprint = first_row.fingerprint();
106
107		let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
108		let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
109			internal_error!(
110				"RowShape with fingerprint {:?} not found for ringbuffer {}",
111				fingerprint,
112				self.ringbuffer.def().name
113			)
114		})?;
115
116		self.shape = Some(shape.clone());
117
118		Ok(shape)
119	}
120
121	fn advance_to_next_partition(&mut self) -> bool {
122		loop {
123			self.current_partition_index += 1;
124			if self.current_partition_index >= self.partitions.len() {
125				return false;
126			}
127			let partition = &self.partitions[self.current_partition_index].metadata;
128			if !partition.is_empty() {
129				self.current_position = partition.head;
130				self.rows_returned_in_partition = 0;
131				return true;
132			}
133		}
134	}
135}
136
137impl QueryNode for RingBufferScan {
138	#[instrument(name = "volcano::scan::ringbuffer::initialize", level = "trace", skip_all)]
139	fn initialize<'a>(&mut self, txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
140		if !self.initialized {
141			self.partitions =
142				ctx.services.catalog.list_ringbuffer_partitions(txn, self.ringbuffer.def())?;
143
144			if let Some(partition) = self.partitions.first() {
145				self.current_position = partition.metadata.head;
146			}
147
148			self.initialized = true;
149		}
150		Ok(())
151	}
152
153	#[instrument(name = "volcano::scan::ringbuffer::next", level = "trace", skip_all)]
154	fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
155		let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
156
157		if self.partitions.is_empty() {
158			if self.current_partition_index == 0 {
159				self.current_partition_index = 1;
160				let columns: Vec<ColumnWithName> = self
161					.ringbuffer
162					.columns()
163					.iter()
164					.map(|col| ColumnWithName {
165						name: Fragment::internal(&col.name),
166						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
167					})
168					.collect();
169				return Ok(Some(Columns::new(columns)));
170			}
171			return Ok(None);
172		}
173
174		if self.current_partition_index >= self.partitions.len() {
175			return Ok(None);
176		}
177
178		let batch_size = stored_ctx.batch_size as usize;
179
180		let mut batch_rows = Vec::new();
181		let mut row_numbers = Vec::new();
182
183		loop {
184			if self.current_partition_index >= self.partitions.len() {
185				break;
186			}
187
188			let partition_empty = self.partitions[self.current_partition_index].metadata.is_empty();
189			if partition_empty {
190				if !self.advance_to_next_partition() {
191					break;
192				}
193				continue;
194			}
195
196			let max_row_num = self.partitions[self.current_partition_index].metadata.tail;
197			let partition_count = self.partitions[self.current_partition_index].metadata.count;
198			let partition_values = self.partitions[self.current_partition_index].partition_values.clone();
199			let partition_col_indices = self.partition_col_indices.clone();
200
201			while batch_rows.len() < batch_size
202				&& self.rows_returned_in_partition < partition_count
203				&& self.current_position < max_row_num
204			{
205				let row_num = RowNumber(self.current_position);
206				let key = RowKey::encoded(self.ringbuffer.def().id, row_num);
207
208				if let Some(multi) = txn.get(&key)? {
209					if !partition_col_indices.is_empty() {
210						let shape = self.get_or_load_shape(txn, &multi.row)?;
211						if !row_matches_partition(
212							&shape,
213							&multi.row,
214							&partition_col_indices,
215							&partition_values,
216						) {
217							self.current_position += 1;
218							continue;
219						}
220					}
221					batch_rows.push(multi.row);
222					row_numbers.push(row_num);
223					self.rows_returned_in_partition += 1;
224				}
225
226				self.current_position += 1;
227			}
228
229			if (self.rows_returned_in_partition >= partition_count || self.current_position >= max_row_num)
230				&& !self.advance_to_next_partition()
231			{
232				break;
233			}
234
235			if batch_rows.len() >= batch_size {
236				break;
237			}
238		}
239
240		if batch_rows.is_empty() {
241			if self.partitions.iter().all(|p| p.metadata.is_empty()) {
242				let columns: Vec<ColumnWithName> = self
243					.ringbuffer
244					.columns()
245					.iter()
246					.map(|col| ColumnWithName {
247						name: Fragment::internal(&col.name),
248						data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
249					})
250					.collect();
251				return Ok(Some(Columns::new(columns)));
252			}
253			Ok(None)
254		} else {
255			let storage_columns: Vec<ColumnWithName> = self
256				.ringbuffer
257				.columns()
258				.iter()
259				.enumerate()
260				.map(|(idx, col)| ColumnWithName {
261					name: Fragment::internal(&col.name),
262					data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
263				})
264				.collect();
265
266			let mut columns =
267				Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
268			let shape = self.get_or_load_shape(txn, &batch_rows[0])?;
269			columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
270
271			columns.row_numbers = CowVec::new(row_numbers);
272
273			decode_dictionary_columns(&mut columns, &self.dictionaries, txn)?;
274
275			Ok(Some(columns))
276		}
277	}
278
279	fn headers(&self) -> Option<ColumnHeaders> {
280		Some(self.headers.clone())
281	}
282}
283
284fn row_matches_partition(
285	shape: &RowShape,
286	row: &EncodedRow,
287	partition_col_indices: &[usize],
288	expected_values: &[Value],
289) -> bool {
290	partition_col_indices.iter().zip(expected_values).all(|(&idx, expected)| shape.get_value(row, idx) == *expected)
291}