1use std::{
5 ops::Bound,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 thread::{self, JoinHandle},
11 time::Duration,
12};
13
14use reifydb_core::{
15 CommitVersion, EncodedKey, Result,
16 event::cdc::CdcCheckpointAdvancedEvent,
17 interface::{
18 Cdc, CdcChange, CdcConsumerId, CdcQueryTransaction, CommandTransaction, Engine as EngineInterface, Key,
19 KeyKind, MultiVersionCommandTransaction, WithEventBus,
20 },
21 key::{CdcConsumerKey, EncodableKey},
22 log_debug, log_error,
23};
24use reifydb_engine::StandardEngine;
25use reifydb_sub_api::Priority;
26
27use crate::{CdcCheckpoint, CdcConsume, CdcConsumer};
28
29#[derive(Debug, Clone)]
31pub struct PollConsumerConfig {
32 pub consumer_id: CdcConsumerId,
34 pub poll_interval: Duration,
36 pub priority: Priority,
38 pub max_batch_size: Option<u64>,
40}
41
42impl PollConsumerConfig {
43 pub fn new(consumer_id: CdcConsumerId, poll_interval: Duration, max_batch_size: Option<u64>) -> Self {
44 Self {
45 consumer_id,
46 poll_interval,
47 priority: Priority::Normal,
48 max_batch_size,
49 }
50 }
51
52 pub fn with_priority(mut self, priority: Priority) -> Self {
53 self.priority = priority;
54 self
55 }
56}
57
58pub struct PollConsumer<F: CdcConsume> {
59 engine: Option<StandardEngine>,
60 consumer: Option<Box<F>>,
61 config: PollConsumerConfig,
62 state: Arc<ConsumerState>,
63 worker: Option<JoinHandle<()>>,
64}
65
66struct ConsumerState {
67 consumer_key: EncodedKey,
68 running: AtomicBool,
69}
70
71impl<C: CdcConsume> PollConsumer<C> {
72 pub fn new(config: PollConsumerConfig, engine: StandardEngine, consume: C) -> Self {
73 let consumer_key = CdcConsumerKey {
74 consumer: config.consumer_id.clone(),
75 }
76 .encode();
77
78 Self {
79 engine: Some(engine),
80 consumer: Some(Box::new(consume)),
81 config,
82 state: Arc::new(ConsumerState {
83 consumer_key,
84 running: AtomicBool::new(false),
85 }),
86 worker: None,
87 }
88 }
89
90 fn consume_batch(
91 state: &ConsumerState,
92 engine: &StandardEngine,
93 consumer: &C,
94 consumer_id: &CdcConsumerId,
95 max_batch_size: Option<u64>,
96 ) -> Result<Option<(CommitVersion, u64)>> {
97 let mut transaction = engine.begin_command()?;
98
99 let checkpoint = CdcCheckpoint::fetch(&mut transaction, &state.consumer_key)?;
100
101 let transactions = fetch_cdcs_since(&mut transaction, checkpoint, max_batch_size)?;
102 if transactions.is_empty() {
103 transaction.rollback()?;
104 return Ok(None);
105 }
106
107 let latest_version = transactions.iter().map(|tx| tx.version).max().unwrap_or(checkpoint);
108
109 let relevant_transactions = transactions
112 .into_iter()
113 .filter(|tx| {
114 tx.changes.iter().any(|change| match &change.change {
115 CdcChange::Insert {
116 key,
117 ..
118 }
119 | CdcChange::Update {
120 key,
121 ..
122 }
123 | CdcChange::Delete {
124 key,
125 ..
126 } => {
127 if let Some(kind) = Key::kind(key) {
128 matches!(
129 kind,
130 KeyKind::Row
131 | KeyKind::Flow | KeyKind::FlowNode
132 | KeyKind::FlowNodeByFlow | KeyKind::FlowEdge
133 | KeyKind::FlowEdgeByFlow | KeyKind::NamespaceFlow
134 )
135 } else {
136 false
137 }
138 }
139 })
140 })
141 .collect::<Vec<_>>();
142
143 if !relevant_transactions.is_empty() {
144 consumer.consume(&mut transaction, relevant_transactions)?;
145 }
146
147 CdcCheckpoint::persist(&mut transaction, &state.consumer_key, latest_version)?;
148 let current_version = transaction.commit()?;
149
150 engine.event_bus().emit(CdcCheckpointAdvancedEvent {
151 consumer_id: consumer_id.clone(),
152 version: latest_version,
153 });
154
155 let lag = current_version.0.saturating_sub(latest_version.0);
156
157 Ok(Some((latest_version, lag)))
158 }
159
160 fn polling_loop(
161 config: &PollConsumerConfig,
162 engine: StandardEngine,
163 consumer: Box<C>,
164 state: Arc<ConsumerState>,
165 ) {
166 log_debug!(
167 "[Consumer {:?}] Started polling with interval {:?}",
168 config.consumer_id,
169 config.poll_interval
170 );
171
172 while state.running.load(Ordering::Acquire) {
173 match Self::consume_batch(
174 &state,
175 &engine,
176 &consumer,
177 &config.consumer_id,
178 config.max_batch_size,
179 ) {
180 Ok(Some((_processed_version, _lag))) => {
181 println!("processed {} with lag {}", _processed_version, _lag)
183 }
184 Ok(None) => {}
185 Err(error) => {
186 log_error!(
187 "[Consumer {:?}] Error consuming events: {}",
188 config.consumer_id,
189 error
190 );
191 }
192 }
193 }
194
195 log_debug!("[Consumer {:?}] Stopped", config.consumer_id);
196 }
197}
198
199impl<F: CdcConsume> CdcConsumer for PollConsumer<F> {
200 fn start(&mut self) -> Result<()> {
201 assert!(self.worker.is_none(), "start() can only be called once");
202
203 if self.state.running.swap(true, Ordering::AcqRel) {
204 return Ok(());
205 }
206
207 let engine = self.engine.take().expect("engine already consumed");
208
209 let consumer = self.consumer.take().expect("consumer already consumed");
210
211 let state = Arc::clone(&self.state);
212 let config = self.config.clone();
213
214 self.worker = Some(thread::spawn(move || {
215 Self::polling_loop(&config, engine, consumer, state);
216 }));
217
218 Ok(())
219 }
220
221 fn stop(&mut self) -> Result<()> {
222 if !self.state.running.swap(false, Ordering::AcqRel) {
223 return Ok(());
224 }
225
226 if let Some(worker) = self.worker.take() {
227 worker.join().expect("Failed to join consumer thread");
228 }
229
230 Ok(())
231 }
232
233 fn is_running(&self) -> bool {
234 self.state.running.load(Ordering::Acquire)
235 }
236}
237
238fn fetch_cdcs_since(
239 txn: &mut impl CommandTransaction,
240 since_version: CommitVersion,
241 max_batch_size: Option<u64>,
242) -> Result<Vec<Cdc>> {
243 let upper_bound = match max_batch_size {
244 Some(size) => Bound::Included(CommitVersion(since_version.0.saturating_add(size))),
245 None => Bound::Unbounded,
246 };
247 txn.with_cdc_query(|cdc| Ok(cdc.range(Bound::Excluded(since_version), upper_bound)?.collect::<Vec<_>>()))
248}