//! armdb — sharded bitcask key-value storage optimized for NVMe.
//!
//! Shard log reader: sequential scanning of a shard's data files,
//! used for catch-up replication.
use std::fs::{self, File};
use std::path::{Path, PathBuf};

use zerocopy::FromBytes;

use crate::entry::{EntryHeader, compute_crc32, entry_size};
use crate::error::DbResult;
use crate::hint;

/// A raw entry read from the log.
///
/// Carries the complete on-disk bytes plus the location metadata needed to
/// re-read or acknowledge the entry during catch-up replication.
pub struct RawEntry {
    /// Complete serialized entry (header + key + value + padding).
    pub data: Vec<u8>,
    /// GSN extracted from header (sequence number, no tombstone bit).
    pub gsn: u64,
    /// Source file ID.
    pub file_id: u32,
    /// Byte offset of the entry start within the file.
    pub file_offset: u64,
    /// Key length that matched CRC.
    pub key_len: u16,
}

/// Reads entries from a shard's data files starting from a GSN offset.
/// Used for catch-up replication (initial sync, reconnect, SPSC overflow).
pub struct ShardLogReader {
    /// Directory containing this shard's `NNNNNN.data` / `NNNNNN.hint` files.
    shard_dir: PathBuf,
    /// Sorted IDs of the data files discovered at construction time.
    file_ids: Vec<u32>,
    /// Index into `file_ids` of the file currently being scanned.
    current_file_idx: usize,
    /// Open handle for the current data file; `None` means end of data.
    current_file: Option<File>,
    /// Byte offset of the next entry within the current file.
    current_offset: u64,
    /// Total length of the current file, cached from its metadata.
    current_file_len: u64,
    /// Known key lengths for this shard, tried during CRC-based size resolution.
    key_lens: Vec<usize>,
    /// Most recently matched key length; tried first on the next entry.
    last_matched_k: usize,
    /// Read-ahead buffer for efficient sequential scanning.
    read_buf: Vec<u8>,
    /// File offset corresponding to `read_buf[0]`.
    read_buf_offset: u64,
    /// Number of valid bytes currently held in `read_buf`.
    read_buf_len: usize,
}

/// Size of the sequential read-ahead buffer (64 KiB).
const READ_AHEAD_SIZE: usize = 64 * 1024;
/// On-disk size of the fixed entry header.
const HEADER_SIZE: usize = size_of::<EntryHeader>();

impl ShardLogReader {
    /// Create a new log reader starting from `from_gsn`.
    ///
    /// `key_lens` lists the key lengths known for this shard; entry
    /// boundaries are recovered by trying each length against the header
    /// CRC. A `from_gsn` of 0 positions the reader at the oldest data file.
    pub fn new(shard_dir: PathBuf, from_gsn: u64, key_lens: Vec<usize>) -> DbResult<Self> {
        let file_ids = scan_data_files(&shard_dir)?;
        // Seed the key-length cache with the first known length (0 = unseeded).
        let last_k = key_lens.first().copied().unwrap_or(0);

        let mut reader = Self {
            shard_dir,
            file_ids,
            current_file_idx: 0,
            current_file: None,
            current_offset: 0,
            current_file_len: 0,
            key_lens,
            last_matched_k: last_k,
            read_buf: vec![0u8; READ_AHEAD_SIZE],
            read_buf_offset: 0,
            read_buf_len: 0,
        };

        if from_gsn > 0 {
            reader.seek_to_gsn(from_gsn)?;
        } else if !reader.file_ids.is_empty() {
            reader.open_file(0)?;
        }

        Ok(reader)
    }

