Skip to main content

reifydb_sub_flow/operator/scan/
view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::schema::RowSchema,
6	interface::{
7		catalog::{flow::FlowNodeId, view::View},
8		change::{Change, Diff},
9	},
10	key::row::RowKey,
11	value::column::{Column, columns::Columns, data::ColumnData},
12};
13use reifydb_type::{Result, fragment::Fragment, util::cowvec::CowVec, value::row_number::RowNumber};
14
15use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
16
17pub struct PrimitiveViewOperator {
18	node: FlowNodeId,
19	view: View,
20}
21
22impl PrimitiveViewOperator {
23	pub fn new(node: FlowNodeId, view: View) -> Self {
24		Self {
25			node,
26			view,
27		}
28	}
29}
30
31impl Operator for PrimitiveViewOperator {
32	fn id(&self) -> FlowNodeId {
33		self.node
34	}
35
36	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
37		let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
38		for diff in change.diffs {
39			decoded_diffs.push(match diff {
40				Diff::Insert {
41					post,
42				} => {
43					let mut decoded = post;
44					decode_dictionary_columns(&mut decoded, txn)?;
45					Diff::Insert {
46						post: decoded,
47					}
48				}
49				Diff::Update {
50					pre,
51					post,
52				} => {
53					let mut decoded_pre = pre;
54					let mut decoded_post = post;
55					decode_dictionary_columns(&mut decoded_pre, txn)?;
56					decode_dictionary_columns(&mut decoded_post, txn)?;
57					Diff::Update {
58						pre: decoded_pre,
59						post: decoded_post,
60					}
61				}
62				Diff::Remove {
63					pre,
64				} => {
65					let mut decoded = pre;
66					decode_dictionary_columns(&mut decoded, txn)?;
67					Diff::Remove {
68						pre: decoded,
69					}
70				}
71			});
72		}
73		Ok(Change::from_flow(self.node, change.version, decoded_diffs))
74	}
75
76	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
77		if rows.is_empty() {
78			return Ok(Columns::from_view(&self.view));
79		}
80
81		let schema: RowSchema = self.view.columns().into();
82		let fields = schema.fields();
83
84		// Pre-allocate columns with capacity
85		let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
86		for field in fields.iter() {
87			columns_vec.push(Column {
88				name: Fragment::internal(&field.name),
89				data: ColumnData::with_capacity(field.constraint.get_type(), rows.len()),
90			});
91		}
92		let mut row_numbers = Vec::with_capacity(rows.len());
93
94		for row_num in rows {
95			let key = RowKey::encoded(self.view.underlying_id(), *row_num);
96			if let Some(encoded) = txn.get(&key)? {
97				row_numbers.push(*row_num);
98				// Decode each column value directly
99				for (i, _field) in fields.iter().enumerate() {
100					let value = schema.get_value(&encoded, i);
101					columns_vec[i].data.push_value(value);
102				}
103			}
104		}
105
106		if row_numbers.is_empty() {
107			Ok(Columns::from_view(&self.view))
108		} else {
109			Ok(Columns {
110				row_numbers: CowVec::new(row_numbers),
111				columns: CowVec::new(columns_vec),
112			})
113		}
114	}
115}