Skip to main content

modkit_db/secure/
db.rs

1//! Secure database handle and runner types.
2//!
3//! This module provides the primary entry point for secure database access:
4//!
//! - [`Db`]: The database handle. Cheap to `Clone`, but must NOT be stored by services.
6//! - [`DbConn`]: Non-transactional runner (borrows from `Db`).
7//! - [`DbTx`]: Transactional runner (lives inside transaction closure).
8//!
9//! # Security Model
10//!
//! The transaction bypass vulnerability is prevented by multiple layers:
//!
//! 1. `Db::transaction(self, f)` consumes `self`, so that handle is not accessible inside the closure
//! 2. Services receive `&impl DBRunner`, not `Db` or any factory
//! 3. **Task-local guard**: `Db::conn()` fails if called inside a transaction
//!
//! `Db` implements `Clone`, so ownership alone cannot keep every handle out of a
//! transaction closure. The task-local guard provides defense-in-depth: even if code
//! obtains a `Db` via another path (e.g., a clone or a captured `Arc<AppServices>`),
//! calling `conn()` will fail at runtime with `DbError::ConnRequestedInsideTx`.
21//!
22//! # Example
23//!
24//! ```ignore
25//! // In handler/entrypoint
26//! let db: Db = ctx.db()?;
27//!
28//! // Non-transactional path
29//! let conn = db.conn()?;  // Note: returns Result now
30//! let user = service.get_user(&conn, &scope, id).await?;
31//!
32//! // Transactional path
33//! let (db, result) = db.transaction(|tx| {
34//!     Box::pin(async move {
35//!         // Only `tx` is available here - `db` is consumed
36//!         // Calling some_db.conn() here would fail with ConnRequestedInsideTx
37//!         service.create_user(tx, &scope, data).await?;
38//!         Ok(user_id)
39//!     })
40//! }).await;
41//! let user_id = result?;
42//! ```
43
44use std::{cell::Cell, future::Future, pin::Pin, sync::Arc};
45
46use sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait};
47
48use super::tx_config::TxConfig;
49use super::tx_error::TxError;
50use crate::{DbError, DbHandle};
51
52// Task-local guard to detect transaction bypass attempts.
53//
54// When set to `true`, any call to `Db::conn()` will fail with
55// `DbError::ConnRequestedInsideTx`. This prevents code from creating
56// non-transactional runners while inside a transaction closure.
57tokio::task_local! {
58    static IN_TX: Cell<bool>;
59}
60
61/// Check if we're currently inside a transaction context.
62///
63/// Returns `true` if a transaction is active in the current task.
64fn is_in_transaction() -> bool {
65    IN_TX.try_with(Cell::get).unwrap_or(false)
66}
67
68/// Execute a closure with the transaction guard set.
69///
70/// This sets `IN_TX = true` for the duration of the closure, ensuring
71/// that any calls to `Db::conn()` within will fail.
72async fn with_tx_guard<F, T>(f: F) -> T
73where
74    F: Future<Output = T>,
75{
76    IN_TX.scope(Cell::new(true), f).await
77}
78
79/// Database handle for secure operations.
80///
81/// # Security
82///
83/// This type is `Clone` to support ergonomic sharing in runtimes and service containers.
84/// Transaction-bypass is still prevented by the task-local guard: any attempt to call
85/// `conn()` inside a transaction closure fails with `DbError::ConnRequestedInsideTx`.
86///
87/// Services and repositories must NOT store this type. They should receive
88/// `&impl DBRunner` as a parameter to all methods that need database access.
89///
90/// # Usage
91///
92/// ```ignore
93/// // At the entrypoint (handler/command)
94/// let db: Db = ctx.db()?;
95///
96/// // Pass runner to service methods
97/// let conn = db.conn()?;
98/// let result = service.do_something(&conn, &scope).await?;
99///
100/// // Or use a transaction
101/// let (db, result) = db.transaction(|tx| {
102///     Box::pin(async move {
103///         service.do_something(tx, &scope).await
104///     })
105/// }).await;
106/// ```
107#[derive(Clone)]
108pub struct Db {
109    handle: Arc<DbHandle>,
110}
111
112impl std::fmt::Debug for Db {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("Db")
115            .field("engine", &self.handle.engine())
116            .finish_non_exhaustive()
117    }
118}
119
120impl Db {
121    /// **INTERNAL**: Create a new `Db` from an owned `DbHandle`.
122    ///
123    /// This is typically called by the runtime/context layer, not by service code.
124    #[must_use]
125    pub(crate) fn new(handle: DbHandle) -> Self {
126        Self {
127            handle: Arc::new(handle),
128        }
129    }
130
131    /// **INTERNAL**: Get a privileged `SeaORM` connection clone.
132    ///
133    /// This must not be exposed to module code. It exists for infrastructure
134    /// (migrations) inside `modkit-db`.
135    pub(crate) fn sea_internal(&self) -> DatabaseConnection {
136        self.handle.sea_internal()
137    }
138
139    /// Get a reference to the underlying `DbHandle`.
140    ///
141    /// # Security
142    ///
143    /// This is `pub(crate)` to allow internal infrastructure access (migrations, etc.)
144    /// but prevents service code from extracting the handle.
145    /// Create a non-transactional database runner.
146    ///
147    /// The returned `DbConn` borrows from `self`, ensuring that while a `DbConn`
148    /// exists, the `Db` cannot be used for other purposes (like starting a transaction).
149    ///
150    /// # Errors
151    ///
152    /// Returns `DbError::ConnRequestedInsideTx` if called from within a transaction
153    /// closure. This prevents the transaction bypass vulnerability where code could
154    /// create a non-transactional runner that persists writes outside the transaction.
155    ///
156    /// # Example
157    ///
158    /// ```ignore
159    /// let db: Db = ctx.db()?;
160    /// let conn = db.conn()?;
161    ///
162    /// // Use conn for queries
163    /// let users = Entity::find()
164    ///     .secure()
165    ///     .scope_with(&scope)
166    ///     .all(&conn)
167    ///     .await?;
168    /// ```
169    ///
170    /// The `Result` itself is `#[must_use]`; this method does not add an extra must-use
171    /// marker to avoid clippy `double_must_use`.
172    pub fn conn(&self) -> Result<DbConn<'_>, DbError> {
173        if is_in_transaction() {
174            return Err(DbError::ConnRequestedInsideTx);
175        }
176        Ok(DbConn {
177            conn: self.handle.sea_internal_ref(),
178        })
179    }
180
181    // --- Advisory locks (forwarded, no `DbHandle` exposure) ---
182
183    /// Acquire an advisory lock with the given key and module namespace.
184    ///
185    /// # Errors
186    /// Returns an error if the lock cannot be acquired.
187    pub async fn lock(&self, module: &str, key: &str) -> crate::Result<crate::DbLockGuard> {
188        self.handle.lock(module, key).await
189    }
190
191    /// Try to acquire an advisory lock with configurable retry/backoff policy.
192    ///
193    /// # Errors
194    /// Returns an error if an unrecoverable lock error occurs.
195    pub async fn try_lock(
196        &self,
197        module: &str,
198        key: &str,
199        config: crate::LockConfig,
200    ) -> crate::Result<Option<crate::DbLockGuard>> {
201        self.handle.try_lock(module, key, config).await
202    }
203
204    /// Execute a closure inside a database transaction (borrowed form).
205    ///
206    /// This variant keeps the call site ergonomic for service containers that store a
207    /// reusable DB entrypoint (e.g. `DBProvider`) without exposing `DbHandle`.
208    ///
209    /// # Security
210    ///
211    /// The task-local guard is still enforced: any call to `Db::conn()` within the closure
212    /// will fail with `DbError::ConnRequestedInsideTx`.
213    ///
214    /// # Errors
215    ///
216    /// Returns `DbError` if:
217    /// - starting the transaction fails
218    /// - the closure returns an error
219    /// - commit fails (rollback is attempted on closure error)
220    pub async fn transaction_ref<F, T>(&self, f: F) -> Result<T, DbError>
221    where
222        F: for<'a> FnOnce(
223                &'a DbTx<'a>,
224            )
225                -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + 'a>>
226            + Send,
227        T: Send + 'static,
228    {
229        let txn = self.handle.sea_internal_ref().begin().await?;
230        let tx = DbTx { tx: &txn };
231
232        // Run the closure with the transaction guard set
233        let res = with_tx_guard(f(&tx)).await;
234
235        match res {
236            Ok(v) => {
237                txn.commit().await?;
238                Ok(v)
239            }
240            Err(e) => {
241                _ = txn.rollback().await;
242                Err(e)
243            }
244        }
245    }
246
247    /// Execute a closure inside a database transaction, mapping infrastructure errors into `E`.
248    ///
249    /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
250    /// that must return **domain** errors while still acquiring connections internally.
251    ///
252    /// - The transaction closure returns `Result<T, E>` (domain error).
253    /// - Begin/commit failures are `DbError` and are mapped via `E: From<DbError>`.
254    ///
255    /// # Security
256    ///
257    /// The task-local guard is enforced for the duration of the closure.
258    ///
259    /// # Errors
260    ///
261    /// Returns `E` if:
262    /// - starting the transaction fails (mapped from `DbError`)
263    /// - the closure returns an error
264    /// - commit fails (mapped from `DbError`)
265    pub async fn transaction_ref_mapped<F, T, E>(&self, f: F) -> Result<T, E>
266    where
267        E: From<DbError> + Send + 'static,
268        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
269            + Send,
270        T: Send + 'static,
271    {
272        let txn = self
273            .handle
274            .sea_internal_ref()
275            .begin()
276            .await
277            .map_err(DbError::from)
278            .map_err(E::from)?;
279        let tx = DbTx { tx: &txn };
280
281        // Run the closure with the transaction guard set
282        let res = with_tx_guard(f(&tx)).await;
283
284        match res {
285            Ok(v) => {
286                txn.commit().await.map_err(DbError::from).map_err(E::from)?;
287                Ok(v)
288            }
289            Err(e) => {
290                _ = txn.rollback().await;
291                Err(e)
292            }
293        }
294    }
295
296    /// Execute a closure inside a database transaction with custom configuration
297    /// (isolation level, access mode), mapping infrastructure errors into `E`.
298    ///
299    /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
300    /// that must return **domain** errors and need non-default transaction settings
301    /// (e.g., `SERIALIZABLE` isolation).
302    ///
303    /// # Security
304    ///
305    /// The task-local guard is enforced for the duration of the closure.
306    ///
307    /// # Errors
308    ///
309    /// Returns `E` if:
310    /// - starting the transaction fails (mapped from `DbError`)
311    /// - the closure returns an error
312    /// - commit fails (mapped from `DbError`)
313    pub async fn transaction_ref_mapped_with_config<F, T, E>(
314        &self,
315        config: TxConfig,
316        f: F,
317    ) -> Result<T, E>
318    where
319        E: From<DbError> + Send + 'static,
320        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
321            + Send,
322        T: Send + 'static,
323    {
324        use sea_orm::{AccessMode, IsolationLevel};
325
326        let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
327        let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
328
329        let txn = self
330            .handle
331            .sea_internal_ref()
332            .begin_with_config(isolation, access_mode)
333            .await
334            .map_err(DbError::from)
335            .map_err(E::from)?;
336        let tx = DbTx { tx: &txn };
337
338        // Run the closure with the transaction guard set
339        let res = with_tx_guard(f(&tx)).await;
340
341        match res {
342            Ok(v) => {
343                txn.commit().await.map_err(DbError::from).map_err(E::from)?;
344                Ok(v)
345            }
346            Err(e) => {
347                _ = txn.rollback().await;
348                Err(e)
349            }
350        }
351    }
352
353    /// Execute a closure inside a database transaction.
354    ///
355    /// # Security
356    ///
357    /// This method **consumes** `self` and returns it after the transaction completes.
358    /// This is critical for security: inside the closure, the original `Db` is not
359    /// accessible, so code cannot call `db.conn()` to create a non-transactional runner.
360    ///
361    /// Additionally, a task-local guard is set during the transaction, so any call
362    /// to `conn()` on *any* `Db` instance will fail with `DbError::ConnRequestedInsideTx`.
363    ///
364    /// # Example
365    ///
366    /// ```ignore
367    /// let db: Db = ctx.db()?;
368    ///
369    /// let (db, result) = db.transaction(|tx| {
370    ///     Box::pin(async move {
371    ///         // Only `tx` is available here
372    ///         service.create_user(tx, &scope, data).await?;
373    ///         Ok(user_id)
374    ///     })
375    /// }).await;
376    ///
377    /// let user_id = result?;
378    /// ```
379    ///
380    /// # Returns
381    ///
382    /// Returns `(Self, Result<T>)` where:
383    /// - `Self` is always returned (even on error) so the connection can be reused
384    /// - `Result<T>` contains the transaction result or error
385    pub async fn transaction<F, T>(self, f: F) -> (Self, anyhow::Result<T>)
386    where
387        F: for<'a> FnOnce(
388                &'a DbTx<'a>,
389            )
390                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
391            + Send,
392        T: Send + 'static,
393    {
394        let txn = match self.handle.sea_internal_ref().begin().await {
395            Ok(t) => t,
396            Err(e) => return (self, Err(e.into())),
397        };
398        let tx = DbTx { tx: &txn };
399
400        // Run the closure with the transaction guard set
401        let res = with_tx_guard(f(&tx)).await;
402
403        match res {
404            Ok(v) => match txn.commit().await {
405                Ok(()) => (self, Ok(v)),
406                Err(e) => (self, Err(e.into())),
407            },
408            Err(e) => {
409                _ = txn.rollback().await;
410                (self, Err(e))
411            }
412        }
413    }
414
415    /// Execute a transaction with typed domain errors.
416    ///
417    /// This variant separates infrastructure errors (connection issues, commit failures)
418    /// from domain errors returned by the closure.
419    ///
420    /// # Example
421    ///
422    /// ```ignore
423    /// let (db, result) = db.in_transaction(|tx| {
424    ///     Box::pin(async move {
425    ///         service.create_user(tx, &scope, data).await
426    ///     })
427    /// }).await;
428    ///
429    /// match result {
430    ///     Ok(user) => println!("Created: {:?}", user),
431    ///     Err(TxError::Domain(e)) => println!("Business error: {}", e),
432    ///     Err(TxError::Infra(e)) => println!("DB error: {}", e),
433    /// }
434    /// ```
435    pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
436    where
437        T: Send + 'static,
438        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
439        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
440            + Send,
441    {
442        use super::tx_error::InfraError;
443
444        let txn = match self.handle.sea_internal_ref().begin().await {
445            Ok(txn) => txn,
446            Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
447        };
448
449        let tx = DbTx { tx: &txn };
450
451        // Run the closure with the transaction guard set
452        let res = with_tx_guard(f(&tx)).await;
453
454        match res {
455            Ok(v) => match txn.commit().await {
456                Ok(()) => (self, Ok(v)),
457                Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
458            },
459            Err(e) => {
460                _ = txn.rollback().await;
461                (self, Err(TxError::Domain(e)))
462            }
463        }
464    }
465
466    /// Execute a transaction with custom configuration (isolation level, access mode).
467    ///
468    /// # Example
469    ///
470    /// ```ignore
471    /// use modkit_db::secure::{TxConfig, TxIsolationLevel};
472    ///
473    /// let config = TxConfig {
474    ///     isolation: Some(TxIsolationLevel::Serializable),
475    ///     access_mode: None,
476    /// };
477    ///
478    /// let (db, result) = db.transaction_with_config(config, |tx| {
479    ///     Box::pin(async move {
480    ///         // Serializable isolation
481    ///         service.reconcile(tx, &scope).await
482    ///     })
483    /// }).await;
484    /// ```
485    pub async fn transaction_with_config<T, F>(
486        self,
487        config: TxConfig,
488        f: F,
489    ) -> (Self, anyhow::Result<T>)
490    where
491        T: Send + 'static,
492        F: for<'a> FnOnce(
493                &'a DbTx<'a>,
494            )
495                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
496            + Send,
497    {
498        use sea_orm::{AccessMode, IsolationLevel};
499
500        let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
501        let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
502
503        let txn = match self
504            .handle
505            .sea_internal_ref()
506            .begin_with_config(isolation, access_mode)
507            .await
508        {
509            Ok(t) => t,
510            Err(e) => return (self, Err(e.into())),
511        };
512        let tx = DbTx { tx: &txn };
513
514        // Run the closure with the transaction guard set
515        let res = with_tx_guard(f(&tx)).await;
516
517        match res {
518            Ok(v) => match txn.commit().await {
519                Ok(()) => (self, Ok(v)),
520                Err(e) => (self, Err(e.into())),
521            },
522            Err(e) => {
523                _ = txn.rollback().await;
524                (self, Err(e))
525            }
526        }
527    }
528
529    /// Return database engine identifier for logging/tracing.
530    #[must_use]
531    pub fn db_engine(&self) -> &'static str {
532        use sea_orm::{ConnectionTrait, DbBackend};
533
534        match self.handle.sea_internal_ref().get_database_backend() {
535            DbBackend::Postgres => "postgres",
536            DbBackend::MySql => "mysql",
537            DbBackend::Sqlite => "sqlite",
538        }
539    }
540}
541
542/// Non-transactional database runner.
543///
544/// This type borrows from a [`Db`] and can be used to execute queries outside
545/// of a transaction context.
546///
547/// # Security
548///
549/// - NOT `Clone`: Cannot be duplicated
550/// - Borrows from `Db`: While `DbConn` exists, the `Db` cannot start a transaction
551/// - Cannot be constructed by user code: Only `Db::conn()` creates it
552///
553/// # Example
554///
555/// ```ignore
556/// let conn = db.conn()?;
557///
558/// let users = Entity::find()
559///     .secure()
560///     .scope_with(&scope)
561///     .all(&conn)
562///     .await?;
563/// ```
564pub struct DbConn<'a> {
565    pub(crate) conn: &'a DatabaseConnection,
566}
567
568impl std::fmt::Debug for DbConn<'_> {
569    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570        f.debug_struct("DbConn").finish_non_exhaustive()
571    }
572}
573
574/// Transactional database runner.
575///
576/// This type is only available inside a transaction closure and represents
577/// an active database transaction.
578///
579/// # Security
580///
581/// - NOT `Clone`: Cannot be duplicated
582/// - Lifetime-bound: Cannot escape the transaction closure
583/// - Cannot be constructed by user code: Only `Db::transaction()` creates it
584///
585/// # Example
586///
587/// ```ignore
588/// let (db, result) = db.transaction(|tx| {
589///     Box::pin(async move {
590///         Entity::insert(model)
591///             .secure()
592///             .scope_with_model(&scope, &model)?
593///             .exec(tx)
594///             .await?;
595///         Ok(())
596///     })
597/// }).await;
598/// ```
599pub struct DbTx<'a> {
600    pub(crate) tx: &'a DatabaseTransaction,
601}
602
603impl std::fmt::Debug for DbTx<'_> {
604    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605        f.debug_struct("DbTx").finish_non_exhaustive()
606    }
607}
608
609// NOTE: tests for `Db` live under `libs/modkit-db/tests/` so they can be gated per-backend
610// without creating feature-specific unused-import warnings in this module.