batpak 0.7.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
pub(crate) mod scan;
pub(crate) mod sidx;

use crate::event::Event;
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::io::{Read, Seek, SeekFrom, Write};
// serde(with) resolves via string path — no explicit wire import needed.

/// Magic bytes (`FBAT`) written at the very start of every segment file.
///
/// Segment file format: magic(4) + header_len(4 BE) + header(msgpack) + frames
/// Frame: \[len:u32 BE\]\[crc32:u32 BE\]\[msgpack\]
/// Files named: {segment_id:06}.fbat. Sequential u64.
pub const SEGMENT_MAGIC: &[u8; 4] = b"FBAT";
/// File extension used for all segment files (without the leading dot).
pub const SEGMENT_EXTENSION: &str = "fbat";

/// Maximum allowed frame payload size in bytes (256 MiB). Frames claiming a
/// payload larger than this are rejected as corrupt before allocation,
/// preventing a malicious or corrupt segment file from causing unbounded
/// memory use.
///
/// NOTE(review): this constant is not referenced anywhere in this file; the
/// rejection described above presumably happens in the reader/scan code —
/// confirm. `frame_encode` only enforces the `u32` length limit.
pub(crate) const MAX_FRAME_PAYLOAD: usize = 256 * 1024 * 1024;

/// Segment file header, serialized as MessagePack after the magic bytes.
///
/// On disk the encoded header is preceded by its own length as a big-endian
/// `u32`, as written by `Segment::<Active>::create`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SegmentHeader {
    /// Segment format version number. `Segment::<Active>::create` writes `1`.
    pub version: u16,
    /// Reserved flags field; currently always 0.
    pub flags: u16,
    /// Nanoseconds since Unix epoch when this segment was created.
    pub created_ns: i64,
    /// Numeric identifier of this segment file. The same value is used to
    /// build the file name (see `segment_filename`).
    pub segment_id: u64,
}

/// FramePayload: what gets serialized into each frame.
/// entity and scope are stored as strings (not Coordinate) because segments
/// are the persistence layer — they don't depend on the Coordinate type.
///
/// Generic over the event payload type `P`.
/// NOTE(review): `frame_encode` writes named MessagePack fields, so these
/// field names are presumably part of the on-disk format — confirm before
/// renaming any of them.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct FramePayload<P> {
    /// The event data stored in this frame.
    pub event: Event<P>,
    /// Entity name string (e.g. `"user:42"`).
    pub entity: String,
    /// Scope name string (e.g. `"profile"`).
    pub scope: String,
}

/// Borrowed, serialize-only counterpart of `FramePayload`.
///
/// Carries the same field names in the same order as `FramePayload`, but
/// holds references instead of owned values. It derives only `Serialize`, so
/// it can be written but never read back.
/// NOTE(review): presumably this exists so the write path can encode a frame
/// without cloning, and must stay in lock-step with `FramePayload` — confirm
/// against the writer code.
#[derive(Serialize)]
pub(crate) struct FramePayloadRef<'a, P> {
    /// Borrowed event data for this frame.
    pub event: &'a Event<P>,
    /// Borrowed entity name string.
    pub entity: &'a str,
    /// Borrowed scope name string.
    pub scope: &'a str,
}

/// Typestate marker for an active (writable) segment.
pub struct Active;
/// Typestate marker for a sealed (read-only) segment.
pub struct Sealed;
/// A segment file handle parameterized by its lifecycle state (`Active` or `Sealed`).
///
/// `State` is carried only through `PhantomData`; it selects which `impl`
/// block, and therefore which operations, are available on the handle.
pub struct Segment<State> {
    /// Parsed header of this segment file.
    pub header: SegmentHeader,
    /// Filesystem path to the segment file.
    pub path: std::path::PathBuf,
    /// Open file handle. `Segment::<Active>::create` stores `Some`; `seal`
    /// drops the handle and the resulting sealed segment holds `None`.
    file: Option<std::fs::File>,
    /// Running byte count. Starts at the size of the magic + length + header
    /// preamble and grows with every frame written or copied in.
    written_bytes: u64,
    /// Zero-sized marker tying the handle to its lifecycle state.
    _state: std::marker::PhantomData<State>,
}

