use std::sync::Arc;
use super::*;
pub struct Log {
iobufs: Arc<IoBufs>,
config: Config,
_flusher: periodic::Periodic<Arc<IoBufs>>,
}
unsafe impl Send for Log {}
unsafe impl Sync for Log {}
impl Log {
pub fn start<R>(
config: Config,
snapshot: Snapshot<R>,
) -> Result<Log, ()> {
let iobufs =
Arc::new(IoBufs::start(config.clone(), snapshot)?);
let flusher = periodic::Periodic::new(
"log flusher".to_owned(),
iobufs.clone(),
config.flush_every_ms,
);
Ok(Log {
iobufs: iobufs,
config: config,
_flusher: flusher,
})
}
pub fn start_raw_log(config: Config) -> Result<Log, ()> {
assert_eq!(config.segment_mode, SegmentMode::Linear);
let log_iter = raw_segment_iter_from(0, &config)?;
let snapshot = advance_snapshot::<NullMaterializer, (), ()>(
log_iter,
Snapshot::default(),
&config,
)?;
Log::start::<()>(config, snapshot)
}
pub fn flush(&self) -> Result<(), ()> {
self.iobufs.flush()
}
pub fn reserve(
&self,
buf: Vec<u8>,
) -> Result<Reservation<'_>, ()> {
self.iobufs.reserve(buf)
}
pub(super) fn reserve_blob(
&self,
blob_ptr: BlobPointer,
) -> Result<Reservation<'_>, ()> {
self.iobufs.reserve_blob(blob_ptr)
}
pub fn write(&self, buf: Vec<u8>) -> Result<(Lsn, DiskPtr), ()> {
self.iobufs.reserve(buf).and_then(|res| res.complete())
}
pub fn iter_from(&self, lsn: Lsn) -> LogIter {
trace!("iterating from lsn {}", lsn);
let io_buf_size = self.config.io_buf_size;
let segment_base_lsn =
lsn / io_buf_size as Lsn * io_buf_size as Lsn;
let min_lsn = segment_base_lsn + SEG_HEADER_LEN as Lsn;
let corrected_lsn = std::cmp::max(lsn, min_lsn);
let segment_iter = self.with_sa(|sa| {
sa.segment_snapshot_iter_from(corrected_lsn)
});
LogIter {
config: self.config.clone(),
max_lsn: self.stable_offset(),
cur_lsn: corrected_lsn,
segment_base: None,
segment_iter: segment_iter,
segment_len: io_buf_size,
use_compression: self.config.use_compression,
trailer: None,
}
}
pub fn read(
&self,
lsn: Lsn,
ptr: DiskPtr,
) -> Result<LogRead, ()> {
trace!("reading log lsn {} ptr {}", lsn, ptr);
self.make_stable(lsn)?;
if ptr.is_inline() {
let lid = ptr.inline();
let f = self.config.file()?;
let read = f.read_message(lid, &self.config);
read.and_then(|log_read| match log_read {
LogRead::Inline(read_lsn, _, _)
| LogRead::Blob(read_lsn, _, _) => {
if lsn != read_lsn {
Err(Error::Corruption { at: ptr })
} else {
Ok(log_read)
}
}
_ => Ok(log_read),
}).map_err(|e| e.into())
} else {
let (_lid, blob_ptr) = ptr.blob();
read_blob(blob_ptr, &self.config)
.map(|buf| LogRead::Blob(lsn, buf, blob_ptr))
}
}
pub fn stable_offset(&self) -> Lsn {
self.iobufs.stable()
}
pub fn make_stable(&self, lsn: Lsn) -> Result<(), ()> {
self.iobufs.make_stable(lsn)
}
pub(crate) fn with_sa<B, F>(&self, f: F) -> B
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
self.iobufs.with_sa(f)
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum MessageKind {
Inline,
Blob,
Failed,
Pad,
Corrupted,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct MessageHeader {
pub(crate) kind: MessageKind,
pub(crate) lsn: Lsn,
pub(crate) len: usize,
pub(crate) crc16: [u8; 2],
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentHeader {
pub(crate) lsn: Lsn,
pub(crate) ok: bool,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentTrailer {
pub(crate) lsn: Lsn,
pub(crate) ok: bool,
}
#[doc(hidden)]
#[derive(Debug)]
pub enum LogRead {
Inline(Lsn, Vec<u8>, usize),
Blob(Lsn, Vec<u8>, BlobPointer),
Failed(Lsn, usize),
Pad(Lsn),
Corrupted(usize),
DanglingBlob(Lsn, BlobPointer),
}
impl LogRead {
pub fn inline(self) -> Option<(Lsn, Vec<u8>, usize)> {
match self {
LogRead::Inline(lsn, bytes, len) => {
Some((lsn, bytes, len))
}
_ => None,
}
}
pub fn is_inline(&self) -> bool {
match *self {
LogRead::Inline(_, _, _) => true,
_ => false,
}
}
pub fn blob(self) -> Option<(Lsn, Vec<u8>, BlobPointer)> {
match self {
LogRead::Blob(lsn, buf, ptr) => Some((lsn, buf, ptr)),
_ => None,
}
}
pub fn is_blob(&self) -> bool {
match self {
LogRead::Blob(..) => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
match *self {
LogRead::Failed(_, _) => true,
_ => false,
}
}
pub fn is_successful(&self) -> bool {
match *self {
LogRead::Inline(_, _, _) | LogRead::Blob(_, _, _) => true,
_ => false,
}
}
pub fn is_pad(&self) -> bool {
match *self {
LogRead::Pad(_) => true,
_ => false,
}
}
pub fn is_corrupt(&self) -> bool {
match *self {
LogRead::Corrupted(_) => true,
_ => false,
}
}
pub fn into_data(self) -> Option<Vec<u8>> {
match self {
LogRead::Blob(_, buf, _) | LogRead::Inline(_, buf, _) => {
Some(buf)
}
_ => None,
}
}
}
impl From<[u8; MSG_HEADER_LEN]> for MessageHeader {
fn from(buf: [u8; MSG_HEADER_LEN]) -> MessageHeader {
let kind = match buf[0] {
INLINE_FLUSH => MessageKind::Inline,
BLOB_FLUSH => MessageKind::Blob,
FAILED_FLUSH => MessageKind::Failed,
SEGMENT_PAD => MessageKind::Pad,
_ => MessageKind::Corrupted,
};
let lsn_buf = &buf[1..9];
let mut lsn_arr = [0u8; 8];
lsn_arr.copy_from_slice(&*lsn_buf);
let lsn = arr_to_u64(lsn_arr) as Lsn;
let len_buf = &buf[9..13];
let mut len_arr = [0u8; 4];
len_arr.copy_from_slice(&*len_buf);
let len = arr_to_u32(len_arr);
let crc16 = [buf[13] ^ 0xFF, buf[14] ^ 0xFF];
MessageHeader {
kind: kind,
lsn: lsn,
len: len as usize,
crc16: crc16,
}
}
}
impl Into<[u8; MSG_HEADER_LEN]> for MessageHeader {
fn into(self) -> [u8; MSG_HEADER_LEN] {
let mut buf = [0u8; MSG_HEADER_LEN];
buf[0] = match self.kind {
MessageKind::Inline => INLINE_FLUSH,
MessageKind::Blob => BLOB_FLUSH,
MessageKind::Failed => FAILED_FLUSH,
MessageKind::Pad => SEGMENT_PAD,
MessageKind::Corrupted => EVIL_BYTE,
};
let lsn_arr = u64_to_arr(self.lsn as u64);
buf[1..9].copy_from_slice(&lsn_arr);
assert!(self.len <= std::u32::MAX as usize);
let len_arr = u32_to_arr(self.len as u32);
buf[9..13].copy_from_slice(&len_arr);
buf[13] = self.crc16[0] ^ 0xFF;
buf[14] = self.crc16[1] ^ 0xFF;
buf
}
}
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn from(buf: [u8; SEG_HEADER_LEN]) -> SegmentHeader {
let crc16 = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];
let lsn_buf = &buf[2..10];
let mut lsn_arr = [0u8; 8];
lsn_arr.copy_from_slice(&*lsn_buf);
let xor_lsn = arr_to_u64(lsn_arr) as Lsn;
let lsn = xor_lsn ^ 0xFFFF_FFFF;
let crc16_tested = crc16_arr(&lsn_arr);
SegmentHeader {
lsn: lsn,
ok: crc16_tested == crc16,
}
}
}
impl Into<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn into(self) -> [u8; SEG_HEADER_LEN] {
let mut buf = [0u8; SEG_HEADER_LEN];
let xor_lsn = self.lsn ^ 0xFFFF_FFFF;
let lsn_arr = u64_to_arr(xor_lsn as u64);
buf[2..10].copy_from_slice(&lsn_arr);
let crc16 = crc16_arr(&lsn_arr);
buf[0] = crc16[0] ^ 0xFF;
buf[1] = crc16[1] ^ 0xFF;
buf
}
}
impl From<[u8; SEG_TRAILER_LEN]> for SegmentTrailer {
fn from(buf: [u8; SEG_TRAILER_LEN]) -> SegmentTrailer {
let crc16 = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];
let lsn_buf = &buf[2..10];
let mut lsn_arr = [0u8; 8];
lsn_arr.copy_from_slice(&*lsn_buf);
let xor_lsn = arr_to_u64(lsn_arr) as Lsn;
let lsn = xor_lsn ^ 0xFFFF_FFFF;
let crc16_tested = crc16_arr(&lsn_arr);
SegmentTrailer {
lsn: lsn,
ok: crc16_tested == crc16,
}
}
}
impl Into<[u8; SEG_TRAILER_LEN]> for SegmentTrailer {
fn into(self) -> [u8; SEG_TRAILER_LEN] {
let mut buf = [0u8; SEG_TRAILER_LEN];
let xor_lsn = self.lsn ^ 0xFFFF_FFFF;
let lsn_arr = u64_to_arr(xor_lsn as u64);
buf[2..10].copy_from_slice(&lsn_arr);
let crc16 = crc16_arr(&lsn_arr);
buf[0] = crc16[0] ^ 0xFF;
buf[1] = crc16[1] ^ 0xFF;
buf
}
}