modkit_db/secure/secure_conn.rs
1//! High-level secure database wrapper for ergonomic, type-safe access.
2//!
3//! This module provides `SecureConn`, a wrapper around a private `SeaORM` connection
4//! that enforces access control policies on all operations.
5//!
6//! Plugin/module developers should never handle raw `DatabaseConnection` or manually
7//! apply scopes. Instead, they receive a `SecureConn` instance that guarantees:
8//!
9//! - **Automatic scoping**: All queries are filtered by tenant/resource scope
10//! - **Type safety**: Cannot execute unscoped queries
11//! - **Ergonomics**: Simple, fluent API for common operations
12//!
13//! # Example
14//!
//! ```ignore
//! use modkit_db::secure::{SecureConn, SecurityCtx, AccessScope};
//!
//! pub struct UsersRepo<'a> {
//!     db: &'a SecureConn,
//! }
//!
//! impl<'a> UsersRepo<'a> {
//!     pub async fn find_by_id(
//!         &self,
//!         scope: &AccessScope,
//!         id: Uuid,
//!     ) -> Result<Option<User>, ScopeError> {
//!         let user = self.db
//!             .find_by_id::<user::Entity>(scope, id)?
//!             .one(self.db)
//!             .await?;
//!         Ok(user.map(Into::into))
//!     }
//!
//!     pub async fn find_all(&self, scope: &AccessScope) -> Result<Vec<User>, ScopeError> {
//!         // `find` returns the scoped query directly, so there is no `?` here.
//!         let users = self.db
//!             .find::<user::Entity>(scope)
//!             .all(self.db)
//!             .await?;
//!         Ok(users.into_iter().map(Into::into).collect())
//!     }
//!
//!     pub async fn update_status(
//!         &self,
//!         scope: &AccessScope,
//!         status: String,
//!     ) -> Result<u64, ScopeError> {
//!         // `update_many` returns the scoped builder directly, so there is no `?` here.
//!         let result = self.db
//!             .update_many::<user::Entity>(scope)
//!             .col_expr(user::Column::Status, Expr::value(status))
//!             .exec(self.db)
//!             .await?;
//!         Ok(result.rows_affected)
//!     }
//! }
//! ```
49
50use std::{future::Future, pin::Pin};
51
52use sea_orm::{
53 AccessMode, ColumnTrait, ConnectionTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
54 IsolationLevel, QueryFilter, TransactionTrait, sea_query::Expr,
55};
56use uuid::Uuid;
57
58use crate::secure::tx_error::{InfraError, TxError};
59
60use modkit_security::AccessScope;
61
62use crate::secure::tx_config::TxConfig;
63
64use crate::secure::{ScopableEntity, ScopeError, Scoped, SecureEntityExt, SecureSelect};
65
66use crate::secure::db_ops::{SecureDeleteExt, SecureDeleteMany, SecureUpdateExt, SecureUpdateMany};
67
68/// Secure transaction wrapper (capability).
69///
70/// This type intentionally does not expose any raw transaction or executor API.
71pub struct SecureTx<'a> {
72 pub(crate) tx: &'a DatabaseTransaction,
73}
74
75impl<'a> SecureTx<'a> {
76 #[must_use]
77 pub(crate) fn new(tx: &'a DatabaseTransaction) -> Self {
78 Self { tx }
79 }
80}
81
82/// Secure database connection wrapper.
83///
84/// This is the primary interface for module developers to access the database.
85/// All operations require a `SecurityCtx` parameter for per-request access control.
86///
87/// # Usage
88///
89/// Module services receive a `&SecureConn` and provide `SecurityCtx` per-request:
90///
91/// ```ignore
92/// pub struct MyService<'a> {
93/// db: &'a SecureConn,
94/// }
95///
96/// impl<'a> MyService<'a> {
97/// pub async fn get_user(&self, scope: &AccessScope, id: Uuid) -> Result<Option<User>> {
98/// self.db.find_by_id::<user::Entity>(ctx, id)?
99/// .one(self.db)
100/// .await
101/// }
102/// }
103/// ```
104///
105/// # Security Guarantees
106///
107/// - All queries require `SecurityCtx` from the request
108/// - Queries are scoped by tenant/resource from the context
109/// - Empty scopes result in deny-all (no data returned)
110/// - Type system prevents unscoped queries from compiling
111/// - Modules cannot access raw database connections
112pub struct SecureConn {
113 pub(crate) conn: DatabaseConnection,
114}
115
116impl SecureConn {
117 /// Create a new secure database connection wrapper.
118 /// Internal-only accessor to the raw database connection.
119 ///
120 /// # Security
121 ///
122 /// This MUST NOT be exposed publicly. Any public raw access to the underlying database
123 /// handle would allow bypassing scoping and tenant isolation.
124 #[must_use]
125 pub(crate) fn conn_internal(&self) -> &DatabaseConnection {
126 &self.conn
127 }
128
129 /// Return database engine identifier for tracing / logging.
130 #[must_use]
131 pub fn db_engine(&self) -> &'static str {
132 use sea_orm::DbBackend;
133
134 match self.conn.get_database_backend() {
135 DbBackend::Postgres => "postgres",
136 DbBackend::MySql => "mysql",
137 DbBackend::Sqlite => "sqlite",
138 }
139 }
140
141 /// Create a scoped select query for the given entity.
142 ///
143 /// Returns a `SecureSelect<E, Scoped>` that automatically applies
144 /// tenant/resource filtering based on the provided security context.
145 ///
146 /// # Example
147 ///
148 /// ```ignore
149 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
150 /// let users = db.find::<user::Entity>(&ctx)?
151 /// .filter(user::Column::Status.eq("active"))
152 /// .order_by_asc(user::Column::Email)
153 /// .all(db)
154 /// .await?;
155 /// ```
156 ///
157 /// # Errors
158 ///
159 #[allow(clippy::unused_self)] // Keep fluent &SecureConn API even when method only delegates
160 pub fn find<E>(&self, scope: &AccessScope) -> SecureSelect<E, Scoped>
161 where
162 E: ScopableEntity + EntityTrait,
163 E::Column: ColumnTrait + Copy,
164 {
165 E::find().secure().scope_with(scope)
166 }
167
168 /// Create a scoped select query filtered by a specific resource ID.
169 ///
170 /// This is a convenience method that combines `find()` with `.and_id()`.
171 ///
172 /// # Example
173 ///
174 /// ```ignore
175 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
176 /// let user = db.find_by_id::<user::Entity>(&ctx, user_id)?
177 /// .one(db)
178 /// .await?;
179 /// ```
180 ///
181 /// # Errors
182 /// Returns `ScopeError` if the entity doesn't have a resource column or scoping fails.
183 pub fn find_by_id<E>(
184 &self,
185 scope: &AccessScope,
186 id: Uuid,
187 ) -> Result<SecureSelect<E, Scoped>, ScopeError>
188 where
189 E: ScopableEntity + EntityTrait,
190 E::Column: ColumnTrait + Copy,
191 {
192 self.find::<E>(scope).and_id(id)
193 }
194
195 /// Create a scoped update query for the given entity.
196 ///
197 /// Returns a `SecureUpdateMany<E, Scoped>` that automatically applies
198 /// tenant/resource filtering. Use `.col_expr()` or other `SeaORM` methods
199 /// to specify what to update.
200 ///
201 /// # Example
202 ///
203 /// ```ignore
204 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
205 /// let result = db.update_many::<user::Entity>(&ctx)?
206 /// .col_expr(user::Column::Status, Expr::value("active"))
207 /// .col_expr(user::Column::UpdatedAt, Expr::value(Utc::now()))
208 /// .exec(db)
209 /// .await?;
210 /// println!("Updated {} rows", result.rows_affected);
211 /// ```
212 ///
213 #[allow(clippy::unused_self)] // Delegates but matches the rest of the connection API
214 #[must_use]
215 pub fn update_many<E>(&self, scope: &AccessScope) -> SecureUpdateMany<E, Scoped>
216 where
217 E: ScopableEntity + EntityTrait,
218 E::Column: ColumnTrait + Copy,
219 {
220 E::update_many().secure().scope_with(scope)
221 }
222
223 /// Create a scoped delete query for the given entity.
224 ///
225 /// Returns a `SecureDeleteMany<E, Scoped>` that automatically applies
226 /// tenant/resource filtering.
227 ///
228 /// # Example
229 ///
230 /// ```ignore
231 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
232 /// let result = db.delete_many::<user::Entity>(&ctx)?
233 /// .exec(db)
234 /// .await?;
235 /// println!("Deleted {} rows", result.rows_affected);
236 /// ```
237 ///
238 #[allow(clippy::unused_self)] // Retain method-style ergonomics for callers of SecureConn
239 #[must_use]
240 pub fn delete_many<E>(&self, scope: &AccessScope) -> SecureDeleteMany<E, Scoped>
241 where
242 E: ScopableEntity + EntityTrait,
243 E::Column: ColumnTrait + Copy,
244 {
245 E::delete_many().secure().scope_with(scope)
246 }
247
248 /// Create a scoped insert builder with `on_conflict()` support.
249 ///
250 /// Unlike the simpler `insert()` method, this returns a builder that allows
251 /// setting `on_conflict()` for upsert semantics while still enforcing
252 /// tenant validation through the secure typestate pattern.
253 ///
254 /// # Example
255 ///
256 /// ```ignore
257 /// use sea_orm::sea_query::OnConflict;
258 ///
259 /// let scope = AccessScope::tenants_only(vec![tenant_id]);
260 /// let am = settings::ActiveModel {
261 /// tenant_id: Set(tenant_id),
262 /// user_id: Set(user_id),
263 /// theme: Set(Some("dark".to_string())),
264 /// ..Default::default()
265 /// };
266 ///
267 /// db.insert_one(&scope, am)?
268 /// .on_conflict(
269 /// OnConflict::columns([Column::TenantId, Column::UserId])
270 /// .update_columns([Column::Theme])
271 /// .to_owned()
272 /// )
273 /// .exec(db)
274 /// .await?;
275 /// ```
276 ///
277 /// # Errors
278 ///
279 /// - `ScopeError::Invalid` if `tenant_id` is not set for tenant-scoped entities
280 /// - `ScopeError::TenantNotInScope` if `tenant_id` is not in the provided scope
281 #[allow(clippy::needless_pass_by_value)] // We clone for insert and borrow for validation
282 pub fn insert_one<E>(
283 &self,
284 scope: &AccessScope,
285 am: E::ActiveModel,
286 ) -> Result<crate::secure::SecureInsertOne<E::ActiveModel, Scoped>, ScopeError>
287 where
288 E: ScopableEntity + EntityTrait,
289 E::Column: ColumnTrait + Copy,
290 E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
291 {
292 use crate::secure::SecureInsertExt;
293 E::insert(am.clone()).secure().scope_with_model(scope, &am)
294 }
295
296 /// Insert a new entity with automatic tenant validation.
297 ///
298 /// This is a convenience wrapper around `secure_insert()` that uses
299 /// the provided security context.
300 ///
301 /// # Example
302 ///
303 /// ```ignore
304 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
305 /// let am = user::ActiveModel {
306 /// id: Set(Uuid::new_v4()),
307 /// tenant_id: Set(tenant_id),
308 /// owner_id: Set(ctx.subject_id),
309 /// email: Set("user@example.com".to_string()),
310 /// ..Default::default()
311 /// };
312 ///
313 /// let user = db.insert::<user::Entity>(&ctx, am).await?;
314 /// ```
315 ///
316 /// # Errors
317 ///
318 /// - `ScopeError::Invalid` if entity requires tenant but scope has none
319 /// - `ScopeError::Db` if database insert fails
320 pub async fn insert<E>(
321 &self,
322 scope: &AccessScope,
323 am: E::ActiveModel,
324 ) -> Result<E::Model, ScopeError>
325 where
326 E: ScopableEntity + EntityTrait,
327 E::Column: ColumnTrait + Copy,
328 E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
329 E::Model: sea_orm::IntoActiveModel<E::ActiveModel>,
330 {
331 crate::secure::secure_insert::<E>(am, scope, self).await
332 }
333
334 /// Update a single entity with security scope validation.
335 ///
336 /// This method ensures the entity being updated is within the security scope
337 /// before performing the update. It validates that the record is accessible
338 /// based on tenant/resource constraints.
339 ///
340 /// # Security
341 ///
342 /// - Validates the entity exists and is accessible in the security scope
343 /// - Returns `ScopeError::Denied` if the entity is not in scope
344 /// - Ensures updates cannot affect entities outside the security boundary
345 ///
346 /// # Example
347 ///
348 /// ```ignore
349 /// let ctx = SecurityCtx::for_tenant(tenant_id, user_id);
350 ///
351 /// // Load and modify
352 /// let user_model = db.find_by_id::<user::Entity>(&ctx, id)?
353 /// .one(db)
354 /// .await?
355 /// .ok_or(NotFound)?;
356 ///
357 /// let mut user: user::ActiveModel = user_model.into();
358 /// user.email = Set("newemail@example.com".to_string());
359 /// user.updated_at = Set(Utc::now());
360 ///
361 /// // Update with scope validation (pass ID separately)
362 /// let updated = db.update_with_ctx::<user::Entity>(&ctx, id, user).await?;
363 /// ```
364 ///
365 /// # Errors
366 ///
367 /// - `ScopeError::Denied` if the entity is not accessible in the current scope
368 /// - `ScopeError::Db` if the database operation fails
369 pub async fn update_with_ctx<E>(
370 &self,
371 scope: &AccessScope,
372 id: Uuid,
373 am: E::ActiveModel,
374 ) -> Result<E::Model, ScopeError>
375 where
376 E: ScopableEntity + EntityTrait,
377 E::Column: ColumnTrait + Copy,
378 E::ActiveModel: sea_orm::ActiveModelTrait<Entity = E> + Send,
379 E::Model: sea_orm::IntoActiveModel<E::ActiveModel> + sea_orm::ModelTrait<Entity = E>,
380 {
381 crate::secure::secure_update_with_scope::<E>(am, scope, id, self).await
382 }
383
384 /// Delete a single entity by ID (scoped).
385 ///
386 /// This validates the entity exists in scope before deleting.
387 ///
388 /// # Example
389 ///
390 /// ```ignore
391 /// let ctx = SecurityCtx::for_tenants(vec![tenant_id], user_id);
392 /// db.delete_by_id::<user::Entity>(&ctx, user_id).await?;
393 /// ```
394 ///
395 /// # Returns
396 ///
397 /// - `Ok(true)` if entity was deleted
398 /// - `Ok(false)` if entity not found in scope
399 ///
400 /// # Errors
401 ///
402 /// Returns `ScopeError::Invalid` if the entity does not have a `resource_col` defined.
403 pub async fn delete_by_id<E>(&self, scope: &AccessScope, id: Uuid) -> Result<bool, ScopeError>
404 where
405 E: ScopableEntity + EntityTrait,
406 E::Column: ColumnTrait + Copy,
407 {
408 let resource_col = E::resource_col().ok_or_else(|| {
409 ScopeError::Invalid("Entity must have a resource_col to use delete_by_id()")
410 })?;
411
412 let result = E::delete_many()
413 .filter(sea_orm::Condition::all().add(Expr::col(resource_col).eq(id)))
414 .secure()
415 .scope_with(scope)
416 .exec(self)
417 .await?;
418
419 Ok(result.rows_affected > 0)
420 }
421
422 // ========================================================================
423 // Transaction support
424 // ========================================================================
425
426 /// Execute a closure inside a database transaction.
427 ///
428 /// This method starts a `SeaORM` transaction, provides the transaction handle
429 /// to the closure as `&SecureTx`, and handles commit/rollback.
430 ///
431 /// # Return Type
432 ///
433 /// Returns `anyhow::Result<Result<T, E>>` where:
434 /// - Outer `Err`: Database/infrastructure error (transaction rolls back)
435 /// - Inner `Ok(T)`: Success (transaction commits)
436 /// - Inner `Err(E)`: Domain/validation error (transaction still commits)
437 ///
438 /// This design ensures domain validation errors don't cause rollback.
439 ///
440 /// # Architecture Note
441 ///
442 /// Transaction boundaries should be managed by **application/domain services**,
443 /// not by REST handlers. REST handlers should call service methods that
444 /// internally decide when to open transactions.
445 ///
446 /// # Example
447 ///
448 /// ```ignore
449 /// use modkit_db::secure::SecureConn;
450 ///
451 /// // In a domain service:
452 /// pub async fn create_user(
453 /// db: &SecureConn,
454 /// repo: &UsersRepo,
455 /// user: User,
456 /// ) -> Result<User, DomainError> {
457 /// let result = db.transaction(|conn| async move {
458 /// // Check email uniqueness
459 /// if repo.email_exists(conn, &user.email).await? {
460 /// return Ok(Err(DomainError::EmailExists));
461 /// }
462 /// // Create user
463 /// let created = repo.create(conn, user).await?;
464 /// Ok(Ok(created))
465 /// }).await?;
466 /// result
467 /// }
468 /// ```
469 ///
470 /// # Security
471 ///
472 /// This method **consumes** `self` and returns it after the transaction completes.
473 /// This prevents accidental use of the outer connection inside the transaction,
474 /// making transaction bypass impossible by construction.
475 ///
476 /// Only `&SecureTx` is available inside the closure, ensuring all operations
477 /// execute within the transaction scope.
478 ///
479 /// # Returns
480 ///
481 /// Returns a tuple `(Self, Result<()>)` where:
482 /// - `Self` is the connection (always returned, even on error)
483 /// - `Result<()>` indicates transaction success or failure
484 ///
485 /// # Errors
486 ///
487 /// The `Result` component is `Err(anyhow::Error)` if:
488 /// - The transaction cannot be started
489 /// - A database operation fails (transaction is rolled back)
490 /// - The commit fails
491 pub async fn transaction<F>(self, f: F) -> (Self, anyhow::Result<()>)
492 where
493 F: for<'a> FnOnce(
494 &'a SecureTx<'a>,
495 )
496 -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>
497 + Send,
498 {
499 let txn = match self.conn_internal().begin().await {
500 Ok(t) => t,
501 Err(e) => return (self, Err(e.into())),
502 };
503 let tx = SecureTx::new(&txn);
504
505 let res = f(&tx).await;
506
507 match res {
508 Ok(()) => match txn.commit().await {
509 Ok(()) => (self, Ok(())),
510 Err(e) => (self, Err(e.into())),
511 },
512 Err(e) => {
513 let _ = txn.rollback().await;
514 (self, Err(e))
515 }
516 }
517 }
518
519 /// Execute a transaction and return both the connection and a result value.
520 ///
521 /// This method consumes `self` and returns both the connection and the result
522 /// from the transaction closure. Use this when you need to return data from
523 /// within the transaction.
524 ///
525 /// # Security
526 ///
527 /// Like [`transaction`](Self::transaction), this method prevents transaction bypass
528 /// by consuming `self`, making it impossible to access the outer connection
529 /// inside the transaction closure.
530 ///
531 /// # Example
532 ///
533 /// ```ignore
534 /// let (conn, result) = conn.transaction_with(|tx| {
535 /// Box::pin(async move {
536 /// let user = repo.create(tx, &scope, new_user).await?;
537 /// Ok(user)
538 /// })
539 /// }).await;
540 /// let user = result?;
541 /// ```
542 ///
543 /// # Returns
544 ///
545 /// Returns a tuple `(Self, Result<T>)` where:
546 /// - `Self` is the connection (always returned)
547 /// - `Result<T>` contains the transaction result or error
548 ///
549 /// # Errors
550 ///
551 /// The `Result` component is `Err(anyhow::Error)` if:
552 /// - The transaction cannot be started
553 /// - A database operation fails (transaction is rolled back)
554 /// - The commit fails
555 pub async fn transaction_with<T, F>(self, f: F) -> (Self, anyhow::Result<T>)
556 where
557 T: Send + 'static,
558 F: for<'a> FnOnce(
559 &'a SecureTx<'a>,
560 )
561 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
562 + Send,
563 {
564 let txn = match self.conn_internal().begin().await {
565 Ok(t) => t,
566 Err(e) => return (self, Err(e.into())),
567 };
568 let tx = SecureTx::new(&txn);
569
570 let res = f(&tx).await;
571
572 match res {
573 Ok(v) => match txn.commit().await {
574 Ok(()) => (self, Ok(v)),
575 Err(e) => (self, Err(e.into())),
576 },
577 Err(e) => {
578 let _ = txn.rollback().await;
579 (self, Err(e))
580 }
581 }
582 }
583
584 /// Execute a closure inside a database transaction with custom configuration.
585 ///
586 /// This method is similar to [`transaction`](Self::transaction), but allows
587 /// specifying the isolation level and access mode.
588 ///
589 /// # Configuration
590 ///
591 /// Use [`TxConfig`] to specify transaction settings without importing `SeaORM` types:
592 ///
593 /// ```ignore
594 /// use modkit_db::secure::{TxConfig, TxIsolationLevel, TxAccessMode};
595 ///
596 /// let cfg = TxConfig {
597 /// isolation: Some(TxIsolationLevel::Serializable),
598 /// access_mode: Some(TxAccessMode::ReadWrite),
599 /// };
600 /// ```
601 ///
602 /// # Example
603 ///
604 /// ```ignore
605 /// use modkit_db::secure::{SecureConn, TxConfig, TxIsolationLevel};
606 ///
607 /// // In a domain service requiring serializable isolation:
608 /// pub async fn reconcile_accounts(
609 /// db: &SecureConn,
610 /// repo: &AccountsRepo,
611 /// ) -> anyhow::Result<Result<ReconciliationResult, DomainError>> {
612 /// let cfg = TxConfig::serializable();
613 ///
614 /// db.transaction_with_config(cfg, |conn| async move {
615 /// let accounts = repo.find_all_pending(conn).await?;
616 /// for account in accounts {
617 /// repo.reconcile(conn, &account).await?;
618 /// }
619 /// Ok(Ok(ReconciliationResult { processed: accounts.len() }))
620 /// }).await
621 /// }
622 /// ```
623 ///
624 /// # Backend Notes
625 ///
626 /// - **`PostgreSQL`**: Full support for all isolation levels and access modes.
627 /// - **MySQL/InnoDB**: Full support for all isolation levels and access modes.
628 /// - **`SQLite`**: Only supports `Serializable` isolation. Other levels are
629 /// mapped to `Serializable`. Read-only mode is a hint only.
630 ///
631 /// # Security
632 ///
633 /// This method consumes `self` and returns both the connection and result,
634 /// preventing transaction bypass by making the outer connection unavailable
635 /// inside the closure.
636 ///
637 /// # Returns
638 ///
639 /// Returns a tuple `(Self, Result<T>)` where:
640 /// - `Self` is the connection (always returned)
641 /// - `Result<T>` contains the transaction result or error
642 ///
643 /// # Errors
644 ///
645 /// The `Result` component is `Err(anyhow::Error)` if:
646 /// - The transaction cannot be started with the specified configuration
647 /// - A database operation fails (transaction is rolled back)
648 /// - The commit fails
649 pub async fn transaction_with_config<T, F>(
650 self,
651 cfg: TxConfig,
652 f: F,
653 ) -> (Self, anyhow::Result<T>)
654 where
655 T: Send + 'static,
656 F: for<'a> FnOnce(
657 &'a SecureTx<'a>,
658 )
659 -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>
660 + Send,
661 {
662 let isolation: Option<IsolationLevel> = cfg.isolation.map(Into::into);
663 let access_mode: Option<AccessMode> = cfg.access_mode.map(Into::into);
664
665 let txn = match self
666 .conn_internal()
667 .begin_with_config(isolation, access_mode)
668 .await
669 {
670 Ok(t) => t,
671 Err(e) => return (self, Err(e.into())),
672 };
673 let tx = SecureTx::new(&txn);
674
675 let res = f(&tx).await;
676
677 match res {
678 Ok(v) => match txn.commit().await {
679 Ok(()) => (self, Ok(v)),
680 Err(e) => (self, Err(e.into())),
681 },
682 Err(e) => {
683 let _ = txn.rollback().await;
684 (self, Err(e))
685 }
686 }
687 }
688
689 /// Execute a closure inside a typed domain transaction.
690 ///
691 /// This method returns [`TxError<E>`] which distinguishes domain errors from
692 /// infrastructure errors, allowing callers to handle them appropriately.
693 ///
694 /// # Error Handling
695 ///
696 /// - Domain errors returned from the closure are wrapped in `TxError::Domain(e)`
697 /// - Database infrastructure errors are wrapped in `TxError::Infra(InfraError)`
698 ///
699 /// Use [`TxError::into_domain`] to convert the result into your domain error type.
700 ///
701 /// # Example
702 ///
703 /// ```ignore
704 /// use modkit_db::secure::SecureConn;
705 ///
706 /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
707 /// db.in_transaction(move |tx| Box::pin(async move {
708 /// if repo.exists(tx, user.id).await? {
709 /// return Err(DomainError::already_exists(user.id));
710 /// }
711 /// repo.create(tx, user).await
712 /// }))
713 /// .await
714 /// .map_err(|e| e.into_domain(DomainError::database_infra))
715 /// }
716 /// ```
717 ///
718 /// # Security
719 ///
720 /// This method consumes `self` and returns both the connection and result,
721 /// preventing transaction bypass by making the outer connection unavailable
722 /// inside the closure.
723 ///
724 /// # Returns
725 ///
726 /// Returns a tuple `(Self, Result<T, TxError<E>>)` where:
727 /// - `Self` is the connection (always returned)
728 /// - `Result<T, TxError<E>>` contains the transaction result or error
729 ///
730 /// # Errors
731 ///
732 /// The `Result` component is `Err(TxError<E>)` if:
733 /// - The callback returns a domain error (`TxError::Domain(E)`).
734 /// - The transaction fails due to a database/infrastructure error (`TxError::Infra(InfraError)`).
735 pub async fn in_transaction<T, E, F>(self, f: F) -> (Self, Result<T, TxError<E>>)
736 where
737 T: Send + 'static,
738 E: std::fmt::Debug + std::fmt::Display + Send + 'static,
739 F: for<'a> FnOnce(
740 &'a SecureTx<'a>,
741 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
742 + Send,
743 {
744 let txn = match self.conn_internal().begin().await {
745 Ok(t) => t,
746 Err(e) => return (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
747 };
748 let tx = SecureTx::new(&txn);
749
750 let res = f(&tx).await;
751
752 match res {
753 Ok(v) => match txn.commit().await {
754 Ok(()) => (self, Ok(v)),
755 Err(e) => (self, Err(TxError::Infra(InfraError::new(e.to_string())))),
756 },
757 Err(e) => {
758 let _ = txn.rollback().await;
759 (self, Err(TxError::Domain(e)))
760 }
761 }
762 }
763
764 /// Execute a typed domain transaction with automatic infrastructure error mapping.
765 ///
766 /// This is a convenience wrapper around [`in_transaction`](Self::in_transaction) that
767 /// automatically converts [`TxError`] into the domain error type using the provided
768 /// mapping function for infrastructure errors.
769 ///
770 /// # Example
771 ///
772 /// ```ignore
773 /// use modkit_db::secure::SecureConn;
774 ///
775 /// async fn create_user(db: &SecureConn, repo: &UsersRepo, user: User) -> Result<User, DomainError> {
776 /// db.in_transaction_mapped(DomainError::database_infra, move |tx| Box::pin(async move {
777 /// if repo.exists(tx, user.id).await? {
778 /// return Err(DomainError::already_exists(user.id));
779 /// }
780 /// repo.create(tx, user).await
781 /// })).await
782 /// }
783 /// ```
784 ///
785 /// # Security
786 ///
787 /// This method consumes `self` and returns both the connection and result,
788 /// preventing transaction bypass.
789 ///
790 /// # Returns
791 ///
792 /// Returns a tuple `(Self, Result<T, E>)` where:
793 /// - `Self` is the connection (always returned)
794 /// - `Result<T, E>` contains the transaction result or mapped error
795 ///
796 /// # Errors
797 ///
798 /// The `Result` component is `Err(E)` if:
799 /// - The callback returns a domain error (`E`).
800 /// - The transaction fails due to a database/infrastructure error, mapped via `map_infra`.
801 pub async fn in_transaction_mapped<T, E, F, M>(self, map_infra: M, f: F) -> (Self, Result<T, E>)
802 where
803 T: Send + 'static,
804 E: std::fmt::Debug + std::fmt::Display + Send + 'static,
805 M: FnOnce(InfraError) -> E + Send,
806 F: for<'a> FnOnce(
807 &'a SecureTx<'a>,
808 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
809 + Send,
810 {
811 let (conn, result) = self.in_transaction(f).await;
812 (conn, result.map_err(|tx_err| tx_err.into_domain(map_infra)))
813 }
814}