use std::sync::Arc;
use self::reader::LogReader;
use super::*;
/// Size in bytes of a serialized `MessageHeader`: 1 kind byte, 8 LSN bytes,
/// 4 length bytes and 2 CRC16 bytes (see the `MessageHeader` conversions below).
#[doc(hidden)]
pub const MSG_HEADER_LEN: usize = 15;
/// Size in bytes of a serialized `SegmentHeader`: 2 CRC16 bytes followed by
/// 8 LSN bytes.
#[doc(hidden)]
pub const SEG_HEADER_LEN: usize = 10;
/// Size in bytes of a serialized `SegmentTrailer`: 2 CRC16 bytes followed by
/// 8 LSN bytes (same layout as the segment header).
#[doc(hidden)]
pub const SEG_TRAILER_LEN: usize = 10;
/// Handle to the log: bundles the shared `IoBufs`, the configuration the log
/// was started with, and the periodic flusher created in `Log::start`.
pub struct Log {
/// Shared IO buffers; the methods on `Log` delegate to this value.
iobufs: Arc<IoBufs>,
/// Configuration passed to `Log::start`; read for buffer size, compression
/// and file access.
config: Config,
/// Periodic "log flusher" built in `Log::start`. It is never read again; it
/// is stored so that it lives exactly as long as this `Log`.
/// NOTE(review): presumably dropping it stops the background flushing —
/// confirm against the `periodic` module.
_flusher: periodic::Periodic<Arc<IoBufs>>,
}
// NOTE(review): these manual impls assert that a `Log` may be moved to and
// shared between threads. The justification is not visible in this file —
// presumably `IoBufs` and `periodic::Periodic` synchronize their own state;
// confirm before relying on it.
unsafe impl Send for Log {}
unsafe impl Sync for Log {}
impl Log {
    /// Starts a log from `config` and an already-recovered `snapshot`.
    ///
    /// Creates the shared `IoBufs` and a periodic "log flusher" that is handed
    /// a clone of the same `Arc` and `config.flush_every_ms`. Fails if
    /// `IoBufs::start` fails.
    pub fn start<R>(
        config: Config,
        snapshot: Snapshot<R>,
    ) -> CacheResult<Log, ()> {
        let io_bufs = IoBufs::start(config.clone(), snapshot)?;
        let shared_bufs = Arc::new(io_bufs);

        let periodic_flusher = periodic::Periodic::new(
            "log flusher".to_owned(),
            Arc::clone(&shared_bufs),
            config.flush_every_ms,
        );

        Ok(Log {
            iobufs: shared_bufs,
            config,
            _flusher: periodic_flusher,
        })
    }

    /// Starts a log without a higher-level materializer.
    ///
    /// Builds a snapshot by advancing a default `Snapshot` over the raw
    /// segment iterator starting at 0, then delegates to [`Log::start`].
    ///
    /// # Panics
    ///
    /// Panics if `config.segment_mode` is not `SegmentMode::Linear`.
    pub fn start_raw_log(config: Config) -> CacheResult<Log, ()> {
        assert_eq!(config.segment_mode, SegmentMode::Linear);

        let raw_segments = raw_segment_iter_from(0, &config)?;
        let recovered = advance_snapshot::<NullMaterializer, (), ()>(
            raw_segments,
            Snapshot::default(),
            &config,
        )?;

        Self::start::<()>(config, recovered)
    }

    /// Flushes the underlying IO buffers (delegates to `IoBufs::flush`).
    pub fn flush(&self) -> CacheResult<(), ()> {
        self.iobufs.flush()
    }

    /// Reserves space for `buf` in the log and returns the `Reservation`
    /// produced by `IoBufs::reserve`; the caller decides how to finish it.
    pub fn reserve(&self, buf: Vec<u8>) -> CacheResult<Reservation, ()> {
        self.iobufs.reserve(buf)
    }

    /// Reserves space for `buf` and immediately completes the reservation,
    /// returning the `(Lsn, LogID)` pair reported by `Reservation::complete`.
    pub fn write(&self, buf: Vec<u8>) -> CacheResult<(Lsn, LogID), ()> {
        let reservation = self.iobufs.reserve(buf)?;
        reservation.complete()
    }

