//! rustcdc 0.6.7
//!
//! Embeddable Rust CDC library focused on correctness-first capture primitives.
//!
//! Documentation
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rustcdc::transform::{Transform, TransformPipeline};
use rustcdc::{Event, Operation, SnapshotValidator, SourceMetadata, EVENT_ENVELOPE_VERSION};
use serde_json::json;
use std::collections::HashSet;
use std::hint::black_box;
use tokio::runtime::Builder;

/// Assembles the two-stage pipeline used by the streaming benchmarks:
/// `AddTagTransform` first, then `NormalizeNameTransform`.
fn build_transform_pipeline() -> TransformPipeline {
    let mut stages: TransformPipeline = Default::default();
    stages.add_transform(Box::new(AddTagTransform));
    stages.add_transform(Box::new(NormalizeNameTransform));
    stages
}

/// Builds a synthetic `Operation::Read` event for row `id` of `public.users`.
///
/// The `after` image holds `id`, `name` (`user-{id}`) and `email` (`user-{id}@example.com`).
/// The source offset is the decimal form of `id`, and both the source timestamp and the
/// event `ts` are `id + 1`.
fn build_event(id: u64) -> Event {
    let stamp = id + 1;
    let row = json!({
        "id": id,
        "name": format!("user-{id}"),
        "email": format!("user-{id}@example.com")
    });
    let source = SourceMetadata {
        source_name: String::from("bench"),
        offset: id.to_string(),
        timestamp: stamp,
    };

    Event {
        before: None,
        after: Some(row),
        op: Operation::Read,
        source,
        ts: stamp,
        schema: Some(String::from("public")),
        table: String::from("users"),
        primary_key: Some(vec![String::from("id")]),
        snapshot: None,
        transaction: None,
        envelope_version: EVENT_ENVELOPE_VERSION,
        before_is_key_only: false,
    }
}

fn run_pipeline_batch(
    runtime: &tokio::runtime::Runtime,
    pipeline: &mut TransformPipeline,
    size: u64,
) {
    for idx in 1..=size {
        let event = build_event(idx);
        let _ = runtime
            .block_on(pipeline.apply(event))
            .expect("apply transform pipeline")
            .expect("event should not be filtered");
    }
}

/// Drops events whose row id was already seen, keeping the first occurrence of each id
/// and preserving input order.
///
/// The id is the `id` field of the `after` image, read as a `u64`. Events without such an
/// id cannot be compared, so they are always kept. Mapping them to a sentinel such as `0`
/// would collapse them into one, and would also collide with a genuine row whose id is `0`.
fn dedup_by_id(mut events: Vec<Event>) -> Vec<Event> {
    let mut seen = HashSet::with_capacity(events.len());
    events.retain(|event| {
        let id = event
            .after
            .as_ref()
            .and_then(|value| value.get("id"))
            .and_then(|value| value.as_u64());
        match id {
            // `insert` returns false for an id already seen, which drops the duplicate.
            Some(id) => seen.insert(id),
            None => true,
        }
    });
    events
}

#[derive(Debug)]
struct AddTagTransform;

#[async_trait]
impl Transform for AddTagTransform {
    async fn apply(&self, event: &mut Event) -> rustcdc::Result<bool> {
        if let Some(after) = event.after.as_mut().and_then(|value| value.as_object_mut()) {
            after.insert("bench_tag".to_string(), json!("quality"));
        }
        Ok(true)
    }

    fn name(&self) -> &str {
        "add_tag"
    }
}

/// Benchmark transform that ASCII-uppercases the string `name` field of the `after`
/// image, if one is present. It always returns `Ok(true)`.
#[derive(Debug)]
struct NormalizeNameTransform;

#[async_trait]
impl Transform for NormalizeNameTransform {
    async fn apply(&self, event: &mut Event) -> rustcdc::Result<bool> {
        // `Value::get_mut` returns `None` for non-object images, so only object rows are touched.
        let name_slot = event.after.as_mut().and_then(|row| row.get_mut("name"));
        if let Some(serde_json::Value::String(name)) = name_slot {
            name.make_ascii_uppercase();
        }
        Ok(true)
    }

    fn name(&self) -> &str {
        "normalize_name"
    }
}

/// Times one JSON round trip of a fixed event: `Event::to_json` followed by
/// `Event::from_json`. The decoded event is returned from the timed closure, so Criterion
/// passes it through `black_box`.
fn bench_event_json_roundtrip(c: &mut Criterion) {
    let sample = build_event(1);
    let routine_name = "event_json_roundtrip";
    c.bench_function(routine_name, |bencher| {
        bencher.iter(|| {
            let wire = black_box(&sample).to_json().expect("serialize event");
            Event::from_json(&wire).expect("deserialize event")
        })
    });
}

fn bench_transform_pipeline(c: &mut Criterion) {
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");

    let pipeline = build_transform_pipeline();

    c.bench_function("transform_pipeline_two_stages", |b| {
        b.iter(|| {
            let event = build_event(black_box(100));
            runtime
                .block_on(pipeline.apply(event))
                .expect("apply transform pipeline")
        })
    });
}

