1use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::key::EncodedKey,
9 interface::cdc::{Cdc, CdcConsumerId, SystemChange},
10 key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
11};
12use reifydb_runtime::actor::{
13 context::Context,
14 system::ActorConfig,
15 traits::{Actor, Directive},
16};
17use reifydb_transaction::transaction::Transaction;
18use reifydb_type::Result;
19use tracing::{debug, error};
20
21use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
22use crate::storage::CdcStore;
23
/// Messages handled by [`PollActor`].
pub enum PollMsg {
    /// Start a polling cycle; only acted upon while the actor is in
    /// [`PollState::Ready`].
    Poll,
    /// Re-check whether the host's `done_until()` has reached the version
    /// captured when the poll started.
    CheckWatermark,
    /// Outcome of a `consume` call, delivered through the reply callback that
    /// the actor hands to the consumer.
    ConsumeResponse(Result<()>),
}
32
/// Configuration for a [`PollActor`].
#[derive(Debug, Clone)]
pub struct PollActorConfig {
    /// Identifies the consumer; used to derive the checkpoint key and to tag
    /// log messages.
    pub consumer_id: CdcConsumerId,
    /// Delay before the next poll when there was nothing to consume or an
    /// error occurred.
    pub poll_interval: Duration,
    /// Batch size passed to the CDC store when reading a range; 1024 is used
    /// when this is `None`.
    pub max_batch_size: Option<u64>,
}
43
/// Actor that polls the CDC store and forwards batches of changes to a
/// consumer, persisting a per-consumer checkpoint as it goes.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
    /// Consumer id, poll interval and batch size.
    config: PollActorConfig,
    /// Supplies the current version, the `done_until` watermark, and
    /// query/command transactions.
    host: H,
    /// Receives the CDC entries that pass the relevance filter.
    consumer: Box<C>,
    /// Source of CDC entries (read by version range).
    store: CdcStore,
    /// Encoded `CdcConsumerKey` under which the checkpoint is fetched and
    /// persisted.
    consumer_key: EncodedKey,
}
51
52impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
53 pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
55 let consumer_key = CdcConsumerKey {
56 consumer: config.consumer_id.clone(),
57 }
58 .encode();
59
60 Self {
61 config,
62 host,
63 consumer: Box::new(consumer),
64 store,
65 consumer_key,
66 }
67 }
68}
69
/// State machine of a [`PollActor`].
pub enum PollState {
    /// Idle; the next `Poll` message starts a cycle.
    Ready,
    /// A poll observed that the host's `done_until()` was still behind the
    /// current version and is waiting for it to catch up.
    WaitingForWatermark {
        /// Version returned by `host.current_version()` when the poll started.
        current_version: CommitVersion,
        /// Further watermark checks allowed before consuming anyway.
        retries_remaining: u8,
    },
    /// A batch was handed to the consumer and its reply is pending.
    WaitingForConsume {
        /// Highest version in the fetched batch; persisted as the checkpoint
        /// once the consumer reports success.
        latest_version: CommitVersion,
        /// Number of CDC entries fetched for the batch (before filtering).
        count: usize,
    },
}
87
88impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
89 type State = PollState;
90 type Message = PollMsg;
91
92 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
93 debug!(
94 "[Consumer {:?}] Started polling with interval {:?}",
95 self.config.consumer_id, self.config.poll_interval
96 );
97
98 let _ = ctx.self_ref().send(PollMsg::Poll);
100
101 PollState::Ready
102 }
103
104 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
105 match msg {
106 PollMsg::Poll => {
107 if !matches!(*state, PollState::Ready) {
109 return Directive::Continue;
110 }
111
112 if ctx.is_cancelled() {
113 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
114 return Directive::Stop;
115 }
116
117 let current_version = match self.host.current_version() {
119 Ok(v) => v,
120 Err(e) => {
121 error!(
122 "[Consumer {:?}] Error getting current version: {}",
123 self.config.consumer_id, e
124 );
125 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
126 return Directive::Continue;
127 }
128 };
129
130 let done = self.host.done_until();
131 if done >= current_version {
132 self.start_consume(state, ctx);
134 } else {
135 *state = PollState::WaitingForWatermark {
136 current_version,
137 retries_remaining: 4,
138 };
139 ctx.schedule_once(Duration::from_millis(50), || PollMsg::CheckWatermark);
140 }
141 }
142 PollMsg::CheckWatermark => {
143 if let PollState::WaitingForWatermark {
144 current_version,
145 retries_remaining,
146 } = *state
147 {
148 let is_cancelled = ctx.is_cancelled();
149 if is_cancelled {
150 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
151 return Directive::Stop;
152 }
153
154 let done = self.host.done_until();
155 if done >= current_version {
156 *state = PollState::Ready;
158 self.start_consume(state, ctx);
159 } else if retries_remaining == 0 {
160 *state = PollState::Ready;
162 self.start_consume(state, ctx);
163 } else {
164 *state = PollState::WaitingForWatermark {
166 current_version,
167 retries_remaining: retries_remaining - 1,
168 };
169 ctx.schedule_once(Duration::from_millis(50), || {
170 PollMsg::CheckWatermark
171 });
172 }
173 }
174 }
176 PollMsg::ConsumeResponse(result) => {
177 if let PollState::WaitingForConsume {
179 latest_version,
180 count,
181 } = mem::replace(&mut *state, PollState::Ready)
182 {
183 self.finish_consume(state, ctx, latest_version, count, result);
184 }
185 }
186 }
187 Directive::Continue
188 }
189
190 fn config(&self) -> ActorConfig {
191 ActorConfig::new().mailbox_capacity(16)
192 }
193}
194
195impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
196 fn start_consume(&self, state: &mut PollState, ctx: &Context<PollMsg>) {
199 *state = PollState::Ready;
200
201 let safe_version = self.host.done_until();
202
203 let mut query = match self.host.begin_query() {
204 Ok(q) => q,
205 Err(e) => {
206 error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
207 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
208 return;
209 }
210 };
211
212 let checkpoint = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
213 Ok(c) => c,
214 Err(e) => {
215 error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
216 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
217 return;
218 }
219 };
220
221 drop(query);
223
224 if safe_version <= checkpoint {
225 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
227 return;
228 }
229
230 let transactions = match self.fetch_cdcs_until(checkpoint, safe_version) {
231 Ok(t) => t,
232 Err(e) => {
233 error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
234 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
235 return;
236 }
237 };
238
239 if transactions.is_empty() {
240 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
241 return;
242 }
243
244 let count = transactions.len();
245 let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
246
247 let relevant_cdcs = transactions
249 .into_iter()
250 .filter(|cdc| {
251 !cdc.changes.is_empty()
253 || cdc.system_changes.iter().any(|sys_change| match sys_change {
254 SystemChange::Insert {
255 key,
256 ..
257 }
258 | SystemChange::Update {
259 key,
260 ..
261 }
262 | SystemChange::Delete {
263 key,
264 ..
265 } => {
266 if let Some(kind) = Key::kind(key) {
267 matches!(
268 kind,
269 KeyKind::Row
270 | KeyKind::Flow | KeyKind::FlowNode
271 | KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
272 | KeyKind::FlowEdgeByFlow
273 | KeyKind::NamespaceFlow
274 )
275 } else {
276 false
277 }
278 }
279 })
280 })
281 .collect::<Vec<_>>();
282
283 if relevant_cdcs.is_empty() {
284 match self.host.begin_command() {
286 Ok(mut transaction) => {
287 match CdcCheckpoint::persist(
288 &mut transaction,
289 &self.consumer_key,
290 latest_version,
291 ) {
292 Ok(_) => {
293 let _ = transaction.commit();
294 }
295 Err(e) => {
296 error!(
297 "[Consumer {:?}] Error persisting checkpoint: {}",
298 self.config.consumer_id, e
299 );
300 let _ = transaction.rollback();
301 }
302 }
303 }
304 Err(e) => {
305 error!(
306 "[Consumer {:?}] Error beginning transaction for checkpoint: {}",
307 self.config.consumer_id, e
308 );
309 }
310 }
311 let _ = ctx.self_ref().send(PollMsg::Poll);
313 return;
314 }
315
316 *state = PollState::WaitingForConsume {
318 latest_version,
319 count,
320 };
321
322 let self_ref = ctx.self_ref().clone();
324 let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
325 let _ = self_ref.send(PollMsg::ConsumeResponse(result));
326 });
327
328 self.consumer.consume(relevant_cdcs, reply);
329 }
330
331 fn finish_consume(
333 &self,
334 state: &mut PollState,
335 ctx: &Context<PollMsg>,
336 latest_version: CommitVersion,
337 count: usize,
338 result: Result<()>,
339 ) {
340 *state = PollState::Ready;
341
342 match result {
343 Ok(()) => {
344 match self.host.begin_command() {
346 Ok(mut transaction) => {
347 match CdcCheckpoint::persist(
348 &mut transaction,
349 &self.consumer_key,
350 latest_version,
351 ) {
352 Ok(_) => {
353 match transaction.commit() {
354 Ok(_) => {
355 if count > 0 {
356 let _ = ctx
359 .self_ref()
360 .send(PollMsg::Poll);
361 } else {
362 ctx.schedule_once(
363 self.config
364 .poll_interval,
365 || PollMsg::Poll,
366 );
367 }
368 }
369 Err(e) => {
370 error!(
371 "[Consumer {:?}] Error committing checkpoint: {}",
372 self.config.consumer_id, e
373 );
374 ctx.schedule_once(
375 self.config.poll_interval,
376 || PollMsg::Poll,
377 );
378 }
379 }
380 }
381 Err(e) => {
382 error!(
383 "[Consumer {:?}] Error persisting checkpoint: {}",
384 self.config.consumer_id, e
385 );
386 let _ = transaction.rollback();
387 ctx.schedule_once(self.config.poll_interval, || {
388 PollMsg::Poll
389 });
390 }
391 }
392 }
393 Err(e) => {
394 error!(
395 "[Consumer {:?}] Error beginning checkpoint transaction: {}",
396 self.config.consumer_id, e
397 );
398 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
399 }
400 }
401 }
402 Err(e) => {
403 error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, e);
404 ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
405 }
406 }
407 }
408
409 fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
410 let batch_size = self.config.max_batch_size.unwrap_or(1024);
411 let batch = self.store.read_range(
412 Bound::Excluded(since_version),
413 Bound::Included(until_version),
414 batch_size,
415 )?;
416 Ok(batch.items)
417 }
418}