use foundationdb_sys as fdb_sys;
use std::fmt;
use std::ops::{Deref, Range, RangeInclusive};
use std::ptr::NonNull;
use std::sync::Arc;
use crate::future::*;
use crate::keyselector::*;
use crate::metrics::{FdbCommand, TransactionMetrics};
use crate::options;
use crate::{FdbError, FdbResult, error};
use foundationdb_macros::cfg_api_versions;
use crate::error::{FdbBindingError, TransactionMetricsNotFound};
use futures::{
Future, FutureExt, Stream, TryFutureExt, TryStreamExt, future, future::Either, stream,
};
/// Key read by `Transaction::get_metadata_version` and written (through a
/// versionstamped atomic operation) by `Transaction::update_metadata_version`.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
/// Start of the key range scanned by `Transaction::conflicting_keys`; this
/// prefix is also stripped from every key returned by that scan.
const CONFLICTING_KEYS_PREFIX: &[u8] = b"\xff\xff/transaction/conflicting_keys/";
/// End of the key range scanned by `Transaction::conflicting_keys`.
const CONFLICTING_KEYS_END: &[u8] = b"\xff\xff/transaction/conflicting_keys/\xff\xff";
/// A pair of raw keys delimiting one range reported by
/// `Transaction::conflicting_keys`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConflictingKeyRange {
    // Key at which the range starts.
    begin: Vec<u8>,
    // Key at which the range stops.
    end: Vec<u8>,
}

impl ConflictingKeyRange {
    /// Borrows the starting key of the range as raw bytes.
    pub fn begin(&self) -> &[u8] {
        self.begin.as_slice()
    }

    /// Borrows the ending key of the range as raw bytes.
    pub fn end(&self) -> &[u8] {
        self.end.as_slice()
    }
}

impl fmt::Display for ConflictingKeyRange {
    /// Renders the range as `begin..end`; bytes that are not valid UTF-8 are
    /// replaced with the Unicode replacement character.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let begin = String::from_utf8_lossy(self.begin());
        let end = String::from_utf8_lossy(self.end());
        write!(f, "{begin}..{end}")
    }
}
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCommitted {
tr: Transaction,
}
impl TransactionCommitted {
pub fn committed_version(&self) -> FdbResult<i64> {
let mut version: i64 = 0;
error::eval(unsafe {
fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
})?;
Ok(version)
}
pub fn reset(mut self) -> Transaction {
self.tr.reset();
self.tr
}
}
impl From<TransactionCommitted> for Transaction {
fn from(tc: TransactionCommitted) -> Transaction {
tc.reset()
}
}
/// Failure value produced by `Transaction::commit`; it keeps the transaction
/// together with the error that made the commit fail.
pub struct TransactionCommitError {
    tr: Transaction,
    err: FdbError,
}

impl TransactionCommitError {
    /// Passes the stored error code to `fdb_transaction_on_error` and resolves
    /// to the transaction once that future completes successfully.
    pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
        let Self { tr, err } = self;
        let code = err.code();
        // SAFETY: `tr.inner` is the non-null handle held by the transaction,
        // which is moved into the returned future and therefore outlives it.
        let pending = FdbFuture::<()>::new(unsafe {
            fdb_sys::fdb_transaction_on_error(tr.inner.as_ptr(), code)
        });
        pending.map_ok(move |()| tr)
    }

    /// Delegates to `Transaction::conflicting_keys` on the failed transaction.
    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
    pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
        let failed = &self.tr;
        failed.conflicting_keys().await
    }

    /// Drops the error, resets the transaction and returns it for reuse.
    pub fn reset(self) -> Transaction {
        let Self { mut tr, .. } = self;
        tr.reset();
        tr
    }
}
// Dereferencing a commit error exposes the underlying `FdbError`.
impl Deref for TransactionCommitError {
type Target = FdbError;
fn deref(&self) -> &FdbError {
&self.err
}
}
// Converting into `FdbError` keeps the error and drops the transaction.
impl From<TransactionCommitError> for FdbError {
fn from(tce: TransactionCommitError) -> FdbError {
tce.err
}
}
// Debug output shows only the wrapped error (rendered with `{}`), not the transaction.
impl fmt::Debug for TransactionCommitError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TransactionCommitError({})", self.err)
}
}
// Display output is delegated to the wrapped error's `fmt`.
impl fmt::Display for TransactionCommitError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.err.fmt(f)
}
}
impl std::error::Error for TransactionCommitError {}
/// Outcome of `Transaction::commit`: either wrapper carries the transaction.
type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
/// Wrapper produced by `Transaction::cancel`; the transaction has to be reset
/// through this type before it can be used again.
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCancelled {
    tr: Transaction,
}

