use crc::{Crc, CRC_32_ISCSI};
use integer_encoding::FixedInt;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::io::{ErrorKind, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::errors::{DBIOError, LogIOError, LogSerializationErrorKind};
use crate::fs::{FileSystem, RandomAccessFile, ReadonlyRandomAccessFile};
use crate::utils::crc::{mask_checksum, unmask_checksum};
// Physical record header layout: 4-byte CRC-32 checksum + 2-byte payload length + 1-byte block type.
const HEADER_LENGTH_BYTES: usize = 4 + 2 + 1;
// Records are packed into fixed-size blocks of 32 KiB; no record header crosses a block boundary.
const BLOCK_SIZE_BYTES: usize = 32 * 1024;
// CRC-32 calculator (Castagnoli polynomial, as used by iSCSI) for record checksums.
const CRC_CALCULATOR: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);
// Convenience alias for results produced by log I/O operations.
type LogIOResult<T> = Result<T, LogIOError>;
/// Identifies how a physical record relates to the logical record it carries.
///
/// A logical record that fits within one block is written as a single `Full`
/// record; otherwise it is split into a `First` record, zero or more `Middle`
/// records, and a final `Last` record.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub(crate) enum BlockType {
    Full = 0,
    First,
    Middle,
    Last,
}
impl TryFrom<u8> for BlockType {
    type Error = LogIOError;

    /// Parses a block type tag byte read from disk.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when `value` is not one of the known
    /// block type discriminants (0..=3).
    fn try_from(value: u8) -> LogIOResult<BlockType> {
        match value {
            0 => Ok(BlockType::Full),
            1 => Ok(BlockType::First),
            2 => Ok(BlockType::Middle),
            3 => Ok(BlockType::Last),
            _ => Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                // Fixed grammar in the error message ("an problem" -> "a problem").
                format!(
                    "There was a problem parsing the block type. The value received was {}",
                    value
                ),
            ))),
        }
    }
}
/// A single physical record as laid out on disk: a header (checksum, length,
/// block type) followed by the payload bytes.
#[derive(Debug)]
pub(crate) struct BlockRecord {
    // CRC-32 checksum of `data` only (stored masked on disk; see the
    // `From<&BlockRecord> for Vec<u8>` impl).
    checksum: u32,
    // Length of `data` in bytes.
    length: u16,
    // Whether this record is a full logical record or a fragment of one.
    block_type: BlockType,
    // The payload carried by this record.
    data: Vec<u8>,
}
impl BlockRecord {
    /// Builds a record for `data`, computing its CRC-32 checksum up front.
    pub(crate) fn new(length: u16, block_type: BlockType, data: Vec<u8>) -> Self {
        Self {
            checksum: CRC_CALCULATOR.checksum(&data),
            length,
            block_type,
            data,
        }
    }
}
impl From<&BlockRecord> for Vec<u8> {
    /// Serializes a record into its on-disk layout: a 7-byte header (masked
    /// checksum, fixed-width length, block type tag) followed by the payload.
    fn from(record: &BlockRecord) -> Self {
        let mut serialized = Vec::with_capacity(HEADER_LENGTH_BYTES + record.data.len());
        serialized.extend_from_slice(&u32::encode_fixed_vec(mask_checksum(record.checksum)));
        serialized.extend_from_slice(&u16::encode_fixed_vec(record.length));
        serialized.push(record.block_type as u8);
        serialized.extend_from_slice(&record.data);
        serialized
    }
}
impl TryFrom<&Vec<u8>> for BlockRecord {
    type Error = LogIOError;

