use std::fs::{self, File};
use std::path::{Path, PathBuf};
use zerocopy::FromBytes;
use crate::entry::{EntryHeader, compute_crc32, entry_size};
use crate::error::DbResult;
use crate::hint;
/// One entry as read back from a shard data file by [`ShardLogReader::next_entry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawEntry {
    /// The complete on-disk bytes of the entry (header, key and value), starting at
    /// `file_offset`.
    pub data: Vec<u8>,
    /// Sequence number decoded from the entry header.
    pub gsn: u64,
    /// Numeric id of the data file the entry was read from.
    pub file_id: u32,
    /// Byte offset of the entry's first byte within that data file.
    pub file_offset: u64,
    /// Key length whose CRC check matched this entry.
    pub key_len: u16,
}
/// Sequential reader over every `*.data` file of one shard directory.
pub struct ShardLogReader {
    // --- files being read ---
    /// Directory containing the shard's data and hint files.
    shard_dir: PathBuf,
    /// Data file ids in the order they are visited.
    file_ids: Vec<u32>,
    /// Index into `file_ids` of the file currently open.
    current_file_idx: usize,
    /// Handle of the open data file; `None` once the reader is exhausted.
    current_file: Option<File>,
    /// Length of the open file, captured when it was opened.
    current_file_len: u64,

    // --- position ---
    /// Offset of the next byte to decode in the open file.
    current_offset: u64,

    // --- key-length inference ---
    /// Candidate key lengths tried when sizing an entry.
    key_lens: Vec<usize>,
    /// Key length that matched most recently; tried first for the next entry.
    last_matched_k: usize,

