reifydb_sub_flow/operator/sink/
view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::schema::RowSchema,
8 interface::{
9 catalog::{flow::FlowNodeId, id::TableId, schema::SchemaId},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkTableViewOperator {
23 #[allow(dead_code)]
24 parent: Arc<Operators>,
25 node: FlowNodeId,
26 view: ResolvedView,
27 underlying: TableId,
28}
29
30impl SinkTableViewOperator {
31 pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView, underlying: TableId) -> Self {
32 Self {
33 parent,
34 node,
35 view,
36 underlying,
37 }
38 }
39}
40
41impl Operator for SinkTableViewOperator {
42 fn id(&self) -> FlowNodeId {
43 self.node
44 }
45
46 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
47 let view = self.view.def().clone();
48 let schema: RowSchema = view.columns().into();
49 let object_id = SchemaId::table(self.underlying);
50
51 for diff in change.diffs.iter() {
52 match diff {
53 Diff::Insert {
54 post,
55 } => {
56 let coerced = coerce_columns(post, view.columns())?;
57 let row_count = coerced.row_count();
58 for row_idx in 0..row_count {
59 let row_number = coerced.row_numbers[row_idx];
60
61 let (_, encoded) =
62 encode_row_at_index(&coerced, row_idx, &schema, row_number);
63
64 let encoded = ViewRowInterceptor::pre_insert(
65 txn, &view, row_number, encoded,
66 )?;
67 let key = RowKey::encoded(object_id, row_number);
68 txn.set(&key, encoded.clone())?;
69 ViewRowInterceptor::post_insert(txn, &view, row_number, &encoded)?;
70 }
71 let version = txn.version();
72 txn.track_flow_change(Change {
73 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
74 version,
75 diffs: vec![Diff::Insert {
76 post: coerced,
77 }],
78 });
79 }
80 Diff::Update {
81 pre,
82 post,
83 } => {
84 let coerced_pre = coerce_columns(pre, view.columns())?;
85 let coerced_post = coerce_columns(post, view.columns())?;
86 let row_count = coerced_post.row_count();
87 for row_idx in 0..row_count {
88 let pre_row_number = coerced_pre.row_numbers[row_idx];
89 let post_row_number = coerced_post.row_numbers[row_idx];
90 let (_, pre_encoded) = encode_row_at_index(
91 &coerced_pre,
92 row_idx,
93 &schema,
94 pre_row_number,
95 );
96 let (_, post_encoded) = encode_row_at_index(
97 &coerced_post,
98 row_idx,
99 &schema,
100 post_row_number,
101 );
102
103 let post_encoded = ViewRowInterceptor::pre_update(
104 txn,
105 &view,
106 post_row_number,
107 post_encoded,
108 )?;
109 let pre_key = RowKey::encoded(object_id, pre_row_number);
110 let post_key = RowKey::encoded(object_id, post_row_number);
111 txn.remove(&pre_key)?;
112 txn.set(&post_key, post_encoded.clone())?;
113 ViewRowInterceptor::post_update(
114 txn,
115 &view,
116 post_row_number,
117 &post_encoded,
118 &pre_encoded,
119 )?;
120 }
121 let version = txn.version();
122 txn.track_flow_change(Change {
123 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
124 version,
125 diffs: vec![Diff::Update {
126 pre: coerced_pre,
127 post: coerced_post,
128 }],
129 });
130 }
131 Diff::Remove {
132 pre,
133 } => {
134 let coerced = coerce_columns(pre, view.columns())?;
135 let row_count = coerced.row_count();
136 for row_idx in 0..row_count {
137 let row_number = coerced.row_numbers[row_idx];
138 let (_, encoded) =
139 encode_row_at_index(&coerced, row_idx, &schema, row_number);
140
141 ViewRowInterceptor::pre_delete(txn, &view, row_number)?;
142 let key = RowKey::encoded(object_id, row_number);
143 txn.remove(&key)?;
144 ViewRowInterceptor::post_delete(txn, &view, row_number, &encoded)?;
145 }
146 let version = txn.version();
147 txn.track_flow_change(Change {
148 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
149 version,
150 diffs: vec![Diff::Remove {
151 pre: coerced,
152 }],
153 });
154 }
155 }
156 }
157
158 Ok(Change::from_flow(self.node, change.version, Vec::new()))
159 }
160
161 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
162 unreachable!()
163 }
164}