Skip to main content

modkit_db/secure/
secure_conn.rs

1//! High-level secure database wrapper for ergonomic, type-safe access.
2//!
3//! This module provides `SecureConn`, a wrapper around `SeaORM`'s `DatabaseConnection`
4//! that enforces access control policies on all operations.
5//!
6//! # Design Philosophy
7//!
8//! Plugin/module developers should never handle raw `DatabaseConnection` or manually
9//! apply scopes. Instead, they receive a `SecureConn` instance that guarantees:
10//!
11//! - **Automatic scoping**: All queries are filtered by tenant/resource scope
12//! - **Type safety**: Cannot execute unscoped queries
13//! - **Ergonomics**: Simple, fluent API for common operations
14//!
15//! # Example
16//!
//! ```ignore
//! use modkit_db::secure::{SecureConn, AccessScope};
//!
//! pub struct UsersRepo<'a> {
//!     db: &'a SecureConn,
//! }
//!
//! impl<'a> UsersRepo<'a> {
//!     pub async fn find_by_id(
//!         &self,
//!         scope: &AccessScope,
//!         id: Uuid,
//!     ) -> Result<Option<User>, ScopeError> {
//!         let user = self.db
//!             .find_by_id::<user::Entity>(scope, id)?
//!             .one(self.db.conn())
//!             .await?;
//!         Ok(user.map(Into::into))
//!     }
//!
//!     pub async fn find_all(&self, scope: &AccessScope) -> Result<Vec<User>, ScopeError> {
//!         let users = self.db
//!             .find::<user::Entity>(scope)
//!             .all(self.db.conn())
//!             .await?;
//!         Ok(users.into_iter().map(Into::into).collect())
//!     }
//!
//!     pub async fn update_status(
//!         &self,
//!         scope: &AccessScope,
//!         status: String,
//!     ) -> Result<u64, ScopeError> {
//!         let result = self.db
//!             .update_many::<user::Entity>(scope)
//!             .col_expr(user::Column::Status, Expr::value(status))
//!             .exec(self.db.conn())
//!             .await?;
//!         Ok(result.rows_affected)
//!     }
//! }
//! ```
51
52use std::{future::Future, pin::Pin};
53
54use sea_orm::{
55    AccessMode, ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection,
56    DatabaseTransaction, DbErr, EntityTrait, IsolationLevel, QueryFilter, TransactionTrait,
57    sea_query::Expr,
58};
59use uuid::Uuid;
60
61use crate::secure::tx_error::{InfraError, TxError};
62
63use modkit_security::AccessScope;
64
65use crate::secure::tx_config::TxConfig;
66
67use crate::secure::{ScopableEntity, ScopeError, Scoped, SecureEntityExt, SecureSelect};
68
69use crate::secure::db_ops::{SecureDeleteExt, SecureDeleteMany, SecureUpdateExt, SecureUpdateMany};
70
/// Secure database connection wrapper.
///
/// This is the primary interface for module developers to access the database.
/// Scoped operations take an [`AccessScope`] argument that is supplied per request.
///
/// # Usage
///
/// Module services hold a `&SecureConn` and pass the caller's `AccessScope` on each call:
///
/// ```ignore
/// pub struct MyService<'a> {
///     db: &'a SecureConn,
/// }
///
/// impl<'a> MyService<'a> {
///     pub async fn get_user(&self, scope: &AccessScope, id: Uuid) -> Result<Option<User>> {
///         self.db.find_by_id::<user::Entity>(scope, id)?
///             .one(self.db.conn())
///             .await
///     }
/// }
/// ```
///
/// # Security Guarantees
///
/// - The query builders (`find`, `find_by_id`, `update_many`, `delete_many`) require an
///   `AccessScope` and return wrappers in the `Scoped` state.
/// - `insert`, `update_with_ctx` and `delete_by_id` also take the scope explicitly.
/// - Empty scopes are intended to result in deny-all (no data returned); that behaviour
///   lives in the scoping layer, not in this type.
///
/// # Escape hatches
///
/// - [`SecureConn::conn`] exposes the raw `DatabaseConnection`.
/// - [`SecureConn::update_one`] performs an update without any scope validation.
///
/// Both bypass automatic scoping and should be limited to infrastructure code or to
/// callers that have already validated access.
#[derive(Clone)]
pub struct SecureConn {
    // Underlying SeaORM connection; every method on this type operates on it.
    conn: DatabaseConnection,
}
105
106impl SecureConn {
107    /// Create a new secure database connection wrapper.
108    ///
109    /// Typically created via `DbHandle::sea_secure()` rather than directly.
110    #[must_use]
111    pub fn new(conn: DatabaseConnection) -> Self {
112        Self { conn }
113    }
114
115    /// Get a reference to the underlying database connection.
116    ///
117    /// # Safety
118    ///
119    /// Use with caution. Direct connection access bypasses automatic scoping.
120    /// Prefer the high-level methods (`find`, `update_many`, etc.) whenever possible.
121    ///
122    /// Valid use cases:
123    /// - Executing already-scoped queries (`.one()`, `.all()`, `.exec()`)
124    /// - Complex joins that need custom `SeaORM` building
125    /// - Internal infrastructure code (not module business logic)
126    #[must_use]
127    pub fn conn(&self) -> &DatabaseConnection {
128        &self.conn
129    }
130
131    /// Return database engine identifier for tracing / logging.
132    #[must_use]
133    pub fn db_engine(&self) -> &'static str {
134        use sea_orm::DatabaseBackend;
135
136        match self.conn.get_database_backend() {
137            DatabaseBackend::Postgres => "postgres",
138            DatabaseBackend::MySql => "mysql",
139            DatabaseBackend::Sqlite => "sqlite",
140        }
141    }
142
143    /// Create a scoped select query for the given entity.
144    ///
145    /// Returns a `SecureSelect<E, Scoped>` that automatically applies
146    /// tenant/resource filtering based on the provided security context.
147    ///
148    /// # Example
149    ///
150    /// ```ignore
151    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
152    /// let users = db.find::<user::Entity>(&ctx)?
153    ///     .filter(user::Column::Status.eq("active"))
154    ///     .order_by_asc(user::Column::Email)
155    ///     .all(db.conn())
156    ///     .await?;
157    /// ```
158    ///
159    /// # Errors
160    ///
161    #[allow(clippy::unused_self)] // Keep fluent &SecureConn API even when method only delegates
162    pub fn find<E>(&self, scope: &AccessScope) -> SecureSelect<E, Scoped>
163    where
164        E: ScopableEntity + EntityTrait,
165        E::Column: ColumnTrait + Copy,
166    {
167        E::find().secure().scope_with(scope)
168    }
169
170    /// Create a scoped select query filtered by a specific resource ID.
171    ///
172    /// This is a convenience method that combines `find()` with `.and_id()`.
173    ///
174    /// # Example
175    ///
176    /// ```ignore
177    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
178    /// let user = db.find_by_id::<user::Entity>(&ctx, user_id)?
179    ///     .one(db.conn())
180    ///     .await?;
181    /// ```
182    ///
183    /// # Errors
184    /// Returns `ScopeError` if the entity doesn't have a resource column or scoping fails.
185    pub fn find_by_id<E>(
186        &self,
187        scope: &AccessScope,
188        id: Uuid,
189    ) -> Result<SecureSelect<E, Scoped>, ScopeError>
190    where
191        E: ScopableEntity + EntityTrait,
192        E::Column: ColumnTrait + Copy,
193    {
194        self.find::<E>(scope).and_id(id)
195    }
196
197    /// Create a scoped update query for the given entity.
198    ///
199    /// Returns a `SecureUpdateMany<E, Scoped>` that automatically applies
200    /// tenant/resource filtering. Use `.col_expr()` or other `SeaORM` methods
201    /// to specify what to update.
202    ///
203    /// # Example
204    ///
205    /// ```ignore
206    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
207    /// let result = db.update_many::<user::Entity>(&ctx)?
208    ///     .col_expr(user::Column::Status, Expr::value("active"))
209    ///     .col_expr(user::Column::UpdatedAt, Expr::value(Utc::now()))
210    ///     .exec(db.conn())
211    ///     .await?;
212    /// println!("Updated {} rows", result.rows_affected);
213    /// ```
214    ///
215    #[allow(clippy::unused_self)] // Delegates but matches the rest of the connection API
216    #[must_use]
217    pub fn update_many<E>(&self, scope: &AccessScope) -> SecureUpdateMany<E, Scoped>
218    where
219        E: ScopableEntity + EntityTrait,
220        E::Column: ColumnTrait + Copy,
221    {
222        E::update_many().secure().scope_with(scope)
223    }
224
225    /// Create a scoped delete query for the given entity.
226    ///
227    /// Returns a `SecureDeleteMany<E, Scoped>` that automatically applies
228    /// tenant/resource filtering.
229    ///
230    /// # Example
231    ///
232    /// ```ignore
233    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
234    /// let result = db.delete_many::<user::Entity>(&ctx)?
235    ///     .exec(db.conn())
236    ///     .await?;
237    /// println!("Deleted {} rows", result.rows_affected);
238    /// ```
239    ///
240    #[allow(clippy::unused_self)] // Retain method-style ergonomics for callers of SecureConn
241    #[must_use]
242    pub fn delete_many<E>(&self, scope: &AccessScope) -> SecureDeleteMany<E, Scoped>
243    where
244        E: ScopableEntity + EntityTrait,
245        E::Column: ColumnTrait + Copy,
246    {
247        E::delete_many().secure().scope_with(scope)
248    }
249
250    /// Insert a new entity with automatic tenant validation.
251    ///
252    /// This is a convenience wrapper around `secure_insert()` that uses
253    /// the provided security context.
254    ///
255    /// # Example
256    ///
257    /// ```ignore
258    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
259    /// let am = user::ActiveModel {
260    ///     id: Set(Uuid::new_v4()),
261    ///     tenant_id: Set(tenant_id),
262    ///     owner_id: Set(ctx.subject_id),
263    ///     email: Set("user@example.com".to_string()),
264    ///     ..Default::default()
265    /// };
266    ///
267    /// let user = db.insert::<user::Entity>(&ctx, am).await?;
268    /// ```
269    ///
270    /// # Errors
271    ///
272    /// - `ScopeError::Invalid` if entity requires tenant but scope has none
273    /// - `ScopeError::Db` if database insert fails
274    pub async fn insert<E>(
275        &self,
276        scope: &AccessScope,
277        am: E::ActiveModel,
278    ) -> Result<E::Model, ScopeError>
279    where
280        E: ScopableEntity + EntityTrait,
281        E::Column: ColumnTrait + Copy,
282        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
283        E::Model: sea_orm::IntoActiveModel<E::ActiveModel>,
284    {
285        crate::secure::secure_insert::<E>(am, scope, &self.conn).await
286    }
287
288    /// Update a single entity by ID (unscoped).
289    ///
290    /// **Warning**: This method does NOT validate security scope.
291    /// Use `update_with_ctx()` for scope-validated updates.
292    ///
293    /// This is a convenience method for the common pattern of updating one record
294    /// when you've already validated access separately.
295    ///
296    /// # Example
297    ///
298    /// ```ignore
299    /// let mut user: user::ActiveModel = db.find_by_id::<user::Entity>(id)?
300    ///     .one(db.conn())
301    ///     .await?
302    ///     .ok_or(NotFound)?
303    ///     .into();
304    ///
305    /// user.email = Set("newemail@example.com".to_string());
306    /// user.updated_at = Set(Utc::now());
307    ///
308    /// let updated = db.update_one(user).await?;
309    /// ```
310    ///
311    /// # Errors
312    /// Returns `ScopeError::Db` if the database update fails.
313    pub async fn update_one<E>(&self, am: E::ActiveModel) -> Result<E::Model, ScopeError>
314    where
315        E: EntityTrait,
316        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
317        E::Model: sea_orm::IntoActiveModel<E::ActiveModel>,
318    {
319        Ok(am.update(&self.conn).await?)
320    }
321
322    /// Update a single entity with security scope validation.
323    ///
324    /// This method ensures the entity being updated is within the security scope
325    /// before performing the update. It validates that the record is accessible
326    /// based on tenant/resource constraints.
327    ///
328    /// # Security
329    ///
330    /// - Validates the entity exists and is accessible in the security scope
331    /// - Returns `ScopeError::Denied` if the entity is not in scope
332    /// - Ensures updates cannot affect entities outside the security boundary
333    ///
334    /// # Example
335    ///
336    /// ```ignore
337    /// let ctx = SecurityCtx::for_tenant(tenant_id, user_id);
338    ///
339    /// // Load and modify
340    /// let user_model = db.find_by_id::<user::Entity>(&ctx, id)?
341    ///     .one(db.conn())
342    ///     .await?
343    ///     .ok_or(NotFound)?;
344    ///
345    /// let mut user: user::ActiveModel = user_model.into();
346    /// user.email = Set("newemail@example.com".to_string());
347    /// user.updated_at = Set(Utc::now());
348    ///
349    /// // Update with scope validation (pass ID separately)
350    /// let updated = db.update_with_ctx::<user::Entity>(&ctx, id, user).await?;
351    /// ```
352    ///
353    /// # Errors
354    ///
355    /// - `ScopeError::Denied` if the entity is not accessible in the current scope
356    /// - `ScopeError::Db` if the database operation fails
357    pub async fn update_with_ctx<E>(
358        &self,
359        scope: &AccessScope,
360        id: Uuid,
361        am: E::ActiveModel,
362    ) -> Result<E::Model, ScopeError>
363    where
364        E: ScopableEntity + EntityTrait,
365        E::Column: ColumnTrait + Copy,
366        E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
367        E::Model: sea_orm::IntoActiveModel<E::ActiveModel>,
368    {
369        let exists = self
370            .find_by_id::<E>(scope, id)?
371            .one(&self.conn)
372            .await?
373            .is_some();
374
375        if !exists {
376            return Err(ScopeError::Denied(
377                "entity not found or not accessible in current security scope",
378            ));
379        }
380
381        Ok(am.update(&self.conn).await?)
382    }
383
384    /// Delete a single entity by ID (scoped).
385    ///
386    /// This validates the entity exists in scope before deleting.
387    ///
388    /// # Example
389    ///
390    /// ```ignore
391    /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
392    /// db.delete_by_id::<user::Entity>(&ctx, user_id).await?;
393    /// ```
394    ///
395    /// # Returns
396    ///
397    /// - `Ok(true)` if entity was deleted
398    /// - `Ok(false)` if entity not found in scope
399    ///
400    /// # Errors
401    ///
402    /// Returns `ScopeError::Invalid` if the entity does not have a `resource_col` defined.
403    pub async fn delete_by_id<E>(&self, scope: &AccessScope, id: Uuid) -> Result<bool, ScopeError>
404    where
405        E: ScopableEntity + EntityTrait,
406        E::Column: ColumnTrait + Copy,
407    {
408        let resource_col = E::resource_col().ok_or_else(|| {
409            ScopeError::Invalid("Entity must have a resource_col to use delete_by_id()")
410        })?;
411
412        let result = E::delete_many()
413            .filter(sea_orm::Condition::all().add(Expr::col(resource_col).eq(id)))
414            .secure()
415            .scope_with(scope)
416            .exec(&self.conn)
417            .await?;
418
419        Ok(result.rows_affected > 0)
420    }
421
422    // ========================================================================
423    // Transaction support
424    // ========================================================================
425
426    /// Execute a closure inside a database transaction.
427    ///
428    /// This method starts a `SeaORM` transaction, provides the transaction handle
429    /// to the closure as `&dyn ConnectionTrait`, and handles commit/rollback.
430    ///
431    /// # Return Type
432    ///
433    /// Returns `anyhow::Result<Result<T, E>>` where:
434    /// - Outer `Err`: Database/infrastructure error (transaction rolls back)
435    /// - Inner `Ok(T)`: Success (transaction commits)
436    /// - Inner `Err(E)`: Domain/validation error (transaction still commits)
437    ///
438    /// This design ensures domain validation errors don't cause rollback.
439    ///
440    /// # Architecture Note
441    ///
442    /// Transaction boundaries should be managed by **application/domain services**,
443    /// not by REST handlers. REST handlers should call service methods that
444    /// internally decide when to open transactions.
445    ///
446    /// # Example
447    ///
448    /// ```ignore
449    /// use modkit_db::secure::SecureConn;
450    ///
451    /// // In a domain service:
452    /// pub async fn create_user(
453    ///     db: &SecureConn,
454    ///     repo: &UsersRepo,
455    ///     user: User,
456    /// ) -> Result<User, DomainError> {
457    ///     let result = db.transaction(|conn| async move {
458    ///         // Check email uniqueness
459    ///         if repo.email_exists(conn, &user.email).await? {
460    ///             return Ok(Err(DomainError::EmailExists));
461    ///         }
462    ///         // Create user
463    ///         let created = repo.create(conn, user).await?;
464    ///         Ok(Ok(created))
465    ///     }).await?;
466    ///     result
467    /// }
468    /// ```
469    ///
470    /// # Errors
471    ///
472    /// Returns `Err(anyhow::Error)` if:
473    /// - The transaction cannot be started
474    /// - A database operation fails (transaction is rolled back)
475    /// - The commit fails
476    pub async fn transaction<T, F>(&self, f: F) -> anyhow::Result<T>
477    where
478        T: Send + 'static,
479        F: for<'c> FnOnce(
480                &'c DatabaseTransaction,
481            )
482                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'c>>
483            + Send,
484    {
485        self.conn
486            .transaction::<_, T, DbErr>(|txn| {
487                let fut = f(txn);
488                Box::pin(async move {
489                    fut.await
490                        .map_err(|e| DbErr::Custom(format!("transaction callback failed: {e:#}")))
491                })
492            })
493            .await
494            .map_err(|e| anyhow::anyhow!("transaction failed: {e}"))
495    }
496
497    /// Execute a closure inside a database transaction with custom configuration.
498    ///
499    /// This method is similar to [`transaction`](Self::transaction), but allows
500    /// specifying the isolation level and access mode.
501    ///
502    /// # Configuration
503    ///
504    /// Use [`TxConfig`] to specify transaction settings without importing `SeaORM` types:
505    ///
506    /// ```ignore
507    /// use modkit_db::secure::{TxConfig, TxIsolationLevel, TxAccessMode};
508    ///
509    /// let cfg = TxConfig {
510    ///     isolation: Some(TxIsolationLevel::Serializable),
511    ///     access_mode: Some(TxAccessMode::ReadWrite),
512    /// };
513    /// ```
514    ///
515    /// # Example
516    ///
517    /// ```ignore
518    /// use modkit_db::secure::{SecureConn, TxConfig, TxIsolationLevel};
519    ///
520    /// // In a domain service requiring serializable isolation:
521    /// pub async fn reconcile_accounts(
522    ///     db: &SecureConn,
523    ///     repo: &AccountsRepo,
524    /// ) -> anyhow::Result<Result<ReconciliationResult, DomainError>> {
525    ///     let cfg = TxConfig::serializable();
526    ///
527    ///     db.transaction_with_config(cfg, |conn| async move {
528    ///         let accounts = repo.find_all_pending(conn).await?;
529    ///         for account in accounts {
530    ///             repo.reconcile(conn, &account).await?;
531    ///         }
532    ///         Ok(Ok(ReconciliationResult { processed: accounts.len() }))
533    ///     }).await
534    /// }
535    /// ```
536    ///
537    /// # Backend Notes
538    ///
539    /// - **`PostgreSQL`**: Full support for all isolation levels and access modes.
540    /// - **MySQL/InnoDB**: Full support for all isolation levels and access modes.
541    /// - **`SQLite`**: Only supports `Serializable` isolation. Other levels are
542    ///   mapped to `Serializable`. Read-only mode is a hint only.
543    ///
544    /// # Errors
545    ///
546    /// Returns `Err(anyhow::Error)` if:
547    /// - The transaction cannot be started with the specified configuration
548    /// - A database operation fails (transaction is rolled back)
549    /// - The commit fails
550    pub async fn transaction_with_config<T, F>(&self, cfg: TxConfig, f: F) -> anyhow::Result<T>
551    where
552        T: Send + 'static,
553        F: for<'c> FnOnce(
554                &'c DatabaseTransaction,
555            )
556                -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'c>>
557            + Send,
558    {
559        let isolation: Option<IsolationLevel> = cfg.isolation.map(Into::into);
560        let access_mode: Option<AccessMode> = cfg.access_mode.map(Into::into);
561
562        self.conn
563            .transaction_with_config::<_, T, DbErr>(
564                |txn| {
565                    let fut = f(txn);
566                    Box::pin(async move {
567                        fut.await.map_err(|e| {
568                            DbErr::Custom(format!("transaction callback failed: {e:#}"))
569                        })
570                    })
571                },
572                isolation,
573                access_mode,
574            )
575            .await
576            .map_err(|e| anyhow::anyhow!("transaction_with_config failed: {e}"))
577    }
578
579    /// Execute a closure inside a typed domain transaction.
580    ///
581    /// This method returns [`TxError<E>`] which distinguishes domain errors from
582    /// infrastructure errors, allowing callers to handle them appropriately.
583    ///
584    /// # Error Handling
585    ///
586    /// - Domain errors returned from the closure are wrapped in `TxError::Domain(e)`
587    /// - Database infrastructure errors are wrapped in `TxError::Infra(InfraError)`
588    ///
589    /// Use [`TxError::into_domain`] to convert the result into your domain error type.
590    ///
591    /// # Example
592    ///
593    /// ```ignore
594    /// use modkit_db::secure::SecureConn;
595    ///
596    /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
597    ///     db.in_transaction(move |tx| Box::pin(async move {
598    ///         if repo.exists(tx, user.id).await? {
599    ///             return Err(DomainError::already_exists(user.id));
600    ///         }
601    ///         repo.create(tx, user).await
602    ///     }))
603    ///     .await
604    ///     .map_err(|e| e.into_domain(DomainError::database_infra))
605    /// }
606    /// ```
607    ///
608    /// # Errors
609    ///
610    /// Returns `Err(TxError<E>)` if:
611    /// - The callback returns a domain error (`TxError::Domain(E)`).
612    /// - The transaction fails due to a database/infrastructure error (`TxError::Infra(InfraError)`).
613    pub async fn in_transaction<T, E, F>(&self, f: F) -> Result<T, TxError<E>>
614    where
615        T: Send + 'static,
616        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
617        F: for<'c> FnOnce(
618                &'c DatabaseTransaction,
619            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
620            + Send,
621    {
622        self.conn
623            .transaction::<_, T, TxError<E>>(|txn| {
624                let fut = f(txn);
625                Box::pin(async move { fut.await.map_err(TxError::Domain) })
626            })
627            .await
628            .map_err(|e| match e {
629                sea_orm::TransactionError::Transaction(tx_err) => tx_err,
630                sea_orm::TransactionError::Connection(db_err) => {
631                    TxError::Infra(InfraError::new(db_err.to_string()))
632                }
633            })
634    }
635
636    /// Execute a typed domain transaction with automatic infrastructure error mapping.
637    ///
638    /// This is a convenience wrapper around [`in_transaction`](Self::in_transaction) that
639    /// automatically converts [`TxError`] into the domain error type using the provided
640    /// mapping function for infrastructure errors.
641    ///
642    /// # Example
643    ///
644    /// ```ignore
645    /// use modkit_db::secure::SecureConn;
646    ///
647    /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
648    ///     db.in_transaction_mapped(DomainError::database_infra, move |tx| Box::pin(async move {
649    ///         if repo.exists(tx, user.id).await? {
650    ///             return Err(DomainError::already_exists(user.id));
651    ///         }
652    ///         repo.create(tx, user).await
653    ///     })).await
654    /// }
655    /// ```
656    ///
657    /// # Errors
658    ///
659    /// Returns `Err(E)` if:
660    /// - The callback returns a domain error (`E`).
661    /// - The transaction fails due to a database/infrastructure error, mapped via `map_infra`.
662    pub async fn in_transaction_mapped<T, E, F, M>(&self, map_infra: M, f: F) -> Result<T, E>
663    where
664        T: Send + 'static,
665        E: std::fmt::Debug + std::fmt::Display + Send + 'static,
666        M: FnOnce(InfraError) -> E + Send,
667        F: for<'c> FnOnce(
668                &'c DatabaseTransaction,
669            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
670            + Send,
671    {
672        self.in_transaction(f)
673            .await
674            .map_err(|tx_err| tx_err.into_domain(map_infra))
675    }
676}