pub(crate) mod chunk_id;
pub(crate) mod closed_chunk;
pub(crate) mod open_chunk;
mod record_iterator;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::marker::PhantomData;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use codeq::Decode;
use codeq::OffsetSize;
use codeq::error_context_ext::ErrorContextExt;
use log::error;
use log::warn;
use record_iterator::RecordIterator;
use crate::Config;
use crate::Types;
use crate::WALRecord;
use crate::chunk::chunk_id::ChunkId;
use crate::num::format_pad9_u64;
use crate::types::Segment;
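/// An in-memory view of a single WAL chunk file.
///
/// A chunk is a contiguous region of the WAL stored in one file. This struct
/// keeps the open file handle together with the global offsets of every
/// record boundary inside the chunk.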
#[derive(Debug, Clone)]
pub struct Chunk<T> {
pub(crate) f: Arc<File>,
pub(crate) global_offsets: Vec<u64>,
#[allow(dead_code)]
pub(crate) truncated: Option<u64>,
pub(crate) _p: PhantomData<T>,
}
impl<T> Chunk<T> {
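/// Returns the number of records in this chunk.
///
/// `global_offsets` always holds one more entry than there are records.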
pub(crate) fn records_count(&self) -> usize {
self.global_offsets.len() - 1
}
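/// Returns the id of this chunk, i.e., its global start offset.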
pub(crate) fn chunk_id(&self) -> ChunkId {
ChunkId(self.global_offsets[0])
}
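/// Returns the `Segment` (global offset and size) of the last record.
///
/// Panics if the chunk contains no records.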
pub(crate) fn last_segment(&self) -> Segment {
let offsets = &self.global_offsets;
let l = offsets.len();
let start = offsets[l - 2];
let end = offsets[l - 1];
Segment::new(start, end - start)
}
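/// Returns the total size in bytes of the records in this chunk.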
pub(crate) fn chunk_size(&self) -> u64 {
self.end_offset()
}
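/// Returns the chunk-local end offset, i.e., the chunk size:
/// `global_end() - global_start()`.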
#[allow(dead_code)]
pub(crate) fn end_offset(&self) -> u64 {
self.global_offsets[self.global_offsets.len() - 1]
- self.global_offsets[0]
}
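/// Returns the global offset at which this chunk starts.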
pub(crate) fn global_start(&self) -> u64 {
self.global_offsets[0]
}
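/// Returns the global offset just past the last record in this chunk.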
#[allow(dead_code)]
pub(crate) fn global_end(&self) -> u64 {
self.global_offsets[self.global_offsets.len() - 1]
}
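/// Records a newly appended record by pushing its end offset, computed as
/// the current last offset plus `size`.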
pub(crate) fn append_record_size(&mut self, size: u64) {
let last = self.global_offsets[self.global_offsets.len() - 1];
self.global_offsets.push(last + size);
}
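/// Opens the chunk file for `chunk_id` in read-write mode, at the path
/// derived from `config`.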
pub(crate) fn open_chunk_file(
config: &Config,
chunk_id: ChunkId,
) -> Result<File, io::Error> {
let path = config.chunk_path(chunk_id);
let f = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.context(|| format!("open {}", chunk_id))?;
Ok(f)
}
}
impl<T> Chunk<T>
where T: Types
{
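/// Opens an existing chunk and loads all of its records.
///
/// Records are scanned sequentially. If a record fails to load, recovery is
/// attempted via [`Self::handle_record_error`]: an incomplete or all-zero
/// tail may be truncated away (when the config permits); otherwise the
/// error is returned. On success, the chunk and its decoded records are
/// returned.
///
/// A minimal usage sketch, assuming a hypothetical application-defined
/// `AppTypes` that implements [`Types`]:
///
/// ```ignore
/// let (chunk, records) = Chunk::<AppTypes>::open(config, chunk_id)?;
/// for record in &records {
///     // replay or index each record
/// }
/// ```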
pub(crate) fn open(
config: Arc<Config>,
chunk_id: ChunkId,
) -> Result<(Self, Vec<WALRecord<T>>), io::Error> {
let f = Self::open_chunk_file(&config, chunk_id)?;
let arc_f = Arc::new(f);
let file_size = arc_f.metadata()?.len();
let it = Self::load_records_iter(&config, arc_f.clone(), chunk_id)?;
let mut record_offsets = vec![chunk_id.offset()];
let mut records = Vec::new();
let mut truncate = false;
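// Scan records in order; stop at the first unreadable record and let
// `handle_record_error` decide whether the tail can be truncated.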
for res in it {
match res {
Ok((seg, record)) => {
record_offsets.push(chunk_id.offset() + seg.end().0);
records.push(record);
}
Err(io_err) => {
let global_offset = record_offsets.last().copied().unwrap();
truncate = Self::handle_record_error(
io_err,
arc_f.clone(),
global_offset,
chunk_id,
&config,
)?;
break;
}
}
}
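// Cut off the damaged tail so the file ends at the last good record.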
let truncated = if truncate {
arc_f
.set_len(*record_offsets.last().unwrap() - chunk_id.offset())?;
arc_f.sync_all()?;
Some(file_size)
} else {
None
};
let chunk = Self {
f: arc_f,
global_offsets: record_offsets,
truncated,
_p: Default::default(),
};
Ok((chunk, records))
}
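/// Decides whether an error hit while reading a record is recoverable.
///
/// Returns `Ok(true)` if the chunk should be truncated at `global_offset`:
/// either the error is an `UnexpectedEof` (a partially written record), or
/// every byte from `global_offset` to the end of the file is zero
/// (pre-allocated but unwritten space). Both cases require truncation to be
/// enabled in the config; otherwise, or if the tail contains non-zero
/// garbage, the original error is returned.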
fn handle_record_error(
io_err: io::Error,
file: Arc<File>,
global_offset: u64,
chunk_id: ChunkId,
config: &Config,
) -> Result<bool, io::Error> {
let at = format!(
"at offset {} in chunk {}",
format_pad9_u64(global_offset),
chunk_id
);
error!(
"Error reading record {at}: {}, error kind: {:?}; trying to recover...",
io_err,
io_err.kind()
);
let can_truncate = config.truncate_incomplete_record();
if io_err.kind() == io::ErrorKind::UnexpectedEof {
if can_truncate {
warn!("UnexpectedEof {at}; truncating");
return Ok(true);
}
error!("UnexpectedEof {at}; truncate disabled");
return Err(io_err);
}
let all_zero = Self::verify_trailing_zeros(
file,
global_offset - chunk_id.offset(),
chunk_id,
)?;
if all_zero && can_truncate {
warn!("Trailing zeros {at}; truncating");
return Ok(true);
}
if all_zero {
error!("Trailing zeros {at}; truncate disabled");
} else {
error!("Damaged record({}) {at}", io_err);
}
Err(io_err)
}
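/// Checks whether every byte from `start_offset` (chunk-local) to the end
/// of the file is zero.
///
/// Returns `Ok(true)` if the tail is all zeros (or empty), `Ok(false)` as
/// soon as a non-zero byte is found, and an error if `start_offset` lies
/// beyond the end of the file.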
fn verify_trailing_zeros(
file: Arc<File>,
mut start_offset: u64,
chunk_id: ChunkId,
) -> Result<bool, io::Error> {
let file_size = file.metadata()?.len();
if start_offset > file_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Start offset {} exceeds file size {}",
start_offset, file_size
),
));
}
if file_size == start_offset {
return Ok(true);
}
const WARN_THRESHOLD: u64 = 64 * 1024;
if file_size - start_offset > WARN_THRESHOLD {
warn!(
"Large maybe damaged section detected: {} bytes to the end; in chunk {}",
file_size - start_offset,
chunk_id
);
}
const READ_CHUNK_SIZE: usize = 1024;
let mut buffer = vec![0u8; READ_CHUNK_SIZE];
loop {
let n = file.read_at(&mut buffer, start_offset)?;
if n == 0 {
break;
}
for (i, &byte) in buffer[..n].iter().enumerate() {
if byte != 0 {
error!(
"Non-zero byte detected at offset {} in chunk {}",
start_offset + i as u64,
chunk_id
);
return Ok(false);
}
}
start_offset += n as u64;
}
Ok(true)
}
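/// Loads every record in the chunk, keeping per-record errors, for
/// inspection and debugging.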
#[allow(clippy::type_complexity)]
pub(crate) fn dump(
config: &Config,
chunk_id: ChunkId,
) -> Result<Vec<Result<(Segment, WALRecord<T>), io::Error>>, io::Error>
{
let f = Self::open_chunk_file(config, chunk_id)?;
let it = Self::load_records_iter(config, Arc::new(f), chunk_id)?;
Ok(it.collect::<Vec<_>>())
}
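/// Returns an iterator over the records in the chunk file, yielding each
/// record together with its segment, or an error where a record cannot be
/// read.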
pub(crate) fn load_records_iter(
config: &Config,
f: Arc<File>,
chunk_id: ChunkId,
) -> Result<
impl Iterator<Item = Result<(Segment, WALRecord<T>), io::Error>> + '_,
io::Error,
> {
let file_size = f
.metadata()
.context(|| format!("get file size of {chunk_id}"))?
.len();
let br = io::BufReader::with_capacity(config.read_buffer_size(), f);
Ok(RecordIterator::new(br, file_size, chunk_id))
}
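/// Reads and decodes the single record covering `segment`.
///
/// The segment offset is global; it is converted to a chunk-local file
/// offset before reading.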
pub(crate) fn read_record(
&self,
segment: Segment,
) -> Result<WALRecord<T>, io::Error> {
let offset = segment.offset().0 - self.global_start();
let size = *segment.size() as usize;
let mut buf = vec![0u8; size];
self.f.read_exact_at(&mut buf, offset)?;
WALRecord::<T>::decode(&buf[..]).context(|| {
format!("decode Record {:?} in {}", segment, self.chunk_id())
})
}
}