use crate::kernel::io::{FileExtension, IoFactory, IoType, IoWriter};
use crate::kernel::lsm::storage::Gen;
use crate::kernel::{sorted_gen_list, KernelResult};
use crate::KernelError;
use integer_encoding::FixedInt;
use std::cmp::min;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
/// Size in bytes of one log block; the writer never lets a fragment cross a block boundary.
const BLOCK_SIZE: usize = 32 * 1024;
/// Fragment header layout: crc32 of the fragment data (4) + data length as u32 (4) + record type tag (1).
const HEADER_SIZE: usize = 4 + 4 + 1;
/// Opens the write-ahead log files of one directory, replays them through a caller-supplied
/// decoder, and hands out `LogWriter`s for new or existing generations.
#[derive(Clone)]
pub(crate) struct LogLoader {
    // Creates readers/writers for files with `FileExtension::Log` in the WAL directory.
    factory: Arc<IoFactory>,
    // I/O strategy passed to every reader/writer the factory opens.
    io_type: IoType,
}
impl LogLoader {
    /// Opens the log directory `wal_dir_path.join(path_name.0)`, selects a generation and
    /// replays it into `records`.
    ///
    /// The generation is `path_name.1` when given. Otherwise it is the last entry reported by
    /// `sorted_gen_list` for that directory. If that also yields nothing, a new one is
    /// obtained from `Gen::create`. Every record read from that generation is passed to
    /// `fn_decode` together with `records`.
    ///
    /// Returns the loader and the generation that was loaded.
    ///
    /// # Errors
    /// Fails if the `IoFactory` cannot be created, if the generation's log cannot be opened
    /// for reading, or if `fn_decode` returns an error.
    pub(crate) fn reload<F, R>(
        wal_dir_path: &Path,
        path_name: (&str, Option<i64>),
        io_type: IoType,
        records: &mut Vec<R>,
        fn_decode: F,
    ) -> KernelResult<(Self, i64)>
    where
        F: Fn(&mut Vec<u8>, &mut Vec<R>) -> KernelResult<()>,
    {
        let (loader, log_gen) = Self::_reload(wal_dir_path, path_name, io_type)?;
        loader.load(log_gen, records, fn_decode)?;
        Ok((loader, log_gen))
    }

    /// Builds the loader for `wal_dir_path.join(path_name.0)` and picks the generation to use,
    /// without reading any records.
    fn _reload(
        wal_dir_path: &Path,
        path_name: (&str, Option<i64>),
        io_type: IoType,
    ) -> KernelResult<(Self, i64)> {
        let (path, name) = path_name;
        let wal_path = wal_dir_path.join(path);
        let factory = Arc::new(IoFactory::new(wal_path.clone(), FileExtension::Log)?);
        let current_gen = name
            .or_else(|| {
                // A failing directory listing is treated like an empty one (best effort).
                sorted_gen_list(&wal_path, FileExtension::Log)
                    .ok()
                    .and_then(|gens| gens.last().copied())
            })
            // Lazily: only mint a fresh generation when neither the caller nor the directory
            // provides one, instead of calling `Gen::create` on every reload.
            .unwrap_or_else(Gen::create);
        Ok((LogLoader { factory, io_type }, current_gen))
    }

    /// Replays every record of generation `gen`, calling `fn_decode(record_bytes, records)`
    /// for each one.
    ///
    /// Reading stops at the first zero-length read. A read error also stops it, including a
    /// torn tail or a CRC mismatch. That behaviour is deliberate: every record decoded before
    /// the failure is kept and no error is reported.
    ///
    /// # Errors
    /// Fails if the log file cannot be opened, or when `fn_decode` returns an error.
    pub(crate) fn load<F, R>(
        &self,
        gen: i64,
        records: &mut Vec<R>,
        fn_decode: F,
    ) -> KernelResult<()>
    where
        F: Fn(&mut Vec<u8>, &mut Vec<R>) -> KernelResult<()>,
    {
        let mut reader = LogReader::new(self.factory.reader(gen, self.io_type)?);
        // `LogReader::read` clears the buffer before every record, so it only needs capacity.
        let mut buf = Vec::with_capacity(128);
        while reader.read(&mut buf).unwrap_or(0) > 0 {
            fn_decode(&mut buf, records)?;
        }
        Ok(())
    }

    /// Removes generation `gen` via the underlying `IoFactory`.
    #[allow(dead_code)]
    pub(crate) fn clean(&self, gen: i64) -> KernelResult<()> {
        self.factory.clean(gen)
    }

