//! bonsaidb-local 0.4.1
//!
//! Local database storage for BonsaiDb. See the item-level documentation
//! below for the available configuration options.
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

#[cfg(feature = "encryption")]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::{
    permissions::Permissions,
    schema::{Schema, SchemaName},
};
use sysinfo::{RefreshKind, System, SystemExt};

#[cfg(feature = "encryption")]
use crate::vault::AnyVaultKeyStorage;
use crate::{
    storage::{DatabaseOpener, StorageSchemaOpener},
    Error,
};

#[cfg(feature = "password-hashing")]
mod argon;
#[cfg(feature = "password-hashing")]
pub use argon::*;

/// Configuration options for [`Storage`](crate::storage::Storage).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct StorageConfiguration {
    /// The path to the database. Defaults to `db.bonsaidb` if not specified.
    pub path: Option<PathBuf>,

    /// Prevents storing data on the disk. This is intended for testing purposes
    /// primarily. Keep in mind that the underlying storage format is
    /// append-only.
    pub memory_only: bool,

    /// The unique id of the server. If not specified, the server will randomly
    /// generate a unique id on startup. If the server generated an id and this
    /// value is subsequently set, the generated id will be overridden by the
    /// one specified here.
    pub unique_id: Option<u64>,

    /// The vault key storage to use. If not specified,
    /// [`LocalVaultKeyStorage`](crate::vault::LocalVaultKeyStorage) will be
    /// used with the server's data folder as the path. This is **incredibly
    /// insecure and should not be used outside of testing**.
    ///
    /// For secure encryption, it is important to store the vault keys in a
    /// location that is separate from the database. If the keys are on the same
    /// hardware as the encrypted content, anyone with access to the disk will
    /// be able to decrypt the stored data.
    #[cfg(feature = "encryption")]
    pub vault_key_storage: Option<Arc<dyn AnyVaultKeyStorage>>,

    /// The default encryption key for the database. If specified, all documents
    /// will be stored encrypted at-rest using the key specified. Having this
    /// key specified will also encrypt views. Without this, views will be
    /// stored unencrypted.
    #[cfg(feature = "encryption")]
    pub default_encryption_key: Option<KeyId>,

    /// Configuration options related to background tasks.
    pub workers: Tasks,

    /// Configuration options related to views.
    pub views: Views,

    /// Controls how the key-value store persists keys, on a per-database basis.
    pub key_value_persistence: KeyValuePersistence,

    /// Sets the default compression algorithm.
    #[cfg(feature = "compression")]
    pub default_compression: Option<Compression>,

    /// The permissions granted to authenticated connections to this server.
    pub authenticated_permissions: Permissions,

    /// Password hashing configuration.
    #[cfg(feature = "password-hashing")]
    pub argon: ArgonConfiguration,

    /// Schema openers registered through
    /// [`register_schema`](Self::register_schema), keyed by schema name.
    pub(crate) initial_schemas: HashMap<SchemaName, Arc<dyn DatabaseOpener>>,
}

impl Default for StorageConfiguration {
    fn default() -> Self {
        // Sample CPU and memory information once so that every subsystem that
        // sizes itself from hardware sees the same snapshot.
        let specs = RefreshKind::new().with_cpu().with_memory();
        let mut system = System::new_with_specifics(specs);
        system.refresh_specifics(specs);
        Self {
            path: None,
            memory_only: false,
            unique_id: None,
            #[cfg(feature = "encryption")]
            vault_key_storage: None,
            #[cfg(feature = "encryption")]
            default_encryption_key: None,
            #[cfg(feature = "compression")]
            default_compression: None,
            workers: Tasks::default_for(&system),
            views: Views::default(),
            key_value_persistence: KeyValuePersistence::default(),
            authenticated_permissions: Permissions::default(),
            #[cfg(feature = "password-hashing")]
            argon: ArgonConfiguration::default_for(&system),
            initial_schemas: HashMap::default(),
        }
    }
}

impl StorageConfiguration {
    /// Registers the schema `S` so that its databases can be opened by this
    /// storage instance.
    pub fn register_schema<S: Schema>(&mut self) -> Result<(), Error> {
        // TODO this should error on duplicate registration. Today, a second
        // registration for the same schema name replaces the first.
        let opener = Arc::new(StorageSchemaOpener::<S>::new()?);
        self.initial_schemas.insert(S::schema_name(), opener);
        Ok(())
    }
}

