use crate::backends::{DatabaseError, DatabaseType, Row as DbRow, connection::DatabaseConnection};
use sqlx::{Any, AnyPool, pool::PoolOptions};
use std::time::Duration;
/// Connection and pooling settings for a database engine.
///
/// Start from [`EngineConfig::new`] or [`EngineConfig::default`] and refine the
/// value with the chainable `with_*` methods. Every `with_*` method consumes
/// the config and returns the updated one, so the result must be used.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EngineConfig {
    /// Database connection URL.
    pub url: String,
    /// Minimum number of connections the pool is asked to keep.
    pub pool_min_size: u32,
    /// Maximum number of connections the pool may open.
    pub pool_max_size: u32,
    /// Timeout, in seconds, for acquiring a connection from the pool.
    pub pool_timeout: u64,
    /// Idle timeout in seconds; `None` means the setting is not applied.
    pub pool_idle_timeout: Option<u64>,
    /// Maximum connection lifetime in seconds; `None` means the setting is not applied.
    pub pool_max_lifetime: Option<u64>,
    /// When `true`, SQL text is printed to stdout before it is executed.
    pub echo: bool,
    /// Requested query cache size.
    // NOTE(review): nothing in this file reads this value — confirm where it is consumed.
    pub query_cache_size: usize,
}

impl Default for EngineConfig {
    /// In-memory SQLite, a 1..=10 connection pool, a 30 s acquire timeout,
    /// a 600 s idle timeout, an 1800 s max lifetime, echo off and a cache size of 500.
    fn default() -> Self {
        Self {
            url: "sqlite::memory:".to_string(),
            pool_min_size: 1,
            pool_max_size: 10,
            pool_timeout: 30,
            pool_idle_timeout: Some(600),
            pool_max_lifetime: Some(1800),
            echo: false,
            query_cache_size: 500,
        }
    }
}

impl EngineConfig {
    /// Creates a config for `url`, leaving every other setting at its default.
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            ..Default::default()
        }
    }

    /// Sets the minimum and maximum pool size.
    ///
    /// The two values are stored as given; no `min <= max` check is performed here.
    #[must_use]
    pub fn with_pool_size(mut self, min: u32, max: u32) -> Self {
        self.pool_min_size = min;
        self.pool_max_size = max;
        self
    }

    /// Sets the connection acquire timeout, in seconds.
    #[must_use]
    pub fn with_timeout(mut self, timeout: u64) -> Self {
        self.pool_timeout = timeout;
        self
    }

    /// Sets the idle timeout in seconds, or clears it with `None`.
    #[must_use]
    pub fn with_idle_timeout(mut self, timeout: Option<u64>) -> Self {
        self.pool_idle_timeout = timeout;
        self
    }

    /// Sets the maximum connection lifetime in seconds, or clears it with `None`.
    #[must_use]
    pub fn with_max_lifetime(mut self, lifetime: Option<u64>) -> Self {
        self.pool_max_lifetime = lifetime;
        self
    }

    /// Enables or disables printing of SQL text before execution.
    #[must_use]
    pub fn with_echo(mut self, echo: bool) -> Self {
        self.echo = echo;
        self
    }

    /// Sets the requested query cache size.
    #[must_use]
    pub fn with_cache_size(mut self, size: usize) -> Self {
        self.query_cache_size = size;
        self
    }
}
/// Database engine backed by an sqlx `Any` connection pool.
///
/// Holds the pool together with the [`EngineConfig`] it was built from.
pub struct Engine {
    /// Pool every query issued through this engine runs against.
    pool: AnyPool,
    /// Settings the pool was created with; `echo` also controls SQL printing.
    config: EngineConfig,
}

impl Engine {
    /// Builds the connection pool described by `config` and connects to `config.url`.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported while connecting.
    ///
    /// NOTE(review): recent sqlx releases require the `Any` drivers to be
    /// installed (`sqlx::any::install_default_drivers()`) before an `Any`
    /// connection is opened; this function does not do that — confirm callers do.
    pub async fn from_config(config: EngineConfig) -> Result<Self, sqlx::Error> {
        let mut pool_options = PoolOptions::<Any>::new()
            .min_connections(config.pool_min_size)
            .max_connections(config.pool_max_size)
            .acquire_timeout(Duration::from_secs(config.pool_timeout));

        // The optional limits are only applied when present; with `None` the
        // corresponding setter is never called and the builder's own value stays.
        if let Some(idle_timeout) = config.pool_idle_timeout {
            pool_options = pool_options.idle_timeout(Duration::from_secs(idle_timeout));
        }
        if let Some(max_lifetime) = config.pool_max_lifetime {
            pool_options = pool_options.max_lifetime(Duration::from_secs(max_lifetime));
        }

        let pool = pool_options.connect(&config.url).await?;
        Ok(Self { pool, config })
    }

