// Source: reifydb_cdc/consume/actor.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::key::EncodedKey,
9	interface::cdc::{Cdc, CdcConsumerId, SystemChange},
10	key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
11};
12use reifydb_runtime::actor::{
13	context::Context,
14	system::ActorConfig,
15	traits::{Actor, Directive},
16};
17use reifydb_transaction::transaction::Transaction;
18use reifydb_type::Result;
19use tracing::{debug, error};
20
21use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
22use crate::storage::CdcStore;
23
/// Messages driving the poll actor's state machine (see [`PollState`]).
pub enum PollMsg {
	/// Trigger a poll for CDC events
	Poll,
	/// Retry watermark readiness check
	CheckWatermark,
	/// Async response from the consumer, delivered via the callback
	/// handed to `CdcConsume::consume`
	ConsumeResponse(Result<()>),
}
32
/// Configuration for the poll actor
#[derive(Debug, Clone)]
pub struct PollActorConfig {
	/// Unique identifier for this consumer
	pub consumer_id: CdcConsumerId,
	/// How often to poll when no data is available
	pub poll_interval: Duration,
	/// Maximum batch size for fetching CDC events.
	/// `None` falls back to a default of 1024 (see `fetch_cdcs_until`),
	/// it does NOT mean unbounded.
	pub max_batch_size: Option<u64>,
}
43
/// Actor that polls CDC storage on behalf of a single consumer,
/// delivering batches to `consumer` and tracking progress via a
/// per-consumer checkpoint.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
	// Static settings: consumer id, poll interval, batch size.
	config: PollActorConfig,
	// Access to versions, watermark, and transactions.
	host: H,
	// Destination for filtered CDC batches.
	consumer: Box<C>,
	// CDC event storage read in `fetch_cdcs_until`.
	store: CdcStore,
	// Pre-encoded storage key for this consumer's checkpoint,
	// derived once from `config.consumer_id` in `new`.
	consumer_key: EncodedKey,
}
51
52impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
53	/// Create a new poll actor
54	pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
55		let consumer_key = CdcConsumerKey {
56			consumer: config.consumer_id.clone(),
57		}
58		.encode();
59
60		Self {
61			config,
62			host,
63			consumer: Box::new(consumer),
64			store,
65			consumer_key,
66		}
67	}
68}
69
/// Phase of the poll actor state machine
pub enum PollState {
	/// Ready to accept a new Poll
	Ready,
	/// Waiting for watermark to catch up to a specific version;
	/// entered from `Poll`, re-checked on each `CheckWatermark`.
	WaitingForWatermark {
		// Version observed when the poll started; the watermark
		// must reach it before the batch is fetched.
		current_version: CommitVersion,
		// Remaining 50ms re-checks before proceeding anyway.
		retries_remaining: u8,
	},
	/// Waiting for the consumer to respond via `ConsumeResponse`
	WaitingForConsume {
		/// The latest CDC version in the batch
		latest_version: CommitVersion,
		/// Number of CDC transactions in the batch
		count: usize,
	},
}
87
impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
	type State = PollState;
	type Message = PollMsg;

	/// Log startup and kick the loop off by sending ourselves the first
	/// `Poll`; the actor begins in `Ready`.
	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
		debug!(
			"[Consumer {:?}] Started polling with interval {:?}",
			self.config.consumer_id, self.config.poll_interval
		);

		// Send initial poll message to start the loop
		let _ = ctx.self_ref().send(PollMsg::Poll);

		PollState::Ready
	}

	/// State-machine dispatch. Messages that do not match the current
	/// phase are ignored, so stray/duplicate messages cannot corrupt
	/// the cycle: Ready -> (WaitingForWatermark ->) WaitingForConsume -> Ready.
	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
		match msg {
			PollMsg::Poll => {
				// Ignore Poll if we're already waiting for watermark or consume
				if !matches!(*state, PollState::Ready) {
					return Directive::Continue;
				}

				if ctx.is_cancelled() {
					debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
					return Directive::Stop;
				}

				// Get current version and check watermark readiness (non-blocking)
				let current_version = match self.host.current_version() {
					Ok(v) => v,
					Err(e) => {
						error!(
							"[Consumer {:?}] Error getting current version: {}",
							self.config.consumer_id, e
						);
						// Transient failure: stay Ready and retry after the poll interval.
						ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
						return Directive::Continue;
					}
				};

				let done = self.host.done_until();
				if done >= current_version {
					// Watermark is ready, proceed with batch processing
					self.start_consume(state, ctx);
				} else {
					// NOTE(review): 4 retries x 50ms means the
					// proceed-anyway fallback fires after ~250ms,
					// while the comment below says 200ms — confirm
					// whether 3 retries was intended.
					*state = PollState::WaitingForWatermark {
						current_version,
						retries_remaining: 4,
					};
					ctx.schedule_once(Duration::from_millis(50), || PollMsg::CheckWatermark);
				}
			}
			PollMsg::CheckWatermark => {
				if let PollState::WaitingForWatermark {
					current_version,
					retries_remaining,
				} = *state
				{
					let is_cancelled = ctx.is_cancelled();
					if is_cancelled {
						debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
						return Directive::Stop;
					}

					let done = self.host.done_until();
					if done >= current_version {
						// Watermark caught up, proceed
						*state = PollState::Ready;
						self.start_consume(state, ctx);
					} else if retries_remaining == 0 {
						// Timeout — proceed anyway (matches original 200ms behavior)
						*state = PollState::Ready;
						self.start_consume(state, ctx);
					} else {
						// Still not ready, schedule another check
						*state = PollState::WaitingForWatermark {
							current_version,
							retries_remaining: retries_remaining - 1,
						};
						ctx.schedule_once(Duration::from_millis(50), || {
							PollMsg::CheckWatermark
						});
					}
				}
				// If not in WaitingForWatermark phase, ignore
			}
			PollMsg::ConsumeResponse(result) => {
				// Only handle if we're waiting for a consume response.
				// mem::replace resets to Ready and moves the phase data
				// out in one step; any other phase is left untouched
				// because the replaced value is written straight back
				// by the failed pattern match being a move of Ready.
				if let PollState::WaitingForConsume {
					latest_version,
					count,
				} = mem::replace(&mut *state, PollState::Ready)
				{
					self.finish_consume(state, ctx, latest_version, count, result);
				}
			}
		}
		Directive::Continue
	}

	/// Small mailbox: the actor only ever talks to itself plus one
	/// consumer callback, so 16 slots is ample.
	fn config(&self) -> ActorConfig {
		ActorConfig::new().mailbox_capacity(16)
	}
}
194
195impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
196	/// Start consuming a batch: fetch CDCs, filter, and send to consumer asynchronously.
197	/// If no data is available, schedules the next poll directly.
198	fn start_consume(&self, state: &mut PollState, ctx: &Context<PollMsg>) {
199		*state = PollState::Ready;
200
201		let safe_version = self.host.done_until();
202
203		let mut query = match self.host.begin_query() {
204			Ok(q) => q,
205			Err(e) => {
206				error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
207				ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
208				return;
209			}
210		};
211
212		let checkpoint = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
213			Ok(c) => c,
214			Err(e) => {
215				error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
216				ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
217				return;
218			}
219		};
220
221		// Drop the query — we no longer hold any transaction
222		drop(query);
223
224		if safe_version <= checkpoint {
225			// Nothing safe to fetch yet
226			ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
227			return;
228		}
229
230		let transactions = match self.fetch_cdcs_until(checkpoint, safe_version) {
231			Ok(t) => t,
232			Err(e) => {
233				error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
234				ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
235				return;
236			}
237		};
238
239		if transactions.is_empty() {
240			ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
241			return;
242		}
243
244		let count = transactions.len();
245		let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
246
247		// Filter transactions to only those with Row or Flow-related changes
248		let relevant_cdcs = transactions
249			.into_iter()
250			.filter(|cdc| {
251				// Pass through if there are decoded row changes (columnar Change objects)
252				!cdc.changes.is_empty()
253					|| cdc.system_changes.iter().any(|sys_change| match sys_change {
254						SystemChange::Insert {
255							key,
256							..
257						}
258						| SystemChange::Update {
259							key,
260							..
261						}
262						| SystemChange::Delete {
263							key,
264							..
265						} => {
266							if let Some(kind) = Key::kind(key) {
267								matches!(
268									kind,
269									KeyKind::Row
270										| KeyKind::Flow | KeyKind::FlowNode
271										| KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
272										| KeyKind::FlowEdgeByFlow
273										| KeyKind::NamespaceFlow
274								)
275							} else {
276								false
277							}
278						}
279					})
280			})
281			.collect::<Vec<_>>();
282
283		if relevant_cdcs.is_empty() {
284			// No relevant CDCs — persist checkpoint and commit directly
285			match self.host.begin_command() {
286				Ok(mut transaction) => {
287					match CdcCheckpoint::persist(
288						&mut transaction,
289						&self.consumer_key,
290						latest_version,
291					) {
292						Ok(_) => {
293							let _ = transaction.commit();
294						}
295						Err(e) => {
296							error!(
297								"[Consumer {:?}] Error persisting checkpoint: {}",
298								self.config.consumer_id, e
299							);
300							let _ = transaction.rollback();
301						}
302					}
303				}
304				Err(e) => {
305					error!(
306						"[Consumer {:?}] Error beginning transaction for checkpoint: {}",
307						self.config.consumer_id, e
308					);
309				}
310			}
311			// More events likely available
312			let _ = ctx.self_ref().send(PollMsg::Poll);
313			return;
314		}
315
316		// Transition to WaitingForConsume phase before sending
317		*state = PollState::WaitingForConsume {
318			latest_version,
319			count,
320		};
321
322		// Send to consumer with a callback that delivers ConsumeResponse back to self
323		let self_ref = ctx.self_ref().clone();
324		let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
325			let _ = self_ref.send(PollMsg::ConsumeResponse(result));
326		});
327
328		self.consumer.consume(relevant_cdcs, reply);
329	}
330
331	/// Finish consuming: persist consumer checkpoint on success, schedule next poll.
332	fn finish_consume(
333		&self,
334		state: &mut PollState,
335		ctx: &Context<PollMsg>,
336		latest_version: CommitVersion,
337		count: usize,
338		result: Result<()>,
339	) {
340		*state = PollState::Ready;
341
342		match result {
343			Ok(()) => {
344				// Consumer committed its own writes. Now persist the consumer-level checkpoint.
345				match self.host.begin_command() {
346					Ok(mut transaction) => {
347						match CdcCheckpoint::persist(
348							&mut transaction,
349							&self.consumer_key,
350							latest_version,
351						) {
352							Ok(_) => {
353								match transaction.commit() {
354									Ok(_) => {
355										if count > 0 {
356											// More events likely available
357											// - poll again immediately
358											let _ = ctx
359												.self_ref()
360												.send(PollMsg::Poll);
361										} else {
362											ctx.schedule_once(
363												self.config
364													.poll_interval,
365												|| PollMsg::Poll,
366											);
367										}
368									}
369									Err(e) => {
370										error!(
371											"[Consumer {:?}] Error committing checkpoint: {}",
372											self.config.consumer_id, e
373										);
374										ctx.schedule_once(
375											self.config.poll_interval,
376											|| PollMsg::Poll,
377										);
378									}
379								}
380							}
381							Err(e) => {
382								error!(
383									"[Consumer {:?}] Error persisting checkpoint: {}",
384									self.config.consumer_id, e
385								);
386								let _ = transaction.rollback();
387								ctx.schedule_once(self.config.poll_interval, || {
388									PollMsg::Poll
389								});
390							}
391						}
392					}
393					Err(e) => {
394						error!(
395							"[Consumer {:?}] Error beginning checkpoint transaction: {}",
396							self.config.consumer_id, e
397						);
398						ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
399					}
400				}
401			}
402			Err(e) => {
403				error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, e);
404				ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
405			}
406		}
407	}
408
409	fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
410		let batch_size = self.config.max_batch_size.unwrap_or(1024);
411		let batch = self.store.read_range(
412			Bound::Excluded(since_version),
413			Bound::Included(until_version),
414			batch_size,
415		)?;
416		Ok(batch.items)
417	}
418}