use std::io;
use self::reader::LogReader;
use super::*;
pub struct LogIter {
pub config: Config,
pub segment_iter: Box<dyn Iterator<Item = (Lsn, LogId)>>,
pub segment_base: Option<LogId>,
pub segment_len: usize,
pub use_compression: bool,
pub max_lsn: Lsn,
pub cur_lsn: Lsn,
pub trailer: Option<Lsn>,
}
impl Iterator for LogIter {
type Item = (Lsn, DiskPtr, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
loop {
let remaining_seg_too_small_for_msg = !valid_entry_offset(
self.cur_lsn as LogId,
self.segment_len,
);
if self.trailer.is_none()
&& remaining_seg_too_small_for_msg
{
return None;
} else if self.segment_base.is_none()
|| remaining_seg_too_small_for_msg
{
if let Some((next_lsn, next_lid)) =
self.segment_iter.next()
{
assert!(
next_lsn + (self.segment_len as Lsn) >= self.cur_lsn,
"caller is responsible for providing segments \
that contain the initial cur_lsn value or higher"
);
#[cfg(target_os = "linux")]
self.fadvise_willneed(next_lid);
if let Err(e) =
self.read_segment(next_lsn, next_lid)
{
debug!(
"hit snap while reading segments in \
iterator: {:?}",
e
);
return None;
}
} else {
return None;
}
}
if self.cur_lsn > self.max_lsn {
return None;
}
let lid = self.segment_base.unwrap()
+ (self.cur_lsn % self.segment_len as Lsn) as LogId;
if let Ok(f) = self.config.file() {
match f.read_message(lid, &self.config) {
Ok(LogRead::Blob(lsn, buf, blob_ptr)) => {
if lsn != self.cur_lsn {
error!("read Flush with bad lsn");
return None;
}
trace!("read blob flush in LogIter::next");
self.cur_lsn +=
(MSG_HEADER_LEN + BLOB_INLINE_LEN) as Lsn;
return Some((
lsn,
DiskPtr::Blob(lid, blob_ptr),
buf,
));
}
Ok(LogRead::Inline(lsn, buf, on_disk_len)) => {
if lsn != self.cur_lsn {
error!("read Flush with bad lsn");
return None;
}
trace!("read inline flush in LogIter::next");
self.cur_lsn +=
(MSG_HEADER_LEN + on_disk_len) as Lsn;
return Some((lsn, DiskPtr::Inline(lid), buf));
}
Ok(LogRead::Failed(lsn, on_disk_len)) => {
if lsn != self.cur_lsn {
error!("read Failed with bad lsn");
return None;
}
trace!("read zeroed in LogIter::next");
self.cur_lsn +=
(MSG_HEADER_LEN + on_disk_len) as Lsn;
}
Ok(LogRead::Corrupted(_len)) => {
trace!("read corrupted end in LogIter::next");
return None;
}
Ok(LogRead::Pad(lsn)) => {
if lsn != self.cur_lsn {
error!("read Pad with bad lsn");
return None;
}
if self.trailer.is_none() {
return None;
}
self.segment_base.take();
self.trailer.take();
continue;
}
Ok(LogRead::DanglingBlob(lsn, blob_ptr)) => {
debug!(
"encountered dangling blob \
pointer at lsn {} ptr {}",
lsn, blob_ptr
);
self.cur_lsn +=
(MSG_HEADER_LEN + BLOB_INLINE_LEN) as Lsn;
continue;
}
Err(e) => {
error!(
"failed to read log message at lid {} \
with expected lsn {} during iteration: {}",
lid,
self.cur_lsn,
e
);
return None;
}
}
} else {
return None;
}
}
}
}
impl LogIter {
fn read_segment(
&mut self,
lsn: Lsn,
offset: LogId,
) -> Result<(), ()> {
trace!(
"LogIter::read_segment lsn: {:?} cur_lsn: {:?}",
lsn,
self.cur_lsn
);
assert!(lsn + self.segment_len as Lsn >= self.cur_lsn);
let f = self.config.file()?;
let segment_header = f.read_segment_header(offset)?;
if offset % self.segment_len as LogId != 0 {
debug!("segment offset not divisible by segment length");
return Err(Error::Corruption {
at: DiskPtr::Inline(offset),
});
}
if segment_header.lsn % self.segment_len as Lsn != 0 {
debug!(
"expected a segment header lsn that is divisible \
by the io_buf_size ({}) instead it was {}",
self.segment_len, segment_header.lsn
);
return Err(Error::Corruption {
at: DiskPtr::Inline(offset),
});
}
if segment_header.lsn != lsn {
error!(
"segment header lsn ({}) != expected lsn ({})",
segment_header.lsn, lsn
);
return Err(io::Error::new(
io::ErrorKind::Other,
"encountered torn segment",
).into());
}
let trailer_offset = offset + self.segment_len as LogId
- SEG_TRAILER_LEN as LogId;
let trailer_lsn = segment_header.lsn
+ self.segment_len as Lsn
- SEG_TRAILER_LEN as Lsn;
trace!("trying to read trailer from {}", trailer_offset);
let segment_trailer = f.read_segment_trailer(trailer_offset);
trace!("read segment header {:?}", segment_header);
trace!("read segment trailer {:?}", segment_trailer);
let trailer_lsn = segment_trailer.ok().and_then(|st| {
if st.ok && st.lsn == trailer_lsn {
Some(st.lsn)
} else {
None
}
});
self.trailer = trailer_lsn;
self.cur_lsn = segment_header.lsn + SEG_HEADER_LEN as Lsn;
self.segment_base = Some(offset);
Ok(())
}
#[cfg(target_os = "linux")]
fn fadvise_willneed(&self, lid: LogId) {
use std::os::unix::io::AsRawFd;
if let Ok(f) = self.config.file() {
let ret = unsafe {
libc::posix_fadvise(
f.as_raw_fd(),
lid as libc::off_t,
self.config.io_buf_size as libc::off_t,
libc::POSIX_FADV_WILLNEED,
)
};
if ret != 0 {
panic!(
"failed to call fadvise: {}",
std::io::Error::from_raw_os_error(ret)
);
}
}
}
}
fn valid_entry_offset(lid: LogId, segment_len: usize) -> bool {
let seg_start = lid / segment_len as LogId * segment_len as LogId;
let max_lid = seg_start + segment_len as LogId
- SEG_TRAILER_LEN as LogId
- MSG_HEADER_LEN as LogId;
let min_lid = seg_start + SEG_HEADER_LEN as LogId;
lid >= min_lid && lid <= max_lid
}