#![allow(dead_code)]
use super::consumer::Consumer;
use super::error::StreamError;
use super::types::{ChangeEvent, Op};
/// Builder that collects the settings for a [`CdcStream`].
///
/// A builder is created either from a bootstrap-server string (`new`) or from
/// a borrowed [`Consumer`] (`from_consumer`); exactly one of `consumer` and
/// `bootstrap_servers` is populated by those constructors.
pub struct CdcBuilder<'a> {
/// Borrowed consumer; `Some` only when created through `from_consumer`.
/// NOTE(review): not read by `build` in the visible code.
consumer: Option<&'a Consumer>,
/// Server string; `Some` only when created through `new`.
/// NOTE(review): not read by `build` in the visible code.
bootstrap_servers: Option<String>,
/// Table names for the stream; `build` rejects an empty list.
tables: Vec<String>,
/// Operations for the stream; the constructors default this to
/// `Insert`, `Update` and `Delete`.
operations: Vec<Op>,
/// Flag set by `beginning()` and cleared by `current()`; presumably selects
/// the starting position of the stream — TODO confirm.
from_beginning: bool,
}
impl<'a> CdcBuilder<'a> {
    /// Shared constructor used by [`CdcBuilder::new`] and
    /// [`CdcBuilder::from_consumer`].
    ///
    /// Every builder starts with an empty table list, all three operations
    /// (`Insert`, `Update`, `Delete`) selected, and `from_beginning` disabled.
    fn with_defaults(consumer: Option<&'a Consumer>, bootstrap_servers: Option<String>) -> Self {
        Self {
            consumer,
            bootstrap_servers,
            tables: Vec::new(),
            operations: vec![Op::Insert, Op::Update, Op::Delete],
            from_beginning: false,
        }
    }

    /// Creates a builder from a bootstrap-server string; no consumer is attached.
    pub fn new(servers: impl Into<String>) -> Self {
        let bootstrap_servers = servers.into();
        Self::with_defaults(None, Some(bootstrap_servers))
    }

    /// Creates a builder that borrows an existing [`Consumer`]; no server
    /// string is stored.
    pub fn from_consumer(consumer: &'a Consumer) -> Self {
        Self::with_defaults(Some(consumer), None)
    }

    /// Replaces the table list with owned copies of the given names.
    pub fn tables(self, tables: &[&str]) -> Self {
        let owned: Vec<String> = tables.iter().map(|&name| name.to_owned()).collect();
        Self {
            tables: owned,
            ..self
        }
    }

    /// Replaces the operation list with a copy of `ops`.
    pub fn operations(self, ops: &[Op]) -> Self {
        Self {
            operations: Vec::from(ops),
            ..self
        }
    }

    /// Sets the `from_beginning` flag.
    pub fn beginning(self) -> Self {
        Self {
            from_beginning: true,
            ..self
        }
    }

    /// Clears the `from_beginning` flag (the default state).
    pub fn current(self) -> Self {
        Self {
            from_beginning: false,
            ..self
        }
    }

    /// Consumes the builder and produces a [`CdcStream`].
    ///
    /// # Errors
    ///
    /// Returns [`StreamError::Config`] when no tables have been configured.
    pub async fn build(self) -> Result<CdcStream, StreamError> {
        // Only these three settings are carried over into the stream; the
        // remaining builder fields are dropped here.
        let Self {
            tables,
            operations,
            from_beginning,
            ..
        } = self;

        if !tables.is_empty() {
            Ok(CdcStream {
                tables,
                operations,
                _from_beginning: from_beginning,
            })
        } else {
            Err(StreamError::Config(String::from(
                "at least one table is required",
            )))
        }
    }

    /// Alias for [`CdcBuilder::build`].
    ///
    /// # Errors
    ///
    /// Propagates whatever error `build` returns.
    pub async fn subscribe(self) -> Result<CdcStream, StreamError> {
        let stream = self.build().await?;
        Ok(stream)
    }
}
/// Stream handle produced by `CdcBuilder::build`; it stores the settings it
/// was built with.
pub struct CdcStream {
/// Table names copied from the builder (never empty when created via `build`).
tables: Vec<String>,
/// Operations copied from the builder.
operations: Vec<Op>,
/// Builder's `from_beginning` flag; stored but not read in the visible code.
_from_beginning: bool,
}
impl CdcStream {
    /// Polls for the next change event.
    ///
    /// The current implementation never yields an event: it always resolves
    /// to `Ok(None)`.
    pub async fn poll(&self) -> Result<Option<ChangeEvent>, StreamError> {
        let next_event: Option<ChangeEvent> = None;
        Ok(next_event)
    }

    /// Polls for the next change event, accepting a timeout.
    ///
    /// The timeout is currently ignored; this simply forwards to
    /// [`CdcStream::poll`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `poll`.
    pub async fn poll_timeout(
        &self,
        _timeout: std::time::Duration,
    ) -> Result<Option<ChangeEvent>, StreamError> {
        let polled = self.poll().await?;
        Ok(polled)
    }

    /// Returns the table names this stream was built with.
    pub fn tables(&self) -> &[String] {
        self.tables.as_slice()
    }

    /// Returns the operations this stream was built with.
    pub fn operations(&self) -> &[Op] {
        self.operations.as_slice()
    }

    /// Closes the stream.
    ///
    /// The current implementation performs no work and always returns `Ok(())`.
    pub async fn close(&self) -> Result<(), StreamError> {
        Result::Ok(())
    }
}