use std::io::ErrorKind::UnexpectedEof;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
#[cfg(feature = "zstd")]
use zstd::block::decompress;
use super::*;
/// A log handle that forwards reservation, flushing, configuration and
/// stable-offset queries to a shared `IoBufs`, optionally accompanied by a
/// background flusher thread that is shut down and joined on drop.
pub struct LockFreeLog {
/// Shared I/O buffers backing `reserve`, `flush`, `config` and `stable`.
pub(super) iobufs: Arc<IoBufs>,
/// Shutdown flag: a clone is handed to the periodic flusher at startup and
/// the flag is set to `true` when this log is dropped.
flusher_shutdown: Arc<AtomicBool>,
/// Join handle of the flusher thread; `None` when the configuration supplies
/// no flush interval. Taken and joined when this log is dropped.
flusher_handle: Option<std::thread::JoinHandle<()>>,
}
// These manual impls assert that a `LockFreeLog` may be moved to, and shared
// between, threads.
// NOTE(review): nothing in this file demonstrates why that is sound —
// presumably some field type is not automatically `Send`/`Sync`; confirm that
// `IoBufs` (and everything reached through it) upholds the contract.
unsafe impl Send for LockFreeLog {}
unsafe impl Sync for LockFreeLog {}
impl Drop for LockFreeLog {
    /// Raises the shutdown flag shared with the flusher and then waits for the
    /// flusher thread, if one was started, to finish.
    ///
    /// # Panics
    ///
    /// Panics if joining the flusher thread yields an error.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        // The flag is raised before joining so the join below has a chance to
        // complete.
        self.flusher_shutdown.store(true, Ordering::SeqCst);

        // `take` leaves `None` behind, so the handle is consumed exactly once.
        let pending_flusher = self.flusher_handle.take();
        if let Some(flusher_thread) = pending_flusher {
            flusher_thread.join().unwrap();
        }
    }
}
impl LockFreeLog {
    /// Creates the shared I/O buffers from `config` and, when the
    /// configuration reports a flush interval, starts the periodic flusher
    /// named `"log flusher"`.
    ///
    /// # Panics
    ///
    /// Panics if `periodic_flusher::flusher` fails (its result is unwrapped).
    pub fn start_system(config: Config) -> LockFreeLog {
        let shared_bufs = Arc::new(IoBufs::new(config.clone()));
        let shutdown_flag = Arc::new(AtomicBool::new(false));

        // Without a configured interval no flusher is started and the handle
        // stays `None`.
        let flusher = config.get_flush_every_ms().map(|interval_ms| {
            let spawned = periodic_flusher::flusher(
                String::from("log flusher"),
                Arc::clone(&shared_bufs),
                Arc::clone(&shutdown_flag),
                interval_ms,
            );
            spawned.unwrap()
        });

        LockFreeLog {
            iobufs: shared_bufs,
            flusher_shutdown: shutdown_flag,
            flusher_handle: flusher,
        }
    }

    /// Flushes the underlying I/O buffers.
    pub fn flush(&self) {
        let bufs = &self.iobufs;
        bufs.flush();
    }
}
impl Log for LockFreeLog {
    /// Reserves space for `buf` in the underlying I/O buffers and returns the
    /// resulting `Reservation`.
    fn reserve(&self, buf: Vec<u8>) -> Reservation {
        self.iobufs.reserve(buf)
    }

    /// Returns the configuration held by the underlying I/O buffers.
    fn config(&self) -> &Config {
        self.iobufs.config()
    }

    /// Reserves space for `buf` and immediately completes the reservation,
    /// returning the `LogID` produced by `complete`.
    fn write(&self, buf: Vec<u8>) -> LogID {
        self.iobufs.reserve(buf).complete()
    }

