1#![allow(clippy::type_complexity)]
7#![forbid(unsafe_code)]
8#![deny(warnings, missing_docs)]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(docsrs, allow(unused_attributes))]
11
12use std::{cell::RefCell, sync::Arc};
13
14use core::{hash::BuildHasher, mem};
15
16use either::Either;
17use indexmap::{IndexMap, IndexSet};
18use smallvec_wrapper::MediumVec;
19pub use smallvec_wrapper::OneOrMore;
20
21pub mod error;
23
24#[cfg(any(feature = "test", test))]
26#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
27pub mod tests;
28
29mod oracle;
30use oracle::*;
31mod read;
32pub use read::*;
33mod write;
34pub use write::*;
35
36pub use mwmr_core::{sync::*, types::*};
37
/// Configuration for a [`TransactionDB`].
///
/// The only setting is conflict detection, which [`Options::new`] (and
/// therefore `Default`) turns on.
#[derive(Clone, Debug)]
pub struct Options {
    /// When `true`, write transactions are created with a conflict-key set and
    /// the oracle is constructed with conflict detection enabled.
    detect_conflicts: bool,
}
43
44impl core::default::Default for Options {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl Options {
51 #[inline]
53 pub const fn new() -> Self {
54 Self {
55 detect_conflicts: true,
56 }
57 }
58
59 #[inline]
61 pub const fn detect_conflicts(&self) -> bool {
62 self.detect_conflicts
63 }
64
65 #[inline]
67 pub fn set_detect_conflicts(&mut self, detect_conflicts: bool) -> &mut Self {
68 self.detect_conflicts = detect_conflicts;
69 self
70 }
71
72 #[inline]
74 pub const fn with_detect_conflicts(mut self, detect_conflicts: bool) -> Self {
75 self.detect_conflicts = detect_conflicts;
76 self
77 }
78}
79
80struct Inner<D, S = std::hash::RandomState> {
81 db: D,
82 opts: Options,
85 orc: Oracle<S>,
86 hasher: S,
87}
88pub struct TransactionDB<D, S = std::hash::RandomState> {
90 inner: Arc<Inner<D, S>>,
91}
92
93impl<D, S> Clone for TransactionDB<D, S> {
94 fn clone(&self) -> Self {
95 Self {
96 inner: self.inner.clone(),
97 }
98 }
99}
100
101impl<D: Database, S: BuildHasher + Default + Clone + 'static> TransactionDB<D, S>
102where
103 D::Key: Eq + core::hash::Hash,
104{
105 pub fn write(&self) -> WriteTransaction<D, IndexMapManager<D::Key, D::Value, S>, S> {
108 WriteTransaction {
109 db: self.clone(),
110 read_ts: self.inner.orc.read_ts(),
111 size: 0,
112 count: 0,
113 reads: MediumVec::new(),
114 conflict_keys: if self.inner.opts.detect_conflicts {
115 Some(IndexSet::with_hasher(self.inner.hasher.clone()))
116 } else {
117 None
118 },
119 pending_writes: Some(IndexMap::with_hasher(S::default())),
120 duplicate_writes: OneOrMore::new(),
121 discarded: false,
122 done_read: false,
123 }
124 }
125}
126
127impl<D: Database, S: Clone + 'static> TransactionDB<D, S> {
128 pub fn write_by<W: PendingManager>(&self, backend: W) -> WriteTransaction<D, W, S> {
130 WriteTransaction {
131 db: self.clone(),
132 read_ts: self.inner.orc.read_ts(),
133 size: 0,
134 count: 0,
135 reads: MediumVec::new(),
136 conflict_keys: if self.inner.opts.detect_conflicts {
137 Some(IndexSet::with_hasher(self.inner.hasher.clone()))
138 } else {
139 None
140 },
141 pending_writes: Some(backend),
142 duplicate_writes: OneOrMore::new(),
143 discarded: false,
144 done_read: false,
145 }
146 }
147}
148
149impl<D: Database, S: Default> TransactionDB<D, S> {
150 pub fn new(transaction_opts: Options, database_opts: D::Options) -> Result<Self, D::Error> {
152 Self::with_hasher(transaction_opts, database_opts, S::default())
153 }
154}
155
156impl<D: Database, S> TransactionDB<D, S> {
157 pub fn with_hasher(
159 transaction_opts: Options,
160 database_opts: D::Options,
161 hasher: S,
162 ) -> Result<Self, D::Error> {
163 D::open(database_opts).map(|db| Self {
164 inner: Arc::new(Inner {
165 orc: {
166 let next_ts = db.maximum_version();
167 let orc = Oracle::new(
168 format!("{}.pending_reads", core::any::type_name::<D>()).into(),
169 format!("{}.txn_timestamps", core::any::type_name::<D>()).into(),
170 transaction_opts.detect_conflicts(),
171 next_ts,
172 );
173 orc.read_mark.done_unchecked(next_ts);
174 orc.txn_mark.done_unchecked(next_ts);
175 orc.increment_next_ts();
176 orc
177 },
178 db,
179 opts: transaction_opts,
180 hasher,
181 }),
182 })
183 }
184
185 pub fn discard_hint(&self) -> u64 {
188 self.inner.orc.discard_at_or_below()
189 }
190
191 pub fn database_options(&self) -> &D::Options {
193 self.inner.db.options()
194 }
195
196 pub fn transaction_options(&self) -> &Options {
198 &self.inner.opts
199 }
200
201 pub fn database(&self) -> &D {
206 &self.inner.db
207 }
208
209 pub fn read(&self) -> ReadTransaction<D, S> {
211 ReadTransaction {
212 db: self.clone(),
213 read_ts: self.inner.orc.read_ts(),
214 }
215 }
216
217 #[inline]
218 fn orc(&self) -> &Oracle<S> {
219 &self.inner.orc
220 }
221}