reifydb_sub_flow/operator/scan/
series.rs1use reifydb_core::{
5 interface::{
6 catalog::{flow::FlowNodeId, series::Series},
7 change::Change,
8 },
9 value::column::{Column, columns::Columns, data::ColumnData},
10};
11use reifydb_type::{
12 Result,
13 fragment::Fragment,
14 util::cowvec::CowVec,
15 value::{row_number::RowNumber, r#type::Type},
16};
17
18use crate::{Operator, transaction::FlowTransaction};
19
20pub struct PrimitiveSeriesOperator {
21 node: FlowNodeId,
22 series: Series,
23}
24
25impl PrimitiveSeriesOperator {
26 pub fn new(node: FlowNodeId, series: Series) -> Self {
27 Self {
28 node,
29 series,
30 }
31 }
32}
33
34impl Operator for PrimitiveSeriesOperator {
35 fn id(&self) -> FlowNodeId {
36 self.node
37 }
38
39 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
40 Ok(Change::from_flow(self.node, change.version, change.diffs))
41 }
42
43 fn pull(&self, _txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
44 if rows.is_empty() {
45 return Ok(self.empty_columns());
46 }
47
48 Ok(self.empty_columns())
51 }
52}
53
54impl PrimitiveSeriesOperator {
55 fn empty_columns(&self) -> Columns {
56 let mut columns = Vec::with_capacity(1 + self.series.columns.len());
57
58 columns.push(Column {
60 name: Fragment::internal("timestamp"),
61 data: ColumnData::with_capacity(Type::Int8, 0),
62 });
63
64 for col in &self.series.columns {
66 columns.push(Column {
67 name: Fragment::internal(&col.name),
68 data: ColumnData::with_capacity(col.constraint.get_type(), 0),
69 });
70 }
71
72 Columns {
73 row_numbers: CowVec::new(Vec::new()),
74 columns: CowVec::new(columns),
75 }
76 }
77}