Skip to main content

rivven_rdbc/
lib.rs

1//! # rivven-rdbc
2//!
3//! Production-grade relational database connectivity for the Rivven event streaming platform.
4//!
5//! This crate provides a unified interface for connecting to relational databases,
6//! with feature parity to Kafka Connect JDBC connector and Debezium's relational support.
7//!
8//! ## Features
9//!
10//! - **Multi-Database Support**: PostgreSQL, MySQL, SQL Server with unified API
11//! - **Connection Pooling**: High-performance connection pooling with health checks
12//! - **SQL Dialect Abstraction**: Vendor-agnostic SQL generation using sea-query
13//! - **Schema Discovery**: Automatic schema introspection and metadata
14//! - **Table Source**: Query-based CDC with incrementing/timestamp modes
15//! - **Table Sink**: High-performance batch writes with upsert support
16//! - **Type Safety**: Comprehensive value types matching Debezium's type system
17//!
18//! ## Quick Start
19//!
20//! ```rust,ignore
21//! use rivven_rdbc::prelude::*;
22//!
23//! // Create a connection pool
24//! let pool = PoolBuilder::new("postgres://user:pass@localhost/db")
25//!     .min_size(2)
26//!     .max_size(10)
27//!     .build();
28//!
29//! // Use connection for queries
30//! let conn = pool.get().await?;
31//! let rows = conn.query("SELECT * FROM users WHERE id = $1", &[Value::Int32(1)]).await?;
32//!
33//! // Configure a table source
34//! let source = TableSourceConfig::incrementing("users", "id")
35//!     .with_schema("public")
36//!     .with_poll_interval(Duration::from_secs(1));
37//!
38//! // Configure a table sink
39//! let sink = TableSinkBuilder::new()
40//!     .batch_size(1000)
41//!     .write_mode(WriteMode::Upsert)
42//!     .auto_ddl(AutoDdlMode::Create)
43//!     .build();
44//! ```
45//!
46//! ## Feature Flags
47//!
48//! - `postgres` - PostgreSQL support via tokio-postgres
49//! - `mysql` - MySQL/MariaDB support via mysql_async
50//! - `sqlserver` - SQL Server support via tiberius
51//! - `tls` - TLS support for secure connections
52//! - `full` - All features enabled
53
54#![warn(missing_docs)]
55#![warn(clippy::all)]
56#![deny(unsafe_code)]
57
58pub mod connection;
59pub mod dialect;
60pub mod error;
61pub mod pool;
62pub mod schema;
63pub mod security;
64pub mod sink;
65pub mod source;
66pub mod types;
67
68// Backend implementations (conditionally compiled)
69#[cfg(feature = "postgres")]
70pub mod postgres;
71
72#[cfg(feature = "mysql")]
73pub mod mysql;
74
75#[cfg(feature = "sqlserver")]
76pub mod sqlserver;
77
78/// Prelude module for convenient imports
79pub mod prelude {
80    // Error types
81    pub use crate::error::{Error, ErrorCategory, Result};
82
83    // Value and type system
84    pub use crate::types::{ColumnMetadata, Row, TableMetadata, Value};
85
86    // Connection traits and config
87    pub use crate::connection::{
88        Connection, ConnectionConfig, ConnectionFactory, ConnectionLifecycle, DatabaseType,
89        IsolationLevel, PreparedStatement, RowStream, Transaction,
90    };
91
92    // Pool types
93    pub use crate::pool::{
94        create_pool, create_pool_with_config, AtomicPoolStats, ConnectionPool, PoolBuilder,
95        PoolConfig, PoolStats, PooledConnection, RecycleReason, SimpleConnectionPool,
96    };
97
98    // Dialect types
99    pub use crate::dialect::{
100        dialect_for, MariaDbDialect, MySqlDialect, PostgresDialect, SqlDialect, SqlServerDialect,
101    };
102
103    // Schema types
104    pub use crate::schema::{
105        AutoDdlMode, ForeignKeyAction, ForeignKeyMetadata, IdentityNaming, IndexMetadata,
106        PrefixNaming, SchemaEvolutionMode, SchemaEvolutionResult, SchemaManager, SchemaMapping,
107        SchemaProvider, SuffixNaming, TableNamingStrategy,
108    };
109
110    // Source types
111    pub use crate::source::{
112        AtomicSourceStats, PollResult, QueryMode, SourceOffset, SourceQueryBuilder, SourceRecord,
113        SourceStats, TableSource, TableSourceConfig,
114    };
115
116    // Sink types
117    pub use crate::sink::{
118        AtomicSinkStats, BatchConfig, BatchResult, BufferedSink, FailedRecord, SinkRecord,
119        SinkStats, TableSink, TableSinkBuilder, TableSinkConfig, WriteMode,
120    };
121}
122
123// Re-export commonly used items at crate root
124pub use error::{Error, Result};
125pub use types::Value;
126
127#[cfg(test)]
128mod tests {
129    use super::prelude::*;
130
131    #[test]
132    fn test_prelude_imports() {
133        // Ensure common types are accessible
134        let _value = Value::Int32(42);
135        let _config = ConnectionConfig::new("postgres://localhost/test");
136        let _batch = BatchConfig::default();
137        let _mode = WriteMode::Upsert;
138    }
139
140    #[test]
141    fn test_error_types() {
142        let err = Error::connection("test error");
143        assert!(err.is_retriable());
144        assert_eq!(err.category(), ErrorCategory::Connection);
145    }
146
147    #[test]
148    fn test_value_types() {
149        let v = Value::from(42_i32);
150        assert!(!v.is_null());
151        assert_eq!(v.as_i64(), Some(42));
152
153        let v = Value::from("hello");
154        assert_eq!(v.as_str(), Some("hello"));
155    }
156
157    #[test]
158    fn test_table_metadata() {
159        let mut table = TableMetadata::new("users");
160        table.schema = Some("public".into());
161
162        assert_eq!(table.qualified_name(), "public.users");
163        assert!(table.columns.is_empty());
164    }
165
166    #[test]
167    fn test_query_modes() {
168        assert!(!QueryMode::Bulk.is_incremental());
169        assert!(QueryMode::incrementing("id").is_incremental());
170        assert!(QueryMode::timestamp("updated_at").is_incremental());
171    }
172
173    #[test]
174    fn test_write_modes() {
175        assert_eq!(WriteMode::default(), WriteMode::Upsert);
176    }
177
178    #[test]
179    fn test_dialect_selection() {
180        let pg = dialect_for("postgres");
181        assert_eq!(pg.name(), "PostgreSQL");
182
183        let mysql = dialect_for("mysql");
184        assert_eq!(mysql.name(), "MySQL");
185
186        let mssql = dialect_for("sqlserver");
187        assert_eq!(mssql.name(), "SQL Server");
188    }
189
190    #[test]
191    fn test_source_config() {
192        let config = TableSourceConfig::incrementing("events", "id")
193            .with_schema("public")
194            .with_batch_size(500);
195
196        assert_eq!(config.table, "events");
197        assert_eq!(config.schema, Some("public".into()));
198        assert_eq!(config.batch_size, 500);
199    }
200
201    #[test]
202    fn test_sink_config() {
203        let config = TableSinkBuilder::new()
204            .batch_size(2000)
205            .write_mode(WriteMode::Insert)
206            .auto_ddl(AutoDdlMode::CreateAndEvolve)
207            .build();
208
209        assert_eq!(config.batch.max_size, 2000);
210        assert_eq!(config.write_mode, WriteMode::Insert);
211        assert_eq!(config.auto_ddl, AutoDdlMode::CreateAndEvolve);
212    }
213}