    /// Read the next entry. Returns `None` at end of available data.
    ///
    /// Corrupted headers and partial entries at EOF make the reader skip
    /// ahead (to the next file, or past the bad header) rather than fail,
    /// since tail corruption is expected after a crash.
    pub fn next_entry(&mut self) -> DbResult<Option<RawEntry>> {
        loop {
            if self.current_file.is_none() {
                return Ok(None);
            }

            // Try to read a header at the current position; fewer than
            // HEADER_SIZE bytes remaining means this file is exhausted.
            let header_bytes = match self.read_bytes(HEADER_SIZE)? {
                Some(b) => b,
                None => {
                    if !self.advance_file()? {
                        return Ok(None);
                    }
                    continue;
                }
            };

            let header = match EntryHeader::read_from_bytes(&header_bytes) {
                Ok(h) => h,
                Err(_) => {
                    // Corrupted header — skip the rest of this file.
                    if !self.advance_file()? {
                        return Ok(None);
                    }
                    continue;
                }
            };

            let entry_offset = self.current_offset;
            let file_id = self.file_ids[self.current_file_idx];

            // Determine the entry size by CRC-matching candidate key lengths,
            // starting with the last matched K.
            if let Some((total_size, matched_k)) = self.resolve_entry_size(&header)? {
                self.last_matched_k = matched_k;

                // Read the complete serialized entry (header + key + value + padding).
                let data = match self.read_bytes_from(entry_offset, total_size)? {
                    Some(d) => d,
                    None => {
                        // Partial entry at EOF — move to next file.
                        if !self.advance_file()? {
                            return Ok(None);
                        }
                        continue;
                    }
                };

                self.current_offset = entry_offset + total_size as u64;

                return Ok(Some(RawEntry {
                    data,
                    gsn: header.sequence(),
                    file_id,
                    file_offset: entry_offset,
                    key_len: matched_k as u16,
                }));
            } else {
                // No key_len matched — corrupted or unknown tree. Skip past
                // this header and try to resynchronize.
                self.current_offset += HEADER_SIZE as u64;
            }
        }
    }

    /// Try each known key_len to find the correct entry size via CRC matching.
    /// Returns `(total_entry_size, matched_key_len)` on success.
    fn resolve_entry_size(&mut self, header: &EntryHeader) -> DbResult<Option<(usize, usize)>> {
        // Try the most recently matched K first (cache hit ~90%).
        let last_k = self.last_matched_k;
        if last_k > 0
            && let Some(result) = self.try_key_len(header, last_k)?
        {
            return Ok(Some(result));
        }

        // Fall back to the remaining K values. Index-based iteration avoids
        // cloning `key_lens` on every entry (`try_key_len` needs &mut self,
        // so an iterator borrow cannot be held across the call).
        for i in 0..self.key_lens.len() {
            let k = self.key_lens[i];
            if k == last_k {
                continue;
            }
            if let Some(result) = self.try_key_len(header, k)? {
                return Ok(Some(result));
            }
        }

        Ok(None)
    }

    /// Try a specific key_len: peek at the candidate entry and verify its CRC.
    fn try_key_len(&mut self, header: &EntryHeader, k: usize) -> DbResult<Option<(usize, usize)>> {
        let total = entry_size(k, header.value_len) as usize;
        let entry_offset = self.current_offset;

        let data = match self.peek_bytes_from(entry_offset, total)? {
            Some(d) => d,
            None => return Ok(None),
        };

        // Key and value start immediately after the fixed-size header.
        // HEADER_SIZE replaces a previously hard-coded 16, keeping this in
        // sync with size_of::<EntryHeader>() like the rest of the reader.
        let value_len = header.value_len as usize;
        if data.len() < HEADER_SIZE + k + value_len {
            return Ok(None);
        }

        let key = &data[HEADER_SIZE..HEADER_SIZE + k];
        let value = &data[HEADER_SIZE + k..HEADER_SIZE + k + value_len];
        let expected_crc = compute_crc32(header.gsn, header.value_len, key, value);

        if expected_crc == header.crc32 {
            Ok(Some((total, k)))
        } else {
            Ok(None)
        }
    }

