pub mod chunk_id;
pub mod closed_chunk;
pub mod open_chunk;
pub mod record_iterator;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::marker::PhantomData;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use codeq::Decode;
use codeq::OffsetSize;
use codeq::error_context_ext::ErrorContextExt;
use log::error;
use log::warn;
use record_iterator::RecordIterator;
use crate::Config;
use crate::chunk::chunk_id::ChunkId;
use crate::num::format_pad9_u64;
use crate::types::Segment;
/// A chunk of the log: an open chunk file together with the global offsets
/// of the records stored in it.
///
/// `Rec` is the record type kept in the chunk; it only appears as a marker.
#[derive(Debug, Clone)]
pub struct Chunk<Rec> {
/// Shared handle to the chunk file; records are read from it by position.
pub(crate) f: Arc<File>,
/// Record boundaries as global offsets: entry 0 is the chunk's own start
/// offset and record `i` spans `[global_offsets[i], global_offsets[i + 1])`.
/// The accessors index entry 0 unconditionally, so this is expected to be
/// non-empty.
global_offsets: Vec<u64>,
/// Size of the file before it was truncated while opening the chunk, or
/// `None` if no truncation took place.
// NOTE(review): this field is read by `is_truncated()` and
// `truncated_file_size()`; confirm whether the `allow` is still needed.
#[allow(dead_code)]
truncated: Option<u64>,
/// Ties the chunk to its record type without storing a record.
pub(crate) _p: PhantomData<Rec>,
}
impl<Rec> Chunk<Rec> {
    /// Returns the number of records in this chunk.
    ///
    /// `global_offsets` holds one boundary more than there are records.
    pub fn records_count(&self) -> usize {
        let boundaries = self.global_offsets.len();
        boundaries - 1
    }

    /// Returns the id of this chunk, built from its first global offset.
    pub fn chunk_id(&self) -> ChunkId {
        ChunkId(self.global_start())
    }

    /// Returns the segment (global offset and size) of the last record.
    ///
    /// # Panics
    ///
    /// Panics if the chunk holds no record, i.e. there are fewer than two
    /// offsets.
    pub fn last_segment(&self) -> Segment {
        // The last record is bounded by the final two offsets.
        self.record_segment(self.global_offsets.len() - 2)
    }

    /// Returns the number of bytes covered by the records of this chunk.
    pub fn chunk_size(&self) -> u64 {
        self.end_offset()
    }

    /// Returns the end of the chunk relative to its own start, i.e. the last
    /// global offset minus the first one.
    #[allow(dead_code)]
    pub fn end_offset(&self) -> u64 {
        self.global_end() - self.global_start()
    }

    /// Returns the global offset at which this chunk starts.
    pub fn global_start(&self) -> u64 {
        self.global_offsets[0]
    }

    /// Returns the global offset just past the last record of this chunk.
    #[allow(dead_code)]
    pub fn global_end(&self) -> u64 {
        let last = self.global_offsets.len() - 1;
        self.global_offsets[last]
    }

    /// Returns the segment (global offset and size) of the record at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` does not refer to a record of this chunk.
    pub fn record_segment(&self, index: usize) -> Segment {
        let begin = self.global_offsets[index];
        let size = self.global_offsets[index + 1] - begin;
        Segment::new(begin, size)
    }

    /// Returns `true` if the chunk file was truncated when it was opened.
    pub fn is_truncated(&self) -> bool {
        self.truncated_file_size().is_some()
    }

    /// Returns the file size recorded before truncation, or `None` if the
    /// chunk was not truncated.
    pub fn truncated_file_size(&self) -> Option<u64> {
        self.truncated
    }

    /// Registers a newly appended record of `size` bytes by pushing its end
    /// offset after the current last offset.
    pub(crate) fn append_record_size(&mut self, size: u64) {
        let next_end = self.global_end() + size;
        self.global_offsets.push(next_end);
    }

