Skip to main content

reifydb_sub_flow/operator/sink/
series_view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::shape::RowShape,
8	interface::{
9		catalog::{flow::FlowNodeId, id::SeriesId, series::SeriesKey, shape::ShapeId, view::View},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{
18	Result,
19	value::{datetime::DateTime, row_number::RowNumber},
20};
21use smallvec::smallvec;
22
23use super::{coerce_columns, encode_row_at_index};
24use crate::{Operator, operator::Operators, transaction::FlowTransaction};
25
26pub struct SinkSeriesViewOperator {
27	#[allow(dead_code)]
28	parent: Arc<Operators>,
29	node: FlowNodeId,
30	view: ResolvedView,
31	series_id: SeriesId,
32	#[allow(dead_code)]
33	key: SeriesKey,
34}
35
36impl SinkSeriesViewOperator {
37	pub fn new(
38		parent: Arc<Operators>,
39		node: FlowNodeId,
40		view: ResolvedView,
41		series_id: SeriesId,
42		key: SeriesKey,
43	) -> Self {
44		Self {
45			parent,
46			node,
47			view,
48			series_id,
49			key,
50		}
51	}
52}
53
54impl Operator for SinkSeriesViewOperator {
55	fn id(&self) -> FlowNodeId {
56		self.node
57	}
58
59	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
60		let view = self.view.def().clone();
61		let shape: RowShape = view.columns().into();
62		let object_id = ShapeId::series(self.series_id);
63
64		for diff in change.diffs.iter() {
65			match diff {
66				Diff::Insert {
67					post,
68				} => self.apply_series_view_insert(txn, &view, &shape, object_id, post)?,
69				Diff::Update {
70					pre,
71					post,
72				} => self.apply_series_view_update(txn, &view, &shape, object_id, pre, post)?,
73				Diff::Remove {
74					pre,
75				} => self.apply_series_view_remove(txn, &view, &shape, object_id, pre)?,
76			}
77		}
78
79		Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
80	}
81
82	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
83		unreachable!()
84	}
85}
86
87impl SinkSeriesViewOperator {
88	#[inline]
89	fn apply_series_view_insert(
90		&self,
91		txn: &mut FlowTransaction,
92		view: &View,
93		shape: &RowShape,
94		object_id: ShapeId,
95		post: &Arc<Columns>,
96	) -> Result<()> {
97		let coerced = coerce_columns(post, view.columns())?;
98		let row_count = coerced.row_count();
99		for row_idx in 0..row_count {
100			let row_number = coerced.row_numbers[row_idx];
101			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
102			let encoded = ViewRowInterceptor::pre_insert(txn, view, row_number, encoded)?;
103			let key = RowKey::encoded(object_id, row_number);
104			txn.set(&key, encoded.clone())?;
105			ViewRowInterceptor::post_insert(txn, view, row_number, &encoded)?;
106		}
107		emit_view_change(txn, view, Diff::insert(coerced));
108		Ok(())
109	}
110
111	#[inline]
112	fn apply_series_view_update(
113		&self,
114		txn: &mut FlowTransaction,
115		view: &View,
116		shape: &RowShape,
117		object_id: ShapeId,
118		pre: &Arc<Columns>,
119		post: &Arc<Columns>,
120	) -> Result<()> {
121		let coerced_pre = coerce_columns(pre, view.columns())?;
122		let coerced_post = coerce_columns(post, view.columns())?;
123		let row_count = coerced_post.row_count();
124		for row_idx in 0..row_count {
125			let pre_row_number = coerced_pre.row_numbers[row_idx];
126			let post_row_number = coerced_post.row_numbers[row_idx];
127			let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_row_number)?;
128			let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_row_number)?;
129
130			let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_row_number, post_encoded)?;
131			let pre_key = RowKey::encoded(object_id, pre_row_number);
132			let post_key = RowKey::encoded(object_id, post_row_number);
133			txn.remove(&pre_key)?;
134			txn.set(&post_key, post_encoded.clone())?;
135			ViewRowInterceptor::post_update(txn, view, post_row_number, &post_encoded, &pre_encoded)?;
136		}
137		emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
138		Ok(())
139	}
140
141	#[inline]
142	fn apply_series_view_remove(
143		&self,
144		txn: &mut FlowTransaction,
145		view: &View,
146		shape: &RowShape,
147		object_id: ShapeId,
148		pre: &Arc<Columns>,
149	) -> Result<()> {
150		let coerced = coerce_columns(pre, view.columns())?;
151		let row_count = coerced.row_count();
152		for row_idx in 0..row_count {
153			let row_number = coerced.row_numbers[row_idx];
154			let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
155			ViewRowInterceptor::pre_delete(txn, view, row_number)?;
156			let key = RowKey::encoded(object_id, row_number);
157			txn.remove(&key)?;
158			ViewRowInterceptor::post_delete(txn, view, row_number, &encoded)?;
159		}
160		emit_view_change(txn, view, Diff::remove(coerced));
161		Ok(())
162	}
163}
164
165#[inline]
166fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
167	let version = txn.version();
168	let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
169	txn.track_flow_change(Change {
170		origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
171		version,
172		diffs: smallvec![diff],
173		changed_at,
174	});
175}