    /// Opens a `LogWriter` for generation `gen` using this loader's `IoType`.
    ///
    /// NOTE(review): the returned writer starts with a block offset of 0. If
    /// `IoFactory::writer` positions at the end of an existing non-empty file, block padding
    /// would no longer line up with what `LogReader` expects. Confirm its semantics.
    pub(crate) fn writer(&self, gen: i64) -> KernelResult<LogWriter<Box<dyn IoWriter>>> {
        let new_fs = self.factory.writer(gen, self.io_type)?;
        Ok(LogWriter::new(new_fs))
    }
}
/// Tag stored in the last header byte of every log fragment. It says where the fragment sits
/// inside its logical record.
#[derive(Debug, Clone, Copy)]
pub(crate) enum RecordType {
    /// The whole record fits in this one fragment.
    Full = 1,
    /// Opening fragment of a record that spans several fragments.
    First = 2,
    /// Interior fragment: neither the opening nor the closing one.
    Middle = 3,
    /// Closing fragment of a multi-fragment record.
    Last = 4,
}

impl From<u8> for RecordType {
    /// Decodes an on-disk type tag.
    ///
    /// # Panics
    /// Panics when `value` is not one of the tags `1..=4`.
    fn from(value: u8) -> Self {
        // Indexed by `tag - 1`, in tag order.
        const BY_TAG: [RecordType; 4] = [
            RecordType::Full,
            RecordType::First,
            RecordType::Middle,
            RecordType::Last,
        ];
        match value
            .checked_sub(1)
            .and_then(|idx| BY_TAG.get(usize::from(idx)))
        {
            Some(&decoded) => decoded,
            None => panic!("Unknown value: {value}"),
        }
    }
}
/// Appends records to `dst` in the block-framed format that `LogReader` reads back.
pub(crate) struct LogWriter<W: Write + Seek> {
    // Destination the framed bytes are written to.
    dst: W,
    // Bytes already used in the current block; reset to 0 when a new block starts.
    current_block_offset: usize,
    // Block size in bytes; the constructors here always set it to `BLOCK_SIZE`.
    block_size: usize,
}
impl<W: Write + Seek> LogWriter<W> {
    /// Creates a writer that assumes `writer` is positioned at the start of a block.
    pub(crate) fn new(writer: W) -> LogWriter<W> {
        LogWriter {
            dst: writer,
            current_block_offset: 0,
            block_size: BLOCK_SIZE,
        }
    }

    /// Creates a writer for a destination whose logical position is `off` bytes into the log.
    /// This keeps block padding aligned with a reader that starts at offset 0.
    #[allow(dead_code)]
    pub(crate) fn new_with_off(writer: W, off: usize) -> LogWriter<W> {
        let mut w = LogWriter::new(writer);
        w.current_block_offset = off % w.block_size;
        w
    }

    /// Seeks the destination to its end and returns the resulting position.
    ///
    /// This does not adjust the writer's block offset.
    pub(crate) fn seek_end(&mut self) -> KernelResult<u64> {
        Ok(self.dst.seek(SeekFrom::End(0))?)
    }

    /// Appends `r` as one logical record.
    ///
    /// The record is split into Full/First/Middle/Last fragments so that no fragment crosses
    /// a block boundary. If fewer than `HEADER_SIZE` bytes remain in the current block, they
    /// are zero-padded and writing continues in the next block. An empty `r` writes nothing.
    ///
    /// Returns the total number of bytes appended to the destination for this record. That
    /// total includes every fragment header and any block padding.
    ///
    /// # Errors
    /// Propagates I/O errors from the destination.
    pub(crate) fn add_record(&mut self, r: &[u8]) -> KernelResult<usize> {
        assert!(self.block_size > HEADER_SIZE);
        let mut rest = r;
        let mut written = 0;
        let mut first_frag = true;
        while !rest.is_empty() {
            let space_left = self.block_size - self.current_block_offset;
            if space_left < HEADER_SIZE {
                // Not even a header fits: pad the block tail with zeros and start a new block.
                if space_left > 0 {
                    self.dst.write_all(&[0u8; HEADER_SIZE][..space_left])?;
                    written += space_left;
                }
                self.current_block_offset = 0;
            }
            let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
            let data_frag_len = min(rest.len(), avail_for_data);
            let last_frag = data_frag_len == rest.len();
            let record_type = match (first_frag, last_frag) {
                (true, true) => RecordType::Full,
                (true, false) => RecordType::First,
                (false, true) => RecordType::Last,
                (false, false) => RecordType::Middle,
            };
            written += self.emit_record(record_type, rest, data_frag_len)?;
            rest = &rest[data_frag_len..];
            first_frag = false;
        }
        Ok(written)
    }

