use std::io;
use self::reader::LogReader;
use super::*;
pub struct LogIter {
pub config: Config,
pub segment_iter: Box<dyn Iterator<Item = (Lsn, LogId)>>,
pub segment_base: Option<LogId>,
pub max_lsn: Lsn,
pub cur_lsn: Lsn,
pub trailer: Option<Lsn>,
}
impl Iterator for LogIter {
type Item = (Lsn, DiskPtr, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
loop {
let remaining_seg_too_small_for_msg = !valid_entry_offset(
self.cur_lsn as LogId,
self.config.io_buf_size,
);
if self.trailer.is_none() && remaining_seg_too_small_for_msg {
trace!("trailer is none, ending iteration at {}", self.max_lsn);
return None;
} else if self.segment_base.is_none()
|| remaining_seg_too_small_for_msg
{
if let Some((next_lsn, next_lid)) = self.segment_iter.next() {
assert!(
next_lsn + (self.config.io_buf_size as Lsn)
>= self.cur_lsn,
"caller is responsible for providing segments \
that contain the initial cur_lsn value or higher"
);
#[cfg(target_os = "linux")]
self.fadvise_willneed(next_lid);
if let Err(e) = self.read_segment(next_lsn, next_lid) {
debug!(
"hit snap while reading segments in \
iterator: {:?}",
e
);
return None;
}
} else {
trace!("no segments remaining to iterate over");
return None;
}
}
if self.cur_lsn > self.max_lsn {
trace!("hit max_lsn in iterator, stopping");
return None;
}
let lid = self.segment_base.unwrap()
+ (self.cur_lsn % self.config.io_buf_size as Lsn) as LogId;
let f = &self.config.file;
match f.read_message(lid, self.cur_lsn, &self.config) {
Ok(LogRead::Blob(lsn, buf, blob_ptr)) => {
trace!("read blob flush in LogIter::next");
self.cur_lsn += (MSG_HEADER_LEN + BLOB_INLINE_LEN) as Lsn;
return Some((lsn, DiskPtr::Blob(lid, blob_ptr), buf));
}
Ok(LogRead::Inline(lsn, buf, on_disk_len)) => {
trace!("read inline flush in LogIter::next");
self.cur_lsn += (MSG_HEADER_LEN + on_disk_len) as Lsn;
return Some((lsn, DiskPtr::Inline(lid), buf));
}
Ok(LogRead::BatchManifest(last_lsn_in_batch)) => {
if last_lsn_in_batch > self.max_lsn {
return None;
} else {
self.cur_lsn +=
(MSG_HEADER_LEN + BATCH_MANIFEST_INLINE_LEN) as Lsn;
continue;
}
}
Ok(LogRead::Failed(_, on_disk_len)) => {
trace!("read zeroed in LogIter::next");
self.cur_lsn += (MSG_HEADER_LEN + on_disk_len) as Lsn;
}
Ok(LogRead::Corrupted(_len)) => {
trace!(
"read corrupted msg in LogIter::next as lid {} lsn {}",
lid,
self.cur_lsn
);
return None;
}
Ok(LogRead::Pad(_lsn)) => {
if self.trailer.is_none() {
trace!("no segment trailer found, ending iteration");
return None;
}
self.segment_base.take();
self.trailer.take();
continue;
}
Ok(LogRead::DanglingBlob(lsn, blob_ptr)) => {
debug!(
"encountered dangling blob \
pointer at lsn {} ptr {}",
lsn, blob_ptr
);
self.cur_lsn += (MSG_HEADER_LEN + BLOB_INLINE_LEN) as Lsn;
continue;
}
Err(e) => {
debug!(
"failed to read log message at lid {} \
with expected lsn {} during iteration: {}",
lid, self.cur_lsn, e
);
return None;
}
}
}
}
}
impl LogIter {
fn read_segment(&mut self, lsn: Lsn, offset: LogId) -> Result<()> {
trace!(
"LogIter::read_segment lsn: {:?} cur_lsn: {:?}",
lsn,
self.cur_lsn
);
assert!(lsn + self.config.io_buf_size as Lsn >= self.cur_lsn);
let f = &self.config.file;
let segment_header = f.read_segment_header(offset)?;
if offset % self.config.io_buf_size as LogId != 0 {
debug!("segment offset not divisible by segment length");
return Err(Error::Corruption {
at: DiskPtr::Inline(offset),
});
}
if segment_header.lsn % self.config.io_buf_size as Lsn != 0 {
debug!(
"expected a segment header lsn that is divisible \
by the io_buf_size ({}) instead it was {}",
self.config.io_buf_size, segment_header.lsn
);
return Err(Error::Corruption {
at: DiskPtr::Inline(offset),
});
}
if segment_header.lsn != lsn {
debug!(
"segment header lsn ({}) != expected lsn ({})",
segment_header.lsn, lsn
);
return Err(io::Error::new(
io::ErrorKind::Other,
"encountered torn segment",
)
.into());
}
let trailer_offset = offset + self.config.io_buf_size as LogId
- SEG_TRAILER_LEN as LogId;
let trailer_lsn = segment_header.lsn + self.config.io_buf_size as Lsn
- SEG_TRAILER_LEN as Lsn;
trace!("trying to read trailer from {}", trailer_offset);
let segment_trailer = f.read_segment_trailer(trailer_offset);
trace!("read segment header {:?}", segment_header);
trace!("read segment trailer {:?}", segment_trailer);
let trailer_lsn = segment_trailer.ok().and_then(|st| {
if st.ok && st.lsn == trailer_lsn {
Some(st.lsn)
} else {
trace!("segment trailer corrupted, not reading next segment");
None
}
});
self.trailer = trailer_lsn;
self.cur_lsn = segment_header.lsn + SEG_HEADER_LEN as Lsn;
self.segment_base = Some(offset);
Ok(())
}
#[cfg(target_os = "linux")]
fn fadvise_willneed(&self, lid: LogId) {
use std::os::unix::io::AsRawFd;
let f = &self.config.file;
let ret = unsafe {
libc::posix_fadvise(
f.as_raw_fd(),
lid as libc::off_t,
self.config.io_buf_size as libc::off_t,
libc::POSIX_FADV_WILLNEED,
)
};
if ret != 0 {
panic!(
"failed to call fadvise: {}",
std::io::Error::from_raw_os_error(ret)
);
}
}
}
fn valid_entry_offset(lid: LogId, segment_len: usize) -> bool {
let seg_start = lid / segment_len as LogId * segment_len as LogId;
let max_lid = seg_start + segment_len as LogId
- SEG_TRAILER_LEN as LogId
- MSG_HEADER_LEN as LogId;
let min_lid = seg_start + SEG_HEADER_LEN as LogId;
lid >= min_lid && lid <= max_lid
}