use crate::common::{CdcEvent, CdcOp};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::interval;
/// Tuning knobs controlling how `EventBatcher` groups CDC events into batches.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Flush once this many events have accumulated.
    pub max_events: usize,
    /// Flush once the estimated payload size reaches this many bytes.
    pub max_bytes: usize,
    /// Age threshold used by `check_timeout` to flush a waiting batch.
    pub max_delay: Duration,
    /// When true, timeout-based flushes are deferred while a transaction is open.
    pub preserve_transactions: bool,
    /// Minimum number of buffered events required before a timeout flush fires.
    pub min_events: usize,
}
impl Default for BatchConfig {
    /// Balanced defaults: 1000 events / 1 MiB / 100 ms, transaction-preserving.
    fn default() -> Self {
        Self {
            max_delay: Duration::from_millis(100),
            max_events: 1000,
            max_bytes: 1_048_576,
            min_events: 1,
            preserve_transactions: true,
        }
    }
}
impl BatchConfig {
    /// Starts a fluent builder; unset fields fall back to `Default`.
    pub fn builder() -> BatchConfigBuilder {
        BatchConfigBuilder::default()
    }

    /// Preset tuned for throughput: large batches, long delay, and no
    /// transaction preservation.
    pub fn high_throughput() -> Self {
        Self {
            max_delay: Duration::from_millis(500),
            max_events: 10_000,
            max_bytes: 10_485_760,
            min_events: 100,
            preserve_transactions: false,
        }
    }

    /// Preset tuned for latency: small batches flushed within ~10 ms while
    /// still preserving transactions.
    pub fn low_latency() -> Self {
        Self {
            max_delay: Duration::from_millis(10),
            max_events: 100,
            max_bytes: 102_400,
            min_events: 1,
            preserve_transactions: true,
        }
    }
}
/// Fluent builder for [`BatchConfig`]; any field left unset falls back to
/// the corresponding `BatchConfig::default()` value in `build()`.
#[derive(Default)]
pub struct BatchConfigBuilder {
    max_events: Option<usize>,
    max_bytes: Option<usize>,
    max_delay: Option<Duration>,
    preserve_transactions: Option<bool>,
    min_events: Option<usize>,
}
impl BatchConfigBuilder {
    /// Sets the event-count flush threshold.
    pub fn max_events(mut self, n: usize) -> Self {
        self.max_events = Some(n);
        self
    }

    /// Sets the byte-size flush threshold.
    pub fn max_bytes(mut self, n: usize) -> Self {
        self.max_bytes = Some(n);
        self
    }

    /// Sets the timeout-flush age threshold.
    pub fn max_delay(mut self, d: Duration) -> Self {
        self.max_delay = Some(d);
        self
    }

    /// Enables or disables transaction-preserving timeout flushes.
    pub fn preserve_transactions(mut self, v: bool) -> Self {
        self.preserve_transactions = Some(v);
        self
    }

    /// Sets the minimum event count required for a timeout flush.
    pub fn min_events(mut self, n: usize) -> Self {
        self.min_events = Some(n);
        self
    }

    /// Finalizes the config, filling unset fields from `BatchConfig::default()`.
    pub fn build(self) -> BatchConfig {
        // Destructure the defaults once instead of repeating field access.
        let BatchConfig {
            max_events,
            max_bytes,
            max_delay,
            preserve_transactions,
            min_events,
        } = BatchConfig::default();
        BatchConfig {
            max_events: self.max_events.unwrap_or(max_events),
            max_bytes: self.max_bytes.unwrap_or(max_bytes),
            max_delay: self.max_delay.unwrap_or(max_delay),
            preserve_transactions: self.preserve_transactions.unwrap_or(preserve_transactions),
            min_events: self.min_events.unwrap_or(min_events),
        }
    }
}
/// A flushed group of CDC events plus bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct EventBatch {
    /// Events in the order they were added to the batcher.
    pub events: Vec<CdcEvent>,
    /// Estimated payload size in bytes (see `estimate_event_size`).
    pub bytes: usize,
    /// When the batch began accumulating events.
    pub created_at: Instant,
    /// When the batch was flushed out of the batcher.
    pub flushed_at: Instant,
    /// Monotonically increasing batch number (first batch is 1).
    pub sequence: u64,
    /// Transaction ids covered by this batch. NOTE(review): currently always
    /// empty — `EventBatcher::create_batch` never populates it.
    pub transaction_ids: Vec<String>,
}
impl EventBatch {
    /// Number of events in the batch.
    #[inline]
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// True when the batch holds no events.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// How long the batch waited between creation and flush.
    pub fn wait_time(&self) -> Duration {
        self.flushed_at.duration_since(self.created_at)
    }

