use bytes::{BufMut, Bytes, BytesMut};
use tokio_stream::StreamExt;
use std::future::Future;
use std::ptr::{self, NonNull};
use std::sync::Arc;
use crate::database::DatabaseOption;
use crate::error::{check, FdbError, FdbResult};
use crate::range::{Range, RangeOptions};
use crate::transaction::{
FdbReadTransaction, FdbTransaction, ReadTransaction, Transaction, TransactionOption,
};
use crate::Key;
#[cfg(feature = "fdb-7_1")]
use std::convert::TryInto;
#[cfg(feature = "fdb-7_1")]
use crate::Tenant;
#[cfg(feature = "fdb-7_1")]
use crate::tenant::FdbTenant;
/// Handle wrapping a raw `fdb_sys::FDBDatabase` pointer.
///
/// Cloning the handle clones the inner [`Arc`], so every clone refers to the
/// same raw pointer. The `Drop` implementation in this file calls
/// `fdb_sys::fdb_database_destroy` only when it obtains sole ownership of the
/// `Arc`.
#[derive(Clone, Debug)]
pub struct FdbDatabase {
// Wrapped in `Option` so that `Drop` can `take()` the `Arc` out of `&mut self`.
// The methods in this file `unwrap()` it, i.e. they expect it to be `Some`.
c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
}
impl FdbDatabase {
pub fn create_transaction(&self) -> FdbResult<FdbTransaction> {
let mut ptr: *mut fdb_sys::FDB_transaction = ptr::null_mut();
check(unsafe {
fdb_sys::fdb_database_create_transaction(
(*(self.c_ptr.as_ref().unwrap())).as_ptr(),
&mut ptr,
)
})
.map(|_| {
FdbTransaction::new(Some(Arc::new(NonNull::new(ptr).expect(
"fdb_database_create_transaction returned null, but did not return an error",
))))
})
}
pub async fn get_boundary_keys(
&self,
begin: impl Into<Key>,
end: impl Into<Key>,
limit: i32,
read_version: i64,
) -> FdbResult<Vec<Key>> {
let tr = self.create_transaction()?;
if read_version != 0 {
unsafe {
tr.set_read_version(read_version);
}
}
tr.set_option(TransactionOption::ReadSystemKeys)?;
tr.set_option(TransactionOption::LockAware)?;
let range = Range::new(
{
let mut b = BytesMut::new();
b.put(&b"\xFF/keyServers/"[..]);
b.put(Into::<Bytes>::into(begin.into()));
Into::<Bytes>::into(b)
},
{
let mut b = BytesMut::new();
b.put(&b"\xFF/keyServers/"[..]);
b.put(Into::<Bytes>::into(end.into()));
Into::<Bytes>::into(b)
},
);
let mut res = Vec::new();
let mut range_stream = range.into_stream(&tr.snapshot(), {
let mut ro = RangeOptions::default();
ro.set_limit(limit);
ro
});
while let Some(x) = range_stream.next().await {
let kv = x?;
res.push({
Into::<Key>::into(Into::<Bytes>::into(kv.into_key()).slice(13..))
});
}
Ok(res)
}
#[cfg(feature = "fdb-7_1")]
pub fn open_tenant(&self, tenant_name: impl Into<Tenant>) -> FdbResult<FdbTenant> {
let t = Bytes::from(tenant_name.into());
let tenant_name = t.as_ref().as_ptr();
let tenant_name_length = t.as_ref().len().try_into().unwrap();
let mut ptr: *mut fdb_sys::FDB_tenant = ptr::null_mut();
check(unsafe {
fdb_sys::fdb_database_open_tenant(
(*(self.c_ptr.as_ref().unwrap())).as_ptr(),
tenant_name,
tenant_name_length,
&mut ptr,
)
})
.map(|_| {
FdbTenant::new(
Some(Arc::new(NonNull::new(ptr).expect(
"fdb_database_open_tenant returned null, but did not return an error",
))),
t.into(),
)
})
}
pub async fn run<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
where
F: FnMut(FdbTransaction) -> Fut,
Fut: Future<Output = FdbResult<T>>,
{
let t = self.create_transaction()?;
loop {
let ret_val = f(t.clone()).await;
if let Err(e) = ret_val {
if FdbError::layer_error(e.code()) {
return Err(e);
} else if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
if let Err(e) = unsafe { t.commit() }.await {
if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
return ret_val;
}
}
pub async fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
where
F: FnMut(FdbReadTransaction) -> Fut,
Fut: Future<Output = FdbResult<T>>,
{
let t = self.create_transaction()?.snapshot();
loop {
let ret_val = f(t.clone()).await;
if let Err(e) = ret_val {
if FdbError::layer_error(e.code()) {
return Err(e);
} else if let Err(e1) = unsafe { t.on_error(e) }.await {
return Err(e1);
} else {
continue;
}
}
return ret_val;
}
}
pub fn set_option(&self, option: DatabaseOption) -> FdbResult<()> {
unsafe { option.apply((self.c_ptr.as_ref().unwrap()).as_ptr()) }
}
pub(crate) fn new(c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>) -> FdbDatabase {
FdbDatabase { c_ptr }
}
}
impl Drop for FdbDatabase {
    /// Destroys the underlying `FDBDatabase` when the last clone goes away.
    fn drop(&mut self) {
        // `Arc::into_inner` is used instead of `Arc::try_unwrap`: when the
        // last two clones are dropped concurrently on different threads, both
        // `try_unwrap` calls can observe a strong count of 2 and fail, so the
        // pointer would never reach `fdb_database_destroy`. `into_inner`
        // guarantees that exactly one of the droppers receives the value.
        if let Some(db) = self.c_ptr.take().and_then(Arc::into_inner) {
            // SAFETY: `into_inner` returned `Some`, so no other clone of this
            // handle still holds the pointer. NOTE(review): assumes the
            // pointer passed to `FdbDatabase::new` is a live `FDBDatabase`
            // that has not been destroyed elsewhere.
            unsafe {
                fdb_sys::fdb_database_destroy(db.as_ptr());
            }
        }
    }
}
// `NonNull` is neither `Send` nor `Sync`, so the auto traits are not derived
// for `FdbDatabase` and are asserted manually here.
// NOTE(review): soundness presumably rests on the FoundationDB C client
// allowing an `FDBDatabase` handle to be used from multiple threads - confirm
// against the C API documentation.
unsafe impl Send for FdbDatabase {}
unsafe impl Sync for FdbDatabase {}
#[cfg(test)]
mod tests {
    use impls::impls;
    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use super::FdbDatabase;

    /// `FdbDatabase` must be `Send`, `Sync` and `Clone`, but not `Copy`.
    #[test]
    fn impls() {
        #[rustfmt::skip]
        assert!(impls!(
            FdbDatabase:
                Send &
                Sync &
                Clone &
                !Copy));
    }

    /// Stand-in with the same field layout as `FdbDatabase` but without its
    /// `Drop` impl, so it can hold a dangling pointer safely.
    // The field is only constructed, never read.
    #[allow(dead_code)]
    #[derive(Clone, Debug)]
    struct DummyFdbDatabase {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DummyFdbDatabase {}
    unsafe impl Sync for DummyFdbDatabase {}

    /// A value shaped like `FdbDatabase` satisfies `Send + Sync + 'static`.
    #[test]
    fn trait_bounds() {
        fn trait_bounds_for_fdb_database<T>(_t: T)
        where
            T: Send + Sync + 'static,
        {
        }

        let d = DummyFdbDatabase {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };
        trait_bounds_for_fdb_database(d);
    }

    // Set by `DropTestDummyFdbDatabase::drop` when it obtains sole ownership
    // of the `Arc`. An atomic already provides interior mutability, so a
    // plain `static` is enough and no `unsafe` is needed to access it.
    static DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED: AtomicBool = AtomicBool::new(false);

    /// Stand-in whose `Drop` records the "last owner" case in a flag instead
    /// of calling into the C library.
    #[derive(Clone, Debug)]
    struct DropTestDummyFdbDatabase {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DropTestDummyFdbDatabase {}
    unsafe impl Sync for DropTestDummyFdbDatabase {}

    impl Drop for DropTestDummyFdbDatabase {
        fn drop(&mut self) {
            if let Some(a) = self.c_ptr.take() {
                match Arc::try_unwrap(a) {
                    Ok(_) => {
                        DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.store(true, Ordering::SeqCst);
                    }
                    Err(at) => {
                        drop(at);
                    }
                };
            }
        }
    }

    /// Dropping clones (including inside spawned tasks) must not trigger the
    /// "last owner" path; only dropping the final handle does.
    #[tokio::test]
    async fn multiple_drop() {
        let d0 = DropTestDummyFdbDatabase {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };
        assert!(!DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));

        let d1 = d0.clone();
        assert_eq!(Arc::strong_count(d1.c_ptr.as_ref().unwrap()), 2);

        tokio::spawn(async move {
            let _ = d1;
        })
        .await
        .unwrap();
        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        let d2 = d0.clone();
        let d3 = d2.clone();
        tokio::spawn(async move {
            let _ = d2;
            let _ = d3;
        })
        .await
        .unwrap();
        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        drop(d0);
        assert!(DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));
    }
}