reifydb_sub_flow/operator/sink/
view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::schema::Schema,
8 interface::{
9 catalog::{flow::FlowNodeId, primitive::PrimitiveId},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view::ViewInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkViewOperator {
23 #[allow(dead_code)]
24 parent: Arc<Operators>,
25 node: FlowNodeId,
26 view: ResolvedView,
27}
28
29impl SinkViewOperator {
30 pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView) -> Self {
31 Self {
32 parent,
33 node,
34 view,
35 }
36 }
37}
38
39impl Operator for SinkViewOperator {
40 fn id(&self) -> FlowNodeId {
41 self.node
42 }
43
44 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
45 let view_def = self.view.def().clone();
46 let schema: Schema = (&view_def.columns).into();
47
48 for diff in change.diffs.iter() {
49 match diff {
50 Diff::Insert {
51 post,
52 } => {
53 let coerced = coerce_columns(post, &view_def.columns)?;
55 let row_count = coerced.row_count();
56 for row_idx in 0..row_count {
57 let row_number = coerced.row_numbers[row_idx];
58 let (_, encoded) =
59 encode_row_at_index(&coerced, row_idx, &schema, row_number);
60
61 ViewInterceptor::pre_insert(txn, &view_def, row_number, &encoded)?;
62 let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
63 txn.set(&key, encoded.clone())?;
64 ViewInterceptor::post_insert(txn, &view_def, row_number, &encoded)?;
65
66 if let Some(log) = txn.testing_mut() {
67 let new = Columns::single_row(coerced.iter().map(|col| {
68 (col.name().text(), col.data().get_value(row_idx))
69 }));
70 let mutation_key = format!(
71 "views::{}::{}",
72 self.view.namespace().name(),
73 self.view.name()
74 );
75 log.record_insert(mutation_key, new);
76 }
77 }
78 let version = txn.version();
80 txn.push_view_change(Change {
81 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
82 version,
83 diffs: vec![Diff::Insert {
84 post: coerced,
85 }],
86 });
87 }
88 Diff::Update {
89 pre,
90 post,
91 } => {
92 let coerced_pre = coerce_columns(pre, &view_def.columns)?;
94 let coerced_post = coerce_columns(post, &view_def.columns)?;
95 let row_count = coerced_post.row_count();
96 for row_idx in 0..row_count {
97 let pre_row_number = coerced_pre.row_numbers[row_idx];
98 let post_row_number = coerced_post.row_numbers[row_idx];
99 let (_, pre_encoded) = encode_row_at_index(
100 &coerced_pre,
101 row_idx,
102 &schema,
103 pre_row_number,
104 );
105 let (_, post_encoded) = encode_row_at_index(
106 &coerced_post,
107 row_idx,
108 &schema,
109 post_row_number,
110 );
111
112 ViewInterceptor::pre_update(
113 txn,
114 &view_def,
115 post_row_number,
116 &post_encoded,
117 )?;
118 let old_key =
119 RowKey::encoded(PrimitiveId::view(view_def.id), pre_row_number);
120 let new_key = RowKey::encoded(
121 PrimitiveId::view(view_def.id),
122 post_row_number,
123 );
124 txn.remove(&old_key)?;
125 txn.set(&new_key, post_encoded.clone())?;
126 ViewInterceptor::post_update(
127 txn,
128 &view_def,
129 post_row_number,
130 &post_encoded,
131 &pre_encoded,
132 )?;
133
134 if let Some(log) = txn.testing_mut() {
135 let old = Columns::single_row(coerced_pre.iter().map(|col| {
136 (col.name().text(), col.data().get_value(row_idx))
137 }));
138 let new = Columns::single_row(coerced_post.iter().map(|col| {
139 (col.name().text(), col.data().get_value(row_idx))
140 }));
141 let mutation_key = format!(
142 "views::{}::{}",
143 self.view.namespace().name(),
144 self.view.name()
145 );
146 log.record_update(mutation_key, old, new);
147 }
148 }
149 let version = txn.version();
151 txn.push_view_change(Change {
152 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
153 version,
154 diffs: vec![Diff::Update {
155 pre: coerced_pre,
156 post: coerced_post,
157 }],
158 });
159 }
160 Diff::Remove {
161 pre,
162 } => {
163 let coerced = coerce_columns(pre, &view_def.columns)?;
165 let row_count = coerced.row_count();
166 for row_idx in 0..row_count {
167 let row_number = coerced.row_numbers[row_idx];
168 let (_, encoded) =
169 encode_row_at_index(&coerced, row_idx, &schema, row_number);
170
171 ViewInterceptor::pre_delete(txn, &view_def, row_number)?;
172 let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
173 txn.remove(&key)?;
174 ViewInterceptor::post_delete(txn, &view_def, row_number, &encoded)?;
175
176 if let Some(log) = txn.testing_mut() {
177 let old = Columns::single_row(coerced.iter().map(|col| {
178 (col.name().text(), col.data().get_value(row_idx))
179 }));
180 let mutation_key = format!(
181 "views::{}::{}",
182 self.view.namespace().name(),
183 self.view.name()
184 );
185 log.record_delete(mutation_key, old);
186 }
187 }
188 let version = txn.version();
190 txn.push_view_change(Change {
191 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
192 version,
193 diffs: vec![Diff::Remove {
194 pre: coerced,
195 }],
196 });
197 }
198 }
199 }
200
201 Ok(Change::from_flow(self.node, change.version, Vec::new()))
202 }
203
204 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
205 unreachable!()
206 }
207}