reifydb_cdc/consume/
poll.rs1use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicBool, Ordering},
8 },
9 time::Duration,
10};
11
12use reifydb_core::{actors::cdc::CdcPollHandle, interface::cdc::CdcConsumerId};
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::Result;
15
16use super::{
17 actor::{PollActor, PollActorConfig},
18 consumer::{CdcConsume, CdcConsumer},
19 host::CdcHost,
20};
21use crate::storage::CdcStore;
22
23#[derive(Debug, Clone)]
25pub struct PollConsumerConfig {
26 pub consumer_id: CdcConsumerId,
28 pub thread_name: String,
30 pub poll_interval: Duration,
32 pub max_batch_size: Option<u64>,
34}
35
36impl PollConsumerConfig {
37 pub fn new(
38 consumer_id: CdcConsumerId,
39 thread_name: impl Into<String>,
40 poll_interval: Duration,
41 max_batch_size: Option<u64>,
42 ) -> Self {
43 Self {
44 consumer_id,
45 thread_name: thread_name.into(),
46 poll_interval,
47 max_batch_size,
48 }
49 }
50}
51
52pub struct PollConsumer<H: CdcHost, C: CdcConsume + Send + 'static> {
57 config: PollConsumerConfig,
58 host: Option<H>,
59 consumer: Option<C>,
60 store: Option<CdcStore>,
61 running: Arc<AtomicBool>,
62 actor_system: ActorSystem,
63 handle: Option<CdcPollHandle>,
65}
66
67impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
68 pub fn new(
69 config: PollConsumerConfig,
70 host: H,
71 consume: C,
72 store: CdcStore,
73 actor_system: ActorSystem,
74 ) -> Self {
75 Self {
76 config,
77 host: Some(host),
78 consumer: Some(consume),
79 store: Some(store),
80 running: Arc::new(AtomicBool::new(false)),
81 actor_system,
82 handle: None,
83 }
84 }
85}
86
87impl<H: CdcHost, C: CdcConsume + Send + Sync + 'static> CdcConsumer for PollConsumer<H, C> {
88 fn start(&mut self) -> Result<()> {
89 if self.running.swap(true, Ordering::AcqRel) {
90 return Ok(()); }
92
93 let host = self.host.take().expect("host already consumed");
94 let consumer = self.consumer.take().expect("consumer already consumed");
95 let store = self.store.take().expect("store already consumed");
96
97 let actor_config = PollActorConfig {
98 consumer_id: self.config.consumer_id.clone(),
99 poll_interval: self.config.poll_interval,
100 max_batch_size: self.config.max_batch_size,
101 };
102
103 let actor = PollActor::new(actor_config, host, consumer, store);
104
105 let handle = self.actor_system.spawn(&self.config.thread_name, actor);
107 self.handle = Some(handle);
108
109 Ok(())
110 }
111
112 fn stop(&mut self) -> Result<()> {
113 if !self.running.swap(false, Ordering::AcqRel) {
114 return Ok(()); }
116
117 self.actor_system.shutdown();
120
121 if let Some(handle) = self.handle.take() {
125 let _ = handle.join();
126 }
127
128 Ok(())
129 }
130
131 fn is_running(&self) -> bool {
132 self.running.load(Ordering::Acquire)
133 }
134}