reifydb_sub_flow/operator/sink/
view.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::shape::RowShape,
8 interface::{
9 catalog::{flow::FlowNodeId, id::TableId, shape::ShapeId, view::View},
10 change::{Change, ChangeOrigin, Diff},
11 resolved::ResolvedView,
12 },
13 key::row::RowKey,
14 value::column::columns::Columns,
15};
16use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
17use reifydb_type::{
18 Result,
19 value::{datetime::DateTime, row_number::RowNumber},
20};
21use smallvec::smallvec;
22
23use super::{coerce_columns, encode_row_at_index};
24use crate::{Operator, operator::Operators, transaction::FlowTransaction};
25
26pub struct SinkTableViewOperator {
27 #[allow(dead_code)]
28 parent: Arc<Operators>,
29 node: FlowNodeId,
30 view: ResolvedView,
31 underlying: TableId,
32}
33
34impl SinkTableViewOperator {
35 pub fn new(parent: Arc<Operators>, node: FlowNodeId, view: ResolvedView, underlying: TableId) -> Self {
36 Self {
37 parent,
38 node,
39 view,
40 underlying,
41 }
42 }
43}
44
45impl Operator for SinkTableViewOperator {
46 fn id(&self) -> FlowNodeId {
47 self.node
48 }
49
50 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
51 let view = self.view.def().clone();
52 let shape: RowShape = view.columns().into();
53 let object_id = ShapeId::table(self.underlying);
54
55 for diff in change.diffs.iter() {
56 match diff {
57 Diff::Insert {
58 post,
59 } => self.apply_table_view_insert(txn, &view, &shape, object_id, post)?,
60 Diff::Update {
61 pre,
62 post,
63 } => self.apply_table_view_update(txn, &view, &shape, object_id, pre, post)?,
64 Diff::Remove {
65 pre,
66 } => self.apply_table_view_remove(txn, &view, &shape, object_id, pre)?,
67 }
68 }
69
70 Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
71 }
72
73 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
74 unreachable!()
75 }
76}
77
78impl SinkTableViewOperator {
79 #[inline]
80 fn apply_table_view_insert(
81 &self,
82 txn: &mut FlowTransaction,
83 view: &View,
84 shape: &RowShape,
85 object_id: ShapeId,
86 post: &Arc<Columns>,
87 ) -> Result<()> {
88 let coerced = coerce_columns(post, view.columns())?;
89 let row_count = coerced.row_count();
90 for row_idx in 0..row_count {
91 let row_number = coerced.row_numbers[row_idx];
92 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
93 let encoded = ViewRowInterceptor::pre_insert(txn, view, row_number, encoded)?;
94 let key = RowKey::encoded(object_id, row_number);
95 txn.set(&key, encoded.clone())?;
96 ViewRowInterceptor::post_insert(txn, view, row_number, &encoded)?;
97 }
98 emit_view_change(txn, view, Diff::insert(coerced));
99 Ok(())
100 }
101
102 #[inline]
103 fn apply_table_view_update(
104 &self,
105 txn: &mut FlowTransaction,
106 view: &View,
107 shape: &RowShape,
108 object_id: ShapeId,
109 pre: &Arc<Columns>,
110 post: &Arc<Columns>,
111 ) -> Result<()> {
112 let coerced_pre = coerce_columns(pre, view.columns())?;
113 let coerced_post = coerce_columns(post, view.columns())?;
114 let row_count = coerced_post.row_count();
115 for row_idx in 0..row_count {
116 let pre_row_number = coerced_pre.row_numbers[row_idx];
117 let post_row_number = coerced_post.row_numbers[row_idx];
118 let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_row_number)?;
119 let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_row_number)?;
120
121 let post_encoded = ViewRowInterceptor::pre_update(txn, view, post_row_number, post_encoded)?;
122 let pre_key = RowKey::encoded(object_id, pre_row_number);
123 let post_key = RowKey::encoded(object_id, post_row_number);
124 txn.remove(&pre_key)?;
125 txn.set(&post_key, post_encoded.clone())?;
126 ViewRowInterceptor::post_update(txn, view, post_row_number, &post_encoded, &pre_encoded)?;
127 }
128 emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
129 Ok(())
130 }
131
132 #[inline]
133 fn apply_table_view_remove(
134 &self,
135 txn: &mut FlowTransaction,
136 view: &View,
137 shape: &RowShape,
138 object_id: ShapeId,
139 pre: &Arc<Columns>,
140 ) -> Result<()> {
141 let coerced = coerce_columns(pre, view.columns())?;
142 let row_count = coerced.row_count();
143 for row_idx in 0..row_count {
144 let row_number = coerced.row_numbers[row_idx];
145 let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, row_number)?;
146 ViewRowInterceptor::pre_delete(txn, view, row_number)?;
147 let key = RowKey::encoded(object_id, row_number);
148 txn.remove(&key)?;
149 ViewRowInterceptor::post_delete(txn, view, row_number, &encoded)?;
150 }
151 emit_view_change(txn, view, Diff::remove(coerced));
152 Ok(())
153 }
154}
155
156#[inline]
157fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
158 let version = txn.version();
159 let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
160 txn.track_flow_change(Change {
161 origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
162 version,
163 diffs: smallvec![diff],
164 changed_at,
165 });
166}