//! mindb 0.1.2
//!
//! Lightweight embedded key–value store with write-ahead log and zstd
//! compression. See the project documentation for on-disk format details.
#![allow(dead_code)]

use std::fs::{self, File};
use std::io::{BufReader, Read, Write, IoSlice};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use uuid::Uuid;
use xxhash_rust::xxh3::xxh3_128;

use crate::storage::{Result, StorageError, validate_segment_length};

// Eight-byte magic prefix identifying a mindb segment file.
const SEGMENT_MAGIC: &[u8; 8] = b"MINDSEG0";
// On-disk format version; the reader rejects any other value.
const SEGMENT_VERSION: u32 = 1;
// Chunk size for page descriptors and the alignment unit for segment
// payloads (the writer pads to a multiple of this; the reader enforces it).
const PAGE_SIZE: usize = 1 << 12; // 4 KiB pages.

/// Compression settings used when writing a segment.
///
/// Serialized via serde in adjacently-tagged form, so the JSON metadata
/// encodes e.g. `{"type":"Zstd","level":3}` or `{"type":"None"}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "level")]
pub enum CompressionCodec {
    /// Writes raw bytes without compression.
    None,
    /// Uses Zstandard with the configured compression level.
    Zstd(i32),
}

impl Default for CompressionCodec {
    fn default() -> Self {
        CompressionCodec::None
    }
}

impl CompressionCodec {
    /// Encodes `data` with this codec, returning an owned byte vector.
    /// The `None` codec simply copies the input unchanged.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        match self {
            CompressionCodec::Zstd(level) => {
                zstd::stream::encode_all(std::io::Cursor::new(data), *level)
                    .map_err(|err| StorageError::Compression(err.to_string()))
            }
            CompressionCodec::None => Ok(data.to_vec()),
        }
    }

    /// Reverses [`CompressionCodec::compress`]: decodes zstd frames, or
    /// copies raw bytes unchanged for the `None` codec.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        match self {
            CompressionCodec::Zstd(_) => {
                zstd::stream::decode_all(std::io::Cursor::new(data))
                    .map_err(|err| StorageError::Compression(err.to_string()))
            }
            CompressionCodec::None => Ok(data.to_vec()),
        }
    }
}

/// Metadata describing a segment page.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PageDescriptor {
    /// Byte offset of the page within the uncompressed payload.
    pub offset: u64,
    /// Page length in bytes. At most PAGE_SIZE when first recorded, but the
    /// writer's tail padding is folded into the final page, which can push
    /// its length past PAGE_SIZE.
    pub length: u32,
    /// xxh3-128 checksum of the page bytes.
    pub checksum: u128,
}

/// Metadata describing an immutable segment file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Unique segment identifier (UUID v4 string, assigned at write time).
    pub id: String,
    /// Location of the segment file; the reader rewrites this to the path
    /// actually opened, so a moved file stays self-consistent.
    pub path: PathBuf,
    /// Uncompressed, page-aligned payload size in bytes.
    pub size_bytes: u64,
    /// Payload size as stored on disk; equals `size_bytes` when
    /// `compression` is `None`.
    pub compressed_size_bytes: u64,
    /// Codec used to encode the payload on disk.
    pub compression: CompressionCodec,
    /// Checksummed page descriptors covering the uncompressed payload.
    pub pages: Vec<PageDescriptor>,
    /// Hex-encoded SHA-256 digest of the uncompressed payload.
    pub sha256: String,
    /// Creation timestamp in milliseconds since the Unix epoch.
    pub created_at_ms: u128,
}

impl SegmentMetadata {
    /// Number of page descriptors recorded for this segment.
    pub fn page_count(&self) -> usize {
        let Self { pages, .. } = self;
        pages.len()
    }
}

/// Concrete handle for immutable on-disk segments.
///
/// Holds only the parsed metadata; payload bytes are loaded on demand by
/// opening a `SegmentReader` against `metadata.path`.
#[derive(Clone, Debug)]
pub struct FileSegment {
    // Segment metadata, including the on-disk path used by `open_reader`.
    metadata: SegmentMetadata,
}

