1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2#![cfg_attr(
59 not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
60 allow(
61 unused_imports,
62 unused_variables,
63 dead_code,
64 unreachable_code,
65 unused_lifetimes,
66 clippy::unused_async,
67 )
68)]
69
70pub use advisory_locks::{DbLockGuard, LockConfig};
72
73pub use sea_orm_migration;
75
76pub mod advisory_locks;
78pub mod config;
79pub mod manager;
80pub mod migration_runner;
81pub mod odata;
82pub mod options;
83
84#[cfg(feature = "preview-outbox")]
85pub mod outbox;
86pub mod secure;
87
88mod db_provider;
89
90mod pool_opts;
92#[cfg(feature = "sqlite")]
93mod sqlite;
94
95pub use config::{DbConnConfig, GlobalDatabaseConfig, PoolCfg};
97pub use manager::DbManager;
98pub use options::redact_credentials_in_dsn;
99
100pub use secure::{Db, DbConn, DbTx};
102
103pub use db_provider::DBProvider;
105
106pub async fn connect_db(dsn: &str, opts: ConnectOpts) -> Result<Db> {
114 let handle = DbHandle::connect(dsn, opts).await?;
115 Ok(Db::new(handle))
116}
117
118pub async fn build_db(cfg: DbConnConfig, global: Option<&GlobalDatabaseConfig>) -> Result<Db> {
124 let handle = options::build_db_handle(cfg, global).await?;
125 Ok(Db::new(handle))
126}
127
128use std::time::Duration;
129
130#[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
132use pool_opts::ApplyPoolOpts;
133#[cfg(feature = "sqlite")]
134use sqlite::{Pragmas, extract_sqlite_pragmas, is_memory_dsn, prepare_sqlite_path};
135
136#[cfg(feature = "mysql")]
139use sqlx::mysql::MySqlPoolOptions;
140#[cfg(feature = "pg")]
141use sqlx::postgres::PgPoolOptions;
142#[cfg(feature = "sqlite")]
143use sqlx::sqlite::SqlitePoolOptions;
144#[cfg(feature = "sqlite")]
145use std::str::FromStr;
146
147use sea_orm::DatabaseConnection;
148#[cfg(feature = "mysql")]
149use sea_orm::SqlxMySqlConnector;
150#[cfg(feature = "pg")]
151use sea_orm::SqlxPostgresConnector;
152#[cfg(feature = "sqlite")]
153use sea_orm::SqlxSqliteConnector;
154
155use thiserror::Error;
156
157pub type Result<T> = std::result::Result<T, DbError>;
159
160#[derive(Debug, Error)]
162pub enum DbError {
163 #[error("Unknown DSN: {0}")]
164 UnknownDsn(String),
165
166 #[error("Feature not enabled: {0}")]
167 FeatureDisabled(&'static str),
168
169 #[error("Invalid configuration: {0}")]
170 InvalidConfig(String),
171
172 #[error("Configuration conflict: {0}")]
173 ConfigConflict(String),
174
175 #[error("Invalid SQLite PRAGMA parameter '{key}': {message}")]
176 InvalidSqlitePragma { key: String, message: String },
177
178 #[error("Unknown SQLite PRAGMA parameter: {0}")]
179 UnknownSqlitePragma(String),
180
181 #[error("Invalid connection parameter: {0}")]
182 InvalidParameter(String),
183
184 #[error("SQLite pragma error: {0}")]
185 SqlitePragma(String),
186
187 #[error("Environment variable '{name}': {source}")]
188 EnvVar {
189 name: String,
190 source: std::env::VarError,
191 },
192
193 #[error("URL parsing error: {0}")]
194 UrlParse(#[from] url::ParseError),
195
196 #[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
197 #[error(transparent)]
198 Sqlx(#[from] sqlx::Error),
199
200 #[error(transparent)]
201 Sea(#[from] sea_orm::DbErr),
202
203 #[error(transparent)]
204 Io(#[from] std::io::Error),
205
206 #[error(transparent)]
208 Lock(#[from] advisory_locks::DbLockError),
209
210 #[error(transparent)]
211 Other(#[from] anyhow::Error),
212
213 #[error("Cannot create non-transactional connection inside an active transaction")]
238 ConnRequestedInsideTx,
239}
240
241impl From<modkit_utils::var_expand::ExpandVarsError> for DbError {
242 fn from(err: modkit_utils::var_expand::ExpandVarsError) -> Self {
243 match err {
244 modkit_utils::var_expand::ExpandVarsError::Var { name, source } => {
245 Self::EnvVar { name, source }
246 }
247 modkit_utils::var_expand::ExpandVarsError::Regex(msg) => Self::InvalidParameter(msg),
248 }
249 }
250}
251
252impl From<crate::secure::ScopeError> for DbError {
253 fn from(value: crate::secure::ScopeError) -> Self {
254 DbError::Other(anyhow::Error::new(value))
257 }
258}
259
260#[derive(Clone, Copy, Debug, PartialEq, Eq)]
262pub enum DbEngine {
263 Postgres,
264 MySql,
265 Sqlite,
266}
267
268#[derive(Clone, Debug)]
271pub struct ConnectOpts {
272 pub max_conns: Option<u32>,
274 pub min_conns: Option<u32>,
276 pub acquire_timeout: Option<Duration>,
278 pub idle_timeout: Option<Duration>,
280 pub max_lifetime: Option<Duration>,
282 pub test_before_acquire: bool,
284 pub create_sqlite_dirs: bool,
286}
287impl Default for ConnectOpts {
288 fn default() -> Self {
289 Self {
290 max_conns: Some(10),
291 min_conns: None,
292 acquire_timeout: Some(Duration::from_secs(30)),
293 idle_timeout: None,
294 max_lifetime: None,
295 test_before_acquire: false,
296
297 create_sqlite_dirs: true,
298 }
299 }
300}
301
302#[derive(Debug, Clone)]
304pub(crate) struct DbHandle {
305 engine: DbEngine,
306 dsn: String,
307 sea: DatabaseConnection,
308}
309
310#[cfg(feature = "sqlite")]
311const DEFAULT_SQLITE_BUSY_TIMEOUT: i32 = 5000;
312
313impl DbHandle {
314 pub(crate) fn detect(dsn: &str) -> Result<DbEngine> {
321 let s = dsn.trim_start();
323
324 if s.starts_with("postgres://") || s.starts_with("postgresql://") {
327 Ok(DbEngine::Postgres)
328 } else if s.starts_with("mysql://") {
329 Ok(DbEngine::MySql)
330 } else if s.starts_with("sqlite:") || s.starts_with("sqlite://") {
331 Ok(DbEngine::Sqlite)
332 } else {
333 Err(DbError::UnknownDsn(dsn.to_owned()))
334 }
335 }
336
337 pub(crate) async fn connect(dsn: &str, opts: ConnectOpts) -> Result<Self> {
342 let engine = Self::detect(dsn)?;
343 match engine {
344 #[cfg(feature = "pg")]
345 DbEngine::Postgres => {
346 let o = PgPoolOptions::new().apply(&opts);
347 let pool = o.connect(dsn).await?;
348 let sea = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
349 Ok(Self {
350 engine,
351 dsn: dsn.to_owned(),
352 sea,
353 })
354 }
355 #[cfg(not(feature = "pg"))]
356 DbEngine::Postgres => Err(DbError::FeatureDisabled("PostgreSQL feature not enabled")),
357 #[cfg(feature = "mysql")]
358 DbEngine::MySql => {
359 let o = MySqlPoolOptions::new().apply(&opts);
360 let pool = o.connect(dsn).await?;
361 let sea = SqlxMySqlConnector::from_sqlx_mysql_pool(pool);
362 Ok(Self {
363 engine,
364 dsn: dsn.to_owned(),
365 sea,
366 })
367 }
368 #[cfg(not(feature = "mysql"))]
369 DbEngine::MySql => Err(DbError::FeatureDisabled("MySQL feature not enabled")),
370 #[cfg(feature = "sqlite")]
371 DbEngine::Sqlite => {
372 let dsn = prepare_sqlite_path(dsn, opts.create_sqlite_dirs)?;
373
374 let (clean_dsn, pairs) = extract_sqlite_pragmas(&dsn);
376 let pragmas = Pragmas::from_pairs(&pairs);
377
378 let o = SqlitePoolOptions::new().apply(&opts);
380
381 let is_memory = is_memory_dsn(&clean_dsn);
383 let mut conn_opts = sqlx::sqlite::SqliteConnectOptions::from_str(&clean_dsn)?;
384
385 let journal_mode = if let Some(mode) = &pragmas.journal_mode {
386 match mode {
387 sqlite::pragmas::JournalMode::Delete => {
388 sqlx::sqlite::SqliteJournalMode::Delete
389 }
390 sqlite::pragmas::JournalMode::Wal => sqlx::sqlite::SqliteJournalMode::Wal,
391 sqlite::pragmas::JournalMode::Memory => {
392 sqlx::sqlite::SqliteJournalMode::Memory
393 }
394 sqlite::pragmas::JournalMode::Truncate => {
395 sqlx::sqlite::SqliteJournalMode::Truncate
396 }
397 sqlite::pragmas::JournalMode::Persist => {
398 sqlx::sqlite::SqliteJournalMode::Persist
399 }
400 sqlite::pragmas::JournalMode::Off => sqlx::sqlite::SqliteJournalMode::Off,
401 }
402 } else if let Some(wal_toggle) = pragmas.wal_toggle {
403 if wal_toggle {
404 sqlx::sqlite::SqliteJournalMode::Wal
405 } else {
406 sqlx::sqlite::SqliteJournalMode::Delete
407 }
408 } else if is_memory {
409 sqlx::sqlite::SqliteJournalMode::Delete
410 } else {
411 sqlx::sqlite::SqliteJournalMode::Wal
412 };
413 conn_opts = conn_opts.journal_mode(journal_mode);
414
415 let sync_mode = pragmas.synchronous.as_ref().map_or(
416 sqlx::sqlite::SqliteSynchronous::Normal,
417 |s| match s {
418 sqlite::pragmas::SyncMode::Off => sqlx::sqlite::SqliteSynchronous::Off,
419 sqlite::pragmas::SyncMode::Normal => {
420 sqlx::sqlite::SqliteSynchronous::Normal
421 }
422 sqlite::pragmas::SyncMode::Full => sqlx::sqlite::SqliteSynchronous::Full,
423 sqlite::pragmas::SyncMode::Extra => sqlx::sqlite::SqliteSynchronous::Extra,
424 },
425 );
426 conn_opts = conn_opts.synchronous(sync_mode);
427
428 if !is_memory {
429 let busy_timeout_ms_i64 = pragmas
430 .busy_timeout_ms
431 .unwrap_or(DEFAULT_SQLITE_BUSY_TIMEOUT.into())
432 .max(0);
433 let busy_timeout_ms = u64::try_from(busy_timeout_ms_i64).unwrap_or(0);
434 conn_opts =
435 conn_opts.busy_timeout(std::time::Duration::from_millis(busy_timeout_ms));
436 }
437
438 let pool = o.connect_with(conn_opts).await?;
439 let sea = SqlxSqliteConnector::from_sqlx_sqlite_pool(pool);
440
441 Ok(Self {
442 engine,
443 dsn: clean_dsn,
444 sea,
445 })
446 }
447 #[cfg(not(feature = "sqlite"))]
448 DbEngine::Sqlite => Err(DbError::FeatureDisabled("SQLite feature not enabled")),
449 }
450 }
451
452 #[must_use]
454 pub fn engine(&self) -> DbEngine {
455 self.engine
456 }
457
458 #[must_use]
460 pub fn dsn(&self) -> &str {
461 &self.dsn
462 }
463
464 #[must_use]
483 pub(crate) fn sea_internal(&self) -> DatabaseConnection {
484 self.sea.clone()
485 }
486
487 #[must_use]
495 pub(crate) fn sea_internal_ref(&self) -> &DatabaseConnection {
496 &self.sea
497 }
498
499 pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard> {
506 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
507 let guard = lock_manager.lock(module, key).await?;
508 Ok(guard)
509 }
510
511 pub async fn try_lock(
516 &self,
517 module: &str,
518 key: &str,
519 config: LockConfig,
520 ) -> Result<Option<DbLockGuard>> {
521 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
522 let res = lock_manager.try_lock(module, key, config).await?;
523 Ok(res)
524 }
525
526 }
529
530#[cfg(test)]
533#[cfg_attr(coverage_nightly, coverage(off))]
534mod tests {
535 use super::*;
536 #[cfg(feature = "sqlite")]
537 use tokio::time::Duration;
538
539 #[cfg(feature = "sqlite")]
540 #[tokio::test]
541 async fn test_sqlite_connection() -> Result<()> {
542 let dsn = "sqlite::memory:";
543 let opts = ConnectOpts::default();
544 let db = DbHandle::connect(dsn, opts).await?;
545 assert_eq!(db.engine(), DbEngine::Sqlite);
546 Ok(())
547 }
548
549 #[cfg(feature = "sqlite")]
550 #[tokio::test]
551 async fn test_sqlite_connection_with_pragma_parameters() -> Result<()> {
552 let dsn = "sqlite::memory:?wal=true&synchronous=NORMAL&busy_timeout=5000&journal_mode=WAL";
554 let opts = ConnectOpts::default();
555 let db = DbHandle::connect(dsn, opts).await?;
556 assert_eq!(db.engine(), DbEngine::Sqlite);
557
558 assert!(db.dsn == "sqlite::memory:" || db.dsn.starts_with("sqlite::memory:"));
561
562 Ok(())
563 }
564
565 #[tokio::test]
566 async fn test_backend_detection() {
567 assert_eq!(
568 DbHandle::detect("sqlite::memory:").unwrap(),
569 DbEngine::Sqlite
570 );
571 assert_eq!(
572 DbHandle::detect("postgres://localhost/test").unwrap(),
573 DbEngine::Postgres
574 );
575 assert_eq!(
576 DbHandle::detect("mysql://localhost/test").unwrap(),
577 DbEngine::MySql
578 );
579 assert!(DbHandle::detect("unknown://test").is_err());
580 }
581
582 #[cfg(feature = "sqlite")]
583 #[tokio::test]
584 async fn test_advisory_lock_sqlite() -> Result<()> {
585 let dsn = "sqlite:file:memdb1?mode=memory&cache=shared";
586 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
587
588 let now = std::time::SystemTime::now()
589 .duration_since(std::time::UNIX_EPOCH)
590 .map_or(0, |d| d.as_nanos());
591 let test_id = format!("test_basic_{now}");
592
593 let guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
594 let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
595 let _guard3 = db
596 .lock("different_module", &format!("{test_id}_key1"))
597 .await?;
598
599 guard1.release().await;
601 let _guard4 = db.lock("test_module", &format!("{test_id}_key1")).await?;
602 Ok(())
603 }
604
605 #[cfg(feature = "sqlite")]
606 #[tokio::test]
607 async fn test_advisory_lock_different_keys() -> Result<()> {
608 let dsn = "sqlite:file:memdb_diff_keys?mode=memory&cache=shared";
609 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
610
611 let now = std::time::SystemTime::now()
612 .duration_since(std::time::UNIX_EPOCH)
613 .map_or(0, |d| d.as_nanos());
614 let test_id = format!("test_diff_{now}");
615
616 let _guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
617 let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
618 let _guard3 = db.lock("other_module", &format!("{test_id}_key1")).await?;
619 Ok(())
620 }
621
622 #[cfg(feature = "sqlite")]
623 #[tokio::test]
624 async fn test_try_lock_with_config() -> Result<()> {
625 let dsn = "sqlite:file:memdb2?mode=memory&cache=shared";
626 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
627
628 let now = std::time::SystemTime::now()
629 .duration_since(std::time::UNIX_EPOCH)
630 .map_or(0, |d| d.as_nanos());
631 let test_id = format!("test_config_{now}");
632
633 let _guard1 = db.lock("test_module", &format!("{test_id}_key")).await?;
634
635 let config = LockConfig {
636 max_wait: Some(Duration::from_millis(200)),
637 initial_backoff: Duration::from_millis(50),
638 max_attempts: Some(3),
639 ..Default::default()
640 };
641
642 let result = db
643 .try_lock("test_module", &format!("{test_id}_different_key"), config)
644 .await?;
645 assert!(
646 result.is_some(),
647 "expected lock acquisition for different key"
648 );
649 Ok(())
650 }
651
652 #[cfg(feature = "sqlite")]
653 #[tokio::test]
654 async fn test_sea_internal_access() -> Result<()> {
655 let dsn = "sqlite::memory:";
656 let db = DbHandle::connect(dsn, ConnectOpts::default()).await?;
657
658 let _raw = db.sea_internal();
660 Ok(())
661 }
662}