Skip to main content

force_sync/store/pg/
store.rs

1//! PostgreSQL-backed sync store helpers.
2
3use std::future::Future;
4
5use deadpool_postgres::{Client, Pool};
6use futures::future::BoxFuture;
7
8use crate::error::ForceSyncError;
9
10/// PostgreSQL-backed store for sync engine work.
11#[derive(Clone, Debug)]
12pub struct PgStore {
13    pool: Pool,
14}
15
16impl PgStore {
17    /// Creates a new store wrapper around an existing connection pool.
18    #[must_use]
19    pub const fn new(pool: Pool) -> Self {
20        Self { pool }
21    }
22
23    /// Returns the backing `PostgreSQL` pool.
24    #[must_use]
25    pub const fn pool(&self) -> &Pool {
26        &self.pool
27    }
28
29    /// Runs an operation against a pooled `PostgreSQL` client.
30    ///
31    /// # Errors
32    ///
33    /// Returns a pool or query error if the client cannot be acquired or the
34    /// callback fails.
35    pub async fn with_client<T, F, Fut>(&self, f: F) -> Result<T, ForceSyncError>
36    where
37        F: FnOnce(Client) -> Fut,
38        Fut: Future<Output = Result<T, tokio_postgres::Error>>,
39    {
40        let client = self.pool.get().await?;
41        f(client).await.map_err(Into::into)
42    }
43
44    /// Runs an operation inside a `PostgreSQL` transaction.
45    ///
46    /// # Errors
47    ///
48    /// Returns a pool or query error if the transaction cannot be opened or
49    /// the callback fails.
50    pub async fn with_transaction<T, F>(&self, f: F) -> Result<T, ForceSyncError>
51    where
52        F: for<'a> FnOnce(
53            &'a tokio_postgres::Transaction<'a>,
54        ) -> BoxFuture<'a, Result<T, ForceSyncError>>,
55    {
56        let mut client = self.pool.get().await?;
57        let transaction = client.transaction().await?;
58
59        match f(&transaction).await {
60            Ok(value) => {
61                transaction.commit().await?;
62                Ok(value)
63            }
64            Err(err) => match transaction.rollback().await {
65                Ok(()) => Err(err),
66                Err(rollback) => Err(ForceSyncError::TransactionRollback {
67                    callback: Box::new(err),
68                    rollback,
69                }),
70            },
71        }
72    }
73}