reifydb_engine/vm/volcano/scan/
ringbuffer.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::{row::EncodedRow, shape::RowShape},
8 interface::{
9 catalog::{dictionary::Dictionary, ringbuffer::PartitionedMetadata},
10 resolved::ResolvedRingBuffer,
11 },
12 internal_error,
13 key::row::RowKey,
14 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
15};
16use reifydb_transaction::transaction::Transaction;
17use reifydb_type::{
18 fragment::Fragment,
19 util::cowvec::CowVec,
20 value::{Value, row_number::RowNumber, r#type::Type},
21};
22use tracing::instrument;
23
24use super::super::decode_dictionary_columns;
25use crate::{
26 Result,
27 vm::volcano::query::{QueryContext, QueryNode},
28};
29
30pub struct RingBufferScan {
31 ringbuffer: ResolvedRingBuffer,
32
33 partitions: Vec<PartitionedMetadata>,
34 current_partition_index: usize,
35 headers: ColumnHeaders,
36 shape: Option<RowShape>,
37
38 storage_types: Vec<Type>,
39
40 dictionaries: Vec<Option<Dictionary>>,
41
42 partition_col_indices: Vec<usize>,
43 current_position: u64,
44 rows_returned_in_partition: u64,
45 context: Option<Arc<QueryContext>>,
46 initialized: bool,
47}
48
49impl RingBufferScan {
50 pub fn new(
51 ringbuffer: ResolvedRingBuffer,
52 context: Arc<QueryContext>,
53 rx: &mut Transaction<'_>,
54 ) -> Result<Self> {
55 let mut storage_types = Vec::with_capacity(ringbuffer.columns().len());
56 let mut dictionaries = Vec::with_capacity(ringbuffer.columns().len());
57
58 for col in ringbuffer.columns() {
59 if let Some(dict_id) = col.dictionary_id {
60 if let Some(dict) = context.services.catalog.find_dictionary(rx, dict_id)? {
61 storage_types.push(Type::DictionaryId);
62 dictionaries.push(Some(dict));
63 } else {
64 storage_types.push(col.constraint.get_type());
65 dictionaries.push(None);
66 }
67 } else {
68 storage_types.push(col.constraint.get_type());
69 dictionaries.push(None);
70 }
71 }
72
73 let partition_col_indices: Vec<usize> = ringbuffer
74 .def()
75 .partition_by
76 .iter()
77 .map(|pb_col| ringbuffer.columns().iter().position(|c| c.name == *pb_col).unwrap())
78 .collect();
79
80 let headers = ColumnHeaders {
81 columns: ringbuffer.columns().iter().map(|col| Fragment::internal(&col.name)).collect(),
82 };
83
84 Ok(Self {
85 ringbuffer,
86 partitions: Vec::new(),
87 current_partition_index: 0,
88 headers,
89 shape: None,
90 storage_types,
91 dictionaries,
92 partition_col_indices,
93 current_position: 0,
94 rows_returned_in_partition: 0,
95 context: Some(context),
96 initialized: false,
97 })
98 }
99
100 fn get_or_load_shape(&mut self, rx: &mut Transaction, first_row: &EncodedRow) -> Result<RowShape> {
101 if let Some(shape) = &self.shape {
102 return Ok(shape.clone());
103 }
104
105 let fingerprint = first_row.fingerprint();
106
107 let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
108 let shape = stored_ctx.services.catalog.get_or_load_row_shape(fingerprint, rx)?.ok_or_else(|| {
109 internal_error!(
110 "RowShape with fingerprint {:?} not found for ringbuffer {}",
111 fingerprint,
112 self.ringbuffer.def().name
113 )
114 })?;
115
116 self.shape = Some(shape.clone());
117
118 Ok(shape)
119 }
120
121 fn advance_to_next_partition(&mut self) -> bool {
122 loop {
123 self.current_partition_index += 1;
124 if self.current_partition_index >= self.partitions.len() {
125 return false;
126 }
127 let partition = &self.partitions[self.current_partition_index].metadata;
128 if !partition.is_empty() {
129 self.current_position = partition.head;
130 self.rows_returned_in_partition = 0;
131 return true;
132 }
133 }
134 }
135}
136
137impl QueryNode for RingBufferScan {
138 #[instrument(name = "volcano::scan::ringbuffer::initialize", level = "trace", skip_all)]
139 fn initialize<'a>(&mut self, txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
140 if !self.initialized {
141 self.partitions =
142 ctx.services.catalog.list_ringbuffer_partitions(txn, self.ringbuffer.def())?;
143
144 if let Some(partition) = self.partitions.first() {
145 self.current_position = partition.metadata.head;
146 }
147
148 self.initialized = true;
149 }
150 Ok(())
151 }
152
153 #[instrument(name = "volcano::scan::ringbuffer::next", level = "trace", skip_all)]
154 fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
155 let stored_ctx = self.context.as_ref().expect("RingBufferScan context not set");
156
157 if self.partitions.is_empty() {
158 if self.current_partition_index == 0 {
159 self.current_partition_index = 1;
160 let columns: Vec<ColumnWithName> = self
161 .ringbuffer
162 .columns()
163 .iter()
164 .map(|col| ColumnWithName {
165 name: Fragment::internal(&col.name),
166 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
167 })
168 .collect();
169 return Ok(Some(Columns::new(columns)));
170 }
171 return Ok(None);
172 }
173
174 if self.current_partition_index >= self.partitions.len() {
175 return Ok(None);
176 }
177
178 let batch_size = stored_ctx.batch_size as usize;
179
180 let mut batch_rows = Vec::new();
181 let mut row_numbers = Vec::new();
182
183 loop {
184 if self.current_partition_index >= self.partitions.len() {
185 break;
186 }
187
188 let partition_empty = self.partitions[self.current_partition_index].metadata.is_empty();
189 if partition_empty {
190 if !self.advance_to_next_partition() {
191 break;
192 }
193 continue;
194 }
195
196 let max_row_num = self.partitions[self.current_partition_index].metadata.tail;
197 let partition_count = self.partitions[self.current_partition_index].metadata.count;
198 let partition_values = self.partitions[self.current_partition_index].partition_values.clone();
199 let partition_col_indices = self.partition_col_indices.clone();
200
201 while batch_rows.len() < batch_size
202 && self.rows_returned_in_partition < partition_count
203 && self.current_position < max_row_num
204 {
205 let row_num = RowNumber(self.current_position);
206 let key = RowKey::encoded(self.ringbuffer.def().id, row_num);
207
208 if let Some(multi) = txn.get(&key)? {
209 if !partition_col_indices.is_empty() {
210 let shape = self.get_or_load_shape(txn, &multi.row)?;
211 if !row_matches_partition(
212 &shape,
213 &multi.row,
214 &partition_col_indices,
215 &partition_values,
216 ) {
217 self.current_position += 1;
218 continue;
219 }
220 }
221 batch_rows.push(multi.row);
222 row_numbers.push(row_num);
223 self.rows_returned_in_partition += 1;
224 }
225
226 self.current_position += 1;
227 }
228
229 if (self.rows_returned_in_partition >= partition_count || self.current_position >= max_row_num)
230 && !self.advance_to_next_partition()
231 {
232 break;
233 }
234
235 if batch_rows.len() >= batch_size {
236 break;
237 }
238 }
239
240 if batch_rows.is_empty() {
241 if self.partitions.iter().all(|p| p.metadata.is_empty()) {
242 let columns: Vec<ColumnWithName> = self
243 .ringbuffer
244 .columns()
245 .iter()
246 .map(|col| ColumnWithName {
247 name: Fragment::internal(&col.name),
248 data: ColumnBuffer::none_typed(col.constraint.get_type(), 0),
249 })
250 .collect();
251 return Ok(Some(Columns::new(columns)));
252 }
253 Ok(None)
254 } else {
255 let storage_columns: Vec<ColumnWithName> = self
256 .ringbuffer
257 .columns()
258 .iter()
259 .enumerate()
260 .map(|(idx, col)| ColumnWithName {
261 name: Fragment::internal(&col.name),
262 data: ColumnBuffer::with_capacity(self.storage_types[idx].clone(), 0),
263 })
264 .collect();
265
266 let mut columns =
267 Columns::with_system_columns(storage_columns, Vec::new(), Vec::new(), Vec::new());
268 let shape = self.get_or_load_shape(txn, &batch_rows[0])?;
269 columns.append_rows(&shape, batch_rows.into_iter(), row_numbers.clone())?;
270
271 columns.row_numbers = CowVec::new(row_numbers);
272
273 decode_dictionary_columns(&mut columns, &self.dictionaries, txn)?;
274
275 Ok(Some(columns))
276 }
277 }
278
279 fn headers(&self) -> Option<ColumnHeaders> {
280 Some(self.headers.clone())
281 }
282}
283
284fn row_matches_partition(
285 shape: &RowShape,
286 row: &EncodedRow,
287 partition_col_indices: &[usize],
288 expected_values: &[Value],
289) -> bool {
290 partition_col_indices.iter().zip(expected_values).all(|(&idx, expected)| shape.get_value(row, idx) == *expected)
291}