1use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7 actors::cdc::CdcPollMessage,
8 common::CommitVersion,
9 encoded::key::EncodedKey,
10 interface::cdc::{Cdc, CdcConsumerId, SystemChange},
11 key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
12};
13use reifydb_runtime::actor::{
14 context::Context,
15 system::ActorConfig,
16 traits::{Actor, Directive},
17};
18use reifydb_transaction::transaction::Transaction;
19use reifydb_type::Result;
20use tracing::{debug, error};
21
22use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
23use crate::storage::CdcStore;
24
25#[derive(Debug, Clone)]
27pub struct PollActorConfig {
28 pub consumer_id: CdcConsumerId,
30 pub poll_interval: Duration,
32 pub max_batch_size: Option<u64>,
34}
35
36pub struct PollActor<H: CdcHost, C: CdcConsume> {
37 config: PollActorConfig,
38 host: H,
39 consumer: Box<C>,
40 store: CdcStore,
41 consumer_key: EncodedKey,
42}
43
44impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
45 pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
47 let consumer_key = CdcConsumerKey {
48 consumer: config.consumer_id.clone(),
49 }
50 .encode();
51
52 Self {
53 config,
54 host,
55 consumer: Box::new(consumer),
56 store,
57 consumer_key,
58 }
59 }
60}
61
62pub enum PollState {
64 Ready,
66 WaitingForWatermark {
68 current_version: CommitVersion,
69 retries_remaining: u8,
70 },
71 WaitingForConsume {
73 latest_version: CommitVersion,
75 count: usize,
77 },
78}
79
80impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
81 type State = PollState;
82 type Message = CdcPollMessage;
83
84 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
85 debug!(
86 "[Consumer {:?}] Started polling with interval {:?}",
87 self.config.consumer_id, self.config.poll_interval
88 );
89
90 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
92
93 PollState::Ready
94 }
95
96 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
97 match msg {
98 CdcPollMessage::Poll => {
99 if !matches!(*state, PollState::Ready) {
101 return Directive::Continue;
102 }
103
104 if ctx.is_cancelled() {
105 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
106 return Directive::Stop;
107 }
108
109 let current_version = match self.host.current_version() {
111 Ok(v) => v,
112 Err(e) => {
113 error!(
114 "[Consumer {:?}] Error getting current version: {}",
115 self.config.consumer_id, e
116 );
117 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
118 return Directive::Continue;
119 }
120 };
121
122 let done = self.host.done_until();
123 if done >= current_version {
124 self.start_consume(state, ctx);
126 } else {
127 *state = PollState::WaitingForWatermark {
128 current_version,
129 retries_remaining: 4,
130 };
131 ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
132 }
133 }
134 CdcPollMessage::CheckWatermark => {
135 if let PollState::WaitingForWatermark {
136 current_version,
137 retries_remaining,
138 } = *state
139 {
140 let is_cancelled = ctx.is_cancelled();
141 if is_cancelled {
142 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
143 return Directive::Stop;
144 }
145
146 let done = self.host.done_until();
147 if done >= current_version {
148 *state = PollState::Ready;
150 self.start_consume(state, ctx);
151 } else if retries_remaining == 0 {
152 *state = PollState::Ready;
154 self.start_consume(state, ctx);
155 } else {
156 *state = PollState::WaitingForWatermark {
158 current_version,
159 retries_remaining: retries_remaining - 1,
160 };
161 ctx.schedule_once(Duration::from_millis(50), || {
162 CdcPollMessage::CheckWatermark
163 });
164 }
165 }
166 }
168 CdcPollMessage::ConsumeResponse(result) => {
169 if let PollState::WaitingForConsume {
171 latest_version,
172 count,
173 } = mem::replace(&mut *state, PollState::Ready)
174 {
175 self.finish_consume(state, ctx, latest_version, count, result);
176 }
177 }
178 }
179 Directive::Continue
180 }
181
182 fn config(&self) -> ActorConfig {
183 ActorConfig::new().mailbox_capacity(16)
184 }
185}
186
187impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
188 fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
191 *state = PollState::Ready;
192
193 let safe_version = self.host.done_until();
194
195 let mut query = match self.host.begin_query() {
196 Ok(q) => q,
197 Err(e) => {
198 error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
199 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
200 return;
201 }
202 };
203
204 let checkpoint = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
205 Ok(c) => c,
206 Err(e) => {
207 error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
208 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
209 return;
210 }
211 };
212
213 drop(query);
215
216 if safe_version <= checkpoint {
217 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
219 return;
220 }
221
222 let transactions = match self.fetch_cdcs_until(checkpoint, safe_version) {
223 Ok(t) => t,
224 Err(e) => {
225 error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
226 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
227 return;
228 }
229 };
230
231 if transactions.is_empty() {
232 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
233 return;
234 }
235
236 let count = transactions.len();
237 let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
238
239 let relevant_cdcs = transactions
241 .into_iter()
242 .filter(|cdc| {
243 !cdc.changes.is_empty()
245 || cdc.system_changes.iter().any(|sys_change| match sys_change {
246 SystemChange::Insert {
247 key,
248 ..
249 }
250 | SystemChange::Update {
251 key,
252 ..
253 }
254 | SystemChange::Delete {
255 key,
256 ..
257 } => {
258 if let Some(kind) = Key::kind(key) {
259 matches!(
260 kind,
261 KeyKind::Row
262 | KeyKind::Flow | KeyKind::FlowNode
263 | KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
264 | KeyKind::FlowEdgeByFlow
265 | KeyKind::NamespaceFlow
266 )
267 } else {
268 false
269 }
270 }
271 })
272 })
273 .collect::<Vec<_>>();
274
275 if relevant_cdcs.is_empty() {
276 match self.host.begin_command() {
278 Ok(mut transaction) => {
279 match CdcCheckpoint::persist(
280 &mut transaction,
281 &self.consumer_key,
282 latest_version,
283 ) {
284 Ok(_) => {
285 let _ = transaction.commit();
286 }
287 Err(e) => {
288 error!(
289 "[Consumer {:?}] Error persisting checkpoint: {}",
290 self.config.consumer_id, e
291 );
292 let _ = transaction.rollback();
293 }
294 }
295 }
296 Err(e) => {
297 error!(
298 "[Consumer {:?}] Error beginning transaction for checkpoint: {}",
299 self.config.consumer_id, e
300 );
301 }
302 }
303 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
305 return;
306 }
307
308 *state = PollState::WaitingForConsume {
310 latest_version,
311 count,
312 };
313
314 let self_ref = ctx.self_ref().clone();
316 let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
317 let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
318 });
319
320 self.consumer.consume(relevant_cdcs, reply);
321 }
322
323 fn finish_consume(
325 &self,
326 state: &mut PollState,
327 ctx: &Context<CdcPollMessage>,
328 latest_version: CommitVersion,
329 count: usize,
330 result: Result<()>,
331 ) {
332 *state = PollState::Ready;
333
334 match result {
335 Ok(()) => {
336 match self.host.begin_command() {
338 Ok(mut transaction) => {
339 match CdcCheckpoint::persist(
340 &mut transaction,
341 &self.consumer_key,
342 latest_version,
343 ) {
344 Ok(_) => {
345 match transaction.commit() {
346 Ok(_) => {
347 if count > 0 {
348 let _ = ctx.self_ref().send(
351 CdcPollMessage::Poll,
352 );
353 } else {
354 ctx.schedule_once(
355 self.config
356 .poll_interval,
357 || CdcPollMessage::Poll,
358 );
359 }
360 }
361 Err(e) => {
362 error!(
363 "[Consumer {:?}] Error committing checkpoint: {}",
364 self.config.consumer_id, e
365 );
366 ctx.schedule_once(
367 self.config.poll_interval,
368 || CdcPollMessage::Poll,
369 );
370 }
371 }
372 }
373 Err(e) => {
374 error!(
375 "[Consumer {:?}] Error persisting checkpoint: {}",
376 self.config.consumer_id, e
377 );
378 let _ = transaction.rollback();
379 ctx.schedule_once(self.config.poll_interval, || {
380 CdcPollMessage::Poll
381 });
382 }
383 }
384 }
385 Err(e) => {
386 error!(
387 "[Consumer {:?}] Error beginning checkpoint transaction: {}",
388 self.config.consumer_id, e
389 );
390 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
391 }
392 }
393 }
394 Err(e) => {
395 error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, e);
396 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
397 }
398 }
399 }
400
401 fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
402 let batch_size = self.config.max_batch_size.unwrap_or(1024);
403 let batch = self.store.read_range(
404 Bound::Excluded(since_version),
405 Bound::Included(until_version),
406 batch_size,
407 )?;
408 Ok(batch.items)
409 }
410}