    /// Returns an iterator over the log beginning at `lsn`.
    ///
    /// If `lsn` falls inside the header region of its segment it is bumped
    /// forward to the first byte after the segment header. The iterator's
    /// upper bound is the stable offset observed when this method is called.
    pub fn iter_from(&self, lsn: Lsn) -> LogIter {
        trace!("iterating from lsn {}", lsn);

        let io_buf_size = self.config.io_buf_size;
        let segment_size = io_buf_size as Lsn;

        // Round down to the start of the containing segment, then skip its
        // header so iteration never begins inside header bytes.
        let segment_start = (lsn / segment_size) * segment_size;
        let first_message_lsn = segment_start + SEG_HEADER_LEN as Lsn;
        let start_lsn = std::cmp::max(lsn, first_message_lsn);

        // The segment iterator is obtained before the stable offset is read,
        // matching the order in which the two are observed.
        let segment_iter =
            self.with_sa(|sa| sa.segment_snapshot_iter_from(start_lsn));

        LogIter {
            config: self.config.clone(),
            max_lsn: self.stable_offset(),
            cur_lsn: start_lsn,
            segment_base: None,
            segment_iter,
            segment_len: io_buf_size,
            use_compression: self.config.use_compression,
            trailer: None,
        }
    }

    /// Reads the message stored at log id `lid`, after first calling
    /// `make_stable(lsn)`.
    ///
    /// # Panics
    ///
    /// Panics if the message read is a `LogRead::Flush` whose LSN differs
    /// from the requested `lsn`.
    pub fn read(&self, lsn: Lsn, lid: LogID) -> CacheResult<LogRead, ()> {
        trace!("reading log lsn {} lid {}", lsn, lid);

        self.make_stable(lsn)?;

        let file = self.config.file()?;
        let outcome = file.read_message(
            lid,
            self.config.io_buf_size,
            self.config.use_compression,
        );

        match outcome {
            Ok(log_read) => {
                // Only successful flushes carry an LSN that can be checked
                // against the one the caller asked for.
                if let LogRead::Flush(read_lsn, _, _) = log_read {
                    assert_eq!(lsn, read_lsn);
                }
                Ok(log_read)
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Returns the stable offset reported by `IoBufs::stable`.
    pub fn stable_offset(&self) -> Lsn {
        self.iobufs.stable()
    }

    /// Delegates to `IoBufs::make_stable` for `lsn`.
    pub fn make_stable(&self, lsn: Lsn) -> CacheResult<(), ()> {
        self.iobufs.make_stable(lsn)
    }

    /// Runs `f` with mutable access to the `SegmentAccountant` held by the
    /// IO buffers and returns whatever `f` returns.
    pub(in io) fn with_sa<B, F>(&self, f: F) -> B
    where F: FnOnce(&mut SegmentAccountant) -> B
    {
        self.iobufs.with_sa(f)
    }
}
/// Classification of a log message, decoded from the first byte of its
/// serialized header.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum MessageKind {
/// The tag byte was `SUCCESSFUL_FLUSH`.
Success,
/// The tag byte was `FAILED_FLUSH`.
Failed,
/// The tag byte was `SEGMENT_PAD`.
Pad,
/// The tag byte matched none of the known tags; serialized as `EVIL_BYTE`.
Corrupted,
}
/// In-memory form of the `MSG_HEADER_LEN`-byte header that precedes a message.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct MessageHeader {
/// Message classification taken from the header's first byte.
pub kind: MessageKind,
/// LSN stored in header bytes 1..9.
pub lsn: Lsn,
/// Length stored in header bytes 9..13; it is a `u32` on disk.
/// NOTE(review): presumably the length of the message body — confirm.
pub len: usize,
/// CRC16 bytes; each byte is XORed with 0xFF when stored in bytes 13..15.
/// NOTE(review): what data the checksum covers is not shown in this file.
pub crc16: [u8; 2],
}
/// In-memory form of the `SEG_HEADER_LEN`-byte header of a segment.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentHeader {
/// LSN recorded in the header.
pub lsn: Lsn,
/// When decoded from bytes: whether the stored CRC16 matched the one
/// recomputed over the stored LSN bytes. Ignored when encoding to bytes.
pub ok: bool,
}
/// In-memory form of the `SEG_TRAILER_LEN`-byte trailer of a segment.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentTrailer {
/// LSN recorded in the trailer.
pub lsn: Lsn,
/// When decoded from bytes: whether the stored CRC16 matched the one
/// recomputed over the stored LSN bytes. Ignored when encoding to bytes.
pub ok: bool,
}
/// Outcome of reading one message from the log.
#[doc(hidden)]
#[derive(Debug)]
pub enum LogRead {
/// A message carrying its LSN, its bytes, and a length.
/// NOTE(review): the exact meaning of the trailing `usize` is not shown in
/// this file — confirm against the reader.
Flush(Lsn, Vec<u8>, usize),
/// A message marked as failed, with an LSN and a `usize`.
/// NOTE(review): presumably the length to skip — confirm.
Failed(Lsn, usize),
/// A padding entry, with its LSN.
Pad(Lsn),
/// A message that could not be validated.
/// NOTE(review): meaning of the `usize` is not shown here — confirm.
Corrupted(usize),
}
impl LogRead {
    /// Consumes the read and returns the `(lsn, bytes, len)` payload if it is
    /// a `Flush`, or `None` for every other variant.
    pub fn flush(self) -> Option<(Lsn, Vec<u8>, usize)> {
        if let LogRead::Flush(lsn, bytes, len) = self {
            Some((lsn, bytes, len))
        } else {
            None
        }
    }

