//! Sidecar CDC consumer — poll-based replication from an external database.
//!
//! # What is sidecar mode?
//!
//! Rhei's primary CDC pipeline relies on SQLite triggers that fire on each
//! `INSERT`, `UPDATE`, and `DELETE`. That approach requires write access and
//! schema-modification rights on the source database. When the source database
//! is external (e.g. a production PostgreSQL instance, or a read-only SQLite
//! file), triggers are not available.
//!
//! Sidecar mode works around this by **polling** the external database
//! periodically: it issues a `SELECT … WHERE updated_at > watermark ORDER BY
//! updated_at, pk …` query, turns each returned row into a
//! [`rhei_core::CdcEvent`], and forwards those events to the `CdcSyncEngine`
//! inside Rhei's HTAP pipeline.
//!
//! # Watermark and composite-PK ordering
//!
//! Each table maintains a [`Watermark`] that records the last-seen
//! `(updated_at timestamp, primary_key_values)` pair. Because multiple rows can
//! share the same `updated_at` value (e.g. a bulk import), a plain
//! `updated_at > watermark` predicate would skip rows whose timestamp equals
//! the watermark. Instead, the consumer uses a compound predicate:
//!
//! ```text
//! WHERE updated_at > $ts
//! OR (updated_at = $ts AND (pk1, pk2, ...) > ($pk1, $pk2, ...))
//! ORDER BY updated_at ASC, pk1 ASC, pk2 ASC, ...
//! ```
//!
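//! Because the predicate and the `ORDER BY` clause use the same key, each poll
//! resumes strictly after the last row delivered: rows that share a timestamp
//! are neither skipped nor re-delivered, and composite primary keys (e.g.
//! `(tenant_id, order_id)`) are handled correctly.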
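//!
//! The same ordering can be illustrated in memory. The sketch below is not the
//! crate's implementation: `Row` and `is_after_watermark` are hypothetical
//! names, and timestamps and key values are assumed to be plain `i64`s. It
//! relies only on the fact that Rust tuples and slices compare
//! lexicographically, which mirrors the compound SQL predicate.
//!
//! ```rust
//! // Hypothetical stand-in for a polled row; the real `Watermark` type may differ.
//! #[derive(Debug, Clone, PartialEq)]
//! struct Row {
//!     updated_at: i64, // assumed: e.g. epoch milliseconds
//!     pk: Vec<i64>,    // composite primary-key values, in key order
//! }
//!
//! /// Returns true when `row` sorts strictly after the watermark `(ts, pk)`.
//! fn is_after_watermark(row: &Row, ts: i64, pk: &[i64]) -> bool {
//!     // Lexicographic comparison: the timestamp decides first, and the
//!     // primary-key values break ties, exactly like
//!     // `updated_at > $ts OR (updated_at = $ts AND (pk...) > ($pk...))`.
//!     (row.updated_at, row.pk.as_slice()) > (ts, pk)
//! }
//! ```
//!
//! With a watermark of `(100, [1, 2])`, a row `(100, [2, 1])` is still pending,
//! while `(100, [1, 2])` and `(100, [1, 1])` have already been delivered.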
//!
//! Watermarks can be persisted across restarts via the [`WatermarkStore`] trait.
//! The default [`NullWatermarkStore`] is in-memory only. When the
//! `rocksdb-watermark` feature is enabled, [`RocksDbWatermarkStore`] provides
//! durable persistence so the consumer resumes exactly where it left off.
//!
//! # INSERT vs UPDATE heuristic
//!
//! Because timestamp polling does not expose the old row image, the consumer
//! cannot observe whether a row was newly created or updated. It uses a simple
//! heuristic: if `created_at == updated_at`, the event is classified as
//! [`rhei_core::CdcOperation::Insert`]; otherwise it is
//! [`rhei_core::CdcOperation::Update`].
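//!
//! A minimal sketch of that rule, assuming both timestamps are available as
//! comparable integers. `Op` and `classify` are hypothetical stand-ins; the
//! crate itself emits [`rhei_core::CdcOperation`] values.
//!
//! ```rust
//! // Hypothetical stand-in for `rhei_core::CdcOperation`.
//! #[derive(Debug, PartialEq)]
//! enum Op {
//!     Insert,
//!     Update,
//! }
//!
//! /// Classifies a polled row using only its two timestamps.
//! fn classify(created_at: i64, updated_at: i64) -> Op {
//!     if created_at == updated_at {
//!         Op::Insert // row has not been touched since creation
//!     } else {
//!         Op::Update // row was modified at least once after creation
//!     }
//! }
//! ```
//!
//! One consequence of the rule: a row that is created and then modified
//! between two polls is first seen with differing timestamps, so it is
//! classified as an update even though no insert was ever observed for it.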
//!
//! # Soft-delete detection
//!
//! Hard deletes (rows that vanish from the source) are invisible to a polling
//! strategy. The [`DeleteDetection`] enum offers two opt-in workarounds:
//!
//! - [`DeleteDetection::SoftDelete`] — the source marks deleted rows with a
//! non-`NULL` `deleted_at` column; the consumer issues a second query for
//! those rows.
//! - [`DeleteDetection::FullDiff`] — periodic full-table comparison (not yet
//! implemented; logs a warning when configured).
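//!
//! The exact SQL of the soft-delete query is not specified here. As a rough
//! sketch of what such a second query might look like, the hypothetical helper
//! below selects the primary keys of rows whose soft-delete column is set. The
//! function name, the `ORDER BY`, and the `LIMIT` are assumptions, and
//! identifiers are interpolated unquoted, so they must come from trusted
//! configuration.
//!
//! ```rust
//! /// Builds an illustrative query for soft-deleted rows (not the crate's actual SQL).
//! fn soft_delete_query(table: &str, pk_cols: &[&str], deleted_col: &str, batch: usize) -> String {
//!     format!(
//!         "SELECT {pks} FROM {table} WHERE {col} IS NOT NULL ORDER BY {col}, {pks} LIMIT {batch}",
//!         pks = pk_cols.join(", "),
//!         table = table,
//!         col = deleted_col,
//!         batch = batch,
//!     )
//! }
//! ```
//!
//! For the `orders` table with a `deleted_at` column, this yields a query that
//! returns only key columns, which is enough to build a delete event.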
//!
//! # Plugging in a `SourceConnector`
//!
//! The [`SourceConnector`] trait abstracts over the synchronous
//! `connector_arrow` query API. Built-in implementations are provided for
//! SQLite (feature `sqlite`) and PostgreSQL (feature `postgres`). Custom
//! backends can be added by implementing [`SourceConnector`] for any type that
//! is `Send + 'static`.
//!
//! # Wiring up a sidecar consumer
//!
//! ```rust,no_run
//! use rhei_sidecar::{
//!     TimestampCdcConsumer, TimestampCdcConfig, TimestampTableConfig, DeleteDetection,
//! };
//!
//! // 1. Open a connection to the external database (SQLite shown here).
//! let raw = connector_arrow::rusqlite::rusqlite::Connection::open("/path/to/external.db")
//!     .expect("open external db");
//! let conn = connector_arrow::rusqlite::SQLiteConnection::new(raw);
//!
//! // 2. Describe the tables to follow.
//! let config = TimestampCdcConfig {
//!     tables: vec![TimestampTableConfig {
//!         table_name: "orders".into(),
//!         created_at_column: "created_at".into(),
//!         updated_at_column: "updated_at".into(),
//!         primary_key: vec!["id".into()],
//!         columns: vec![], // SELECT *
//!     }],
//!     poll_batch_size: 500,
//!     delete_detection: DeleteDetection::SoftDelete {
//!         column: "deleted_at".into(),
//!     },
//! };
//!
//! // 3. Build the consumer (in-memory watermarks; use with_watermark_store for persistence).
//! let consumer = TimestampCdcConsumer::new(conn, config);
//! // Pass `consumer` to CdcSyncEngine as the CdcConsumer implementation.
//! ```
pub use ;
pub use SidecarError;
pub use SourceConnector;
pub use TimestampCdcConsumer;
pub use ;
pub use RocksDbWatermarkStore;