persistent_kv/
lib.rs

//! Unordered key-value store with all data held in memory but also persisted to disk
//! for durability.
//!
//! The store is designed as a building block for (distributed) systems that need a
//! high-throughput, ultra-low-latency, unordered key-value store with persistence guarantees,
//! and that can shard their data so that it always fits into the RAM of a single machine.
7//!
8//! # Design goals
9//!
10//!  - Lightweight and with few dependencies
//!  - Support concurrent reads/writes with minimal locking times
12//!  - Tunable write throughput / persistence guarantee trade-off
13//!  - Maximize block device throughput and exploit I/O parallelism if supported by OS and hardware
14//!  - Amortized memory and disk usage is O(number of keys), <10% overhead over payload size (e.g.
15//!    no spikes during snapshotting)
16//!  - Support for both fixed-size and variable-size keys and values
17//!
//! A good mental model is "a `HashMap` that keeps its contents between program runs". If more
//! advanced database features are required, [RocksDB](https://docs.rs/rocksdb/latest/rocksdb/)
//! or [SQLite](https://docs.rs/sqlite/latest/sqlite/) are a better choice.
21//!
22//! # Key and value format
23//!
//! Both the key and the value side of the store operate exclusively on byte sequences,
//! converted from high-level types via the [`crate::Serializable`] and [`crate::Deserializable`]
//! traits. This avoids discrepancies between the in-memory representation of objects and their
//! serialized form, which could otherwise lead to subtle bugs. Because the store operates on
//! serialized data only, neither [`Send`]/[`Sync`] nor the hash map traits [`Hash`]/[`Eq`] are
//! technically required for the key and value types.
30//!
//! All fixed-size integer types, [`Vec<u8>`] and [`String`] are supported out of the box
//! as both keys and values. On the value side, [`prost::Message`] types are also directly
//! supported via the `*_proto` family of methods (e.g. [`PersistentKeyValueStore::set_proto`]).
34//!
35//! # Implementation notes
36//!
37//! Persistence is implemented via a write-ahead log that is periodically compacted and
38//! replaced by full snapshots. All files are stored in a single folder. Individual snapshots
39//! can be internally sharded to ease parallel processing and keep file size reasonable.
40//! See the [`snapshot_set`] module for details on snapshot format.
41//!
42//! The on-disk format is a series of records. Each record is a protobuf message prefixed
43//! by a varint encoded length.
44//!
45//! # Performance notes
46//!
//! If performance or throughput is a concern, you must benchmark and tune the store
//! configuration for the exact hardware and OS you are targeting.
49//!
//! The defaults in [`Config`] are a reasonable starting point and were derived as follows:
//!
//!   1) Linux sees a 2-3x improvement in write throughput when using positioned writes
//!      (enabled), but the same setting has slightly negative effects on Windows (disabled).
//!
//!   2) No OS seems to benefit from sharding the write-ahead log (default is 1).
//!
//!   3) Target parallelism for snapshot reads/writes is limited by I/O controller concurrency,
//!      which varies by device type (default is 8, which should suit most modern SSDs).
//!
//!   4) The number of memory buckets is never a huge factor; as a rule of thumb, it should be
//!      above the number of simultaneous readers (default is 32).
62mod config;
63mod snapshot;
64pub mod snapshot_set; // Accessible from outside the crate
65mod store;
66mod types;
67
68use std::{
69    borrow::{Borrow, Cow},
70    error::Error,
71};
72
73use snapshot_set::FileSnapshotSet;
74use store::{FixedLengthKey64Bit, Store, StoreImpl, VariableLengthKey};
75
76pub use config::{Config, SyncMode};
77
/// Result type used throughout this crate.
///
/// The error type defaults to a boxed, thread-safe [`Error`] trait object, so `Result<T>` keeps
/// working as before. A second type argument may be supplied to use a concrete error type
/// instead, e.g. `persistent_kv::Result<T, std::io::Error>`.
pub type Result<T, E = Box<dyn Error + Send + Sync + 'static>> = std::result::Result<T, E>;
79
80pub struct PersistentKeyValueStore<K, V> {
81    store: StoreImpl,
82    phantom: std::marker::PhantomData<(K, V)>,
83}
84
85// We don't need K, V to be Sync + Send as we only operate on the serialized version
86// when passing the data between threads internally and the external interface
87// exposes only clones, not references.
88unsafe impl<K, V> Sync for PersistentKeyValueStore<K, V> {}
89unsafe impl<K, V> Send for PersistentKeyValueStore<K, V> {}
90
/// Rebuilds a value from the raw bytes the store holds for it.
///
/// The bytes passed in are the ones produced by [`Serializable::serialize`] when the value was
/// written. For keys of a fixed-size type, they are instead the 8 bytes produced by
/// [`Serializable::serialize_fixed_size`]. The conversion is infallible, so an implementation
/// should accept every byte sequence its serializing counterpart can emit.
pub trait Deserializable {
    /// Constructs `Self` from `bytes`.
    fn from_bytes(bytes: &[u8]) -> Self;
}
95
/// Converts a value into the bytes the store keeps for it.
///
/// Every type provides a general byte encoding via [`Serializable::serialize`]. Types for which
/// `IS_FIXED_SIZE` is `true` can additionally be encoded in exactly 8 bytes. When such a type is
/// used as the key type, the store is built around 8-byte keys and
/// [`Serializable::serialize_fixed_size`] must return `Some` for every key.
pub trait Serializable {
    /// Returns the byte encoding of `self`; may borrow from `self` or allocate a new vector.
    fn serialize(&self) -> Cow<'_, [u8]>;
    /// Returns the 8-byte encoding of `self`, or `None` if the type has no fixed-size form.
    fn serialize_fixed_size(&self) -> Option<[u8; 8]>;
    /// `true` if every value of this type has an 8-byte encoding via
    /// [`Serializable::serialize_fixed_size`]. For the key type, this selects the internal
    /// key representation in [`PersistentKeyValueStore::new`].
    const IS_FIXED_SIZE: bool;
}
104
105impl<K, V> PersistentKeyValueStore<K, V>
106where
107    K: Serializable,
108{
109    /// Constructs a new store instance.
110    /// The store will be backed by the given path and use the provided configuration.
111    /// This function will block on restoring previously saved state from disk.
112    ///
113    /// Only one store instance can be alive at a time for a given path.
114    ///
115    /// # Iterators
116    ///
117    /// The store acts like a collection and supports (read-only) [`Iterator`] but
118    /// does not support any mutating collection traits (e.g. [`FromIterator`] or
119    /// [`Extend`]). This is needed since set() is fallible.
120    ///
121    /// A consuming iterator consumes the in-memory store only, not the on-disk data.
122    ///
123    /// # Example
124    /// ```
125    /// # fn main() -> persistent_kv::Result<()> {
126    /// use persistent_kv::{Config, PersistentKeyValueStore};
127    /// let path = "/tmp/mystore1";
128    /// let store: PersistentKeyValueStore<String, String> =
129    ///     PersistentKeyValueStore::new(path, Config::default())?;
130    /// store.set("foo", "1")?;
131    /// // Drop the first store instance to ensure no two instances are alive.
132    /// drop(store);
133    /// // Second instance constructed from same path recovers the data.
134    /// let store: PersistentKeyValueStore<String, String> =
135    ///     PersistentKeyValueStore::new(path, Config::default())?;
136    /// assert_eq!(store.get("foo"), Some("1".to_string()));
137    /// # Ok(())
138    /// # }
139    /// ```
140    /// # Errors
141    ///
142    /// Propagates IO errors when reading from disk, also fails when the snapshot files
143    /// don't follow the exact naming schema expected (and written) by this crate.
144    pub fn new(path: impl AsRef<std::path::Path>, config: Config) -> Result<Self> {
145        let snapshot_set = FileSnapshotSet::new(path.as_ref())?;
146        Ok(Self {
147            store: if <K as Serializable>::IS_FIXED_SIZE {
148                StoreImpl::FixedKey(Store::new(snapshot_set, config)?)
149            } else {
150                StoreImpl::VariableKey(Store::new(snapshot_set, config)?)
151            },
152            phantom: std::marker::PhantomData,
153        })
154    }
155
156    /// Removes a key from the store.
157    /// Supports using borrowed keys (e.g. [`str`] for a [`String`] key).
158    /// # Example
159    /// ```
160    /// # fn main() -> persistent_kv::Result<()> {
161    /// use persistent_kv::{Config, PersistentKeyValueStore};
162    /// let store: PersistentKeyValueStore<String, String> =
163    ///     PersistentKeyValueStore::new("/tmp/mystore2", Config::default())?;
164    /// store.set("foo", "1")?;
165    /// assert_eq!(store.get("foo"), Some("1".to_string()));
166    /// store.unset("foo")?;
167    /// assert_eq!(store.get("foo"), None);
168    /// # Ok(())
169    /// # }
170    /// ```
171    /// # Errors
172    /// Propagates any IO errors that occur directly as a result of the write operation.
173    pub fn unset<Q>(&self, key: &Q) -> Result<()>
174    where
175        K: Borrow<Q>,
176        Q: ?Sized + Serializable,
177    {
178        match &self.store {
179            StoreImpl::FixedKey(store) => store
180                .unset(key.serialize_fixed_size().unwrap().borrow())
181                .map(|_| ()),
182            StoreImpl::VariableKey(store) => store.unset(key.serialize().borrow()).map(|_| ()),
183        }
184    }
185
186    fn get_<Q, F, V2>(&self, key: &Q, c: F) -> Option<V2>
187    where
188        K: Borrow<Q>,
189        Q: ?Sized + Serializable,
190        F: FnOnce(Option<&[u8]>) -> Option<V2>,
191    {
192        match &self.store {
193            StoreImpl::VariableKey(store) => store.get_convert(&key.serialize(), c),
194            StoreImpl::FixedKey(store) => {
195                store.get_convert(key.serialize_fixed_size().unwrap().borrow(), c)
196            }
197        }
198    }
199
200    fn set_(&self, key: K, value: Vec<u8>) -> Result<()> {
201        match &self.store {
202            StoreImpl::FixedKey(store) => store
203                .set(
204                    FixedLengthKey64Bit(key.serialize_fixed_size().unwrap()),
205                    value,
206                )
207                .map(|_| ()),
208            StoreImpl::VariableKey(store) => store
209                .set(VariableLengthKey(key.serialize().into_owned()), value)
210                .map(|_| ()),
211        }
212    }
213}
214
215/// Store methods for simple values: Vec[u8], String, integers. We bypass all serialization
216/// frameworks for these types.
217impl<K, V> PersistentKeyValueStore<K, V>
218where
219    K: Serializable,
220    V: Deserializable + Serializable,
221{
222    /// Sets a key-value pair in the store.
223    /// If the key already exists, the value will be overwritten.
224    /// # Example
225    /// ```
226    /// # fn main() -> persistent_kv::Result<()> {
227    /// use persistent_kv::{Config, PersistentKeyValueStore};
228    /// let store: PersistentKeyValueStore<String, String> =
229    ///    PersistentKeyValueStore::new("/tmp/mystore3", Config::default())?;
230    /// store.set("foo", "1")?;
231    /// assert_eq!(store.get("foo"), Some("1".to_string()));
232    /// # Ok(())
233    /// # }
234    /// ```
235    /// # Errors
236    /// Propagates any IO errors that occur directly as a result of the write operation.
237    pub fn set(&self, key: impl Into<K>, value: impl Into<V>) -> Result<()> {
238        self.set_(key.into(), value.into().serialize().into_owned())
239    }
240
241    /// Retrieves a value from the store.
242    /// Supports lookups using borrowed keys (e.g. [`str`] for a [`String`] key).
243    /// # Example
244    /// ```
245    /// # fn main() -> persistent_kv::Result<()> {
246    /// use persistent_kv::{Config, PersistentKeyValueStore};
247    /// let store: PersistentKeyValueStore<String, String> =
248    ///    PersistentKeyValueStore::new("/tmp/mystore4", Config::default())?;
249    /// store.set("foo", "1")?;
250    /// assert_eq!(store.get("foo"), Some("1".to_string()));
251    /// # Ok(())
252    /// # }
253    /// ```
254    pub fn get<Q>(&self, key: &Q) -> Option<V>
255    where
256        K: Borrow<Q>,
257        Q: ?Sized + Serializable,
258    {
259        self.get_(key, |bytes| bytes.map(|bytes| V::from_bytes(bytes)))
260    }
261}
262
263/// Store version for protobuf values.
264impl<K, V> PersistentKeyValueStore<K, V>
265where
266    K: Serializable,
267    V: prost::Message + Default,
268{
269    /// Sets a protobuf-coded value in the store.
270    /// If the key already exists, the value will be overwritten.
271    /// # Example
272    /// ```
273    /// # fn main() -> persistent_kv::Result<()> {
274    /// use prost::Message;
275    /// use persistent_kv::{Config, PersistentKeyValueStore};
276    /// #[derive(Clone, PartialEq, Message)]
277    /// pub struct Foo {
278    ///     #[prost(uint32, tag = "1")]
279    ///     pub bar: u32,
280    /// }
281    /// let store: PersistentKeyValueStore<String, Foo> =
282    ///    PersistentKeyValueStore::new("/tmp/mystore5", Config::default())?;
283    /// store.set_proto("foo", Foo {bar: 42})?;
284    /// assert_eq!(store.get_proto("foo")?, Some(Foo {bar: 42}));
285    /// # Ok(())
286    /// # }
287    /// ```
288    /// # Errors
289    /// Propagates any IO errors that occur directly as a result of the write operation.
290    pub fn set_proto(&self, key: impl Into<K>, value: impl prost::Message) -> Result<()> {
291        self.set_(key.into(), value.encode_to_vec())
292    }
293
294    /// Retrieves a protobuf-coded value from the store.
295    /// Supports lookups using borrowed keys (e.g. [`str`] for a [`String`] key).
296    /// # Example
297    /// ```
298    /// # fn main() -> persistent_kv::Result<()> {
299    /// use prost::Message;
300    /// use persistent_kv::{Config, PersistentKeyValueStore};
301    /// #[derive(Clone, PartialEq, Message)]
302    /// pub struct Foo {
303    ///     #[prost(uint32, tag = "1")]
304    ///     pub bar: u32,
305    /// }
306    /// let store: PersistentKeyValueStore<String, Foo> =
307    ///    PersistentKeyValueStore::new("/tmp/mystore6", Config::default())?;
308    /// store.set_proto("foo", Foo {bar: 42})?;
309    /// assert_eq!(store.get_proto("foo")?, Some(Foo {bar: 42}));
310    /// # Ok(())
311    /// # }
312    /// ```
313    /// # Errors
314    /// Forwards proto decode errors.
315    pub fn get_proto<Q>(&self, key: &Q) -> std::result::Result<Option<V>, prost::DecodeError>
316    where
317        K: Borrow<Q>,
318        Q: ?Sized + Serializable,
319    {
320        self.get_(key, |bytes| bytes.map(|bytes| V::decode(bytes)))
321            .transpose()
322    }
323}
324
325/// Debug trait for PersistentKeyValueStore
326impl<K, V> std::fmt::Debug for PersistentKeyValueStore<K, V> {
327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328        let (num_elements, num_bytes) = match &self.store {
329            StoreImpl::FixedKey(store) => store.compute_size_info(),
330            StoreImpl::VariableKey(store) => store.compute_size_info(),
331        };
332        write!(
333            f,
334            "PersistentKeyValueStore({} elements, {} KiB total size)",
335            num_elements,
336            num_bytes / 1024
337        )?;
338        Ok(())
339    }
340}
341
342/// Implement IntoIterator for PersistentKeyValueStore and for &PersistentKeyValueStore
343/// using the underlying store's implementation, encoding/decoding keys and values as needed.
344impl<K, V> IntoIterator for PersistentKeyValueStore<K, V>
345where
346    K: Deserializable + Serializable,
347    V: Deserializable + Serializable,
348{
349    type Item = (K, V);
350    // Until we can use impl Trait in associated types, erasing iterator type avoids
351    // some boilerplate.
352    type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'static>;
353
354    fn into_iter(self) -> Self::IntoIter {
355        match self.store {
356            StoreImpl::FixedKey(store) => Box::new(
357                store
358                    .into_iter()
359                    .map(|(k, v)| (K::from_bytes(&k.0), V::from_bytes(&v))),
360            ) as Box<dyn Iterator<Item = Self::Item>>,
361            StoreImpl::VariableKey(store) => Box::new(
362                store
363                    .into_iter()
364                    .map(|(k, v)| (K::from_bytes(&k.0[..]), V::from_bytes(&v))),
365            ) as Box<dyn Iterator<Item = Self::Item>>,
366        }
367    }
368}
369
370impl<'a, K, V> IntoIterator for &'a PersistentKeyValueStore<K, V>
371where
372    K: Deserializable + Serializable,
373    V: Deserializable + Serializable,
374{
375    type Item = (K, V);
376    // Until we can use impl Trait in associated types, erasing iterator type avoids
377    // some boilerplate.
378    type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
379
380    fn into_iter(self) -> Self::IntoIter {
381        match self.store {
382            StoreImpl::FixedKey(ref store) => Box::new(
383                store
384                    .into_iter()
385                    .map(|(k, v)| (K::from_bytes(&k.0), V::from_bytes(&v))),
386            ) as Box<dyn Iterator<Item = Self::Item>>,
387            StoreImpl::VariableKey(ref store) => Box::new(
388                store
389                    .into_iter()
390                    .map(|(k, v)| (K::from_bytes(&k.0[..]), V::from_bytes(&v))),
391            )
392                as Box<dyn Iterator<Item = Self::Item>>,
393        }
394    }
395}
396
397impl<K, V> PersistentKeyValueStore<K, V>
398where
399    K: Deserializable + Serializable,
400    V: Deserializable + Serializable,
401{
402    pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (K, V)> + 'a> {
403        <&Self as IntoIterator>::into_iter(self)
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    // This set of tests is not exhaustive, as the store itself is tested in depth in the store
    // module. The tests below focus on the public API and the serialization/deserialization
    // traits, insofar as the doctests don't already cover them.

    /// Collects `(key, value)` pairs into a sorted vector. The store is unordered, so tests
    /// must not depend on the order in which entries are yielded.
    fn sorted_entries(
        entries: impl IntoIterator<Item = (String, String)>,
    ) -> Vec<(String, String)> {
        let mut collected: Vec<_> = entries.into_iter().collect();
        collected.sort();
        collected
    }

    /// The entries written by the iterator tests, in sorted order.
    fn foo_bar_entries() -> Vec<(String, String)> {
        vec![
            ("bar".to_string(), "2".to_string()),
            ("foo".to_string(), "1".to_string()),
        ]
    }

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PersistentKeyValueStore<u32, u32>>();
    }

    #[test]
    fn setget_string_string() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        assert_eq!(store.get("foo"), Some("1".to_string()));
        store.unset("foo").unwrap();
        assert_eq!(store.get("foo"), None);
    }

    #[test]
    fn setget_u64_u64() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<u64, u64> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set(35293853295u64, 1139131311u64).unwrap();
        assert_eq!(store.get(&35293853295u64), Some(1139131311u64));
        store.unset(&35293853295u64).unwrap();
        assert_eq!(store.get(&35293853295u64), None);
    }

    #[test]
    fn setget_i32_i32() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<i32, i32> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set(352938539, 113913131).unwrap();
        assert_eq!(store.get(&352938539), Some(113913131));
        store.unset(&352938539).unwrap();
        assert_eq!(store.get(&352938539), None);
    }

    #[test]
    fn debug_trait() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1".repeat(2048)).unwrap();
        assert_eq!(
            format!("{store:?}"),
            "PersistentKeyValueStore(1 elements, 2 KiB total size)"
        );
    }

    #[test]
    fn into_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        store.set("bar", "2").unwrap();
        // The consuming iterator is fully drained and dropped inside `sorted_entries`. It may
        // still hold on to the consumed instance's internals, so dropping it here ensures no two
        // instances are alive when the path is reopened below.
        assert_eq!(sorted_entries(store), foo_bar_entries());

        // into_iter() consumes the store instance but not the on-disk data.
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        assert_eq!(store.get("foo"), Some("1".to_string()));
    }

    #[test]
    fn ref_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let store: PersistentKeyValueStore<String, String> =
            PersistentKeyValueStore::new(tmp_dir.path(), Config::default()).unwrap();
        store.set("foo", "1").unwrap();
        store.set("bar", "2").unwrap();
        assert_eq!(sorted_entries(store.iter()), foo_bar_entries());
        // `&store` goes through the borrowing `IntoIterator` impl; the store stays usable.
        assert_eq!(sorted_entries(&store), foo_bar_entries());
        assert_eq!(store.get("foo"), Some("1".to_string()));
    }
}