use super::*;
pub struct Log {
pub(super) iobufs: IoBufs,
config: Config,
_flusher: Option<flusher::Flusher>,
}
unsafe impl Send for Log {}
impl Log {
pub fn start<R>(
config: Config,
snapshot: Snapshot<R>,
) -> Result<Log, ()> {
let iobufs = IoBufs::start(config.clone(), snapshot)?;
let iobufs_flusher = iobufs.clone();
let flusher = config.flush_every_ms.map(move |fem| {
flusher::Flusher::new(
"log flusher".to_owned(),
iobufs_flusher,
fem,
)
});
Ok(Log {
iobufs,
config,
_flusher: flusher,
})
}
pub fn start_raw_log(config: Config) -> Result<Log, ()> {
assert_eq!(config.segment_mode, SegmentMode::Linear);
let log_iter = raw_segment_iter_from(0, &config)?;
let snapshot = advance_snapshot::<NullMaterializer, (), ()>(
log_iter,
Snapshot::default(),
&config,
)?;
Log::start::<()>(config, snapshot)
}
pub fn flush(&self) -> Result<(), ()> {
self.iobufs.flush()
}
pub fn reserve(
&self,
buf: Vec<u8>,
) -> Result<Reservation<'_>, ()> {
self.iobufs.reserve(buf)
}
pub(super) fn reserve_blob(
&self,
blob_ptr: BlobPointer,
) -> Result<Reservation<'_>, ()> {
self.iobufs.reserve_blob(blob_ptr)
}
pub fn write(&self, buf: Vec<u8>) -> Result<(Lsn, DiskPtr), ()> {
self.reserve(buf).and_then(|res| res.complete())
}
pub fn iter_from(&self, lsn: Lsn) -> LogIter {
self.iobufs.iter_from(lsn)
}
pub fn read(
&self,
lsn: Lsn,
ptr: DiskPtr,
) -> Result<LogRead, ()> {
trace!("reading log lsn {} ptr {}", lsn, ptr);
self.make_stable(lsn)?;
if ptr.is_inline() {
let lid = ptr.inline();
let f = &self.config.file;
let read = f.read_message(lid, &self.config);
read.and_then(|log_read| match log_read {
LogRead::Inline(read_lsn, _, _)
| LogRead::Blob(read_lsn, _, _) => {
if lsn != read_lsn {
debug!(
"lsn of disk read is {} but we expected it to be {}",
read_lsn,
lsn,
);
Err(Error::Corruption { at: ptr })
} else {
Ok(log_read)
}
}
_ => Ok(log_read),
})
} else {
let (_lid, blob_ptr) = ptr.blob();
read_blob(blob_ptr, &self.config)
.map(|buf| LogRead::Blob(lsn, buf, blob_ptr))
}
}
pub fn stable_offset(&self) -> Lsn {
self.iobufs.stable()
}
pub fn make_stable(&self, lsn: Lsn) -> Result<(), ()> {
self.iobufs.make_stable(lsn)
}
pub(crate) fn with_sa<B, F>(&self, f: F) -> B
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
self.iobufs.with_sa(f)
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum MessageKind {
Inline,
Blob,
Failed,
Pad,
Corrupted,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct MessageHeader {
pub(crate) kind: MessageKind,
pub(crate) lsn: Lsn,
pub(crate) len: usize,
pub(crate) crc16: [u8; 2],
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentHeader {
pub(crate) lsn: Lsn,
pub(crate) ok: bool,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentTrailer {
pub(crate) lsn: Lsn,
pub(crate) ok: bool,
}
#[doc(hidden)]
#[derive(Debug)]
pub enum LogRead {
Inline(Lsn, Vec<u8>, usize),
Blob(Lsn, Vec<u8>, BlobPointer),
Failed(Lsn, usize),
Pad(Lsn),
Corrupted(usize),
DanglingBlob(Lsn, BlobPointer),
}
impl LogRead {
pub fn inline(self) -> Option<(Lsn, Vec<u8>, usize)> {
match self {
LogRead::Inline(lsn, bytes, len) => {
Some((lsn, bytes, len))
}
_ => None,
}
}
pub fn is_inline(&self) -> bool {
match *self {
LogRead::Inline(_, _, _) => true,
_ => false,
}
}
pub fn blob(self) -> Option<(Lsn, Vec<u8>, BlobPointer)> {
match self {
LogRead::Blob(lsn, buf, ptr) => Some((lsn, buf, ptr)),
_ => None,
}
}
pub fn is_blob(&self) -> bool {
match self {
LogRead::Blob(..) => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
match *self {
LogRead::Failed(_, _) => true,
_ => false,
}
}
pub fn is_successful(&self) -> bool {
match *self {
LogRead::Inline(_, _, _) | LogRead::Blob(_, _, _) => true,
_ => false,
}
}
pub fn is_pad(&self) -> bool {
match *self {
LogRead::Pad(_) => true,
_ => false,
}
}
pub fn is_corrupt(&self) -> bool {
match *self {
LogRead::Corrupted(_) => true,
_ => false,
}
}
pub fn into_data(self) -> Option<Vec<u8>> {
match self {
LogRead::Blob(_, buf, _) | LogRead::Inline(_, buf, _) => {
Some(buf)
}
_ => None,
}
}
}
impl From<[u8; MSG_HEADER_LEN]> for MessageHeader {
fn from(buf: [u8; MSG_HEADER_LEN]) -> MessageHeader {
let kind = match buf[0] {
INLINE_FLUSH => MessageKind::Inline,
BLOB_FLUSH => MessageKind::Blob,
FAILED_FLUSH => MessageKind::Failed,
SEGMENT_PAD => MessageKind::Pad,
_ => MessageKind::Corrupted,
};
let lsn = arr_to_u64(&buf[1..9]) as Lsn;
let len = arr_to_u32(&buf[9..13]);
let crc16 = [buf[13] ^ 0xFF, buf[14] ^ 0xFF];
MessageHeader {
kind,
lsn,
len: len as usize,
crc16,
}
}
}
impl Into<[u8; MSG_HEADER_LEN]> for MessageHeader {
fn into(self) -> [u8; MSG_HEADER_LEN] {
let mut buf = [0u8; MSG_HEADER_LEN];
buf[0] = match self.kind {
MessageKind::Inline => INLINE_FLUSH,
MessageKind::Blob => BLOB_FLUSH,
MessageKind::Failed => FAILED_FLUSH,
MessageKind::Pad => SEGMENT_PAD,
MessageKind::Corrupted => EVIL_BYTE,
};
let lsn_arr = u64_to_arr(self.lsn as u64);
unsafe {
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf[1..].as_mut_ptr(),
std::mem::size_of::<u64>(),
);
}
assert!(self.len <= std::u32::MAX as usize);
let len_arr = u32_to_arr(self.len as u32);
unsafe {
std::ptr::copy_nonoverlapping(
len_arr.as_ptr(),
buf[9..].as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
buf[13] = self.crc16[0] ^ 0xFF;
buf[14] = self.crc16[1] ^ 0xFF;
buf
}
}
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn from(buf: [u8; SEG_HEADER_LEN]) -> SegmentHeader {
let crc16 = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];
let xor_lsn = arr_to_u64(&buf[2..10]) as Lsn;
let lsn = xor_lsn ^ 0xFFFF_FFFF;
let crc16_tested = crc16_arr(&buf[2..10]);
SegmentHeader {
lsn,
ok: crc16_tested == crc16,
}
}
}
impl Into<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn into(self) -> [u8; SEG_HEADER_LEN] {
let mut buf = [0u8; SEG_HEADER_LEN];
let xor_lsn = self.lsn ^ 0xFFFF_FFFF;
let lsn_arr = u64_to_arr(xor_lsn as u64);
unsafe {
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf[2..].as_mut_ptr(),
std::mem::size_of::<u64>(),
);
}
let crc16 = crc16_arr(&lsn_arr);
buf[0] = crc16[0] ^ 0xFF;
buf[1] = crc16[1] ^ 0xFF;
buf
}
}
impl From<[u8; SEG_TRAILER_LEN]> for SegmentTrailer {
fn from(buf: [u8; SEG_TRAILER_LEN]) -> SegmentTrailer {
let crc16 = [buf[0] ^ 0xFF, buf[1] ^ 0xFF];
let xor_lsn = arr_to_u64(&buf[2..10]) as Lsn;
let lsn = xor_lsn ^ 0xFFFF_FFFF;
let crc16_tested = crc16_arr(&buf[2..10]);
SegmentTrailer {
lsn,
ok: crc16_tested == crc16,
}
}
}
impl Into<[u8; SEG_TRAILER_LEN]> for SegmentTrailer {
fn into(self) -> [u8; SEG_TRAILER_LEN] {
let mut buf = [0u8; SEG_TRAILER_LEN];
let xor_lsn = self.lsn ^ 0xFFFF_FFFF;
let lsn_arr = u64_to_arr(xor_lsn as u64);
unsafe {
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf[2..].as_mut_ptr(),
std::mem::size_of::<u64>(),
);
}
let crc16 = crc16_arr(&lsn_arr);
buf[0] = crc16[0] ^ 0xFF;
buf[1] = crc16[1] ^ 0xFF;
buf
}
}