Skip to main content

reifydb_cdc/consume/
poll.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	sync::{
6		Arc,
7		atomic::{AtomicBool, Ordering},
8	},
9	time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17	actor::{PollActor, PollActorConfig},
18	consumer::{CdcConsume, CdcConsumer},
19	host::CdcHost,
20	watermark::CdcConsumerWatermark,
21};
22use crate::storage::CdcStore;
23
24#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26	pub consumer_id: CdcConsumerId,
27
28	pub thread_name: String,
29
30	pub poll_interval: Duration,
31
32	pub max_batch_size: Option<u64>,
33
34	pub consumer_watermark: Option<CdcConsumerWatermark>,
35}
36
37impl PollConsumerConfig {
38	pub fn new(
39		consumer_id: CdcConsumerId,
40		thread_name: impl Into<String>,
41		poll_interval: Duration,
42		max_batch_size: Option<u64>,
43	) -> Self {
44		Self {
45			consumer_id,
46			thread_name: thread_name.into(),
47			poll_interval,
48			max_batch_size,
49			consumer_watermark: None,
50		}
51	}
52
53	pub fn with_consumer_watermark(mut self, watermark: CdcConsumerWatermark) -> Self {
54		self.consumer_watermark = Some(watermark);
55		self
56	}
57}
58
59pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
60	config: PollConsumerConfig,
61	host: Option<H>,
62	consumer: Option<C>,
63	store: Option<CdcStore>,
64	running: Arc<AtomicBool>,
65	actor_system: ActorSystem,
66
67	handle: Option<CdcPollHandle>,
68}
69
70impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
71	pub fn new(
72		config: PollConsumerConfig,
73		host: H,
74		consume: C,
75		store: CdcStore,
76		actor_system: ActorSystem,
77	) -> Self {
78		Self {
79			config,
80			host: Some(host),
81			consumer: Some(consume),
82			store: Some(store),
83			running: Arc::new(AtomicBool::new(false)),
84			actor_system,
85			handle: None,
86		}
87	}
88
89	fn take_resources(&mut self) -> (H, C, CdcStore) {
90		let host = self.host.take().expect("host already consumed");
91		let consumer = self.consumer.take().expect("consumer already consumed");
92		let store = self.store.take().expect("store already consumed");
93		(host, consumer, store)
94	}
95
96	fn build_actor_config(&self) -> PollActorConfig {
97		PollActorConfig {
98			consumer_id: self.config.consumer_id.clone(),
99			poll_interval: self.config.poll_interval,
100			max_batch_size: self.config.max_batch_size,
101		}
102	}
103}
104
105impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
106	fn start(&mut self) -> Result<()> {
107		if self.running.swap(true, Ordering::AcqRel) {
108			return Ok(());
109		}
110		let (host, consumer, store) = self.take_resources();
111		let watermark = self.config.consumer_watermark.clone();
112		let actor = PollActor::new(self.build_actor_config(), host, consumer, store, watermark);
113		self.handle = Some(self.actor_system.spawn_system(&self.config.thread_name, actor));
114		Ok(())
115	}
116
117	fn stop(&mut self) -> Result<()> {
118		if !self.running.swap(false, Ordering::AcqRel) {
119			return Ok(());
120		}
121
122		self.actor_system.shutdown();
123
124		if let Some(handle) = self.handle.take() {
125			let _ = handle.join();
126		}
127
128		Ok(())
129	}
130
131	fn is_running(&self) -> bool {
132		self.running.load(Ordering::Acquire)
133	}
134}