    /// Tallies events by operation type.
    pub fn counts(&self) -> BatchCounts {
        self.events
            .iter()
            .fold(BatchCounts::default(), |mut acc, ev| {
                match ev.op {
                    CdcOp::Insert => acc.inserts += 1,
                    CdcOp::Update => acc.updates += 1,
                    CdcOp::Delete => acc.deletes += 1,
                    CdcOp::Tombstone => acc.tombstones += 1,
                    CdcOp::Snapshot => acc.snapshots += 1,
                    CdcOp::Truncate => acc.truncates += 1,
                    CdcOp::Schema => acc.schemas += 1,
                }
                acc
            })
    }
}
/// Per-operation event tallies for a batch; produced by [`EventBatch::counts`].
#[derive(Debug, Clone, Default)]
pub struct BatchCounts {
    pub inserts: usize,
    pub updates: usize,
    pub deletes: usize,
    pub tombstones: usize,
    pub snapshots: usize,
    pub truncates: usize,
    pub schemas: usize,
}
impl BatchCounts {
    /// Sum of every per-operation counter.
    pub fn total(&self) -> usize {
        [
            self.inserts,
            self.updates,
            self.deletes,
            self.tombstones,
            self.snapshots,
            self.truncates,
            self.schemas,
        ]
        .iter()
        .sum()
    }
}
/// Groups incoming CDC events into size-, byte-, and time-bounded batches.
/// All mutable state lives behind an async mutex, so one batcher can be
/// shared across tasks (e.g. via `Arc`, as `spawn_timeout_flusher` does).
pub struct EventBatcher {
    config: BatchConfig,
    state: Mutex<BatcherState>,
}
/// Mutable accumulation state guarded by `EventBatcher::state`.
struct BatcherState {
    /// Events buffered for the in-progress batch.
    events: Vec<CdcEvent>,
    /// Running estimated size of `events` in bytes.
    bytes: usize,
    /// When the current batch started accumulating.
    batch_start: Instant,
    /// Number of batches produced so far; the next batch gets `sequence + 1`.
    sequence: u64,
    /// True while a DML transaction is considered open (only tracked when
    /// `preserve_transactions` is enabled).
    in_transaction: bool,
    // NOTE(review): initialized to None and never read or written afterwards.
    #[allow(dead_code)] transaction_id: Option<String>,
}
impl EventBatcher {
    /// Creates a batcher; the event buffer is pre-sized to `config.max_events`
    /// so a batch fills without reallocating.
    pub fn new(config: BatchConfig) -> Self {
        let max_events = config.max_events;
        Self {
            config,
            state: Mutex::new(BatcherState {
                events: Vec::with_capacity(max_events),
                bytes: 0,
                batch_start: Instant::now(),
                sequence: 0,
                in_transaction: false,
                transaction_id: None,
            }),
        }
    }

    /// Appends `event` to the pending batch.
    ///
    /// Returns `Some(batch)` when the addition reaches the `max_events` or
    /// `max_bytes` threshold; otherwise the event stays buffered and `None`
    /// is returned.
    pub async fn add(&self, event: CdcEvent) -> Option<EventBatch> {
        let mut state = self.state.lock().await;
        let event_bytes = estimate_event_size(&event);
        // Mark a transaction as in flight so check_timeout() will not split
        // it; only relevant when the config asks to preserve transactions.
        if self.config.preserve_transactions && !state.in_transaction && event.is_dml() {
            state.in_transaction = true;
        }
        state.events.push(event);
        state.bytes += event_bytes;
        if self.should_flush(&state) {
            Some(self.create_batch(&mut state))
        } else {
            None
        }
    }

