reifydb_subscription/
cursor.rs1use reifydb_core::{encoded::key::EncodedKey, interface::catalog::id::SubscriptionId, value::column::columns::Columns};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_runtime::actor::system::ActorSystem;
9
10use crate::consumer::SubscriptionConsumer;
11
12pub struct SubscriptionCursor {
17 subscription_id: SubscriptionId,
18 batch_size: usize,
19 last_consumed_key: Option<EncodedKey>,
20 engine: StandardEngine,
21 system: ActorSystem,
22}
23
24impl SubscriptionCursor {
25 pub fn new(
27 subscription_id: SubscriptionId,
28 batch_size: usize,
29 engine: StandardEngine,
30 system: ActorSystem,
31 ) -> Self {
32 Self {
33 subscription_id,
34 batch_size,
35 last_consumed_key: None,
36 engine,
37 system,
38 }
39 }
40
41 pub fn next(&mut self) -> reifydb_type::Result<Option<Columns>> {
45 let sub_id = self.subscription_id;
46 let last_key = self.last_consumed_key.clone();
47 let batch_size = self.batch_size;
48 let engine_clone = self.engine.clone();
49
50 let read_result = self.system.install(move || {
51 SubscriptionConsumer::read_rows(&engine_clone, sub_id, last_key.as_ref(), batch_size)
52 });
53
54 let (rows, row_keys) = read_result?;
55
56 if rows.is_empty() {
57 return Ok(None);
58 }
59
60 let prev_cursor = self.last_consumed_key.clone();
62 if let Some(last_key) = row_keys.last() {
63 self.last_consumed_key = Some(last_key.clone());
64 }
65
66 let keys_to_delete = row_keys.clone();
68 let engine_clone = self.engine.clone();
69 let delete_result =
70 self.system.install(move || SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete));
71
72 match delete_result {
73 Ok(()) => {}
74 Err(e) => {
75 self.last_consumed_key = prev_cursor;
76 return Err(e);
77 }
78 }
79
80 Ok(Some(rows))
81 }
82}