Skip to main content

reifydb_cdc/consume/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, ops::Bound, time::Duration};
5
6use reifydb_core::{
7	actors::cdc::CdcPollMessage,
8	common::CommitVersion,
9	encoded::key::EncodedKey,
10	interface::cdc::{Cdc, CdcConsumerId, SystemChange},
11	key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
12};
13use reifydb_runtime::actor::{
14	context::Context,
15	system::ActorConfig,
16	traits::{Actor, Directive},
17};
18use reifydb_transaction::transaction::Transaction;
19use reifydb_type::{Result, error::Error};
20use tracing::{debug, error};
21
22use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost, watermark::CdcConsumerWatermark};
23use crate::storage::CdcStore;
24
/// Configuration for a [`PollActor`].
#[derive(Debug, Clone)]
pub struct PollActorConfig {
	/// Identifier of this consumer. It is encoded into a `CdcConsumerKey`, which is the key
	/// used to fetch the consumer's durable checkpoint.
	pub consumer_id: CdcConsumerId,

	/// Delay before the next `Poll` when there is nothing new to consume or an error occurred.
	pub poll_interval: Duration,

	/// Maximum number of CDC entries requested per read; `None` falls back to 1024.
	pub max_batch_size: Option<u64>,
}
33
/// Actor that polls the CDC store for new entries and forwards the relevant ones to a consumer.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
	/// Polling settings.
	config: PollActorConfig,
	/// Supplies the current commit version, the `done_until` watermark, and query transactions.
	host: H,
	/// Receives batches of relevant CDC entries together with a reply callback.
	consumer: Box<C>,
	/// Store the CDC entries are read from.
	store: CdcStore,
	/// Encoded `CdcConsumerKey` for `config.consumer_id`; used to fetch the durable checkpoint.
	consumer_key: EncodedKey,
	/// Optional watermark to which every checkpoint the actor reaches is published.
	consumer_watermark: Option<CdcConsumerWatermark>,
}
42
43impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
44	pub fn new(
45		config: PollActorConfig,
46		host: H,
47		consumer: C,
48		store: CdcStore,
49		consumer_watermark: Option<CdcConsumerWatermark>,
50	) -> Self {
51		let consumer_key = CdcConsumerKey {
52			consumer: config.consumer_id.clone(),
53		}
54		.encode();
55
56		Self {
57			config,
58			host,
59			consumer: Box::new(consumer),
60			store,
61			consumer_key,
62			consumer_watermark,
63		}
64	}
65
66	#[inline]
67	fn publish_watermark(&self, version: CommitVersion) {
68		if let Some(wm) = &self.consumer_watermark {
69			wm.store(version);
70		}
71	}
72}
73
/// Step the poll loop is currently in.
pub enum Phase {
	/// Idle; the next `Poll` message may start a new consume cycle.
	Ready,

	/// The host's `done_until` watermark was behind `current_version` when polling began.
	/// The watermark is re-checked on a timer until it catches up or the retries run out.
	WaitingForWatermark {
		/// Version reported by `host.current_version()` at the start of the poll.
		current_version: CommitVersion,
		/// Re-checks left before consuming up to whatever `done_until` currently is.
		retries_remaining: u8,
	},

	/// A batch has been handed to the consumer and its reply is pending.
	WaitingForConsume {
		/// Highest version in the fetched batch; becomes the checkpoint on success.
		latest_version: CommitVersion,

		/// Number of CDC entries fetched for the batch, counted before irrelevant ones
		/// were filtered out.
		count: usize,
	},
}
88
/// Actor state passed to every message handler of [`PollActor`].
pub struct PollState {
	/// Current step of the poll loop.
	phase: Phase,

