Skip to main content

reifydb_cdc/consume/
poll.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	sync::{
6		Arc,
7		atomic::{AtomicBool, Ordering},
8	},
9	time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17	actor::{PollActor, PollActorConfig},
18	consumer::{CdcConsume, CdcConsumer},
19	host::CdcHost,
20};
21use crate::storage::CdcStore;
22
23/// Configuration for a CDC poll consumer
24#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26	/// Unique identifier for this consumer
27	pub consumer_id: CdcConsumerId,
28	/// Thread name for the poll worker
29	pub thread_name: String,
30	/// How often to poll for new CDC events
31	pub poll_interval: Duration,
32	/// Maximum batch size for fetching CDC events (None = unbounded)
33	pub max_batch_size: Option<u64>,
34}
35
36impl PollConsumerConfig {
37	pub fn new(
38		consumer_id: CdcConsumerId,
39		thread_name: impl Into<String>,
40		poll_interval: Duration,
41		max_batch_size: Option<u64>,
42	) -> Self {
43		Self {
44			consumer_id,
45			thread_name: thread_name.into(),
46			poll_interval,
47			max_batch_size,
48		}
49	}
50}
51
52/// Poll-based CDC consumer backed by an actor.
53///
54/// Implements the `CdcConsumer` trait for start/stop lifecycle management.
55/// Internally uses `PollActor` for the actual polling logic.
56pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
57	config: PollConsumerConfig,
58	host: Option<H>,
59	consumer: Option<C>,
60	store: Option<CdcStore>,
61	running: Arc<AtomicBool>,
62	actor_system: ActorSystem,
63	/// Handle to the poll actor - must be joined on stop for proper cleanup
64	handle: Option<CdcPollHandle>,
65}
66
67impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
68	pub fn new(
69		config: PollConsumerConfig,
70		host: H,
71		consume: C,
72		store: CdcStore,
73		actor_system: ActorSystem,
74	) -> Self {
75		Self {
76			config,
77			host: Some(host),
78			consumer: Some(consume),
79			store: Some(store),
80			running: Arc::new(AtomicBool::new(false)),
81			actor_system,
82			handle: None,
83		}
84	}
85}
86
87impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
88	fn start(&mut self) -> Result<()> {
89		if self.running.swap(true, Ordering::AcqRel) {
90			return Ok(()); // Already running
91		}
92
93		let host = self.host.take().expect("host already consumed");
94		let consumer = self.consumer.take().expect("consumer already consumed");
95		let store = self.store.take().expect("store already consumed");
96
97		let actor_config = PollActorConfig {
98			consumer_id: self.config.consumer_id.clone(),
99			poll_interval: self.config.poll_interval,
100			max_batch_size: self.config.max_batch_size,
101		};
102
103		let actor = PollActor::new(actor_config, host, consumer, store);
104
105		// Use the shared actor system instead of creating a new one
106		let handle = self.actor_system.spawn(&self.config.thread_name, actor);
107		self.handle = Some(handle);
108
109		Ok(())
110	}
111
112	fn stop(&mut self) -> Result<()> {
113		if !self.running.swap(false, Ordering::AcqRel) {
114			return Ok(()); // Already stopped
115		}
116
117		// Signal the actor system to shutdown - this will trigger cancellation
118		// which the actor checks before each poll
119		self.actor_system.shutdown();
120
121		// Join the poll actor thread to ensure proper cleanup
122		// This ensures the PollActor (and its consumer)
123		// are dropped cleanly before we return
124		if let Some(handle) = self.handle.take() {
125			let _ = handle.join();
126		}
127
128		Ok(())
129	}
130
131	fn is_running(&self) -> bool {
132		self.running.load(Ordering::Acquire)
133	}
134}