Skip to main content

reifydb_subscription/
consumer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Subscription data consumption logic.
5//!
6//! Provides static methods for reading, converting, and deleting subscription rows.
7
8use std::collections::{HashMap, HashSet};
9
10use reifydb_catalog::find_subscription;
11use reifydb_core::{
12	encoded::key::EncodedKey,
13	error::diagnostic::internal::internal,
14	interface::catalog::id::SubscriptionId,
15	key::{
16		Key,
17		subscription_row::{SubscriptionRowKey, SubscriptionRowKeyRange},
18	},
19	value::column::{Column, columns::Columns, data::ColumnData},
20};
21use reifydb_engine::engine::StandardEngine;
22use reifydb_transaction::transaction::Transaction;
23use reifydb_type::{Result, error::Error, fragment::Fragment, value::identity::IdentityId};
24use tracing::{debug, warn};
25
26/// Static methods for consuming subscription data.
27pub struct SubscriptionConsumer;
28
29impl SubscriptionConsumer {
30	/// Read rows from a subscription's storage.
31	///
32	/// Returns (columns, row_keys) where row_keys are the encoded keys for deletion.
33	pub fn read_rows(
34		engine: &StandardEngine,
35		db_subscription_id: SubscriptionId,
36		last_consumed_key: Option<&EncodedKey>,
37		batch_size: usize,
38	) -> Result<(Columns, Vec<EncodedKey>)> {
39		let mut cmd_txn = engine.begin_command(IdentityId::system())?;
40
41		// Get subscription definition using catalog function
42		let _sub_def = match find_subscription(&mut Transaction::Command(&mut cmd_txn), db_subscription_id)? {
43			Some(def) => def,
44			None => {
45				warn!("Subscription {} not found", db_subscription_id);
46				return Ok((Columns::empty(), Vec::new()));
47			}
48		};
49
50		// Get schema registry for resolving per-row schemas
51		let catalog = engine.catalog();
52		let row_schema_registry = &catalog.schema;
53
54		// Create range for scanning rows
55		let range = if let Some(last_key) = last_consumed_key {
56			SubscriptionRowKeyRange::scan_range(db_subscription_id, Some(last_key))
57		} else {
58			SubscriptionRowKey::full_scan(db_subscription_id)
59		};
60
61		let mut stream = cmd_txn.range(range, batch_size)?;
62		let mut entries = Vec::new();
63		while let Some(result) = stream.next() {
64			entries.push(result?);
65		}
66		drop(stream); // Explicitly drop to release the borrow on cmd_txn
67
68		// Build dynamic column structure
69		let mut column_data: HashMap<String, ColumnData> = HashMap::new();
70
71		let mut row_numbers = Vec::new();
72		let mut row_keys = Vec::new();
73		let mut row_count: usize = 0;
74
75		// Process collected entries
76		for entry in entries {
77			// Decode row key
78			if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
79				row_numbers.push(sub_row_key.row);
80				row_keys.push(entry.key.clone());
81
82				// Extract schema fingerprint from the encoded row
83				let fingerprint = entry.row.fingerprint();
84
85				// Resolve schema using RowSchemaRegistry
86				let schema = row_schema_registry
87					.get_or_load(fingerprint, &mut Transaction::Command(&mut cmd_txn))?
88					.ok_or_else(|| {
89						Error(internal(format!(
90							"Schema not found for fingerprint: {:?}",
91							fingerprint
92						)))
93					})?;
94
95				let mut seen_in_this_entry = HashSet::new();
96
97				// Decode each field using the resolved schema
98				for (idx, field) in schema.fields().iter().enumerate() {
99					let value = schema.get_value(&entry.row, idx);
100					seen_in_this_entry.insert(field.name.clone());
101
102					// Get or create column data for this field
103					column_data
104						.entry(field.name.clone())
105						.or_insert_with(|| {
106							// New column — backfill with None for all prior rows
107							let mut cd = ColumnData::with_capacity(
108								field.constraint.get_type(),
109								0,
110							);
111							for _ in 0..row_count {
112								cd.push_none();
113							}
114							cd
115						})
116						.push_value(value);
117				}
118
119				// Pad columns not seen in this entry with None
120				for (name, col) in column_data.iter_mut() {
121					if !seen_in_this_entry.contains(name) {
122						col.push_none();
123					}
124				}
125
126				row_count += 1;
127			}
128		}
129
130		// Convert HashMap to Vec for Columns
131		let columns: Vec<Column> = column_data
132			.into_iter()
133			.map(|(name, data)| Column {
134				name: Fragment::internal(&name),
135				data,
136			})
137			.collect();
138
139		Ok((Columns::with_row_numbers(columns, row_numbers), row_keys))
140	}
141
142	/// Delete consumed rows from subscription storage.
143	pub fn delete_rows(engine: &StandardEngine, row_keys: &[EncodedKey]) -> Result<()> {
144		if row_keys.is_empty() {
145			return Ok(());
146		}
147
148		let mut delete_txn = engine.begin_command(IdentityId::system())?;
149
150		for key in row_keys {
151			delete_txn.remove(key)?;
152		}
153
154		delete_txn.commit()?;
155
156		debug!("Deleted {} consumed rows", row_keys.len());
157		Ok(())
158	}
159}