    /// Returns `true` if this is a `Flush`.
    pub fn is_flush(&self) -> bool {
        if let LogRead::Flush(_, _, _) = *self {
            true
        } else {
            false
        }
    }

    /// Returns `true` if this is a `Failed`.
    pub fn is_failed(&self) -> bool {
        if let LogRead::Failed(_, _) = *self {
            true
        } else {
            false
        }
    }

    /// Returns `true` if this is a `Pad`.
    pub fn is_pad(&self) -> bool {
        if let LogRead::Pad(_) = *self {
            true
        } else {
            false
        }
    }

    /// Returns `true` if this is a `Corrupted`.
    pub fn is_corrupt(&self) -> bool {
        if let LogRead::Corrupted(_) = *self {
            true
        } else {
            false
        }
    }

    /// Consumes the read and returns the `Flush` payload.
    ///
    /// # Panics
    ///
    /// Panics with a fixed message if this is not a `Flush`.
    pub fn unwrap(self) -> (Lsn, Vec<u8>, usize) {
        match self.flush() {
            Some(payload) => payload,
            None => panic!("called unwrap on a non-flush LogRead"),
        }
    }

    /// Consumes the read and returns the `Flush` payload.
    ///
    /// # Panics
    ///
    /// Panics with `msg` if this is not a `Flush`.
    pub fn expect<'a>(self, msg: &'a str) -> (Lsn, Vec<u8>, usize) {
        match self.flush() {
            Some(payload) => payload,
            None => panic!("{}", msg),
        }
    }
}
// Decodes a message header from its fixed-size serialized form.
//
// Layout: byte 0 is the kind tag, bytes 1..9 the LSN, bytes 9..13 the length
// as a `u32`, and bytes 13..15 the CRC16 with every byte XORed with 0xFF.
// Integers are read in the machine's native byte order.
impl From<[u8; MSG_HEADER_LEN]> for MessageHeader {
    /// Parses `buf` into a `MessageHeader`. An unknown tag byte yields
    /// `MessageKind::Corrupted` rather than an error.
    fn from(buf: [u8; MSG_HEADER_LEN]) -> MessageHeader {
        let mut raw_lsn = [0u8; 8];
        raw_lsn.copy_from_slice(&buf[1..9]);

        let mut raw_len = [0u8; 4];
        raw_len.copy_from_slice(&buf[9..13]);

        // SAFETY: `transmute` only compiles when source and target sizes are
        // equal, and the targets are used as plain integers in this file, for
        // which every bit pattern is a valid value.
        let lsn = unsafe { std::mem::transmute::<[u8; 8], Lsn>(raw_lsn) };
        let stored_len = unsafe { std::mem::transmute::<[u8; 4], u32>(raw_len) };

        let kind = match buf[0] {
            SUCCESSFUL_FLUSH => MessageKind::Success,
            FAILED_FLUSH => MessageKind::Failed,
            SEGMENT_PAD => MessageKind::Pad,
            _ => MessageKind::Corrupted,
        };

        MessageHeader {
            kind,
            lsn,
            len: stored_len as usize,
            // The checksum bytes are stored inverted; undo that here.
            crc16: [buf[13] ^ 0xFF, buf[14] ^ 0xFF],
        }
    }
}
// Encodes a message header into its fixed-size serialized form.
//
// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl still provides `MessageHeader: Into<[u8; MSG_HEADER_LEN]>`,
// so existing `.into()` call sites keep working, and `From::from` becomes
// available as well.
//
// Layout (the inverse of `From<[u8; MSG_HEADER_LEN]> for MessageHeader`):
// byte 0 is the kind tag, bytes 1..9 the LSN, bytes 9..13 the length as a
// `u32`, and bytes 13..15 the CRC16 with every byte XORed with 0xFF.
// Integers are written in the machine's native byte order.
impl From<MessageHeader> for [u8; MSG_HEADER_LEN] {
    /// Serializes `header` into a `MSG_HEADER_LEN`-byte array.
    ///
    /// `header.len` is narrowed to `u32` with `as`, so values above
    /// `u32::MAX` are truncated.
    fn from(header: MessageHeader) -> [u8; MSG_HEADER_LEN] {
        let mut buf = [0u8; MSG_HEADER_LEN];

        buf[0] = match header.kind {
            MessageKind::Success => SUCCESSFUL_FLUSH,
            MessageKind::Failed => FAILED_FLUSH,
            MessageKind::Pad => SEGMENT_PAD,
            MessageKind::Corrupted => EVIL_BYTE,
        };

        // SAFETY: `transmute` only compiles when source and target sizes are
        // equal, and any integer bit pattern is a valid byte array.
        let lsn_arr: [u8; 8] = unsafe { std::mem::transmute(header.lsn) };
        buf[1..9].copy_from_slice(&lsn_arr);

        // SAFETY: `u32` and `[u8; 4]` have the same size and every bit
        // pattern is a valid `[u8; 4]`.
        let len_arr: [u8; 4] =
            unsafe { std::mem::transmute(header.len as u32) };
        buf[9..13].copy_from_slice(&len_arr);

        // The checksum bytes are stored inverted.
        buf[13] = header.crc16[0] ^ 0xFF;
        buf[14] = header.crc16[1] ^ 0xFF;

        buf
    }
}
// Decodes a segment header from its fixed-size serialized form.
//
// Layout: bytes 0..2 hold a CRC16 (each byte XORed with 0xFF) computed over
// the stored LSN bytes; bytes 2..10 hold the LSN XORed with 0xFFFF_FFFF, in
// native byte order.
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
    /// Parses `buf` into a `SegmentHeader`; `ok` records whether the stored
    /// checksum matches the one recomputed from the stored LSN bytes.
    fn from(buf: [u8; SEG_HEADER_LEN]) -> SegmentHeader {
        let mut stored_lsn = [0u8; 8];
        stored_lsn.copy_from_slice(&buf[2..10]);

        // The checksum covers the LSN bytes exactly as they sit on disk,
        // i.e. before the XOR mask is removed.
        let computed_crc = crc16_arr(&stored_lsn);
        let stored_crc = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];

        // SAFETY: `transmute` only compiles when the sizes are equal, and
        // `Lsn` is used as a plain integer here, so any bit pattern is valid.
        let masked_lsn: Lsn = unsafe { std::mem::transmute(stored_lsn) };

        SegmentHeader {
            lsn: masked_lsn ^ 0xFFFF_FFFF,
            ok: computed_crc == stored_crc,
        }
    }
}
// Encodes a segment header into its fixed-size serialized form.
//
// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl still provides `SegmentHeader: Into<[u8; SEG_HEADER_LEN]>`,
// so existing `.into()` call sites keep working.
//
// Layout (the inverse of `From<[u8; SEG_HEADER_LEN]> for SegmentHeader`):
// bytes 0..2 hold a CRC16 (each byte XORed with 0xFF) over the stored LSN
// bytes; bytes 2..10 hold the LSN XORed with 0xFFFF_FFFF, in native byte order.
impl From<SegmentHeader> for [u8; SEG_HEADER_LEN] {
    /// Serializes `header` into a `SEG_HEADER_LEN`-byte array. The `ok`
    /// field is not written; the checksum is always recomputed.
    fn from(header: SegmentHeader) -> [u8; SEG_HEADER_LEN] {
        let mut buf = [0u8; SEG_HEADER_LEN];

        // NOTE(review): this mask flips only the low 32 bits of an 8-byte
        // LSN. It must stay identical to the mask used when decoding, since
        // it is part of the stored format.
        let xor_lsn = header.lsn ^ 0xFFFF_FFFF;

        // SAFETY: `transmute` only compiles when source and target sizes are
        // equal, and any integer bit pattern is a valid byte array.
        let lsn_arr: [u8; 8] = unsafe { std::mem::transmute(xor_lsn) };
        buf[2..10].copy_from_slice(&lsn_arr);

        // The checksum covers the masked LSN bytes and is stored inverted.
        let crc16 = crc16_arr(&lsn_arr);
        buf[0] = crc16[0] ^ 0xFF;
        buf[1] = crc16[1] ^ 0xFF;

        buf
    }
}
// Decodes a segment trailer from its fixed-size serialized form.
//
// Layout: bytes 0..2 hold a CRC16 (each byte XORed with 0xFF) computed over
// the stored LSN bytes; bytes 2..10 hold the LSN XORed with 0xFFFF_FFFF, in
// native byte order.
impl From<[u8; SEG_TRAILER_LEN]> for SegmentTrailer {
    /// Parses `buf` into a `SegmentTrailer`; `ok` records whether the stored
    /// checksum matches the one recomputed from the stored LSN bytes.
    fn from(buf: [u8; SEG_TRAILER_LEN]) -> SegmentTrailer {
        let mut stored_lsn = [0u8; 8];
        stored_lsn.copy_from_slice(&buf[2..10]);

        // The checksum covers the LSN bytes exactly as they sit on disk,
        // i.e. before the XOR mask is removed.
        let computed_crc = crc16_arr(&stored_lsn);
        let stored_crc = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];

