Skip to main content

reifydb_subscription/
consumer.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Subscription data consumption logic.
5//!
6//! Provides static methods for reading, converting, and deleting subscription rows.
7
8use std::collections::HashMap;
9
10use reifydb_catalog::find_subscription;
11use reifydb_core::{
12	encoded::key::EncodedKey,
13	error::diagnostic::internal::internal,
14	interface::catalog::id::SubscriptionId,
15	key::{
16		Key,
17		subscription_row::{SubscriptionRowKey, SubscriptionRowKeyRange},
18	},
19	value::column::{Column, columns::Columns, data::ColumnData},
20};
21use reifydb_engine::engine::StandardEngine;
22use reifydb_transaction::transaction::Transaction;
23use reifydb_type::{Result, error::Error, fragment::Fragment};
24use tracing::{debug, warn};
25
26/// Static methods for consuming subscription data.
27pub struct SubscriptionConsumer;
28
29impl SubscriptionConsumer {
30	/// Read rows from a subscription's storage.
31	///
32	/// Returns (columns, row_keys) where row_keys are the encoded keys for deletion.
33	pub fn read_rows(
34		engine: &StandardEngine,
35		db_subscription_id: SubscriptionId,
36		last_consumed_key: Option<&EncodedKey>,
37		batch_size: usize,
38	) -> Result<(Columns, Vec<EncodedKey>)> {
39		let mut cmd_txn = engine.begin_command()?;
40
41		// Get subscription definition using catalog function
42		let _sub_def = match find_subscription(&mut Transaction::Command(&mut cmd_txn), db_subscription_id)? {
43			Some(def) => def,
44			None => {
45				warn!("Subscription {} not found", db_subscription_id);
46				return Ok((Columns::empty(), Vec::new()));
47			}
48		};
49
50		// Get schema registry for resolving per-row schemas
51		let catalog = engine.catalog();
52		let schema_registry = &catalog.schema;
53
54		// Create range for scanning rows
55		let range = if let Some(last_key) = last_consumed_key {
56			SubscriptionRowKeyRange::scan_range(db_subscription_id, Some(last_key))
57		} else {
58			SubscriptionRowKey::full_scan(db_subscription_id)
59		};
60
61		let mut stream = cmd_txn.range(range, batch_size)?;
62		let mut entries = Vec::new();
63		while let Some(result) = stream.next() {
64			entries.push(result?);
65		}
66		drop(stream); // Explicitly drop to release the borrow on cmd_txn
67
68		// Build dynamic column structure
69		let mut column_data: HashMap<String, ColumnData> = HashMap::new();
70
71		let mut row_numbers = Vec::new();
72		let mut row_keys = Vec::new();
73
74		// Process collected entries
75		for entry in entries {
76			// Decode row key
77			if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
78				row_numbers.push(sub_row_key.row);
79				row_keys.push(entry.key.clone());
80
81				// Extract schema fingerprint from the encoded row
82				let fingerprint = entry.values.fingerprint();
83
84				// Resolve schema using SchemaRegistry
85				let schema = schema_registry
86					.get_or_load(fingerprint, &mut Transaction::Command(&mut cmd_txn))?
87					.ok_or_else(|| {
88						Error(internal(format!(
89							"Schema not found for fingerprint: {:?}",
90							fingerprint
91						)))
92					})?;
93
94				// Decode each field using the resolved schema
95				for (idx, field) in schema.fields().iter().enumerate() {
96					let value = schema.get_value(&entry.values, idx);
97
98					// Get or create column data for this field
99					column_data
100						.entry(field.name.clone())
101						.or_insert_with(|| {
102							ColumnData::with_capacity(field.constraint.get_type(), 0)
103						})
104						.push_value(value);
105				}
106			}
107		}
108
109		// Convert HashMap to Vec for Columns
110		let columns: Vec<Column> = column_data
111			.into_iter()
112			.map(|(name, data)| Column {
113				name: Fragment::internal(&name),
114				data,
115			})
116			.collect();
117
118		Ok((Columns::with_row_numbers(columns, row_numbers), row_keys))
119	}
120
121	/// Delete consumed rows from subscription storage.
122	pub fn delete_rows(engine: &StandardEngine, row_keys: &[EncodedKey]) -> Result<()> {
123		if row_keys.is_empty() {
124			return Ok(());
125		}
126
127		let mut delete_txn = engine.begin_command()?;
128
129		for key in row_keys {
130			delete_txn.remove(key)?;
131		}
132
133		delete_txn.commit()?;
134
135		debug!("Deleted {} consumed rows", row_keys.len());
136		Ok(())
137	}
138}