impl TransactionCancelled {
    /// Resets the cancelled transaction and hands it back for reuse.
    pub fn reset(self) -> Transaction {
        let Self { mut tr } = self;
        tr.reset();
        tr
    }
}

impl From<TransactionCancelled> for Transaction {
    /// Equivalent to [`TransactionCancelled::reset`].
    fn from(tc: TransactionCancelled) -> Transaction {
        TransactionCancelled::reset(tc)
    }
}
/// Handle over a raw `FDBTransaction` pointer, optionally paired with a
/// metrics sink.
///
/// The pointer is handed to `fdb_transaction_destroy` when the value is dropped.
#[derive(Debug)]
pub struct Transaction {
// Non-null raw handle passed to every `fdb_sys::fdb_transaction_*` call.
inner: NonNull<fdb_sys::FDBTransaction>,
// When present, operations such as `set`, `get` and `get_range` report to it.
metrics: Option<TransactionMetrics>,
}
// NOTE(review): these impls assert that the raw handle may be moved to and
// shared between threads; presumably guaranteed by the C client — confirm.
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
/// Converts a Rust `bool` into the integer flag used by the C API
/// (`1` for `true`, `0` for `false`).
#[inline]
fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
    match v {
        true => 1,
        false => 0,
    }
}
/// Converts a buffer length into the `int` length expected by the C API.
///
/// `context` names the buffer and is only used in the panic message.
///
/// # Panics
///
/// Panics with `"{context}.len() > i32::MAX"` when `len` does not fit in an `i32`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    match i32::try_from(len) {
        Ok(fits) => fits,
        Err(_) => panic!("{context}.len() > i32::MAX"),
    }
}
/// Converts an iteration counter into the `int` expected by the C API.
///
/// Counters that do not fit in an `i32` are mapped to `0`.
/// NOTE(review): `0` is presumably rejected by the client as an invalid
/// iteration rather than silently accepted — confirm.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a row or byte limit into the `int` expected by the C API,
/// saturating at `i32::MAX` when the value does not fit.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    i32::try_from(v).unwrap_or(i32::MAX)
}
/// Parameters of a range read, consumed by `Transaction::get_range`,
/// `Transaction::get_ranges` and their mapped variants.
#[derive(Debug, Clone)]
pub struct RangeOption<'a> {
/// Key selector marking where the range starts.
pub begin: KeySelector<'a>,
/// Key selector marking where the range stops.
pub end: KeySelector<'a>,
/// Maximum number of key-value pairs still wanted; `None` is sent to the
/// C API as `0`.
pub limit: Option<usize>,
/// Byte budget forwarded to the C API (values above `i32::MAX` are clamped).
pub target_bytes: usize,
/// Streaming mode whose code is forwarded to the C API.
pub mode: options::StreamingMode,
/// Whether the range is read in reverse order.
pub reverse: bool,
// Hidden marker field; presumably here so callers build values through
// `Default` and struct-update syntax rather than a full literal.
#[doc(hidden)]
pub __non_exhaustive: std::marker::PhantomData<()>,
}
impl RangeOption<'_> {
    /// Returns the same options with the `reverse` flag flipped.
    pub fn rev(self) -> Self {
        Self {
            reverse: !self.reverse,
            ..self
        }
    }

    /// Computes the options for the batch that follows `kvs`.
    ///
    /// Returns `None` when `kvs` reports no more data, when `kvs` is empty,
    /// or when the remaining `limit` is used up by this batch.
    pub fn next_range(self, kvs: &FdbValues) -> Option<Self> {
        if !kvs.more() {
            return None;
        }
        let last = kvs.last()?;
        self.resume_after(kvs.len(), last.key())
    }

    /// Same as [`RangeOption::next_range`] for mapped results; the scan
    /// resumes from the parent key of the last mapped entry.
    #[cfg_api_versions(min = 710)]
    pub(crate) fn next_mapped_range(self, kvs: &MappedKeyValues) -> Option<Self> {
        if !kvs.more() {
            return None;
        }
        let last = kvs.last()?;
        self.resume_after(kvs.len(), last.parent_key())
    }

    /// Shrinks the remaining limit by `fetched` and moves the relevant bound
    /// past `last_key`; yields `None` once the limit reaches zero.
    fn resume_after(mut self, fetched: usize, last_key: &[u8]) -> Option<Self> {
        if let Some(remaining) = self.limit {
            let remaining = remaining.saturating_sub(fetched);
            if remaining == 0 {
                return None;
            }
            self.limit = Some(remaining);
        }
        // A reverse scan consumes from the end, so the end bound moves;
        // a forward scan moves the begin bound instead.
        if self.reverse {
            self.end.make_first_greater_or_equal(last_key);
        } else {
            self.begin.make_first_greater_than(last_key);
        }
        Some(self)
    }
}
impl Default for RangeOption<'_> {
    /// Builds options whose two selectors both point at the empty key, with
    /// no row limit, a zero byte target, iterator streaming mode and forward
    /// order.
    fn default() -> Self {
        const EMPTY_KEY: &[u8] = &[];
        Self {
            begin: KeySelector::first_greater_or_equal(EMPTY_KEY),
            end: KeySelector::first_greater_or_equal(EMPTY_KEY),
            limit: None,
            target_bytes: 0,
            mode: options::StreamingMode::Iterator,
            reverse: false,
            __non_exhaustive: std::marker::PhantomData,
        }
    }
}
impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
    /// Uses the two selectors as bounds; every other field keeps its default.
    fn from(bounds: (KeySelector<'a>, KeySelector<'a>)) -> Self {
        let (begin, end) = bounds;
        Self {
            begin,
            end,
            ..Self::default()
        }
    }
}

impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
    /// Wraps both owned keys in "first greater or equal" selectors; every
    /// other field keeps its default.
    fn from(bounds: (Vec<u8>, Vec<u8>)) -> Self {
        let (first, last) = bounds;
        let begin = KeySelector::first_greater_or_equal(first);
        let end = KeySelector::first_greater_or_equal(last);
        Self {
            begin,
            end,
            ..Self::default()
        }
    }
}

impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
    /// Wraps both borrowed keys in "first greater or equal" selectors; every
    /// other field keeps its default.
    fn from(bounds: (&'a [u8], &'a [u8])) -> Self {
        let (first, last) = bounds;
        let begin = KeySelector::first_greater_or_equal(first);
        let end = KeySelector::first_greater_or_equal(last);
        Self {
            begin,
            end,
            ..Self::default()
        }
    }
}
impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
fn from(range: Range<KeySelector<'a>>) -> Self {
RangeOption::from((range.start, range.end))
}
}
impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
fn from(range: Range<&'a [u8]>) -> Self {
RangeOption::from((range.start, range.end))
}
}
impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
fn from(range: Range<Vec<u8>>) -> Self {
RangeOption::from((range.start, range.end))
}
}
impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
fn from(range: RangeInclusive<&'a [u8]>) -> Self {
let (start, end) = range.into_inner();
(KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
}
}
impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
fn from(range: RangeInclusive<Vec<u8>>) -> Self {
let (start, end) = range.into_inner();
(KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
}
}
impl Transaction {
/// Wraps a raw transaction handle without attaching a metrics sink.
pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
    let metrics = None;
    Self { inner, metrics }
}
/// Wraps a raw transaction handle and attaches `metrics`, to which
/// operations on the transaction will report.
pub fn new_instrumented(
    inner: NonNull<fdb_sys::FDBTransaction>,
    metrics: TransactionMetrics,
) -> Self {
    let metrics = Some(metrics);
    Self { inner, metrics }
}
/// Applies a typed transaction option to the underlying handle.
///
/// # Errors
///
/// Returns whatever error `opt.apply` reports.
pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    unsafe { opt.apply(handle) }
}
/// Sets a transaction option from its raw code and optional payload.
///
/// A missing payload is sent as a null pointer with length `0`.
///
/// # Errors
///
/// Returns the `FdbError` built from a non-zero code reported by
/// `fdb_transaction_set_option`.
///
/// # Panics
///
/// Panics when the payload length does not fit in an `i32`.
pub fn set_raw_option(
    &self,
    code: fdb_sys::FDBTransactionOption,
    data: Option<Vec<u8>>,
) -> FdbResult<()> {
    let (data_ptr, size) = match data.as_deref() {
        Some(bytes) => (
            bytes.as_ptr(),
            i32::try_from(bytes.len()).expect("len to fit in i32"),
        ),
        None => (std::ptr::null(), 0),
    };
    // SAFETY: `data` is still alive here, so `data_ptr` (when non-null) is
    // valid for `size` bytes during the call.
    let status = unsafe {
        fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
    };
    match status {
        0 => Ok(()),
        failure => Err(FdbError::from_code(failure)),
    }
}
/// Writes `value` under `key` in this transaction and, when metrics are
/// attached, reports the combined key and value size.
///
/// # Panics
///
/// Panics when `key` or `value` is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, key, value))
)]
pub fn set(&self, key: &[u8], value: &[u8]) {
    let key_len = fdb_len(key.len(), "key");
    let value_len = fdb_len(value.len(), "value");
    // SAFETY: both slices are valid for the lengths passed alongside them.
    unsafe {
        fdb_sys::fdb_transaction_set(
            self.inner.as_ptr(),
            key.as_ptr(),
            key_len,
            value.as_ptr(),
            value_len,
        );
    }
    if let Some(metrics) = self.metrics.as_ref() {
        let written = (key.len() + value.len()) as u64;
        metrics.report_metrics(FdbCommand::Set(written));
    }
}
/// Clears `key` in this transaction and, when metrics are attached,
/// reports a `Clear` command.
///
/// # Panics
///
/// Panics when `key` is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, key))
)]
pub fn clear(&self, key: &[u8]) {
    let key_len = fdb_len(key.len(), "key");
    // SAFETY: `key` is valid for `key_len` bytes.
    unsafe {
        fdb_sys::fdb_transaction_clear(self.inner.as_ptr(), key.as_ptr(), key_len);
    }
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.report_metrics(FdbCommand::Clear);
    }
}
/// Starts a read of `key` and returns a future resolving to its value, or
/// `None` when the read yields no value.
///
/// `snapshot` is forwarded to the C API as the snapshot flag. When metrics
/// are attached, a successful read reports the key length plus the value
/// length (if any) and whether a pair was fetched.
///
/// # Panics
///
/// Panics when `key` is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, key))
)]
pub fn get(
    &self,
    key: &[u8],
    snapshot: bool,
) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin + use<> {
    // Only owned data may be captured by the returned future (`use<>`).
    let metrics = self.metrics.clone();
    let key_len = key.len();
    // SAFETY: `key` is valid for the length passed alongside it.
    let pending = FdbFuture::<Option<FdbSlice>>::new(unsafe {
        fdb_sys::fdb_transaction_get(
            self.inner.as_ptr(),
            key.as_ptr(),
            fdb_len(key.len(), "key"),
            fdb_bool(snapshot),
        )
    });
    pending.map(move |outcome| {
        if let (Ok(found), Some(metrics)) = (&outcome, metrics.as_ref()) {
            let (bytes_count, kv_fetched) = match found {
                Some(value) => ((key_len + value.len()) as u64, 1),
                None => (key_len as u64, 0),
            };
            metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
        }
        outcome
    })
}
/// Applies the atomic mutation `op_type` to `key` with operand `param` and,
/// when metrics are attached, reports an `Atomic` command.
///
/// # Panics
///
/// Panics when `key` or `param` is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, key, param))
)]
pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
    let key_len = fdb_len(key.len(), "key");
    let param_len = fdb_len(param.len(), "param");
    let op_code = op_type.code();
    // SAFETY: both slices are valid for the lengths passed alongside them.
    unsafe {
        fdb_sys::fdb_transaction_atomic_op(
            self.inner.as_ptr(),
            key.as_ptr(),
            key_len,
            param.as_ptr(),
            param_len,
            op_code,
        );
    }
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.report_metrics(FdbCommand::Atomic);
    }
}
/// Starts resolving `selector` and returns a future yielding the key it
/// resolves to.
///
/// `snapshot` is forwarded to the C API as the snapshot flag.
///
/// # Panics
///
/// Panics when the selector's key is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, selector, snapshot))
)]
pub fn get_key(
    &self,
    selector: &KeySelector,
    snapshot: bool,
) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
    let anchor = selector.key();
    let anchor_len = fdb_len(anchor.len(), "key");
    // SAFETY: `anchor` is valid for `anchor_len` bytes.
    let raw = unsafe {
        fdb_sys::fdb_transaction_get_key(
            self.inner.as_ptr(),
            anchor.as_ptr(),
            anchor_len,
            fdb_bool(selector.or_equal()),
            selector.offset(),
            fdb_bool(snapshot),
        )
    };
    FdbFuture::new(raw)
}
/// Streams the batches of a range read, issuing one `get_range` call per
/// batch.
///
/// The iteration counter starts at 1 and grows by one per batch. After each
/// successful batch the next options come from `RangeOption::next_range`;
/// the stream ends when that yields `None` or right after an error.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, opt, snapshot))
)]
pub fn get_ranges<'a>(
    &'a self,
    opt: RangeOption<'a>,
    snapshot: bool,
) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
    stream::unfold((1, Some(opt)), move |(iteration, pending)| match pending {
        // Nothing left to read: finish the stream.
        None => Either::Right(future::ready(None)),
        Some(opt) => {
            let batch = self.get_range(&opt, iteration as usize, snapshot);
            Either::Left(batch.map(move |outcome| {
                let next_opt = outcome
                    .as_ref()
                    .ok()
                    .and_then(|values| opt.next_range(values));
                Some((outcome, (iteration + 1, next_opt)))
            }))
        }
    })
}
/// Streams the individual key-value pairs of a range read by flattening the
/// batches produced by `get_ranges`.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, opt, snapshot))
)]
pub fn get_ranges_keyvalues<'a>(
    &'a self,
    opt: RangeOption<'a>,
    snapshot: bool,
) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
    let batches = self.get_ranges(opt, snapshot);
    batches
        .map_ok(|batch| {
            let pairs = batch.into_iter().map(Ok);
            stream::iter(pairs)
        })
        .try_flatten()
}
/// Starts a single range read described by `opt` and returns a future
/// resolving to one batch of results.
///
/// `iteration` is forwarded through `fdb_iteration`, and `opt.limit` /
/// `opt.target_bytes` through `fdb_limit` (a missing limit is sent as `0`).
/// When metrics are attached, a successful batch reports the total key and
/// value bytes and the number of pairs fetched.
///
/// # Panics
///
/// Panics when either selector key is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, opt, snapshot))
)]
pub fn get_range(
    &self,
    opt: &RangeOption,
    iteration: usize,
    snapshot: bool,
) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin + use<> {
    let (begin, end) = (&opt.begin, &opt.end);
    let (key_begin, key_end) = (begin.key(), end.key());
    // Cloned so the returned future owns everything it needs (`use<>`).
    let metrics = self.metrics.clone();
    // SAFETY: both key slices are valid for the lengths passed alongside them.
    let pending = FdbFuture::<FdbValues>::new(unsafe {
        fdb_sys::fdb_transaction_get_range(
            self.inner.as_ptr(),
            key_begin.as_ptr(),
            fdb_len(key_begin.len(), "key_begin"),
            fdb_bool(begin.or_equal()),
            begin.offset(),
            key_end.as_ptr(),
            fdb_len(key_end.len(), "key_end"),
            fdb_bool(end.or_equal()),
            end.offset(),
            fdb_limit(opt.limit.unwrap_or(0)),
            fdb_limit(opt.target_bytes),
            opt.mode.code(),
            fdb_iteration(iteration),
            fdb_bool(snapshot),
            fdb_bool(opt.reverse),
        )
    });
    pending.map(move |outcome| {
        if let (Ok(values), Some(metrics)) = (&outcome, metrics.as_ref()) {
            let mut bytes_count: u64 = 0;
            for pair in values.as_ref() {
                bytes_count += (pair.key().len() + pair.value().len()) as u64;
            }
            let kv_fetched = values.len() as u64;
            metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched));
        }
        outcome
    })
}
/// Starts a single mapped range read described by `opt` and `mapper` and
/// returns a future resolving to one batch of mapped results.
///
/// Limits, streaming mode, iteration and flags are forwarded exactly as in
/// `get_range`. No metrics are reported by this method.
///
/// # Panics
///
/// Panics when a selector key or `mapper` is longer than `i32::MAX` bytes.
#[cfg_api_versions(min = 710)]
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
)]
pub fn get_mapped_range(
    &self,
    opt: &RangeOption,
    mapper: &[u8],
    iteration: usize,
    snapshot: bool,
) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + use<> {
    let (begin, end) = (&opt.begin, &opt.end);
    let (key_begin, key_end) = (begin.key(), end.key());
    // SAFETY: the key and mapper slices are valid for the lengths passed
    // alongside them.
    let raw = unsafe {
        fdb_sys::fdb_transaction_get_mapped_range(
            self.inner.as_ptr(),
            key_begin.as_ptr(),
            fdb_len(key_begin.len(), "key_begin"),
            fdb_bool(begin.or_equal()),
            begin.offset(),
            key_end.as_ptr(),
            fdb_len(key_end.len(), "key_end"),
            fdb_bool(end.or_equal()),
            end.offset(),
            mapper.as_ptr(),
            fdb_len(mapper.len(), "mapper_length"),
            fdb_limit(opt.limit.unwrap_or(0)),
            fdb_limit(opt.target_bytes),
            opt.mode.code(),
            fdb_iteration(iteration),
            fdb_bool(snapshot),
            fdb_bool(opt.reverse),
        )
    };
    FdbFuture::new(raw)
}
/// Streams the batches of a mapped range read, issuing one
/// `get_mapped_range` call per batch.
///
/// The iteration counter starts at 1 and grows by one per batch. After each
/// successful batch the next options come from
/// `RangeOption::next_mapped_range`; the stream ends when that yields `None`
/// or right after an error.
#[cfg_api_versions(min = 710)]
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
)]
pub fn get_mapped_ranges<'a>(
    &'a self,
    opt: RangeOption<'a>,
    mapper: &'a [u8],
    snapshot: bool,
) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
    stream::unfold((1, Some(opt)), move |(iteration, pending)| match pending {
        // Nothing left to read: finish the stream.
        None => Either::Right(future::ready(None)),
        Some(opt) => {
            let batch = self.get_mapped_range(&opt, mapper, iteration as usize, snapshot);
            Either::Left(batch.map(move |outcome| {
                let next_opt = outcome
                    .as_ref()
                    .ok()
                    .and_then(|values| opt.next_mapped_range(values));
                Some((outcome, (iteration + 1, next_opt)))
            }))
        }
    })
}
/// Clears the keys between `begin` and `end` in this transaction and, when
/// metrics are attached, reports a `ClearRange` command.
///
/// # Panics
///
/// Panics when `begin` or `end` is longer than `i32::MAX` bytes.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(self, begin, end))
)]
pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
    let begin_len = fdb_len(begin.len(), "begin");
    let end_len = fdb_len(end.len(), "end");
    // SAFETY: both slices are valid for the lengths passed alongside them.
    unsafe {
        fdb_sys::fdb_transaction_clear_range(
            self.inner.as_ptr(),
            begin.as_ptr(),
            begin_len,
            end.as_ptr(),
            end_len,
        );
    }
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.report_metrics(FdbCommand::ClearRange);
    }
}
/// Starts a request for the estimated size, in bytes, of the range between
/// `begin` and `end`.
///
/// # Panics
///
/// Panics when `begin` or `end` is longer than `i32::MAX` bytes.
#[cfg_api_versions(min = 630)]
pub fn get_estimated_range_size_bytes(
    &self,
    begin: &[u8],
    end: &[u8],
) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
    let begin_len = fdb_len(begin.len(), "begin");
    let end_len = fdb_len(end.len(), "end");
    // SAFETY: both slices are valid for the lengths passed alongside them.
    let raw = unsafe {
        fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
            self.inner.as_ptr(),
            begin.as_ptr(),
            begin_len,
            end.as_ptr(),
            end_len,
        )
    };
    FdbFuture::<i64>::new(raw)
}
/// Starts committing the transaction, consuming it.
///
/// The returned future resolves to `TransactionCommitted` on success or to
/// `TransactionCommitError` on failure; either wrapper carries the
/// transaction.
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
    // SAFETY: `inner` is the non-null handle held by this transaction, which
    // is moved into the returned future and therefore outlives it.
    let pending =
        FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) });
    pending.map(move |outcome| match outcome {
        Err(err) => Err(TransactionCommitError { tr: self, err }),
        Ok(()) => Ok(TransactionCommitted { tr: self }),
    })
}
/// Passes the code of `err` to `fdb_transaction_on_error`, consuming the
/// transaction; the returned future gives the transaction back once that
/// call completes successfully, or yields the error it reports.
pub fn on_error(
    self,
    err: FdbError,
) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
    let code = err.code();
    // SAFETY: `inner` is the non-null handle held by this transaction, which
    // is moved into the returned future and therefore outlives it.
    let pending = FdbFuture::<()>::new(unsafe {
        fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), code)
    });
    pending.map_ok(move |()| self)
}
/// Cancels the transaction and wraps it in `TransactionCancelled`, from
/// which it can be recovered through a reset.
pub fn cancel(self) -> TransactionCancelled {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    unsafe { fdb_sys::fdb_transaction_cancel(handle) };
    TransactionCancelled { tr: self }
}
/// Sets the custom metric `name` (with `labels`) to `value` on the attached
/// metrics sink.
///
/// # Errors
///
/// Returns `TransactionMetricsNotFound` when no metrics are attached.
pub fn set_custom_metric(
    &self,
    name: &str,
    value: u64,
    labels: &[(&str, &str)],
) -> Result<(), TransactionMetricsNotFound> {
    let Some(metrics) = self.metrics.as_ref() else {
        return Err(TransactionMetricsNotFound);
    };
    metrics.set_custom(name, value, labels);
    Ok(())
}
/// Increments the custom metric `name` (with `labels`) by `amount` on the
/// attached metrics sink.
///
/// # Errors
///
/// Returns `TransactionMetricsNotFound` when no metrics are attached.
pub fn increment_custom_metric(
    &self,
    name: &str,
    amount: u64,
    labels: &[(&str, &str)],
) -> Result<(), TransactionMetricsNotFound> {
    let Some(metrics) = self.metrics.as_ref() else {
        return Err(TransactionMetricsNotFound);
    };
    metrics.increment_custom(name, amount, labels);
    Ok(())
}
/// Starts a request for the addresses associated with `key`.
///
/// # Panics
///
/// Panics when `key` is longer than `i32::MAX` bytes.
pub fn get_addresses_for_key(
    &self,
    key: &[u8],
) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin + use<> {
    let key_len = fdb_len(key.len(), "key");
    // SAFETY: `key` is valid for `key_len` bytes.
    let raw = unsafe {
        fdb_sys::fdb_transaction_get_addresses_for_key(
            self.inner.as_ptr(),
            key.as_ptr(),
            key_len,
        )
    };
    FdbFuture::new(raw)
}
/// Registers a watch on `key` and returns the future tied to it.
///
/// # Panics
///
/// Panics when `key` is longer than `i32::MAX` bytes.
pub fn watch(
    &self,
    key: &[u8],
) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin + use<> {
    let key_len = fdb_len(key.len(), "key");
    // SAFETY: `key` is valid for `key_len` bytes.
    let raw = unsafe {
        fdb_sys::fdb_transaction_watch(self.inner.as_ptr(), key.as_ptr(), key_len)
    };
    FdbFuture::new(raw)
}
/// Starts a request for the approximate size of this transaction.
#[cfg_api_versions(min = 620)]
pub fn get_approximate_size(
    &self,
) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    let raw = unsafe { fdb_sys::fdb_transaction_get_approximate_size(handle) };
    FdbFuture::new(raw)
}
/// Starts a request for keys splitting the range between `begin` and `end`;
/// `chunk_size` is forwarded unchanged to the C API.
///
/// # Panics
///
/// Panics when `begin` or `end` is longer than `i32::MAX` bytes.
#[cfg_api_versions(min = 700)]
pub fn get_range_split_points(
    &self,
    begin: &[u8],
    end: &[u8],
    chunk_size: i64,
) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin + use<> {
    let begin_len = fdb_len(begin.len(), "begin");
    let end_len = fdb_len(end.len(), "end");
    // SAFETY: both slices are valid for the lengths passed alongside them.
    let raw = unsafe {
        fdb_sys::fdb_transaction_get_range_split_points(
            self.inner.as_ptr(),
            begin.as_ptr(),
            begin_len,
            end.as_ptr(),
            end_len,
            chunk_size,
        )
    };
    FdbFuture::<FdbKeys>::new(raw)
}
/// Starts a request for the versionstamp of this transaction.
pub fn get_versionstamp(
    &self,
) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    let raw = unsafe { fdb_sys::fdb_transaction_get_versionstamp(handle) };
    FdbFuture::new(raw)
}
/// Starts a request for the read version of this transaction.
pub fn get_read_version(
    &self,
) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    let raw = unsafe { fdb_sys::fdb_transaction_get_read_version(handle) };
    FdbFuture::new(raw)
}
/// Sets the read version of this transaction to `version`.
pub fn set_read_version(&self, version: i64) {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    unsafe {
        fdb_sys::fdb_transaction_set_read_version(handle, version);
    }
}
/// Reads `METADATA_VERSION_KEY` and decodes the first eight bytes of its
/// value as a big-endian `i64`.
///
/// Returns `Ok(None)` when the key has no value or the value is shorter
/// than eight bytes.
///
/// # Errors
///
/// Propagates any error returned by the underlying `get`.
#[cfg_api_versions(min = 610)]
pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
    let Some(fdb_slice) = self.get(METADATA_VERSION_KEY, snapshot).await? else {
        return Ok(None);
    };
    let value = fdb_slice.deref();
    // Only the leading eight bytes are decoded; anything after is ignored.
    let version = value
        .first_chunk::<8>()
        .map(|head| i64::from_be_bytes(*head));
    Ok(version)
}
/// Issues a `SetVersionstampedValue` atomic operation on
/// `METADATA_VERSION_KEY` with a parameter of fourteen zero bytes.
#[cfg_api_versions(min = 610)]
pub fn update_metadata_version(&self) {
    const ZEROED_PARAM: [u8; 14] = [0u8; 14];
    self.atomic_op(
        METADATA_VERSION_KEY,
        &ZEROED_PARAM,
        options::MutationType::SetVersionstampedValue,
    );
}
/// Resets the transaction in place through `fdb_transaction_reset`.
pub fn reset(&mut self) {
    let handle = self.inner.as_ptr();
    // SAFETY: `handle` is the non-null pointer held by this transaction.
    unsafe {
        fdb_sys::fdb_transaction_reset(handle);
    }
}
/// Reads the conflicting-keys range of this transaction and pairs its
/// entries into [`ConflictingKeyRange`] values.
///
/// Each entry's key has `CONFLICTING_KEYS_PREFIX` stripped. An entry whose
/// value is `b"1"` opens a range; the next entry whose value is `b"0"` closes
/// it. Entries with any other value, and `b"0"` entries with no open range,
/// are ignored.
///
/// The whole range is paged through `get_ranges`, so a conflict set that does
/// not fit in one batch is still returned in full, and an open marker is
/// carried across batch boundaries.
///
/// # Errors
///
/// Returns the first error produced by any of the underlying range reads.
#[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
    let opt = RangeOption::from((CONFLICTING_KEYS_PREFIX, CONFLICTING_KEYS_END));
    let prefix_len = CONFLICTING_KEYS_PREFIX.len();
    let mut ranges = Vec::new();
    // Begin key of a range whose closing "0" marker has not been seen yet;
    // kept outside the batch loop so a pair may straddle two batches.
    let mut current_begin: Option<Vec<u8>> = None;
    let mut batches = self.get_ranges(opt, false);
    while let Some(batch) = batches.try_next().await? {
        for kv in batch.iter() {
            // Keys no longer than the prefix map to the empty key.
            let actual_key = kv.key().get(prefix_len..).unwrap_or_default().to_vec();
            match kv.value() {
                b"1" => {
                    current_begin = Some(actual_key);
                }
                b"0" => {
                    if let Some(begin) = current_begin.take() {
                        ranges.push(ConflictingKeyRange {
                            begin,
                            end: actual_key,
                        });
                    }
                }
                _ => {}
            }
        }
    }
    debug_assert!(
        current_begin.is_none(),
        "unpaired '1' marker in conflicting keys response"
    );
    Ok(ranges)
}
pub fn add_conflict_range(
&self,
begin: &[u8],
end: &[u8],
ty: options::ConflictRangeType,
) -> FdbResult<()> {
error::eval(unsafe {
fdb_sys::fdb_transaction_add_conflict_range(
self.inner.as_ptr(),
begin.as_ptr(),
fdb_len(begin.len(), "begin"),
end.as_ptr(),
fdb_len(end.len(), "end"),
ty.code(),
)
})
}
}
impl Drop for Transaction {
    /// Hands the raw handle to `fdb_transaction_destroy`.
    fn drop(&mut self) {
        let handle = self.inner.as_ptr();
        // SAFETY: `handle` is the non-null pointer held by this transaction,
        // and `drop` runs at most once per value.
        unsafe { fdb_sys::fdb_transaction_destroy(handle) };
    }
}
/// Cloneable, reference-counted handle to a [`Transaction`].
///
/// The transaction can only be taken back out while this handle is the sole
/// remaining reference to it.
#[derive(Clone)]
pub struct RetryableTransaction {
    inner: Arc<Transaction>,
}

