Skip to main content

modkit_db/secure/
db.rs

1//! Secure database handle and runner types.
2//!
3//! This module provides the primary entry point for secure database access:
4//!
//! - [`Db`]: The database handle. `Clone`, but must NOT be stored by services or repositories.
6//! - [`DbConn`]: Non-transactional runner (borrows from `Db`).
7//! - [`DbTx`]: Transactional runner (lives inside transaction closure).
8//!
9//! # Security Model
10//!
//! The transaction bypass vulnerability is prevented by multiple layers:
//!
//! 1. `Db::transaction(self, f)` consumes `self`, making that handle inaccessible inside the closure
//! 2. Services receive `&impl DBRunner`, not `Db` or any factory
//! 3. **Task-local guard**: `Db::conn()` fails if called inside a transaction
//!
//! `Db` itself is `Clone`, so consuming one handle does not rule out another
//! being reachable. The task-local guard provides defense-in-depth for exactly
//! that case: even if code obtains a `Db` via another path (e.g., a clone or a
//! captured `Arc<AppServices>`), calling `conn()` inside a transaction closure
//! will fail at runtime with `DbError::ConnRequestedInsideTx`.
21//!
22//! # Example
23//!
24//! ```ignore
25//! // In handler/entrypoint
26//! let db: Db = ctx.db()?;
27//!
28//! // Non-transactional path
29//! let conn = db.conn()?;  // Note: returns Result now
30//! let user = service.get_user(&conn, &scope, id).await?;
31//!
32//! // Transactional path
33//! let (db, result) = db.transaction(|tx| {
34//!     Box::pin(async move {
35//!         // Only `tx` is available here - `db` is consumed
36//!         // Calling some_db.conn() here would fail with ConnRequestedInsideTx
37//!         service.create_user(tx, &scope, data).await?;
38//!         Ok(user_id)
39//!     })
40//! }).await;
41//! let user_id = result?;
42//! ```
43
44use std::{cell::Cell, future::Future, pin::Pin, sync::Arc};
45
46use sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait};
47
48use super::tx_config::TxConfig;
49use super::tx_error::TxError;
50use crate::{DbError, DbHandle};
51
/// Default attempt budget for [`Db::transaction_with_retry`].
///
/// The budget counts the first try, so the default allows one initial attempt
/// plus up to two retries.
///
/// Three attempts is the canonical "small bounded retry" used across
/// Cyberfabric services that wrap writes in retry-aware transactions
/// (closure-table mutations, hierarchy invariants, write paths under
/// concurrent load). It balances:
///
/// - resilience against transient lock-contention failures detected by
///   [`crate::contention::is_retryable_contention`] (`PostgreSQL`
///   serialization failures / deadlocks, `MySQL`/`InnoDB` deadlocks,
///   `SQLite` `BUSY` / `BUSY_SNAPSHOT`);
/// - bounded latency on the hot path — no exponential backoff, just
///   immediate retry of a guaranteed-stale transaction;
/// - predictable failure semantics — after exhausting attempts the
///   error from the last attempt is returned unchanged, so callers can
///   surface e.g. `503 Service Unavailable`.
pub const DEFAULT_TX_RETRY_ATTEMPTS: u32 = 3;
69
// Task-local guard to detect transaction bypass attempts.
//
// The flag is installed (with the value `true`) only by `with_tx_guard`, for
// the duration of the future it wraps. While it is `true`, any call to
// `Db::conn()` fails with `DbError::ConnRequestedInsideTx`. This prevents code
// from creating non-transactional runners while inside a transaction closure.
//
// Outside of `with_tx_guard` the key is unset; `is_in_transaction` treats an
// unset key as "not in a transaction".
tokio::task_local! {
    static IN_TX: Cell<bool>;
}
78
79/// Check if we're currently inside a transaction context.
80///
81/// Returns `true` if a transaction is active in the current task.
82fn is_in_transaction() -> bool {
83    IN_TX.try_with(Cell::get).unwrap_or(false)
84}
85
86/// Execute a closure with the transaction guard set.
87///
88/// This sets `IN_TX = true` for the duration of the closure, ensuring
89/// that any calls to `Db::conn()` within will fail.
90async fn with_tx_guard<F, T>(f: F) -> T
91where
92    F: Future<Output = T>,
93{
94    IN_TX.scope(Cell::new(true), f).await
95}
96
/// Database handle for secure operations.
///
/// Wraps the underlying `DbHandle` in an `Arc`, so cloning a `Db` only clones
/// the shared pointer.
///
/// # Security
///
/// This type is `Clone` to support ergonomic sharing in runtimes and service containers.
/// Transaction-bypass is still prevented by the task-local guard: any attempt to call
/// `conn()` inside a transaction closure fails with `DbError::ConnRequestedInsideTx`.
///
/// Services and repositories must NOT store this type. They should receive
/// `&impl DBRunner` as a parameter to all methods that need database access.
///
/// # Usage
///
/// ```ignore
/// // At the entrypoint (handler/command)
/// let db: Db = ctx.db()?;
///
/// // Pass runner to service methods
/// let conn = db.conn()?;
/// let result = service.do_something(&conn, &scope).await?;
///
/// // Or use a transaction
/// let (db, result) = db.transaction(|tx| {
///     Box::pin(async move {
///         service.do_something(tx, &scope).await
///     })
/// }).await;
/// ```
#[derive(Clone)]
pub struct Db {
    // Shared ownership of the underlying handle; never exposed to callers.
    handle: Arc<DbHandle>,
}
129
130impl std::fmt::Debug for Db {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("Db")
133            .field("engine", &self.handle.engine())
134            .finish_non_exhaustive()
135    }
136}
137
138impl Db {
139    /// **INTERNAL**: Create a new `Db` from an owned `DbHandle`.
140    ///
141    /// This is typically called by the runtime/context layer, not by service code.
142    #[must_use]
143    pub(crate) fn new(handle: DbHandle) -> Self {
144        Self {
145            handle: Arc::new(handle),
146        }
147    }
148
149    /// **INTERNAL**: Get a privileged `SeaORM` connection clone.
150    ///
151    /// This must not be exposed to module code. It exists for infrastructure
152    /// (migrations) inside `modkit-db`.
153    pub(crate) fn sea_internal(&self) -> DatabaseConnection {
154        self.handle.sea_internal()
155    }
156
    /// Create a non-transactional database runner.
    ///
    /// The returned `DbConn` borrows the connection owned by `self`, so it
    /// cannot outlive this `Db`.
    ///
    /// # Errors
    ///
    /// Returns `DbError::ConnRequestedInsideTx` if called from within a transaction
    /// closure. This prevents the transaction bypass vulnerability where code could
    /// create a non-transactional runner that persists writes outside the transaction.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let db: Db = ctx.db()?;
    /// let conn = db.conn()?;
    ///
    /// // Use conn for queries
    /// let users = Entity::find()
    ///     .secure()
    ///     .scope_with(&scope)
    ///     .all(&conn)
    ///     .await?;
    /// ```
    ///
    /// The `Result` itself is `#[must_use]`; this method does not add an extra must-use
    /// marker to avoid clippy `double_must_use`.
    pub fn conn(&self) -> Result<DbConn<'_>, DbError> {
        // Refuse to hand out a non-transactional runner while the task-local
        // transaction guard is active.
        if is_in_transaction() {
            return Err(DbError::ConnRequestedInsideTx);
        }
        Ok(DbConn {
            conn: self.handle.sea_internal_ref(),
        })
    }
