pg_worm/query/
transaction.rs

1use std::{
2    alloc::{alloc, dealloc, handle_alloc_error, Layout},
3    ptr::drop_in_place,
4};
5
6use deadpool_postgres::{Client as DpClient, Transaction as DpTransaction};
7
8use crate::{fetch_client, Error};
9
10use super::{Executable, Query, ToQuery};
11
12struct PinnedClient(pub *mut DpClient);
13
14impl PinnedClient {
15    unsafe fn from_client(client: DpClient) -> PinnedClient {
16        // Allocate memory on the heap
17        let layout = Layout::new::<DpClient>();
18        let pointer = alloc(layout) as *mut DpClient;
19
20        // Make sure it worked
21        if pointer.is_null() {
22            handle_alloc_error(layout);
23        }
24
25        // Move the client object to the heap
26        pointer.write(client);
27
28        // Return a `PinnedClient` object with a pointer
29        // to the underlying client.
30        PinnedClient(pointer)
31    }
32}
33
34impl Drop for PinnedClient {
35    fn drop(&mut self) {
36        unsafe {
37            // Call `drop` on the client object to make sure
38            // it is properly cleaned up and
39            // returned to the pool.
40            drop_in_place(self.0);
41
42            // Deallocate the previously allocated
43            // memory when the PinnedClient is dropped.
44            dealloc(self.0 as *mut u8, Layout::new::<DpClient>());
45        }
46    }
47}
48
49/// A struct providing transaction functionality.
50///
51/// Use it to execute queries as part of this transaction.
52/// When you are done, commit using `.commit()`
53pub struct Transaction<'a> {
54    transaction: DpTransaction<'a>,
55    _client: PinnedClient,
56}
57
58impl<'a> Transaction<'a> {
59    async fn from_client<'this>(client: DpClient) -> Result<Transaction<'a>, Error> {
60        let client = unsafe { PinnedClient::from_client(client) };
61        let transaction = unsafe {
62            // Convert `*mut DpClient` to `&mut DpClient`
63            // This shouldn't fail since the pointer in PinnedCliend
64            // is guaranteed not to be null.
65            &mut *client.0
66        }
67        .transaction()
68        .await?;
69
70        Ok(Transaction {
71            _client: client,
72            transaction,
73        })
74    }
75
76    /// Begin a new transaction.
77    pub async fn begin() -> Result<Transaction<'a>, Error> {
78        let client = fetch_client().await?;
79
80        Transaction::from_client(client).await
81    }
82
83    /// Rollback this transaction. TODO
84    pub async fn rollback(self) -> Result<(), Error> {
85        self.transaction.rollback().await.map_err(Error::from)
86    }
87
88    /// Commit the transaction. TODO
89    pub async fn commit(self) -> Result<(), Error> {
90        self.transaction.commit().await.map_err(Error::from)
91    }
92
93    /// Execute a query  as part of this transaction
94    /// and return its return value.
95    pub async fn execute<'b, Q, T>(&self, mut query: Q) -> Result<T, Error>
96    where
97        Q: ToQuery<'b, T>,
98        Query<'b, T>: Executable<Output = T>,
99    {
100        let query = query.to_query();
101        query.exec_with(&self.transaction).await
102    }
103}