rivven-rdbc
Production-grade relational database connectivity layer for the rivven event streaming platform.
Overview
rivven-rdbc provides a unified abstraction over multiple SQL database backends with:
- Async-first design using tokio
- Type-safe queries with sea-query SQL generation
- Connection pooling with health checking and metrics
- Transaction support including isolation levels and savepoints
- Schema introspection and evolution capabilities
- Prepared statement caching
Supported Databases
| Backend | Driver | Dialect | Features |
|---|---|---|---|
| PostgreSQL | tokio-postgres | PostgresDialect | RETURNING, ON CONFLICT, streaming |
| MySQL | mysql_async | MySqlDialect | ON DUPLICATE KEY UPDATE |
| MariaDB | mysql_async | MariaDbDialect | RETURNING (10.5+), native UUID (10.7+) |
| SQL Server | tiberius | SqlServerDialect | MERGE, OUTPUT clause |
Quick Start
use *;
async
Transactions
// Begin a transaction
let tx = conn.begin().await?;
// With a specific isolation level
let tx = conn.begin_with_isolation(/* isolation level */).await?;
// Execute within the transaction
tx.execute(/* statement and parameters */).await?;
// Savepoints for partial rollback
tx.savepoint(/* savepoint name */).await?;
// ... more operations ...
tx.rollback_to_savepoint(/* savepoint name */).await?;
// Commit
tx.commit().await?;
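For orientation, the SQL that a driver typically issues for isolation levels and savepoints can be sketched in plain Rust. This is an illustration only, not rivven-rdbc's API: `IsolationLevel`, `Flavor`, and the three helper functions are hypothetical names. It assumes standard savepoint syntax for PostgreSQL, MySQL, and MariaDB, and the `SAVE TRANSACTION` / `ROLLBACK TRANSACTION` form that SQL Server uses instead.

```rust
/// Hypothetical isolation-level enum (not the rivven-rdbc type).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

impl IsolationLevel {
    /// SQL keyword form of the level.
    pub fn as_sql(self) -> &'static str {
        match self {
            IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
            IsolationLevel::ReadCommitted => "READ COMMITTED",
            IsolationLevel::RepeatableRead => "REPEATABLE READ",
            IsolationLevel::Serializable => "SERIALIZABLE",
        }
    }
}

/// Hypothetical backend family, used only to pick the savepoint syntax.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Flavor {
    /// PostgreSQL, MySQL, MariaDB: standard SAVEPOINT statements.
    Standard,
    /// SQL Server: SAVE TRANSACTION / ROLLBACK TRANSACTION.
    SqlServer,
}

/// Statement that selects the isolation level.
pub fn set_isolation_sql(level: IsolationLevel) -> String {
    format!("SET TRANSACTION ISOLATION LEVEL {}", level.as_sql())
}

/// Statement that creates a savepoint. `name` is interpolated as-is,
/// so it must be a trusted identifier, never user input.
pub fn savepoint_sql(flavor: Flavor, name: &str) -> String {
    match flavor {
        Flavor::Standard => format!("SAVEPOINT {}", name),
        Flavor::SqlServer => format!("SAVE TRANSACTION {}", name),
    }
}

/// Statement that rolls back to a savepoint without ending the transaction.
pub fn rollback_to_savepoint_sql(flavor: Flavor, name: &str) -> String {
    match flavor {
        Flavor::Standard => format!("ROLLBACK TO SAVEPOINT {}", name),
        Flavor::SqlServer => format!("ROLLBACK TRANSACTION {}", name),
    }
}
```

Rolling back to a savepoint undoes only the work done after it was created; the enclosing transaction stays open and can still be committed.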
Connection Pooling
use ;
let pool_config = PoolConfig ;
let pool = new.await?;
// Get a connection from pool
let conn = pool.get().await?;
// Connection is automatically returned when dropped
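The return-on-drop behavior is usually implemented with a guard type whose `Drop` implementation hands the resource back to the pool. The following is a minimal, synchronous sketch of that pattern using only the standard library. `Pool`, `Pooled`, and `idle_count` are hypothetical names, and the real pool is asynchronous and also performs health checks, which this sketch omits.

```rust
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};

/// Hypothetical minimal pool: a shared stack of idle resources.
pub struct Pool<T> {
    idle: Arc<Mutex<Vec<T>>>,
}

/// Guard handed out by `Pool::get`; returns the resource when dropped.
pub struct Pooled<T> {
    item: Option<T>,
    idle: Arc<Mutex<Vec<T>>>,
}

impl<T> Pool<T> {
    pub fn new(items: Vec<T>) -> Self {
        Pool { idle: Arc::new(Mutex::new(items)) }
    }

    /// Take an idle resource, or `None` if the pool is exhausted.
    pub fn get(&self) -> Option<Pooled<T>> {
        let item = self.idle.lock().unwrap().pop()?;
        Some(Pooled { item: Some(item), idle: Arc::clone(&self.idle) })
    }

    /// Number of resources currently available.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl<T> Deref for Pooled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.item.as_ref().expect("resource is present until drop")
    }
}

impl<T> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.item.as_mut().expect("resource is present until drop")
    }
}

impl<T> Drop for Pooled<T> {
    fn drop(&mut self) {
        if let Some(item) = self.item.take() {
            // If the mutex is poisoned, discard the resource rather than
            // panic inside `drop`.
            if let Ok(mut idle) = self.idle.lock() {
                idle.push(item);
            }
        }
    }
}
```

Because the guard dereferences to the underlying resource, callers use it like the resource itself and never call an explicit release method.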
Connection Lifecycle Tracking
The pool tracks connection lifecycle to enable accurate recycling and observability:
use *;
// Get a pooled connection
let conn = pool.get().await?;
// Check connection age and expiry status
println!;
println!;
// Check how long the connection has been in use (borrowed)
println!;
// The pool tracks recycling reasons for observability
let stats = pool.stats();
println!;
println!;
println!;
println!;
// Use helper methods for common metrics
println!;
println!;
println!;
println!;
println!;
Connections implement the ConnectionLifecycle trait for detailed lifecycle introspection:
use ConnectionLifecycle;
// Get creation time and idle duration
let created_at = conn.created_at();
let idle = conn.idle_time().await;
// Check against timeouts
if conn.is_idle_expired.await
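The age and idle checks described here reduce to comparing two timestamps against two limits. The sketch below shows one way to do that with `std::time`; it is not the crate's `ConnectionLifecycle` trait. `Lifecycle`, `RecycleReason`, `touch`, and `recycle_reason` are hypothetical names, and the current time is passed in explicitly so the logic is easy to test.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-connection timestamps (not the rivven-rdbc type).
pub struct Lifecycle {
    created_at: Instant,
    last_used: Instant,
}

/// Hypothetical reasons for recycling a connection.
#[derive(Debug, PartialEq, Eq)]
pub enum RecycleReason {
    LifetimeExpired,
    IdleExpired,
}

impl Lifecycle {
    pub fn new(now: Instant) -> Self {
        Lifecycle { created_at: now, last_used: now }
    }

    /// Record that the connection was just used.
    pub fn touch(&mut self, now: Instant) {
        self.last_used = now;
    }

    /// Time since the connection was created.
    pub fn age(&self, now: Instant) -> Duration {
        now.saturating_duration_since(self.created_at)
    }

    /// Time since the connection was last used.
    pub fn idle_time(&self, now: Instant) -> Duration {
        now.saturating_duration_since(self.last_used)
    }

    /// Decide whether the connection should be recycled.
    /// The lifetime limit is checked before the idle limit.
    pub fn recycle_reason(
        &self,
        now: Instant,
        max_lifetime: Duration,
        idle_timeout: Duration,
    ) -> Option<RecycleReason> {
        if self.age(now) >= max_lifetime {
            Some(RecycleReason::LifetimeExpired)
        } else if self.idle_time(now) >= idle_timeout {
            Some(RecycleReason::IdleExpired)
        } else {
            None
        }
    }
}
```

Counting how often each `RecycleReason` occurs is one simple way to produce the kind of recycling statistics a pool can expose.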
SQL Dialect Abstraction
rivven-rdbc uses sea-query for portable SQL generation:
use ;
// Get dialect from string
let dialect = dialect_for; // PostgresDialect
let dialect = dialect_for; // MySqlDialect
let dialect = dialect_for; // MariaDbDialect (with RETURNING support)
let dialect = dialect_for; // SqlServerDialect
// Generate portable SQL
let sql = dialect.upsert_sql;
// PostgreSQL: INSERT ... ON CONFLICT (id) DO UPDATE SET ...
// MySQL: INSERT ... ON DUPLICATE KEY UPDATE ...
// MariaDB: INSERT ... ON DUPLICATE KEY UPDATE ... (with RETURNING available)
// SQL Server: MERGE ... WHEN MATCHED THEN UPDATE ...
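To make the dialect differences listed above concrete, here is a hand-rolled sketch that builds the three upsert shapes with plain string formatting. It does not use sea-query and is not the crate's `upsert_sql`. The `Dialect` enum, the function signature, and the `t`/`s` aliases are assumptions for illustration. Identifiers are interpolated unquoted, and at least one non-key column is assumed.

```rust
/// Hypothetical dialect selector for this sketch only.
#[derive(Clone, Copy)]
pub enum Dialect {
    Postgres,
    /// Also covers MariaDB, which accepts the same upsert form.
    MySql,
    SqlServer,
}

/// Build an upsert for `table`, keyed on `key`. `columns` lists every
/// column to insert, including the key.
pub fn upsert_sql(dialect: Dialect, table: &str, key: &str, columns: &[&str]) -> String {
    let cols = columns.join(", ");
    // Non-key columns are the ones overwritten when the row already exists.
    let non_key: Vec<&str> = columns.iter().copied().filter(|c| *c != key).collect();
    // Numbered placeholders such as $1, $2 or @P1, @P2.
    let params = |prefix: &str| -> String {
        (1..=columns.len())
            .map(|i| format!("{}{}", prefix, i))
            .collect::<Vec<_>>()
            .join(", ")
    };
    match dialect {
        Dialect::Postgres => {
            let set = non_key
                .iter()
                .map(|c| format!("{} = EXCLUDED.{}", c, c))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}",
                table, cols, params("$"), key, set
            )
        }
        Dialect::MySql => {
            let marks = vec!["?"; columns.len()].join(", ");
            // VALUES(col) is deprecated in recent MySQL releases but still accepted.
            let set = non_key
                .iter()
                .map(|c| format!("{} = VALUES({})", c, c))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}",
                table, cols, marks, set
            )
        }
        Dialect::SqlServer => {
            let set = non_key
                .iter()
                .map(|c| format!("{} = s.{}", c, c))
                .collect::<Vec<_>>()
                .join(", ");
            let src = columns
                .iter()
                .map(|c| format!("s.{}", c))
                .collect::<Vec<_>>()
                .join(", ");
            // MERGE statements must be terminated with a semicolon.
            format!(
                "MERGE INTO {} AS t USING (VALUES ({})) AS s ({}) ON t.{} = s.{} \
                 WHEN MATCHED THEN UPDATE SET {} \
                 WHEN NOT MATCHED THEN INSERT ({}) VALUES ({});",
                table, params("@P"), cols, key, key, set, cols, src
            )
        }
    }
}
```

The placeholder styles differ as well: `$1` for tokio-postgres, `?` for mysql_async, and `@P1` for tiberius.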
Schema Introspection
use SchemaProvider;
// List all tables in a schema
let tables = provider.list_tables.await?;
// Get table metadata including columns
let table = provider.get_table.await?;
for col in &table.columns
// Get indexes
let indexes = provider.list_indexes.await?;
// Get foreign keys
let fks = provider.list_foreign_keys.await?;
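Column metadata of this kind is commonly read from the standard `information_schema.columns` view, which PostgreSQL, MySQL, MariaDB, and SQL Server all provide. The sketch below builds such a query with the placeholder style each driver expects. Whether rivven-rdbc's `SchemaProvider` uses this exact query is not stated here, so treat it as an assumption; `Placeholders` and `columns_query` are hypothetical names.

```rust
/// Hypothetical placeholder styles for this sketch only.
#[derive(Clone, Copy)]
pub enum Placeholders {
    /// $1, $2 (tokio-postgres)
    Dollar,
    /// ?, ? (mysql_async)
    Question,
    /// @P1, @P2 (tiberius)
    AtP,
}

/// Query that lists a table's columns in declaration order.
/// Parameter 1 is the schema name, parameter 2 the table name.
/// In MySQL and MariaDB, `table_schema` holds the database name.
pub fn columns_query(style: Placeholders) -> String {
    let (schema, table) = match style {
        Placeholders::Dollar => ("$1", "$2"),
        Placeholders::Question => ("?", "?"),
        Placeholders::AtP => ("@P1", "@P2"),
    };
    format!(
        "SELECT column_name, data_type, is_nullable \
         FROM information_schema.columns \
         WHERE table_schema = {} AND table_name = {} \
         ORDER BY ordinal_position",
        schema, table
    )
}
```

Binding the schema and table names as parameters, rather than formatting them into the string, keeps the query safe when the names come from configuration.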
Feature Flags
[dependencies]
rivven-rdbc = { version = "0.0.14", features = ["postgres", "mysql"] }
| Feature | Description |
|---|---|
| postgres | PostgreSQL support via tokio-postgres |
| mysql | MySQL/MariaDB support via mysql_async |
| sqlserver | SQL Server support via tiberius |
| tls | TLS support for all backends |
| full | All features enabled |
Architecture
┌─────────────────────────────────────────────────────┐
│ rivven-rdbc │
├─────────────────────────────────────────────────────┤
│ Connection Trait │ Transaction │ RowStream │
├─────────────────────────────────────────────────────┤
│ SQL Dialect Abstraction (sea-query) │
├─────────────────────────────────────────────────────┤
│ Connection Pool │ Health Check │ Metrics │
├─────────────┬────────────────┬─────────────────────┤
│ PostgreSQL │ MySQL │ SQL Server │
│ (tokio-pg) │ (mysql_async) │ (tiberius) │
└─────────────┴────────────────┴─────────────────────┘
Error Handling
use ;
match result
Performance Considerations
- Statement Caching: Prepared statements are cached to avoid repeated parsing
- Connection Pooling: Reuse connections to avoid the cost of opening a new one for each operation
- Async Streaming: Large result sets can be streamed to reduce memory
- Batch Operations: Use transactions for bulk inserts/updates
- Metrics: Built-in metrics for monitoring pool health and query performance
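The statement-caching point can be illustrated with a small cache keyed by SQL text. This is a sketch of the general technique, not the crate's implementation: `StatementCache` and `get_or_prepare` are hypothetical names, the type parameter `S` stands in for a driver's prepared-statement handle, and eviction is deliberately naive (the cache is cleared when full, where a real cache would more likely use LRU).

```rust
use std::collections::HashMap;

/// Hypothetical bounded cache of prepared statements, keyed by SQL text.
pub struct StatementCache<S> {
    capacity: usize,
    entries: HashMap<String, S>,
    pub hits: u64,
    pub misses: u64,
}

impl<S: Clone> StatementCache<S> {
    pub fn new(capacity: usize) -> Self {
        StatementCache { capacity, entries: HashMap::new(), hits: 0, misses: 0 }
    }

    /// Return the cached handle for `sql`, or call `prepare` once and
    /// remember the result for later calls.
    pub fn get_or_prepare<F>(&mut self, sql: &str, prepare: F) -> S
    where
        F: FnOnce(&str) -> S,
    {
        if let Some(stmt) = self.entries.get(sql).cloned() {
            self.hits += 1;
            return stmt;
        }
        self.misses += 1;
        let stmt = prepare(sql);
        // Naive eviction: drop everything once the cache is full.
        if self.entries.len() >= self.capacity {
            self.entries.clear();
        }
        self.entries.insert(sql.to_owned(), stmt.clone());
        stmt
    }
}
```

The `hits` and `misses` counters show how a cache like this can feed the same kind of metrics used to monitor pool health.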
Testing
# Run unit tests (190+)
# Run integration tests (20 tests with testcontainers)
Integration Test Coverage:
- Source modes: Bulk, Incrementing, Timestamp, State resume
- Sink modes: Insert, Upsert, Update, Delete, Transactional
- Pool features: Connection pooling, stress testing (500 records)
- Advanced: Schema qualification, error handling, E2E pipeline
License
Apache-2.0. See LICENSE.