/// Outcome of a compaction run: whether work happened, was skipped because
/// the sealed-segment count was below the configured threshold, or failed
/// inside the swap-point protocol. A `Failed` result guarantees the live
/// index was NOT mutated — the F6 / FREEZE-4 contract routes rebuild
/// failures here so callers can distinguish "did nothing" from "tried and
/// failed" from "did compact" without clobbering the reader-visible state.
///
/// The enum is `#[non_exhaustive]`: code outside this crate that matches on
/// it needs a wildcard arm, and new variants may be added later.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum CompactionOutcome {
    /// Compaction merged and replaced sealed segments.
    Performed,
    /// Compaction was a no-op: sealed-segment count was below `min_segments`.
    Skipped,
    /// The compact-swap protocol aborted before the swap point; the live
    /// index is unchanged. `reason` describes which step failed (off-side
    /// rebuild error, disk-side scan error, etc.). See
    /// `src/store/lifecycle.rs::compact` and
    /// `StoreIndex::replace_contents_from_fresh` for the swap invariants.
    Failed {
        /// Human-readable description of which swap-point step aborted.
        reason: String,
    },
}

/// Result returned by a compaction run.
///
/// Pairs the [`CompactionOutcome`] with counters describing how much was
/// reclaimed. Both counters are documented to be zero unless the outcome is
/// [`CompactionOutcome::Performed`].
#[derive(Debug)]
pub struct CompactionResult {
    /// Whether the compaction actually ran, was skipped, or failed.
    pub outcome: CompactionOutcome,
    /// Number of sealed segment files that were merged and removed. Always
    /// `0` for [`CompactionOutcome::Skipped`] and
    /// [`CompactionOutcome::Failed`].
    pub segments_removed: usize,
    /// Total bytes freed by removing the merged segment files. Always `0`
    /// for [`CompactionOutcome::Skipped`] and
    /// [`CompactionOutcome::Failed`].
    pub bytes_reclaimed: u64,
}

/// Encode `data` as one on-disk frame: \[len:u32 BE\]\[crc32:u32 BE\]\[msgpack\].
///
/// The body is produced by `rmp_serde::to_vec_named`, so struct fields are
/// written with their names. `len` is the body length in bytes and `crc32`
/// is `crc32fast::hash` of the body; both are stored big-endian.
///
/// # Errors
/// Returns `StoreError::Serialization` if the data cannot be serialized to
/// MessagePack, or if the encoded body is too long for its length to fit in
/// a `u32`.
pub fn frame_encode<T: serde::Serialize>(data: &T) -> Result<Vec<u8>, StoreError> {
    let body = match rmp_serde::to_vec_named(data) {
        Ok(bytes) => bytes,
        Err(err) => return Err(StoreError::Serialization(Box::new(err))),
    };
    let checksum = crc32fast::hash(&body).to_be_bytes();
    let body_len = match u32::try_from(body.len()) {
        Ok(n) => n.to_be_bytes(),
        Err(_) => return Err(StoreError::ser_msg("frame exceeds 4GB")),
    };

    // Header (length, then checksum) followed by the body, in one allocation.
    Ok([&body_len[..], &checksum[..], body.as_slice()].concat())
}

/// Error from frame_decode. Does not include segment_id — the caller
/// wraps this with the correct segment context.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// boxed as `dyn Error` or attached as the source of a higher-level error.
#[derive(Debug)]
#[non_exhaustive]
pub enum FrameDecodeError {
    /// The buffer is shorter than the minimum 8-byte frame header.
    TooShort,
    /// The buffer ends before the full frame payload is available.
    Truncated {
        /// Total bytes expected for the complete frame (header + payload).
        expected_len: usize,
        /// Bytes actually available in the buffer.
        available: usize,
    },
    /// The CRC32 checksum in the frame header did not match the payload.
    CrcMismatch {
        /// CRC value stored in the frame header.
        expected: u32,
        /// CRC value computed from the actual payload bytes.
        actual: u32,
    },
}

