// Transaction support: wraps a pooled `deadpool_postgres` client and the
// transaction opened on it in a single owned value.
use std::{
alloc::{alloc, dealloc, handle_alloc_error, Layout},
ptr::drop_in_place,
};
use deadpool_postgres::{Client as DpClient, Transaction as DpTransaction};
use crate::{fetch_client, Error};
use super::{Executable, Query, ToQuery};
/// Owner of a `DpClient` that is stored behind a raw pointer.
///
/// Moving a `PinnedClient` only moves the pointer, not the `DpClient` it
/// points to.
// NOTE(review): a raw-pointer field makes this type (and any struct that
// contains it) `!Send` and `!Sync` by default — confirm that this is intended.
struct PinnedClient(pub *mut DpClient);
impl PinnedClient {
    /// Move `client` into a freshly allocated heap slot and wrap the pointer
    /// to that slot in a `PinnedClient`.
    ///
    /// If the allocator returns a null pointer, `handle_alloc_error` is
    /// invoked with the requested layout.
    ///
    /// # Safety
    ///
    /// The function performs a raw allocation and an unchecked pointer write.
    /// The storage it creates is expected to be released only through the
    /// `Drop` implementation of `PinnedClient`.
    unsafe fn from_client(client: DpClient) -> PinnedClient {
        // Request heap storage sized and aligned for exactly one `DpClient`.
        let layout = Layout::new::<DpClient>();
        let slot = alloc(layout).cast::<DpClient>();

        // A null result means the allocation failed.
        if slot.is_null() {
            handle_alloc_error(layout)
        }

        // The slot is still uninitialised, so `write` moves the client in
        // without trying to drop any previous contents.
        slot.write(client);

        PinnedClient(slot)
    }
}
impl Drop for PinnedClient {
    /// Destroy the pointed-to client and release its heap storage.
    fn drop(&mut self) {
        let raw = self.0;
        let layout = Layout::new::<DpClient>();

        // SAFETY: relies on `raw` pointing to an initialised `DpClient` in
        // storage obtained from `alloc` with this same layout, which is what
        // `PinnedClient::from_client` produces.
        unsafe {
            // Run the client's own destructor first so it can clean up
            // after itself (the original author notes this returns it to
            // the pool) ...
            drop_in_place(raw);
            // ... and only then hand the now-vacant memory back to the
            // allocator.
            dealloc(raw.cast::<u8>(), layout);
        }
    }
}
/// A struct providing transaction functionality.
///
/// Use it to execute queries as part of this transaction via `.execute()`.
/// When you are done, commit using `.commit()` or discard the work with
/// `.rollback()`.
pub struct Transaction<'a> {
// The underlying `deadpool_postgres` transaction. It is created from a
// reference into the client owned by `_client`, so it must be declared
// first: struct fields are dropped in declaration order, which guarantees
// the transaction is dropped before the client it refers to.
transaction: DpTransaction<'a>,
// Keeps the client alive (at a fixed heap address) for as long as
// `transaction` exists. Never read directly, hence the leading underscore.
_client: PinnedClient,
}
impl<'a> Transaction<'a> {
async fn from_client<'this>(client: DpClient) -> Result<Transaction<'a>, Error> {
let client = unsafe { PinnedClient::from_client(client) };
let transaction = unsafe {
// Convert `*mut DpClient` to `&mut DpClient`
// This shouldn't fail since the pointer in PinnedCliend
// is guaranteed not to be null.
&mut *client.0
}
.transaction()
.await?;
Ok(Transaction {
_client: client,
transaction,
})
}
/// Begin a new transaction.
pub async fn begin() -> Result<Transaction<'a>, Error> {
let client = fetch_client().await?;
Transaction::from_client(client).await
}
/// Rollback this transaction. TODO
pub async fn rollback(self) -> Result<(), Error> {
self.transaction.rollback().await.map_err(Error::from)
}
/// Commit the transaction. TODO
pub async fn commit(self) -> Result<(), Error> {
self.transaction.commit().await.map_err(Error::from)
}
/// Execute a query as part of this transaction
/// and return its return value.
pub async fn execute<'b, Q, T>(&self, mut query: Q) -> Result<T, Error>
where
Q: ToQuery<'b, T>,
Query<'b, T>: Executable<Output = T>,
{
let query = query.to_query();
query.exec_with(&self.transaction).await
}
}