// rhei-cdc-rocksdb 1.5.0
//
// RocksDB-backed CDC log for Rhei HTAP engine
// Documentation
//! Benchmark: RocksDB CDC log vs SQLite trigger CDC
//!
//! Measures append + poll + prune throughput for both backends.
//!
//! Run: cargo run -p rhei-cdc-rocksdb --example bench_vs_sqlite --release

use rhei_core::types::{CdcEvent, CdcOperation};
use rhei_core::CdcConsumer;
use std::time::Instant;

/// Builds a synthetic `Insert` CDC event for `table`, keyed by `seq`.
///
/// `seq` drives every generated value: it is the event sequence number and
/// row id, it offsets the timestamp from a fixed base of 1,000,000, and it is
/// embedded in the JSON payload (`id`, `name`, `age`, `email`). The event
/// carries no `old_data`.
fn make_event(seq: i64, table: &str) -> CdcEvent {
    // Row image stored as `new_data`: a fake user record derived from `seq`.
    let payload = serde_json::json!({
        "id": seq,
        "name": format!("User{seq}"),
        "age": 20 + (seq % 50),
        "email": format!("user{seq}@example.com"),
    });

    CdcEvent {
        seq,
        timestamp: 1_000_000 + seq,
        operation: CdcOperation::Insert,
        table: table.to_owned(),
        row_id: Some(seq),
        old_data: None,
        new_data: Some(payload),
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== CDC Backend Benchmark ===\n");

    let counts = [100, 1000, 5000, 10000];

    for &n in &counts {
        println!("--- {n} events ---");

        // === RocksDB CDC ===
        {
            let tmp = tempfile::tempdir()?;
            let config = rhei_cdc_rocksdb::RocksDbCdcConfig {
                path: tmp.path().join("cdc").to_str().unwrap().to_string(),
                create_if_missing: true,
            };
            let log = rhei_cdc_rocksdb::RocksDbCdcLog::open(&config)?;

            // Append
            let t0 = Instant::now();
            let mut events: Vec<CdcEvent> = (1..=n).map(|i| make_event(i, "users")).collect();
            log.append_batch(&mut events)?;
            let append_elapsed = t0.elapsed();

            // Poll all
            let t0 = Instant::now();
            let polled = log.poll(None, n as u32).await?;
            let poll_elapsed = t0.elapsed();
            assert_eq!(polled.len(), n as usize);

            // Prune all
            let t0 = Instant::now();
            log.prune(n).await?;
            let prune_elapsed = t0.elapsed();

            println!(
                "  RocksDB  append: {:>8.0} events/sec  ({:.3}ms)",
                n as f64 / append_elapsed.as_secs_f64(),
                append_elapsed.as_secs_f64() * 1000.0
            );
            println!(
                "  RocksDB  poll:   {:>8.0} events/sec  ({:.3}ms)",
                n as f64 / poll_elapsed.as_secs_f64(),
                poll_elapsed.as_secs_f64() * 1000.0
            );
            println!(
                "  RocksDB  prune:  {:>8.0} events/sec  ({:.3}ms)",
                n as f64 / prune_elapsed.as_secs_f64(),
                prune_elapsed.as_secs_f64() * 1000.0
            );
        }

        // === SQLite CDC (trigger-based via rhei-oltp-rusqlite) ===
        {
            let tmp = tempfile::tempdir()?;
            let db_path = tmp.path().join("test.db");

            let conn = rhei_tokio_rusqlite::Connection::open(&db_path).await?;

            // Setup: WAL + table + triggers
            conn.call(|c: &mut rusqlite::Connection| {
                c.execute_batch(
                    "PRAGMA journal_mode=WAL;
                     PRAGMA synchronous=NORMAL;
                     PRAGMA busy_timeout=5000;
                     CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER, email TEXT);
                     CREATE TABLE _rhei_cdc_log (
                         seq    INTEGER PRIMARY KEY AUTOINCREMENT,
                         ts     INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
                         op     TEXT    NOT NULL CHECK(op IN ('I','U','D')),
                         tbl    TEXT    NOT NULL,
                         row_id INTEGER,
                         old_data TEXT,
                         new_data TEXT
                     );
                     CREATE TRIGGER _rhei_cdc_users_insert
                     AFTER INSERT ON users BEGIN
                       INSERT INTO _rhei_cdc_log (op, tbl, row_id, new_data)
                       VALUES ('I', 'users', NEW.rowid,
                         json_array(NEW.id, NEW.name, NEW.age, NEW.email));
                     END;"
                )?;
                Ok(())
            })
            .await?;

            // Append (INSERT into source table, trigger writes to CDC log)
            let n_copy = n;
            let t0 = Instant::now();
            conn.call(move |c: &mut rusqlite::Connection| {
                let tx = c.transaction()?;
                for i in 1..=n_copy {
                    tx.execute(
                        "INSERT INTO users VALUES (?1, ?2, ?3, ?4)",
                        rusqlite::params![
                            i,
                            format!("User{i}"),
                            20 + (i % 50),
                            format!("user{i}@example.com")
                        ],
                    )?;
                }
                tx.commit()?;
                Ok(())
            })
            .await?;
            let append_elapsed = t0.elapsed();

            // Poll all from CDC log
            let n_copy = n;
            let t0 = Instant::now();
            let count = conn
                .call(move |c: &mut rusqlite::Connection| {
                    let mut stmt = c.prepare(
                        "SELECT seq, ts, op, tbl, row_id, old_data, new_data
                         FROM _rhei_cdc_log ORDER BY seq ASC LIMIT ?1",
                    )?;
                    let mut rows = stmt.query(rusqlite::params![n_copy])?;
                    let mut count = 0i64;
                    while let Some(_row) = rows.next()? {
                        count += 1;
                    }
                    Ok(count)
                })
                .await?;
            let poll_elapsed = t0.elapsed();
            assert_eq!(count, n);

            // Prune
            let n_copy = n;
            let t0 = Instant::now();
            conn.call(move |c: &mut rusqlite::Connection| {
                c.execute(
                    "DELETE FROM _rhei_cdc_log WHERE seq <= ?1",
                    rusqlite::params![n_copy],
                )?;
                Ok(())
            })
            .await?;
            let prune_elapsed = t0.elapsed();

            println!(
                "  SQLite   append: {:>8.0} events/sec  ({:.3}ms)  (batched transaction)",
                n as f64 / append_elapsed.as_secs_f64(),
                append_elapsed.as_secs_f64() * 1000.0
            );
            println!(
                "  SQLite   poll:   {:>8.0} events/sec  ({:.3}ms)",
                n as f64 / poll_elapsed.as_secs_f64(),
                poll_elapsed.as_secs_f64() * 1000.0
            );
            println!(
                "  SQLite   prune:  {:>8.0} events/sec  ({:.3}ms)",
                n as f64 / prune_elapsed.as_secs_f64(),
                prune_elapsed.as_secs_f64() * 1000.0
            );
        }

        println!();
    }

    Ok(())
}