# rustcdc API Guide
This document is the primary API reference for embedding rustcdc in Rust applications.
## Audience
This guide is for engineers integrating rustcdc as a library and building custom runtime loops.
## API Surface
The core embedder API is centered on:
- `RuntimeConfig` for runtime construction
- `CdcRuntime` for lifecycle and event delivery
- `RuntimeSourceConfig` for source selection
- `EventBatch` and `AckToken` for loss-safe delivery semantics
## Runtime Construction
`RuntimeConfig` binds four concerns:
- source connector configuration
- checkpoint backend
- schema history backend
- runtime options and observability
Typical shape:
```rust
use rustcdc::{
    checkpoint::InMemoryCheckpoint,
    schema_history::InMemorySchemaHistory,
    IdempotencyOptions,
    RuntimeConfig,
    RuntimeSourceConfig,
};

fn build_configs() -> rustcdc::Result<()> {
    // InMemoryCheckpoint is for tests and local development only.
    // Use FileCheckpoint (or a custom durable backend) in production.
    let checkpoint = InMemoryCheckpoint::default();
    let schema_history = InMemorySchemaHistory::default();

    let _config = RuntimeConfig::new(
        RuntimeSourceConfig::Disabled,
        checkpoint,
        schema_history,
    )
    .with_max_buffer_size(10_000)
    .with_idempotency(IdempotencyOptions::new(100_000)?)
    .with_max_poll_wait_ms(500);

    // Runtime duplicate suppression is enabled by default.
    // Disable it only when you need to opt out explicitly.
    let _config_without_dedup = RuntimeConfig::new(
        RuntimeSourceConfig::Disabled,
        InMemoryCheckpoint::default(),
        InMemorySchemaHistory::default(),
    )
    .with_idempotency_disabled();

    Ok(())
}
```
Durable schema history for restart resilience:
```rust
use rustcdc::{
    checkpoint::InMemoryCheckpoint,
    schema_history::FileSchemaHistory,
    RuntimeConfig,
    RuntimeSourceConfig,
};

async fn durable_schema_history_config() -> rustcdc::Result<()> {
    let checkpoint = InMemoryCheckpoint::default();
    // Schema history survives restarts because it is persisted to this file.
    let schema_history = FileSchemaHistory::new("/var/lib/rustcdc/schema-history.json").await?;
    let _config = RuntimeConfig::new(RuntimeSourceConfig::Disabled, checkpoint, schema_history);
    Ok(())
}
```
## Runtime Lifecycle
The canonical lifecycle is:
1. create runtime with `CdcRuntime::new`
2. start runtime with `start()`
3. read batches with `poll_event_batch()` or `event_batches()`
4. acknowledge durable progress with `commit_ack()`
5. stop runtime with `stop()`
Minimal lifecycle example:
```rust
use rustcdc::checkpoint::InMemoryCheckpoint;
use rustcdc::schema_history::InMemorySchemaHistory;
use rustcdc::{CdcRuntime, Result, RuntimeConfig, RuntimeSourceConfig};

async fn run_once() -> Result<()> {
    let checkpoint = InMemoryCheckpoint::default();
    let schema_history = InMemorySchemaHistory::default();
    let config = RuntimeConfig::new(RuntimeSourceConfig::Disabled, checkpoint, schema_history);

    let mut runtime = CdcRuntime::new(config)?;
    runtime.start().await?;

    let batch = runtime.poll_event_batch().await?;
    // Make sink side effects durable before acknowledging the batch.
    if let Some(token) = batch.ack_token() {
        runtime.commit_ack(token).await?;
    }

    runtime.stop().await?;
    Ok(())
}
```
## Source Selection
`RuntimeSourceConfig` selects the source connector at runtime:
- `Postgres(PostgresSourceConfig)`
- `Mysql(MysqlSourceConfig)`
- `SqlServer(SqlServerSourceConfig)`
- `Disabled`
The runtime also exposes connector capability metadata via `source_capabilities()` and rejects incompatible settings (for example, snapshot tables configured for a source that does not support snapshots). The capability metadata includes `snapshot_checkpoint_resume`, which is `true` for PostgreSQL, MySQL, and SQL Server: snapshot checkpoints resume from connector-native cursor state, and stream bootstrap stays aligned with the saved snapshot watermark.
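The validation described above amounts to checking configured settings against capability flags before the runtime starts. The sketch below is a hypothetical, std-only illustration: the `Capabilities` struct and the `snapshot` flag are assumptions (only `snapshot_checkpoint_resume` and `truncate` are named in this guide), and `validate_snapshot_tables` is not a rustcdc function.

```rust
/// Hypothetical capability flags; the `snapshot` field name is an assumption.
#[derive(Debug, Clone, Copy)]
pub struct Capabilities {
    pub snapshot: bool,
    pub snapshot_checkpoint_resume: bool,
    pub truncate: bool,
}

/// Hypothetical check: reject snapshot tables for a source without snapshot support.
pub fn validate_snapshot_tables(
    source: &str,
    caps: Capabilities,
    snapshot_tables: &[&str],
) -> Result<(), String> {
    if !snapshot_tables.is_empty() && !caps.snapshot {
        return Err(format!(
            "source `{source}` does not support snapshots, but {} snapshot table(s) were configured",
            snapshot_tables.len()
        ));
    }
    Ok(())
}
```

Failing at configuration time keeps an unsupported combination from surfacing later as a confusing runtime error.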
## Event Model
`Event` is the canonical envelope consumed by downstream code.
Key fields include:
- `op`: one of `Insert`, `Update`, `Delete`, `Read`, `SchemaChange`, `Truncate`
- `source`: source metadata and offset context
- `transaction`: optional transaction metadata
- `snapshot`: optional snapshot metadata
`Operation::Truncate` is emitted when a `TRUNCATE` statement removes all rows from a table.
`before` and `after` are always `None` for truncate events. Only connectors that advertise
`ConnectorCapabilities::truncate` emit this variant (currently: PostgreSQL).
`Operation::to_str()` returns a `&'static str` for zero-allocation display and comparison.
The event envelope is designed to support stable replay and source-agnostic processing.
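A zero-allocation `to_str()` is typically a `match` that returns string literals, which can then serve directly as map keys or comparison values. The sketch below uses a hypothetical `Op` enum that mirrors the documented variants; it is not rustcdc's `Operation` definition, and the lowercase spellings are an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of the documented operation variants (not rustcdc's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    Insert,
    Update,
    Delete,
    Read,
    SchemaChange,
    Truncate,
}

impl Op {
    /// Returns a string literal, so no heap allocation happens per call.
    pub fn to_str(self) -> &'static str {
        match self {
            Op::Insert => "insert",
            Op::Update => "update",
            Op::Delete => "delete",
            Op::Read => "read",
            Op::SchemaChange => "schema_change",
            Op::Truncate => "truncate",
        }
    }
}

/// Counts operations using the static labels as keys (no per-event String allocation).
pub fn count_by_op(ops: &[Op]) -> HashMap<&'static str, usize> {
    let mut counts = HashMap::new();
    for op in ops {
        *counts.entry(op.to_str()).or_insert(0) += 1;
    }
    counts
}
```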
## Delivery And Acknowledgement Semantics
`poll_event_batch()` returns an `EventBatch` that contains events and an optional `AckToken`.
Correct processing sequence:
1. consume events in batch order
2. durably commit sink side effects
3. call `commit_ack(token)`
Important semantics:
- if you do not call `commit_ack()` after sink side effects are durable, already-delivered events may be replayed
- `stop()` fails fast while uncommitted events remain in flight
- `force_stop()` is intended for emergency drains where replay is acceptable
- process termination without `stop()` can replay the in-flight batch on restart (at-least-once delivery)
- if source confirmation fails after the checkpoint has been durably committed, the runtime fails fast by default (`PostCommitSourceConfirmPolicy::FailFast`)
To keep the earlier availability-biased behavior, opt into continue mode explicitly:
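```rust
use rustcdc::PostCommitSourceConfirmPolicy;

// Keep running instead of failing fast when source confirmation fails
// after a durable checkpoint commit.
let config = config.with_post_commit_source_confirm_policy(
    PostCommitSourceConfirmPolicy::Continue,
);
```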
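The replay rules above can be illustrated with a small simulation. The sketch below is a hypothetical, std-only model (`ModelRuntime` and its `AckToken` are not rustcdc types): it hands out batches, advances a durable checkpoint only on acknowledgement, refuses to stop while events are in flight, and after a simulated crash redelivers everything past the last acknowledged offset.

```rust
/// Hypothetical token: the exclusive end offset covered by a delivered batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AckToken(pub u64);

/// Hypothetical in-memory model of the poll / commit_ack / stop contract.
/// It is not rustcdc's runtime; it only illustrates at-least-once replay.
pub struct ModelRuntime {
    source: Vec<u64>,       // event offsets produced by the simulated source
    committed: u64,         // durable checkpoint: first offset to redeliver after restart
    read_cursor: u64,       // next offset to hand out in this process
    in_flight: Option<u64>, // end offset of delivered-but-unacknowledged events
}

impl ModelRuntime {
    pub fn new(source: Vec<u64>) -> Self {
        Self { source, committed: 0, read_cursor: 0, in_flight: None }
    }

    /// Returns up to `max` events and, for a non-empty batch, a token covering them.
    pub fn poll(&mut self, max: usize) -> (Vec<u64>, Option<AckToken>) {
        let cursor = self.read_cursor;
        let batch: Vec<u64> = self
            .source
            .iter()
            .copied()
            .filter(|offset| *offset >= cursor)
            .take(max)
            .collect();
        let end = batch.last().map(|last| last + 1);
        if let Some(end) = end {
            self.read_cursor = end;
            self.in_flight = Some(end);
        }
        (batch, end.map(AckToken))
    }

    /// Records durable progress; call only after sink side effects are durable.
    pub fn commit_ack(&mut self, token: AckToken) {
        self.committed = self.committed.max(token.0);
        if self.in_flight.map_or(false, |end| end <= self.committed) {
            self.in_flight = None;
        }
    }

    /// Mirrors the documented fail-fast behavior when events are still in flight.
    pub fn stop(&self) -> Result<(), String> {
        match self.in_flight {
            Some(end) => Err(format!("uncommitted events before offset {end}")),
            None => Ok(()),
        }
    }

    /// Simulates a crash and restart: delivery resumes from the durable checkpoint.
    pub fn restart(&mut self) {
        self.read_cursor = self.committed;
        self.in_flight = None;
    }
}
```

In this model, a batch polled but never acknowledged is delivered again after `restart()`, which is exactly why sinks should tolerate duplicates.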
### Sink-Side Idempotency Guard
For at-least-once replay tolerance, rustcdc provides a built-in
`EventIdempotencyGuard` helper for consumer loops.
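```rust
use rustcdc::{Event, EventIdempotencyGuard, Result};

// Create the guard once and reuse it across batches: a guard rebuilt per batch
// would forget fingerprints and could not suppress cross-batch replays.
fn new_guard() -> Result<EventIdempotencyGuard> {
    EventIdempotencyGuard::new(100_000)?.with_ttl_ms(60_000)
}

fn process_batch(guard: &mut EventIdempotencyGuard, events: &[Event]) -> Result<usize> {
    let mut applied = 0usize;
    for event in events {
        // Skip events whose fingerprint was already seen.
        if !guard.should_process(event)? {
            continue;
        }
        // apply sink side-effect here
        applied += 1;
    }
    Ok(applied)
}
```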
The fingerprint includes source position, transaction sequence metadata, and
payload shape so events that share coarse offsets remain distinguishable.
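To see why fingerprinting on more than the offset matters, here is a hypothetical, std-only model of a bounded seen-set with a TTL. `EventKey` and `SeenSet` are not rustcdc types; the capacity and TTL parameters mirror `EventIdempotencyGuard::new(100_000)` and `.with_ttl_ms(60_000)`, but the FIFO eviction policy and hashing scheme are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};

/// Hypothetical event fields used for fingerprinting (not rustcdc's `Event`).
pub struct EventKey<'a> {
    pub source_position: &'a str,       // e.g. an LSN or binlog position
    pub tx_sequence: Option<u64>,       // order within the transaction, if known
    pub payload_columns: &'a [&'a str], // stand-in for "payload shape"
}

fn fingerprint(e: &EventKey<'_>) -> u64 {
    let mut h = DefaultHasher::new();
    e.source_position.hash(&mut h);
    e.tx_sequence.hash(&mut h);
    e.payload_columns.hash(&mut h);
    h.finish()
}

/// Bounded seen-set: evicts the oldest entries past `capacity`, and treats
/// entries older than `ttl_ms` as expired.
pub struct SeenSet {
    capacity: usize,
    ttl_ms: u64,
    seen: HashMap<u64, u64>,     // fingerprint -> time recorded
    order: VecDeque<(u64, u64)>, // insertion order: (fingerprint, time)
}

impl SeenSet {
    pub fn new(capacity: usize, ttl_ms: u64) -> Self {
        Self { capacity, ttl_ms, seen: HashMap::new(), order: VecDeque::new() }
    }

    /// Returns true if the event should be applied, false for a duplicate within the TTL.
    pub fn should_process(&mut self, e: &EventKey<'_>, now_ms: u64) -> bool {
        let fp = fingerprint(e);
        if let Some(&seen_at) = self.seen.get(&fp) {
            if now_ms.saturating_sub(seen_at) < self.ttl_ms {
                return false;
            }
        }
        self.seen.insert(fp, now_ms);
        self.order.push_back((fp, now_ms));
        while self.order.len() > self.capacity {
            if let Some((old_fp, t)) = self.order.pop_front() {
                // Drop the map entry only if it was not refreshed by a later insert.
                if self.seen.get(&old_fp) == Some(&t) {
                    self.seen.remove(&old_fp);
                }
            }
        }
        true
    }
}
```

Two events at the same source position but with different transaction sequence numbers hash to different fingerprints, so neither is mistaken for a replay of the other.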
## Streaming Consumption
`event_batches()` provides stream-based consumption; the stream yields only non-empty batches.
```rust
use futures_util::StreamExt;

let mut batches = runtime.event_batches();
while let Some(batch) = batches.next().await {
    let batch = batch?;
    // Consume events and make sink side effects durable before acknowledging.
    if let Some(token) = batch.ack_token() {
        runtime.commit_ack(token).await?;
    }
}
```
## Incremental Snapshot API (DBLog Pattern)
`CdcRuntime::start()` currently performs the classic blocking snapshot followed by a
handoff to streaming. For non-blocking incremental snapshots, start one directly on a
connector connection with `start_incremental_snapshot(...)`.
```rust
use rustcdc::{
    IncrementalSnapshotConfig, PostgresConnection, PostgresSourceConfig, Result,
};

async fn start_incremental_stream(config: PostgresSourceConfig) -> Result<()> {
    let mut connection = PostgresConnection::new(config);
    connection.connect().await?;

    let incremental = IncrementalSnapshotConfig::new(vec!["public.users".to_string()])
        .with_chunk_size(1_000);

    let mut stream = connection
        .start_incremental_snapshot(incremental, None)
        .await?;
    let _events = stream.next_events(5_000).await?;
    Ok(())
}
```
This connector-level API is also available for MySQL and SQL Server via
`MysqlConnection::start_incremental_snapshot(...)` and
`SqlServerConnection::start_incremental_snapshot(...)`.
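The core of the DBLog pattern is watermark reconciliation: a chunk is selected between a low and a high watermark written into the change log, and any chunk row whose key also changes between those watermarks is dropped, because the log already carries a newer version. The std-only sketch below models only that reconciliation step; `LogEntry` and `process_window` are hypothetical, watermark writes and chunk queries are abstracted away, and rustcdc's connectors may implement this differently.

```rust
use std::collections::BTreeMap;

/// Hypothetical log entries observed while streaming around one chunk.
pub enum LogEntry {
    LowWatermark,
    HighWatermark,
    Change { key: u64, value: String },
}

/// DBLog-style reconciliation for one chunk.
/// `chunk` is the chunk SELECT result, read after the low watermark was written
/// and before the high watermark was written. Log changes are emitted as they
/// arrive; surviving chunk rows are emitted as "read" events at the high watermark.
pub fn process_window(
    chunk: BTreeMap<u64, String>,
    log: &[LogEntry],
) -> Vec<(u64, String, &'static str)> {
    let mut out = Vec::new();
    let mut pending = chunk;
    let mut in_window = false;
    for entry in log {
        match entry {
            LogEntry::LowWatermark => in_window = true,
            LogEntry::HighWatermark => {
                // Flush the chunk rows that no in-window change superseded.
                for (k, v) in std::mem::take(&mut pending) {
                    out.push((k, v, "read"));
                }
                in_window = false;
            }
            LogEntry::Change { key, value } => {
                if in_window {
                    // The log has a newer version of this row; drop the snapshot copy.
                    pending.remove(key);
                }
                out.push((*key, value.clone(), "change"));
            }
        }
    }
    out
}
```

Because the stream is never paused while chunks are read, snapshotting interleaves with live changes instead of blocking them.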
## Checkpoint Backends
Checkpoint implementations persist source offsets and determine restart position.
Built-in options include:
- `InMemoryCheckpoint`: in-memory storage for tests and local development
- `FileCheckpoint`: file-backed storage suitable for production
Custom checkpoint backends can be implemented through the `Checkpoint` trait.
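A durable backend must never leave a torn offset behind after a crash. A common technique is write-to-temp, fsync, then rename. The sketch below shows that technique with a hypothetical, std-only `FileOffsetStore`; it is not the rustcdc `Checkpoint` trait (whose methods are not shown here) and is not how `FileCheckpoint` is necessarily implemented.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::PathBuf;

/// Hypothetical durable offset store; NOT the rustcdc `Checkpoint` trait.
pub struct FileOffsetStore {
    path: PathBuf,
}

impl FileOffsetStore {
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self { path: path.into() }
    }

    /// Writes to a temp file, fsyncs it, then renames it over the target.
    /// On POSIX the rename is atomic within one filesystem, so a crash leaves
    /// either the old offset or the new one. (Full durability of the rename
    /// itself also requires fsyncing the parent directory, omitted here.)
    pub fn save(&self, offset: &str) -> io::Result<()> {
        let tmp = self.path.with_extension("tmp");
        let mut f = File::create(&tmp)?;
        f.write_all(offset.as_bytes())?;
        f.sync_all()?;
        fs::rename(&tmp, &self.path)?;
        Ok(())
    }

    /// Returns `None` when no checkpoint exists yet (fresh start).
    pub fn load(&self) -> io::Result<Option<String>> {
        match fs::read_to_string(&self.path) {
            Ok(s) => Ok(Some(s)),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }
}
```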
## Runtime Introspection
The runtime exposes embeddable control-plane state and metrics surfaces:
- `admin_snapshot()`
- `admin_snapshot_json()`
- `admin_metrics_prometheus()`
Use these methods for health endpoints, diagnostics views, and lightweight observability bridges.
## Connection Retry Policy
For transient source connectivity failures, configure `ConnectionRetryPolicy` via
`RuntimeConfig::with_connection_retry`. The runtime retries only recoverable errors
(`SourceError`, `TimeoutError`); fatal configuration errors propagate immediately.
```rust
use rustcdc::core::ConnectionRetryPolicy;

let policy = ConnectionRetryPolicy {
    max_retries: Some(5), // None = retry indefinitely
    initial_delay_ms: 300,
    max_delay_ms: 10_000,
};
let config = config.with_connection_retry(policy);
```
Defaults: 5 retries, 300 ms initial delay, 10 s maximum delay, with truncated exponential backoff.
Set `max_retries: None` for long-running pipelines that should retry indefinitely.
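Truncated exponential backoff grows the delay geometrically and clamps it at `max_delay_ms`. The std-only sketch below computes such a schedule from the documented fields; `backoff_delay_ms` and `schedule` are hypothetical helpers, and the doubling factor and absence of jitter are assumptions rather than rustcdc's documented algorithm.

```rust
/// Hypothetical helper; assumes the delay doubles per attempt (an assumption).
/// Attempt 0 waits `initial_delay_ms`; later attempts double, capped at `max_delay_ms`.
pub fn backoff_delay_ms(attempt: u32, initial_delay_ms: u64, max_delay_ms: u64) -> u64 {
    // 2^attempt, saturating to u64::MAX if the shift would overflow.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    initial_delay_ms.saturating_mul(factor).min(max_delay_ms)
}

/// Delays for each retry. `None` (retry indefinitely) has no end, so only the
/// first `preview` delays are listed.
pub fn schedule(max_retries: Option<u32>, initial: u64, cap: u64, preview: u32) -> Vec<u64> {
    let n = max_retries.unwrap_or(preview).min(preview);
    (0..n).map(|attempt| backoff_delay_ms(attempt, initial, cap)).collect()
}
```

Under these assumptions, the defaults produce waits of 300, 600, 1200, 2400, and 4800 ms; the 10 s cap only takes effect from the seventh attempt onward, which matters for pipelines configured with `max_retries: None`.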
## Transform Configuration
`FilterProjectionTransform::new(config)` returns `Result<Self>`, so configuration
errors (for example, empty filter values) are reported at construction time
instead of being silently ignored at apply time.
```rust
use rustcdc::transform::{
    FilterField, FilterOperator, FilterProjectionConfig, FilterProjectionTransform, FilterRule,
};

let transform = FilterProjectionTransform::new(FilterProjectionConfig {
    filter: Some(FilterRule::new(FilterField::Op, FilterOperator::Eq, "insert")),
    include_columns: Some(vec!["id".into(), "email".into()]),
    exclude_columns: None,
})?; // returns Err(ConfigError) for invalid filter values
```
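The benefit of validating at construction is that a bad configuration fails once at startup rather than on every event. The std-only sketch below illustrates that design with a hypothetical `Projection` type (not rustcdc's transform types): `new` rejects an empty filter value, and `apply` filters by operation and keeps only the included columns.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a filter + projection config (not rustcdc's types).
pub struct Projection {
    op_equals: Option<String>,
    include_columns: Option<Vec<String>>,
}

impl Projection {
    /// Validates eagerly, so an invalid filter fails here instead of per event.
    pub fn new(op_equals: Option<&str>, include_columns: Option<Vec<String>>) -> Result<Self, String> {
        if let Some(v) = op_equals {
            if v.trim().is_empty() {
                return Err("filter value must not be empty".into());
            }
        }
        Ok(Self { op_equals: op_equals.map(str::to_owned), include_columns })
    }

    /// Returns `None` when the event is filtered out, otherwise the projected row.
    pub fn apply(&self, op: &str, row: &BTreeMap<String, String>) -> Option<BTreeMap<String, String>> {
        if let Some(want) = &self.op_equals {
            if want.as_str() != op {
                return None;
            }
        }
        let projected = match &self.include_columns {
            Some(cols) => row
                .iter()
                .filter(|(k, _)| cols.contains(*k))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect(),
            None => row.clone(),
        };
        Some(projected)
    }
}
```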
## Related Documentation
- [Getting Started](getting_started.md)
- [Configuration Reference](config_reference.md)
- [Architecture](architecture.md)
- [Schema Evolution and DDL Capture](schema_evolution.md)
- [Reliability Testing Guide](reliability_testing.md)
- [Adapter SDK](adapter_sdk.md)
---
## MariaDB Support
rustcdc supports **MariaDB 10.5 and 10.6** via the MySQL protocol stack. The
`mysql_async` library handles the MariaDB binlog wire protocol transparently.
rustcdc also provides a first-class `MariaDbSourceConfig` wrapper for explicit
runtime source typing (`mariadb`) and a separate checkpoint namespace.
### Capability Matrix
| Feature | PostgreSQL | MySQL | MariaDB | SQL Server |
|---|---|---|---|---|
| Full-table snapshot | ✅ | ✅ | ✅ (validated on 10.5 and 10.6) | ✅ |
| Resumable snapshot (keyset) | ✅ | ✅ | ✅ (validated on 10.5 and 10.6) | ✅ |
| CDC streaming | ✅ | ✅ | ✅ (validated on 10.5 and 10.6) | ✅ |
| GTID-based position | — | ✅ | ✅ (connector support) | — |
| Binlog position fallback | — | ✅ | ✅ (connector support) | — |
| TLS connections | ✅ | ✅ | ✅ (connector support) | ✅ |
| Transaction boundaries | ✅ | ✅ | ✅ (validated on 10.5 and 10.6) | ✅ |
| Schema change events | ✅ | ✅ | ✅ | ✅ |
**Note on schema change events**: Runtime connectors emit canonical `Operation::SchemaChange` events for supported DDL capture paths. Use `rustcdc::ddl_capture` and `rustcdc::schema_history` together when building schema-aware downstream consumers.
**MariaDB nuance**: MariaDB schema-change behavior follows the MySQL connector path and is exercised in integration coverage, but coverage depth may vary with engine- and version-specific DDL semantics.
### Connecting to MariaDB
Use `MysqlSourceConfig` exactly as you would for MySQL, or the `MariaDbSourceConfig` wrapper when you want explicit `mariadb` source typing:
```rust
use rustcdc::source::mysql::MysqlSourceConfig;

let config = MysqlSourceConfig {
    host: "mariadb-host".into(),
    port: 3306,
    user: "replication_user".into(),
    password: rustcdc::SecretString::new("secret"),
    database: "my_db".into(),
    ..Default::default()
};
```
### Known Limitations
- MariaDB 10.3 and earlier are **not tested**; they may handle basic binlog
  events but are unsupported.
- MariaDB Galera Cluster is not tested; CDC from a Galera node may exhibit
  unexpected behavior due to write-set replication semantics.
- `ROW_FORMAT=COMPRESSED` tables require `binlog_row_image = FULL` on the
server; partial images are not supported.
MariaDB integration evidence includes dedicated end-to-end suites for snapshot
resume, stream CDC, and snapshot-to-stream handoff on MariaDB 10.5 and 10.6 in
`tests/mariadb_e2e_integration.rs`, plus connection lifecycle coverage in
`tests/mariadb_connection_integration.rs`.