rustcdc 0.6.2

Embeddable Rust CDC library focused on correctness-first capture primitives
# Adapter SDK Guide

## Contract

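`SinkAdapter` is an async trait for adapter development. It uses native `async fn` in traits
(stabilised in Rust 1.75 together with return-position `impl Trait` in traits, RPITIT).
**No `#[async_trait]` macro is required** in implementations; just write plain `async fn` methods.
Because native `async fn` makes a trait dyn-incompatible, `dyn SinkAdapter` cannot be used
directly; use `BoxedSink` when you need type-erased storage.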

Required methods:
- `send(&mut self, event: &Event) -> Result<()>`: accept one canonical event.
- `flush(&mut self) -> Result<()>`: durably flush buffered sends.
- `close(&mut self) -> Result<()>`: graceful shutdown.
- `name(&self) -> &str`: stable adapter identifier.
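
A minimal, std-only sketch of what native `async fn` in traits looks like for a contract of this shape. `MiniSink`, `Event`, `VecSink`, and `deliver_all` are hypothetical stand-ins (not rustcdc types), and `block_on` is a tiny single-threaded executor included only so the sketch runs without an async runtime:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-ins mirroring the shape of `Event` and `SinkAdapter`.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    id: u64,
}

type Result<T> = std::result::Result<T, String>;

// Native `async fn` in a trait (Rust 1.75+): no `#[async_trait]` attribute.
trait MiniSink {
    async fn send(&mut self, event: &Event) -> Result<()>;
    async fn flush(&mut self) -> Result<()>;
    async fn close(&mut self) -> Result<()>;
    fn name(&self) -> &str;
}

struct VecSink {
    name: String,
    buffered: Vec<Event>,
    delivered: Vec<Event>,
}

// The implementation uses plain `async fn` methods.
impl MiniSink for VecSink {
    async fn send(&mut self, event: &Event) -> Result<()> {
        self.buffered.push(event.clone());
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        // Move everything buffered so far to the "durable" side, in order.
        self.delivered.append(&mut self.buffered);
        Ok(())
    }

    async fn close(&mut self) -> Result<()> {
        self.flush().await
    }

    fn name(&self) -> &str {
        &self.name
    }
}

// Generic (static-dispatch) code can `.await` the trait methods directly.
async fn deliver_all<S: MiniSink>(sink: &mut S, events: &[Event]) -> Result<()> {
    for event in events {
        sink.send(event).await?;
    }
    sink.close().await
}

// Minimal executor: poll the future, park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}
```

Note that `deliver_all` is generic over `S: MiniSink` rather than taking `&mut dyn MiniSink`: traits with native `async fn` methods are not dyn-compatible.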

Optional capability methods (all have default implementations, typically a no-op, `false`, or `None`):
- `delivery_guarantee() -> SinkDeliveryGuarantee`: advertise `AtLeastOnce`, `AtLeastOnceIdempotent`, or `EffectivelyOnce`.
- `idempotent_delivery_capable() -> bool`: whether retrying the same event is idempotent.
- `transactional_checkpoint_barrier_capable() -> bool`: whether the adapter implements the full barrier protocol.
- `queue_depth() -> Option<usize>`: current in-memory buffer depth for back-pressure observation.
- `flush_tick_interval() -> Option<Duration>`: hint for periodic time-based flushing.
- `begin_checkpoint_barrier / commit_checkpoint_barrier / abort_checkpoint_barrier`: two-phase-commit-style barrier protocol for effectively-once delivery.
- `preflight_check()`: connectivity validation called once at startup.
- `close_with_timeout(ms)`: provided graceful close bounded by a timeout (no override needed).
- `exported_events() -> Option<&[Event]>`: for in-memory/test adapters; enables conformance assertions.
- `delivery_metrics() -> Option<SinkDeliveryMetrics>`: snapshot of delivery counters (`events_sent`, `events_errored`, `events_retried`, `last_delivered_offset`) for admin endpoints and Prometheus scrapers. `SinkDeliveryMetrics::merge()` is provided for composed sinks.
- `is_closed() -> bool`: closed-state hook used by conformance suite; defaults to `false`.

Behavioral expectations:
- Deliver events in the order in which `send` was called.
- Return structured errors instead of panicking.
- Treat `flush` as a durability boundary.
- Reject sends after `close` with a state error.
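
The expectations above can be sketched with a hypothetical error enum and sink. `SinkError` and `OrderedSink` are illustrative only; a real adapter would return `rustcdc::Error` variants instead:

```rust
use std::fmt;

// Hypothetical error type; real adapters return `rustcdc::Error` variants.
#[derive(Debug, PartialEq, Eq)]
enum SinkError {
    /// `send` after `close`: a caller bug, never worth retrying.
    Closed,
    /// Transient transport failure (timeout, dropped connection).
    Transport(String),
    /// Permanent rejection (bad credentials, schema mismatch).
    Rejected(String),
}

impl SinkError {
    /// Only transient transport failures are worth retrying.
    fn is_retryable(&self) -> bool {
        matches!(self, SinkError::Transport(_))
    }
}

impl fmt::Display for SinkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SinkError::Closed => write!(f, "sink is closed"),
            SinkError::Transport(msg) => write!(f, "transport error: {msg}"),
            SinkError::Rejected(msg) => write!(f, "rejected by downstream: {msg}"),
        }
    }
}

impl std::error::Error for SinkError {}

/// Records events in `send` order and refuses work after `close`.
#[derive(Default)]
struct OrderedSink {
    delivered: Vec<u64>,
    closed: bool,
}

impl OrderedSink {
    fn send(&mut self, id: u64) -> Result<(), SinkError> {
        if self.closed {
            // A state error instead of a panic.
            return Err(SinkError::Closed);
        }
        self.delivered.push(id);
        Ok(())
    }

    fn close(&mut self) {
        self.closed = true;
    }
}
```

`is_retryable` is one possible way to encode the recoverable/unrecoverable split; what to do with a retryable error remains the embedding application's decision.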

## Implementation Guide

1. Implement a concrete sink type that stores connection/client state.
2. Implement `SinkAdapter` on that type (only `send`, `flush`, `close`, `name` are required).
3. Make `close` idempotent: calling it again after a successful close should be a harmless no-op.
4. Redact credentials from logs and debug output.
5. Distinguish recoverable from unrecoverable errors by returning the appropriate `rustcdc::Error` variant.
6. Override `delivery_guarantee` if your sink can offer stronger-than-default guarantees.
7. Override `delivery_metrics` to expose per-sink counters to admin/observability endpoints.
8. Use `your_sink.boxed()` (or `BoxedSink::new(your_sink)`) when you need type-erased storage or `FanOutSinkAdapter` / `TableRouter<BoxedSink>` composition.
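
For item 4, a hand-written `Debug` impl keeps secrets out of `{:?}` output; a derived `Debug` would print every field verbatim. `SinkConfig` and its fields are hypothetical:

```rust
use std::fmt;

// Hypothetical connection settings for a sink; field names are illustrative.
struct SinkConfig {
    endpoint: String,
    user: String,
    password: String,
}

// Hand-written `Debug` so logging `{:?}` never prints the secret.
impl fmt::Debug for SinkConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkConfig")
            .field("endpoint", &self.endpoint)
            .field("user", &self.user)
            .field("password", &"<redacted>")
            .finish()
    }
}
```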

Recommended pattern:
- `send`: enqueue and optionally batch.
- `flush`: commit batch to external sink.
- `close`: call `flush`, then release resources.
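
A synchronous model of this pattern (async omitted for brevity). `BatchingSink`, `Client`, and `max_batch` are hypothetical; `close` flushes before releasing the client, and it is idempotent because the client is held in an `Option`:

```rust
use std::mem;

// Hypothetical downstream client; `batches` records each committed batch.
#[derive(Debug, Default)]
struct Client {
    batches: Vec<Vec<u64>>,
}

struct BatchingSink {
    client: Option<Client>, // `None` once closed: resources released
    buffer: Vec<u64>,
    max_batch: usize,
}

impl BatchingSink {
    fn new(max_batch: usize) -> Self {
        Self { client: Some(Client::default()), buffer: Vec::new(), max_batch }
    }

    /// Enqueue; flush automatically once the batch is full.
    fn send(&mut self, id: u64) -> Result<(), String> {
        if self.client.is_none() {
            return Err("sink is closed".to_string());
        }
        self.buffer.push(id);
        if self.buffer.len() >= self.max_batch {
            self.flush()?;
        }
        Ok(())
    }

    /// Commit the buffered batch to the client.
    fn flush(&mut self) -> Result<(), String> {
        let client = self.client.as_mut().ok_or("sink is closed")?;
        if !self.buffer.is_empty() {
            client.batches.push(mem::take(&mut self.buffer));
        }
        Ok(())
    }

    /// Flush, then release the client. A second call is a no-op.
    fn close(&mut self) -> Result<Option<Client>, String> {
        if self.client.is_none() {
            return Ok(None);
        }
        self.flush()?;
        Ok(self.client.take())
    }
}
```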

## Conformance Instructions

Use `AdapterGoldenFixture`, `BasicAdapterConformance`, and `AdapterConformanceSuite` from `testkit`:

- `single_event`: validates basic acceptance path.
- `batch_send`: validates batch handling and flush behavior.
- `ordering`: validates stable send order.
- `crash_recovery`: validates close semantics after sends.

Minimum validation loop:
1. Build fixture sequences with `AdapterGoldenFixture::{single_event,batch,ordering,crash_recovery}`.
2. Run `AdapterConformanceSuite::run_all()` against your adapter and fixture.
3. Assert that every returned `TestResult` has `passed == true`.
4. Add adapter-specific fault tests (network loss, sink timeout, partial flush failure).

Quick smoke harness:

```rust
use rustcdc::testkit::{AdapterConformanceSuite, AdapterGoldenFixture};
use rustcdc::{MemorySinkAdapter, Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
use serde_json::json;

async fn validate_adapter() -> rustcdc::Result<()> {
    let fixture = AdapterGoldenFixture::single_event(Event {
        before: None,
        after: Some(json!({"id": 1})),
        op: Operation::Insert,
        source: SourceMetadata {
            source_name: "test".into(),
            offset: "1".into(),
            timestamp: 1,
        },
        ts: 1,
        schema: Some("public".into()),
        table: "items".into(),
        primary_key: Some(vec!["id".into()]),
        snapshot: None,
        transaction: None,
        envelope_version: EVENT_ENVELOPE_VERSION,
        before_is_key_only: false,
    });
    let suite = AdapterConformanceSuite::new();
    let mut adapter = MemorySinkAdapter::default();

    let results = suite.run_all(&mut adapter, &fixture).await?;
    assert!(results.iter().all(|result| result.passed));
    Ok(())
}
```
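
Step 4 of the validation loop calls for fault tests. One way to model a partial flush failure is sketched below, synchronously and with hypothetical names (`FlakyDownstream`, `BufferedSink`, `fail_on`): the first flush fails midway, and the retry must deliver every event exactly once, in order.

```rust
use std::collections::VecDeque;

// Hypothetical downstream that fails exactly once, on write attempt `fail_on` (0-based).
struct FlakyDownstream {
    written: Vec<u64>,
    attempts: usize,
    fail_on: Option<usize>,
}

impl FlakyDownstream {
    fn write(&mut self, id: u64) -> Result<(), String> {
        let attempt = self.attempts;
        self.attempts += 1;
        if self.fail_on == Some(attempt) {
            self.fail_on = None; // inject the fault only once
            return Err(format!("injected failure on write attempt {attempt}"));
        }
        self.written.push(id);
        Ok(())
    }
}

// Synchronous model of a buffering sink.
struct BufferedSink {
    pending: VecDeque<u64>,
    downstream: FlakyDownstream,
}

impl BufferedSink {
    fn send(&mut self, id: u64) {
        self.pending.push_back(id);
    }

    /// Write pending events in order. An event leaves the buffer only after its
    /// write succeeds, so a retry resumes exactly where the failure happened.
    fn flush(&mut self) -> Result<(), String> {
        while let Some(&id) = self.pending.front() {
            self.downstream.write(id)?;
            self.pending.pop_front();
        }
        Ok(())
    }
}

/// Fault test: partial flush failure, then a retry.
fn partial_flush_failure() -> (Result<(), String>, Result<(), String>, Vec<u64>) {
    let mut sink = BufferedSink {
        pending: VecDeque::new(),
        downstream: FlakyDownstream { written: Vec::new(), attempts: 0, fail_on: Some(1) },
    };
    for id in [1, 2, 3] {
        sink.send(id);
    }
    let first = sink.flush();
    let retry = sink.flush();
    (first, retry, sink.downstream.written)
}
```

This model assumes a failed write never reached the downstream. If a write can land before its acknowledgement is lost, a retry produces a duplicate, which is where idempotent delivery matters.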

## Dynamic Dispatch, Fan-Out, and Routing

To store sinks heterogeneously or compose multiple sinks:

```rust,no_run
use rustcdc::sink::{BoxedSink, FanOutSinkAdapter, MemorySinkAdapter, SinkAdapter};
use rustcdc::pipeline::HeterogeneousTableRouter;

// Type-erase any SinkAdapter (two equivalent spellings):
let boxed: BoxedSink = MemorySinkAdapter::new("mem").boxed();
let boxed2: BoxedSink = BoxedSink::new(MemorySinkAdapter::new("mem2"));

// Fan out to multiple sinks concurrently:
let fan = FanOutSinkAdapter::new(vec![
    MemorySinkAdapter::new("a").boxed(),
    MemorySinkAdapter::new("b").boxed(),
]);
// fan.send(&event).await drives all children concurrently; latency = max(children)

// Route events by table name to heterogeneous sinks:
let router = HeterogeneousTableRouter::builder("demo")
    .route("public.orders", MemorySinkAdapter::new("orders").boxed())
    .route("public.audit", MemorySinkAdapter::new("audit").boxed())
    .default(MemorySinkAdapter::new("fallback").boxed())
    .build()
    .expect("valid patterns");
```
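
The routing decision itself can be modeled as a map keyed by `schema.table`. `RouterModel` is a hypothetical, synchronous stand-in that resolves a sink name rather than forwarding events; it supports exact keys only, and the pattern syntax accepted by `HeterogeneousTableRouter` is not modeled:

```rust
use std::collections::HashMap;

// Hypothetical model: exact `schema.table` keys mapped to sink names, plus a fallback.
struct RouterModel {
    routes: HashMap<String, String>,
    fallback: Option<String>,
}

impl RouterModel {
    fn new() -> Self {
        Self { routes: HashMap::new(), fallback: None }
    }

    fn route(mut self, key: &str, sink: &str) -> Self {
        self.routes.insert(key.to_string(), sink.to_string());
        self
    }

    fn fallback(mut self, sink: &str) -> Self {
        self.fallback = Some(sink.to_string());
        self
    }

    /// Pick the sink for an event's schema and table, else the fallback.
    fn resolve(&self, schema: Option<&str>, table: &str) -> Option<&str> {
        let key = match schema {
            Some(schema) => format!("{schema}.{table}"),
            None => table.to_string(),
        };
        self.routes
            .get(&key)
            .or(self.fallback.as_ref())
            .map(String::as_str)
    }
}
```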

## Notes

- The four required methods (`send`, `flush`, `close`, `name`) are the minimum viable implementation.
- All optional capability methods have no-op defaults; override only what your sink actually supports.
- Retry policies and delivery semantics are the responsibility of the embedding application.
- `MemorySinkAdapter` is exported from the crate root for convenience in tests.

## EffectivelyOnce Delivery: Barrier Protocol

`SinkDeliveryGuarantee::EffectivelyOnce` is the strongest guarantee available. It requires
the sink to participate in a **two-phase checkpoint barrier** so that the runtime and the sink
advance their durable state atomically: no event is lost on crash, and no event is emitted
twice to the downstream system (provided the downstream is a transactional target, such as a
database or a Kafka topic written with transactions enabled).

### Responsibility split

> **Important:** The runtime never calls `begin_checkpoint_barrier`, `commit_checkpoint_barrier`,
> or `abort_checkpoint_barrier`. The barrier methods are pure hooks on the `SinkAdapter` trait;
> the **embedder** must call them explicitly around the commit path, in a coordinated sequence
> with `commit_ack`.

The **embedder** is responsible for:
- Calling `sink.begin_checkpoint_barrier()` before sending a batch, so that every `send()` in the commit window joins the sink's transaction.
- Calling `sink.commit_checkpoint_barrier()` after `runtime.commit_ack(...)` succeeds.
- Calling `sink.abort_checkpoint_barrier()` on any error in the commit window (a failed `send`, `flush`, or `commit_ack`).

The **sink** is responsible for:
- Opening a transaction (or writing to a transactional staging area) on `begin_checkpoint_barrier`.
- Making all `send()` calls part of that transaction during the window.
- Committing the transaction atomically with the checkpoint on `commit_checkpoint_barrier`.
- Rolling back on `abort_checkpoint_barrier`.
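
The sink-side responsibilities can be modeled as a two-state machine. `BarrierSink` and `BarrierError` are hypothetical; this sketch rejects sends outside an open barrier so that no write can escape the transaction, and it makes abort a no-op when nothing is open so error paths can call it unconditionally:

```rust
// Hypothetical model of the sink side of the barrier protocol (not rustcdc APIs).
#[derive(Debug, PartialEq, Eq)]
enum BarrierError {
    AlreadyOpen,
    NotOpen,
}

#[derive(Default)]
struct BarrierSink {
    open: bool,
    staged: Vec<u64>,    // writes inside the open transaction
    committed: Vec<u64>, // what the downstream system can see
}

impl BarrierSink {
    /// begin_checkpoint_barrier: open a transaction.
    fn begin(&mut self) -> Result<(), BarrierError> {
        if self.open {
            return Err(BarrierError::AlreadyOpen);
        }
        self.open = true;
        Ok(())
    }

    /// send: stage a write inside the open transaction.
    fn send(&mut self, id: u64) -> Result<(), BarrierError> {
        if !self.open {
            return Err(BarrierError::NotOpen);
        }
        self.staged.push(id);
        Ok(())
    }

    /// commit_checkpoint_barrier: publish all staged writes in one step.
    fn commit(&mut self) -> Result<(), BarrierError> {
        if !self.open {
            return Err(BarrierError::NotOpen);
        }
        self.committed.append(&mut self.staged);
        self.open = false;
        Ok(())
    }

    /// abort_checkpoint_barrier: discard staged writes; a no-op when nothing is open.
    fn abort(&mut self) {
        self.staged.clear();
        self.open = false;
    }
}
```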

### Minimal implementation skeleton

```rust,no_run
use rustcdc::sink::{SinkAdapter, SinkDeliveryGuarantee};
use rustcdc::{Event, Result};

/// A sink that writes to a transactional target and participates in the
/// checkpoint barrier protocol to provide EffectivelyOnce delivery.
pub struct TransactionalSink {
    name: String,
    pending: Vec<Event>,
    in_barrier: bool,
    closed: bool,
}

impl TransactionalSink {
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            pending: Vec::new(),
            in_barrier: false,
            closed: false,
        }
    }

    async fn begin_transaction(&mut self) -> Result<()> {
        // Open a database transaction, Kafka transaction, etc.
        self.in_barrier = true;
        Ok(())
    }

    async fn commit_transaction(&mut self) -> Result<()> {
        // Commit all pending events atomically with the checkpoint.
        self.pending.clear();
        self.in_barrier = false;
        Ok(())
    }

    async fn rollback_transaction(&mut self) -> Result<()> {
        self.pending.clear();
        self.in_barrier = false;
        Ok(())
    }
}

impl SinkAdapter for TransactionalSink {
    async fn send(&mut self, event: &Event) -> Result<()> {
        // Buffer into the open transaction. A production sink should also
        // reject sends after `close` with a state error.
        self.pending.push(event.clone());
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        // Flush pending events within the current transaction (no commit yet).
        Ok(())
    }

    async fn close(&mut self) -> Result<()> {
        if self.in_barrier {
            self.rollback_transaction().await?;
        }
        self.closed = true;
        Ok(())
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn delivery_guarantee(&self) -> SinkDeliveryGuarantee {
        SinkDeliveryGuarantee::EffectivelyOnce
    }

    fn transactional_checkpoint_barrier_capable(&self) -> bool {
        true
    }

    fn is_closed(&self) -> bool {
        self.closed
    }

    async fn begin_checkpoint_barrier(&mut self) -> Result<()> {
        self.begin_transaction().await
    }

    async fn commit_checkpoint_barrier(&mut self) -> Result<()> {
        self.commit_transaction().await
    }

    async fn abort_checkpoint_barrier(&mut self) -> Result<()> {
        self.rollback_transaction().await
    }
}
```

### Embedder-side barrier orchestration

```rust,no_run
# use rustcdc::{CdcRuntime, sink::SinkAdapter};
async fn run(mut runtime: CdcRuntime, mut sink: impl SinkAdapter) -> rustcdc::Result<()> {
    loop {
        let batch = runtime.poll_event_batch().await?;
        if batch.is_empty() {
            break;
        }
        // Open the barrier first so every send below joins the sink's transaction.
        sink.begin_checkpoint_barrier().await?;
        // Stage the batch inside the transaction, then flush (no commit yet).
        let staged: rustcdc::Result<()> = async {
            for event in batch.events() {
                sink.send(event).await?;
            }
            sink.flush().await
        }
        .await;
        if let Err(e) = staged {
            // A failed send or flush is an error inside the window: roll back.
            sink.abort_checkpoint_barrier().await?;
            return Err(e);
        }
        // Commit the checkpoint durably.
        match runtime.commit_ack(batch.ack_mode()).await {
            // Checkpoint is durable: commit the sink transaction.
            Ok(_) => sink.commit_checkpoint_barrier().await?,
            Err(e) => {
                // Checkpoint failed: roll back the sink transaction.
                sink.abort_checkpoint_barrier().await?;
                return Err(e);
            }
        }
    }
    sink.close().await?;
    Ok(())
}
```
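
A synchronous simulation of the orchestration, using hypothetical `FakeRuntime` and `TxSink` stand-ins (not rustcdc APIs), shows what the downstream sees when `commit_ack` fails on the second batch. In this model only the first batch is committed, and the checkpoint stays behind the rolled-back batch, so a restart would resume from it.

```rust
// Hypothetical, synchronous stand-ins modeling the orchestration order only.
#[derive(Default)]
struct TxSink {
    staged: Vec<u64>,
    downstream: Vec<u64>,
}

impl TxSink {
    fn begin(&mut self) { self.staged.clear(); }
    fn send(&mut self, id: u64) { self.staged.push(id); }
    fn commit(&mut self) { self.downstream.append(&mut self.staged); }
    fn abort(&mut self) { self.staged.clear(); }
}

struct FakeRuntime {
    batches: Vec<Vec<u64>>,
    next: usize,                // index of the next batch to hand out
    checkpoint: usize,          // number of batches durably acknowledged
    fail_ack_on: Option<usize>, // batch index whose commit_ack fails
}

impl FakeRuntime {
    fn poll_batch(&mut self) -> Vec<u64> {
        let batch = self.batches.get(self.next).cloned().unwrap_or_default();
        if !batch.is_empty() {
            self.next += 1;
        }
        batch
    }

    fn commit_ack(&mut self) -> Result<(), String> {
        let acked = self.next - 1; // only called after a non-empty poll
        if self.fail_ack_on == Some(acked) {
            return Err(format!("checkpoint write failed for batch {acked}"));
        }
        self.checkpoint = self.next;
        Ok(())
    }
}

// Order: begin, send, commit_ack, then commit or abort on the sink side.
fn run(runtime: &mut FakeRuntime, sink: &mut TxSink) -> Result<(), String> {
    loop {
        let batch = runtime.poll_batch();
        if batch.is_empty() {
            return Ok(());
        }
        sink.begin();
        for &id in &batch {
            sink.send(id);
        }
        match runtime.commit_ack() {
            Ok(()) => sink.commit(),
            Err(e) => {
                sink.abort();
                return Err(e);
            }
        }
    }
}
```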