1use bytes::Bytes;
2
3use std::convert::TryInto;
4use std::future::Future;
5use std::ptr::NonNull;
6use std::sync::Arc;
7
8use crate::error::{check, FdbError, FdbResult};
9use crate::future::{
10 FdbFuture, FdbFutureCStringArray, FdbFutureI64, FdbFutureKey, FdbFutureMaybeValue,
11 FdbFutureUnit, FdbStreamKeyValue,
12};
13use crate::option::ConflictRangeType;
14use crate::range::{Range, RangeOptions};
15use crate::transaction::{MutationType, ReadTransaction, Transaction, TransactionOption};
16use crate::tuple::key_util;
17use crate::{Key, KeySelector, Value};
18
19#[cfg(feature = "fdb-7_1")]
20use crate::future::{FdbFutureKeyArray, FdbStreamMappedKeyValue};
21
22#[cfg(feature = "fdb-7_1")]
23use crate::Mapper;
24
25#[derive(Debug)]
40pub struct CommittedVersion {
41 fdb_transaction: FdbTransaction,
42}
43
44impl CommittedVersion {
45 fn new(fdb_transaction: FdbTransaction) -> CommittedVersion {
46 CommittedVersion { fdb_transaction }
47 }
48}
49
50impl From<CommittedVersion> for FdbResult<i64> {
51 fn from(t: CommittedVersion) -> FdbResult<i64> {
52 let mut out_version = 0;
53
54 check(unsafe {
55 fdb_sys::fdb_transaction_get_committed_version(
56 t.fdb_transaction.get_c_api_ptr(),
57 &mut out_version,
58 )
59 })
60 .map(|_| out_version)
61 }
62}
63
64#[derive(Debug)]
80#[allow(dead_code)]
81pub struct TransactionVersionstamp {
82 fdb_transaction: FdbTransaction,
83 future: FdbFutureKey,
84}
85
86impl TransactionVersionstamp {
87 pub async fn get(self) -> FdbResult<Bytes> {
91 self.future.await.map(|k| k.into())
92 }
93
94 pub unsafe fn get_inner_future(self) -> FdbFutureKey {
102 self.future
103 }
104
105 fn new(fdb_transaction: FdbTransaction, future: FdbFutureKey) -> TransactionVersionstamp {
106 TransactionVersionstamp {
107 fdb_transaction,
108 future,
109 }
110 }
111}
112
113#[derive(Clone, Debug)]
131pub struct FdbTransaction {
132 c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
133}
134
135impl FdbTransaction {
136 pub fn snapshot(&self) -> FdbReadTransaction {
148 let c_ptr = self.c_ptr.clone();
149
150 FdbReadTransaction::new(FdbTransaction { c_ptr })
151 }
152
153 pub async unsafe fn run<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
160 where
161 F: FnMut(FdbTransaction) -> Fut,
162 Fut: Future<Output = FdbResult<T>>,
163 {
164 f(self.clone()).await
170 }
171
172 pub async unsafe fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
188 where
189 F: FnMut(FdbTransaction) -> Fut,
190 Fut: Future<Output = FdbResult<T>>,
191 {
192 f(self.clone()).await
193 }
194
195 pub(crate) fn new(c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>) -> FdbTransaction {
196 FdbTransaction { c_ptr }
197 }
198
199 pub(crate) fn get_c_api_ptr(&self) -> *mut fdb_sys::FDB_transaction {
200 (self.c_ptr.as_ref().unwrap()).as_ptr()
205 }
206}
207
208impl Drop for FdbTransaction {
209 fn drop(&mut self) {
210 if let Some(a) = self.c_ptr.take() {
211 match Arc::try_unwrap(a) {
212 Ok(a) => unsafe {
213 fdb_sys::fdb_transaction_destroy(a.as_ptr());
214 },
215 Err(at) => {
216 drop(at);
217 }
218 };
219 }
220 }
221}
222
223impl ReadTransaction for FdbTransaction {
226 unsafe fn on_error(&self, e: FdbError) -> FdbFutureUnit {
227 FdbFuture::new(fdb_sys::fdb_transaction_on_error(
228 self.get_c_api_ptr(),
229 e.code(),
230 ))
231 }
232
233 fn get(&self, key: impl Into<Key>) -> FdbFutureMaybeValue {
234 internal::read_transaction::get(self.get_c_api_ptr(), key, false)
235 }
236
237 fn get_addresses_for_key(&self, key: impl Into<Key>) -> FdbFutureCStringArray {
238 internal::read_transaction::get_addresses_for_key(self.get_c_api_ptr(), key)
239 }
240
241 fn get_estimated_range_size_bytes(&self, range: Range) -> FdbFutureI64 {
242 let (begin, end) = range.into_parts();
243
244 internal::read_transaction::get_estimated_range_size_bytes(self.get_c_api_ptr(), begin, end)
245 }
246
247 fn get_key(&self, selector: KeySelector) -> FdbFutureKey {
248 internal::read_transaction::get_key(self.get_c_api_ptr(), selector, false)
249 }
250
251 #[cfg(feature = "fdb-7_1")]
252 fn get_mapped_range(
253 &self,
254 begin: KeySelector,
255 end: KeySelector,
256 mapper: impl Into<Mapper>,
257 options: RangeOptions,
258 ) -> FdbStreamMappedKeyValue {
259 FdbStreamMappedKeyValue::new(self.clone(), begin, end, mapper.into(), options, false)
262 }
263
264 fn get_range(
265 &self,
266 begin: KeySelector,
267 end: KeySelector,
268 options: RangeOptions,
269 ) -> FdbStreamKeyValue {
270 FdbStreamKeyValue::new(self.clone(), begin, end, options, false)
271 }
272
273 #[cfg(feature = "fdb-7_1")]
274 fn get_range_split_points(
275 &self,
276 begin: impl Into<Key>,
277 end: impl Into<Key>,
278 chunk_size: i64,
279 ) -> FdbFutureKeyArray {
280 internal::read_transaction::get_range_split_points(
281 self.get_c_api_ptr(),
282 begin,
283 end,
284 chunk_size,
285 )
286 }
287
288 unsafe fn get_read_version(&self) -> FdbFutureI64 {
289 internal::read_transaction::get_read_version(self.get_c_api_ptr())
290 }
291
292 fn set_option(&self, option: TransactionOption) -> FdbResult<()> {
293 internal::read_transaction::set_option(self.get_c_api_ptr(), option)
294 }
295
296 unsafe fn set_read_version(&self, version: i64) {
297 internal::read_transaction::set_read_version(self.get_c_api_ptr(), version)
298 }
299}
300
301impl Transaction for FdbTransaction {
302 fn add_read_conflict_key(&self, key: impl Into<Key>) -> FdbResult<()> {
303 let begin_key = key.into();
304 let end_key = key_util::key_after(begin_key.clone());
309
310 internal::transaction::add_conflict_range(
311 self.get_c_api_ptr(),
312 begin_key,
313 end_key,
314 ConflictRangeType::Read,
315 )
316 }
317
318 fn add_read_conflict_range(&self, range: Range) -> FdbResult<()> {
319 let (begin, end) = range.into_parts();
320
321 internal::transaction::add_conflict_range(
322 self.get_c_api_ptr(),
323 begin,
324 end,
325 ConflictRangeType::Read,
326 )
327 }
328
329 fn add_write_conflict_key(&self, key: impl Into<Key>) -> FdbResult<()> {
330 let begin_key = key.into();
331 let end_key = key_util::key_after(begin_key.clone());
336
337 internal::transaction::add_conflict_range(
338 self.get_c_api_ptr(),
339 begin_key,
340 end_key,
341 ConflictRangeType::Write,
342 )
343 }
344
345 fn add_write_conflict_range(&self, range: Range) -> FdbResult<()> {
346 let (begin, end) = range.into_parts();
347
348 internal::transaction::add_conflict_range(
349 self.get_c_api_ptr(),
350 begin,
351 end,
352 ConflictRangeType::Write,
353 )
354 }
355
356 unsafe fn cancel(&self) {
357 fdb_sys::fdb_transaction_cancel(self.get_c_api_ptr());
358 }
359
360 fn clear(&self, key: impl Into<Key>) {
361 let k = Bytes::from(key.into());
362 let key_name = k.as_ref().as_ptr();
363 let key_name_length = k.as_ref().len().try_into().unwrap();
364
365 unsafe { fdb_sys::fdb_transaction_clear(self.get_c_api_ptr(), key_name, key_name_length) }
366 }
367
368 fn clear_range(&self, range: Range) {
369 let (begin_key, end_key) = range.into_parts();
370
371 let bk = Bytes::from(begin_key);
372 let begin_key_name = bk.as_ref().as_ptr();
373 let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
374
375 let ek = Bytes::from(end_key);
376 let end_key_name = ek.as_ref().as_ptr();
377 let end_key_name_length = ek.as_ref().len().try_into().unwrap();
378
379 unsafe {
380 fdb_sys::fdb_transaction_clear_range(
381 self.get_c_api_ptr(),
382 begin_key_name,
383 begin_key_name_length,
384 end_key_name,
385 end_key_name_length,
386 )
387 }
388 }
389
390 unsafe fn commit(&self) -> FdbFutureUnit {
391 FdbFuture::new(fdb_sys::fdb_transaction_commit(self.get_c_api_ptr()))
392 }
393
394 fn get_approximate_size(&self) -> FdbFutureI64 {
395 FdbFuture::new(unsafe {
396 fdb_sys::fdb_transaction_get_approximate_size(self.get_c_api_ptr())
397 })
398 }
399
400 unsafe fn get_committed_version(&self) -> CommittedVersion {
401 CommittedVersion::new(self.clone())
402 }
403
404 unsafe fn get_versionstamp(&self) -> TransactionVersionstamp {
405 let fdb_transaction = self.clone();
406
407 let future = FdbFuture::new(fdb_sys::fdb_transaction_get_versionstamp(
408 self.get_c_api_ptr(),
409 ));
410
411 TransactionVersionstamp::new(fdb_transaction, future)
412 }
413
414 unsafe fn mutate(&self, optype: MutationType, key: impl Into<Key>, param: Bytes) {
415 let k = Bytes::from(key.into());
416 let key_name = k.as_ref().as_ptr();
417 let key_name_length = k.as_ref().len().try_into().unwrap();
418
419 let p = param;
420 let param = p.as_ref().as_ptr();
421 let param_length = p.as_ref().len().try_into().unwrap();
422
423 fdb_sys::fdb_transaction_atomic_op(
424 self.get_c_api_ptr(),
425 key_name,
426 key_name_length,
427 param,
428 param_length,
429 optype.code(),
430 );
431 }
432
433 unsafe fn reset(&self) {
434 fdb_sys::fdb_transaction_reset(self.get_c_api_ptr());
435 }
436
437 fn set(&self, key: impl Into<Key>, value: impl Into<Value>) {
438 let k = Bytes::from(key.into());
439 let key_name = k.as_ref().as_ptr();
440 let key_name_length = k.as_ref().len().try_into().unwrap();
441
442 let v = Bytes::from(value.into());
445 let value = v.as_ref().as_ptr();
446 let value_length = v.as_ref().len().try_into().unwrap();
447
448 unsafe {
449 fdb_sys::fdb_transaction_set(
450 self.get_c_api_ptr(),
451 key_name,
452 key_name_length,
453 value,
454 value_length,
455 )
456 }
457 }
458
459 fn watch(&self, key: impl Into<Key>) -> FdbFutureUnit {
460 let k = Bytes::from(key.into());
461 let key_name = k.as_ref().as_ptr();
462 let key_name_length = k.as_ref().len().try_into().unwrap();
463
464 FdbFuture::new(unsafe {
465 fdb_sys::fdb_transaction_watch(self.get_c_api_ptr(), key_name, key_name_length)
466 })
467 }
468}
469
470unsafe impl Send for FdbTransaction {}
495unsafe impl Sync for FdbTransaction {}
496
497#[derive(Clone, Debug)]
519pub struct FdbReadTransaction {
520 inner: FdbTransaction,
521}
522
523impl FdbReadTransaction {
524 pub async unsafe fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
531 where
532 F: FnMut(FdbReadTransaction) -> Fut,
533 Fut: Future<Output = FdbResult<T>>,
534 {
535 f(self.clone()).await
536 }
537
538 fn new(inner: FdbTransaction) -> FdbReadTransaction {
539 FdbReadTransaction { inner }
540 }
541}
542
543impl ReadTransaction for FdbReadTransaction {
546 unsafe fn on_error(&self, e: FdbError) -> FdbFutureUnit {
547 self.inner.on_error(e)
548 }
549
550 fn get(&self, key: impl Into<Key>) -> FdbFutureMaybeValue {
551 internal::read_transaction::get(self.inner.get_c_api_ptr(), key, true)
552 }
553
554 fn get_addresses_for_key(&self, key: impl Into<Key>) -> FdbFutureCStringArray {
555 self.inner.get_addresses_for_key(key)
556 }
557
558 fn get_estimated_range_size_bytes(&self, range: Range) -> FdbFutureI64 {
559 self.inner.get_estimated_range_size_bytes(range)
560 }
561
562 fn get_key(&self, selector: KeySelector) -> FdbFutureKey {
563 internal::read_transaction::get_key(self.inner.get_c_api_ptr(), selector, true)
564 }
565
566 #[cfg(feature = "fdb-7_1")]
567 fn get_mapped_range(
568 &self,
569 begin: KeySelector,
570 end: KeySelector,
571 mapper: impl Into<Mapper>,
572 options: RangeOptions,
573 ) -> FdbStreamMappedKeyValue {
574 FdbStreamMappedKeyValue::new(self.inner.clone(), begin, end, mapper.into(), options, true)
577 }
578
579 fn get_range(
580 &self,
581 begin: KeySelector,
582 end: KeySelector,
583 options: RangeOptions,
584 ) -> FdbStreamKeyValue {
585 FdbStreamKeyValue::new(self.inner.clone(), begin, end, options, true)
586 }
587
588 #[cfg(feature = "fdb-7_1")]
589 fn get_range_split_points(
590 &self,
591 begin: impl Into<Key>,
592 end: impl Into<Key>,
593 chunk_size: i64,
594 ) -> FdbFutureKeyArray {
595 self.inner.get_range_split_points(begin, end, chunk_size)
596 }
597
598 unsafe fn get_read_version(&self) -> FdbFutureI64 {
599 self.inner.get_read_version()
600 }
601
602 fn set_option(&self, option: TransactionOption) -> FdbResult<()> {
603 self.inner.set_option(option)
604 }
605
606 unsafe fn set_read_version(&self, version: i64) {
607 self.inner.set_read_version(version)
608 }
609}
610
611pub(super) mod internal {
612 pub(super) mod transaction {
613 use bytes::Bytes;
614
615 use std::convert::TryInto;
616
617 use crate::error::{check, FdbResult};
618 use crate::option::ConflictRangeType;
619 use crate::Key;
620
621 pub(crate) fn add_conflict_range(
622 transaction: *mut fdb_sys::FDBTransaction,
623 begin_key: Key,
624 end_key: Key,
625 ty: ConflictRangeType,
626 ) -> FdbResult<()> {
627 let bk = Bytes::from(begin_key);
628 let begin_key_name = bk.as_ref().as_ptr();
629 let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
630
631 let ek = Bytes::from(end_key);
632 let end_key_name = ek.as_ref().as_ptr();
633 let end_key_name_length = ek.as_ref().len().try_into().unwrap();
634
635 check(unsafe {
636 fdb_sys::fdb_transaction_add_conflict_range(
637 transaction,
638 begin_key_name,
639 begin_key_name_length,
640 end_key_name,
641 end_key_name_length,
642 ty.code(),
643 )
644 })
645 }
646 }
647
648 pub(super) mod read_transaction {
649 use bytes::Bytes;
650
651 use std::convert::TryInto;
652
653 use crate::error::FdbResult;
654 use crate::future::{
655 FdbFuture, FdbFutureCStringArray, FdbFutureI64, FdbFutureKey, FdbFutureMaybeValue,
656 };
657 use crate::transaction::TransactionOption;
658 use crate::{Key, KeySelector};
659
660 #[cfg(feature = "fdb-7_1")]
661 use crate::future::FdbFutureKeyArray;
662
663 pub(crate) fn get(
664 transaction: *mut fdb_sys::FDBTransaction,
665 key: impl Into<Key>,
666 snapshot: bool,
667 ) -> FdbFutureMaybeValue {
668 let k = Bytes::from(key.into());
669 let key_name = k.as_ref().as_ptr();
670 let key_name_length = k.as_ref().len().try_into().unwrap();
671 let s = if snapshot { 1 } else { 0 };
672
673 FdbFuture::new(unsafe {
674 fdb_sys::fdb_transaction_get(transaction, key_name, key_name_length, s)
675 })
676 }
677
678 pub(crate) fn get_addresses_for_key(
679 transaction: *mut fdb_sys::FDBTransaction,
680 key: impl Into<Key>,
681 ) -> FdbFutureCStringArray {
682 let k = Bytes::from(key.into());
683 let key_name = k.as_ref().as_ptr();
684 let key_name_length = k.as_ref().len().try_into().unwrap();
685
686 FdbFuture::new(unsafe {
687 fdb_sys::fdb_transaction_get_addresses_for_key(
688 transaction,
689 key_name,
690 key_name_length,
691 )
692 })
693 }
694
695 pub(crate) fn get_estimated_range_size_bytes(
696 transaction: *mut fdb_sys::FDBTransaction,
697 begin_key: Key,
698 end_key: Key,
699 ) -> FdbFutureI64 {
700 let bk = Bytes::from(begin_key);
701 let begin_key_name = bk.as_ref().as_ptr();
702 let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
703
704 let ek = Bytes::from(end_key);
705 let end_key_name = ek.as_ref().as_ptr();
706 let end_key_name_length = ek.as_ref().len().try_into().unwrap();
707
708 FdbFuture::new(unsafe {
709 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
710 transaction,
711 begin_key_name,
712 begin_key_name_length,
713 end_key_name,
714 end_key_name_length,
715 )
716 })
717 }
718
719 pub(crate) fn get_key(
720 transaction: *mut fdb_sys::FDBTransaction,
721 selector: KeySelector,
722 snapshot: bool,
723 ) -> FdbFutureKey {
724 let (key, or_equal, offset) = selector.deconstruct();
725 let k = Bytes::from(key);
726 let key_name = k.as_ref().as_ptr();
727 let key_name_length = k.as_ref().len().try_into().unwrap();
728 let or_equal = if or_equal { 1 } else { 0 };
729
730 let s = if snapshot { 1 } else { 0 };
731
732 FdbFuture::new(unsafe {
733 fdb_sys::fdb_transaction_get_key(
734 transaction,
735 key_name,
736 key_name_length,
737 or_equal,
738 offset,
739 s,
740 )
741 })
742 }
743
744 #[cfg(feature = "fdb-7_1")]
745 pub(crate) fn get_range_split_points(
746 transaction: *mut fdb_sys::FDBTransaction,
747 begin_key: impl Into<Key>,
748 end_key: impl Into<Key>,
749 chunk_size: i64,
750 ) -> FdbFutureKeyArray {
751 let bk = Bytes::from(begin_key.into());
752 let begin_key_name = bk.as_ref().as_ptr();
753 let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
754
755 let ek = Bytes::from(end_key.into());
756 let end_key_name = ek.as_ref().as_ptr();
757 let end_key_name_length = ek.as_ref().len().try_into().unwrap();
758
759 FdbFuture::new(unsafe {
760 fdb_sys::fdb_transaction_get_range_split_points(
761 transaction,
762 begin_key_name,
763 begin_key_name_length,
764 end_key_name,
765 end_key_name_length,
766 chunk_size,
767 )
768 })
769 }
770
771 pub(crate) fn get_read_version(transaction: *mut fdb_sys::FDBTransaction) -> FdbFutureI64 {
772 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(transaction) })
773 }
774
775 pub(crate) fn set_option(
776 transaction: *mut fdb_sys::FDBTransaction,
777 option: TransactionOption,
778 ) -> FdbResult<()> {
779 unsafe { option.apply(transaction) }
780 }
781
782 pub(crate) fn set_read_version(transaction: *mut fdb_sys::FDBTransaction, version: i64) {
783 unsafe { fdb_sys::fdb_transaction_set_read_version(transaction, version) }
784 }
785 }
786}
787
#[cfg(test)]
mod tests {
    use impls::impls;

    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::{FdbReadTransaction, FdbTransaction};

