use std::{io, u64};
use log::{debug, warn};
use crate::{
commit::Commit,
error,
segment::{FileLike, Header, Metadata, Reader, Writer},
Options,
};
mod fs;
#[cfg(test)]
pub mod mem;
pub use fs::Fs;
#[cfg(test)]
pub use mem::Memory;
pub trait Repo: Clone {
type Segment: io::Read + io::Write + FileLike;
fn create_segment(&self, offset: u64) -> io::Result<Self::Segment>;
fn open_segment(&self, offset: u64) -> io::Result<Self::Segment>;
fn remove_segment(&self, offset: u64) -> io::Result<()>;
fn existing_offsets(&self) -> io::Result<Vec<u64>>;
}
pub fn create_segment_writer<R: Repo>(repo: &R, opts: Options, offset: u64) -> io::Result<Writer<R::Segment>> {
let mut storage = repo.create_segment(offset)?;
Header {
log_format_version: opts.log_format_version,
checksum_algorithm: Commit::CHECKSUM_ALGORITHM,
}
.write(&mut storage)?;
storage.fsync()?;
Ok(Writer {
commit: Commit {
min_tx_offset: offset,
n: 0,
records: Vec::new(),
},
inner: io::BufWriter::new(storage),
min_tx_offset: offset,
bytes_written: Header::LEN as u64,
max_records_in_commit: opts.max_records_in_commit,
})
}
pub fn resume_segment_writer<R: Repo>(
repo: &R,
opts: Options,
offset: u64,
) -> io::Result<Result<Writer<R::Segment>, Metadata>> {
let mut storage = repo.open_segment(offset)?;
let Metadata {
header,
tx_range,
size_in_bytes,
} = match Metadata::extract(offset, &mut storage) {
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
warn!("invalid commit in segment {offset}: {source}");
debug!("sofar={sofar:?}");
return Ok(Err(sofar));
}
Err(error::SegmentMetadata::Io(e)) => return Err(e),
Ok(meta) => meta,
};
header
.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
.map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
Ok(Ok(Writer {
commit: Commit {
min_tx_offset: tx_range.end,
n: 0,
records: Vec::new(),
},
inner: io::BufWriter::new(storage),
min_tx_offset: tx_range.start,
bytes_written: size_in_bytes,
max_records_in_commit: opts.max_records_in_commit,
}))
}
pub fn open_segment_reader<R: Repo>(
repo: &R,
max_log_format_version: u8,
offset: u64,
) -> io::Result<Reader<R::Segment>> {
debug!("open segment reader at {offset}");
let storage = repo.open_segment(offset)?;
Reader::new(max_log_format_version, offset, storage)
}