use std::sync::Arc;
use super::*;
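/// A sequential store that hands out reservations at known log
/// offsets, so that persistent structures can record where their
/// bits will live on disk before the write completes.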
#[derive(Debug)]
pub struct Log {
pub(super) iobufs: Arc<IoBufs>,
pub(crate) config: Config,
}
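// Safety: cross-thread access to the underlying buffers is presumed
// sound because it is coordinated through the iobuf header protocol
// (seal bit, offset, and writer count, all updated via CAS below).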
unsafe impl Send for Log {}
impl Log {
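    /// Start the log, opening or creating the configured file and
    /// recovering state from the provided `Snapshot`.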
pub fn start(config: Config, snapshot: Snapshot) -> Result<Self> {
let iobufs = Arc::new(IoBufs::start(config.clone(), snapshot)?);
Ok(Self { iobufs, config })
}
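    /// Start a raw log, rebuilding its snapshot by iterating over
    /// segments from offset 0. Only supported with `SegmentMode::Linear`.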
pub fn start_raw_log(config: Config) -> Result<Self> {
assert_eq!(config.segment_mode, SegmentMode::Linear);
let (log_iter, _) = raw_segment_iter_from(0, &config)?;
let snapshot =
advance_snapshot(log_iter, Snapshot::default(), &config)?;
Self::start(config, snapshot)
}
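    /// Flush the pending IO buffers to disk, returning the number of
    /// bytes written.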
pub fn flush(&self) -> Result<usize> {
iobuf::flush(&self.iobufs)
}
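    /// Iterate over log messages, starting at the provided `Lsn`.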
pub fn iter_from(&self, lsn: Lsn) -> LogIter {
self.iobufs.iter_from(lsn)
}
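    /// Read a log message at a known location. The log is first made
    /// stable up to `lsn`; inline messages are then read from the main
    /// file, while blob pointers are resolved through `read_blob` and a
    /// `MessageHeader` is synthesized for the result (the inline slot
    /// holds only the blob pointer).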
pub fn read(&self, pid: PageId, lsn: Lsn, ptr: DiskPtr) -> Result<LogRead> {
trace!("reading log lsn {} ptr {}", lsn, ptr);
self.make_stable(lsn)?;
if ptr.is_inline() {
let f = &self.config.file;
f.read_message(ptr.lid(), lsn, &self.config)
} else {
let (_, blob_ptr) = ptr.blob();
read_blob(blob_ptr, &self.config).map(|(kind, buf)| {
let sz = MSG_HEADER_LEN + BLOB_INLINE_LEN;
let header = MessageHeader {
kind,
pid,
lsn,
crc32: 0,
len: sz as u32,
};
LogRead::Blob(header, buf, blob_ptr)
})
}
}
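    /// Return the highest `Lsn` that is currently stable on disk.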
pub fn stable_offset(&self) -> Lsn {
self.iobufs.stable()
}
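    /// Block until all log entries at or below `lsn` are stable on
    /// disk, returning the number of bytes written in the process.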
pub fn make_stable(&self, lsn: Lsn) -> Result<usize> {
iobuf::make_stable(&self.iobufs, lsn)
}
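    /// Run a closure with exclusive access to the `SegmentAccountant`.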
pub(crate) fn with_sa<B, F>(&self, f: F) -> B
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
self.iobufs.with_sa(f)
}
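    /// Re-record an existing blob pointer inline as a `Replace` entry,
    /// used when a blob's pointer must be rewritten into a new segment.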
pub(super) fn rewrite_blob_ptr(
&self,
pid: PageId,
blob_ptr: BlobPointer,
) -> Result<Reservation<'_>> {
let lsn_buf: [u8; std::mem::size_of::<BlobPointer>()] =
u64_to_arr(blob_ptr as u64);
self.reserve_inner(LogKind::Replace, pid, &lsn_buf, true)
}
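    /// Reserve a slot in the write buffer for `raw_buf`, compressing the
    /// payload first when the `compression` feature is enabled and
    /// configured. The returned [`Reservation`] pins a region of the
    /// current iobuf until it is completed or aborted.
    ///
    /// A minimal usage sketch (not run as a doc-test; it assumes a
    /// started `Log` bound to `log`, a `pid` in hand, and the `complete`
    /// method on `Reservation` defined elsewhere in this crate):
    ///
    /// ```ignore
    /// let res = log.reserve(LogKind::Replace, pid, b"new page state")?;
    /// let (lsn, ptr) = res.complete()?; // durable position of the write
    /// ```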
#[allow(unused)]
pub fn reserve(
&self,
log_kind: LogKind,
pid: PageId,
raw_buf: &[u8],
) -> Result<Reservation<'_>> {
let mut _compressed: Option<Vec<u8>> = None;
let mut buf = raw_buf;
#[cfg(feature = "compression")]
{
if self.config.use_compression {
use zstd::block::compress;
let _measure = Measure::new(&M.compress);
let compressed_buf =
compress(buf, self.config.compression_factor).unwrap();
_compressed = Some(compressed_buf);
buf = _compressed.as_ref().unwrap();
}
}
self.reserve_inner(log_kind, pid, buf, false)
}
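    // Claim space in the current iobuf via a CAS loop over its header:
    // spin while the buffer is sealed, would overflow, or already has
    // MAX_WRITERS; otherwise bump the offset and writer count in one
    // CAS. Payloads over `max_buf_size` go to external blob storage,
    // with only the blob pointer (a single `Lsn`) reserved inline.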
fn reserve_inner(
&self,
log_kind: LogKind,
pid: PageId,
buf: &[u8],
is_blob_rewrite: bool,
) -> Result<Reservation<'_>> {
let _measure = Measure::new(&M.reserve_lat);
let total_buf_len = MSG_HEADER_LEN + buf.len();
M.reserve_sz.measure(total_buf_len as f64);
let max_buf_size = (self.config.io_buf_size
/ MINIMUM_ITEMS_PER_SEGMENT)
- SEG_HEADER_LEN;
let over_blob_threshold = total_buf_len > max_buf_size;
assert!(!(over_blob_threshold && is_blob_rewrite));
let inline_buf_len = if over_blob_threshold {
MSG_HEADER_LEN + std::mem::size_of::<Lsn>()
} else {
total_buf_len
};
trace!("reserving buf of len {}", inline_buf_len);
let mut printed = false;
macro_rules! trace_once {
($($msg:expr),*) => {
if !printed {
trace!($($msg),*);
printed = true;
}
};
}
let backoff = Backoff::new();
let kind = match (pid, log_kind, over_blob_threshold || is_blob_rewrite)
{
(COUNTER_PID, LogKind::Replace, false) => MessageKind::Counter,
(META_PID, LogKind::Replace, true) => MessageKind::BlobMeta,
(META_PID, LogKind::Replace, false) => MessageKind::InlineMeta,
(CONFIG_PID, LogKind::Replace, true) => MessageKind::BlobConfig,
(CONFIG_PID, LogKind::Replace, false) => MessageKind::InlineConfig,
(BATCH_MANIFEST_PID, LogKind::Skip, false) => {
MessageKind::BatchManifest
}
(_, LogKind::Free, false) => MessageKind::Free,
(_, LogKind::Replace, true) => MessageKind::BlobReplace,
(_, LogKind::Replace, false) => MessageKind::InlineReplace,
(_, LogKind::Append, true) => MessageKind::BlobAppend,
(_, LogKind::Append, false) => MessageKind::InlineAppend,
other => panic!(
"unexpected combination of PageId, \
LogKind, and blob status: {:?}",
other
),
};
loop {
M.log_reservation_attempted();
if let Err(e) = self.config.global_error() {
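                // Lock the intervals mutex before notifying, presumably
                // so that threads blocked in `make_stable` observe the
                // global error rather than sleeping through the wakeup.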
let _ = self.iobufs.intervals.lock();
self.iobufs.interval_updated.notify_all();
return Err(e);
}
let iobuf = self.iobufs.current_iobuf();
let header = iobuf.get_header();
if iobuf::is_sealed(header) {
trace_once!("io buffer already sealed, spinning");
backoff.snooze();
continue;
}
let buf_offset = iobuf::offset(header);
let prospective_size = buf_offset + inline_buf_len;
let would_overflow = prospective_size > iobuf.capacity;
if would_overflow {
trace_once!("io buffer too full, spinning");
iobuf::maybe_seal_and_write_iobuf(
&self.iobufs,
&iobuf,
header,
true,
)?;
backoff.spin();
continue;
}
let bumped_offset = iobuf::bump_offset(header, inline_buf_len);
if iobuf::n_writers(bumped_offset) == iobuf::MAX_WRITERS {
trace_once!(
"spinning because our buffer has {} writers already",
iobuf::MAX_WRITERS
);
backoff.snooze();
continue;
}
let claimed = iobuf::incr_writers(bumped_offset);
if iobuf.cas_header(header, claimed).is_err() {
trace_once!("CAS failed while claiming buffer slot, spinning");
backoff.spin();
continue;
}
let log_id = iobuf.lid;
assert_ne!(iobuf::n_writers(claimed), 0);
assert!(!iobuf::is_sealed(claimed));
let reservation_lsn = iobuf.lsn + buf_offset as Lsn;
assert_ne!(
log_id,
LogId::max_value(),
"fucked up on iobuf with lsn {}\n{:?}",
reservation_lsn,
self
);
let out_buf = unsafe { (*iobuf.buf.get()).as_mut_slice() };
let res_start = buf_offset;
let res_end = res_start + inline_buf_len;
let destination = &mut (out_buf)[res_start..res_end];
let reservation_offset = log_id + buf_offset as LogId;
trace!(
"reserved {} bytes at lsn {} lid {}",
inline_buf_len,
reservation_lsn,
reservation_offset,
);
bump_atomic_lsn(&self.iobufs.max_reserved_lsn, reservation_lsn);
self.iobufs.encapsulate(
                buf,
destination,
kind,
pid,
reservation_lsn,
over_blob_threshold,
)?;
M.log_reservation_success();
let ptr = if over_blob_threshold {
DiskPtr::new_blob(reservation_offset, reservation_lsn)
} else if is_blob_rewrite {
                let blob_ptr = arr_to_u64(buf) as BlobPointer;
DiskPtr::new_blob(reservation_offset, blob_ptr)
} else {
DiskPtr::new_inline(reservation_offset)
};
return Ok(Reservation {
iobuf,
                log: self,
buf: destination,
flushed: false,
lsn: reservation_lsn,
ptr,
is_blob_rewrite,
});
}
}
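    /// Called when a `Reservation` is completed or aborted: decrement
    /// the iobuf's writer count, and if this releases the last writer
    /// of a sealed buffer, write that buffer to the log on a background
    /// thread.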
pub(super) fn exit_reservation(&self, iobuf: &Arc<IoBuf>) -> Result<()> {
let mut header = iobuf.get_header();
loop {
let new_hv = iobuf::decr_writers(header);
match iobuf.cas_header(header, new_hv) {
Ok(new) => {
header = new;
break;
}
Err(new) => {
header = new;
}
}
}
if iobuf::n_writers(header) == 0 && iobuf::is_sealed(header) {
if let Err(e) = self.config.global_error() {
let _ = self.iobufs.intervals.lock();
self.iobufs.interval_updated.notify_all();
return Err(e);
}
let lsn = iobuf.lsn;
trace!(
"asynchronously writing iobuf with lsn {} \
to log from exit_reservation",
lsn
);
let iobufs = self.iobufs.clone();
let iobuf = iobuf.clone();
let _result = threadpool::spawn(move || {
if let Err(e) = iobufs.write_to_log(&iobuf) {
error!(
"hit error while writing iobuf with lsn {}: {:?}",
lsn, e
);
iobufs.config.set_global_error(e);
}
});
#[cfg(any(test, feature = "check_snapshot_integrity"))]
_result.unwrap();
Ok(())
} else {
Ok(())
}
}
}
impl Drop for Log {
fn drop(&mut self) {
if self.config.global_error().is_err() {
return;
}
if let Err(e) = iobuf::flush(&self.iobufs) {
error!("failed to flush from IoBufs::drop: {}", e);
}
if !self.config.temporary {
self.config.file.sync_all().unwrap();
}
debug!("IoBufs dropped");
}
}
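/// All log messages are prepended with this header.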
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct MessageHeader {
pub(crate) kind: MessageKind,
pub(crate) lsn: Lsn,
pub(crate) pid: PageId,
pub(crate) len: u32,
pub(crate) crc32: u32,
}
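/// A segment's header, stored at the start of every segment on disk.
/// `ok` records whether the stored checksum matched on read.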
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct SegmentHeader {
pub(crate) lsn: Lsn,
pub(crate) max_stable_lsn: Lsn,
pub(crate) ok: bool,
}
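/// The result of reading a log message: either a successful inline or
/// blob read, or one of several recoverable non-success outcomes.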
#[doc(hidden)]
#[derive(Debug)]
pub enum LogRead {
Inline(MessageHeader, Vec<u8>, u32),
Blob(MessageHeader, Vec<u8>, BlobPointer),
Failed(Lsn, u32),
Pad(Lsn),
Corrupted(u32),
DanglingBlob(MessageHeader, BlobPointer),
BatchManifest(Lsn),
}
impl LogRead {
pub fn is_inline(&self) -> bool {
match *self {
LogRead::Inline(..) => true,
_ => false,
}
}
pub fn is_blob(&self) -> bool {
        match *self {
LogRead::Blob(..) => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
match *self {
LogRead::Failed(_, _) => true,
_ => false,
}
}
pub fn is_successful(&self) -> bool {
match *self {
LogRead::Inline(..) | LogRead::Blob(..) => true,
_ => false,
}
}
pub fn is_pad(&self) -> bool {
match *self {
LogRead::Pad(_) => true,
_ => false,
}
}
pub fn is_corrupt(&self) -> bool {
match *self {
LogRead::Corrupted(_) => true,
_ => false,
}
}
pub fn into_data(self) -> Option<Vec<u8>> {
match self {
LogRead::Blob(_, buf, _) | LogRead::Inline(_, buf, _) => Some(buf),
_ => None,
}
}
}
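// On-disk layout of a `MessageHeader` (1 + 8 + 8 + 4 + 4 bytes):
//   byte  0       kind
//   bytes 1..9    pid (u64)
//   bytes 9..17   lsn (u64)
//   bytes 17..21  len (u32)
//   bytes 21..25  crc32 (u32), stored bitwise-negated, presumably so
//                 that an all-zero region never looks like a valid header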
impl From<[u8; MSG_HEADER_LEN]> for MessageHeader {
fn from(buf: [u8; MSG_HEADER_LEN]) -> Self {
let kind = MessageKind::from(buf[0]);
unsafe {
let page_id = arr_to_u64(buf.get_unchecked(1..9));
let lsn = arr_to_u64(buf.get_unchecked(9..17)) as Lsn;
let length = arr_to_u32(buf.get_unchecked(17..21));
let crc32 = arr_to_u32(buf.get_unchecked(21..)) ^ 0xFFFF_FFFF;
Self {
kind,
pid: page_id,
lsn,
len: length,
crc32,
}
}
}
}
impl From<MessageHeader> for [u8; MSG_HEADER_LEN] {
    fn from(header: MessageHeader) -> [u8; MSG_HEADER_LEN] {
        let mut buf = [0; MSG_HEADER_LEN];
        buf[0] = header.kind.into();
        let pid_arr = u64_to_arr(header.pid);
        let lsn_arr = u64_to_arr(header.lsn as u64);
        let length_arr = u32_to_arr(header.len);
        let crc32_arr = u32_to_arr(header.crc32 ^ 0xFFFF_FFFF);
unsafe {
std::ptr::copy_nonoverlapping(
pid_arr.as_ptr(),
buf.as_mut_ptr().add(1),
std::mem::size_of::<u64>(),
);
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf.as_mut_ptr().add(9),
std::mem::size_of::<u64>(),
);
std::ptr::copy_nonoverlapping(
length_arr.as_ptr(),
buf.as_mut_ptr().add(17),
std::mem::size_of::<u32>(),
);
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
buf.as_mut_ptr().add(21),
std::mem::size_of::<u32>(),
);
}
buf
}
}
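// On-disk layout of a `SegmentHeader`:
//   bytes 0..4    crc32 of bytes 4..20, stored bitwise-negated
//   bytes 4..12   lsn ^ 0x7FFF_FFFF_FFFF_FFFF
//   bytes 12..20  max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF
// The XOR masking, like the negated CRC, presumably keeps zero-filled
// or freshly truncated regions from decoding as valid headers.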
impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn from(buf: [u8; SEG_HEADER_LEN]) -> Self {
unsafe {
let crc32_header =
arr_to_u32(buf.get_unchecked(0..4)) ^ 0xFFFF_FFFF;
let xor_lsn = arr_to_u64(buf.get_unchecked(4..12)) as Lsn;
let lsn = xor_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
let xor_max_stable_lsn =
arr_to_u64(buf.get_unchecked(12..20)) as Lsn;
let max_stable_lsn = xor_max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
let crc32_tested = crc32(&buf[4..20]);
let ok = crc32_tested == crc32_header;
if !ok {
debug!(
"segment with lsn {} had computed crc {}, \
but stored crc {}",
lsn, crc32_tested, crc32_header
);
}
Self {
lsn,
max_stable_lsn,
ok,
}
}
}
}
impl From<SegmentHeader> for [u8; SEG_HEADER_LEN] {
    fn from(header: SegmentHeader) -> [u8; SEG_HEADER_LEN] {
        let mut buf = [0; SEG_HEADER_LEN];
        let xor_lsn = header.lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let xor_max_stable_lsn = header.max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
        let lsn_arr = u64_to_arr(xor_lsn as u64);
        let highest_stable_lsn_arr = u64_to_arr(xor_max_stable_lsn as u64);
unsafe {
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf.as_mut_ptr().add(4),
std::mem::size_of::<u64>(),
);
std::ptr::copy_nonoverlapping(
highest_stable_lsn_arr.as_ptr(),
buf.as_mut_ptr().add(12),
std::mem::size_of::<u64>(),
);
}
let crc32 = u32_to_arr(crc32(&buf[4..20]) ^ 0xFFFF_FFFF);
unsafe {
std::ptr::copy_nonoverlapping(
crc32.as_ptr(),
buf.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
buf
}
}