1use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7 actors::cdc::CdcPollMessage,
8 common::CommitVersion,
9 encoded::key::EncodedKey,
10 interface::cdc::{Cdc, CdcConsumerId, SystemChange},
11 key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
12};
13use reifydb_runtime::actor::{
14 context::Context,
15 system::ActorConfig,
16 traits::{Actor, Directive},
17};
18use reifydb_transaction::transaction::Transaction;
19use reifydb_type::{Result, error::Error};
20use tracing::{debug, error};
21
22use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
23use crate::storage::CdcStore;
24
/// Settings for a [`PollActor`].
#[derive(Debug, Clone)]
pub struct PollActorConfig {
    /// Identifier of the consumer this actor polls for. It is encoded into the
    /// checkpoint lookup key and included in every log line.
    pub consumer_id: CdcConsumerId,
    /// Delay before the next `Poll` when there is nothing to consume or after
    /// an error.
    pub poll_interval: Duration,
    /// Batch size passed to the store when reading a range of CDC entries.
    /// When `None`, 1024 is used.
    pub max_batch_size: Option<u64>,
}
35
/// Actor that polls the CDC store for one consumer and hands batches of
/// changes to that consumer.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
    /// Polling settings (consumer id, poll interval, batch size).
    config: PollActorConfig,
    /// Provides the current commit version, the `done_until` watermark and
    /// query transactions.
    host: H,
    /// Consumer that receives each batch of relevant CDC entries.
    consumer: Box<C>,
    /// Store the CDC entries are read from.
    store: CdcStore,
    /// Encoded `CdcConsumerKey` for `config.consumer_id`; used to fetch the
    /// stored checkpoint.
    consumer_key: EncodedKey,
}
43
44impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
45 pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
47 let consumer_key = CdcConsumerKey {
48 consumer: config.consumer_id.clone(),
49 }
50 .encode();
51
52 Self {
53 config,
54 host,
55 consumer: Box::new(consumer),
56 store,
57 consumer_key,
58 }
59 }
60}
61
/// Where the actor currently is within a polling cycle.
pub enum Phase {
    /// No cycle is in flight; a `Poll` message is only acted on in this phase.
    Ready,
    /// The watermark (`done_until`) had not yet reached the version observed
    /// when the poll started; the actor re-checks on `CheckWatermark`.
    WaitingForWatermark {
        /// Version reported by the host when the poll started.
        current_version: CommitVersion,
        /// Re-checks left before consuming proceeds regardless of the watermark.
        retries_remaining: u8,
    },
    /// A batch has been handed to the consumer and its reply is pending.
    WaitingForConsume {
        /// Highest version in the fetched batch; becomes the cached checkpoint
        /// when the consumer reports success.
        latest_version: CommitVersion,
        /// Number of CDC entries fetched for the batch (before filtering).
        count: usize,
    },
}
79
/// Mutable state owned by a running [`PollActor`].
pub struct PollState {
    /// Current position in the polling cycle.
    phase: Phase,
    /// In-memory checkpoint. `None` until it has been loaded from storage;
    /// afterwards updated whenever a batch is consumed successfully or skipped.
    cached_checkpoint: Option<CommitVersion>,
}
90
91impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
92 type State = PollState;
93 type Message = CdcPollMessage;
94
95 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
96 debug!(
97 "[Consumer {:?}] Started polling with interval {:?}",
98 self.config.consumer_id, self.config.poll_interval
99 );
100
101 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
103
104 PollState {
105 phase: Phase::Ready,
106 cached_checkpoint: None,
107 }
108 }
109
110 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
111 match msg {
112 CdcPollMessage::Poll => self.on_poll(state, ctx),
113 CdcPollMessage::CheckWatermark => self.on_check_watermark(state, ctx),
114 CdcPollMessage::ConsumeResponse(result) => self.on_consume_response(state, ctx, result),
115 }
116 }
117
118 fn config(&self) -> ActorConfig {
119 ActorConfig::new().mailbox_capacity(16)
120 }
121}
122
123impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
124 #[inline]
125 fn on_poll(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
126 if !matches!(state.phase, Phase::Ready) {
127 return Directive::Continue;
128 }
129 if ctx.is_cancelled() {
130 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
131 return Directive::Stop;
132 }
133 let current_version = match self.host.current_version() {
134 Ok(v) => v,
135 Err(e) => {
136 error!("[Consumer {:?}] Error getting current version: {}", self.config.consumer_id, e);
137 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
138 return Directive::Continue;
139 }
140 };
141 if self.host.done_until() >= current_version {
142 self.start_consume(state, ctx);
143 } else {
144 state.phase = Phase::WaitingForWatermark {
145 current_version,
146 retries_remaining: 4,
147 };
148 ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
149 }
150 Directive::Continue
151 }
152
153 #[inline]
154 fn on_check_watermark(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
155 let Phase::WaitingForWatermark {
156 current_version,
157 retries_remaining,
158 } = state.phase
159 else {
160 return Directive::Continue;
161 };
162 if ctx.is_cancelled() {
163 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
164 return Directive::Stop;
165 }
166 let watermark_ready = self.host.done_until() >= current_version;
167 let timed_out = retries_remaining == 0;
168 if watermark_ready || timed_out {
169 state.phase = Phase::Ready;
170 self.start_consume(state, ctx);
171 } else {
172 state.phase = Phase::WaitingForWatermark {
173 current_version,
174 retries_remaining: retries_remaining - 1,
175 };
176 ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
177 }
178 Directive::Continue
179 }
180
181 #[inline]
182 fn on_consume_response(
183 &self,
184 state: &mut PollState,
185 ctx: &Context<CdcPollMessage>,
186 result: Result<()>,
187 ) -> Directive {
188 if let Phase::WaitingForConsume {
189 latest_version,
190 count,
191 } = mem::replace(&mut state.phase, Phase::Ready)
192 {
193 self.finish_consume(state, ctx, latest_version, count, result);
194 }
195 Directive::Continue
196 }
197
198 fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
199 state.phase = Phase::Ready;
200 let safe_version = self.host.done_until();
201
202 let Some(checkpoint) = self.resolve_checkpoint(state, ctx) else {
203 return;
204 };
205 if safe_version <= checkpoint {
206 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
207 return;
208 }
209
210 let Some(transactions) = self.fetch_or_reschedule(checkpoint, safe_version, ctx) else {
211 return;
212 };
213 if transactions.is_empty() {
214 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
215 return;
216 }
217
218 let (count, latest_version) = summarize_batch(checkpoint, &transactions);
219 let relevant_cdcs: Vec<Cdc> = transactions.into_iter().filter(is_relevant_cdc).collect();
220
221 if relevant_cdcs.is_empty() {
222 self.advance_checkpoint_skip_ahead(state, ctx, latest_version);
223 return;
224 }
225
226 state.phase = Phase::WaitingForConsume {
227 latest_version,
228 count,
229 };
230 self.dispatch_to_consumer(relevant_cdcs, ctx);
231 }
232
233 #[inline]
239 fn advance_checkpoint_skip_ahead(
240 &self,
241 state: &mut PollState,
242 ctx: &Context<CdcPollMessage>,
243 latest_version: CommitVersion,
244 ) {
245 state.cached_checkpoint = Some(latest_version);
246 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
247 }
248
249 #[inline]
250 fn resolve_checkpoint(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
251 if let Some(v) = state.cached_checkpoint {
252 return Some(v);
253 }
254 let v = self.seed_checkpoint_from_durable(ctx)?;
255 state.cached_checkpoint = Some(v);
256 Some(v)
257 }
258
259 #[inline]
263 fn seed_checkpoint_from_durable(&self, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
264 let mut query = match self.host.begin_query() {
265 Ok(q) => q,
266 Err(e) => {
267 error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
268 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
269 return None;
270 }
271 };
272 let v = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
273 Ok(c) => c,
274 Err(e) => {
275 error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
276 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
277 return None;
278 }
279 };
280 drop(query);
281 Some(v)
282 }
283
284 #[inline]
285 fn fetch_or_reschedule(
286 &self,
287 checkpoint: CommitVersion,
288 safe_version: CommitVersion,
289 ctx: &Context<CdcPollMessage>,
290 ) -> Option<Vec<Cdc>> {
291 match self.fetch_cdcs_until(checkpoint, safe_version) {
292 Ok(t) => Some(t),
293 Err(e) => {
294 error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
295 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
296 None
297 }
298 }
299 }
300
301 #[inline]
302 fn dispatch_to_consumer(&self, cdcs: Vec<Cdc>, ctx: &Context<CdcPollMessage>) {
303 let self_ref = ctx.self_ref().clone();
304 let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
305 let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
306 });
307 self.consumer.consume(cdcs, reply);
308 }
309
310 fn finish_consume(
311 &self,
312 state: &mut PollState,
313 ctx: &Context<CdcPollMessage>,
314 latest_version: CommitVersion,
315 count: usize,
316 result: Result<()>,
317 ) {
318 state.phase = Phase::Ready;
319 match result {
320 Ok(()) => self.advance_after_success(state, ctx, latest_version, count),
321 Err(e) => self.reschedule_after_error(ctx, e),
322 }
323 }
324
325 #[inline]
339 fn advance_after_success(
340 &self,
341 state: &mut PollState,
342 ctx: &Context<CdcPollMessage>,
343 latest_version: CommitVersion,
344 count: usize,
345 ) {
346 state.cached_checkpoint = Some(latest_version);
347 if count > 0 {
348 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
349 } else {
350 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
351 }
352 }
353
354 #[inline]
355 fn reschedule_after_error(&self, ctx: &Context<CdcPollMessage>, err: Error) {
356 error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, err);
357 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
358 }
359
360 fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
361 let batch_size = self.config.max_batch_size.unwrap_or(1024);
362 let batch = self.store.read_range(
363 Bound::Excluded(since_version),
364 Bound::Included(until_version),
365 batch_size,
366 )?;
367 Ok(batch.items)
368 }
369}
370
371#[inline]
372fn summarize_batch(checkpoint: CommitVersion, transactions: &[Cdc]) -> (usize, CommitVersion) {
373 let count = transactions.len();
374 let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
375 (count, latest_version)
376}
377
378fn is_relevant_cdc(cdc: &Cdc) -> bool {
379 !cdc.changes.is_empty() || cdc.system_changes.iter().any(is_relevant_system_change)
380}
381
382fn is_relevant_system_change(change: &SystemChange) -> bool {
383 let key = match change {
384 SystemChange::Insert {
385 key,
386 ..
387 }
388 | SystemChange::Update {
389 key,
390 ..
391 }
392 | SystemChange::Delete {
393 key,
394 ..
395 } => key,
396 };
397 Key::kind(key)
398 .map(|kind| {
399 matches!(
400 kind,
401 KeyKind::Row
402 | KeyKind::Flow | KeyKind::FlowNode | KeyKind::FlowNodeByFlow
403 | KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow
404 | KeyKind::NamespaceFlow
405 )
406 })
407 .unwrap_or(false)
408}