impl FileSegment {
    pub fn new(metadata: SegmentMetadata) -> Self {
        Self { metadata }
    }
}

impl crate::storage::Segment for FileSegment {
    /// Borrows this segment's metadata.
    fn metadata(&self) -> &SegmentMetadata {
        &self.metadata
    }

    /// Opens a validating reader over the segment file recorded in the
    /// metadata path.
    fn open_reader(&self) -> Result<SegmentReader> {
        let path = &self.metadata.path;
        SegmentReader::open(path)
    }
}

/// Streaming writer that constructs immutable segments.
///
/// Appended data is staged entirely in memory; nothing touches the
/// filesystem until `finish` is called.
pub struct SegmentWriter {
    // Destination file path for the finished segment.
    path: PathBuf,
    // Compression applied to the payload when the segment is finished.
    codec: CompressionCodec,
    // Uncompressed payload staged so far.
    buffer: Vec<u8>,
    // Checksummed page descriptors covering `buffer`.
    pages: Vec<PageDescriptor>,
}

impl SegmentWriter {
    /// Creates a writer that will emit a segment file at `path`, encoding the
    /// payload with `codec` when finished.
    pub fn new<P: Into<PathBuf>>(path: P, codec: CompressionCodec) -> Self {
        Self::with_capacity(path, codec, 0)
    }

    /// Like [`SegmentWriter::new`], but pre-allocates `capacity` bytes for the
    /// staging buffer to avoid repeated reallocation on large segments.
    pub fn with_capacity<P: Into<PathBuf>>(path: P, codec: CompressionCodec, capacity: usize) -> Self {
        Self {
            path: path.into(),
            codec,
            buffer: Vec::with_capacity(capacity),
            pages: Vec::new(),
        }
    }

    /// Records checksummed page descriptors for the buffer range
    /// `[start_offset, start_offset + appended_len)`, chunked into pieces of
    /// at most `PAGE_SIZE` bytes. Shared by `append_block` and
    /// `append_entry`, which previously duplicated this loop.
    fn record_pages(&mut self, start_offset: u64, appended_len: usize) {
        let mut local_offset = 0usize;
        while local_offset < appended_len {
            let len = (appended_len - local_offset).min(PAGE_SIZE);
            let start = start_offset as usize + local_offset;
            let slice = &self.buffer[start..start + len];
            self.pages.push(PageDescriptor {
                offset: start_offset + local_offset as u64,
                length: len as u32,
                checksum: xxh3_128(slice),
            });
            local_offset += len;
        }
    }

    /// Appends an opaque block of bytes and indexes it into checksummed pages.
    pub fn append_block(&mut self, block: &[u8]) {
        let start_offset = self.buffer.len() as u64;
        self.buffer.extend_from_slice(block);
        self.record_pages(start_offset, block.len());
    }

    /// Appends one key/value entry: a 17-byte header (key_len u32 LE,
    /// value_len u32 LE, sequence u64 LE, tombstone u8) followed by the key
    /// and value payloads, then indexes the appended range into pages.
    pub fn append_entry(&mut self, key: &[u8], value: &[u8], sequence: u64, tombstone: bool) {
        // Header stores lengths as u32; larger inputs would silently
        // truncate the recorded length, so catch them in debug builds.
        debug_assert!(key.len() <= u32::MAX as usize, "key length exceeds u32 header field");
        debug_assert!(value.len() <= u32::MAX as usize, "value length exceeds u32 header field");

        let start_offset = self.buffer.len() as u64;
        self.buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
        self.buffer.extend_from_slice(&(value.len() as u32).to_le_bytes());
        self.buffer.extend_from_slice(&sequence.to_le_bytes());
        self.buffer.push(if tombstone { 1 } else { 0 });
        self.buffer.extend_from_slice(key);
        self.buffer.extend_from_slice(value);

        let appended_len = 4 + 4 + 8 + 1 + key.len() + value.len();
        self.record_pages(start_offset, appended_len);
    }

    /// Number of uncompressed payload bytes staged so far.
    pub fn bytes_written(&self) -> usize {
        self.buffer.len()
    }

