reifydb_cdc/
poll.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	ops::Bound,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	thread::{self, JoinHandle},
11	time::Duration,
12};
13
14use reifydb_core::{
15	CommitVersion, EncodedKey, Result,
16	event::cdc::CdcCheckpointAdvancedEvent,
17	interface::{
18		Cdc, CdcChange, CdcConsumerId, CdcQueryTransaction, CommandTransaction, Engine as EngineInterface, Key,
19		KeyKind, MultiVersionCommandTransaction, WithEventBus,
20	},
21	key::{CdcConsumerKey, EncodableKey},
22	log_debug, log_error,
23};
24use reifydb_engine::StandardEngine;
25use reifydb_sub_api::Priority;
26
27use crate::{CdcCheckpoint, CdcConsume, CdcConsumer};
28
29/// Configuration for a CDC poll consumer
30#[derive(Debug, Clone)]
31pub struct PollConsumerConfig {
32	/// Unique identifier for this consumer
33	pub consumer_id: CdcConsumerId,
34	/// How often to poll for new CDC events
35	pub poll_interval: Duration,
36	/// Priority for the polling task in the worker pool
37	pub priority: Priority,
38	/// Maximum batch size for fetching CDC events (None = unbounded)
39	pub max_batch_size: Option<u64>,
40}
41
42impl PollConsumerConfig {
43	pub fn new(consumer_id: CdcConsumerId, poll_interval: Duration, max_batch_size: Option<u64>) -> Self {
44		Self {
45			consumer_id,
46			poll_interval,
47			priority: Priority::Normal,
48			max_batch_size,
49		}
50	}
51
52	pub fn with_priority(mut self, priority: Priority) -> Self {
53		self.priority = priority;
54		self
55	}
56}
57
58pub struct PollConsumer<F: CdcConsume> {
59	engine: Option<StandardEngine>,
60	consumer: Option<Box<F>>,
61	config: PollConsumerConfig,
62	state: Arc<ConsumerState>,
63	worker: Option<JoinHandle<()>>,
64}
65
66struct ConsumerState {
67	consumer_key: EncodedKey,
68	running: AtomicBool,
69}
70
71impl<C: CdcConsume> PollConsumer<C> {
72	pub fn new(config: PollConsumerConfig, engine: StandardEngine, consume: C) -> Self {
73		let consumer_key = CdcConsumerKey {
74			consumer: config.consumer_id.clone(),
75		}
76		.encode();
77
78		Self {
79			engine: Some(engine),
80			consumer: Some(Box::new(consume)),
81			config,
82			state: Arc::new(ConsumerState {
83				consumer_key,
84				running: AtomicBool::new(false),
85			}),
86			worker: None,
87		}
88	}
89
90	fn consume_batch(
91		state: &ConsumerState,
92		engine: &StandardEngine,
93		consumer: &C,
94		consumer_id: &CdcConsumerId,
95		max_batch_size: Option<u64>,
96	) -> Result<Option<(CommitVersion, u64)>> {
97		let mut transaction = engine.begin_command()?;
98
99		let checkpoint = CdcCheckpoint::fetch(&mut transaction, &state.consumer_key)?;
100
101		let transactions = fetch_cdcs_since(&mut transaction, checkpoint, max_batch_size)?;
102		if transactions.is_empty() {
103			transaction.rollback()?;
104			return Ok(None);
105		}
106
107		let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
108
109		// Filter transactions to only those with Row or Flow-related changes
110		// Flow-related changes are needed to detect new flow definitions
111		let relevant_transactions = transactions
112			.into_iter()
113			.filter(|tx| {
114				tx.changes.iter().any(|change| match &change.change {
115					CdcChange::Insert {
116						key,
117						..
118					}
119					| CdcChange::Update {
120						key,
121						..
122					}
123					| CdcChange::Delete {
124						key,
125						..
126					} => {
127						if let Some(kind) = Key::kind(key) {
128							matches!(
129								kind,
130								KeyKind::Row
131									| KeyKind::Flow | KeyKind::FlowNode
132									| KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
133									| KeyKind::FlowEdgeByFlow | KeyKind::NamespaceFlow
134							)
135						} else {
136							false
137						}
138					}
139				})
140			})
141			.collect::<Vec<_>>();
142
143		if !relevant_transactions.is_empty() {
144			consumer.consume(&mut transaction, relevant_transactions)?;
145		}
146
147		CdcCheckpoint::persist(&mut transaction, &state.consumer_key, latest_version)?;
148		let current_version = transaction.commit()?;
149
150		engine.event_bus().emit(CdcCheckpointAdvancedEvent {
151			consumer_id: consumer_id.clone(),
152			version: latest_version,
153		});
154
155		let lag = current_version.0.saturating_sub(latest_version.0);
156
157		Ok(Some((latest_version, lag)))
158	}
159
160	fn polling_loop(
161		config: &PollConsumerConfig,
162		engine: StandardEngine,
163		consumer: Box<C>,
164		state: Arc<ConsumerState>,
165	) {
166		log_debug!(
167			"[Consumer {:?}] Started polling with interval {:?}",
168			config.consumer_id,
169			config.poll_interval
170		);
171
172		while state.running.load(Ordering::Acquire) {
173			match Self::consume_batch(
174				&state,
175				&engine,
176				&consumer,
177				&config.consumer_id,
178				config.max_batch_size,
179			) {
180				Ok(Some((_processed_version, _lag))) => {
181					// FIXME log this
182					println!("processed {} with lag {}", _processed_version, _lag)
183				}
184				Ok(None) => {}
185				Err(error) => {
186					log_error!(
187						"[Consumer {:?}] Error consuming events: {}",
188						config.consumer_id,
189						error
190					);
191				}
192			}
193		}
194
195		log_debug!("[Consumer {:?}] Stopped", config.consumer_id);
196	}
197}
198
199impl<F: CdcConsume> CdcConsumer for PollConsumer<F> {
200	fn start(&mut self) -> Result<()> {
201		assert!(self.worker.is_none(), "start() can only be called once");
202
203		if self.state.running.swap(true, Ordering::AcqRel) {
204			return Ok(());
205		}
206
207		let engine = self.engine.take().expect("engine already consumed");
208
209		let consumer = self.consumer.take().expect("consumer already consumed");
210
211		let state = Arc::clone(&self.state);
212		let config = self.config.clone();
213
214		self.worker = Some(thread::spawn(move || {
215			Self::polling_loop(&config, engine, consumer, state);
216		}));
217
218		Ok(())
219	}
220
221	fn stop(&mut self) -> Result<()> {
222		if !self.state.running.swap(false, Ordering::AcqRel) {
223			return Ok(());
224		}
225
226		if let Some(worker) = self.worker.take() {
227			worker.join().expect("Failed to join consumer thread");
228		}
229
230		Ok(())
231	}
232
233	fn is_running(&self) -> bool {
234		self.state.running.load(Ordering::Acquire)
235	}
236}
237
238fn fetch_cdcs_since(
239	txn: &mut impl CommandTransaction,
240	since_version: CommitVersion,
241	max_batch_size: Option<u64>,
242) -> Result<Vec<Cdc>> {
243	let upper_bound = match max_batch_size {
244		Some(size) => Bound::Included(CommitVersion(since_version.0.saturating_add(size))),
245		None => Bound::Unbounded,
246	};
247	txn.with_cdc_query(|cdc| Ok(cdc.range(Bound::Excluded(since_version), upper_bound)?.collect::<Vec<_>>()))
248}