    /// Unconditionally flushes whatever is buffered; `None` if nothing is.
    pub async fn flush(&self) -> Option<EventBatch> {
        let mut state = self.state.lock().await;
        if state.events.is_empty() {
            None
        } else {
            Some(self.create_batch(&mut state))
        }
    }

    /// Flushes if the pending batch is old enough (`max_delay`) and large
    /// enough (`min_events`). Returns `None` while a transaction is open and
    /// `preserve_transactions` is set, so the timer never splits a transaction.
    pub async fn check_timeout(&self) -> Option<EventBatch> {
        let mut state = self.state.lock().await;
        if state.events.len() >= self.config.min_events
            && state.batch_start.elapsed() >= self.config.max_delay
        {
            if self.config.preserve_transactions && state.in_transaction {
                return None;
            }
            return Some(self.create_batch(&mut state));
        }
        None
    }

    /// Number of currently buffered events.
    pub async fn pending_count(&self) -> usize {
        self.state.lock().await.events.len()
    }

    /// Estimated byte size of the currently buffered events.
    pub async fn pending_bytes(&self) -> usize {
        self.state.lock().await.bytes
    }

    /// True when either size threshold has been reached.
    fn should_flush(&self, state: &BatcherState) -> bool {
        state.events.len() >= self.config.max_events || state.bytes >= self.config.max_bytes
    }

