reifydb_sub_flow/operator/sink/
series_view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::shape::RowShape,
8 interface::{
9 catalog::{flow::FlowNodeId, id::SeriesId, series::SeriesKey, shape::ShapeId, view::View},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{
18 Result,
19 value::{datetime::DateTime, row_number::RowNumber},
20};
21use smallvec::smallvec;
22
23use super::{coerce_columns, encode_row_at_index};
24use crate::{Operator, operator::Operators, transaction::FlowTransaction};
25
26pub struct SinkSeriesViewOperator {
27 #[allow(dead_code)]
28 parent: Arc<Operators>,
29 node: FlowNodeId,
30 view: ResolvedView,
31 series_id: SeriesId,
32 #[allow(dead_code)]
33 key: SeriesKey,
34}
35
36impl SinkSeriesViewOperator {
37 pub fn new(
38 parent: Arc<Operators>,
39 node: FlowNodeId,
40 view: ResolvedView,
41 series_id: SeriesId,
42 key: SeriesKey,
43 ) -> Self {
44 Self {
45 parent,
46 node,
47 view,
48 series_id,
49 key,
50 }
51 }
52}
53
54impl Operator for SinkSeriesViewOperator {
55 fn id(&self) -> FlowNodeId {
56 self.node
57 }
58
59 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
60 let view = self.view.def().clone();
61 let shape: RowShape = view.columns().into();
62 let object_id = ShapeId::series(self.series_id);
63
64 for diff in change.diffs.iter() {
65 match diff {
66 Diff::Insert {
67 post,
68 } => self.apply_series_view_insert(txn, &view, &shape, object_id, post)?,
69 Diff::Update {
70 pre,
71 post,
72 } => self.apply_series_view_update(txn, &view, &shape, object_id, pre, post)?,
73 Diff::Remove {
74 pre,
75 } => self.apply_series_view_remove(txn, &view, &shape, object_id, pre)?,
76 }
77 }
78
79 Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
80 }
81
82 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
83 unreachable!()
84 }
85}
86
87impl SinkSeriesViewOperator {
88 #[inline]
89 fn apply_series_view_insert(
90 &self,
91 txn: &mut FlowTransaction,
92 view: &View,
93 shape: &RowShape,
94 object_id: ShapeId,
95 post: &Arc<Columns>,
96 ) -> Result<()> {
97 let coerced = coerce_columns(post, view.columns())?;
98 let row_count = coerced.row_count();
99 for row_idx in 0..row_count {
100 let row_number = coerced.row_numbers[row_idx];
101 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
102 let encoded = ViewRowInterceptor::pre_insert(txn, view, row_number, encoded)?;
103 let key = RowKey::encoded(object_id, row_number);
104 txn.set(&key, encoded.clone())?;
105 ViewRowInterceptor::post_insert(txn, view, row_number, &encoded)?;
106 }
107 emit_view_change(txn, view, Diff::insert(coerced));
108 Ok(())
109 }
110
111 #[inline]
112 fn apply_series_view_update(
113 &self,
114 txn: &mut FlowTransaction,
115 view: &View,
116 shape: &RowShape,
117 object_id: ShapeId,
118 pre: &Arc<Columns>,
119 post: &Arc<Columns>,
120 ) -> Result<()> {
121 let coerced_pre = coerce_columns(pre, view.columns())?;
122 let coerced_post = coerce_columns(post, view.columns())?;
123 let row_count = coerced_post.row_count();
124 for row_idx in 0..row_count {
125 let pre_row_number = coerced_pre.row_numbers[row_idx];
126 let post_row_number = coerced_post.row_numbers[row_idx];
127 let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_row_number)?;
128 let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_row_number)?;
129
130 let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_row_number, post_encoded)?;
131 let pre_key = RowKey::encoded(object_id, pre_row_number);
132 let post_key = RowKey::encoded(object_id, post_row_number);
133 txn.remove(&pre_key)?;
134 txn.set(&post_key, post_encoded.clone())?;
135 ViewRowInterceptor::post_update(txn, view, post_row_number, &post_encoded, &pre_encoded)?;
136 }
137 emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
138 Ok(())
139 }
140
141 #[inline]
142 fn apply_series_view_remove(
143 &self,
144 txn: &mut FlowTransaction,
145 view: &View,
146 shape: &RowShape,
147 object_id: ShapeId,
148 pre: &Arc<Columns>,
149 ) -> Result<()> {
150 let coerced = coerce_columns(pre, view.columns())?;
151 let row_count = coerced.row_count();
152 for row_idx in 0..row_count {
153 let row_number = coerced.row_numbers[row_idx];
154 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
155 ViewRowInterceptor::pre_delete(txn, view, row_number)?;
156 let key = RowKey::encoded(object_id, row_number);
157 txn.remove(&key)?;
158 ViewRowInterceptor::post_delete(txn, view, row_number, &encoded)?;
159 }
160 emit_view_change(txn, view, Diff::remove(coerced));
161 Ok(())
162 }
163}
164
165#[inline]
166fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
167 let version = txn.version();
168 let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
169 txn.track_flow_change(Change {
170 origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
171 version,
172 diffs: smallvec![diff],
173 changed_at,
174 });
175}