use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use reifydb_core::interface::cdc::CdcConsumerId;
use reifydb_runtime::actor::system::{ActorHandle, ActorSystem};
use reifydb_type::Result;
use super::{
actor::{PollActor, PollActorConfig, PollMsg},
consumer::{CdcConsume, CdcConsumer},
host::CdcHost,
};
use crate::storage::CdcStore;
/// Settings for a [`PollConsumer`].
///
/// `PollConsumer::start` copies `consumer_id`, `poll_interval` and
/// `max_batch_size` into a `PollActorConfig`, and passes `thread_name` as the
/// first argument to `ActorSystem::spawn`.
#[derive(Debug, Clone)]
pub struct PollConsumerConfig {
/// Identifier of the CDC consumer; forwarded to the poll actor.
pub consumer_id: CdcConsumerId,
/// Name handed to the actor system when the poll actor is spawned.
pub thread_name: String,
/// Forwarded to the poll actor unchanged.
/// NOTE(review): presumably the delay between two polls; the exact
/// semantics live in `PollActor`, which is not visible here.
pub poll_interval: Duration,
/// Forwarded to the poll actor unchanged.
/// NOTE(review): presumably `None` means "no batch limit"; confirm against
/// `PollActor`.
pub max_batch_size: Option<u64>,
}
impl PollConsumerConfig {
pub fn new(
consumer_id: CdcConsumerId,
thread_name: impl Into<String>,
poll_interval: Duration,
max_batch_size: Option<u64>,
) -> Self {
Self {
consumer_id,
thread_name: thread_name.into(),
poll_interval,
max_batch_size,
}
}
}
/// A CDC consumer backed by a `PollActor` spawned on an `ActorSystem`.
///
/// The host, the consume callback and the store are held in `Option`s so that
/// `start` can move them into the actor with `Option::take`.
// NOTE: field order is kept as-is on purpose; Rust drops fields in
// declaration order.
pub struct PollConsumer<H, C>
where
    H: CdcHost,
    C: CdcConsume + Send + 'static,
{
    /// Consumer settings; read when the actor is spawned.
    config: PollConsumerConfig,
    /// Host handed to the actor on start; `None` once it has been moved out.
    host: Option<H>,
    /// Consume implementation handed to the actor on start; `None` afterwards.
    consumer: Option<C>,
    /// CDC store handed to the actor on start; `None` afterwards.
    store: Option<CdcStore>,
    /// Set to `true` by `start` and back to `false` by `stop`.
    running: Arc<AtomicBool>,
    /// Actor system used to spawn the poll actor and shut down on stop.
    actor_system: ActorSystem,
    /// Handle of the spawned actor; set by `start`, taken and joined by `stop`.
    handle: Option<ActorHandle<PollMsg>>,
}
impl<H: CdcHost, C: CdcConsume + Send + 'static> PollConsumer<H, C> {
pub fn new(
config: PollConsumerConfig,
host: H,
consume: C,
store: CdcStore,
actor_system: ActorSystem,
) -> Self {
Self {
config,
host: Some(host),
consumer: Some(consume),
store: Some(store),
running: Arc::new(AtomicBool::new(false)),
actor_system,
handle: None,
}
}
}
impl<H, C> CdcConsumer for PollConsumer<H, C>
where
    H: CdcHost,
    C: CdcConsume + Send + Sync + 'static,
{
    /// Spawns the poll actor, unless the consumer is already marked running.
    ///
    /// The running flag is swapped to `true` first; if it was already `true`
    /// the call returns `Ok(())` without doing anything. Otherwise the host,
    /// consumer and store are moved into a new `PollActor`, which is spawned
    /// on the actor system with `config.thread_name` as the name argument.
    /// The resulting handle is kept for `stop`.
    ///
    /// # Panics
    ///
    /// Panics if the host, consumer or store has already been moved out. As
    /// written, that happens when `start` is called again after a previous
    /// `start`/`stop` cycle, because `stop` resets the running flag but the
    /// components are not restored.
    fn start(&mut self) -> Result<()> {
        let already_running = self.running.swap(true, Ordering::AcqRel);
        if already_running {
            return Ok(());
        }

        // Move the one-shot components out in the same order as their panic
        // messages: host, consumer, store.
        let cdc_host = self.host.take().expect("host already consumed");
        let consume = self.consumer.take().expect("consumer already consumed");
        let cdc_store = self.store.take().expect("store already consumed");

        let poll_actor = PollActor::new(
            PollActorConfig {
                consumer_id: self.config.consumer_id.clone(),
                poll_interval: self.config.poll_interval,
                max_batch_size: self.config.max_batch_size,
            },
            cdc_host,
            consume,
            cdc_store,
        );

        self.handle = Some(self.actor_system.spawn(&self.config.thread_name, poll_actor));
        Ok(())
    }

    /// Stops the poll actor if the consumer is marked running.
    ///
    /// The running flag is swapped to `false`; if it was already `false` the
    /// call is a no-op. Otherwise `shutdown` is called on the actor system
    /// and, if a handle is present, it is joined. The join result is
    /// deliberately discarded, so this method always returns `Ok(())`.
    ///
    /// NOTE(review): `shutdown` is invoked on the whole `ActorSystem` value
    /// held by this consumer; if that system is shared with other actors it
    /// may affect them too. Confirm against `ActorSystem`.
    fn stop(&mut self) -> Result<()> {
        let was_running = self.running.swap(false, Ordering::AcqRel);
        if was_running {
            self.actor_system.shutdown();
            if let Some(actor_handle) = self.handle.take() {
                // Best effort: whatever join reports is ignored.
                let _ = actor_handle.join();
            }
        }
        Ok(())
    }

    /// Returns the current value of the running flag.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }
}