reifydb_engine/vm/volcano/scan/
ringbuffer.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::{row::EncodedRow, shape::RowShape},
8 interface::{
9 catalog::{dictionary::Dictionary, ringbuffer::PartitionedMetadata},
10 resolved::ResolvedRingBuffer,
11 },
12 internal_error,
13 key::row::RowKey,
14 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::{
18 fragment::Fragment,
19 util::cowvec::CowVec,
20 value::{Value, row_number::RowNumber, r#type::Type},
21};
22use tracing::instrument;
23
24use super::super::decode_dictionary_columns;
25use crate::{
26 Result,
27 vm::volcano::query::{QueryContext, QueryNode},
28};
29
30pub struct RingBufferScan {
31 ringbuffer: ResolvedRingBuffer,
32 partitions: Vec<PartitionedMetadata>,
34 current_partition_index: usize,
35 headers: ColumnHeaders,
36 shape: Option<RowShape>,
37 storage_types: Vec<Type>,
39 dictionaries: Vec<Option<Dictionary>>,
41 partition_col_indices: Vec<usize>,
43 current_position: u64,
44 rows_returned_in_partition: u64,
45 context: Option<Arc<QueryContext>>,
46 initialized: bool,
47 scan_limit: Option<usize>,
48}
49
50impl RingBufferScan {
51 pub fn new(
52 ringbuffer: ResolvedRingBuffer,
53 context: Arc<QueryContext>,
54 rx: &mut Transaction<'_>,
55 ) -> Result<Self> {
56 let mut storage_types = Vec::with_capacity(ringbuffer.columns().len());
58 let mut dictionaries = Vec::with_capacity(ringbuffer.columns().len());
59
60 for col in ringbuffer.columns() {
61 if let Some(dict_id) = col.dictionary_id {
62 if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
63 storage_types.push(Type::DictionaryId);
64 dictionaries.push(Some(dict));
65 } else {
66 storage_types.push(col.constraint.get_type());
68 dictionaries.push(None);
69 }
70 } else {
71 storage_types.push(col.constraint.get_type());
72 dictionaries.push(None);
73 }
74 }
75
76 let partition_col_indices: Vec<usize> = ringbuffer
78 .def()
79 .partition_by
80 .iter()
81 .map(|pb_col| ringbuffer.columns().iter().position(|c| c.name == *pb_col).unwrap())
82 .collect();
83
84 let headers = ColumnHeaders {
86 columns: ringbuffer.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
87 };
88
89 Ok(Self {
90 ringbuffer,
91 partitions: Vec::new(),
92 current_partition_index: 0,
93 headers,
94 shape: None,
95 storage_types,
96 dictionaries,
97 partition_col_indices,
98 current_position: 0,
99 rows_returned_in_partition: 0,
100 context: Some(context),
101 initialized: false,
102 scan_limit: None,
103 })
104 }
105
106 fn get_or_load_shape(&mut self, rx: &mut Transaction, first_row: &EncodedRow) -> Result<RowShape> {
107 if let Some(shape) = &self.shape {
108 return Ok(shape.clone());
109 }
110
111 let fingerprint = first_row.fingerprint();
112
113 let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
114 let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
115 internal_error!(
116 "RowShape with fingerprint {:?} not found for ringbuffer {}",
117 fingerprint,
118 self.ringbuffer.def().name
119 )
120 })?;
121
122 self.shape = Some(shape.clone());
123
124 Ok(shape)
125 }
126
127 fn advance_to_next_partition(&mut self) -> bool {
129 loop {
130 self.current_partition_index += 1;
131 if self.current_partition_index >= self.partitions.len() {
132 return false;
133 }
134 let partition = &self.partitions[self.current_partition_index].metadata;
135 if !partition.is_empty() {
136 self.current_position = partition.head;
137 self.rows_returned_in_partition = 0;
138 return true;
139 }
140 }
141 }
142}
143
144impl QueryNode for RingBufferScan {
145 #[instrument(name = "volcano::scan::ringbuffer::initialize", level = "trace", skip_all)]
146 fn initialize<'a>(&mut self, txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
147 if !self.initialized {
148 self.partitions =
150 ctx.services.catalog.list_ringbuffer_partitions(txn, self.ringbuffer.def())?;
151
152 if let Some(partition) = self.partitions.first() {
154 self.current_position = partition.metadata.head;
155 }
156
157 self.initialized = true;
158 }
159 Ok(())
160 }
161
162 #[instrument(name = "volcano::scan::ringbuffer::next", level = "trace", skip_all)]
163 fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
164 let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
165
166 if self.partitions.is_empty() {
168 if self.current_partition_index == 0 {
169 self.current_partition_index = 1; let columns: Vec<ColumnWithName> = self
171 .ringbuffer
172 .columns()
173 .iter()
174 .map(|col| ColumnWithName {
175 name: Fragment::internal(&col.name),
176 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
177 })
178 .collect();
179 return Ok(Some(Columns::new(columns)));
180 }
181 return Ok(None);
182 }
183
184 if self.current_partition_index >= self.partitions.len() {
186 return Ok(None);
187 }
188
189 let batch_size = match self.scan_limit {
190 Some(limit) => limit.min(stored_ctx.batch_size as usize),
191 None => stored_ctx.batch_size as usize,
192 };
193
194 let mut batch_rows = Vec::new();
196 let mut row_numbers = Vec::new();
197
198 loop {
199 if self.current_partition_index >= self.partitions.len() {
200 break;
201 }
202
203 let partition_empty = self.partitions[self.current_partition_index].metadata.is_empty();
205 if partition_empty {
206 if !self.advance_to_next_partition() {
207 break;
208 }
209 continue;
210 }
211
212 let max_row_num = self.partitions[self.current_partition_index].metadata.tail;
213 let partition_count = self.partitions[self.current_partition_index].metadata.count;
214 let partition_values = self.partitions[self.current_partition_index].partition_values.clone();
215 let partition_col_indices = self.partition_col_indices.clone();
216
217 while batch_rows.len() < batch_size
218 && self.rows_returned_in_partition < partition_count
219 && self.current_position < max_row_num
220 {
221 let row_num = RowNumber(self.current_position);
222 let key = RowKey::encoded(self.ringbuffer.def().id, row_num);
223
224 if let Some(multi) = txn.get(&key)? {
225 if !partition_col_indices.is_empty() {
228 let shape = self.get_or_load_shape(txn, &multi.row)?;
229 if !row_matches_partition(
230 &shape,
231 &multi.row,
232 &partition_col_indices,
233 &partition_values,
234 ) {
235 self.current_position += 1;
236 continue;
237 }
238 }
239 batch_rows.push(multi.row);
240 row_numbers.push(row_num);
241 self.rows_returned_in_partition += 1;
242 }
243
244 self.current_position += 1;
245 }
246
247 if (self.rows_returned_in_partition >= partition_count || self.current_position >= max_row_num)
249 && !self.advance_to_next_partition()
250 {
251 break;
252 }
253
254 if batch_rows.len() >= batch_size {
256 break;
257 }
258 }
259
260 if batch_rows.is_empty() {
261 if self.partitions.iter().all(|p| p.metadata.is_empty()) {
263 let columns: Vec<ColumnWithName> = self
264 .ringbuffer
265 .columns()
266 .iter()
267 .map(|col| ColumnWithName {
268 name: Fragment::internal(&col.name),
269 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
270 })
271 .collect();
272 return Ok(Some(Columns::new(columns)));
273 }
274 Ok(None)
275 } else {
276 let storage_columns: Vec<ColumnWithName> = self
278 .ringbuffer
279 .columns()
280 .iter()
281 .enumerate()
282 .map(|(idx, col)| ColumnWithName {
283 name: Fragment::internal(&col.name),
284 data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
285 })
286 .collect();
287
288 let mut columns =
289 Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
290 let shape = self.get_or_load_shape(txn, &batch_rows[0])?;
291 columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
292
293 columns.row_numbers = CowVec::new(row_numbers);
295
296 decode_dictionary_columns(&mut columns, &self.dictionaries, txn)?;
297
298 Ok(Some(columns))
299 }
300 }
301
302 fn headers(&self) -> Option<ColumnHeaders> {
303 Some(self.headers.clone())
304 }
305
306 fn set_scan_limit(&mut self, limit: usize) {
307 self.scan_limit = Some(limit);
308 }
309}
310
311fn row_matches_partition(
312 shape: &RowShape,
313 row: &EncodedRow,
314 partition_col_indices: &[usize],
315 expected_values: &[Value],
316) -> bool {
317 partition_col_indices.iter().zip(expected_values).all(|(&idx, expected)| shape.get_value(row, idx) == *expected)
318}