/// Configuration options for background tasks.
#[derive(Debug, Clone)]
pub struct Tasks {
    /// Defines how many workers should be spawned to process tasks. This
    /// defaults to 2x the number of cpu cores available to the system or 2,
    /// whichever is larger.
    pub worker_count: usize,

    /// Defines how many simultaneous threads should be used when a task is
    /// parallelizable. This defaults to the number of cpu cores available to
    /// the system.
    pub parallelization: usize,
}

impl SystemDefault for Tasks {
    fn default_for(system: &System) -> Self {
        // Use the larger of the physical core count and the logical processor
        // count, clamped to at least 1 so the pool is never empty when the
        // platform reports nothing.
        let physical = system.physical_core_count().unwrap_or(0);
        let logical = system.processors().len();
        let num_cpus = physical.max(logical).max(1);
        Self {
            worker_count: num_cpus * 2,
            parallelization: num_cpus,
        }
    }
}

/// Configuration options for views.
#[derive(Clone, Debug, Default)]
pub struct Views {
    /// If `true`, the database will scan all views during the call to
    /// `open_local`. This will cause database opening to take longer, but once
    /// the database is open, no request will need to wait for the integrity to
    /// be checked. If `false` (the default), the integrity scan is delayed,
    /// which gives a faster startup time.
    pub check_integrity_on_open: bool,
}

/// Rules for persisting key-value changes. Default persistence is to
/// immediately persist all changes. While this ensures data integrity, the
/// overhead of the key-value store can be significantly reduced by utilizing
/// lazy persistence strategies that delay writing changes until certain
/// thresholds have been met.
///
/// ## Immediate persistence
///
/// The default persistence mode will trigger commits always:
///
/// ```rust
/// # use bonsaidb_local::config::KeyValuePersistence;
/// # use std::time::Duration;
/// assert!(!KeyValuePersistence::default().should_commit(0, Duration::ZERO));
/// assert!(KeyValuePersistence::default().should_commit(1, Duration::ZERO));
/// ```
///
/// ## Lazy persistence
///
/// Lazy persistence allows setting multiple thresholds, allowing for customized
/// behavior that can help tune performance, especially under write-heavy loads.
///
/// It is good practice to include one [`PersistenceThreshold`] that has no
/// duration, as it will ensure that the in-memory cache cannot exceed a certain
/// size. This number is counted for each database independently.
///
/// ```rust
/// # use bonsaidb_local::config::{KeyValuePersistence, PersistenceThreshold};
/// # use std::time::Duration;
/// #
/// let persistence = KeyValuePersistence::lazy([
///     PersistenceThreshold::after_changes(1).and_duration(Duration::from_secs(120)),
///     PersistenceThreshold::after_changes(10).and_duration(Duration::from_secs(10)),
///     PersistenceThreshold::after_changes(100),
/// ]);
///
/// // After 1 change and 60 seconds, no changes would be committed:
/// assert!(!persistence.should_commit(1, Duration::from_secs(60)));
/// // But on or after 120 seconds, that change will be committed:
/// assert!(persistence.should_commit(1, Duration::from_secs(120)));
///
/// // After 10 changes and 10 seconds, changes will be committed:
/// assert!(persistence.should_commit(10, Duration::from_secs(10)));
///
/// // Once 100 changes have been accumulated, this ruleset will always commit
/// // regardless of duration.
/// assert!(persistence.should_commit(100, Duration::ZERO));
/// ```
#[derive(Debug, Clone)]
#[must_use]
pub struct KeyValuePersistence(KeyValuePersistenceInner);

#[derive(Debug, Clone)]
enum KeyValuePersistenceInner {
    /// Every change is persisted as soon as it occurs.
    Immediate,
    /// Changes are persisted once any threshold in the list is satisfied. The
    /// list is kept sorted ascending by `number_of_changes`.
    Lazy(Vec<PersistenceThreshold>),
}

impl Default for KeyValuePersistence {
    /// Returns [`KeyValuePersistence::immediate()`].
    fn default() -> Self {
        Self::immediate()
    }
}

impl KeyValuePersistence {
    /// Returns a ruleset that commits all changes immediately.
    pub const fn immediate() -> Self {
        Self(KeyValuePersistenceInner::Immediate)
    }

    /// Returns a ruleset that lazily commits data based on a list of thresholds.
    pub fn lazy<II>(rules: II) -> Self
    where
        II: IntoIterator<Item = PersistenceThreshold>,
    {
        let mut rules = rules.into_iter().collect::<Vec<_>>();
        // Sorting ascending by change count lets `duration_until_next_commit`
        // stop scanning at the first rule whose threshold has not been met.
        rules.sort_by_key(|rule| rule.number_of_changes);
        Self(KeyValuePersistenceInner::Lazy(rules))
    }

