use std::convert::TryInto;
use std::marker::PhantomData;
use std::pin::Pin;
use std::ptr::NonNull;
use std::time::{Duration, Instant};
use fdb_sys::if_cfg_api_versions;
use foundationdb_macros::cfg_api_versions;
use foundationdb_sys as fdb_sys;
use crate::metrics::{MetricsReport, TransactionMetrics};
use crate::options;
use crate::transaction::*;
use crate::{FdbError, FdbResult, error};
use crate::error::FdbBindingError;
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
use crate::tenant::FdbTenant;
use futures::prelude::*;
/// Flag handed to the closure driven by the retry runner.
///
/// It starts out `false` and, after a failed attempt, carries whatever
/// `is_maybe_committed()` reported for that attempt's error. The type derives
/// neither `Copy` nor `Clone`, so the flag is moved into the closure.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the flag into a plain `bool`.
    fn from(MaybeCommitted(flag): MaybeCommitted) -> bool {
        flag
    }
}
/// Observer callbacks invoked by the transaction retry runner.
///
/// Every method has a do-nothing default, so implementors override only the
/// events they care about.
pub trait RunnerHooks {
    /// Called when a commit fails, before the runner asks the database whether
    /// the error can be retried.
    ///
    /// The default implementation resolves immediately to `Ok(())`.
    fn on_commit_error(
        &self,
        _err: &TransactionCommitError,
    ) -> impl Future<Output = FdbResult<()>> + Send {
        std::future::ready::<FdbResult<()>>(Ok(()))
    }

    /// Called when the user closure failed with an error that carries an [`FdbError`].
    fn on_closure_error(&self, _err: &FdbError) {}

    /// Called with the time, in milliseconds, spent waiting on `on_error`
    /// when that call allowed a retry.
    fn on_error_duration(&self, _duration_ms: u64) {}

    /// Called after a successful commit with the commit latency in milliseconds.
    fn on_commit_success(&self, _committed: &TransactionCommitted, _commit_duration_ms: u64) {}

    /// Called right before the runner starts another attempt.
    fn on_retry(&self) {}

    /// Called by `Database::instrumented_run` once the whole run is over; the
    /// retry loop itself does not invoke it.
    fn on_complete(&self) {}
}
/// [`RunnerHooks`] implementation that relies on every default method, i.e. it
/// observes nothing. `Database::run` uses it.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopHooks;

impl RunnerHooks for NoopHooks {}
/// [`RunnerHooks`] implementation that forwards runner events to a
/// [`TransactionMetrics`] collector.
pub(crate) struct InstrumentedHooks {
    /// Collector receiving every measurement.
    pub(crate) metrics: TransactionMetrics,
    /// Instant the run started; `on_complete` measures from here.
    pub(crate) start: Instant,
}

impl RunnerHooks for InstrumentedHooks {
    /// Counts the failure as a conflict when its code is 1020 and stores the
    /// conflicting keys when the lookup returns any.
    ///
    /// # Errors
    /// Propagates the error of the conflicting-keys lookup.
    async fn on_commit_error(&self, err: &TransactionCommitError) -> FdbResult<()> {
        let is_conflict = err.code() == 1020;
        if is_conflict {
            self.metrics.increment_conflict_count();
        }
        // NOTE(review): the lookup runs for every commit error, not only for
        // code 1020 — confirm this is intended.
        let conflicting = err.conflicting_keys().await?;
        if conflicting.is_empty() {
            return Ok(());
        }
        self.metrics.set_conflicting_keys(conflicting);
        Ok(())
    }

    /// Adds the time spent in `on_error` to the accumulated error time.
    fn on_error_duration(&self, duration_ms: u64) {
        self.metrics.add_error_time(duration_ms);
    }

