Sidecar CDC consumer — poll-based replication from an external database.
What is sidecar mode?
Rhei's primary CDC pipeline relies on SQLite triggers that fire on each
INSERT, UPDATE, and DELETE. That approach requires write access and
schema-modification rights on the source database. When the source database
is external (e.g. a production PostgreSQL instance, or a read-only SQLite
file), triggers are not available.
Sidecar mode works around this by polling the external database
periodically: it issues a SELECT … WHERE updated_at > watermark ORDER BY updated_at, pk … query, turns each returned row into a
[rhei_core::CdcEvent], and forwards those events to the CdcSyncEngine
inside Rhei's HTAP pipeline.
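One poll cycle can be sketched over an in-memory stand-in for the source table. This is only an illustration of the mechanism: `SourceRowSketch` and `poll_once_sketch` are hypothetical names, the single `i64` primary key and integer timestamp are simplifying assumptions, and the conversion of rows into [rhei_core::CdcEvent] values is left out.

```rust
/// Hypothetical row shape: one integer primary key plus an integer timestamp.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceRowSketch {
    pub pk: i64,
    pub updated_at: i64,
}

/// Runs one poll: selects rows strictly after the watermark, orders them by
/// (updated_at, pk), and advances the watermark to the last row returned.
pub fn poll_once_sketch(
    source: &[SourceRowSketch],
    watermark: &mut (i64, i64),
) -> Vec<SourceRowSketch> {
    // Rust tuples compare lexicographically, so this comparison also breaks
    // timestamp ties on the primary key.
    let mut batch: Vec<SourceRowSketch> = source
        .iter()
        .filter(|r| (r.updated_at, r.pk) > *watermark)
        .cloned()
        .collect();
    batch.sort_by_key(|r| (r.updated_at, r.pk));

    // Only move the watermark when something was actually read.
    if let Some(last) = batch.last() {
        *watermark = (last.updated_at, last.pk);
    }
    batch
}
```

Starting from a watermark of `(i64::MIN, i64::MIN)`, the first call returns every row and a second call on unchanged data returns nothing.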
Watermark and composite-PK ordering
Each table maintains a [Watermark] that records the last-seen
(updated_at timestamp, primary_key_values) pair. Because multiple rows can
share the same updated_at value (e.g. a bulk import), a plain
updated_at > watermark predicate would skip rows whose timestamp equals
the watermark. Instead, the consumer uses a compound predicate:
WHERE updated_at > $ts
OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
This ensures that each row version is picked up exactly once, even when
several rows share a timestamp, and that composite primary keys (e.g.
(tenant_id, order_id)) are compared as a whole tuple.
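As a rough sketch, the compound predicate could be generated for any number of primary-key columns as follows. The function name `build_poll_query_sketch` and the `$n` placeholder numbering are assumptions made for this example, not Rhei's actual query builder, and identifiers are interpolated without quoting for brevity.

```rust
/// Builds the tie-safe polling query for one table.
/// `$1` is the watermark timestamp; `$2..` are the watermark PK values.
pub fn build_poll_query_sketch(table: &str, ts_col: &str, pk_cols: &[&str]) -> String {
    // Left-hand side of the row-value comparison: (pk1, pk2, ...)
    let pk_list = pk_cols.join(", ");

    // Right-hand side: ($2, $3, ...), one placeholder per PK column.
    let pk_params = (0..pk_cols.len())
        .map(|i| format!("${}", i + 2))
        .collect::<Vec<_>>()
        .join(", ");

    // ORDER BY must match the comparison order: timestamp first, then PKs.
    let order_by = std::iter::once(ts_col)
        .chain(pk_cols.iter().copied())
        .map(|c| format!("{c} ASC"))
        .collect::<Vec<_>>()
        .join(", ");

    format!(
        "SELECT * FROM {table} \
         WHERE {ts_col} > $1 \
         OR ({ts_col} = $1 AND ({pk_list}) > ({pk_params})) \
         ORDER BY {order_by}"
    )
}
```

For a table `orders` keyed on `(tenant_id, order_id)`, this yields a predicate comparing `(tenant_id, order_id) > ($2, $3)`. Row-value comparison of this form needs support from the source database.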
Watermarks can be persisted across restarts via the [WatermarkStore] trait.
The default [NullWatermarkStore] is in-memory only. When the
rocksdb-watermark feature is enabled, [RocksDbWatermarkStore] provides
durable persistence so the consumer resumes exactly where it left off.
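To make the persistence contract concrete, here is a minimal sketch of what a watermark store might look like. The trait shape (`load` / `save` keyed by table name) and every name ending in `Sketch` are assumptions for illustration; the real [WatermarkStore] trait may have different methods and error handling.

```rust
use std::collections::HashMap;

/// Hypothetical watermark: last-seen timestamp plus primary-key values.
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkSketch {
    pub updated_at: i64,        // assumed encoding: integer timestamp
    pub pk_values: Vec<String>, // one entry per primary-key column
}

/// Hypothetical store interface: one watermark per table name.
pub trait WatermarkStoreSketch {
    fn load(&self, table: &str) -> Option<WatermarkSketch>;
    fn save(&mut self, table: &str, watermark: WatermarkSketch);
}

/// In-memory store: contents are lost when the process exits.
#[derive(Default)]
pub struct InMemoryStoreSketch {
    marks: HashMap<String, WatermarkSketch>,
}

impl WatermarkStoreSketch for InMemoryStoreSketch {
    fn load(&self, table: &str) -> Option<WatermarkSketch> {
        self.marks.get(table).cloned()
    }

    fn save(&mut self, table: &str, watermark: WatermarkSketch) {
        self.marks.insert(table.to_owned(), watermark);
    }
}
```

A durable implementation would write the same per-table value to disk in `save` and read it back in `load`, which is what lets a restarted consumer resume from its last position.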
INSERT vs UPDATE heuristic
Because timestamp polling does not expose the old row image, the consumer
cannot observe whether a row was newly created or updated. It uses a simple
heuristic: if created_at == updated_at, the event is classified as
[rhei_core::CdcOperation::Insert]; otherwise it is
[rhei_core::CdcOperation::Update].
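The heuristic amounts to a single comparison. The sketch below uses a local `OpSketch` enum as a stand-in for [rhei_core::CdcOperation] and assumes integer timestamps; both are illustrative choices.

```rust
/// Stand-in for the Insert/Update variants of the real operation enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpSketch {
    Insert,
    Update,
}

/// Classifies a polled row: equal timestamps mean the row has not been
/// modified since it was created, so it is treated as an insert.
pub fn classify_sketch(created_at: i64, updated_at: i64) -> OpSketch {
    if created_at == updated_at {
        OpSketch::Insert
    } else {
        OpSketch::Update
    }
}
```

One consequence of this rule: a row that is created and then modified before the next poll is first seen with differing timestamps, so it is reported as an update.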
Soft-delete detection
Hard deletes (rows that vanish from the source) are invisible to a polling
strategy. The [DeleteDetection] enum offers two opt-in workarounds:
- [DeleteDetection::SoftDelete]: the source marks deleted rows with a non-NULL deleted_at column; the consumer issues a second query for those rows.
- [DeleteDetection::FullDiff]: periodic full-table comparison (not yet implemented; logs a warning when configured).
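The second query used for soft deletes might look roughly like the sketch below. The enum shape (a `Disabled` variant and a `column` field on `SoftDelete`), the function name, and the exact SQL are all assumptions; the real [DeleteDetection] enum and the query the consumer issues may differ.

```rust
/// Hypothetical stand-in for the delete-detection configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeleteDetectionSketch {
    Disabled,
    SoftDelete { column: String },
    FullDiff,
}

/// Returns the extra query to run for soft-deleted rows, if any.
/// `$1` is assumed to be the timestamp of the last delete already processed.
pub fn soft_delete_query_sketch(table: &str, mode: &DeleteDetectionSketch) -> Option<String> {
    match mode {
        DeleteDetectionSketch::SoftDelete { column } => Some(format!(
            "SELECT * FROM {table} WHERE {column} IS NOT NULL AND {column} > $1 ORDER BY {column} ASC"
        )),
        // No extra query: detection is off, or full diff is not implemented.
        DeleteDetectionSketch::Disabled | DeleteDetectionSketch::FullDiff => None,
    }
}
```

Each row returned by such a query would be forwarded as a delete event rather than as an insert or update.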
Plugging in a SourceConnector
The [SourceConnector] trait abstracts over the synchronous
connector_arrow query API. Built-in implementations are provided for
SQLite (feature sqlite) and PostgreSQL (feature postgres). Custom
backends can be added by implementing [SourceConnector] for any type that
is Send + 'static.
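The following sketch shows the general shape of a custom backend and why the Send + 'static bound matters. `SourceConnectorSketch` is a deliberately simplified, hypothetical trait: the real [SourceConnector] wraps the connector_arrow query API, so its method names and return types will differ from the string rows used here.

```rust
/// Hypothetical, simplified connector trait with the same bounds as the
/// real one: implementors must be Send + 'static.
pub trait SourceConnectorSketch: Send + 'static {
    fn query(&mut self, sql: &str) -> Result<Vec<Vec<String>>, String>;
}

/// A canned-response backend, e.g. for unit tests.
pub struct StaticConnectorSketch {
    pub rows: Vec<Vec<String>>,
}

impl SourceConnectorSketch for StaticConnectorSketch {
    fn query(&mut self, _sql: &str) -> Result<Vec<Vec<String>>, String> {
        // Ignores the SQL text and returns the same rows every time.
        Ok(self.rows.clone())
    }
}

/// Runs a synchronous query on a worker thread. Moving `conn` into the
/// spawned closure only compiles because the trait requires Send + 'static.
pub fn run_on_worker_sketch<C: SourceConnectorSketch>(
    mut conn: C,
    sql: String,
) -> Result<Vec<Vec<String>>, String> {
    std::thread::spawn(move || conn.query(&sql))
        .join()
        .map_err(|_| "worker thread panicked".to_string())?
}
```

Because the query API is synchronous, handing the connector to a dedicated thread like this is one way to keep blocking calls off other threads; whether Rhei does so is not stated here.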
Wiring up a sidecar consumer
use ;
// 1. Open a connection to the external database (SQLite shown here).
let raw = open
.expect;
let conn = new;
// 2. Describe the tables to follow.
let config = TimestampCdcConfig ;
// 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
let consumer = new;
// Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.