Skip to main content

reifydb_sub_flow/operator/scan/
view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, sync::Arc};
5
6use reifydb_core::{
7	encoded::shape::{RowShape, RowShapeField},
8	interface::{
9		catalog::{flow::FlowNodeId, shape::ShapeId, view::View},
10		change::{Change, ChangeOrigin, Diff},
11	},
12	key::row::RowKey,
13	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
14};
15use reifydb_type::{
16	Result,
17	fragment::Fragment,
18	value::{datetime::DateTime, row_number::RowNumber},
19};
20
21use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
22
/// Final state of a single row according to the in-transaction view overlay.
///
/// Values are produced by `build_view_overlay`, which keeps only the most
/// recent diff seen for each row number.
enum OverlayRow<'a> {
	/// The row exists. The first field is the borrowed batch that holds it and
	/// the second field is the row's index inside that batch.
	Present(&'a Columns, usize),
	/// The row was removed in this transaction and should be absent from the
	/// pull result.
	Removed,
}
32
33/// Build a per-row lookup of the overlay's effect on the given view.
34///
35/// Walks `overlay` in order, collapsing multiple diffs for the same row_number
36/// so the final entry reflects the latest state (later diffs override earlier
37/// ones, Insert/Update write a Present entry, Remove writes a Removed entry).
38fn build_view_overlay<'a>(overlay: &'a [Change], view_id: u64) -> HashMap<RowNumber, OverlayRow<'a>> {
39	let mut map: HashMap<RowNumber, OverlayRow<'a>> = HashMap::new();
40	for change in overlay {
41		let ChangeOrigin::Shape(ShapeId::View(id)) = change.origin else {
42			continue;
43		};
44		if id.0 != view_id {
45			continue;
46		}
47		for diff in &change.diffs {
48			match diff {
49				Diff::Insert {
50					post,
51				} => {
52					for (idx, rn) in post.row_numbers.iter().enumerate() {
53						map.insert(*rn, OverlayRow::Present(post, idx));
54					}
55				}
56				Diff::Update {
57					post,
58					..
59				} => {
60					for (idx, rn) in post.row_numbers.iter().enumerate() {
61						map.insert(*rn, OverlayRow::Present(post, idx));
62					}
63				}
64				Diff::Remove {
65					pre,
66				} => {
67					for rn in pre.row_numbers.iter() {
68						map.insert(*rn, OverlayRow::Removed);
69					}
70				}
71			}
72		}
73	}
74	map
75}
76
/// Flow operator bound to a single catalog view.
///
/// Its `Operator::apply` implementation forwards incoming changes after
/// decoding their dictionary columns, and `Operator::pull` materialises the
/// requested rows from the transaction's view overlay or, when the overlay
/// has no entry for a row, from storage.
pub struct PrimitiveViewOperator {
	/// Flow node id returned by `Operator::id` and passed to `Change::from_flow` in `apply`.
	node: FlowNodeId,
	/// View definition; supplies the columns, the view id and the underlying storage id.
	view: View,
}
81
82impl PrimitiveViewOperator {
83	pub fn new(node: FlowNodeId, view: View) -> Self {
84		Self {
85			node,
86			view,
87		}
88	}
89}
90
91impl Operator for PrimitiveViewOperator {
92	fn id(&self) -> FlowNodeId {
93		self.node
94	}
95
96	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
97		let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
98		for diff in change.diffs {
99			decoded_diffs.push(match diff {
100				Diff::Insert {
101					post,
102				} => {
103					let mut decoded = post;
104					decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
105					Diff::insert_arc(decoded)
106				}
107				Diff::Update {
108					pre,
109					post,
110				} => {
111					let mut decoded_pre = pre;
112					let mut decoded_post = post;
113					decode_dictionary_columns(Arc::make_mut(&mut decoded_pre), txn)?;
114					decode_dictionary_columns(Arc::make_mut(&mut decoded_post), txn)?;
115					Diff::update_arc(decoded_pre, decoded_post)
116				}
117				Diff::Remove {
118					pre,
119				} => {
120					let mut decoded = pre;
121					decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
122					Diff::remove_arc(decoded)
123				}
124			});
125		}
126		Ok(Change::from_flow(self.node, change.version, decoded_diffs, change.changed_at))
127	}
128
129	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
130		if rows.is_empty() {
131			return Ok(Columns::from_catalog_columns(self.view.columns()));
132		}
133
134		let shape: RowShape = self.view.columns().into();
135		let fields = shape.fields();
136
137		// Hold the Arc in a local so the overlay HashMap (which borrows from it)
138		// stays alive across the subsequent mutable borrow for `txn.get`. The
139		// overlay reflects sibling views' outputs produced earlier in the same
140		// pre-commit; it's empty for Deferred / Ephemeral transactions.
141		let overlay_arc = txn.view_overlay();
142		let overlay = overlay_arc
143			.as_deref()
144			.map(|o| build_view_overlay(o.as_slice(), self.view.id().0))
145			.unwrap_or_default();
146
147		let mut columns_vec = self.allocate_pull_columns(fields, rows.len());
148		let mut row_numbers = Vec::with_capacity(rows.len());
149		let mut created_at = Vec::with_capacity(rows.len());
150		let mut updated_at = Vec::with_capacity(rows.len());
151
152		for row_num in rows {
153			if self.try_push_overlay_row(
154				*row_num,
155				&overlay,
156				&mut columns_vec,
157				&mut row_numbers,
158				&mut created_at,
159				&mut updated_at,
160			) {
161				continue;
162			}
163			self.try_push_storage_row(
164				txn,
165				*row_num,
166				&shape,
167				fields,
168				&mut columns_vec,
169				&mut row_numbers,
170				&mut created_at,
171				&mut updated_at,
172			)?;
173		}
174
175		if row_numbers.is_empty() {
176			Ok(Columns::from_catalog_columns(self.view.columns()))
177		} else {
178			Ok(Columns::with_system_columns(columns_vec, row_numbers, created_at, updated_at))
179		}
180	}
181}
182
183impl PrimitiveViewOperator {
184	#[inline]
185	fn allocate_pull_columns(&self, fields: &[RowShapeField], capacity: usize) -> Vec<ColumnWithName> {
186		let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
187		for field in fields.iter() {
188			columns_vec.push(ColumnWithName {
189				name: Fragment::internal(&field.name),
190				data: ColumnBuffer::with_capacity(field.constraint.get_type(), capacity),
191			});
192		}
193		columns_vec
194	}
195
196	#[inline]
197	#[allow(clippy::too_many_arguments)]
198	fn try_push_overlay_row(
199		&self,
200		row_num: RowNumber,
201		overlay: &HashMap<RowNumber, OverlayRow<'_>>,
202		columns_vec: &mut [ColumnWithName],
203		row_numbers: &mut Vec<RowNumber>,
204		created_at: &mut Vec<DateTime>,
205		updated_at: &mut Vec<DateTime>,
206	) -> bool {
207		// True iff this row was resolved via overlay (Removed -> skip, Present -> appended).
208		match overlay.get(&row_num) {
209			Some(OverlayRow::Removed) => true,
210			Some(OverlayRow::Present(src, idx)) => {
211				row_numbers.push(row_num);
212				created_at.push(src.created_at.get(*idx).copied().unwrap_or_default());
213				updated_at.push(src.updated_at.get(*idx).copied().unwrap_or_default());
214				for (i, col) in src.iter().enumerate() {
215					if i < columns_vec.len() {
216						columns_vec[i].data.push_value(col.data().get_value(*idx));
217					}
218				}
219				true
220			}
221			None => false,
222		}
223	}
224
225	#[inline]
226	#[allow(clippy::too_many_arguments)]
227	fn try_push_storage_row(
228		&self,
229		txn: &mut FlowTransaction,
230		row_num: RowNumber,
231		shape: &RowShape,
232		fields: &[RowShapeField],
233		columns_vec: &mut [ColumnWithName],
234		row_numbers: &mut Vec<RowNumber>,
235		created_at: &mut Vec<DateTime>,
236		updated_at: &mut Vec<DateTime>,
237	) -> Result<()> {
238		let key = RowKey::encoded(self.view.underlying_id(), row_num);
239		if let Some(encoded) = txn.get(&key)? {
240			row_numbers.push(row_num);
241			created_at.push(DateTime::from_nanos(encoded.created_at_nanos()));
242			updated_at.push(DateTime::from_nanos(encoded.updated_at_nanos()));
243			for (i, _field) in fields.iter().enumerate() {
244				let value = shape.get_value(&encoded, i);
245				columns_vec[i].data.push_value(value);
246			}
247		}
248		Ok(())
249	}
250}