    /// Both transaction types must be `Send` and `Sync` (declared via the
    /// `unsafe impl`s on `FdbTransaction`) and must not be `Copy`.
    #[test]
    fn impls() {
        #[rustfmt::skip]
        assert!(impls!(
            FdbTransaction:
                Send &
                Sync &
                !Copy));

        #[rustfmt::skip]
        assert!(impls!(
            FdbReadTransaction:
                Send &
                Sync &
                !Copy));
    }

    /// Stand-in with the same field layout as `FdbTransaction`, used to
    /// check trait bounds without touching the C client.
    #[allow(dead_code)]
    #[derive(Debug)]
    struct DummyFdbTransaction {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
    }

    unsafe impl Send for DummyFdbTransaction {}

    /// A value shaped like `FdbTransaction` can satisfy `Send + 'static`.
    #[test]
    fn trait_bounds() {
        fn trait_bounds_for_fdb_transaction<T>(_t: T)
        where
            T: Send + 'static,
        {
        }

        let d = DummyFdbTransaction {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };
        trait_bounds_for_fdb_transaction(d);
    }

    // `AtomicBool` already provides interior mutability, so a plain `static`
    // is sufficient; `static mut` would require `unsafe` at every access.
    static DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED: AtomicBool = AtomicBool::new(false);

