Skip to main content

modkit_db/secure/
db.rs

1//! Secure database handle and runner types.
2//!
3//! This module provides the primary entry point for secure database access:
4//!
5//! - [`Db`]: The database handle. NOT Clone, NOT storable by services.
6//! - [`DbConn`]: Non-transactional runner (borrows from `Db`).
7//! - [`DbTx`]: Transactional runner (lives inside transaction closure).
8//!
9//! # Security Model
10//!
11//! The transaction bypass vulnerability is prevented by multiple layers:
12//!
13//! 1. `Db` does NOT implement `Clone`
14//! 2. `Db::transaction(self, f)` consumes `self`, making it inaccessible inside the closure
15//! 3. Services receive `&impl DBRunner`, not `Db` or any factory
16//! 4. **Task-local guard**: `Db::conn()` fails if called inside a transaction
17//!
18//! The task-local guard provides defense-in-depth: even if code obtains a `Db`
19//! reference via another path (e.g., captured `Arc<AppServices>`), calling
20//! `conn()` will fail at runtime with `DbError::ConnRequestedInsideTx`.
21//!
22//! # Example
23//!
24//! ```ignore
25//! // In handler/entrypoint
26//! let db: Db = ctx.db()?;
27//!
28//! // Non-transactional path
29//! let conn = db.conn()?;  // Note: returns Result now
30//! let user = service.get_user(&conn, &scope, id).await?;
31//!
32//! // Transactional path
33//! let (db, result) = db.transaction(|tx| {
34//!     Box::pin(async move {
35//!         // Only `tx` is available here - `db` is consumed
36//!         // Calling some_db.conn() here would fail with ConnRequestedInsideTx
37//!         service.create_user(tx, &scope, data).await?;
38//!         Ok(user_id)
39//!     })
40//! }).await;
41//! let user_id = result?;
42//! ```
43
44use std::{cell::Cell, future::Future, pin::Pin, sync::Arc};
45
46use sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait};
47
48use super::tx_config::TxConfig;
49use super::tx_error::TxError;
50use crate::{DbError, DbHandle};
51
52// Task-local guard to detect transaction bypass attempts.
53//
54// When set to `true`, any call to `Db::conn()` will fail with
55// `DbError::ConnRequestedInsideTx`. This prevents code from creating
56// non-transactional runners while inside a transaction closure.
57tokio::task_local! {
58    static IN_TX: Cell<bool>;
59}
60
61/// Check if we're currently inside a transaction context.
62///
63/// Returns `true` if a transaction is active in the current task.
64fn is_in_transaction() -> bool {
65    IN_TX.try_with(Cell::get).unwrap_or(false)
66}
67
68/// Execute a closure with the transaction guard set.
69///
70/// This sets `IN_TX = true` for the duration of the closure, ensuring
71/// that any calls to `Db::conn()` within will fail.
72async fn with_tx_guard<F, T>(f: F) -> T
73where
74    F: Future<Output = T>,
75{
76    IN_TX.scope(Cell::new(true), f).await
77}
78
79/// Database handle for secure operations.
80///
81/// # Security
82///
83/// This type is `Clone` to support ergonomic sharing in runtimes and service containers.
84/// Transaction-bypass is still prevented by the task-local guard: any attempt to call
85/// `conn()` inside a transaction closure fails with `DbError::ConnRequestedInsideTx`.
86///
87/// Services and repositories must NOT store this type. They should receive
88/// `&impl DBRunner` as a parameter to all methods that need database access.
89///
90/// # Usage
91///
92/// ```ignore
93/// // At the entrypoint (handler/command)
94/// let db: Db = ctx.db()?;
95///
96/// // Pass runner to service methods
97/// let conn = db.conn()?;
98/// let result = service.do_something(&conn, &scope).await?;
99///
100/// // Or use a transaction
101/// let (db, result) = db.transaction(|tx| {
102///     Box::pin(async move {
103///         service.do_something(tx, &scope).await
104///     })
105/// }).await;
106/// ```
107#[derive(Clone)]
108pub struct Db {
109    handle: Arc<DbHandle>,
110}
111
112impl std::fmt::Debug for Db {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("Db")
115            .field("engine", &self.handle.engine())
116            .finish_non_exhaustive()
117    }
118}
119
120impl Db {
121    /// **INTERNAL**: Create a new `Db` from an owned `DbHandle`.
122    ///
123    /// This is typically called by the runtime/context layer, not by service code.
124    #[must_use]
125    pub(crate) fn new(handle: DbHandle) -> Self {
126        Self {
127            handle: Arc::new(handle),
128        }
129    }
130
131    /// **INTERNAL**: Get a privileged `SeaORM` connection clone.
132    ///
133    /// This must not be exposed to module code. It exists for infrastructure
134    /// (migrations) inside `modkit-db`.
135    pub(crate) fn sea_internal(&self) -> DatabaseConnection {
136        self.handle.sea_internal()
137    }
138
139    /// Get a reference to the underlying `DbHandle`.
140    ///
141    /// # Security
142    ///
143    /// This is `pub(crate)` to allow internal infrastructure access (migrations, etc.)
144    /// but prevents service code from extracting the handle.
145    /// Create a non-transactional database runner.
146    ///
147    /// The returned `DbConn` borrows from `self`, ensuring that while a `DbConn`
148    /// exists, the `Db` cannot be used for other purposes (like starting a transaction).
149    ///
150    /// # Errors
151    ///
152    /// Returns `DbError::ConnRequestedInsideTx` if called from within a transaction
153    /// closure. This prevents the transaction bypass vulnerability where code could
154    /// create a non-transactional runner that persists writes outside the transaction.
155    ///
156    /// # Example
157    ///
158    /// ```ignore
159    /// let db: Db = ctx.db()?;
160    /// let conn = db.conn()?;
161    ///
162    /// // Use conn for queries
163    /// let users = Entity::find()
164    ///     .secure()
165    ///     .scope_with(&scope)
166    ///     .all(&conn)
167    ///     .await?;
168    /// ```
169    ///
170    /// The `Result` itself is `#[must_use]`; this method does not add an extra must-use
171    /// marker to avoid clippy `double_must_use`.
172    pub fn conn(&self) -> Result<DbConn<'_>, DbError> {
173        if is_in_transaction() {
174            return Err(DbError::ConnRequestedInsideTx);
175        }
176        Ok(DbConn {
177            conn: self.handle.sea_internal_ref(),
178        })
179    }
180
181    // --- Advisory locks (forwarded, no `DbHandle` exposure) ---
182
183    /// Acquire an advisory lock with the given key and module namespace.
184    ///
185    /// # Errors
186    /// Returns an error if the lock cannot be acquired.
187    pub async fn lock(&self, module: &str, key: &str) -> crate::Result<crate::DbLockGuard> {
188        self.handle.lock(module, key).await
189    }
190
191    /// Try to acquire an advisory lock with configurable retry/backoff policy.
192    ///
193    /// # Errors
194    /// Returns an error if an unrecoverable lock error occurs.
195    pub async fn try_lock(
196        &self,
197        module: &str,
198        key: &str,
199        config: crate::LockConfig,
200    ) -> crate::Result<Option<crate::DbLockGuard>> {
201        self.handle.try_lock(module, key, config).await
202    }
203
204    /// Execute a closure inside a database transaction (borrowed form).
205    ///
206    /// This variant keeps the call site ergonomic for service containers that store a
207    /// reusable DB entrypoint (e.g. `DBProvider`) without exposing `DbHandle`.
208    ///
209    /// # Security
210    ///
211    /// The task-local guard is still enforced: any call to `Db::conn()` within the closure
212    /// will fail with `DbError::ConnRequestedInsideTx`.
213    ///
214    /// # Errors
215    ///
216    /// Returns `DbError` if:
217    /// - starting the transaction fails
218    /// - the closure returns an error
219    /// - commit fails (rollback is attempted on closure error)
220    pub async fn transaction_ref<F, T>(&self, f: F) -> Result<T, DbError>
221    where
222        F: for<'a> FnOnce(
223                &'a DbTx<'a>,
224            )
225                -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + 'a>>
226            + Send,
227        T: Send + 'static,
228    {
229        let txn = self.handle.sea_internal_ref().begin().await?;
230        let tx = DbTx { tx: &txn };
231
232        // Run the closure with the transaction guard set
233        let res = with_tx_guard(f(&tx)).await;
234
235        match res {
236            Ok(v) => {
237                txn.commit().await?;
238                Ok(v)
239            }
240            Err(e) => {
241                let _ = txn.rollback().await;
242                Err(e)
243            }
244        }
245    }
246
247    /// Execute a closure inside a database transaction, mapping infrastructure errors into `E`.
248    ///
249    /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
250    /// that must return **domain** errors while still acquiring connections internally.
251    ///
252    /// - The transaction closure returns `Result<T, E>` (domain error).
253    /// - Begin/commit failures are `DbError` and are mapped via `E: From<DbError>`.
254    ///
255    /// # Security
256    ///
257    /// The task-local guard is enforced for the duration of the closure.
258    ///
259    /// # Errors
260    ///
261    /// Returns `E` if:
262    /// - starting the transaction fails (mapped from `DbError`)
263    /// - the closure returns an error
264    /// - commit fails (mapped from `DbError`)
265    pub async fn transaction_ref_mapped<F, T, E>(&self, f: F) -> Result<T, E>
266    where
267        E: From<DbError> + Send + 'static,
268        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
269            + Send,
270        T: Send + 'static,
271    {
272        let txn = self
273            .handle
274            .sea_internal_ref()
275            .begin()
276            .await
277            .map_err(DbError::from)
278            .map_err(E::from)?;
279        let tx = DbTx { tx: &txn };
280
281        // Run the closure with the transaction guard set
282        let res = with_tx_guard(f(&tx)).await;
283
284        match res {
285            Ok(v) => {
286                txn.commit().await.map_err(DbError::from).map_err(E::from)?;
287                Ok(v)
288            }
289            Err(e) => {
290                let _ = txn.rollback().await;
291                Err(e)
292            }
293        }
294    }
295
296    /// Execute a closure inside a database transaction.
297    ///
298    /// # Security
299    ///
300    /// This method **consumes** `self` and returns it after the transaction completes.
301    /// This is critical for security: inside the closure, the original `Db` is not
302    /// accessible, so code cannot call `db.conn()` to create a non-transactional runner.
303    ///
304    /// Additionally, a task-local guard is set during the transaction, so any call
305    /// to `conn()` on *any* `Db` instance will fail with `DbError::ConnRequestedInsideTx`.
306    ///
307    /// # Example
308    ///
309    /// ```ignore
310    /// let db: Db = ctx.db()?;
311    ///
312    /// let (db, result) = db.transaction(|tx| {
313    ///     Box::pin(async move {
314    ///         // Only `tx` is available here
315    ///         service.create_user(tx, &scope, data).await?;
316    ///         Ok(user_id)
317    ///     })
318    /// }).await;
319    ///
320    /// let user_id = result?;
321    /// ```
322    ///
323    /// # Returns
324    ///
325    /// Returns `(Self, Result<T>)` where:
326    /// - `Self` is always returned (even on error) so the connection can be reused
327    /// - `Result<T>` contains the transaction result or error
328    pub async fn transaction<F, T>(self, f: F) -> (Self, anyhow::Result<T>)
329    where
330        F: for<'a> FnOnce(
331                &'a DbTx<'a>,
332            )
333                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
334            + Send,
335        T: Send + 'static,
336    {
337        let txn = match self.handle.sea_internal_ref().begin().await {
338            Ok(t) => t,
339            Err(e) => return (self, Err(e.into())),
340        };
341        let tx = DbTx { tx: &txn };
342
343        // Run the closure with the transaction guard set
344        let res = with_tx_guard(f(&tx)).await;
345
346        match res {
347            Ok(v) => match txn.commit().await {
348                Ok(()) => (self, Ok(v)),
349                Err(e) => (self, Err(e.into())),
350            },
351            Err(e) => {
352                let _ = txn.rollback().await;
353                (self, Err(e))
354            }
355        }
356    }
357
358    /// Execute a transaction with typed domain errors.
359    ///
360    /// This variant separates infrastructure errors (connection issues, commit failures)
361    /// from domain errors returned by the closure.
362    ///
363    /// # Example
364    ///
365    /// ```ignore
366    /// let (db, result) = db.in_transaction(|tx| {
367    ///     Box::pin(async move {
368    ///         service.create_user(tx, &scope, data).await
369    ///     })
370    /// }).await;
371    ///
372    /// match result {
373    ///     Ok(user) => println!("Created: {:?}", user),
374    ///     Err(TxError::Domain(e)) => println!("Business error: {}", e),
375    ///     Err(TxError::Infra(e)) => println!("DB error: {}", e),
376    /// }
377    /// ```
378    pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
379    where
380        T: Send + 'static,
381        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
382        F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
383            + Send,
384    {
385        use super::tx_error::InfraError;
386
387        let txn = match self.handle.sea_internal_ref().begin().await {
388            Ok(txn) => txn,
389            Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
390        };
391
392        let tx = DbTx { tx: &txn };
393
394        // Run the closure with the transaction guard set
395        let res = with_tx_guard(f(&tx)).await;
396
397        match res {
398            Ok(v) => match txn.commit().await {
399                Ok(()) => (self, Ok(v)),
400                Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
401            },
402            Err(e) => {
403                let _ = txn.rollback().await;
404                (self, Err(TxError::Domain(e)))
405            }
406        }
407    }
408
409    /// Execute a transaction with custom configuration (isolation level, access mode).
410    ///
411    /// # Example
412    ///
413    /// ```ignore
414    /// use modkit_db::secure::{TxConfig, TxIsolationLevel};
415    ///
416    /// let config = TxConfig {
417    ///     isolation: Some(TxIsolationLevel::Serializable),
418    ///     access_mode: None,
419    /// };
420    ///
421    /// let (db, result) = db.transaction_with_config(config, |tx| {
422    ///     Box::pin(async move {
423    ///         // Serializable isolation
424    ///         service.reconcile(tx, &scope).await
425    ///     })
426    /// }).await;
427    /// ```
428    pub async fn transaction_with_config<T, F>(
429        self,
430        config: TxConfig,
431        f: F,
432    ) -> (Self, anyhow::Result<T>)
433    where
434        T: Send + 'static,
435        F: for<'a> FnOnce(
436                &'a DbTx<'a>,
437            )
438                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
439            + Send,
440    {
441        use sea_orm::{AccessMode, IsolationLevel};
442
443        let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
444        let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
445
446        let txn = match self
447            .handle
448            .sea_internal_ref()
449            .begin_with_config(isolation, access_mode)
450            .await
451        {
452            Ok(t) => t,
453            Err(e) => return (self, Err(e.into())),
454        };
455        let tx = DbTx { tx: &txn };
456
457        // Run the closure with the transaction guard set
458        let res = with_tx_guard(f(&tx)).await;
459
460        match res {
461            Ok(v) => match txn.commit().await {
462                Ok(()) => (self, Ok(v)),
463                Err(e) => (self, Err(e.into())),
464            },
465            Err(e) => {
466                let _ = txn.rollback().await;
467                (self, Err(e))
468            }
469        }
470    }
471
472    /// Return database engine identifier for logging/tracing.
473    #[must_use]
474    pub fn db_engine(&self) -> &'static str {
475        use sea_orm::{ConnectionTrait, DbBackend};
476
477        match self.handle.sea_internal_ref().get_database_backend() {
478            DbBackend::Postgres => "postgres",
479            DbBackend::MySql => "mysql",
480            DbBackend::Sqlite => "sqlite",
481        }
482    }
483}
484
485/// Non-transactional database runner.
486///
487/// This type borrows from a [`Db`] and can be used to execute queries outside
488/// of a transaction context.
489///
490/// # Security
491///
492/// - NOT `Clone`: Cannot be duplicated
493/// - Borrows from `Db`: While `DbConn` exists, the `Db` cannot start a transaction
494/// - Cannot be constructed by user code: Only `Db::conn()` creates it
495///
496/// # Example
497///
498/// ```ignore
499/// let conn = db.conn()?;
500///
501/// let users = Entity::find()
502///     .secure()
503///     .scope_with(&scope)
504///     .all(&conn)
505///     .await?;
506/// ```
507pub struct DbConn<'a> {
508    pub(crate) conn: &'a DatabaseConnection,
509}
510
511impl std::fmt::Debug for DbConn<'_> {
512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513        f.debug_struct("DbConn").finish_non_exhaustive()
514    }
515}
516
517/// Transactional database runner.
518///
519/// This type is only available inside a transaction closure and represents
520/// an active database transaction.
521///
522/// # Security
523///
524/// - NOT `Clone`: Cannot be duplicated
525/// - Lifetime-bound: Cannot escape the transaction closure
526/// - Cannot be constructed by user code: Only `Db::transaction()` creates it
527///
528/// # Example
529///
530/// ```ignore
531/// let (db, result) = db.transaction(|tx| {
532///     Box::pin(async move {
533///         Entity::insert(model)
534///             .secure()
535///             .scope_with_model(&scope, &model)?
536///             .exec(tx)
537///             .await?;
538///         Ok(())
539///     })
540/// }).await;
541/// ```
542pub struct DbTx<'a> {
543    pub(crate) tx: &'a DatabaseTransaction,
544}
545
546impl std::fmt::Debug for DbTx<'_> {
547    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
548        f.debug_struct("DbTx").finish_non_exhaustive()
549    }
550}
551
552// NOTE: tests for `Db` live under `libs/modkit-db/tests/` so they can be gated per-backend
553// without creating feature-specific unused-import warnings in this module.