reifydb_cdc/consume/actor.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{mem, ops::Bound, time::Duration};

use reifydb_core::{
	actors::cdc::CdcPollMessage,
	common::CommitVersion,
	encoded::key::EncodedKey,
	interface::cdc::{Cdc, CdcConsumerId, SystemChange},
	key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
};
use reifydb_runtime::actor::{
	context::Context,
	system::ActorConfig,
	traits::{Actor, Directive},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{Result, error::Error};
use tracing::{debug, error};

use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
use crate::storage::CdcStore;

/// Configuration for the poll actor
#[derive(Debug, Clone)]
pub struct PollActorConfig {
	/// Unique identifier for this consumer
	pub consumer_id: CdcConsumerId,
	/// How often to poll when no data is available
	pub poll_interval: Duration,
	/// Maximum batch size for fetching CDC events (defaults to 1024 when `None`)
	pub max_batch_size: Option<u64>,
}

pub struct PollActor<H: CdcHost, C: CdcConsume> {
	config: PollActorConfig,
	host: H,
	consumer: Box<C>,
	store: CdcStore,
	consumer_key: EncodedKey,
}

impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
	/// Create a new poll actor
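	///
	/// A minimal sketch of wiring one up (values are illustrative, and
	/// `consumer_id`, `host`, `consumer` and `store` stand in for your
	/// own `CdcConsumerId`, `CdcHost`, `CdcConsume` and `CdcStore`):
	///
	/// ```ignore
	/// // hypothetical bindings: consumer_id, host, consumer, store
	/// let actor = PollActor::new(
	///     PollActorConfig {
	///         consumer_id,
	///         poll_interval: Duration::from_millis(250),
	///         max_batch_size: Some(512),
	///     },
	///     host,
	///     consumer,
	///     store,
	/// );
	/// ```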
	pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
		let consumer_key = CdcConsumerKey {
			consumer: config.consumer_id.clone(),
		}
		.encode();

		Self {
			config,
			host,
			consumer: Box::new(consumer),
			store,
			consumer_key,
		}
	}
}

/// Phase of the poll actor state machine
pub enum Phase {
	/// Ready to accept a new Poll
	Ready,
	/// Waiting for the watermark to catch up to a specific version
	WaitingForWatermark {
		/// The commit version the watermark must reach
		current_version: CommitVersion,
		/// Checks left before we stop waiting and consume anyway
		retries_remaining: u8,
	},
	/// Waiting for the consumer to respond
	WaitingForConsume {
		/// The latest CDC version in the batch
		latest_version: CommitVersion,
		/// Number of CDC transactions in the batch
		count: usize,
	},
}
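
// Message-driven transitions (see `handle` below):
//
//   Ready               --Poll-------------> WaitingForWatermark, or straight
//                                            into a consume if the watermark
//                                            has already caught up
//   WaitingForWatermark --CheckWatermark---> consume once ready or timed out,
//                                            otherwise retry in 50ms
//   WaitingForConsume   --ConsumeResponse--> Ready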

pub struct PollState {
	phase: Phase,
	/// In-memory mirror of the durable checkpoint, plus any skip-ahead
	/// advances made on idle polls. Seeded from storage on first poll.
	/// Used as the lower bound for `fetch_cdcs_until` so we never re-read
	/// filtered ranges. Never persisted directly by the actor - the durable
	/// record advances only as part of a successful consume, committing
	/// atomically alongside the consumer's user-data writes.
	cached_checkpoint: Option<CommitVersion>,
}

impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
	type State = PollState;
	type Message = CdcPollMessage;

	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
		debug!(
			"[Consumer {:?}] Started polling with interval {:?}",
			self.config.consumer_id, self.config.poll_interval
		);

		// Send initial poll message to start the loop
		let _ = ctx.self_ref().send(CdcPollMessage::Poll);

		PollState {
			phase: Phase::Ready,
			cached_checkpoint: None,
		}
	}

	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
		match msg {
			CdcPollMessage::Poll => self.on_poll(state, ctx),
			CdcPollMessage::CheckWatermark => self.on_check_watermark(state, ctx),
			CdcPollMessage::ConsumeResponse(result) => self.on_consume_response(state, ctx, result),
		}
	}

	fn config(&self) -> ActorConfig {
		ActorConfig::new().mailbox_capacity(16)
	}
}

impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
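	/// Poll tick: ignored unless the phase is `Ready` and the actor is not
	/// cancelled; then either start a consume immediately or begin waiting
	/// for the watermark to catch up to the current version.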
	#[inline]
	fn on_poll(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
		if !matches!(state.phase, Phase::Ready) {
			return Directive::Continue;
		}
		if ctx.is_cancelled() {
			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
			return Directive::Stop;
		}
		let current_version = match self.host.current_version() {
			Ok(v) => v,
			Err(e) => {
				error!("[Consumer {:?}] Error getting current version: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
				return Directive::Continue;
			}
		};
		if self.host.done_until() >= current_version {
			self.start_consume(state, ctx);
		} else {
			state.phase = Phase::WaitingForWatermark {
				current_version,
				retries_remaining: 4,
			};
			ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
		}
		Directive::Continue
	}

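	/// Watermark re-check: start the consume once the watermark reaches the
	/// version captured at poll time or once the retries run out; otherwise
	/// schedule another check.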
	#[inline]
	fn on_check_watermark(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
		let Phase::WaitingForWatermark {
			current_version,
			retries_remaining,
		} = state.phase
		else {
			return Directive::Continue;
		};
		if ctx.is_cancelled() {
			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
			return Directive::Stop;
		}
		let watermark_ready = self.host.done_until() >= current_version;
		let timed_out = retries_remaining == 0;
		if watermark_ready || timed_out {
			state.phase = Phase::Ready;
			self.start_consume(state, ctx);
		} else {
			state.phase = Phase::WaitingForWatermark {
				current_version,
				retries_remaining: retries_remaining - 1,
			};
			ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
		}
		Directive::Continue
	}

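	/// Consumer reply: complete the in-flight consume if one exists. The
	/// phase is reset to `Ready` either way.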
	#[inline]
	fn on_consume_response(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		result: Result<()>,
	) -> Directive {
		if let Phase::WaitingForConsume {
			latest_version,
			count,
		} = mem::replace(&mut state.phase, Phase::Ready)
		{
			self.finish_consume(state, ctx, latest_version, count, result);
		}
		Directive::Continue
	}

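	/// One consume cycle: resolve the checkpoint, fetch CDCs in the range
	/// `(checkpoint, done_until]`, filter out irrelevant transactions, then
	/// either skip ahead (nothing relevant) or dispatch the batch to the
	/// consumer.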
	fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
		state.phase = Phase::Ready;
		let safe_version = self.host.done_until();

		let Some(checkpoint) = self.resolve_checkpoint(state, ctx) else {
			return;
		};
		if safe_version <= checkpoint {
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
			return;
		}

		let Some(transactions) = self.fetch_or_reschedule(checkpoint, safe_version, ctx) else {
			return;
		};
		if transactions.is_empty() {
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
			return;
		}

		let (count, latest_version) = summarize_batch(checkpoint, &transactions);
		let relevant_cdcs: Vec<Cdc> = transactions.into_iter().filter(is_relevant_cdc).collect();

		if relevant_cdcs.is_empty() {
			self.advance_checkpoint_skip_ahead(state, ctx, latest_version);
			return;
		}

		state.phase = Phase::WaitingForConsume {
			latest_version,
			count,
		};
		self.dispatch_to_consumer(relevant_cdcs, ctx);
	}

	/// Advance the in-memory checkpoint past a range that had no relevant CDCs,
	/// then poll immediately. Does NOT touch the multi store - this advance is a
	/// process-local skip-ahead. The durable checkpoint moves only when
	/// `finish_consume` actually applies CDCs (a real-work commit that the multi
	/// store legitimately version-bumps).
	#[inline]
	fn advance_checkpoint_skip_ahead(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
	) {
		state.cached_checkpoint = Some(latest_version);
		let _ = ctx.self_ref().send(CdcPollMessage::Poll);
	}

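	/// Return the cached checkpoint, seeding it from durable storage on the
	/// first poll. `None` means the seed failed and a retry poll is already
	/// scheduled.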
	#[inline]
	fn resolve_checkpoint(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
		if let Some(v) = state.cached_checkpoint {
			return Some(v);
		}
		let v = self.seed_checkpoint_from_durable(ctx)?;
		state.cached_checkpoint = Some(v);
		Some(v)
	}

	/// First-poll only: read the durable checkpoint from storage. After this we
	/// never read the durable checkpoint again - the cache is authoritative
	/// until process exit. On error, schedules a retry poll and returns None.
	#[inline]
	fn seed_checkpoint_from_durable(&self, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
		let mut query = match self.host.begin_query() {
			Ok(q) => q,
			Err(e) => {
				error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
				return None;
			}
		};
		let v = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
			Ok(c) => c,
			Err(e) => {
				error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
				return None;
			}
		};
		drop(query);
		Some(v)
	}

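	/// Fetch the CDC batch for `(checkpoint, safe_version]`; on error, log,
	/// schedule a retry poll, and return `None`.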
	#[inline]
	fn fetch_or_reschedule(
		&self,
		checkpoint: CommitVersion,
		safe_version: CommitVersion,
		ctx: &Context<CdcPollMessage>,
	) -> Option<Vec<Cdc>> {
		match self.fetch_cdcs_until(checkpoint, safe_version) {
			Ok(t) => Some(t),
			Err(e) => {
				error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
				None
			}
		}
	}

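	/// Hand the batch to the consumer along with a reply callback that
	/// routes the result back to this actor as a `ConsumeResponse`.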
	#[inline]
	fn dispatch_to_consumer(&self, cdcs: Vec<Cdc>, ctx: &Context<CdcPollMessage>) {
		let self_ref = ctx.self_ref().clone();
		let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
			let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
		});
		self.consumer.consume(cdcs, reply);
	}

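	/// Complete a consume: advance the checkpoint on success, or log and
	/// retry after the poll interval on failure.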
	fn finish_consume(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
		count: usize,
		result: Result<()>,
	) {
		state.phase = Phase::Ready;
		match result {
			Ok(()) => self.advance_after_success(state, ctx, latest_version, count),
			Err(e) => self.reschedule_after_error(ctx, e),
		}
	}

	/// Advance the in-memory checkpoint. The consumer has applied its writes via
	/// its own transaction; the checkpoint here is a process-local watermark
	/// used as the lower bound for the next fetch, tracked in
	/// `state.cached_checkpoint`.
	///
	/// We do NOT persist the checkpoint to the multi store. A per-batch persist
	/// would write a `CdcConsumer`-keyed row - filtered from CDC by
	/// `should_exclude_from_cdc` but still allocating a fresh `current_version`
	/// on every batch. That drifts version-sensitive callers (e.g. cdc-snapshot
	/// golden scripts) and produces redundant work, since the consumer's apply
	/// is idempotent: on restart the durable checkpoint is re-seeded from
	/// storage in `start_consume`, the consumer re-fetches the same range, and
	/// re-applies the same set/unset writes - cheap and correct.
	#[inline]
	fn advance_after_success(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
		count: usize,
	) {
		state.cached_checkpoint = Some(latest_version);
		if count > 0 {
			let _ = ctx.self_ref().send(CdcPollMessage::Poll);
		} else {
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
		}
	}

	#[inline]
	fn reschedule_after_error(&self, ctx: &Context<CdcPollMessage>, err: Error) {
		error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, err);
		ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
	}

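	/// Read CDC transactions in `(since_version, until_version]` from the
	/// store, capped at `max_batch_size` (1024 when unset).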
	fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
		let batch_size = self.config.max_batch_size.unwrap_or(1024);
		let batch = self.store.read_range(
			Bound::Excluded(since_version),
			Bound::Included(until_version),
			batch_size,
		)?;
		Ok(batch.items)
	}
}

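/// Return the batch size and its highest commit version, falling back to the
/// checkpoint for an empty batch.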
#[inline]
fn summarize_batch(checkpoint: CommitVersion, transactions: &[Cdc]) -> (usize, CommitVersion) {
	let count = transactions.len();
	let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
	(count, latest_version)
}

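/// A CDC transaction is relevant if it carries user-data changes or at least
/// one relevant system change.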
fn is_relevant_cdc(cdc: &Cdc) -> bool {
	!cdc.changes.is_empty() || cdc.system_changes.iter().any(is_relevant_system_change)
}

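/// Only row and flow-topology keys count as relevant system changes; any
/// other key kind (or a key whose kind cannot be determined) is filtered out.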
fn is_relevant_system_change(change: &SystemChange) -> bool {
	let key = match change {
		SystemChange::Insert {
			key,
			..
		}
		| SystemChange::Update {
			key,
			..
		}
		| SystemChange::Delete {
			key,
			..
		} => key,
	};
	Key::kind(key)
		.map(|kind| {
			matches!(
				kind,
				KeyKind::Row
					| KeyKind::Flow | KeyKind::FlowNode | KeyKind::FlowNodeByFlow
					| KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow
					| KeyKind::NamespaceFlow
			)
		})
		.unwrap_or(false)
}