198
199    /// Database backend in use (`Postgres` / `MySql` / `Sqlite`).
200    ///
201    /// Required by [`crate::contention::is_retryable_contention`] to scope
202    /// retryable-error detection to the correct engine.
203    #[must_use]
204    pub fn backend(&self) -> sea_orm::DbBackend {
205        use sea_orm::ConnectionTrait;
206        self.handle.sea_internal_ref().get_database_backend()
207    }
208
209    // --- Advisory locks (forwarded, no `DbHandle` exposure) ---
210
211    /// Acquire an advisory lock with the given key and module namespace.
212    ///
213    /// # Errors
214    /// Returns an error if the lock cannot be acquired.
215    pub async fn lock(&self, module: &str, key: &str) -> crate::Result<crate::DbLockGuard> {
216        self.handle.lock(module, key).await
217    }
218
219    /// Try to acquire an advisory lock with configurable retry/backoff policy.
220    ///
221    /// # Errors
222    /// Returns an error if an unrecoverable lock error occurs.
223    pub async fn try_lock(
224        &self,
225        module: &str,
226        key: &str,
227        config: crate::LockConfig,
228    ) -> crate::Result<Option<crate::DbLockGuard>> {
229        self.handle.try_lock(module, key, config).await
230    }
231
232    /// Execute a closure inside a database transaction (borrowed form).
233    ///
234    /// This variant keeps the call site ergonomic for service containers that store a
235    /// reusable DB entrypoint (e.g. `DBProvider`) without exposing `DbHandle`.
236    ///
237    /// # Security
238    ///
239    /// The task-local guard is still enforced: any call to `Db::conn()` within the closure
240    /// will fail with `DbError::ConnRequestedInsideTx`.
241    ///
242    /// # Errors
243    ///
244    /// Returns `DbError` if:
245    /// - starting the transaction fails
246    /// - the closure returns an error
247    /// - commit fails (rollback is attempted on closure error)
248    pub async fn transaction_ref<F, T>(&self, f: F) -> Result<T, DbError>
249    where
250        F: for<'a> FnOnce(
251                &'a DbTx<'a>,
252            )
253                -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + 'a>>
254            + Send,
255        T: Send + 'static,
256    {
257        let txn = self.handle.sea_internal_ref().begin().await?;
258        let tx = DbTx { tx: &txn };
259
260        // Run the closure with the transaction guard set
261        let res = with_tx_guard(f(&tx)).await;
262
263        match res {
264            Ok(v) => {
265                txn.commit().await?;
266                Ok(v)
267            }
268            Err(e) => {
269                _ = txn.rollback().await;
270                Err(e)
271            }
272        }
273    }
274
275    /// Execute a closure inside a database transaction, mapping infrastructure errors into `E`.
276    ///
277    /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
278    /// that must return **domain** errors while still acquiring connections internally.
279    ///
280    /// - The transaction closure returns `Result<T, E>` (domain error).
281    /// - Begin/commit failures are `DbError` and are mapped via `E: From<DbError>`.
282    ///
283    /// # Security
284    ///
285    /// The task-local guard is enforced for the duration of the closure.
286    ///
287    /// # Errors
288    ///
289    /// Returns `E` if:
290    /// - starting the transaction fails (mapped from `DbError`)
291    /// - the closure returns an error
292    /// - commit fails (mapped from `DbError`)
293    pub async fn transaction_ref_mapped<F, T, E>(&self, f: F) -> Result<T, E>
294    where
295        E: From<DbError> + Send + 'static,
296        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
297            + Send,
298        T: Send + 'static,
299    {
300        let txn = self
301            .handle
302            .sea_internal_ref()
303            .begin()
304            .await
305            .map_err(DbError::from)
306            .map_err(E::from)?;
307        let tx = DbTx { tx: &txn };
308
309        // Run the closure with the transaction guard set
310        let res = with_tx_guard(f(&tx)).await;
311
312        match res {
313            Ok(v) => {
314                txn.commit().await.map_err(DbError::from).map_err(E::from)?;
315                Ok(v)
316            }
317            Err(e) => {
318                _ = txn.rollback().await;
319                Err(e)
320            }
321        }
322    }
323
324    /// Execute a closure inside a database transaction with custom configuration
325    /// (isolation level, access mode), mapping infrastructure errors into `E`.
326    ///
327    /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
328    /// that must return **domain** errors and need non-default transaction settings
329    /// (e.g., `SERIALIZABLE` isolation).
330    ///
331    /// # Security
332    ///
333    /// The task-local guard is enforced for the duration of the closure.
334    ///
335    /// # Errors
336    ///
337    /// Returns `E` if:
338    /// - starting the transaction fails (mapped from `DbError`)
339    /// - the closure returns an error
340    /// - commit fails (mapped from `DbError`)
341    pub async fn transaction_ref_mapped_with_config<F, T, E>(
342        &self,
343        tx_config: TxConfig,
344        f: F,
345    ) -> Result<T, E>
346    where
347        E: From<DbError> + Send + 'static,
348        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
349            + Send,
350        T: Send + 'static,
351    {
352        use sea_orm::{AccessMode, IsolationLevel};
353
354        let isolation: Option<IsolationLevel> = tx_config.isolation.map(Into::into);
355        let access_mode: Option<AccessMode> = tx_config.access_mode.map(Into::into);
356
357        let txn = self
358            .handle
359            .sea_internal_ref()
360            .begin_with_config(isolation, access_mode)
361            .await
362            .map_err(DbError::from)
363            .map_err(E::from)?;
364        let tx = DbTx { tx: &txn };
365
366        // Run the closure with the transaction guard set
367        let res = with_tx_guard(f(&tx)).await;
368
369        match res {
370            Ok(v) => {
371                txn.commit().await.map_err(DbError::from).map_err(E::from)?;
372                Ok(v)
373            }
374            Err(e) => {
375                _ = txn.rollback().await;
376                Err(e)
377            }
378        }
379    }
380
381    /// Execute a closure inside a transaction with bounded retries on transient
382    /// lock-contention failures.
383    ///
384    /// Retry detection is delegated to [`crate::contention::is_retryable_contention`],
385    /// which is backend-aware (`PostgreSQL` serialization failure / deadlock,
386    /// `MySQL`/`InnoDB` deadlock, `SQLite` `BUSY` / `BUSY_SNAPSHOT`). The caller
387    /// supplies a small `extract_db_err` accessor that reaches into the domain
388    /// error `E` and returns the underlying [`sea_orm::DbErr`], if any — that is
389    /// the only piece of glue the helper needs from the caller.
390    ///
391    /// Uses [`DEFAULT_TX_RETRY_ATTEMPTS`] as the attempt budget. Use
392    /// [`Self::transaction_with_retry_max`] if you need to override the budget
393    /// (typically only in tests).
394    ///
395    /// # Parameters
396    ///
397    /// - `tx_config`: Transaction configuration (isolation level + access
398    ///   mode). The helper itself is isolation-agnostic — pick whichever
399    ///   level the operation needs:
400    ///   - [`TxConfig::default()`] — engine default. Right for retry on
401    ///     `SQLite` (BUSY) or `MySQL`/`InnoDB` (deadlocks happen at any
402    ///     isolation level), or for `PostgreSQL` work that doesn't need
403    ///     stronger guarantees than `READ COMMITTED`.
404    ///   - [`TxConfig::serializable()`] — full `SERIALIZABLE`. Right when
405    ///     the body relies on predicate-level invariants (closure-table
406    ///     mutations, hierarchy or uniqueness checks across rows that
407    ///     concurrent writers could insert), so that conflicting reads
408    ///     surface as `40001` and get retried by this helper.
409    ///   - Custom `TxConfig` (e.g. `RepeatableRead` + `ReadOnly`) for
410    ///     reporting paths that want repeatable snapshots without paying
411    ///     the cost of `SERIALIZABLE`.
412    /// - `extract_db_err`: Accessor returning `Some(&DbErr)` if the domain
413    ///   error wraps a database error, `None` otherwise. Returning `None`
414    ///   always short-circuits the retry loop (the failure is non-DB and
415    ///   is propagated immediately).
416    /// - `body`: The transactional work. Called with a fresh `&DbTx` per
417    ///   attempt. Each retry runs in a brand-new transaction, so the
418    ///   closure must be idempotent across attempts (any in-memory state
419    ///   mutated by an earlier attempt must be reset by the closure
420    ///   itself before re-running).
421    ///
422    /// # Behaviour
423    ///
424    /// 1. Begin a transaction with the given `tx_config` and invoke `body`.
425    /// 2. On `Ok`, commit and return.
426    /// 3. On `Err(e)`:
427    ///    - if `extract_db_err(&e)` yields a `DbErr` that
428    ///      [`crate::contention::is_retryable_contention`] flags as
429    ///      retryable for the active backend, and attempts remain, log at
430    ///      `WARN` and retry;
431    ///    - otherwise, return the error.
432    ///
433    /// On exhausting all attempts the **last** error is returned.
434    ///
435    /// # Errors
436    ///
437    /// Returns `E` if the transaction fails (after retries) or if any
438    /// infrastructure error mapped from `DbError` occurs.
439    ///
440    /// # Example
441    ///
442    /// ```ignore
443    /// let result: Result<MyType, MyError> = db
444    ///     .transaction_with_retry(
445    ///         TxConfig::serializable(),
446    ///         MyError::db_err, // fn(&MyError) -> Option<&sea_orm::DbErr>
447    ///         |tx| Box::pin(async move {
448    ///             repo.do_atomic_work(tx).await?;
449    ///             Ok(MyType::default())
450    ///         }),
451    ///     )
452    ///     .await;
453    /// ```
454    pub async fn transaction_with_retry<T, E, X, F>(
455        &self,
456        tx_config: TxConfig,
457        extract_db_err: X,
458        body: F,
459    ) -> Result<T, E>
460    where
461        E: From<DbError> + Send + 'static,
462        T: Send + 'static,
463        X: Fn(&E) -> Option<&sea_orm::DbErr> + Send,
464        F: for<'a> FnMut(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
465            + Send,
466    {
467        self.transaction_with_retry_max(tx_config, DEFAULT_TX_RETRY_ATTEMPTS, extract_db_err, body)
468            .await
469    }
470
471    /// Like [`Self::transaction_with_retry`] but with an explicit attempt
472    /// budget. See that method for behaviour and parameter semantics.
473    ///
474    /// `max_attempts` includes the first try (so `1` disables retries). Values
475    /// below `1` are clamped to `1`. Production code should call the default
476    /// variant instead of hard-coding a number here; this method exists mainly
477    /// for tests and for the rare case where a service has a justified reason
478    /// to deviate from the workspace-wide default.
479    ///
480    /// # Errors
481    ///
482    /// Returns `E` if the transaction fails (after retries) or if any
483    /// infrastructure error mapped from `DbError` occurs.
484    pub async fn transaction_with_retry_max<T, E, X, F>(
485        &self,
486        tx_config: TxConfig,
487        max_attempts: u32,
488        extract_db_err: X,
489        mut body: F,
490    ) -> Result<T, E>
491    where
492        E: From<DbError> + Send + 'static,
493        T: Send + 'static,
494        X: Fn(&E) -> Option<&sea_orm::DbErr> + Send,
495        F: for<'a> FnMut(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
496            + Send,
497    {
498        let max = max_attempts.max(1);
499        let backend = self.backend();
500        let mut attempt: u32 = 1;
501
502        loop {
503            let result = self
504                .transaction_ref_mapped_with_config(tx_config.clone(), |tx| body(tx))
505                .await;
506
507            match result {
508                Ok(value) => return Ok(value),
509                Err(e) => {
510                    let retryable = extract_db_err(&e).is_some_and(|db_err| {
511                        crate::contention::is_retryable_contention(backend, db_err)
512                    });
513                    if retryable && attempt < max {
514                        tracing::warn!(
515                            attempt,
516                            max_attempts = max,
517                            "retrying transaction after retryable failure"
518                        );
519                        attempt += 1;
520                        continue;
521                    }
522                    return Err(e);
523                }
524            }
525        }
526    }
527
528    /// Execute a closure inside a database transaction.
529    ///
530    /// # Security
531    ///
532    /// This method **consumes** `self` and returns it after the transaction completes.
533    /// This is critical for security: inside the closure, the original `Db` is not
534    /// accessible, so code cannot call `db.conn()` to create a non-transactional runner.
535    ///
536    /// Additionally, a task-local guard is set during the transaction, so any call
537    /// to `conn()` on *any* `Db` instance will fail with `DbError::ConnRequestedInsideTx`.
538    ///
539    /// # Example
540    ///
541    /// ```ignore
542    /// let db: Db = ctx.db()?;
543    ///
544    /// let (db, result) = db.transaction(|tx| {
545    ///     Box::pin(async move {
546    ///         // Only `tx` is available here
547    ///         service.create_user(tx, &scope, data).await?;
548    ///         Ok(user_id)
549    ///     })
550    /// }).await;
551    ///
552    /// let user_id = result?;
553    /// ```
554    ///
555    /// # Returns
556    ///
557    /// Returns `(Self, Result<T>)` where:
558    /// - `Self` is always returned (even on error) so the connection can be reused
559    /// - `Result<T>` contains the transaction result or error
560    pub async fn transaction<F, T>(self, f: F) -> (Self, anyhow::Result<T>)
561    where
562        F: for<'a> FnOnce(
563                &'a DbTx<'a>,
564            )
565                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
566            + Send,
567        T: Send + 'static,
568    {
569        let txn = match self.handle.sea_internal_ref().begin().await {
570            Ok(t) => t,
571            Err(e) => return (self, Err(e.into())),
572        };
573        let tx = DbTx { tx: &txn };
574
575        // Run the closure with the transaction guard set
576        let res = with_tx_guard(f(&tx)).await;
577
578        match res {
579            Ok(v) => match txn.commit().await {
580                Ok(()) => (self, Ok(v)),
581                Err(e) => (self, Err(e.into())),
582            },
583            Err(e) => {
584                _ = txn.rollback().await;
585                (self, Err(e))
586            }
587        }
588    }
589
590    /// Execute a transaction with typed domain errors.
591    ///
592    /// This variant separates infrastructure errors (connection issues, commit failures)
593    /// from domain errors returned by the closure.
594    ///
595    /// # Example
596    ///
597    /// ```ignore
598    /// let (db, result) = db.in_transaction(|tx| {
599    ///     Box::pin(async move {
600    ///         service.create_user(tx, &scope, data).await
601    ///     })
602    /// }).await;
603    ///
604    /// match result {
605    ///     Ok(user) => println!("Created: {:?}", user),
606    ///     Err(TxError::Domain(e)) => println!("Business error: {}", e),
607    ///     Err(TxError::Infra(e)) => println!("DB error: {}", e),
608    /// }
609    /// ```
610    pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
611    where
612        T: Send + 'static,
613        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
614        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
615            + Send,
616    {
617        use super::tx_error::InfraError;
618
619        let txn = match self.handle.sea_internal_ref().begin().await {
620            Ok(txn) => txn,
621            Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
622        };
623
624        let tx = DbTx { tx: &txn };
625
626        // Run the closure with the transaction guard set
627        let res = with_tx_guard(f(&tx)).await;
628
629        match res {
630            Ok(v) => match txn.commit().await {
631                Ok(()) => (self, Ok(v)),
632                Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
633            },
634            Err(e) => {
635                _ = txn.rollback().await;
636                (self, Err(TxError::Domain(e)))
637            }
638        }
639    }
640
641    /// Execute a transaction with custom configuration (isolation level, access mode).
642    ///
643    /// # Example
644    ///
645    /// ```ignore
646    /// use modkit_db::secure::{TxConfig, TxIsolationLevel};
647    ///
648    /// let config = TxConfig {
649    ///     isolation: Some(TxIsolationLevel::Serializable),
650    ///     access_mode: None,
651    /// };
652    ///
653    /// let (db, result) = db.transaction_with_config(config, |tx| {
654    ///     Box::pin(async move {
655    ///         // Serializable isolation
656    ///         service.reconcile(tx, &scope).await
657    ///     })
658    /// }).await;
659    /// ```
660    pub async fn transaction_with_config<T, F>(
661        self,
662        config: TxConfig,
663        f: F,
664    ) -> (Self, anyhow::Result<T>)
665    where
666        T: Send + 'static,
667        F: for<'a> FnOnce(
668                &'a DbTx<'a>,
669            )
670                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
671            + Send,
672    {
673        use sea_orm::{AccessMode, IsolationLevel};
674
675        let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
676        let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
677
678        let txn = match self
679            .handle
680            .sea_internal_ref()
681            .begin_with_config(isolation, access_mode)
682            .await
683        {
684            Ok(t) => t,
685            Err(e) => return (self, Err(e.into())),
686        };
687        let tx = DbTx { tx: &txn };
688
689        // Run the closure with the transaction guard set
690        let res = with_tx_guard(f(&tx)).await;
691
692        match res {
693            Ok(v) => match txn.commit().await {
694                Ok(()) => (self, Ok(v)),
695                Err(e) => (self, Err(e.into())),
696            },
697            Err(e) => {
698                _ = txn.rollback().await;
699                (self, Err(e))
700            }
701        }
702    }
703
704    /// Return database engine identifier for logging/tracing.
705    #[must_use]
706    pub fn db_engine(&self) -> &'static str {
707        use sea_orm::{ConnectionTrait, DbBackend};
708
709        match self.handle.sea_internal_ref().get_database_backend() {
710            DbBackend::Postgres => "postgres",
711            DbBackend::MySql => "mysql",
712            DbBackend::Sqlite => "sqlite",
713        }
714    }
715}
716
/// Non-transactional database runner.
///
/// This type borrows from a [`Db`] and can be used to execute queries outside
/// of a transaction context.
///
/// # Security
///
/// - NOT `Clone`: Cannot be duplicated
/// - Borrows from `Db`: a `DbConn` cannot outlive the `Db` it was obtained from
/// - Cannot be obtained inside a transaction closure: `Db::conn()` checks the
///   task-local guard and fails with `DbError::ConnRequestedInsideTx`
/// - Cannot be constructed outside this crate: the field is `pub(crate)`, and
///   in this file only `Db::conn()` creates it
///
/// # Example
///
/// ```ignore
/// let conn = db.conn()?;
///
/// let users = Entity::find()
///     .secure()
///     .scope_with(&scope)
///     .all(&conn)
///     .await?;
/// ```
pub struct DbConn<'a> {
    // Shared borrow of the connection owned by the originating `Db`.
    pub(crate) conn: &'a DatabaseConnection,
}
742
743impl std::fmt::Debug for DbConn<'_> {
744    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
745        f.debug_struct("DbConn").finish_non_exhaustive()
746    }
747}
748
/// Transactional database runner.
///
/// This type is only available inside a transaction closure and represents
/// an active database transaction.
///
/// # Security
///
/// - NOT `Clone`: Cannot be duplicated
/// - Lifetime-bound: Cannot escape the transaction closure
/// - Cannot be constructed outside this crate: the field is `pub(crate)`, and
///   in this file only the transaction methods on [`Db`] create it
///
/// # Example
///
/// ```ignore
/// let (db, result) = db.transaction(|tx| {
///     Box::pin(async move {
///         Entity::insert(model)
///             .secure()
///             .scope_with_model(&scope, &model)?
///             .exec(tx)
///             .await?;
///         Ok(())
///     })
/// }).await;
/// ```
pub struct DbTx<'a> {
    // Shared borrow of the transaction owned by the enclosing `Db` method.
    pub(crate) tx: &'a DatabaseTransaction,
}
777
778impl std::fmt::Debug for DbTx<'_> {
779    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
780        f.debug_struct("DbTx").finish_non_exhaustive()
781    }
782}
783
784// NOTE: tests for `Db` live under `libs/modkit-db/tests/` so they can be gated per-backend
785// without creating feature-specific unused-import warnings in this module.