impl std::fmt::Display for FrameDecodeError {
    /// Render a one-line, human-readable description of the decode failure.
    /// CRC values are printed as zero-padded `0x`-prefixed hexadecimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TooShort => write!(f, "frame too short for header"),
            Self::Truncated {
                expected_len,
                available,
            } => {
                write!(
                    f,
                    "frame truncated: expected {expected_len} bytes, got {available}"
                )
            }
            Self::CrcMismatch { expected, actual } => {
                write!(
                    f,
                    "CRC mismatch: expected {expected:#010x}, got {actual:#010x}"
                )
            }
        }
    }
}

// No variant wraps an underlying error, so the default `source()` (None) is
// correct and the impl body stays empty.
impl std::error::Error for FrameDecodeError {}

/// Decode one frame from the front of `buf`.
///
/// A frame is \[len:u32 BE\]\[crc32:u32 BE\]\[msgpack\]. On success the
/// returned tuple holds the MessagePack payload (borrowed from `buf`) and the
/// total number of bytes the frame occupies, header included. Bytes after
/// the frame are ignored.
///
/// # Errors
/// Returns `FrameDecodeError::TooShort` if the buffer is under 8 bytes.
/// Returns `FrameDecodeError::Truncated` if the buffer ends before the full frame payload.
/// Returns `FrameDecodeError::CrcMismatch` if the checksum does not match the payload.
pub fn frame_decode(buf: &[u8]) -> Result<(&[u8], usize), FrameDecodeError> {
    const HEADER_LEN: usize = 8;

    let available = buf.len();
    if available < HEADER_LEN {
        return Err(FrameDecodeError::TooShort);
    }
    let (header, rest) = buf.split_at(HEADER_LEN);

    let mut word = [0u8; 4];
    word.copy_from_slice(&header[..4]);
    let payload_len = u32::from_be_bytes(word) as usize;
    word.copy_from_slice(&header[4..]);
    let stored_crc = u32::from_be_bytes(word);

    // On 32-bit targets header + payload length can overflow `usize`; such a
    // frame can never fit in the buffer, so it is reported as truncated.
    let frame_len = match HEADER_LEN.checked_add(payload_len) {
        Some(total) => total,
        None => {
            return Err(FrameDecodeError::Truncated {
                expected_len: usize::MAX,
                available,
            })
        }
    };

    // `get` doubles as the bounds check: it fails exactly when the buffer
    // holds fewer than `frame_len` bytes.
    let payload = match rest.get(..payload_len) {
        Some(bytes) => bytes,
        None => {
            return Err(FrameDecodeError::Truncated {
                expected_len: frame_len,
                available,
            })
        }
    };

    let computed_crc = crc32fast::hash(payload);
    if computed_crc == stored_crc {
        Ok((payload, frame_len))
    } else {
        Err(FrameDecodeError::CrcMismatch {
            expected: stored_crc,
            actual: computed_crc,
        })
    }
}

/// Build the file name for a segment: the id zero-padded to at least six
/// digits, a dot, then [`SEGMENT_EXTENSION`]. Ids wider than six digits are
/// written in full.
pub fn segment_filename(segment_id: u64) -> String {
    format!("{segment_id:06}.{SEGMENT_EXTENSION}")
}

