Skip to main content

reifydb_sub_flow/operator/sink/
subscription.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_catalog::catalog::Catalog;
8use reifydb_core::{
9	encoded::{
10		key::EncodedKey,
11		schema::{RowSchema, RowSchemaField},
12	},
13	interface::{
14		catalog::{flow::FlowNodeId, subscription::IMPLICIT_COLUMN_OP},
15		change::{Change, Diff},
16		resolved::ResolvedSubscription,
17	},
18	key::subscription_row::SubscriptionRowKey,
19	util::encoding::keycode::serializer::KeySerializer,
20	value::column::{Column, columns::Columns, data::ColumnData},
21};
22use reifydb_type::{Result, fragment::Fragment, value::row_number::RowNumber};
23
24use super::encode_row_at_index;
25use crate::{
26	Operator,
27	operator::{
28		Operators,
29		stateful::counter::{Counter, CounterDirection},
30	},
31	transaction::FlowTransaction,
32};
33
34pub struct SinkSubscriptionOperator {
35	#[allow(dead_code)]
36	parent: Arc<Operators>,
37	node: FlowNodeId,
38	subscription: ResolvedSubscription,
39	counter: Counter,
40}
41
42impl SinkSubscriptionOperator {
43	pub fn new(parent: Arc<Operators>, node: FlowNodeId, subscription: ResolvedSubscription) -> Self {
44		let counter_key = {
45			let mut serializer = KeySerializer::new();
46			serializer.extend_u64(subscription.def().id.0);
47			EncodedKey::new(serializer.finish())
48		};
49
50		Self {
51			parent,
52			node,
53			subscription,
54			counter: Counter::with_key(node, counter_key, CounterDirection::Descending),
55		}
56	}
57
58	/// Add implicit columns (_op) to the columns
59	fn add_implicit_columns(columns: &Columns, op: DiffType) -> Columns {
60		let row_count = columns.row_count();
61
62		let mut all_columns: Vec<Column> = columns.iter().cloned().collect();
63
64		all_columns.push(Column {
65			name: Fragment::internal(IMPLICIT_COLUMN_OP),
66			data: ColumnData::uint1(vec![op as u8; row_count]),
67		});
68
69		Columns::with_row_numbers(all_columns, columns.row_numbers.to_vec())
70	}
71}
72
73impl Operator for SinkSubscriptionOperator {
74	fn id(&self) -> FlowNodeId {
75		self.node
76	}
77
78	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
79		let subscription = self.subscription.def().clone();
80
81		for diff in change.diffs.iter() {
82			match diff {
83				Diff::Insert {
84					post,
85				} => {
86					let with_implicit = Self::add_implicit_columns(post, DiffType::Insert);
87
88					let schema = {
89						let catalog = txn.catalog();
90						create_schema_from_columns(&with_implicit, catalog)?
91					};
92
93					let row_count = with_implicit.row_count();
94					for row_idx in 0..row_count {
95						let row_number = self.counter.next(txn)?;
96
97						let (_, encoded) = encode_row_at_index(
98							&with_implicit,
99							row_idx,
100							&schema,
101							row_number,
102						);
103
104						let key = SubscriptionRowKey::encoded(subscription.id, row_number);
105						txn.set(&key, encoded)?;
106					}
107				}
108				Diff::Update {
109					pre: _pre,
110					post,
111				} => {
112					let with_implicit = Self::add_implicit_columns(post, DiffType::Update);
113
114					let schema = {
115						let catalog = txn.catalog();
116						create_schema_from_columns(&with_implicit, catalog)?
117					};
118
119					let row_count = with_implicit.row_count();
120					for row_idx in 0..row_count {
121						let row_number = self.counter.next(txn)?;
122
123						let (_, encoded) = encode_row_at_index(
124							&with_implicit,
125							row_idx,
126							&schema,
127							row_number,
128						);
129
130						let key = SubscriptionRowKey::encoded(subscription.id, row_number);
131						txn.set(&key, encoded)?;
132					}
133				}
134				Diff::Remove {
135					pre,
136				} => {
137					let with_implicit = Self::add_implicit_columns(pre, DiffType::Remove);
138
139					let schema = {
140						let catalog = txn.catalog();
141						create_schema_from_columns(&with_implicit, catalog)?
142					};
143
144					let row_count = with_implicit.row_count();
145					for row_idx in 0..row_count {
146						let row_number = self.counter.next(txn)?;
147
148						let (_, encoded) = encode_row_at_index(
149							&with_implicit,
150							row_idx,
151							&schema,
152							row_number,
153						);
154
155						let key = SubscriptionRowKey::encoded(subscription.id, row_number);
156						txn.set(&key, encoded)?;
157					}
158				}
159			}
160		}
161
162		Ok(Change::from_flow(self.node, change.version, Vec::new()))
163	}
164
165	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
166		unreachable!()
167	}
168}
169
170/// Create and persist a schema from actual column data
171fn create_schema_from_columns(columns: &Columns, catalog: &Catalog) -> Result<RowSchema> {
172	let fields: Vec<RowSchemaField> = columns
173		.iter()
174		.map(|col| RowSchemaField::unconstrained(col.name.to_string(), col.data().get_type()))
175		.collect();
176
177	catalog.schema.get_or_create(fields)
178}