use std::io;
use log::{debug, warn};
use crate::{
commit::Commit,
error,
index::{IndexFile, IndexFileMut},
segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
Options,
};
pub(crate) mod fs;
#[cfg(any(test, feature = "test"))]
pub mod mem;
pub use fs::Fs;
#[cfg(any(test, feature = "test"))]
pub use mem::Memory;
pub type TxOffset = u64;
pub type TxOffsetIndexMut = IndexFileMut<TxOffset>;
pub type TxOffsetIndex = IndexFile<TxOffset>;
/// A seekable stream whose total length in bytes can be queried.
pub trait SegmentLen: io::Seek {
    /// Return the length of the stream in bytes.
    ///
    /// The length is obtained by seeking to the end. If that moved the
    /// cursor, it is put back where it was, so the caller's position is
    /// unchanged on success.
    ///
    /// # Errors
    ///
    /// Propagates any error from querying the position or from either seek.
    fn segment_len(&mut self) -> io::Result<u64> {
        let saved = self.stream_position()?;
        match self.seek(io::SeekFrom::End(0))? {
            // Already at the end: no need for a second seek.
            end if end == saved => Ok(end),
            end => self.seek(io::SeekFrom::Start(saved)).map(|_| end),
        }
    }
}
/// Bounds required of a segment opened for reading.
///
/// This has no methods of its own. The blanket impl below makes every type
/// that satisfies the bounds a `SegmentReader`, so it works as a trait alias.
pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {}

impl<T> SegmentReader for T where T: io::BufRead + SegmentLen + Send + Sync {}

/// Bounds required of a segment opened for writing.
///
/// Like [`SegmentReader`], this is a method-less alias that is
/// blanket-implemented for every type satisfying the bounds.
pub trait SegmentWriter: FileLike + io::Read + io::Write + SegmentLen + Send + Sync {}

impl<T> SegmentWriter for T where
    T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync
{
}
/// Storage backend for the segments (and optional offset indices) of a log.
///
/// Segments are addressed by a `u64` offset.
pub trait Repo: Clone {
    /// Handle returned when a segment is opened for writing.
    type SegmentWriter: SegmentWriter + 'static;
    /// Handle returned when a segment is opened for reading.
    type SegmentReader: SegmentReader + 'static;

    /// Create a new segment at `offset` and return a writable handle to it.
    fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter>;

    /// Open the segment at `offset` for reading.
    fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader>;

    /// Open the segment at `offset` for writing.
    fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter>;

    /// Remove the segment at `offset`.
    fn remove_segment(&self, offset: u64) -> io::Result<()>;

    /// Compress the segment at `offset`.
    fn compress_segment(&self, offset: u64) -> io::Result<()>;

    /// List the offsets known to the repo.
    ///
    /// Presumably these are the starting offsets of all stored segments;
    /// confirm against the implementations.
    fn existing_offsets(&self) -> io::Result<Vec<u64>>;

    // Offset-index support is optional. Each default below fails with an
    // `Other` error whose message is "not implemented".

    /// Create a mutable offset index for segment `_offset` with capacity `_cap`.
    fn create_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result<TxOffsetIndexMut> {
        Err(io::Error::other("not implemented"))
    }

    /// Remove the offset index belonging to segment `_offset`.
    fn remove_offset_index(&self, _offset: TxOffset) -> io::Result<()> {
        Err(io::Error::other("not implemented"))
    }