    /// Deserializes a buffer containing a record header followed by the payload.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when the buffer is shorter than a header,
    /// when the header's length field disagrees with the actual payload size,
    /// when the block type tag is unknown, or when the stored checksum does
    /// not match the payload.
    fn try_from(buf: &Vec<u8>) -> LogIOResult<BlockRecord> {
        if buf.len() < HEADER_LENGTH_BYTES {
            let error_msg = format!(
                "Failed to deserialize the provided buffer to a log block record. The buffer was \
                expected to be at least the size of the header ({} bytes) but was {}.",
                HEADER_LENGTH_BYTES,
                buf.len()
            );
            return Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                error_msg,
            )));
        }

        let checksum = u32::decode_fixed(&buf[0..4]);
        let unmasked_checksum = unmask_checksum(checksum);
        let data_length = u16::decode_fixed(&buf[4..6]);
        let block_type: BlockType = buf[6].try_into()?;
        let data = buf[HEADER_LENGTH_BYTES..].to_vec();

        // The length field is not covered by the checksum (which protects the
        // payload only), so validate it explicitly against the payload size.
        if usize::from(data_length) != data.len() {
            return Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                format!(
                    "The record's length field did not match the payload. Expected {data_length} \
                    bytes but got {}",
                    data.len()
                ),
            )));
        }

        let calculated_checksum = CRC_CALCULATOR.checksum(&data);
        if calculated_checksum != unmasked_checksum {
            return Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                format!(
                    "The checksums of the data did not match. Expected {unmasked_checksum} but \
                    got {calculated_checksum}"
                ),
            )));
        }

        Ok(BlockRecord::new(data_length, block_type, data))
    }
}
/// Appends logical records to a write-ahead log file, splitting them across
/// fixed-size blocks as needed.
pub(crate) struct LogWriter {
    // Path of the log file; retained for logging/debug output.
    log_file_path: PathBuf,
    // Handle to the underlying log file.
    log_file: Box<dyn RandomAccessFile>,
    // Number of bytes already written into the block currently being filled.
    current_block_offset: usize,
}
impl LogWriter {
    /// Creates a writer for the log file at `log_file_path`.
    ///
    /// When `is_appending` is set, the existing file is kept and the in-block
    /// offset is resumed from the current file size so block boundaries stay
    /// aligned to multiples of `BLOCK_SIZE_BYTES`.
    pub fn new<P: AsRef<Path>>(
        fs: Arc<dyn FileSystem>,
        log_file_path: P,
        is_appending: bool,
    ) -> LogIOResult<Self> {
        log::info!(
            "Creating/appending to a log file at {}",
            log_file_path.as_ref().to_string_lossy()
        );
        let log_file = fs.create_file(log_file_path.as_ref(), is_appending)?;

        // Resume mid-block when the file already has contents.
        let mut block_offset = 0;
        let log_file_size = log_file.len()? as usize;
        if log_file_size > 0 {
            block_offset = log_file_size % BLOCK_SIZE_BYTES;
        }

        Ok(LogWriter {
            log_file_path: log_file_path.as_ref().to_path_buf(),
            log_file,
            current_block_offset: block_offset,
        })
    }

