1#![allow(clippy::type_complexity)]
6#![forbid(unsafe_code)]
7#![deny(warnings, missing_docs)]
8#![cfg_attr(docsrs, feature(doc_cfg))]
9#![cfg_attr(docsrs, allow(unused_attributes))]
10
11use std::sync::Arc;
12
13use core::mem;
14
15use error::TransactionError;
16pub use smallvec_wrapper::OneOrMore;
17
18pub use wmark::{AsyncSpawner, Detach};
19
20#[cfg(feature = "smol")]
21pub use wmark::SmolSpawner;
22
23#[cfg(feature = "async-std")]
24pub use wmark::AsyncStdSpawner;
25
26#[cfg(feature = "tokio")]
27pub use wmark::TokioSpawner;
28
29#[cfg(feature = "wasm")]
30pub use wmark::WasmSpawner;
31
32pub use txn_core::error;
34
35mod oracle;
36use oracle::*;
37mod read;
38pub use read::*;
39mod write;
40pub use write::*;
41
42pub use txn_core::{
43 future::*,
44 sync::{
45 BTreeCm, BTreePwm, Cm, CmComparable, CmEquivalent, HashCm, HashCmOptions, IndexMapPwm, Marker,
46 Pwm, PwmComparable, PwmComparableRange, PwmEquivalent, PwmEquivalentRange, PwmRange,
47 },
48 types::*,
49};
50
51pub struct AsyncTm<K, V, C, P, S>
53where
54 S: AsyncSpawner,
55{
56 inner: Arc<Oracle<C, S>>,
57 _phantom: std::marker::PhantomData<(K, V, P)>,
58}
59
60impl<K, V, C, P, S> Clone for AsyncTm<K, V, C, P, S>
61where
62 S: AsyncSpawner,
63{
64 fn clone(&self) -> Self {
65 Self {
66 inner: self.inner.clone(),
67 _phantom: std::marker::PhantomData,
68 }
69 }
70}
71
72impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
73where
74 C: AsyncCm<Key = K>,
75 P: AsyncPwm<Key = K, Value = V>,
76 S: AsyncSpawner,
77{
78 pub async fn write(
81 &self,
82 pending_manager_opts: P::Options,
83 conflict_manager_opts: C::Options,
84 ) -> Result<AsyncWtm<K, V, C, P, S>, TransactionError<C::Error, P::Error>> {
85 let read_ts = self.inner.read_ts().await;
86 Ok(AsyncWtm {
87 orc: self.inner.clone(),
88 read_ts,
89 size: 0,
90 count: 0,
91 conflict_manager: Some(
92 C::new(conflict_manager_opts)
93 .await
94 .map_err(TransactionError::conflict)?,
95 ),
96 pending_writes: Some(
97 P::new(pending_manager_opts)
98 .await
99 .map_err(TransactionError::pending)?,
100 ),
101 duplicate_writes: OneOrMore::new(),
102 discarded: false,
103 done_read: false,
104 })
105 }
106}
107
108impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
109where
110 S: AsyncSpawner,
111{
112 #[inline]
115 pub async fn new(name: &str, current_version: u64) -> Self {
116 Self {
117 inner: Arc::new({
118 let next_ts = current_version;
119 let orc = Oracle::new(
120 format!("{}.pending_reads", name).into(),
121 format!("{}.txn_timestamps", name).into(),
122 next_ts,
123 );
124 orc.read_mark.done(next_ts).unwrap();
125 orc.txn_mark.done(next_ts).unwrap();
126 orc.increment_next_ts().await;
127 orc
128 }),
129 _phantom: std::marker::PhantomData,
130 }
131 }
132
133 #[inline]
135 pub async fn version(&self) -> u64 {
136 self.inner.read_ts().await
137 }
138
139 #[inline]
141 pub async fn close(&self) {
142 self.inner.stop().await;
143 }
144}
145
146impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
147where
148 S: AsyncSpawner,
149{
150 pub fn discard_hint(&self) -> u64 {
153 self.inner.discard_at_or_below()
154 }
155
156 pub async fn read(&self) -> AsyncRtm<K, V, C, P, S> {
158 AsyncRtm {
159 db: self.clone(),
160 read_ts: self.inner.read_ts().await,
161 }
162 }
163}