1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
#![feature(async_iterator)]
#![feature(test)]
mod anvil_db;
mod background;
mod bloom_filter;
mod checksum;
mod common;
mod compactor;
mod concurrent_skip_list;
mod context;
mod helpful_macros;
mod hopscotch;
mod kv;
mod logging;
mod mem_queue;
mod mem_table;
mod os;
mod sst;
mod storage;
mod tablet;
mod var_int;
mod wal;
// test only
mod test_util;
mod tests;
mod tuesday;
mod wednesday;
use std::async_iter::AsyncIterator;
use std::pin::Pin;
use std::task::{Context, Poll};
use anvil_db::{AnvilDb, AnvilDbConfig, AnvilDbScanner};
use context::SimpleContext;
use kv::RangeSet;
use logging::DefaultLogger;
use sst::block_cache::cache::LruBlockCache;
use storage::blob_store::LocalBlobStore;
/// The concrete context used by the public wrappers in this file:
/// `SimpleContext` parameterised with `LocalBlobStore`, `LruBlockCache` and
/// `DefaultLogger`.
type SuperSimpleContext = SimpleContext<LocalBlobStore, LruBlockCache, DefaultLogger>;
/// `AnvilDbScanner` specialised to `SuperSimpleContext`.
type SuperSimpleAnvilScanner<'a> = AnvilDbScanner<'a, SuperSimpleContext>;
/// `AnvilDb` specialised to `SuperSimpleContext`.
type SuperSimpleAnvilDb = AnvilDb<SuperSimpleContext>;
/// Public database handle.
///
/// Holds an `AnvilDb` specialised to `SuperSimpleContext`; the methods on
/// this type delegate to that inner value.
#[derive(Debug)]
pub struct AnvilDB {
/// The wrapped database that calls are forwarded to.
inner: SuperSimpleAnvilDb,
}
impl AnvilDB {
    /// Build a database handle over the files in a directory.
    ///
    /// # Arguments
    ///
    /// - dir_name: the directory containing the database files
    ///
    /// # Returns
    ///
    /// The new database handle, or an error String if the blob store or the
    /// inner database could not be created.
    pub fn new(dir_name: &str) -> Result<AnvilDB, String> {
        let blob_store = LocalBlobStore::new(dir_name)?;
        let db = AnvilDb::new(blob_store)?;
        Ok(Self { inner: db })
    }

    /// Build a database handle over the files in a directory, using a
    /// caller-supplied configuration.
    ///
    /// # Arguments
    ///
    /// - dir_name: the directory containing the database files
    /// - config: the configuration to use
    ///
    /// # Returns
    ///
    /// The new database handle, or an error String if the blob store or the
    /// inner database could not be created.
    pub fn with_config(dir_name: &str, config: AnvilDBConfig) -> Result<AnvilDB, String> {
        let blob_store = LocalBlobStore::new(dir_name)?;
        // The annotation pins the context type parameter of the inner database.
        let db: SuperSimpleAnvilDb = AnvilDb::with_config(blob_store, config.inner)?;
        Ok(Self { inner: db })
    }

    /// Recover a database from the files in a directory.
    ///
    /// # Arguments
    ///
    /// - dir_name: the directory containing the database files
    /// - config: the configuration to use for the recovered database
    ///
    /// # Returns
    ///
    /// The recovered database handle, or an error String if the blob store
    /// could not be opened or recovery failed.
    pub fn recover(dir_name: &str, config: AnvilDBConfig) -> Result<AnvilDB, String> {
        let blob_store = LocalBlobStore::new(dir_name)?;
        let db = AnvilDb::recover(blob_store, config.inner)?;
        Ok(Self { inner: db })
    }