/// Validates a 10 000-row `users` snapshot on every iteration and reports throughput in
/// rows. The fixture events are built once, outside the timed closure.
fn bench_snapshot_10k_rows(c: &mut Criterion) {
    const ROWS: u64 = 10_000;
    let rows: Vec<Event> = (1..=ROWS).map(build_event).collect();

    let mut group = c.benchmark_group("snapshot_validator");
    group.throughput(Throughput::Elements(ROWS));
    group.bench_with_input(BenchmarkId::from_parameter(ROWS), &rows, |b, fixture| {
        b.iter(|| {
            let mut validator = SnapshotValidator::new();
            validator.set_expected_count("users", ROWS);
            for event in fixture.iter() {
                validator.track_event(event).expect("track snapshot event");
            }
            validator.finalize().expect("finalize snapshot validator")
        })
    });
    group.finish();
}

/// Streams 1 000 events through the two-stage transform pipeline on every iteration,
/// reporting throughput in events. Event construction is included in the timed work.
fn bench_stream_1k_events(c: &mut Criterion) {
    const EVENTS: u64 = 1_000;

    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let mut stages = build_transform_pipeline();

    let mut group = c.benchmark_group("stream_events");
    group.throughput(Throughput::Elements(EVENTS));
    group.bench_function("stream_1k_events", |b| {
        b.iter(|| run_pipeline_batch(&rt, &mut stages, black_box(EVENTS)));
    });
    group.finish();
}

fn bench_full_cycle_snapshot_stream_handoff(c: &mut Criterion) {
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let mut pipeline = build_transform_pipeline();

    let snapshot_events: Vec<Event> = (1..=10_000).map(build_event).collect();
    let overlap_prefetch: Vec<Event> = (9_500..=10_500).map(build_event).collect();

    c.bench_function("full_cycle_snapshot_stream_handoff", |b| {
        b.iter(|| {
            let mut validator = SnapshotValidator::new();
            validator.set_expected_count("users", 10_000);
            for event in &snapshot_events {
                validator.track_event(event).expect("track snapshot event");
            }
            let _ = validator.finalize().expect("validate snapshot consistency");

            run_pipeline_batch(&runtime, &mut pipeline, 1_000);

            let _forward = dedup_by_id(overlap_prefetch.clone());
        })
    });
}

fn bench_parallel_snapshot_4x100k(c: &mut Criterion) {
    let table_events: Vec<(String, Vec<Event>)> = (0..4_u64)
        .map(|table_idx| {
            let table_name = format!("users_{table_idx}");
            let offset_base = table_idx * 100_000;
            let events = (1..=100_000_u64)
                .map(|row| {
                    let mut event = build_event(offset_base + row);
                    event.table = table_name.clone();
                    event
                })
                .collect::<Vec<_>>();
            (table_name, events)
        })
        .collect();

    c.bench_function("parallel_snapshot_4_tables_100k", |b| {
        b.iter(|| {
            for (table_name, events) in &table_events {
                let mut validator = SnapshotValidator::new();
                validator.set_expected_count(table_name, 100_000);
                for event in events {
                    validator
                        .track_event(event)
                        .expect("track parallel snapshot row");
                }
                let _ = validator
                    .finalize()
                    .expect("finalize parallel snapshot table");
            }
        })
    });
}

/// Benchmarks that back the CI quality gates: a 10 000-row snapshot validation and a
/// 1 000-event stream through the transform pipeline. Both are grouped under
/// `quality_gates` with their own sampling settings.
fn bench_quality_gate_targets(c: &mut Criterion) {
    let mut group = c.benchmark_group("quality_gates");
    // 30 samples over an 8 s measurement window replace the former 20 samples / 5 s. Those
    // settings produced wide confidence intervals and caused spurious failures of the ±5 %
    // regression gate under machine-load drift.
    // NOTE(review): the other benchmarks in this file run with Criterion's defaults
    // (100 samples, 5 s measurement), so these settings are not shared with them. Confirm
    // which configuration the gate is meant to use.
    group.sample_size(30);
    group.measurement_time(std::time::Duration::from_secs(8));

    // Build the fixture outside the benchmark closure. Criterion invokes that closure
    // repeatedly (warm-up and once per sample), so building it inside would rebuild
    // 10 000 events on every call. Only `b.iter` is timed, so measurements are unaffected.
    let events: Vec<Event> = (1..=10_000).map(build_event).collect();
    group.bench_function("snapshot_10k_rows", |b| {
        b.iter(|| {
            let mut validator = SnapshotValidator::new();
            validator.set_expected_count("users", 10_000);
            for event in &events {
                validator.track_event(event).expect("track snapshot event");
            }
            validator.finalize().expect("finalize snapshot")
        })
    });

    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let mut pipeline = build_transform_pipeline();
    group.bench_function("stream_1k_events_target", |b| {
        b.iter(|| run_pipeline_batch(&runtime, &mut pipeline, 1_000))
    });

    group.finish();
}

fn bench_full_quality_suite(c: &mut Criterion) {
    bench_snapshot_10k_rows(c);
    bench_stream_1k_events(c);
    bench_full_cycle_snapshot_stream_handoff(c);
    bench_parallel_snapshot_4x100k(c);
    bench_quality_gate_targets(c);
}

fn bench_utility(c: &mut Criterion) {
    bench_event_json_roundtrip(c);
    bench_transform_pipeline(c);
}

// Benchmark entry point. Both targets share one default-configured `Criterion`, which
// `criterion_group!` also configures from command-line arguments.
criterion_group! {
    name = quality_perf;
    config = Criterion::default();
    targets = bench_full_quality_suite, bench_utility
}
criterion_main!(quality_perf);