use crate::options::TransactionOption;
use std::future::Future;
use crate::database::TransactError;
use crate::{
error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
RangeOption, RetryableTransaction, TransactOption, Transaction,
};
use foundationdb_macros::cfg_api_versions;
use foundationdb_sys as fdb_sys;
use fdb_sys::if_cfg_api_versions;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Error;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";
#[cfg_api_versions(min = 730)]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
#[cfg_api_versions(min = 730)]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
pub struct FdbTenant {
pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
pub(crate) name: Vec<u8>,
}
unsafe impl Send for FdbTenant {}
unsafe impl Sync for FdbTenant {}
impl Drop for FdbTenant {
fn drop(&mut self) {
unsafe {
fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
}
}
}
impl FdbTenant {
pub fn get_name(&self) -> &[u8] {
&self.name
}
pub fn create_trx(&self) -> FdbResult<Transaction> {
let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
error::eval(err)?;
Ok(Transaction::new(NonNull::new(trx).expect(
"fdb_tenant_create_transaction to not return null if there is no error",
)))
}
fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
Ok(RetryableTransaction::new(self.create_trx()?))
}
pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
where
F: Fn(RetryableTransaction, bool) -> Fut,
Fut: Future<Output = Result<T, FdbBindingError>>,
{
let mut maybe_committed_transaction = false;
let mut transaction = self.create_retryable_trx()?;
loop {
let result_closure = closure(transaction.clone(), maybe_committed_transaction).await;
if let Err(e) = result_closure {
if let Some(e) = e.get_fdb_error() {
maybe_committed_transaction = e.is_maybe_committed();
match transaction.on_error(e).await {
Ok(Ok(t)) => {
transaction = t;
continue;
}
Ok(Err(non_retryable_error)) => {
return Err(FdbBindingError::from(non_retryable_error))
}
Err(non_retryable_error) => return Err(non_retryable_error),
}
}
return Err(e);
}
let commit_result = transaction.commit().await;
match commit_result {
Err(err) => return Err(err),
Ok(Ok(_)) => return result_closure,
Ok(Err(transaction_commit_error)) => {
maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
match transaction_commit_error.on_error().await {
Ok(t) => {
transaction = RetryableTransaction::new(t);
continue;
}
Err(non_retryable_error) => {
return Err(FdbBindingError::from(non_retryable_error))
}
}
}
}
}
}
pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
where
F: DatabaseTransact,
{
let is_idempotent = options.is_idempotent;
let time_out = options.time_out.map(|d| Instant::now() + d);
let retry_limit = options.retry_limit;
let mut tries: u32 = 0;
let mut trx = self.create_trx()?;
let mut can_retry = move || {
tries += 1;
retry_limit.map(|limit| tries < limit).unwrap_or(true)
&& time_out.map(|t| Instant::now() < t).unwrap_or(true)
};
loop {
let r = f.transact(trx).await;
f = r.0;
trx = r.1;
trx = match r.2 {
Ok(item) => match trx.commit().await {
Ok(_) => break Ok(item),
Err(e) => {
if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
e.on_error().await?
} else {
break Err(F::Error::from(e.into()));
}
}
},
Err(user_err) => match user_err.try_into_fdb_error() {
Ok(e) => {
if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
trx.on_error(e).await?
} else {
break Err(F::Error::from(e));
}
}
Err(user_err) => break Err(user_err),
},
};
}
}
}
#[cfg(feature = "fdb-7_1")]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
pub id: i64,
pub prefix: Vec<u8>,
pub name: Vec<u8>,
}
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
pub id: i64,
pub prefix: FDBTenantPrintableInfo,
pub name: FDBTenantPrintableInfo,
}
impl TryFrom<(&[u8], &[u8])> for TenantInfo {
type Error = Error;
fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
let value = k_v.1;
match serde_json::from_slice::<FDBTenantInfo>(value) {
Ok(tenant_info) => if_cfg_api_versions!(min = 730 => {
Ok(TenantInfo {
name: tenant_info.name,
id: tenant_info.id,
prefix: tenant_info.prefix,
})
} else {
Ok(TenantInfo {
name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
id: tenant_info.id,
prefix: tenant_info.prefix,
})
}),
Err(err) => Err(err),
}
}
}
#[cfg(feature = "fdb-7_1")]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
id: i64,
#[serde(with = "serde_bytes")]
prefix: Vec<u8>,
}
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
id: i64,
lock_state: TenantLockState,
name: FDBTenantPrintableInfo,
prefix: FDBTenantPrintableInfo,
}
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
enum TenantLockState {
Unlocked,
Locked,
ReadOnly,
}
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
pub struct FDBTenantPrintableInfo {
base64: String,
printable: String,
}
#[derive(Debug)]
pub struct TenantManagement;
impl TenantManagement {
pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
let checked_existence = AtomicBool::new(false);
let checked_existence_ref = &checked_existence;
let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
key.extend_from_slice(TENANT_MAP_PREFIX);
key.extend_from_slice(tenant_name);
let key_ref = &key;
db.run(|trx, _maybe_committed| async move {
trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
if checked_existence_ref.load(Ordering::SeqCst) {
trx.set(key_ref, &[]);
Ok(())
} else {
let maybe_key = trx.get(key_ref, false).await?;
checked_existence_ref.store(true, Ordering::SeqCst);
match maybe_key {
None => {
trx.set(key_ref, &[]);
Ok(())
}
Some(_) => {
Err(FdbBindingError::from(FdbError::new(2132)))
}
}
}
})
.await
.map_err(|e| e.get_fdb_error().unwrap())
}
pub async fn get_tenant(
db: &Database,
tenant_name: &[u8],
) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
key.extend_from_slice(TENANT_MAP_PREFIX);
key.extend_from_slice(tenant_name);
let key_ref = &key;
match db
.run(|trx, _maybe_committed| async move {
trx.set_option(TransactionOption::ReadSystemKeys)?;
trx.set_option(TransactionOption::ReadLockAware)?;
Ok(trx.get(key_ref, false).await?)
})
.await
{
Ok(None) => Ok(None),
Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
Err(err) => Err(err.get_fdb_error().unwrap()),
}
}
pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
let checked_existence = AtomicBool::new(false);
let checked_existence_ref = &checked_existence;
let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
key.extend_from_slice(TENANT_MAP_PREFIX);
key.extend_from_slice(tenant_name);
let key_ref = &key;
db.run(|trx, _maybe_committed| async move {
trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
if checked_existence_ref.load(Ordering::SeqCst) {
trx.clear(key_ref);
Ok(())
} else {
let maybe_key = trx.get(key_ref, false).await?;
checked_existence_ref.store(true, Ordering::SeqCst);
match maybe_key {
None => {
Err(FdbBindingError::from(FdbError::new(2131)))
}
Some(_) => {
trx.clear(key_ref);
Ok(())
}
}
}
})
.await
.map_err(|e| e.get_fdb_error().unwrap())
}
pub async fn list_tenant(
db: &Database,
begin: &[u8],
end: &[u8],
limit: Option<usize>,
) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
let trx = db.create_trx()?;
trx.set_option(TransactionOption::ReadSystemKeys)?;
trx.set_option(TransactionOption::ReadLockAware)?;
let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
begin_range.extend_from_slice(TENANT_MAP_PREFIX);
begin_range.extend_from_slice(begin);
let end_range = if end.is_empty() {
TENANT_MAP_PREFIX_END.to_vec()
} else {
let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
end_range.extend_from_slice(TENANT_MAP_PREFIX);
end_range.extend_from_slice(end);
end_range
};
let range_option = RangeOption {
begin: KeySelector::first_greater_than(begin_range),
end: KeySelector::first_greater_than(end_range),
limit,
..Default::default()
};
trx.get_ranges_keyvalues(range_option, false)
.map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
.try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
.await
}
}