reifydb_sub_flow/operator/scan/
view.rs1use std::{collections::HashMap, sync::Arc};
5
6use reifydb_core::{
7 encoded::shape::{RowShape, RowShapeField},
8 interface::{
9 catalog::{flow::FlowNodeId, shape::ShapeId, view::View},
10 change::{Change, ChangeOrigin, Diff},
11 },
12 key::row::RowKey,
13 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
14};
15use reifydb_type::{
16 Result,
17 fragment::Fragment,
18 value::{datetime::DateTime, row_number::RowNumber},
19};
20
21use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
22
/// State of a single row as recorded by `build_view_overlay`.
///
/// The lifetime `'a` ties a `Present` entry to the `Change` slice the overlay map
/// was built from.
enum OverlayRow<'a> {
    /// The row is supplied by the overlay: the borrowed column batch that contains it
    /// and the row's index within that batch's `row_numbers`.
    Present(&'a Columns, usize),
    /// The row appeared in a `Diff::Remove`; `try_push_overlay_row` reports it as
    /// handled without emitting any values.
    Removed,
}
32
33fn build_view_overlay<'a>(overlay: &'a [Change], view_id: u64) -> HashMap<RowNumber, OverlayRow<'a>> {
39 let mut map: HashMap<RowNumber, OverlayRow<'a>> = HashMap::new();
40 for change in overlay {
41 let ChangeOrigin::Shape(ShapeId::View(id)) = change.origin else {
42 continue;
43 };
44 if id.0 != view_id {
45 continue;
46 }
47 for diff in &change.diffs {
48 match diff {
49 Diff::Insert {
50 post,
51 } => {
52 for (idx, rn) in post.row_numbers.iter().enumerate() {
53 map.insert(*rn, OverlayRow::Present(post, idx));
54 }
55 }
56 Diff::Update {
57 post,
58 ..
59 } => {
60 for (idx, rn) in post.row_numbers.iter().enumerate() {
61 map.insert(*rn, OverlayRow::Present(post, idx));
62 }
63 }
64 Diff::Remove {
65 pre,
66 } => {
67 for rn in pre.row_numbers.iter() {
68 map.insert(*rn, OverlayRow::Removed);
69 }
70 }
71 }
72 }
73 }
74 map
75}
76
/// Flow operator bound to a single view.
///
/// Its `Operator` implementation decodes dictionary columns of incoming changes and
/// serves row lookups (`pull`) for the view.
pub struct PrimitiveViewOperator {
    /// Flow node id reported by `Operator::id` and passed to `Change::from_flow` in `apply`.
    node: FlowNodeId,
    /// View definition; provides the column layout, the view id used to filter the
    /// overlay, and the underlying id used to build row keys.
    view: View,
}
81
82impl PrimitiveViewOperator {
83 pub fn new(node: FlowNodeId, view: View) -> Self {
84 Self {
85 node,
86 view,
87 }
88 }
89}
90
91impl Operator for PrimitiveViewOperator {
92 fn id(&self) -> FlowNodeId {
93 self.node
94 }
95
96 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
97 let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
98 for diff in change.diffs {
99 decoded_diffs.push(match diff {
100 Diff::Insert {
101 post,
102 } => {
103 let mut decoded = post;
104 decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
105 Diff::insert_arc(decoded)
106 }
107 Diff::Update {
108 pre,
109 post,
110 } => {
111 let mut decoded_pre = pre;
112 let mut decoded_post = post;
113 decode_dictionary_columns(Arc::make_mut(&mut decoded_pre), txn)?;
114 decode_dictionary_columns(Arc::make_mut(&mut decoded_post), txn)?;
115 Diff::update_arc(decoded_pre, decoded_post)
116 }
117 Diff::Remove {
118 pre,
119 } => {
120 let mut decoded = pre;
121 decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
122 Diff::remove_arc(decoded)
123 }
124 });
125 }
126 Ok(Change::from_flow(self.node, change.version, decoded_diffs, change.changed_at))
127 }
128
129 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
130 if rows.is_empty() {
131 return Ok(Columns::from_catalog_columns(self.view.columns()));
132 }
133
134 let shape: RowShape = self.view.columns().into();
135 let fields = shape.fields();
136
137 let overlay_arc = txn.view_overlay();
142 let overlay = overlay_arc
143 .as_deref()
144 .map(|o| build_view_overlay(o.as_slice(), self.view.id().0))
145 .unwrap_or_default();
146
147 let mut columns_vec = self.allocate_pull_columns(fields, rows.len());
148 let mut row_numbers = Vec::with_capacity(rows.len());
149 let mut created_at = Vec::with_capacity(rows.len());
150 let mut updated_at = Vec::with_capacity(rows.len());
151
152 for row_num in rows {
153 if self.try_push_overlay_row(
154 *row_num,
155 &overlay,
156 &mut columns_vec,
157 &mut row_numbers,
158 &mut created_at,
159 &mut updated_at,
160 ) {
161 continue;
162 }
163 self.try_push_storage_row(
164 txn,
165 *row_num,
166 &shape,
167 fields,
168 &mut columns_vec,
169 &mut row_numbers,
170 &mut created_at,
171 &mut updated_at,
172 )?;
173 }
174
175 if row_numbers.is_empty() {
176 Ok(Columns::from_catalog_columns(self.view.columns()))
177 } else {
178 Ok(Columns::with_system_columns(columns_vec, row_numbers, created_at, updated_at))
179 }
180 }
181}
182
183impl PrimitiveViewOperator {
184 #[inline]
185 fn allocate_pull_columns(&self, fields: &[RowShapeField], capacity: usize) -> Vec<ColumnWithName> {
186 let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
187 for field in fields.iter() {
188 columns_vec.push(ColumnWithName {
189 name: Fragment::internal(&field.name),
190 data: ColumnBuffer::with_capacity(field.constraint.get_type(), capacity),
191 });
192 }
193 columns_vec
194 }
195
196 #[inline]
197 #[allow(clippy::too_many_arguments)]
198 fn try_push_overlay_row(
199 &self,
200 row_num: RowNumber,
201 overlay: &HashMap<RowNumber, OverlayRow<'_>>,
202 columns_vec: &mut [ColumnWithName],
203 row_numbers: &mut Vec<RowNumber>,
204 created_at: &mut Vec<DateTime>,
205 updated_at: &mut Vec<DateTime>,
206 ) -> bool {
207 match overlay.get(&row_num) {
209 Some(OverlayRow::Removed) => true,
210 Some(OverlayRow::Present(src, idx)) => {
211 row_numbers.push(row_num);
212 created_at.push(src.created_at.get(*idx).copied().unwrap_or_default());
213 updated_at.push(src.updated_at.get(*idx).copied().unwrap_or_default());
214 for (i, col) in src.iter().enumerate() {
215 if i < columns_vec.len() {
216 columns_vec[i].data.push_value(col.data().get_value(*idx));
217 }
218 }
219 true
220 }
221 None => false,
222 }
223 }
224
225 #[inline]
226 #[allow(clippy::too_many_arguments)]
227 fn try_push_storage_row(
228 &self,
229 txn: &mut FlowTransaction,
230 row_num: RowNumber,
231 shape: &RowShape,
232 fields: &[RowShapeField],
233 columns_vec: &mut [ColumnWithName],
234 row_numbers: &mut Vec<RowNumber>,
235 created_at: &mut Vec<DateTime>,
236 updated_at: &mut Vec<DateTime>,
237 ) -> Result<()> {
238 let key = RowKey::encoded(self.view.underlying_id(), row_num);
239 if let Some(encoded) = txn.get(&key)? {
240 row_numbers.push(row_num);
241 created_at.push(DateTime::from_nanos(encoded.created_at_nanos()));
242 updated_at.push(DateTime::from_nanos(encoded.updated_at_nanos()));
243 for (i, _field) in fields.iter().enumerate() {
244 let value = shape.get_value(&encoded, i);
245 columns_vec[i].data.push_value(value);
246 }
247 }
248 Ok(())
249 }
250}