1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use super::*;

mod write;
pub use write::*;

#[cfg(test)]
mod tests;

/// State shared by every handle of one database; held behind an `Arc` by
/// `EquivalentDB`.
struct Inner<K, V, SP, S = std::hash::RandomState> {
  /// Transaction manager. Within this file it is asked for the current
  /// version, for read handles, and for the discard hint used by `compact`.
  tm: AsyncTm<K, V, HashCm<K, S>, PendingMap<K, V>, SP>,
  /// Underlying `SkipCore` storage, exposed through `AsSkipCore::as_inner`.
  map: SkipCore<K, V>,
  /// Hasher supplied at construction time.
  // NOTE(review): not read anywhere in this part of the file; presumably
  // consumed by the write transaction in the `write` module — confirm.
  hasher: S,
  /// Value of `Options::max_batch_size()` captured at construction time.
  max_batch_size: u64,
  /// Value of `Options::max_batch_entries()` captured at construction time.
  max_batch_entries: u64,
}

impl<K, V, SP: AsyncSpawner, S> Inner<K, V, SP, S> {
  /// Builds the shared state for a database.
  ///
  /// - `name`: forwarded to `AsyncTm::new`.
  /// - `max_batch_size` / `max_batch_entries`: stored as-is.
  /// - `hasher`: stored as-is.
  ///
  /// The transaction manager is awaited first, then an empty `SkipCore` is created.
  async fn new(name: &str, max_batch_size: u64, max_batch_entries: u64, hasher: S) -> Self {
    Self {
      // NOTE(review): the literal `0` is presumably the initial version of the
      // transaction manager — confirm against `AsyncTm::new`.
      tm: AsyncTm::<_, _, _, _, SP>::new(name, 0).await,
      map: SkipCore::new(),
      hasher,
      max_batch_size,
      max_batch_entries,
    }
  }

  /// Returns whatever version the transaction manager currently reports.
  async fn version(&self) -> u64 {
    let manager = &self.tm;
    manager.version().await
  }
}

/// A concurrent ACID, MVCC in-memory database based on [`crossbeam-skiplist`][crossbeam_skiplist].
///
/// `EquivalentDB` requires the key to be [`Ord`] and [`Hash`](core::hash::Hash).
///
/// Compared to [`ComparableDB`](crate::comparable::ComparableDB),
/// `EquivalentDB` has more flexible write transaction APIs and no cloning happens.
/// However, [`ComparableDB`](crate::comparable::ComparableDB) does not require the key to implement [`Hash`](core::hash::Hash).
pub struct EquivalentDB<K, V, SP, S = std::hash::RandomState> {
  /// Reference-counted shared state of the database.
  inner: Arc<Inner<K, V, SP, S>>,
}

// The impl is generic over BOTH the spawner (`SP`) and the hasher (`S`).
// Writing `EquivalentDB<K, V, S>` here would bind the single parameter to the
// spawner slot and pin the hasher to its `RandomState` default, leaving
// databases built with a custom hasher without an `AsSkipCore` impl.
impl<K, V, SP, S> AsSkipCore<K, V> for EquivalentDB<K, V, SP, S> {
  /// Returns a shared reference to the `SkipCore` held in the database's
  /// shared state.
  #[inline]
  fn as_inner(&self) -> &SkipCore<K, V> {
    &self.inner.map
  }
}

impl<K, V, SP, S> Clone for EquivalentDB<K, V, SP, S> {
  #[inline]
  fn clone(&self) -> Self {
    Self {
      inner: self.inner.clone(),
    }
  }
}

impl<K, V, SP: AsyncSpawner> EquivalentDB<K, V, SP> {
  /// Creates a new `EquivalentDB` with the given options.
  #[inline]
  pub async fn new() -> Self {
    Self::with_options_and_hasher(Default::default(), Default::default()).await
  }
}

impl<K, V, SP: AsyncSpawner, S> EquivalentDB<K, V, SP, S> {
  /// Creates a new `EquivalentDB` with the given hasher.
  #[inline]
  pub async fn with_hasher(hasher: S) -> Self {
    Self::with_options_and_hasher(Default::default(), hasher).await
  }

  /// Creates a new `EquivalentDB` with the given options and hasher.
  #[inline]
  pub async fn with_options_and_hasher(opts: Options, hasher: S) -> Self {
    let inner = Arc::new(
      Inner::<_, _, SP, _>::new(
        core::any::type_name::<Self>(),
        opts.max_batch_size(),
        opts.max_batch_entries(),
        hasher,
      )
      .await,
    );
    Self { inner }
  }

  /// Returns the current read version of the database.
  #[inline]
  pub async fn version(&self) -> u64 {
    self.inner.version().await
  }

  /// Create a read transaction.
  #[inline]
  pub async fn read(&self) -> ReadTransaction<K, V, EquivalentDB<K, V, SP, S>, HashCm<K, S>, SP> {
    ReadTransaction::new(self.clone(), self.inner.tm.read().await)
  }
}

impl<K, V, SP, S> EquivalentDB<K, V, SP, S>
where
  K: Ord + Eq + core::hash::Hash,
  SP: AsyncSpawner,
  S: BuildHasher + Clone,
{
  /// Creates a write transaction without a capacity hint.
  #[inline]
  pub async fn write(&self) -> WriteTransaction<K, V, SP, S> {
    let db = self.clone();
    WriteTransaction::new(db, None).await
  }

  /// Creates a write transaction, forwarding `capacity` as a capacity hint.
  #[inline]
  pub async fn write_with_capacity(&self, capacity: usize) -> WriteTransaction<K, V, SP, S> {
    let db = self.clone();
    let hint = Some(capacity);
    WriteTransaction::new(db, hint).await
  }
}

impl<K, V, SP, S> EquivalentDB<K, V, SP, S>
where
  SP: AsyncSpawner,
  K: Ord + Eq + core::hash::Hash + Send + 'static,
  V: Send + 'static,
{
  /// Compacts the database.
  ///
  /// The transaction manager's discard hint is handed to the underlying map's
  /// `compact`.
  // NOTE(review): presumably the hint marks versions no longer visible to any
  // reader — confirm against `discard_hint`.
  #[inline]
  pub fn compact(&self) {
    let state = &self.inner;
    let hint = state.tm.discard_hint();
    state.map.compact(hint);
  }
}