use std::future::Future;
use std::ptr::{self, NonNull};
use std::sync::Arc;
use crate::error::{check, FdbError, FdbResult};
use crate::transaction::{FdbReadTransaction, FdbTransaction, ReadTransaction, Transaction};
use crate::Tenant;
#[derive(Clone, Debug)]
pub struct FdbTenant {
c_ptr: Option<Arc<NonNull<fdb_sys::FDBTenant>>>,
name: Tenant,
}
impl FdbTenant {
pub fn create_transaction(&self) -> FdbResult<FdbTransaction> {
let mut ptr: *mut fdb_sys::FDB_transaction = ptr::null_mut();
check(unsafe {
fdb_sys::fdb_tenant_create_transaction(
(*(self.c_ptr.as_ref().unwrap())).as_ptr(),
&mut ptr,
)
})
.map(|_| {
FdbTransaction::new(Some(Arc::new(NonNull::new(ptr).expect(
"fdb_tenant_create_transaction returned null, but did not return an error",
))))
})
}
pub fn get_name(&self) -> Tenant {
self.name.clone()
}
pub async fn run<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
where
F: FnMut(FdbTransaction) -> Fut,
Fut: Future<Output = FdbResult<T>>,
{
let t = self.create_transaction()?;
loop {
let ret_val = f(t.clone()).await;
if let Err(e) = ret_val {
if FdbError::layer_error(e.code()) {
return Err(e);
} else if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
if let Err(e) = unsafe { t.commit() }.await {
if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
return ret_val;
}
}
pub async fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
where
F: FnMut(FdbReadTransaction) -> Fut,
Fut: Future<Output = FdbResult<T>>,
{
let t = self.create_transaction()?.snapshot();
loop {
let ret_val = f(t.clone()).await;
if let Err(e) = ret_val {
if FdbError::layer_error(e.code()) {
return Err(e);
} else if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
return ret_val;
}
}
pub(crate) fn new(c_ptr: Option<Arc<NonNull<fdb_sys::FDBTenant>>>, name: Tenant) -> FdbTenant {
FdbTenant { c_ptr, name }
}
}
impl Drop for FdbTenant {
fn drop(&mut self) {
if let Some(a) = self.c_ptr.take() {
match Arc::try_unwrap(a) {
Ok(a) => unsafe {
fdb_sys::fdb_tenant_destroy(a.as_ptr());
},
Err(at) => {
drop(at);
}
};
}
}
}
unsafe impl Send for FdbTenant {}
unsafe impl Sync for FdbTenant {}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use impls::impls;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::Tenant;
use super::FdbTenant;
#[test]
fn impls() {
#[rustfmt::skip]
assert!(impls!(
FdbTenant:
Send &
Sync &
Clone &
!Copy));
}
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct DummyFdbTenant {
c_ptr: Option<Arc<NonNull<fdb_sys::FDBTenant>>>,
name: Tenant,
}
unsafe impl Send for DummyFdbTenant {}
unsafe impl Sync for DummyFdbTenant {}
#[test]
fn trait_bounds() {
fn trait_bounds_for_fdb_tenant<T>(_t: T)
where
T: Send + Sync + 'static,
{
}
let d = DummyFdbTenant {
c_ptr: Some(Arc::new(NonNull::dangling())),
name: Bytes::new().into(),
};
trait_bounds_for_fdb_tenant(d);
}
static mut DROP_TEST_DUMMY_FDB_TENANT_HAS_DROPPED: AtomicBool = AtomicBool::new(false);
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct DropTestDummyFdbTenant {
c_ptr: Option<Arc<NonNull<fdb_sys::FDBTenant>>>,
name: Tenant,
}
unsafe impl Send for DropTestDummyFdbTenant {}
unsafe impl Sync for DropTestDummyFdbTenant {}
impl Drop for DropTestDummyFdbTenant {
fn drop(&mut self) {
if let Some(a) = self.c_ptr.take() {
match Arc::try_unwrap(a) {
Ok(_) => {
unsafe {
DROP_TEST_DUMMY_FDB_TENANT_HAS_DROPPED.store(true, Ordering::SeqCst);
};
}
Err(at) => {
drop(at);
}
};
}
}
}
#[tokio::test]
async fn multiple_drop() {
let d0 = DropTestDummyFdbTenant {
c_ptr: Some(Arc::new(NonNull::dangling())),
name: Bytes::new().into(),
};
assert!(!unsafe { DROP_TEST_DUMMY_FDB_TENANT_HAS_DROPPED.load(Ordering::SeqCst) });
let d1 = d0.clone();
assert_eq!(Arc::strong_count(d1.c_ptr.as_ref().unwrap()), 2);
tokio::spawn(async move {
let _ = d1;
})
.await
.unwrap();
assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);
let d2 = d0.clone();
let d3 = d2.clone();
tokio::spawn(async move {
let _ = d2;
let _ = d3;
})
.await
.unwrap();
assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);
drop(d0);
assert!(unsafe { DROP_TEST_DUMMY_FDB_TENANT_HAS_DROPPED.load(Ordering::SeqCst) });
}
}