persistent_kv/lib.rs
1//! Unordered key-value store with all data held in memory but also persisted to disk
2//! for durability.
3//!
//! The store is designed to be used as a building block for (distributed) systems that require
//! a high-throughput, ultra-low-latency, unordered key-value store with persistence guarantees
//! and that can shard their data so that it always fits into the RAM of one machine.
7//!
8//! # Design goals
9//!
10//! - Lightweight and with few dependencies
11//! - Support concurrent read/writes with minimal locking times
12//! - Tunable write throughput / persistence guarantee trade-off
13//! - Maximize block device throughput and exploit I/O parallelism if supported by OS and hardware
14//! - Amortized memory and disk usage is O(number of keys), <10% overhead over payload size (e.g.
15//! no spikes during snapshotting)
16//! - Support for both fixed-size and variable-size keys and values
17//!
18//! A good mental model is "Hashmap that keeps its contents between program runs". If more
19//! advanced database features are required, [RocksDB](https://docs.rs/rocksdb/latest/rocksdb/)
20//! or [SQLite](https://docs.rs/sqlite/latest/sqlite/) are a better choice.
21//!
22//! # Key and value format
23//!
//! Both the key and the value side of the store operate exclusively on byte sequences,
//! converted from high-level types via the [`crate::Serializable`] and [`crate::Deserializable`]
//! traits. This avoids deviation between the in-memory representation of objects and their
//! serialized form, which could otherwise lead to subtle bugs. Because the store only operates
//! on serialized data, the key and value types are not required to implement [`Send`],
//! [`Sync`], or the hash map traits [`Hash`] and [`Eq`].
30//!
//! All fixed-size integer types, [`Vec<u8>`] and [`String`] are supported out of the box
//! as both keys and values. On the value side, [`prost::Message`] types are also directly
//! supported via the `*_proto` family of methods (e.g. [`PersistentKeyValueStore::set_proto`]
//! and [`PersistentKeyValueStore::get_proto`]).
34//!
35//! # Implementation notes
36//!
37//! Persistence is implemented via a write-ahead log that is periodically compacted and
38//! replaced by full snapshots. All files are stored in a single folder. Individual snapshots
39//! can be internally sharded to ease parallel processing and keep file size reasonable.
40//! See the [`snapshot_set`] module for details on snapshot format.
41//!
42//! The on-disk format is a series of records. Each record is a protobuf message prefixed
43//! by a varint encoded length.
44//!
45//! # Performance notes
46//!
47//! If performance or throughput is a concern, you must benchmark and tune store configuration
48//! for the exact hardware and OS you are targeting.
49//!
50//! Defaults in [`Config`] are ok as a starting point and were derived as follows:
51//!
52//! 1) Linux sees a 2-3x improvement in write throughput when using positioned writes
53//! (enabled) but the same setting has slightly negative effects on Windows (disabled).
54//!
55//! 2) No OS seems to benefit from sharding the write-ahead log (default is 1)
56//!
57//! 3) Target parallelism for snapshot reads/writes is limited by I/O controller concurrency which
58//! varies by device type (default is 8 which should suit most modern SSDs).
59//!
//! 4) The number of memory buckets is never a huge factor; as a rule of thumb, it should be
//! above the number of simultaneous readers (default is 32).
62mod config;
63mod snapshot;
64pub mod snapshot_set; // Accessible from outside the crate
65mod store;
66mod types;
67
68use std::{
69 borrow::{Borrow, Cow},
70 error::Error,
71};
72
73use snapshot_set::FileSnapshotSet;
74use store::{FixedLengthKey64Bit, Store, StoreImpl, VariableLengthKey};
75
76pub use config::{Config, SyncMode};
77
/// Result type returned by the fallible operations of this crate.
///
/// The error side is a boxed, thread-safe [`std::error::Error`] trait object, so errors of
/// different concrete types (e.g. I/O errors while reading store files) can be propagated
/// with `?`.
pub type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
79
80pub struct PersistentKeyValueStore<K, V> {
81 store: StoreImpl,
82 phantom: std::marker::PhantomData<(K, V)>,
83}
84
85// We don't need K, V to be Sync + Send as we only operate on the serialized version
86// when passing the data between threads internally and the external interface
87// exposes only clones, not references.
88unsafe impl<K, V> Sync for PersistentKeyValueStore<K, V> {}
89unsafe impl<K, V> Send for PersistentKeyValueStore<K, V> {}
90
/// Reconstructs a key or value from the raw bytes held by the store.
///
/// Implementations are expected to decode the bytes produced by the type's
/// [`Serializable`] implementation.
pub trait Deserializable {
    /// Builds an instance of `Self` from `bytes`.
    fn from_bytes(bytes: &[u8]) -> Self;
}
95
/// Converts a key or value into the raw bytes stored by the store.
///
/// Every type provides a variable-length encoding via [`Serializable::serialize`]. Types
/// whose values can always be encoded into an 8-byte array may additionally opt into the
/// compact fixed-size key layout by setting [`Serializable::IS_FIXED_SIZE`] to `true`.
pub trait Serializable {
    /// Whether every value of this type has a fixed-size (8-byte) representation.
    ///
    /// When this is `true` for a store's key type, the store is created with fixed-size
    /// keys and relies on [`Serializable::serialize_fixed_size`] returning `Some` for every
    /// key; a `None` in that situation causes a panic.
    const IS_FIXED_SIZE: bool;

