reifydb_engine/vm/volcano/scan/
series.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::key::EncodedKey,
8 interface::resolved::ResolvedSeries,
9 key::{
10 EncodableKey,
11 series_row::{SeriesRowKey, SeriesRowKeyRange},
12 },
13 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17 fragment::Fragment,
18 value::{Value, datetime::DateTime, row_number::RowNumber, r#type::Type},
19};
20use tracing::instrument;
21
22use crate::{
23 Result,
24 vm::{
25 instruction::dml::shape::get_or_create_series_shape,
26 volcano::query::{QueryContext, QueryNode},
27 },
28};
29
30pub struct SeriesScanNode {
31 series: ResolvedSeries,
32 key_range_start: Option<u64>,
33 key_range_end: Option<u64>,
34 variant_tag: Option<u8>,
35 context: Option<Arc<QueryContext>>,
36 headers: ColumnHeaders,
37 last_key: Option<EncodedKey>,
38 exhausted: bool,
39 scan_limit: Option<usize>,
40}
41
42impl SeriesScanNode {
43 pub fn new(
44 series: ResolvedSeries,
45 key_range_start: Option<u64>,
46 key_range_end: Option<u64>,
47 variant_tag: Option<u8>,
48 context: Arc<QueryContext>,
49 ) -> Result<Self> {
50 let mut columns = vec![Fragment::internal(series.def().key.column())];
52 if series.def().tag.is_some() {
53 columns.push(Fragment::internal("tag"));
54 }
55 for col in series.columns() {
56 columns.push(Fragment::internal(&col.name));
57 }
58 let headers = ColumnHeaders {
59 columns,
60 };
61
62 Ok(Self {
63 series,
64 key_range_start,
65 key_range_end,
66 variant_tag,
67 context: Some(context),
68 headers,
69 last_key: None,
70 exhausted: false,
71 scan_limit: None,
72 })
73 }
74}
75
76impl QueryNode for SeriesScanNode {
77 #[instrument(name = "volcano::scan::series::initialize", level = "trace", skip_all)]
78 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
79 Ok(())
80 }
81
82 #[instrument(name = "volcano::scan::series::next", level = "trace", skip_all)]
83 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
84 debug_assert!(self.context.is_some(), "SeriesScanNode::next() called before initialize()");
85 let stored_ctx = self.context.as_ref().unwrap();
86
87 if self.exhausted {
88 return Ok(None);
89 }
90
91 let batch_size = match self.scan_limit {
92 Some(limit) => (limit as u64).min(stored_ctx.batch_size),
93 None => stored_ctx.batch_size,
94 };
95 let series = self.series.def();
96 let has_tag = series.tag.is_some();
97
98 let range = SeriesRowKeyRange::scan_range(
100 series.id,
101 self.variant_tag,
102 self.key_range_start,
103 self.key_range_end,
104 self.last_key.as_ref(),
105 );
106
107 let mut key_values: Vec<u64> = Vec::new();
108 let mut tags: Vec<u8> = Vec::new();
109 let mut sequences: Vec<u64> = Vec::new();
110 let mut created_at_values: Vec<DateTime> = Vec::new();
111 let mut updated_at_values: Vec<DateTime> = Vec::new();
112 let mut data_rows: Vec<Vec<Value>> = Vec::new();
113 let mut new_last_key = None;
114
115 let read_shape = get_or_create_series_shape(&stored_ctx.services.catalog, self.series.def(), rx)?;
117
118 let mut stream = rx.range(range, batch_size as usize)?;
119 let mut count = 0;
120
121 for entry in stream.by_ref() {
122 let entry = entry?;
123
124 if let Some(key) = SeriesRowKey::decode(&entry.key) {
126 key_values.push(key.key);
127 sequences.push(key.sequence);
128 created_at_values.push(DateTime::from_nanos(entry.row.created_at_nanos()));
129 updated_at_values.push(DateTime::from_nanos(entry.row.updated_at_nanos()));
130 if has_tag {
131 tags.push(key.variant_tag.unwrap_or(0));
132 }
133
134 let mut values = Vec::with_capacity(series.data_columns().count());
136 for (i, _) in series.data_columns().enumerate() {
137 values.push(read_shape.get_value(&entry.row, i + 1));
138 }
139 data_rows.push(values);
140
141 new_last_key = Some(entry.key);
142 count += 1;
143 if count >= batch_size as usize {
144 break;
145 }
146 }
147 }
148
149 drop(stream);
150
151 if key_values.is_empty() {
152 self.exhausted = true;
153 if self.last_key.is_none() {
154 let key_type = series
156 .columns
157 .iter()
158 .find(|c| c.name == series.key.column())
159 .map(|c| c.constraint.get_type())
160 .unwrap_or(Type::Int8);
161 let mut result_columns = Vec::new();
162 result_columns.push(ColumnWithName {
163 name: Fragment::internal(series.key.column()),
164 data: ColumnBuffer::none_typed(key_type, 0),
165 });
166 if has_tag {
167 result_columns.push(ColumnWithName {
168 name: Fragment::internal("tag"),
169 data: ColumnBuffer::none_typed(Type::Uint1, 0),
170 });
171 }
172 for col_def in series.data_columns() {
173 result_columns.push(ColumnWithName {
174 name: Fragment::internal(&col_def.name),
175 data: ColumnBuffer::none_typed(col_def.constraint.get_type(), 0),
176 });
177 }
178 return Ok(Some(Columns::new(result_columns)));
179 }
180 return Ok(None);
181 }
182
183 self.last_key = new_last_key;
184
185 let mut result_columns = Vec::new();
187
188 result_columns.push(ColumnWithName::new(
190 Fragment::internal(series.key.column()),
191 series.key_column_data(key_values),
192 ));
193
194 if has_tag {
196 result_columns.push(ColumnWithName::new(Fragment::internal("tag"), ColumnBuffer::uint1(tags)));
197 }
198
199 for (col_idx, col_def) in series.data_columns().enumerate() {
201 let col_type = col_def.constraint.get_type();
202 let col_values: Vec<Value> = data_rows
203 .iter()
204 .map(|row| row.get(col_idx).cloned().unwrap_or(Value::none()))
205 .collect();
206
207 result_columns.push(build_data_column(&col_def.name, &col_values, col_type)?);
208 }
209
210 let row_numbers: Vec<RowNumber> = sequences.into_iter().map(RowNumber::from).collect();
211 Ok(Some(Columns::with_system_columns(
212 result_columns,
213 row_numbers,
214 created_at_values,
215 updated_at_values,
216 )))
217 }
218
219 fn headers(&self) -> Option<ColumnHeaders> {
220 Some(self.headers.clone())
221 }
222
223 fn set_scan_limit(&mut self, limit: usize) {
224 self.scan_limit = Some(limit);
225 }
226}
227
228pub(crate) fn build_data_column(name: &str, values: &[Value], col_type: Type) -> Result<ColumnWithName> {
229 let data = match col_type {
230 Type::Boolean => {
231 let vals: Vec<bool> = values
232 .iter()
233 .map(|v| match v {
234 Value::Boolean(b) => *b,
235 _ => false,
236 })
237 .collect();
238 ColumnBuffer::bool(vals)
239 }
240 Type::Int1 => {
241 let vals: Vec<i8> = values
242 .iter()
243 .map(|v| match v {
244 Value::Int1(n) => *n,
245 _ => 0,
246 })
247 .collect();
248 ColumnBuffer::int1(vals)
249 }
250 Type::Int2 => {
251 let vals: Vec<i16> = values
252 .iter()
253 .map(|v| match v {
254 Value::Int2(n) => *n,
255 _ => 0,
256 })
257 .collect();
258 ColumnBuffer::int2(vals)
259 }
260 Type::Int4 => {
261 let vals: Vec<i32> = values
262 .iter()
263 .map(|v| match v {
264 Value::Int4(n) => *n,
265 _ => 0,
266 })
267 .collect();
268 ColumnBuffer::int4(vals)
269 }
270 Type::Int8 => {
271 let vals: Vec<i64> = values
272 .iter()
273 .map(|v| match v {
274 Value::Int8(n) => *n,
275 _ => 0,
276 })
277 .collect();
278 ColumnBuffer::int8(vals)
279 }
280 Type::Uint1 => {
281 let vals: Vec<u8> = values
282 .iter()
283 .map(|v| match v {
284 Value::Uint1(n) => *n,
285 _ => 0,
286 })
287 .collect();
288 ColumnBuffer::uint1(vals)
289 }
290 Type::Uint2 => {
291 let vals: Vec<u16> = values
292 .iter()
293 .map(|v| match v {
294 Value::Uint2(n) => *n,
295 _ => 0,
296 })
297 .collect();
298 ColumnBuffer::uint2(vals)
299 }
300 Type::Uint4 => {
301 let vals: Vec<u32> = values
302 .iter()
303 .map(|v| match v {
304 Value::Uint4(n) => *n,
305 _ => 0,
306 })
307 .collect();
308 ColumnBuffer::uint4(vals)
309 }
310 Type::Uint8 => {
311 let vals: Vec<u64> = values
312 .iter()
313 .map(|v| match v {
314 Value::Uint8(n) => *n,
315 _ => 0,
316 })
317 .collect();
318 ColumnBuffer::uint8(vals)
319 }
320 Type::Float4 => {
321 let vals: Vec<f32> = values
322 .iter()
323 .map(|v| match v {
324 Value::Float4(n) => n.value(),
325 _ => 0.0,
326 })
327 .collect();
328 ColumnBuffer::float4(vals)
329 }
330 Type::Float8 => {
331 let vals: Vec<f64> = values
332 .iter()
333 .map(|v| match v {
334 Value::Float8(n) => n.value(),
335 _ => 0.0,
336 })
337 .collect();
338 ColumnBuffer::float8(vals)
339 }
340 Type::Utf8 => {
341 let vals: Vec<String> = values
342 .iter()
343 .map(|v| match v {
344 Value::Utf8(s) => s.clone(),
345 _ => String::new(),
346 })
347 .collect();
348 ColumnBuffer::utf8(vals)
349 }
350 _ => {
351 let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
353 ColumnBuffer::utf8(vals)
354 }
355 };
356
357 Ok(ColumnWithName {
358 name: Fragment::internal(name),
359 data,
360 })
361}