impl Deref for RetryableTransaction {
    type Target = Transaction;

    /// Borrows the shared transaction.
    fn deref(&self) -> &Transaction {
        &self.inner
    }
}
impl RetryableTransaction {
    /// Puts `t` behind a fresh reference-counted handle.
    pub(crate) fn new(t: Transaction) -> RetryableTransaction {
        let inner = Arc::new(t);
        RetryableTransaction { inner }
    }

    /// Recovers the owned transaction.
    ///
    /// # Errors
    ///
    /// Returns `FdbBindingError::ReferenceToTransactionKept` when any weak
    /// reference or any other strong reference to the transaction exists.
    pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
        let Self { inner } = self;
        // Weak references are rejected before the unwrap is even attempted.
        if Arc::weak_count(&inner) == 0 {
            if let Ok(tr) = Arc::try_unwrap(inner) {
                return Ok(tr);
            }
        }
        Err(FdbBindingError::ReferenceToTransactionKept)
    }

    /// Takes the transaction back and runs `Transaction::on_error` with `err`.
    ///
    /// The outer `Result` reports a failure to take the transaction; the
    /// inner one carries the outcome of `on_error`, re-wrapped on success.
    pub(crate) async fn on_error(
        self,
        err: FdbError,
    ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
        let tr = self.take()?;
        let retried = tr.on_error(err).await;
        Ok(retried.map(RetryableTransaction::new))
    }

    /// Takes the transaction back and commits it.
    ///
    /// The outer `Result` reports a failure to take the transaction; the
    /// inner one is the commit outcome.
    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
    pub(crate) async fn commit(
        self,
    ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
        let tr = self.take()?;
        let outcome = tr.commit().await;
        Ok(outcome)
    }
}