//! Flush coordination for mindb, a lightweight embedded key–value store
//! with a write-ahead log and zstd compression.
#![allow(dead_code)]

use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use crate::storage::{self, CompressionCodec, FileSegment, Segment, SegmentWriter};

use super::memtable::{FrozenMemTable, MemTableEntry};

/// Error type produced by the flush coordinator.
#[derive(Debug, thiserror::Error)]
pub enum FlushError {
    #[error("storage error: {0}")]
    Storage(#[from] storage::StorageError),
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}

/// A single index sidecar entry.
#[derive(Clone, Debug)]
pub struct IndexEntry {
    pub key: MemTableKey,
    pub offset: u64,
}

/// Wrapper that keeps the key payload reference-counted for the flush output.
#[derive(Clone, Debug)]
pub struct MemTableKey(pub std::sync::Arc<[u8]>);

impl AsRef<[u8]> for MemTableKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

/// Materialised index sidecar generated by a flush.
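///
/// On disk, each record is laid out as `key_len: u32 LE | key bytes |
/// offset: u64 LE` (see `write_to_disk` below). A hypothetical reader for a
/// single record, shown only to illustrate the framing and not part of the
/// crate API:
///
/// ```ignore
/// fn read_record(buf: &[u8]) -> Option<(&[u8], u64, usize)> {
///     let key_len = u32::from_le_bytes(buf.get(..4)?.try_into().ok()?) as usize;
///     let key = buf.get(4..4 + key_len)?;
///     let offset_bytes = buf.get(4 + key_len..4 + key_len + 8)?;
///     let offset = u64::from_le_bytes(offset_bytes.try_into().ok()?);
///     // Returns the key, its segment offset, and the record's total length.
///     Some((key, offset, 4 + key_len + 8))
/// }
/// ```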
#[derive(Clone, Debug)]
pub struct IndexSidecar {
    pub path: PathBuf,
    pub entries: Vec<IndexEntry>,
}

impl IndexSidecar {
    fn write_to_disk(&self) -> Result<(), std::io::Error> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent)?;
        }
        let estimated: usize = self
            .entries
            .iter()
            .map(|e| 4 + e.key.as_ref().len() + 8)
            .sum();
        let mut writer =
            BufWriter::with_capacity(estimated.max(8192), File::create(&self.path)?);
        for entry in &self.entries {
            let key = entry.key.as_ref();
            let len = key.len() as u32;
            writer.write_all(&len.to_le_bytes())?;
            writer.write_all(key)?;
            writer.write_all(&entry.offset.to_le_bytes())?;
        }
        writer.flush()?;
        Ok(())
    }
}

/// Result returned after a successful memtable flush.
#[derive(Clone, Debug)]
pub struct FlushOutcome {
    pub segment: FileSegment,
    pub index: IndexSidecar,
}

/// Coordinates turning frozen memtables into immutable segments plus index sidecars.
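///
/// A minimal usage sketch, marked `ignore` because the exact re-export paths
/// are assumed here; `CompressionCodec::None` is the codec the tests below
/// exercise:
///
/// ```ignore
/// let coordinator = FlushCoordinator::new("/tmp/mindb", CompressionCodec::None);
/// let frozen = memtable.freeze();
/// let outcome = coordinator.flush(frozen)?;
/// // The segment and its index sidecar now live under /tmp/mindb/segments
/// // and /tmp/mindb/indexes respectively.
/// println!("flushed segment {}", outcome.segment.metadata().id);
/// ```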
pub struct FlushCoordinator {
    base_dir: PathBuf,
    compression: CompressionCodec,
    next_id: AtomicU64,
}

impl FlushCoordinator {
    pub fn new<P: AsRef<Path>>(base_dir: P, compression: CompressionCodec) -> Self {
        Self {
            base_dir: base_dir.as_ref().to_path_buf(),
            compression,
            next_id: AtomicU64::new(0),
        }
    }

    fn segments_dir(&self) -> PathBuf {
        self.base_dir.join("segments")
    }

    fn index_dir(&self) -> PathBuf {
        self.base_dir.join("indexes")
    }

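    /// Encodes one entry as `key_len: u32 LE | value_len: u32 LE |
    /// sequence: u64 LE | tombstone: u8 | key bytes | value bytes`.
    /// `flush` itself delegates framing to `SegmentWriter::append_entry`;
    /// this helper documents the layout that the tests' `encoded_len` mirrors.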
    fn encode_entry(entry: &MemTableEntry, buffer: &mut Vec<u8>) {
        buffer.clear();
        buffer.reserve(entry.key.len() + entry.value.len() + 32);
        buffer.extend_from_slice(&(entry.key.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&entry.sequence.to_le_bytes());
        buffer.push(entry.tombstone as u8);
        buffer.extend_from_slice(entry.key.as_ref());
        buffer.extend_from_slice(entry.value.as_ref());
    }

    /// Flushes a frozen memtable into an immutable segment and index sidecar.
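    ///
    /// Flushing an empty memtable is a no-op on disk: the returned outcome
    /// carries zeroed segment metadata and a sidecar that is never written.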
    pub fn flush(&self, frozen: FrozenMemTable) -> Result<FlushOutcome, FlushError> {
        let entries = frozen.into_entries();
        if entries.is_empty() {
            let empty_path = self.index_dir().join("empty.idx");
            let sidecar = IndexSidecar {
                path: empty_path,
                entries: Vec::new(),
            };
            return Ok(FlushOutcome {
                segment: FileSegment::new(storage::SegmentMetadata {
                    id: String::new(),
                    path: PathBuf::new(),
                    size_bytes: 0,
                    compressed_size_bytes: 0,
                    compression: self.compression.clone(),
                    pages: Vec::new(),
                    sha256: String::new(),
                    created_at_ms: 0,
                }),
                index: sidecar,
            });
        }

        let flush_id = self.next_id.fetch_add(1, Ordering::SeqCst);
        let segments_dir = self.segments_dir();
        // Ensure the segments directory exists before opening the writer,
        // mirroring the sidecar handling in `write_to_disk`.
        fs::create_dir_all(&segments_dir)?;
        let segment_path = segments_dir.join(format!("flush-{flush_id:016x}.seg"));
        // Pre-size the segment writer buffer to the total encoded payload to reduce reallocations.
        let total_payload: usize = entries
            .iter()
            .map(|e| 4 + 4 + 8 + 1 + e.key.len() + e.value.len())
            .sum();
        let mut writer =
            SegmentWriter::with_capacity(&segment_path, self.compression.clone(), total_payload);

        let mut index_entries = Vec::with_capacity(entries.len());

        for entry in &entries {
            let offset = writer.bytes_written() as u64;
            writer.append_entry(
                entry.key.as_ref(),
                entry.value.as_ref(),
                entry.sequence,
                entry.tombstone,
            );
            index_entries.push(IndexEntry {
                key: MemTableKey(entry.key.clone()),
                offset,
            });
        }

        let segment = writer.finish()?;

        let sidecar_path = self
            .index_dir()
            .join(&segment.metadata().id)
            .with_extension("idx");
        let sidecar = IndexSidecar {
            path: sidecar_path,
            entries: index_entries,
        };
        sidecar.write_to_disk()?;

        Ok(FlushOutcome {
            segment,
            index: sidecar,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::CompressionCodec;
    use crate::write::memtable::MemTable;

    fn encoded_len(entry: &MemTableEntry) -> usize {
        4  // key length prefix
        + 4 // value length prefix
        + 8 // sequence number
        + 1 // tombstone flag
        + entry.key.len()
        + entry.value.len()
    }

    fn align_to_page(len: usize) -> usize {
        const PAGE_SIZE: usize = 4096;
        // Round up to the next page boundary; zero stays zero, so no special
        // case is needed.
        ((len + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE
    }

    #[test]
    fn flush_segment_matches_payload_size() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let table = MemTable::new();
        table
            .put(b"key-a", b"value-a".to_vec())
            .expect("put should succeed");
        table
            .put(b"key-b", b"value-b-with-some-extra-bytes".to_vec())
            .expect("put should succeed");

        let frozen = table.freeze();
        let expected_payload: usize = frozen.entries().iter().map(encoded_len).sum();
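        // Guard: the payload must not already be page-aligned, otherwise the
        // padding assertion below would pass trivially.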
        assert_ne!(expected_payload % 4096, 0);

        let coordinator = FlushCoordinator::new(tempdir.path(), CompressionCodec::None);
        let outcome = coordinator
            .flush(frozen)
            .expect("flush should succeed without padding");

        let aligned_expected = align_to_page(expected_payload);
        assert_eq!(
            outcome.segment.metadata().size_bytes as usize,
            aligned_expected,
            "segment should be padded only to the next page boundary"
        );

        let total_page_bytes: usize = outcome
            .segment
            .metadata()
            .pages
            .iter()
            .map(|page| page.length as usize)
            .sum();
        assert_eq!(total_page_bytes, aligned_expected);
        assert!(
            outcome
                .segment
                .metadata()
                .pages
                .iter()
                .all(|page| page.length as usize <= 4096)
        );
    }
}