    /// Appends `data` as one logical record, fragmenting it into one or more
    /// physical records (`Full`, or `First`/`Middle`*/`Last`) so that no
    /// physical record crosses a block boundary.
    ///
    /// An empty `data` slice still emits a single `Full` record with a
    /// zero-length payload.
    pub fn append(&mut self, data: &[u8]) -> LogIOResult<()> {
        let mut data_to_write = data;
        let mut is_first_data_chunk = true;
        loop {
            let block_available_space = BLOCK_SIZE_BYTES - self.current_block_offset;
            if block_available_space < HEADER_LENGTH_BYTES {
                // Not even a header fits in what remains of this block. Pad
                // the rest with zeroes (readers treat it as a trailer) and
                // start a fresh block.
                if block_available_space > 0 {
                    log::debug!(
                        "Log file {:?}. There is not enough remaining space in the current block \
                        for the header. Filling it with zeroes.",
                        self.log_file_path
                    );
                    // `block_available_space` is at most HEADER_LENGTH_BYTES - 1
                    // here, so this slice is always in bounds.
                    self.log_file
                        .write_all(&vec![0; HEADER_LENGTH_BYTES - 1][0..block_available_space])?;
                }
                self.current_block_offset = 0;
            }

            // Write as much of the remaining payload as fits after the header.
            let space_available_for_data =
                BLOCK_SIZE_BYTES - self.current_block_offset - HEADER_LENGTH_BYTES;
            let block_data_chunk_length = if data_to_write.len() < space_available_for_data {
                data_to_write.len()
            } else {
                space_available_for_data
            };

            // Pick the fragment type from this chunk's position in the record.
            let is_last_data_chunk = data_to_write.len() == block_data_chunk_length;
            let block_type = if is_first_data_chunk && is_last_data_chunk {
                BlockType::Full
            } else if is_first_data_chunk {
                BlockType::First
            } else if is_last_data_chunk {
                BlockType::Last
            } else {
                BlockType::Middle
            };

            self.emit_block(block_type, &data_to_write[0..block_data_chunk_length])?;
            data_to_write = data_to_write.split_at(block_data_chunk_length).1;
            is_first_data_chunk = false;
            if data_to_write.is_empty() {
                break;
            }
        }
        Ok(())
    }
}
impl LogWriter {
    /// Serializes `data_chunk` as one physical record, appends it to the log
    /// file, flushes, and advances the in-block write offset.
    fn emit_block(&mut self, block_type: BlockType, data_chunk: &[u8]) -> LogIOResult<()> {
        let chunk_length = u16::try_from(data_chunk.len())?;
        let record = BlockRecord::new(chunk_length, block_type, data_chunk.to_vec());
        log::debug!(
            "Writing new record to log file at {:?} with length {} and block type {:?}.",
            self.log_file_path,
            chunk_length,
            record.block_type
        );

        let serialized = Vec::<u8>::from(&record);
        self.log_file.write_all(&serialized)?;
        self.log_file.flush()?;

        let record_size = HEADER_LENGTH_BYTES + data_chunk.len();
        self.current_block_offset += record_size;
        log::debug!("Wrote {} bytes to the log file.", record_size);
        Ok(())
    }

    /// Returns the current size of the underlying log file in bytes.
    fn len(&self) -> LogIOResult<u64> {
        Ok(self.log_file.len()?)
    }
}
impl fmt::Debug for LogWriter {
    /// Debug output shows only the log path; the file handle is not printable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LogWriter");
        builder.field("log_file_path", &self.log_file_path);
        builder.finish()
    }
}
/// Reads logical records back out of a write-ahead log file, reassembling
/// records that were fragmented across blocks.
pub(crate) struct LogReader {
    // Handle to the underlying log file.
    log_file: Box<dyn ReadonlyRandomAccessFile>,
    // Path of the log file; retained for logging/debug output.
    log_file_path: PathBuf,
    // Byte offset the caller asked reading to start from.
    initial_offset: usize,
    // Current read position within the file, in bytes.
    current_cursor_position: usize,
    // Number of bytes already consumed from the block currently being read.
    current_block_offset: usize,
}
impl LogReader {
    /// Creates a reader for the log file at `log_file_path`.
    ///
    /// When `initial_block_offset` is non-zero, the file is positioned
    /// immediately at the start of the block containing that offset so the
    /// cursor and the real file position always agree.
    pub fn new<P: AsRef<Path>>(
        fs: Arc<dyn FileSystem>,
        log_file_path: P,
        initial_block_offset: usize,
    ) -> LogIOResult<Self> {
        log::info!("Reading the log file at {:?}", log_file_path.as_ref());
        let log_file = fs.open_file(log_file_path.as_ref())?;
        let mut reader = Self {
            log_file,
            log_file_path: log_file_path.as_ref().to_path_buf(),
            initial_offset: initial_block_offset,
            // Start at the true file position (0); updated below if we seek.
            current_cursor_position: 0,
            current_block_offset: 0,
        };

        // BUG FIX: previously the seek was deferred to `read_record` behind a
        // `cursor < initial_offset` guard that could never fire, because the
        // cursor was initialized to `initial_offset` while the file position
        // was still 0 — a non-zero initial offset was silently ignored and
        // the inflated cursor distorted the EOF pre-check. Seek eagerly here
        // instead.
        if initial_block_offset > 0 {
            reader.current_cursor_position = reader.seek_to_initial_block()? as usize;
        }
        Ok(reader)
    }

