Skip to main content

fluxforge/drivers/
mod.rs

1//! Database driver implementations.
2//!
3//! This module provides concrete implementations of the `DatabaseDriver` trait
4//! for MySQL and PostgreSQL databases, along with a factory function for creating
5//! driver instances from connection URLs.
6
7pub mod mysql;
8pub mod postgres;
9
10pub use mysql::MySqlDriver;
11pub use postgres::PostgresDriver;
12
13use crate::core::ForgeConfig;
14use crate::drivers::mysql::get_mysql_init_session_sql_mode;
15use crate::DatabaseDriver;
16use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
17use sqlx::ConnectOptions;
18use sqlx::{MySqlPool, PgPool};
19use std::error::Error;
20use std::str::FromStr;
21
22/// Creates a database driver from a connection URL.
23///
24/// Automatically detects the database type from the URL protocol and returns
25/// the appropriate driver implementation. Supports MySQL and PostgreSQL.
26///
27/// # Arguments
28///
29/// * `url` - Database connection URL (e.g., "mysql://user:pass@host/db" or "postgres://user:pass@host/db")
30/// * `config` - Configuration for type mappings and database-specific rules
31///
32/// # Examples
33///
34/// ```no_run
35/// use fluxforge::{drivers, core::ForgeConfig};
36///
37/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
38/// let config = ForgeConfig::default();
39///
40/// // Create MySQL driver
41/// let mysql_driver = drivers::create_driver(
42///     "mysql://root:password@localhost:3306/mydb",
43///     &config,
44///     true
45/// ).await?;
46///
47/// // Create PostgreSQL driver
48/// let pg_driver = drivers::create_driver(
49///     "postgres://postgres:password@localhost:5432/mydb",
50///     &config,
51///     true
52/// ).await?;
53/// # Ok(())
54/// # }
55/// ```
56///
57/// # Errors
58///
59/// Returns an error if:
60/// - The URL protocol is not supported (only mysql:// and postgres:// are supported)
61/// - Database connection fails (invalid credentials, host unreachable, etc.)
62/// - Connection pool cannot be established
63pub async fn create_driver(
64    url: &str,
65    config: &ForgeConfig,
66    is_source_driver: bool,
67) -> Result<Box<dyn DatabaseDriver>, Box<dyn Error>> {
68    if url.starts_with("mysql://") {
69        let zero_date_on_write = config
70            .mysql
71            .as_ref()
72            .and_then(|r| r.rules.as_ref())
73            .and_then(|r| r.on_write.as_ref())
74            .and_then(|w| w.zero_date)
75            .unwrap_or(false); // default false, if not in config
76
77        let sql_mode = get_mysql_init_session_sql_mode(config, is_source_driver);
78
79        if sql_mode == "".to_string() {
80            let pool = MySqlPool::connect(url).await?;
81            let driver = MySqlDriver {
82                pool,
83                zero_date_on_write,
84            };
85            Ok(Box::new(driver))
86        } else {
87            let sql_command_for_hook = sql_mode.clone(); // copy for outer Closure
88
89            let opts = MySqlConnectOptions::from_str(url)?;
90
91            // create pool with options
92            let pool = MySqlPoolOptions::new()
93                .max_connections(5)
94                .after_connect(move |conn, _meta| {
95                    // IMPORTANT: wen need a new copy for every call which is then "moved" into the async block
96                    let cmd = sql_command_for_hook.clone();
97
98                    Box::pin(async move {
99                        sqlx::query(&cmd).execute(conn).await?;
100                        Ok(())
101                    })
102                })
103                .connect_with(opts)
104                .await?;
105            let driver = MySqlDriver {
106                pool,
107                zero_date_on_write,
108            };
109            Ok(Box::new(driver))
110        }
111    }
112    // if mysql
113    else if url.starts_with("postgres://") || url.starts_with("postgresql://") {
114        let pool = PgPool::connect(url).await?;
115        Ok(Box::new(postgres::PostgresDriver { pool: Some(pool) }))
116    } else {
117        Err(format!("Unsupported database protocol in URL: {url}").into())
118    }
119}