Skip to main content

reifydb_subscription/
cursor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Pull-based subscription cursor for embedded/HTTP use.
5
6use reifydb_core::{encoded::key::EncodedKey, interface::catalog::id::SubscriptionId, value::column::columns::Columns};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_runtime::actor::system::ActorSystem;
9
10use crate::consumer::SubscriptionConsumer;
11
12/// Pull-based cursor for consuming subscription data.
13///
14/// Unlike the push-based `SubscriptionPoller`, the cursor allows consumers
15/// to pull data on demand, suitable for HTTP polling or embedded usage.
16pub struct SubscriptionCursor {
17	subscription_id: SubscriptionId,
18	batch_size: usize,
19	last_consumed_key: Option<EncodedKey>,
20	engine: StandardEngine,
21	system: ActorSystem,
22}
23
24impl SubscriptionCursor {
25	/// Create a new subscription cursor.
26	pub fn new(
27		subscription_id: SubscriptionId,
28		batch_size: usize,
29		engine: StandardEngine,
30		system: ActorSystem,
31	) -> Self {
32		Self {
33			subscription_id,
34			batch_size,
35			last_consumed_key: None,
36			engine,
37			system,
38		}
39	}
40
41	/// Fetch the next batch of subscription data.
42	///
43	/// Returns `None` if there is no new data available.
44	pub fn next(&mut self) -> reifydb_type::Result<Option<Columns>> {
45		let sub_id = self.subscription_id;
46		let last_key = self.last_consumed_key.clone();
47		let batch_size = self.batch_size;
48		let engine_clone = self.engine.clone();
49
50		let read_result = self.system.install(move || {
51			SubscriptionConsumer::read_rows(&engine_clone, sub_id, last_key.as_ref(), batch_size)
52		});
53
54		let (rows, row_keys) = read_result?;
55
56		if rows.is_empty() {
57			return Ok(None);
58		}
59
60		// Advance cursor BEFORE deletion for at-least-once guarantee
61		let prev_cursor = self.last_consumed_key.clone();
62		if let Some(last_key) = row_keys.last() {
63			self.last_consumed_key = Some(last_key.clone());
64		}
65
66		// Delete consumed rows
67		let keys_to_delete = row_keys.clone();
68		let engine_clone = self.engine.clone();
69		let delete_result =
70			self.system.install(move || SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete));
71
72		match delete_result {
73			Ok(()) => {}
74			Err(e) => {
75				self.last_consumed_key = prev_cursor;
76				return Err(e);
77			}
78		}
79
80		Ok(Some(rows))
81	}
82}