assemblage_kv/
lib.rs

1//! # Versioned and transactional key-value store for native and wasm targets.
2//!
3//! This crate provides a persistent key-value store implemented as a
4//! log-structured hash table similar to
5//! [Bitcask](https://riak.com/assets/bitcask-intro.pdf). Writes of new or
6//! changed values never overwrite old entries, but are simply appended to the
7//! end of the storage. Old values are kept at earlier offsets in the storage
8//! and remain accessible. An in-memory hash table tracks the storage offsets of
9//! all keys and allows efficient reads directly from the relevant portions of
10//! the storage. A store can be merged, which discards old versions and builds a
11//! more compact representation containing only the latest value of each key.
12//!
13//! ## Features
14//!
15//!   - _simple_: log-structured hash architecture, with all keys in memory
16//!   - _fully versioned:_ old values remain accessible until merged
17//!   - _transactional:_ all reads and writes happen only in isolated transactions
18//!   - _storage-agnostic:_ supports files on native and IndexedDB on wasm
19//!
20//! ## Example
21//!
22//! ```
23//! use assemblage_kv::{run, storage::{self, Storage}, KvStore, Snapshot, Result};
24//!
25//! fn main() -> Result<()> {
26//!     // The `run!` macro abstracts away the boilerplate of setting up the
27//!     // right async environment and storage for native / wasm and is not
28//!     // needed outside of doc tests.
29//!     run!(async |storage| {
30//!         let store_name = storage.name().to_string();
31//!         let mut store = KvStore::open(storage).await?;
32//!         let slot = 0;
33//!
34//!         {
35//!             let mut current = store.current().await;
36//!             assert_eq!(current.get::<_, u8>(slot, &"key1").await?, None);
37//!             current.insert(slot, &"key1", 1)?;
38//!             current.commit().await?;
39//!         }
40//!
41//!         {
42//!             let mut current = store.current().await;
43//!             assert_eq!(current.get(slot, &"key1").await?, Some(1));
44//!             current.remove(slot, &"key1")?;
45//!             current.commit().await?;
46//!         }
47//!
48//!         {
49//!             let mut current = store.current().await;
50//!             assert_eq!(current.get::<_, u8>(slot, &"key1").await?, None);
51//!             current.insert(slot, &"key1", 3)?;
52//!             current.commit().await?;
53//!         }
54//!
55//!         {
56//!             let current = store.current().await;
57//!             let versions = current.versions(slot, &"key1").await?;
58//!             assert_eq!(versions.len(), 3);
59//!             assert_eq!(current.get_version(slot, &"key1", versions[0]).await?, Some(1));
60//!             assert_eq!(current.get_version::<_, u8>(slot, &"key1", versions[1]).await?, None);
61//!             assert_eq!(current.get_version(slot, &"key1", versions[2]).await?, Some(3));
62//!         }
63//!
64//!         store.merge().await?;
65//!         let storage = storage::open(&store_name).await?;
66//!         let store = KvStore::open(storage).await?;
67//!
68//!         {
69//!             let current = store.current().await;
70//!             let versions = current.versions(slot, &"key1").await?;
71//!             assert_eq!(versions.len(), 1);
72//!             assert_eq!(current.get(slot, &"key1").await?, Some(3));
73//!         }
74//!         Ok(())
75//!     })
76//! }
77//! ```
78
79#![deny(missing_docs)]
80#![deny(broken_intra_doc_links)]
81#![deny(unsafe_code)]
82
83use crate::{storage::Storage, timestamp::timestamp_now_monotonic};
84use crc32fast::Hasher;
85use log::warn;
86use serde::{de::DeserializeOwned, Serialize};
87use std::{cmp::max, collections::HashMap, mem};
88use tokio::sync::{Mutex, MutexGuard};
89
90pub mod storage;
91pub mod timestamp;
92
/// Number of little-endian bytes of a commit timestamp that are stored in a
/// transaction-commit entry (any higher-order bytes of the `u64` are dropped).
const BYTES_TIMESTAMP_FULL: usize = 6;
/// Number of bytes of the CRC checksum that follow the value of a transaction
/// entry in storage.
const BYTES_CRC: usize = 4;
95
/// The error type for store operations.
#[derive(Debug)]
pub enum Error {
    /// Caused by storage read or write operations.
    StorageError(storage::Error),
    /// The CRC checksum of an entry did not match its content.
    // NOTE(review): the `u64` is presumably the storage offset of the corrupt
    // entry; this variant is not constructed in the code visible here — confirm.
    CorruptDataError(u64),
    /// The kv entry had an invalid format, for example a header or size field
    /// that is out of range or a buffer that is shorter than announced.
    InvalidEntryError {
        /// The reason why the entry was invalid.
        reason: String,
    },
    /// The store key was invalid, i.e. it could not be serialized or
    /// deserialized.
    InvalidKeyError {
        /// The reason why the key was invalid.
        reason: String,
    },
    /// The store value was invalid, i.e. it could not be serialized or
    /// deserialized.
    InvalidValueError {
        /// The reason why the value was invalid.
        reason: String,
    },
    /// The store key or value exceeded the maximum size supported by the store.
    MaxSizeExceeded {
        /// The size of the key or value in bytes.
        size: usize,
        /// The maximum bytes available for storing the key or value size.
        max_bytes: u8,
        /// The bytes that would be required to store the key or value size.
        bytes_required: u8,
    },
    /// The bytes read exceeded the expected number of bytes for an int.
    InvalidIntLength {
        /// The number of bytes of the int that were expected.
        bytes_expected: u8,
        /// The number of bytes that were found.
        bytes_found: u8,
    },
    /// The storage could not be locked.
    // NOTE(review): not constructed in the code visible here — confirm usage.
    StorageLockError,
    /// The transaction has read a value that has since been overwritten by
    /// another committed transaction, see [`Snapshot::commit`].
    TransactionConflict,
}
139
140/// A specialized `Result` type for store operations.
141pub type Result<T> = std::result::Result<T, Error>;
142
143impl<'a> From<storage::Error> for Error {
144    fn from(e: storage::Error) -> Self {
145        Error::StorageError(e)
146    }
147}
148
/// A versioned key-value store using a log-structured hash table.
///
/// Reads and writes `serde` keys and values. All reads/writes happen through
/// transactions. A store can be merged to discard old versions and thus store a
/// more compact representation containing only the latest version of each
/// key-value pair.
///
/// Versions are also used to implement a "move to trash" behavior. Whenever a
/// value is removed, it is not purged from storage but simply marked as
/// removed. It remains accessible until the "trash is emptied" during the next
/// merge. As a consequence there are 2 different methods that can read values
/// from the store, depending on whether the trash should be included or not,
/// [`Snapshot::get()`] (which will return `None` if the value was "moved to the
/// trash") and [`Snapshot::get_unremoved()`] (which will return the last
/// unremoved version if the value was "moved to the trash").
///
///   - Keys/values are `Serialize`/`Deserialize` and are
///     serialized/deserialized to/from [MessagePack](https://msgpack.org/)
///     using `rmp-serde`.
///   - Keys are not read/written as-is, but always associated with a "slot".
///     These slots act as the different "indexes" of a store.
pub struct KvStore<S: Storage> {
    // Name of the storage, copied when the store is opened.
    name: String,
    // The append-only storage; all reads and appends go through this lock.
    storage: Mutex<S>,
    // For every blob key (serialized key followed by its slot byte), all
    // committed versions in the order in which they were appended.
    offsets: Mutex<HashMap<Vec<u8>, Vec<BlobVersion>>>,
    // Timestamp of the latest commit, restored on open and updated on commit.
    latest_timestamp: Mutex<u64>,
}
176
177impl<S: Storage> KvStore<S> {
178    /// Opens and reads a store from storage.
179    ///
180    /// If no store exists at the storage location, a new store will be
181    /// initialized. Otherwise, the store will be read and checked for corrupted
182    /// data. In case of corruption, everything after the corrupted offset will
183    /// be truncated and later writes will overwrite the corrupted entries.
184    /// After the initial read, a hash table of all the keys in the store and
185    /// their storage offsets is kept in memory.
186    pub async fn open(storage: S) -> Result<Self> {
187        let mut store = Self {
188            name: String::from(storage.name()),
189            storage: Mutex::new(storage),
190            offsets: Mutex::new(HashMap::new()),
191            latest_timestamp: Mutex::new(0),
192        };
193        init_store(&mut store).await?;
194        Ok(store)
195    }
196
197    /// Returns the (file-)name of the storage.
198    pub fn name(&self) -> &str {
199        &self.name
200    }
201
202    /// Consumes the store to return its underlying storage.
203    pub fn into_storage(self) -> Result<S> {
204        Ok(self.storage.into_inner())
205    }
206
207    /// Returns the total length of the storage in bytes.
208    pub async fn len(&self) -> u64 {
209        self.storage.lock().await.len()
210    }
211
212    /// Returns `true` if the storage is empty.
213    pub async fn is_empty(&self) -> bool {
214        self.len().await == 0
215    }
216
217    /// Creates a transactional read-write snapshot of the store at the current
218    /// point in time, see [`Snapshot`].
219    pub async fn current(&self) -> Snapshot<'_, S> {
220        let latest_timestamp = *self.latest_timestamp.lock().await;
221        let latest_offset = self.storage.lock().await.len();
222        let snapshot_timestamp = timestamp_now_monotonic(latest_timestamp);
223        Snapshot {
224            store: self,
225            snapshot_timestamp,
226            latest_timestamp,
227            latest_offset,
228            cached_entries: Mutex::new(HashMap::new()),
229            transaction_entries: HashMap::new(),
230        }
231    }
232
233    /// Merges and compacts the store by removing old versions.
234    ///
235    /// Merging a store reclaims space by removing all versions that were
236    /// superseded by newer writes to the same key. As a side effect, a merge
237    /// "empties the trash" and ensures that removed values cannot be read and
238    /// restored anymore.
239    pub async fn merge(&mut self) -> Result<()> {
240        {
241            self.storage.lock().await.flush().await?;
242            self.storage.lock().await.start_merge().await?;
243
244            let mut crc = Hasher::new();
245            let mut offset = 0;
246            let mut storage = self.storage.lock().await;
247            while offset < storage.len() {
248                let mut entry = Entry::read_from(&mut storage, offset).await?;
249                let entry_length = entry.len() as u64;
250                let offsets = &mut (*self.offsets.lock().await);
251
252                // all kv writes have Some(key), all transactions have None
253                if let Some(k) = entry.key.as_ref() {
254                    if offset == offsets[k].last().unwrap().offset {
255                        entry.update_crc(&mut crc);
256                        entry.write_to(&mut storage).await?;
257                    }
258                } else if entry.is_transaction_commit() {
259                    entry.update_crc(&mut crc);
260                    let crc_merged = crc.finalize();
261                    let crc_original = entry.crc()?;
262                    if crc_merged != crc_original {
263                        entry.set_crc(crc_merged);
264                    }
265                    entry.write_to(&mut storage).await?;
266                    crc = Hasher::new();
267                }
268                offset += entry_length as u64;
269            }
270
271            storage.flush().await?;
272            storage.stop_merge().await?;
273            storage.flush().await?;
274        }
275        init_store(self).await?;
276        Ok(())
277    }
278}
279
/// Upper bound of the committed versions that are visible to a snapshot, as
/// applied by `versions_up_until`.
#[derive(Debug, Copy, Clone)]
enum SnapshotBoundary {
    /// Versions whose commit timestamp is `<=` this timestamp are visible.
    Timestamp(u64),
    /// Versions stored at an offset `<` this offset are visible.
    Offset(u64),
}
285
/// A transactional snapshot of a store at a particular point in time that
/// caches all reads and buffers all writes in memory.
///
/// A transaction is a snapshot of the store at the point in time when the
/// transaction was started. New values can be added inside the transaction, but
/// writes from other transactions are isolated from the current transaction.
/// Reads are cached for each transaction, so that multiple reads of the same
/// key (and version) only have to access storage once. Writes are only
/// persisted at the end of a successful transaction, until then all writes
/// simply mutate an in-memory `HashMap`. Dropping a snapshot with buffered
/// writes discards them (and logs a warning).
///
/// Transactions provide some basic ACID guarantees and must be
/// [serializable](https://en.wikipedia.org/wiki/Serializability), meaning that
/// a transaction can only be committed if it does not conflict with a
/// previously committed transaction. If a transaction `t1` reads any key-value
/// pair (even a version with an older timestamp) that is modified and committed
/// in a later transaction `t2` before `t1` is committed, `t1` will fail with an
/// [`Error::TransactionConflict`] and must be explicitly rerun by the user of
/// the store. Only value reads (`get`, `get_unremoved`, `get_version`) are
/// tracked for this check; `versions` and `keys` are not. In other words, the
/// following transaction behaviour will lead to a conflict:
///
/// ```text
/// +- t1: -------+
/// | read key1   |   +- t2 --------+
/// |             |   | write key1  |
/// |             |   | commit: ok  |
/// | write key1  |   +-------------+
/// | commit: err |
/// +-------------+
/// ```
pub struct Snapshot<'a, S: Storage> {
    // The store this snapshot reads from and commits to.
    store: &'a KvStore<S>,
    // Timestamp taken when the snapshot was created; used as the timestamp of
    // this transaction's uncommitted versions.
    snapshot_timestamp: u64,
    // The store's latest commit timestamp when the snapshot was created.
    latest_timestamp: u64,
    // Storage length when the snapshot was created; versions stored at or
    // after this offset were committed later.
    latest_offset: u64,
    // Buffered writes by blob key; `None` marks a removal.
    transaction_entries: HashMap<Vec<u8>, Option<Vec<u8>>>,
    // Values read so far, by blob key and version; the keys of this map are
    // checked for conflicting writes on commit.
    cached_entries: Mutex<HashMap<Vec<u8>, ValuesByVersion>>,
}

