wal-db 0.5.0

Write-ahead log primitive for Rust storage engines. Durable, recoverable, lock-free append path. The WAL substrate under lsm-db, txn-db, raft-io, and Hive DB.
Documentation
//! On-disk record framing.
//!
//! Every record is written as a fixed 8-byte header followed by its payload:
//!
//! ```text
//! +-----------+-----------+----------------------+
//! | crc32c    | length    | payload              |
//! | 4 bytes   | 4 bytes   | `length` bytes       |
//! +-----------+-----------+----------------------+
//! ```
//!
//! Both integers are little-endian, fixed regardless of host byte order, so a
//! log written on one machine reads back identically on another.
//!
//! There is no sequence-number field: a record's LSN is its byte offset in the
//! log, which recovery already knows as it scans, so storing it again would
//! waste eight bytes on every record.
//!
//! The checksum covers everything after it — the length and the payload. Placing
//! it first lets recovery read it before it knows how many payload bytes follow,
//! then confirm the whole record once those bytes are in hand. A torn write (a
//! crash partway through appending) leaves either too few bytes to form a record
//! or a payload that no longer matches the checksum; either way it is detected.
//!
//! The algorithm is CRC32C (Castagnoli). It is the standard choice for storage
//! checksums: stronger error detection than the IEEE CRC32 used by zip, and
//! backed by a dedicated CPU instruction on x86-64 (SSE4.2) and aarch64.

/// Where the checksum starts: it is the first field of the header.
pub(crate) const CRC_OFFSET: usize = 0;
/// Where the payload-length field starts: directly after the `u32` checksum.
pub(crate) const LEN_OFFSET: usize = CRC_OFFSET + std::mem::size_of::<u32>();
/// Size of the whole header in bytes: the checksum plus the `u32` length.
pub(crate) const HEADER_LEN: usize = LEN_OFFSET + std::mem::size_of::<u32>();

/// The two fields of a record header, decoded from their on-disk bytes.
///
/// A `Header` describes the record but does not vouch for it: the payload
/// still has to be read and checked against [`Header::crc`].
///
/// Headers compare field-by-field, so tests and assertions can compare two
/// parsed headers directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Header {
    /// Stored checksum of the length and the payload.
    pub crc: u32,
    /// Declared payload length in bytes. Untrusted until the checksum verifies.
    pub len: u32,
}

/// Number of bytes a record occupies on disk when its payload is
/// `payload_len` bytes long: the payload plus the fixed-size header.
#[inline]
pub(crate) fn framed_len(payload_len: usize) -> usize {
    payload_len + HEADER_LEN
}

/// Frame `payload` into `buf`, replacing whatever `buf` held.
///
/// On return `buf` contains exactly one record: the 4-byte little-endian
/// checksum, the 4-byte little-endian payload length, then the payload bytes.
///
/// `buf` is the caller's reusable scratch space: it is cleared and refilled
/// rather than replaced, so a caller that keeps passing the same buffer stops
/// allocating once it has grown to fit its largest record.
///
/// # Panics
///
/// Panics if `payload.len()` does not fit in a `u32`. Callers are expected to
/// have rejected such payloads already (via the log's maximum record size),
/// so this is an invariant check rather than an error path. It is enforced in
/// release builds too: a silently truncated length would produce a frame whose
/// length field disagrees with its payload, which recovery cannot tell apart
/// from a torn write.
#[inline]
pub(crate) fn encode(buf: &mut Vec<u8>, payload: &[u8]) {
    assert!(
        payload.len() <= u32::MAX as usize,
        "payload length must fit in u32"
    );
    // Cannot truncate: the assertion above bounds the value.
    let len = payload.len() as u32;

    buf.clear();
    buf.reserve(framed_len(payload.len()));
    buf.extend_from_slice(&[0u8; 4]); // checksum placeholder, overwritten below
    buf.extend_from_slice(&len.to_le_bytes());
    buf.extend_from_slice(payload);

    // Checksum covers the length and the payload — everything past the 4-byte
    // checksum field itself — so it can only be computed once both are in place.
    let crc = crc32c::crc32c(&buf[LEN_OFFSET..]);
    buf[CRC_OFFSET..LEN_OFFSET].copy_from_slice(&crc.to_le_bytes());
}

/// Parse the two header fields out of a full header's bytes.
#[inline]
pub(crate) fn parse_header(bytes: &[u8; HEADER_LEN]) -> Header {
    let crc = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
    let len = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
    Header { crc, len }
}

