reifydb_subscription/
consumer.rs1use reifydb_core::{
9 encoded::key::EncodedKey,
10 error::diagnostic::internal::internal,
11 interface::catalog::id::SubscriptionId,
12 key::{
13 Key,
14 subscription_row::{SubscriptionRowKey, SubscriptionRowKeyRange},
15 },
16 value::column::{Column, columns::Columns, data::ColumnData},
17};
18use reifydb_engine::engine::StandardEngine;
19use reifydb_transaction::transaction::Transaction;
20use reifydb_type::{error::Error, fragment::Fragment};
21use tracing::debug;
22
23pub struct SubscriptionConsumer;
25
26impl SubscriptionConsumer {
27 pub fn read_rows(
31 engine: &StandardEngine,
32 db_subscription_id: SubscriptionId,
33 last_consumed_key: Option<&EncodedKey>,
34 batch_size: usize,
35 ) -> reifydb_type::Result<(Columns, Vec<EncodedKey>)> {
36 let mut cmd_txn = engine.begin_command()?;
37
38 let _sub_def = match reifydb_catalog::find_subscription(
40 &mut Transaction::Command(&mut cmd_txn),
41 db_subscription_id,
42 )? {
43 Some(def) => def,
44 None => {
45 tracing::warn!("Subscription {} not found", db_subscription_id);
46 return Ok((Columns::empty(), Vec::new()));
47 }
48 };
49
50 let catalog = engine.catalog();
52 let schema_registry = &catalog.schema;
53
54 let range = if let Some(last_key) = last_consumed_key {
56 SubscriptionRowKeyRange::scan_range(db_subscription_id, Some(last_key))
57 } else {
58 SubscriptionRowKey::full_scan(db_subscription_id)
59 };
60
61 let mut stream = cmd_txn.range(range, batch_size)?;
62 let mut entries = Vec::new();
63 while let Some(result) = stream.next() {
64 entries.push(result?);
65 }
66 drop(stream); use std::collections::HashMap;
70 let mut column_data: HashMap<String, ColumnData> = HashMap::new();
71
72 let mut row_numbers = Vec::new();
73 let mut row_keys = Vec::new();
74
75 for entry in entries {
77 if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
79 row_numbers.push(sub_row_key.row);
80 row_keys.push(entry.key.clone());
81
82 let fingerprint = entry.values.fingerprint();
84
85 let schema = schema_registry
87 .get_or_load(fingerprint, &mut Transaction::Command(&mut cmd_txn))?
88 .ok_or_else(|| {
89 Error(internal(format!(
90 "Schema not found for fingerprint: {:?}",
91 fingerprint
92 )))
93 })?;
94
95 for (idx, field) in schema.fields().iter().enumerate() {
97 let value = schema.get_value(&entry.values, idx);
98
99 column_data
101 .entry(field.name.clone())
102 .or_insert_with(|| {
103 ColumnData::with_capacity(field.constraint.get_type(), 0)
104 })
105 .push_value(value);
106 }
107 }
108 }
109
110 let columns: Vec<Column> = column_data
112 .into_iter()
113 .map(|(name, data)| Column {
114 name: Fragment::internal(&name),
115 data,
116 })
117 .collect();
118
119 Ok((Columns::with_row_numbers(columns, row_numbers), row_keys))
120 }
121
122 pub fn delete_rows(engine: &StandardEngine, row_keys: &[EncodedKey]) -> reifydb_type::Result<()> {
124 if row_keys.is_empty() {
125 return Ok(());
126 }
127
128 let mut delete_txn = engine.begin_command()?;
129
130 for key in row_keys {
131 delete_txn.remove(key)?;
132 }
133
134 delete_txn.commit()?;
135
136 debug!("Deleted {} consumed rows", row_keys.len());
137 Ok(())
138 }
139}