// oxgraph-db 0.4.0
//
// Standalone OxGraph-native database engine above the topology substrate.
// Documentation
//! The writer's replayable mutation log and the WAL definition-body codec.
//!
//! [`MutationLog`] captures every mutation a writer applies, in order, as
//! fixed [`MutationOp`] records plus an interned blob for variable-length
//! names/text; the commit path serializes it verbatim into the WAL frame. The
//! codec functions below encode and decode the projection/index definition
//! bodies those ops reference.

use zerocopy::byteorder::{LE, U64};

use crate::{
    DbError,
    state::NextIds,
    wire::{self, MUTATION_OP_PAYLOAD_WORDS, MutationOp},
};

/// An ordered, replayable record of every mutation a writer applied, captured
/// AS each mutation happens so the WAL frame replays into a byte-identical
/// overlay. Each entry is a fixed [`MutationOp`]; variable-length data is
/// interned into [`Self::blob`] and referenced by `(offset, len)` payload words.
///
/// The writer records ops in two places — into its [`WriteOverlay`] maps (for
/// in-memory reads and the published overlay) and into this log (for the WAL) —
/// and the commit path serializes this log verbatim. Recovery decodes the same
/// ops and re-applies them through the same [`WriteOverlay`] mutators, so the
/// replayed overlay equals the committed one.
///
/// The log is append-only through the methods in this file: `push` adds one
/// op, `intern` / `intern_words` append to the blob and hand back the
/// `(offset, len)` pair to store in an op's payload.
///
/// # Performance
///
/// Each push is `O(1)` amortized; interning a name is `O(name.len())`.
#[derive(Clone, Debug, Default)]
pub(crate) struct MutationLog {
    /// Mutation ops in application order. Every op carries an `op_kind`, packed
    /// `flags`, and a fixed payload of [`MUTATION_OP_PAYLOAD_WORDS`]
    /// little-endian `u64` words.
    pub(super) ops: Vec<MutationOp>,
    /// Interned variable-length bytes referenced by `(offset, len)` payload
    /// words: names/text appended by `intern` and definition-body `u64` word
    /// runs appended as little-endian bytes by `intern_words`. Nothing is
    /// validated on the way in; text is checked for UTF-8 when it is decoded.
    pub(super) blob: Vec<u8>,
}

impl MutationLog {
    /// Returns whether any op has been recorded (a non-dirty writer logs none).
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    pub(crate) const fn is_empty(&self) -> bool {
        self.ops.is_empty()
    }

    /// Interns a name/text value into the blob, returning its `(offset, len)` as
    /// `u64` payload words. Both fit `u64` for any in-memory buffer, so this is
    /// infallible; the overall frame `len` is `u32`-checked at append time.
    ///
    /// # Performance
    ///
    /// This method is `O(value.len())`.
    pub(super) fn intern(&mut self, value: &[u8]) -> (u64, u64) {
        let offset = self.blob.len() as u64;
        self.blob.extend_from_slice(value);
        let len = value.len() as u64;
        (offset, len)
    }

    /// Interns a `u64` definition-body word run into the blob as little-endian
    /// bytes, returning the byte `(offset, len)` of the run.
    ///
    /// # Performance
    ///
    /// This method is `O(words.len())`.
    pub(super) fn intern_words(&mut self, words: &[u64]) -> (u64, u64) {
        let offset = self.blob.len() as u64;
        for word in words {
            self.blob.extend_from_slice(&word.to_le_bytes());
        }
        let len = size_of_val(words) as u64;
        (offset, len)
    }

    /// Records one op with the given kind, packed flags, and leading payload
    /// words (remaining words zero).
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    pub(super) fn push(&mut self, op_kind: u32, flags: u32, words: &[u64]) {
        let mut payload = [U64::<LE>::new(0); MUTATION_OP_PAYLOAD_WORDS];
        for (slot, value) in payload.iter_mut().zip(words) {
            *slot = U64::new(*value);
        }
        self.ops.push(MutationOp {
            op_kind: op_kind.into(),
            flags: flags.into(),
            payload,
        });
    }

    /// Appends the nine-value next-id watermark as the final op of a dirty
    /// frame, so recovery restores allocators without recomputing them.
    ///
    /// # Performance
    ///
    /// This method is `O(1)`.
    pub(super) fn push_watermark(&mut self, next: NextIds) {
        self.push(
            wire::OP_NEXT_ID_WATERMARK,
            0,
            &[
                next.element.get(),
                next.relation.get(),
                next.incidence.get(),
                next.role.get(),
                next.label.get(),
                next.relation_type.get(),
                next.property_key.get(),
                next.projection.get(),
                next.index.get(),
            ],
        );
    }
}

/// Reads a `(offset, len)` UTF-8 slice from a replay frame blob.
///
/// # Errors
///
/// Returns [`DbError::LogCorrupt`] when the slice is out of bounds or not UTF-8.
///
/// # Performance
///
/// This function is `O(len)`.
pub(super) fn blob_str(blob: &[u8], offset: u64, len: u64, lsn: u64) -> Result<String, DbError> {
    let start = usize::try_from(offset).map_err(|_overflow| {
        DbError::Storage(crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "blob offset overflow",
        })
    })?;
    let length = usize::try_from(len).map_err(|_overflow| {
        DbError::Storage(crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "blob length overflow",
        })
    })?;
    let end = start.checked_add(length).ok_or(DbError::Storage(
        crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "blob slice overflow",
        },
    ))?;
    let bytes =
        blob.get(start..end)
            .ok_or(DbError::Storage(crate::error::StorageError::LogCorrupt {
                lsn,
                reason: "blob slice out of bounds",
            }))?;
    core::str::from_utf8(bytes)
        .map(str::to_owned)
        .map_err(|_error| {
            DbError::Storage(crate::error::StorageError::LogCorrupt {
                lsn,
                reason: "blob slice is not UTF-8",
            })
        })
}

/// Decodes the `(offset, len)` byte run of a definition body into its `u64`
/// words.
///
/// # Errors
///
/// Returns [`DbError::LogCorrupt`] when the run is out of bounds or not a whole
/// number of `u64` words.
///
/// # Performance
///
/// This function is `O(len)`.
pub(super) fn decode_def_words(
    blob: &[u8],
    offset: u64,
    len: u64,
    lsn: u64,
) -> Result<Vec<u64>, DbError> {
    let start = usize::try_from(offset).map_err(|_overflow| {
        DbError::Storage(crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "def offset overflow",
        })
    })?;
    let length = usize::try_from(len).map_err(|_overflow| {
        DbError::Storage(crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "def length overflow",
        })
    })?;
    let end = start.checked_add(length).ok_or(DbError::Storage(
        crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "def slice overflow",
        },
    ))?;
    let bytes =
        blob.get(start..end)
            .ok_or(DbError::Storage(crate::error::StorageError::LogCorrupt {
                lsn,
                reason: "def slice out of bounds",
            }))?;
    if !bytes.len().is_multiple_of(size_of::<u64>()) {
        return Err(DbError::Storage(crate::error::StorageError::LogCorrupt {
            lsn,
            reason: "def slice is not whole u64 words",
        }));
    }
    Ok(bytes
        .chunks_exact(size_of::<u64>())
        .map(|chunk| {
            let mut word = [0u8; 8];
            word.copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect())
}