Skip to main content

reifydb_sub_flow/operator/sink/
view.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::schema::RowSchema,
8	interface::{
9		catalog::{flow::FlowNodeId, id::TableId, schema::SchemaId},
10		change::{Change, ChangeOrigin, Diff},
11		resolved::ResolvedView,
12	},
13	key::row::RowKey,
14	value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkTableViewOperator {
23	#[allow(dead_code)]
24	parent: Arc<Operators>,
25	node: FlowNodeId,
26	view: ResolvedView,
27	underlying: TableId,
28}
29
30impl SinkTableViewOperator {
31	pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView, underlying: TableId) -> Self {
32		Self {
33			parent,
34			node,
35			view,
36			underlying,
37		}
38	}
39}
40
41impl Operator for SinkTableViewOperator {
42	fn id(&self) -> FlowNodeId {
43		self.node
44	}
45
46	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
47		let view = self.view.def().clone();
48		let schema: RowSchema = view.columns().into();
49		let object_id = SchemaId::table(self.underlying);
50
51		for diff in change.diffs.iter() {
52			match diff {
53				Diff::Insert {
54					post,
55				} => {
56					let coerced = coerce_columns(post, view.columns())?;
57					let row_count = coerced.row_count();
58					for row_idx in 0..row_count {
59						let row_number = coerced.row_numbers[row_idx];
60
61						let (_, encoded) =
62							encode_row_at_index(&coerced, row_idx, &schema, row_number);
63
64						let encoded = ViewRowInterceptor::pre_insert(
65							txn, &view, row_number, encoded,
66						)?;
67						let key = RowKey::encoded(object_id, row_number);
68						txn.set(&key, encoded.clone())?;
69						ViewRowInterceptor::post_insert(txn, &view, row_number, &encoded)?;
70					}
71					let version = txn.version();
72					txn.track_flow_change(Change {
73						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
74						version,
75						diffs: vec![Diff::Insert {
76							post: coerced,
77						}],
78					});
79				}
80				Diff::Update {
81					pre,
82					post,
83				} => {
84					let coerced_pre = coerce_columns(pre, view.columns())?;
85					let coerced_post = coerce_columns(post, view.columns())?;
86					let row_count = coerced_post.row_count();
87					for row_idx in 0..row_count {
88						let pre_row_number = coerced_pre.row_numbers[row_idx];
89						let post_row_number = coerced_post.row_numbers[row_idx];
90						let (_, pre_encoded) = encode_row_at_index(
91							&coerced_pre,
92							row_idx,
93							&schema,
94							pre_row_number,
95						);
96						let (_, post_encoded) = encode_row_at_index(
97							&coerced_post,
98							row_idx,
99							&schema,
100							post_row_number,
101						);
102
103						let post_encoded = ViewRowInterceptor::pre_update(
104							txn,
105							&view,
106							post_row_number,
107							post_encoded,
108						)?;
109						let pre_key = RowKey::encoded(object_id, pre_row_number);
110						let post_key = RowKey::encoded(object_id, post_row_number);
111						txn.remove(&pre_key)?;
112						txn.set(&post_key, post_encoded.clone())?;
113						ViewRowInterceptor::post_update(
114							txn,
115							&view,
116							post_row_number,
117							&post_encoded,
118							&pre_encoded,
119						)?;
120					}
121					let version = txn.version();
122					txn.track_flow_change(Change {
123						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
124						version,
125						diffs: vec![Diff::Update {
126							pre: coerced_pre,
127							post: coerced_post,
128						}],
129					});
130				}
131				Diff::Remove {
132					pre,
133				} => {
134					let coerced = coerce_columns(pre, view.columns())?;
135					let row_count = coerced.row_count();
136					for row_idx in 0..row_count {
137						let row_number = coerced.row_numbers[row_idx];
138						let (_, encoded) =
139							encode_row_at_index(&coerced, row_idx, &schema, row_number);
140
141						ViewRowInterceptor::pre_delete(txn, &view, row_number)?;
142						let key = RowKey::encoded(object_id, row_number);
143						txn.remove(&key)?;
144						ViewRowInterceptor::post_delete(txn, &view, row_number, &encoded)?;
145					}
146					let version = txn.version();
147					txn.track_flow_change(Change {
148						origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
149						version,
150						diffs: vec![Diff::Remove {
151							pre: coerced,
152						}],
153					});
154				}
155			}
156		}
157
158		Ok(Change::from_flow(self.node, change.version, Vec::new()))
159	}
160
161	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
162		unreachable!()
163	}
164}