rhei_sidecar/lib.rs
//! Sidecar CDC consumer: poll-based replication from an external database.
//!
//! # What is sidecar mode?
//!
//! Rhei's primary CDC pipeline relies on SQLite triggers that fire on each
//! `INSERT`, `UPDATE`, and `DELETE`. That approach requires write access and
//! schema-modification rights on the source database. When the source database
//! is external (e.g. a production PostgreSQL instance, or a read-only SQLite
//! file), triggers are not available.
//!
//! Sidecar mode works around this by **polling** the external database
//! periodically: it issues a `SELECT … WHERE updated_at > watermark ORDER BY
//! updated_at, pk …` query, turns each returned row into a
//! [`rhei_core::CdcEvent`], and forwards those events to the `CdcSyncEngine`
//! inside Rhei's HTAP pipeline.
//!
//! # Watermark and composite-PK ordering
//!
//! Each table maintains a [`Watermark`] that records the last-seen
//! `(updated_at timestamp, primary_key_values)` pair. Because multiple rows can
//! share the same `updated_at` value (e.g. a bulk import), a batch can end in
//! the middle of a group of tied rows. A plain `updated_at > watermark`
//! predicate would then skip the remaining rows whose timestamp equals the
//! watermark. Instead, the consumer uses a compound predicate:
//!
//! ```text
//! WHERE updated_at > $ts
//!    OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
//! ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
//! ```
//!
//! The row-value comparison is lexicographic and matches the `ORDER BY`, so
//! each poll resumes immediately after the last row of the previous batch:
//! no row version is skipped or re-delivered because of timestamp ties, and
//! composite primary keys (e.g. `(tenant_id, order_id)`) are handled correctly.
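//!
//! As a minimal sketch of how such a query could be assembled for a key of
//! any width, the hypothetical helper `keyset_query` below (not part of this
//! crate's API) renders the predicate with positional `$n` placeholders. It
//! assumes a source that supports row-value comparison, such as PostgreSQL or
//! SQLite 3.15 and later; the SQL the consumer actually emits may differ.
//!
//! ```rust
//! // Hypothetical helper for illustration only; identifiers are interpolated
//! // verbatim, so they must come from trusted configuration.
//! fn keyset_query(table: &str, updated_at: &str, pk: &[&str], limit: usize) -> String {
//!     // $1 is the watermark timestamp; $2.. are the watermark primary-key values.
//!     let pk_cols = pk.join(", ");
//!     let pk_params = (0..pk.len())
//!         .map(|i| format!("${}", i + 2))
//!         .collect::<Vec<_>>()
//!         .join(", ");
//!     // The ORDER BY mirrors the comparison order: timestamp first, then each key column.
//!     let order_by = std::iter::once(updated_at)
//!         .chain(pk.iter().copied())
//!         .map(|c| format!("{c} ASC"))
//!         .collect::<Vec<_>>()
//!         .join(", ");
//!     format!(
//!         "SELECT * FROM {table} \
//!          WHERE {updated_at} > $1 \
//!          OR ({updated_at} = $1 AND ({pk_cols}) > ({pk_params})) \
//!          ORDER BY {order_by} LIMIT {limit}"
//!     )
//! }
//!
//! fn main() {
//!     let sql = keyset_query("orders", "updated_at", &["tenant_id", "order_id"], 500);
//!     println!("{sql}");
//! }
//! ```
//!
//! After each batch, the watermark would be advanced to the
//! `(updated_at, pk...)` values of the last row returned, so the next poll
//! binds those values to `$1`, `$2`, and so on.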
//!
//! Watermarks can be persisted across restarts via the [`WatermarkStore`] trait.
//! The default [`NullWatermarkStore`] is in-memory only. When the
//! `rocksdb-watermark` feature is enabled, [`RocksDbWatermarkStore`] provides
//! durable persistence so the consumer resumes exactly where it left off.
//!
//! # INSERT vs UPDATE heuristic
//!
//! Because timestamp polling does not expose the old row image, the consumer
//! cannot observe whether a row was newly created or updated. It uses a simple
//! heuristic: if `created_at == updated_at`, the event is classified as
//! [`rhei_core::CdcOperation::Insert`]; otherwise it is
//! [`rhei_core::CdcOperation::Update`].
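//!
//! The rule can be sketched as follows. `Op` is a local stand-in for
//! [`rhei_core::CdcOperation`] and `classify` is a hypothetical helper, both
//! declared here only so the example compiles on its own; the consumer's
//! internal code may be organized differently.
//!
//! ```rust
//! // Stand-in for `rhei_core::CdcOperation` (assumption: only the two
//! // variants named above matter for this heuristic).
//! #[derive(Debug, PartialEq)]
//! enum Op {
//!     Insert,
//!     Update,
//! }
//!
//! // Hypothetical helper: compares the two timestamp values exactly as read from the row.
//! fn classify<T: PartialEq>(created_at: &T, updated_at: &T) -> Op {
//!     if created_at == updated_at {
//!         Op::Insert
//!     } else {
//!         Op::Update
//!     }
//! }
//!
//! fn main() {
//!     // Timestamps are equal: the row has not been touched since creation.
//!     assert_eq!(classify(&"2024-05-01T10:00:00Z", &"2024-05-01T10:00:00Z"), Op::Insert);
//!     // Timestamps differ: the row was modified after creation.
//!     assert_eq!(classify(&"2024-05-01T10:00:00Z", &"2024-05-01T10:07:30Z"), Op::Update);
//! }
//! ```
//!
//! One consequence of this rule is that a row created and then modified
//! between two polls is first seen with differing timestamps, so it surfaces
//! as an `Update` without a preceding `Insert`.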
//!
//! # Soft-delete detection
//!
//! Hard deletes (rows that vanish from the source) are invisible to a polling
//! strategy. The [`DeleteDetection`] enum offers two opt-in workarounds:
//!
//! - [`DeleteDetection::SoftDelete`]: the source marks deleted rows with a
//!   non-`NULL` `deleted_at` column; the consumer issues a second query for
//!   those rows.
//! - [`DeleteDetection::FullDiff`]: periodic full-table comparison (not yet
//!   implemented; logs a warning when configured).
//!
//! # Plugging in a `SourceConnector`
//!
//! The [`SourceConnector`] trait abstracts over the synchronous
//! `connector_arrow` query API. Built-in implementations are provided for
//! SQLite (feature `sqlite`) and PostgreSQL (feature `postgres`). Custom
//! backends can be added by implementing [`SourceConnector`] for any type that
//! is `Send + 'static`.
//!
//! # Wiring up a sidecar consumer
//!
//! ```rust,no_run
//! use rhei_sidecar::{
//!     TimestampCdcConsumer, TimestampCdcConfig, TimestampTableConfig, DeleteDetection,
//! };
//!
//! // 1. Open a connection to the external database (SQLite shown here).
//! let raw = connector_arrow::rusqlite::rusqlite::Connection::open("/path/to/external.db")
//!     .expect("open external db");
//! let conn = connector_arrow::rusqlite::SQLiteConnection::new(raw);
//!
//! // 2. Describe the tables to follow.
//! let config = TimestampCdcConfig {
//!     tables: vec![TimestampTableConfig {
//!         table_name: "orders".into(),
//!         created_at_column: "created_at".into(),
//!         updated_at_column: "updated_at".into(),
//!         primary_key: vec!["id".into()],
//!         columns: vec![], // SELECT *
//!     }],
//!     poll_batch_size: 500,
//!     delete_detection: DeleteDetection::SoftDelete {
//!         column: "deleted_at".into(),
//!     },
//! };
//!
//! // 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
//! let consumer = TimestampCdcConsumer::new(conn, config);
//! // Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.
//! ```

pub mod config;
pub mod error;
pub mod source;
pub mod timestamp_consumer;
mod watermark;

pub use config::{DeleteDetection, TimestampCdcConfig, TimestampTableConfig};
pub use error::SidecarError;
pub use source::SourceConnector;
pub use timestamp_consumer::TimestampCdcConsumer;
pub use watermark::{NullWatermarkStore, Watermark, WatermarkStore};

#[cfg(feature = "rocksdb-watermark")]
pub use watermark::RocksDbWatermarkStore;