reifydb_sub_flow/operator/sink/
subscription.rs1use std::sync::Arc;
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{
9 encoded::{
10 key::EncodedKey,
11 schema::{RowSchema, RowSchemaField},
12 },
13 interface::{
14 catalog::{flow::FlowNodeId, subscription::IMPLICIT_COLUMN_OP},
15 change::{Change, Diff},
16 resolved::ResolvedSubscription,
17 },
18 key::subscription_row::SubscriptionRowKey,
19 util::encoding::keycode::serializer::KeySerializer,
20 value::column::{Column, columns::Columns, data::ColumnData},
21};
22use reifydb_type::{Result, fragment::Fragment, value::row_number::RowNumber};
23
24use super::encode_row_at_index;
25use crate::{
26 Operator,
27 operator::{
28 Operators,
29 stateful::counter::{Counter, CounterDirection},
30 },
31 transaction::FlowTransaction,
32};
33
34pub struct SinkSubscriptionOperator {
35 #[allow(dead_code)]
36 parent: Arc<Operators>,
37 node: FlowNodeId,
38 subscription: ResolvedSubscription,
39 counter: Counter,
40}
41
42impl SinkSubscriptionOperator {
43 pub fn new(parent: Arc<Operators>, node: FlowNodeId, subscription: ResolvedSubscription) -> Self {
44 let counter_key = {
45 let mut serializer = KeySerializer::new();
46 serializer.extend_u64(subscription.def().id.0);
47 EncodedKey::new(serializer.finish())
48 };
49
50 Self {
51 parent,
52 node,
53 subscription,
54 counter: Counter::with_key(node, counter_key, CounterDirection::Descending),
55 }
56 }
57
58 fn add_implicit_columns(columns: &Columns, op: DiffType) -> Columns {
60 let row_count = columns.row_count();
61
62 let mut all_columns: Vec<Column> = columns.iter().cloned().collect();
63
64 all_columns.push(Column {
65 name: Fragment::internal(IMPLICIT_COLUMN_OP),
66 data: ColumnData::uint1(vec![op as u8; row_count]),
67 });
68
69 Columns::with_row_numbers(all_columns, columns.row_numbers.to_vec())
70 }
71}
72
73impl Operator for SinkSubscriptionOperator {
74 fn id(&self) -> FlowNodeId {
75 self.node
76 }
77
78 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
79 let subscription = self.subscription.def().clone();
80
81 for diff in change.diffs.iter() {
82 match diff {
83 Diff::Insert {
84 post,
85 } => {
86 let with_implicit = Self::add_implicit_columns(post, DiffType::Insert);
87
88 let schema = {
89 let catalog = txn.catalog();
90 create_schema_from_columns(&with_implicit, catalog)?
91 };
92
93 let row_count = with_implicit.row_count();
94 for row_idx in 0..row_count {
95 let row_number = self.counter.next(txn)?;
96
97 let (_, encoded) = encode_row_at_index(
98 &with_implicit,
99 row_idx,
100 &schema,
101 row_number,
102 );
103
104 let key = SubscriptionRowKey::encoded(subscription.id, row_number);
105 txn.set(&key, encoded)?;
106 }
107 }
108 Diff::Update {
109 pre: _pre,
110 post,
111 } => {
112 let with_implicit = Self::add_implicit_columns(post, DiffType::Update);
113
114 let schema = {
115 let catalog = txn.catalog();
116 create_schema_from_columns(&with_implicit, catalog)?
117 };
118
119 let row_count = with_implicit.row_count();
120 for row_idx in 0..row_count {
121 let row_number = self.counter.next(txn)?;
122
123 let (_, encoded) = encode_row_at_index(
124 &with_implicit,
125 row_idx,
126 &schema,
127 row_number,
128 );
129
130 let key = SubscriptionRowKey::encoded(subscription.id, row_number);
131 txn.set(&key, encoded)?;
132 }
133 }
134 Diff::Remove {
135 pre,
136 } => {
137 let with_implicit = Self::add_implicit_columns(pre, DiffType::Remove);
138
139 let schema = {
140 let catalog = txn.catalog();
141 create_schema_from_columns(&with_implicit, catalog)?
142 };
143
144 let row_count = with_implicit.row_count();
145 for row_idx in 0..row_count {
146 let row_number = self.counter.next(txn)?;
147
148 let (_, encoded) = encode_row_at_index(
149 &with_implicit,
150 row_idx,
151 &schema,
152 row_number,
153 );
154
155 let key = SubscriptionRowKey::encoded(subscription.id, row_number);
156 txn.set(&key, encoded)?;
157 }
158 }
159 }
160 }
161
162 Ok(Change::from_flow(self.node, change.version, Vec::new()))
163 }
164
165 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
166 unreachable!()
167 }
168}
169
170fn create_schema_from_columns(columns: &Columns, catalog: &Catalog) -> Result<RowSchema> {
172 let fields: Vec<RowSchemaField> = columns
173 .iter()
174 .map(|col| RowSchemaField::unconstrained(col.name.to_string(), col.data().get_type()))
175 .collect();
176
177 catalog.schema.get_or_create(fields)
178}