    /// Look up the value stored under a key.
    ///
    /// # Arguments
    ///
    /// - key: the key to retrieve
    ///
    /// # Returns
    ///
    /// The stored bytes if the key is found, or an error String on error.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, String> {
        self.inner.get(key)
    }

    /// Look up the value stored under a key asynchronously.
    ///
    /// # Arguments
    ///
    /// - key: the key to retrieve
    ///
    /// # Returns
    ///
    /// A future resolving to the stored bytes if the key is found, or to an
    /// error String on error.
    pub async fn async_get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, String> {
        self.inner.async_get(key).await
    }

    /// Store a key-value pair in the database.
    ///
    /// # Arguments
    ///
    /// - key: the key to set
    /// - value: the value to associate with the key
    ///
    /// # Returns
    ///
    /// An error String on error.
    pub fn set(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.inner.set(key, value)
    }

    /// Store a key-value pair in the database asynchronously.
    ///
    /// # Arguments
    ///
    /// - key: the key to set
    /// - value: the value to associate with the key
    ///
    /// # Returns
    ///
    /// A future resolving to an error String on error.
    pub async fn async_set(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.inner.async_set(key, value).await
    }

    /// Delete a key from the database.
    ///
    /// # Arguments
    ///
    /// - key: the key to remove
    ///
    /// # Returns
    ///
    /// An error String on error.
    pub fn remove(&self, key: &[u8]) -> Result<(), String> {
        self.inner.remove(key)
    }

    /// Delete a key from the database asynchronously.
    ///
    /// # Arguments
    ///
    /// - key: the key to remove
    ///
    /// # Returns
    ///
    /// A future resolving to an error String on error.
    pub async fn async_remove(&self, key: &[u8]) -> Result<(), String> {
        self.inner.async_remove(key).await
    }

    /// Run a major compaction.
    ///
    /// # Returns
    ///
    /// An error String if an error occurs.
    pub fn compact(&self) -> Result<(), String> {
        self.inner.compact()
    }

    /// Close the database, consuming this handle.
    ///
    /// # Returns
    ///
    /// An error String on error.
    pub fn close(self) -> Result<(), String> {
        self.inner.close()
    }

    /// Start a scan over the database.
    ///
    /// # Returns
    ///
    /// An [`AnvilScanner`] iterating over the database, or an error String if
    /// the inner scanner could not be created.
    pub fn try_scan(&self) -> Result<AnvilScanner, String> {
        let scanner = self.inner.try_scan()?;
        Ok(AnvilScanner { inner: scanner })
    }

    /// Start a scan over the database for asynchronous iteration.
    ///
    /// The scanner is built from the same inner `try_scan` call as
    /// [`AnvilDB::try_scan`]; the returned [`AnvilScanner`] implements
    /// `AsyncIterator` as well as `Iterator`.
    ///
    /// # Returns
    ///
    /// An [`AnvilScanner`] over the database, or an error String if the inner
    /// scanner could not be created.
    pub fn try_async_scan(&self) -> Result<AnvilScanner, String> {
        let scanner = self.inner.try_scan()?;
        Ok(AnvilScanner { inner: scanner })
    }
}
/// Scanner over the key-value pairs of an `AnvilDB`.
///
/// Wraps an `AnvilDbScanner` specialised to `SuperSimpleContext`, borrowed
/// for the lifetime `'a`.
#[derive(Debug)]
pub struct AnvilScanner<'a> {
/// The wrapped scanner that produces the raw entries.
inner: SuperSimpleAnvilScanner<'a>,
}
impl AnvilScanner<'_> {
    /// Apply the inner scanner's `from(key)` and return the updated scanner.
    ///
    /// NOTE(review): presumably sets the start bound of the scan; confirm
    /// inclusivity against `AnvilDbScanner::from`.
    ///
    /// # Arguments
    ///
    /// - key: the key passed to the inner scanner's `from`
    ///
    /// # Returns
    ///
    /// This scanner, with its inner scanner replaced by the bounded one.
    pub fn from(mut self, key: &[u8]) -> Self {
        let bounded = self.inner.from(key);
        self.inner = bounded;
        self
    }

    /// Apply the inner scanner's `to(key)` and return the updated scanner.
    ///
    /// NOTE(review): presumably sets the end bound of the scan; confirm
    /// inclusivity against `AnvilDbScanner::to`.
    ///
    /// # Arguments
    ///
    /// - key: the key passed to the inner scanner's `to`
    ///
    /// # Returns
    ///
    /// This scanner, with its inner scanner replaced by the bounded one.
    pub fn to(mut self, key: &[u8]) -> Self {
        let bounded = self.inner.to(key);
        self.inner = bounded;
        self
    }
}
impl Iterator for AnvilScanner<'_> {
    type Item = Result<(Vec<u8>, Vec<u8>), String>;

    /// Produce the next `(key, value)` pair of the scan.
    ///
    /// Entries from the inner scanner that carry no value are skipped. An
    /// error from the inner scanner is yielded as `Some(Err(..))`, and `None`
    /// is returned once the inner scanner is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends the iteration when the inner scanner has no more entries.
            let entry = match self.inner.next()? {
                Err(message) => return Some(Err(message)),
                Ok(entry) => entry,
            };
            // Entries without a value are not surfaced to the caller
            // (presumably these mark removed keys; confirm).
            let value = match entry.value_ref().as_ref() {
                Some(bytes) => bytes.clone(),
                None => continue,
            };
            let key = entry.key_ref().to_vec();
            return Some(Ok((key, value)));
        }
    }
}
impl AsyncIterator for AnvilScanner<'_> {
    type Item = Result<(Vec<u8>, Vec<u8>), String>;

    /// Produce the next item by calling the synchronous `next`.
    ///
    /// This always returns `Poll::Ready`; the task context is not used.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // TODO(t/1336): This should actually poll the inner iterator.
        let item = self.next();
        Poll::Ready(item)
    }
}
/// Configuration for an `AnvilDB`.
///
/// Wraps an `AnvilDbConfig<DefaultLogger>`; `Default` is derived, and the
/// `with_*` methods return an updated configuration.
#[derive(Default)]
pub struct AnvilDBConfig {
/// The wrapped configuration handed to the inner database.
inner: AnvilDbConfig<DefaultLogger>,
}
impl AnvilDBConfig {
/// Set the maximum number of concurrent writers.
///
/// # Arguments
///
/// - max_concurrent_writers: the maximum number of concurrent writers
///
/// # Returns
///
/// The configuration object.
pub fn with_max_concurrent_writers(self, max_concurrent_writers: usize) -> Self {
let inner = self
.inner
.with_max_concurrent_writers(max_concurrent_writers);
AnvilDBConfig { inner }
}
/// Set the maximum size of the write-ahead log.
///
/// # Arguments
///
/// - max_wal_bytes: the maximum size of the write-ahead log in bytes
///
/// # Returns
///
/// The configuration object.
pub fn with_max_wal_bytes(self, max_wal_bytes: usize) -> Self {
let inner = self.inner.with_max_wal_bytes(max_wal_bytes);
AnvilDBConfig { inner }
}
/// Set the maximum size of the block cache in bytes.
/// By default, the AnvilDB will measure how much memory on the system is
/// unused and plan to use half of it for the block cache.
/// If the OS is not supported or if reading the memory fails, the default
/// is set to 0.5 GiB.
/// Using the function overrides that behavior.
///
/// TODO(t/1441): The memory is not allocated up-front.
///
/// # Arguments
///
/// - cache_size_bytes: the maximum size of the block cache in bytes
///
/// # Returns
///
/// The configuration object.
pub fn with_cache_size_bytes(self, cache_size_bytes: usize) -> Self {
let inner = self.inner.with_cache_size_bytes(cache_size_bytes);
AnvilDBConfig { inner }
}
}