impl Segment<Active> {
    /// Create a new active segment file in `dir` and write its preamble.
    ///
    /// The file is named by `segment_filename(segment_id)`. The preamble is
    /// magic + header_len (u32 BE) + header (MessagePack with named fields).
    /// `written_bytes` starts at the size of that preamble, so the first
    /// offset returned by `write_frame` points just past the header.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the segment file cannot be created or the header cannot be written.
    /// Returns `StoreError::Serialization` if the segment header cannot be serialized.
    pub fn create(dir: &std::path::Path, segment_id: u64) -> Result<Self, StoreError> {
        let path = dir.join(segment_filename(segment_id));
        let mut file = crate::store::platform::fs::create_new_file(&path)?;

        let header = SegmentHeader {
            version: 1,
            flags: 0,
            created_ns: crate::store::platform::clock::now_wall_ns_saturating(),
            segment_id,
        };

        // Write magic + header_len(u32 BE) + header(msgpack)
        file.write_all(SEGMENT_MAGIC).map_err(StoreError::Io)?;
        let header_bytes =
            rmp_serde::to_vec_named(&header).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        let header_len = u32::try_from(header_bytes.len())
            .map_err(|_| StoreError::ser_msg("segment header length exceeds u32::MAX"))?
            .to_be_bytes();
        file.write_all(&header_len).map_err(StoreError::Io)?;
        file.write_all(&header_bytes).map_err(StoreError::Io)?;

        Ok(Self {
            header,
            path,
            file: Some(file),
            written_bytes: (4 + 4 + header_bytes.len()) as u64, // magic + len + header
            _state: std::marker::PhantomData,
        })
    }

    /// Write a frame. Returns offset where frame starts.
    ///
    /// `frame` is written verbatim; no framing or checksum is added here.
    /// The returned offset is the value of `written_bytes` before the write.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if writing to the segment file fails.
    pub fn write_frame(&mut self, frame: &[u8]) -> Result<u64, StoreError> {
        let offset = self.written_bytes;
        // `create` always stores a handle and `seal` consumes the segment, so
        // an active segment built through this API always has a file here.
        if let Some(ref mut f) = self.file {
            f.write_all(frame).map_err(StoreError::Io)?;
        }
        self.written_bytes += frame.len() as u64;
        Ok(offset)
    }

    /// Append all frame bytes from an existing segment file, skipping that file's header.
    ///
    /// The source's frame region runs from just after its header to either
    /// the start of its SIDX footer (when one is detected) or the end of the
    /// file. The bytes are copied as-is, without decoding or re-checking
    /// individual frames. Returns the offset in this segment at which the
    /// copied bytes start.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the source file cannot be read or frames cannot be written.
    /// Returns `StoreError::Io` with `ErrorKind::InvalidData` if the source's
    /// frame region ends before it starts (a footer offset pointing into the
    /// header, or a header length larger than the file). Copying nothing in
    /// that case would silently drop the segment's frames.
    /// Returns `StoreError::Corrupt` if the source file does not begin with the expected magic bytes.
    pub fn append_frames_from_segment(
        &mut self,
        path: &std::path::Path,
    ) -> Result<u64, StoreError> {
        let mut source = std::fs::File::open(path).map_err(StoreError::Io)?;
        let mut magic = [0u8; 4];
        source.read_exact(&mut magic).map_err(StoreError::Io)?;
        if &magic != SEGMENT_MAGIC {
            return Err(StoreError::corrupt_magic(0));
        }

        let mut header_len_buf = [0u8; 4];
        source
            .read_exact(&mut header_len_buf)
            .map_err(StoreError::Io)?;
        let header_len = u64::from(u32::from_be_bytes(header_len_buf));
        // magic (4) + header_len field (4) + header body
        let frames_start = 8 + header_len;

        // Determine where frames end: if the segment has a SIDX footer,
        // the frames stop at string_table_offset. Otherwise, frames extend
        // to the end of the file.
        let file_len = source.seek(SeekFrom::End(0)).map_err(StoreError::Io)?;
        let frames_end = detect_sidx_boundary(&mut source, file_len)?.unwrap_or(file_len);

        // Both bounds come from untrusted file contents. An inverted range
        // means the file is damaged; fail loudly instead of copying zero
        // bytes and letting the caller believe the segment was merged.
        if frames_end < frames_start {
            return Err(StoreError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "{}: frame region ends at byte {} but starts at byte {}",
                    path.display(),
                    frames_end,
                    frames_start
                ),
            )));
        }
        let bytes_to_copy = frames_end - frames_start;

        source
            .seek(SeekFrom::Start(frames_start))
            .map_err(StoreError::Io)?;

        let offset = self.written_bytes;
        if let Some(ref mut destination) = self.file {
            // `take` caps the copy at the frame region so a footer, if any,
            // is never carried over into this segment.
            let copied = std::io::copy(&mut source.take(bytes_to_copy), destination)
                .map_err(StoreError::Io)?;
            self.written_bytes += copied;
        }
        Ok(offset)
    }

    /// Returns `true` if the segment has reached or exceeded `max_bytes` and should be rotated.
    pub fn needs_rotation(&self, max_bytes: u64) -> bool {
        self.written_bytes >= max_bytes
    }

    /// Flush the segment file to durable storage using the specified sync mode.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the OS-level sync call fails.
    pub fn sync_with_mode(&mut self, mode: &crate::store::SyncMode) -> Result<(), StoreError> {
        if let Some(ref f) = self.file {
            crate::store::platform::sync::sync_file_with_mode(f, mode)?;
        }
        Ok(())
    }

    /// Write a SIDX footer to the end of this segment before sealing.
    /// The footer enables fast cold-start index rebuild by storing compact
    /// binary entries instead of requiring full msgpack frame deserialization.
    ///
    /// This method does not advance `written_bytes`, so that counter keeps
    /// describing the preamble plus frames only.
    ///
    /// # Errors
    /// Returns `StoreError::Io`, `StoreError::Serialization`, or
    /// `StoreError::SegmentTooManyEntries` if writing fails.
    pub(crate) fn write_sidx_footer(
        &mut self,
        collector: &crate::store::segment::sidx::SidxEntryCollector,
    ) -> Result<(), StoreError> {
        if let Some(ref mut f) = self.file {
            collector.write_footer(f, self.header.segment_id)?;
        }
        Ok(())
    }

    /// Seal: close file handle, transition to Sealed.
    ///
    /// Consumes the active segment, so no further writes are possible. The
    /// header, path and byte count carry over unchanged. This method itself
    /// performs no sync; call `sync_with_mode` first if durability matters.
    pub fn seal(mut self) -> Segment<Sealed> {
        drop(self.file.take());
        Segment {
            header: self.header,
            path: self.path,
            file: None,
            written_bytes: self.written_bytes,
            _state: std::marker::PhantomData,
        }
    }
}

