force_sync/store/pg/
store.rs1use std::future::Future;
4
5use deadpool_postgres::{Client, Pool};
6use futures::future::BoxFuture;
7
8use crate::error::ForceSyncError;
9
10#[derive(Clone, Debug)]
12pub struct PgStore {
13 pool: Pool,
14}
15
16impl PgStore {
17 #[must_use]
19 pub const fn new(pool: Pool) -> Self {
20 Self { pool }
21 }
22
23 #[must_use]
25 pub const fn pool(&self) -> &Pool {
26 &self.pool
27 }
28
29 pub async fn with_client<T, F, Fut>(&self, f: F) -> Result<T, ForceSyncError>
36 where
37 F: FnOnce(Client) -> Fut,
38 Fut: Future<Output = Result<T, tokio_postgres::Error>>,
39 {
40 let client = self.pool.get().await?;
41 f(client).await.map_err(Into::into)
42 }
43
44 pub async fn with_transaction<T, F>(&self, f: F) -> Result<T, ForceSyncError>
51 where
52 F: for<'a> FnOnce(
53 &'a tokio_postgres::Transaction<'a>,
54 ) -> BoxFuture<'a, Result<T, ForceSyncError>>,
55 {
56 let mut client = self.pool.get().await?;
57 let transaction = client.transaction().await?;
58
59 match f(&transaction).await {
60 Ok(value) => {
61 transaction.commit().await?;
62 Ok(value)
63 }
64 Err(err) => match transaction.rollback().await {
65 Ok(()) => Err(err),
66 Err(rollback) => Err(ForceSyncError::TransactionRollback {
67 callback: Box::new(err),
68 rollback,
69 }),
70 },
71 }
72 }
73}