fdb/transaction/
fdb_transaction.rs

1use bytes::Bytes;
2
3use std::convert::TryInto;
4use std::future::Future;
5use std::ptr::NonNull;
6use std::sync::Arc;
7
8use crate::error::{check, FdbError, FdbResult};
9use crate::future::{
10    FdbFuture, FdbFutureCStringArray, FdbFutureI64, FdbFutureKey, FdbFutureMaybeValue,
11    FdbFutureUnit, FdbStreamKeyValue,
12};
13use crate::option::ConflictRangeType;
14use crate::range::{Range, RangeOptions};
15use crate::transaction::{MutationType, ReadTransaction, Transaction, TransactionOption};
16use crate::tuple::key_util;
17use crate::{Key, KeySelector, Value};
18
19#[cfg(feature = "fdb-7_1")]
20use crate::future::{FdbFutureKeyArray, FdbStreamMappedKeyValue};
21
22#[cfg(feature = "fdb-7_1")]
23use crate::Mapper;
24
25/// Committed version of the [`Transaction`].
26///
27/// [`get_committed_version`] provides a value of this type. This
28/// value can be returned from [`run`] method closure. After the
29/// transaction successfully commits, you can use [`into`] method to
30/// get the committed version.
31///
32/// [`get_committed_version`]: Transaction::get_committed_version
33/// [`run`]: crate::database::FdbDatabase::run
34/// [`into`]: Into::into
35//
// Here we are maintaining a copy of `FdbTransaction`, in order to
// prevent the `run` method from inadvertently destroying
// `fdb_sys::FDBTransaction`.
39#[derive(Debug)]
40pub struct CommittedVersion {
41    fdb_transaction: FdbTransaction,
42}
43
44impl CommittedVersion {
45    fn new(fdb_transaction: FdbTransaction) -> CommittedVersion {
46        CommittedVersion { fdb_transaction }
47    }
48}
49
50impl From<CommittedVersion> for FdbResult<i64> {
51    fn from(t: CommittedVersion) -> FdbResult<i64> {
52        let mut out_version = 0;
53
54        check(unsafe {
55            fdb_sys::fdb_transaction_get_committed_version(
56                t.fdb_transaction.get_c_api_ptr(),
57                &mut out_version,
58            )
59        })
60        .map(|_| out_version)
61    }
62}
63
64/// [`fdb_c`] client level versionstamp.
65///
66/// [`get_versionstamp`] provides a value of this type. This value can
67/// be returned from the [`run`] method closure. After the transaction
68/// successfully commits, you can use the [`get`] method to get the
69/// versionstamp.
70///
71/// [`fdb_c`]: https://apple.github.io/foundationdb/data-modeling.html#versionstamps
72/// [`run`]: crate::database::FdbDatabase::run
73/// [`get_versionstamp`]: Transaction::get_versionstamp
74/// [`get`]: TransactionVersionstamp::get
75//
// Here we are maintaining a copy of `FdbTransaction`, in order to
// prevent the `run` method from inadvertently destroying
// `fdb_sys::FDBTransaction`.
79#[derive(Debug)]
80#[allow(dead_code)]
81pub struct TransactionVersionstamp {
82    fdb_transaction: FdbTransaction,
83    future: FdbFutureKey,
84}
85
86impl TransactionVersionstamp {
87    /// Get [`fdb_c`] client level versionstamp.
88    ///
89    /// [`fdb_c`]: https://apple.github.io/foundationdb/data-modeling.html#versionstamps
90    pub async fn get(self) -> FdbResult<Bytes> {
91        self.future.await.map(|k| k.into())
92    }
93
94    /// Gets the inner [`FdbFutureKey`] while dropping the inner
95    /// [`FdbTransaction`].
96    ///
97    /// # Safety
98    ///
99    /// You should not use this API. It exists to support binding
100    /// tester.
101    pub unsafe fn get_inner_future(self) -> FdbFutureKey {
102        self.future
103    }
104
105    fn new(fdb_transaction: FdbTransaction, future: FdbFutureKey) -> TransactionVersionstamp {
106        TransactionVersionstamp {
107            fdb_transaction,
108            future,
109        }
110    }
111}
112
113/// A handle to a FDB transaction.
114///
115/// [`create_transaction`] method on [`FdbDatabase`] can be used to
116/// create a [`FdbTransaction`].
117///
118/// [`create_transaction`]: crate::database::FdbDatabase::create_transaction
119/// [`FdbDatabase`]: crate::database::FdbDatabase
120//
121// Unlike `FDBTransaction.java`, we do not store a copy of
122// `FdbDatabase` here. It is the responsibility of the caller to
123// ensure that `FdbDatabase` is alive during the lifetime of the
124// `FdbTransaction`.
125//
126// The design of `FdbTransaction` is very similar to the design of
127// `FdbDatabase`, where in the `Drop` trait we ensure that when we
128// have the last `Arc` to `fdb_sys::FDBTransaction`, then we
129// `fdb_sys::fdb_transaction_destroy`.
130#[derive(Clone, Debug)]
131pub struct FdbTransaction {
132    c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
133}
134
135impl FdbTransaction {
136    /// Return a special-purpose, read-only view of the database
137    /// ([`FdbReadTransaction`]). Reads done using [`snapshot`] are
138    /// known as *snapshot reads*. Snapshot reads selectively relax
139    /// FDB's isolation property, reducing transaction conflicts but
140    /// making reasoning about concurrency harder.
141    ///
142    /// For more information about how to use snapshot reads
143    /// correctly, see [snapshot reads].
144    ///
145    /// [`snapshot`]: FdbTransaction::snapshot
146    /// [snapshot reads]: https://apple.github.io/foundationdb/developer-guide.html#snapshot-reads
147    pub fn snapshot(&self) -> FdbReadTransaction {
148        let c_ptr = self.c_ptr.clone();
149
150        FdbReadTransaction::new(FdbTransaction { c_ptr })
151    }
152
153    /// Runs a closure in the context of this [`FdbTransaction`].
154    ///
155    /// # Safety
156    ///
157    /// You should not use this API. It exists to support binding
158    /// tester.
159    pub async unsafe fn run<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
160    where
161        F: FnMut(FdbTransaction) -> Fut,
162        Fut: Future<Output = FdbResult<T>>,
163    {
164        // We don't do `commit()` here because the semantics of `run`
165        // is not very clear. If binding tester's `execute_mutation`
166        // fails with a retryable transaction on `tr:
167        // &FdbTransaction`, we'll possibly have to consider adding
168        // `on_error` and `commit` check here.
169        f(self.clone()).await
170    }
171
172    /// Runs a closure in the context of [`FdbReadTransaction`],
173    /// derived from [`FdbTransaction`].
174    ///
175    /// # Safety
176    ///
177    /// You should not use this API. It exists to support binding
178    /// tester.
179    //
180    // Note:
181    //
182    // In the following case we must use `F: FnMut(FdbTransaction) ->
183    // Fut` because we need to ensure that any reads done within the
184    // closure should be non-snapshot reads. Non-snapshot reads happen
185    // on `FdbTransaction`. It is the responsibility of the caller to
186    // ensure that they don't do any mutations in the closure.
187    pub async unsafe fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
188    where
189        F: FnMut(FdbTransaction) -> Fut,
190        Fut: Future<Output = FdbResult<T>>,
191    {
192        f(self.clone()).await
193    }
194
195    pub(crate) fn new(c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>) -> FdbTransaction {
196        FdbTransaction { c_ptr }
197    }
198
199    pub(crate) fn get_c_api_ptr(&self) -> *mut fdb_sys::FDB_transaction {
200        // Safety: It is safe to unwrap here because if we have a
201        // `self: &FdbTransaction`, then `c_ptr` *must* be
202        // `Some<Arc<NonNull<...>>>`. *Only* time that `c_ptr` is
203        // `None` is after `Drop::drop` has been called.
204        (self.c_ptr.as_ref().unwrap()).as_ptr()
205    }
206}
207
208impl Drop for FdbTransaction {
209    fn drop(&mut self) {
210        if let Some(a) = self.c_ptr.take() {
211            match Arc::try_unwrap(a) {
212                Ok(a) => unsafe {
213                    fdb_sys::fdb_transaction_destroy(a.as_ptr());
214                },
215                Err(at) => {
216                    drop(at);
217                }
218            };
219        }
220    }
221}
222
223// `snapshot` is `false` below because any reads that we do on
224// `FdbTransaction` is a `non-snapshot` read.
225impl ReadTransaction for FdbTransaction {
226    unsafe fn on_error(&self, e: FdbError) -> FdbFutureUnit {
227        FdbFuture::new(fdb_sys::fdb_transaction_on_error(
228            self.get_c_api_ptr(),
229            e.code(),
230        ))
231    }
232
233    fn get(&self, key: impl Into<Key>) -> FdbFutureMaybeValue {
234        internal::read_transaction::get(self.get_c_api_ptr(), key, false)
235    }
236
237    fn get_addresses_for_key(&self, key: impl Into<Key>) -> FdbFutureCStringArray {
238        internal::read_transaction::get_addresses_for_key(self.get_c_api_ptr(), key)
239    }
240
241    fn get_estimated_range_size_bytes(&self, range: Range) -> FdbFutureI64 {
242        let (begin, end) = range.into_parts();
243
244        internal::read_transaction::get_estimated_range_size_bytes(self.get_c_api_ptr(), begin, end)
245    }
246
247    fn get_key(&self, selector: KeySelector) -> FdbFutureKey {
248        internal::read_transaction::get_key(self.get_c_api_ptr(), selector, false)
249    }
250
251    #[cfg(feature = "fdb-7_1")]
252    fn get_mapped_range(
253        &self,
254        begin: KeySelector,
255        end: KeySelector,
256        mapper: impl Into<Mapper>,
257        options: RangeOptions,
258    ) -> FdbStreamMappedKeyValue {
259        // Convert to a value `Mapper` type here as we need to
260        // maintain a `Mapper` in the state machine.
261        FdbStreamMappedKeyValue::new(self.clone(), begin, end, mapper.into(), options, false)
262    }
263
264    fn get_range(
265        &self,
266        begin: KeySelector,
267        end: KeySelector,
268        options: RangeOptions,
269    ) -> FdbStreamKeyValue {
270        FdbStreamKeyValue::new(self.clone(), begin, end, options, false)
271    }
272
273    #[cfg(feature = "fdb-7_1")]
274    fn get_range_split_points(
275        &self,
276        begin: impl Into<Key>,
277        end: impl Into<Key>,
278        chunk_size: i64,
279    ) -> FdbFutureKeyArray {
280        internal::read_transaction::get_range_split_points(
281            self.get_c_api_ptr(),
282            begin,
283            end,
284            chunk_size,
285        )
286    }
287
288    unsafe fn get_read_version(&self) -> FdbFutureI64 {
289        internal::read_transaction::get_read_version(self.get_c_api_ptr())
290    }
291
292    fn set_option(&self, option: TransactionOption) -> FdbResult<()> {
293        internal::read_transaction::set_option(self.get_c_api_ptr(), option)
294    }
295
296    unsafe fn set_read_version(&self, version: i64) {
297        internal::read_transaction::set_read_version(self.get_c_api_ptr(), version)
298    }
299}
300
301impl Transaction for FdbTransaction {
302    fn add_read_conflict_key(&self, key: impl Into<Key>) -> FdbResult<()> {
303        let begin_key = key.into();
304        // Add a 0x00 to `end_key`. `begin_key` is inclusive and
305        // `end_key` is exclusive. By appending `0x00` to `end_key` we
306        // can make the conflict range contain only
307        // `begin_key`. `key_util::key_after` method does this for us.
308        let end_key = key_util::key_after(begin_key.clone());
309
310        internal::transaction::add_conflict_range(
311            self.get_c_api_ptr(),
312            begin_key,
313            end_key,
314            ConflictRangeType::Read,
315        )
316    }
317
318    fn add_read_conflict_range(&self, range: Range) -> FdbResult<()> {
319        let (begin, end) = range.into_parts();
320
321        internal::transaction::add_conflict_range(
322            self.get_c_api_ptr(),
323            begin,
324            end,
325            ConflictRangeType::Read,
326        )
327    }
328
329    fn add_write_conflict_key(&self, key: impl Into<Key>) -> FdbResult<()> {
330        let begin_key = key.into();
331        // Add a 0x00 to `end_key`. `begin_key` is inclusive and
332        // `end_key` is exclusive. By appending `0x00` to `end_key` we
333        // can make the conflict range contain only
334        // `begin_key`. `key_util::key_after` method does this for us.
335        let end_key = key_util::key_after(begin_key.clone());
336
337        internal::transaction::add_conflict_range(
338            self.get_c_api_ptr(),
339            begin_key,
340            end_key,
341            ConflictRangeType::Write,
342        )
343    }
344
345    fn add_write_conflict_range(&self, range: Range) -> FdbResult<()> {
346        let (begin, end) = range.into_parts();
347
348        internal::transaction::add_conflict_range(
349            self.get_c_api_ptr(),
350            begin,
351            end,
352            ConflictRangeType::Write,
353        )
354    }
355
356    unsafe fn cancel(&self) {
357        fdb_sys::fdb_transaction_cancel(self.get_c_api_ptr());
358    }
359
360    fn clear(&self, key: impl Into<Key>) {
361        let k = Bytes::from(key.into());
362        let key_name = k.as_ref().as_ptr();
363        let key_name_length = k.as_ref().len().try_into().unwrap();
364
365        unsafe { fdb_sys::fdb_transaction_clear(self.get_c_api_ptr(), key_name, key_name_length) }
366    }
367
368    fn clear_range(&self, range: Range) {
369        let (begin_key, end_key) = range.into_parts();
370
371        let bk = Bytes::from(begin_key);
372        let begin_key_name = bk.as_ref().as_ptr();
373        let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
374
375        let ek = Bytes::from(end_key);
376        let end_key_name = ek.as_ref().as_ptr();
377        let end_key_name_length = ek.as_ref().len().try_into().unwrap();
378
379        unsafe {
380            fdb_sys::fdb_transaction_clear_range(
381                self.get_c_api_ptr(),
382                begin_key_name,
383                begin_key_name_length,
384                end_key_name,
385                end_key_name_length,
386            )
387        }
388    }
389
390    unsafe fn commit(&self) -> FdbFutureUnit {
391        FdbFuture::new(fdb_sys::fdb_transaction_commit(self.get_c_api_ptr()))
392    }
393
394    fn get_approximate_size(&self) -> FdbFutureI64 {
395        FdbFuture::new(unsafe {
396            fdb_sys::fdb_transaction_get_approximate_size(self.get_c_api_ptr())
397        })
398    }
399
400    unsafe fn get_committed_version(&self) -> CommittedVersion {
401        CommittedVersion::new(self.clone())
402    }
403
404    unsafe fn get_versionstamp(&self) -> TransactionVersionstamp {
405        let fdb_transaction = self.clone();
406
407        let future = FdbFuture::new(fdb_sys::fdb_transaction_get_versionstamp(
408            self.get_c_api_ptr(),
409        ));
410
411        TransactionVersionstamp::new(fdb_transaction, future)
412    }
413
414    unsafe fn mutate(&self, optype: MutationType, key: impl Into<Key>, param: Bytes) {
415        let k = Bytes::from(key.into());
416        let key_name = k.as_ref().as_ptr();
417        let key_name_length = k.as_ref().len().try_into().unwrap();
418
419        let p = param;
420        let param = p.as_ref().as_ptr();
421        let param_length = p.as_ref().len().try_into().unwrap();
422
423        fdb_sys::fdb_transaction_atomic_op(
424            self.get_c_api_ptr(),
425            key_name,
426            key_name_length,
427            param,
428            param_length,
429            optype.code(),
430        );
431    }
432
433    unsafe fn reset(&self) {
434        fdb_sys::fdb_transaction_reset(self.get_c_api_ptr());
435    }
436
437    fn set(&self, key: impl Into<Key>, value: impl Into<Value>) {
438        let k = Bytes::from(key.into());
439        let key_name = k.as_ref().as_ptr();
440        let key_name_length = k.as_ref().len().try_into().unwrap();
441
442        // `value` is being overridden to get naming consistent with C
443        // API parameters
444        let v = Bytes::from(value.into());
445        let value = v.as_ref().as_ptr();
446        let value_length = v.as_ref().len().try_into().unwrap();
447
448        unsafe {
449            fdb_sys::fdb_transaction_set(
450                self.get_c_api_ptr(),
451                key_name,
452                key_name_length,
453                value,
454                value_length,
455            )
456        }
457    }
458
459    fn watch(&self, key: impl Into<Key>) -> FdbFutureUnit {
460        let k = Bytes::from(key.into());
461        let key_name = k.as_ref().as_ptr();
462        let key_name_length = k.as_ref().len().try_into().unwrap();
463
464        FdbFuture::new(unsafe {
465            fdb_sys::fdb_transaction_watch(self.get_c_api_ptr(), key_name, key_name_length)
466        })
467    }
468}
469
// # Safety
//
// After `FdbTransaction` is created,
// `NonNull<fdb_sys::FDBTransaction>` is accessed read-only, till it
// is finally dropped.
//
// Due to the use of `Arc`, copies are carefully managed, and
// `Drop::drop` calls `fdb_sys::fdb_transaction_destroy`, when the
// last copy of the `Arc` pointer is dropped.
//
// Other than `Drop::drop` (where we already ensure exclusive access),
// we don't have any mutable state inside `FdbTransaction` that needs
// to be protected with exclusive access. This allows us to add the
// `Send` trait.
//
// `FdbTransaction` is read-only, *without* interior mutability, it is
// safe to add `Sync` trait.
//
// The main reason for adding `Send` trait is so that values of
// `FdbTransaction` can be moved to other threads.
//
// The main reason for adding `Sync` trait is because in binding
// tester, we are putting a `FdbTransaction` behind a
// `Arc<DashMap<...>` and this requires `Sync`.
unsafe impl Send for FdbTransaction {}
unsafe impl Sync for FdbTransaction {}
496
497/// A handle to a FDB snapshot, suitable for performing snapshot
498/// reads.
499///
500/// Snapshot reads offer more relaxed isolation level than FDB's
501/// default serializable isolation, reducing transaction conflicts but
502/// making it harder to reason about concurrency.
503///
504/// For more information about how to use snapshot reads correctly,
505/// see [snapshot reads].
506///
507/// [`snapshot`] method on [`FdbTransaction`] can be used to create a
508/// [`FdbReadTransaction`].
509///
510/// [`snapshot`]: FdbTransaction::snapshot
511/// [snapshot reads]: https://apple.github.io/foundationdb/developer-guide.html#snapshot-reads
512//
513// `FdbReadTransaction` internally has a owned `FdbTransaction`. When
514// the last `FdbTransaction` gets dropped, then `<FdbTransaction as
515// Drop>::drop` will call `fdb_transaction_destroy()`. Therefore we
516// don't have to do anything special when dropping
517// `FdbReadTransaction`.
518#[derive(Clone, Debug)]
519pub struct FdbReadTransaction {
520    inner: FdbTransaction,
521}
522
523impl FdbReadTransaction {
524    /// Runs a closure in the context of this [`FdbReadTransaction`].
525    ///
526    /// # Safety
527    ///
528    /// You should not use this API. It exists to support binding
529    /// tester.
530    pub async unsafe fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
531    where
532        F: FnMut(FdbReadTransaction) -> Fut,
533        Fut: Future<Output = FdbResult<T>>,
534    {
535        f(self.clone()).await
536    }
537
538    fn new(inner: FdbTransaction) -> FdbReadTransaction {
539        FdbReadTransaction { inner }
540    }
541}
542
543// `snapshot` is `true` below because any reads that we do on
544// `FdbReadTransaction` is a `snapshot` read.
545impl ReadTransaction for FdbReadTransaction {
546    unsafe fn on_error(&self, e: FdbError) -> FdbFutureUnit {
547        self.inner.on_error(e)
548    }
549
550    fn get(&self, key: impl Into<Key>) -> FdbFutureMaybeValue {
551        internal::read_transaction::get(self.inner.get_c_api_ptr(), key, true)
552    }
553
554    fn get_addresses_for_key(&self, key: impl Into<Key>) -> FdbFutureCStringArray {
555        self.inner.get_addresses_for_key(key)
556    }
557
558    fn get_estimated_range_size_bytes(&self, range: Range) -> FdbFutureI64 {
559        self.inner.get_estimated_range_size_bytes(range)
560    }
561
562    fn get_key(&self, selector: KeySelector) -> FdbFutureKey {
563        internal::read_transaction::get_key(self.inner.get_c_api_ptr(), selector, true)
564    }
565
566    #[cfg(feature = "fdb-7_1")]
567    fn get_mapped_range(
568        &self,
569        begin: KeySelector,
570        end: KeySelector,
571        mapper: impl Into<Mapper>,
572        options: RangeOptions,
573    ) -> FdbStreamMappedKeyValue {
574        // Convert to a value `Mapper` type here as we need to
575        // maintain a `Mapper` in the state machine.
576        FdbStreamMappedKeyValue::new(self.inner.clone(), begin, end, mapper.into(), options, true)
577    }
578
579    fn get_range(
580        &self,
581        begin: KeySelector,
582        end: KeySelector,
583        options: RangeOptions,
584    ) -> FdbStreamKeyValue {
585        FdbStreamKeyValue::new(self.inner.clone(), begin, end, options, true)
586    }
587
588    #[cfg(feature = "fdb-7_1")]
589    fn get_range_split_points(
590        &self,
591        begin: impl Into<Key>,
592        end: impl Into<Key>,
593        chunk_size: i64,
594    ) -> FdbFutureKeyArray {
595        self.inner.get_range_split_points(begin, end, chunk_size)
596    }
597
598    unsafe fn get_read_version(&self) -> FdbFutureI64 {
599        self.inner.get_read_version()
600    }
601
602    fn set_option(&self, option: TransactionOption) -> FdbResult<()> {
603        self.inner.set_option(option)
604    }
605
606    unsafe fn set_read_version(&self, version: i64) {
607        self.inner.set_read_version(version)
608    }
609}
610
611pub(super) mod internal {
612    pub(super) mod transaction {
613        use bytes::Bytes;
614
615        use std::convert::TryInto;
616
617        use crate::error::{check, FdbResult};
618        use crate::option::ConflictRangeType;
619        use crate::Key;
620
621        pub(crate) fn add_conflict_range(
622            transaction: *mut fdb_sys::FDBTransaction,
623            begin_key: Key,
624            end_key: Key,
625            ty: ConflictRangeType,
626        ) -> FdbResult<()> {
627            let bk = Bytes::from(begin_key);
628            let begin_key_name = bk.as_ref().as_ptr();
629            let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
630
631            let ek = Bytes::from(end_key);
632            let end_key_name = ek.as_ref().as_ptr();
633            let end_key_name_length = ek.as_ref().len().try_into().unwrap();
634
635            check(unsafe {
636                fdb_sys::fdb_transaction_add_conflict_range(
637                    transaction,
638                    begin_key_name,
639                    begin_key_name_length,
640                    end_key_name,
641                    end_key_name_length,
642                    ty.code(),
643                )
644            })
645        }
646    }
647
648    pub(super) mod read_transaction {
649        use bytes::Bytes;
650
651        use std::convert::TryInto;
652
653        use crate::error::FdbResult;
654        use crate::future::{
655            FdbFuture, FdbFutureCStringArray, FdbFutureI64, FdbFutureKey, FdbFutureMaybeValue,
656        };
657        use crate::transaction::TransactionOption;
658        use crate::{Key, KeySelector};
659
660        #[cfg(feature = "fdb-7_1")]
661        use crate::future::FdbFutureKeyArray;
662
663        pub(crate) fn get(
664            transaction: *mut fdb_sys::FDBTransaction,
665            key: impl Into<Key>,
666            snapshot: bool,
667        ) -> FdbFutureMaybeValue {
668            let k = Bytes::from(key.into());
669            let key_name = k.as_ref().as_ptr();
670            let key_name_length = k.as_ref().len().try_into().unwrap();
671            let s = if snapshot { 1 } else { 0 };
672
673            FdbFuture::new(unsafe {
674                fdb_sys::fdb_transaction_get(transaction, key_name, key_name_length, s)
675            })
676        }
677
678        pub(crate) fn get_addresses_for_key(
679            transaction: *mut fdb_sys::FDBTransaction,
680            key: impl Into<Key>,
681        ) -> FdbFutureCStringArray {
682            let k = Bytes::from(key.into());
683            let key_name = k.as_ref().as_ptr();
684            let key_name_length = k.as_ref().len().try_into().unwrap();
685
686            FdbFuture::new(unsafe {
687                fdb_sys::fdb_transaction_get_addresses_for_key(
688                    transaction,
689                    key_name,
690                    key_name_length,
691                )
692            })
693        }
694
695        pub(crate) fn get_estimated_range_size_bytes(
696            transaction: *mut fdb_sys::FDBTransaction,
697            begin_key: Key,
698            end_key: Key,
699        ) -> FdbFutureI64 {
700            let bk = Bytes::from(begin_key);
701            let begin_key_name = bk.as_ref().as_ptr();
702            let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
703
704            let ek = Bytes::from(end_key);
705            let end_key_name = ek.as_ref().as_ptr();
706            let end_key_name_length = ek.as_ref().len().try_into().unwrap();
707
708            FdbFuture::new(unsafe {
709                fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
710                    transaction,
711                    begin_key_name,
712                    begin_key_name_length,
713                    end_key_name,
714                    end_key_name_length,
715                )
716            })
717        }
718
719        pub(crate) fn get_key(
720            transaction: *mut fdb_sys::FDBTransaction,
721            selector: KeySelector,
722            snapshot: bool,
723        ) -> FdbFutureKey {
724            let (key, or_equal, offset) = selector.deconstruct();
725            let k = Bytes::from(key);
726            let key_name = k.as_ref().as_ptr();
727            let key_name_length = k.as_ref().len().try_into().unwrap();
728            let or_equal = if or_equal { 1 } else { 0 };
729
730            let s = if snapshot { 1 } else { 0 };
731
732            FdbFuture::new(unsafe {
733                fdb_sys::fdb_transaction_get_key(
734                    transaction,
735                    key_name,
736                    key_name_length,
737                    or_equal,
738                    offset,
739                    s,
740                )
741            })
742        }
743
744        #[cfg(feature = "fdb-7_1")]
745        pub(crate) fn get_range_split_points(
746            transaction: *mut fdb_sys::FDBTransaction,
747            begin_key: impl Into<Key>,
748            end_key: impl Into<Key>,
749            chunk_size: i64,
750        ) -> FdbFutureKeyArray {
751            let bk = Bytes::from(begin_key.into());
752            let begin_key_name = bk.as_ref().as_ptr();
753            let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
754
755            let ek = Bytes::from(end_key.into());
756            let end_key_name = ek.as_ref().as_ptr();
757            let end_key_name_length = ek.as_ref().len().try_into().unwrap();
758
759            FdbFuture::new(unsafe {
760                fdb_sys::fdb_transaction_get_range_split_points(
761                    transaction,
762                    begin_key_name,
763                    begin_key_name_length,
764                    end_key_name,
765                    end_key_name_length,
766                    chunk_size,
767                )
768            })
769        }
770
771        pub(crate) fn get_read_version(transaction: *mut fdb_sys::FDBTransaction) -> FdbFutureI64 {
772            FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(transaction) })
773        }
774
775        pub(crate) fn set_option(
776            transaction: *mut fdb_sys::FDBTransaction,
777            option: TransactionOption,
778        ) -> FdbResult<()> {
779            unsafe { option.apply(transaction) }
780        }
781
782        pub(crate) fn set_read_version(transaction: *mut fdb_sys::FDBTransaction, version: i64) {
783            unsafe { fdb_sys::fdb_transaction_set_read_version(transaction, version) }
784        }
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use impls::impls;
791
792    use std::ptr::NonNull;
793    use std::sync::atomic::{AtomicBool, Ordering};
794    use std::sync::Arc;
795
796    use super::{FdbReadTransaction, FdbTransaction};
797
798    #[test]
799    fn impls() {
800        #[rustfmt::skip]
801	assert!(impls!(
802	    FdbTransaction:
803	        Send &
804		!Copy));
805
806        #[rustfmt::skip]
807	assert!(impls!(
808	    FdbReadTransaction:
809	        Send &
810		!Copy));
811    }
812
813    #[allow(dead_code)]
814    #[derive(Debug)]
815    struct DummyFdbTransaction {
816        c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
817    }
818
819    unsafe impl Send for DummyFdbTransaction {}
820
821    #[test]
822    fn trait_bounds() {
823        fn trait_bounds_for_fdb_transaction<T>(_t: T)
824        where
825            T: Send + 'static,
826        {
827        }
828
829        let d = DummyFdbTransaction {
830            c_ptr: Some(Arc::new(NonNull::dangling())),
831        };
832        trait_bounds_for_fdb_transaction(d);
833    }
834
835    static mut DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED: AtomicBool = AtomicBool::new(false);
836
837    #[derive(Debug)]
838    struct DropTestDummyFdbTransaction {
839        c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
840    }
841
842    unsafe impl Send for DropTestDummyFdbTransaction {}
843    unsafe impl Sync for DropTestDummyFdbTransaction {}
844
845    impl Drop for DropTestDummyFdbTransaction {
846        fn drop(&mut self) {
847            if let Some(a) = self.c_ptr.take() {
848                match Arc::try_unwrap(a) {
849                    Ok(_) => {
850                        unsafe {
851                            DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED
852                                .store(true, Ordering::SeqCst);
853                        };
854                    }
855                    Err(at) => {
856                        drop(at);
857                    }
858                };
859            }
860        }
861    }
862
863    #[tokio::test]
864    async fn multiple_drop() {
865        let d0 = DropTestDummyFdbTransaction {
866            c_ptr: Some(Arc::new(NonNull::dangling())),
867        };
868
869        // Initially this is false.
870        assert!(!unsafe { DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED.load(Ordering::SeqCst) });
871
872        let d1 = DropTestDummyFdbTransaction {
873            c_ptr: d0.c_ptr.clone(),
874        };
875
876        assert_eq!(Arc::strong_count(d1.c_ptr.as_ref().unwrap()), 2);
877
878        tokio::spawn(async move {
879            let _ = d1;
880        })
881        .await
882        .unwrap();
883
884        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);
885
886        let d2 = DropTestDummyFdbTransaction {
887            c_ptr: d0.c_ptr.clone(),
888        };
889        let d3 = DropTestDummyFdbTransaction {
890            c_ptr: d2.c_ptr.clone(),
891        };
892
893        tokio::spawn(async move {
894            let _ = d2;
895            let _ = d3;
896        })
897        .await
898        .unwrap();
899
900        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);
901
902        drop(d0);
903
904        assert!(unsafe { DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED.load(Ordering::SeqCst) });
905    }
906}