modkit_db/secure/db.rs
1//! Secure database handle and runner types.
2//!
3//! This module provides the primary entry point for secure database access:
4//!
//! - [`Db`]: The database handle. `Clone` (for runtimes and service containers), but NOT storable by services.
6//! - [`DbConn`]: Non-transactional runner (borrows from `Db`).
7//! - [`DbTx`]: Transactional runner (lives inside transaction closure).
8//!
9//! # Security Model
10//!
//! The transaction bypass vulnerability is prevented by multiple layers:
//!
//! 1. `Db` must NOT be stored by services or repositories (it is `Clone` only so that
//!    runtimes and service containers can share it)
//! 2. `Db::transaction(self, f)` consumes `self`, making that `Db` value inaccessible inside the closure
//! 3. Services receive `&impl DBRunner`, not `Db` or any factory
//! 4. **Task-local guard**: `Db::conn()` fails if called inside a transaction
17//!
18//! The task-local guard provides defense-in-depth: even if code obtains a `Db`
19//! reference via another path (e.g., captured `Arc<AppServices>`), calling
20//! `conn()` will fail at runtime with `DbError::ConnRequestedInsideTx`.
21//!
22//! # Example
23//!
24//! ```ignore
25//! // In handler/entrypoint
26//! let db: Db = ctx.db()?;
27//!
28//! // Non-transactional path
29//! let conn = db.conn()?; // Note: returns Result now
30//! let user = service.get_user(&conn, &scope, id).await?;
31//!
32//! // Transactional path
33//! let (db, result) = db.transaction(|tx| {
34//! Box::pin(async move {
35//! // Only `tx` is available here - `db` is consumed
36//! // Calling some_db.conn() here would fail with ConnRequestedInsideTx
37//! service.create_user(tx, &scope, data).await?;
38//! Ok(user_id)
39//! })
40//! }).await;
41//! let user_id = result?;
42//! ```
43
44use std::{cell::Cell, future::Future, pin::Pin, sync::Arc};
45
46use sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait};
47
48use super::tx_config::TxConfig;
49use super::tx_error::TxError;
50use crate::{DbError, DbHandle};
51
// Task-local guard to detect transaction bypass attempts.
//
// While a scope holding `true` is active for the current task,
// `is_in_transaction()` reports `true` and any call to `Db::conn()` fails with
// `DbError::ConnRequestedInsideTx`. This prevents code from creating
// non-transactional runners while inside a transaction closure.
//
// Outside of any scope the key is unset, which `is_in_transaction()` treats as
// "not in a transaction".
//
// NOTE(review): the flag is task-local, so work handed to a separately spawned
// task presumably does not observe it — confirm against the tokio
// `task_local!` documentation.
tokio::task_local! {
    static IN_TX: Cell<bool>;
}
60
61/// Check if we're currently inside a transaction context.
62///
63/// Returns `true` if a transaction is active in the current task.
64fn is_in_transaction() -> bool {
65 IN_TX.try_with(Cell::get).unwrap_or(false)
66}
67
68/// Execute a closure with the transaction guard set.
69///
70/// This sets `IN_TX = true` for the duration of the closure, ensuring
71/// that any calls to `Db::conn()` within will fail.
72async fn with_tx_guard<F, T>(f: F) -> T
73where
74 F: Future<Output = T>,
75{
76 IN_TX.scope(Cell::new(true), f).await
77}
78
/// Database handle for secure operations.
///
/// `Db` is the entry point from which runners are obtained: [`Db::conn`] for
/// non-transactional access and the `transaction*` methods for transactional access.
///
/// # Security
///
/// This type is `Clone` to support ergonomic sharing in runtimes and service containers.
/// Every clone shares the same underlying `DbHandle` through an `Arc`.
/// Transaction-bypass is still prevented by the task-local guard: any attempt to call
/// `conn()` inside a transaction closure fails with `DbError::ConnRequestedInsideTx`.
///
/// Services and repositories must NOT store this type. They should receive
/// `&impl DBRunner` as a parameter to all methods that need database access.
///
/// # Usage
///
/// ```ignore
/// // At the entrypoint (handler/command)
/// let db: Db = ctx.db()?;
///
/// // Pass runner to service methods
/// let conn = db.conn()?;
/// let result = service.do_something(&conn, &scope).await?;
///
/// // Or use a transaction
/// let (db, result) = db.transaction(|tx| {
///     Box::pin(async move {
///         service.do_something(tx, &scope).await
///     })
/// }).await;
/// ```
#[derive(Clone)]
pub struct Db {
    /// Shared handle to the underlying database; private so that code outside
    /// this module cannot reach the raw connection through `Db`.
    handle: Arc<DbHandle>,
}
111
112impl std::fmt::Debug for Db {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("Db")
115 .field("engine", &self.handle.engine())
116 .finish_non_exhaustive()
117 }
118}
119
120impl Db {
121 /// **INTERNAL**: Create a new `Db` from an owned `DbHandle`.
122 ///
123 /// This is typically called by the runtime/context layer, not by service code.
124 #[must_use]
125 pub(crate) fn new(handle: DbHandle) -> Self {
126 Self {
127 handle: Arc::new(handle),
128 }
129 }
130
131 /// **INTERNAL**: Get a privileged `SeaORM` connection clone.
132 ///
133 /// This must not be exposed to module code. It exists for infrastructure
134 /// (migrations) inside `modkit-db`.
135 pub(crate) fn sea_internal(&self) -> DatabaseConnection {
136 self.handle.sea_internal()
137 }
138
139 /// Get a reference to the underlying `DbHandle`.
140 ///
141 /// # Security
142 ///
143 /// This is `pub(crate)` to allow internal infrastructure access (migrations, etc.)
144 /// but prevents service code from extracting the handle.
145 /// Create a non-transactional database runner.
146 ///
147 /// The returned `DbConn` borrows from `self`, ensuring that while a `DbConn`
148 /// exists, the `Db` cannot be used for other purposes (like starting a transaction).
149 ///
150 /// # Errors
151 ///
152 /// Returns `DbError::ConnRequestedInsideTx` if called from within a transaction
153 /// closure. This prevents the transaction bypass vulnerability where code could
154 /// create a non-transactional runner that persists writes outside the transaction.
155 ///
156 /// # Example
157 ///
158 /// ```ignore
159 /// let db: Db = ctx.db()?;
160 /// let conn = db.conn()?;
161 ///
162 /// // Use conn for queries
163 /// let users = Entity::find()
164 /// .secure()
165 /// .scope_with(&scope)
166 /// .all(&conn)
167 /// .await?;
168 /// ```
169 ///
170 /// The `Result` itself is `#[must_use]`; this method does not add an extra must-use
171 /// marker to avoid clippy `double_must_use`.
172 pub fn conn(&self) -> Result<DbConn<'_>, DbError> {
173 if is_in_transaction() {
174 return Err(DbError::ConnRequestedInsideTx);
175 }
176 Ok(DbConn {
177 conn: self.handle.sea_internal_ref(),
178 })
179 }
180
181 // --- Advisory locks (forwarded, no `DbHandle` exposure) ---
182
183 /// Acquire an advisory lock with the given key and module namespace.
184 ///
185 /// # Errors
186 /// Returns an error if the lock cannot be acquired.
187 pub async fn lock(&self, module: &str, key: &str) -> crate::Result<crate::DbLockGuard> {
188 self.handle.lock(module, key).await
189 }
190
191 /// Try to acquire an advisory lock with configurable retry/backoff policy.
192 ///
193 /// # Errors
194 /// Returns an error if an unrecoverable lock error occurs.
195 pub async fn try_lock(
196 &self,
197 module: &str,
198 key: &str,
199 config: crate::LockConfig,
200 ) -> crate::Result<Option<crate::DbLockGuard>> {
201 self.handle.try_lock(module, key, config).await
202 }
203
204 /// Execute a closure inside a database transaction (borrowed form).
205 ///
206 /// This variant keeps the call site ergonomic for service containers that store a
207 /// reusable DB entrypoint (e.g. `DBProvider`) without exposing `DbHandle`.
208 ///
209 /// # Security
210 ///
211 /// The task-local guard is still enforced: any call to `Db::conn()` within the closure
212 /// will fail with `DbError::ConnRequestedInsideTx`.
213 ///
214 /// # Errors
215 ///
216 /// Returns `DbError` if:
217 /// - starting the transaction fails
218 /// - the closure returns an error
219 /// - commit fails (rollback is attempted on closure error)
220 pub async fn transaction_ref<F, T>(&self, f: F) -> Result<T, DbError>
221 where
222 F: for<'a> FnOnce(
223 &'a DbTx<'a>,
224 )
225 -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + 'a>>
226 + Send,
227 T: Send + 'static,
228 {
229 let txn = self.handle.sea_internal_ref().begin().await?;
230 let tx = DbTx { tx: &txn };
231
232 // Run the closure with the transaction guard set
233 let res = with_tx_guard(f(&tx)).await;
234
235 match res {
236 Ok(v) => {
237 txn.commit().await?;
238 Ok(v)
239 }
240 Err(e) => {
241 _ = txn.rollback().await;
242 Err(e)
243 }
244 }
245 }
246
247 /// Execute a closure inside a database transaction, mapping infrastructure errors into `E`.
248 ///
249 /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
250 /// that must return **domain** errors while still acquiring connections internally.
251 ///
252 /// - The transaction closure returns `Result<T, E>` (domain error).
253 /// - Begin/commit failures are `DbError` and are mapped via `E: From<DbError>`.
254 ///
255 /// # Security
256 ///
257 /// The task-local guard is enforced for the duration of the closure.
258 ///
259 /// # Errors
260 ///
261 /// Returns `E` if:
262 /// - starting the transaction fails (mapped from `DbError`)
263 /// - the closure returns an error
264 /// - commit fails (mapped from `DbError`)
265 pub async fn transaction_ref_mapped<F, T, E>(&self, f: F) -> Result<T, E>
266 where
267 E: From<DbError> + Send + 'static,
268 F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
269 + Send,
270 T: Send + 'static,
271 {
272 let txn = self
273 .handle
274 .sea_internal_ref()
275 .begin()
276 .await
277 .map_err(DbError::from)
278 .map_err(E::from)?;
279 let tx = DbTx { tx: &txn };
280
281 // Run the closure with the transaction guard set
282 let res = with_tx_guard(f(&tx)).await;
283
284 match res {
285 Ok(v) => {
286 txn.commit().await.map_err(DbError::from).map_err(E::from)?;
287 Ok(v)
288 }
289 Err(e) => {
290 _ = txn.rollback().await;
291 Err(e)
292 }
293 }
294 }
295
296 /// Execute a closure inside a database transaction.
297 ///
298 /// # Security
299 ///
300 /// This method **consumes** `self` and returns it after the transaction completes.
301 /// This is critical for security: inside the closure, the original `Db` is not
302 /// accessible, so code cannot call `db.conn()` to create a non-transactional runner.
303 ///
304 /// Additionally, a task-local guard is set during the transaction, so any call
305 /// to `conn()` on *any* `Db` instance will fail with `DbError::ConnRequestedInsideTx`.
306 ///
307 /// # Example
308 ///
309 /// ```ignore
310 /// let db: Db = ctx.db()?;
311 ///
312 /// let (db, result) = db.transaction(|tx| {
313 /// Box::pin(async move {
314 /// // Only `tx` is available here
315 /// service.create_user(tx, &scope, data).await?;
316 /// Ok(user_id)
317 /// })
318 /// }).await;
319 ///
320 /// let user_id = result?;
321 /// ```
322 ///
323 /// # Returns
324 ///
325 /// Returns `(Self, Result<T>)` where:
326 /// - `Self` is always returned (even on error) so the connection can be reused
327 /// - `Result<T>` contains the transaction result or error
328 pub async fn transaction<F, T>(self, f: F) -> (Self, anyhow::Result<T>)
329 where
330 F: for<'a> FnOnce(
331 &'a DbTx<'a>,
332 )
333 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
334 + Send,
335 T: Send + 'static,
336 {
337 let txn = match self.handle.sea_internal_ref().begin().await {
338 Ok(t) => t,
339 Err(e) => return (self, Err(e.into())),
340 };
341 let tx = DbTx { tx: &txn };
342
343 // Run the closure with the transaction guard set
344 let res = with_tx_guard(f(&tx)).await;
345
346 match res {
347 Ok(v) => match txn.commit().await {
348 Ok(()) => (self, Ok(v)),
349 Err(e) => (self, Err(e.into())),
350 },
351 Err(e) => {
352 _ = txn.rollback().await;
353 (self, Err(e))
354 }
355 }
356 }
357
358 /// Execute a transaction with typed domain errors.
359 ///
360 /// This variant separates infrastructure errors (connection issues, commit failures)
361 /// from domain errors returned by the closure.
362 ///
363 /// # Example
364 ///
365 /// ```ignore
366 /// let (db, result) = db.in_transaction(|tx| {
367 /// Box::pin(async move {
368 /// service.create_user(tx, &scope, data).await
369 /// })
370 /// }).await;
371 ///
372 /// match result {
373 /// Ok(user) => println!("Created: {:?}", user),
374 /// Err(TxError::Domain(e)) => println!("Business error: {}", e),
375 /// Err(TxError::Infra(e)) => println!("DB error: {}", e),
376 /// }
377 /// ```
378 pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
379 where
380 T: Send + 'static,
381 E: std::fmt::Debug + std::fmt::Display + Send + 'static,
382 F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
383 + Send,
384 {
385 use super::tx_error::InfraError;
386
387 let txn = match self.handle.sea_internal_ref().begin().await {
388 Ok(txn) => txn,
389 Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
390 };
391
392 let tx = DbTx { tx: &txn };
393
394 // Run the closure with the transaction guard set
395 let res = with_tx_guard(f(&tx)).await;
396
397 match res {
398 Ok(v) => match txn.commit().await {
399 Ok(()) => (self, Ok(v)),
400 Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
401 },
402 Err(e) => {
403 _ = txn.rollback().await;
404 (self, Err(TxError::Domain(e)))
405 }
406 }
407 }
408
409 /// Execute a transaction with custom configuration (isolation level, access mode).
410 ///
411 /// # Example
412 ///
413 /// ```ignore
414 /// use modkit_db::secure::{TxConfig, TxIsolationLevel};
415 ///
416 /// let config = TxConfig {
417 /// isolation: Some(TxIsolationLevel::Serializable),
418 /// access_mode: None,
419 /// };
420 ///
421 /// let (db, result) = db.transaction_with_config(config, |tx| {
422 /// Box::pin(async move {
423 /// // Serializable isolation
424 /// service.reconcile(tx, &scope).await
425 /// })
426 /// }).await;
427 /// ```
428 pub async fn transaction_with_config<T, F>(
429 self,
430 config: TxConfig,
431 f: F,
432 ) -> (Self, anyhow::Result<T>)
433 where
434 T: Send + 'static,
435 F: for<'a> FnOnce(
436 &'a DbTx<'a>,
437 )
438 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
439 + Send,
440 {
441 use sea_orm::{AccessMode, IsolationLevel};
442
443 let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
444 let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
445
446 let txn = match self
447 .handle
448 .sea_internal_ref()
449 .begin_with_config(isolation, access_mode)
450 .await
451 {
452 Ok(t) => t,
453 Err(e) => return (self, Err(e.into())),
454 };
455 let tx = DbTx { tx: &txn };
456
457 // Run the closure with the transaction guard set
458 let res = with_tx_guard(f(&tx)).await;
459
460 match res {
461 Ok(v) => match txn.commit().await {
462 Ok(()) => (self, Ok(v)),
463 Err(e) => (self, Err(e.into())),
464 },
465 Err(e) => {
466 _ = txn.rollback().await;
467 (self, Err(e))
468 }
469 }
470 }
471
472 /// Return database engine identifier for logging/tracing.
473 #[must_use]
474 pub fn db_engine(&self) -> &'static str {
475 use sea_orm::{ConnectionTrait, DbBackend};
476
477 match self.handle.sea_internal_ref().get_database_backend() {
478 DbBackend::Postgres => "postgres",
479 DbBackend::MySql => "mysql",
480 DbBackend::Sqlite => "sqlite",
481 }
482 }
483}
484
/// Non-transactional database runner.
///
/// This type borrows from a [`Db`] and can be used to execute queries outside
/// of a transaction context.
///
/// # Security
///
/// - NOT `Clone`: Cannot be duplicated
/// - Borrows from `Db`: While a `DbConn` exists, the `Db` it came from cannot be
///   moved into the consuming transaction methods (e.g. `Db::transaction`)
/// - Cannot be constructed by code outside this crate: the field is `pub(crate)`,
///   and `Db::conn()` is the public way to obtain one
///
/// # Example
///
/// ```ignore
/// let conn = db.conn()?;
///
/// let users = Entity::find()
///     .secure()
///     .scope_with(&scope)
///     .all(&conn)
///     .await?;
/// ```
pub struct DbConn<'a> {
    /// Borrowed connection that queries issued through this runner go to.
    pub(crate) conn: &'a DatabaseConnection,
}
510
511impl std::fmt::Debug for DbConn<'_> {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 f.debug_struct("DbConn").finish_non_exhaustive()
514 }
515}
516
/// Transactional database runner.
///
/// This type is only available inside a transaction closure and represents
/// an active database transaction.
///
/// # Security
///
/// - NOT `Clone`: Cannot be duplicated
/// - Lifetime-bound: Cannot escape the transaction closure
/// - Cannot be constructed by code outside this crate: the field is `pub(crate)`,
///   and only the `Db` transaction methods (`transaction`, `transaction_ref`,
///   `transaction_ref_mapped`, `in_transaction`, `transaction_with_config`) create it
///
/// # Example
///
/// ```ignore
/// let (db, result) = db.transaction(|tx| {
///     Box::pin(async move {
///         Entity::insert(model)
///             .secure()
///             .scope_with_model(&scope, &model)?
///             .exec(tx)
///             .await?;
///         Ok(())
///     })
/// }).await;
/// ```
pub struct DbTx<'a> {
    /// Borrowed transaction that statements issued through this runner go to.
    pub(crate) tx: &'a DatabaseTransaction,
}
545
546impl std::fmt::Debug for DbTx<'_> {
547 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
548 f.debug_struct("DbTx").finish_non_exhaustive()
549 }
550}
551
552// NOTE: tests for `Db` live under `libs/modkit-db/tests/` so they can be gated per-backend
553// without creating feature-specific unused-import warnings in this module.