/// Check a record against the checksum its header claims.
///
/// The checksum is recomputed over the length field of `header_bytes` and then
/// over `payload`, and the result is compared with `expected_crc`. The checksum
/// bytes inside `header_bytes` are not read; the value to compare against comes
/// in solely through `expected_crc`.
///
/// Returns `true` when the record is intact. This mirrors [`encode`], which
/// checksums the same bytes as one contiguous slice: feeding the length and
/// then the payload through the streaming API yields the same value.
#[inline]
pub(crate) fn verify(header_bytes: &[u8; HEADER_LEN], payload: &[u8], expected_crc: u32) -> bool {
    // Everything in the header past the checksum field, i.e. the length.
    let length_field = &header_bytes[LEN_OFFSET..];
    let over_length = crc32c::crc32c(length_field);
    expected_crc == crc32c::crc32c_append(over_length, payload)
}

// Unit tests for the framing helpers. `unwrap`/`expect` are allowed here
// because a failed conversion in a test should simply fail the test.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Copy the header portion of an encoded record into the fixed-size array
    /// that `parse_header` and `verify` take.
    fn header_array(buf: &[u8]) -> [u8; HEADER_LEN] {
        buf[..HEADER_LEN].try_into().unwrap()
    }

    #[test]
    fn test_encode_layout_and_roundtrip() {
        let mut buf = Vec::new();
        encode(&mut buf, b"hello");

        assert_eq!(buf.len(), framed_len(5));

        let header = parse_header(&header_array(&buf));
        assert_eq!(header.len, 5);

        let payload = &buf[HEADER_LEN..];
        assert_eq!(payload, b"hello");
        assert!(verify(&header_array(&buf), payload, header.crc));
    }

    #[test]
    fn test_encode_empty_payload() {
        let mut buf = Vec::new();
        encode(&mut buf, b"");
        assert_eq!(buf.len(), HEADER_LEN);

        let header = parse_header(&header_array(&buf));
        assert_eq!(header.len, 0);
        assert!(verify(&header_array(&buf), &[], header.crc));
    }

    #[test]
    fn test_streaming_crc_equals_contiguous_crc() {
        // The verify path checksums the header tail and payload in two chunks;
        // encode checksums them as one. They must agree.
        let mut buf = Vec::new();
        encode(&mut buf, b"some bytes here");
        let header = parse_header(&header_array(&buf));

        // One pass over length + payload, as `encode` does.
        let contiguous = crc32c::crc32c(&buf[LEN_OFFSET..]);
        // Two passes — length first, then payload — as `verify` does.
        let partial = crc32c::crc32c(&buf[LEN_OFFSET..HEADER_LEN]);
        let streamed = crc32c::crc32c_append(partial, &buf[HEADER_LEN..]);

        assert_eq!(streamed, contiguous);
        assert_eq!(contiguous, header.crc);
    }

    #[test]
    fn test_flipped_payload_byte_fails_verify() {
        let mut buf = Vec::new();
        encode(&mut buf, b"payload");
        let header = parse_header(&header_array(&buf));

        let mut payload = buf[HEADER_LEN..].to_vec();
        payload[0] ^= 0x01;
        assert!(!verify(&header_array(&buf), &payload, header.crc));
    }

    #[test]
    fn test_flipped_length_byte_fails_verify() {
        let mut buf = Vec::new();
        encode(&mut buf, b"payload");
        // Corrupt the length field; the checksum (which covers it) must reject.
        buf[LEN_OFFSET] ^= 0x01;
        let header = parse_header(&header_array(&buf));
        let payload = &buf[HEADER_LEN..];
        assert!(!verify(&header_array(&buf), payload, header.crc));
    }

    #[test]
    fn test_flipped_checksum_byte_fails_verify() {
        let mut buf = Vec::new();
        encode(&mut buf, b"payload");
        // Corrupt the stored checksum itself; length and payload are intact,
        // so the recomputed value can no longer match what the header claims.
        buf[CRC_OFFSET] ^= 0x01;
        let header = parse_header(&header_array(&buf));
        let payload = &buf[HEADER_LEN..];
        assert!(!verify(&header_array(&buf), payload, header.crc));
    }

    #[test]
    fn test_reused_buffer_does_not_leak_previous_record() {
        let mut buf = Vec::new();
        encode(&mut buf, b"a longer first record");
        encode(&mut buf, b"short");
        assert_eq!(buf.len(), framed_len(5));

        let header = parse_header(&header_array(&buf));
        assert_eq!(header.len, 5);
        assert_eq!(&buf[HEADER_LEN..], b"short");
        assert!(verify(&header_array(&buf), &buf[HEADER_LEN..], header.crc));
    }
}