reifydb_sub_flow/operator/sink/
view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::schema::Schema,
8 interface::{
9 catalog::{flow::FlowNodeId, primitive::PrimitiveId},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view::ViewInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkViewOperator {
23 #[allow(dead_code)]
24 parent: Arc<Operators>,
25 node: FlowNodeId,
26 view: ResolvedView,
27}
28
29impl SinkViewOperator {
30 pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView) -> Self {
31 Self {
32 parent,
33 node,
34 view,
35 }
36 }
37}
38
39impl Operator for SinkViewOperator {
40 fn id(&self) -> FlowNodeId {
41 self.node
42 }
43
44 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
45 let view_def = self.view.def().clone();
46 let schema: Schema = (&view_def.columns).into();
47
48 for diff in change.diffs.iter() {
49 match diff {
50 Diff::Insert {
51 post,
52 } => {
53 let coerced = coerce_columns(post, &view_def.columns)?;
55 let row_count = coerced.row_count();
56 for row_idx in 0..row_count {
57 let row_number = coerced.row_numbers[row_idx];
58 let (_, encoded) =
59 encode_row_at_index(&coerced, row_idx, &schema, row_number);
60
61 ViewInterceptor::pre_insert(txn, &view_def, row_number, &encoded)?;
62 let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
63 txn.set(&key, encoded.clone())?;
64 ViewInterceptor::post_insert(txn, &view_def, row_number, &encoded)?;
65 }
66 let version = txn.version();
68 txn.push_view_change(Change {
69 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
70 version,
71 diffs: vec![Diff::Insert {
72 post: coerced,
73 }],
74 });
75 }
76 Diff::Update {
77 pre,
78 post,
79 } => {
80 let coerced_pre = coerce_columns(pre, &view_def.columns)?;
82 let coerced_post = coerce_columns(post, &view_def.columns)?;
83 let row_count = coerced_post.row_count();
84 for row_idx in 0..row_count {
85 let pre_row_number = coerced_pre.row_numbers[row_idx];
86 let post_row_number = coerced_post.row_numbers[row_idx];
87 let (_, pre_encoded) = encode_row_at_index(
88 &coerced_pre,
89 row_idx,
90 &schema,
91 pre_row_number,
92 );
93 let (_, post_encoded) = encode_row_at_index(
94 &coerced_post,
95 row_idx,
96 &schema,
97 post_row_number,
98 );
99
100 ViewInterceptor::pre_update(
101 txn,
102 &view_def,
103 post_row_number,
104 &post_encoded,
105 )?;
106 let old_key =
107 RowKey::encoded(PrimitiveId::view(view_def.id), pre_row_number);
108 let new_key = RowKey::encoded(
109 PrimitiveId::view(view_def.id),
110 post_row_number,
111 );
112 txn.remove(&old_key)?;
113 txn.set(&new_key, post_encoded.clone())?;
114 ViewInterceptor::post_update(
115 txn,
116 &view_def,
117 post_row_number,
118 &post_encoded,
119 &pre_encoded,
120 )?;
121 }
122 let version = txn.version();
124 txn.push_view_change(Change {
125 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
126 version,
127 diffs: vec![Diff::Update {
128 pre: coerced_pre,
129 post: coerced_post,
130 }],
131 });
132 }
133 Diff::Remove {
134 pre,
135 } => {
136 let coerced = coerce_columns(pre, &view_def.columns)?;
138 let row_count = coerced.row_count();
139 for row_idx in 0..row_count {
140 let row_number = coerced.row_numbers[row_idx];
141 let (_, encoded) =
142 encode_row_at_index(&coerced, row_idx, &schema, row_number);
143
144 ViewInterceptor::pre_delete(txn, &view_def, row_number)?;
145 let key = RowKey::encoded(PrimitiveId::view(view_def.id), row_number);
146 txn.remove(&key)?;
147 ViewInterceptor::post_delete(txn, &view_def, row_number, &encoded)?;
148 }
149 let version = txn.version();
151 txn.push_view_change(Change {
152 origin: ChangeOrigin::Primitive(PrimitiveId::view(view_def.id)),
153 version,
154 diffs: vec![Diff::Remove {
155 pre: coerced,
156 }],
157 });
158 }
159 }
160 }
161
162 Ok(Change::from_flow(self.node, change.version, Vec::new()))
163 }
164
165 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
166 unreachable!()
167 }
168}