    /// Stand-in whose `Drop` records, instead of calling into the C client,
    /// that the last shared reference was released.
    #[derive(Debug)]
    struct DropTestDummyFdbTransaction {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBTransaction>>>,
    }

    unsafe impl Send for DropTestDummyFdbTransaction {}
    unsafe impl Sync for DropTestDummyFdbTransaction {}

    impl Drop for DropTestDummyFdbTransaction {
        fn drop(&mut self) {
            if let Some(a) = self.c_ptr.take() {
                match Arc::try_unwrap(a) {
                    Ok(_) => {
                        DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED.store(true, Ordering::SeqCst);
                    }
                    Err(at) => {
                        drop(at);
                    }
                };
            }
        }
    }

    /// Dropping clones on other tasks must only decrement the share count;
    /// the "destroy" flag is set only when the final owner is dropped.
    #[tokio::test]
    async fn multiple_drop() {
        let d0 = DropTestDummyFdbTransaction {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };

        assert!(!DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED.load(Ordering::SeqCst));

        let d1 = DropTestDummyFdbTransaction {
            c_ptr: d0.c_ptr.clone(),
        };

        assert_eq!(Arc::strong_count(d1.c_ptr.as_ref().unwrap()), 2);

        tokio::spawn(async move {
            let _ = d1;
        })
        .await
        .unwrap();

        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        let d2 = DropTestDummyFdbTransaction {
            c_ptr: d0.c_ptr.clone(),
        };
        let d3 = DropTestDummyFdbTransaction {
            c_ptr: d2.c_ptr.clone(),
        };

        tokio::spawn(async move {
            let _ = d2;
            let _ = d3;
        })
        .await
        .unwrap();

        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        drop(d0);

        assert!(DROP_TEST_DUMMY_FDB_TRANSACTION_HAS_DROPPED.load(Ordering::SeqCst));
    }
}