pg_worm/query/
transaction.rs1use std::{
2 alloc::{alloc, dealloc, handle_alloc_error, Layout},
3 ptr::drop_in_place,
4};
5
6use deadpool_postgres::{Client as DpClient, Transaction as DpTransaction};
7
8use crate::{fetch_client, Error};
9
10use super::{Executable, Query, ToQuery};
11
12struct PinnedClient(pub *mut DpClient);
13
14impl PinnedClient {
15 unsafe fn from_client(client: DpClient) -> PinnedClient {
16 let layout = Layout::new::<DpClient>();
18 let pointer = alloc(layout) as *mut DpClient;
19
20 if pointer.is_null() {
22 handle_alloc_error(layout);
23 }
24
25 pointer.write(client);
27
28 PinnedClient(pointer)
31 }
32}
33
34impl Drop for PinnedClient {
35 fn drop(&mut self) {
36 unsafe {
37 drop_in_place(self.0);
41
42 dealloc(self.0 as *mut u8, Layout::new::<DpClient>());
45 }
46 }
47}
48
49pub struct Transaction<'a> {
54 transaction: DpTransaction<'a>,
55 _client: PinnedClient,
56}
57
58impl<'a> Transaction<'a> {
59 async fn from_client<'this>(client: DpClient) -> Result<Transaction<'a>, Error> {
60 let client = unsafe { PinnedClient::from_client(client) };
61 let transaction = unsafe {
62 &mut *client.0
66 }
67 .transaction()
68 .await?;
69
70 Ok(Transaction {
71 _client: client,
72 transaction,
73 })
74 }
75
76 pub async fn begin() -> Result<Transaction<'a>, Error> {
78 let client = fetch_client().await?;
79
80 Transaction::from_client(client).await
81 }
82
83 pub async fn rollback(self) -> Result<(), Error> {
85 self.transaction.rollback().await.map_err(Error::from)
86 }
87
88 pub async fn commit(self) -> Result<(), Error> {
90 self.transaction.commit().await.map_err(Error::from)
91 }
92
93 pub async fn execute<'b, Q, T>(&self, mut query: Q) -> Result<T, Error>
96 where
97 Q: ToQuery<'b, T>,
98 Query<'b, T>: Executable<Output = T>,
99 {
100 let query = query.to_query();
101 query.exec_with(&self.transaction).await
102 }
103}