axum_postgres_tx/
lib.rs

1#![deny(clippy::unwrap_used)]
2#![warn(clippy::impl_trait_in_params)]
3
4use axum_core::response::{IntoResponse, Response};
5use bb8_postgres::{
6    PostgresConnectionManager,
7    bb8::{PooledConnection, RunError},
8    tokio_postgres::{self, NoTls, Transaction},
9};
10use http::StatusCode;
11use thiserror::Error;
12
13mod extension;
14pub mod layer;
15pub mod tx;
16
/// Errors surfaced by this crate.
///
/// The `Pool` and `Db` variants forward the `Display` output and source of the wrapped error
/// (`#[error(transparent)]`) and can be produced with `?` through their `From` impls.
#[derive(Debug, Error)]
pub enum Error {
    /// A bb8 `RunError` wrapping a `tokio_postgres::Error`.
    #[error(transparent)]
    Pool(#[from] RunError<tokio_postgres::Error>),
    /// An error reported by `tokio_postgres`.
    #[error(transparent)]
    Db(#[from] tokio_postgres::Error),
    /// A required extension was not registered.
    #[error("required extension not registered")]
    MissingExtension,
    /// The `Tx` extractor was used more than once in the same handler or middleware.
    #[error("Tx extractor used multiple times in the same handler/middleware")]
    OverlappingExtractors,
}
28
29impl IntoResponse for Error {
30    fn into_response(self) -> Response {
31        (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
32    }
33}
34
/// Crate-private alias for a bb8 pool whose connections are managed by
/// `PostgresConnectionManager<NoTls>`.
type Pool = bb8_postgres::bb8::Pool<PostgresConnectionManager<NoTls>>;
36
/// A transaction stored together with the pooled connection it was started on.
///
/// `tx` is typed `Transaction<'static>`; `OwnedTx::new` obtains that lifetime with a
/// `transmute`, so the borrow checker does not enforce that `_conn` outlives `tx`.
/// The field layout below is what upholds that relationship.
struct OwnedTx {
    // NOTE: `tx` must be declared before `_conn`. Fields are dropped in declaration order,
    // so the transaction's destructor runs before the connection is released.
    tx: Transaction<'static>,
    // Boxed so that the connection's address stays stable when `OwnedTx` itself is moved.
    _conn: Box<PooledConnection<'static, PostgresConnectionManager<NoTls>>>,
}
43
44impl OwnedTx {
45    // SAFETY: As a result of the transmute, the transaction must still be dropped when the caller is dropped.
46    async fn new(
47        conn: PooledConnection<'static, PostgresConnectionManager<NoTls>>,
48        read_only: bool,
49    ) -> Result<OwnedTx, tokio_postgres::Error> {
50        let mut conn = Box::new(conn);
51        let tx = conn
52            .build_transaction()
53            .read_only(read_only)
54            .start()
55            .await?;
56        let tx = unsafe {
57            std::mem::transmute::<tokio_postgres::Transaction<'_>, tokio_postgres::Transaction<'_>>(
58                tx,
59            )
60        };
61        Ok(OwnedTx { tx, _conn: conn })
62    }
63}