use std::{mem, ops::Bound, time::Duration};
use reifydb_core::{
common::CommitVersion,
encoded::key::EncodedKey,
interface::cdc::{Cdc, CdcConsumerId, SystemChange},
key::{EncodableKey, Key, cdc_consumer::CdcConsumerKey, kind::KeyKind},
};
use reifydb_runtime::actor::{
context::Context,
system::ActorConfig,
traits::{Actor, Directive},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::Result;
use tracing::{debug, error};
use super::{checkpoint::CdcCheckpoint, consumer::CdcConsume, host::CdcHost};
use crate::storage::CdcStore;
/// Messages handled by [`PollActor`].
pub enum PollMsg {
    /// Start a poll cycle; ignored unless the actor state is `PollState::Ready`.
    Poll,
    /// Re-check whether the host's `done_until` watermark has reached the version
    /// recorded in `PollState::WaitingForWatermark`.
    CheckWatermark,
    /// Result reported by the consumer for the batch it was handed.
    ConsumeResponse(Result<()>),
}
/// Configuration for a [`PollActor`].
#[derive(Clone, Debug)]
pub struct PollActorConfig {
    /// Identity of the consumer: used to build its checkpoint key and to tag log lines.
    pub consumer_id: CdcConsumerId,
    /// Delay before the next poll when there is nothing new or an error occurred.
    pub poll_interval: Duration,
    /// Maximum number of CDC entries read per batch; `None` means 1024.
    pub max_batch_size: Option<u64>,
}
/// Actor that polls a [`CdcStore`] for CDC entries newer than a per-consumer checkpoint,
/// forwards the relevant ones to a consumer, and advances that checkpoint.
pub struct PollActor<H: CdcHost, C: CdcConsume> {
    /// Static configuration (consumer id, poll interval, batch size).
    config: PollActorConfig,
    /// Host providing version/watermark information and query/command transactions.
    host: H,
    /// Receives batches of relevant CDC entries.
    consumer: Box<C>,
    /// Source of CDC entries.
    store: CdcStore,
    /// Encoded `CdcConsumerKey` for `config.consumer_id`, computed once in `new`.
    consumer_key: EncodedKey,
}
impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
    /// Builds a poll actor, precomputing the encoded checkpoint key for `config.consumer_id`.
    pub fn new(config: PollActorConfig, host: H, consumer: C, store: CdcStore) -> Self {
        let key = CdcConsumerKey {
            consumer: config.consumer_id.clone(),
        };
        let consumer_key = key.encode();
        let consumer = Box::new(consumer);
        Self {
            consumer_key,
            consumer,
            store,
            host,
            config,
        }
    }
}
/// State of a [`PollActor`]'s poll cycle.
pub enum PollState {
    /// Idle: the next `PollMsg::Poll` starts a new cycle.
    Ready,
    /// The host's `done_until` watermark lagged its current version; waiting for
    /// `PollMsg::CheckWatermark`.
    WaitingForWatermark {
        /// Version reported by the host when the cycle started.
        current_version: CommitVersion,
        /// Re-checks left before consuming regardless of the watermark.
        retries_remaining: u8,
    },
    /// A batch was handed to the consumer; waiting for `PollMsg::ConsumeResponse`.
    WaitingForConsume {
        /// Highest version in the dispatched batch; persisted as the checkpoint on success.
        latest_version: CommitVersion,
        /// Number of CDC entries fetched for the batch, before relevance filtering.
        count: usize,
    },
}
impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> Actor for PollActor<H, C> {
    type State = PollState;
    type Message = PollMsg;

    /// Logs startup, queues the first `PollMsg::Poll`, and starts in `PollState::Ready`.
    fn init(&self, ctx: &Context<Self::Message>) -> Self::State {
        debug!(
            "[Consumer {:?}] Started polling with interval {:?}",
            self.config.consumer_id, self.config.poll_interval
        );
        let _ = ctx.self_ref().send(PollMsg::Poll);
        PollState::Ready
    }

    /// Drives the poll state machine.
    ///
    /// - `Poll` (only in `Ready`): stops if cancelled; otherwise consumes right away when the
    ///   host's `done_until` watermark has reached its current version, or else enters
    ///   `WaitingForWatermark` and schedules a `CheckWatermark`.
    /// - `CheckWatermark` (only in `WaitingForWatermark`): consumes once the watermark caught
    ///   up or the retry budget is exhausted; otherwise decrements the budget and re-checks.
    /// - `ConsumeResponse`: finishes the in-flight batch (checkpoint + next poll).
    ///
    /// A `Poll` arriving in a non-`Ready` state is dropped: the in-flight step always
    /// schedules the next `Poll` itself (via `start_consume` / `finish_consume`).
    fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
        // Delay between successive watermark checks.
        const WATERMARK_CHECK_INTERVAL: Duration = Duration::from_millis(50);
        // Re-checks allowed after the first one; with a budget of 4 the actor performs at
        // most 5 checks (~250ms) before consuming regardless of the watermark.
        const WATERMARK_RETRIES: u8 = 4;

        match msg {
            PollMsg::Poll => {
                if !matches!(*state, PollState::Ready) {
                    return Directive::Continue;
                }
                if ctx.is_cancelled() {
                    debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
                    return Directive::Stop;
                }
                let current_version = match self.host.current_version() {
                    Ok(v) => v,
                    Err(e) => {
                        error!(
                            "[Consumer {:?}] Error getting current version: {}",
                            self.config.consumer_id, e
                        );
                        ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
                        return Directive::Continue;
                    }
                };
                if self.host.done_until() >= current_version {
                    self.start_consume(state, ctx);
                } else {
                    *state = PollState::WaitingForWatermark {
                        current_version,
                        retries_remaining: WATERMARK_RETRIES,
                    };
                    ctx.schedule_once(WATERMARK_CHECK_INTERVAL, || PollMsg::CheckWatermark);
                }
            }
            PollMsg::CheckWatermark => {
                if let PollState::WaitingForWatermark {
                    current_version,
                    retries_remaining,
                } = *state
                {
                    if ctx.is_cancelled() {
                        debug!("[Consumer {:?}] Stopped", self.config.consumer_id);
                        return Directive::Stop;
                    }
                    let done = self.host.done_until();
                    // Proceed once the watermark caught up, or give up waiting when the budget
                    // is spent: start_consume only reads up to `done_until` anyway.
                    if done >= current_version || retries_remaining == 0 {
                        *state = PollState::Ready;
                        self.start_consume(state, ctx);
                    } else {
                        *state = PollState::WaitingForWatermark {
                            current_version,
                            retries_remaining: retries_remaining - 1,
                        };
                        ctx.schedule_once(WATERMARK_CHECK_INTERVAL, || PollMsg::CheckWatermark);
                    }
                }
            }
            PollMsg::ConsumeResponse(result) => {
                if let PollState::WaitingForConsume {
                    latest_version,
                    count,
                } = mem::replace(&mut *state, PollState::Ready)
                {
                    self.finish_consume(state, ctx, latest_version, count, result);
                }
            }
        }
        Directive::Continue
    }

    /// Mailbox bounded to 16 pending messages.
    fn config(&self) -> ActorConfig {
        ActorConfig::new().mailbox_capacity(16)
    }
}
impl<H: CdcHost, C: CdcConsume> PollActor<H, C> {
fn start_consume(&self, state: &mut PollState, ctx: &Context<PollMsg>) {
*state = PollState::Ready;
let safe_version = self.host.done_until();
let mut query = match self.host.begin_query() {
Ok(q) => q,
Err(e) => {
error!("[Consumer {:?}] Error beginning query: {}", self.config.consumer_id, e);
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
return;
}
};
let checkpoint = match CdcCheckpoint::fetch(&mut Transaction::Query(&mut query), &self.consumer_key) {
Ok(c) => c,
Err(e) => {
error!("[Consumer {:?}] Error fetching checkpoint: {}", self.config.consumer_id, e);
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
return;
}
};
drop(query);
if safe_version <= checkpoint {
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
return;
}
let transactions = match self.fetch_cdcs_until(checkpoint, safe_version) {
Ok(t) => t,
Err(e) => {
error!("[Consumer {:?}] Error fetching CDCs: {}", self.config.consumer_id, e);
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
return;
}
};
if transactions.is_empty() {
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
return;
}
let count = transactions.len();
let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
let relevant_cdcs = transactions
.into_iter()
.filter(|cdc| {
!cdc.changes.is_empty()
|| cdc.system_changes.iter().any(|sys_change| match sys_change {
SystemChange::Insert {
key,
..
}
| SystemChange::Update {
key,
..
}
| SystemChange::Delete {
key,
..
} => {
if let Some(kind) = Key::kind(key) {
matches!(
kind,
KeyKind::Row
| KeyKind::Flow | KeyKind::FlowNode
| KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
| KeyKind::FlowEdgeByFlow
| KeyKind::NamespaceFlow
)
} else {
false
}
}
})
})
.collect::<Vec<_>>();
if relevant_cdcs.is_empty() {
match self.host.begin_command() {
Ok(mut transaction) => {
match CdcCheckpoint::persist(
&mut transaction,
&self.consumer_key,
latest_version,
) {
Ok(_) => {
let _ = transaction.commit();
}
Err(e) => {
error!(
"[Consumer {:?}] Error persisting checkpoint: {}",
self.config.consumer_id, e
);
let _ = transaction.rollback();
}
}
}
Err(e) => {
error!(
"[Consumer {:?}] Error beginning transaction for checkpoint: {}",
self.config.consumer_id, e
);
}
}
let _ = ctx.self_ref().send(PollMsg::Poll);
return;
}
*state = PollState::WaitingForConsume {
latest_version,
count,
};
let self_ref = ctx.self_ref().clone();
let reply: Box<dyn FnOnce(Result<()>) + Send> = Box::new(move |result| {
let _ = self_ref.send(PollMsg::ConsumeResponse(result));
});
self.consumer.consume(relevant_cdcs, reply);
}
fn finish_consume(
&self,
state: &mut PollState,
ctx: &Context<PollMsg>,
latest_version: CommitVersion,
count: usize,
result: Result<()>,
) {
*state = PollState::Ready;
match result {
Ok(()) => {
match self.host.begin_command() {
Ok(mut transaction) => {
match CdcCheckpoint::persist(
&mut transaction,
&self.consumer_key,
latest_version,
) {
Ok(_) => {
match transaction.commit() {
Ok(_) => {
if count > 0 {
let _ = ctx
.self_ref()
.send(PollMsg::Poll);
} else {
ctx.schedule_once(
self.config
.poll_interval,
|| PollMsg::Poll,
);
}
}
Err(e) => {
error!(
"[Consumer {:?}] Error committing checkpoint: {}",
self.config.consumer_id, e
);
ctx.schedule_once(
self.config.poll_interval,
|| PollMsg::Poll,
);
}
}
}
Err(e) => {
error!(
"[Consumer {:?}] Error persisting checkpoint: {}",
self.config.consumer_id, e
);
let _ = transaction.rollback();
ctx.schedule_once(self.config.poll_interval, || {
PollMsg::Poll
});
}
}
}
Err(e) => {
error!(
"[Consumer {:?}] Error beginning checkpoint transaction: {}",
self.config.consumer_id, e
);
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
}
}
}
Err(e) => {
error!("[Consumer {:?}] Error consuming events: {}", self.config.consumer_id, e);
ctx.schedule_once(self.config.poll_interval, || PollMsg::Poll);
}
}
}
fn fetch_cdcs_until(&self, since_version: CommitVersion, until_version: CommitVersion) -> Result<Vec<Cdc>> {
let batch_size = self.config.max_batch_size.unwrap_or(1024);
let batch = self.store.read_range(
Bound::Excluded(since_version),
Bound::Included(until_version),
batch_size,
)?;
Ok(batch.items)
}
}