Skip to main content

reifydb_engine/vm/volcano/scan/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::key::EncodedKey,
8	interface::resolved::ResolvedSeries,
9	key::{
10		EncodableKey,
11		series_row::{SeriesRowKey, SeriesRowKeyRange},
12	},
13	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17	fragment::Fragment,
18	value::{Value, datetime::DateTime, row_number::RowNumber, r#type::Type},
19};
20use tracing::instrument;
21
22use crate::{
23	Result,
24	vm::{
25		instruction::dml::shape::get_or_create_series_shape,
26		volcano::query::{QueryContext, QueryNode},
27	},
28};
29
30pub struct SeriesScanNode {
31	series: ResolvedSeries,
32	key_range_start: Option<u64>,
33	key_range_end: Option<u64>,
34	variant_tag: Option<u8>,
35	context: Option<Arc<QueryContext>>,
36	headers: ColumnHeaders,
37	last_key: Option<EncodedKey>,
38	exhausted: bool,
39}
40
41impl SeriesScanNode {
42	pub fn new(
43		series: ResolvedSeries,
44		key_range_start: Option<u64>,
45		key_range_end: Option<u64>,
46		variant_tag: Option<u8>,
47		context: Arc<QueryContext>,
48	) -> Result<Self> {
49		let mut columns = vec![Fragment::internal(series.def().key.column())];
50		if series.def().tag.is_some() {
51			columns.push(Fragment::internal("tag"));
52		}
53		for col in series.columns() {
54			columns.push(Fragment::internal(&col.name));
55		}
56		let headers = ColumnHeaders {
57			columns,
58		};
59
60		Ok(Self {
61			series,
62			key_range_start,
63			key_range_end,
64			variant_tag,
65			context: Some(context),
66			headers,
67			last_key: None,
68			exhausted: false,
69		})
70	}
71}
72
73impl QueryNode for SeriesScanNode {
74	#[instrument(name = "volcano::scan::series::initialize", level = "trace", skip_all)]
75	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
76		Ok(())
77	}
78
79	#[instrument(name = "volcano::scan::series::next", level = "trace", skip_all)]
80	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
81		debug_assert!(self.context.is_some(), "SeriesScanNode::next() called before initialize()");
82		let stored_ctx = self.context.as_ref().unwrap();
83
84		if self.exhausted {
85			return Ok(None);
86		}
87
88		let batch_size = stored_ctx.batch_size;
89		let series = self.series.def();
90		let has_tag = series.tag.is_some();
91
92		let range = SeriesRowKeyRange::scan_range(
93			series.id,
94			self.variant_tag,
95			self.key_range_start,
96			self.key_range_end,
97			self.last_key.as_ref(),
98		);
99
100		let mut key_values: Vec<u64> = Vec::new();
101		let mut tags: Vec<u8> = Vec::new();
102		let mut sequences: Vec<u64> = Vec::new();
103		let mut created_at_values: Vec<DateTime> = Vec::new();
104		let mut updated_at_values: Vec<DateTime> = Vec::new();
105		let mut data_rows: Vec<Vec<Value>> = Vec::new();
106		let mut new_last_key = None;
107
108		let read_shape = get_or_create_series_shape(&stored_ctx.services.catalog, self.series.def(), rx)?;
109
110		let mut stream = rx.range(range, batch_size as usize)?;
111		let mut count = 0;
112
113		for entry in stream.by_ref() {
114			let entry = entry?;
115
116			if let Some(key) = SeriesRowKey::decode(&entry.key) {
117				key_values.push(key.key);
118				sequences.push(key.sequence);
119				created_at_values.push(DateTime::from_nanos(entry.row.created_at_nanos()));
120				updated_at_values.push(DateTime::from_nanos(entry.row.updated_at_nanos()));
121				if has_tag {
122					tags.push(key.variant_tag.unwrap_or(0));
123				}
124
125				let mut values = Vec::with_capacity(series.data_columns().count());
126				for (i, _) in series.data_columns().enumerate() {
127					values.push(read_shape.get_value(&entry.row, i + 1));
128				}
129				data_rows.push(values);
130
131				new_last_key = Some(entry.key);
132				count += 1;
133				if count >= batch_size as usize {
134					break;
135				}
136			}
137		}
138
139		drop(stream);
140
141		if key_values.is_empty() {
142			self.exhausted = true;
143			if self.last_key.is_none() {
144				let key_type = series
145					.columns
146					.iter()
147					.find(|c| c.name == series.key.column())
148					.map(|c| c.constraint.get_type())
149					.unwrap_or(Type::Int8);
150				let mut result_columns = Vec::new();
151				result_columns.push(ColumnWithName {
152					name: Fragment::internal(series.key.column()),
153					data: ColumnBuffer::none_typed(key_type, 0),
154				});
155				if has_tag {
156					result_columns.push(ColumnWithName {
157						name: Fragment::internal("tag"),
158						data: ColumnBuffer::none_typed(Type::Uint1, 0),
159					});
160				}
161				for col_def in series.data_columns() {
162					result_columns.push(ColumnWithName {
163						name: Fragment::internal(&col_def.name),
164						data: ColumnBuffer::none_typed(col_def.constraint.get_type(), 0),
165					});
166				}
167				return Ok(Some(Columns::new(result_columns)));
168			}
169			return Ok(None);
170		}
171
172		self.last_key = new_last_key;
173
174		let mut result_columns = Vec::new();
175
176		result_columns.push(ColumnWithName::new(
177			Fragment::internal(series.key.column()),
178			series.key_column_data(key_values),
179		));
180
181		if has_tag {
182			result_columns.push(ColumnWithName::new(Fragment::internal("tag"), ColumnBuffer::uint1(tags)));
183		}
184
185		for (col_idx, col_def) in series.data_columns().enumerate() {
186			let col_type = col_def.constraint.get_type();
187			let col_values: Vec<Value> = data_rows
188				.iter()
189				.map(|row| row.get(col_idx).cloned().unwrap_or(Value::none()))
190				.collect();
191
192			result_columns.push(build_data_column(&col_def.name, &col_values, col_type)?);
193		}
194
195		let row_numbers: Vec<RowNumber> = sequences.into_iter().map(RowNumber::from).collect();
196		Ok(Some(Columns::with_system_columns(
197			result_columns,
198			row_numbers,
199			created_at_values,
200			updated_at_values,
201		)))
202	}
203
204	fn headers(&self) -> Option<ColumnHeaders> {
205		Some(self.headers.clone())
206	}
207}
208
209pub(crate) fn build_data_column(name: &str, values: &[Value], col_type: Type) -> Result<ColumnWithName> {
210	let data = match col_type {
211		Type::Boolean => {
212			let vals: Vec<bool> = values
213				.iter()
214				.map(|v| match v {
215					Value::Boolean(b) => *b,
216					_ => false,
217				})
218				.collect();
219			ColumnBuffer::bool(vals)
220		}
221		Type::Int1 => {
222			let vals: Vec<i8> = values
223				.iter()
224				.map(|v| match v {
225					Value::Int1(n) => *n,
226					_ => 0,
227				})
228				.collect();
229			ColumnBuffer::int1(vals)
230		}
231		Type::Int2 => {
232			let vals: Vec<i16> = values
233				.iter()
234				.map(|v| match v {
235					Value::Int2(n) => *n,
236					_ => 0,
237				})
238				.collect();
239			ColumnBuffer::int2(vals)
240		}
241		Type::Int4 => {
242			let vals: Vec<i32> = values
243				.iter()
244				.map(|v| match v {
245					Value::Int4(n) => *n,
246					_ => 0,
247				})
248				.collect();
249			ColumnBuffer::int4(vals)
250		}
251		Type::Int8 => {
252			let vals: Vec<i64> = values
253				.iter()
254				.map(|v| match v {
255					Value::Int8(n) => *n,
256					_ => 0,
257				})
258				.collect();
259			ColumnBuffer::int8(vals)
260		}
261		Type::Uint1 => {
262			let vals: Vec<u8> = values
263				.iter()
264				.map(|v| match v {
265					Value::Uint1(n) => *n,
266					_ => 0,
267				})
268				.collect();
269			ColumnBuffer::uint1(vals)
270		}
271		Type::Uint2 => {
272			let vals: Vec<u16> = values
273				.iter()
274				.map(|v| match v {
275					Value::Uint2(n) => *n,
276					_ => 0,
277				})
278				.collect();
279			ColumnBuffer::uint2(vals)
280		}
281		Type::Uint4 => {
282			let vals: Vec<u32> = values
283				.iter()
284				.map(|v| match v {
285					Value::Uint4(n) => *n,
286					_ => 0,
287				})
288				.collect();
289			ColumnBuffer::uint4(vals)
290		}
291		Type::Uint8 => {
292			let vals: Vec<u64> = values
293				.iter()
294				.map(|v| match v {
295					Value::Uint8(n) => *n,
296					_ => 0,
297				})
298				.collect();
299			ColumnBuffer::uint8(vals)
300		}
301		Type::Float4 => {
302			let vals: Vec<f32> = values
303				.iter()
304				.map(|v| match v {
305					Value::Float4(n) => n.value(),
306					_ => 0.0,
307				})
308				.collect();
309			ColumnBuffer::float4(vals)
310		}
311		Type::Float8 => {
312			let vals: Vec<f64> = values
313				.iter()
314				.map(|v| match v {
315					Value::Float8(n) => n.value(),
316					_ => 0.0,
317				})
318				.collect();
319			ColumnBuffer::float8(vals)
320		}
321		Type::Utf8 => {
322			let vals: Vec<String> = values
323				.iter()
324				.map(|v| match v {
325					Value::Utf8(s) => s.clone(),
326					_ => String::new(),
327				})
328				.collect();
329			ColumnBuffer::utf8(vals)
330		}
331		_ => {
332			let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
333			ColumnBuffer::utf8(vals)
334		}
335	};
336
337	Ok(ColumnWithName {
338		name: Fragment::internal(name),
339		data,
340	})
341}