    /// Opens the file of chunk `chunk_id` for reading and writing.
    ///
    /// The path is taken from `config.chunk_path(chunk_id)`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error of the open call, with the chunk id added as
    /// context.
    pub fn open_chunk_file(
        config: &Config,
        chunk_id: ChunkId,
    ) -> Result<File, io::Error> {
        let mut options = OpenOptions::new();
        options.read(true).write(true);

        let file = options
            .open(config.chunk_path(chunk_id))
            .context(|| format!("open {}", chunk_id))?;
        Ok(file)
    }
}
impl<Rec> Chunk<Rec>
where Rec: Decode + 'static
{
pub(crate) fn open_with_truncate(
config: Arc<Config>,
chunk_id: ChunkId,
allow_truncate: bool,
) -> Result<(Self, Vec<Rec>), io::Error> {
let f = Self::open_chunk_file(&config, chunk_id)?;
let arc_f = Arc::new(f);
let file_size = arc_f.metadata()?.len();
let it = Self::load_records_iter(&config, arc_f.clone(), chunk_id)?;
let mut record_offsets = vec![chunk_id.offset()];
let mut records = Vec::new();
let mut truncate = false;
for res in it {
match res {
Ok((seg, record)) => {
record_offsets.push(chunk_id.offset() + seg.end().0);
records.push(record);
}
Err(io_err) => {
let global_offset = record_offsets.last().copied().unwrap();
truncate = Self::handle_record_error(
io_err,
arc_f.clone(),
global_offset,
chunk_id,
&config,
allow_truncate,
)?;
break;
}
};
}
let truncated = if truncate {
arc_f
.set_len(*record_offsets.last().unwrap() - chunk_id.offset())?;
arc_f.sync_all()?;
Some(file_size)
} else {
None
};
let chunk = Self {
f: arc_f,
global_offsets: record_offsets,
truncated,
_p: Default::default(),
};
Ok((chunk, records))
}
/// Decides how to react to an error hit while loading a record.
///
/// Truncation is only possible when both `config.truncate_incomplete_record()`
/// and `allow_truncate` are true.
///
/// - `UnexpectedEof`: truncate if possible, otherwise return the error.
/// - Any other error: check whether everything from the failing record to the
///   end of the file is zero. An all-zero tail is truncated if possible; a
///   non-zero tail is reported as a damaged record.
///
/// `global_offset` is the global offset of the failing record; it is turned
/// into a file offset by subtracting `chunk_id.offset()`.
///
/// Returns `Ok(true)` when the caller should truncate the file.
///
/// # Errors
///
/// Returns `io_err` when recovery is not possible or not permitted, or the
/// error raised while inspecting the file tail.
fn handle_record_error(
    io_err: io::Error,
    file: Arc<File>,
    global_offset: u64,
    chunk_id: ChunkId,
    config: &Config,
    allow_truncate: bool,
) -> Result<bool, io::Error> {
    let at = format!(
        "at offset {} in chunk {}",
        format_pad9_u64(global_offset),
        chunk_id
    );
    let kind = io_err.kind();

    error!(
        "Error reading record {at}: {}, error kind: {:?}; trying to recover...",
        io_err,
        kind
    );

    let can_truncate = config.truncate_incomplete_record() && allow_truncate;

    if kind == io::ErrorKind::UnexpectedEof {
        return if can_truncate {
            warn!("UnexpectedEof {at}; truncating");
            Ok(true)
        } else {
            error!("UnexpectedEof {at}; truncate disabled");
            Err(io_err)
        };
    }

    let local_offset = global_offset - chunk_id.offset();
    let zero_tail = Self::verify_trailing_zeros(file, local_offset, chunk_id)?;

    match (zero_tail, can_truncate) {
        (true, true) => {
            warn!("Trailing zeros {at}; truncating");
            Ok(true)
        }
        (true, false) => {
            error!("Trailing zeros {at}; truncate disabled");
            Err(io_err)
        }
        (false, _) => {
            error!("Damaged record({}) {at}", io_err);
            Err(io_err)
        }
    }
}
/// Checks whether every byte of `file` from `start_offset` to the end of the
/// file is zero.
///
/// `start_offset` is an offset within the file. `chunk_id` is only used in
/// log messages. A tail longer than 64 KiB is logged as a warning but is
/// still scanned.
///
/// Returns `Ok(true)` if the tail is empty or all zero, and `Ok(false)` as
/// soon as a non-zero byte is found.
///
/// # Errors
///
/// Returns `InvalidInput` if `start_offset` is past the end of the file, or
/// the I/O error from reading the file. Reads that fail with `Interrupted`
/// are retried instead of being reported.
fn verify_trailing_zeros(
    file: Arc<File>,
    mut start_offset: u64,
    chunk_id: ChunkId,
) -> Result<bool, io::Error> {
    /// Tail length above which a warning is logged.
    const WARN_THRESHOLD: u64 = 64 * 1024;
    /// Number of bytes requested per read.
    const READ_CHUNK_SIZE: usize = 1024;

    let file_size = file.metadata()?.len();

    if start_offset > file_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "Start offset {} exceeds file size {}",
                start_offset, file_size
            ),
        ));
    }

    if file_size == start_offset {
        return Ok(true);
    }

    let tail_len = file_size - start_offset;
    if tail_len > WARN_THRESHOLD {
        warn!(
            "Large maybe damaged section detected: {} bytes to the end; in chunk {}",
            tail_len,
            chunk_id
        );
    }

    let mut buffer = [0u8; READ_CHUNK_SIZE];

    loop {
        let n = match file.read_at(&mut buffer, start_offset) {
            // End of file reached without finding a non-zero byte.
            Ok(0) => break,
            Ok(n) => n,
            // An interrupted read transferred nothing; retry at the same offset.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        // Only the first `n` bytes of the buffer were filled by this read.
        if let Some(i) = buffer[..n].iter().position(|byt| *byt != 0) {
            error!(
                "Non-zero byte detected at offset {} in chunk {}",
                start_offset + i as u64,
                chunk_id
            );
            return Ok(false);
        }

        start_offset += n as u64;
    }

    Ok(true)
}
/// Opens chunk `chunk_id` and collects everything its record iterator
/// yields, successes and per-record errors alike.
///
/// # Errors
///
/// Returns an error only if the chunk file cannot be opened or its size
/// cannot be determined; record-level errors are part of the returned list.
#[allow(clippy::type_complexity)]
pub fn dump(
    config: &Config,
    chunk_id: ChunkId,
) -> Result<Vec<Result<(Segment, Rec), io::Error>>, io::Error> {
    let file = Arc::new(Self::open_chunk_file(config, chunk_id)?);
    let entries = Self::load_records_iter(config, file, chunk_id)?.collect();
    Ok(entries)
}
/// Creates an iterator over the records stored in the chunk file `f`.
///
/// The file is wrapped in a buffered reader whose capacity is
/// `config.read_buffer_size()`, and the current file size is handed to the
/// iterator along with `chunk_id`.
///
/// # Errors
///
/// Returns an error if the file metadata cannot be read.
pub fn load_records_iter(
    config: &Config,
    f: Arc<File>,
    chunk_id: ChunkId,
) -> Result<
    impl Iterator<Item = Result<(Segment, Rec), io::Error>> + '_,
    io::Error,