	/// In-memory copy of the last checkpoint reached. It is `None` until it is first seeded
	/// from the durable checkpoint.
	cached_checkpoint: Option<CommitVersion>,
}
94
95impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
96	type State = PollState;
97	type Message = CdcPollMessage;
98
99	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
100		debug!(
101			"[Consumer {:?}] Started polling with interval {:?}",
102			self.config.consumer_id, self.config.poll_interval
103		);
104
105		let _ = ctx.self_ref().send(CdcPollMessage::Poll);
106
107		PollState {
108			phase: Phase::Ready,
109			cached_checkpoint: None,
110		}
111	}
112
113	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
114		match msg {
115			CdcPollMessage::Poll => self.on_poll(state, ctx),
116			CdcPollMessage::CheckWatermark => self.on_check_watermark(state, ctx),
117			CdcPollMessage::ConsumeResponse(result) => self.on_consume_response(state, ctx, result),
118		}
119	}
120
121	fn config(&self) -> ActorConfig {
122		ActorConfig::new().mailbox_capacity(16)
123	}
124}
125
126impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
127	#[inline]
128	fn on_poll(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
129		if !matches!(state.phase, Phase::Ready) {
130			return Directive::Continue;
131		}
132		if ctx.is_cancelled() {
133			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
134			return Directive::Stop;
135		}
136		let current_version = match self.host.current_version() {
137			Ok(v) => v,
138			Err(e) => {
139				error!("[Consumer {:?}] Error getting current version: {}", self.config.consumer_id, e);
140				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
141				return Directive::Continue;
142			}
143		};
144		if self.host.done_until() >= current_version {
145			self.start_consume(state, ctx);
146		} else {
147			state.phase = Phase::WaitingForWatermark {
148				current_version,
149				retries_remaining: 4,
150			};
151			ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
152		}
153		Directive::Continue
154	}
155
156	#[inline]
157	fn on_check_watermark(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
158		let Phase::WaitingForWatermark {
159			current_version,
160			retries_remaining,
161		} = state.phase
162		else {
163			return Directive::Continue;
164		};
165		if ctx.is_cancelled() {
166			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
167			return Directive::Stop;
168		}
169		let watermark_ready = self.host.done_until() >= current_version;
170		let timed_out = retries_remaining == 0;
171		if watermark_ready || timed_out {
172			state.phase = Phase::Ready;
173			self.start_consume(state, ctx);
174		} else {
175			state.phase = Phase::WaitingForWatermark {
176				current_version,
177				retries_remaining: retries_remaining - 1,
178			};
179			ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
180		}
181		Directive::Continue
182	}
183
184	#[inline]
185	fn on_consume_response(
186		&self,
187		state: &mut PollState,
188		ctx: &Context<CdcPollMessage>,
189		result: Result<()>,
190	) -> Directive {
191		if let Phase::WaitingForConsume {
192			latest_version,
193			count,
194		} = mem::replace(&mut state.phase, Phase::Ready)
195		{
196			self.finish_consume(state, ctx, latest_version, count, result);
197		}
198		Directive::Continue
199	}
200
201	fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
202		state.phase = Phase::Ready;
203		let safe_version = self.host.done_until();
204
205		let Some(checkpoint) = self.resolve_checkpoint(state, ctx) else {
206			return;
207		};
208		if safe_version <= checkpoint {
209			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
210			return;
211		}
212
213		let Some(transactions) = self.fetch_or_reschedule(checkpoint, safe_version, ctx) else {
214			return;
215		};
216		if transactions.is_empty() {
217			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
218			return;
219		}
220
221		let (count, latest_version) = summarize_batch(checkpoint, &transactions);
222		let relevant_cdcs: Vec<Cdc> = transactions.into_iter().filter(is_relevant_cdc).collect();
223
224		if relevant_cdcs.is_empty() {
225			self.advance_checkpoint_skip_ahead(state, ctx, latest_version);
226			return;
227		}
228
229		state.phase = Phase::WaitingForConsume {
230			latest_version,
231			count,
232		};
233		self.dispatch_to_consumer(relevant_cdcs, ctx);
234	}
235
236	#[inline]
237	fn advance_checkpoint_skip_ahead(
238		&self,
239		state: &mut PollState,
240		ctx: &Context<CdcPollMessage>,
241		latest_version: CommitVersion,
242	) {
243		state.cached_checkpoint = Some(latest_version);
244		self.publish_watermark(latest_version);
245		let _ = ctx.self_ref().send(CdcPollMessage::Poll);
246	}
247
248	#[inline]
249	fn resolve_checkpoint(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
250		if let Some(v) = state.cached_checkpoint {
251			return Some(v);
252		}
253		let v = self.seed_checkpoint_from_durable(ctx)?;
254		state.cached_checkpoint = Some(v);
255		self.publish_watermark(v);
256		Some(v)
257	}
258
259	#[inline]
260	fn seed_checkpoint_from_durable(&self, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
261		let mut query = match self.host.begin_query() {
262			Ok(q) => q,
263			Err(e) => {
264				error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
265				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
266				return None;
267			}
268		};
269		let v = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
270			Ok(c) => c,
271			Err(e) => {
272				error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
273				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
274				return None;
275			}
276		};
277		drop(query);
278		Some(v)
279	}
280
281	#[inline]
282	fn fetch_or_reschedule(
283		&self,
284		checkpoint: CommitVersion,
285		safe_version: CommitVersion,
286		ctx: &Context<CdcPollMessage>,
287	) -> Option<Vec<Cdc>> {
288		match self.fetch_cdcs_until(checkpoint, safe_version) {
289			Ok(t) => Some(t),
290			Err(e) => {
291				error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
292				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
293				None
294			}
295		}
296	}
297
298	#[inline]
299	fn dispatch_to_consumer(&self, cdcs: Vec<Cdc>, ctx: &Context<CdcPollMessage>) {
300		let self_ref = ctx.self_ref().clone();
301		let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
302			let _ = self_ref.send(CdcPollMessage::ConsumeResponse(result));
303		});
304		self.consumer.consume(cdcs, reply);
305	}
306
307	fn finish_consume(
308		&self,
309		state: &mut PollState,
310		ctx: &Context<CdcPollMessage>,
311		latest_version: CommitVersion,
312		count: usize,
313		result: Result<()>,
314	) {
315		state.phase = Phase::Ready;
316		match result {
317			Ok(()) => self.advance_after_success(state, ctx, latest_version, count),
318			Err(e) => self.reschedule_after_error(ctx, e),
319		}
320	}
321
322	#[inline]
323	fn advance_after_success(
324		&self,
325		state: &mut PollState,
326		ctx: &Context<CdcPollMessage>,
327		latest_version: CommitVersion,
328		count: usize,
329	) {
330		state.cached_checkpoint = Some(latest_version);
331		self.publish_watermark(latest_version);
332		if count > 0 {
333			let _ = ctx.self_ref().send(CdcPollMessage::Poll);
334		} else {
335			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
336		}
337	}
338
339	#[inline]
340	fn reschedule_after_error(&self, ctx: &Context<CdcPollMessage>, err: Error) {
341		error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, err);
342		ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
343	}
344
345	fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
346		let batch_size = self.config.max_batch_size.unwrap_or(1024);
347		let batch = self.store.read_range(
348			Bound::Excluded(since_version),
349			Bound::Included(until_version),
350			batch_size,
351		)?;
352		Ok(batch.items)
353	}
354}
355
356#[inline]
357fn summarize_batch(checkpoint: CommitVersion, transactions: &[Cdc]) -> (usize, CommitVersion) {
358	let count = transactions.len();
359	let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
360	(count, latest_version)
361}
362
363fn is_relevant_cdc(cdc: &Cdc) -> bool {
364	!cdc.changes.is_empty() || cdc.system_changes.iter().any(is_relevant_system_change)
365}
366
367fn is_relevant_system_change(change: &SystemChange) -> bool {
368	let key = match change {
369		SystemChange::Insert {
370			key,
371			..
372		}
373		| SystemChange::Update {
374			key,
375			..
376		}
377		| SystemChange::Delete {
378			key,
379			..
380		} => key,
381	};
382	Key::kind(key)
383		.map(|kind| {
384			matches!(
385				kind,
386				KeyKind::Row
387					| KeyKind::Flow | KeyKind::FlowNode | KeyKind::FlowNodeByFlow
388					| KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow
389					| KeyKind::NamespaceFlow
390			)
391		})
392		.unwrap_or(false)
393}