    // --- read-ahead cache ---
    /// Backing storage for cached file bytes.
    read_buf: Vec<u8>,
    /// File offset corresponding to `read_buf[0]`.
    read_buf_offset: u64,
    /// Number of valid bytes at the front of `read_buf`.
    read_buf_len: usize,
}
/// Capacity of the read-ahead buffer used while scanning data files (64 KiB).
const READ_AHEAD_SIZE: usize = 1 << 16;
/// Byte size of [`EntryHeader`]: how many bytes are read before a header is decoded.
const HEADER_SIZE: usize = std::mem::size_of::<EntryHeader>();
impl ShardLogReader {
pub fn new(shard_dir: PathBuf, from_gsn: u64, key_lens: Vec<usize>) -> DbResult<Self> {
let file_ids = scan_data_files(&shard_dir)?;
let last_k = key_lens.first().copied().unwrap_or(0);
let mut reader = Self {
shard_dir,
file_ids,
current_file_idx: 0,
current_file: None,
current_offset: 0,
current_file_len: 0,
key_lens,
last_matched_k: last_k,
read_buf: vec![0u8; READ_AHEAD_SIZE],
read_buf_offset: 0,
read_buf_len: 0,
};
if from_gsn > 0 {
reader.seek_to_gsn(from_gsn)?;
} else if !reader.file_ids.is_empty() {
reader.open_file(0)?;
}
Ok(reader)
}
pub fn next_entry(&mut self) -> DbResult<Option<RawEntry>> {
loop {
if self.current_file.is_none() {
return Ok(None);
}
let header_bytes = match self.read_bytes(HEADER_SIZE)? {
Some(b) => b,
None => {
if !self.advance_file()? {
return Ok(None);
}
continue;
}
};
let header = match EntryHeader::read_from_bytes(&header_bytes) {
Ok(h) => h,
Err(_) => {
if !self.advance_file()? {
return Ok(None);
}
continue;
}
};
let entry_offset = self.current_offset;
let file_id = self.file_ids[self.current_file_idx];
if let Some((total_size, matched_k)) = self.resolve_entry_size(&header)? {
self.last_matched_k = matched_k;
let data = match self.read_bytes_from(entry_offset, total_size)? {
Some(d) => d,
None => {
if !self.advance_file()? {
return Ok(None);
}
continue;
}
};
self.current_offset = entry_offset + total_size as u64;
return Ok(Some(RawEntry {
data,
gsn: header.sequence(),
file_id,
file_offset: entry_offset,
key_len: matched_k as u16,
}));
} else {
self.current_offset += HEADER_SIZE as u64;
}
}
}
fn resolve_entry_size(&mut self, header: &EntryHeader) -> DbResult<Option<(usize, usize)>> {
let last_k = self.last_matched_k;
if last_k > 0
&& let Some(result) = self.try_key_len(header, last_k)?
{
return Ok(Some(result));
}
let key_lens: Vec<usize> = self.key_lens.clone();
for k in key_lens {
if k == last_k {
continue;
}
if let Some(result) = self.try_key_len(header, k)? {
return Ok(Some(result));
}
}
Ok(None)
}
fn try_key_len(&mut self, header: &EntryHeader, k: usize) -> DbResult<Option<(usize, usize)>> {
let total = entry_size(k, header.value_len) as usize;
let entry_offset = self.current_offset;
let data = match self.peek_bytes_from(entry_offset, total)? {
Some(d) => d,
None => return Ok(None),
};
if data.len() < 16 + k + header.value_len as usize {
return Ok(None);
}
let key = &data[16..16 + k];
let value = &data[16 + k..16 + k + header.value_len as usize];
let expected_crc = compute_crc32(header.gsn, header.value_len, key, value);
if expected_crc == header.crc32 {
Ok(Some((total, k)))
} else {
Ok(None)
}
}
fn seek_to_gsn(&mut self, target_gsn: u64) -> DbResult<()> {
for (idx, &fid) in self.file_ids.iter().enumerate() {
let hint_path = self.shard_dir.join(format!("{fid:06}.hint"));
if !hint_path.exists() {
continue;
}
if let Some(hint_data) = hint::read_hint_file(&hint_path)? {
for &k in &self.key_lens {
let hint_entry_size = hint::hint_entry_size(k);
if hint_data.len() % hint_entry_size != 0 {
continue;
}
let entry_count = hint_data.len() / hint_entry_size;
if entry_count == 0 {
continue;
}
let last_entry_start = (entry_count - 1) * hint_entry_size;
let last_gsn = u64::from_ne_bytes(
hint_data[last_entry_start..last_entry_start + 8]
.try_into()
.expect("8 bytes"),
);
let last_seq = last_gsn & !crate::entry::TOMBSTONE_BIT;
if last_seq < target_gsn {
continue; }
self.open_file(idx)?;
self.skip_until_gsn(target_gsn)?;
return Ok(());
}
}
}
if !self.file_ids.is_empty() {
self.open_file(0)?;
self.skip_until_gsn(target_gsn)?;
}
Ok(())
}
fn skip_until_gsn(&mut self, target_gsn: u64) -> DbResult<()> {
loop {
if self.current_file.is_none() {
return Ok(());
}
let save_offset = self.current_offset;
match self.next_entry()? {
Some(entry) if entry.gsn >= target_gsn => {
self.current_offset = save_offset;
self.read_buf_len = 0; return Ok(());
}
Some(_) => continue,
None => return Ok(()),
}
}
}
fn open_file(&mut self, idx: usize) -> DbResult<()> {
let fid = self.file_ids[idx];
let path = self.shard_dir.join(format!("{fid:06}.data"));
let file = File::open(&path)?;
let file_len = file.metadata()?.len();
self.current_file_idx = idx;
self.current_file = Some(file);
self.current_offset = 0;
self.current_file_len = file_len;
self.read_buf_len = 0;
self.read_buf_offset = 0;
Ok(())
}
fn advance_file(&mut self) -> DbResult<bool> {
let next_idx = self.current_file_idx + 1;
if next_idx >= self.file_ids.len() {
self.current_file = None;
return Ok(false);
}
self.open_file(next_idx)?;
Ok(true)
}
fn read_bytes(&mut self, len: usize) -> DbResult<Option<Vec<u8>>> {
self.read_bytes_from(self.current_offset, len)
}
fn peek_bytes_from(&mut self, offset: u64, len: usize) -> DbResult<Option<Vec<u8>>> {
if offset + len as u64 > self.current_file_len {
return Ok(None);
}
if offset >= self.read_buf_offset
&& offset + len as u64 <= self.read_buf_offset + self.read_buf_len as u64
{
let start = (offset - self.read_buf_offset) as usize;
return Ok(Some(self.read_buf[start..start + len].to_vec()));
}
self.fill_read_buf(offset)?;
if self.read_buf_len >= len {
return Ok(Some(self.read_buf[..len].to_vec()));
}
Ok(None)
}
fn read_bytes_from(&mut self, offset: u64, len: usize) -> DbResult<Option<Vec<u8>>> {
self.peek_bytes_from(offset, len)
}
fn fill_read_buf(&mut self, offset: u64) -> DbResult<()> {
use std::io::{Read, Seek, SeekFrom};
let file = match self.current_file.as_mut() {
Some(f) => f,
None => return Ok(()),
};
let remaining = self.current_file_len.saturating_sub(offset) as usize;
let to_read = remaining.min(READ_AHEAD_SIZE);
if to_read == 0 {
self.read_buf_len = 0;
return Ok(());
}
file.seek(SeekFrom::Start(offset))?;
self.read_buf_len = file.read(&mut self.read_buf[..to_read])?;
self.read_buf_offset = offset;
Ok(())
}
}
fn scan_data_files(dir: &Path) -> DbResult<Vec<u32>> {
let mut file_ids: Vec<u32> = Vec::new();
if !dir.exists() {
return Ok(file_ids);
}
for entry in fs::read_dir(dir)? {
let entry = entry?;
let name = entry.file_name();
let name = name.to_string_lossy();
if name.ends_with(".data")
&& let Ok(id) = name.trim_end_matches(".data").parse::<u32>()
{
file_ids.push(id);
}
}
file_ids.sort();
Ok(file_ids)
}