1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2#![cfg_attr(
59 not(any(feature = "pg", feature = "mysql", feature = "sqlite")),
60 allow(
61 unused_imports,
62 unused_variables,
63 dead_code,
64 unreachable_code,
65 unused_lifetimes,
66 clippy::unused_async,
67 )
68)]
69
70pub use advisory_locks::{DbLockGuard, LockConfig};
72
73pub use sea_orm_migration;
75
76pub mod advisory_locks;
78pub mod config;
79pub mod deadlock;
80pub mod manager;
81pub mod migration_runner;
82pub mod odata;
83pub mod options;
84
85#[cfg(feature = "preview-outbox")]
86pub mod outbox;
87pub mod secure;
88
89mod db_provider;
90
91mod pool_opts;
93#[cfg(feature = "sqlite")]
94mod sqlite;
95
96pub use config::{DbConnConfig, GlobalDatabaseConfig, PoolCfg};
98pub use manager::DbManager;
99pub use options::redact_credentials_in_dsn;
100
101pub use secure::{Db, DbConn, DbTx};
103
104pub use db_provider::DBProvider;
106
107pub async fn connect_db(dsn: &str, opts: ConnectOpts) -> Result<Db> {
115 let handle = DbHandle::connect(dsn, opts).await?;
116 Ok(Db::new(handle))
117}
118
119pub async fn build_db(cfg: DbConnConfig, global: Option<&GlobalDatabaseConfig>) -> Result<Db> {
125 let handle = options::build_db_handle(cfg, global).await?;
126 Ok(Db::new(handle))
127}
128
129use std::time::Duration;
130
131#[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
133use pool_opts::ApplyPoolOpts;
134#[cfg(feature = "sqlite")]
135use sqlite::{Pragmas, extract_sqlite_pragmas, is_memory_dsn, prepare_sqlite_path};
136
137#[cfg(feature = "mysql")]
140use sqlx::mysql::MySqlPoolOptions;
141#[cfg(feature = "pg")]
142use sqlx::postgres::PgPoolOptions;
143#[cfg(feature = "sqlite")]
144use sqlx::sqlite::SqlitePoolOptions;
145#[cfg(feature = "sqlite")]
146use std::str::FromStr;
147
148use sea_orm::DatabaseConnection;
149#[cfg(feature = "mysql")]
150use sea_orm::SqlxMySqlConnector;
151#[cfg(feature = "pg")]
152use sea_orm::SqlxPostgresConnector;
153#[cfg(feature = "sqlite")]
154use sea_orm::SqlxSqliteConnector;
155
156use thiserror::Error;
157
158pub type Result<T> = std::result::Result<T, DbError>;
160
161#[derive(Debug, Error)]
163pub enum DbError {
164 #[error("Unknown DSN: {0}")]
165 UnknownDsn(String),
166
167 #[error("Feature not enabled: {0}")]
168 FeatureDisabled(&'static str),
169
170 #[error("Invalid configuration: {0}")]
171 InvalidConfig(String),
172
173 #[error("Configuration conflict: {0}")]
174 ConfigConflict(String),
175
176 #[error("Invalid SQLite PRAGMA parameter '{key}': {message}")]
177 InvalidSqlitePragma { key: String, message: String },
178
179 #[error("Unknown SQLite PRAGMA parameter: {0}")]
180 UnknownSqlitePragma(String),
181
182 #[error("Invalid connection parameter: {0}")]
183 InvalidParameter(String),
184
185 #[error("SQLite pragma error: {0}")]
186 SqlitePragma(String),
187
188 #[error("Environment variable '{name}': {source}")]
189 EnvVar {
190 name: String,
191 source: std::env::VarError,
192 },
193
194 #[error("URL parsing error: {0}")]
195 UrlParse(#[from] url::ParseError),
196
197 #[cfg(any(feature = "pg", feature = "mysql", feature = "sqlite"))]
198 #[error(transparent)]
199 Sqlx(#[from] sqlx::Error),
200
201 #[error(transparent)]
202 Sea(#[from] sea_orm::DbErr),
203
204 #[error(transparent)]
205 Io(#[from] std::io::Error),
206
207 #[error(transparent)]
209 Lock(#[from] advisory_locks::DbLockError),
210
211 #[error(transparent)]
212 Other(#[from] anyhow::Error),
213
214 #[error("Cannot create non-transactional connection inside an active transaction")]
239 ConnRequestedInsideTx,
240}
241
242impl From<modkit_utils::var_expand::ExpandVarsError> for DbError {
243 fn from(err: modkit_utils::var_expand::ExpandVarsError) -> Self {
244 match err {
245 modkit_utils::var_expand::ExpandVarsError::Var { name, source } => {
246 Self::EnvVar { name, source }
247 }
248 modkit_utils::var_expand::ExpandVarsError::Regex(msg) => Self::InvalidParameter(msg),
249 }
250 }
251}
252
253impl From<crate::secure::ScopeError> for DbError {
254 fn from(value: crate::secure::ScopeError) -> Self {
255 DbError::Other(anyhow::Error::new(value))
258 }
259}
260
/// Database engines that a DSN can be resolved to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbEngine {
    /// PostgreSQL (`postgres://` or `postgresql://` DSNs).
    Postgres,
    /// MySQL (`mysql://` DSNs).
    MySql,
    /// SQLite (`sqlite:` DSNs).
    Sqlite,
}
268
/// Options supplied when opening a database connection.
///
/// The values are applied to the backend's sqlx pool options through `ApplyPoolOpts`
/// before the pool is created.
///
/// NOTE(review): the per-field descriptions below follow the field names; how each one is
/// mapped onto the pool is defined in `pool_opts` — confirm there.
#[derive(Debug, Clone)]
pub struct ConnectOpts {
    /// Presumably the upper bound on pooled connections.
    pub max_conns: Option<u32>,
    /// Presumably the number of connections the pool keeps open at minimum.
    pub min_conns: Option<u32>,
    /// Presumably how long to wait when acquiring a connection from the pool.
    pub acquire_timeout: Option<Duration>,
    /// Presumably how long a connection may stay idle before it is closed.
    pub idle_timeout: Option<Duration>,
    /// Presumably the maximum lifetime of a single connection.
    pub max_lifetime: Option<Duration>,
    /// Presumably whether a connection is checked before being handed out.
    pub test_before_acquire: bool,
    /// Passed to `prepare_sqlite_path` when connecting to SQLite; presumably controls whether
    /// missing parent directories of the database file are created.
    pub create_sqlite_dirs: bool,
}

impl Default for ConnectOpts {
    /// Defaults: at most 10 connections, a 30 second acquire timeout, no other limits,
    /// no test-before-acquire, and `create_sqlite_dirs` enabled.
    fn default() -> Self {
        const DEFAULT_MAX_CONNS: u32 = 10;
        const DEFAULT_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(30);

        Self {
            max_conns: Some(DEFAULT_MAX_CONNS),
            min_conns: None,
            acquire_timeout: Some(DEFAULT_ACQUIRE_TIMEOUT),
            idle_timeout: None,
            max_lifetime: None,
            test_before_acquire: false,
            create_sqlite_dirs: true,
        }
    }
}
302
303#[derive(Debug, Clone)]
305pub(crate) struct DbHandle {
306 engine: DbEngine,
307 dsn: String,
308 sea: DatabaseConnection,
309}
310
/// Busy timeout in milliseconds applied to SQLite DSNs that are not in-memory when the DSN
/// does not provide one.
#[cfg(feature = "sqlite")]
const DEFAULT_SQLITE_BUSY_TIMEOUT: i32 = 5_000;
313
314impl DbHandle {
315 pub(crate) fn detect(dsn: &str) -> Result<DbEngine> {
322 let s = dsn.trim_start();
324
325 if s.starts_with("postgres://") || s.starts_with("postgresql://") {
328 Ok(DbEngine::Postgres)
329 } else if s.starts_with("mysql://") {
330 Ok(DbEngine::MySql)
331 } else if s.starts_with("sqlite:") || s.starts_with("sqlite://") {
332 Ok(DbEngine::Sqlite)
333 } else {
334 Err(DbError::UnknownDsn(dsn.to_owned()))
335 }
336 }
337
338 pub(crate) async fn connect(dsn: &str, opts: ConnectOpts) -> Result<Self> {
343 let engine = Self::detect(dsn)?;
344 match engine {
345 #[cfg(feature = "pg")]
346 DbEngine::Postgres => {
347 let o = PgPoolOptions::new().apply(&opts);
348 let pool = o.connect(dsn).await?;
349 let sea = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
350 Ok(Self {
351 engine,
352 dsn: dsn.to_owned(),
353 sea,
354 })
355 }
356 #[cfg(not(feature = "pg"))]
357 DbEngine::Postgres => Err(DbError::FeatureDisabled("PostgreSQL feature not enabled")),
358 #[cfg(feature = "mysql")]
359 DbEngine::MySql => {
360 let o = MySqlPoolOptions::new().apply(&opts);
361 let pool = o.connect(dsn).await?;
362 let sea = SqlxMySqlConnector::from_sqlx_mysql_pool(pool);
363 Ok(Self {
364 engine,
365 dsn: dsn.to_owned(),
366 sea,
367 })
368 }
369 #[cfg(not(feature = "mysql"))]
370 DbEngine::MySql => Err(DbError::FeatureDisabled("MySQL feature not enabled")),
371 #[cfg(feature = "sqlite")]
372 DbEngine::Sqlite => {
373 let dsn = prepare_sqlite_path(dsn, opts.create_sqlite_dirs)?;
374
375 let (clean_dsn, pairs) = extract_sqlite_pragmas(&dsn);
377 let pragmas = Pragmas::from_pairs(&pairs);
378
379 let o = SqlitePoolOptions::new().apply(&opts);
381
382 let is_memory = is_memory_dsn(&clean_dsn);
384 let mut conn_opts = sqlx::sqlite::SqliteConnectOptions::from_str(&clean_dsn)?;
385
386 let journal_mode = if let Some(mode) = &pragmas.journal_mode {
387 match mode {
388 sqlite::pragmas::JournalMode::Delete => {
389 sqlx::sqlite::SqliteJournalMode::Delete
390 }
391 sqlite::pragmas::JournalMode::Wal => sqlx::sqlite::SqliteJournalMode::Wal,
392 sqlite::pragmas::JournalMode::Memory => {
393 sqlx::sqlite::SqliteJournalMode::Memory
394 }
395 sqlite::pragmas::JournalMode::Truncate => {
396 sqlx::sqlite::SqliteJournalMode::Truncate
397 }
398 sqlite::pragmas::JournalMode::Persist => {
399 sqlx::sqlite::SqliteJournalMode::Persist
400 }
401 sqlite::pragmas::JournalMode::Off => sqlx::sqlite::SqliteJournalMode::Off,
402 }
403 } else if let Some(wal_toggle) = pragmas.wal_toggle {
404 if wal_toggle {
405 sqlx::sqlite::SqliteJournalMode::Wal
406 } else {
407 sqlx::sqlite::SqliteJournalMode::Delete
408 }
409 } else if is_memory {
410 sqlx::sqlite::SqliteJournalMode::Delete
411 } else {
412 sqlx::sqlite::SqliteJournalMode::Wal
413 };
414 conn_opts = conn_opts.journal_mode(journal_mode);
415
416 let sync_mode = pragmas.synchronous.as_ref().map_or(
417 sqlx::sqlite::SqliteSynchronous::Normal,
418 |s| match s {
419 sqlite::pragmas::SyncMode::Off => sqlx::sqlite::SqliteSynchronous::Off,
420 sqlite::pragmas::SyncMode::Normal => {
421 sqlx::sqlite::SqliteSynchronous::Normal
422 }
423 sqlite::pragmas::SyncMode::Full => sqlx::sqlite::SqliteSynchronous::Full,
424 sqlite::pragmas::SyncMode::Extra => sqlx::sqlite::SqliteSynchronous::Extra,
425 },
426 );
427 conn_opts = conn_opts.synchronous(sync_mode);
428
429 if !is_memory {
430 let busy_timeout_ms_i64 = pragmas
431 .busy_timeout_ms
432 .unwrap_or(DEFAULT_SQLITE_BUSY_TIMEOUT.into())
433 .max(0);
434 let busy_timeout_ms = u64::try_from(busy_timeout_ms_i64).unwrap_or(0);
435 conn_opts =
436 conn_opts.busy_timeout(std::time::Duration::from_millis(busy_timeout_ms));
437 }
438
439 let pool = o.connect_with(conn_opts).await?;
440 let sea = SqlxSqliteConnector::from_sqlx_sqlite_pool(pool);
441
442 Ok(Self {
443 engine,
444 dsn: clean_dsn,
445 sea,
446 })
447 }
448 #[cfg(not(feature = "sqlite"))]
449 DbEngine::Sqlite => Err(DbError::FeatureDisabled("SQLite feature not enabled")),
450 }
451 }
452
453 #[must_use]
455 pub fn engine(&self) -> DbEngine {
456 self.engine
457 }
458
459 #[must_use]
461 pub fn dsn(&self) -> &str {
462 &self.dsn
463 }
464
465 #[must_use]
484 pub(crate) fn sea_internal(&self) -> DatabaseConnection {
485 self.sea.clone()
486 }
487
488 #[must_use]
496 pub(crate) fn sea_internal_ref(&self) -> &DatabaseConnection {
497 &self.sea
498 }
499
500 pub async fn lock(&self, module: &str, key: &str) -> Result<DbLockGuard> {
507 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
508 let guard = lock_manager.lock(module, key).await?;
509 Ok(guard)
510 }
511
512 pub async fn try_lock(
517 &self,
518 module: &str,
519 key: &str,
520 config: LockConfig,
521 ) -> Result<Option<DbLockGuard>> {
522 let lock_manager = advisory_locks::LockManager::new(self.dsn.clone());
523 let res = lock_manager.try_lock(module, key, config).await?;
524 Ok(res)
525 }
526
527 }
530
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    #[cfg(feature = "sqlite")]
    use tokio::time::Duration;

    /// Returns `<prefix>_<nanoseconds since the Unix epoch>` (0 if the clock is before the
    /// epoch) so lock keys differ between runs.
    #[cfg(feature = "sqlite")]
    fn unique_test_id(prefix: &str) -> String {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        format!("{prefix}_{nanos}")
    }

    /// Connects to `dsn` with default options.
    #[cfg(feature = "sqlite")]
    async fn open(dsn: &str) -> Result<DbHandle> {
        DbHandle::connect(dsn, ConnectOpts::default()).await
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_sqlite_connection() -> Result<()> {
        let db = open("sqlite::memory:").await?;
        assert_eq!(db.engine(), DbEngine::Sqlite);
        Ok(())
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_sqlite_connection_with_pragma_parameters() -> Result<()> {
        let db = open(
            "sqlite::memory:?wal=true&synchronous=NORMAL&busy_timeout=5000&journal_mode=WAL",
        )
        .await?;
        assert_eq!(db.engine(), DbEngine::Sqlite);

        // The stored DSN must still point at the in-memory database.
        assert!(db.dsn.starts_with("sqlite::memory:"));

        Ok(())
    }

    #[tokio::test]
    async fn test_backend_detection() {
        let known = [
            ("sqlite::memory:", DbEngine::Sqlite),
            ("postgres://localhost/test", DbEngine::Postgres),
            ("mysql://localhost/test", DbEngine::MySql),
        ];
        for (dsn, expected) in known {
            assert_eq!(DbHandle::detect(dsn).unwrap(), expected);
        }
        assert!(DbHandle::detect("unknown://test").is_err());
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_advisory_lock_sqlite() -> Result<()> {
        let db = open("sqlite:file:memdb1?mode=memory&cache=shared").await?;
        let test_id = unique_test_id("test_basic");

        // Distinct keys and distinct modules must not block each other.
        let guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
        let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
        let _guard3 = db
            .lock("different_module", &format!("{test_id}_key1"))
            .await?;

        // After releasing, the same module/key can be locked again.
        guard1.release().await;
        let _guard4 = db.lock("test_module", &format!("{test_id}_key1")).await?;
        Ok(())
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_advisory_lock_different_keys() -> Result<()> {
        let db = open("sqlite:file:memdb_diff_keys?mode=memory&cache=shared").await?;
        let test_id = unique_test_id("test_diff");

        let _guard1 = db.lock("test_module", &format!("{test_id}_key1")).await?;
        let _guard2 = db.lock("test_module", &format!("{test_id}_key2")).await?;
        let _guard3 = db.lock("other_module", &format!("{test_id}_key1")).await?;
        Ok(())
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_try_lock_with_config() -> Result<()> {
        let db = open("sqlite:file:memdb2?mode=memory&cache=shared").await?;
        let test_id = unique_test_id("test_config");

        let _guard1 = db.lock("test_module", &format!("{test_id}_key")).await?;

        let config = LockConfig {
            max_wait: Some(Duration::from_millis(200)),
            initial_backoff: Duration::from_millis(50),
            max_attempts: Some(3),
            ..Default::default()
        };

        // A different key is not held, so the attempt is expected to succeed.
        let acquired = db
            .try_lock("test_module", &format!("{test_id}_different_key"), config)
            .await?;
        assert!(
            acquired.is_some(),
            "expected lock acquisition for different key"
        );
        Ok(())
    }

    #[cfg(feature = "sqlite")]
    #[tokio::test]
    async fn test_sea_internal_access() -> Result<()> {
        let db = open("sqlite::memory:").await?;

        let _raw = db.sea_internal();
        Ok(())
    }
}