Skip to main content

reifydb_subscription/
consumer.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Subscription data consumption logic.
5//!
6//! Provides static methods for reading, converting, and deleting subscription rows.
7
8use reifydb_core::{
9	encoded::key::EncodedKey,
10	error::diagnostic::internal::internal,
11	interface::catalog::id::SubscriptionId,
12	key::{
13		Key,
14		subscription_row::{SubscriptionRowKey, SubscriptionRowKeyRange},
15	},
16	value::column::{Column, columns::Columns, data::ColumnData},
17};
18use reifydb_engine::engine::StandardEngine;
19use reifydb_transaction::transaction::Transaction;
20use reifydb_type::{error::Error, fragment::Fragment};
21use tracing::debug;
22
/// Stateless namespace type for consuming subscription data.
///
/// The type carries no fields; all functionality is exposed through
/// associated functions on its `impl` block. The common traits are derived
/// so the type can be logged, copied, and default-constructed like any other
/// public value type.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubscriptionConsumer;
25
26impl SubscriptionConsumer {
27	/// Read rows from a subscription's storage.
28	///
29	/// Returns (columns, row_keys) where row_keys are the encoded keys for deletion.
30	pub fn read_rows(
31		engine: &StandardEngine,
32		db_subscription_id: SubscriptionId,
33		last_consumed_key: Option<&EncodedKey>,
34		batch_size: usize,
35	) -> reifydb_type::Result<(Columns, Vec<EncodedKey>)> {
36		let mut cmd_txn = engine.begin_command()?;
37
38		// Get subscription definition using catalog function
39		let _sub_def = match reifydb_catalog::find_subscription(
40			&mut Transaction::Command(&mut cmd_txn),
41			db_subscription_id,
42		)? {
43			Some(def) => def,
44			None => {
45				tracing::warn!("Subscription {} not found", db_subscription_id);
46				return Ok((Columns::empty(), Vec::new()));
47			}
48		};
49
50		// Get schema registry for resolving per-row schemas
51		let catalog = engine.catalog();
52		let schema_registry = &catalog.schema;
53
54		// Create range for scanning rows
55		let range = if let Some(last_key) = last_consumed_key {
56			SubscriptionRowKeyRange::scan_range(db_subscription_id, Some(last_key))
57		} else {
58			SubscriptionRowKey::full_scan(db_subscription_id)
59		};
60
61		let mut stream = cmd_txn.range(range, batch_size)?;
62		let mut entries = Vec::new();
63		while let Some(result) = stream.next() {
64			entries.push(result?);
65		}
66		drop(stream); // Explicitly drop to release the borrow on cmd_txn
67
68		// Build dynamic column structure
69		use std::collections::HashMap;
70		let mut column_data: HashMap<String, ColumnData> = HashMap::new();
71
72		let mut row_numbers = Vec::new();
73		let mut row_keys = Vec::new();
74
75		// Process collected entries
76		for entry in entries {
77			// Decode row key
78			if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
79				row_numbers.push(sub_row_key.row);
80				row_keys.push(entry.key.clone());
81
82				// Extract schema fingerprint from the encoded row
83				let fingerprint = entry.values.fingerprint();
84
85				// Resolve schema using SchemaRegistry
86				let schema = schema_registry
87					.get_or_load(fingerprint, &mut Transaction::Command(&mut cmd_txn))?
88					.ok_or_else(|| {
89						Error(internal(format!(
90							"Schema not found for fingerprint: {:?}",
91							fingerprint
92						)))
93					})?;
94
95				// Decode each field using the resolved schema
96				for (idx, field) in schema.fields().iter().enumerate() {
97					let value = schema.get_value(&entry.values, idx);
98
99					// Get or create column data for this field
100					column_data
101						.entry(field.name.clone())
102						.or_insert_with(|| {
103							ColumnData::with_capacity(field.constraint.get_type(), 0)
104						})
105						.push_value(value);
106				}
107			}
108		}
109
110		// Convert HashMap to Vec for Columns
111		let columns: Vec<Column> = column_data
112			.into_iter()
113			.map(|(name, data)| Column {
114				name: Fragment::internal(&name),
115				data,
116			})
117			.collect();
118
119		Ok((Columns::with_row_numbers(columns, row_numbers), row_keys))
120	}
121
122	/// Delete consumed rows from subscription storage.
123	pub fn delete_rows(engine: &StandardEngine, row_keys: &[EncodedKey]) -> reifydb_type::Result<()> {
124		if row_keys.is_empty() {
125			return Ok(());
126		}
127
128		let mut delete_txn = engine.begin_command()?;
129
130		for key in row_keys {
131			delete_txn.remove(key)?;
132		}
133
134		delete_txn.commit()?;
135
136		debug!("Deleted {} consumed rows", row_keys.len());
137		Ok(())
138	}
139}