keyvaluedb 0.1.8

Key-Value Database Wrapper
Documentation
//! Key-Value store abstraction.

#![deny(clippy::all)]

mod io_stats;

pub use io_stats::{IoStats, Kind as IoStatsKind};
use std::future::Future;
use std::io;
use std::pin::Pin;

/// Required length of prefixes.
pub const PREFIX_LEN: usize = 12;

/// Database value.
pub type DBValue = Vec<u8>;
pub type DBKey = Vec<u8>;
pub type DBKeyValue = (DBKey, DBValue);
pub type DBKeyRef<'a> = &'a DBKey;
pub type DBKeyValueRef<'a> = (&'a DBKey, &'a DBValue);

/// Write transaction. Batches a sequence of put/delete operations for efficiency.
#[derive(Default, Debug, Clone, Eq, PartialEq)]
pub struct DBTransaction {
    /// Database operations.
    pub ops: Vec<DBOp>,
}

/// Database operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DBOp {
    Insert {
        col: u32,
        key: DBKey,
        value: DBValue,
    },
    Delete {
        col: u32,
        key: DBKey,
    },
    DeletePrefix {
        col: u32,
        prefix: DBKey,
    },
}

impl DBOp {
    /// Returns the key associated with this operation.
    pub fn key(&self) -> &DBKey {
        match *self {
            DBOp::Insert { ref key, .. } => key,
            DBOp::Delete { ref key, .. } => key,
            DBOp::DeletePrefix { ref prefix, .. } => prefix,
        }
    }
    /// Returns the value associated with this operation.
    pub fn value(&self) -> Option<&DBValue> {
        match *self {
            DBOp::Insert { ref value, .. } => Some(value),
            DBOp::Delete { .. } => None,
            DBOp::DeletePrefix { .. } => None,
        }
    }
    /// Returns the column associated with this operation.
    pub fn col(&self) -> u32 {
        match *self {
            DBOp::Insert { col, .. } => col,
            DBOp::Delete { col, .. } => col,
            DBOp::DeletePrefix { col, .. } => col,
        }
    }
}

impl DBTransaction {
    /// Create new transaction.
    pub fn new() -> DBTransaction {
        DBTransaction::with_capacity(256)
    }

    /// Create new transaction with capacity.
    pub fn with_capacity(cap: usize) -> DBTransaction {
        DBTransaction {
            ops: Vec::with_capacity(cap),
        }
    }

    /// Insert a key-value pair in the transaction. Any existing value will be overwritten upon write.
    pub fn put<K, V>(&mut self, col: u32, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        self.ops.push(DBOp::Insert {
            col,
            key: DBKey::from(key.as_ref()),
            value: DBValue::from(value.as_ref()),
        })
    }
    pub fn put_owned(&mut self, col: u32, key: Vec<u8>, value: Vec<u8>) {
        self.ops.push(DBOp::Insert { col, key, value })
    }

    /// Delete value by key.
    pub fn delete<K>(&mut self, col: u32, key: K)
    where
        K: AsRef<[u8]>,
    {
        self.ops.push(DBOp::Delete {
            col,
            key: DBKey::from(key.as_ref()),
        });
    }
    pub fn delete_owned(&mut self, col: u32, key: Vec<u8>) {
        self.ops.push(DBOp::Delete { col, key });
    }

    /// Delete all values with the given key prefix.
    /// Using an empty prefix here will remove all keys
    /// (all keys start with the empty prefix).
    pub fn delete_prefix<K>(&mut self, col: u32, prefix: K)
    where
        K: AsRef<[u8]>,
    {
        self.ops.push(DBOp::DeletePrefix {
            col,
            prefix: DBKey::from(prefix.as_ref()),
        });
    }
    pub fn delete_prefix_owned(&mut self, col: u32, prefix: Vec<u8>) {
        self.ops.push(DBOp::DeletePrefix { col, prefix });
    }
}

/// Transaction Result, returns the transaction unchanged upon error
#[derive(Debug)]
pub struct DBTransactionError {
    pub error: io::Error,
    pub transaction: DBTransaction,
}

impl std::fmt::Display for DBTransactionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransactionError")
            .field("error", &self.error)
            .finish()
    }
}
impl std::error::Error for DBTransactionError {}

