Skip to main content

reifydb_sub_flow/operator/sink/
view.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::shape::RowShape,
8	interface::{
9		catalog::{flow::FlowNodeId, id::TableId, shape::ShapeId, view::View},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{
18	Result,
19	value::{datetime::DateTime, row_number::RowNumber},
20};
21use smallvec::smallvec;
22
23use super::{coerce_columns, encode_row_at_index};
24use crate::{Operator, operator::Operators, transaction::FlowTransaction};
25
26pub struct SinkTableViewOperator {
27	#[allow(dead_code)]
28	parent: Arc<Operators>,
29	node: FlowNodeId,
30	view: ResolvedView,
31	underlying: TableId,
32}
33
34impl SinkTableViewOperator {
35	pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView, underlying: TableId) -> Self {
36		Self {
37			parent,
38			node,
39			view,
40			underlying,
41		}
42	}
43}
44
45impl Operator for SinkTableViewOperator {
46	fn id(&self) -> FlowNodeId {
47		self.node
48	}
49
50	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
51		let view = self.view.def().clone();
52		let shape: RowShape = view.columns().into();
53		let object_id = ShapeId::table(self.underlying);
54
55		for diff in change.diffs.iter() {
56			match diff {
57				Diff::Insert {
58					post,
59				} => self.apply_table_view_insert(txn, &view, &shape, object_id, post)?,
60				Diff::Update {
61					pre,
62					post,
63				} => self.apply_table_view_update(txn, &view, &shape, object_id, pre, post)?,
64				Diff::Remove {
65					pre,
66				} => self.apply_table_view_remove(txn, &view, &shape, object_id, pre)?,
67			}
68		}
69
70		Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
71	}
72
73	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
74		unreachable!()
75	}
76}
77
78impl SinkTableViewOperator {
79	#[inline]
80	fn apply_table_view_insert(
81		&self,
82		txn: &mut FlowTransaction,
83		view: &View,
84		shape: &RowShape,
85		object_id: ShapeId,
86		post: &Arc<Columns>,
87	) -> Result<()> {
88		let coerced = coerce_columns(post, view.columns())?;
89		let row_count = coerced.row_count();
90		for row_idx in 0..row_count {
91			let row_number = coerced.row_numbers[row_idx];
92			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
93			let encoded = ViewRowInterceptor::pre_insert(txn, view, row_number, encoded)?;
94			let key = RowKey::encoded(object_id, row_number);
95			txn.set(&key, encoded.clone())?;
96			ViewRowInterceptor::post_insert(txn, view, row_number, &encoded)?;
97		}
98		emit_view_change(txn, view, Diff::insert(coerced));
99		Ok(())
100	}
101
102	#[inline]
103	fn apply_table_view_update(
104		&self,
105		txn: &mut FlowTransaction,
106		view: &View,
107		shape: &RowShape,
108		object_id: ShapeId,
109		pre: &Arc<Columns>,
110		post: &Arc<Columns>,
111	) -> Result<()> {
112		let coerced_pre = coerce_columns(pre, view.columns())?;
113		let coerced_post = coerce_columns(post, view.columns())?;
114		let row_count = coerced_post.row_count();
115		for row_idx in 0..row_count {
116			let pre_row_number = coerced_pre.row_numbers[row_idx];
117			let post_row_number = coerced_post.row_numbers[row_idx];
118			let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_row_number)?;
119			let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_row_number)?;
120
121			let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_row_number, post_encoded)?;
122			let pre_key = RowKey::encoded(object_id, pre_row_number);
123			let post_key = RowKey::encoded(object_id, post_row_number);
124			txn.remove(&pre_key)?;
125			txn.set(&post_key, post_encoded.clone())?;
126			ViewRowInterceptor::post_update(txn, view, post_row_number, &post_encoded, &pre_encoded)?;
127		}
128		emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
129		Ok(())
130	}
131
132	#[inline]
133	fn apply_table_view_remove(
134		&self,
135		txn: &mut FlowTransaction,
136		view: &View,
137		shape: &RowShape,
138		object_id: ShapeId,
139		pre: &Arc<Columns>,
140	) -> Result<()> {
141		let coerced = coerce_columns(pre, view.columns())?;
142		let row_count = coerced.row_count();
143		for row_idx in 0..row_count {
144			let row_number = coerced.row_numbers[row_idx];
145			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
146			ViewRowInterceptor::pre_delete(txn, view, row_number)?;
147			let key = RowKey::encoded(object_id, row_number);
148			txn.remove(&key)?;
149			ViewRowInterceptor::post_delete(txn, view, row_number, &encoded)?;
150		}
151		emit_view_change(txn, view, Diff::remove(coerced));
152		Ok(())
153	}
154}
155
156#[inline]
157fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
158	let version = txn.version();
159	let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
160	txn.track_flow_change(Change {
161		origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
162		version,
163		diffs: smallvec![diff],
164		changed_at,
165	});
166}