    /// Returns true if these rules determine that the outstanding changes should be persisted.
    #[must_use]
    pub fn should_commit(
        &self,
        number_of_changes: usize,
        elapsed_since_last_commit: Duration,
    ) -> bool {
        self.duration_until_next_commit(number_of_changes, elapsed_since_last_commit)
            == Some(Duration::ZERO)
    }

    /// Returns how long until the next commit should occur, or `None` if no
    /// rule currently schedules one. `Some(Duration::ZERO)` means "commit now".
    pub(crate) fn duration_until_next_commit(
        &self,
        number_of_changes: usize,
        elapsed_since_last_commit: Duration,
    ) -> Option<Duration> {
        // With no outstanding changes there is nothing to persist.
        if number_of_changes == 0 {
            return None;
        }
        match &self.0 {
            KeyValuePersistenceInner::Immediate => Some(Duration::ZERO),
            KeyValuePersistenceInner::Lazy(rules) => {
                rules
                    .iter()
                    // Rules are sorted by `number_of_changes`, so once one
                    // rule's threshold exceeds the change count, every later
                    // rule's does too.
                    .take_while(|rule| rule.number_of_changes <= number_of_changes)
                    .map(|rule| rule.duration.saturating_sub(elapsed_since_last_commit))
                    .min()
                    // A remaining time of `Duration::MAX` means the rule can
                    // never fire; report that no commit is scheduled.
                    .filter(|&remaining| remaining < Duration::MAX)
            }
        }
    }
}

/// A threshold controlling lazy commits. For a threshold to apply, both
/// `number_of_changes` must be met or surpassed and `duration` must have
/// elapsed since the last commit.
///
/// A threshold with a duration of zero will not wait any time to persist
/// changes once the specified `number_of_changes` has been met or surpassed.
#[derive(Debug, Copy, Clone)]
#[must_use]
pub struct PersistenceThreshold {
    /// The minimum number of changes that must have occurred for this threshold to apply.
    pub number_of_changes: usize,
    /// The amount of time that must elapse since the last write for this threshold to apply.
    pub duration: Duration,
}

impl PersistenceThreshold {
    /// Returns a threshold that applies after a number of changes have elapsed.
    pub const fn after_changes(number_of_changes: usize) -> Self {
        Self {
            number_of_changes,
            duration: Duration::ZERO,
        }
    }

    /// Sets the duration of this threshold to `duration` and returns self.
    pub const fn and_duration(mut self, duration: Duration) -> Self {
        self.duration = duration;
        self
    }
}

/// Storage configuration builder methods.
pub trait Builder: Default {
    /// Creates a default configuration with `path` set.
    #[must_use]
    fn new<P: AsRef<Path>>(path: P) -> Self {
        Self::default().path(path)
    }
    /// Registers the schema and returns self.
    fn with_schema<S: Schema>(self) -> Result<Self, Error>;

    /// Sets [`StorageConfiguration::memory_only`](StorageConfiguration#structfield.memory_only) to `true` and returns self.
    #[must_use]
    fn memory_only(self) -> Self;
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.path) to `path` and returns self.
    #[must_use]
    fn path<P: AsRef<Path>>(self, path: P) -> Self;
    /// Sets [`StorageConfiguration::unique_id`](StorageConfiguration#structfield.unique_id) to `unique_id` and returns self.
    #[must_use]
    fn unique_id(self, unique_id: u64) -> Self;
    /// Sets [`StorageConfiguration::vault_key_storage`](StorageConfiguration#structfield.vault_key_storage) to `key_storage` and returns self.
    #[cfg(feature = "encryption")]
    #[must_use]
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
        self,
        key_storage: VaultKeyStorage,
    ) -> Self;
    /// Sets [`StorageConfiguration::default_encryption_key`](StorageConfiguration#structfield.default_encryption_key) to `key` and returns self.
    #[cfg(feature = "encryption")]
    #[must_use]
    fn default_encryption_key(self, key: KeyId) -> Self;
    /// Sets [`Tasks::worker_count`] to `worker_count` and returns self.
    #[must_use]
    fn tasks_worker_count(self, worker_count: usize) -> Self;
    /// Sets [`Tasks::parallelization`] to `parallelization` and returns self.
    #[must_use]
    fn tasks_parallelization(self, parallelization: usize) -> Self;
    /// Sets [`Views::check_integrity_on_open`] to `check` and returns self.
    #[must_use]
    fn check_view_integrity_on_open(self, check: bool) -> Self;
    /// Sets [`StorageConfiguration::default_compression`](StorageConfiguration#structfield.default_compression) to `compression` and returns self.
    #[cfg(feature = "compression")]
    #[must_use]
    fn default_compression(self, compression: Compression) -> Self;
    /// Sets [`StorageConfiguration::key_value_persistence`](StorageConfiguration#structfield.key_value_persistence) to `persistence` and returns self.
    #[must_use]
    fn key_value_persistence(self, persistence: KeyValuePersistence) -> Self;
    /// Sets [`StorageConfiguration::authenticated_permissions`](StorageConfiguration#structfield.authenticated_permissions) to `authenticated_permissions` and returns self.
    #[must_use]
    fn authenticated_permissions<P: Into<Permissions>>(self, authenticated_permissions: P) -> Self;
    /// Sets [`StorageConfiguration::argon`](StorageConfiguration#structfield.argon) to `argon` and returns self.
    #[cfg(feature = "password-hashing")]
    #[must_use]
    fn argon(self, argon: ArgonConfiguration) -> Self;
}

