keyvaluedb_memorydb/
lib.rs1#![deny(clippy::all)]
4
5use keyvaluedb::{
6 DBKey, DBKeyRef, DBKeyValueRef, DBOp, DBTransaction, DBTransactionError, DBValue, KeyValueDB,
7};
8use parking_lot::RwLock;
9use std::{
10 collections::{BTreeMap, HashMap},
11 future::Future,
12 io,
13 pin::Pin,
14 sync::Arc,
15};
16
17#[derive(Clone)]
20pub struct InMemory {
21 columns: Arc<RwLock<HashMap<u32, BTreeMap<DBKey, DBValue>>>>,
22}
23
24pub fn create(num_cols: u32) -> InMemory {
27 let mut cols = HashMap::new();
28
29 for idx in 0..num_cols {
30 cols.insert(idx, BTreeMap::new());
31 }
32
33 InMemory {
34 columns: Arc::new(RwLock::new(cols)),
35 }
36}
37
38impl KeyValueDB for InMemory {
39 fn get<'a>(
40 &self,
41 col: u32,
42 key: &'a [u8],
43 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
44 let this = self.clone();
45 Box::pin(async move {
46 let columns = this.columns.read();
47 match columns.get(&col) {
48 None => Err(io::Error::from(io::ErrorKind::NotFound)),
49 Some(map) => Ok(map.get(key).cloned()),
50 }
51 })
52 }
53
54 fn delete<'a>(
56 &self,
57 col: u32,
58 key: &'a [u8],
59 ) -> Pin<Box<dyn Future<Output = io::Result<Option<DBValue>>> + Send + 'a>> {
60 let this = self.clone();
61 Box::pin(async move {
62 let mut columns = this.columns.write();
63 match columns.get_mut(&col) {
64 None => Err(io::Error::from(io::ErrorKind::NotFound)),
65 Some(map) => Ok(map.remove(key)),
66 }
67 })
68 }
69
70 fn write(
71 &self,
72 transaction: DBTransaction,
73 ) -> Pin<Box<dyn Future<Output = Result<(), DBTransactionError>> + Send>> {
74 let this = self.clone();
75 Box::pin(async move {
76 let mut columns = this.columns.write();
77 let ops = transaction.ops;
78 for op in ops {
79 match op {
80 DBOp::Insert { col, key, value } => {
81 if let Some(col) = columns.get_mut(&col) {
82 col.insert(key, value);
83 }
84 }
85 DBOp::Delete { col, key } => {
86 if let Some(col) = columns.get_mut(&col) {
87 col.remove(&*key);
88 }
89 }
90 DBOp::DeletePrefix { col, prefix } => {
91 if let Some(col) = columns.get_mut(&col) {
92 use std::ops::Bound;
93 if prefix.is_empty() {
94 col.clear();
95 } else {
96 let start_range = Bound::Included(prefix.to_vec());
97 let keys: Vec<_> =
98 if let Some(end_range) = keyvaluedb::end_prefix(&prefix[..]) {
99 col.range((start_range, Bound::Excluded(end_range)))
100 .map(|(k, _)| k.clone())
101 .collect()
102 } else {
103 col.range((start_range, Bound::Unbounded))
104 .map(|(k, _)| k.clone())
105 .collect()
106 };
107 for key in keys.into_iter() {
108 col.remove(&key[..]);
109 }
110 }
111 }
112 }
113 }
114 }
115 Ok(())
116 })
117 }
118
119 fn iter<'a, T: 'a, F: FnMut(DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
120 &self,
121 col: u32,
122 prefix: Option<&'a [u8]>,
123 mut f: F,
124 ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
125 let this = self.clone();
126 Box::pin(async move {
127 match this.columns.read().get(&col) {
128 Some(map) => {
129 for (k, v) in map {
130 if let Some(p) = prefix {
131 if !k.starts_with(p) {
132 continue;
133 }
134 }
135 match f((k, v)) {
136 Ok(None) => (),
137 Ok(Some(v)) => return Ok(Some(v)),
138 Err(e) => return Err(e),
139 }
140 }
141 Ok(None)
142 }
143 None => Err(io::Error::from(io::ErrorKind::NotFound)),
144 }
145 })
146 }
147
148 fn iter_keys<'a, T: 'a, F: FnMut(DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'a>(
149 &self,
150 col: u32,
151 prefix: Option<&'a [u8]>,
152 mut f: F,
153 ) -> Pin<Box<dyn Future<Output = io::Result<Option<T>>> + Send + 'a>> {
154 let this = self.clone();
155 Box::pin(async move {
156 match this.columns.read().get(&col) {
157 Some(map) => {
158 for k in map.keys() {
159 if let Some(p) = prefix {
160 if !k.starts_with(p) {
161 continue;
162 }
163 }
164 match f(k) {
165 Ok(None) => (),
166 Ok(Some(v)) => return Ok(Some(v)),
167 Err(e) => return Err(e),
168 }
169 }
170 Ok(None)
171 }
172 None => Err(io::Error::from(io::ErrorKind::NotFound)),
173 }
174 })
175 }
176
177 fn num_columns(&self) -> io::Result<u32> {
178 Ok(self.columns.read().len() as u32)
179 }
180
181 fn num_keys(&self, col: u32) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send>> {
182 let this = self.clone();
183 Box::pin(async move {
184 let c = this.columns.read();
185 let Some(column) = c.get(&col) else {
186 return Err(io::Error::from(io::ErrorKind::NotFound));
187 };
188 Ok(column.len() as u64)
189 })
190 }
191
192 fn restore(&self, _new_db: &str) -> io::Result<()> {
193 Err(io::Error::new(
194 io::ErrorKind::Other,
195 "Attempted to restore in-memory database",
196 ))
197 }
198}
199
// Built only for test runs on targets other than `wasm32-unknown-unknown`.
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
#[cfg(test)]
mod tests {
    //! Runs the `keyvaluedb_shared_tests` suite against a fresh in-memory
    //! database. Each test hands a newly created database to one shared
    //! test function and returns its result.

    use super::create;
    use keyvaluedb_shared_tests as st;
    use std::io;

    #[tokio::test]
    async fn get_fails_with_non_existing_column() -> io::Result<()> {
        st::test_get_fails_with_non_existing_column(create(1)).await
    }

    #[tokio::test]
    async fn put_and_get() -> io::Result<()> {
        st::test_put_and_get(create(1)).await
    }

    #[tokio::test]
    async fn num_keys() -> io::Result<()> {
        st::test_num_keys(create(1)).await
    }

    #[tokio::test]
    async fn delete_and_get() -> io::Result<()> {
        st::test_delete_and_get(create(1)).await
    }

    #[tokio::test]
    async fn delete_prefix() -> io::Result<()> {
        // This scenario takes its column count from the shared suite.
        st::test_delete_prefix(create(st::DELETE_PREFIX_NUM_COLUMNS)).await
    }

    #[tokio::test]
    async fn iter() -> io::Result<()> {
        st::test_iter(create(1)).await
    }

    #[tokio::test]
    async fn iter_keys() -> io::Result<()> {
        st::test_iter_keys(create(1)).await
    }

    #[tokio::test]
    async fn iter_with_prefix() -> io::Result<()> {
        st::test_iter_with_prefix(create(1)).await
    }

    #[tokio::test]
    async fn complex() -> io::Result<()> {
        st::test_complex(create(1)).await
    }
}