    /// Connects to `url` using the default settings of [`EngineConfig::new`].
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported while connecting.
    pub async fn new(url: impl Into<String>) -> Result<Self, sqlx::Error> {
        Self::from_config(EngineConfig::new(url)).await
    }

    /// Acquires a connection from the pool.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by the pool's `acquire`.
    pub async fn connect(&self) -> Result<sqlx::pool::PoolConnection<Any>, sqlx::Error> {
        self.pool.acquire().await
    }

    /// Prints `sql` to stdout when `config.echo` is enabled.
    ///
    /// Single place for the echo behaviour shared by all query methods.
    fn echo_sql(&self, sql: &str) {
        if self.config.echo {
            println!("SQL: {}", sql);
        }
    }

    /// Executes `sql` against the pool and returns the number of affected rows.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by the query.
    pub async fn execute(&self, sql: &str) -> Result<u64, sqlx::Error> {
        self.echo_sql(sql);
        let result = sqlx::query(sql).execute(&self.pool).await?;
        Ok(result.rows_affected())
    }

    /// Runs `sql` and returns every resulting row.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by the query.
    pub async fn fetch_all(&self, sql: &str) -> Result<Vec<sqlx::any::AnyRow>, sqlx::Error> {
        self.echo_sql(sql);
        sqlx::query(sql).fetch_all(&self.pool).await
    }

    /// Runs `sql` via sqlx's `fetch_one` and returns the row it yields.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by `fetch_one`.
    pub async fn fetch_one(&self, sql: &str) -> Result<sqlx::any::AnyRow, sqlx::Error> {
        self.echo_sql(sql);
        sqlx::query(sql).fetch_one(&self.pool).await
    }

    /// Runs `sql` via sqlx's `fetch_optional` and returns the row, if any.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by `fetch_optional`.
    pub async fn fetch_optional(
        &self,
        sql: &str,
    ) -> Result<Option<sqlx::any::AnyRow>, sqlx::Error> {
        self.echo_sql(sql);
        sqlx::query(sql).fetch_optional(&self.pool).await
    }

    /// Begins a transaction on the pool.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` reported by the pool's `begin`.
    pub async fn begin(&self) -> Result<sqlx::Transaction<'_, Any>, sqlx::Error> {
        self.pool.begin().await
    }

    /// Returns the configuration this engine was created with.
    pub fn config(&self) -> &EngineConfig {
        &self.config
    }

    /// Returns the underlying sqlx pool.
    pub fn pool(&self) -> &AnyPool {
        &self.pool
    }

    /// Returns a new `Engine` holding a clone of the pool handle and a copy of the config.
    // NOTE(review): sqlx documents pool clones as reference-counted handles, so
    // this presumably shares connections rather than opening a second pool — confirm.
    pub fn clone_ref(&self) -> Self {
        Self {
            pool: self.pool.clone(),
            config: self.config.clone(),
        }
    }
}
/// Free-function shorthand for [`Engine::new`]: connects to `url` and returns the engine.
///
/// # Errors
///
/// Propagates the `sqlx::Error` returned by [`Engine::new`].
pub async fn create_engine(url: impl Into<String>) -> Result<Engine, sqlx::Error> {
    let engine = Engine::new(url).await?;
    Ok(engine)
}
/// Free-function shorthand for [`Engine::from_config`]: connects using `config`.
///
/// # Errors
///
/// Propagates the `sqlx::Error` returned by [`Engine::from_config`].
pub async fn create_engine_with_config(config: EngineConfig) -> Result<Engine, sqlx::Error> {
    let engine = Engine::from_config(config).await?;
    Ok(engine)
}
/// Engine built on the crate's own [`DatabaseConnection`] abstraction.
///
/// Pairs a connection with the [`DatabaseType`] it talks to and an
/// [`EngineConfig`]. Of the config, the methods of this type only read `echo`.
pub struct DatabaseEngine {
    /// Connection every query issued through this engine runs against.
    connection: DatabaseConnection,
    /// Backend the connection talks to.
    db_type: DatabaseType,
    /// Settings associated with this engine; `echo` controls SQL printing.
    config: EngineConfig,
}