    /// Records the commit latency and, when it can be read, the committed version.
    fn on_commit_success(&self, committed: &TransactionCommitted, commit_duration_ms: u64) {
        self.metrics.record_commit_time(commit_duration_ms);
        match committed.committed_version() {
            Ok(version) => self.metrics.set_commit_version(version),
            // A missing version is not fatal for metrics collection.
            Err(_) => {}
        }
    }

    /// Resets the per-attempt metrics before the next attempt.
    fn on_retry(&self) {
        self.metrics.reset_current();
    }

    /// Stores the wall-clock time elapsed since `start`, in milliseconds.
    fn on_complete(&self) {
        let elapsed_ms = self.start.elapsed().as_millis() as u64;
        self.metrics.set_execution_time(elapsed_ms);
    }
}
/// Runs `closure` against `initial_transaction` in a retry loop, commits on
/// success and reports each step to `hooks`.
///
/// Per iteration:
/// 1. `closure` receives a clone of the current transaction and a
///    [`MaybeCommitted`] flag (initially `false`, afterwards the
///    `is_maybe_committed()` of the last error seen).
/// 2. If the closure fails with an error carrying an `FdbError`, the
///    transaction's `on_error` decides whether to retry; any other closure
///    error is returned unchanged.
/// 3. Otherwise the transaction is committed; a commit error goes through
///    `on_error` as well and either restarts the loop or is returned.
///
/// # Errors
/// Returns the closure's own error when it is not an FDB error, the
/// non-retryable error reported by `on_error`, or the binding error produced by
/// `commit`/`on_error`.
#[cfg_attr(
feature = "trace",
tracing::instrument(level = "debug", skip(initial_transaction, hooks, closure))
)]
pub(crate) async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
initial_transaction: RetryableTransaction,
hooks: &H,
closure: F,
) -> Result<T, FdbBindingError>
where
F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
Fut: Future<Output = Result<T, FdbBindingError>>,
{
// Tells the closure whether the previous attempt may have been committed.
let mut maybe_committed = false;
let mut transaction = initial_transaction;
// Attempt counter, only compiled in for trace output.
#[cfg(feature = "trace")]
let mut iteration: u64 = 0;
loop {
#[cfg(feature = "trace")]
{
iteration += 1;
}
let result_closure = closure(transaction.clone(), MaybeCommitted(maybe_committed)).await;
if let Err(e) = result_closure {
// Only errors that wrap an FdbError are candidates for a retry.
if let Some(fdb_err) = e.get_fdb_error() {
maybe_committed = fdb_err.is_maybe_committed();
hooks.on_closure_error(&fdb_err);
// Time the on_error call; it is reported only when a retry is granted.
let now_on_error = Instant::now();
match transaction.on_error(fdb_err).await {
// Retry granted: continue with the transaction handed back by on_error.
Ok(Ok(t)) => {
hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
#[cfg(feature = "trace")]
{
let error_code = fdb_err.code();
tracing::warn!(iteration, error_code, "restarting transaction");
}
hooks.on_retry();
transaction = t;
continue;
}
// on_error reported the error as non-retryable.
Ok(Err(non_retryable)) => {
return Err(FdbBindingError::from(non_retryable));
}
// on_error itself failed at the binding level.
// NOTE(review): presumably a clone of the transaction is still alive — confirm.
Err(binding_err) => {
return Err(binding_err);
}
}
}
// Not an FDB error: hand it back to the caller untouched.
return Err(e);
}
#[cfg(feature = "trace")]
tracing::info!(iteration, "closure executed, checking result...");
// Measure the commit latency for the on_commit_success hook.
let now_commit = Instant::now();
let commit_result = transaction.commit().await;
let commit_duration = now_commit.elapsed().as_millis() as u64;
match commit_result {
// Binding-level failure; the trace message attributes it to a kept
// transaction reference.
Err(err) => {
#[cfg(feature = "trace")]
tracing::error!(
iteration,
"transaction reference kept, aborting transaction"
);
return Err(err);
}
Ok(Ok(committed)) => {
hooks.on_commit_success(&committed, commit_duration);
#[cfg(feature = "trace")]
tracing::info!(iteration, "success, returning result");
// result_closure is necessarily Ok here: the Err case returned above.
return result_closure;
}
Ok(Err(commit_error)) => {
#[cfg(feature = "trace")]
let error_code = commit_error.code();
maybe_committed = commit_error.is_maybe_committed();
// A failing hook never aborts the run; it is only logged under trace.
if let Err(_e) = hooks.on_commit_error(&commit_error).await {
#[cfg(feature = "trace")]
tracing::debug!(error_code = _e.code(), "on_commit_error hook failed");
}
let now_on_error = Instant::now();
match commit_error.on_error().await {
// Retry granted: wrap the returned transaction for the next attempt.
Ok(t) => {
hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
#[cfg(feature = "trace")]
tracing::warn!(iteration, error_code, "restarting transaction");
hooks.on_retry();
transaction = RetryableTransaction::new(t);
continue;
}
Err(non_retryable) => {
#[cfg(feature = "trace")]
{
let error_code = non_retryable.code();
tracing::error!(
iteration,
error_code,
"could not commit, non retryable error"
);
}
return Err(FdbBindingError::from(non_retryable));
}
}
}
}
}
}
/// Owning handle to a native `FDBDatabase`.
///
/// The handle is released with `fdb_database_destroy` when the value is dropped.
pub struct Database {
    /// Non-null pointer to the native database object.
    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
}