        // SAFETY: `transmute` only compiles when the sizes are equal, and
        // `Lsn` is used as a plain integer here, so any bit pattern is valid.
        let masked_lsn: Lsn = unsafe { std::mem::transmute(stored_lsn) };

        SegmentTrailer {
            lsn: masked_lsn ^ 0xFFFF_FFFF,
            ok: computed_crc == stored_crc,
        }
    }
}
// Encodes a segment trailer into its fixed-size serialized form.
//
// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl still provides `SegmentTrailer: Into<[u8; SEG_TRAILER_LEN]>`,
// so existing `.into()` call sites keep working.
//
// Layout (the inverse of `From<[u8; SEG_TRAILER_LEN]> for SegmentTrailer`):
// bytes 0..2 hold a CRC16 (each byte XORed with 0xFF) over the stored LSN
// bytes; bytes 2..10 hold the LSN XORed with 0xFFFF_FFFF, in native byte order.
impl From<SegmentTrailer> for [u8; SEG_TRAILER_LEN] {
    /// Serializes `trailer` into a `SEG_TRAILER_LEN`-byte array. The `ok`
    /// field is not written; the checksum is always recomputed.
    fn from(trailer: SegmentTrailer) -> [u8; SEG_TRAILER_LEN] {
        let mut buf = [0u8; SEG_TRAILER_LEN];

        // NOTE(review): this mask flips only the low 32 bits of an 8-byte
        // LSN. It must stay identical to the mask used when decoding, since
        // it is part of the stored format.
        let xor_lsn = trailer.lsn ^ 0xFFFF_FFFF;

        // SAFETY: `transmute` only compiles when source and target sizes are
        // equal, and any integer bit pattern is a valid byte array.
        let lsn_arr: [u8; 8] = unsafe { std::mem::transmute(xor_lsn) };
        buf[2..10].copy_from_slice(&lsn_arr);

        // The checksum covers the masked LSN bytes and is stored inverted.
        let crc16 = crc16_arr(&lsn_arr);
        buf[0] = crc16[0] ^ 0xFF;
        buf[1] = crc16[1] ^ 0xFF;

        buf
    }
}