1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(warnings)]
3#![allow(clippy::type_complexity)]
4
5extern crate alloc;
6
7use core::{
8 borrow::Borrow,
9 ops::{Bound, RangeBounds},
10 sync::atomic::{AtomicU64, Ordering},
11};
12
13use alloc::collections::btree_map::{Iter as BTreeMapIter, Range as BTreeMapRange};
14
15use smallvec_wrapper::OneOrMore;
16use txn_core::types::{Entry, EntryData, EntryValue};
17
18use crossbeam_skiplist::SkipMap;
19
20pub mod iter;
21use iter::*;
22
23pub mod rev_iter;
24use rev_iter::*;
25
26pub mod range;
27use range::*;
28
29pub mod rev_range;
30use rev_range::*;
31
32pub mod types;
33use types::*;
34
35#[doc(hidden)]
36pub trait Database<K, V>: AsSkipCore<K, V> {}
37
38impl<K, V, T: AsSkipCore<K, V>> Database<K, V> for T {}
39
40#[doc(hidden)]
41pub trait AsSkipCore<K, V> {
42 fn as_inner(&self) -> &SkipCore<K, V>;
45}
46
47pub struct SkipCore<K, V> {
48 map: SkipMap<K, Values<V>>,
49 last_discard_version: AtomicU64,
50}
51
52impl<K, V> Default for SkipCore<K, V> {
53 #[inline]
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl<K, V> SkipCore<K, V> {
60 #[inline]
61 pub fn new() -> Self {
62 Self {
63 map: SkipMap::new(),
64 last_discard_version: AtomicU64::new(0),
65 }
66 }
67
68 #[inline]
69 #[doc(hidden)]
70 #[allow(private_interfaces)]
71 pub fn __by_ref(&self) -> &SkipMap<K, Values<V>> {
72 &self.map
73 }
74}
75
76impl<K, V> SkipCore<K, V>
77where
78 K: Ord,
79 V: Send + 'static,
80{
81 pub fn apply(&self, entries: OneOrMore<Entry<K, V>>) {
82 for ent in entries {
83 let version = ent.version();
84 match ent.data {
85 EntryData::Insert { key, value } => {
86 let ent = self.map.get_or_insert_with(key, || Values::new());
87 let val = ent.value();
88 val.lock();
89 val.insert(version, Some(value));
90 val.unlock();
91 }
92 EntryData::Remove(key) => {
93 if let Some(values) = self.map.get(&key) {
94 let values = values.value();
95 if !values.is_empty() {
96 values.insert(version, None);
97 }
98 }
99 }
100 }
101 }
102 }
103}
104
105impl<K, V> SkipCore<K, V>
106where
107 K: Ord,
108{
109 pub fn get<Q>(&self, key: &Q, version: u64) -> Option<CommittedRef<'_, K, V>>
110 where
111 K: Borrow<Q>,
112 Q: Ord + ?Sized,
113 {
114 let ent = self.map.get(key)?;
115 let version = ent
116 .value()
117 .upper_bound(Bound::Included(&version))
118 .and_then(|v| {
119 if v.value().is_some() {
120 Some(*v.key())
121 } else {
122 None
123 }
124 })?;
125
126 Some(CommittedRef { ent, version })
127 }
128
129 pub fn contains_key<Q>(&self, key: &Q, version: u64) -> bool
130 where
131 K: Borrow<Q>,
132 Q: Ord + ?Sized,
133 {
134 match self.map.get(key) {
135 None => false,
136 Some(values) => match values.value().upper_bound(Bound::Included(&version)) {
137 None => false,
138 Some(ent) => ent.value().is_some(),
139 },
140 }
141 }
142
143 pub fn iter(&self, version: u64) -> Iter<'_, K, V> {
144 let iter = self.map.iter();
145 Iter { iter, version }
146 }
147
148 pub fn iter_rev(&self, version: u64) -> RevIter<'_, K, V> {
149 let iter = self.map.iter();
150 RevIter {
151 iter: iter.rev(),
152 version,
153 }
154 }
155
156 pub fn range<Q, R>(&self, range: R, version: u64) -> Range<'_, Q, R, K, V>
157 where
158 K: Borrow<Q>,
159 R: RangeBounds<Q>,
160 Q: Ord + ?Sized,
161 {
162 Range {
163 range: self.map.range(range),
164 version,
165 }
166 }
167
168 pub fn range_rev<Q, R>(&self, range: R, version: u64) -> RevRange<'_, Q, R, K, V>
169 where
170 K: Borrow<Q>,
171 R: RangeBounds<Q>,
172 Q: Ord + ?Sized,
173 {
174 RevRange {
175 range: self.map.range(range).rev(),
176 version,
177 }
178 }
179}
180
181impl<K, V> SkipCore<K, V>
182where
183 K: Ord + Send + 'static,
184 V: Send + 'static,
185{
186 pub fn compact(&self, new_discard_version: u64) {
187 match self
188 .last_discard_version
189 .fetch_update(Ordering::SeqCst, Ordering::Acquire, |val| {
190 if val >= new_discard_version {
191 None
192 } else {
193 Some(new_discard_version)
194 }
195 }) {
196 Ok(_) => {}
197 Err(_) => return,
201 }
202
203 for ent in self.map.iter() {
204 let values = ent.value();
205
206 if let Some(oldest) = values.front() {
209 let oldest_version = *oldest.key();
210 if oldest_version >= new_discard_version {
211 continue;
212 }
213 }
214
215 if let Some(newest) = values.back() {
216 let newest_version = *newest.key();
217
218 if newest_version < new_discard_version {
220 if newest.value().is_none() {
222 if values.try_lock() {
224 ent.remove();
226
227 values.unlock();
229 continue;
230 }
231 }
232
233 let mut prev = newest.prev();
235 while let Some(ent) = prev {
236 prev = ent.prev();
237 ent.remove();
238 }
239 continue;
240 }
241
242 let mut bound = values.upper_bound(Bound::Excluded(&new_discard_version));
247
248 if bound.is_none() {
250 continue;
251 }
252
253 while let Some(ent) = bound {
255 bound = ent.prev();
256 ent.remove();
257 }
258 } else {
259 if values.try_lock() {
263 ent.remove();
265
266 values.unlock();
268 }
269 }
270 }
271 }
272}