/// Cached values of a single key by version (`None` for a removed version).
type ValuesByVersion = HashMap<Version, Option<Vec<u8>>>;
326
327impl<'a, S: Storage> Snapshot<'a, S> {
328    /// Returns the (file-)name of the store associated with this snapshot.
329    pub fn name(&self) -> &str {
330        self.store.name()
331    }
332
333    /// Returns the latest value associated with a slot and key from the store.
334    ///
335    /// Returns `None` if the key is not found in the store _or if the value
336    /// associated with the key has been removed and was thus "moved to trash"_.
337    pub async fn get<K, V>(&self, slot: u8, k: &K) -> Result<Option<V>>
338    where
339        K: Serialize,
340        V: DeserializeOwned,
341    {
342        let versions = self.versions(slot, k).await?;
343        blob_to_serde_value(self.get_bytes(slot, k, versions.last().copied()).await?)
344    }
345
346    /// Returns the latest _non-removed_ value associated with a slot and key from
347    /// the store (even if the value was moved to the trash).
348    ///
349    /// Returns `None` only if the key is not found in the store _and there is
350    /// no old version of it in the store_. If the value associated with the key
351    /// has been removed from the store but is still "in the trash", the value
352    /// in the trash will be returned. In other words, _some_ value will always
353    /// be returned unless the key has never been written to the store (since
354    /// the last merge).
355    pub async fn get_unremoved<K, V>(&self, slot: u8, k: &K) -> Result<Option<V>>
356    where
357        K: Serialize,
358        V: DeserializeOwned,
359    {
360        let versions = self.versions(slot, k).await?;
361        let unremoved = versions.iter().filter(|v| !v.is_removed);
362        blob_to_serde_value(self.get_bytes(slot, k, unremoved.last().copied()).await?)
363    }
364
365    /// Returns the specified version of the value with the given slot and key.
366    pub async fn get_version<K, V>(&self, slot: u8, k: &K, version: Version) -> Result<Option<V>>
367    where
368        K: Serialize,
369        V: DeserializeOwned,
370    {
371        blob_to_serde_value(self.get_bytes(slot, k, Some(version)).await?)
372    }
373
374    async fn get_bytes<K>(&self, slot: u8, k: &K, v: Option<Version>) -> Result<Option<Vec<u8>>>
375    where
376        K: Serialize,
377    {
378        let k = &serde_to_blob_key(slot, k)?;
379        let mut cached_entries = self.cached_entries.lock().await;
380        if !cached_entries.contains_key(k) {
381            cached_entries.insert(k.clone(), HashMap::new());
382        }
383        if let Some(version) = v {
384            if let Some(entry) = self.transaction_entries.get(k) {
385                if !version.is_committed {
386                    return Ok(entry.clone());
387                }
388            }
389            let versions = cached_entries.get_mut(k).unwrap();
390            if let Some(entry) = versions.get(&version) {
391                return Ok(entry.clone());
392            }
393
394            if let Some(offset) = version.offset {
395                let entry = Entry::read_from(&mut self.store.storage.lock().await, offset).await?;
396                versions.insert(version, entry.val.clone());
397                Ok(entry.val)
398            } else {
399                Ok(None)
400            }
401        } else {
402            Ok(None)
403        }
404    }
405
406    /// Returns all the versions contained in the store for the given slot and
407    /// key, ordered from earliest to latest version.
408    ///
409    /// Since all keys are stored in memory, this operation is quite fast as it
410    /// does not need to access the persistent storage.
411    pub async fn versions<K>(&self, slot: u8, k: &K) -> Result<Vec<Version>>
412    where
413        K: Serialize,
414    {
415        let k = &serde_to_blob_key(slot, k)?;
416        let up_until = self.latest_time_or_offset();
417        let mut versions: Vec<Version> =
418            versions_up_until(self.store.offsets.lock().await.get(k), up_until)
419                .into_iter()
420                .map(|v| v.into())
421                .collect();
422        if let Some(entry) = self.transaction_entries.get(k) {
423            versions.push(Version {
424                offset: None,
425                is_committed: false,
426                is_removed: entry.is_none(),
427                timestamp: self.snapshot_timestamp,
428            });
429        }
430        Ok(versions)
431    }
432
433    /// Returns the timestamp of the last write to the store (in milliseconds
434    /// since the Unix epoch).
435    pub async fn last_updated(&self) -> Result<Option<u64>> {
436        Ok(if !self.transaction_entries.is_empty() {
437            Some(self.snapshot_timestamp)
438        } else if self.latest_timestamp > 0 {
439            Some(self.latest_timestamp)
440        } else {
441            None
442        })
443    }
444
445    /// Returns all non-removed keys of the specified index in the store.
446    ///
447    /// Since all keys are stored in memory, this operation is quite fast as it
448    /// does not need to access the persistent storage.
449    pub async fn keys<K: DeserializeOwned>(&self, slot: u8) -> Result<Vec<K>> {
450        let mut keys: Vec<K> = Vec::new();
451        for (key, versions) in self.store.offsets.lock().await.iter() {
452            let versions = versions_up_until(Some(versions), self.latest_time_or_offset());
453            if let Some(s) = key.last() {
454                if *s == slot && !versions.last().unwrap().is_removed {
455                    let key = rmp_serde::decode::from_read(&key[..key.len() - 1]).map_err(|e| {
456                        Error::InvalidKeyError {
457                            reason: format!("{}", e),
458                        }
459                    });
460                    keys.push(key?);
461                }
462            }
463        }
464        Ok(keys)
465    }
466
467    /// Inserts a key-value pair in the store, superseding older versions.
468    ///
469    /// All inserts are buffered in memory and only persisted at the end of a
470    /// transaction. If an insert is later followed by another insert with the
471    /// same key in the same transaction, only the second insert is written to
472    /// storage, as from the point of view of the transaction both inserts
473    /// happen at the same time and thus only the last one for each key must be
474    /// stored as a new version in the store.
475    pub fn insert<K, V>(&mut self, slot: u8, k: K, v: V) -> Result<()>
476    where
477        K: Serialize,
478        V: Serialize,
479    {
480        let v = serde_to_blob_value(&v)?;
481        self.insert_bytes(slot, k, v)?;
482        Ok(())
483    }
484
485    fn insert_bytes<K>(&mut self, slot: u8, k: K, v: Vec<u8>) -> Result<()>
486    where
487        K: Serialize,
488    {
489        let k = serde_to_blob_key(slot, &k)?;
490        self.transaction_entries.insert(k, Some(v));
491        Ok(())
492    }
493
494    /// Removes the value associated with the given slot and key from the store
495    /// (and moves it to the trash).
496    ///
497    /// All removes are buffered in memory and only persisted at the end of the
498    /// transaction. Removing a key does not purge the associated value from the
499    /// store, instead it simply adds a new version that marks the key as
500    /// removed, while keeping the old versions of the key accessible. As a
501    /// result, this acts like a "move to trash" operation and allows the value
502    /// to be restored from the trash if desired. The trash will be emptied when
503    /// the store is merged, at which point the removed value will be purged
504    /// from the store.
505    pub fn remove<K>(&mut self, slot: u8, k: K) -> Result<()>
506    where
507        K: Serialize,
508    {
509        let k = serde_to_blob_key(slot, &k)?;
510        self.transaction_entries.insert(k, None);
511        Ok(())
512    }
513
514    /// Aborts the current transaction, discarding all of its write operations.
515    pub async fn abort(mut self) -> Result<()> {
516        // Clear transaction explicitly to suppress warning on drop:
517        self.transaction_entries.clear();
518        Ok(())
519    }
520
521    /// Commits the current transaction, persisting all of its write operations
522    /// as new versions in the store.
523    pub async fn commit(mut self) -> Result<()> {
524        let entries = mem::take(&mut self.transaction_entries);
525        if entries.is_empty() {
526            return Ok(());
527        }
528        let mut storage = self.store.storage.lock().await;
529        let mut offsets = self.store.offsets.lock().await;
530        {
531            for k in self.cached_entries.lock().await.keys() {
532                if let Some(versions) = offsets.get(k) {
533                    let version = versions
534                        .last()
535                        .unwrap_or_else(|| panic!("could not find last version of key {:?}", k));
536
537                    // the value that was read in this transaction has since
538                    // been modified by another transaction and committed, the
539                    // current transaction is thus in conflict and cannot be
540                    // committed
541                    if version.offset >= self.latest_offset {
542                        return Err(Error::TransactionConflict);
543                    }
544                }
545            }
546        }
547
548        let mut crc = Hasher::new();
549        let mut uncommitted_offsets = Vec::with_capacity(entries.len());
550        for (k, buf) in entries.into_iter() {
551            if let Some(buf) = buf {
552                let entry = Entry::kv_insert(k, buf)?;
553                let offset = entry.write_to(&mut storage).await?;
554                entry.update_crc(&mut crc);
555                uncommitted_offsets.push((entry.key.unwrap(), offset, false));
556            } else {
557                let entry = Entry::kv_remove(k)?;
558                let offset = entry.write_to(&mut storage).await?;
559                entry.update_crc(&mut crc);
560                uncommitted_offsets.push((entry.key.unwrap(), offset, true));
561            }
562        }
563
564        let t_commit = timestamp_now_monotonic(self.latest_timestamp);
565        let mut entry = Entry::transaction_commit(t_commit)?;
566        entry.update_crc(&mut crc);
567        entry.set_crc(crc.finalize());
568        entry.write_to(&mut storage).await?;
569
570        for (k, offset, is_removed) in uncommitted_offsets {
571            offsets
572                .entry(k.to_vec())
573                .or_insert_with(Vec::new)
574                .push(BlobVersion {
575                    offset,
576                    is_removed,
577                    timestamp: t_commit,
578                });
579        }
580        *self.store.latest_timestamp.lock().await = t_commit;
581        storage.flush().await?;
582        Ok(())
583    }
584
585    fn latest_time_or_offset(&self) -> SnapshotBoundary {
586        if self.snapshot_timestamp == self.latest_timestamp {
587            SnapshotBoundary::Offset(self.latest_offset)
588        } else {
589            SnapshotBoundary::Timestamp(self.latest_timestamp)
590        }
591    }
592}
593
594impl<S: Storage> Drop for Snapshot<'_, S> {
595    fn drop(&mut self) {
596        if !self.transaction_entries.is_empty() {
597            warn!("Snapshot with changes was dropped without being committed!");
598        }
599    }
600}
601
602async fn init_store<S: Storage>(store: &mut KvStore<S>) -> Result<()> {
603    let mut uncommitted = Vec::new();
604    let mut crc = Hasher::new();
605    let mut latest_timestamp = 0;
606    let mut offset = 0;
607    let max_offset = store.len().await;
608    while offset < max_offset {
609        let entry = Entry::read_from(&mut store.storage.lock().await, offset).await?;
610        let entry_length = entry.len() as u64;
611        let offsets = &mut (*store.offsets.lock().await);
612
613        if !entry.is_transaction() {
614            entry.update_crc(&mut crc);
615            uncommitted.push((entry.key.unwrap(), offset, entry.val.is_none()));
616        } else if entry.is_transaction_commit() {
617            entry.update_crc(&mut crc);
618            let crc_kv_writes = crc.finalize();
619            let crc_commit = entry.crc()?;
620            if crc_kv_writes != crc_commit {
621                warn!("Truncating corrupt store at offset {}", offset);
622                store
623                    .storage
624                    .lock()
625                    .await
626                    .truncate(offset)
627                    .await
628                    .expect("Error while truncating storage to remove corrupt data");
629                break;
630            }
631
632            let timestamp_commit = u64_from_bytes(entry.val.as_ref().unwrap())?;
633            for (k, offset, is_removed) in uncommitted.iter() {
634                offsets
635                    .entry(k.to_vec())
636                    .or_insert_with(Vec::new)
637                    .push(BlobVersion {
638                        offset: *offset,
639                        is_removed: *is_removed,
640                        timestamp: timestamp_commit,
641                    });
642            }
643            uncommitted.clear();
644            crc = Hasher::new();
645            latest_timestamp = max(latest_timestamp, timestamp_commit);
646        }
647
648        offset += entry_length as u64;
649    }
650    store.latest_timestamp = Mutex::new(latest_timestamp);
651    Ok(())
652}
653
654fn serde_to_blob_key(slot: u8, k: &impl Serialize) -> Result<Vec<u8>> {
655    rmp_serde::encode::to_vec(k)
656        .map(|mut v| {
657            v.push(slot);
658            v
659        })
660        .map_err(|e| Error::InvalidKeyError {
661            reason: format!("Unable to serialize: {}", e),
662        })
663}
664
665fn serde_to_blob_value(v: &impl Serialize) -> Result<Vec<u8>> {
666    rmp_serde::encode::to_vec(v).map_err(|e| Error::InvalidValueError {
667        reason: format!("Unable to serialize: {}", e),
668    })
669}
670
671fn blob_to_serde_value<V>(v: Option<Vec<u8>>) -> Result<Option<V>>
672where
673    V: DeserializeOwned,
674{
675    v.map(|v| {
676        rmp_serde::decode::from_read(&v[..]).map_err(|e| Error::InvalidValueError {
677            reason: format!("{}", e),
678        })
679    })
680    .transpose()
681}
682
/// A committed version of a key as tracked in memory.
#[derive(Debug, Copy, Clone)]
struct BlobVersion {
    /// Storage offset of the kv entry of this version.
    offset: u64,
    /// Whether this version marks the key as removed ("moved to trash").
    is_removed: bool,
    /// Timestamp of the transaction commit that contains this version.
    timestamp: u64,
}
689
690fn versions_up_until(
691    versions: Option<&Vec<BlobVersion>>,
692    up_until: SnapshotBoundary,
693) -> Vec<BlobVersion> {
694    versions.map_or(Vec::new(), |v| {
695        v.iter()
696            .filter(|v| match up_until {
697                SnapshotBoundary::Timestamp(t) => v.timestamp <= t,
698                SnapshotBoundary::Offset(o) => v.offset < o,
699            })
700            .cloned()
701            .collect()
702    })
703}
704
/// Raw (serialized) bytes of a value.
type Value = Vec<u8>;