impl DatabaseEngine {
    /// Wraps an existing connection, using [`EngineConfig::default`] as its config.
    pub fn new(connection: DatabaseConnection, db_type: DatabaseType) -> Self {
        Self::with_config(connection, db_type, EngineConfig::default())
    }

    /// Wraps an existing connection together with an explicit config.
    pub fn with_config(
        connection: DatabaseConnection,
        db_type: DatabaseType,
        config: EngineConfig,
    ) -> Self {
        Self {
            connection,
            db_type,
            config,
        }
    }

    /// Connects to the PostgreSQL database at `url`.
    ///
    /// The stored config is `EngineConfig::new(url)`, so `config().url` reports
    /// the URL that was actually connected to; all other settings are defaults.
    /// Only `url` is passed to the connection; pool settings are not applied here.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned while connecting.
    #[cfg(feature = "postgres")]
    pub async fn from_postgres(url: &str) -> Result<Self, DatabaseError> {
        let connection = DatabaseConnection::connect_postgres(url).await?;
        Ok(Self::with_config(
            connection,
            DatabaseType::Postgres,
            EngineConfig::new(url),
        ))
    }

    /// Connects to the SQLite database at `url`.
    ///
    /// The stored config is `EngineConfig::new(url)`, so `config().url` reports
    /// the URL that was actually connected to; all other settings are defaults.
    /// Only `url` is passed to the connection; pool settings are not applied here.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned while connecting.
    #[cfg(feature = "sqlite")]
    pub async fn from_sqlite(url: &str) -> Result<Self, DatabaseError> {
        let connection = DatabaseConnection::connect_sqlite(url).await?;
        Ok(Self::with_config(
            connection,
            DatabaseType::Sqlite,
            EngineConfig::new(url),
        ))
    }

    /// Connects to the MySQL database at `url`.
    ///
    /// The stored config is `EngineConfig::new(url)`, so `config().url` reports
    /// the URL that was actually connected to; all other settings are defaults.
    /// Only `url` is passed to the connection; pool settings are not applied here.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned while connecting.
    #[cfg(feature = "mysql")]
    pub async fn from_mysql(url: &str) -> Result<Self, DatabaseError> {
        let connection = DatabaseConnection::connect_mysql(url).await?;
        Ok(Self::with_config(
            connection,
            DatabaseType::Mysql,
            EngineConfig::new(url),
        ))
    }

    /// Returns the underlying connection.
    pub fn connection(&self) -> &DatabaseConnection {
        &self.connection
    }

    /// Returns the backend type this engine was created for.
    pub fn database_type(&self) -> DatabaseType {
        self.db_type
    }

    /// Returns the configuration associated with this engine.
    pub fn config(&self) -> &EngineConfig {
        &self.config
    }

    /// Prints `sql` to stdout when `config.echo` is enabled.
    ///
    /// Single place for the echo behaviour shared by all query methods.
    fn echo_sql(&self, sql: &str) {
        if self.config.echo {
            println!("SQL: {}", sql);
        }
    }

    /// Executes `sql` with no bound parameters and returns the affected row count.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned by the connection.
    pub async fn execute(&self, sql: &str) -> Result<u64, DatabaseError> {
        self.echo_sql(sql);
        let result = self.connection.execute(sql, vec![]).await?;
        Ok(result.rows_affected)
    }

    /// Runs `sql` with no bound parameters and returns every resulting row.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned by the connection.
    pub async fn fetch_all(&self, sql: &str) -> Result<Vec<DbRow>, DatabaseError> {
        self.echo_sql(sql);
        self.connection.fetch_all(sql, vec![]).await
    }

    /// Runs `sql` with no bound parameters via the connection's `fetch_one`.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned by the connection.
    pub async fn fetch_one(&self, sql: &str) -> Result<DbRow, DatabaseError> {
        self.echo_sql(sql);
        self.connection.fetch_one(sql, vec![]).await
    }

    /// Runs `sql` with no bound parameters and returns the first row, if any.
    ///
    /// Implemented by fetching every row and keeping only the first one, so
    /// any additional rows are loaded and then discarded.
    ///
    /// # Errors
    ///
    /// Propagates the `DatabaseError` returned by the connection.
    pub async fn fetch_optional(&self, sql: &str) -> Result<Option<DbRow>, DatabaseError> {
        self.echo_sql(sql);
        let rows = self.connection.fetch_all(sql, vec![]).await?;
        Ok(rows.into_iter().next())
    }

