reifydb_subscription/
poller.rs1use reifydb_core::interface::catalog::id::SubscriptionId;
10use reifydb_engine::engine::StandardEngine;
11use reifydb_runtime::{actor::system::ActorSystem, sync::map::Map};
12use tracing::{debug, error};
13
14use crate::{
15 consumer::SubscriptionConsumer,
16 delivery::{DeliveryResult, SubscriptionDelivery},
17 state::ConsumptionState,
18};
19
20pub struct SubscriptionPoller {
22 states: Map<SubscriptionId, ConsumptionState>,
25 batch_size: usize,
27}
28
29impl SubscriptionPoller {
30 pub fn new(batch_size: usize) -> Self {
36 Self {
37 states: Map::new(),
38 batch_size,
39 }
40 }
41
42 pub fn register(&self, subscription_id: SubscriptionId) {
46 self.states.insert(
47 subscription_id,
48 ConsumptionState {
49 db_subscription_id: subscription_id,
50 last_consumed_key: None,
51 },
52 );
53 debug!("Registered subscription {} for polling", subscription_id);
54 }
55
56 pub fn unregister(&self, subscription_id: &SubscriptionId) {
60 self.states.remove(subscription_id);
61 debug!("Unregistered subscription {} from polling", subscription_id);
62 }
63
64 pub fn poll_all(&self, engine: &StandardEngine, system: &ActorSystem, delivery: &dyn SubscriptionDelivery) {
66 let subscription_ids: Vec<_> = self.states.keys();
67
68 for subscription_id in subscription_ids {
69 if let Err(e) = self.poll_single(subscription_id, engine, system, delivery) {
70 error!("Failed to poll subscription {}: {:?}", subscription_id, e);
71 }
72 }
73 }
74
75 fn poll_single(
77 &self,
78 subscription_id: SubscriptionId,
79 engine: &StandardEngine,
80 system: &ActorSystem,
81 delivery: &dyn SubscriptionDelivery,
82 ) -> reifydb_type::Result<()> {
83 let consumption_state = match self.states.get(&subscription_id) {
85 Some(state) => state,
86 None => {
87 return Ok(());
88 }
89 };
90
91 let db_subscription_id = consumption_state.db_subscription_id;
92 let last_consumed_key = consumption_state.last_consumed_key.clone();
93 let batch_size = self.batch_size;
94 let engine_clone = engine.clone();
95
96 let read_result = system.install(move || {
97 SubscriptionConsumer::read_rows(
98 &engine_clone,
99 db_subscription_id,
100 last_consumed_key.as_ref(),
101 batch_size,
102 )
103 });
104
105 let (rows, row_keys) = read_result?;
106
107 if rows.is_empty() {
108 return Ok(());
109 }
110
111 match delivery.try_deliver(&subscription_id, rows) {
113 DeliveryResult::Delivered => {
114 let prev_cursor = row_keys.last().and_then(|last_key| {
117 self.states.with_write(&subscription_id, |state| {
118 let prev = state.last_consumed_key.clone();
119 state.last_consumed_key = Some(last_key.clone());
120 prev
121 })
122 });
123
124 let engine_clone = engine.clone();
125 let keys_to_delete: Vec<_> = row_keys.clone();
126
127 let delete_result = system.install(move || {
128 SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete)
129 });
130
131 match delete_result {
132 Ok(()) => {}
133 Err(e) => {
134 if let Some(prev) = prev_cursor {
136 self.states.with_write(&subscription_id, |state| {
137 state.last_consumed_key = prev;
138 });
139 }
140 return Err(e);
141 }
142 }
143 }
144 DeliveryResult::BackPressure => {
145 tracing::warn!("Back pressure for subscription {}, will retry", subscription_id);
146 }
147 DeliveryResult::Disconnected => {
148 debug!("Consumer disconnected for subscription {}, unregistering", subscription_id);
149 self.unregister(&subscription_id);
150 }
151 }
152
153 Ok(())
154 }
155}