reifydb_subscription/
cursor.rs1use reifydb_core::{encoded::key::EncodedKey, interface::catalog::id::SubscriptionId, value::column::columns::Columns};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_runtime::actor::system::ActorSystem;
9use reifydb_type::Result;
10
11use crate::consumer::SubscriptionConsumer;
12
13pub struct SubscriptionCursor {
18 subscription_id: SubscriptionId,
19 batch_size: usize,
20 last_consumed_key: Option<EncodedKey>,
21 engine: StandardEngine,
22 system: ActorSystem,
23}
24
25impl SubscriptionCursor {
26 pub fn new(
28 subscription_id: SubscriptionId,
29 batch_size: usize,
30 engine: StandardEngine,
31 system: ActorSystem,
32 ) -> Self {
33 Self {
34 subscription_id,
35 batch_size,
36 last_consumed_key: None,
37 engine,
38 system,
39 }
40 }
41
42 pub fn next(&mut self) -> Result<Option<Columns>> {
46 let sub_id = self.subscription_id;
47 let last_key = self.last_consumed_key.clone();
48 let batch_size = self.batch_size;
49 let engine_clone = self.engine.clone();
50
51 let read_result = self.system.install(move || {
52 SubscriptionConsumer::read_rows(&engine_clone, sub_id, last_key.as_ref(), batch_size)
53 });
54
55 let (rows, row_keys) = read_result?;
56
57 if rows.is_empty() {
58 return Ok(None);
59 }
60
61 let prev_cursor = self.last_consumed_key.clone();
63 if let Some(last_key) = row_keys.last() {
64 self.last_consumed_key = Some(last_key.clone());
65 }
66
67 let keys_to_delete = row_keys.clone();
69 let engine_clone = self.engine.clone();
70 let delete_result =
71 self.system.install(move || SubscriptionConsumer::delete_rows(&engine_clone, &keys_to_delete));
72
73 match delete_result {
74 Ok(()) => {}
75 Err(e) => {
76 self.last_consumed_key = prev_cursor;
77 return Err(e);
78 }
79 }
80
81 Ok(Some(rows))
82 }
83}