reifydb_sub_flow/operator/scan/
view.rs1use reifydb_core::{
5 encoded::schema::RowSchema,
6 interface::{
7 catalog::{flow::FlowNodeId, view::View},
8 change::{Change, Diff},
9 },
10 key::row::RowKey,
11 value::column::{Column, columns::Columns, data::ColumnData},
12};
13use reifydb_type::{Result, fragment::Fragment, util::cowvec::CowVec, value::row_number::RowNumber};
14
15use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
16
17pub struct PrimitiveViewOperator {
18 node: FlowNodeId,
19 view: View,
20}
21
22impl PrimitiveViewOperator {
23 pub fn new(node: FlowNodeId, view: View) -> Self {
24 Self {
25 node,
26 view,
27 }
28 }
29}
30
31impl Operator for PrimitiveViewOperator {
32 fn id(&self) -> FlowNodeId {
33 self.node
34 }
35
36 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
37 let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
38 for diff in change.diffs {
39 decoded_diffs.push(match diff {
40 Diff::Insert {
41 post,
42 } => {
43 let mut decoded = post;
44 decode_dictionary_columns(&mut decoded, txn)?;
45 Diff::Insert {
46 post: decoded,
47 }
48 }
49 Diff::Update {
50 pre,
51 post,
52 } => {
53 let mut decoded_pre = pre;
54 let mut decoded_post = post;
55 decode_dictionary_columns(&mut decoded_pre, txn)?;
56 decode_dictionary_columns(&mut decoded_post, txn)?;
57 Diff::Update {
58 pre: decoded_pre,
59 post: decoded_post,
60 }
61 }
62 Diff::Remove {
63 pre,
64 } => {
65 let mut decoded = pre;
66 decode_dictionary_columns(&mut decoded, txn)?;
67 Diff::Remove {
68 pre: decoded,
69 }
70 }
71 });
72 }
73 Ok(Change::from_flow(self.node, change.version, decoded_diffs))
74 }
75
76 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
77 if rows.is_empty() {
78 return Ok(Columns::from_view(&self.view));
79 }
80
81 let schema: RowSchema = self.view.columns().into();
82 let fields = schema.fields();
83
84 let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
86 for field in fields.iter() {
87 columns_vec.push(Column {
88 name: Fragment::internal(&field.name),
89 data: ColumnData::with_capacity(field.constraint.get_type(), rows.len()),
90 });
91 }
92 let mut row_numbers = Vec::with_capacity(rows.len());
93
94 for row_num in rows {
95 let key = RowKey::encoded(self.view.underlying_id(), *row_num);
96 if let Some(encoded) = txn.get(&key)? {
97 row_numbers.push(*row_num);
98 for (i, _field) in fields.iter().enumerate() {
100 let value = schema.get_value(&encoded, i);
101 columns_vec[i].data.push_value(value);
102 }
103 }
104 }
105
106 if row_numbers.is_empty() {
107 Ok(Columns::from_view(&self.view))
108 } else {
109 Ok(Columns {
110 row_numbers: CowVec::new(row_numbers),
111 columns: CowVec::new(columns_vec),
112 })
113 }
114 }
115}