Skip to main content

reifydb_cdc/consume/
poll.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	sync::{
6		Arc,
7		atomic::{AtomicBool, Ordering},
8	},
9	time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17	actor::{PollActor, PollActorConfig},
18	consumer::{CdcConsume, CdcConsumer},
19	host::CdcHost,
20};
21use crate::storage::CdcStore;
22
23/// Configuration for a CDC poll consumer
24#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26	/// Unique identifier for this consumer
27	pub consumer_id: CdcConsumerId,
28	/// Thread name for the poll worker
29	pub thread_name: String,
30	/// How often to poll for new CDC events
31	pub poll_interval: Duration,
32	/// Maximum batch size for fetching CDC events (None = unbounded)
33	pub max_batch_size: Option<u64>,
34}
35
36impl PollConsumerConfig {
37	pub fn new(
38		consumer_id: CdcConsumerId,
39		thread_name: impl Into<String>,
40		poll_interval: Duration,
41		max_batch_size: Option<u64>,
42	) -> Self {
43		Self {
44			consumer_id,
45			thread_name: thread_name.into(),
46			poll_interval,
47			max_batch_size,
48		}
49	}
50}
51
52/// Poll-based CDC consumer backed by an actor.
53///
54/// Implements the `CdcConsumer` trait for start/stop lifecycle management.
55/// Internally uses `PollActor` for the actual polling logic.
56pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
57	config: PollConsumerConfig,
58	host: Option<H>,
59	consumer: Option<C>,
60	store: Option<CdcStore>,
61	running: Arc<AtomicBool>,
62	actor_system: ActorSystem,
63	/// Handle to the poll actor - must be joined on stop for proper cleanup
64	handle: Option<CdcPollHandle>,
65}
66
67impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
68	pub fn new(
69		config: PollConsumerConfig,
70		host: H,
71		consume: C,
72		store: CdcStore,
73		actor_system: ActorSystem,
74	) -> Self {
75		Self {
76			config,
77			host: Some(host),
78			consumer: Some(consume),
79			store: Some(store),
80			running: Arc::new(AtomicBool::new(false)),
81			actor_system,
82			handle: None,
83		}
84	}
85
86	/// Take ownership of the host/consumer/store from their `Option` slots.
87	/// Panics if called twice; `start`'s `running` swap guards against that.
88	fn take_resources(&mut self) -> (H, C, CdcStore) {
89		let host = self.host.take().expect("host already consumed");
90		let consumer = self.consumer.take().expect("consumer already consumed");
91		let store = self.store.take().expect("store already consumed");
92		(host, consumer, store)
93	}
94
95	fn build_actor_config(&self) -> PollActorConfig {
96		PollActorConfig {
97			consumer_id: self.config.consumer_id.clone(),
98			poll_interval: self.config.poll_interval,
99			max_batch_size: self.config.max_batch_size,
100		}
101	}
102}
103
104impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
105	fn start(&mut self) -> Result<()> {
106		if self.running.swap(true, Ordering::AcqRel) {
107			return Ok(());
108		}
109		let (host, consumer, store) = self.take_resources();
110		let actor = PollActor::new(self.build_actor_config(), host, consumer, store);
111		self.handle = Some(self.actor_system.spawn(&self.config.thread_name, actor));
112		Ok(())
113	}
114
115	fn stop(&mut self) -> Result<()> {
116		if !self.running.swap(false, Ordering::AcqRel) {
117			return Ok(()); // Already stopped
118		}
119
120		// Signal the actor system to shutdown - this will trigger cancellation
121		// which the actor checks before each poll
122		self.actor_system.shutdown();
123
124		// Join the poll actor thread to ensure proper cleanup
125		// This ensures the PollActor (and its consumer)
126		// are dropped cleanly before we return
127		if let Some(handle) = self.handle.take() {
128			let _ = handle.join();
129		}
130
131		Ok(())
132	}
133
134	fn is_running(&self) -> bool {
135		self.running.load(Ordering::Acquire)
136	}
137}