Skip to main content

modkit_db/secure/
secure_conn.rs

1//! High-level secure database wrapper for ergonomic, type-safe access.
2//!
3//! This module provides `SecureConn`, a wrapper around a private `SeaORM` connection
4//! that enforces access control policies on all operations.
5//!
6//! Plugin/module developers should never handle raw `DatabaseConnection` or manually
7//! apply scopes. Instead, they receive a `SecureConn` instance that guarantees:
8//!
9//! - **Automatic scoping**: All queries are filtered by tenant/resource scope
10//! - **Type safety**: Cannot execute unscoped queries
11//! - **Ergonomics**: Simple, fluent API for common operations
12//!
13//! # Example
14//!
15//! ```ignore
16//! use modkit_db::secure::{SecureConn, SecurityCtx, AccessScope};
17//!
18//! pub struct UsersRepo<'a> {
19//!     db: &'a SecureConn,
20//! }
21//!
22//! impl<'a> UsersRepo<'a> {
23//!     pub async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, ScopeError> {
24//!         let user = self.db
25//!             .find_by_id::<user::Entity>(id)?
26//!             .one(self.db)
27//!             .await?;
28//!         Ok(user.map(Into::into))
29//!     }
30//!
31//!     pub async fn find_all(&self) -> Result<Vec<User>, ScopeError> {
32//!         let users = self.db
33//!             .find::<user::Entity>()?
34//!             .all(self.db)
35//!             .await?;
36//!         Ok(users.into_iter().map(Into::into).collect())
37//!     }
38//!
39//!     pub async fn update_status(&self, status: String) -> Result<u64, ScopeError> {
40//!         let result = self.db
41//!             .update_many::<user::Entity>()?
42//!             .col_expr(user::Column::Status, Expr::value(status))
43//!             .exec(self.db)
44//!             .await?;
45//!         Ok(result.rows_affected)
46//!     }
47//! }
48//! ```
49
50use std::{future::Future, pin::Pin};
51
52use sea_orm::{
53    AccessMode, ColumnTrait, ConnectionTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
54    IsolationLevel, QueryFilter, TransactionTrait, sea_query::Expr,
55};
56use uuid::Uuid;
57
58use crate::secure::tx_error::{InfraError, TxError};
59
60use modkit_security::AccessScope;
61
62use crate::secure::tx_config::TxConfig;
63
64use crate::secure::{ScopableEntity, ScopeError, Scoped, SecureEntityExt, SecureSelect};
65
66use crate::secure::db_ops::{SecureDeleteExt, SecureDeleteMany, SecureUpdateExt, SecureUpdateMany};
67
68/// Secure transaction wrapper (capability).
69///
70/// This type intentionally does not expose any raw transaction or executor API.
71pub struct SecureTx<'a> {
72    pub(crate) tx: &'a DatabaseTransaction,
73}
74
75impl<'a> SecureTx<'a> {
76    #[must_use]
77    pub(crate) fn new(tx: &'a DatabaseTransaction) -> Self {
78        Self { tx }
79    }
80}
81
82/// Secure database connection wrapper.
83///
84/// This is the primary interface for module developers to access the database.
85/// All operations require a `SecurityCtx` parameter for per-request access control.
86///
87/// # Usage
88///
89/// Module services receive a `&SecureConn` and provide `SecurityCtx` per-request:
90///
91/// ```ignore
92/// pub struct MyService<'a> {
93///     db: &'a SecureConn,
94/// }
95///
96/// impl<'a> MyService<'a> {
97///     pub async fn get_user(&self, scope: &AccessScope, id: Uuid) -> Result<Option<User>> {
98///         self.db.find_by_id::<user::Entity>(ctx, id)?
99///             .one(self.db)
100///             .await
101///     }
102/// }
103/// ```
104///
105/// # Security Guarantees
106///
107/// - All queries require `SecurityCtx` from the request
108/// - Queries are scoped by tenant/resource from the context
109/// - Empty scopes result in deny-all (no data returned)
110/// - Type system prevents unscoped queries from compiling
111/// - Modules cannot access raw database connections
112pub struct SecureConn {
113    pub(crate) conn: DatabaseConnection,
114}
115
116impl SecureConn {
117    /// Create a new secure database connection wrapper.
118    /// Internal-only accessor to the raw database connection.
119    ///
120    /// # Security
121    ///
122    /// This MUST NOT be exposed publicly. Any public raw access to the underlying database
123    /// handle would allow bypassing scoping and tenant isolation.
124    #[must_use]
125    pub(crate) fn conn_internal(&self) -> &DatabaseConnection {
126        &self.conn
127    }
128
129    /// Return database engine identifier for tracing / logging.
130    #[must_use]
131    pub fn db_engine(&self) -> &'static str {
132        use sea_orm::DbBackend;
133
134        match self.conn.get_database_backend() {
135            DbBackend::Postgres => "postgres",
136            DbBackend::MySql => "mysql",
137            DbBackend::Sqlite => "sqlite",
138        }
139    }
140
141    /// Create a scoped select query for the given entity.
142    ///
143    /// Returns a `SecureSelect<E, Scoped>` that automatically applies
144    /// tenant/resource filtering based on the provided security context.
145    ///
146    /// # Example
147    ///
148    /// ```ignore
149    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
150    /// let users = db.find::<user::Entity>(&ctx)?
151    ///     .filter(user::Column::Status.eq("active"))
152    ///     .order_by_asc(user::Column::Email)
153    ///     .all(db)
154    ///     .await?;
155    /// ```
156    ///
157    /// # Errors
158    ///
159    #[allow(clippy::unused_self)] // Keep fluent &SecureConn API even when method only delegates
160    pub fn find<E>(&self, scope: &AccessScope) -> SecureSelect<E, Scoped>
161    where
162        E: ScopableEntity + EntityTrait,
163        E::Column: ColumnTrait + Copy,
164    {
165        E::find().secure().scope_with(scope)
166    }
167
168    /// Create a scoped select query filtered by a specific resource ID.
169    ///
170    /// This is a convenience method that combines `find()` with `.and_id()`.
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
176    /// let user = db.find_by_id::<user::Entity>(&ctx, user_id)?
177    ///     .one(db)
178    ///     .await?;
179    /// ```
180    ///
181    /// # Errors
182    /// Returns `ScopeError` if the entity doesn't have a resource column or scoping fails.
183    pub fn find_by_id<E>(
184        &self,
185        scope: &AccessScope,
186        id: Uuid,
187    ) -> Result<SecureSelect<E, Scoped>, ScopeError>
188    where
189        E: ScopableEntity + EntityTrait,
190        E::Column: ColumnTrait + Copy,
191    {
192        self.find::<E>(scope).and_id(id)
193    }
194
195    /// Create a scoped update query for the given entity.
196    ///
197    /// Returns a `SecureUpdateMany<E, Scoped>` that automatically applies
198    /// tenant/resource filtering. Use `.col_expr()` or other `SeaORM` methods
199    /// to specify what to update.
200    ///
201    /// # Example
202    ///
203    /// ```ignore
204    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
205    /// let result = db.update_many::<user::Entity>(&ctx)?
206    ///     .col_expr(user::Column::Status, Expr::value("active"))
207    ///     .col_expr(user::Column::UpdatedAt, Expr::value(Utc::now()))
208    ///     .exec(db)
209    ///     .await?;
210    /// println!("Updated {} rows", result.rows_affected);
211    /// ```
212    ///
213    #[allow(clippy::unused_self)] // Delegates but matches the rest of the connection API
214    #[must_use]
215    pub fn update_many<E>(&self, scope: &AccessScope) -> SecureUpdateMany<E, Scoped>
216    where
217        E: ScopableEntity + EntityTrait,
218        E::Column: ColumnTrait + Copy,
219    {
220        E::update_many().secure().scope_with(scope)
221    }
222
223    /// Create a scoped delete query for the given entity.
224    ///
225    /// Returns a `SecureDeleteMany<E, Scoped>` that automatically applies
226    /// tenant/resource filtering.
227    ///
228    /// # Example
229    ///
230    /// ```ignore
231    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
232    /// let result = db.delete_many::<user::Entity>(&ctx)?
233    ///     .exec(db)
234    ///     .await?;
235    /// println!("Deleted {} rows", result.rows_affected);
236    /// ```
237    ///
238    #[allow(clippy::unused_self)] // Retain method-style ergonomics for callers of SecureConn
239    #[must_use]
240    pub fn delete_many<E>(&self, scope: &AccessScope) -> SecureDeleteMany<E, Scoped>
241    where
242        E: ScopableEntity + EntityTrait,
243        E::Column: ColumnTrait + Copy,
244    {
245        E::delete_many().secure().scope_with(scope)
246    }
247
248    /// Create a scoped insert builder with `on_conflict()` support.
249    ///
250    /// Unlike the simpler `insert()` method, this returns a builder that allows
251    /// setting `on_conflict()` for upsert semantics while still enforcing
252    /// tenant validation through the secure typestate pattern.
253    ///
254    /// # Example
255    ///
256    /// ```ignore
257    /// use sea_orm::sea_query::OnConflict;
258    ///
259    /// let scope = AccessScope::for_tenants(vec![tenant_id]);
260    /// let am = settings::ActiveModel {
261    ///     tenant_id: Set(tenant_id),
262    ///     user_id: Set(user_id),
263    ///     theme: Set(Some("dark".to_string())),
264    ///     ..Default::default()
265    /// };
266    ///
267    /// db.insert_one(&scope, am)?
268    ///     .on_conflict(
269    ///         OnConflict::columns([Column::TenantId, Column::UserId])
270    ///             .update_columns([Column::Theme])
271    ///             .to_owned()
272    ///     )
273    ///     .exec(db)
274    ///     .await?;
275    /// ```
276    ///
277    /// # Errors
278    ///
279    /// - `ScopeError::Invalid` if `tenant_id` is not set for tenant-scoped entities
280    /// - `ScopeError::TenantNotInScope` if `tenant_id` is not in the provided scope
281    #[allow(clippy::needless_pass_by_value)] // We clone for insert and borrow for validation
282    pub fn insert_one<E>(
283        &self,
284        scope: &AccessScope,
285        am: E::ActiveModel,
286    ) -> Result<crate::secure::SecureInsertOne<E::ActiveModel, Scoped>, ScopeError>
287    where
288        E: ScopableEntity + EntityTrait,
289        E::Column: ColumnTrait + Copy,
290        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
291    {
292        use crate::secure::SecureInsertExt;
293        E::insert(am.clone()).secure().scope_with_model(scope, &am)
294    }
295
296    /// Insert a new entity with automatic tenant validation.
297    ///
298    /// This is a convenience wrapper around `secure_insert()` that uses
299    /// the provided security context.
300    ///
301    /// # Example
302    ///
303    /// ```ignore
304    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
305    /// let am = user::ActiveModel {
306    ///     id: Set(Uuid::new_v4()),
307    ///     tenant_id: Set(tenant_id),
308    ///     owner_id: Set(ctx.subject_id),
309    ///     email: Set("user@example.com".to_string()),
310    ///     ..Default::default()
311    /// };
312    ///
313    /// let user = db.insert::<user::Entity>(&ctx, am).await?;
314    /// ```
315    ///
316    /// # Errors
317    ///
318    /// - `ScopeError::Invalid` if entity requires tenant but scope has none
319    /// - `ScopeError::Db` if database insert fails
320    pub async fn insert<E>(
321        &self,
322        scope: &AccessScope,
323        am: E::ActiveModel,
324    ) -> Result<E::Model, ScopeError>
325    where
326        E: ScopableEntity + EntityTrait,
327        E::Column: ColumnTrait + Copy,
328        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
329        E::Model: sea_orm::IntoActiveModel<E::ActiveModel>,
330    {
331        crate::secure::secure_insert::<E>(am, scope, self).await
332    }
333
334    /// Update a single entity with security scope validation.
335    ///
336    /// This method ensures the entity being updated is within the security scope
337    /// before performing the update. It validates that the record is accessible
338    /// based on tenant/resource constraints.
339    ///
340    /// # Security
341    ///
342    /// - Validates the entity exists and is accessible in the security scope
343    /// - Returns `ScopeError::Denied` if the entity is not in scope
344    /// - Ensures updates cannot affect entities outside the security boundary
345    ///
346    /// # Example
347    ///
348    /// ```ignore
349    /// let ctx = SecurityCtx::for_tenant(tenant_id, user_id);
350    ///
351    /// // Load and modify
352    /// let user_model = db.find_by_id::<user::Entity>(&ctx, id)?
353    ///     .one(db)
354    ///     .await?
355    ///     .ok_or(NotFound)?;
356    ///
357    /// let mut user: user::ActiveModel = user_model.into();
358    /// user.email = Set("newemail@example.com".to_string());
359    /// user.updated_at = Set(Utc::now());
360    ///
361    /// // Update with scope validation (pass ID separately)
362    /// let updated = db.update_with_ctx::<user::Entity>(&ctx, id, user).await?;
363    /// ```
364    ///
365    /// # Errors
366    ///
367    /// - `ScopeError::Denied` if the entity is not accessible in the current scope
368    /// - `ScopeError::Db` if the database operation fails
369    pub async fn update_with_ctx<E>(
370        &self,
371        scope: &AccessScope,
372        id: Uuid,
373        am: E::ActiveModel,
374    ) -> Result<E::Model, ScopeError>
375    where
376        E: ScopableEntity + EntityTrait,
377        E::Column: ColumnTrait + Copy,
378        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
379        E::Model: sea_orm::IntoActiveModel<E::ActiveModel> + sea_orm::ModelTrait<Entity = E>,
380    {
381        crate::secure::secure_update_with_scope::<E>(am, scope, id, self).await
382    }
383
384    /// Delete a single entity by ID (scoped).
385    ///
386    /// This validates the entity exists in scope before deleting.
387    ///
388    /// # Example
389    ///
390    /// ```ignore
391    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
392    /// db.delete_by_id::<user::Entity>(&ctx, user_id).await?;
393    /// ```
394    ///
395    /// # Returns
396    ///
397    /// - `Ok(true)` if entity was deleted
398    /// - `Ok(false)` if entity not found in scope
399    ///
400    /// # Errors
401    ///
402    /// Returns `ScopeError::Invalid` if the entity does not have a `resource_col` defined.
403    pub async fn delete_by_id<E>(&self, scope: &AccessScope, id: Uuid) -> Result<bool, ScopeError>
404    where
405        E: ScopableEntity + EntityTrait,
406        E::Column: ColumnTrait + Copy,
407    {
408        let resource_col = E::resource_col().ok_or_else(|| {
409            ScopeError::Invalid("Entity must have a resource_col to use delete_by_id()")
410        })?;
411
412        let result = E::delete_many()
413            .filter(sea_orm::Condition::all().add(Expr::col(resource_col).eq(id)))
414            .secure()
415            .scope_with(scope)
416            .exec(self)
417            .await?;
418
419        Ok(result.rows_affected > 0)
420    }
421
422    // ========================================================================
423    // Transaction support
424    // ========================================================================
425
426    /// Execute a closure inside a database transaction.
427    ///
428    /// This method starts a `SeaORM` transaction, provides the transaction handle
429    /// to the closure as `&SecureTx`, and handles commit/rollback.
430    ///
431    /// # Return Type
432    ///
433    /// Returns `anyhow::Result<Result<T, E>>` where:
434    /// - Outer `Err`: Database/infrastructure error (transaction rolls back)
435    /// - Inner `Ok(T)`: Success (transaction commits)
436    /// - Inner `Err(E)`: Domain/validation error (transaction still commits)
437    ///
438    /// This design ensures domain validation errors don't cause rollback.
439    ///
440    /// # Architecture Note
441    ///
442    /// Transaction boundaries should be managed by **application/domain services**,
443    /// not by REST handlers. REST handlers should call service methods that
444    /// internally decide when to open transactions.
445    ///
446    /// # Example
447    ///
448    /// ```ignore
449    /// use modkit_db::secure::SecureConn;
450    ///
451    /// // In a domain service:
452    /// pub async fn create_user(
453    ///     db: &SecureConn,
454    ///     repo: &UsersRepo,
455    ///     user: User,
456    /// ) -> Result<User, DomainError> {
457    ///     let result = db.transaction(|conn| async move {
458    ///         // Check email uniqueness
459    ///         if repo.email_exists(conn, &user.email).await? {
460    ///             return Ok(Err(DomainError::EmailExists));
461    ///         }
462    ///         // Create user
463    ///         let created = repo.create(conn, user).await?;
464    ///         Ok(Ok(created))
465    ///     }).await?;
466    ///     result
467    /// }
468    /// ```
469    ///
470    /// # Security
471    ///
472    /// This method **consumes** `self` and returns it after the transaction completes.
473    /// This prevents accidental use of the outer connection inside the transaction,
474    /// making transaction bypass impossible by construction.
475    ///
476    /// Only `&SecureTx` is available inside the closure, ensuring all operations
477    /// execute within the transaction scope.
478    ///
479    /// # Returns
480    ///
481    /// Returns a tuple `(Self, Result<()>)` where:
482    /// - `Self` is the connection (always returned, even on error)
483    /// - `Result<()>` indicates transaction success or failure
484    ///
485    /// # Errors
486    ///
487    /// The `Result` component is `Err(anyhow::Error)` if:
488    /// - The transaction cannot be started
489    /// - A database operation fails (transaction is rolled back)
490    /// - The commit fails
491    pub async fn transaction<F>(self, f: F) -> (Self, anyhow::Result<()>)
492    where
493        F: for<'a> FnOnce(
494                &'a SecureTx<'a>,
495            )
496                -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>
497            + Send,
498    {
499        let txn = match self.conn_internal().begin().await {
500            Ok(t) => t,
501            Err(e) => return (self, Err(e.into())),
502        };
503        let tx = SecureTx::new(&txn);
504
505        let res = f(&tx).await;
506
507        match res {
508            Ok(()) => match txn.commit().await {
509                Ok(()) => (self, Ok(())),
510                Err(e) => (self, Err(e.into())),
511            },
512            Err(e) => {
513                _ = txn.rollback().await;
514                (self, Err(e))
515            }
516        }
517    }
518
519    /// Execute a transaction and return both the connection and a result value.
520    ///
521    /// This method consumes `self` and returns both the connection and the result
522    /// from the transaction closure. Use this when you need to return data from
523    /// within the transaction.
524    ///
525    /// # Security
526    ///
527    /// Like [`transaction`](Self::transaction), this method prevents transaction bypass
528    /// by consuming `self`, making it impossible to access the outer connection
529    /// inside the transaction closure.
530    ///
531    /// # Example
532    ///
533    /// ```ignore
534    /// let (conn, result) = conn.transaction_with(|tx| {
535    ///     Box::pin(async move {
536    ///         let user = repo.create(tx, &scope, new_user).await?;
537    ///         Ok(user)
538    ///     })
539    /// }).await;
540    /// let user = result?;
541    /// ```
542    ///
543    /// # Returns
544    ///
545    /// Returns a tuple `(Self, Result<T>)` where:
546    /// - `Self` is the connection (always returned)
547    /// - `Result<T>` contains the transaction result or error
548    ///
549    /// # Errors
550    ///
551    /// The `Result` component is `Err(anyhow::Error)` if:
552    /// - The transaction cannot be started
553    /// - A database operation fails (transaction is rolled back)
554    /// - The commit fails
555    pub async fn transaction_with<T, F>(self, f: F) -> (Self, anyhow::Result<T>)
556    where
557        T: Send + 'static,
558        F: for<'a> FnOnce(
559                &'a SecureTx<'a>,
560            )
561                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
562            + Send,
563    {
564        let txn = match self.conn_internal().begin().await {
565            Ok(t) => t,
566            Err(e) => return (self, Err(e.into())),
567        };
568        let tx = SecureTx::new(&txn);
569
570        let res = f(&tx).await;
571
572        match res {
573            Ok(v) => match txn.commit().await {
574                Ok(()) => (self, Ok(v)),
575                Err(e) => (self, Err(e.into())),
576            },
577            Err(e) => {
578                _ = txn.rollback().await;
579                (self, Err(e))
580            }
581        }
582    }
583
584    /// Execute a closure inside a database transaction with custom configuration.
585    ///
586    /// This method is similar to [`transaction`](Self::transaction), but allows
587    /// specifying the isolation level and access mode.
588    ///
589    /// # Configuration
590    ///
591    /// Use [`TxConfig`] to specify transaction settings without importing `SeaORM` types:
592    ///
593    /// ```ignore
594    /// use modkit_db::secure::{TxConfig, TxIsolationLevel, TxAccessMode};
595    ///
596    /// let cfg = TxConfig {
597    ///     isolation: Some(TxIsolationLevel::Serializable),
598    ///     access_mode: Some(TxAccessMode::ReadWrite),
599    /// };
600    /// ```
601    ///
602    /// # Example
603    ///
604    /// ```ignore
605    /// use modkit_db::secure::{SecureConn, TxConfig, TxIsolationLevel};
606    ///
607    /// // In a domain service requiring serializable isolation:
608    /// pub async fn reconcile_accounts(
609    ///     db: &SecureConn,
610    ///     repo: &AccountsRepo,
611    /// ) -> anyhow::Result<Result<ReconciliationResult, DomainError>> {
612    ///     let cfg = TxConfig::serializable();
613    ///
614    ///     db.transaction_with_config(cfg, |conn| async move {
615    ///         let accounts = repo.find_all_pending(conn).await?;
616    ///         for account in accounts {
617    ///             repo.reconcile(conn, &account).await?;
618    ///         }
619    ///         Ok(Ok(ReconciliationResult { processed: accounts.len() }))
620    ///     }).await
621    /// }
622    /// ```
623    ///
624    /// # Backend Notes
625    ///
626    /// - **`PostgreSQL`**: Full support for all isolation levels and access modes.
627    /// - **MySQL/InnoDB**: Full support for all isolation levels and access modes.
628    /// - **`SQLite`**: Only supports `Serializable` isolation. Other levels are
629    ///   mapped to `Serializable`. Read-only mode is a hint only.
630    ///
631    /// # Security
632    ///
633    /// This method consumes `self` and returns both the connection and result,
634    /// preventing transaction bypass by making the outer connection unavailable
635    /// inside the closure.
636    ///
637    /// # Returns
638    ///
639    /// Returns a tuple `(Self, Result<T>)` where:
640    /// - `Self` is the connection (always returned)
641    /// - `Result<T>` contains the transaction result or error
642    ///
643    /// # Errors
644    ///
645    /// The `Result` component is `Err(anyhow::Error)` if:
646    /// - The transaction cannot be started with the specified configuration
647    /// - A database operation fails (transaction is rolled back)
648    /// - The commit fails
649    pub async fn transaction_with_config<T, F>(
650        self,
651        cfg: TxConfig,
652        f: F,
653    ) -> (Self, anyhow::Result<T>)
654    where
655        T: Send + 'static,
656        F: for<'a> FnOnce(
657                &'a SecureTx<'a>,
658            )
659                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
660            + Send,
661    {
662        let isolation: Option<IsolationLevel> = cfg.isolation.map(Into::into);
663        let access_mode: Option<AccessMode> = cfg.access_mode.map(Into::into);
664
665        let txn = match self
666            .conn_internal()
667            .begin_with_config(isolation, access_mode)
668            .await
669        {
670            Ok(t) => t,
671            Err(e) => return (self, Err(e.into())),
672        };
673        let tx = SecureTx::new(&txn);
674
675        let res = f(&tx).await;
676
677        match res {
678            Ok(v) => match txn.commit().await {
679                Ok(()) => (self, Ok(v)),
680                Err(e) => (self, Err(e.into())),
681            },
682            Err(e) => {
683                _ = txn.rollback().await;
684                (self, Err(e))
685            }
686        }
687    }
688
689    /// Execute a closure inside a typed domain transaction.
690    ///
691    /// This method returns [`TxError<E>`] which distinguishes domain errors from
692    /// infrastructure errors, allowing callers to handle them appropriately.
693    ///
694    /// # Error Handling
695    ///
696    /// - Domain errors returned from the closure are wrapped in `TxError::Domain(e)`
697    /// - Database infrastructure errors are wrapped in `TxError::Infra(InfraError)`
698    ///
699    /// Use [`TxError::into_domain`] to convert the result into your domain error type.
700    ///
701    /// # Example
702    ///
703    /// ```ignore
704    /// use modkit_db::secure::SecureConn;
705    ///
706    /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
707    ///     db.in_transaction(move |tx| Box::pin(async move {
708    ///         if repo.exists(tx, user.id).await? {
709    ///             return Err(DomainError::already_exists(user.id));
710    ///         }
711    ///         repo.create(tx, user).await
712    ///     }))
713    ///     .await
714    ///     .map_err(|e| e.into_domain(DomainError::database_infra))
715    /// }
716    /// ```
717    ///
718    /// # Security
719    ///
720    /// This method consumes `self` and returns both the connection and result,
721    /// preventing transaction bypass by making the outer connection unavailable
722    /// inside the closure.
723    ///
724    /// # Returns
725    ///
726    /// Returns a tuple `(Self, Result<T, TxError<E>>)` where:
727    /// - `Self` is the connection (always returned)
728    /// - `Result<T, TxError<E>>` contains the transaction result or error
729    ///
730    /// # Errors
731    ///
732    /// The `Result` component is `Err(TxError<E>)` if:
733    /// - The callback returns a domain error (`TxError::Domain(E)`).
734    /// - The transaction fails due to a database/infrastructure error (`TxError::Infra(InfraError)`).
735    pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
736    where
737        T: Send + 'static,
738        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
739        F: for<'a> FnOnce(
740                &'a SecureTx<'a>,
741            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
742            + Send,
743    {
744        let txn = match self.conn_internal().begin().await {
745            Ok(t) => t,
746            Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
747        };
748        let tx = SecureTx::new(&txn);
749
750        let res = f(&tx).await;
751
752        match res {
753            Ok(v) => match txn.commit().await {
754                Ok(()) => (self, Ok(v)),
755                Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
756            },
757            Err(e) => {
758                _ = txn.rollback().await;
759                (self, Err(TxError::Domain(e)))
760            }
761        }
762    }
763
764    /// Execute a typed domain transaction with automatic infrastructure error mapping.
765    ///
766    /// This is a convenience wrapper around [`in_transaction`](Self::in_transaction) that
767    /// automatically converts [`TxError`] into the domain error type using the provided
768    /// mapping function for infrastructure errors.
769    ///
770    /// # Example
771    ///
772    /// ```ignore
773    /// use modkit_db::secure::SecureConn;
774    ///
775    /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
776    ///     db.in_transaction_mapped(DomainError::database_infra, move |tx| Box::pin(async move {
777    ///         if repo.exists(tx, user.id).await? {
778    ///             return Err(DomainError::already_exists(user.id));
779    ///         }
780    ///         repo.create(tx, user).await
781    ///     })).await
782    /// }
783    /// ```
784    ///
785    /// # Security
786    ///
787    /// This method consumes `self` and returns both the connection and result,
788    /// preventing transaction bypass.
789    ///
790    /// # Returns
791    ///
792    /// Returns a tuple `(Self, Result<T, E>)` where:
793    /// - `Self` is the connection (always returned)
794    /// - `Result<T, E>` contains the transaction result or mapped error
795    ///
796    /// # Errors
797    ///
798    /// The `Result` component is `Err(E)` if:
799    /// - The callback returns a domain error (`E`).
800    /// - The transaction fails due to a database/infrastructure error, mapped via `map_infra`.
801    pub async fn in_transaction_mapped<T, E, F, M>(self, map_infra: M, f: F) -> (Self, Result<T, E>)
802    where
803        T: Send + 'static,
804        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
805        M: FnOnce(InfraError) -> E + Send,
806        F: for<'a> FnOnce(
807                &'a SecureTx<'a>,
808            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
809            + Send,
810    {
811        let (conn, result) = self.in_transaction(f).await;
812        (conn, result.map_err(|tx_err| tx_err.into_domain(map_infra)))
813    }
814}