Skip to main content

reifydb_sub_flow/operator/scan/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	interface::{
6		catalog::{flow::FlowNodeId, series::Series},
7		change::Change,
8	},
9	value::column::{Column, columns::Columns, data::ColumnData},
10};
11use reifydb_type::{
12	Result,
13	fragment::Fragment,
14	util::cowvec::CowVec,
15	value::{row_number::RowNumber, r#type::Type},
16};
17
18use crate::{Operator, transaction::FlowTransaction};
19
20pub struct PrimitiveSeriesOperator {
21	node: FlowNodeId,
22	series: Series,
23}
24
25impl PrimitiveSeriesOperator {
26	pub fn new(node: FlowNodeId, series: Series) -> Self {
27		Self {
28			node,
29			series,
30		}
31	}
32}
33
34impl Operator for PrimitiveSeriesOperator {
35	fn id(&self) -> FlowNodeId {
36		self.node
37	}
38
39	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
40		Ok(Change::from_flow(self.node, change.version, change.diffs))
41	}
42
43	fn pull(&self, _txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
44		if rows.is_empty() {
45			return Ok(self.empty_columns());
46		}
47
48		// Series pull returns empty columns since series data is keyed differently
49		// than tables. Flow changes are pushed via apply() instead.
50		Ok(self.empty_columns())
51	}
52}
53
54impl PrimitiveSeriesOperator {
55	fn empty_columns(&self) -> Columns {
56		let mut columns = Vec::with_capacity(1 + self.series.columns.len());
57
58		// Timestamp column
59		columns.push(Column {
60			name: Fragment::internal("timestamp"),
61			data: ColumnData::with_capacity(Type::Int8, 0),
62		});
63
64		// Data columns
65		for col in &self.series.columns {
66			columns.push(Column {
67				name: Fragment::internal(&col.name),
68				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
69			});
70		}
71
72		Columns {
73			row_numbers: CowVec::new(Vec::new()),
74			columns: CowVec::new(columns),
75		}
76	}
77}