1use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7 actors::cdc::CdcPollMessage,
8 common::CommitVersion,
9 encoded::key::EncodedKey,
10 interface::cdc::{Cdc, CdcConsumerId, SystemChange},
11 key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
12};
13use reifydb_runtime::actor::{
14 context::Context,
15 system::ActorConfig,
16 traits::{Actor, Directive},
17};
18use reifydb_transaction::transaction::Transaction;
19use reifydb_type::{Result, error::Error};
20use tracing::{debug, error};
21
22use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost, watermark::CdcConsumerWatermark};
23use crate::storage::CdcStore;
24
25#[derive(Debug, Clone)]
26pub struct PollActorConfig {
27 pub consumer_id: CdcConsumerId,
28
29 pub poll_interval: Duration,
30
31 pub max_batch_size: Option<u64>,
32}
33
34pub struct PollActor<H: CdcHost, C: CdcConsume> {
35 config: PollActorConfig,
36 host: H,
37 consumer: Box<C>,
38 store: CdcStore,
39 consumer_key: EncodedKey,
40 consumer_watermark: Option<CdcConsumerWatermark>,
41}
42
43impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
44 pub fn new(
45 config: PollActorConfig,
46 host: H,
47 consumer: C,
48 store: CdcStore,
49 consumer_watermark: Option<CdcConsumerWatermark>,
50 ) -> Self {
51 let consumer_key = CdcConsumerKey {
52 consumer: config.consumer_id.clone(),
53 }
54 .encode();
55
56 Self {
57 config,
58 host,
59 consumer: Box::new(consumer),
60 store,
61 consumer_key,
62 consumer_watermark,
63 }
64 }
65
66 #[inline]
67 fn publish_watermark(&self, version: CommitVersion) {
68 if let Some(wm) = &self.consumer_watermark {
69 wm.store(version);
70 }
71 }
72}
73
74pub enum Phase {
75 Ready,
76
77 WaitingForWatermark {
78 current_version: CommitVersion,
79 retries_remaining: u8,
80 },
81
82 WaitingForConsume {
83 latest_version: CommitVersion,
84
85 count: usize,
86 },
87}
88
89pub struct PollState {
90 phase: Phase,
91
92 cached_checkpoint: Option<CommitVersion>,
93}
94
95impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
96 type State = PollState;
97 type Message = CdcPollMessage;
98
99 fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
100 debug!(
101 "[Consumer {:?}] Started polling with interval {:?}",
102 self.config.consumer_id, self.config.poll_interval
103 );
104
105 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
106
107 PollState {
108 phase: Phase::Ready,
109 cached_checkpoint: None,
110 }
111 }
112
113 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
114 match msg {
115 CdcPollMessage::Poll => self.on_poll(state, ctx),
116 CdcPollMessage::CheckWatermark => self.on_check_watermark(state, ctx),
117 CdcPollMessage::ConsumeResponse(result) => self.on_consume_response(state, ctx, result),
118 }
119 }
120
121 fn config(&self) -> ActorConfig {
122 ActorConfig::new().mailbox_capacity(16)
123 }
124}
125
126impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
127 #[inline]
128 fn on_poll(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
129 if !matches!(state.phase, Phase::Ready) {
130 return Directive::Continue;
131 }
132 if ctx.is_cancelled() {
133 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
134 return Directive::Stop;
135 }
136 let current_version = match self.host.current_version() {
137 Ok(v) => v,
138 Err(e) => {
139 error!("[Consumer {:?}] Error getting current version: {}", self.config.consumer_id, e);
140 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
141 return Directive::Continue;
142 }
143 };
144 if self.host.done_until() >= current_version {
145 self.start_consume(state, ctx);
146 } else {
147 state.phase = Phase::WaitingForWatermark {
148 current_version,
149 retries_remaining: 4,
150 };
151 ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
152 }
153 Directive::Continue
154 }
155
156 #[inline]
157 fn on_check_watermark(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
158 let Phase::WaitingForWatermark {
159 current_version,
160 retries_remaining,
161 } = state.phase
162 else {
163 return Directive::Continue;
164 };
165 if ctx.is_cancelled() {
166 debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
167 return Directive::Stop;
168 }
169 let watermark_ready = self.host.done_until() >= current_version;
170 let timed_out = retries_remaining == 0;
171 if watermark_ready || timed_out {
172 state.phase = Phase::Ready;
173 self.start_consume(state, ctx);
174 } else {
175 state.phase = Phase::WaitingForWatermark {
176 current_version,
177 retries_remaining: retries_remaining - 1,
178 };
179 ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
180 }
181 Directive::Continue
182 }
183
184 #[inline]
185 fn on_consume_response(
186 &self,
187 state: &mut PollState,
188 ctx: &Context<CdcPollMessage>,
189 result: Result<()>,
190 ) -> Directive {
191 if let Phase::WaitingForConsume {
192 latest_version,
193 count,
194 } = mem::replace(&mut state.phase, Phase::Ready)
195 {
196 self.finish_consume(state, ctx, latest_version, count, result);
197 }
198 Directive::Continue
199 }
200
201 fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
202 state.phase = Phase::Ready;
203 let safe_version = self.host.done_until();
204
205 let Some(checkpoint) = self.resolve_checkpoint(state, ctx) else {
206 return;
207 };
208 if safe_version <= checkpoint {
209 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
210 return;
211 }
212
213 let Some(transactions) = self.fetch_or_reschedule(checkpoint, safe_version, ctx) else {
214 return;
215 };
216 if transactions.is_empty() {
217 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
218 return;
219 }
220
221 let (count, latest_version) = summarize_batch(checkpoint, &transactions);
222 let relevant_cdcs: Vec<Cdc> = transactions.into_iter().filter(is_relevant_cdc).collect();
223
224 if relevant_cdcs.is_empty() {
225 self.advance_checkpoint_skip_ahead(state, ctx, latest_version);
226 return;
227 }
228
229 state.phase = Phase::WaitingForConsume {
230 latest_version,
231 count,
232 };
233 self.dispatch_to_consumer(relevant_cdcs, ctx);
234 }
235
236 #[inline]
237 fn advance_checkpoint_skip_ahead(
238 &self,
239 state: &mut PollState,
240 ctx: &Context<CdcPollMessage>,
241 latest_version: CommitVersion,
242 ) {
243 state.cached_checkpoint = Some(latest_version);
244 self.publish_watermark(latest_version);
245 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
246 }
247
248 #[inline]
249 fn resolve_checkpoint(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
250 if let Some(v) = state.cached_checkpoint {
251 return Some(v);
252 }
253 let v = self.seed_checkpoint_from_durable(ctx)?;
254 state.cached_checkpoint = Some(v);
255 self.publish_watermark(v);
256 Some(v)
257 }
258
259 #[inline]
260 fn seed_checkpoint_from_durable(&self, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
261 let mut query = match self.host.begin_query() {
262 Ok(q) => q,
263 Err(e) => {
264 error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
265 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
266 return None;
267 }
268 };
269 let v = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
270 Ok(c) => c,
271 Err(e) => {
272 error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
273 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
274 return None;
275 }
276 };
277 drop(query);
278 Some(v)
279 }
280
281 #[inline]
282 fn fetch_or_reschedule(
283 &self,
284 checkpoint: CommitVersion,
285 safe_version: CommitVersion,
286 ctx: &Context<CdcPollMessage>,
287 ) -> Option<Vec<Cdc>> {
288 match self.fetch_cdcs_until(checkpoint, safe_version) {
289 Ok(t) => Some(t),
290 Err(e) => {
291 error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
292 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
293 None
294 }
295 }
296 }
297
298 #[inline]
299 fn dispatch_to_consumer(&self, cdcs: Vec<Cdc>, ctx: &Context<CdcPollMessage>) {
300 let self_ref = ctx.self_ref().clone();
301 let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
302 let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
303 });
304 self.consumer.consume(cdcs, reply);
305 }
306
307 fn finish_consume(
308 &self,
309 state: &mut PollState,
310 ctx: &Context<CdcPollMessage>,
311 latest_version: CommitVersion,
312 count: usize,
313 result: Result<()>,
314 ) {
315 state.phase = Phase::Ready;
316 match result {
317 Ok(()) => self.advance_after_success(state, ctx, latest_version, count),
318 Err(e) => self.reschedule_after_error(ctx, e),
319 }
320 }
321
322 #[inline]
323 fn advance_after_success(
324 &self,
325 state: &mut PollState,
326 ctx: &Context<CdcPollMessage>,
327 latest_version: CommitVersion,
328 count: usize,
329 ) {
330 state.cached_checkpoint = Some(latest_version);
331 self.publish_watermark(latest_version);
332 if count > 0 {
333 let _ = ctx.self_ref().send(CdcPollMessage::Poll);
334 } else {
335 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
336 }
337 }
338
339 #[inline]
340 fn reschedule_after_error(&self, ctx: &Context<CdcPollMessage>, err: Error) {
341 error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, err);
342 ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
343 }
344
345 fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
346 let batch_size = self.config.max_batch_size.unwrap_or(1024);
347 let batch = self.store.read_range(
348 Bound::Excluded(since_version),
349 Bound::Included(until_version),
350 batch_size,
351 )?;
352 Ok(batch.items)
353 }
354}
355
356#[inline]
357fn summarize_batch(checkpoint: CommitVersion, transactions: &[Cdc]) -> (usize, CommitVersion) {
358 let count = transactions.len();
359 let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
360 (count, latest_version)
361}
362
363fn is_relevant_cdc(cdc: &Cdc) -> bool {
364 !cdc.changes.is_empty() || cdc.system_changes.iter().any(is_relevant_system_change)
365}
366
367fn is_relevant_system_change(change: &SystemChange) -> bool {
368 let key = match change {
369 SystemChange::Insert {
370 key,
371 ..
372 }
373 | SystemChange::Update {
374 key,
375 ..
376 }
377 | SystemChange::Delete {
378 key,
379 ..
380 } => key,
381 };
382 Key::kind(key)
383 .map(|kind| {
384 matches!(
385 kind,
386 KeyKind::Row
387 | KeyKind::Flow | KeyKind::FlowNode | KeyKind::FlowNodeByFlow
388 | KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow
389 | KeyKind::NamespaceFlow
390 )
391 })
392 .unwrap_or(false)
393}