    /// Writes one fragment and returns the number of bytes written.
    ///
    /// The fragment is a `HEADER_SIZE`-byte header followed by `data[..len]`. The header holds
    /// the crc32 of the data, the data length as a u32, and the type tag.
    fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> KernelResult<usize> {
        let frag = &data[..len];
        let mut header = Vec::with_capacity(HEADER_SIZE);
        header.extend_from_slice(&crc32fast::hash(frag).encode_fixed_vec());
        // `len` is at most `block_size - HEADER_SIZE` (see `add_record`), so it fits in a u32.
        header.extend_from_slice(&(len as u32).encode_fixed_vec());
        header.push(t as u8);
        // `write_all`, not `write`: a short write would silently drop part of the fragment and
        // desynchronise the block framing for every record after it.
        self.dst.write_all(&header)?;
        self.dst.write_all(frag)?;
        let written = header.len() + frag.len();
        self.current_block_offset += written;
        Ok(written)
    }

    /// Flushes the destination.
    #[allow(dead_code)]
    pub(crate) fn flush(&mut self) -> KernelResult<()> {
        self.dst.flush()?;
        Ok(())
    }
}
/// Reads back records written by `LogWriter`, reassembling fragmented records.
pub(crate) struct LogReader<R: Read + Seek> {
    // Source of the framed log bytes.
    src: R,
    // Bytes already consumed in the current block.
    offset: usize,
    // Block size in bytes; `new` sets it to `BLOCK_SIZE`.
    block_size: usize,
    // Reusable buffer for one fragment header.
    head_scratch: [u8; HEADER_SIZE],
}
impl<R: Read + Seek> LogReader<R> {
    /// Creates a reader over `src`, which is assumed to be positioned at the start of a block
    /// (normally the beginning of the log file).
    pub(crate) fn new(src: R) -> LogReader<R> {
        LogReader {
            src,
            offset: 0,
            block_size: BLOCK_SIZE,
            head_scratch: [0u8; HEADER_SIZE],
        }
    }

    /// Reads the next logical record into `dst`, reassembling First/Middle/Last fragments.
    ///
    /// `dst` is cleared first. Returns the record's length. Returns `Ok(0)` at a clean end of
    /// input, meaning no header bytes remain where a new record would start. A zero-length
    /// Full record also yields `Ok(0)`.
    ///
    /// # Errors
    /// - `KernelError::CrcMisMatch` when a fragment's checksum does not match its data.
    /// - An `UnexpectedEof` I/O error in any of these cases:
    ///   - the input ends inside a header;
    ///   - the input ends inside fragment data;
    ///   - a First/Middle fragment is never followed by its closing fragment (torn write).
    /// - An `InvalidData` I/O error for an unknown record-type tag, e.g. zero-filled space.
    /// - Any other I/O error from `src`.
    pub(crate) fn read(&mut self, dst: &mut Vec<u8>) -> KernelResult<usize> {
        const FULL: u8 = RecordType::Full as u8;
        const FIRST: u8 = RecordType::First as u8;
        const MIDDLE: u8 = RecordType::Middle as u8;
        const LAST: u8 = RecordType::Last as u8;

        dst.clear();
        // True once a First/Middle fragment was consumed and its record is still open.
        let mut in_record = false;
        loop {
            // The writer never starts a header in the last < HEADER_SIZE bytes of a block;
            // those bytes are padding and are skipped.
            let leftover = self.block_size - self.offset;
            if leftover < HEADER_SIZE {
                if leftover != 0 {
                    let _ = self.src.seek(SeekFrom::Current(leftover as i64))?;
                }
                self.offset = 0;
            }

            let header_len = self.fill_header()?;
            if header_len == 0 && !in_record {
                return Ok(0);
            }
            if header_len < HEADER_SIZE {
                // Torn header, or an open record that never finishes. Report it rather than
                // waiting on bytes that will never arrive or returning a partial record.
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "truncated log record",
                )
                .into());
            }
            self.offset += HEADER_SIZE;

            let crc = u32::decode_fixed(&self.head_scratch[0..4]);
            let length = u32::decode_fixed(&self.head_scratch[4..8]) as usize;
            // Read the fragment straight into `dst` instead of a temporary buffer.
            let start = dst.len();
            dst.resize(start + length, 0);
            self.src.read_exact(&mut dst[start..])?;
            self.offset += length;
            if crc32fast::hash(&dst[start..]) != crc {
                return Err(KernelError::CrcMisMatch);
            }

