skipdb_core/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(warnings)]
3#![allow(clippy::type_complexity)]
4
5extern crate alloc;
6
7use core::{
8  borrow::Borrow,
9  ops::{Bound, RangeBounds},
10  sync::atomic::{AtomicU64, Ordering},
11};
12
13use alloc::collections::btree_map::{Iter as BTreeMapIter, Range as BTreeMapRange};
14
15use smallvec_wrapper::OneOrMore;
16use txn_core::types::{Entry, EntryData, EntryValue};
17
18use crossbeam_skiplist::SkipMap;
19
20pub mod iter;
21use iter::*;
22
23pub mod rev_iter;
24use rev_iter::*;
25
26pub mod range;
27use range::*;
28
29pub mod rev_range;
30use rev_range::*;
31
32pub mod types;
33use types::*;
34
35#[doc(hidden)]
36pub trait Database<K, V>: AsSkipCore<K, V> {}
37
38impl<K, V, T: AsSkipCore<K, V>> Database<K, V> for T {}
39
40#[doc(hidden)]
41pub trait AsSkipCore<K, V> {
42  // This trait is sealed and cannot be implemented for types outside of this crate.
43  // So returning a reference to the inner database is ok.
44  fn as_inner(&self) -> &SkipCore<K, V>;
45}
46
47pub struct SkipCore<K, V> {
48  map: SkipMap<K, Values<V>>,
49  last_discard_version: AtomicU64,
50}
51
52impl<K, V> Default for SkipCore<K, V> {
53  #[inline]
54  fn default() -> Self {
55    Self::new()
56  }
57}
58
59impl<K, V> SkipCore<K, V> {
60  #[inline]
61  pub fn new() -> Self {
62    Self {
63      map: SkipMap::new(),
64      last_discard_version: AtomicU64::new(0),
65    }
66  }
67
68  #[inline]
69  #[doc(hidden)]
70  #[allow(private_interfaces)]
71  pub fn __by_ref(&self) -> &SkipMap<K, Values<V>> {
72    &self.map
73  }
74}
75
76impl<K, V> SkipCore<K, V>
77where
78  K: Ord,
79  V: Send + 'static,
80{
81  pub fn apply(&self, entries: OneOrMore<Entry<K, V>>) {
82    for ent in entries {
83      let version = ent.version();
84      match ent.data {
85        EntryData::Insert { key, value } => {
86          let ent = self.map.get_or_insert_with(key, || Values::new());
87          let val = ent.value();
88          val.lock();
89          val.insert(version, Some(value));
90          val.unlock();
91        }
92        EntryData::Remove(key) => {
93          if let Some(values) = self.map.get(&key) {
94            let values = values.value();
95            if !values.is_empty() {
96              values.insert(version, None);
97            }
98          }
99        }
100      }
101    }
102  }
103}
104
105impl<K, V> SkipCore<K, V>
106where
107  K: Ord,
108{
109  pub fn get<Q>(&self, key: &Q, version: u64) -> Option<CommittedRef<'_, K, V>>
110  where
111    K: Borrow<Q>,
112    Q: Ord + ?Sized,
113  {
114    let ent = self.map.get(key)?;
115    let version = ent
116      .value()
117      .upper_bound(Bound::Included(&version))
118      .and_then(|v| {
119        if v.value().is_some() {
120          Some(*v.key())
121        } else {
122          None
123        }
124      })?;
125
126    Some(CommittedRef { ent, version })
127  }
128
129  pub fn contains_key<Q>(&self, key: &Q, version: u64) -> bool
130  where
131    K: Borrow<Q>,
132    Q: Ord + ?Sized,
133  {
134    match self.map.get(key) {
135      None => false,
136      Some(values) => match values.value().upper_bound(Bound::Included(&version)) {
137        None => false,
138        Some(ent) => ent.value().is_some(),
139      },
140    }
141  }
142
143  pub fn iter(&self, version: u64) -> Iter<'_, K, V> {
144    let iter = self.map.iter();
145    Iter { iter, version }
146  }
147
148  pub fn iter_rev(&self, version: u64) -> RevIter<'_, K, V> {
149    let iter = self.map.iter();
150    RevIter {
151      iter: iter.rev(),
152      version,
153    }
154  }
155
156  pub fn range<Q, R>(&self, range: R, version: u64) -> Range<'_, Q, R, K, V>
157  where
158    K: Borrow<Q>,
159    R: RangeBounds<Q>,
160    Q: Ord + ?Sized,
161  {
162    Range {
163      range: self.map.range(range),
164      version,
165    }
166  }
167
168  pub fn range_rev<Q, R>(&self, range: R, version: u64) -> RevRange<'_, Q, R, K, V>
169  where
170    K: Borrow<Q>,
171    R: RangeBounds<Q>,
172    Q: Ord + ?Sized,
173  {
174    RevRange {
175      range: self.map.range(range).rev(),
176      version,
177    }
178  }
179}
180
181impl<K, V> SkipCore<K, V>
182where
183  K: Ord + Send + 'static,
184  V: Send + 'static,
185{
186  pub fn compact(&self, new_discard_version: u64) {
187    match self
188      .last_discard_version
189      .fetch_update(Ordering::SeqCst, Ordering::Acquire, |val| {
190        if val >= new_discard_version {
191          None
192        } else {
193          Some(new_discard_version)
194        }
195      }) {
196      Ok(_) => {}
197      // if we fail to insert the new discard version,
198      // which means there is another thread that is compacting the database.
199      // To avoid run multiple compacting at the same time, we just return.
200      Err(_) => return,
201    }
202
203    for ent in self.map.iter() {
204      let values = ent.value();
205
206      // if the oldest version is larger or equal to the new discard version,
207      // then nothing to remove.
208      if let Some(oldest) = values.front() {
209        let oldest_version = *oldest.key();
210        if oldest_version >= new_discard_version {
211          continue;
212        }
213      }
214
215      if let Some(newest) = values.back() {
216        let newest_version = *newest.key();
217
218        // if the newest version is smaller than the new discard version,
219        if newest_version < new_discard_version {
220          // if the newest value is none, then we can try to remove the whole key.
221          if newest.value().is_none() {
222            // try to lock the entry.
223            if values.try_lock() {
224              // we get the lock, then we can remove the whole key.
225              ent.remove();
226
227              // unlock the entry.
228              values.unlock();
229              continue;
230            }
231          }
232
233          // we leave the current newest value and try to remove previous values.
234          let mut prev = newest.prev();
235          while let Some(ent) = prev {
236            prev = ent.prev();
237            ent.remove();
238          }
239          continue;
240        }
241
242        // handle the complex case: we have some values that are larger than the new discard version,
243        // and some values that are smaller than the new discard version.
244
245        // find the first value that is smaller than the new discard version.
246        let mut bound = values.upper_bound(Bound::Excluded(&new_discard_version));
247
248        // means that no value is smaller than the new discard version.
249        if bound.is_none() {
250          continue;
251        }
252
253        // remove all values that are smaller than the new discard version.
254        while let Some(ent) = bound {
255          bound = ent.prev();
256          ent.remove();
257        }
258      } else {
259        // we do not have any value in the entry, then we can try to remove the whole key.
260
261        // try to lock the entry.
262        if values.try_lock() {
263          // we get the lock, then we can remove the whole key.
264          ent.remove();
265
266          // unlock the entry.
267          values.unlock();
268        }
269      }
270    }
271  }
272}