use std::{
    cell::UnsafeCell,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
        Arc,
    },
};
use parking_lot::{Condvar, Mutex, RwLock};
use self::reader::LogReader;
use super::*;
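/// With 7 bits of the buffer header dedicated to the writer count, at most
/// 127 threads may hold reservations against a single `IoBuf` at once.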
pub(crate) const MAX_WRITERS: Header = 127;
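/// A 64-bit buffer header packing, from high to low bits: a 32-bit salt
/// distinguishing buffer generations, a sealed flag, a 7-bit count of
/// active writers, and a 24-bit reservation offset. See the helper
/// functions at the bottom of this file.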
pub(crate) type Header = u64;
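/// Under the `failpoints` feature, injects an IO failure at the named fail
/// point: the global error is set, any threads parked on `interval_updated`
/// are woken so they can observe it, and the enclosing function returns
/// `Err(Error::FailPoint)`.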
macro_rules! io_fail {
($self:expr, $e:expr) => {
#[cfg(feature = "failpoints")]
fail_point!($e, |_| {
$self.config.set_global_error(Error::FailPoint);
let _ = $self.intervals.lock();
$self.interval_updated.notify_all();
Err(Error::FailPoint)
});
};
}
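/// An in-memory buffer for a contiguous chunk of the log. Writers reserve
/// disjoint ranges of `buf` by bumping the offset stored in `header`; once
/// the buffer is sealed and all writers have finished, it is written to
/// disk at `lid` in a single contiguous write.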
pub(crate) struct IoBuf {
pub(crate) buf: UnsafeCell<Vec<u8>>,
header: CachePadded<AtomicU64>,
pub(super) lid: LogId,
pub(super) lsn: Lsn,
pub(super) capacity: usize,
maxed: AtomicBool,
linearizer: Mutex<()>,
stored_max_stable_lsn: Lsn,
}
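// Safety: reservations hand each writer a non-overlapping range of `buf`,
// coordinated through the atomic `header`, so mutation through the
// `UnsafeCell` never aliases across threads.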
unsafe impl Sync for IoBuf {}
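/// Shared state for the log's write path: the currently active `IoBuf`,
/// the intervals of LSNs that have been written but not yet folded into
/// `stable_lsn`, and the `SegmentAccountant` that hands out segments on
/// disk.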
pub(super) struct IoBufs {
pub(super) config: Config,
pub(crate) iobuf: RwLock<Arc<IoBuf>>,
pub(crate) intervals: Mutex<Vec<(Lsn, Lsn)>>,
pub(super) interval_updated: Condvar,
pub(crate) stable_lsn: AtomicLsn,
pub(crate) max_reserved_lsn: AtomicLsn,
pub(crate) max_header_stable_lsn: Arc<AtomicLsn>,
pub(crate) segment_accountant: Mutex<SegmentAccountant>,
}
impl IoBufs {
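    /// Recover the log tip from the given snapshot and position the first
    /// `IoBuf` at the next writable offset, either at the start of a fresh
    /// segment or mid-segment, then hand the stable LSN to `gc_blobs` for
    /// blob cleanup.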
pub(crate) fn start(config: Config, snapshot: Snapshot) -> Result<Self> {
let file = &config.file;
let io_buf_size = config.io_buf_size;
let snapshot_last_lsn = snapshot.last_lsn;
let snapshot_last_lid = snapshot.last_lid;
let snapshot_max_header_stable_lsn = snapshot.max_header_stable_lsn;
let mut segment_accountant: SegmentAccountant =
SegmentAccountant::start(config.clone(), snapshot)?;
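        // if the snapshot did not end exactly on a segment boundary,
        // re-read the final message to learn its on-disk length so that the
        // next write begins just past it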
let (next_lsn, next_lid) = if snapshot_last_lsn
% config.io_buf_size as Lsn
== 0
{
(snapshot_last_lsn, snapshot_last_lid)
} else {
let width = match file.read_message(
snapshot_last_lid,
snapshot_last_lsn,
&config,
) {
Ok(LogRead::Failed(_, len))
| Ok(LogRead::Inline(_, _, len)) => len + MSG_HEADER_LEN as u32,
Ok(LogRead::Blob(_header, _buf, _blob_ptr)) => {
(BLOB_INLINE_LEN + MSG_HEADER_LEN) as u32
}
other => {
debug!(
"got non-flush tip while recovering at {}: {:?}",
snapshot_last_lid, other
);
0
}
};
(
snapshot_last_lsn + Lsn::from(width),
snapshot_last_lid + LogId::from(width),
)
};
let mut iobuf = IoBuf::new(io_buf_size);
trace!(
"starting IoBufs with next_lsn: {} \
next_lid: {}",
next_lsn,
next_lid
);
let stable = next_lsn - 1;
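        // a recovered lsn on a segment boundary means we need to claim a
        // fresh segment from the accountant; otherwise we resume writing in
        // the middle of the recovered segment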
if next_lsn % config.io_buf_size as Lsn == 0 {
if next_lsn == 0 {
assert_eq!(next_lid, 0);
}
let lid = segment_accountant.next(next_lsn)?;
if next_lsn == 0 {
assert_eq!(0, lid);
}
iobuf.lid = lid;
iobuf.capacity = io_buf_size;
iobuf.store_segment_header(0, next_lsn, stable);
debug!(
"starting log at clean offset {}, recovered lsn {}",
next_lid, next_lsn
);
} else {
let offset = assert_usize(next_lid % io_buf_size as LogId);
iobuf.lid = next_lid;
iobuf.capacity = io_buf_size - offset;
iobuf.lsn = next_lsn;
debug!(
"starting log at split offset {}, recovered lsn {}",
next_lid, next_lsn
);
}
gc_blobs(&config, stable)?;
Ok(Self {
config,
iobuf: RwLock::new(Arc::new(iobuf)),
intervals: Mutex::new(vec![]),
interval_updated: Condvar::new(),
stable_lsn: AtomicLsn::new(stable),
max_reserved_lsn: AtomicLsn::new(stable),
max_header_stable_lsn: Arc::new(AtomicLsn::new(
snapshot_max_header_stable_lsn,
)),
segment_accountant: Mutex::new(segment_accountant),
})
}
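    /// Run `f` with the `SegmentAccountant` lock held, recording the lock
    /// acquisition and hold times in the metrics.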
pub(super) fn with_sa<B, F>(&self, f: F) -> B
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
let start = clock();
debug_delay();
let mut sa = self.segment_accountant.lock();
let locked_at = clock();
M.accountant_lock.measure(locked_at - start);
let ret = f(&mut sa);
drop(sa);
M.accountant_hold.measure(clock() - locked_at);
ret
}
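    /// Like `with_sa`, but returns `None` instead of blocking when the
    /// `SegmentAccountant` lock is contended.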
pub(super) fn try_with_sa<B, F>(&self, f: F) -> Option<B>
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
let start = clock();
debug_delay();
let mut sa = self.segment_accountant.try_lock()?;
let locked_at = clock();
M.accountant_lock.measure(locked_at - start);
let ret = f(&mut sa);
drop(sa);
M.accountant_hold.measure(clock() - locked_at);
Some(ret)
}
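    /// Return an iterator over log messages starting at `lsn`, clamped
    /// forward past the segment header and bounded above by the current
    /// stable LSN.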
pub(crate) fn iter_from(&self, lsn: Lsn) -> LogIter {
trace!("iterating from lsn {}", lsn);
let io_buf_size = self.config.io_buf_size;
let segment_base_lsn = lsn / io_buf_size as Lsn * io_buf_size as Lsn;
let min_lsn = segment_base_lsn + SEG_HEADER_LEN as Lsn;
let corrected_lsn = std::cmp::max(lsn, min_lsn);
let segment_iter =
self.with_sa(|sa| sa.segment_snapshot_iter_from(corrected_lsn));
LogIter {
config: self.config.clone(),
max_lsn: self.stable(),
cur_lsn: corrected_lsn,
segment_base: None,
segment_iter,
}
}
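    /// The highest LSN known to be durably written to disk.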
pub(super) fn stable(&self) -> Lsn {
debug_delay();
self.stable_lsn.load(SeqCst) as Lsn
}
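    /// Serialize a message header and payload into `out_buf`. Payloads over
    /// the blob threshold are first written to a separate blob file, and
    /// only the 8-byte LSN pointer is inlined into the log.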
pub(crate) fn encapsulate(
&self,
in_buf: &[u8],
out_buf: &mut [u8],
kind: MessageKind,
pid: PageId,
lsn: Lsn,
over_blob_threshold: bool,
) -> Result<()> {
let blob_ptr;
let to_reserve = if over_blob_threshold {
io_fail!(self, "blob blob write");
write_blob(&self.config, kind, lsn, in_buf)?;
let lsn_buf = u64_to_arr(lsn as u64);
blob_ptr = lsn_buf;
&blob_ptr
} else {
in_buf
};
assert_eq!(out_buf.len(), to_reserve.len() + MSG_HEADER_LEN);
let header = MessageHeader {
kind,
pid,
lsn,
len: u32::try_from(to_reserve.len()).unwrap(),
crc32: 0,
};
let header_bytes: [u8; MSG_HEADER_LEN] = header.into();
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
out_buf.as_mut_ptr(),
MSG_HEADER_LEN,
);
std::ptr::copy_nonoverlapping(
to_reserve.as_ptr(),
out_buf.as_mut_ptr().add(MSG_HEADER_LEN),
to_reserve.len(),
);
}
Ok(())
}
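    /// Write a sealed `IoBuf` to disk. If the buffer was maxed, the unused
    /// remainder of its segment is filled with a `Pad` message before
    /// writing, and the written range is then recorded as a stable
    /// interval.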
pub(crate) fn write_to_log(&self, iobuf: &IoBuf) -> Result<()> {
let _measure = Measure::new(&M.write_to_log);
let header = iobuf.get_header();
let lid = iobuf.lid;
let base_lsn = iobuf.lsn;
let capacity = iobuf.capacity;
let io_buf_size = self.config.io_buf_size;
assert_eq!(
(lid % io_buf_size as LogId) as Lsn,
base_lsn % io_buf_size as Lsn
);
assert_ne!(
lid,
LogId::max_value(),
"created reservation for uninitialized slot",
);
assert!(is_sealed(header));
let bytes_to_write = offset(header);
trace!(
"write_to_log lid {} lsn {} len {}",
lid,
base_lsn,
bytes_to_write
);
let maxed = iobuf.linearized(|| iobuf.get_maxed());
let unused_space = capacity - bytes_to_write;
let should_pad = maxed && unused_space >= MSG_HEADER_LEN;
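        // when the buffer is maxed and at least a message header still
        // fits, fill the rest of the segment with a single Pad message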
if should_pad {
let data = unsafe { (*iobuf.buf.get()).as_mut_slice() };
let pad_len = capacity - bytes_to_write - MSG_HEADER_LEN;
let padding_bytes = vec![MessageKind::Corrupted.into(); pad_len];
let header = MessageHeader {
kind: MessageKind::Pad,
pid: PageId::max_value(),
lsn: base_lsn + bytes_to_write as Lsn,
len: u32::try_from(pad_len).unwrap(),
crc32: 0,
};
let header_bytes: [u8; MSG_HEADER_LEN] = header.into();
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
data.as_mut_ptr().add(bytes_to_write),
MSG_HEADER_LEN,
);
std::ptr::copy_nonoverlapping(
padding_bytes.as_ptr(),
data.as_mut_ptr().add(bytes_to_write + MSG_HEADER_LEN),
pad_len,
);
}
let mut hasher = crc32fast::Hasher::new();
hasher.update(&padding_bytes);
hasher.update(&header_bytes);
let crc32 = hasher.finalize();
let crc32_arr = u32_to_arr(crc32 ^ 0xFFFF_FFFF);
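            // splice the (bitwise-inverted) CRC into the last four bytes of
            // the pad message header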
unsafe {
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
data.as_mut_ptr().add(
bytes_to_write + MSG_HEADER_LEN
- std::mem::size_of::<u32>(),
),
std::mem::size_of::<u32>(),
);
}
}
let total_len = if maxed { capacity } else { bytes_to_write };
let data = unsafe { (*iobuf.buf.get()).as_mut_slice() };
let f = &self.config.file;
io_fail!(self, "buffer write");
f.pwrite_all(&data[..total_len], lid)?;
if !self.config.temporary {
f.sync_all()?;
}
io_fail!(self, "buffer write post");
if total_len > 0 {
let complete_len = if maxed {
let lsn_idx = base_lsn / io_buf_size as Lsn;
let next_seg_beginning = (lsn_idx + 1) * io_buf_size as Lsn;
assert_usize(next_seg_beginning - base_lsn)
} else {
total_len
};
debug!(
"wrote lsns {}-{} to disk at offsets {}-{}, maxed {} complete_len {}",
base_lsn,
base_lsn + total_len as Lsn - 1,
lid,
lid + total_len as LogId - 1,
maxed,
complete_len
);
self.mark_interval(base_lsn, complete_len);
}
M.written_bytes.measure(total_len as f64);
let guard = pin();
let max_header_stable_lsn = self.max_header_stable_lsn.clone();
let stored_max_stable_lsn = iobuf.stored_max_stable_lsn;
guard.defer(move || {
trace!("bumping atomic header lsn to {}", stored_max_stable_lsn);
bump_atomic_lsn(&max_header_stable_lsn, stored_max_stable_lsn)
});
guard.flush();
drop(guard);
let current_max_header_stable_lsn =
self.max_header_stable_lsn.load(SeqCst);
        // opportunistically pass the latest stable header lsn to the
        // segment accountant, skipping the call rather than blocking if the
        // lock is contended
        self.try_with_sa(|sa| sa.stabilize(current_max_header_stable_lsn))
            .unwrap_or(Ok(()))
}
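    /// Record that the LSN range starting at `whence` with length `len` has
    /// been written to disk. Intervals contiguous with the current stable
    /// LSN are popped and merged into it, and waiters on `interval_updated`
    /// are notified whenever it advances.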
fn mark_interval(&self, whence: Lsn, len: usize) {
debug!("mark_interval({}, {})", whence, len);
assert!(
len > 0,
"mark_interval called with an empty length at {}",
whence
);
let mut intervals = self.intervals.lock();
let interval = (whence, whence + len as Lsn - 1);
intervals.push(interval);
#[cfg(any(feature = "event_log", feature = "lock_free_delays"))]
assert!(
intervals.len() < 10000,
"intervals is getting strangely long... {:?}",
*intervals
);
intervals.sort_unstable_by(|a, b| b.cmp(a));
let mut updated = false;
let len_before = intervals.len();
while let Some(&(low, high)) = intervals.last() {
assert!(low <= high);
let cur_stable = self.stable_lsn.load(SeqCst);
assert!(
low > cur_stable,
"somehow, we marked offset {} stable while \
interval {}-{} had not yet been applied!",
cur_stable,
low,
high
);
if cur_stable + 1 == low {
let old = self.stable_lsn.swap(high, SeqCst);
assert_eq!(
old, cur_stable,
"concurrent stable offset modification detected"
);
debug!("new highest interval: {} - {}", low, high);
intervals.pop();
updated = true;
} else {
break;
}
}
if len_before - intervals.len() > 100 {
debug!("large merge of {} intervals", len_before - intervals.len());
}
if updated {
self.interval_updated.notify_all();
}
}
pub(super) fn current_iobuf(&self) -> Arc<IoBuf> {
self.iobuf.read().clone()
}
}
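/// Block until all log entries at or below `lsn` are stable on disk,
/// sealing and writing the active buffer as needed. Returns the number of
/// bytes observed to become stable during this call.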
pub(crate) fn make_stable(iobufs: &Arc<IoBufs>, lsn: Lsn) -> Result<usize> {
let _measure = Measure::new(&M.make_stable);
let first_stable = iobufs.stable();
if first_stable >= lsn {
return Ok(0);
}
let mut stable = first_stable;
while stable < lsn {
if let Err(e) = iobufs.config.global_error() {
let _ = iobufs.intervals.lock();
iobufs.interval_updated.notify_all();
return Err(e);
}
let iobuf = iobufs.current_iobuf();
let header = iobuf.get_header();
        if offset(header) != 0 && !is_sealed(header) && iobuf.lsn <= lsn {
            // the current buffer holds unsealed data at or below the
            // requested lsn, so seal it and push it toward disk, then loop
            // again in case the freshly installed buffer also needs flushing
            maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
            stable = iobufs.stable();
            continue;
        }
let mut waiter = iobufs.intervals.lock();
stable = iobufs.stable();
if stable < lsn {
trace!("waiting on cond var for make_stable({})", lsn);
if cfg!(feature = "event_log") {
let timeout = iobufs
.interval_updated
.wait_for(&mut waiter, std::time::Duration::from_secs(30));
if timeout.timed_out() {
fn tn() -> String {
std::thread::current()
.name()
.unwrap_or("unknown")
.to_owned()
}
panic!(
"{} failed to make_stable after 30 seconds. \
waiting to stabilize lsn {}, current stable {} \
intervals: {:?}",
tn(),
lsn,
iobufs.stable(),
waiter
);
}
} else {
iobufs.interval_updated.wait(&mut waiter);
}
} else {
trace!("make_stable({}) returning", lsn);
break;
}
}
Ok(assert_usize(stable - first_stable))
}
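/// Stabilize everything that has been reserved in the log so far.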
pub(super) fn flush(iobufs: &Arc<IoBufs>) -> Result<usize> {
let max_reserved_lsn = iobufs.max_reserved_lsn.load(SeqCst) as Lsn;
make_stable(iobufs, max_reserved_lsn)
}
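/// Attempt to seal the current `IoBuf` and install a fresh one in its place,
/// rolling over to a new segment if the sealed buffer is maxed. If no
/// writers still hold reservations against the sealed buffer, it is handed
/// to the threadpool to be written out asynchronously.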
pub(crate) fn maybe_seal_and_write_iobuf(
iobufs: &Arc<IoBufs>,
iobuf: &Arc<IoBuf>,
header: Header,
from_reserve: bool,
) -> Result<()> {
if is_sealed(header) {
return Ok(());
}
let lid = iobuf.lid;
let lsn = iobuf.lsn;
let capacity = iobuf.capacity;
let io_buf_size = iobufs.config.io_buf_size;
if offset(header) > capacity {
return Ok(());
}
let sealed = mk_sealed(header);
let res_len = offset(sealed);
let maxed = from_reserve || capacity - res_len < MSG_HEADER_LEN;
let worked = iobuf.linearized(|| {
if iobuf.cas_header(header, sealed).is_err() {
return false;
}
trace!("sealed iobuf with lsn {}", lsn);
if maxed {
trace!("setting maxed to true for iobuf with lsn {}", lsn);
iobuf.set_maxed(true);
}
true
});
if !worked {
return Ok(());
}
assert!(
capacity + SEG_HEADER_LEN >= res_len,
"res_len of {} higher than buffer capacity {}",
res_len,
capacity
);
assert_ne!(
lid,
LogId::max_value(),
"sealing something that should never have \
been claimed (iobuf lsn {})\n{:?}",
lsn,
iobufs
);
let mut next_lsn = lsn;
let measure_assign_offset = Measure::new(&M.assign_offset);
let next_offset = if maxed {
let lsn_idx = lsn / io_buf_size as Lsn;
next_lsn = (lsn_idx + 1) * io_buf_size as Lsn;
debug!(
"rolling to new segment after clearing {}-{}",
lid,
lid + res_len as LogId,
);
match iobufs.with_sa(|sa| sa.next(next_lsn)) {
Ok(ret) => ret,
Err(e) => {
iobufs.config.set_global_error(e.clone());
let _ = iobufs.intervals.lock();
iobufs.interval_updated.notify_all();
return Err(e);
}
}
} else {
debug!(
"advancing offset within the current segment from {} to {}",
lid,
lid + res_len as LogId
);
next_lsn += res_len as Lsn;
lid + res_len as LogId
};
let mut next_iobuf = IoBuf::new(io_buf_size);
next_iobuf.lid = next_offset;
if maxed {
next_iobuf.capacity = io_buf_size;
next_iobuf.store_segment_header(sealed, next_lsn, iobufs.stable());
} else {
let new_cap = capacity - res_len;
assert_ne!(new_cap, 0);
next_iobuf.capacity = new_cap;
next_iobuf.lsn = next_lsn;
let last_salt = salt(sealed);
let new_salt = bump_salt(last_salt);
next_iobuf.set_header(new_salt);
}
debug_delay();
let intervals = iobufs.intervals.lock();
let mut mu = iobufs.iobuf.write();
*mu = Arc::new(next_iobuf);
drop(mu);
iobufs.interval_updated.notify_all();
drop(intervals);
drop(measure_assign_offset);
if n_writers(sealed) == 0 {
iobufs.config.global_error()?;
trace!(
"asynchronously writing iobuf with lsn {} to log from maybe_seal",
lsn
);
let iobufs = iobufs.clone();
let iobuf = iobuf.clone();
let _result = threadpool::spawn(move || {
if let Err(e) = iobufs.write_to_log(&iobuf) {
error!(
"hit error while writing iobuf with lsn {}: {:?}",
lsn, e
);
let _ = iobufs.intervals.lock();
iobufs.interval_updated.notify_all();
iobufs.config.set_global_error(e);
}
});
#[cfg(any(test, feature = "check_snapshot_integrity"))]
_result.unwrap();
Ok(())
} else {
Ok(())
}
}
impl Debug for IoBufs {
fn fmt(
&self,
formatter: &mut fmt::Formatter<'_>,
) -> std::result::Result<(), fmt::Error> {
formatter.write_fmt(format_args!("IoBufs {{ buf: {:?} }}", self.iobuf))
}
}
impl Debug for IoBuf {
fn fmt(
&self,
formatter: &mut fmt::Formatter<'_>,
) -> std::result::Result<(), fmt::Error> {
let header = self.get_header();
formatter.write_fmt(format_args!(
"\n\tIoBuf {{ lid: {}, n_writers: {}, offset: \
{}, sealed: {} }}",
self.lid,
n_writers(header),
offset(header),
is_sealed(header)
))
}
}
impl IoBuf {
pub(crate) fn new(buf_size: usize) -> Self {
Self {
buf: UnsafeCell::new(vec![0; buf_size]),
header: CachePadded::new(AtomicU64::new(0)),
lid: LogId::max_value(),
lsn: 0,
capacity: 0,
maxed: AtomicBool::new(false),
linearizer: Mutex::new(()),
stored_max_stable_lsn: -1,
}
}
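    /// Run `f` while holding this buffer's linearizer mutex, serializing it
    /// against other linearized operations on the same `IoBuf`.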
pub(crate) fn linearized<F, B>(&self, f: F) -> B
where
F: FnOnce() -> B,
{
let _l = self.linearizer.lock();
f()
}
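    /// Write a `SegmentHeader` into the first bytes of the buffer and bump
    /// the header offset past it under a fresh salt.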
pub(crate) fn store_segment_header(
&mut self,
last: Header,
lsn: Lsn,
max_stable_lsn: Lsn,
) {
debug!("storing lsn {} in beginning of buffer", lsn);
assert!(self.capacity >= SEG_HEADER_LEN);
self.stored_max_stable_lsn = max_stable_lsn;
self.lsn = lsn;
let header = SegmentHeader {
lsn,
max_stable_lsn,
ok: true,
};
let header_bytes: [u8; SEG_HEADER_LEN] = header.into();
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
(*self.buf.get()).as_mut_ptr(),
SEG_HEADER_LEN,
);
}
let last_salt = salt(last);
let new_salt = bump_salt(last_salt);
let bumped = bump_offset(new_salt, SEG_HEADER_LEN);
self.set_header(bumped);
}
pub(crate) fn set_maxed(&self, maxed: bool) {
debug_delay();
self.maxed.store(maxed, SeqCst);
}
pub(crate) fn get_maxed(&self) -> bool {
debug_delay();
self.maxed.load(SeqCst)
}
pub(crate) fn get_header(&self) -> Header {
debug_delay();
self.header.load(SeqCst)
}
pub(crate) fn set_header(&self, new: Header) {
debug_delay();
self.header.store(new, SeqCst);
}
pub(crate) fn cas_header(
&self,
old: Header,
new: Header,
) -> std::result::Result<Header, Header> {
        debug_delay();
        // `compare_exchange` returns the previous value on failure, which
        // callers use to observe the concurrently installed header
        self.header
            .compare_exchange(old, new, SeqCst, SeqCst)
            .map(|_| new)
    }
}
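// Header bit layout, from most to least significant:
//   bits 32-63: salt (generation counter bumped when a new header is made)
//   bit 31:     sealed flag
//   bits 24-30: number of active writers
//   bits 0-23:  reservation offset into the buffer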
pub(crate) const fn is_sealed(v: Header) -> bool {
v & 1 << 31 == 1 << 31
}
pub(crate) const fn mk_sealed(v: Header) -> Header {
v | 1 << 31
}
pub(crate) const fn n_writers(v: Header) -> Header {
v << 33 >> 57
}
#[cfg_attr(not(feature = "no_inline"), inline)]
pub(crate) fn incr_writers(v: Header) -> Header {
assert_ne!(n_writers(v), MAX_WRITERS);
v + (1 << 24)
}
#[cfg_attr(not(feature = "no_inline"), inline)]
pub(crate) fn decr_writers(v: Header) -> Header {
assert_ne!(n_writers(v), 0);
v - (1 << 24)
}
pub(crate) const fn offset(v: Header) -> usize {
let ret = v << 40 >> 40;
ret as usize
}
#[cfg_attr(not(feature = "no_inline"), inline)]
pub(crate) fn bump_offset(v: Header, by: usize) -> Header {
assert_eq!(by >> 24, 0);
v + (by as Header)
}
pub(crate) const fn bump_salt(v: Header) -> Header {
(v + (1 << 32)) & 0xFFFF_FFFF_0000_0000
}
pub(crate) const fn salt(v: Header) -> Header {
v >> 32 << 32
}