    /// Reads the entry that starts at file offset `id`.
    ///
    /// The entry is parsed as: one validity byte (`1` means valid), a four
    /// byte length in native byte order, a two byte CRC16, then `len` payload
    /// bytes.
    ///
    /// Returns:
    /// * `LogRead::Corrupted(len)` when `len` exceeds
    ///   `io_buf_size - HEADER_LEN`, or when the payload's CRC16 does not
    ///   match the stored one.
    /// * `LogRead::Zeroed(len + 5)` when the validity byte is not `1`. If the
    ///   length field is also zero, `len` is the number of bytes scanned after
    ///   the length field before a `1` byte or the end of the file is reached.
    /// * `LogRead::Flush(payload, len)` otherwise. With the `zstd` feature and
    ///   compression enabled in the config, `payload` is the decompressed
    ///   data.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while seeking or reading, except for an
    /// unexpected end of file during the zero scan, which simply ends the
    /// scan.
    ///
    /// # Panics
    ///
    /// With the `zstd` feature and compression enabled, panics if
    /// decompression fails. In debug builds, panics if the zero scan meets a
    /// byte that is neither `0` nor `1`.
    fn read(&self, id: LogID) -> io::Result<LogRead> {
        let start = clock();
        let cached_f = self.config().cached_file();
        let mut f = cached_f.borrow_mut();
        f.seek(SeekFrom::Start(id))?;

        // Header byte 0: validity flag; only the value 1 marks a valid entry.
        let mut valid_buf = [0u8; 1];
        f.read_exact(&mut valid_buf)?;
        let valid = valid_buf[0] == 1;

        // Header bytes 1..5: payload length in native byte order.
        let mut len_buf = [0u8; 4];
        f.read_exact(&mut len_buf)?;
        // SAFETY: `[u8; 4]` and `u32` have the same size and every bit pattern
        // is a valid `u32`.
        let len32: u32 = unsafe { std::mem::transmute(len_buf) };
        let mut len = len32 as usize;

        let max = self.config().get_io_buf_size() - HEADER_LEN;
        if len > max {
            #[cfg(feature = "log")]
            error!("log read invalid message length, {} should be <= {}", len, max);
            M.read.measure(clock() - start);
            return Ok(LogRead::Corrupted(len));
        } else if len == 0 && !valid {
            // Zeroed region of unknown size: count bytes until a `1` byte or
            // the end of the file.
            loop {
                let mut byte = [0u8; 1];
                if let Err(e) = f.read_exact(&mut byte) {
                    if e.kind() == UnexpectedEof {
                        break;
                    }
                    // Propagate real I/O failures like every other read in
                    // this function instead of panicking.
                    return Err(e);
                }
                if byte[0] != 1 {
                    debug_assert_eq!(byte[0], 0);
                    len += 1;
                } else {
                    break;
                }
            }
        }

        if !valid {
            M.read.measure(clock() - start);
            // 5 = the validity byte plus the four length bytes read above.
            return Ok(LogRead::Zeroed(len + 5));
        }

        let mut crc16_buf = [0u8; 2];
        f.read_exact(&mut crc16_buf)?;

        // Zero-initialised so `read_exact` never sees uninitialised memory.
        let mut buf = vec![0u8; len];
        f.read_exact(&mut buf)?;

        let checksum = crc16_arr(&buf);
        if checksum != crc16_buf {
            M.read.measure(clock() - start);
            return Ok(LogRead::Corrupted(len));
        }

        #[cfg(feature = "zstd")]
        let res = {
            if self.config().get_use_compression() {
                let start = clock();
                let res = Ok(LogRead::Flush(decompress(&*buf, max).unwrap(), len));
                M.decompress.measure(clock() - start);
                res
            } else {
                Ok(LogRead::Flush(buf, len))
            }
        };

        #[cfg(not(feature = "zstd"))]
        let res = Ok(LogRead::Flush(buf, len));

        M.read.measure(clock() - start);
        res
    }

    /// Returns the stable offset currently reported by the I/O buffers.
    fn stable_offset(&self) -> LogID {
        self.iobufs.stable()
    }

    /// Flushes the I/O buffers repeatedly until the stable offset they report
    /// is strictly greater than `id`.
    ///
    /// The loop neither sleeps nor yields. With the `log` feature a debug
    /// message is emitted each time the spin counter passes 2,000,000, after
    /// which the counter restarts from zero.
    fn make_stable(&self, id: LogID) {
        let start = clock();
        let mut spins = 0;
        loop {
            self.iobufs.flush();
            spins += 1;
            if spins > 2_000_000 {
                #[cfg(feature = "log")]
                debug!("have spun >2000000x in make_stable");
                spins = 0;
            }
            let cur = self.iobufs.stable();
            if cur > id {
                M.make_stable.measure(clock() - start);
                return;
            }
        }
    }

    /// Invokes the free function `punch_hole` on the cached file at offset
    /// `id`.
    ///
    /// # Panics
    ///
    /// Panics if that call returns an error.
    fn punch_hole(&self, id: LogID) {
        let cached_f = self.config().cached_file();
        let mut f = cached_f.borrow_mut();
        punch_hole(&mut f, id).unwrap();
    }
}