1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2#![cfg_attr(
59 not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
60 allow(
61 unused_imports,
62 unused_variables,
63 dead_code,
64 unreachable_code,
65 unused_lifetimes,
66 clippy::unused_async,
67 )
68)]
69
70pub use advisory_locks::{DbLockGuard, LockConfig};
72
73pub use sea_orm_migration;
75
76pub mod advisory_locks;
78pub mod config;
79pub mod contention;
80pub mod deadlock;
81pub mod manager;
82pub mod migration_runner;
83pub mod odata;
84pub mod options;
85
86#[cfg(feature = "preview-outbox")]
87pub mod outbox;
88pub mod secure;
89
90mod db_provider;
91
92mod pool_opts;
94#[cfg(feature = "sqlite")]
95mod sqlite;
96
97pub use config::{DbConnConfig, GlobalDatabaseConfig, PoolCfg};
99pub use manager::DbManager;
100pub use options::redact_credentials_in_dsn;
101
102pub use secure::{Db, DbConn, DbTx};
104
105pub use db_provider::DBProvider;
107
108pub async fn connect_db(dsn: &str, opts: ConnectOpts) -> Result<Db> {
116 let handle = DbHandle::connect(dsn, opts).await?;
117 Ok(Db::new(handle))
118}
119
120pub async fn build_db(cfg: DbConnConfig, global: Option<&GlobalDatabaseConfig>) -> Result<Db> {
126 let handle = options::build_db_handle(cfg, global).await?;
127 Ok(Db::new(handle))
128}
129
130use std::time::Duration;
131
132#[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
134use pool_opts::ApplyPoolOpts;
135#[cfg(feature = "sqlite")]
136use sqlite::{Pragmas, extract_sqlite_pragmas, is_memory_dsn, prepare_sqlite_path};
137
138#[cfg(feature = "mysql")]
141use sqlx::mysql::MySqlPoolOptions;
142#[cfg(feature = "pg")]
143use sqlx::postgres::PgPoolOptions;
144#[cfg(feature = "sqlite")]
145use sqlx::sqlite::SqlitePoolOptions;
146#[cfg(feature = "sqlite")]
147use std::str::FromStr;
148
149use sea_orm::DatabaseConnection;
150#[cfg(feature = "mysql")]
151use sea_orm::SqlxMySqlConnector;
152#[cfg(feature = "pg")]
153use sea_orm::SqlxPostgresConnector;
154#[cfg(feature = "sqlite")]
155use sea_orm::SqlxSqliteConnector;
156
157use thiserror::Error;
158
159pub type Result<T> = std::result::Result<T, DbError>;
161
162#[derive(Debug, Error)]
164pub enum DbError {
165 #[error("Unknown DSN: {0}")]
166 UnknownDsn(String),
167
168 #[error("Feature not enabled: {0}")]
169 FeatureDisabled(&'static str),
170
171 #[error("Invalid configuration: {0}")]
172 InvalidConfig(String),
173
174 #[error("Configuration conflict: {0}")]
175 ConfigConflict(String),
176
177 #[error("Invalid SQLite PRAGMA parameter '{key}': {message}")]
178 InvalidSqlitePragma { key: String, message: String },
179
180 #[error("Unknown SQLite PRAGMA parameter: {0}")]
181 UnknownSqlitePragma(String),
182
183 #[error("Invalid connection parameter: {0}")]
184 InvalidParameter(String),
185
186 #[error("SQLite pragma error: {0}")]
187 SqlitePragma(String),
188
189 #[error("Environment variable '{name}': {source}")]
190 EnvVar {
191 name: String,
192 source: std::env::VarError,
193 },
194
195 #[error("URL parsing error: {0}")]
196 UrlParse(#[from] url::ParseError),
197
198 #[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
199 #[error(transparent)]
200 Sqlx(#[from] sqlx::Error),
201
202 #[error(transparent)]
203 Sea(#[from] sea_orm::DbErr),
204
205 #[error(transparent)]
206 Io(#[from] std::io::Error),
207
208 #[error(transparent)]
210 Lock(#[from] advisory_locks::DbLockError),
211
212 #[error(transparent)]
213 Other(#[from] anyhow::Error),
214
215 #[error("Cannot create non-transactional connection inside an active transaction")]
240 ConnRequestedInsideTx,
241}
242
243impl From<modkit_utils::var_expand::ExpandVarsError> for DbError {
244 fn from(err: modkit_utils::var_expand::ExpandVarsError) -> Self {
245 match err {
246 modkit_utils::var_expand::ExpandVarsError::Var { name, source } => {
247 Self::EnvVar { name, source }
248 }
249 modkit_utils::var_expand::ExpandVarsError::Regex(msg) => Self::InvalidParameter(msg),
250 }
251 }
252}
253
254impl From<crate::secure::ScopeError> for DbError {
255 fn from(value: crate::secure::ScopeError) -> Self {
256 DbError::Other(anyhow::Error::new(value))
259 }
260}
261
262#[derive(Clone, Copy, Debug, PartialEq, Eq)]
264pub enum DbEngine {
265 Postgres,
266 MySql,
267 Sqlite,
268}
269
270#[derive(Clone, Debug)]
273pub struct ConnectOpts {
274 pub max_conns: Option<u32>,
276 pub min_conns: Option<u32>,
278 pub acquire_timeout: Option<Duration>,
280 pub idle_timeout: Option<Duration>,
282 pub max_lifetime: Option<Duration>,
284 pub test_before_acquire: bool,
286 pub create_sqlite_dirs: bool,
288}
289impl Default for ConnectOpts {
290 fn default() -> Self {
291 Self {
292 max_conns: Some(10),
293 min_conns: None,
294 acquire_timeout: Some(Duration::from_secs(30)),
295 idle_timeout: None,
296 max_lifetime: None,
297 test_before_acquire: false,
298
299 create_sqlite_dirs: true,
300 }
301 }
302}
303
304#[derive(Debug, Clone)]
306pub(crate) struct DbHandle {
307 engine: DbEngine,
308 dsn: String,
309 sea: DatabaseConnection,
310}
311
312#[cfg(feature = "sqlite")]
313const DEFAULT_SQLITE_BUSY_TIMEOUT: i32 = 5000;
314
315impl DbHandle {
316 pub(crate) fn detect(dsn: &str) -> Result<DbEngine> {
323 let s = dsn.trim_start();
325
326 if s.starts_with("postgres://") || s.starts_with("postgresql://") {
329 Ok(DbEngine::Postgres)
330 } else if s.starts_with("mysql://") {
331 Ok(DbEngine::MySql)
332 } else if s.starts_with("sqlite:") || s.starts_with("sqlite://") {
333 Ok(DbEngine::Sqlite)
334 } else {
335 Err(DbError::UnknownDsn(dsn.to_owned()))
336 }
337 }
338
339 pub(crate) async fn connect(dsn: &str, opts: ConnectOpts) -> Result<Self> {
344 let engine = Self::detect(dsn)?;
345 match engine {
346 #[cfg(feature = "pg")]
347 DbEngine::Postgres => {
348 let o = PgPoolOptions::new().apply(&opts);
349 let pool = o.connect(dsn).await?;
350 let sea = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
351 Ok(Self {
352 engine,
353 dsn: dsn.to_owned(),
354 sea,
355 })
356 }
357 #[cfg(not(feature = "pg"))]
358 DbEngine::Postgres => Err(DbError::FeatureDisabled("PostgreSQL feature not enabled")),
359 #[cfg(feature = "mysql")]
360 DbEngine::MySql => {
361 let o = MySqlPoolOptions::new().apply(&opts);
362 let pool = o.connect(dsn).await?;
363 let sea = SqlxMySqlConnector::from_sqlx_mysql_pool(pool);
364 Ok(Self {
365 engine,
366 dsn: dsn.to_owned(),
367 sea,
368 })
369 }
370 #[cfg(not(feature = "mysql"))]
371 DbEngine::MySql => Err(DbError::FeatureDisabled("MySQL feature not enabled")),
372 #[cfg(feature = "sqlite")]
373 DbEngine::Sqlite => {
374 let dsn = prepare_sqlite_path(dsn, opts.create_sqlite_dirs)?;
375
376 let (clean_dsn, pairs) = extract_sqlite_pragmas(&dsn);
378 let pragmas = Pragmas::from_pairs(&pairs);
379
380 let o = SqlitePoolOptions::new().apply(&opts);
382
383 let is_memory = is_memory_dsn(&clean_dsn);
385 let mut conn_opts = sqlx::sqlite::SqliteConnectOptions::from_str(&clean_dsn)?;
386
387 let journal_mode = if let Some(mode) = &pragmas.journal_mode {
388 match mode {
389 sqlite::pragmas::JournalMode::Delete => {
390 sqlx::sqlite::SqliteJournalMode::Delete
391 }
392 sqlite::pragmas::JournalMode::Wal => sqlx::sqlite::SqliteJournalMode::Wal,
393 sqlite::pragmas::JournalMode::Memory => {
394 sqlx::sqlite::SqliteJournalMode::Memory
395 }
396 sqlite::pragmas::JournalMode::Truncate => {
397 sqlx::sqlite::SqliteJournalMode::Truncate
398 }
399 sqlite::pragmas::JournalMode::Persist => {
400 sqlx::sqlite::SqliteJournalMode::Persist
401 }
402 sqlite::pragmas::JournalMode::Off => sqlx::sqlite::SqliteJournalMode::Off,
403 }
404 } else if let Some(wal_toggle) = pragmas.wal_toggle {
405 if wal_toggle {
406 sqlx::sqlite::SqliteJournalMode::Wal
407 } else {
408 sqlx::sqlite::SqliteJournalMode::Delete
409 }
410 } else if is_memory {
411 sqlx::sqlite::SqliteJournalMode::Delete
412 } else {
413 sqlx::sqlite::SqliteJournalMode::Wal
414 };
415 conn_opts = conn_opts.journal_mode(journal_mode);
416
417 let sync_mode = pragmas.synchronous.as_ref().map_or(
418 sqlx::sqlite::SqliteSynchronous::Normal,
419 |s| match s {
420 sqlite::pragmas::SyncMode::Off => sqlx::sqlite::SqliteSynchronous::Off,
421 sqlite::pragmas::SyncMode::Normal => {
422 sqlx::sqlite::SqliteSynchronous::Normal
423 }
424 sqlite::pragmas::SyncMode::Full => sqlx::sqlite::SqliteSynchronous::Full,
425 sqlite::pragmas::SyncMode::Extra => sqlx::sqlite::SqliteSynchronous::Extra,
426 },
427 );
428 conn_opts = conn_opts.synchronous(sync_mode);
429
430 if !is_memory {
431 let busy_timeout_ms_i64 = pragmas
432 .busy_timeout_ms
433 .unwrap_or(DEFAULT_SQLITE_BUSY_TIMEOUT.into())
434 .max(0);
435 let busy_timeout_ms = u64::try_from(busy_timeout_ms_i64).unwrap_or(0);
436 conn_opts =
437 conn_opts.busy_timeout(std::time::Duration::from_millis(busy_timeout_ms));
438 }
439
440 let pool = o.connect_with(conn_opts).await?;
441 let sea = SqlxSqliteConnector::from_sqlx_sqlite_pool(pool);
442
443 Ok(Self {
444 engine,
445 dsn: clean_dsn,
446 sea,
447 })
448 }
449 #[cfg(not(feature = "sqlite"))]
450 DbEngine::Sqlite => Err(DbError::FeatureDisabled("SQLite feature not enabled")),
451 }
452 }
453
454 #[must_use]
456 pub fn engine(&self) -> DbEngine {
457 self.engine
458 }
459
460 #[must_use]
462 pub fn dsn(&self) -> &str {
463 &self.dsn
464 }
465
466 #[must_use]
485 pub(crate) fn sea_internal(&self) -> DatabaseConnection {
486 self.sea.clone()
487 }
488
489 #[must_use]
497 pub(crate) fn sea_internal_ref(&self) -> &DatabaseConnection {
498 &self.sea
499 }
500
501 pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard> {
508 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
509 let guard = lock_manager.lock(module, key).await?;
510 Ok(guard)
511 }
512
513 pub async fn try_lock(
518 &self,
519 module: &str,
520 key: &str,
521 config: LockConfig,
522 ) -> Result<Option<DbLockGuard>> {
523 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
524 let res = lock_manager.try_lock(module, key, config).await?;
525 Ok(res)
526 }
527
528 }
531
532#[cfg(test)]
535#[cfg_attr(coverage_nightly, coverage(off))]
536mod tests {
537 use super::*;
538 #[cfg(feature = "sqlite")]
539 use tokio::time::Duration;
540
541 #[cfg(feature = "sqlite")]
542 #[tokio::test]
543 async fn test_sqlite_connection() -> Result<()> {
544 let dsn = "sqlite::memory:";
545 let opts = ConnectOpts::default();
546 let db = DbHandle::connect(dsn, opts).await?;
547 assert_eq!(db.engine(), DbEngine::Sqlite);
548 Ok(())
549 }
550
551 #[cfg(feature = "sqlite")]
552 #[tokio::test]
553 async fn test_sqlite_connection_with_pragma_parameters() -> Result<()> {
554 let dsn = "sqlite::memory:?wal=true&synchronous=NORMAL&busy_timeout=5000&journal_mode=WAL";
556 let opts = ConnectOpts::default();
557 let db = DbHandle::connect(dsn, opts).await?;
558 assert_eq!(db.engine(), DbEngine::Sqlite);
559
560 assert!(db.dsn == "sqlite::memory:" || db.dsn.starts_with("sqlite::memory:"));
563
564 Ok(())
565 }
566
567 #[tokio::test]
568 async fn test_backend_detection() {
569 assert_eq!(
570 DbHandle::detect("sqlite::memory:").unwrap(),
571 DbEngine::Sqlite
572 );
573 assert_eq!(
574 DbHandle::detect("postgres://localhost/test").unwrap(),
575 DbEngine::Postgres
576 );
577 assert_eq!(
578 DbHandle::detect("mysql://localhost/test").unwrap(),
579 DbEngine::MySql
580 );
581 assert!(DbHandle::detect("unknown://test").is_err());
582 }
583
584 #[cfg(feature = "sqlite")]
585 #[tokio::test]
586 async fn test_advisory_lock_sqlite() -> Result<()> {
587 let dsn = "sqlite:file:memdb1?mode=memory&cache=shared";
588 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
589
590 let now = std::time::SystemTime::now()
591 .duration_since(std::time::UNIX_EPOCH)
592 .map_or(0, |d| d.as_nanos());
593 let test_id = format!("test_basic_{now}");
594
595 let guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
596 let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
597 let _guard3 = db
598 .lock("different_module", &format!("{test_id}_key1"))
599 .await?;
600
601 guard1.release().await;
603 let _guard4 = db.lock("test_module", &format!("{test_id}_key1")).await?;
604 Ok(())
605 }
606
607 #[cfg(feature = "sqlite")]
608 #[tokio::test]
609 async fn test_advisory_lock_different_keys() -> Result<()> {
610 let dsn = "sqlite:file:memdb_diff_keys?mode=memory&cache=shared";
611 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
612
613 let now = std::time::SystemTime::now()
614 .duration_since(std::time::UNIX_EPOCH)
615 .map_or(0, |d| d.as_nanos());
616 let test_id = format!("test_diff_{now}");
617
618 let _guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
619 let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
620 let _guard3 = db.lock("other_module", &format!("{test_id}_key1")).await?;
621 Ok(())
622 }
623
624 #[cfg(feature = "sqlite")]
625 #[tokio::test]
626 async fn test_try_lock_with_config() -> Result<()> {
627 let dsn = "sqlite:file:memdb2?mode=memory&cache=shared";
628 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
629
630 let now = std::time::SystemTime::now()
631 .duration_since(std::time::UNIX_EPOCH)
632 .map_or(0, |d| d.as_nanos());
633 let test_id = format!("test_config_{now}");
634
635 let _guard1 = db.lock("test_module", &format!("{test_id}_key")).await?;
636
637 let config = LockConfig {
638 max_wait: Some(Duration::from_millis(200)),
639 initial_backoff: Duration::from_millis(50),
640 max_attempts: Some(3),
641 ..Default::default()
642 };
643
644 let result = db
645 .try_lock("test_module", &format!("{test_id}_different_key"), config)
646 .await?;
647 assert!(
648 result.is_some(),
649 "expected lock acquisition for different key"
650 );
651 Ok(())
652 }
653
654 #[cfg(feature = "sqlite")]
655 #[tokio::test]
656 async fn test_sea_internal_access() -> Result<()> {
657 let dsn = "sqlite::memory:";
658 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
659
660 let _raw = db.sea_internal();
662 Ok(())
663 }
664}