// reifydb-cdc 0.5.0
//
// Change Data Capture module for ReifyDB
// Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{mem, ops::Bound, time::Duration};

use reifydb_core::{
	actors::cdc::CdcPollMessage,
	common::CommitVersion,
	encoded::key::EncodedKey,
	interface::cdc::{Cdc, CdcConsumerId, SystemChange},
	key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
};
use reifydb_runtime::actor::{
	context::Context,
	system::ActorConfig,
	traits::{Actor, Directive},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{Result, error::Error};
use tracing::{debug, error};

use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost, watermark::CdcConsumerWatermark};
use crate::storage::CdcStore;

/// Static configuration for a [`PollActor`].
#[derive(Debug, Clone)]
pub struct PollActorConfig {
	/// Identifies the consumer; used to derive the encoded consumer key and
	/// to tag every log line emitted by the actor.
	pub consumer_id: CdcConsumerId,

	/// Delay before the next `Poll` when there is nothing to do or after an
	/// error.
	pub poll_interval: Duration,

	/// Batch size handed to the CDC store when reading a range; `None`
	/// falls back to 1024.
	pub max_batch_size: Option<u64>,
}

/// Actor that polls the CDC store on behalf of one consumer and forwards
/// relevant entries to it.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
	/// Consumer id, poll interval and batch size.
	config: PollActorConfig,
	/// Provides the current version, the `done_until` watermark and query
	/// transactions.
	host: H,
	/// Receives each batch of relevant CDC entries.
	consumer: Box<C>,
	/// Source of CDC entries.
	store: CdcStore,
	/// Encoded `CdcConsumerKey` for `config.consumer_id`; used to fetch the
	/// durable checkpoint.
	consumer_key: EncodedKey,
	/// Optional watermark that is updated whenever the in-memory checkpoint
	/// changes.
	consumer_watermark: Option<CdcConsumerWatermark>,
}

impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
	pub fn new(
		config: PollActorConfig,
		host: H,
		consumer: C,
		store: CdcStore,
		consumer_watermark: Option<CdcConsumerWatermark>,
	) -> Self {
		let consumer_key = CdcConsumerKey {
			consumer: config.consumer_id.clone(),
		}
		.encode();

		Self {
			config,
			host,
			consumer: Box::new(consumer),
			store,
			consumer_key,
			consumer_watermark,
		}
	}

	#[inline]
	fn publish_watermark(&self, version: CommitVersion) {
		if let Some(wm) = &self.consumer_watermark {
			wm.store(version);
		}
	}
}

/// Where the poll actor currently is within one poll cycle.
pub enum Phase {
	/// Nothing is in flight; this is the only phase in which a `Poll`
	/// message is acted upon.
	Ready,

	/// Waiting for the host's `done_until` watermark to reach
	/// `current_version` before reading CDC entries.
	WaitingForWatermark {
		/// Version observed when the poll started.
		current_version: CommitVersion,
		/// Number of further re-checks allowed before consuming anyway.
		retries_remaining: u8,
	},

	/// A batch has been handed to the consumer and its reply is pending.
	WaitingForConsume {
		/// Highest version in the fetched batch; becomes the checkpoint
		/// when the consumer reports success.
		latest_version: CommitVersion,

		/// Number of CDC entries fetched for this batch (counted before
		/// irrelevant entries were filtered out).
		count: usize,
	},
}

/// Mutable per-actor state threaded through every message handler.
pub struct PollState {
	/// Current position in the poll cycle.
	phase: Phase,

	/// In-memory checkpoint; `None` until it has been seeded from the
	/// durable checkpoint.
	cached_checkpoint: Option<CommitVersion>,
}

impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
	type State = PollState;
	type Message = CdcPollMessage;

	fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
		debug!(
			"[Consumer {:?}] Started polling with interval {:?}",
			self.config.consumer_id, self.config.poll_interval
		);

		let _ = ctx.self_ref().send(CdcPollMessage::Poll);

		PollState {
			phase: Phase::Ready,
			cached_checkpoint: None,
		}
	}

	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
		match msg {
			CdcPollMessage::Poll => self.on_poll(state, ctx),
			CdcPollMessage::CheckWatermark => self.on_check_watermark(state, ctx),
			CdcPollMessage::ConsumeResponse(result) => self.on_consume_response(state, ctx, result),
		}
	}

	fn config(&self) -> ActorConfig {
		ActorConfig::new().mailbox_capacity(16)
	}
}

impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
	/// Handles a `Poll` message.
	///
	/// Polls are only acted on in `Phase::Ready`; in any other phase the
	/// message is ignored. When the actor has been cancelled it stops.
	/// Otherwise the host's current version is read: if `done_until` has
	/// already reached it, consumption starts right away; if not, the actor
	/// enters `WaitingForWatermark` and re-checks after 50 ms.
	#[inline]
	fn on_poll(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
		match state.phase {
			Phase::Ready => {}
			Phase::WaitingForWatermark {
				..
			}
			| Phase::WaitingForConsume {
				..
			} => return Directive::Continue,
		}

		if ctx.is_cancelled() {
			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
			return Directive::Stop;
		}

		let current_version = match self.host.current_version() {
			Err(e) => {
				error!("[Consumer {:?}] Error getting current version: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
				return Directive::Continue;
			}
			Ok(version) => version,
		};

		if self.host.done_until() >= current_version {
			self.start_consume(state, ctx);
			return Directive::Continue;
		}

		// The watermark lags behind: allow four further re-checks.
		state.phase = Phase::WaitingForWatermark {
			current_version,
			retries_remaining: 4,
		};
		ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
		Directive::Continue
	}

	/// Handles a `CheckWatermark` message.
	///
	/// Ignored unless the actor is in `WaitingForWatermark`. Stops when the
	/// actor has been cancelled. Consumption starts once `done_until` has
	/// reached the awaited version or no retries are left; otherwise the
	/// retry counter is decremented and another check is scheduled in 50 ms.
	#[inline]
	fn on_check_watermark(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Directive {
		let (awaited_version, retries_left) = match state.phase {
			Phase::WaitingForWatermark {
				current_version,
				retries_remaining,
			} => (current_version, retries_remaining),
			Phase::Ready
			| Phase::WaitingForConsume {
				..
			} => return Directive::Continue,
		};

		if ctx.is_cancelled() {
			debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
			return Directive::Stop;
		}

		let watermark_ready = self.host.done_until() >= awaited_version;

		// `checked_sub` yields `None` exactly when the retry budget is spent.
		match retries_left.checked_sub(1) {
			Some(next_retries) if !watermark_ready => {
				state.phase = Phase::WaitingForWatermark {
					current_version: awaited_version,
					retries_remaining: next_retries,
				};
				ctx.schedule_once(Duration::from_millis(50), || CdcPollMessage::CheckWatermark);
			}
			_ => {
				state.phase = Phase::Ready;
				self.start_consume(state, ctx);
			}
		}
		Directive::Continue
	}

	#[inline]
	fn on_consume_response(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		result: Result<()>,
	) -> Directive {
		if let Phase::WaitingForConsume {
			latest_version,
			count,
		} = mem::replace(&mut state.phase, Phase::Ready)
		{
			self.finish_consume(state, ctx, latest_version, count, result);
		}
		Directive::Continue
	}

	/// Runs one fetch-and-dispatch step.
	///
	/// Reads CDC entries after the checkpoint up to the host's `done_until`
	/// version. When there is nothing new, another `Poll` is scheduled after
	/// the poll interval. When the fetched batch contains no relevant
	/// entries, the checkpoint skips ahead to the batch's highest version.
	/// Otherwise the relevant entries are handed to the consumer and the
	/// actor enters `WaitingForConsume`.
	fn start_consume(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) {
		state.phase = Phase::Ready;
		let safe_version = self.host.done_until();

		let poll_later = || {
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
		};

		// On failure `resolve_checkpoint` has already scheduled the retry.
		let Some(checkpoint) = self.resolve_checkpoint(state, ctx) else {
			return;
		};
		if safe_version <= checkpoint {
			poll_later();
			return;
		}

		// On failure `fetch_or_reschedule` has already scheduled the retry.
		let Some(mut batch) = self.fetch_or_reschedule(checkpoint, safe_version, ctx) else {
			return;
		};
		if batch.is_empty() {
			poll_later();
			return;
		}

		// Summarise before filtering so the checkpoint also moves past irrelevant entries.
		let (count, latest_version) = summarize_batch(checkpoint, &batch);
		batch.retain(is_relevant_cdc);

		if batch.is_empty() {
			self.advance_checkpoint_skip_ahead(state, ctx, latest_version);
			return;
		}

		state.phase = Phase::WaitingForConsume {
			latest_version,
			count,
		};
		self.dispatch_to_consumer(batch, ctx);
	}

	/// Moves the in-memory checkpoint to `latest_version` without involving
	/// the consumer, publishes it to the watermark and queues an immediate
	/// `Poll`.
	#[inline]
	fn advance_checkpoint_skip_ahead(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
	) {
		let skipped_to = latest_version;
		state.cached_checkpoint = Some(skipped_to);
		self.publish_watermark(skipped_to);

		// Best effort: a failed send is deliberately ignored.
		let mailbox = ctx.self_ref();
		let _ = mailbox.send(CdcPollMessage::Poll);
	}

	/// Returns the checkpoint to read from.
	///
	/// Uses the cached value when present. Otherwise the checkpoint is
	/// seeded from durable storage, cached and published to the watermark.
	/// Returns `None` when seeding failed; in that case a retry `Poll` has
	/// already been scheduled by `seed_checkpoint_from_durable`.
	#[inline]
	fn resolve_checkpoint(&self, state: &mut PollState, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
		if state.cached_checkpoint.is_none() {
			let seeded = self.seed_checkpoint_from_durable(ctx)?;
			state.cached_checkpoint = Some(seeded);
			self.publish_watermark(seeded);
		}
		state.cached_checkpoint
	}

	/// Fetches this consumer's checkpoint through a fresh query transaction.
	///
	/// If either beginning the query or fetching the checkpoint fails, the
	/// error is logged, a `Poll` is scheduled after the poll interval and
	/// `None` is returned.
	#[inline]
	fn seed_checkpoint_from_durable(&self, ctx: &Context<CdcPollMessage>) -> Option<CommitVersion> {
		let mut query = self
			.host
			.begin_query()
			.map_err(|e| {
				error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
			})
			.ok()?;

		let checkpoint = CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key)
			.map_err(|e| {
				error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
				ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
			})
			.ok()?;

		// Release the query transaction before handing the value back.
		drop(query);
		Some(checkpoint)
	}

	/// Fetches the CDC entries after `checkpoint` up to `safe_version`.
	///
	/// On failure the error is logged, a `Poll` is scheduled after the poll
	/// interval and `None` is returned.
	#[inline]
	fn fetch_or_reschedule(
		&self,
		checkpoint: CommitVersion,
		safe_version: CommitVersion,
		ctx: &Context<CdcPollMessage>,
	) -> Option<Vec<Cdc>> {
		let fetched = self.fetch_cdcs_until(checkpoint, safe_version);
		if let Err(e) = &fetched {
			error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
		}
		fetched.ok()
	}

	/// Hands `cdcs` to the consumer together with a reply callback that
	/// posts the outcome back to this actor as `ConsumeResponse`.
	#[inline]
	fn dispatch_to_consumer(&self, cdcs: Vec<Cdc>, ctx: &Context<CdcPollMessage>) {
		let mailbox = ctx.self_ref().clone();
		let on_done = move |outcome: Result<()>| {
			// Best effort: a failed send is deliberately ignored.
			let _ = mailbox.send(CdcPollMessage::ConsumeResponse(outcome));
		};
		let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(on_done);
		self.consumer.consume(cdcs, reply);
	}

	/// Completes a consume cycle: returns to `Ready`, then either advances
	/// the checkpoint (on success) or logs the error and schedules a retry.
	fn finish_consume(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
		count: usize,
		result: Result<()>,
	) {
		state.phase = Phase::Ready;

		if let Err(failure) = result {
			self.reschedule_after_error(ctx, failure);
			return;
		}
		self.advance_after_success(state, ctx, latest_version, count);
	}

	/// Records `latest_version` as the new checkpoint and publishes it to
	/// the watermark.
	///
	/// A non-empty batch triggers an immediate `Poll`; an empty one waits
	/// for the poll interval.
	#[inline]
	fn advance_after_success(
		&self,
		state: &mut PollState,
		ctx: &Context<CdcPollMessage>,
		latest_version: CommitVersion,
		count: usize,
	) {
		state.cached_checkpoint = Some(latest_version);
		self.publish_watermark(latest_version);

		if count == 0 {
			ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
			return;
		}
		// Best effort: a failed send is deliberately ignored.
		let _ = ctx.self_ref().send(CdcPollMessage::Poll);
	}

	#[inline]
	fn reschedule_after_error(&self, ctx: &Context<CdcPollMessage>, err: Error) {
		error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, err);
		ctx.schedule_once(self.config.poll_interval, || CdcPollMessage::Poll);
	}

	/// Reads CDC entries from the store over the version range
	/// `(since_version, until_version]`, passing `max_batch_size` (1024 when
	/// unset) as the batch-size argument, and returns the batch's items.
	///
	/// # Errors
	///
	/// Propagates any error returned by the store's `read_range`.
	fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
		let limit = self.config.max_batch_size.unwrap_or(1024);
		let lower = Bound::Excluded(since_version);
		let upper = Bound::Included(until_version);

		let items = self.store.read_range(lower, upper, limit)?.items;
		Ok(items)
	}
}

