1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::{collections::hash_map::RandomState, hash::Hash};

use super::*;

mod write;
pub use write::*;

#[cfg(all(test, any(feature = "tokio", feature = "smol", feature = "async-std")))]
mod tests;

/// Database for [`smol`](https://crates.io/crates/smol) runtime.
///
/// This is [`OptimisticDb`] with its spawner type parameter fixed to
/// `SmolSpawner`. The hasher parameter `S` defaults to [`RandomState`].
/// Only compiled when the `smol` feature is enabled.
#[cfg(feature = "smol")]
#[cfg_attr(docsrs, doc(cfg(feature = "smol")))]
pub type SmolOptimisticDb<K, V, S = RandomState> = OptimisticDb<K, V, SmolSpawner, S>;

/// Database for [`tokio`](https://crates.io/crates/tokio) runtime.
///
/// This is [`OptimisticDb`] with its spawner type parameter fixed to
/// `TokioSpawner`. The hasher parameter `S` defaults to [`RandomState`].
/// Only compiled when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub type TokioOptimisticDb<K, V, S = RandomState> = OptimisticDb<K, V, TokioSpawner, S>;

/// Database for [`async-std`](https://crates.io/crates/async-std) runtime.
///
/// This is [`OptimisticDb`] with its spawner type parameter fixed to
/// `AsyncStdSpawner`. The hasher parameter `S` defaults to [`RandomState`].
/// Only compiled when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
pub type AsyncStdOptimisticDb<K, V, S = RandomState> = OptimisticDb<K, V, AsyncStdSpawner, S>;

/// State shared by every clone of an [`OptimisticDb`] handle.
struct Inner<K, V, SP: AsyncSpawner, S = RandomState> {
  /// Transaction manager, instantiated with `HashCm` and `BTreePwm`.
  tm: AsyncTm<K, V, HashCm<K, S>, BTreePwm<K, V>, SP>,
  /// Backing `SkipCore`, handed out through `AsSkipCore::as_inner`.
  map: SkipCore<K, V>,
  /// Hasher value supplied when the database was constructed.
  hasher: S,
}

impl<K, V, SP, S> Inner<K, V, SP, S>
where
  SP: AsyncSpawner,
{
  /// Builds the shared state: awaits a new transaction manager created from
  /// `name` and `0`, then pairs it with an empty `SkipCore` and the given
  /// `hasher`.
  async fn new(name: &str, hasher: S) -> Self {
    // Fields are evaluated in written order, so the transaction manager is
    // still created before the map.
    Self {
      tm: AsyncTm::<_, _, _, _, SP>::new(name, 0).await,
      map: SkipCore::new(),
      hasher,
    }
  }

  /// Returns the version reported by the transaction manager.
  async fn version(&self) -> u64 {
    let manager = &self.tm;
    manager.version().await
  }
}

/// A concurrent, in-memory, MVCC key-value database.
///
/// `OptimisticDb` requires keys to implement [`Ord`] and [`Hash`](core::hash::Hash).
///
/// Compared to [`SerializableDb`](crate::serializable::SerializableDb):
/// 1. `SerializableDb` supports full serializable snapshot isolation, which can detect both direct and indirect dependencies.
/// 2. `SerializableDb` does not require keys to implement [`Hash`](core::hash::Hash).
/// 3. However, [`OptimisticDb`](crate::optimistic::OptimisticDb) has more flexible write transaction APIs, and no clone happens.
pub struct OptimisticDb<K, V, SP, S = RandomState>
where
  SP: AsyncSpawner,
{
  /// Reference-counted shared state; cloning the handle clones this pointer.
  inner: Arc<Inner<K, V, SP, S>>,
}

#[doc(hidden)]
impl<K, V, SP, S> AsSkipCore<K, V> for OptimisticDb<K, V, SP, S>
where
  SP: AsyncSpawner,
{
  #[inline]
  fn as_inner(&self) -> &SkipCore<K, V> {
    &self.inner.map
  }
}

impl<K, V, SP, S> Clone for OptimisticDb<K, V, SP, S>
where
  SP: AsyncSpawner,
{
  #[inline]
  fn clone(&self) -> Self {
    Self {
      inner: self.inner.clone(),
    }
  }
}

impl<K, V, SP: AsyncSpawner> OptimisticDb<K, V, SP> {
  /// Creates a new `OptimisticDb` with the given options.
  #[inline]
  pub async fn new() -> Self {
    Self::with_hasher(Default::default()).await
  }
}

impl<K, V, SP, S> OptimisticDb<K, V, SP, S>
where
  SP: AsyncSpawner,
{
  /// Creates a new `OptimisticDb` that stores the given `hasher`.
  ///
  /// The type name of `Self` is passed as the name when the shared inner
  /// state is built.
  #[inline]
  pub async fn with_hasher(hasher: S) -> Self {
    let name = core::any::type_name::<Self>();
    let state = Inner::<_, _, SP, _>::new(name, hasher).await;
    Self {
      inner: Arc::new(state),
    }
  }

  /// Returns the current read version of the database.
  #[inline]
  pub async fn version(&self) -> u64 {
    let shared = &self.inner;
    shared.version().await
  }

  /// Creates a read transaction.
  ///
  /// The returned transaction holds a clone of this handle together with the
  /// read handle obtained from the transaction manager.
  #[inline]
  pub async fn read(&self) -> ReadTransaction<K, V, OptimisticDb<K, V, SP, S>, HashCm<K, S>, SP> {
    // Clone the handle before awaiting the manager, as the original
    // argument order does.
    let db = self.clone();
    let rtm = self.inner.tm.read().await;
    ReadTransaction::new(db, rtm)
  }
}

impl<K, V, SP, S> OptimisticDb<K, V, SP, S>
where
  K: Ord + Hash + Eq,
  S: BuildHasher + Clone,
  SP: AsyncSpawner,
{
  /// Creates a write transaction without a capacity hint.
  #[inline]
  pub async fn write(&self) -> OptimisticTransaction<K, V, SP, S> {
    let db = self.clone();
    OptimisticTransaction::new(db, None).await
  }

  /// Creates a write transaction, forwarding `capacity` as a capacity hint.
  #[inline]
  pub async fn write_with_capacity(&self, capacity: usize) -> OptimisticTransaction<K, V, SP, S> {
    let db = self.clone();
    let hint = Some(capacity);
    OptimisticTransaction::new(db, hint).await
  }
}

impl<K, V, SP, S> OptimisticDb<K, V, SP, S>
where
  K: Ord + Eq + Hash + Send + 'static,
  V: Send + 'static,
  SP: AsyncSpawner,
{
  /// Compacts the database.
  ///
  /// Asks the transaction manager for its discard hint and passes it to the
  /// backing map's `compact`.
  #[inline]
  pub fn compact(&self) {
    let shared = &self.inner;
    let hint = shared.tm.discard_hint();
    shared.map.compact(hint);
  }
}