            // Matched on the raw byte: the CRC does not cover the type tag, so a corrupt tag
            // must become an error rather than a panic in `RecordType::from`.
            match self.head_scratch[8] {
                FULL | LAST => return Ok(dst.len()),
                FIRST | MIDDLE => in_record = true,
                tag => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("unknown log record type: {tag}"),
                    )
                    .into())
                }
            }
        }
    }

    /// Reads up to `HEADER_SIZE` bytes into `head_scratch`, retrying short and interrupted
    /// reads. Returns how many bytes were filled; fewer than `HEADER_SIZE` only at end of input.
    fn fill_header(&mut self) -> KernelResult<usize> {
        let mut filled = 0;
        while filled < HEADER_SIZE {
            match self.src.read(&mut self.head_scratch[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e.into()),
            }
        }
        Ok(filled)
    }
}
// Unit tests for the log writer, reader and loader.
#[cfg(test)]
mod tests {
    use crate::kernel::io::IoType;
    use crate::kernel::lsm::log::{LogLoader, LogReader, LogWriter, HEADER_SIZE};
    use crate::kernel::lsm::mem_table::DEFAULT_WAL_PATH;
    use crate::kernel::lsm::storage::Config;
    use crate::kernel::KernelResult;
    use std::fs::{File, OpenOptions};
    use std::io::Cursor;
    use std::mem;
    use tempfile::TempDir;
    // Three small records fit in one block, so the block offset equals the data bytes plus
    // one header per record.
    #[test]
    fn test_writer() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];
        let mut lw = LogWriter::new(Cursor::new(Vec::new()));
        let total_len = data.iter().fold(0, |l, d| l + d.len());
        for d in data {
            let _ = lw.add_record(d.as_bytes());
        }
        assert_eq!(lw.current_block_offset, total_len + 3 * HEADER_SIZE);
    }
    // Rewriting records 2..3 with a writer created via `new_with_off` at the end of record 1
    // must reproduce exactly the bytes of the original single-writer run.
    #[test]
    fn test_writer_append() {
        let data = &[
            "hello world. My first log entry.",
            "and my second",
            "and my third",
        ];
        let mut dst = vec![0; 1024];
        {
            let mut lw = LogWriter::new(Cursor::new(dst.as_mut_slice()));
            for d in data {
                let _ = lw.add_record(d.as_bytes());
            }
        }
        let old = dst.clone();
        {
            let offset = data[0].len() + HEADER_SIZE;
            let mut lw =
                LogWriter::new_with_off(Cursor::new(&mut dst.as_mut_slice()[offset..]), offset);
            for d in &data[1..] {
                let _ = lw.add_record(d.as_bytes());
            }
        }
        assert_eq!(old, dst);
    }
    // Writes three records to a real file (3 headers + 43 data bytes = 70) and reads them
    // back in order until the reader reports 0.
    #[test]
    fn test_reader() -> KernelResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let file_path = temp_dir.path().join("test.txt");
        let fs = OpenOptions::new()
            .create(true)
            .write(true)
            .read(true)
            .open(file_path.clone())?;
        let data = [
            "abcdefghi".as_bytes().to_vec(), "123456789012".as_bytes().to_vec(), "0101010101010101010101".as_bytes().to_vec(),
        ]; let mut lw = LogWriter::new(fs);
        for e in data.iter() {
            assert!(lw.add_record(e).is_ok());
        }
        assert_eq!(lw.dst.metadata()?.len(), 70);
        let mut lr = LogReader::new(File::open(file_path)?);
        let mut dst = Vec::with_capacity(128);
        let mut i = 0;
        loop {
            let r = lr.read(&mut dst);
            if r.is_err() {
                panic!("{}", r.unwrap_err());
            } else if r.unwrap() == 0 {
                break;
            }
            assert_eq!(dst, data[i]);
            i += 1;
        }
        assert_eq!(i, data.len());
        Ok(())
    }
    // Round trip through `LogLoader`: write two records to gen 1, then check that both
    // `reload` and a direct `load` replay the same records.
    #[test]
    fn test_log_loader() -> KernelResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let config = Config::new(temp_dir.into_path());
        let (loader, _) = LogLoader::reload(
            config.path(),
            (DEFAULT_WAL_PATH, Some(1)),
            IoType::Buf,
            &mut vec![0],
            |_, _| Ok(()),
        )?;
        let mut writer = loader.writer(1)?;
        let _ = writer.add_record(b"kip_key_1")?;
        let _ = writer.add_record(b"kip_key_2")?;
        writer.flush()?;
        drop(loader);
        let mut reload_data_1 = Vec::new();
        let (wal, log_gen) = LogLoader::reload(
            config.path(),
            (DEFAULT_WAL_PATH, Some(1)),
            IoType::Buf,
            &mut reload_data_1,
            |bytes, records| {
                records.push(mem::take(bytes));
                Ok(())
            },
        )?;
        let mut reload_data_2 = Vec::new();
        wal.load(1, &mut reload_data_2, |bytes, records| {
            records.push(mem::take(bytes));
            Ok(())
        })?;
        assert_eq!(log_gen, 1);
        assert_eq!(reload_data_1, vec![b"kip_key_1", b"kip_key_2"]);
        assert_eq!(reload_data_1, reload_data_2);
        Ok(())
    }
}