    /// Reads the next logical record, reassembling fragmented records.
    ///
    /// Returns the record's payload and an is-EOF flag: at end-of-file the
    /// payload is empty and the flag is `true`, while an empty payload with a
    /// `false` flag is a genuine zero-length record.
    pub fn read_record(&mut self) -> LogIOResult<(Vec<u8>, bool)> {
        // Fast path: the cursor has already consumed the whole file.
        if self.current_cursor_position > 0
            && (self.current_cursor_position as u64) >= self.len()?
        {
            return Ok((vec![], true));
        }

        let mut data_buffer: Vec<u8> = vec![];
        loop {
            match self.read_physical_record() {
                Err(physical_read_err) => {
                    if let LogIOError::IO(db_io_error) = &physical_read_err {
                        match db_io_error.kind() {
                            // Running out of file mid-record means the tail
                            // was torn; report a clean EOF.
                            ErrorKind::UnexpectedEof => return Ok((vec![], true)),
                            _ => return Err(physical_read_err),
                        }
                    }
                    // Non-I/O errors (e.g. checksum mismatches) indicate a
                    // corrupt physical record; skip it and keep scanning.
                }
                Ok(record) => {
                    data_buffer.extend(record.data);
                    match record.block_type {
                        // A Full or Last fragment completes the logical record.
                        BlockType::Full | BlockType::Last => {
                            return Ok((data_buffer, false));
                        }
                        // First/Middle fragments: keep accumulating.
                        BlockType::First | BlockType::Middle => {}
                    }
                }
            }
        }
    }
}
impl LogReader {
    /// Seeks the underlying file to the start of the block containing
    /// `initial_offset` and returns the resulting file position. A target in
    /// a block's trailer region skips ahead to the next block; a computed
    /// start of 0 performs no seek and returns 0.
    fn seek_to_initial_block(&mut self) -> LogIOResult<u64> {
        let offset_in_block = self.initial_offset % BLOCK_SIZE_BYTES;
        let mut block_start_position = self.initial_offset - offset_in_block;
        // Offsets in the final bytes of a block can only land in zero padding,
        // so start reading at the next block instead.
        if offset_in_block > (BLOCK_SIZE_BYTES - (HEADER_LENGTH_BYTES - 1)) {
            block_start_position += BLOCK_SIZE_BYTES;
        }
        if block_start_position > 0 {
            return Ok(self
                .log_file
                .seek(SeekFrom::Start(block_start_position as u64))?);
        }
        Ok(0)
    }

