Skip to main content

reifydb_subscription/
cursor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Pull-based subscription cursor for embedded/HTTP use.
5
6use reifydb_core::{encoded::key::EncodedKey, interface::catalog::id::SubscriptionId, value::column::columns::Columns};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_runtime::actor::system::ActorSystem;
9use reifydb_type::Result;
10
11use crate::consumer::SubscriptionConsumer;
12
13/// Pull-based cursor for consuming subscription data.
14///
15/// Unlike the push-based `SubscriptionPoller`, the cursor allows consumers
16/// to pull data on demand, suitable for HTTP polling or embedded usage.
17pub struct SubscriptionCursor {
18	subscription_id: SubscriptionId,
19	batch_size: usize,
20	last_consumed_key: Option<EncodedKey>,
21	engine: StandardEngine,
22	system: ActorSystem,
23}
24
25impl SubscriptionCursor {
26	/// Create a new subscription cursor.
27	pub fn new(
28		subscription_id: SubscriptionId,
29		batch_size: usize,
30		engine: StandardEngine,
31		system: ActorSystem,
32	) -> Self {
33		Self {
34			subscription_id,
35			batch_size,
36			last_consumed_key: None,
37			engine,
38			system,
39		}
40	}
41
42	/// Fetch the next batch of subscription data.
43	///
44	/// Returns `None` if there is no new data available.
45	pub fn next(&mut self) -> Result<Option<Columns>> {
46		let sub_id = self.subscription_id;
47		let last_key = self.last_consumed_key.clone();
48		let batch_size = self.batch_size;
49		let engine_clone = self.engine.clone();
50
51		let read_result = self.system.install(move || {
52			SubscriptionConsumer::read_rows(&engine_clone, sub_id, last_key.as_ref(), batch_size)
53		});
54
55		let (rows, row_keys) = read_result?;
56
57		if rows.is_empty() {
58			return Ok(None);
59		}
60
61		// Advance cursor BEFORE deletion for at-least-once guarantee
62		let prev_cursor = self.last_consumed_key.clone();
63		if let Some(last_key) = row_keys.last() {
64			self.last_consumed_key = Some(last_key.clone());
65		}
66
67		// Delete consumed rows
68		let keys_to_delete = row_keys.clone();
69		let engine_clone = self.engine.clone();
70		let delete_result =
71			self.system.install(move || SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete));
72
73		match delete_result {
74			Ok(()) => {}
75			Err(e) => {
76				self.last_consumed_key = prev_cursor;
77				return Err(e);
78			}
79		}
80
81		Ok(Some(rows))
82	}
83}