reifydb_cdc/consume/
poll.rs1use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8 },
9 time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17 actor::{PollActor, PollActorConfig},
18 consumer::{CdcConsume, CdcConsumer},
19 host::CdcHost,
20 watermark::CdcConsumerWatermark,
21};
22use crate::storage::CdcStore;
23
24#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26 pub consumer_id: CdcConsumerId,
27
28 pub thread_name: String,
29
30 pub poll_interval: Duration,
31
32 pub max_batch_size: Option<u64>,
33
34 pub consumer_watermark: Option<CdcConsumerWatermark>,
35}
36
37impl PollConsumerConfig {
38 pub fn new(
39 consumer_id: CdcConsumerId,
40 thread_name: impl Into<String>,
41 poll_interval: Duration,
42 max_batch_size: Option<u64>,
43 ) -> Self {
44 Self {
45 consumer_id,
46 thread_name: thread_name.into(),
47 poll_interval,
48 max_batch_size,
49 consumer_watermark: None,
50 }
51 }
52
53 pub fn with_consumer_watermark(mut self, watermark: CdcConsumerWatermark) -> Self {
54 self.consumer_watermark = Some(watermark);
55 self
56 }
57}
58
59pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
60 config: PollConsumerConfig,
61 host: Option<H>,
62 consumer: Option<C>,
63 store: Option<CdcStore>,
64 running: Arc<AtomicBool>,
65 actor_system: ActorSystem,
66
67 handle: Option<CdcPollHandle>,
68}
69
70impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
71 pub fn new(
72 config: PollConsumerConfig,
73 host: H,
74 consume: C,
75 store: CdcStore,
76 actor_system: ActorSystem,
77 ) -> Self {
78 Self {
79 config,
80 host: Some(host),
81 consumer: Some(consume),
82 store: Some(store),
83 running: Arc::new(AtomicBool::new(false)),
84 actor_system,
85 handle: None,
86 }
87 }
88
89 fn take_resources(&mut self) -> (H, C, CdcStore) {
90 let host = self.host.take().expect("host already consumed");
91 let consumer = self.consumer.take().expect("consumer already consumed");
92 let store = self.store.take().expect("store already consumed");
93 (host, consumer, store)
94 }
95
96 fn build_actor_config(&self) -> PollActorConfig {
97 PollActorConfig {
98 consumer_id: self.config.consumer_id.clone(),
99 poll_interval: self.config.poll_interval,
100 max_batch_size: self.config.max_batch_size,
101 }
102 }
103}
104
105impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
106 fn start(&mut self) -> Result<()> {
107 if self.running.swap(true, Ordering::AcqRel) {
108 return Ok(());
109 }
110 let (host, consumer, store) = self.take_resources();
111 let watermark = self.config.consumer_watermark.clone();
112 let actor = PollActor::new(self.build_actor_config(), host, consumer, store, watermark);
113 self.handle = Some(self.actor_system.spawn_system(&self.config.thread_name, actor));
114 Ok(())
115 }
116
117 fn stop(&mut self) -> Result<()> {
118 if !self.running.swap(false, Ordering::AcqRel) {
119 return Ok(());
120 }
121
122 self.actor_system.shutdown();
123
124 if let Some(handle) = self.handle.take() {
125 let _ = handle.join();
126 }
127
128 Ok(())
129 }
130
131 fn is_running(&self) -> bool {
132 self.running.load(Ordering::Acquire)
133 }
134}