    /// Drains the buffered events into an `EventBatch` and resets the
    /// accumulation state for the next batch.
    fn create_batch(&self, state: &mut BatcherState) -> EventBatch {
        let events = std::mem::replace(
            &mut state.events,
            Vec::with_capacity(self.config.max_events),
        );
        let bytes = std::mem::take(&mut state.bytes);
        // BUG FIX: the previous code reset `batch_start` to Instant::now()
        // *before* reading it as `created_at`, so every batch's created_at
        // equaled its flush time and wait_time() was always ~zero. Capture
        // the outgoing batch's true start while installing the new one.
        let created_at = std::mem::replace(&mut state.batch_start, Instant::now());
        state.sequence += 1;
        state.in_transaction = false;
        EventBatch {
            events,
            bytes,
            created_at,
            flushed_at: Instant::now(),
            sequence: state.sequence,
            // NOTE(review): transaction ids are not tracked yet; the
            // `transaction_id` field on BatcherState is unused (dead_code).
            transaction_ids: Vec::new(),
        }
    }
}
fn estimate_event_size(event: &CdcEvent) -> usize {
let mut size = 0;
size += 64;
if let Some(ref before) = event.before {
size += before.to_string().len();
}
if let Some(ref after) = event.after {
size += after.to_string().len();
}
size
}
/// Couples an [`EventBatcher`] with an async callback that consumes each
/// flushed batch. The handler returns a boxed future so any async closure
/// or adapted async fn can be used.
pub struct BatchProcessor<F>
where
    F: FnMut(EventBatch) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send,
{
    batcher: EventBatcher,
    handler: F,
}
impl<F> BatchProcessor<F>
where
    F: FnMut(EventBatch) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send,
{
    /// Builds a processor from a batching config and a batch handler.
    pub fn new(config: BatchConfig, handler: F) -> Self {
        Self {
            // FIX: `config` is owned by this function and unused afterwards,
            // so the previous `config.clone()` was a redundant allocation.
            batcher: EventBatcher::new(config),
            handler,
        }
    }

    /// Feeds one event in; invokes the handler if a batch becomes full.
    pub async fn process(&mut self, event: CdcEvent) {
        if let Some(batch) = self.batcher.add(event).await {
            (self.handler)(batch).await;
        }
    }

    /// Forces any buffered events out through the handler.
    pub async fn flush(&mut self) {
        if let Some(batch) = self.batcher.flush().await {
            (self.handler)(batch).await;
        }
    }
}
pub fn spawn_timeout_flusher(
batcher: std::sync::Arc<EventBatcher>,
flush_interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = interval(flush_interval);
loop {
interval.tick().await;
if let Some(_batch) = batcher.check_timeout().await {
}
}
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal Postgres-style CDC event carrying the given operation.
    fn make_event(op: CdcOp) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".to_string(),
            database: "testdb".to_string(),
            schema: "public".to_string(),
            table: "users".to_string(),
            op,
            before: None,
            after: Some(json!({"id": 1, "name": "Alice"})),
            timestamp: 1234567890,
            transaction: None,
        }
    }

    #[test]
    fn test_batch_config_builder() {
        let cfg = BatchConfig::builder()
            .max_events(500)
            .max_bytes(500_000)
            .max_delay(Duration::from_millis(50))
            .build();
        assert_eq!(cfg.max_events, 500);
        assert_eq!(cfg.max_bytes, 500_000);
        assert_eq!(cfg.max_delay, Duration::from_millis(50));
    }

    #[test]
    fn test_batch_config_presets() {
        assert_eq!(BatchConfig::high_throughput().max_events, 10_000);
        assert_eq!(BatchConfig::low_latency().max_events, 100);
    }

    #[tokio::test]
    async fn test_batcher_size_limit() {
        let batcher = EventBatcher::new(BatchConfig::builder().max_events(3).build());
        // First two additions stay under the limit.
        for op in [CdcOp::Insert, CdcOp::Update] {
            assert!(batcher.add(make_event(op)).await.is_none());
        }
        // Third addition hits max_events and flushes.
        let full = batcher.add(make_event(CdcOp::Delete)).await;
        assert!(full.is_some());
        assert_eq!(full.unwrap().len(), 3);
    }

    #[tokio::test]
    async fn test_batcher_force_flush() {
        let batcher = EventBatcher::new(BatchConfig::default());
        for _ in 0..2 {
            batcher.add(make_event(CdcOp::Insert)).await;
        }
        let flushed = batcher.flush().await;
        assert!(flushed.is_some());
        assert_eq!(flushed.unwrap().len(), 2);
        // Flushing an empty batcher yields nothing.
        assert!(batcher.flush().await.is_none());
    }

    #[tokio::test]
    async fn test_batch_counts() {
        let batcher = EventBatcher::new(BatchConfig::builder().max_events(10).build());
        for op in [CdcOp::Insert, CdcOp::Insert, CdcOp::Update, CdcOp::Delete] {
            batcher.add(make_event(op)).await;
        }
        let counts = batcher.flush().await.unwrap().counts();
        assert_eq!(counts.inserts, 2);
        assert_eq!(counts.updates, 1);
        assert_eq!(counts.deletes, 1);
        assert_eq!(counts.total(), 4);
    }

    #[tokio::test]
    async fn test_pending_counts() {
        let batcher = EventBatcher::new(BatchConfig::default());
        assert_eq!(batcher.pending_count().await, 0);
        for _ in 0..2 {
            batcher.add(make_event(CdcOp::Insert)).await;
        }
        assert_eq!(batcher.pending_count().await, 2);
        assert!(batcher.pending_bytes().await > 0);
    }

    #[tokio::test]
    async fn test_batch_sequence_numbers() {
        // max_events == 1 makes every add() return a batch immediately.
        let batcher = EventBatcher::new(BatchConfig::builder().max_events(1).build());
        for expected in 1..=3u64 {
            let batch = batcher.add(make_event(CdcOp::Insert)).await.unwrap();
            assert_eq!(batch.sequence, expected);
        }
    }

    #[test]
    fn test_estimate_event_size() {
        // Must exceed the fixed overhead because `after` carries a payload.
        assert!(estimate_event_size(&make_event(CdcOp::Insert)) > 64);
    }

    #[test]
    fn test_batch_wait_time() {
        let batch = EventBatch {
            events: vec![],
            bytes: 0,
            created_at: Instant::now(),
            flushed_at: Instant::now(),
            sequence: 1,
            transaction_ids: vec![],
        };
        assert!(batch.wait_time() < Duration::from_millis(10));
    }
}