modkit_db/secure/db.rs
1//! Secure database handle and runner types.
2//!
3//! This module provides the primary entry point for secure database access:
4//!
5//! - [`Db`]: The database handle. NOT Clone, NOT storable by services.
6//! - [`DbConn`]: Non-transactional runner (borrows from `Db`).
7//! - [`DbTx`]: Transactional runner (lives inside transaction closure).
8//!
9//! # Security Model
10//!
11//! The transaction bypass vulnerability is prevented by multiple layers:
12//!
13//! 1. `Db` does NOT implement `Clone`
14//! 2. `Db::transaction(self, f)` consumes `self`, making it inaccessible inside the closure
15//! 3. Services receive `&impl DBRunner`, not `Db` or any factory
16//! 4. **Task-local guard**: `Db::conn()` fails if called inside a transaction
17//!
18//! The task-local guard provides defense-in-depth: even if code obtains a `Db`
19//! reference via another path (e.g., captured `Arc<AppServices>`), calling
20//! `conn()` will fail at runtime with `DbError::ConnRequestedInsideTx`.
21//!
22//! # Example
23//!
24//! ```ignore
25//! // In handler/entrypoint
26//! let db: Db = ctx.db()?;
27//!
28//! // Non-transactional path
29//! let conn = db.conn()?; // Note: returns Result now
30//! let user = service.get_user(&conn, &scope, id).await?;
31//!
32//! // Transactional path
33//! let (db, result) = db.transaction(|tx| {
34//! Box::pin(async move {
35//! // Only `tx` is available here - `db` is consumed
36//! // Calling some_db.conn() here would fail with ConnRequestedInsideTx
37//! service.create_user(tx, &scope, data).await?;
38//! Ok(user_id)
39//! })
40//! }).await;
41//! let user_id = result?;
42//! ```
43
44use std::{cell::Cell, future::Future, pin::Pin, sync::Arc};
45
46use sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait};
47
48use super::tx_config::TxConfig;
49use super::tx_error::TxError;
50use crate::{DbError, DbHandle};
51
52// Task-local guard to detect transaction bypass attempts.
53//
54// When set to `true`, any call to `Db::conn()` will fail with
55// `DbError::ConnRequestedInsideTx`. This prevents code from creating
56// non-transactional runners while inside a transaction closure.
57tokio::task_local! {
58 static IN_TX: Cell<bool>;
59}
60
61/// Check if we're currently inside a transaction context.
62///
63/// Returns `true` if a transaction is active in the current task.
64fn is_in_transaction() -> bool {
65 IN_TX.try_with(Cell::get).unwrap_or(false)
66}
67
68/// Execute a closure with the transaction guard set.
69///
70/// This sets `IN_TX = true` for the duration of the closure, ensuring
71/// that any calls to `Db::conn()` within will fail.
72async fn with_tx_guard<F, T>(f: F) -> T
73where
74 F: Future<Output = T>,
75{
76 IN_TX.scope(Cell::new(true), f).await
77}
78
79/// Database handle for secure operations.
80///
81/// # Security
82///
83/// This type is `Clone` to support ergonomic sharing in runtimes and service containers.
84/// Transaction-bypass is still prevented by the task-local guard: any attempt to call
85/// `conn()` inside a transaction closure fails with `DbError::ConnRequestedInsideTx`.
86///
87/// Services and repositories must NOT store this type. They should receive
88/// `&impl DBRunner` as a parameter to all methods that need database access.
89///
90/// # Usage
91///
92/// ```ignore
93/// // At the entrypoint (handler/command)
94/// let db: Db = ctx.db()?;
95///
96/// // Pass runner to service methods
97/// let conn = db.conn()?;
98/// let result = service.do_something(&conn, &scope).await?;
99///
100/// // Or use a transaction
101/// let (db, result) = db.transaction(|tx| {
102/// Box::pin(async move {
103/// service.do_something(tx, &scope).await
104/// })
105/// }).await;
106/// ```
107#[derive(Clone)]
108pub struct Db {
109 handle: Arc<DbHandle>,
110}
111
112impl std::fmt::Debug for Db {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("Db")
115 .field("engine", &self.handle.engine())
116 .finish_non_exhaustive()
117 }
118}
119
120impl Db {
121 /// **INTERNAL**: Create a new `Db` from an owned `DbHandle`.
122 ///
123 /// This is typically called by the runtime/context layer, not by service code.
124 #[must_use]
125 pub(crate) fn new(handle: DbHandle) -> Self {
126 Self {
127 handle: Arc::new(handle),
128 }
129 }
130
131 /// **INTERNAL**: Get a privileged `SeaORM` connection clone.
132 ///
133 /// This must not be exposed to module code. It exists for infrastructure
134 /// (migrations) inside `modkit-db`.
135 pub(crate) fn sea_internal(&self) -> DatabaseConnection {
136 self.handle.sea_internal()
137 }
138
139 /// Get a reference to the underlying `DbHandle`.
140 ///
141 /// # Security
142 ///
143 /// This is `pub(crate)` to allow internal infrastructure access (migrations, etc.)
144 /// but prevents service code from extracting the handle.
145 /// Create a non-transactional database runner.
146 ///
147 /// The returned `DbConn` borrows from `self`, ensuring that while a `DbConn`
148 /// exists, the `Db` cannot be used for other purposes (like starting a transaction).
149 ///
150 /// # Errors
151 ///
152 /// Returns `DbError::ConnRequestedInsideTx` if called from within a transaction
153 /// closure. This prevents the transaction bypass vulnerability where code could
154 /// create a non-transactional runner that persists writes outside the transaction.
155 ///
156 /// # Example
157 ///
158 /// ```ignore
159 /// let db: Db = ctx.db()?;
160 /// let conn = db.conn()?;
161 ///
162 /// // Use conn for queries
163 /// let users = Entity::find()
164 /// .secure()
165 /// .scope_with(&scope)
166 /// .all(&conn)
167 /// .await?;
168 /// ```
169 ///
170 /// The `Result` itself is `#[must_use]`; this method does not add an extra must-use
171 /// marker to avoid clippy `double_must_use`.
172 pub fn conn(&self) -> Result<DbConn<'_>, DbError> {
173 if is_in_transaction() {
174 return Err(DbError::ConnRequestedInsideTx);
175 }
176 Ok(DbConn {
177 conn: self.handle.sea_internal_ref(),
178 })
179 }
180
181 // --- Advisory locks (forwarded, no `DbHandle` exposure) ---
182
183 /// Acquire an advisory lock with the given key and module namespace.
184 ///
185 /// # Errors
186 /// Returns an error if the lock cannot be acquired.
187 pub async fn lock(&self, module: &str, key: &str) -> crate::Result<crate::DbLockGuard> {
188 self.handle.lock(module, key).await
189 }
190
191 /// Try to acquire an advisory lock with configurable retry/backoff policy.
192 ///
193 /// # Errors
194 /// Returns an error if an unrecoverable lock error occurs.
195 pub async fn try_lock(
196 &self,
197 module: &str,
198 key: &str,
199 config: crate::LockConfig,
200 ) -> crate::Result<Option<crate::DbLockGuard>> {
201 self.handle.try_lock(module, key, config).await
202 }
203
204 /// Execute a closure inside a database transaction (borrowed form).
205 ///
206 /// This variant keeps the call site ergonomic for service containers that store a
207 /// reusable DB entrypoint (e.g. `DBProvider`) without exposing `DbHandle`.
208 ///
209 /// # Security
210 ///
211 /// The task-local guard is still enforced: any call to `Db::conn()` within the closure
212 /// will fail with `DbError::ConnRequestedInsideTx`.
213 ///
214 /// # Errors
215 ///
216 /// Returns `DbError` if:
217 /// - starting the transaction fails
218 /// - the closure returns an error
219 /// - commit fails (rollback is attempted on closure error)
220 pub async fn transaction_ref<F, T>(&self, f: F) -> Result<T, DbError>
221 where
222 F: for<'a> FnOnce(
223 &'a DbTx<'a>,
224 )
225 -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + 'a>>
226 + Send,
227 T: Send + 'static,
228 {
229 let txn = self.handle.sea_internal_ref().begin().await?;
230 let tx = DbTx { tx: &txn };
231
232 // Run the closure with the transaction guard set
233 let res = with_tx_guard(f(&tx)).await;
234
235 match res {
236 Ok(v) => {
237 txn.commit().await?;
238 Ok(v)
239 }
240 Err(e) => {
241 _ = txn.rollback().await;
242 Err(e)
243 }
244 }
245 }
246
247 /// Execute a closure inside a database transaction, mapping infrastructure errors into `E`.
248 ///
249 /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
250 /// that must return **domain** errors while still acquiring connections internally.
251 ///
252 /// - The transaction closure returns `Result<T, E>` (domain error).
253 /// - Begin/commit failures are `DbError` and are mapped via `E: From<DbError>`.
254 ///
255 /// # Security
256 ///
257 /// The task-local guard is enforced for the duration of the closure.
258 ///
259 /// # Errors
260 ///
261 /// Returns `E` if:
262 /// - starting the transaction fails (mapped from `DbError`)
263 /// - the closure returns an error
264 /// - commit fails (mapped from `DbError`)
265 pub async fn transaction_ref_mapped<F, T, E>(&self, f: F) -> Result<T, E>
266 where
267 E: From<DbError> + Send + 'static,
268 F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
269 + Send,
270 T: Send + 'static,
271 {
272 let txn = self
273 .handle
274 .sea_internal_ref()
275 .begin()
276 .await
277 .map_err(DbError::from)
278 .map_err(E::from)?;
279 let tx = DbTx { tx: &txn };
280
281 // Run the closure with the transaction guard set
282 let res = with_tx_guard(f(&tx)).await;
283
284 match res {
285 Ok(v) => {
286 txn.commit().await.map_err(DbError::from).map_err(E::from)?;
287 Ok(v)
288 }
289 Err(e) => {
290 _ = txn.rollback().await;
291 Err(e)
292 }
293 }
294 }
295
296 /// Execute a closure inside a database transaction with custom configuration
297 /// (isolation level, access mode), mapping infrastructure errors into `E`.
298 ///
299 /// This is the preferred building block for service-facing entrypoints (like `DBProvider`)
300 /// that must return **domain** errors and need non-default transaction settings
301 /// (e.g., `SERIALIZABLE` isolation).
302 ///
303 /// # Security
304 ///
305 /// The task-local guard is enforced for the duration of the closure.
306 ///
307 /// # Errors
308 ///
309 /// Returns `E` if:
310 /// - starting the transaction fails (mapped from `DbError`)
311 /// - the closure returns an error
312 /// - commit fails (mapped from `DbError`)
313 pub async fn transaction_ref_mapped_with_config<F, T, E>(
314 &self,
315 config: TxConfig,
316 f: F,
317 ) -> Result<T, E>
318 where
319 E: From<DbError> + Send + 'static,
320 F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
321 + Send,
322 T: Send + 'static,
323 {
324 use sea_orm::{AccessMode, IsolationLevel};
325
326 let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
327 let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
328
329 let txn = self
330 .handle
331 .sea_internal_ref()
332 .begin_with_config(isolation, access_mode)
333 .await
334 .map_err(DbError::from)
335 .map_err(E::from)?;
336 let tx = DbTx { tx: &txn };
337
338 // Run the closure with the transaction guard set
339 let res = with_tx_guard(f(&tx)).await;
340
341 match res {
342 Ok(v) => {
343 txn.commit().await.map_err(DbError::from).map_err(E::from)?;
344 Ok(v)
345 }
346 Err(e) => {
347 _ = txn.rollback().await;
348 Err(e)
349 }
350 }
351 }
352
353 /// Execute a closure inside a database transaction.
354 ///
355 /// # Security
356 ///
357 /// This method **consumes** `self` and returns it after the transaction completes.
358 /// This is critical for security: inside the closure, the original `Db` is not
359 /// accessible, so code cannot call `db.conn()` to create a non-transactional runner.
360 ///
361 /// Additionally, a task-local guard is set during the transaction, so any call
362 /// to `conn()` on *any* `Db` instance will fail with `DbError::ConnRequestedInsideTx`.
363 ///
364 /// # Example
365 ///
366 /// ```ignore
367 /// let db: Db = ctx.db()?;
368 ///
369 /// let (db, result) = db.transaction(|tx| {
370 /// Box::pin(async move {
371 /// // Only `tx` is available here
372 /// service.create_user(tx, &scope, data).await?;
373 /// Ok(user_id)
374 /// })
375 /// }).await;
376 ///
377 /// let user_id = result?;
378 /// ```
379 ///
380 /// # Returns
381 ///
382 /// Returns `(Self, Result<T>)` where:
383 /// - `Self` is always returned (even on error) so the connection can be reused
384 /// - `Result<T>` contains the transaction result or error
385 pub async fn transaction<F, T>(self, f: F) -> (Self, anyhow::Result<T>)
386 where
387 F: for<'a> FnOnce(
388 &'a DbTx<'a>,
389 )
390 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
391 + Send,
392 T: Send + 'static,
393 {
394 let txn = match self.handle.sea_internal_ref().begin().await {
395 Ok(t) => t,
396 Err(e) => return (self, Err(e.into())),
397 };
398 let tx = DbTx { tx: &txn };
399
400 // Run the closure with the transaction guard set
401 let res = with_tx_guard(f(&tx)).await;
402
403 match res {
404 Ok(v) => match txn.commit().await {
405 Ok(()) => (self, Ok(v)),
406 Err(e) => (self, Err(e.into())),
407 },
408 Err(e) => {
409 _ = txn.rollback().await;
410 (self, Err(e))
411 }
412 }
413 }
414
415 /// Execute a transaction with typed domain errors.
416 ///
417 /// This variant separates infrastructure errors (connection issues, commit failures)
418 /// from domain errors returned by the closure.
419 ///
420 /// # Example
421 ///
422 /// ```ignore
423 /// let (db, result) = db.in_transaction(|tx| {
424 /// Box::pin(async move {
425 /// service.create_user(tx, &scope, data).await
426 /// })
427 /// }).await;
428 ///
429 /// match result {
430 /// Ok(user) => println!("Created: {:?}", user),
431 /// Err(TxError::Domain(e)) => println!("Business error: {}", e),
432 /// Err(TxError::Infra(e)) => println!("DB error: {}", e),
433 /// }
434 /// ```
435 pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
436 where
437 T: Send + 'static,
438 E: std::fmt::Debug + std::fmt::Display + Send + 'static,
439 F: for<'a> FnOnce(&'a DbTx<'a>) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
440 + Send,
441 {
442 use super::tx_error::InfraError;
443
444 let txn = match self.handle.sea_internal_ref().begin().await {
445 Ok(txn) => txn,
446 Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
447 };
448
449 let tx = DbTx { tx: &txn };
450
451 // Run the closure with the transaction guard set
452 let res = with_tx_guard(f(&tx)).await;
453
454 match res {
455 Ok(v) => match txn.commit().await {
456 Ok(()) => (self, Ok(v)),
457 Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
458 },
459 Err(e) => {
460 _ = txn.rollback().await;
461 (self, Err(TxError::Domain(e)))
462 }
463 }
464 }
465
466 /// Execute a transaction with custom configuration (isolation level, access mode).
467 ///
468 /// # Example
469 ///
470 /// ```ignore
471 /// use modkit_db::secure::{TxConfig, TxIsolationLevel};
472 ///
473 /// let config = TxConfig {
474 /// isolation: Some(TxIsolationLevel::Serializable),
475 /// access_mode: None,
476 /// };
477 ///
478 /// let (db, result) = db.transaction_with_config(config, |tx| {
479 /// Box::pin(async move {
480 /// // Serializable isolation
481 /// service.reconcile(tx, &scope).await
482 /// })
483 /// }).await;
484 /// ```
485 pub async fn transaction_with_config<T, F>(
486 self,
487 config: TxConfig,
488 f: F,
489 ) -> (Self, anyhow::Result<T>)
490 where
491 T: Send + 'static,
492 F: for<'a> FnOnce(
493 &'a DbTx<'a>,
494 )
495 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
496 + Send,
497 {
498 use sea_orm::{AccessMode, IsolationLevel};
499
500 let isolation: Option<IsolationLevel> = config.isolation.map(Into::into);
501 let access_mode: Option<AccessMode> = config.access_mode.map(Into::into);
502
503 let txn = match self
504 .handle
505 .sea_internal_ref()
506 .begin_with_config(isolation, access_mode)
507 .await
508 {
509 Ok(t) => t,
510 Err(e) => return (self, Err(e.into())),
511 };
512 let tx = DbTx { tx: &txn };
513
514 // Run the closure with the transaction guard set
515 let res = with_tx_guard(f(&tx)).await;
516
517 match res {
518 Ok(v) => match txn.commit().await {
519 Ok(()) => (self, Ok(v)),
520 Err(e) => (self, Err(e.into())),
521 },
522 Err(e) => {
523 _ = txn.rollback().await;
524 (self, Err(e))
525 }
526 }
527 }
528
529 /// Return database engine identifier for logging/tracing.
530 #[must_use]
531 pub fn db_engine(&self) -> &'static str {
532 use sea_orm::{ConnectionTrait, DbBackend};
533
534 match self.handle.sea_internal_ref().get_database_backend() {
535 DbBackend::Postgres => "postgres",
536 DbBackend::MySql => "mysql",
537 DbBackend::Sqlite => "sqlite",
538 }
539 }
540}
541
542/// Non-transactional database runner.
543///
544/// This type borrows from a [`Db`] and can be used to execute queries outside
545/// of a transaction context.
546///
547/// # Security
548///
549/// - NOT `Clone`: Cannot be duplicated
550/// - Borrows from `Db`: While `DbConn` exists, the `Db` cannot start a transaction
551/// - Cannot be constructed by user code: Only `Db::conn()` creates it
552///
553/// # Example
554///
555/// ```ignore
556/// let conn = db.conn()?;
557///
558/// let users = Entity::find()
559/// .secure()
560/// .scope_with(&scope)
561/// .all(&conn)
562/// .await?;
563/// ```
564pub struct DbConn<'a> {
565 pub(crate) conn: &'a DatabaseConnection,
566}
567
568impl std::fmt::Debug for DbConn<'_> {
569 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570 f.debug_struct("DbConn").finish_non_exhaustive()
571 }
572}
573
574/// Transactional database runner.
575///
576/// This type is only available inside a transaction closure and represents
577/// an active database transaction.
578///
579/// # Security
580///
581/// - NOT `Clone`: Cannot be duplicated
582/// - Lifetime-bound: Cannot escape the transaction closure
583/// - Cannot be constructed by user code: Only `Db::transaction()` creates it
584///
585/// # Example
586///
587/// ```ignore
588/// let (db, result) = db.transaction(|tx| {
589/// Box::pin(async move {
590/// Entity::insert(model)
591/// .secure()
592/// .scope_with_model(&scope, &model)?
593/// .exec(tx)
594/// .await?;
595/// Ok(())
596/// })
597/// }).await;
598/// ```
599pub struct DbTx<'a> {
600 pub(crate) tx: &'a DatabaseTransaction,
601}
602
603impl std::fmt::Debug for DbTx<'_> {
604 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605 f.debug_struct("DbTx").finish_non_exhaustive()
606 }
607}
608
609// NOTE: tests for `Db` live under `libs/modkit-db/tests/` so they can be gated per-backend
610// without creating feature-specific unused-import warnings in this module.