//! CDC sync engine, SQL query router, and CDC-to-DML converter for Rhei.
//!
//! `rhei-sync` is the **glue crate** that ties the OLTP and OLAP halves of
//! Rhei's HTAP pipeline together.  It has three core responsibilities:
//!
//! ## 1. Query routing ([`router`])
//!
//! [`SqlParserRouter`] parses every incoming SQL statement with
//! `sqlparser-rs` (SQLite dialect) and directs it to the right backend:
//!
//! - **OLTP** — writes (INSERT / UPDATE / DELETE), DDL, transactions, and
//!   simple point-lookup SELECTs.
//! - **OLAP** — aggregates, GROUP BY / HAVING, JOINs, window functions, CTEs,
//!   set operations (UNION / INTERSECT / EXCEPT), and subqueries.
//! - **Heuristic fallback** — when the parser cannot handle the SQL (e.g.
//!   SQLite `PRAGMA`), a keyword-scan fallback is applied.  The default is
//!   OLTP (safety-first).
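//!
//! A keyword-scan fallback can be pictured with the minimal sketch below.  It
//! is not the crate's `HeuristicRouter`: `Backend` and `route_by_keywords` are
//! hypothetical names, and the keyword list is an assumption modelled on the
//! OLAP categories above.  Only `SELECT` / `WITH` reads are OLAP candidates;
//! every other statement stays on OLTP.
//!
//! ```rust
//! // Hypothetical routing target for this sketch.
//! #[derive(Debug, PartialEq, Eq)]
//! enum Backend {
//!     Oltp,
//!     Olap,
//! }
//!
//! // Keyword scan: non-read statements stay on OLTP, and a read goes to OLAP
//! // only when an analytical marker appears.
//! fn route_by_keywords(sql: &str) -> Backend {
//!     // Split on anything that is not an identifier character and upper-case,
//!     // so `group\n by` and `GROUP BY` yield the same tokens.
//!     let tokens: Vec<String> = sql
//!         .split(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
//!         .filter(|t| !t.is_empty())
//!         .map(|t| t.to_ascii_uppercase())
//!         .collect();
//!     let has = |kw: &str| tokens.iter().any(|t| t.as_str() == kw);
//!
//!     // Writes, DDL, transactions, PRAGMA, and anything unrecognized: OLTP.
//!     match tokens.first().map(String::as_str) {
//!         Some("SELECT") | Some("WITH") => {}
//!         _ => return Backend::Oltp,
//!     }
//!
//!     let analytical = tokens[0] == "WITH" // CTE
//!         || ["GROUP", "HAVING", "JOIN", "OVER", "UNION", "INTERSECT", "EXCEPT"]
//!             .iter()
//!             .any(|kw| has(*kw))
//!         || ["COUNT", "SUM", "AVG", "MIN", "MAX"].iter().any(|f| has(*f))
//!         // More than one SELECT implies a subquery.
//!         || tokens.iter().filter(|t| t.as_str() == "SELECT").count() > 1;
//!
//!     if analytical { Backend::Olap } else { Backend::Oltp }
//! }
//! ```
//!
//! Matching whole tokens rather than substrings keeps a column such as
//! `overdue` from tripping the `OVER` check; a column named `count` can still
//! send a read to OLAP, which is the price of a parser-free fallback.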
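//!
//! ## 2. CDC-to-DML conversion ([`converter`], [`temporal_converter`])
//!
//! CDC events (`INSERT` / `UPDATE` / `DELETE`) captured from OLTP triggers are
//! converted into OLAP DML:
//!
//! - **Destructive mode** — mirror semantics: UPDATE overwrites the existing
//!   row, DELETE removes it.
//! - **Temporal mode (SCD Type 2)** — append-only with validity intervals.
//!   Every change produces an INSERT; UPDATEs and DELETEs additionally close
//!   the previous row version by setting `_rhei_valid_to`.  Point-in-time
//!   queries use:
//!   `WHERE _rhei_valid_from <= T AND (_rhei_valid_to IS NULL OR _rhei_valid_to > T)`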
//!
//! ### Arrow-native bulk INSERT path
//!
//! Consecutive INSERT events for the same table are grouped into a
//! `SyncOp::BatchInsert` (see [`sync_engine`]).  The engine first attempts to
//! build a typed Arrow `RecordBatch` via [`converter::cdc_events_to_batch`] (or
//! [`temporal_converter::cdc_events_to_temporal_batch`] in temporal mode) and
//! calls `OlapEngine::load_arrow`, bypassing SQL parsing entirely.  If the
//! schema contains a type that the Arrow builder does not handle (Date,
//! Timestamp, Decimal, List, Struct, …), the engine falls back to a SQL
//! `INSERT … VALUES` statement.
//!
//! ## 3. Background sync loop ([`background`])
//!
//! [`spawn_sync_loop`] starts a Tokio task that calls
//! [`rhei_core::SyncEngine::sync_once`] on a configurable interval.  Shutdown
//! is cooperative: the caller holds a
//! [`tokio_util::sync::CancellationToken`] and cancels it to stop the loop
//! cleanly.
//!
//! A DDL read lock is acquired before every cycle so that schema mutations
//! (which take the write lock) never interleave with an in-progress sync.
//!
//! ## `SyncMode` semantics
//!
//! | Mode | UPDATE | DELETE |
//! |------|--------|--------|
//! | `Destructive` | overwrites the row | removes the row |
//! | `Temporal` | closes previous version + inserts new version | closes previous version + inserts tombstone |
//!
//! ## SQL injection prevention
//!
//! All table and column identifiers are validated against `[A-Za-z0-9_]` at
//! `register_table` time.  The converter re-validates them at conversion time
//! for defense-in-depth.

pub mod background;
pub mod converter;
pub mod error;
pub mod router;
pub mod sync_engine;
pub mod temporal_converter;

pub use background::spawn_sync_loop;
pub use error::SyncError;
pub use router::{HeuristicRouter, SqlParserRouter};
pub use sync_engine::CdcSyncEngine;
pub use temporal_converter::temporalize_schema;
74pub use background::spawn_sync_loop;
75pub use error::SyncError;
76pub use router::{HeuristicRouter, SqlParserRouter};
77pub use sync_engine::CdcSyncEngine;
78pub use temporal_converter::temporalize_schema;