Skip to main content

reifydb_engine/vm/volcano/scan/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::key::EncodedKey,
8	interface::resolved::ResolvedSeries,
9	key::{
10		EncodableKey,
11		series_row::{SeriesRowKey, SeriesRowKeyRange},
12	},
13	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns, headers::ColumnHeaders},
14};
15use reifydb_transaction::transaction::Transaction;
16use reifydb_type::{
17	fragment::Fragment,
18	value::{Value, datetime::DateTime, row_number::RowNumber, r#type::Type},
19};
20use tracing::instrument;
21
22use crate::{
23	Result,
24	vm::{
25		instruction::dml::shape::get_or_create_series_shape,
26		volcano::query::{QueryContext, QueryNode},
27	},
28};
29
30pub struct SeriesScanNode {
31	series: ResolvedSeries,
32	key_range_start: Option<u64>,
33	key_range_end: Option<u64>,
34	variant_tag: Option<u8>,
35	context: Option<Arc<QueryContext>>,
36	headers: ColumnHeaders,
37	last_key: Option<EncodedKey>,
38	exhausted: bool,
39	scan_limit: Option<usize>,
40}
41
42impl SeriesScanNode {
43	pub fn new(
44		series: ResolvedSeries,
45		key_range_start: Option<u64>,
46		key_range_end: Option<u64>,
47		variant_tag: Option<u8>,
48		context: Arc<QueryContext>,
49	) -> Result<Self> {
50		// Build headers: key column, optional tag, then data columns
51		let mut columns = vec![Fragment::internal(series.def().key.column())];
52		if series.def().tag.is_some() {
53			columns.push(Fragment::internal("tag"));
54		}
55		for col in series.columns() {
56			columns.push(Fragment::internal(&col.name));
57		}
58		let headers = ColumnHeaders {
59			columns,
60		};
61
62		Ok(Self {
63			series,
64			key_range_start,
65			key_range_end,
66			variant_tag,
67			context: Some(context),
68			headers,
69			last_key: None,
70			exhausted: false,
71			scan_limit: None,
72		})
73	}
74}
75
76impl QueryNode for SeriesScanNode {
77	#[instrument(name = "volcano::scan::series::initialize", level = "trace", skip_all)]
78	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
79		Ok(())
80	}
81
82	#[instrument(name = "volcano::scan::series::next", level = "trace", skip_all)]
83	fn next<'a>(&mut self, rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
84		debug_assert!(self.context.is_some(), "SeriesScanNode::next() called before initialize()");
85		let stored_ctx = self.context.as_ref().unwrap();
86
87		if self.exhausted {
88			return Ok(None);
89		}
90
91		let batch_size = match self.scan_limit {
92			Some(limit) => (limit as u64).min(stored_ctx.batch_size),
93			None => stored_ctx.batch_size,
94		};
95		let series = self.series.def();
96		let has_tag = series.tag.is_some();
97
98		// Create scan range
99		let range = SeriesRowKeyRange::scan_range(
100			series.id,
101			self.variant_tag,
102			self.key_range_start,
103			self.key_range_end,
104			self.last_key.as_ref(),
105		);
106
107		let mut key_values: Vec<u64> = Vec::new();
108		let mut tags: Vec<u8> = Vec::new();
109		let mut sequences: Vec<u64> = Vec::new();
110		let mut created_at_values: Vec<DateTime> = Vec::new();
111		let mut updated_at_values: Vec<DateTime> = Vec::new();
112		let mut data_rows: Vec<Vec<Value>> = Vec::new();
113		let mut new_last_key = None;
114
115		// Get the shape for decoding series values before borrowing rx for the stream
116		let read_shape = get_or_create_series_shape(&stored_ctx.services.catalog, self.series.def(), rx)?;
117
118		let mut stream = rx.range(range, batch_size as usize)?;
119		let mut count = 0;
120
121		for entry in stream.by_ref() {
122			let entry = entry?;
123
124			// Decode the key to get timestamp and optional tag
125			if let Some(key) = SeriesRowKey::decode(&entry.key) {
126				key_values.push(key.key);
127				sequences.push(key.sequence);
128				created_at_values.push(DateTime::from_nanos(entry.row.created_at_nanos()));
129				updated_at_values.push(DateTime::from_nanos(entry.row.updated_at_nanos()));
130				if has_tag {
131					tags.push(key.variant_tag.unwrap_or(0));
132				}
133
134				// Decode data columns from value using shape
135				let mut values = Vec::with_capacity(series.data_columns().count());
136				for (i, _) in series.data_columns().enumerate() {
137					values.push(read_shape.get_value(&entry.row, i + 1));
138				}
139				data_rows.push(values);
140
141				new_last_key = Some(entry.key);
142				count += 1;
143				if count >= batch_size as usize {
144					break;
145				}
146			}
147		}
148
149		drop(stream);
150
151		if key_values.is_empty() {
152			self.exhausted = true;
153			if self.last_key.is_none() {
154				// Empty series: return empty columns with correct types to preserve shape
155				let key_type = series
156					.columns
157					.iter()
158					.find(|c| c.name == series.key.column())
159					.map(|c| c.constraint.get_type())
160					.unwrap_or(Type::Int8);
161				let mut result_columns = Vec::new();
162				result_columns.push(ColumnWithName {
163					name: Fragment::internal(series.key.column()),
164					data: ColumnBuffer::none_typed(key_type, 0),
165				});
166				if has_tag {
167					result_columns.push(ColumnWithName {
168						name: Fragment::internal("tag"),
169						data: ColumnBuffer::none_typed(Type::Uint1, 0),
170					});
171				}
172				for col_def in series.data_columns() {
173					result_columns.push(ColumnWithName {
174						name: Fragment::internal(&col_def.name),
175						data: ColumnBuffer::none_typed(col_def.constraint.get_type(), 0),
176					});
177				}
178				return Ok(Some(Columns::new(result_columns)));
179			}
180			return Ok(None);
181		}
182
183		self.last_key = new_last_key;
184
185		// Build output columns
186		let mut result_columns = Vec::new();
187
188		// Key column
189		result_columns.push(ColumnWithName::new(
190			Fragment::internal(series.key.column()),
191			series.key_column_data(key_values),
192		));
193
194		// Tag column (Uint1) if present
195		if has_tag {
196			result_columns.push(ColumnWithName::new(Fragment::internal("tag"), ColumnBuffer::uint1(tags)));
197		}
198
199		// Data columns
200		for (col_idx, col_def) in series.data_columns().enumerate() {
201			let col_type = col_def.constraint.get_type();
202			let col_values: Vec<Value> = data_rows
203				.iter()
204				.map(|row| row.get(col_idx).cloned().unwrap_or(Value::none()))
205				.collect();
206
207			result_columns.push(build_data_column(&col_def.name, &col_values, col_type)?);
208		}
209
210		let row_numbers: Vec<RowNumber> = sequences.into_iter().map(RowNumber::from).collect();
211		Ok(Some(Columns::with_system_columns(
212			result_columns,
213			row_numbers,
214			created_at_values,
215			updated_at_values,
216		)))
217	}
218
219	fn headers(&self) -> Option<ColumnHeaders> {
220		Some(self.headers.clone())
221	}
222
223	fn set_scan_limit(&mut self, limit: usize) {
224		self.scan_limit = Some(limit);
225	}
226}
227
228pub(crate) fn build_data_column(name: &str, values: &[Value], col_type: Type) -> Result<ColumnWithName> {
229	let data = match col_type {
230		Type::Boolean => {
231			let vals: Vec<bool> = values
232				.iter()
233				.map(|v| match v {
234					Value::Boolean(b) => *b,
235					_ => false,
236				})
237				.collect();
238			ColumnBuffer::bool(vals)
239		}
240		Type::Int1 => {
241			let vals: Vec<i8> = values
242				.iter()
243				.map(|v| match v {
244					Value::Int1(n) => *n,
245					_ => 0,
246				})
247				.collect();
248			ColumnBuffer::int1(vals)
249		}
250		Type::Int2 => {
251			let vals: Vec<i16> = values
252				.iter()
253				.map(|v| match v {
254					Value::Int2(n) => *n,
255					_ => 0,
256				})
257				.collect();
258			ColumnBuffer::int2(vals)
259		}
260		Type::Int4 => {
261			let vals: Vec<i32> = values
262				.iter()
263				.map(|v| match v {
264					Value::Int4(n) => *n,
265					_ => 0,
266				})
267				.collect();
268			ColumnBuffer::int4(vals)
269		}
270		Type::Int8 => {
271			let vals: Vec<i64> = values
272				.iter()
273				.map(|v| match v {
274					Value::Int8(n) => *n,
275					_ => 0,
276				})
277				.collect();
278			ColumnBuffer::int8(vals)
279		}
280		Type::Uint1 => {
281			let vals: Vec<u8> = values
282				.iter()
283				.map(|v| match v {
284					Value::Uint1(n) => *n,
285					_ => 0,
286				})
287				.collect();
288			ColumnBuffer::uint1(vals)
289		}
290		Type::Uint2 => {
291			let vals: Vec<u16> = values
292				.iter()
293				.map(|v| match v {
294					Value::Uint2(n) => *n,
295					_ => 0,
296				})
297				.collect();
298			ColumnBuffer::uint2(vals)
299		}
300		Type::Uint4 => {
301			let vals: Vec<u32> = values
302				.iter()
303				.map(|v| match v {
304					Value::Uint4(n) => *n,
305					_ => 0,
306				})
307				.collect();
308			ColumnBuffer::uint4(vals)
309		}
310		Type::Uint8 => {
311			let vals: Vec<u64> = values
312				.iter()
313				.map(|v| match v {
314					Value::Uint8(n) => *n,
315					_ => 0,
316				})
317				.collect();
318			ColumnBuffer::uint8(vals)
319		}
320		Type::Float4 => {
321			let vals: Vec<f32> = values
322				.iter()
323				.map(|v| match v {
324					Value::Float4(n) => n.value(),
325					_ => 0.0,
326				})
327				.collect();
328			ColumnBuffer::float4(vals)
329		}
330		Type::Float8 => {
331			let vals: Vec<f64> = values
332				.iter()
333				.map(|v| match v {
334					Value::Float8(n) => n.value(),
335					_ => 0.0,
336				})
337				.collect();
338			ColumnBuffer::float8(vals)
339		}
340		Type::Utf8 => {
341			let vals: Vec<String> = values
342				.iter()
343				.map(|v| match v {
344					Value::Utf8(s) => s.clone(),
345					_ => String::new(),
346				})
347				.collect();
348			ColumnBuffer::utf8(vals)
349		}
350		_ => {
351			// Fallback: convert to string representation
352			let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
353			ColumnBuffer::utf8(vals)
354		}
355	};
356
357	Ok(ColumnWithName {
358		name: Fragment::internal(name),
359		data,
360	})
361}