reifydb_sub_flow/operator/scan/
series.rs1use reifydb_core::{
5 interface::{
6 catalog::{flow::FlowNodeId, series::Series},
7 change::Change,
8 },
9 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_type::{
12 Result,
13 fragment::Fragment,
14 value::{row_number::RowNumber, r#type::Type},
15};
16
17use crate::{Operator, transaction::FlowTransaction};
18
19pub struct PrimitiveSeriesOperator {
20 node: FlowNodeId,
21 series: Series,
22}
23
24impl PrimitiveSeriesOperator {
25 pub fn new(node: FlowNodeId, series: Series) -> Self {
26 Self {
27 node,
28 series,
29 }
30 }
31}
32
33impl Operator for PrimitiveSeriesOperator {
34 fn id(&self) -> FlowNodeId {
35 self.node
36 }
37
38 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
39 Ok(Change::from_flow(self.node, change.version, change.diffs, change.changed_at))
40 }
41
42 fn pull(&self, _txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
43 if rows.is_empty() {
44 return Ok(self.empty_columns());
45 }
46
47 Ok(self.empty_columns())
50 }
51}
52
53impl PrimitiveSeriesOperator {
54 fn empty_columns(&self) -> Columns {
55 let mut columns = Vec::with_capacity(1 + self.series.columns.len());
56
57 columns.push(ColumnWithName {
59 name: Fragment::internal("timestamp"),
60 data: ColumnBuffer::with_capacity(Type::Int8, 0),
61 });
62
63 for col in &self.series.columns {
65 columns.push(ColumnWithName {
66 name: Fragment::internal(&col.name),
67 data: ColumnBuffer::with_capacity(col.constraint.get_type(), 0),
68 });
69 }
70
71 Columns::new(columns)
72 }
73}