/// Check whether a segment file ends with a SIDX footer.
/// If so, return the byte offset where the string table starts (= end of frames).
/// If not, return `None` (frames extend to EOF).
///
/// `file_len` is the total length of the file behind `source`. The function
/// seeks within `source` and leaves the cursor at the end of the file when
/// it reads the trailer; callers must re-seek before reading frames.
///
/// A trailer is accepted only when its magic matches and its string-table
/// offset points at or before the start of the trailer. An offset beyond
/// that cannot belong to a real footer (for example when frame data happens
/// to end with the magic bytes), so it is reported as `None`.
///
/// # Errors
/// Returns `StoreError::Io` if seeking to or reading the trailer fails.
pub(crate) fn detect_sidx_boundary<R: Read + Seek>(
    source: &mut R,
    file_len: u64,
) -> Result<Option<u64>, StoreError> {
    // SIDX trailer is the last 16 bytes: [string_table_offset:u64 LE][entry_count:u32 LE][magic:4]
    const TRAILER_LEN: u64 = 16;
    if file_len < TRAILER_LEN {
        return Ok(None);
    }
    source
        .seek(SeekFrom::End(-(TRAILER_LEN as i64)))
        .map_err(StoreError::Io)?;
    let mut trailer = [0u8; 16];
    source.read_exact(&mut trailer).map_err(StoreError::Io)?;

    // Check magic at bytes 12..16
    if &trailer[12..16] != crate::store::segment::sidx::SIDX_MAGIC {
        return Ok(None);
    }

    // string_table_offset at bytes 0..8
    let mut offset_bytes = [0u8; 8];
    offset_bytes.copy_from_slice(&trailer[..8]);
    let string_table_offset = u64::from_le_bytes(offset_bytes);

    // The string table precedes the trailer, so its offset can never exceed
    // the trailer's own start. `file_len >= TRAILER_LEN` was checked above,
    // so the subtraction cannot underflow.
    if string_table_offset > file_len - TRAILER_LEN {
        return Ok(None);
    }
    Ok(Some(string_table_offset))
}