    /// Reads the next physical record (header + payload) from the file,
    /// first consuming any zero-padded trailer at the end of the current block.
    ///
    /// Returns an `UnexpectedEof` I/O error when the file ends before a full
    /// header or payload could be read; checksum/block-type problems surface
    /// as serialization errors from `BlockRecord::try_from`.
    fn read_physical_record(&mut self) -> LogIOResult<BlockRecord> {
        if BLOCK_SIZE_BYTES - self.current_block_offset < HEADER_LENGTH_BYTES {
            // The remainder of this block is trailer padding; consume it so
            // the next read starts at a fresh block boundary.
            let mut trailer = vec![0u8; BLOCK_SIZE_BYTES - self.current_block_offset];
            if !trailer.is_empty() {
                self.log_file.read_exact(&mut trailer)?;
                self.current_block_offset = 0;
                self.current_cursor_position += trailer.len();
            }
        }

        let mut header_buffer = [0; HEADER_LENGTH_BYTES];
        // NOTE(review): `read` may legally return fewer bytes than requested
        // even before EOF; a short read here would be misreported as EOF.
        // Presumably the file implementation always fills the buffer —
        // confirm, or switch to `read_exact`.
        let header_bytes_read = self.log_file.read(&mut header_buffer)?;
        if header_bytes_read < HEADER_LENGTH_BYTES {
            let err_msg = format!(
                "Unexpectedly reached the end of the log file at {log_file_path:?} while \
                attempting to read a header.",
                log_file_path = self.log_file_path
            );
            return Err(LogIOError::IO(DBIOError::new(
                ErrorKind::UnexpectedEof,
                err_msg,
            )));
        }
        self.current_block_offset += header_bytes_read;

        // Bytes 4..6 of the header hold the payload length.
        let data_length = u16::decode_fixed(&header_buffer[4..6]) as usize;
        let mut data_buffer = vec![0; data_length];
        let data_bytes_read = self.log_file.read(&mut data_buffer)?;
        if data_bytes_read < data_length {
            let err_msg = format!(
                "Unexpectedly reached the end of the log file at {log_file_path:?} while \
                attempting to read the data chunk.",
                log_file_path = self.log_file_path
            );
            return Err(LogIOError::IO(DBIOError::new(
                ErrorKind::UnexpectedEof,
                err_msg,
            )));
        }

        // Reassemble the raw bytes and let `BlockRecord::try_from` validate
        // the checksum and block type tag.
        let serialized_block = [header_buffer.to_vec(), data_buffer].concat();
        let block_record: BlockRecord = BlockRecord::try_from(&serialized_block)?;
        self.current_cursor_position += header_buffer.len() + data_bytes_read;
        self.current_block_offset =
            (self.current_block_offset + data_bytes_read) % BLOCK_SIZE_BYTES;
        Ok(block_record)
    }

    /// Returns the current size of the underlying log file in bytes.
    fn len(&self) -> LogIOResult<u64> {
        Ok(self.log_file.len()?)
    }

    /// Logs that `num_bytes_dropped` bytes were skipped. Not currently called
    /// from anywhere in this file.
    fn log_drop(num_bytes_dropped: u64, reason: String) {
        log::error!(
            "Skipped reading {} bytes. Reason: {}",
            num_bytes_dropped,
            &reason
        );
    }

