// reifydb_cdc/consume/actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7	actors::cdc::CdcPollMessage,
8	common::CommitVersion,
9	encoded::key::EncodedKey,
10	interface::cdc::{Cdc, CdcConsumerId, SystemChange},
11	key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
12};
13use reifydb_runtime::actor::{
14	context::Context,
15	system::ActorConfig,
16	traits::{Actor, Directive},
17};
18use reifydb_transaction::transaction::Transaction;
19use reifydb_type::Result;
20use tracing::{debug, error};
21
22use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
23use crate::storage::CdcStore;
24
/// Configuration for the poll actor
#[derive(Debug, Clone)]
pub struct PollActorConfig {
	/// Unique identifier for this consumer; used to derive the
	/// per-consumer checkpoint key under which progress is persisted.
	pub consumer_id: CdcConsumerId,
	/// How often to poll when no data is available, and how long to
	/// back off after an error before retrying.
	pub poll_interval: Duration,
	/// Maximum batch size for fetching CDC events. NOTE(review): despite
	/// the original "None = unbounded" wording, `fetch_cdcs_until`
	/// currently substitutes a default of 1024 when this is `None` —
	/// confirm which behavior is intended.
	pub max_batch_size: Option<u64>,
}
35
/// Actor that polls the CDC store on behalf of a single consumer,
/// delivers relevant change batches to it, and persists its checkpoint.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
	config: PollActorConfig,
	// Host abstraction for versions, watermark, and transactions.
	host: H,
	// The downstream consumer that receives filtered CDC batches.
	consumer: Box<C>,
	// Storage backend the CDC ranges are read from.
	store: CdcStore,
	// Pre-encoded checkpoint key for this consumer (derived once from
	// `config.consumer_id` in `new`; never changes afterwards).
	consumer_key: EncodedKey,
}
43
44impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
45	/// Create a new poll actor
46	pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
47		let consumer_key = CdcConsumerKey {
48			consumer: config.consumer_id.clone(),
49		}
50		.encode();
51
52		Self {
53			config,
54			host,
55			consumer: Box::new(consumer),
56			store,
57			consumer_key,
58		}
59	}
60}
61
/// Phase of the poll actor state machine.
///
/// Transitions: `Ready` → (`WaitingForWatermark` →)? `WaitingForConsume`
/// → `Ready`. `Poll` messages are ignored unless the actor is `Ready`.
pub enum PollState {
	/// Ready to accept a new Poll
	Ready,
	/// Waiting for watermark to catch up to a specific version
	WaitingForWatermark {
		// The version the watermark must reach before proceeding.
		current_version: CommitVersion,
		// Remaining 50ms re-checks before proceeding anyway.
		retries_remaining: u8,
	},
	/// Waiting for the consumer to respond
	WaitingForConsume {
		/// The latest CDC version in the batch
		latest_version: CommitVersion,
		/// Number of CDC transactions in the batch
		count: usize,
	},
}
79
80impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
81	type State = PollState;
82	type Message = CdcPollMessage;
83
84	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
85		debug!(
86			"[Consumer {:?}] Started polling with interval {:?}",
87			self.config.consumer_id, self.config.poll_interval
88		);
89
90		// Send initial poll message to start the loop
91		let _ = ctx.self_ref().send(CdcPollMessage::Poll);
92
93		PollState::Ready
94	}
95
96	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
97		match msg {
98			CdcPollMessage::Poll => {
99				// Ignore Poll if we're already waiting for watermark or consume
100				if !matches!(*state, PollState::Ready) {
101					return Directive::Continue;
102				}
103
104				if ctx.is_cancelled() {
105					debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
106					return Directive::Stop;
107				}
108
109				// Get current version and check watermark readiness (non-blocking)
110				let current_version = match self.host.current_version() {
111					Ok(v) => v,
112					Err(e) => {
113						error!(
114							"[Consumer {:?}] Error getting current version: {}",
115							self.config.consumer_id, e
116						);
117						ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
118						return Directive::Continue;
119					}
120				};
121
122				let done = self.host.done_until();
123				if done >= current_version {
124					// Watermark is ready, proceed with batch processing
125					self.start_consume(state, ctx);
126				} else {
127					*state = PollState::WaitingForWatermark {
128						current_version,
129						retries_remaining: 4,
130					};
131					ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
132				}
133			}
134			CdcPollMessage::CheckWatermark => {
135				if let PollState::WaitingForWatermark {
136					current_version,
137					retries_remaining,
138				} = *state
139				{
140					let is_cancelled = ctx.is_cancelled();
141					if is_cancelled {
142						debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
143						return Directive::Stop;
144					}
145
146					let done = self.host.done_until();
147					if done >= current_version {
148						// Watermark caught up, proceed
149						*state = PollState::Ready;
150						self.start_consume(state, ctx);
151					} else if retries_remaining == 0 {
152						// Timeout — proceed anyway (matches original 200ms behavior)
153						*state = PollState::Ready;
154						self.start_consume(state, ctx);
155					} else {
156						// Still not ready, schedule another check
157						*state = PollState::WaitingForWatermark {
158							current_version,
159							retries_remaining: retries_remaining - 1,
160						};
161						ctx.schedule_once(Duration::from_millis(50), || {
162							CdcPollMessage::CheckWatermark
163						});
164					}
165				}
166				// If not in WaitingForWatermark phase, ignore
167			}
168			CdcPollMessage::ConsumeResponse(result) => {
169				// Only handle if we're waiting for a consume response
170				if let PollState::WaitingForConsume {
171					latest_version,
172					count,
173				} = mem::replace(&mut *state, PollState::Ready)
174				{
175					self.finish_consume(state, ctx, latest_version, count, result);
176				}
177			}
178		}
179		Directive::Continue
180	}
181
182	fn config(&self) -> ActorConfig {
183		ActorConfig::new().mailbox_capacity(16)
184	}
185}
186
187impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
188	/// Start consuming a batch: fetch CDCs, filter, and send to consumer asynchronously.
189	/// If no data is available, schedules the next poll directly.
190	fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
191		*state = PollState::Ready;
192
193		let safe_version = self.host.done_until();
194
195		let mut query = match self.host.begin_query() {
196			Ok(q) => q,
197			Err(e) => {
198				error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
199				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
200				return;
201			}
202		};
203
204		let checkpoint = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
205			Ok(c) => c,
206			Err(e) => {
207				error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
208				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
209				return;
210			}
211		};
212
213		// Drop the query — we no longer hold any transaction
214		drop(query);
215
216		if safe_version <= checkpoint {
217			// Nothing safe to fetch yet
218			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
219			return;
220		}
221
222		let transactions = match self.fetch_cdcs_until(checkpoint, safe_version) {
223			Ok(t) => t,
224			Err(e) => {
225				error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
226				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
227				return;
228			}
229		};
230
231		if transactions.is_empty() {
232			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
233			return;
234		}
235
236		let count = transactions.len();
237		let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
238
239		// Filter transactions to only those with Row or Flow-related changes
240		let relevant_cdcs = transactions
241			.into_iter()
242			.filter(|cdc| {
243				// Pass through if there are decoded row changes (columnar Change objects)
244				!cdc.changes.is_empty()
245					|| cdc.system_changes.iter().any(|sys_change| match sys_change {
246						SystemChange::Insert {
247							key,
248							..
249						}
250						| SystemChange::Update {
251							key,
252							..
253						}
254						| SystemChange::Delete {
255							key,
256							..
257						} => {
258							if let Some(kind) = Key::kind(key) {
259								matches!(
260									kind,
261									KeyKind::Row
262										| KeyKind::Flow | KeyKind::FlowNode
263										| KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
264										| KeyKind::FlowEdgeByFlow
265										| KeyKind::NamespaceFlow
266								)
267							} else {
268								false
269							}
270						}
271					})
272			})
273			.collect::<Vec<_>>();
274
275		if relevant_cdcs.is_empty() {
276			// No relevant CDCs — persist checkpoint and commit directly
277			match self.host.begin_command() {
278				Ok(mut transaction) => {
279					match CdcCheckpoint::persist(
280						&mut transaction,
281						&self.consumer_key,
282						latest_version,
283					) {
284						Ok(_) => {
285							let _ = transaction.commit();
286						}
287						Err(e) => {
288							error!(
289								"[Consumer {:?}] Error persisting checkpoint: {}",
290								self.config.consumer_id, e
291							);
292							let _ = transaction.rollback();
293						}
294					}
295				}
296				Err(e) => {
297					error!(
298						"[Consumer {:?}] Error beginning transaction for checkpoint: {}",
299						self.config.consumer_id, e
300					);
301				}
302			}
303			// More events likely available
304			let _ = ctx.self_ref().send(CdcPollMessage::Poll);
305			return;
306		}
307
308		// Transition to WaitingForConsume phase before sending
309		*state = PollState::WaitingForConsume {
310			latest_version,
311			count,
312		};
313
314		// Send to consumer with a callback that delivers ConsumeResponse back to self
315		let self_ref = ctx.self_ref().clone();
316		let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
317			let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
318		});
319
320		self.consumer.consume(relevant_cdcs, reply);
321	}
322
323	/// Finish consuming: persist consumer checkpoint on success, schedule next poll.
324	fn finish_consume(
325		&self,
326		state: &mut PollState,
327		ctx: &Context<CdcPollMessage>,
328		latest_version: CommitVersion,
329		count: usize,
330		result: Result<()>,
331	) {
332		*state = PollState::Ready;
333
334		match result {
335			Ok(()) => {
336				// Consumer committed its own writes. Now persist the consumer-level checkpoint.
337				match self.host.begin_command() {
338					Ok(mut transaction) => {
339						match CdcCheckpoint::persist(
340							&mut transaction,
341							&self.consumer_key,
342							latest_version,
343						) {
344							Ok(_) => {
345								match transaction.commit() {
346									Ok(_) => {
347										if count > 0 {
348											// More events likely available
349											// - poll again immediately
350											let _ = ctx.self_ref().send(
351												CdcPollMessage::Poll,
352											);
353										} else {
354											ctx.schedule_once(
355												self.config
356													.poll_interval,
357												|| CdcPollMessage::Poll,
358											);
359										}
360									}
361									Err(e) => {
362										error!(
363											"[Consumer {:?}] Error committing checkpoint: {}",
364											self.config.consumer_id, e
365										);
366										ctx.schedule_once(
367											self.config.poll_interval,
368											|| CdcPollMessage::Poll,
369										);
370									}
371								}
372							}
373							Err(e) => {
374								error!(
375									"[Consumer {:?}] Error persisting checkpoint: {}",
376									self.config.consumer_id, e
377								);
378								let _ = transaction.rollback();
379								ctx.schedule_once(self.config.poll_interval, || {
380									CdcPollMessage::Poll
381								});
382							}
383						}
384					}
385					Err(e) => {
386						error!(
387							"[Consumer {:?}] Error beginning checkpoint transaction: {}",
388							self.config.consumer_id, e
389						);
390						ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
391					}
392				}
393			}
394			Err(e) => {
395				error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, e);
396				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
397			}
398		}
399	}
400
401	fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
402		let batch_size = self.config.max_batch_size.unwrap_or(1024);
403		let batch = self.store.read_range(
404			Bound::Excluded(since_version),
405			Bound::Included(until_version),
406			batch_size,
407		)?;
408		Ok(batch.items)
409	}
410}