rustcdc 0.5.0

Embeddable Rust CDC library focused on correctness-first capture primitives
# rustcdc Configuration Reference

**Version:** v0.1+  
**Audience:** Platform engineers and application developers embedding rustcdc

---

## Table of Contents

1. [RuntimeConfig](#runtimeconfig)
2. [Runtime Consumption Model](#runtime-consumption-model)
3. [Connector Capabilities](#connector-capabilities)
4. [PostgreSQL Source Configuration](#postgresql-source-configuration)
5. [MySQL Source Configuration](#mysql-source-configuration)
6. [MariaDB Source Configuration](#mariadb-source-configuration)
7. [SQL Server Source Configuration](#sql-server-source-configuration)
8. [Checkpoint Configuration](#checkpoint-configuration)
9. [Observability Configuration](#observability-configuration)
10. [Production Recommendations](#production-recommendations)

---

## RuntimeConfig

Core runtime configuration for CDC operations.

```rust
pub struct RuntimeConfig<C, H> {
    /// Typed source connector configuration
    pub source: RuntimeSourceConfig,
    
    /// List of tables to snapshot on initial run (when no checkpoint exists)
    /// Format: ["schema.table", "schema.table2"]
    /// Leave empty to start in stream-only mode on first run
    pub snapshot_tables: Vec<String>,
    
    /// Checkpoint backend (pluggable trait implementation)
    pub checkpoint: C,
    
    /// Schema history backend (pluggable trait implementation)
    pub schema_history: H,

    /// Explicit runtime options surface including observability and tuning.
    pub options: RuntimeOptions,
}
```

`RuntimeOptions` contains the operational knobs that used to live as top-level runtime fields:

```rust
pub struct RuntimeOptions {
    pub observability: RuntimeObservability,
    pub max_buffer_size: usize,
    pub max_poll_wait_ms: u64,
    pub transform_error_policy: TransformErrorPolicy,
    pub post_commit_source_confirm_policy: PostCommitSourceConfirmPolicy,
    pub idempotency: Option<IdempotencyOptions>,
    pub validate_events: bool,
    pub schema_history_retention: Option<SchemaHistoryRetention>,
    /// Exponential-backoff retry policy for recoverable source connection errors.
    /// `None` disables retry (errors propagate immediately).
    pub connection_retry: Option<ConnectionRetryPolicy>,
}
```

Default runtime safety posture:
- `transform_error_policy = Halt`
- `post_commit_source_confirm_policy = FailFast`
- `validate_events = true`
- `schema_history_retention = Some(SchemaHistoryRetention::keep_last(256))`

### RuntimeConfig Builder Example

```rust
use rustcdc::{
  checkpoint::InMemoryCheckpoint,
  schema_history::InMemorySchemaHistory,
  PostgresSourceConfig,
  RuntimeConfig,
  RuntimeSourceConfig,
  SecretString,
};

let checkpoint = InMemoryCheckpoint::default();
let schema_history = InMemorySchemaHistory::default();
let source = PostgresSourceConfig {
  host: "localhost".into(),
  port: 5432,
  user: "postgres".into(),
  password: SecretString::from_callback("postgres-password", || {
    std::env::var("CDC_RS_POSTGRES_PASSWORD")
      .map_err(|error| rustcdc::Error::ConfigError(error.to_string()))
  }),
  database: "mydb".into(),
  replication_slot_name: "rustcdc_slot".into(),
  publication_name: "rustcdc_publication".into(),
  conn_timeout_secs: 30,
  ..PostgresSourceConfig::default()
};

let config = RuntimeConfig::new(RuntimeSourceConfig::postgres(source), checkpoint, schema_history)
    .with_snapshot_tables(vec!["public.users".to_string(), "public.orders".to_string()])
    .with_max_buffer_size(50_000)
    .with_max_poll_wait_ms(2_000)
    .with_transform_error_policy(rustcdc::TransformErrorPolicy::Halt);
```

For env-driven bootstrapping, use explicit argument parsing in your host
application and map values into typed source configs.
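
As a rough illustration of that mapping step, the sketch below reads a few values through a lookup closure and validates them before they would be copied into a typed source config. `PgSettings`, `pg_settings_from`, and the `CDC_PG_*` variable names are hypothetical and not part of rustcdc; the range check mirrors the documented `conn_timeout_secs` range of 1 - 300.

```rust
/// Hypothetical stand-in for a few `PostgresSourceConfig` fields.
#[derive(Debug, PartialEq)]
pub struct PgSettings {
    pub host: String,
    pub port: u16,
    pub conn_timeout_secs: u64,
}

/// Builds settings from a key lookup, for example a wrapper around `std::env::var`.
/// The variable names are illustrative assumptions, not rustcdc conventions.
pub fn pg_settings_from<F>(lookup: F) -> Result<PgSettings, String>
where
    F: Fn(&str) -> Option<String>,
{
    let host = lookup("CDC_PG_HOST").ok_or_else(|| "CDC_PG_HOST is required".to_string())?;
    // Fall back to the documented defaults when a value is absent.
    let port = match lookup("CDC_PG_PORT") {
        Some(raw) => raw.parse::<u16>().map_err(|e| format!("CDC_PG_PORT: {e}"))?,
        None => 5432,
    };
    let conn_timeout_secs = match lookup("CDC_PG_CONN_TIMEOUT_SECS") {
        Some(raw) => raw
            .parse::<u64>()
            .map_err(|e| format!("CDC_PG_CONN_TIMEOUT_SECS: {e}"))?,
        None => 30,
    };
    if !(1..=300).contains(&conn_timeout_secs) {
        return Err(format!("conn_timeout_secs out of range 1-300: {conn_timeout_secs}"));
    }
    Ok(PgSettings { host, port, conn_timeout_secs })
}
```

In a host application the lookup would typically be `|key| std::env::var(key).ok()`, with the validated values then assigned to the matching `PostgresSourceConfig` fields.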

Prefer the associated constructors when selecting a source in embedder code:

- `RuntimeSourceConfig::postgres(...)`
- `RuntimeSourceConfig::mysql(...)`
- `RuntimeSourceConfig::mariadb(...)`
- `RuntimeSourceConfig::sqlserver(...)`
- `RuntimeSourceConfig::disabled()`

## Runtime Consumption Model

The preferred embedder surface is now batch-oriented rather than count-oriented.

`poll_event_batch()` returns an `EventBatch` containing the delivered events plus an `AckMode`. Re-polling before acknowledgement redelivers the same in-flight batch, which keeps retry behavior loss-safe.

```rust
use rustcdc::{CdcRuntime, Result};

async fn consume_once(runtime: &mut CdcRuntime) -> Result<()> {
  let batch = runtime.poll_event_batch().await?;
  if batch.is_empty() {
    return Ok(());
  }

  runtime.commit_ack(batch.ack_mode()).await?;

  Ok(())
}
```

For partial acknowledgement, split the token and commit only the accepted prefix. The remaining suffix will be re-delivered on the next poll.

```rust
use rustcdc::AckMode;

let batch = runtime.poll_event_batch().await?;
if let AckMode::Required(token) = batch.ack_mode() {
  let (accepted, _retry_later) = token.split_at(10)?;
  runtime.commit_ack(accepted).await?;
}
```

`event_batches()` exposes the same model as a stream of non-empty `EventBatch` values.

```rust
use futures_util::StreamExt;

let mut batches = runtime.event_batches();
while let Some(batch) = batches.next().await {
  let batch = batch?;
  runtime.commit_ack(batch.ack_mode()).await?;
}
```

`poll_event_batch()` + `commit_ack(batch.ack_mode())` is now the canonical runtime acknowledgement API.
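
The redelivery behaviour described in this section can be pictured with a toy model. The sketch below is not rustcdc's implementation: `ToyRuntime` is a hypothetical in-memory stand-in that only shows why re-polling before acknowledgement returns the same batch, and why committing a prefix leaves the suffix in flight.

```rust
use std::collections::VecDeque;

/// Toy model of prefix acknowledgement; not rustcdc's implementation.
pub struct ToyRuntime {
    buffered: VecDeque<u64>,
    in_flight: Vec<u64>,
    max_batch: usize,
}

impl ToyRuntime {
    pub fn new(events: impl IntoIterator<Item = u64>, max_batch: usize) -> Self {
        Self {
            buffered: events.into_iter().collect(),
            in_flight: Vec::new(),
            max_batch,
        }
    }

    /// Returns the unacknowledged batch if one exists, otherwise pulls a new one.
    pub fn poll(&mut self) -> Vec<u64> {
        if self.in_flight.is_empty() {
            let n = self.max_batch.min(self.buffered.len());
            self.in_flight = self.buffered.drain(..n).collect();
        }
        self.in_flight.clone()
    }

    /// Acknowledges the first `count` in-flight events; the suffix stays in flight.
    pub fn commit_prefix(&mut self, count: usize) -> Result<(), String> {
        if count > self.in_flight.len() {
            return Err(format!("cannot ack {count} of {} events", self.in_flight.len()));
        }
        self.in_flight.drain(..count);
        Ok(())
    }
}
```

In this model a consumer that crashes between `poll` and `commit_prefix` loses nothing, because the next `poll` returns the same events again. That is the at-least-once property the real acknowledgement API is described as preserving.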

## Connector Capabilities

Runtime source selection now exposes explicit connector capabilities through `ConnectorCapabilities`.

```rust
use rustcdc::{ConnectorCapabilities, RuntimeSourceConfig};

let source = RuntimeSourceConfig::disabled();
let caps: ConnectorCapabilities = source.capabilities();
assert!(!caps.snapshot);
assert!(!caps.handoff);
assert!(!caps.ddl_capture);
```

When running a runtime instance, the same view is available from `source_capabilities()`:

```rust
let caps = runtime.source_capabilities();
if !caps.snapshot {
  // Guard feature wiring in embedders before attempting snapshot mode.
}
```

For configured PostgreSQL/MySQL/MariaDB/SQL Server sources, the runtime advertises
`snapshot=true`, `handoff=true`, `ddl_capture=true`, `heartbeat=true`, and
`schema_introspection=true`.

The runtime now also provides an embeddable admin/introspection surface that includes
capabilities, readiness/liveness, buffer depth, and delivery counters.

```rust
let admin = runtime.admin_snapshot();
assert_eq!(admin.state, "running");

let json = runtime.admin_snapshot_json()?;
let prometheus = runtime.admin_metrics_prometheus();
```

`admin_snapshot_json()` is intended for control-plane APIs, and
`admin_metrics_prometheus()` emits Prometheus-friendly text for embedding in
lightweight health endpoints.

The runtime constructor enforces capability guards. For example, configuring `snapshot_tables` with a source that does not support snapshots is rejected at construction time.
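
A construction-time guard of this kind might look roughly like the following. `Caps` and `check_snapshot_guard` are hypothetical names used only for illustration; the real check lives inside the runtime constructor and may differ in its details.

```rust
/// Hypothetical mirror of a few capability flags discussed above.
#[derive(Debug, Clone, Copy, Default)]
pub struct Caps {
    pub snapshot: bool,
    pub handoff: bool,
    pub ddl_capture: bool,
}

/// Rejects a configuration that requests snapshots from a source that cannot provide them.
pub fn check_snapshot_guard(caps: Caps, snapshot_tables: &[String]) -> Result<(), String> {
    if !snapshot_tables.is_empty() && !caps.snapshot {
        return Err(format!(
            "source does not support snapshots, but {} snapshot table(s) were configured",
            snapshot_tables.len()
        ));
    }
    Ok(())
}
```

Failing at construction rather than at first poll keeps a misconfigured pipeline from starting in a half-working state.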

---

## PostgreSQL Source Configuration

```rust
pub struct PostgresSourceConfig {
    /// PostgreSQL host (FQDN or IP)
    pub host: String,
    
    /// PostgreSQL port
    /// Default: 5432
    pub port: u16,
    
    /// PostgreSQL username (should have REPLICATION role)
    pub user: String,
    
    /// PostgreSQL password material.
    /// Use `SecretString::new`,
    /// `SecretString::from_provider`, or `SecretString::from_callback`.
    pub password: SecretString,

    /// Database authentication mode.
    /// - `Password` (default): static password semantics
    /// - `AwsIamToken`: short-lived IAM token semantics (requires TLS transport)
    pub auth_mode: DatabaseAuthMode,
    
    /// Database name to replicate from
    pub database: String,
    
    /// Logical replication slot name
    /// Example: "rustcdc_slot"
    pub replication_slot_name: String,

    /// Publication name used by pgoutput
    /// Example: "rustcdc_publication"
    pub publication_name: String,
    
    /// Transport mode (`TransportConfig::tls()` by default when `tls` feature is enabled).
    pub transport: TransportConfig,
    
    /// Connection timeout in seconds
    /// Default: 30
    /// Range: 1 - 300
    pub conn_timeout_secs: u64,

    /// Stream poll interval in milliseconds
    /// Default: 50
    /// Range: 1 - 60000
    pub stream_poll_interval_ms: u64,

    /// Maximum events yielded per stream poll
    /// Default: 1000
    /// Range: 1 - 100000
    pub max_events_per_poll: usize,
    
}
```

### Secret Loading Patterns

Connector passwords are now modeled as `SecretString`, not raw `String` values.

```rust
use rustcdc::{SecretProvider, SecretString};
use std::sync::Arc;

struct VaultProvider;

impl SecretProvider for VaultProvider {
  fn resolve_secret(&self, reference: &str) -> rustcdc::Result<String> {
    Ok(format!("vault://{reference}"))
  }
}

let inline_secret = SecretString::new("postgres");
let provider_secret = SecretString::from_provider(
  "vault",
  "database/postgres/password",
  Arc::new(VaultProvider),
);
let callback_secret = SecretString::from_callback("runtime-refresh", || {
  std::env::var("CDC_RS_ROTATED_PASSWORD")
    .map_err(|error| rustcdc::Error::ConfigError(error.to_string()))
});
```

Deferred secrets are resolved at validation/connect time and remain redacted in `Debug`/`Display` output.
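
To make the deferred-resolution idea concrete, here is a small self-contained model. `DeferredSecret` is a hypothetical type, not rustcdc's `SecretString`; it only demonstrates two properties named above: the callback runs at resolve time (so rotated values are picked up), and `Debug` output never contains the secret.

```rust
use std::fmt;
use std::sync::Arc;

/// Toy stand-in for a callback-backed secret; not rustcdc's `SecretString`.
#[derive(Clone)]
pub struct DeferredSecret {
    label: String,
    resolver: Arc<dyn Fn() -> Result<String, String> + Send + Sync>,
}

impl DeferredSecret {
    pub fn from_callback<F>(label: &str, resolver: F) -> Self
    where
        F: Fn() -> Result<String, String> + Send + Sync + 'static,
    {
        Self { label: label.to_string(), resolver: Arc::new(resolver) }
    }

    /// Invokes the callback on every call, so nothing is resolved or cached up front.
    pub fn resolve(&self) -> Result<String, String> {
        (self.resolver)()
    }
}

impl fmt::Debug for DeferredSecret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the label is printed; the secret value is never formatted.
        write!(f, "DeferredSecret({}, [REDACTED])", self.label)
    }
}
```

The same shape suits short-lived credentials such as IAM tokens, where each new connection should obtain a fresh value instead of reusing one captured at startup.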

### Feature-Gated Encryption Transforms

Enable the `encryption` feature to use field-level AES-GCM encryption and decryption through the existing `MaskHashTransform` surface.

```rust
use rustcdc::{MaskHashConfig, MaskHashTransform, MaskRule, SecretString};
use std::collections::HashMap;

let mut encrypt_rules = HashMap::new();
encrypt_rules.insert(
  "profile.phone".to_string(),
  MaskRule::Encrypt(SecretString::from_callback("field-key", || {
    std::env::var("CDC_RS_FIELD_KEY")
      .map_err(|error| rustcdc::Error::ConfigError(error.to_string()))
  })),
);

let encrypt_transform = MaskHashTransform::new(MaskHashConfig {
  mask_rules: encrypt_rules,
  default_rule: MaskRule::Null,
});

let mut decrypt_rules = HashMap::new();
decrypt_rules.insert(
  "profile.phone".to_string(),
  MaskRule::Decrypt(SecretString::from_callback("field-key", || {
    std::env::var("CDC_RS_FIELD_KEY")
      .map_err(|error| rustcdc::Error::ConfigError(error.to_string()))
  })),
);

let decrypt_transform = MaskHashTransform::new(MaskHashConfig {
  mask_rules: decrypt_rules,
  default_rule: MaskRule::Null,
});
```

Encrypted fields are emitted as `enc:<nonce_b64>:<ciphertext_b64>` strings and decrypted back into their original JSON values with the matching key.

Format/KDF contract for current unversioned payloads:
- AEAD: AES-256-GCM
- Nonce: 12 random bytes (base64 encoded)
- KDF: HKDF-SHA-256, 32-byte output, no salt
- HKDF info label: `b"rustcdc-field-encryption"`
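
Downstream consumers sometimes need to recognise encrypted fields without decrypting them. The sketch below does a structural check of the `enc:<nonce_b64>:<ciphertext_b64>` envelope only. `split_envelope` is a hypothetical helper, and it assumes the base64 alphabet in use contains no `:` character, which holds for both the standard and URL-safe alphabets.

```rust
/// Splits an `enc:<nonce_b64>:<ciphertext_b64>` string into its two base64 parts.
/// Structural check only; it does not decode or decrypt anything.
pub fn split_envelope(value: &str) -> Option<(&str, &str)> {
    let rest = value.strip_prefix("enc:")?;
    let (nonce_b64, ciphertext_b64) = rest.split_once(':')?;
    // A 12-byte nonce encodes to exactly 16 base64 characters, with no padding.
    if nonce_b64.len() != 16 || ciphertext_b64.is_empty() || ciphertext_b64.contains(':') {
        return None;
    }
    Some((nonce_b64, ciphertext_b64))
}
```

A `None` result means the value is not in the unversioned envelope format; it says nothing about whether a well-formed envelope will decrypt successfully.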

Future backward-compatibility rollout plan (when versioning becomes necessary):
- phase 1: decrypt supports both legacy unversioned and new versioned payloads
- phase 2: encrypt emits only the new versioned payload format
- phase 3: after migration window, remove legacy decrypt support with release-note callout

### Field Mapping Transform

Use `FieldMappingTransform` for high-value schema-alignment operations without
custom code:

- copy fields (`copy`)
- rename/move fields (`rename`)
- inject static literals (`set_literals`)
- remove fields (`remove`)

Paths use dot notation (`profile.email`, `meta.source`).

```rust
use rustcdc::{FieldMappingConfig, FieldMappingTransform};
use serde_json::json;

let transform = FieldMappingTransform::new(FieldMappingConfig {
  copy: vec![("user.email".into(), "email".into())],
  rename: vec![("user.name".into(), "user.full_name".into())],
  set_literals: vec![("meta.pipeline".into(), json!("orders"))],
  remove: vec!["legacy_flag".into()],
  strict: true,
})?;
```

`strict = true` fails fast when copy/rename/remove source paths are missing,
which helps catch drift during schema evolution and replay.
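
The difference between strict and lenient handling can be shown on a flat key/value record. This is a simplified model with a hypothetical `rename_field` helper operating on a `BTreeMap`; the real transform works on nested JSON values and dot-notation paths.

```rust
use std::collections::BTreeMap;

/// Renames `from` to `to`. In strict mode a missing source key is an error;
/// otherwise the record is left unchanged.
pub fn rename_field(
    record: &mut BTreeMap<String, String>,
    from: &str,
    to: &str,
    strict: bool,
) -> Result<(), String> {
    match record.remove(from) {
        Some(value) => {
            record.insert(to.to_string(), value);
            Ok(())
        }
        None if strict => Err(format!("rename source path missing: {from}")),
        None => Ok(()),
    }
}
```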

**Replay determinism caveat for encryption rules (important):**
- `MaskRule::Encrypt` is intentionally nonce-based and therefore non-deterministic.
- Replaying the same logical event will produce different ciphertext bytes.
- Use encryption rules only when your downstream dedup/idempotency logic does not depend on byte-identical payload replay.
- For replay-sensitive pipelines, prefer deterministic masking rules (`Hash`, `Redact`, `Truncate`, `Null`) on fields that participate in replay comparisons.

**Transport Selection:**
- `TransportConfig::tls()` (default with `tls` feature): TLS with system trust store
- `TransportConfig::tls_with_ca_cert_path(path)`: TLS with explicit CA bundle
- `TransportConfig::tls_insecure_skip_verify()`: TLS with certificate/hostname verification disabled (testing or tightly controlled air-gapped environments only)
- `TransportConfig::plaintext()`: unencrypted transport — credentials and data transmitted in the clear

Use TLS transport for all production connector configurations.
`TransportConfig::plaintext()` is provided as an explicit escape hatch for trusted
private networks and local integration testing only — never use it in production.

**Connection Retry Policy:**

Set `RuntimeOptions.connection_retry` to automatically retry recoverable source
connection failures with truncated exponential backoff:

```rust
use rustcdc::core::ConnectionRetryPolicy;

let config = RuntimeConfig::new(source, checkpoint, schema_history)
    .with_connection_retry(
        ConnectionRetryPolicy::new()
            .with_max_retries(Some(5))    // None retries indefinitely
            .with_initial_delay_ms(300)   // first retry after 300 ms
            .with_max_delay_ms(10_000),   // backoff capped at 10 s
    );
```

Only `SourceError` and `TimeoutError` trigger retry. Fatal errors (`ConfigError`,
`ValidationError`, etc.) propagate immediately regardless of this policy.
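
For intuition about the resulting wait times, the sketch below computes a truncated exponential schedule. It assumes the delay doubles on each attempt and that no jitter is applied; neither detail is stated above, so treat `backoff_delay_ms` as a hypothetical illustration, not as the library's exact formula.

```rust
/// Delay before retry number `attempt` (0-based), assuming the delay doubles each time
/// and is capped at `max_delay_ms`.
pub fn backoff_delay_ms(attempt: u32, initial_delay_ms: u64, max_delay_ms: u64) -> u64 {
    // `checked_shl` returns `None` once the shift would exceed 63 bits; saturate instead.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    initial_delay_ms.saturating_mul(factor).min(max_delay_ms)
}
```

Under these assumptions, an initial delay of 300 ms and a cap of 10 000 ms give 300, 600, 1200, 2400, 4800, 9600 ms and then 10 000 ms for every later attempt.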

> **Operational warning — `max_retries: None` (indefinite retry):**
> Setting `max_retries: None` causes the runtime to retry failed source
> connections forever. This is appropriate for highly-available deployments
> where the source database is expected to recover (e.g., failover, transient
> network blips), but it **masks dead source connections indefinitely**.
> If your monitoring relies on `poll_event_batch` returning an error to
> trigger alerts or circuit-breaking logic, indefinite retry will prevent
> that signal from surfacing.
>
> **Recommendations for `max_retries: None`:**
> - Set a `replication_lag_ms` alert threshold in your observability stack;
>   rising lag indicates the source is unreachable even when the runtime
>   does not surface an error.
> - Emit a dead-man's-switch metric: if `total_events_polled` stops growing
>   for an unexpectedly long window, treat the pipeline as stalled.
> - Consider bounded retry (`max_retries: Some(N)`) with external restart
>   orchestration (e.g., Kubernetes pod restart policy) so stalled pipelines
>   surface cleanly rather than silently burning CPU in a backoff loop.

### Connector-Specific Post-Commit Confirmation Semantics

`commit_ack()` has a uniform API but connector confirmation semantics are intentionally connector-specific:

- PostgreSQL:
  - Runtime confirms durable progress via replication-slot LSN confirmation.
  - Post-commit confirmation failures are governed by `PostCommitSourceConfirmPolicy`.
- MySQL:
  - Runtime durability is checkpoint-first.
  - `confirm_lsn` is a connector compatibility hook and does not provide PostgreSQL-style slot advancement semantics.
- SQL Server:
  - Runtime durability is checkpoint-first.
  - `confirm_lsn` is a connector compatibility hook and does not provide PostgreSQL-style slot advancement semantics.

Operationally, all connectors remain at-least-once at the runtime boundary; downstream idempotency remains mandatory.
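
One common way to meet that requirement is to key each applied change by a stable event identifier and skip repeats. The sketch below is a minimal in-memory version; `IdempotentSink` and the `lsn:`-style identifiers are hypothetical, and a production sink would persist the seen-set (or use an upsert keyed on the identifier) instead of holding it in memory.

```rust
use std::collections::HashSet;

/// Minimal in-memory idempotent sink: each event id is applied at most once.
pub struct IdempotentSink {
    seen: HashSet<String>,
    applied: Vec<String>,
}

impl IdempotentSink {
    pub fn new() -> Self {
        Self { seen: HashSet::new(), applied: Vec::new() }
    }

    /// Returns `true` when the event was applied, `false` when a duplicate was skipped.
    pub fn apply(&mut self, event_id: &str, payload: &str) -> bool {
        if !self.seen.insert(event_id.to_string()) {
            return false;
        }
        self.applied.push(payload.to_string());
        true
    }

    pub fn applied(&self) -> &[String] {
        &self.applied
    }
}
```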

**Resumable Snapshot Cursoring:**
- Snapshot resume uses primary-key keyset cursoring (not `ctid`).
- Tables configured for resumable snapshots must expose a primary key.
- Tables without a primary key are rejected for resumable snapshots.
- This prevents physical tuple cursor instability during long-running snapshots with concurrent writes.
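
Keyset cursoring can be sketched against an in-memory ordered map. The example below is an assumption-laden model: `next_page` is a hypothetical helper, a `BTreeMap` stands in for a table with an integer primary key, and the real connector would issue an equivalent `WHERE pk > $cursor ORDER BY pk LIMIT n` query.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Returns up to `limit` rows with primary key strictly greater than `cursor`,
/// plus the cursor to persist before fetching the next page.
pub fn next_page(
    table: &BTreeMap<i64, String>,
    cursor: Option<i64>,
    limit: usize,
) -> (Vec<(i64, String)>, Option<i64>) {
    let lower = match cursor {
        Some(pk) => Bound::Excluded(pk),
        None => Bound::Unbounded,
    };
    let rows: Vec<(i64, String)> = table
        .range((lower, Bound::Unbounded))
        .take(limit)
        .map(|(pk, row)| (*pk, row.clone()))
        .collect();
    // Keep the previous cursor when the page is empty so a resume stays in place.
    let next_cursor = rows.last().map(|(pk, _)| *pk).or(cursor);
    (rows, next_cursor)
}
```

Because the cursor is a key value and not a physical position, rows inserted or deleted behind the cursor do not shift later pages, which is the stability property the list above describes.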

---

## MySQL Source Configuration

```rust
pub struct MysqlSourceConfig {
    /// MySQL host (FQDN or IP)
    pub host: String,
    
    /// MySQL port
    /// Default: 3306
    pub port: u16,
    
    /// MySQL username (should have REPLICATION SLAVE, REPLICATION CLIENT, and SELECT privileges)
    pub user: String,
    
    /// MySQL password material as `SecretString`
    pub password: SecretString,

    /// Database authentication mode.
    /// - `Password` (default): static password semantics
    /// - `AwsIamToken`: short-lived IAM token semantics (requires TLS transport)
    pub auth_mode: DatabaseAuthMode,
    
    /// Database name to replicate from
    pub database: String,
    
    /// Replication server id used by binlog stream client
    /// Default: 1
    pub server_id: u32,

    /// Whether GTID mode is enabled in your deployment.
    /// Default: false
    pub gtid_mode_enabled: bool,

    /// Validate that source binlog format is ROW before streaming.
    /// Default: true
    pub binlog_format_check: bool,
    
    /// Transport mode (`TransportConfig::tls()` by default when `tls` feature is enabled).
    pub transport: TransportConfig,
    
    /// Connection timeout in seconds
    /// Default: 30
    /// Range: 1 - 300
    pub conn_timeout_secs: u64,

    /// Stream poll interval in milliseconds
    /// Default: 50
    /// Range: 1 - 60000
    pub stream_poll_interval_ms: u64,

    /// Maximum events yielded per stream poll
    /// Default: 1000
    /// Range: 1 - 100000
    pub max_events_per_poll: usize,
    
}
```

### MySQL GTID String Format

```
GTID Set Format: "source_id:interval[, ...]"
Example: "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5"
```

---

## MariaDB Source Configuration

MariaDB uses the same MySQL-protocol transport stack, but rustcdc exposes it as a first-class source identity through `MariaDbSourceConfig` and `RuntimeSourceConfig::mariadb(...)`.

Use MariaDB when you need distinct checkpoint naming, source labeling, or runtime routing while keeping the same underlying binlog transport semantics as MySQL.

```rust
use rustcdc::{MariaDbSourceConfig, RuntimeSourceConfig};

let source = MariaDbSourceConfig {
  host: "localhost".to_string(),
  port: 3306,
  user: "cdc_user".to_string(),
  password: "cdc_password".to_string().into(),
  database: "events".to_string(),
  ..MariaDbSourceConfig::default()
};

let runtime_source = RuntimeSourceConfig::mariadb(source);
```

MariaDB supports the same startup, snapshot, and streaming modes as MySQL, but emits `source_type() == "mariadb"` and uses MariaDB-specific checkpoint identifiers.

> **GTID Format Warning:** MariaDB uses a distinct GTID format — `domain_id-server_id-sequence_no`
> (e.g. `0-1-12345`) — that is **incompatible** with MySQL's `uuid:interval` format
> (e.g. `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5`). Never mix checkpoint files between
> MySQL and MariaDB instances, even if the schemas are identical. Doing so will produce
> invalid GTID resume positions and cause the connector to silently restart replication
> from the beginning or raise a fatal position error. Always use
> `RuntimeSourceConfig::mariadb(...)` (not `RuntimeSourceConfig::mysql(...)`) when
> connecting to a MariaDB server to ensure correct checkpoint namespace isolation.
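
A pre-flight check in embedder code can catch a mismatched checkpoint before it reaches the connector. The sketch below classifies a single GTID element by shape; `GtidFlavor` and `classify_gtid` are hypothetical names, and the parser deliberately handles only the two formats quoted above, not full comma-separated GTID sets.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum GtidFlavor {
    MySql,
    MariaDb,
    Unknown,
}

fn all_digits(s: &str) -> bool {
    !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit())
}

/// Checks the 8-4-4-4-12 hexadecimal group layout of a UUID.
fn is_uuid(s: &str) -> bool {
    let groups: Vec<&str> = s.split('-').collect();
    groups.len() == 5
        && groups
            .iter()
            .zip([8usize, 4, 4, 4, 12])
            .all(|(g, len)| g.len() == len && g.chars().all(|c| c.is_ascii_hexdigit()))
}

/// Classifies one GTID element as MySQL (`uuid:interval`) or MariaDB (`domain-server-seq`).
pub fn classify_gtid(position: &str) -> GtidFlavor {
    if let Some((uuid, intervals)) = position.split_once(':') {
        // Each interval is `N` or `N-M`.
        let intervals_ok = intervals
            .split(':')
            .all(|iv| iv.splitn(2, '-').all(all_digits));
        return if is_uuid(uuid) && intervals_ok {
            GtidFlavor::MySql
        } else {
            GtidFlavor::Unknown
        };
    }
    let parts: Vec<&str> = position.split('-').collect();
    if parts.len() == 3 && parts.iter().all(|p| all_digits(p)) {
        GtidFlavor::MariaDb
    } else {
        GtidFlavor::Unknown
    }
}
```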

---

## SQL Server Source Configuration

```rust
pub struct SqlServerSourceConfig {
    /// SQL Server host (FQDN or IP)
    pub host: String,
    
    /// SQL Server port
    /// Default: 1433
    pub port: u16,
    
    /// SQL Server username (should have CDC_ADMIN role)
    pub user: String,
    
    /// SQL Server password material as `SecretString`
    pub password: SecretString,
    
    /// Database name to replicate from (CDC must be enabled on database)
    pub database: String,
    
    /// Named instance (if using non-default instance)
    /// Example: Some("INSTANCE_NAME")
    /// Default: None (default instance)
    pub instance_name: Option<String>,
    
    /// Transport mode (`TransportConfig::tls()` by default when `tls` feature is enabled).
    pub transport: TransportConfig,
    
    /// Connection timeout in seconds
    /// Default: 30
    /// Range: 1 - 300
    pub conn_timeout_secs: u64,
    
    /// Require CDC to be enabled on database.
    /// Default: true
    pub cdc_enabled: bool,
    
    /// CDC schema name (usually "cdc")
    /// Default: "cdc"
    pub cdc_schema: String,

    /// Maximum concurrent SQL Server connections used by prereq checks
    /// Default: 4
    /// Range: 1 - 64
    pub prereq_pool_size: usize,

    /// Stream poll interval in milliseconds
    /// Default: 5000
    /// Range: 1 - 60000
    ///
    /// ⚠️ LATENCY NOTE: SQL Server CDC is polling-based, not event-driven.
    /// p99 latency ≈ stream_poll_interval_ms + CDC capture agent delay.
    /// Reduce this to 500–1000ms for latency-sensitive workloads.
    pub stream_poll_interval_ms: u64,

    /// Maximum events yielded per stream poll
    /// Default: 10000
    /// Range: 1 - 100000
    pub max_events_per_poll: usize,
    
}
```

### AWS IAM Auth Mode (MySQL/PostgreSQL)

For RDS-style IAM database auth, use connector `auth_mode = AwsIamToken` and
resolve the token through `SecretString::from_callback` (or provider) so each
new connection can fetch a fresh short-lived token.

TLS is mandatory when `auth_mode = AwsIamToken`.

### SQL Server Connection String Format

```
sqlserver://user:password@host:port;database=dbname;TrustServerCertificate=no;Encrypt=yes
```

---

## Checkpoint Configuration

### InMemoryCheckpoint

**Use Case:** Development, testing, single-machine deployments (volatile)

```rust
use rustcdc::checkpoint::InMemoryCheckpoint;

let checkpoint = InMemoryCheckpoint::default();
// Keeps checkpoint in memory; lost on process restart
```

### FileCheckpoint

**Use Case:** Local machine deployments; single-machine production (persistent but not HA)

```rust
use rustcdc::checkpoint::FileCheckpoint;

// Default: 0o600 (owner read/write only — enforced at load time).
let checkpoint = FileCheckpoint::new("/var/rustcdc/checkpoints");
// Stores checkpoint in JSON file; atomically updated via write-rename.
```

File permissions are enforced at load time: if the checkpoint file on disk has
mode bits accessible to group or other (e.g. 0o644), the load is rejected with
a `CheckpointError`. This protects connection credentials embedded in the
checkpoint from unauthorized access. Do not set a mode wider than 0o600.
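
The mode test itself is a simple bit mask. The sketch below shows one way to express it; `is_owner_only` and `file_is_owner_only` are hypothetical helpers for deployment tooling, not the check rustcdc performs internally.

```rust
/// True when no group/other permission bits are set (for example 0o600 or 0o400).
pub fn is_owner_only(mode: u32) -> bool {
    // 0o077 covers read/write/execute for group and other.
    (mode & 0o077) == 0
}

/// Reads the on-disk mode of `path` and applies the same test (Unix only).
#[cfg(unix)]
pub fn file_is_owner_only(path: &std::path::Path) -> std::io::Result<bool> {
    use std::os::unix::fs::PermissionsExt;
    let mode = std::fs::metadata(path)?.permissions().mode();
    Ok(is_owner_only(mode))
}
```

Running a check like this in a deployment script, before the runtime starts, surfaces a permissions problem earlier than the load-time `CheckpointError` would.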

**File Location Format:**
```
/var/rustcdc/checkpoints/checkpoint_postgres.json
/var/rustcdc/checkpoints/checkpoint_mysql.json
/var/rustcdc/checkpoints/checkpoint_sqlserver.json
```

**File Content Example:**
```json
{
  "checkpoint_format_version": 1,
  "source_type": "postgres",
  "committed_event_count": 12345,
  "offset": {
    "lsn": 281474976711680,
    "slot_name": "rustcdc_postgres_abc123"
  }
}
```

**Checkpoint Format Version Policy:**
- `checkpoint_format_version = 1` is the current write format.
- `checkpoint_format_version` is required for all file checkpoints.
- Unknown or missing versions are rejected at load time.
- rustcdc intentionally enforces fail-closed checkpoint decoding for format safety.
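
Custom `Checkpoint` backends may want to apply the same fail-closed rule to their own stored records. A minimal sketch, assuming the version has already been extracted from the stored document as an `Option<u64>`; `check_format_version` is a hypothetical helper:

```rust
pub const CURRENT_FORMAT_VERSION: u64 = 1;

/// Accepts only the current format version; missing or unknown versions are errors.
pub fn check_format_version(found: Option<u64>) -> Result<(), String> {
    match found {
        Some(CURRENT_FORMAT_VERSION) => Ok(()),
        Some(other) => Err(format!("unsupported checkpoint_format_version: {other}")),
        None => Err("missing checkpoint_format_version".to_string()),
    }
}
```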

### Custom Durable Checkpoint Backend

**Use Case:** High-availability or centralized checkpoint management

rustcdc currently ships with `FileCheckpoint` and `InMemoryCheckpoint`.
For HA or centralized state, implement the `Checkpoint` trait against your
own storage backend (for example PostgreSQL, Redis, object storage, or a
platform metadata service).

---

## Observability Configuration

### NoOp Observability (Default)

```rust
use rustcdc::{RuntimeConfig, RuntimeObservability};

// Metrics and tracing are disabled by default via explicit runtime observability options.
let config = RuntimeConfig::new(...)
  .with_observability(RuntimeObservability::default());
```

### OpenTelemetry Observability

```rust
use rustcdc::{OTelConfig, OTelEventTracer, OTelMetricsCollector, RuntimeConfig, RuntimeObservability};
use std::sync::Arc;

let otel_config = OTelConfig::new(
    "http://otel-collector:4317",  // OTLP gRPC endpoint
    "rustcdc",                        // Service name
    "0.5.0",                         // Service version
    "production",                    // Environment
);

let metrics = Arc::new(OTelMetricsCollector::with_otlp_exporter(otel_config.clone())?);
let tracer = Arc::new(OTelEventTracer::with_otlp_exporter(otel_config)?);

let config = RuntimeConfig::new(...)
  .with_observability(
    RuntimeObservability::default()
      .with_metrics(metrics)
      .with_tracer(tracer)
  );
```

### Runtime Admin Metrics (`CdcRuntime::admin_metrics_prometheus()`)

| Metric | Type | Description |
|--------|------|-------------|
| `rustcdc_runtime_readiness` | Gauge | Runtime readiness (1 ready, 0 not ready) |
| `rustcdc_runtime_liveness` | Gauge | Runtime liveness (1 alive, 0 stopped) |
| `rustcdc_runtime_buffer_depth` | Gauge | Buffered events waiting for delivery |
| `rustcdc_runtime_in_flight_events` | Gauge | Delivered but uncommitted events |
| `rustcdc_runtime_events_polled_total` | Counter | Total events delivered by runtime batches |
| `rustcdc_runtime_events_committed_total` | Counter | Total acknowledged and checkpointed events |
| `rustcdc_runtime_events_deduplicated_total` | Counter | Total events suppressed by idempotency guard |
| `rustcdc_runtime_checkpoint_age_ms` | Gauge | Age of last durable checkpoint |
| `rustcdc_runtime_replication_lag_ms` | Gauge | Estimated source lag in milliseconds |

### OpenTelemetry Exported Metrics (`OTelMetricsCollector`)

| Metric | Type | Description |
|--------|------|-------------|
| `rustcdc.events.processed` | Counter | Total events successfully processed |
| `rustcdc.events.filtered` | Counter | Events dropped by transform pipeline |
| `rustcdc.errors` | Counter | Total errors encountered |
| `rustcdc.checkpoint.committed_count` | Counter | Total events committed to checkpoint |
| `rustcdc.replication_lag_ms` | Gauge | Estimated replication lag in milliseconds |
| `rustcdc.replication_lag_events` | Gauge | Estimated events not yet consumed |
| `rustcdc.checkpoint_offset` | Gauge | Current checkpoint offset (source-specific encoding) |
| `rustcdc.buffer_size` | Gauge | Current buffered event count |
| `rustcdc.snapshot_progress` | Gauge | Current snapshot completion percentage |
| `rustcdc.event_processing_duration` | Histogram | Event processing latency (ms) |
| `rustcdc.checkpoint_commit_duration` | Histogram | Checkpoint commit latency (ms) |

### Structured Log Fields

All logs include:
- `source_type`: Connector type (postgres, mysql, mariadb, sqlserver)
- `timestamp`: ISO 8601 timestamp
- `level`: ERROR, WARN, INFO, DEBUG, TRACE
- `message`: Human-readable description
- Context fields (when applicable):
  - `table`: Table name
  - `event_count`: Number of events
  - `offset`: Source-specific position
  - `error`: Error details (sanitized)

**Enable Logging:**

```bash
# Set environment variable
export RUST_LOG=rustcdc=info,rustcdc::source=debug

# Run with structured JSON output
export RUST_LOG_FORMAT=json
```

---

## Production Recommendations

### Checkpoint Store Selection

| Scenario | Recommendation | Rationale |
|----------|---|----------|
| Single machine, restarts acceptable | FileCheckpoint | Simple, no external dependencies |
| HA cluster, centralized state | Custom `Checkpoint` backend | Integrates with your existing HA metadata store |
| Development/testing | InMemoryCheckpoint | Fast iteration; ephemeral OK |

### Buffer Size Tuning

```
Throughput-Focused (High Latency Acceptable):
  max_buffer_size = 100_000
  max_poll_wait_ms = 5_000
  → Batches large groups; fewer commits

Latency-Focused (Lower Throughput):
  max_buffer_size = 10_000
  max_poll_wait_ms = 1_000
  → Frequent commits; sub-second latency

Balanced (Recommended):
  max_buffer_size = 50_000
  max_poll_wait_ms = 2_000
  → ~50-100ms latency; 1K-2K commits/sec
```

### Connector Scaling Envelopes

Use these as baseline production profiles, then tune with real workload evidence.

**SQL Server connector tuning (`SqlServerSourceConfig`):**

| Profile | `prereq_pool_size` | `stream_poll_interval_ms` | `max_events_per_poll` | Suggested Use |
|---|---:|---:|---:|---|
| Low-latency | 4 | 250 | 5000 | Near-real-time dashboards, lower throughput |
| Balanced (default-ish) | 4-8 | 1000 | 10000-20000 | General production workloads |
| Throughput-heavy | 8-16 | 2000-5000 | 20000-50000 | Backfills, bursty write workloads |

**PostgreSQL connector tuning (`PostgresSourceConfig`):**

| Profile | `stream_poll_interval_ms` | `max_events_per_poll` | Suggested Use |
|---|---:|---:|---|
| Low-latency | 10-25 | 250-500 | Interactive workloads where update freshness is prioritized |
| Balanced (default-ish) | 50-250 | 1000-5000 | General production workloads |
| Throughput-heavy | 250-1000 | 5000-20000 | Backfills, high sustained ingest |

**MySQL connector tuning (`MysqlSourceConfig`):**

| Profile | `stream_poll_interval_ms` | `max_events_per_poll` | Suggested Use |
|---|---:|---:|---|
| Low-latency | 10-25 | 250-500 | Interactive workloads where update freshness is prioritized |
| Balanced (default-ish) | 50-250 | 1000-5000 | General production workloads |
| Throughput-heavy | 250-1000 | 5000-20000 | Backfills, high sustained ingest |

For sustained saturation, combine connector tuning with runtime delivery controls (`RuntimeOptions.max_buffer_size`, `RuntimeOptions.max_poll_wait_ms`) and horizontal partitioning.

### TLS Best Practices

```rust
use rustcdc::TransportConfig;

// Recommended: explicit CA bundle in production.
let transport = TransportConfig::tls_with_ca_cert_path("/etc/ssl/certs/company-ca.pem");

// Also valid: rely on system trust store.
let transport = TransportConfig::tls();

// Testing/air-gapped fallback only: disable certificate + hostname verification.
let transport = TransportConfig::tls_insecure_skip_verify();

// Plaintext: only for trusted private networks or local integration testing.
// Credentials and event data are transmitted unencrypted.
let transport = TransportConfig::plaintext();
```

Connector config helpers now provide explicit transport selection APIs:

```rust
let mysql_cfg = MysqlSourceConfig::default().with_plaintext_transport();
let pg_cfg = PostgresSourceConfig::default().with_plaintext_transport();
let mssql_cfg = SqlServerSourceConfig::default().with_plaintext_transport();

let mysql_tls = mysql_cfg.with_tls_transport();
```

### Monitoring Checklist

- [ ] Alert on `rustcdc_runtime_replication_lag_ms > 30000` (30s)
- [ ] Alert on `rustcdc_runtime_liveness == 0`
- [ ] Alert on `rustcdc_runtime_checkpoint_age_ms > 10000`
- [ ] Alert on `rustcdc_runtime_events_polled_total` trend deviation > 20%
- [ ] Dashboard: Replication lag trend over 24h
- [ ] Dashboard: Event processing rate (events/sec)
- [ ] Dashboard: Checkpoint commit latency distribution
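
Where a full alerting stack is not available, the first three thresholds can be evaluated directly against the text returned by `admin_metrics_prometheus()`. The sketch below assumes the output consists of unlabelled `name value` sample lines plus `#` comment lines; `parse_samples` and `firing_alerts` are hypothetical helpers, and the alert names are illustrative.

```rust
use std::collections::HashMap;

/// Parses unlabelled `name value` samples, skipping blank and `#` comment lines.
pub fn parse_samples(text: &str) -> HashMap<String, f64> {
    text.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| {
            let mut parts = line.split_whitespace();
            let name = parts.next()?;
            let value = parts.next()?.parse::<f64>().ok()?;
            Some((name.to_string(), value))
        })
        .collect()
}

/// Evaluates the lag, liveness, and checkpoint-age thresholds from the checklist.
pub fn firing_alerts(samples: &HashMap<String, f64>) -> Vec<&'static str> {
    let get = |name: &str| samples.get(name).copied();
    let mut alerts = Vec::new();
    if get("rustcdc_runtime_replication_lag_ms").map_or(false, |v| v > 30_000.0) {
        alerts.push("replication_lag");
    }
    if get("rustcdc_runtime_liveness").map_or(false, |v| v == 0.0) {
        alerts.push("liveness");
    }
    if get("rustcdc_runtime_checkpoint_age_ms").map_or(false, |v| v > 10_000.0) {
        alerts.push("checkpoint_age");
    }
    alerts
}
```

A missing metric is treated as "not firing" here; a stricter variant could treat absence of `rustcdc_runtime_liveness` as an alert in its own right.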

---

**Last Updated:** May 25, 2026  
**Version:** Configuration Reference v0.1+