    /// Logs that `num_bytes_corrupted` bytes were skipped due to detected
    /// corruption. Not currently called from anywhere in this file.
    fn log_corruption(num_bytes_corrupted: u64) {
        LogReader::log_drop(num_bytes_corrupted, "Detected corruption.".to_owned())
    }
}
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;
    use std::fmt::Write;

    use crate::fs::InMemoryFileSystem;

    use super::*;

    // Round-trips several records (including an empty one) through a log file
    // and checks that reads past the final record keep reporting empty.
    #[test]
    fn can_read_and_write() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        log_writer.append(b"here is a string.").unwrap();
        log_writer.append(b"we got more strings over here").unwrap();
        log_writer.append(b"").unwrap();
        log_writer
            .append(b"there was an empty string somewhere")
            .unwrap();
        drop(log_writer);

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            b"here is a string.".to_vec(),
            log_reader.read_record().unwrap().0
        );
        assert_eq!(
            b"we got more strings over here".to_vec(),
            log_reader.read_record().unwrap().0
        );
        assert_eq!(b"".to_vec(), log_reader.read_record().unwrap().0);
        assert_eq!(
            b"there was an empty string somewhere".to_vec(),
            log_reader.read_record().unwrap().0
        );
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Writes enough small records to span many blocks and verifies that they
    // all read back in order.
    #[test]
    fn can_read_and_write_with_lots_of_blocks() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        for num in 0usize..100_000 {
            log_writer.append(format!("{num}").as_bytes()).unwrap();
        }
        println!("Log writer wrote {} bytes", log_writer.len().unwrap());
        drop(log_writer);

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        println!("Log reader sees {} bytes", log_reader.len().unwrap());
        for num in 0usize..100_000 {
            let actual_record = log_reader.read_record().ok();
            // Print the tail of the read sequence to aid debugging failures.
            if num >= 99_990 {
                println!(
                    "Actual record read {}",
                    std::str::from_utf8(&actual_record.as_ref().unwrap().0).unwrap()
                );
            }
            assert!(
                actual_record.unwrap().0 == format!("{num}").as_bytes(),
                "Failed to properly read at iteration {num}"
            );
        }
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Verifies that records larger than one block are fragmented on write and
    // reassembled on read.
    #[test]
    fn can_read_and_write_with_records_larger_than_the_block_size_limit() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        log_writer.append(b"small").unwrap();
        log_writer
            .append(&generate_large_buffer("medium", 50_000))
            .unwrap();
        log_writer
            .append(&generate_large_buffer("large", 100_000))
            .unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(log_reader.read_record().unwrap().0, b"small");
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("medium", 50_000)
        );
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("large", 100_000)
        );
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Verifies that a zero-length record is distinguishable from end-of-file
    // via the second element of the returned tuple (the EOF flag).
    #[test]
    fn read_can_differentiate_an_empty_record_from_end_of_file() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        log_writer.append(b"small").unwrap();
        log_writer
            .append(&generate_large_buffer("medium", 50_000))
            .unwrap();
        log_writer.append(&[]).unwrap();
        log_writer
            .append(&generate_large_buffer("large", 100_000))
            .unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            log_reader.read_record().unwrap(),
            (b"small".to_vec(), false)
        );
        assert_eq!(
            log_reader.read_record().unwrap(),
            (generate_large_buffer("medium", 50_000), false)
        );
        assert_eq!(log_reader.read_record().unwrap(), (vec![], false));
        assert_eq!(
            log_reader.read_record().unwrap(),
            (generate_large_buffer("large", 100_000), false)
        );
        assert_eq!(
            log_reader.read_record().unwrap(),
            (vec![], true),
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Verifies that reading a freshly created, empty log immediately reports EOF.
    #[test]
    fn read_returns_eof_when_encountering_an_empty_log() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(log_reader.read_record().unwrap(), (vec![], true));
    }

    // Leaves exactly one header's worth of space at the end of a block and
    // verifies that an empty record is emitted there without any padding.
    #[test]
    fn when_a_there_is_only_enough_room_for_an_empty_record_will_write_a_header_with_no_padding() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        // One header + this payload leaves exactly HEADER_LENGTH_BYTES free.
        let marginal_size = BLOCK_SIZE_BYTES - (2 * HEADER_LENGTH_BYTES);
        log_writer
            .append(&generate_large_buffer("foo", marginal_size))
            .unwrap();
        assert_eq!(
            log_writer.len().unwrap(),
            (BLOCK_SIZE_BYTES - HEADER_LENGTH_BYTES) as u64
        );
        log_writer.append(b"").unwrap();
        log_writer.append(b"bar").unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("foo", marginal_size)
        );
        assert_eq!(log_reader.read_record().unwrap().0, b"");
        assert_eq!(log_reader.read_record().unwrap().0, b"bar");
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Verifies that reopening an existing log in append mode preserves the
    // records written by the previous writer.
    #[test]
    fn can_reopen_a_log_and_append() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        log_writer.append(b"foo").unwrap();
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, true).unwrap();
        log_writer.append(b"bar").unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(log_reader.read_record().unwrap().0, b"foo");
        assert_eq!(log_reader.read_record().unwrap().0, b"bar");
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    // Builds a buffer of at least `target_size` bytes by repeating `pattern`.
    fn generate_large_buffer(pattern: &str, target_size: usize) -> Vec<u8> {
        let mut string = String::with_capacity(target_size);
        while string.len() < target_size {
            write!(string, "{pattern}").unwrap();
        }
        string.as_bytes().to_vec()
    }
}