rustcdc 0.2.0

Embeddable Rust CDC library focused on correctness-first capture primitives
# rustcdc API Guide

This document is the primary API reference for embedding rustcdc in Rust applications.

## Audience

This guide is for engineers integrating rustcdc as a library and building custom runtime loops.

## API Surface

The core embedder API is centered on:

- `RuntimeConfig` for runtime construction
- `CdcRuntime` for lifecycle and event delivery
- `RuntimeSourceConfig` for source selection
- `EventBatch` and `AckMode` for loss-safe delivery semantics

## Runtime Construction

`RuntimeConfig` binds four concerns:

- source connector configuration
- checkpoint backend
- schema history backend
- runtime options and observability

Typical shape:

```rust
use rustcdc::{
  checkpoint::InMemoryCheckpoint,
  IdempotencyOptions,
  schema_history::InMemorySchemaHistory,
  RuntimeConfig,
  RuntimeSourceConfig,
};

fn build_configs() -> rustcdc::Result<()> {
  // InMemoryCheckpoint is for tests and local development only.
  // Use FileCheckpoint (or a custom durable backend) in production.
  let checkpoint = InMemoryCheckpoint::default();
  let schema_history = InMemorySchemaHistory::default();

  let config = RuntimeConfig::new(
    RuntimeSourceConfig::Disabled,
    checkpoint,
    schema_history,
  )
  .with_max_buffer_size(10_000)
  .with_idempotency(IdempotencyOptions::new(100_000)?)
  .with_max_poll_wait_ms(500);

  // Runtime duplicate suppression is enabled by default.
  // Use this only when you need to opt out explicitly.
  let config_without_dedup = RuntimeConfig::new(
    RuntimeSourceConfig::Disabled,
    InMemoryCheckpoint::default(),
    InMemorySchemaHistory::default(),
  )
  .with_idempotency_disabled();

  Ok(())
}
```

Durable schema history for restart resilience:

```rust
use rustcdc::{
  checkpoint::InMemoryCheckpoint,
  schema_history::FileSchemaHistory,
  RuntimeConfig,
  RuntimeSourceConfig,
};

async fn durable_schema_history_config() -> rustcdc::Result<()> {
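  // Pair durable schema history with a durable checkpoint (e.g. FileCheckpoint)
  // in production; InMemoryCheckpoint loses source offsets on restart.
  let checkpoint = InMemoryCheckpoint::default();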
  let schema_history = FileSchemaHistory::new("/var/lib/rustcdc/schema-history.json").await?;

  let _config = RuntimeConfig::new(RuntimeSourceConfig::Disabled, checkpoint, schema_history);
  Ok(())
}
```

## Runtime Lifecycle

The canonical lifecycle is:

1. create runtime with `CdcRuntime::new`
2. start runtime with `start()`
3. read batches with `poll_event_batch()` or `event_batches()`
4. acknowledge durable progress with `commit_ack()`
5. stop runtime with `stop()`

Minimal lifecycle example:

```rust
use rustcdc::{CdcRuntime, Result, RuntimeConfig, RuntimeSourceConfig};
use rustcdc::checkpoint::InMemoryCheckpoint;
use rustcdc::schema_history::InMemorySchemaHistory;

async fn run_once() -> Result<()> {
  let checkpoint = InMemoryCheckpoint::default();
  let schema_history = InMemorySchemaHistory::default();
  let config = RuntimeConfig::new(RuntimeSourceConfig::Disabled, checkpoint, schema_history);

  let mut runtime = CdcRuntime::new(config)?;
  runtime.start().await?;

  let batch = runtime.poll_event_batch().await?;
  runtime.commit_ack(batch.ack_mode()).await?;

  runtime.stop().await?;
  Ok(())
}
```

## Source Selection

`RuntimeSourceConfig` selects the source connector at runtime:

- `Postgres(PostgresSourceConfig)`
- `Mysql(MysqlSourceConfig)`
- `MariaDb(MariaDbSourceConfig)`
- `SqlServer(SqlServerSourceConfig)`
- `Disabled`

For readability, prefer the associated constructors in embedder code:

- `RuntimeSourceConfig::postgres(...)`
- `RuntimeSourceConfig::mysql(...)`
- `RuntimeSourceConfig::mariadb(...)`
- `RuntimeSourceConfig::sqlserver(...)`
- `RuntimeSourceConfig::disabled()`

Source configuration in library code is explicit and typed; environment parsing
belongs in host applications or examples that map `CDC_RS_*` variables into
connector config values.
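Such a host-side mapping can stay small and explicit. The std-only sketch below is hypothetical throughout: `SourceSelection` stands in for `RuntimeSourceConfig`, and the variable names `CDC_RS_SOURCE`, `CDC_RS_HOST`, and `CDC_RS_PORT` are illustrative assumptions, not documented rustcdc settings. Passing a lookup closure instead of reading `std::env` directly keeps the mapping testable.

```rust
/// Hypothetical, simplified stand-in for `RuntimeSourceConfig`.
#[derive(Debug, PartialEq)]
pub enum SourceSelection {
    Disabled,
    Postgres { host: String, port: u16 },
    Mysql { host: String, port: u16 },
}

/// Map `CDC_RS_*`-style variables (names assumed) into a typed selection.
/// `lookup` abstracts the environment, e.g. `|k| std::env::var(k).ok()`.
pub fn source_from_env<F>(lookup: F) -> Result<SourceSelection, String>
where
    F: Fn(&str) -> Option<String>,
{
    let kind = lookup("CDC_RS_SOURCE").unwrap_or_else(|| "disabled".to_string());
    // Host is required; port falls back to the engine's conventional default.
    let endpoint = |default_port: u16| -> Result<(String, u16), String> {
        let host = lookup("CDC_RS_HOST").ok_or("CDC_RS_HOST is required")?;
        let port = match lookup("CDC_RS_PORT") {
            Some(raw) => raw
                .parse::<u16>()
                .map_err(|e| format!("invalid CDC_RS_PORT: {e}"))?,
            None => default_port,
        };
        Ok((host, port))
    };
    match kind.to_ascii_lowercase().as_str() {
        "disabled" => Ok(SourceSelection::Disabled),
        "postgres" => {
            let (host, port) = endpoint(5432)?;
            Ok(SourceSelection::Postgres { host, port })
        }
        "mysql" => {
            let (host, port) = endpoint(3306)?;
            Ok(SourceSelection::Mysql { host, port })
        }
        other => Err(format!("unsupported CDC_RS_SOURCE: {other}")),
    }
}
```

In a host binary you would call `source_from_env(|k| std::env::var(k).ok())` and then translate the result into the real typed connector config.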

The runtime also exposes connector capability metadata through `source_capabilities()` and rejects incompatible settings, for example snapshot tables configured for a source that does not support snapshots. Capability metadata includes `snapshot_checkpoint_resume`, which is `true` for PostgreSQL, MySQL, and SQL Server. Snapshot checkpoints resume from connector-native cursor state, and stream bootstrap stays aligned with the saved snapshot watermark.
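Conceptually, that validation compares the requested features with the advertised capabilities before any connection is opened. The std-only sketch below uses a hypothetical `Capabilities` struct and `validate_snapshot_request` function. Neither is the type returned by `source_capabilities()`, and the `resume_from_checkpoint` flag is an illustrative assumption.

```rust
/// Hypothetical capability flags, loosely modeled on the metadata described above.
#[derive(Debug, Clone, Copy)]
pub struct Capabilities {
    pub snapshot: bool,
    pub snapshot_checkpoint_resume: bool,
}

/// Reject snapshot settings the source cannot honor, instead of failing mid-run.
pub fn validate_snapshot_request(
    caps: Capabilities,
    snapshot_tables: &[&str],
    resume_from_checkpoint: bool,
) -> Result<(), String> {
    if !snapshot_tables.is_empty() && !caps.snapshot {
        return Err(format!(
            "source does not support snapshots, but {} snapshot table(s) were configured",
            snapshot_tables.len()
        ));
    }
    if resume_from_checkpoint && !caps.snapshot_checkpoint_resume {
        return Err("source cannot resume a snapshot from a checkpoint".to_string());
    }
    Ok(())
}
```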

## Event Model

`Event` is the canonical envelope consumed by downstream code.

Key fields include:
- `op`: one of `Insert`, `Update`, `Delete`, `Read`, `SchemaChange`, `Truncate`
- `source`: source metadata and offset context
- `transaction`: optional transaction metadata
- `snapshot`: optional snapshot metadata

`Operation::Truncate` is emitted when a `TRUNCATE` statement removes all rows from a table.
`before` and `after` are always `None` for truncate events. Only connectors that advertise
`ConnectorCapabilities::truncate` emit this variant (currently: PostgreSQL).
`Operation::to_str()` returns a `&'static str` for zero-allocation display and comparison.

The event envelope is designed to support stable replay and source-agnostic processing.
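Downstream code typically dispatches on `op` with an exhaustive `match`. That way, the compiler forces you to decide what to do with a variant such as `Truncate` instead of letting it be ignored silently. The std-only sketch below uses simplified, hypothetical stand-ins for `Operation` and `Event`, with payloads as plain strings. They mirror the variants listed above but are not the crate's definitions.

```rust
/// Simplified stand-in mirroring the documented `Operation` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op {
    Insert,
    Update,
    Delete,
    Read,
    SchemaChange,
    Truncate,
}

/// Simplified stand-in for an event: only the fields this sketch needs.
pub struct Ev {
    pub op: Op,
    pub before: Option<String>,
    pub after: Option<String>,
}

/// Describe the sink action for one event.
pub fn sink_action(ev: &Ev) -> String {
    match ev.op {
        // Snapshot reads and inserts both carry the new row image in `after`.
        Op::Insert | Op::Read => format!("upsert {}", ev.after.as_deref().unwrap_or("?")),
        Op::Update => format!(
            "replace {} with {}",
            ev.before.as_deref().unwrap_or("?"),
            ev.after.as_deref().unwrap_or("?")
        ),
        Op::Delete => format!("delete {}", ev.before.as_deref().unwrap_or("?")),
        // `before` and `after` are always `None` here: act on the whole table.
        Op::Truncate => "truncate table".to_string(),
        Op::SchemaChange => "refresh schema".to_string(),
    }
}
```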

## Delivery And Acknowledgement Semantics

`poll_event_batch()` returns an `EventBatch` that contains events and an `AckMode`.

```rust
pub enum AckMode {
    Required(AckToken),   // must commit; skipping risks replay on restart
    NotRequired,          // empty batch or disabled source; commit_ack is a no-op
}
```

Correct processing sequence:

1. consume events in batch order
2. durably commit sink side effects
3. call `commit_ack(batch.ack_mode())`

`commit_ack` accepts `impl Into<AckMode>` — passing `AckMode::NotRequired` is a documented zero-cost no-op. Raw `AckToken` values are also accepted (via `From<AckToken>`).

Important semantics:
- not acknowledging after sink durability may replay already-delivered events
- `stop()` fails fast if uncommitted events remain in-flight
- `force_stop()` is intended for emergency drain where replay is acceptable; it emits a `WARN` log with `shutdown_mode = "forced"`
- `drain_and_stop()` polls until the source is exhausted, then stops cleanly
- process termination without `stop()` can replay the in-flight batch on restart (at-least-once)
- source confirmation failures after durable checkpoint commit now fail fast by default (`PostCommitSourceConfirmPolicy::FailFast`)

To keep the previous availability-biased behavior, opt into `Continue` mode explicitly:

```rust
use rustcdc::PostCommitSourceConfirmPolicy;

let config = config.with_post_commit_source_confirm_policy(
  PostCommitSourceConfirmPolicy::Continue,
);
```
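The in-flight rules listed above can be modeled as a small state machine. The std-only `ToyRuntime` below is hypothetical and simplified. For instance, it allows only one unacknowledged batch at a time, which may be stricter than rustcdc. In the model, `commit_ack` advances the durable checkpoint, `stop()` refuses while a batch is in flight, and `force_stop()` abandons the batch, so a restart from the checkpoint replays it.

```rust
/// Hypothetical toy model of the delivery contract; not the rustcdc runtime.
/// Simplification: at most one batch may be in flight at a time.
pub struct ToyRuntime {
    source: Vec<u64>,         // offsets available at the source
    committed: usize,         // durable checkpoint: index of the next event to deliver
    in_flight: Option<usize>, // end index of the delivered but unacknowledged batch
}

impl ToyRuntime {
    /// Start (or restart) from a durable checkpoint position.
    pub fn new(source: Vec<u64>, committed: usize) -> Self {
        Self { source, committed, in_flight: None }
    }

    /// Deliver up to `max` events after the checkpoint; returns (events, ack token).
    pub fn poll(&mut self, max: usize) -> Result<(Vec<u64>, Option<usize>), String> {
        if self.in_flight.is_some() {
            return Err("previous batch not acknowledged".to_string());
        }
        let start = self.committed.min(self.source.len());
        let end = (start + max).min(self.source.len());
        let events = self.source[start..end].to_vec();
        // An empty batch needs no acknowledgement (compare AckMode::NotRequired).
        let token = if events.is_empty() { None } else { Some(end) };
        self.in_flight = token;
        Ok((events, token))
    }

    /// Advance the checkpoint; call only after the sink writes are durable.
    pub fn commit_ack(&mut self, token: Option<usize>) {
        if let Some(end) = token {
            self.committed = end;
            self.in_flight = None;
        }
    }

    /// Clean stop: fails while events are in flight, else returns the checkpoint.
    pub fn stop(&self) -> Result<usize, String> {
        if self.in_flight.is_some() {
            Err("uncommitted events in flight".to_string())
        } else {
            Ok(self.committed)
        }
    }

    /// Forced stop: abandon the in-flight batch; it is replayed after restart.
    pub fn force_stop(self) -> usize {
        self.committed
    }
}
```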

### Sink-Side Idempotency Guard

For at-least-once replay tolerance, rustcdc now provides a built-in
`EventIdempotencyGuard` helper for consumer loops.

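```rust
use rustcdc::{Event, EventIdempotencyGuard, Result};

// Create the guard once and reuse it across batches; a guard created
// inside each call would only deduplicate within a single batch.
fn new_guard() -> Result<EventIdempotencyGuard> {
  EventIdempotencyGuard::new(100_000)?.with_ttl_ms(60_000)
}

async fn process_batch(guard: &mut EventIdempotencyGuard, events: &[Event]) -> Result<usize> {
  let mut applied = 0usize;

  for event in events {
    if !guard.should_process(event)? {
      continue;
    }
    // apply sink side-effect here
    applied += 1;
  }

  Ok(applied)
}
```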

The fingerprint includes source position, transaction sequence metadata, and
payload shape so events that share coarse offsets remain distinguishable.
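Conceptually, a bounded guard of this kind remembers recent fingerprints, evicts the oldest when full, and forgets entries older than its TTL. The std-only sketch below illustrates that idea with hypothetical names (`BoundedSeen`, string fingerprints, caller-supplied millisecond timestamps). It is not `EventIdempotencyGuard`'s actual data structure or eviction policy.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical bounded, TTL-aware set of recently seen fingerprints.
pub struct BoundedSeen {
    capacity: usize,
    ttl_ms: u64,
    order: VecDeque<(String, u64)>, // (fingerprint, first-seen ms), oldest first
    seen: HashSet<String>,
}

impl BoundedSeen {
    pub fn new(capacity: usize, ttl_ms: u64) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, ttl_ms, order: VecDeque::new(), seen: HashSet::new() }
    }

    /// Returns true if the caller should process the event.
    /// Assumes `now_ms` never decreases between calls.
    pub fn should_process(&mut self, fingerprint: &str, now_ms: u64) -> bool {
        // Drop entries whose TTL has elapsed (the deque is ordered by time).
        while let Some((fp, seen_at)) = self.order.front() {
            if now_ms.saturating_sub(*seen_at) < self.ttl_ms {
                break;
            }
            self.seen.remove(fp);
            self.order.pop_front();
        }
        if self.seen.contains(fingerprint) {
            return false; // duplicate within the TTL window
        }
        // At capacity: evict the oldest fingerprint to make room.
        if self.order.len() == self.capacity {
            if let Some((fp, _)) = self.order.pop_front() {
                self.seen.remove(&fp);
            }
        }
        self.order.push_back((fingerprint.to_string(), now_ms));
        self.seen.insert(fingerprint.to_string());
        true
    }
}
```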

### Restart Replay Window

**The in-memory idempotency guard resets on every process restart.** It provides
within-session deduplication only, not cross-restart deduplication.

On restart the runtime replays all events between the last durable checkpoint and
the current source position. Events in this window that were already delivered
before the crash **will be re-delivered** and will not be detected as duplicates
by the in-memory guard (because its state was lost).

**Implications:**

- The replay window size is bounded by your commit frequency. Committing after
  each batch keeps the window small.
- For sink operations that are naturally idempotent (upsert-by-PK, conditional
  inserts, etc.) this is safe to ignore.
- For non-idempotent sinks (append-only log ingest, payment triggers, etc.) you
  **must** provide cross-restart deduplication at the sink layer.
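A short simulation makes the replay window concrete. It rests on stated assumptions: hypothetical in-memory stand-ins for the checkpoint, the per-session guard, and two sink styles, with integer event IDs doubling as keys. Under those assumptions:

- events delivered after the last commit come back on restart
- the fresh guard does not recognize them
- the keyed upsert table is unaffected
- the append-only log records duplicates

```rust
use std::collections::{BTreeSet, HashSet};

/// Sink state after one crash and restart.
pub struct Outcome {
    pub upsert_rows: BTreeSet<u32>, // keyed table: rewriting a key is harmless
    pub append_log: Vec<u32>,       // append-only: every write is kept
}

/// Deliver all `events`, durably commit only the first `committed`, crash,
/// then restart from the checkpoint with a fresh in-memory guard.
pub fn simulate_crash(events: &[u32], committed: usize) -> Outcome {
    let mut upsert_rows = BTreeSet::new();
    let mut append_log = Vec::new();

    // Session 1: every event reaches the sink; the guard suppresses in-session repeats.
    let mut guard = HashSet::new();
    for &id in events {
        if guard.insert(id) {
            upsert_rows.insert(id);
            append_log.push(id);
        }
    }
    // Only the checkpoint survives the crash; the guard's state is lost.
    let checkpoint = committed.min(events.len());

    // Session 2: replay from the checkpoint with an empty guard.
    let mut guard = HashSet::new();
    for &id in &events[checkpoint..] {
        if guard.insert(id) {
            upsert_rows.insert(id);
            append_log.push(id);
        }
    }
    Outcome { upsert_rows, append_log }
}
```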

**Cross-restart deduplication pattern:**

Use `fingerprint_event_stable` (SHA-256 based, deterministic across process
restarts) and persist seen fingerprints in your sink's storage:

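```rust
use rustcdc::idempotency::fingerprint_event_stable;

// On each delivered event. `sink_has_seen`, `sink_write`, and `sink_mark_seen`
// are placeholders for your own sink code. Commit the write and the mark-seen
// step in one sink transaction where possible; otherwise a crash between them
// re-applies the event on replay.
let fingerprint = fingerprint_event_stable(&event)?; // returns Result<String, FingerprintError>
if !sink_has_seen(&fingerprint).await? {
    sink_write(&event).await?;
    sink_mark_seen(&fingerprint).await?;
}
```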

Unlike `fingerprint_event_transient` (which uses a per-process random seed),
`fingerprint_event_stable` produces the same fingerprint for the same event
across restarts, making it safe to persist and check against a durable store.
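A toy comparison shows why the write and the mark-seen step should commit together. In the std-only sketch below, `Sink`, its two `apply_*` methods, and the crash flags are hypothetical. The string fingerprints stand in for `fingerprint_event_stable` output. A crash between two separate writes leaves a row without its fingerprint, so the replayed event is applied twice. The all-or-nothing variant cannot end up in that state.

```rust
use std::collections::HashSet;

/// Hypothetical sink: `rows` is the data, `seen` the persisted fingerprints.
#[derive(Default)]
pub struct Sink {
    pub rows: Vec<String>,
    pub seen: HashSet<String>,
}

impl Sink {
    /// Two separate writes; `crash_after_write` simulates dying in between.
    pub fn apply_two_step(&mut self, fp: &str, row: &str, crash_after_write: bool) {
        if self.seen.contains(fp) {
            return;
        }
        self.rows.push(row.to_string());
        if crash_after_write {
            return; // the mark-seen write never happens
        }
        self.seen.insert(fp.to_string());
    }

    /// Row and fingerprint become visible together or not at all.
    pub fn apply_transactional(&mut self, fp: &str, row: &str, crash_before_commit: bool) {
        if self.seen.contains(fp) {
            return;
        }
        if crash_before_commit {
            return; // nothing from this unit is visible
        }
        self.rows.push(row.to_string());
        self.seen.insert(fp.to_string());
    }
}
```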

## Streaming Consumption

`event_batches()` provides a stream-based consumption model for non-empty batches.

```rust
use futures_util::StreamExt;

let mut batches = runtime.event_batches();
while let Some(batch) = batches.next().await {
  let batch = batch?;
  // Apply batch.events() to the sink and make those writes durable first.
  runtime.commit_ack(batch.ack_mode()).await?;
}
```

For cooperative cancellation, use `event_batches_cancellable(token)` with a `CancellationToken`:

```rust
use tokio_util::sync::CancellationToken;
use futures_util::StreamExt;

let cancel = CancellationToken::new();
let mut batches = runtime.event_batches_cancellable(cancel.clone());
while let Some(batch) = batches.next().await {
  let batch = batch?;
  // Apply batch.events() to the sink and make those writes durable first.
  runtime.commit_ack(batch.ack_mode()).await?;
}
// cancel.cancel() from another task unblocks the stream cleanly
```

## Incremental Snapshot API (DBLog Pattern)

`CdcRuntime::start()` supports both classic snapshot + stream handoff and
runtime-driven incremental snapshot startup (when configured via
`with_incremental_snapshot(...)`).

If you want connector-managed non-blocking incremental snapshot behavior,
you can also start it directly from a connector connection via
`start_incremental_snapshot(...)`.

```rust
use rustcdc::{
  IncrementalSnapshotConfig, PostgresConnection, PostgresSourceConfig, Result,
};

async fn start_incremental_stream(config: PostgresSourceConfig) -> Result<()> {
  let mut connection = PostgresConnection::new(config);
  connection.connect().await?;

  let incremental = IncrementalSnapshotConfig::new(vec!["public.users".to_string()])
    .with_chunk_size(1_000);

  let mut stream = connection
    .start_incremental_snapshot(incremental, None)
    .await?;

  let _events = stream.next_events(5_000).await?;
  Ok(())
}
```

This connector-level API is also available for MySQL and SQL Server via
`MysqlConnection::start_incremental_snapshot(...)` and
`SqlServerConnection::start_incremental_snapshot(...)`.
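For background, the DBLog pattern interleaves chunked table reads with the change log in four steps:

1. The reader records a low watermark.
2. It selects one chunk by primary key.
3. It records a high watermark.
4. It drops from the chunk any key that the log changed between the two watermarks, because those log events carry a newer row image.

The std-only sketch below shows only that reconciliation step for one chunk. Its types and names are hypothetical, and it is not rustcdc's implementation.

```rust
use std::collections::{BTreeMap, HashSet};

/// One log event observed between the low and high watermarks (key only).
pub struct LogChange {
    pub key: u64,
}

/// Reconcile a snapshot chunk with the log window for that chunk.
///
/// `chunk` holds rows selected by primary key after the low watermark.
/// Returns the rows that can be emitted as `Read` events at the high
/// watermark; keys touched in the window are dropped because the log
/// events already carry a newer row image.
pub fn reconcile_chunk(chunk: BTreeMap<u64, String>, window: &[LogChange]) -> Vec<(u64, String)> {
    let touched: HashSet<u64> = window.iter().map(|c| c.key).collect();
    chunk
        .into_iter()
        .filter(|(key, _)| !touched.contains(key))
        .collect()
}
```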

## EventBatch Inspection

`EventBatch` provides several inspection methods:

- `batch.len()` / `batch.is_empty()`: event count
- `batch.ack_mode()`: `AckMode::Required(token)` or `AckMode::NotRequired`
- `batch.oldest_event_source_timestamp_ms()`: millisecond timestamp of the oldest event in the batch (for lag monitoring)
- `batch.events()`: iterator over contained `Event` values
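A simple lag gauge can be derived from the oldest source timestamp in each batch. The sketch assumes the accessor yields `Option<u64>` milliseconds since the Unix epoch, with `None` for an empty batch. That signature is an assumption, so adapt it to the actual return type.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Replication lag in milliseconds for a batch, given its oldest source timestamp.
/// Clock skew can put source timestamps in the "future"; saturate to zero.
pub fn batch_lag_ms(oldest_source_ts_ms: Option<u64>, now_ms: u64) -> Option<u64> {
    oldest_source_ts_ms.map(|ts| now_ms.saturating_sub(ts))
}

/// Current wall-clock time in milliseconds since the Unix epoch.
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}
```

For example, `batch_lag_ms(batch.oldest_event_source_timestamp_ms(), now_ms())`, adapted to the accessor's real type, could feed a gauge metric.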

## Checkpoint Backends

Checkpoint implementations persist source offsets and determine restart position.

Built-in options include:

- `InMemoryCheckpoint` — zero-config, suitable for tests and short-lived processes. State is lost on restart.
- `FileCheckpoint` — file-backed persistence; recommended for production.

Custom checkpoint backends can be implemented through the `Checkpoint` trait.
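A custom durable backend usually has to survive a crash in the middle of a write. One common technique, sketched below with std only, works in three steps:

1. Write the new offset to a temporary file.
2. Flush it with `sync_all`.
3. Rename it over the old file, so readers see either the old checkpoint or the new one.

`save_offset` and `load_offset` are hypothetical helpers, not the `Checkpoint` trait's methods. Also, on some filesystems full crash durability requires syncing the parent directory as well.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Replace the checkpoint file with `offset` via write-temp, fsync, rename.
pub fn save_offset(path: &Path, offset: &str) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut file = File::create(&tmp)?;
        file.write_all(offset.as_bytes())?;
        file.sync_all()?; // make the bytes durable before the rename
    }
    fs::rename(&tmp, path) // atomic replace on POSIX filesystems
}

/// Read the last saved offset; `None` if no checkpoint exists yet.
pub fn load_offset(path: &Path) -> io::Result<Option<String>> {
    match fs::read_to_string(path) {
        Ok(s) => Ok(Some(s)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```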

## Runtime Introspection

The runtime exposes embeddable control-plane state and metrics surfaces:

- `admin_snapshot()`
- `admin_snapshot_json()`
- `admin_metrics_prometheus()`

Use these methods for health endpoints, diagnostics views, and lightweight observability bridges.

## Connection Retry Policy

For transient source connectivity failures, configure `ConnectionRetryPolicy` via
`RuntimeConfig::with_connection_retry`. The runtime retries only recoverable errors
(`SourceError`, `TimeoutError`); fatal configuration errors propagate immediately.

```rust
use rustcdc::core::ConnectionRetryPolicy;

let policy = ConnectionRetryPolicy {
    max_retries: Some(5),       // None = retry indefinitely
    initial_delay_ms: 300,
    max_delay_ms: 10_000,
};

let config = config.with_connection_retry(policy);
```

Defaults: 5 retries, 300 ms initial delay, 10 s cap, truncated exponential backoff.
Set `max_retries: None` for long-running pipelines that should retry indefinitely.
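The delay schedule implied by those defaults can be computed directly. This sketch assumes that the delay doubles on each retry, is clamped to `max_delay_ms`, and has no jitter. rustcdc's exact multiplier and jitter behavior are not specified here. Under that assumption, five retries starting at 300 ms wait 300, 600, 1200, 2400, and 4800 ms, and the 10 s cap only applies from the seventh retry onward.

```rust
/// Delay before retry number `attempt` (0-based), doubling from `initial_ms`
/// and truncated at `max_ms`. Saturating math avoids overflow on long runs.
pub fn backoff_delay_ms(attempt: u32, initial_ms: u64, max_ms: u64) -> u64 {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    initial_ms.saturating_mul(factor).min(max_ms)
}

/// Full delay schedule for a bounded retry policy.
pub fn schedule(max_retries: u32, initial_ms: u64, max_ms: u64) -> Vec<u64> {
    (0..max_retries)
        .map(|attempt| backoff_delay_ms(attempt, initial_ms, max_ms))
        .collect()
}
```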

## Transform Configuration

`FilterProjectionTransform::new(config)` returns `Result<Self>`, so configuration
errors (for example, empty filter values) are reported at construction time
instead of surfacing silently at apply time.

```rust
use rustcdc::transform::{
  FilterField, FilterOperator, FilterProjectionConfig, FilterProjectionTransform, FilterRule,
};

let transform = FilterProjectionTransform::new(FilterProjectionConfig {
  filter: Some(FilterRule::new(FilterField::Op, FilterOperator::Eq, "insert")),
  include_columns: Some(vec!["id".into(), "email".into()]),
  exclude_columns: None,
})?; // returns Err(ConfigError) for invalid filter values
```

### Content-based filtering

`FilterField::AfterField(path)` and `FilterField::BeforeField(path)` match
against fields inside the event payload using a dot-separated path (e.g. `"user.country"`).

Available operators: `Eq`, `Ne`, `Contains`, `Regex`, `Lt`, `LtEq`, `Gt`, `GtEq`.

```rust
// Keep only events where after["status"] == "active"
FilterRule::new(FilterField::AfterField("status".into()), FilterOperator::Eq, "active")

// Keep events where after["amount"] > 100
FilterRule::new(FilterField::AfterField("amount".into()), FilterOperator::Gt, "100")

// Keep events matching a regex on after["email"]
FilterRule::new(FilterField::AfterField("email".into()), FilterOperator::Regex, r"@example\.com$")
```

`FilterOperator::Regex` patterns are compiled once at construction; invalid
patterns return `Err(ConfigError)` at `FilterProjectionTransform::new` time.
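To make the path semantics concrete, the std-only sketch below resolves a dot-separated path against a nested value and applies a few of the operators. It is a hypothetical model, and three of its rules are assumptions rather than documented rustcdc behavior:

- the `Value` type
- numeric comparison when both sides parse as `f64`
- treating a missing path as no match

`Regex` is omitted because std has no regex engine.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like payload value used only by this sketch.
pub enum Value {
    Str(String),
    Num(f64),
    Obj(BTreeMap<String, Value>),
}

/// Subset of the comparison operators, modeled for illustration.
pub enum Cmp {
    Eq,
    Ne,
    Contains,
    Lt,
    Gt,
}

/// Follow a dot-separated path such as "user.country" through nested objects.
pub fn get_path<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(root, |cur, part| match cur {
        Value::Obj(map) => map.get(part),
        _ => None,
    })
}

/// Evaluate `value_at(path) <op> rhs` under this sketch's assumptions.
pub fn rule_matches(root: &Value, path: &str, op: Cmp, rhs: &str) -> bool {
    let Some(value) = get_path(root, path) else {
        return false; // a missing path never matches
    };
    let text = match value {
        Value::Str(s) => s.clone(),
        Value::Num(n) => n.to_string(),
        Value::Obj(_) => return false, // objects are not compared
    };
    match (op, text.parse::<f64>(), rhs.parse::<f64>()) {
        (Cmp::Eq, _, _) => text == rhs,
        (Cmp::Ne, _, _) => text != rhs,
        (Cmp::Contains, _, _) => text.contains(rhs),
        // Ordering compares numerically when both sides parse as numbers.
        (Cmp::Lt, Ok(a), Ok(b)) => a < b,
        (Cmp::Gt, Ok(a), Ok(b)) => a > b,
        _ => false,
    }
}
```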

### Sensitive-data masking (`MaskHashTransform`)

> **⚠ GDPR / privacy warning**
>
> `MaskRule::Hash` (SHA-256) is **deterministic and unsalted**.  For
> low-cardinality fields (e.g., `gender`, `country_code`, `status`) or any
> field whose values are enumerable, an attacker can reverse the hash via a
> pre-computed lookup table.  **Do not rely on `Hash` alone for GDPR
> pseudonymization compliance.**
>
> Recommended approaches for GDPR-compliant pseudonymization:
> - Use `MaskRule::Encrypt` (AES-256-GCM), which provides reversible but opaque tokens.
> - Add a site-specific secret by computing a keyed HMAC (for example, HMAC-SHA-256)
>   of each value before storing it, then apply `MaskRule::Hash` to the HMAC output.
>   A plain `format!("{secret}:{value}")` prefix hash is not an HMAC.
> - Consider `MaskRule::Redact` or `MaskRule::Null` for fields that must be
>   fully suppressed in the downstream stream.
>
> **Default behaviour change in 0.2**: `MaskHashConfig::default()` now uses
> `default_rule: MaskRule::Passthrough`, meaning unlisted fields are passed
> through unchanged.  Use `MaskHashConfig::hash_all()` if you need the old
> "hash everything" behaviour.

```rust
use rustcdc::{MaskHashConfig, MaskHashTransform, MaskRule};

// Hash only specified PII fields; leave everything else unchanged.
let mut config = MaskHashConfig::default();
config.mask_rules.insert("email".into(), MaskRule::Hash);
config.mask_rules.insert("ssn".into(),   MaskRule::Null);

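// Encrypt a field with AES-256-GCM. This needs rustcdc's "encryption" Cargo feature;
// `cfg(feature = ...)` checks your own crate's features, so forward one of that name.
// Load the key from a secret store in practice; the literal is illustrative.
#[cfg(feature = "encryption")]
config.mask_rules.insert("credit_card".into(), MaskRule::Encrypt("my-secret".into()));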

// Opt-in aggressive mode: SHA-256 every field not explicitly configured.
let aggressive = MaskHashConfig::hash_all();
```
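The lookup-table risk described in the warning above is easy to demonstrate. In the std-only sketch below, `DefaultHasher::new()` stands in for SHA-256 only because it is deterministic and available in std. It is not a cryptographic hash. The attack works the same way against any unkeyed deterministic hash: hash every candidate value once, then invert observed hashes by lookup.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Deterministic, unkeyed hash (stand-in for unsalted SHA-256 in this sketch).
pub fn unkeyed_hash(value: &str) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Build a reverse lookup table for an enumerable domain of values.
pub fn build_lookup(domain: &[&str]) -> HashMap<u64, String> {
    domain.iter().map(|v| (unkeyed_hash(v), v.to_string())).collect()
}

/// "Unmask" a hashed field by table lookup.
pub fn reverse(table: &HashMap<u64, String>, masked: u64) -> Option<&str> {
    table.get(&masked).map(String::as_str)
}
```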

## Related Documentation

- [Getting Started](getting_started.md)
- [Configuration Reference](config_reference.md)
- [Architecture](architecture.md)
- [Schema Evolution and DDL Capture](schema_evolution.md)
- [Reliability Testing Guide](reliability_testing.md)
- [Adapter SDK](adapter_sdk.md)

---

## MariaDB Support

rustcdc supports **MariaDB 10.5, 10.6, and 10.11** via the MySQL protocol stack. The
`mysql_async` library handles the MariaDB binlog wire protocol transparently.
rustcdc also provides a first-class `MariaDbSourceConfig` wrapper for explicit
runtime source typing (`mariadb`) and separate checkpoint namespace handling.

### Capability Matrix

| Capability                 | PostgreSQL | MySQL 8+ | MariaDB 10.5/10.6/10.11 | SQL Server |
|----------------------------|:----------:|:--------:|:-----------------:|:----------:|
| Full-table snapshot        ||| ✅ (validated on 10.5 and 10.6) ||
| Resumable snapshot (keyset)||| ✅ (validated on 10.5 and 10.6) ||
| CDC streaming              ||| ✅ (validated on 10.5 and 10.6) ||
| GTID-based position        ||| ✅ (connector support) ||
| Binlog position fallback   ||| ✅ (connector support) ||
| TLS connections            ||| ✅ (connector support) ||
| Transaction boundaries     ||| ✅ (validated on 10.5 and 10.6) ||
| Schema change events       |||||

MariaDB 10.11 currently has dedicated process-crash and replay test coverage,
while 10.5 and 10.6 are validated by the core connection and end-to-end suites.

**Note on schema change events**: Runtime connectors emit canonical `Operation::SchemaChange` events for supported DDL capture paths. Use `rustcdc::ddl_capture` and `rustcdc::schema_history` together when building schema-aware downstream consumers.

**MariaDB nuance**: MariaDB schema-change handling follows the MySQL connector path and is exercised in integration tests, but coverage depth may vary with engine- and version-specific DDL semantics.

### Connecting to MariaDB

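Use `MysqlSourceConfig` exactly as you would for MySQL, or `MariaDbSourceConfig` when you want explicit `mariadb` source typing and a separate checkpoint namespace: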

```rust
use rustcdc::source::mysql::MysqlSourceConfig;

let config = MysqlSourceConfig {
    host: "mariadb-host".into(),
    port: 3306,
    user: "replication_user".into(),
    password: rustcdc::SecretString::new("secret"),
    database: "my_db".into(),
    ..Default::default()
};
```

### Known Limitations

- MariaDB 10.3 and earlier are **not tested** and may work with basic binlog
  events but are unsupported.
- MariaDB Galera Cluster is not tested; CDC from a Galera node may exhibit
  unexpected behaviour due to write-set replication semantics.
- `ROW_FORMAT=COMPRESSED` tables require `binlog_row_image = FULL` on the
  server; partial images are not supported.

MariaDB integration evidence includes:

- end-to-end suites for snapshot resume, stream CDC, and snapshot-to-stream handoff on MariaDB 10.5 and 10.6 in `tests/mariadb_e2e_integration.rs`
- connection lifecycle coverage in `tests/mariadb_connection_integration.rs`
- process-crash replay coverage on MariaDB 10.11 in `tests/runtime_mariadb_process_crash_integration.rs`