    /// Returns a new `DatabaseEngine` built from a clone of the connection,
    /// the same database type and a copy of the config.
    pub fn clone_ref(&self) -> Self {
        Self {
            connection: self.connection.clone(),
            db_type: self.db_type,
            config: self.config.clone(),
        }
    }
}
/// Free-function shorthand for [`DatabaseEngine::from_postgres`].
///
/// # Errors
///
/// Propagates the `DatabaseError` returned by [`DatabaseEngine::from_postgres`].
#[cfg(feature = "postgres")]
pub async fn create_database_engine_postgres(url: &str) -> Result<DatabaseEngine, DatabaseError> {
    let engine = DatabaseEngine::from_postgres(url).await?;
    Ok(engine)
}
/// Free-function shorthand for [`DatabaseEngine::from_sqlite`].
///
/// # Errors
///
/// Propagates the `DatabaseError` returned by [`DatabaseEngine::from_sqlite`].
#[cfg(feature = "sqlite")]
pub async fn create_database_engine_sqlite(url: &str) -> Result<DatabaseEngine, DatabaseError> {
    let engine = DatabaseEngine::from_sqlite(url).await?;
    Ok(engine)
}
/// Free-function shorthand for [`DatabaseEngine::from_mysql`].
///
/// # Errors
///
/// Propagates the `DatabaseError` returned by [`DatabaseEngine::from_mysql`].
#[cfg(feature = "mysql")]
pub async fn create_database_engine_mysql(url: &str) -> Result<DatabaseEngine, DatabaseError> {
    let engine = DatabaseEngine::from_mysql(url).await?;
    Ok(engine)
}
// NOTE(review): every test below drives an sqlx SQLite pool directly; none of
// them constructs `Engine` or `DatabaseEngine` from this file.
#[cfg(test)]
mod tests {
    use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};

    /// Opens an in-memory SQLite pool (0..=5 connections) for a single test.
    async fn create_test_pool() -> SqlitePool {
        let options = SqlitePoolOptions::new()
            .min_connections(0)
            .max_connections(5);
        options
            .connect("sqlite::memory:")
            .await
            .expect("Failed to create SQLite pool")
    }

    /// A freshly connected pool must not report itself as closed.
    #[tokio::test]
    async fn test_engine_creation() {
        let pool = create_test_pool().await;
        assert!(!pool.is_closed());
        pool.close().await;
    }

    /// A DDL statement executes successfully against the pool.
    #[tokio::test]
    async fn test_engine_execute() {
        let pool = create_test_pool().await;

        let ddl = "CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)";
        let result = sqlx::query(ddl).execute(&pool).await;
        assert!(result.is_ok());

        pool.close().await;
    }

    /// One inserted row is returned by a subsequent `SELECT`.
    #[tokio::test]
    async fn test_engine_query() {
        let pool = create_test_pool().await;

        // Schema first, then a single row of data.
        let setup = [
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
            "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        ];
        for statement in setup {
            sqlx::query(statement).execute(&pool).await.unwrap();
        }

        let select = sqlx::query("SELECT * FROM users");
        let rows = select.fetch_all(&pool).await.expect("Query failed");
        assert_eq!(rows.len(), 1);

        pool.close().await;
    }

    /// A pool built with explicit min/max sizes (2..=5) connects and stays open.
    #[tokio::test]
    async fn test_engine_with_config() {
        let options = SqlitePoolOptions::new()
            .min_connections(2)
            .max_connections(5);
        let pool = options
            .connect("sqlite::memory:")
            .await
            .expect("Failed to create engine with config");
        assert!(!pool.is_closed());
        pool.close().await;
    }

    /// A row inserted inside a committed transaction is visible afterwards.
    #[tokio::test]
    async fn test_transaction() {
        let pool = create_test_pool().await;

        let ddl = "CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)";
        sqlx::query(ddl).execute(&pool).await.unwrap();

        let mut tx = pool.begin().await.expect("Failed to begin transaction");
        let insert = "INSERT INTO accounts (id, balance) VALUES (1, 100)";
        sqlx::query(insert).execute(&mut *tx).await.unwrap();
        tx.commit().await.expect("Failed to commit");

        // Read back through the pool, outside the transaction.
        let select = sqlx::query("SELECT * FROM accounts");
        let rows = select.fetch_all(&pool).await.unwrap();
        assert_eq!(rows.len(), 1);

        pool.close().await;
    }
}