rhei-sidecar 1.5.0

Sidecar CDC consumer for Rhei — polls external databases by timestamp columns

Sidecar CDC consumer — poll-based replication from an external database.

What is sidecar mode?

Rhei's primary CDC pipeline relies on SQLite triggers that fire on each INSERT, UPDATE, and DELETE. That approach requires write access and schema-modification rights on the source database. When the source database is external (e.g. a production PostgreSQL instance, or a read-only SQLite file), Rhei typically lacks those rights and so cannot install triggers.

Sidecar mode works around this by polling the external database periodically: it issues a SELECT … WHERE updated_at > watermark ORDER BY updated_at, pk … query, turns each returned row into a [rhei_core::CdcEvent], and forwards those events to the CdcSyncEngine inside Rhei's HTAP pipeline.
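The following is a minimal, std-only sketch of that loop. It makes several assumptions:

- The external table is a Vec of hypothetical Row values.
- Event is a hypothetical stand-in for rhei_core::CdcEvent.
- A std::sync::mpsc channel stands in for CdcSyncEngine.
- poll_once and run_sidecar are illustrative names, not crate API.

The watermark is compared as an (updated_at, id) tuple. Because results are ordered by updated_at and then pk, a batch can end partway through a group of tied timestamps without losing rows.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical polled row: (updated_at, primary key, payload).
#[derive(Clone, Debug)]
struct Row {
    updated_at: i64,
    id: i64,
    payload: String,
}

/// Hypothetical stand-in for rhei_core::CdcEvent (the real type differs).
#[derive(Debug, PartialEq)]
struct Event {
    id: i64,
    payload: String,
}

/// One poll: rows strictly after the (updated_at, id) watermark,
/// ordered by (updated_at, id) and capped at `batch_size`.
fn poll_once(table: &[Row], watermark: (i64, i64), batch_size: usize) -> Vec<Row> {
    let mut batch: Vec<Row> = table
        .iter()
        .filter(|r| (r.updated_at, r.id) > watermark) // tuple comparison is lexicographic
        .cloned()
        .collect();
    batch.sort_by_key(|r| (r.updated_at, r.id));
    batch.truncate(batch_size);
    batch
}

/// Drain the table batch by batch on a polling thread, forwarding each row
/// as an event over a channel (the channel plays the role of CdcSyncEngine).
fn run_sidecar(table: Vec<Row>, batch_size: usize) -> Vec<Event> {
    let (tx, rx) = mpsc::channel::<Event>();
    let poller = thread::spawn(move || {
        // Start below any real (updated_at, id) pair.
        let mut watermark = (i64::MIN, i64::MIN);
        loop {
            let batch = poll_once(&table, watermark, batch_size);
            if batch.is_empty() {
                break; // caught up; a real consumer would sleep and poll again
            }
            for row in batch {
                watermark = (row.updated_at, row.id); // advance past this row
                tx.send(Event { id: row.id, payload: row.payload }).unwrap();
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration below.
    });
    let events: Vec<Event> = rx.iter().collect();
    poller.join().unwrap();
    events
}
```

Take a batch size of 2, with rows a, b and c all sharing updated_at = 10. The first poll returns a and b and leaves the watermark at (10, 2). The second poll still picks up c, because (10, 3) > (10, 2).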

Watermark and composite-PK ordering

The consumer keeps one [Watermark] per table, recording the last-seen (updated_at timestamp, primary_key_values) pair. Multiple rows can share the same updated_at value (e.g. after a bulk import). A plain updated_at > watermark predicate would therefore skip any tied rows not yet delivered, for example when a batch ends partway through a group of rows with the same timestamp. Instead, the consumer uses a compound predicate:

WHERE updated_at > $ts
   OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...

This ensures that timestamp ties neither cause rows to be skipped nor cause them to be delivered twice. It also ensures that composite primary keys (e.g. (tenant_id, order_id)) are compared column by column, in the same lexicographic order used by the ORDER BY clause.
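The following hypothetical helper makes the composite-key case concrete by rendering the predicate above for an arbitrary primary key. build_poll_sql is not part of the crate. Its assumptions:

- It uses PostgreSQL-style numbered placeholders: $1 is bound to the watermark timestamp, and $2 onward to the watermark key values. An SQLite backend would write them as ?1, ?2 and so on.
- It does not quote or validate identifiers, which real code must do.

Row-value comparisons such as (a, b) > ($2, $3) are lexicographic in both PostgreSQL and SQLite (3.15 and later).

```rust
/// Hypothetical helper: renders the compound watermark predicate for any
/// primary key. Identifiers are NOT quoted or validated here.
fn build_poll_sql(table: &str, updated_at: &str, pk: &[&str], limit: usize) -> String {
    // One placeholder per primary-key column, starting at $2 ($1 is the timestamp).
    let params: Vec<String> = (0..pk.len()).map(|i| format!("${}", i + 2)).collect();
    // ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
    let order_cols: Vec<String> = std::iter::once(updated_at)
        .chain(pk.iter().copied())
        .map(|c| format!("{c} ASC"))
        .collect();
    format!(
        "SELECT * FROM {table} \
         WHERE {updated_at} > $1 \
         OR ({updated_at} = $1 AND ({pk_cols}) > ({params})) \
         ORDER BY {order} LIMIT {limit}",
        pk_cols = pk.join(", "),
        params = params.join(", "),
        order = order_cols.join(", "),
    )
}
```

For the (tenant_id, order_id) key with a batch size of 500, this produces the following query. The LIMIT plays the role of poll_batch_size.

SELECT * FROM orders WHERE updated_at > $1 OR (updated_at = $1 AND (tenant_id, order_id) > ($2, $3)) ORDER BY updated_at ASC, tenant_id ASC, order_id ASC LIMIT 500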

Watermarks can be persisted across restarts via the [WatermarkStore] trait. The default [NullWatermarkStore] is in-memory only. When the rocksdb-watermark feature is enabled, [RocksDbWatermarkStore] provides durable persistence so the consumer resumes exactly where it left off.
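The crate's WatermarkStore method signatures are not shown here, so the following is a hypothetical sketch of the idea rather than its API. It has three parts:

- A WatermarkStoreSketch trait with load and save methods.
- A shared in-memory implementation, SharedMemoryStore.
- A Consumer that reads its starting watermark from the store when constructed.

Cloning SharedMemoryStore shares one map, which lets the test simulate a restart inside a single process. Like NullWatermarkStore, it does not survive a real process restart; that is the job of a durable backend such as RocksDbWatermarkStore.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical watermark: last-seen updated_at plus primary-key values.
#[derive(Clone, Debug, PartialEq)]
struct Watermark {
    updated_at: i64,
    pk: Vec<String>,
}

/// Hypothetical store trait; the crate's real WatermarkStore may differ.
trait WatermarkStoreSketch {
    fn load(&self, table: &str) -> Option<Watermark>;
    fn save(&self, table: &str, wm: &Watermark);
}

/// In-memory store; clones share the same map. NOT durable.
#[derive(Clone, Default)]
struct SharedMemoryStore {
    inner: Arc<Mutex<HashMap<String, Watermark>>>,
}

impl WatermarkStoreSketch for SharedMemoryStore {
    fn load(&self, table: &str) -> Option<Watermark> {
        self.inner.lock().unwrap().get(table).cloned()
    }
    fn save(&self, table: &str, wm: &Watermark) {
        self.inner.lock().unwrap().insert(table.to_string(), wm.clone());
    }
}

/// Hypothetical consumer that resumes from the store on construction.
struct Consumer<S: WatermarkStoreSketch> {
    store: S,
    watermark: Option<Watermark>,
}

impl<S: WatermarkStoreSketch> Consumer<S> {
    fn new(store: S, table: &str) -> Self {
        let watermark = store.load(table); // None on first run
        Consumer { store, watermark }
    }

    /// Record progress after a batch has been forwarded downstream.
    fn commit(&mut self, table: &str, wm: Watermark) {
        self.store.save(table, &wm);
        self.watermark = Some(wm);
    }
}
```

The sketch saves the watermark only after a batch has been forwarded downstream. A crash between those two steps therefore replays that batch on restart rather than losing it; this ordering is a design choice of the sketch.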

INSERT vs UPDATE heuristic

Because timestamp polling does not expose the old row image, the consumer cannot tell whether a row was newly created or updated. It uses a simple heuristic: if created_at == updated_at, the event is classified as [rhei_core::CdcOperation::Insert]; otherwise it is [rhei_core::CdcOperation::Update].
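The heuristic fits in a few lines. Operation is a hypothetical mirror of rhei_core::CdcOperation, and classify is an illustrative name. The timestamps are compared with ==, so any PartialEq type works.

```rust
/// Hypothetical mirror of rhei_core::CdcOperation, for illustration only.
#[derive(Debug, PartialEq)]
enum Operation {
    Insert,
    Update,
}

/// created_at == updated_at => Insert; anything else => Update.
fn classify<T: PartialEq>(created_at: &T, updated_at: &T) -> Operation {
    if created_at == updated_at {
        Operation::Insert
    } else {
        Operation::Update
    }
}
```

This heuristic has two consequences:

- A row inserted and then updated between two polls is seen only once, with updated_at differing from created_at. It is reported as an Update, and no Insert is emitted for it.
- The heuristic assumes the application writes identical values to both columns on insert. If they come from two separate clock reads, a fresh row may be classified as an Update.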

Soft-delete detection

Hard deletes (rows that vanish from the source) are invisible to a polling strategy. The [DeleteDetection] enum offers two opt-in workarounds:

  • [DeleteDetection::SoftDelete] — the source marks deleted rows with a non-NULL deleted_at column; the consumer issues a second query for those rows.
  • [DeleteDetection::FullDiff] — periodic full-table comparison (not yet implemented; logs a warning when configured).
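The following hypothetical sketch models only the predicates of the two strategies:

- soft_deleted_ids mirrors a second query that keeps rows whose soft-delete column is non-NULL (None stands for SQL NULL).
- hard_deleted_ids shows what a full-table comparison would have to compute: primary keys present in the previous snapshot but missing from the current one.

Neither function is crate API, FullDiff itself is not implemented in the crate, and the sketch ignores watermarks entirely.

```rust
use std::collections::BTreeSet;

/// Hypothetical polled row; `deleted_at` models the soft-delete column
/// (None = SQL NULL).
struct Row {
    id: i64,
    deleted_at: Option<i64>,
}

/// Soft delete: mirrors a `WHERE deleted_at IS NOT NULL` query and returns
/// the primary keys that should become Delete events.
fn soft_deleted_ids(rows: &[Row]) -> Vec<i64> {
    rows.iter()
        .filter(|r| r.deleted_at.is_some())
        .map(|r| r.id)
        .collect()
}

/// Full diff (NOT implemented by the crate): keys present in the previous
/// full-table snapshot but absent from the current one were hard-deleted.
/// BTreeSet::difference yields them in ascending key order.
fn hard_deleted_ids(previous: &BTreeSet<i64>, current: &BTreeSet<i64>) -> Vec<i64> {
    previous.difference(current).copied().collect()
}
```

The full-diff variant has to read every primary key in the table on each comparison. That cost is why such a mode would run only periodically rather than on every poll.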

Plugging in a SourceConnector

The [SourceConnector] trait abstracts over the synchronous connector_arrow query API. Built-in implementations are provided for SQLite (feature sqlite) and PostgreSQL (feature postgres). Custom backends can be added by implementing [SourceConnector] for any type that is Send + 'static.
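The following is a minimal sketch of a custom backend. Its assumptions:

- SourceConnectorSketch is a hypothetical, simplified trait that returns rows as strings. The real [SourceConnector] sits on connector_arrow's query API, and its exact methods are not documented here.
- FixtureConnector is a hypothetical in-memory backend of the kind that is handy in tests.

The helper shows one concrete consequence of the Send + 'static bound: such a value can be moved into std::thread::spawn, which requires both. The crate's own threading model is not described here.

```rust
use std::thread;

/// Hypothetical, simplified connector trait (NOT the crate's SourceConnector).
trait SourceConnectorSketch: Send + 'static {
    fn query(&mut self, sql: &str) -> Result<Vec<Vec<String>>, String>;
}

/// Hypothetical in-memory backend that returns canned rows for any query.
struct FixtureConnector {
    rows: Vec<Vec<String>>,
}

impl SourceConnectorSketch for FixtureConnector {
    fn query(&mut self, _sql: &str) -> Result<Vec<Vec<String>>, String> {
        Ok(self.rows.clone())
    }
}

/// Moves the connector onto its own thread, which compiles only because
/// the trait requires Send + 'static. Returns the number of rows fetched.
fn poll_on_background_thread<C: SourceConnectorSketch>(
    mut conn: C,
    sql: String,
) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let rows = conn.query(&sql).unwrap_or_default(); // treat errors as "no rows" here
        rows.len()
    })
}
```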

Wiring up a sidecar consumer

use rhei_sidecar::{
    TimestampCdcConsumer, TimestampCdcConfig, TimestampTableConfig, DeleteDetection,
};

// 1. Open a connection to the external database (SQLite shown here).
let raw = connector_arrow::rusqlite::rusqlite::Connection::open("/path/to/external.db")
    .expect("open external db");
let conn = connector_arrow::rusqlite::SQLiteConnection::new(raw);

// 2. Describe the tables to follow.
let config = TimestampCdcConfig {
    tables: vec![TimestampTableConfig {
        table_name: "orders".into(),
        created_at_column: "created_at".into(),
        updated_at_column: "updated_at".into(),
        primary_key: vec!["id".into()],
        columns: vec![], // SELECT *
    }],
    poll_batch_size: 500,
    delete_detection: DeleteDetection::SoftDelete {
        column: "deleted_at".into(),
    },
};

// 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
let consumer = TimestampCdcConsumer::new(conn, config);
// Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.