Skip to main content

reifydb_sub_flow/operator/sink/
series_view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::schema::RowSchema,
8	interface::{
9		catalog::{flow::FlowNodeId, id::SeriesId, schema::SchemaId, series::SeriesKey},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkSeriesViewOperator {
23	#[allow(dead_code)]
24	parent: Arc<Operators>,
25	node: FlowNodeId,
26	view: ResolvedView,
27	series_id: SeriesId,
28	#[allow(dead_code)]
29	key: SeriesKey,
30}
31
32impl SinkSeriesViewOperator {
33	pub fn new(
34		parent: Arc<Operators>,
35		node: FlowNodeId,
36		view: ResolvedView,
37		series_id: SeriesId,
38		key: SeriesKey,
39	) -> Self {
40		Self {
41			parent,
42			node,
43			view,
44			series_id,
45			key,
46		}
47	}
48}
49
50impl Operator for SinkSeriesViewOperator {
51	fn id(&self) -> FlowNodeId {
52		self.node
53	}
54
55	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
56		let view = self.view.def().clone();
57		let schema: RowSchema = view.columns().into();
58		let object_id = SchemaId::series(self.series_id);
59
60		for diff in change.diffs.iter() {
61			match diff {
62				Diff::Insert {
63					post,
64				} => {
65					let coerced = coerce_columns(post, view.columns())?;
66					let row_count = coerced.row_count();
67					for row_idx in 0..row_count {
68						let row_number = coerced.row_numbers[row_idx];
69						let (_, encoded) =
70							encode_row_at_index(&coerced, row_idx, &schema, row_number);
71
72						let encoded = ViewRowInterceptor::pre_insert(
73							txn, &view, row_number, encoded,
74						)?;
75						let key = RowKey::encoded(object_id, row_number);
76						txn.set(&key, encoded.clone())?;
77						ViewRowInterceptor::post_insert(txn, &view, row_number, &encoded)?;
78					}
79					let version = txn.version();
80					txn.track_flow_change(Change {
81						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
82						version,
83						diffs: vec![Diff::Insert {
84							post: coerced,
85						}],
86					});
87				}
88				Diff::Update {
89					pre,
90					post,
91				} => {
92					let coerced_pre = coerce_columns(pre, view.columns())?;
93					let coerced_post = coerce_columns(post, view.columns())?;
94					let row_count = coerced_post.row_count();
95					for row_idx in 0..row_count {
96						let pre_row_number = coerced_pre.row_numbers[row_idx];
97						let post_row_number = coerced_post.row_numbers[row_idx];
98						let (_, pre_encoded) = encode_row_at_index(
99							&coerced_pre,
100							row_idx,
101							&schema,
102							pre_row_number,
103						);
104						let (_, post_encoded) = encode_row_at_index(
105							&coerced_post,
106							row_idx,
107							&schema,
108							post_row_number,
109						);
110
111						let post_encoded = ViewRowInterceptor::pre_update(
112							txn,
113							&view,
114							post_row_number,
115							post_encoded,
116						)?;
117						let pre_key = RowKey::encoded(object_id, pre_row_number);
118						let post_key = RowKey::encoded(object_id, post_row_number);
119						txn.remove(&pre_key)?;
120						txn.set(&post_key, post_encoded.clone())?;
121						ViewRowInterceptor::post_update(
122							txn,
123							&view,
124							post_row_number,
125							&post_encoded,
126							&pre_encoded,
127						)?;
128					}
129					let version = txn.version();
130					txn.track_flow_change(Change {
131						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
132						version,
133						diffs: vec![Diff::Update {
134							pre: coerced_pre,
135							post: coerced_post,
136						}],
137					});
138				}
139				Diff::Remove {
140					pre,
141				} => {
142					let coerced = coerce_columns(pre, view.columns())?;
143					let row_count = coerced.row_count();
144					for row_idx in 0..row_count {
145						let row_number = coerced.row_numbers[row_idx];
146						let (_, encoded) =
147							encode_row_at_index(&coerced, row_idx, &schema, row_number);
148
149						ViewRowInterceptor::pre_delete(txn, &view, row_number)?;
150						let key = RowKey::encoded(object_id, row_number);
151						txn.remove(&key)?;
152						ViewRowInterceptor::post_delete(txn, &view, row_number, &encoded)?;
153					}
154					let version = txn.version();
155					txn.track_flow_change(Change {
156						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
157						version,
158						diffs: vec![Diff::Remove {
159							pre: coerced,
160						}],
161					});
162				}
163			}
164		}
165
166		Ok(Change::from_flow(self.node, change.version, Vec::new()))
167	}
168
169	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
170		unreachable!()
171	}
172}