impl Builder for StorageConfiguration {
    // Methods are kept in the same order as the `Builder` trait declaration.

    fn with_schema<S: Schema>(mut self) -> Result<Self, Error> {
        self.register_schema::<S>()?;
        Ok(self)
    }

    fn memory_only(mut self) -> Self {
        self.memory_only = true;
        self
    }

    fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.path = Some(path.as_ref().to_path_buf());
        self
    }

    fn unique_id(mut self, unique_id: u64) -> Self {
        self.unique_id = Some(unique_id);
        self
    }

    #[cfg(feature = "encryption")]
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
        mut self,
        key_storage: VaultKeyStorage,
    ) -> Self {
        self.vault_key_storage = Some(Arc::new(key_storage));
        self
    }

    #[cfg(feature = "encryption")]
    fn default_encryption_key(mut self, key: KeyId) -> Self {
        self.default_encryption_key = Some(key);
        self
    }

    fn tasks_worker_count(mut self, worker_count: usize) -> Self {
        self.workers.worker_count = worker_count;
        self
    }

    fn tasks_parallelization(mut self, parallelization: usize) -> Self {
        self.workers.parallelization = parallelization;
        self
    }

    fn check_view_integrity_on_open(mut self, check: bool) -> Self {
        self.views.check_integrity_on_open = check;
        self
    }

    #[cfg(feature = "compression")]
    fn default_compression(mut self, compression: Compression) -> Self {
        self.default_compression = Some(compression);
        self
    }

    fn key_value_persistence(mut self, persistence: KeyValuePersistence) -> Self {
        self.key_value_persistence = persistence;
        self
    }

    fn authenticated_permissions<P: Into<Permissions>>(
        mut self,
        authenticated_permissions: P,
    ) -> Self {
        self.authenticated_permissions = authenticated_permissions.into();
        self
    }

    #[cfg(feature = "password-hashing")]
    fn argon(mut self, argon: ArgonConfiguration) -> Self {
        self.argon = argon;
        self
    }
}

/// Builds a type's default configuration from the host system's CPU and
/// memory information.
pub(crate) trait SystemDefault: Sized {
    /// Builds a default value from an already-refreshed `system` snapshot.
    fn default_for(system: &System) -> Self;

    /// Samples CPU and memory information, then delegates to
    /// [`Self::default_for`].
    fn default() -> Self {
        let specs = RefreshKind::new().with_cpu().with_memory();
        let mut system = System::new_with_specifics(specs);
        system.refresh_specifics(specs);
        Self::default_for(&system)
    }
}

/// All available compression algorithms.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// Compress data using the
    /// [lz4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm))
    /// algorithm. This is powered by
    /// [lz4_flex](https://crates.io/crates/lz4_flex).
    // Explicit discriminant must stay in sync with `Compression::from_u8`.
    Lz4 = 1,
}

impl Compression {
    /// Maps a raw discriminant back to a `Compression` value; returns `None`
    /// for unrecognized values.
    #[must_use]
    #[cfg(feature = "compression")]
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
        // 1 is the only known encoding (`Lz4`).
        if value == 1 {
            Some(Self::Lz4)
        } else {
            None
        }
    }
}