reifydb_sub_flow/operator/sink/
series_view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::schema::RowSchema,
8 interface::{
9 catalog::{flow::FlowNodeId, id::SeriesId, schema::SchemaId, series::SeriesKey},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{Result, value::row_number::RowNumber};
18
19use super::{coerce_columns, encode_row_at_index};
20use crate::{Operator, operator::Operators, transaction::FlowTransaction};
21
22pub struct SinkSeriesViewOperator {
23 #[allow(dead_code)]
24 parent: Arc<Operators>,
25 node: FlowNodeId,
26 view: ResolvedView,
27 series_id: SeriesId,
28 #[allow(dead_code)]
29 key: SeriesKey,
30}
31
32impl SinkSeriesViewOperator {
33 pub fn new(
34 parent: Arc<Operators>,
35 node: FlowNodeId,
36 view: ResolvedView,
37 series_id: SeriesId,
38 key: SeriesKey,
39 ) -> Self {
40 Self {
41 parent,
42 node,
43 view,
44 series_id,
45 key,
46 }
47 }
48}
49
50impl Operator for SinkSeriesViewOperator {
51 fn id(&self) -> FlowNodeId {
52 self.node
53 }
54
55 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
56 let view = self.view.def().clone();
57 let schema: RowSchema = view.columns().into();
58 let object_id = SchemaId::series(self.series_id);
59
60 for diff in change.diffs.iter() {
61 match diff {
62 Diff::Insert {
63 post,
64 } => {
65 let coerced = coerce_columns(post, view.columns())?;
66 let row_count = coerced.row_count();
67 for row_idx in 0..row_count {
68 let row_number = coerced.row_numbers[row_idx];
69 let (_, encoded) =
70 encode_row_at_index(&coerced, row_idx, &schema, row_number);
71
72 let encoded = ViewRowInterceptor::pre_insert(
73 txn, &view, row_number, encoded,
74 )?;
75 let key = RowKey::encoded(object_id, row_number);
76 txn.set(&key, encoded.clone())?;
77 ViewRowInterceptor::post_insert(txn, &view, row_number, &encoded)?;
78 }
79 let version = txn.version();
80 txn.track_flow_change(Change {
81 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
82 version,
83 diffs: vec![Diff::Insert {
84 post: coerced,
85 }],
86 });
87 }
88 Diff::Update {
89 pre,
90 post,
91 } => {
92 let coerced_pre = coerce_columns(pre, view.columns())?;
93 let coerced_post = coerce_columns(post, view.columns())?;
94 let row_count = coerced_post.row_count();
95 for row_idx in 0..row_count {
96 let pre_row_number = coerced_pre.row_numbers[row_idx];
97 let post_row_number = coerced_post.row_numbers[row_idx];
98 let (_, pre_encoded) = encode_row_at_index(
99 &coerced_pre,
100 row_idx,
101 &schema,
102 pre_row_number,
103 );
104 let (_, post_encoded) = encode_row_at_index(
105 &coerced_post,
106 row_idx,
107 &schema,
108 post_row_number,
109 );
110
111 let post_encoded = ViewRowInterceptor::pre_update(
112 txn,
113 &view,
114 post_row_number,
115 post_encoded,
116 )?;
117 let pre_key = RowKey::encoded(object_id, pre_row_number);
118 let post_key = RowKey::encoded(object_id, post_row_number);
119 txn.remove(&pre_key)?;
120 txn.set(&post_key, post_encoded.clone())?;
121 ViewRowInterceptor::post_update(
122 txn,
123 &view,
124 post_row_number,
125 &post_encoded,
126 &pre_encoded,
127 )?;
128 }
129 let version = txn.version();
130 txn.track_flow_change(Change {
131 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
132 version,
133 diffs: vec![Diff::Update {
134 pre: coerced_pre,
135 post: coerced_post,
136 }],
137 });
138 }
139 Diff::Remove {
140 pre,
141 } => {
142 let coerced = coerce_columns(pre, view.columns())?;
143 let row_count = coerced.row_count();
144 for row_idx in 0..row_count {
145 let row_number = coerced.row_numbers[row_idx];
146 let (_, encoded) =
147 encode_row_at_index(&coerced, row_idx, &schema, row_number);
148
149 ViewRowInterceptor::pre_delete(txn, &view, row_number)?;
150 let key = RowKey::encoded(object_id, row_number);
151 txn.remove(&key)?;
152 ViewRowInterceptor::post_delete(txn, &view, row_number, &encoded)?;
153 }
154 let version = txn.version();
155 txn.track_flow_change(Change {
156 origin: ChangeOrigin::Schema(SchemaId::view(view.id())),
157 version,
158 diffs: vec![Diff::Remove {
159 pre: coerced,
160 }],
161 });
162 }
163 }
164 }
165
166 Ok(Change::from_flow(self.node, change.version, Vec::new()))
167 }
168
169 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
170 unreachable!()
171 }
172}