> {
    let metadata = f
        .metadata()
        .context(|| format!("get file size of {chunk_id}"))?;
    let file_size = metadata.len();

    let capacity = config.read_buffer_size();
    let reader = io::BufReader::with_capacity(capacity, f);

    Ok(RecordIterator::new(reader, file_size, chunk_id))
}
/// Reads the bytes of `segment` from the chunk file and decodes them into a
/// record.
///
/// `segment` carries a global offset; it is converted to a file offset by
/// subtracting the chunk's global start. In debug builds the segment is
/// first checked against the known record boundaries.
///
/// # Errors
///
/// Returns the I/O error from reading the file, or the decode error with the
/// segment and chunk id added as context.
pub fn read_record(&self, segment: Segment) -> Result<Rec, io::Error> {
    #[cfg(debug_assertions)]
    self.debug_assert_valid_segment(segment);

    let local_offset = segment.offset().0 - self.global_start();
    let mut bytes = vec![0u8; *segment.size() as usize];
    self.f.read_exact_at(&mut bytes, local_offset)?;

    let decoded = Rec::decode(bytes.as_slice());
    decoded.context(|| {
        format!("decode Record {:?} in {}", segment, self.chunk_id())
    })
}
/// Debug-only check that `segment` exactly matches one record of this chunk:
/// its start must be a known boundary and its end must be the next boundary.
///
/// # Panics
///
/// Panics (via `debug_assert!`) if no such record exists.
#[cfg(debug_assertions)]
fn debug_assert_valid_segment(&self, segment: Segment) {
    let (start, end) = (segment.offset().0, segment.end().0);

    let known = match self.global_offsets.binary_search(&start) {
        Ok(i) => self.global_offsets.get(i + 1) == Some(&end),
        Err(_) => false,
    };

    debug_assert!(
        known,
        "segment {:?} is not in {}",
        segment,
        self.chunk_id()
    );
}
}
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io;
    use std::io::Write;
    use std::sync::Arc;

    use crate::Chunk;
    use crate::ChunkId;
    use crate::Config;
    use crate::Segment;

    /// Creates an anonymous temporary file holding `bytes`.
    fn temp_file(bytes: &[u8]) -> Result<Arc<File>, io::Error> {
        let mut f = tempfile::tempfile()?;
        f.write_all(bytes)?;
        Ok(Arc::new(f))
    }

    /// Builds a config with `truncate_incomplete_record` set explicitly.
    fn config(truncate_incomplete_record: bool) -> Config {
        let mut cfg = Config::new("unused");
        cfg.truncate_incomplete_record = Some(truncate_incomplete_record);
        cfg
    }

    /// Builds an untruncated chunk backed by a temp file containing `bytes`.
    fn chunk_over(
        bytes: &[u8],
        global_offsets: Vec<u64>,
    ) -> Result<Chunk<String>, io::Error> {
        Ok(Chunk {
            f: temp_file(bytes)?,
            global_offsets,
            truncated: None,
            _p: Default::default(),
        })
    }

    /// Runs `verify_trailing_zeros` on `file` from `start` for chunk 0.
    fn trailing_zeros(file: &Arc<File>, start: u64) -> Result<bool, io::Error> {
        Chunk::<String>::verify_trailing_zeros(file.clone(), start, ChunkId(0))
    }

    /// Runs `handle_record_error` at offset 0 of chunk 0 with truncation
    /// allowed by the caller and configured by `truncate_incomplete_record`.
    fn recover(
        err: io::Error,
        file: &Arc<File>,
        truncate_incomplete_record: bool,
    ) -> Result<bool, io::Error> {
        Chunk::<String>::handle_record_error(
            err,
            file.clone(),
            0,
            ChunkId(0),
            &config(truncate_incomplete_record),
            true,
        )
    }

    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "is not in ChunkId")]
    fn test_read_record_rejects_unknown_segment() {
        let chunk = chunk_over(&[], vec![10, 20]).unwrap();

        // The segment starts inside the only record, not on a boundary.
        let _ = chunk.read_record(Segment::new(11, 1));
    }

    #[test]
    fn test_verify_trailing_zeros() -> Result<(), io::Error> {
        let zeros = temp_file(&[0, 0, 0, 0])?;

        // A zero tail and an empty tail are both accepted.
        assert!(trailing_zeros(&zeros, 2)?);
        assert!(trailing_zeros(&zeros, 4)?);

        // Starting past the end of the file is rejected.
        let err = trailing_zeros(&zeros, 5).unwrap_err();
        assert_eq!(io::ErrorKind::InvalidInput, err.kind());
        assert_eq!("Start offset 5 exceeds file size 4", err.to_string());

        let dirty = temp_file(&[0, 0, 7, 0])?;
        assert!(!trailing_zeros(&dirty, 0)?);

        Ok(())
    }

    #[test]
    fn test_verify_trailing_zeros_accepts_large_zero_tail()
    -> Result<(), io::Error> {
        // One byte more than the warning threshold.
        let bytes = vec![0u8; 64 * 1024 + 1];
        let file = temp_file(&bytes)?;

        assert!(trailing_zeros(&file, 0)?);
        Ok(())
    }

    #[test]
    fn test_handle_record_error_for_unexpected_eof() -> Result<(), io::Error> {
        let file = temp_file(&[])?;
        let eof = || io::Error::new(io::ErrorKind::UnexpectedEof, "short record");

        assert!(recover(eof(), &file, true)?);

        let got = recover(eof(), &file, false).unwrap_err();
        assert_eq!(io::ErrorKind::UnexpectedEof, got.kind());

        Ok(())
    }

    #[test]
    fn test_handle_record_error_for_trailing_zeros() -> Result<(), io::Error> {
        let file = temp_file(&[0, 0])?;
        let bad = || io::Error::new(io::ErrorKind::InvalidData, "bad record");

        assert!(recover(bad(), &file, true)?);

        let got = recover(bad(), &file, false).unwrap_err();
        assert_eq!(io::ErrorKind::InvalidData, got.kind());

        Ok(())
    }

    #[test]
    fn test_handle_record_error_rejects_non_zero_tail() -> Result<(), io::Error>
    {
        let file = temp_file(&[0, 1])?;
        let bad = io::Error::new(io::ErrorKind::InvalidData, "bad record");

        let got = recover(bad, &file, true).unwrap_err();
        assert_eq!(io::ErrorKind::InvalidData, got.kind());

        Ok(())
    }

    #[test]
    fn test_read_record_adds_decode_context() -> Result<(), io::Error> {
        let chunk = chunk_over(&[0, 0, 0], vec![12, 15])?;

        let err = chunk.read_record(Segment::new(12, 3)).unwrap_err();
        let text = err.to_string();

        assert_eq!(io::ErrorKind::UnexpectedEof, err.kind());
        assert!(text.contains("decode Record"));
        assert!(text.contains("ChunkId"));

        Ok(())
    }
}