use rhei_core::types::{CdcEvent, CdcOperation};
use rhei_core::CdcConsumer;
use std::time::Instant;
/// Builds a synthetic insert event for `table`, keyed by `seq`.
///
/// `seq` is used as the sequence number, the row id and the `id` field of the
/// JSON payload; the timestamp is `1_000_000 + seq`. The payload also carries a
/// name, an age of `20 + (seq % 50)` and an e-mail address derived from `seq`.
/// `old_data` is always `None`.
fn make_event(seq: i64, table: &str) -> CdcEvent {
    // Row image attached to the event as `new_data`.
    let payload = serde_json::json!({
        "id": seq,
        "name": format!("User{seq}"),
        "age": 20 + (seq % 50),
        "email": format!("user{seq}@example.com"),
    });

    CdcEvent {
        seq,
        timestamp: 1_000_000 + seq,
        operation: CdcOperation::Insert,
        table: table.to_owned(),
        row_id: Some(seq),
        old_data: None,
        new_data: Some(payload),
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== CDC Backend Benchmark ===\n");
let counts = [100, 1000, 5000, 10000];
for &n in &counts {
println!("--- {n} events ---");
{
let tmp = tempfile::tempdir()?;
let config = rhei_cdc_rocksdb::RocksDbCdcConfig {
path: tmp.path().join("cdc").to_str().unwrap().to_string(),
create_if_missing: true,
};
let log = rhei_cdc_rocksdb::RocksDbCdcLog::open(&config)?;
let t0 = Instant::now();
let mut events: Vec<CdcEvent> = (1..=n).map(|i| make_event(i, "users")).collect();
log.append_batch(&mut events)?;
let append_elapsed = t0.elapsed();
let t0 = Instant::now();
let polled = log.poll(None, n as u32).await?;
let poll_elapsed = t0.elapsed();
assert_eq!(polled.len(), n as usize);
let t0 = Instant::now();
log.prune(n).await?;
let prune_elapsed = t0.elapsed();
println!(
" RocksDB append: {:>8.0} events/sec ({:.3}ms)",
n as f64 / append_elapsed.as_secs_f64(),
append_elapsed.as_secs_f64() * 1000.0
);
println!(
" RocksDB poll: {:>8.0} events/sec ({:.3}ms)",
n as f64 / poll_elapsed.as_secs_f64(),
poll_elapsed.as_secs_f64() * 1000.0
);
println!(
" RocksDB prune: {:>8.0} events/sec ({:.3}ms)",
n as f64 / prune_elapsed.as_secs_f64(),
prune_elapsed.as_secs_f64() * 1000.0
);
}
{
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join("test.db");
let conn = rhei_tokio_rusqlite::Connection::open(&db_path).await?;
conn.call(|c: &mut rusqlite::Connection| {
c.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA busy_timeout=5000;
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER, email TEXT);
CREATE TABLE _rhei_cdc_log (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
op TEXT NOT NULL CHECK(op IN ('I','U','D')),
tbl TEXT NOT NULL,
row_id INTEGER,
old_data TEXT,
new_data TEXT
);
CREATE TRIGGER _rhei_cdc_users_insert
AFTER INSERT ON users BEGIN
INSERT INTO _rhei_cdc_log (op, tbl, row_id, new_data)
VALUES ('I', 'users', NEW.rowid,
json_array(NEW.id, NEW.name, NEW.age, NEW.email));
END;"
)?;
Ok(())
})
.await?;
let n_copy = n;
let t0 = Instant::now();
conn.call(move |c: &mut rusqlite::Connection| {
let tx = c.transaction()?;
for i in 1..=n_copy {
tx.execute(
"INSERT INTO users VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![
i,
format!("User{i}"),
20 + (i % 50),
format!("user{i}@example.com")
],
)?;
}
tx.commit()?;
Ok(())
})
.await?;
let append_elapsed = t0.elapsed();
let n_copy = n;
let t0 = Instant::now();
let count = conn
.call(move |c: &mut rusqlite::Connection| {
let mut stmt = c.prepare(
"SELECT seq, ts, op, tbl, row_id, old_data, new_data
FROM _rhei_cdc_log ORDER BY seq ASC LIMIT ?1",
)?;
let mut rows = stmt.query(rusqlite::params![n_copy])?;
let mut count = 0i64;
while let Some(_row) = rows.next()? {
count += 1;
}
Ok(count)
})
.await?;
let poll_elapsed = t0.elapsed();
assert_eq!(count, n);
let n_copy = n;
let t0 = Instant::now();
conn.call(move |c: &mut rusqlite::Connection| {
c.execute(
"DELETE FROM _rhei_cdc_log WHERE seq <= ?1",
rusqlite::params![n_copy],
)?;
Ok(())
})
.await?;
let prune_elapsed = t0.elapsed();
println!(
" SQLite append: {:>8.0} events/sec ({:.3}ms) (batched transaction)",
n as f64 / append_elapsed.as_secs_f64(),
append_elapsed.as_secs_f64() * 1000.0
);
println!(
" SQLite poll: {:>8.0} events/sec ({:.3}ms)",
n as f64 / poll_elapsed.as_secs_f64(),
poll_elapsed.as_secs_f64() * 1000.0
);
println!(
" SQLite prune: {:>8.0} events/sec ({:.3}ms)",
n as f64 / prune_elapsed.as_secs_f64(),
prune_elapsed.as_secs_f64() * 1000.0
);
}
println!();
}
Ok(())
}