    /// Seek to the first entry with GSN >= `target_gsn`.
    fn seek_to_gsn(&mut self, target_gsn: u64) -> DbResult<()> {
        // Consult hint files first: they are much smaller than data files,
        // so checking the last GSN per file is cheap.
        for (idx, &fid) in self.file_ids.iter().enumerate() {
            let hint_path = self.shard_dir.join(format!("{fid:06}.hint"));
            if !hint_path.exists() {
                continue;
            }

            if let Some(hint_data) = hint::read_hint_file(&hint_path)? {
                // The hint entry size depends on K; try each known K until one
                // divides the file evenly. NOTE(review): if two K values both
                // divide evenly this picks the first match — assumed
                // unambiguous in practice; confirm against the hint writer.
                for &k in &self.key_lens {
                    let hint_entry_size = hint::hint_entry_size(k);
                    if hint_data.len() % hint_entry_size != 0 {
                        continue;
                    }

                    let entry_count = hint_data.len() / hint_entry_size;
                    if entry_count == 0 {
                        continue;
                    }

                    // GSN is the first 8 bytes of a hint entry; the last
                    // entry carries the highest sequence in this file.
                    let last_entry_start = (entry_count - 1) * hint_entry_size;
                    let last_gsn = u64::from_ne_bytes(
                        hint_data[last_entry_start..last_entry_start + 8]
                            .try_into()
                            .expect("8 bytes"),
                    );
                    let last_seq = last_gsn & !crate::entry::TOMBSTONE_BIT;

                    if last_seq < target_gsn {
                        continue; // Target is after this file
                    }

                    // This file contains our target. Open it and scan from start.
                    self.open_file(idx)?;
                    self.skip_until_gsn(target_gsn)?;
                    return Ok(());
                }
            }
        }

        // No hint match — linear scan from the first file.
        if !self.file_ids.is_empty() {
            self.open_file(0)?;
            self.skip_until_gsn(target_gsn)?;
        }

        Ok(())
    }

    /// Skip entries until positioned at one with GSN >= `target_gsn`.
    fn skip_until_gsn(&mut self, target_gsn: u64) -> DbResult<()> {
        loop {
            if self.current_file.is_none() {
                return Ok(());
            }

            match self.next_entry()? {
                Some(entry) if entry.gsn >= target_gsn => {
                    // Found it — rewind to the entry's own start offset.
                    // Using entry.file_offset (instead of an offset saved
                    // before the read) stays correct even when next_entry()
                    // advanced into a later file to find the match; the old
                    // saved offset belonged to the previous file.
                    self.current_offset = entry.file_offset;
                    self.read_buf_len = 0; // invalidate read-ahead
                    return Ok(());
                }
                Some(_) => continue,
                None => return Ok(()),
            }
        }
    }

    /// Open the data file at `file_ids[idx]` and reset scan state to its start.
    fn open_file(&mut self, idx: usize) -> DbResult<()> {
        let fid = self.file_ids[idx];
        let path = self.shard_dir.join(format!("{fid:06}.data"));
        let file = File::open(&path)?;
        let file_len = file.metadata()?.len();
        self.current_file_idx = idx;
        self.current_file = Some(file);
        self.current_offset = 0;
        self.current_file_len = file_len;
        self.read_buf_len = 0;
        self.read_buf_offset = 0;
        Ok(())
    }

    /// Advance to the next data file. Returns false (and clears
    /// `current_file`) when there are no more files.
    fn advance_file(&mut self) -> DbResult<bool> {
        let next_idx = self.current_file_idx + 1;
        if next_idx >= self.file_ids.len() {
            self.current_file = None;
            return Ok(false);
        }
        self.open_file(next_idx)?;
        Ok(true)
    }

    /// Read `len` bytes at `current_offset`. Does NOT advance the position —
    /// callers bump `current_offset` explicitly once an entry is accepted.
    /// (The previous doc comment incorrectly claimed it advanced.)
    fn read_bytes(&mut self, len: usize) -> DbResult<Option<Vec<u8>>> {
        self.read_bytes_from(self.current_offset, len)
    }

