Skip to main content

reifydb_sub_flow/operator/scan/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	interface::{
6		catalog::{flow::FlowNodeId, series::Series},
7		change::Change,
8	},
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_type::{
12	Result,
13	fragment::Fragment,
14	value::{row_number::RowNumber, r#type::Type},
15};
16
17use crate::{Operator, transaction::FlowTransaction};
18
19pub struct PrimitiveSeriesOperator {
20	node: FlowNodeId,
21	series: Series,
22}
23
24impl PrimitiveSeriesOperator {
25	pub fn new(node: FlowNodeId, series: Series) -> Self {
26		Self {
27			node,
28			series,
29		}
30	}
31}
32
33impl Operator for PrimitiveSeriesOperator {
34	fn id(&self) -> FlowNodeId {
35		self.node
36	}
37
38	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
39		Ok(Change::from_flow(self.node, change.version, change.diffs, change.changed_at))
40	}
41
42	fn pull(&self, _txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
43		if rows.is_empty() {
44			return Ok(self.empty_columns());
45		}
46
47		// Series pull returns empty columns since series data is keyed differently
48		// than tables. Flow changes are pushed via apply() instead.
49		Ok(self.empty_columns())
50	}
51}
52
53impl PrimitiveSeriesOperator {
54	fn empty_columns(&self) -> Columns {
55		let mut columns = Vec::with_capacity(1 + self.series.columns.len());
56
57		// Timestamp column
58		columns.push(ColumnWithName {
59			name: Fragment::internal("timestamp"),
60			data: ColumnBuffer::with_capacity(Type::Int8, 0),
61		});
62
63		// Data columns
64		for col in &self.series.columns {
65			columns.push(ColumnWithName {
66				name: Fragment::internal(&col.name),
67				data: ColumnBuffer::with_capacity(col.constraint.get_type(), 0),
68			});
69		}
70
71		Columns::new(columns)
72	}
73}