Skip to main content

reifydb_subscription/
poller.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Push-based subscription poller.
5//!
6//! Polls all registered subscriptions and delivers data via the
7//! `SubscriptionDelivery` trait, decoupled from any specific transport.
8
9use reifydb_core::interface::catalog::id::SubscriptionId;
10use reifydb_engine::engine::StandardEngine;
11use reifydb_runtime::{actor::system::ActorSystem, sync::map::Map};
12use tracing::{debug, error};
13
14use crate::{
15	consumer::SubscriptionConsumer,
16	delivery::{DeliveryResult, SubscriptionDelivery},
17	state::ConsumptionState,
18};
19
20/// Subscription poller that consumes subscription data and delivers via a delivery trait.
21pub struct SubscriptionPoller {
22	/// Consumption state per subscription
23	/// Maps: subscription_id → ConsumptionState
24	states: Map<SubscriptionId, ConsumptionState>,
25	/// Batch size for reading rows per poll cycle
26	batch_size: usize,
27}
28
29impl SubscriptionPoller {
30	/// Create a new subscription poller.
31	///
32	/// # Arguments
33	///
34	/// * `batch_size` - Maximum number of rows to read per subscription per poll cycle
35	pub fn new(batch_size: usize) -> Self {
36		Self {
37			states: Map::new(),
38			batch_size,
39		}
40	}
41
42	/// Register a new subscription for polling.
43	///
44	/// Should be called when a client subscribes.
45	pub fn register(&self, subscription_id: SubscriptionId) {
46		self.states.insert(
47			subscription_id,
48			ConsumptionState {
49				db_subscription_id: subscription_id,
50				last_consumed_key: None,
51			},
52		);
53		debug!("Registered subscription {} for polling", subscription_id);
54	}
55
56	/// Unregister a subscription from polling.
57	///
58	/// Should be called when a client unsubscribes or disconnects.
59	pub fn unregister(&self, subscription_id: &SubscriptionId) {
60		self.states.remove(subscription_id);
61		debug!("Unregistered subscription {} from polling", subscription_id);
62	}
63
64	/// Poll all active subscriptions and deliver data via the delivery trait.
65	pub fn poll_all(&self, engine: &StandardEngine, system: &ActorSystem, delivery: &dyn SubscriptionDelivery) {
66		let subscription_ids: Vec<_> = self.states.keys();
67
68		for subscription_id in subscription_ids {
69			if let Err(e) = self.poll_single(subscription_id, engine, system, delivery) {
70				error!("Failed to poll subscription {}: {:?}", subscription_id, e);
71			}
72		}
73	}
74
75	/// Poll a single subscription and deliver data to the client.
76	fn poll_single(
77		&self,
78		subscription_id: SubscriptionId,
79		engine: &StandardEngine,
80		system: &ActorSystem,
81		delivery: &dyn SubscriptionDelivery,
82	) -> reifydb_type::Result<()> {
83		// Get consumption state
84		let consumption_state = match self.states.get(&subscription_id) {
85			Some(state) => state,
86			None => {
87				return Ok(());
88			}
89		};
90
91		let db_subscription_id = consumption_state.db_subscription_id;
92		let last_consumed_key = consumption_state.last_consumed_key.clone();
93		let batch_size = self.batch_size;
94		let engine_clone = engine.clone();
95
96		let read_result = system.install(move || {
97			SubscriptionConsumer::read_rows(
98				&engine_clone,
99				db_subscription_id,
100				last_consumed_key.as_ref(),
101				batch_size,
102			)
103		});
104
105		let (rows, row_keys) = read_result?;
106
107		if rows.is_empty() {
108			return Ok(());
109		}
110
111		// Deliver via the delivery trait
112		match delivery.try_deliver(&subscription_id, rows) {
113			DeliveryResult::Delivered => {
114				// Advance cursor BEFORE deletion for at-least-once guarantee.
115				// If delete fails, revert cursor so rows retry next poll.
116				let prev_cursor = row_keys.last().and_then(|last_key| {
117					self.states.with_write(&subscription_id, |state| {
118						let prev = state.last_consumed_key.clone();
119						state.last_consumed_key = Some(last_key.clone());
120						prev
121					})
122				});
123
124				let engine_clone = engine.clone();
125				let keys_to_delete: Vec<_> = row_keys.clone();
126
127				let delete_result = system.install(move || {
128					SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete)
129				});
130
131				match delete_result {
132					Ok(()) => {}
133					Err(e) => {
134						// Revert cursor on delete failure
135						if let Some(prev) = prev_cursor {
136							self.states.with_write(&subscription_id, |state| {
137								state.last_consumed_key = prev;
138							});
139						}
140						return Err(e);
141					}
142				}
143			}
144			DeliveryResult::BackPressure => {
145				tracing::warn!("Back pressure for subscription {}, will retry", subscription_id);
146			}
147			DeliveryResult::Disconnected => {
148				debug!("Consumer disconnected for subscription {}, unregistering", subscription_id);
149				self.unregister(&subscription_id);
150			}
151		}
152
153		Ok(())
154	}
155}