    /// Open the offset index for segment `_offset` read-only.
    fn get_offset_index(&self, _offset: TxOffset) -> io::Result<TxOffsetIndex> {
        Err(io::Error::other("not implemented"))
    }
}
/// Lets a shared reference to a repo be used wherever a [`Repo`] is expected.
///
/// Every method, including the optional offset-index methods, is forwarded to
/// the referenced repo. As a result, the referent's overrides are used rather
/// than the trait defaults.
impl<T> Repo for &T
where
    T: Repo,
{
    type SegmentWriter = T::SegmentWriter;
    type SegmentReader = T::SegmentReader;

    fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
        (**self).create_segment(offset)
    }

    fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
        (**self).open_segment_reader(offset)
    }

    fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
        (**self).open_segment_writer(offset)
    }

    fn remove_segment(&self, offset: u64) -> io::Result<()> {
        (**self).remove_segment(offset)
    }

    fn compress_segment(&self, offset: u64) -> io::Result<()> {
        (**self).compress_segment(offset)
    }

    fn existing_offsets(&self) -> io::Result<Vec<u64>> {
        (**self).existing_offsets()
    }

    fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
        (**self).create_offset_index(offset, cap)
    }

    fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
        (**self).remove_offset_index(offset)
    }

    fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
        (**self).get_offset_index(offset)
    }
}
impl<T: SegmentLen> SegmentLen for io::BufReader<T> {
fn segment_len(&mut self) -> io::Result<u64> {
SegmentLen::segment_len(self.get_mut())
}
}
/// Try to create an offset index for the segment at `offset` and wrap it in
/// an [`OffsetIndexWriter`].
///
/// The index capacity comes from `opts.offset_index_len()`. This is
/// best-effort: if the repo fails to create the index (for example, because
/// it uses the trait's "not implemented" default), the error is logged as a
/// warning and `None` is returned.
pub(crate) fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
    let capacity = opts.offset_index_len();
    match repo.create_offset_index(offset, capacity) {
        Ok(index) => Some(OffsetIndexWriter::new(index, opts)),
        Err(e) => {
            warn!("failed to get offset index for segment {offset}: {e}");
            None
        }
    }
}
/// Create a fresh segment at `offset` and return a [`Writer`] for it.
///
/// The segment header (the log format version from `opts` and
/// `Commit::CHECKSUM_ALGORITHM`) is written directly to the new storage and
/// synced before the writer is built. The writer's first commit starts at
/// `offset` with the given `epoch`. `bytes_written` starts at `Header::LEN`.
///
/// An offset index writer is attached when `create_offset_index_writer`
/// produces one. Otherwise the writer has none.
///
/// # Errors
///
/// Returns an error if creating the segment, writing the header, or syncing
/// the storage fails.
pub fn create_segment_writer<R: Repo>(
    repo: &R,
    opts: Options,
    epoch: u64,
    offset: u64,
) -> io::Result<Writer<R::SegmentWriter>> {
    let max_records_in_commit = opts.max_records_in_commit;
    let header = Header {
        log_format_version: opts.log_format_version,
        checksum_algorithm: Commit::CHECKSUM_ALGORITHM,
    };

    let mut storage = repo.create_segment(offset)?;
    header.write(&mut storage)?;
    // Sync the freshly written header before any commits are appended.
    storage.fsync()?;

    // The index is created only after the segment header is in place.
    let offset_index_head = create_offset_index_writer(repo, offset, opts);
    let commit = Commit {
        min_tx_offset: offset,
        n: 0,
        records: Vec::new(),
        epoch,
    };

    Ok(Writer {
        commit,
        inner: io::BufWriter::new(storage),
        min_tx_offset: offset,
        bytes_written: Header::LEN as u64,
        max_records_in_commit,
        offset_index_head,
    })
}
/// Reopen the existing segment at `offset` so more commits can be appended.
///
/// The segment's metadata is extracted first. If an offset index for the
/// segment can be opened, it is passed to the extraction; otherwise `None`
/// is passed.
///
/// # Returns
///
/// - `Ok(Ok(writer))`: the segment is intact and compatible. The next
///   commit starts at the end of the segment's transaction range and
///   continues its highest epoch.
/// - `Ok(Err(sofar))`: the segment contains an invalid commit. `sofar` is
///   the metadata extracted before the bad commit, returned so the caller
///   can decide how to proceed.
///
/// # Errors
///
/// Returns an I/O error if opening or reading the segment fails, or an
/// `InvalidData` error if the header is incompatible or its log format
/// version differs from `opts.log_format_version`.
pub fn resume_segment_writer<R: Repo>(
    repo: &R,
    opts: Options,
    offset: u64,
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
    let mut storage = repo.open_segment_writer(offset)?;
    // A failure to open the index is not fatal here; it becomes `None`.
    let offset_index = repo.get_offset_index(offset).ok();

    let meta = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
        Ok(meta) => meta,
        Err(error::SegmentMetadata::Io(e)) => return Err(e),
        Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
            warn!("invalid commit in segment {offset}: {source}");
            debug!("sofar={sofar:?}");
            return Ok(Err(sofar));
        }
    };
    let Metadata {
        header,
        tx_range,
        size_in_bytes,
        max_epoch,
        max_commit_offset: _,
    } = meta;

    if let Err(msg) = header.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM) {
        return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
    }

    // Appending with a different format version would mix formats within one
    // segment, so resuming requires an exact version match.
    let (current, stored) = (opts.log_format_version, header.log_format_version);
    if current != stored {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("log format version mismatch: current={current} segment={stored}"),
        ));
    }

    let max_records_in_commit = opts.max_records_in_commit;
    let offset_index_head = create_offset_index_writer(repo, offset, opts);
    let commit = Commit {
        min_tx_offset: tx_range.end,
        n: 0,
        records: Vec::new(),
        epoch: max_epoch,
    };

    Ok(Ok(Writer {
        commit,
        inner: io::BufWriter::new(storage),
        min_tx_offset: tx_range.start,
        bytes_written: size_in_bytes,
        max_records_in_commit,
        offset_index_head,
    }))
}
/// Open the segment at `offset` in `repo` and wrap it in a [`Reader`].
///
/// `max_log_format_version` is passed through to `Reader::new`.
///
/// # Errors
///
/// Returns any error from opening the segment or from `Reader::new`.
pub fn open_segment_reader<R: Repo>(
    repo: &R,
    max_log_format_version: u8,
    offset: u64,
) -> io::Result<Reader<R::SegmentReader>> {
    debug!("open segment reader at {offset}");
    repo.open_segment_reader(offset)
        .and_then(|storage| Reader::new(max_log_format_version, offset, storage))
}