pub type KeyValueDBPinBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Generic key-value database.
///
/// The `KeyValueDB` deals with "column families", which can be thought of as distinct
/// stores within a database. Keys written in one column family will not be accessible from
/// any other. The number of column families must be specified at initialization, with a
/// differing interface for each database.
///
/// The API laid out here, along with the `Sync` bound implies interior synchronization for
/// implementation.
/// Clone is here so we can pass an owned self to async functions, requiring interior locked mutability.
pub trait KeyValueDB: Sync + Send + Clone + 'static {
    /// Helper to create a new transaction.
    fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }

    /// Get a value by key.
    fn get<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>>;

    /// Remove a value by key, returning the old value
    fn delete<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBValue>>>;

    /// Write a transaction of changes to the backing store.
    fn write(
        &self,
        transaction: DBTransaction,
    ) -> KeyValueDBPinBoxFuture<'_, Result<(), DBTransactionError>>;

    /// Iterate over the data for a given column.
    /// Return all key/value pairs, optionally where the key starts with the given prefix.
    /// Iterator closure returns true for more items, false to stop iteration.
    fn iter<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyValueRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        context: C,
        f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>>;

    /// Iterate over the data for a given column.
    /// Return all keys, optionally where the key starts with the given prefix.
    /// Iterator closure returns true for more items, false to stop iteration.
    fn iter_keys<
        'a,
        T: Send + 'static,
        C: Send + 'static,
        F: FnMut(&mut C, DBKeyRef) -> io::Result<Option<T>> + Send + Sync + 'static,
    >(
        &'a self,
        col: u32,
        prefix: Option<&'a [u8]>,
        context: C,
        f: F,
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<(C, Option<T>)>>;

    /// Query statistics.
    ///
    /// Not all keyvaluedb implementations are able or expected to implement this, so by
    /// default, empty statistics is returned. Also, not all keyvaluedb implementation
    /// can return every statistic or configured to do so (some statistics gathering
    /// may impede the performance and might be off by default).
    fn io_stats(&self, _kind: IoStatsKind) -> IoStats {
        IoStats::empty()
    }

    /// The number of column families in the db.
    fn num_columns(&self) -> io::Result<u32>;

    /// The number of keys in a column (estimated).
    fn num_keys(&self, col: u32) -> KeyValueDBPinBoxFuture<'_, io::Result<u64>>;

    /// Check for the existence of a value by key.
    fn has_key<'a>(
        &'a self,
        col: u32,
        key: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
        let this = self.clone();
        Box::pin(async move { Ok(this.get(col, key).await?.is_some()) })
    }

    /// Check for the existence of a value by prefix.
    fn has_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<bool>> {
        let this = self.clone();
        Box::pin(async move {
            let (_, out) = this
                .iter_keys(col, Some(prefix), (), |_, _| Ok(Some(())))
                .await?;
            Ok(out.is_some())
        })
    }

    /// Get the first value matching the given prefix.
    fn first_with_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> KeyValueDBPinBoxFuture<'a, io::Result<Option<DBKeyValue>>> {
        let this = self.clone();
        Box::pin(async move {
            let (_, out) = this
                .iter(col, Some(prefix), (), |_, (k, v)| {
                    Ok(Some((k.to_vec(), v.to_vec())))
                })
                .await?;
            Ok(out)
        })
    }

    /// Cleanup/Vacuum database
    fn cleanup(&self) -> KeyValueDBPinBoxFuture<'_, io::Result<()>> {
        Box::pin(async { Ok(()) })
    }
}

/// For a given start prefix (inclusive), returns the correct end prefix (non-inclusive).
/// This assumes the key bytes are ordered in lexicographical order.
/// Since key length is not limited, for some case we return `None` because there is
/// no bounded limit (every keys in the serie `[]`, `[255]`, `[255, 255]` ...).
pub fn end_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut end_range = prefix.to_vec();
    while let Some(0xff) = end_range.last() {
        end_range.pop();
    }
    if let Some(byte) = end_range.last_mut() {
        *byte += 1;
        Some(end_range)
    } else {
        None
    }
}

#[cfg(test)]
mod test {
    use super::end_prefix;

    #[test]
    fn end_prefix_test() {
        assert_eq!(end_prefix(&[5, 6, 7]), Some(vec![5, 6, 8]));
        assert_eq!(end_prefix(&[5, 6, 255]), Some(vec![5, 7]));
        // This is not equal as the result is before start.
        assert_ne!(end_prefix(&[5, 255, 255]), Some(vec![5, 255]));
        // This is equal ([5, 255] will not be deleted because
        // it is before start).
        assert_eq!(end_prefix(&[5, 255, 255]), Some(vec![6]));
        assert_eq!(end_prefix(&[255, 255, 255]), None);

        assert_eq!(end_prefix(&[0x00, 0xff]), Some(vec![0x01]));
        assert_eq!(end_prefix(&[0xff]), None);
        assert_eq!(end_prefix(&[]), None);
        assert_eq!(end_prefix(b"0"), Some(b"1".to_vec()));
    }
}