reifydb_cdc/consume/
poll.rs1use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8 },
9 time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17 actor::{PollActor, PollActorConfig},
18 consumer::{CdcConsume, CdcConsumer},
19 host::CdcHost,
20};
21use crate::storage::CdcStore;
22
23#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26 pub consumer_id: CdcConsumerId,
28 pub thread_name: String,
30 pub poll_interval: Duration,
32 pub max_batch_size: Option<u64>,
34}
35
36impl PollConsumerConfig {
37 pub fn new(
38 consumer_id: CdcConsumerId,
39 thread_name: impl Into<String>,
40 poll_interval: Duration,
41 max_batch_size: Option<u64>,
42 ) -> Self {
43 Self {
44 consumer_id,
45 thread_name: thread_name.into(),
46 poll_interval,
47 max_batch_size,
48 }
49 }
50}
51
52pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
57 config: PollConsumerConfig,
58 host: Option<H>,
59 consumer: Option<C>,
60 store: Option<CdcStore>,
61 running: Arc<AtomicBool>,
62 actor_system: ActorSystem,
63 handle: Option<CdcPollHandle>,
65}
66
67impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
68 pub fn new(
69 config: PollConsumerConfig,
70 host: H,
71 consume: C,
72 store: CdcStore,
73 actor_system: ActorSystem,
74 ) -> Self {
75 Self {
76 config,
77 host: Some(host),
78 consumer: Some(consume),
79 store: Some(store),
80 running: Arc::new(AtomicBool::new(false)),
81 actor_system,
82 handle: None,
83 }
84 }
85
86 fn take_resources(&mut self) -> (H, C, CdcStore) {
89 let host = self.host.take().expect("host already consumed");
90 let consumer = self.consumer.take().expect("consumer already consumed");
91 let store = self.store.take().expect("store already consumed");
92 (host, consumer, store)
93 }
94
95 fn build_actor_config(&self) -> PollActorConfig {
96 PollActorConfig {
97 consumer_id: self.config.consumer_id.clone(),
98 poll_interval: self.config.poll_interval,
99 max_batch_size: self.config.max_batch_size,
100 }
101 }
102}
103
104impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
105 fn start(&mut self) -> Result<()> {
106 if self.running.swap(true, Ordering::AcqRel) {
107 return Ok(());
108 }
109 let (host, consumer, store) = self.take_resources();
110 let actor = PollActor::new(self.build_actor_config(), host, consumer, store);
111 self.handle = Some(self.actor_system.spawn(&self.config.thread_name, actor));
112 Ok(())
113 }
114
115 fn stop(&mut self) -> Result<()> {
116 if !self.running.swap(false, Ordering::AcqRel) {
117 return Ok(()); }
119
120 self.actor_system.shutdown();
123
124 if let Some(handle) = self.handle.take() {
128 let _ = handle.join();
129 }
130
131 Ok(())
132 }
133
134 fn is_running(&self) -> bool {
135 self.running.load(Ordering::Acquire)
136 }
137}