Sidecar CDC consumer — poll-based replication from an external database.
§What is sidecar mode?
Rhei’s primary CDC pipeline relies on SQLite triggers that fire on each
INSERT, UPDATE, and DELETE. That approach requires write access and
schema-modification rights on the source database. When the source database
is external (e.g. a production PostgreSQL instance, or a read-only SQLite
file), triggers are not available.
Sidecar mode works around this by polling the external database
periodically: it issues a SELECT … WHERE updated_at > watermark ORDER BY updated_at, pk … query, turns each returned row into a
rhei_core::CdcEvent, and forwards those events to the CdcSyncEngine
inside Rhei’s HTAP pipeline.
§Watermark and composite-PK ordering
Each table maintains a Watermark that records the last-seen
(updated_at timestamp, primary_key_values) pair. Because multiple rows can
share the same updated_at value (e.g. a bulk import), a plain
updated_at > watermark predicate would skip rows whose timestamp equals
the watermark. Instead, the consumer uses a compound predicate:
WHERE updated_at > $ts
OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
This guarantees that every row is delivered exactly once regardless of
timestamp ties, and that composite primary keys (e.g. (tenant_id, order_id))
are handled correctly.
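The compound predicate above can be generated mechanically from a table's primary-key column list. The following is a minimal sketch, not the crate's actual query builder: the function name `build_poll_query`, the PostgreSQL-style `$n` placeholders, and the trailing `LIMIT` clause are all assumptions made for illustration. It relies on row-value comparison, which PostgreSQL supports and SQLite supports from version 3.15.

```rust
/// Hypothetical helper: builds the compound-predicate polling query for one table.
/// `$1` is the watermark timestamp and `$2..` are the watermark primary-key values.
/// The LIMIT clause is an assumption; the source only shows WHERE and ORDER BY.
fn build_poll_query(
    table: &str,
    updated_at: &str,
    pk_cols: &[&str],
    batch_size: usize,
) -> String {
    assert!(!pk_cols.is_empty(), "at least one primary-key column is required");

    // (pk1, pk2, ...) for the left-hand side of the row-value comparison.
    let pk_tuple = pk_cols.join(", ");

    // ($2, $3, ...) for the right-hand side; $1 is reserved for the timestamp.
    let pk_params = (0..pk_cols.len())
        .map(|i| format!("${}", i + 2))
        .collect::<Vec<_>>()
        .join(", ");

    // The ORDER BY columns follow the same order as the comparison, so the
    // last row of a batch can serve as the next watermark.
    let order_by = std::iter::once(updated_at)
        .chain(pk_cols.iter().copied())
        .map(|c| format!("{c} ASC"))
        .collect::<Vec<_>>()
        .join(", ");

    format!(
        "SELECT * FROM {table} \
         WHERE {updated_at} > $1 \
         OR ({updated_at} = $1 AND ({pk_tuple}) > ({pk_params})) \
         ORDER BY {order_by} LIMIT {batch_size}"
    )
}
```

Calling `build_poll_query("orders", "updated_at", &["tenant_id", "order_id"], 500)` yields a query whose tie-breaker compares `(tenant_id, order_id)` against `($2, $3)`. Identifiers are interpolated without quoting here, so a real implementation would need to validate or quote them.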
Watermarks can be persisted across restarts via the WatermarkStore trait.
The default NullWatermarkStore is in-memory only. When the
rocksdb-watermark feature is enabled, RocksDbWatermarkStore provides
durable persistence so the consumer resumes exactly where it left off.
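To make the persistence idea concrete, here is a rough sketch of what a per-table watermark store could look like. Every name in it (`SketchWatermark`, `SketchWatermarkStore`, `InMemoryStore`, `load`, `save`) is hypothetical and does not reflect the real WatermarkStore trait, whose signature is not shown on this page. Integer timestamps and integer primary-key values are assumed for simplicity, and refusing to move a watermark backwards is a design choice of this sketch only.

```rust
use std::collections::HashMap;

/// Hypothetical watermark: the derived ordering compares the timestamp first,
/// then the primary-key values lexicographically, mirroring the SQL predicate.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct SketchWatermark {
    updated_at_micros: i64,
    pk_values: Vec<i64>,
}

/// Hypothetical persistence interface, keyed by table name.
trait SketchWatermarkStore {
    fn load(&self, table: &str) -> Option<SketchWatermark>;
    fn save(&mut self, table: &str, wm: SketchWatermark);
}

/// In-memory store: state is lost on restart, so polling starts over.
#[derive(Default)]
struct InMemoryStore {
    inner: HashMap<String, SketchWatermark>,
}

impl SketchWatermarkStore for InMemoryStore {
    fn load(&self, table: &str) -> Option<SketchWatermark> {
        self.inner.get(table).cloned()
    }

    fn save(&mut self, table: &str, wm: SketchWatermark) {
        // Only advance: ignore a watermark that is not strictly newer
        // than the one already stored for this table.
        let is_newer = self.inner.get(table).map_or(true, |cur| wm > *cur);
        if is_newer {
            self.inner.insert(table.to_string(), wm);
        }
    }
}
```

A durable variant would implement the same two operations against a key-value store, so that `load` after a restart returns the last saved pair instead of `None`.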
§INSERT vs UPDATE heuristic
Because timestamp polling does not expose the old row image, the consumer
cannot observe whether a row was newly created or updated. It uses a simple
heuristic: if created_at == updated_at, the event is classified as
rhei_core::CdcOperation::Insert; otherwise it is
rhei_core::CdcOperation::Update.
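The heuristic reduces to a single comparison. The sketch below uses a stand-in enum, `SketchOperation`, rather than the real rhei_core::CdcOperation type, and assumes timestamps have already been normalized to integer microseconds; both are illustrative assumptions.

```rust
/// Stand-in for the real operation type; hypothetical, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchOperation {
    Insert,
    Update,
}

/// Classifies a polled row by comparing its two timestamp columns.
/// Equal timestamps are taken to mean the row has not changed since creation.
fn classify(created_at_micros: i64, updated_at_micros: i64) -> SketchOperation {
    if created_at_micros == updated_at_micros {
        SketchOperation::Insert
    } else {
        SketchOperation::Update
    }
}
```

One consequence of the heuristic: a row that is created and then modified between two polls arrives with differing timestamps, so it is classified as an update even though the consumer never saw the insert.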
§Soft-delete detection
Hard deletes (rows that vanish from the source) are invisible to a polling
strategy. The DeleteDetection enum offers two opt-in workarounds:
- DeleteDetection::SoftDelete: the source marks deleted rows with a non-NULL deleted_at column; the consumer issues a second query for those rows.
- DeleteDetection::FullDiff: periodic full-table comparison (not yet implemented; logs a warning when configured).
§Plugging in a SourceConnector
The SourceConnector trait abstracts over the synchronous
connector_arrow query API. Built-in implementations are provided for
SQLite (feature sqlite) and PostgreSQL (feature postgres). Custom
backends can be added by implementing SourceConnector for any type that
is Send + 'static.
§Wiring up a sidecar consumer
use rhei_sidecar::{
    TimestampCdcConsumer, TimestampCdcConfig, TimestampTableConfig, DeleteDetection,
};

// 1. Open a connection to the external database (SQLite shown here).
let raw = connector_arrow::rusqlite::rusqlite::Connection::open("/path/to/external.db")
    .expect("open external db");
let conn = connector_arrow::rusqlite::SQLiteConnection::new(raw);

// 2. Describe the tables to follow.
let config = TimestampCdcConfig {
    tables: vec![TimestampTableConfig {
        table_name: "orders".into(),
        created_at_column: "created_at".into(),
        updated_at_column: "updated_at".into(),
        primary_key: vec!["id".into()],
        columns: vec![], // SELECT *
    }],
    poll_batch_size: 500,
    delete_detection: DeleteDetection::SoftDelete {
        column: "deleted_at".into(),
    },
};

// 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
let consumer = TimestampCdcConsumer::new(conn, config);

// Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.
Re-exports§
pub use config::DeleteDetection;
pub use config::TimestampCdcConfig;
pub use config::TimestampTableConfig;
pub use error::SidecarError;
pub use source::SourceConnector;
pub use timestamp_consumer::TimestampCdcConsumer;
Modules§
- config: Configuration types for the crate::TimestampCdcConsumer.
- error: Error types for the rhei-sidecar crate.
- source: The SourceConnector trait and its built-in implementations.
- timestamp_consumer: The TimestampCdcConsumer, a poll-based CDC consumer generic over crate::SourceConnector.
Structs§
- NullWatermarkStore: No-op watermark store that provides backward-compatible, in-memory-only behavior.
- Watermark: Internal watermark state for timestamp-based polling.
Traits§
- WatermarkStore: Persistence backend for sidecar watermarks.