Skip to main content

reifydb_subscription/
poller.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Push-based subscription poller.
5//!
6//! Polls all registered subscriptions and delivers data via the
7//! `SubscriptionDelivery` trait, decoupled from any specific transport.
8
9use std::{
10	sync::{Arc, Mutex},
11	time::Duration,
12};
13
14use reifydb_core::interface::catalog::id::SubscriptionId;
15use reifydb_engine::engine::StandardEngine;
16use reifydb_runtime::{actor::system::ActorSystem, sync::map::Map};
17use reifydb_type::Result;
18use tokio::{select, sync::watch, task, time::interval};
19use tracing::{debug, error, warn};
20
21use crate::{
22	consumer::SubscriptionConsumer,
23	delivery::{DeliveryResult, SubscriptionDelivery},
24	state::ConsumptionState,
25};
26
27/// Subscription poller that consumes subscription data and delivers via a delivery trait.
28pub struct SubscriptionPoller {
29	/// Consumption state per subscription
30	/// Maps: subscription_id → ConsumptionState
31	states: Map<SubscriptionId, ConsumptionState>,
32	/// Batch size for reading rows per poll cycle
33	batch_size: usize,
34	/// Reusable buffer for collecting subscription keys, avoiding allocation per poll cycle
35	keys_buf: Mutex<Vec<SubscriptionId>>,
36}
37
38impl SubscriptionPoller {
39	/// Create a new subscription poller.
40	///
41	/// # Arguments
42	///
43	/// * `batch_size` - Maximum number of rows to read per subscription per poll cycle
44	pub fn new(batch_size: usize) -> Self {
45		Self {
46			states: Map::new(),
47			batch_size,
48			keys_buf: Mutex::new(Vec::new()),
49		}
50	}
51
52	/// Register a new subscription for polling.
53	///
54	/// Should be called when a client subscribes.
55	pub fn register(&self, subscription_id: SubscriptionId) {
56		self.states.insert(
57			subscription_id,
58			ConsumptionState {
59				db_subscription_id: subscription_id,
60				last_consumed_key: None,
61			},
62		);
63		debug!("Registered subscription {} for polling", subscription_id);
64	}
65
66	/// Unregister a subscription from polling.
67	///
68	/// Should be called when a client unsubscribes or disconnects.
69	pub fn unregister(&self, subscription_id: &SubscriptionId) {
70		self.states.remove(subscription_id);
71		debug!("Unregistered subscription {} from polling", subscription_id);
72	}
73
74	/// Poll all active subscriptions and deliver data via the delivery trait.
75	pub fn poll_all(&self, engine: &StandardEngine, system: &ActorSystem, delivery: &dyn SubscriptionDelivery) {
76		let mut keys_buf = self.keys_buf.lock().unwrap();
77		self.states.keys_into(&mut keys_buf);
78
79		for &subscription_id in keys_buf.iter() {
80			if let Err(e) = self.poll_single(subscription_id, engine, system, delivery) {
81				error!("Failed to poll subscription {}: {:?}", subscription_id, e);
82			}
83		}
84	}
85
86	/// Run the poller loop. Returns when the stop signal is received.
87	pub async fn run_loop(
88		self: Arc<Self>,
89		engine: StandardEngine,
90		system: ActorSystem,
91		delivery: Arc<dyn SubscriptionDelivery + Send + Sync>,
92		poll_interval: Duration,
93		mut stop_rx: watch::Receiver<bool>,
94	) {
95		let mut tick = interval(poll_interval);
96		loop {
97			select! {
98				biased;
99
100				result = stop_rx.changed() => {
101					if result.is_err() || *stop_rx.borrow() {
102						debug!("Subscription poller shutting down");
103						break;
104					}
105				}
106
107				_ = tick.tick() => {
108					let p = self.clone();
109					let e = engine.clone();
110					let s = system.clone();
111					let d = delivery.clone();
112					let _ = task::spawn_blocking(move || {
113						p.poll_all(&e, &s, d.as_ref());
114					}).await;
115				}
116			}
117		}
118	}
119
120	/// Poll a single subscription and deliver data to the client.
121	fn poll_single(
122		&self,
123		subscription_id: SubscriptionId,
124		engine: &StandardEngine,
125		system: &ActorSystem,
126		delivery: &dyn SubscriptionDelivery,
127	) -> Result<()> {
128		// Get consumption state
129		let consumption_state = match self.states.get(&subscription_id) {
130			Some(state) => state,
131			None => {
132				return Ok(());
133			}
134		};
135
136		let db_subscription_id = consumption_state.db_subscription_id;
137		let last_consumed_key = consumption_state.last_consumed_key.clone();
138		let batch_size = self.batch_size;
139		let engine_clone = engine.clone();
140
141		let read_result = system.install(move || {
142			SubscriptionConsumer::read_rows(
143				&engine_clone,
144				db_subscription_id,
145				last_consumed_key.as_ref(),
146				batch_size,
147			)
148		});
149
150		let (rows, row_keys) = read_result?;
151
152		if rows.is_empty() {
153			return Ok(());
154		}
155
156		// Deliver via the delivery trait
157		match delivery.try_deliver(&subscription_id, rows) {
158			DeliveryResult::Delivered => {
159				// Advance cursor BEFORE deletion for at-least-once guarantee.
160				// If delete fails, revert cursor so rows retry next poll.
161				let prev_cursor = row_keys.last().and_then(|last_key| {
162					self.states.with_write(&subscription_id, |state| {
163						let prev = state.last_consumed_key.clone();
164						state.last_consumed_key = Some(last_key.clone());
165						prev
166					})
167				});
168
169				let engine_clone = engine.clone();
170				let keys_to_delete: Vec<_> = row_keys.clone();
171
172				let delete_result = system.install(move || {
173					SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete)
174				});
175
176				match delete_result {
177					Ok(()) => {}
178					Err(e) => {
179						// Revert cursor on delete failure
180						if let Some(prev) = prev_cursor {
181							self.states.with_write(&subscription_id, |state| {
182								state.last_consumed_key = prev;
183							});
184						}
185						return Err(e);
186					}
187				}
188			}
189			DeliveryResult::BackPressure => {
190				warn!("Back pressure for subscription {}, will retry", subscription_id);
191			}
192			DeliveryResult::Disconnected => {
193				debug!("Consumer disconnected for subscription {}, unregistering", subscription_id);
194				self.unregister(&subscription_id);
195			}
196		}
197
198		Ok(())
199	}
200}