reifydb_engine/vm/volcano/scan/
series.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::key::EncodedKey,
8 interface::resolved::ResolvedSeries,
9 key::{
10 EncodableKey,
11 series_row::{SeriesRowKey, SeriesRowKeyRange},
12 },
13 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17 fragment::Fragment,
18 value::{Value, datetime::DateTime, row_number::RowNumber, r#type::Type},
19};
20use tracing::instrument;
21
22use crate::{
23 Result,
24 vm::{
25 instruction::dml::shape::get_or_create_series_shape,
26 volcano::query::{QueryContext, QueryNode},
27 },
28};
29
30pub struct SeriesScanNode {
31 series: ResolvedSeries,
32 key_range_start: Option<u64>,
33 key_range_end: Option<u64>,
34 variant_tag: Option<u8>,
35 context: Option<Arc<QueryContext>>,
36 headers: ColumnHeaders,
37 last_key: Option<EncodedKey>,
38 exhausted: bool,
39}
40
41impl SeriesScanNode {
42 pub fn new(
43 series: ResolvedSeries,
44 key_range_start: Option<u64>,
45 key_range_end: Option<u64>,
46 variant_tag: Option<u8>,
47 context: Arc<QueryContext>,
48 ) -> Result<Self> {
49 let mut columns = vec![Fragment::internal(series.def().key.column())];
50 if series.def().tag.is_some() {
51 columns.push(Fragment::internal("tag"));
52 }
53 for col in series.columns() {
54 columns.push(Fragment::internal(&col.name));
55 }
56 let headers = ColumnHeaders {
57 columns,
58 };
59
60 Ok(Self {
61 series,
62 key_range_start,
63 key_range_end,
64 variant_tag,
65 context: Some(context),
66 headers,
67 last_key: None,
68 exhausted: false,
69 })
70 }
71}
72
73impl QueryNode for SeriesScanNode {
74 #[instrument(name = "volcano::scan::series::initialize", level = "trace", skip_all)]
75 fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
76 Ok(())
77 }
78
79 #[instrument(name = "volcano::scan::series::next", level = "trace", skip_all)]
80 fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
81 debug_assert!(self.context.is_some(), "SeriesScanNode::next() called before initialize()");
82 let stored_ctx = self.context.as_ref().unwrap();
83
84 if self.exhausted {
85 return Ok(None);
86 }
87
88 let batch_size = stored_ctx.batch_size;
89 let series = self.series.def();
90 let has_tag = series.tag.is_some();
91
92 let range = SeriesRowKeyRange::scan_range(
93 series.id,
94 self.variant_tag,
95 self.key_range_start,
96 self.key_range_end,
97 self.last_key.as_ref(),
98 );
99
100 let mut key_values: Vec<u64> = Vec::new();
101 let mut tags: Vec<u8> = Vec::new();
102 let mut sequences: Vec<u64> = Vec::new();
103 let mut created_at_values: Vec<DateTime> = Vec::new();
104 let mut updated_at_values: Vec<DateTime> = Vec::new();
105 let mut data_rows: Vec<Vec<Value>> = Vec::new();
106 let mut new_last_key = None;
107
108 let read_shape = get_or_create_series_shape(&stored_ctx.services.catalog, self.series.def(), rx)?;
109
110 let mut stream = rx.range(range, batch_size as usize)?;
111 let mut count = 0;
112
113 for entry in stream.by_ref() {
114 let entry = entry?;
115
116 if let Some(key) = SeriesRowKey::decode(&entry.key) {
117 key_values.push(key.key);
118 sequences.push(key.sequence);
119 created_at_values.push(DateTime::from_nanos(entry.row.created_at_nanos()));
120 updated_at_values.push(DateTime::from_nanos(entry.row.updated_at_nanos()));
121 if has_tag {
122 tags.push(key.variant_tag.unwrap_or(0));
123 }
124
125 let mut values = Vec::with_capacity(series.data_columns().count());
126 for (i, _) in series.data_columns().enumerate() {
127 values.push(read_shape.get_value(&entry.row, i + 1));
128 }
129 data_rows.push(values);
130
131 new_last_key = Some(entry.key);
132 count += 1;
133 if count >= batch_size as usize {
134 break;
135 }
136 }
137 }
138
139 drop(stream);
140
141 if key_values.is_empty() {
142 self.exhausted = true;
143 if self.last_key.is_none() {
144 let key_type = series
145 .columns
146 .iter()
147 .find(|c| c.name == series.key.column())
148 .map(|c| c.constraint.get_type())
149 .unwrap_or(Type::Int8);
150 let mut result_columns = Vec::new();
151 result_columns.push(ColumnWithName {
152 name: Fragment::internal(series.key.column()),
153 data: ColumnBuffer::none_typed(key_type, 0),
154 });
155 if has_tag {
156 result_columns.push(ColumnWithName {
157 name: Fragment::internal("tag"),
158 data: ColumnBuffer::none_typed(Type::Uint1, 0),
159 });
160 }
161 for col_def in series.data_columns() {
162 result_columns.push(ColumnWithName {
163 name: Fragment::internal(&col_def.name),
164 data: ColumnBuffer::none_typed(col_def.constraint.get_type(), 0),
165 });
166 }
167 return Ok(Some(Columns::new(result_columns)));
168 }
169 return Ok(None);
170 }
171
172 self.last_key = new_last_key;
173
174 let mut result_columns = Vec::new();
175
176 result_columns.push(ColumnWithName::new(
177 Fragment::internal(series.key.column()),
178 series.key_column_data(key_values),
179 ));
180
181 if has_tag {
182 result_columns.push(ColumnWithName::new(Fragment::internal("tag"), ColumnBuffer::uint1(tags)));
183 }
184
185 for (col_idx, col_def) in series.data_columns().enumerate() {
186 let col_type = col_def.constraint.get_type();
187 let col_values: Vec<Value> = data_rows
188 .iter()
189 .map(|row| row.get(col_idx).cloned().unwrap_or(Value::none()))
190 .collect();
191
192 result_columns.push(build_data_column(&col_def.name, &col_values, col_type)?);
193 }
194
195 let row_numbers: Vec<RowNumber> = sequences.into_iter().map(RowNumber::from).collect();
196 Ok(Some(Columns::with_system_columns(
197 result_columns,
198 row_numbers,
199 created_at_values,
200 updated_at_values,
201 )))
202 }
203
204 fn headers(&self) -> Option<ColumnHeaders> {
205 Some(self.headers.clone())
206 }
207}
208
209pub(crate) fn build_data_column(name: &str, values: &[Value], col_type: Type) -> Result<ColumnWithName> {
210 let data = match col_type {
211 Type::Boolean => {
212 let vals: Vec<bool> = values
213 .iter()
214 .map(|v| match v {
215 Value::Boolean(b) => *b,
216 _ => false,
217 })
218 .collect();
219 ColumnBuffer::bool(vals)
220 }
221 Type::Int1 => {
222 let vals: Vec<i8> = values
223 .iter()
224 .map(|v| match v {
225 Value::Int1(n) => *n,
226 _ => 0,
227 })
228 .collect();
229 ColumnBuffer::int1(vals)
230 }
231 Type::Int2 => {
232 let vals: Vec<i16> = values
233 .iter()
234 .map(|v| match v {
235 Value::Int2(n) => *n,
236 _ => 0,
237 })
238 .collect();
239 ColumnBuffer::int2(vals)
240 }
241 Type::Int4 => {
242 let vals: Vec<i32> = values
243 .iter()
244 .map(|v| match v {
245 Value::Int4(n) => *n,
246 _ => 0,
247 })
248 .collect();
249 ColumnBuffer::int4(vals)
250 }
251 Type::Int8 => {
252 let vals: Vec<i64> = values
253 .iter()
254 .map(|v| match v {
255 Value::Int8(n) => *n,
256 _ => 0,
257 })
258 .collect();
259 ColumnBuffer::int8(vals)
260 }
261 Type::Uint1 => {
262 let vals: Vec<u8> = values
263 .iter()
264 .map(|v| match v {
265 Value::Uint1(n) => *n,
266 _ => 0,
267 })
268 .collect();
269 ColumnBuffer::uint1(vals)
270 }
271 Type::Uint2 => {
272 let vals: Vec<u16> = values
273 .iter()
274 .map(|v| match v {
275 Value::Uint2(n) => *n,
276 _ => 0,
277 })
278 .collect();
279 ColumnBuffer::uint2(vals)
280 }
281 Type::Uint4 => {
282 let vals: Vec<u32> = values
283 .iter()
284 .map(|v| match v {
285 Value::Uint4(n) => *n,
286 _ => 0,
287 })
288 .collect();
289 ColumnBuffer::uint4(vals)
290 }
291 Type::Uint8 => {
292 let vals: Vec<u64> = values
293 .iter()
294 .map(|v| match v {
295 Value::Uint8(n) => *n,
296 _ => 0,
297 })
298 .collect();
299 ColumnBuffer::uint8(vals)
300 }
301 Type::Float4 => {
302 let vals: Vec<f32> = values
303 .iter()
304 .map(|v| match v {
305 Value::Float4(n) => n.value(),
306 _ => 0.0,
307 })
308 .collect();
309 ColumnBuffer::float4(vals)
310 }
311 Type::Float8 => {
312 let vals: Vec<f64> = values
313 .iter()
314 .map(|v| match v {
315 Value::Float8(n) => n.value(),
316 _ => 0.0,
317 })
318 .collect();
319 ColumnBuffer::float8(vals)
320 }
321 Type::Utf8 => {
322 let vals: Vec<String> = values
323 .iter()
324 .map(|v| match v {
325 Value::Utf8(s) => s.clone(),
326 _ => String::new(),
327 })
328 .collect();
329 ColumnBuffer::utf8(vals)
330 }
331 _ => {
332 let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
333 ColumnBuffer::utf8(vals)
334 }
335 };
336
337 Ok(ColumnWithName {
338 name: Fragment::internal(name),
339 data,
340 })
341}