// SAFETY: NOTE(review): relies on the FDB C client allowing a database handle
// to be shared and used across threads — confirm against the C API docs.
unsafe impl Sync for Database {}
unsafe impl Send for Database {}

impl Drop for Database {
    /// Releases the native database handle.
    fn drop(&mut self) {
        let raw = self.inner.as_ptr();
        // SAFETY: `raw` comes from `inner`, which is non-null by construction,
        // and `drop` runs at most once for this value.
        unsafe { fdb_sys::fdb_database_destroy(raw) };
    }
}
#[cfg_api_versions(min = 610)]
impl Database {
pub fn new(path: Option<&str>) -> FdbResult<Database> {
let path_str =
path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
let path_ptr = path_str
.as_ref()
.map(|path| path.as_ptr())
.unwrap_or(std::ptr::null());
let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
drop(path_str); error::eval(err)?;
let ptr =
NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
Ok(unsafe { Self::new_from_pointer(ptr) })
}
pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
Self { inner: ptr }
}
pub fn from_path(path: &str) -> FdbResult<Database> {
Self::new(Some(path))
}
#[allow(clippy::should_implement_trait)]
pub fn default() -> FdbResult<Database> {
Self::new(None)
}
}
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant named `tenant_name` on this database.
    ///
    /// # Errors
    /// Returns the error reported by `fdb_database_open_tenant`.
    ///
    /// # Panics
    /// Panics if the name length does not fit the integer type expected by the
    /// C API, or if the C API reports success but yields a null handle.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let status = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(status)?;
        let handle = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        Ok(FdbTenant {
            inner: handle,
            // The tenant keeps its own copy of the name.
            name: tenant_name.to_vec(),
        })
    }
}
#[cfg_api_versions(min = 730)]
impl Database {
    /// Requests the client status through `fdb_database_get_client_status`.
    ///
    /// The returned future resolves to the raw status bytes; `use<>` keeps it
    /// from borrowing `self`.
    pub fn get_client_status(
        &self,
    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin + use<>
    {
        let pending = unsafe { fdb_sys::fdb_database_get_client_status(self.inner.as_ptr()) };
        crate::future::FdbFuture::new(pending)
    }
}
impl Database {
/// Creates a database handle in a way that works across API versions.
///
/// For API versions 510 through 600 the handle is obtained via
/// `crate::cluster::Cluster`; for every other version this delegates to
/// `Database::new`.
///
/// # Errors
/// Propagates the error of whichever creation path is compiled in.
pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
// The macro selects exactly one of the two branches at compile time.
if_cfg_api_versions!(min = 510, max = 600 => {
let cluster = crate::cluster::Cluster::new(path).await?;
let database = cluster.create_database().await?;
Ok(database)
} else {
Database::new(path)
})
}
/// Applies `opt` to this database handle.
///
/// # Errors
/// Returns whatever error applying the option reports.
pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
    let db = self.inner.as_ptr();
    unsafe { opt.apply(db) }
}
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
pub fn create_trx(&self) -> FdbResult<Transaction> {
let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
let err =
unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
error::eval(err)?;
Ok(Transaction::new(NonNull::new(trx).expect(
"fdb_database_create_transaction to not return null if there is no error",
)))
}
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
pub fn create_instrumented_trx(
&self,
metrics: TransactionMetrics,
) -> Result<Transaction, FdbBindingError> {
let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
let err =
unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
error::eval(err)?;
let inner = NonNull::new(trx)
.expect("fdb_database_create_transaction to not return null if there is no error");
Ok(Transaction::new_instrumented(inner, metrics))
}
/// Creates a transaction and wraps it in a [`RetryableTransaction`].
///
/// # Errors
/// Propagates the error of `create_trx`.
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
    self.create_trx().map(RetryableTransaction::new)
}
/// Creates a [`RetryableTransaction`] whose underlying transaction reports
/// into `metrics`.
///
/// The method name keeps its existing spelling ("intrumented") because it is
/// part of the public interface.
///
/// # Errors
/// Propagates the error of `create_instrumented_trx`.
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
pub fn create_intrumented_retryable_trx(
    &self,
    metrics: TransactionMetrics,
) -> Result<RetryableTransaction, FdbBindingError> {
    // `metrics` is owned and not used afterwards, so it is moved rather than
    // cloned and then dropped.
    Ok(RetryableTransaction::new(
        self.create_instrumented_trx(metrics)?,
    ))
}
pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
where
F: DatabaseTransact,
{
let is_idempotent = options.is_idempotent;
let time_out = options.time_out.map(|d| Instant::now() + d);
let retry_limit = options.retry_limit;
let mut tries: u32 = 0;
let mut trx = self.create_trx()?;
let mut can_retry = move || {
tries += 1;
retry_limit.map(|limit| tries < limit).unwrap_or(true)
&& time_out.map(|t| Instant::now() < t).unwrap_or(true)
};
loop {
let r = f.transact(trx).await;
f = r.0;
trx = r.1;
trx = match r.2 {
Ok(item) => match trx.commit().await {
Ok(_) => break Ok(item),
Err(e) => {
if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
e.on_error().await?
} else {
break Err(F::Error::from(e.into()));
}
}
},
Err(user_err) => match user_err.try_into_fdb_error() {
Ok(e) => {
if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
trx.on_error(e).await?
} else {
break Err(F::Error::from(e));
}
}
Err(user_err) => break Err(user_err),
},
};
}
}
/// Runs `f` through [`Database::transact`], where `f` returns a boxed `Send`
/// future borrowing the transaction and the caller-provided `data`.
///
/// `data` is handed to `f` by mutable reference on every attempt.
pub fn transact_boxed<'trx, F, D, T, E>(
    &'trx self,
    data: D,
    f: F,
    options: TransactOption,
) -> impl Future<Output = Result<T, E>> + Send + 'trx
where
    for<'a> F: FnMut(
        &'a Transaction,
        &'a mut D,
    ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
    E: TransactError,
    F: Send + 'trx,
    T: Send + 'trx,
    E: Send + 'trx,
    D: Send + 'trx,
{
    // Adapter that makes the closure + data pair implement `DatabaseTransact`.
    let adapter = boxed::FnMutBoxed {
        f,
        d: data,
        m: PhantomData,
    };
    self.transact(adapter, options)
}
/// Runs `f` through [`Database::transact`], where `f` returns a boxed future
/// that is not required to be `Send`.
///
/// `data` is handed to `f` by mutable reference on every attempt.
pub fn transact_boxed_local<'trx, F, D, T, E>(
    &'trx self,
    data: D,
    f: F,
    options: TransactOption,
) -> impl Future<Output = Result<T, E>> + 'trx
where
    for<'a> F:
        FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
    E: TransactError,
    F: 'trx,
    T: 'trx,
    E: 'trx,
    D: 'trx,
{
    // Adapter that makes the closure + data pair implement `DatabaseTransact`.
    let adapter = boxed_local::FnMutBoxedLocal {
        f,
        d: data,
        m: PhantomData,
    };
    self.transact(adapter, options)
}
/// Runs `closure` inside a retry loop on a fresh transaction, without any hooks.
///
/// The closure receives the transaction and a [`MaybeCommitted`] flag on every
/// attempt.
///
/// # Errors
/// Returns the transaction-creation error, or whatever the retry loop returns.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, closure))
)]
pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
where
    F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
    Fut: Future<Output = Result<T, FdbBindingError>>,
{
    run_with_hooks(self.create_retryable_trx()?, &NoopHooks, closure).await
}
#[cfg_attr(
feature = "trace",
tracing::instrument(level = "debug", skip(self, hooks, closure))
)]
pub async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
&self,
hooks: &H,
closure: F,
) -> Result<T, FdbBindingError>
where
F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
Fut: Future<Output = Result<T, FdbBindingError>>,
{
let transaction = self.create_retryable_trx()?;
run_with_hooks(transaction, hooks, closure).await
}
/// Like `run`, but collects metrics and returns a [`MetricsReport`] alongside
/// both the success value and the error.
///
/// The total execution time is recorded before the report is taken, whether
/// the run succeeded, failed, or the transaction could not be created.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, closure))
)]
pub async fn instrumented_run<F, Fut, T>(
    &self,
    closure: F,
) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
where
    F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
    Fut: Future<Output = Result<T, FdbBindingError>>,
{
    let metrics = TransactionMetrics::new();
    let hooks = InstrumentedHooks {
        metrics: metrics.clone(),
        start: Instant::now(),
    };
    let outcome = match self.create_intrumented_retryable_trx(metrics.clone()) {
        Ok(trx) => run_with_hooks(trx, &hooks, closure).await,
        Err(creation_err) => Err(creation_err),
    };
    // Close the timing window first so the report includes the total duration.
    hooks.on_complete();
    let report = metrics.get_metrics_data();
    match outcome {
        Ok(val) => Ok((val, report)),
        Err(err) => Err((err, report)),
    }
}
/// Creates a transaction, sets its read version to 42 and awaits
/// `get_read_version`, discarding the value.
///
/// # Errors
/// Returns the error from creating the transaction or from reading the version.
pub async fn perform_no_op(&self) -> FdbResult<()> {
    let probe = self.create_trx()?;
    probe.set_read_version(42);
    let _read_version = probe.get_read_version().await?;
    Ok(())
}
/// Returns the value reported by `fdb_database_get_main_thread_busyness`.
///
/// The call is synchronous; the function is `async` only in its signature and
/// always yields `Ok`.
#[cfg_api_versions(min = 710)]
pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
    let db = self.inner.as_ptr();
    Ok(unsafe { fdb_sys::fdb_database_get_main_thread_busyness(db) })
}
}
/// A transactional operation that `Database::transact` can run and retry.
///
/// The operation takes itself and the transaction by value and hands both back
/// in the future's output, so the caller can reuse them for another attempt.
pub trait DatabaseTransact: Sized {
/// Value produced when the operation succeeds.
type Item;
/// Error produced when the operation fails.
type Error: TransactError;
/// Future resolving to the operation itself, the transaction, and the result.
type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
/// Runs the operation once against `trx`.
fn transact(self, trx: Transaction) -> Self::Future;
}
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed {
    use super::*;

    /// Adapter pairing a closure that returns a boxed `Send` future with the
    /// data it mutates, so the pair can implement [`DatabaseTransact`].
    pub struct FnMutBoxed<'t, F, D> {
        /// Closure invoked on every attempt.
        pub f: F,
        /// Caller data passed to the closure by mutable reference.
        pub d: D,
        /// Ties the adapter to the lifetime `'t` without storing a reference.
        pub m: PhantomData<&'t ()>,
    }

    /// Calls the adapter's closure once and returns the adapter and the
    /// transaction together with the closure's result.
    async fn boxed_data_fut<'t, F, T, E, D>(
        mut adapter: FnMutBoxed<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        E: TransactError,
    {
        let outcome = (adapter.f)(&trx, &mut adapter.d).await;
        (adapter, trx, outcome)
    }

    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        F: 't + Send,
        T: 't,
        E: 't,
        D: 't + Send,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<
                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
                    + Send
                    + 't,
            >,
        >;

        /// Boxes one invocation of the closure as a `Send` future.
        fn transact(self, trx: Transaction) -> Self::Future {
            Box::pin(boxed_data_fut(self, trx))
        }
    }
}
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed_local {
    use super::*;

    /// Adapter pairing a closure that returns a boxed, not necessarily `Send`,
    /// future with the data it mutates, so the pair can implement
    /// [`DatabaseTransact`].
    pub struct FnMutBoxedLocal<'t, F, D> {
        /// Closure invoked on every attempt.
        pub f: F,
        /// Caller data passed to the closure by mutable reference.
        pub d: D,
        /// Ties the adapter to the lifetime `'t` without storing a reference.
        pub m: PhantomData<&'t ()>,
    }

    /// Calls the adapter's closure once and returns the adapter and the
    /// transaction together with the closure's result.
    async fn boxed_local_data_fut<'t, F, T, E, D>(
        mut adapter: FnMutBoxedLocal<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        E: TransactError,
    {
        let outcome = (adapter.f)(&trx, &mut adapter.d).await;
        (adapter, trx, outcome)
    }

    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        F: 't,
        T: 't,
        E: 't,
        D: 't,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
        >;

        /// Boxes one invocation of the closure as a local (non-`Send`) future.
        fn transact(self, trx: Transaction) -> Self::Future {
            Box::pin(boxed_local_data_fut(self, trx))
        }
    }
}
/// Error types usable with `Database::transact`: they can be built from an
/// [`FdbError`] and can say whether they wrap one.
pub trait TransactError: From<FdbError> {
    /// Returns the wrapped [`FdbError`], or gives `self` back when there is none.
    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
}

/// Blanket implementation for error types whose `TryInto<FdbError>` hands the
/// original value back on failure.
impl<T> TransactError for T
where
    T: From<FdbError> + TryInto<FdbError, Error = T>,
{
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        TryInto::<FdbError>::try_into(self)
    }
}

/// An [`FdbError`] trivially converts into itself.
impl TransactError for FdbError {
    fn try_into_fdb_error(self) -> Result<FdbError, FdbError> {
        Ok(self)
    }
}
/// Retry policy for `Database::transact`.
///
/// The default has no retry limit, no time-out, and treats the operation as
/// non-idempotent.
#[derive(Debug, Default, Clone)]
pub struct TransactOption {
    /// Upper bound on the number of failures after which no retry is granted;
    /// `None` means unlimited.
    pub retry_limit: Option<u32>,
    /// Time budget measured from the start of `transact`; no retry is granted
    /// once it has elapsed. `None` means no deadline.
    pub time_out: Option<Duration>,
    /// Whether the operation may safely run again after an error that might
    /// already have been committed.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Returns the default options with `is_idempotent` set to `true`.
    pub fn idempotent() -> Self {
        Self {
            is_idempotent: true,
            ..Self::default()
        }
    }
}