    /// Read `len` bytes at `offset` without changing the scan position.
    /// Returns `None` when fewer than `len` bytes remain in the file.
    fn peek_bytes_from(&mut self, offset: u64, len: usize) -> DbResult<Option<Vec<u8>>> {
        if offset + len as u64 > self.current_file_len {
            return Ok(None);
        }

        // Serve from the read-ahead buffer when the span is fully cached.
        if offset >= self.read_buf_offset
            && offset + len as u64 <= self.read_buf_offset + self.read_buf_len as u64
        {
            let start = (offset - self.read_buf_offset) as usize;
            return Ok(Some(self.read_buf[start..start + len].to_vec()));
        }

        // Entries larger than the read-ahead buffer can never be satisfied
        // by fill_read_buf (previously they were silently treated as
        // truncated) — read them directly, bypassing the cache.
        if len > READ_AHEAD_SIZE {
            return self.read_direct(offset, len);
        }

        // Miss — refill the read-ahead buffer starting at `offset`.
        self.fill_read_buf(offset)?;

        if self.read_buf_len >= len {
            return Ok(Some(self.read_buf[..len].to_vec()));
        }

        Ok(None)
    }

    /// Read `len` bytes at `offset` (alias of `peek_bytes_from`; the scan
    /// position is never advanced implicitly).
    fn read_bytes_from(&mut self, offset: u64, len: usize) -> DbResult<Option<Vec<u8>>> {
        self.peek_bytes_from(offset, len)
    }

    /// Directly read `len` bytes at `offset` into a fresh buffer, for spans
    /// too large for the read-ahead cache. Returns `None` on a short read.
    fn read_direct(&mut self, offset: u64, len: usize) -> DbResult<Option<Vec<u8>>> {
        use std::io::{Read, Seek, SeekFrom};

        let file = match self.current_file.as_mut() {
            Some(f) => f,
            None => return Ok(None),
        };

        file.seek(SeekFrom::Start(offset))?;
        let mut buf = vec![0u8; len];
        let mut filled = 0;
        while filled < len {
            let n = file.read(&mut buf[filled..])?;
            if n == 0 {
                break; // EOF before `len` bytes — caller treats as partial
            }
            filled += n;
        }
        if filled < len {
            return Ok(None);
        }
        Ok(Some(buf))
    }

    /// Refill the read-ahead buffer from `offset`, reading up to
    /// READ_AHEAD_SIZE bytes (clamped to the end of the file).
    fn fill_read_buf(&mut self, offset: u64) -> DbResult<()> {
        use std::io::{Read, Seek, SeekFrom};

        let file = match self.current_file.as_mut() {
            Some(f) => f,
            None => return Ok(()),
        };

        let remaining = self.current_file_len.saturating_sub(offset) as usize;
        let to_read = remaining.min(READ_AHEAD_SIZE);
        if to_read == 0 {
            self.read_buf_len = 0;
            return Ok(());
        }

        file.seek(SeekFrom::Start(offset))?;
        // Loop until `to_read` bytes arrive or EOF: a single read() may
        // legally return short, which callers would otherwise misinterpret
        // as a truncated entry.
        let mut filled = 0;
        while filled < to_read {
            let n = file.read(&mut self.read_buf[filled..to_read])?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        self.read_buf_len = filled;
        self.read_buf_offset = offset;
        Ok(())
    }
}

/// List the IDs of all `NNNNNN.data` files in `dir`, sorted ascending.
///
/// Returns an empty list when the directory does not exist. Names whose
/// stem does not parse as a `u32` are ignored.
fn scan_data_files(dir: &Path) -> DbResult<Vec<u32>> {
    let mut file_ids: Vec<u32> = Vec::new();
    if !dir.exists() {
        return Ok(file_ids);
    }
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name = name.to_string_lossy();
        // strip_suffix removes exactly one ".data" — unlike the previous
        // trim_end_matches(".data"), which strips repeats and mis-accepted
        // names like "7.data.data" as file ID 7.
        if let Some(stem) = name.strip_suffix(".data")
            && let Ok(id) = stem.parse::<u32>()
        {
            file_ids.push(id);
        }
    }
    // Unstable sort: plain u32 keys, stability irrelevant, no allocation.
    file_ids.sort_unstable();
    Ok(file_ids)
}