reifydb_subscription/
poller.rs1use std::{
10 sync::{Arc, Mutex},
11 time::Duration,
12};
13
14use reifydb_core::interface::catalog::id::SubscriptionId;
15use reifydb_engine::engine::StandardEngine;
16use reifydb_runtime::{actor::system::ActorSystem, sync::map::Map};
17use reifydb_type::Result;
18use tokio::{select, sync::watch, task, time::interval};
19use tracing::{debug, error, warn};
20
21use crate::{
22 consumer::SubscriptionConsumer,
23 delivery::{DeliveryResult, SubscriptionDelivery},
24 state::ConsumptionState,
25};
26
/// Polls registered subscriptions for pending rows and hands them to a
/// `SubscriptionDelivery` implementation.
pub struct SubscriptionPoller {
    /// Consumption state per registered subscription, keyed by subscription id.
    /// Entries are inserted by `register` and removed by `unregister`.
    states: Map<SubscriptionId, ConsumptionState>,
    /// Batch size passed to `SubscriptionConsumer::read_rows` on every poll.
    batch_size: usize,
    /// Scratch buffer that `poll_all` fills with the currently registered ids.
    /// Kept behind a `Mutex` because `poll_all` only has `&self`.
    keys_buf: Mutex<Vec<SubscriptionId>>,
}
37
38impl SubscriptionPoller {
39 pub fn new(batch_size: usize) -> Self {
45 Self {
46 states: Map::new(),
47 batch_size,
48 keys_buf: Mutex::new(Vec::new()),
49 }
50 }
51
52 pub fn register(&self, subscription_id: SubscriptionId) {
56 self.states.insert(
57 subscription_id,
58 ConsumptionState {
59 db_subscription_id: subscription_id,
60 last_consumed_key: None,
61 },
62 );
63 debug!("Registered subscription {} for polling", subscription_id);
64 }
65
66 pub fn unregister(&self, subscription_id: &SubscriptionId) {
70 self.states.remove(subscription_id);
71 debug!("Unregistered subscription {} from polling", subscription_id);
72 }
73
74 pub fn poll_all(&self, engine: &StandardEngine, system: &ActorSystem, delivery: &dyn SubscriptionDelivery) {
76 let mut keys_buf = self.keys_buf.lock().unwrap();
77 self.states.keys_into(&mut keys_buf);
78
79 for &subscription_id in keys_buf.iter() {
80 if let Err(e) = self.poll_single(subscription_id, engine, system, delivery) {
81 error!("Failed to poll subscription {}: {:?}", subscription_id, e);
82 }
83 }
84 }
85
86 pub async fn run_loop(
88 self: Arc<Self>,
89 engine: StandardEngine,
90 system: ActorSystem,
91 delivery: Arc<dyn SubscriptionDelivery + Send + Sync>,
92 poll_interval: Duration,
93 mut stop_rx: watch::Receiver<bool>,
94 ) {
95 let mut tick = interval(poll_interval);
96 loop {
97 select! {
98 biased;
99
100 result = stop_rx.changed() => {
101 if result.is_err() || *stop_rx.borrow() {
102 debug!("Subscription poller shutting down");
103 break;
104 }
105 }
106
107 _ = tick.tick() => {
108 let p = self.clone();
109 let e = engine.clone();
110 let s = system.clone();
111 let d = delivery.clone();
112 let _ = task::spawn_blocking(move || {
113 p.poll_all(&e, &s, d.as_ref());
114 }).await;
115 }
116 }
117 }
118 }
119
120 fn poll_single(
122 &self,
123 subscription_id: SubscriptionId,
124 engine: &StandardEngine,
125 system: &ActorSystem,
126 delivery: &dyn SubscriptionDelivery,
127 ) -> Result<()> {
128 let consumption_state = match self.states.get(&subscription_id) {
130 Some(state) => state,
131 None => {
132 return Ok(());
133 }
134 };
135
136 let db_subscription_id = consumption_state.db_subscription_id;
137 let last_consumed_key = consumption_state.last_consumed_key.clone();
138 let batch_size = self.batch_size;
139 let engine_clone = engine.clone();
140
141 let read_result = system.install(move || {
142 SubscriptionConsumer::read_rows(
143 &engine_clone,
144 db_subscription_id,
145 last_consumed_key.as_ref(),
146 batch_size,
147 )
148 });
149
150 let (rows, row_keys) = read_result?;
151
152 if rows.is_empty() {
153 return Ok(());
154 }
155
156 match delivery.try_deliver(&subscription_id, rows) {
158 DeliveryResult::Delivered => {
159 let prev_cursor = row_keys.last().and_then(|last_key| {
162 self.states.with_write(&subscription_id, |state| {
163 let prev = state.last_consumed_key.clone();
164 state.last_consumed_key = Some(last_key.clone());
165 prev
166 })
167 });
168
169 let engine_clone = engine.clone();
170 let keys_to_delete: Vec<_> = row_keys.clone();
171
172 let delete_result = system.install(move || {
173 SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete)
174 });
175
176 match delete_result {
177 Ok(()) => {}
178 Err(e) => {
179 if let Some(prev) = prev_cursor {
181 self.states.with_write(&subscription_id, |state| {
182 state.last_consumed_key = prev;
183 });
184 }
185 return Err(e);
186 }
187 }
188 }
189 DeliveryResult::BackPressure => {
190 warn!("Back pressure for subscription {}, will retry", subscription_id);
191 }
192 DeliveryResult::Disconnected => {
193 debug!("Consumer disconnected for subscription {}, unregistering", subscription_id);
194 self.unregister(&subscription_id);
195 }
196 }
197
198 Ok(())
199 }
200}