/// A single entry of the append-only log.
///
/// Kv entries have a key and a value, or no value if they mark a removal.
/// Transaction entries have no key; a transaction commit stores its timestamp
/// as value, followed by a CRC.
#[derive(Debug)]
struct Entry {
    // Header byte, see the bit layout documented on `impl Entry`.
    header: u8,
    // Little-endian key size followed by the little-endian value size, each
    // using the number of bytes recorded in the header.
    sizes: Vec<u8>,
    key: Option<Vec<u8>>,
    val: Option<Vec<u8>>,
    crc: Option<Vec<u8>>,
}
715
716// BITS IN THE ENTRY HEADER BYTE:
717//
718// 0b____000_00_000
719//       ||| || \\\__ bytes required to store the value size (0-6 bytes)
720//       ||| \\______ bytes required to store the key size (0-3 bytes)
721//       \\\_________ flags reserved for later use
722impl Entry {
723    fn transaction_commit(timestamp: u64) -> Result<Self> {
724        let mut buf = vec![0; BYTES_TIMESTAMP_FULL];
725        buf.copy_from_slice(&timestamp.to_le_bytes()[..BYTES_TIMESTAMP_FULL]);
726        Self::new(None, Some(buf))
727    }
728
729    fn kv_insert(k: Vec<u8>, v: Value) -> Result<Self> {
730        Self::new(Some(k), Some(v))
731    }
732
733    fn kv_remove(k: Vec<u8>) -> Result<Self> {
734        Self::new(Some(k), None)
735    }
736
737    fn new(k: Option<Vec<u8>>, v: Option<Value>) -> Result<Self> {
738        let key_size = k.as_ref().map_or(0, |k| k.len());
739        let bytes_key_size = k
740            .as_ref()
741            .map_or(Ok(0), |k| bytes_required_for(k.len(), 3))?;
742        let val_size = v.as_ref().map_or(0, |v| v.len());
743        let bytes_val_size = v
744            .as_ref()
745            .map_or(Ok(0), |v| bytes_required_for(v.len(), 6))?;
746        let header = (bytes_key_size << 3) | bytes_val_size;
747
748        let mut sizes = vec![0; (bytes_key_size + bytes_val_size) as usize];
749        sizes[..bytes_key_size as usize]
750            .copy_from_slice(&key_size.to_le_bytes()[0..bytes_key_size as usize]);
751        sizes[bytes_key_size as usize..]
752            .copy_from_slice(&val_size.to_le_bytes()[0..bytes_val_size as usize]);
753
754        Ok(Self {
755            header,
756            sizes,
757            key: k,
758            val: v,
759            crc: None,
760        })
761    }
762
763    async fn read_from<S: Storage>(storage: &mut MutexGuard<'_, S>, offset: u64) -> Result<Self> {
764        let max_length_of_header_and_sizes = 1 + 3 + 6;
765        let mut header_and_sizes = storage.read(offset, max_length_of_header_and_sizes).await?;
766        if header_and_sizes.is_empty() {
767            return Err(Error::InvalidEntryError {
768                reason: "Offset exceeds storage bounds".to_string(),
769            });
770        }
771        let header = header_and_sizes.remove(0);
772        let mut sizes = header_and_sizes;
773        let bytes_val_size = (header & 0b111) as u32;
774        let bytes_key_size = ((header & 0b11000) >> 3) as u32;
775        if bytes_key_size > 3 {
776            return Err(Error::InvalidEntryError {
777                reason: format!(
778                    "Key size can have a maximum of 3 bytes, but has {}",
779                    bytes_key_size
780                ),
781            });
782        }
783        if bytes_val_size > 6 {
784            return Err(Error::InvalidEntryError {
785                reason: format!(
786                    "Value size can have a maximum of 6 bytes, but has {}",
787                    bytes_val_size
788                ),
789            });
790        }
791
792        let offset_sizes = offset + 1;
793        let bytes_sizes = bytes_key_size + bytes_val_size;
794        if sizes.len() < bytes_key_size as usize {
795            return Err(Error::InvalidEntryError {
796                reason: "Invalid length of entry size buffer".to_string(),
797            });
798        }
799        sizes.truncate(bytes_sizes as usize);
800
801        let key_size = u32_from_bytes(&sizes[..bytes_key_size as usize])?;
802        let val_size = u32_from_bytes(&sizes[bytes_key_size as usize..])?;
803        let offset_content = offset_sizes + bytes_sizes as u64;
804
805        if key_size > (1 << 16) {
806            return Err(Error::InvalidEntryError {
807                reason: "Key size is > max size of 2^16 bytes".to_string(),
808            });
809        }
810        if val_size > (1 << 24) {
811            return Err(Error::InvalidEntryError {
812                reason: "Value size is > max size of 2^24 bytes".to_string(),
813            });
814        }
815
816        let is_transaction = bytes_key_size == 0;
817        let (key, val, crc) = if is_transaction {
818            if val_size > 0 {
819                let bytes_content = key_size + val_size + BYTES_CRC as u32;
820                let content = storage.read(offset_content, bytes_content).await?;
821                if content.len() < val_size as usize {
822                    return Err(Error::InvalidEntryError {
823                        reason: "Invalid length of entry content buffer".to_string(),
824                    });
825                }
826                (
827                    None,
828                    Some(content[..val_size as usize].to_vec()),
829                    Some(content[val_size as usize..].to_vec()),
830                )
831            } else {
832                (None, None, None)
833            }
834        } else {
835            let bytes_content = key_size + val_size;
836            let content = storage.read(offset_content, bytes_content).await?;
837            if content.len() < key_size as usize {
838                return Err(Error::InvalidEntryError {
839                    reason: "Invalid length of entry content buffer".to_string(),
840                });
841            }
842            let key = Some(content[..key_size as usize].to_vec());
843            if val_size > 0 {
844                (key, Some(content[key_size as usize..].to_vec()), None)
845            } else {
846                (key, None, None)
847            }
848        };
849        Ok(Self {
850            header,
851            sizes,
852            key,
853            val,
854            crc,
855        })
856    }
857
858    async fn write_to<S: Storage>(&self, storage: &mut MutexGuard<'_, S>) -> Result<u64> {
859        let offset = storage.write(&[self.header]).await?;
860        storage.write(&self.sizes).await?;
861        if let Some(k) = &self.key {
862            storage.write(k).await?;
863        }
864        if let Some(v) = &self.val {
865            storage.write(v).await?;
866        }
867        if let Some(crc) = &self.crc {
868            storage.write(crc).await?;
869        }
870        Ok(offset)
871    }
872
873    fn set_crc(&mut self, crc: u32) {
874        self.crc = Some(crc.to_le_bytes().to_vec());
875    }
876
877    fn crc(&self) -> Result<u32> {
878        if !self.is_transaction_commit() {
879            panic!("Trying to read CRC value from a non-commit entry");
880        }
881        u32_from_bytes(self.crc.as_ref().unwrap())
882    }
883
884    fn update_crc(&self, crc: &mut Hasher) {
885        crc.update(&[self.header]);
886        crc.update(&self.sizes);
887        if let Some(k) = &self.key {
888            crc.update(k);
889        }
890        if let Some(v) = &self.val {
891            crc.update(v);
892        }
893    }
894
895    fn len(&self) -> usize {
896        1 + self.sizes.len()
897            + self.key.as_ref().map_or(0, |k| k.len())
898            + self.val.as_ref().map_or(0, |v| v.len())
899            + self.crc.as_ref().map_or(0, |crc| crc.len())
900    }
901
902    fn is_transaction(&self) -> bool {
903        self.key.is_none()
904    }
905
906    fn is_transaction_commit(&self) -> bool {
907        self.is_transaction() && self.val.as_ref().is_some()
908    }
909}
910
911fn u64_from_bytes(bytes: &[u8]) -> Result<u64> {
912    if bytes.len() > 8 {
913        Err(Error::InvalidIntLength {
914            bytes_expected: 8,
915            bytes_found: bytes.len() as u8,
916        })
917    } else {
918        let mut buf = [0; 8];
919        buf[..bytes.len()].copy_from_slice(bytes);
920        Ok(u64::from_le_bytes(buf))
921    }
922}
923
924fn u32_from_bytes(bytes: &[u8]) -> Result<u32> {
925    if bytes.len() > 4 {
926        Err(Error::InvalidIntLength {
927            bytes_expected: 4,
928            bytes_found: bytes.len() as u8,
929        })
930    } else {
931        let mut buf = [0; 4];
932        buf[..bytes.len()].copy_from_slice(bytes);
933        Ok(u32::from_le_bytes(buf))
934    }
935}
936
937fn bytes_required_for(n: usize, max_bytes: u8) -> Result<u8> {
938    let zero: usize = 0;
939    let bit_length = zero.leading_zeros() - n.leading_zeros();
940    let mut bytes_required = (bit_length / 8) as u8;
941    if bit_length == 0 || bit_length % 8 != 0 {
942        bytes_required += 1;
943    };
944    if bytes_required > max_bytes {
945        Err(Error::MaxSizeExceeded {
946            size: n,
947            max_bytes,
948            bytes_required,
949        })
950    } else {
951        Ok(bytes_required)
952    }
953}
954
/// A version of a key-value pair in a store at a particular point in time.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Version {
    /// Storage offset of this version's entry, if it has one. Versions converted
    /// from a `BlobVersion` always have `Some` offset; `None` presumably means no
    /// storage location exists yet (TODO confirm).
    offset: Option<u64>,
    /// `true` once the entry is persisted in the store, `false` while it is
    /// still uncommitted.
    pub is_committed: bool,
    /// `true` if the key has been removed ("moved to trash"), `false` otherwise.
    pub is_removed: bool,
    /// Milliseconds since the Unix epoch at which this version was recorded.
    pub timestamp: u64,
}
966
967impl From<BlobVersion> for Version {
968    fn from(v: BlobVersion) -> Self {
969        Self {
970            offset: Some(v.offset),
971            is_committed: true,
972            is_removed: v.is_removed,
973            timestamp: v.timestamp,
974        }
975    }
976}