reifydb_subscription/
consumer.rs1use std::collections::{HashMap, HashSet};
9
10use reifydb_catalog::find_subscription;
11use reifydb_core::{
12 encoded::key::EncodedKey,
13 error::diagnostic::internal::internal,
14 interface::catalog::id::SubscriptionId,
15 key::{
16 Key,
17 subscription_row::{SubscriptionRowKey, SubscriptionRowKeyRange},
18 },
19 value::column::{Column, columns::Columns, data::ColumnData},
20};
21use reifydb_engine::engine::StandardEngine;
22use reifydb_transaction::transaction::Transaction;
23use reifydb_type::{Result, error::Error, fragment::Fragment, value::identity::IdentityId};
24use tracing::{debug, warn};
25
26pub struct SubscriptionConsumer;
28
29impl SubscriptionConsumer {
30 pub fn read_rows(
34 engine: &StandardEngine,
35 db_subscription_id: SubscriptionId,
36 last_consumed_key: Option<&EncodedKey>,
37 batch_size: usize,
38 ) -> Result<(Columns, Vec<EncodedKey>)> {
39 let mut cmd_txn = engine.begin_command(IdentityId::system())?;
40
41 let _sub_def = match find_subscription(&mut Transaction::Command(&mut cmd_txn), db_subscription_id)? {
43 Some(def) => def,
44 None => {
45 warn!("Subscription {} not found", db_subscription_id);
46 return Ok((Columns::empty(), Vec::new()));
47 }
48 };
49
50 let catalog = engine.catalog();
52 let row_schema_registry = &catalog.schema;
53
54 let range = if let Some(last_key) = last_consumed_key {
56 SubscriptionRowKeyRange::scan_range(db_subscription_id, Some(last_key))
57 } else {
58 SubscriptionRowKey::full_scan(db_subscription_id)
59 };
60
61 let mut stream = cmd_txn.range(range, batch_size)?;
62 let mut entries = Vec::new();
63 while let Some(result) = stream.next() {
64 entries.push(result?);
65 }
66 drop(stream); let mut column_data: HashMap<String, ColumnData> = HashMap::new();
70
71 let mut row_numbers = Vec::new();
72 let mut row_keys = Vec::new();
73 let mut row_count: usize = 0;
74
75 for entry in entries {
77 if let Some(Key::SubscriptionRow(sub_row_key)) = Key::decode(&entry.key) {
79 row_numbers.push(sub_row_key.row);
80 row_keys.push(entry.key.clone());
81
82 let fingerprint = entry.row.fingerprint();
84
85 let schema = row_schema_registry
87 .get_or_load(fingerprint, &mut Transaction::Command(&mut cmd_txn))?
88 .ok_or_else(|| {
89 Error(internal(format!(
90 "Schema not found for fingerprint: {:?}",
91 fingerprint
92 )))
93 })?;
94
95 let mut seen_in_this_entry = HashSet::new();
96
97 for (idx, field) in schema.fields().iter().enumerate() {
99 let value = schema.get_value(&entry.row, idx);
100 seen_in_this_entry.insert(field.name.clone());
101
102 column_data
104 .entry(field.name.clone())
105 .or_insert_with(|| {
106 let mut cd = ColumnData::with_capacity(
108 field.constraint.get_type(),
109 0,
110 );
111 for _ in 0..row_count {
112 cd.push_none();
113 }
114 cd
115 })
116 .push_value(value);
117 }
118
119 for (name, col) in column_data.iter_mut() {
121 if !seen_in_this_entry.contains(name) {
122 col.push_none();
123 }
124 }
125
126 row_count += 1;
127 }
128 }
129
130 let columns: Vec<Column> = column_data
132 .into_iter()
133 .map(|(name, data)| Column {
134 name: Fragment::internal(&name),
135 data,
136 })
137 .collect();
138
139 Ok((Columns::with_row_numbers(columns, row_numbers), row_keys))
140 }
141
142 pub fn delete_rows(engine: &StandardEngine, row_keys: &[EncodedKey]) -> Result<()> {
144 if row_keys.is_empty() {
145 return Ok(());
146 }
147
148 let mut delete_txn = engine.begin_command(IdentityId::system())?;
149
150 for key in row_keys {
151 delete_txn.remove(key)?;
152 }
153
154 delete_txn.commit()?;
155
156 debug!("Deleted {} consumed rows", row_keys.len());
157 Ok(())
158 }
159}