/// Returns the number of entries in `transactions` and the highest version
/// among them, falling back to `checkpoint` when the slice is empty.
#[inline]
fn summarize_batch(checkpoint: CommitVersion, transactions: &[Cdc]) -> (usize, CommitVersion) {
	let highest_seen = transactions.iter().map(|entry| entry.version).max();
	(transactions.len(), highest_seen.unwrap_or(checkpoint))
}

/// A CDC entry is relevant when it carries at least one change, or when any
/// of its system changes is relevant.
fn is_relevant_cdc(cdc: &Cdc) -> bool {
	if !cdc.changes.is_empty() {
		return true;
	}
	cdc.system_changes.iter().any(is_relevant_system_change)
}

/// Returns `true` when the system change's key is of kind `Row` or one of
/// the flow-related kinds listed below. A key whose kind cannot be
/// determined is treated as not relevant.
fn is_relevant_system_change(change: &SystemChange) -> bool {
	// Every variant carries a `key`, so a single or-pattern extracts it.
	let (SystemChange::Insert {
		key,
		..
	}
	| SystemChange::Update {
		key,
		..
	}
	| SystemChange::Delete {
		key,
		..
	}) = change;

	let relevant = Key::kind(key).map(|kind| {
		matches!(
			kind,
			KeyKind::Row
				| KeyKind::Flow | KeyKind::FlowNode | KeyKind::FlowNodeByFlow
				| KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow
				| KeyKind::NamespaceFlow
		)
	});
	relevant.unwrap_or(false)
}