fjall 3.1.4

Log-structured, embeddable key-value storage engine
Documentation
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod item;

use crate::{Database, Keyspace, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
use std::collections::HashSet;

/// An atomic write batch
///
/// Allows atomically writing across keyspaces inside the [`Database`].
///
/// Writes are buffered in memory and only take effect when the batch is
/// committed; all buffered items of one commit share a single sequence number.
pub struct WriteBatch {
    /// Buffered write operations (inserts / tombstones), applied in push order on commit.
    pub(crate) data: Vec<Item>,

    /// Database the batch is committed into (journal, seqno counter, keyspaces).
    db: Database,

    /// Journal persist mode used on commit; `None` skips the explicit persist step.
    durability: Option<PersistMode>,
}

impl WriteBatch {
    /// Initializes a new, empty write batch bound to `db`.
    ///
    /// This function is called by [`Database::batch`].
    pub(crate) fn new(db: Database) -> Self {
        Self {
            data: Vec::new(),
            db,
            durability: None,
        }
    }

    /// Initializes a new write batch with preallocated capacity.
    ///
    /// ### Note
    ///
    /// "Capacity" refers to the number of batch item slots, not their size in memory.
    #[must_use]
    pub fn with_capacity(db: Database, capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            db,
            durability: None,
        }
    }

    /// Gets the number of batched items.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if there are no batched items (yet).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Sets the durability level.
    ///
    /// With `Some(mode)`, [`WriteBatch::commit`] persists the journal using `mode`
    /// before applying the batch to the memtables; with `None` no explicit persist
    /// is performed.
    #[must_use]
    pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
        self.durability = mode;
        self
    }

    /// Inserts a key-value pair into the batch.
    ///
    /// The write only becomes visible once the batch is committed.
    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, p: &Keyspace, key: K, value: V) {
        self.data
            .push(Item::new(p.clone(), key, value, ValueType::Value));
    }

    /// Removes a key-value pair.
    ///
    /// Queues a tombstone for `key` in keyspace `p`; it only takes effect once the
    /// batch is committed.
    pub fn remove<K: Into<UserKey>>(&mut self, p: &Keyspace, key: K) {
        self.data
            .push(Item::new(p.clone(), key, vec![], ValueType::Tombstone));
    }

    /// Adds a weak tombstone marker for a key.
    ///
    /// The tombstone marker of this delete operation will vanish when it
    /// collides with its corresponding insertion.
    /// This may cause older versions of the value to be resurrected, so it should
    /// only be used and preferred in scenarios where a key is only ever written once.
    ///
    /// # Experimental
    ///
    /// This function is currently experimental.
    #[doc(hidden)]
    pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &Keyspace, key: K) {
        self.data
            .push(Item::new(p.clone(), key, vec![], ValueType::WeakTombstone));
    }

    /// Commits the batch to the [`Database`] atomically.
    ///
    /// All items are written to the journal under a single sequence number, then
    /// applied to their keyspaces' memtables and published. An empty batch is a no-op.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    ///
    /// If writing or persisting the journal fails, the database is marked as
    /// poisoned and [`crate::Error::Poisoned`] is returned; subsequent commits then
    /// also fail with [`crate::Error::Poisoned`].
    #[allow(clippy::missing_panics_doc)]
    pub fn commit(mut self) -> crate::Result<()> {
        use std::sync::atomic::Ordering;

        if self.is_empty() {
            return Ok(());
        }

        log::trace!("batch: Acquiring journal writer");
        let mut journal_writer = self.db.supervisor.journal.get_writer();

        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
        if self.db.is_poisoned.load(Ordering::Relaxed) {
            return Err(crate::Error::Poisoned);
        }

        let batch_seqno = self.db.supervisor.seqno.next();

        // A failed journal write must not be ignored: applying the batch anyway would
        // make data visible that the journal never recorded, and the journal may now
        // end in a partial record. Treat it exactly like a failed persist below.
        if let Err(e) =
            journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno)
        {
            self.db.is_poisoned.store(true, Ordering::Release);

            log::error!(
                "journal write failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
            );

            return Err(crate::Error::Poisoned);
        }

        if let Some(mode) = self.durability {
            if let Err(e) = journal_writer.persist(mode) {
                self.db.is_poisoned.store(true, Ordering::Release);

                log::error!(
                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
                );

                return Err(crate::Error::Poisoned);
            }
        }

        // TODO: maybe we can use a stack alloc hashset/vec here, such as smallset
        #[expect(clippy::mutable_key_type)]
        let mut keyspaces_with_possible_stall = HashSet::new();

        // Held (read) while the batch is applied to the memtables.
        // NOTE(review): presumably this prevents concurrent keyspace deletion — confirm.
        #[expect(clippy::expect_used)]
        let keyspaces = self
            .db
            .supervisor
            .keyspaces
            .read()
            .expect("lock is poisoned");

        let mut batch_size = 0u64;

        log::trace!("Applying batch (size={}) to memtable(s)", self.data.len());

        for item in std::mem::take(&mut self.data) {
            // TODO: need a better, generic write op
            let (item_size, _) = match item.value_type {
                ValueType::Value => item.keyspace.tree.insert(item.key, item.value, batch_seqno),
                ValueType::Tombstone => item.keyspace.tree.remove(item.key, batch_seqno),
                ValueType::WeakTombstone => item.keyspace.tree.remove_weak(item.key, batch_seqno),
                // Items are only ever created by insert/remove/remove_weak above
                ValueType::Indirection => unreachable!(),
            };

            batch_size += item_size;

            // `item` is owned (taken out of `self.data`), so its keyspace handle can be
            // moved into the set instead of cloned; duplicate handles are simply dropped.
            // The set holds owned handles, not references into the locked `keyspaces`
            // map, so the lock can be released before the stall checks below.
            keyspaces_with_possible_stall.insert(item.keyspace);
        }

        // Published before releasing the journal writer, so commits (serialized by the
        // journal mutex) publish their seqnos in the order they were allocated.
        self.db.supervisor.snapshot_tracker.publish(batch_seqno);

        drop(journal_writer);

        log::trace!("batch: Freed journal writer");

        drop(keyspaces);

        // IMPORTANT: Add batch size to current write buffer size
        // Otherwise write buffer growth is unbounded when using batches
        self.db.supervisor.write_buffer_size.allocate(batch_size);

        // Check each affected keyspace for write stall/halt
        for keyspace in &keyspaces_with_possible_stall {
            let memtable_size = keyspace.tree.active_memtable().size();
            keyspace.check_memtable_rotate(memtable_size);
            keyspace.local_backpressure();
        }

        Ok(())
    }
}