rivven-rdbc
Production-grade relational database connectivity layer for the rivven event streaming platform.
Overview
rivven-rdbc provides a unified abstraction over multiple SQL database backends with:
- Async-first design using tokio
- Type-safe queries with sea-query SQL generation
- Connection pooling with health checking and metrics
- Transaction support including isolation levels and savepoints
- Schema introspection and evolution capabilities
- Prepared statement caching
- SQL injection prevention — identifier, type name, and string literal validation on all DDL/introspection paths; WHERE clause deny-list with word-boundary matching to prevent false positives; covers stored procedures (DECLARE, CALL), PostgreSQL file-access functions (PG_READ_FILE, PG_LS_DIR, PG_READ_BINARY_FILE), timing attacks, and privilege escalation
- Deferred rollback — rollback operations avoid
block_in_place, preventing Tokio runtime stalls - MAX_STREAM_ROWS safeguard — streaming queries enforce a row limit to prevent unbounded memory growth
- Range literal serialization — PostgreSQL
Rangetypes are correctly serialized as SQL literals
Supported Databases
| Backend | Driver | Dialect | Features |
|---|---|---|---|
| PostgreSQL | tokio-postgres |
PostgresDialect |
RETURNING, ON CONFLICT, streaming, prepared statements |
| MySQL | mysql_async |
MySqlDialect |
ON DUPLICATE KEY UPDATE, streaming, prepared statements |
| MariaDB | mysql_async |
MariaDbDialect |
RETURNING (10.5+), native UUID (10.7+), streaming, prepared statements |
| SQL Server | tiberius |
SqlServerDialect |
MERGE, OUTPUT clause, streaming, prepared statements |
Quick Start
use *;
async
Transactions
// Begin a transaction
let tx = conn.begin.await?;
// With specific isolation level
let tx = conn.begin_with_isolation.await?;
// Execute within transaction
tx.execute.await?;
// Savepoints for partial rollback
tx.savepoint.await?;
// ... more operations ...
tx.rollback_to_savepoint.await?;
// Commit
tx.commit.await?;
Connection Pooling
use ;
let pool_config = PoolConfig ;
let pool = new.await?;
// Get a connection from pool
let conn = pool.get.await?;
// Connection is automatically returned when dropped
Connection Lifecycle Tracking
The pool tracks connection lifecycle to enable accurate recycling and observability:
use *;
// Get a pooled connection
let conn = pool.get.await?;
// Check connection age and expiry status
println!;
println!;
// Check how long the connection has been in use (borrowed)
println!;
// The pool tracks recycling reasons for observability
let stats = pool.stats;
println!;
println!;
println!;
println!;
// Use helper methods for common metrics
println!;
println!;
println!;
println!;
println!;
println!;
Connections implement the ConnectionLifecycle trait for detailed lifecycle introspection:
use ConnectionLifecycle;
// Get creation time and idle duration
let created_at = conn.created_at;
let idle = conn.idle_time.await;
// Check against timeouts
if conn.is_idle_expired.await
SQL Dialect Abstraction
rivven-rdbc uses sea-query for portable SQL generation:
use ;
// Get dialect from string
let dialect = dialect_for; // PostgresDialect
let dialect = dialect_for; // MySqlDialect
let dialect = dialect_for; // MariaDbDialect (with RETURNING support)
let dialect = dialect_for; // SqlServerDialect
// Generate portable SQL
let sql = dialect.upsert_sql;
// PostgreSQL: INSERT ... ON CONFLICT (id) DO UPDATE SET ...
// MySQL: INSERT ... ON DUPLICATE KEY UPDATE ...
// MariaDB: INSERT ... ON DUPLICATE KEY UPDATE ... (with RETURNING available)
// SQL Server: MERGE ... WHEN MATCHED THEN UPDATE ...
Schema Introspection
use SchemaProvider;
// List all tables in a schema
let tables = provider.list_tables.await?;
// Get table metadata including columns
let table = provider.get_table.await?;
for col in &table.columns
// Get indexes
let indexes = provider.list_indexes.await?;
// Get foreign keys
let fks = provider.list_foreign_keys.await?;
Feature Flags
[]
= { = "0.0.22", = ["postgres", "mysql"] }
| Feature | Description |
|---|---|
postgres |
PostgreSQL support via tokio-postgres |
mysql |
MySQL/MariaDB support via mysql_async |
sqlserver |
SQL Server support via tiberius |
tls |
TLS support for all backends |
full |
All features enabled |
Architecture
┌─────────────────────────────────────────────────────┐
│ rivven-rdbc │
├─────────────────────────────────────────────────────┤
│ Connection Trait │ Transaction │ RowStream │
├─────────────────────────────────────────────────────┤
│ SQL Dialect Abstraction (sea-query) │
├─────────────────────────────────────────────────────┤
│ Connection Pool │ Health Check │ Metrics │
├─────────────┬────────────────┬─────────────────────┤
│ PostgreSQL │ MySQL │ SQL Server │
│ (tokio-pg) │ (mysql_async) │ (tiberius) │
└─────────────┴────────────────┴─────────────────────┘
Error Handling
use ;
match result
Performance Considerations
- Statement Caching: Prepared statements are cached to avoid repeated parsing
- Connection Pooling: Reuse connections to avoid connection overhead
- Async Streaming: All backends support
query_streamfor memory-efficient result processing. PostgreSQL uses true incremental streaming viaquery_raw, while MySQL and SQL Server fetch results then stream them through an async iterator. - Batch Operations: Use transactions for bulk inserts/updates
- Metrics: Built-in metrics for monitoring pool health and query performance
Testing
# Run unit tests (190+)
# Run integration tests (20 tests with testcontainers)
Integration Test Coverage:
- Source modes: Bulk, Incrementing, Timestamp, State resume
- Sink modes: Insert, Upsert, Update, Delete, Transactional
- Pool features: Connection pooling, stress testing (500 records)
- Advanced: Schema qualification, error handling, E2E pipeline
License
Apache-2.0. See LICENSE.