    /// Zero-pads the buffer to the next PAGE_SIZE boundary and folds the
    /// padding into the final page descriptor (recomputing its checksum),
    /// since the reader rejects payloads that are not page aligned.
    fn pad_to_page_boundary(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let remainder = self.buffer.len() % PAGE_SIZE;
        if remainder == 0 {
            return;
        }
        let pad_len = PAGE_SIZE - remainder;
        self.buffer.resize(self.buffer.len() + pad_len, 0);
        if let Some(last_page) = self.pages.last_mut() {
            last_page.length += pad_len as u32;
            let start = last_page.offset as usize;
            let end = start + last_page.length as usize;
            last_page.checksum = xxh3_128(&self.buffer[start..end]);
        }
    }

    /// Finalizes the segment: pads to page alignment, hashes the payload,
    /// compresses it via the configured codec, and writes
    /// `magic | version (u32 LE) | metadata_len (u32 LE) | metadata JSON | payload`
    /// to `self.path`, creating parent directories as needed.
    ///
    /// # Errors
    /// Fails if the segment exceeds the configured length limit, compression
    /// or metadata serialization fails, or any filesystem operation errors.
    pub fn finish(mut self) -> Result<FileSegment> {
        self.pad_to_page_boundary();

        validate_segment_length(self.buffer.len() as u64)?;
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent)?;
        }

        // The digest always covers the uncompressed payload so the reader
        // can verify integrity after decompression.
        let mut sha = Sha256::new();
        sha.update(&self.buffer);
        let sha256 = format!("{:x}", sha.finalize());

        // Compress once through the shared codec helper (previously this
        // re-implemented the zstd call inline) and reuse the result for both
        // the metadata size and the file write. `None` skips the copy the
        // helper would make and writes the buffer directly.
        let compressed = match &self.codec {
            CompressionCodec::None => None,
            CompressionCodec::Zstd(_) => Some(self.codec.compress(&self.buffer)?),
        };
        let compressed_size_bytes = compressed
            .as_ref()
            .map_or(self.buffer.len() as u64, |bytes| bytes.len() as u64);

        let metadata = SegmentMetadata {
            id: Uuid::new_v4().to_string(),
            path: self.path.clone(),
            size_bytes: self.buffer.len() as u64,
            compressed_size_bytes,
            compression: self.codec.clone(),
            pages: self.pages.clone(),
            sha256,
            created_at_ms: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis(),
        };

        let metadata_json = serde_json::to_vec(&metadata)?;

        let mut file = File::create(&self.path)?;
        // Build the header contiguously so it goes out in a single write.
        let mut header = Vec::with_capacity(SEGMENT_MAGIC.len() + 4 + 4 + metadata_json.len());
        header.extend_from_slice(SEGMENT_MAGIC);
        header.extend_from_slice(&SEGMENT_VERSION.to_le_bytes());
        header.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
        header.extend_from_slice(&metadata_json);
        file.write_all(&header)?;
        file.write_all(compressed.as_deref().unwrap_or(&self.buffer))?;

        Ok(FileSegment::new(metadata))
    }
}

/// Segment reader that validates page checksums and optional compression.
///
/// The full payload is decompressed into memory on open; `read_page`
/// re-verifies each page's checksum on every access.
pub struct SegmentReader {
    // Metadata parsed from the file header (path rewritten to the path
    // actually opened).
    metadata: SegmentMetadata,
    // Fully decompressed, page-aligned payload.
    buffer: Vec<u8>,
}

impl SegmentReader {
    /// Opens and fully validates a segment file written by `SegmentWriter`.
    ///
    /// Expected layout (integers little-endian): 8-byte magic, version
    /// (u32), metadata length (u32), metadata JSON, then the (possibly
    /// compressed) payload to end of file.
    ///
    /// # Errors
    /// Returns `InvalidFormat` on a bad magic, unsupported version,
    /// non-page-aligned payload, or digest mismatch; propagates I/O, JSON,
    /// length-validation, and decompression failures.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path.as_ref())?;
        let mut reader = BufReader::new(file);

