rhei-sidecar 1.5.0

Sidecar CDC consumer for Rhei — polls external databases by timestamp columns
//! Sidecar CDC consumer — poll-based replication from an external database.
//!
//! # What is sidecar mode?
//!
//! Rhei's primary CDC pipeline relies on SQLite triggers that fire on each
//! `INSERT`, `UPDATE`, and `DELETE`. That approach requires write access and
//! schema-modification rights on the source database. When the source database
//! is external (e.g. a production PostgreSQL instance, or a read-only SQLite
//! file), triggers are not available.
//!
//! Sidecar mode works around this by **polling** the external database
//! periodically: it issues a `SELECT … WHERE updated_at > watermark ORDER BY
//! updated_at, pk …` query (simplified here; the tie-safe predicate is
//! described in the next section), turns each returned row into a
//! [`rhei_core::CdcEvent`], and forwards those events to the `CdcSyncEngine`
//! inside Rhei's HTAP pipeline.
//!
//! # Watermark and composite-PK ordering
//!
//! Each table maintains a [`Watermark`] that records the last-seen
//! `(updated_at timestamp, primary_key_values)` pair. Because multiple rows can
//! share the same `updated_at` value (e.g. a bulk import), a plain
//! `updated_at > watermark` predicate would skip rows whose timestamp equals
//! the watermark. Instead, the consumer uses a compound predicate:
//!
//! ```text
//! WHERE updated_at > $ts
//!    OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
//! ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
//! ```
//!
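//! This ensures that rows sharing a timestamp are neither skipped nor delivered
//! twice, and that composite primary keys (e.g. `(tenant_id, order_id)`) are
//! handled correctly. The guarantee assumes that no row is committed later
//! with an `(updated_at, pk)` position at or before the current watermark.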
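//!
//! The compound predicate is a lexicographic "strictly after" comparison on
//! `(updated_at, pk1, pk2, ...)`. The sketch below models that in memory; the
//! `Row` alias and `rows_after` helper are hypothetical and not part of this
//! crate, and timestamps and keys are assumed to be plain `i64` values.
//!
//! ```rust
//! /// Hypothetical stand-in for a source row: (updated_at, composite primary key).
//! type Row = (i64, Vec<i64>);
//!
//! /// Returns the rows strictly after `watermark`, sorted the way
//! /// `ORDER BY updated_at ASC, pk1 ASC, ...` would sort them.
//! fn rows_after(rows: &[Row], watermark: &Row) -> Vec<Row> {
//!     let mut next: Vec<Row> = rows
//!         .iter()
//!         // Tuple and Vec comparison in Rust is lexicographic, which mirrors
//!         // `updated_at > $ts OR (updated_at = $ts AND (pk...) > ($pk...))`.
//!         .filter(|row| *row > watermark)
//!         .cloned()
//!         .collect();
//!     next.sort();
//!     next
//! }
//! ```
//!
//! With a watermark of `(100, [1, 2])`, a row at `(100, [2, 1])` is still
//! returned, whereas a plain `updated_at > 100` filter would drop it.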
//!
//! Watermarks can be persisted across restarts via the [`WatermarkStore`] trait.
//! The default [`NullWatermarkStore`] keeps watermarks in memory only, so they
//! are lost when the process exits. When the `rocksdb-watermark` feature is
//! enabled, [`RocksDbWatermarkStore`] provides durable persistence so the
//! consumer resumes exactly where it left off.
//!
//! # INSERT vs UPDATE heuristic
//!
//! Because timestamp polling does not expose the old row image, the consumer
//! cannot observe whether a row was newly created or updated. It uses a simple
//! heuristic: if `created_at == updated_at`, the event is classified as
//! [`rhei_core::CdcOperation::Insert`]; otherwise it is
//! [`rhei_core::CdcOperation::Update`]. A row that is inserted and then
//! updated before the next poll is therefore reported as an `Update`.
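//!
//! A minimal sketch of the heuristic follows. The `Operation` enum is a
//! hypothetical stand-in for [`rhei_core::CdcOperation`], and `classify` is an
//! illustrative helper rather than this crate's API; timestamps are assumed to
//! be comparable integers.
//!
//! ```rust
//! /// Hypothetical stand-in for `rhei_core::CdcOperation`, reduced to the two
//! /// variants the heuristic can produce.
//! #[derive(Debug, PartialEq, Eq)]
//! enum Operation {
//!     Insert,
//!     Update,
//! }
//!
//! /// Classifies a polled row from its two timestamp columns.
//! fn classify(created_at: i64, updated_at: i64) -> Operation {
//!     if created_at == updated_at {
//!         Operation::Insert
//!     } else {
//!         Operation::Update
//!     }
//! }
//! ```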
//!
//! # Delete detection
//!
//! Hard deletes (rows that vanish from the source) are invisible to a polling
//! strategy. The [`DeleteDetection`] enum offers two opt-in workarounds:
//!
//! - [`DeleteDetection::SoftDelete`] — the source marks deleted rows with a
//!   non-`NULL` `deleted_at` column; the consumer issues a second query for
//!   those rows.
//! - [`DeleteDetection::FullDiff`] — periodic full-table comparison (not yet
//!   implemented; logs a warning when configured).
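//!
//! For [`DeleteDetection::SoftDelete`], the second query might look roughly
//! like the string built below. `soft_delete_query` is a hypothetical helper,
//! not this crate's API; identifier quoting is omitted, and how the consumer
//! avoids re-emitting deletions it has already seen is not shown.
//!
//! ```rust
//! /// Builds a query that selects rows the source has marked as deleted.
//! /// Assumes `table` and `deleted_at_column` are trusted identifiers.
//! fn soft_delete_query(table: &str, deleted_at_column: &str) -> String {
//!     format!("SELECT * FROM {table} WHERE {deleted_at_column} IS NOT NULL")
//! }
//! ```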
//!
//! # Plugging in a `SourceConnector`
//!
//! The [`SourceConnector`] trait abstracts over the synchronous
//! `connector_arrow` query API. Built-in implementations are provided for
//! SQLite (feature `sqlite`) and PostgreSQL (feature `postgres`). Custom
//! backends can be added by implementing [`SourceConnector`] for any type that
//! is `Send + 'static`.
//!
//! # Wiring up a sidecar consumer
//!
//! ```rust,no_run
//! use rhei_sidecar::{
//!     TimestampCdcConsumer, TimestampCdcConfig, TimestampTableConfig, DeleteDetection,
//! };
//!
//! // 1. Open a connection to the external database (SQLite shown here).
//! let raw = connector_arrow::rusqlite::rusqlite::Connection::open("/path/to/external.db")
//!     .expect("open external db");
//! let conn = connector_arrow::rusqlite::SQLiteConnection::new(raw);
//!
//! // 2. Describe the tables to follow.
//! let config = TimestampCdcConfig {
//!     tables: vec![TimestampTableConfig {
//!         table_name: "orders".into(),
//!         created_at_column: "created_at".into(),
//!         updated_at_column: "updated_at".into(),
//!         primary_key: vec!["id".into()],
//!         columns: vec![], // empty list: select every column (SELECT *)
//!     }],
//!     poll_batch_size: 500,
//!     delete_detection: DeleteDetection::SoftDelete {
//!         column: "deleted_at".into(),
//!     },
//! };
//!
//! // 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
//! let consumer = TimestampCdcConsumer::new(conn, config);
//! // Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.
//! ```

pub mod config;
pub mod error;
pub mod source;
pub mod timestamp_consumer;
mod watermark;

pub use config::{DeleteDetection, TimestampCdcConfig, TimestampTableConfig};
pub use error::SidecarError;
pub use source::SourceConnector;
pub use timestamp_consumer::TimestampCdcConsumer;
pub use watermark::{NullWatermarkStore, Watermark, WatermarkStore};

#[cfg(feature = "rocksdb-watermark")]
pub use watermark::RocksDbWatermarkStore;