Skip to main content

reifydb_sub_flow/operator/sink/
view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::schema::Schema,
8	interface::{
9		catalog::{flow::FlowNodeId, primitive::PrimitiveId},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view::ViewInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkViewOperator {
23	#[allow(dead_code)]
24	parent: Arc<Operators>,
25	node: FlowNodeId,
26	view: ResolvedView,
27}
28
29impl SinkViewOperator {
30	pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView) -> Self {
31		Self {
32			parent,
33			node,
34			view,
35		}
36	}
37}
38
39impl Operator for SinkViewOperator {
40	fn id(&self) -> FlowNodeId {
41		self.node
42	}
43
44	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
45		let view_def = self.view.def().clone();
46		let schema: Schema = (&view_def.columns).into();
47
48		for diff in change.diffs.iter() {
49			match diff {
50				Diff::Insert {
51					post,
52				} => {
53					// Coerce columns to match view schema types (already decoded at source)
54					let coerced = coerce_columns(post, &view_def.columns)?;
55					let row_count = coerced.row_count();
56					for row_idx in 0..row_count {
57						let row_number = coerced.row_numbers[row_idx];
58						let (_, encoded) =
59							encode_row_at_index(&coerced, row_idx, &schema, row_number);
60
61						ViewInterceptor::pre_insert(txn, &view_def, row_number, &encoded)?;
62						let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
63						txn.set(&key, encoded.clone())?;
64						ViewInterceptor::post_insert(txn, &view_def, row_number, &encoded)?;
65
66						if let Some(log) = txn.testing_mut() {
67							let new = Columns::single_row(coerced.iter().map(|col| {
68								(col.name().text(), col.data().get_value(row_idx))
69							}));
70							let mutation_key = format!(
71								"views::{}::{}",
72								self.view.namespace().name(),
73								self.view.name()
74							);
75							log.record_insert(mutation_key, new);
76						}
77					}
78					// Emit view change for downstream transactional flows
79					let version = txn.version();
80					txn.push_view_change(Change {
81						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
82						version,
83						diffs: vec![Diff::Insert {
84							post: coerced,
85						}],
86					});
87				}
88				Diff::Update {
89					pre,
90					post,
91				} => {
92					// Coerce columns to match view schema types (already decoded at source)
93					let coerced_pre = coerce_columns(pre, &view_def.columns)?;
94					let coerced_post = coerce_columns(post, &view_def.columns)?;
95					let row_count = coerced_post.row_count();
96					for row_idx in 0..row_count {
97						let pre_row_number = coerced_pre.row_numbers[row_idx];
98						let post_row_number = coerced_post.row_numbers[row_idx];
99						let (_, pre_encoded) = encode_row_at_index(
100							&coerced_pre,
101							row_idx,
102							&schema,
103							pre_row_number,
104						);
105						let (_, post_encoded) = encode_row_at_index(
106							&coerced_post,
107							row_idx,
108							&schema,
109							post_row_number,
110						);
111
112						ViewInterceptor::pre_update(
113							txn,
114							&view_def,
115							post_row_number,
116							&post_encoded,
117						)?;
118						let old_key =
119							RowKey::encoded(PrimitiveId::view(view_def.id), pre_row_number);
120						let new_key = RowKey::encoded(
121							PrimitiveId::view(view_def.id),
122							post_row_number,
123						);
124						txn.remove(&old_key)?;
125						txn.set(&new_key, post_encoded.clone())?;
126						ViewInterceptor::post_update(
127							txn,
128							&view_def,
129							post_row_number,
130							&post_encoded,
131							&pre_encoded,
132						)?;
133
134						if let Some(log) = txn.testing_mut() {
135							let old = Columns::single_row(coerced_pre.iter().map(|col| {
136								(col.name().text(), col.data().get_value(row_idx))
137							}));
138							let new = Columns::single_row(coerced_post.iter().map(|col| {
139								(col.name().text(), col.data().get_value(row_idx))
140							}));
141							let mutation_key = format!(
142								"views::{}::{}",
143								self.view.namespace().name(),
144								self.view.name()
145							);
146							log.record_update(mutation_key, old, new);
147						}
148					}
149					// Emit view change for downstream transactional flows
150					let version = txn.version();
151					txn.push_view_change(Change {
152						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
153						version,
154						diffs: vec![Diff::Update {
155							pre: coerced_pre,
156							post: coerced_post,
157						}],
158					});
159				}
160				Diff::Remove {
161					pre,
162				} => {
163					// Coerce columns to match view schema types (already decoded at source)
164					let coerced = coerce_columns(pre, &view_def.columns)?;
165					let row_count = coerced.row_count();
166					for row_idx in 0..row_count {
167						let row_number = coerced.row_numbers[row_idx];
168						let (_, encoded) =
169							encode_row_at_index(&coerced, row_idx, &schema, row_number);
170
171						ViewInterceptor::pre_delete(txn, &view_def, row_number)?;
172						let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
173						txn.remove(&key)?;
174						ViewInterceptor::post_delete(txn, &view_def, row_number, &encoded)?;
175
176						if let Some(log) = txn.testing_mut() {
177							let old = Columns::single_row(coerced.iter().map(|col| {
178								(col.name().text(), col.data().get_value(row_idx))
179							}));
180							let mutation_key = format!(
181								"views::{}::{}",
182								self.view.namespace().name(),
183								self.view.name()
184							);
185							log.record_delete(mutation_key, old);
186						}
187					}
188					// Emit view change for downstream transactional flows
189					let version = txn.version();
190					txn.push_view_change(Change {
191						origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
192						version,
193						diffs: vec![Diff::Remove {
194							pre: coerced,
195						}],
196					});
197				}
198			}
199		}
200
201		Ok(Change::from_flow(self.node, change.version, Vec::new()))
202	}
203
204	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
205		unreachable!()
206	}
207}