    /// Serializes `self` into bytes. May return a borrowed slice or an owned vector.
    fn serialize(&self) -> Cow<'_, [u8]>;

    /// Serializes `self` into a fixed 8-byte array.
    ///
    /// May return `None` if the type is not fixed-size representable, but must return
    /// `Some` whenever [`Serializable::IS_FIXED_SIZE`] is `true`.
    fn serialize_fixed_size(&self) -> Option<[u8; 8]>;
}
104
105impl<K, V> PersistentKeyValueStore<K, V>
106where
107 K: Serializable,
108{
109 /// Constructs a new store instance.
110 /// The store will be backed by the given path and use the provided configuration.
111 /// This function will block on restoring previously saved state from disk.
112 ///
113 /// Only one store instance can be alive at a time for a given path.
114 ///
115 /// # Iterators
116 ///
117 /// The store acts like a collection and supports (read-only) [`Iterator`] but
118 /// does not support any mutating collection traits (e.g. [`FromIterator`] or
119 /// [`Extend`]). This is needed since set() is fallible.
120 ///
121 /// A consuming iterator consumes the in-memory store only, not the on-disk data.
122 ///
123 /// # Example
124 /// ```
125 /// # fn main() -> persistent_kv::Result<()> {
126 /// use persistent_kv::{Config, PersistentKeyValueStore};
127 /// let path = "/tmp/mystore1";
128 /// let store: PersistentKeyValueStore<String, String> =
129 /// PersistentKeyValueStore::new(path, Config::default())?;
130 /// store.set("foo", "1")?;
131 /// // Drop the first store instance to ensure no two instances are alive.
132 /// drop(store);
133 /// // Second instance constructed from same path recovers the data.
134 /// let store: PersistentKeyValueStore<String, String> =
135 /// PersistentKeyValueStore::new(path, Config::default())?;
136 /// assert_eq!(store.get("foo"), Some("1".to_string()));
137 /// # Ok(())
138 /// # }
139 /// ```
140 /// # Errors
141 ///
142 /// Propagates IO errors when reading from disk, also fails when the snapshot files
143 /// don't follow the exact naming schema expected (and written) by this crate.
144 pub fn new(path: impl AsRef<std::path::Path>, config: Config) -> Result<Self> {
145 let snapshot_set = FileSnapshotSet::new(path.as_ref())?;
146 Ok(Self {
147 store: if <K as Serializable>::IS_FIXED_SIZE {
148 StoreImpl::FixedKey(Store::new(snapshot_set, config)?)
149 } else {
150 StoreImpl::VariableKey(Store::new(snapshot_set, config)?)
151 },
152 phantom: std::marker::PhantomData,
153 })
154 }
155
156 /// Removes a key from the store.
157 /// Supports using borrowed keys (e.g. [`str`] for a [`String`] key).
158 /// # Example
159 /// ```
160 /// # fn main() -> persistent_kv::Result<()> {
161 /// use persistent_kv::{Config, PersistentKeyValueStore};
162 /// let store: PersistentKeyValueStore<String, String> =
163 /// PersistentKeyValueStore::new("/tmp/mystore2", Config::default())?;
164 /// store.set("foo", "1")?;
165 /// assert_eq!(store.get("foo"), Some("1".to_string()));
166 /// store.unset("foo")?;
167 /// assert_eq!(store.get("foo"), None);
168 /// # Ok(())
169 /// # }
170 /// ```
171 /// # Errors
172 /// Propagates any IO errors that occur directly as a result of the write operation.
173 pub fn unset<Q>(&self, key: &Q) -> Result<()>
174 where
175 K: Borrow<Q>,
176 Q: ?Sized + Serializable,
177 {
178 match &self.store {
179 StoreImpl::FixedKey(store) => store
180 .unset(key.serialize_fixed_size().unwrap().borrow())
181 .map(|_| ()),
182 StoreImpl::VariableKey(store) => store.unset(key.serialize().borrow()).map(|_| ()),
183 }
184 }
185
186 fn get_<Q, F, V2>(&self, key: &Q, c: F) -> Option<V2>
187 where
188 K: Borrow<Q>,
189 Q: ?Sized + Serializable,
190 F: FnOnce(Option<&[u8]>) -> Option<V2>,
191 {
192 match &self.store {
193 StoreImpl::VariableKey(store) => store.get_convert(&key.serialize(), c),
194 StoreImpl::FixedKey(store) => {
195 store.get_convert(key.serialize_fixed_size().unwrap().borrow(), c)
196 }
197 }
198 }
199
200 fn set_(&self, key: K, value: Vec<u8>) -> Result<()> {
201 match &self.store {
202 StoreImpl::FixedKey(store) => store
203 .set(
204 FixedLengthKey64Bit(key.serialize_fixed_size().unwrap()),
205 value,
206 )
207 .map(|_| ()),
208 StoreImpl::VariableKey(store) => store
209 .set(VariableLengthKey(key.serialize().into_owned()), value)
210 .map(|_| ()),
211 }
212 }
213}
214
215/// Store methods for simple values: Vec[u8], String, integers. We bypass all serialization
216/// frameworks for these types.
217impl<K, V> PersistentKeyValueStore<K, V>
218where
219 K: Serializable,
220 V: Deserializable + Serializable,
221{
222 /// Sets a key-value pair in the store.
223 /// If the key already exists, the value will be overwritten.
224 /// # Example
225 /// ```
226 /// # fn main() -> persistent_kv::Result<()> {
227 /// use persistent_kv::{Config, PersistentKeyValueStore};
228 /// let store: PersistentKeyValueStore<String, String> =
229 /// PersistentKeyValueStore::new("/tmp/mystore3", Config::default())?;
230 /// store.set("foo", "1")?;
231 /// assert_eq!(store.get("foo"), Some("1".to_string()));
232 /// # Ok(())
233 /// # }
234 /// ```
235 /// # Errors
236 /// Propagates any IO errors that occur directly as a result of the write operation.
237 pub fn set(&self, key: impl Into<K>, value: impl Into<V>) -> Result<()> {
238 self.set_(key.into(), value.into().serialize().into_owned())
239 }
240
241 /// Retrieves a value from the store.
242 /// Supports lookups using borrowed keys (e.g. [`str`] for a [`String`] key).
243 /// # Example
244 /// ```
245 /// # fn main() -> persistent_kv::Result<()> {
246 /// use persistent_kv::{Config, PersistentKeyValueStore};
247 /// let store: PersistentKeyValueStore<String, String> =
248 /// PersistentKeyValueStore::new("/tmp/mystore4", Config::default())?;
249 /// store.set("foo", "1")?;
250 /// assert_eq!(store.get("foo"), Some("1".to_string()));
251 /// # Ok(())
252 /// # }
253 /// ```
254 pub fn get<Q>(&self, key: &Q) -> Option<V>
255 where
256 K: Borrow<Q>,
257 Q: ?Sized + Serializable,
258 {
259 self.get_(key, |bytes| bytes.map(|bytes| V::from_bytes(bytes)))
260 }
261}
262
263/// Store version for protobuf values.
264impl<K, V> PersistentKeyValueStore<K, V>
265where
266 K: Serializable,
267 V: prost::Message + Default,
268{
269 /// Sets a protobuf-coded value in the store.
270 /// If the key already exists, the value will be overwritten.
271 /// # Example
272 /// ```
273 /// # fn main() -> persistent_kv::Result<()> {
274 /// use prost::Message;
275 /// use persistent_kv::{Config, PersistentKeyValueStore};
276 /// #[derive(Clone, PartialEq, Message)]
277 /// pub struct Foo {
278 /// #[prost(uint32, tag = "1")]
279 /// pub bar: u32,
280 /// }
281 /// let store: PersistentKeyValueStore<String, Foo> =
282 /// PersistentKeyValueStore::new("/tmp/mystore5", Config::default())?;
283 /// store.set_proto("foo", Foo {bar: 42})?;
284 /// assert_eq!(store.get_proto("foo")?, Some(Foo {bar: 42}));
285 /// # Ok(())
286 /// # }
287 /// ```
288 /// # Errors
289 /// Propagates any IO errors that occur directly as a result of the write operation.
290 pub fn set_proto(&self, key: impl Into<K>, value: impl prost::Message) -> Result<()> {
291 self.set_(key.into(), value.encode_to_vec())
292 }
293
294 /// Retrieves a protobuf-coded value from the store.
295 /// Supports lookups using borrowed keys (e.g. [`str`] for a [`String`] key).
296 /// # Example
297 /// ```
298 /// # fn main() -> persistent_kv::Result<()> {
299 /// use prost::Message;
300 /// use persistent_kv::{Config, PersistentKeyValueStore};
301 /// #[derive(Clone, PartialEq, Message)]
302 /// pub struct Foo {
303 /// #[prost(uint32, tag = "1")]
304 /// pub bar: u32,
305 /// }
306 /// let store: PersistentKeyValueStore<String, Foo> =
307 /// PersistentKeyValueStore::new("/tmp/mystore6", Config::default())?;
308 /// store.set_proto("foo", Foo {bar: 42})?;
309 /// assert_eq!(store.get_proto("foo")?, Some(Foo {bar: 42}));
310 /// # Ok(())
311 /// # }
312 /// ```
313 /// # Errors
314 /// Forwards proto decode errors.
315 pub fn get_proto<Q>(&self, key: &Q) -> std::result::Result<Option<V>, prost::DecodeError>
316 where
317 K: Borrow<Q>,
318 Q: ?Sized + Serializable,
319 {
320 self.get_(key, |bytes| bytes.map(|bytes| V::decode(bytes)))
321 .transpose()
322 }
323}
324
325/// Debug trait for PersistentKeyValueStore
326impl<K, V> std::fmt::Debug for PersistentKeyValueStore<K, V> {
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 let (num_elements, num_bytes) = match &self.store {
329 StoreImpl::FixedKey(store) => store.compute_size_info(),
330 StoreImpl::VariableKey(store) => store.compute_size_info(),
331 };
332 write!(
333 f,
334 "PersistentKeyValueStore({} elements, {} KiB total size)",
335 num_elements,
336 num_bytes / 1024
337 )?;
338 Ok(())
339 }
340}
341
342/// Implement IntoIterator for PersistentKeyValueStore and for &PersistentKeyValueStore
343/// using the underlying store's implementation, encoding/decoding keys and values as needed.
344impl<K, V> IntoIterator for PersistentKeyValueStore<K, V>
345where
346 K: Deserializable + Serializable,
347 V: Deserializable + Serializable,
348{
349 type Item = (K, V);
350 // Until we can use impl Trait in associated types, erasing iterator type avoids
351 // some boilerplate.
352 type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'static>;
353
354 fn into_iter(self) -> Self::IntoIter {
355 match self.store {
356 StoreImpl::FixedKey(store) => Box::new(
357 store
358 .into_iter()
359 .map(|(k, v)| (K::from_bytes(&k.0), V::from_bytes(&v))),
360 ) as Box<dyn Iterator<Item = Self::Item>>,
361 StoreImpl::VariableKey(store) => Box::new(
362 store
363 .into_iter()
364 .map(|(k, v)| (K::from_bytes(&k.0[..]), V::from_bytes(&v))),
365 ) as Box<dyn Iterator<Item = Self::Item>>,
366 }
367 }
368}
369
370impl<'a, K, V> IntoIterator for &'a PersistentKeyValueStore<K, V>
371where
372 K: Deserializable + Serializable,
373 V: Deserializable + Serializable,
374{
375 type Item = (K, V);
376 // Until we can use impl Trait in associated types, erasing iterator type avoids
377 // some boilerplate.
378 type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
379
380 fn into_iter(self) -> Self::IntoIter {
381 match self.store {
382 StoreImpl::FixedKey(ref store) => Box::new(
383 store
384 .into_iter()
385 .map(|(k, v)| (K::from_bytes(&k.0), V::from_bytes(&v))),
386 ) as Box<dyn Iterator<Item = Self::Item>>,
387 StoreImpl::VariableKey(ref store) => Box::new(
388 store
389 .into_iter()
390 .map(|(k, v)| (K::from_bytes(&k.0[..]), V::from_bytes(&v))),
391 )
392 as Box<dyn Iterator<Item = Self::Item>>,
393 }
394 }
395}
396
397impl<K, V> PersistentKeyValueStore<K, V>
398where
399 K: Deserializable + Serializable,
400 V: Deserializable + Serializable,
401{
402 pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (K, V)> + 'a> {
403 <&Self as IntoIterator>::into_iter(self)
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    // This set of tests is not exhaustive as the store itself is tested in-depth in the store module.
    // Below tests focus on the public API and the serialization/deserialization traits in as far
    // as the doctests don't already cover them.

    /// Builds an owned `(key, value)` pair for comparing against iterator output.
    fn pair(key: &str, value: &str) -> (String, String) {
        (key.to_string(), value.to_string())
    }

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PersistentKeyValueStore<u32, u32>>();
    }

    #[test]
    fn setget_string_string() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        assert_eq!(store.get("foo"), Some("1".to_string()));
        store.unset("foo").unwrap();
        assert_eq!(store.get("foo"), None);
    }

    #[test]
    fn setget_u64_u64() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<u64, u64> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set(35293853295u64, 1139131311u64).unwrap();
        assert_eq!(store.get(&35293853295u64), Some(1139131311u64));
        store.unset(&35293853295u64).unwrap();
        assert_eq!(store.get(&35293853295u64), None);
    }

    #[test]
    fn setget_i32_i32() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<i32, i32> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set(352938539, 113913131).unwrap();
        assert_eq!(store.get(&352938539), Some(113913131));
        store.unset(&352938539).unwrap();
        assert_eq!(store.get(&352938539), None);
    }

    #[test]
    fn debug_trait() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1".repeat(2048)).unwrap();
        assert_eq!(
            format!("{store:?}"),
            "PersistentKeyValueStore(1 elements, 2 KiB total size)"
        );
    }

    #[test]
    fn into_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        store.set("bar", "2").unwrap();
        // The store is unordered, so compare a sorted snapshot instead of relying on the
        // iteration order. Collecting also drops the consuming iterator before the path is
        // reopened below, so nothing derived from the first instance is still alive.
        let mut items: Vec<(String, String)> = store.into_iter().collect();
        items.sort();
        assert_eq!(items, vec![pair("bar", "2"), pair("foo", "1")]);

        // into_iter() consumes the store instance but not the on-disk data.
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        assert_eq!(store.get("foo"), Some("1".to_string()));
    }

    #[test]
    fn ref_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        store.set("bar", "2").unwrap();
        // The store is unordered; sort before comparing.
        let mut items: Vec<(String, String)> = store.iter().collect();
        items.sort();
        assert_eq!(items, vec![pair("bar", "2"), pair("foo", "1")]);
        // Borrowing iteration leaves the store intact.
        assert_eq!(store.get("bar"), Some("2".to_string()));
    }
}