Skip to main content

reifydb_sub_flow/operator/sink/
view.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::schema::Schema,
8	interface::{
9		catalog::{flow::FlowNodeId, primitive::PrimitiveId},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view::ViewInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkViewOperator {
23	#[allow(dead_code)]
24	parent: Arc<Operators>,
25	node: FlowNodeId,
26	view: ResolvedView,
27}
28
29impl SinkViewOperator {
30	pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView) -> Self {
31		Self {
32			parent,
33			node,
34			view,
35		}
36	}
37}
38
39impl Operator for SinkViewOperator {
40	fn id(&self) -> FlowNodeId {
41		self.node
42	}
43
44	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
45		let view_def = self.view.def().clone();
46		let schema: Schema = (&view_def.columns).into();
47
48		for diff in change.diffs.iter() {
49			match diff {
50				Diff::Insert {
51					post,
52				} => {
53					// Coerce columns to match view schema types (already decoded at source)
54					let coerced = coerce_columns(post, &view_def.columns)?;
55					let row_count = coerced.row_count();
56					for row_idx in 0..row_count {
57						let row_number = coerced.row_numbers[row_idx];
58						let (_, encoded) =
59							encode_row_at_index(&coerced, row_idx, &schema, row_number);
60
61						ViewInterceptor::pre_insert(txn, &view_def, row_number, &encoded)?;
62						let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
63						txn.set(&key, encoded.clone())?;
64						ViewInterceptor::post_insert(txn, &view_def, row_number, &encoded)?;
65					}
66					// Emit view change for downstream transactional flows
67					let version = txn.version();
68					txn.push_view_change(Change {
69						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
70						version,
71						diffs: vec![Diff::Insert {
72							post: coerced,
73						}],
74					});
75				}
76				Diff::Update {
77					pre,
78					post,
79				} => {
80					// Coerce columns to match view schema types (already decoded at source)
81					let coerced_pre = coerce_columns(pre, &view_def.columns)?;
82					let coerced_post = coerce_columns(post, &view_def.columns)?;
83					let row_count = coerced_post.row_count();
84					for row_idx in 0..row_count {
85						let pre_row_number = coerced_pre.row_numbers[row_idx];
86						let post_row_number = coerced_post.row_numbers[row_idx];
87						let (_, pre_encoded) = encode_row_at_index(
88							&coerced_pre,
89							row_idx,
90							&schema,
91							pre_row_number,
92						);
93						let (_, post_encoded) = encode_row_at_index(
94							&coerced_post,
95							row_idx,
96							&schema,
97							post_row_number,
98						);
99
100						ViewInterceptor::pre_update(
101							txn,
102							&view_def,
103							post_row_number,
104							&post_encoded,
105						)?;
106						let old_key =
107							RowKey::encoded(PrimitiveId::view(view_def.id), pre_row_number);
108						let new_key = RowKey::encoded(
109							PrimitiveId::view(view_def.id),
110							post_row_number,
111						);
112						txn.remove(&old_key)?;
113						txn.set(&new_key, post_encoded.clone())?;
114						ViewInterceptor::post_update(
115							txn,
116							&view_def,
117							post_row_number,
118							&post_encoded,
119							&pre_encoded,
120						)?;
121					}
122					// Emit view change for downstream transactional flows
123					let version = txn.version();
124					txn.push_view_change(Change {
125						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
126						version,
127						diffs: vec![Diff::Update {
128							pre: coerced_pre,
129							post: coerced_post,
130						}],
131					});
132				}
133				Diff::Remove {
134					pre,
135				} => {
136					// Coerce columns to match view schema types (already decoded at source)
137					let coerced = coerce_columns(pre, &view_def.columns)?;
138					let row_count = coerced.row_count();
139					for row_idx in 0..row_count {
140						let row_number = coerced.row_numbers[row_idx];
141						let (_, encoded) =
142							encode_row_at_index(&coerced, row_idx, &schema, row_number);
143
144						ViewInterceptor::pre_delete(txn, &view_def, row_number)?;
145						let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
146						txn.remove(&key)?;
147						ViewInterceptor::post_delete(txn, &view_def, row_number, &encoded)?;
148					}
149					// Emit view change for downstream transactional flows
150					let version = txn.version();
151					txn.push_view_change(Change {
152						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
153						version,
154						diffs: vec![Diff::Remove {
155							pre: coerced,
156						}],
157					});
158				}
159			}
160		}
161
162		Ok(Change::from_flow(self.node, change.version, Vec::new()))
163	}
164
165	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
166		unreachable!()
167	}
168}