        let mut magic = [0u8; 8];
        reader.read_exact(&mut magic)?;
        if &magic != SEGMENT_MAGIC {
            return Err(StorageError::InvalidFormat("invalid segment magic".into()));
        }

        let mut version = [0u8; 4];
        reader.read_exact(&mut version)?;
        if u32::from_le_bytes(version) != SEGMENT_VERSION {
            return Err(StorageError::InvalidFormat(
                "unsupported segment version".into(),
            ));
        }

        let mut metadata_len = [0u8; 4];
        reader.read_exact(&mut metadata_len)?;
        let metadata_len = u32::from_le_bytes(metadata_len) as usize;
        // NOTE(review): metadata_len comes straight from the file; a corrupt
        // header can request an allocation of up to 4 GiB here before the
        // following read_exact fails — consider a sanity cap.
        let mut metadata_buf = vec![0u8; metadata_len];
        reader.read_exact(&mut metadata_buf)?;
        let mut metadata: SegmentMetadata = serde_json::from_slice(&metadata_buf)?;
        // The file may have been moved since it was written; trust the path
        // we actually opened over the one recorded in the metadata.
        metadata.path = path.as_ref().to_path_buf();

        let mut compressed = Vec::new();
        reader.read_to_end(&mut compressed)?;
        // NOTE(review): a size mismatch here is silently corrected rather
        // than treated as corruption; the SHA-256 check below still guards
        // the decompressed payload.
        if compressed.len() as u64 != metadata.compressed_size_bytes {
            metadata.compressed_size_bytes = compressed.len() as u64;
        }

        let buffer = metadata.compression.decompress(&compressed)?;
        // The writer always pads the payload to a PAGE_SIZE multiple, so a
        // misaligned buffer indicates truncation or corruption.
        if !buffer.is_empty() {
            let remainder = buffer.len() % PAGE_SIZE;
            if remainder != 0 {
                return Err(StorageError::InvalidFormat(
                    "segment buffer is not page aligned".into(),
                ));
            }
        }
        validate_segment_length(buffer.len() as u64)?;

        // End-to-end integrity: the digest was computed over the
        // uncompressed payload at write time, so compare after decompression.
        let mut sha = Sha256::new();
        sha.update(&buffer);
        let computed = format!("{:x}", sha.finalize());
        if computed != metadata.sha256 {
            return Err(StorageError::InvalidFormat(
                "segment digest mismatch".into(),
            ));
        }

        Ok(Self { metadata, buffer })
    }

    /// Borrows the metadata parsed from the segment header.
    pub fn metadata(&self) -> &SegmentMetadata {
        &self.metadata
    }

    /// Number of page descriptors in the segment.
    pub fn page_count(&self) -> usize {
        self.metadata.pages.len()
    }

    /// Returns the bytes of page `index`, verifying its xxh3-128 checksum.
    ///
    /// # Errors
    /// `InvalidFormat` if `index` is out of range or the descriptor points
    /// outside the payload; `ChecksumMismatch` if the page bytes do not
    /// hash to the recorded checksum.
    pub fn read_page(&self, index: usize) -> Result<&[u8]> {
        let page = self
            .metadata
            .pages
            .get(index)
            .ok_or_else(|| StorageError::InvalidFormat("page index out of bounds".into()))?;
        let start = page.offset as usize;
        let end = start + page.length as usize;
        // Checked slicing: a descriptor pointing past the buffer becomes an
        // error instead of a panic.
        let slice = self
            .buffer
            .get(start..end)
            .ok_or_else(|| StorageError::InvalidFormat("page slice outside of bounds".into()))?;
        // Recomputed on every access even though the buffer is immutable.
        let checksum = xxh3_128(slice);
        if checksum != page.checksum {
            return Err(StorageError::ChecksumMismatch {
                segment: self.metadata.id.clone(),
                page: index,
            });
        }
        Ok(slice)
    }

    /// Entire decompressed payload, including any tail padding.
    pub fn as_bytes(&self) -> &[u8] {
        &self.buffer
    }
}