use std::{
alloc::{alloc, dealloc, Layout},
cell::UnsafeCell,
sync::atomic::AtomicPtr,
};
use crate::{pagecache::*, *};
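// Failpoint helper: when the named failpoint is active under the
// `failpoints` feature, record a global error, wake any threads
// parked on `interval_updated`, and return early with
// `Error::FailPoint`.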
macro_rules! io_fail {
($self:expr, $e:expr) => {
#[cfg(feature = "failpoints")]
{
debug_delay();
if crate::fail::is_active($e) {
$self.config.set_global_error(Error::FailPoint);
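                // take and immediately release the intervals mutex so
                // that the notification below is linearized with any
                // waiter that checks the global error under the lock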
let _mu = $self.intervals.lock();
drop(_mu);
let _notified = $self.interval_updated.notify_all();
return Err(Error::FailPoint);
}
};
};
}
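// A raw heap allocation with 8192-byte alignment, sized to hold one
// segment's write buffer. The explicit alignment presumably keeps
// writes friendly to direct IO and page boundaries.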
struct AlignedBuf(*mut u8, usize);
#[allow(unsafe_code)]
unsafe impl Send for AlignedBuf {}
#[allow(unsafe_code)]
unsafe impl Sync for AlignedBuf {}
impl AlignedBuf {
fn new(len: usize) -> AlignedBuf {
let layout = Layout::from_size_align(len, 8192).unwrap();
let ptr = unsafe { alloc(layout) };
assert!(!ptr.is_null(), "failed to allocate critical IO buffer");
AlignedBuf(ptr, len)
}
}
impl Drop for AlignedBuf {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.1, 8192).unwrap();
unsafe {
dealloc(self.0, layout);
}
}
}
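/// An in-memory write buffer for (part of) the currently active log
/// segment. The atomic `header` packs the seal and max bits, the
/// writer count, the reserved offset, and a salt (see the `header`
/// module helpers used below).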
pub(crate) struct IoBuf {
buf: Arc<UnsafeCell<AlignedBuf>>,
header: CachePadded<AtomicU64>,
base: usize,
pub offset: LogOffset,
pub lsn: Lsn,
pub capacity: usize,
stored_max_stable_lsn: Lsn,
}
#[allow(unsafe_code)]
unsafe impl Sync for IoBuf {}
#[allow(unsafe_code)]
unsafe impl Send for IoBuf {}
impl IoBuf {
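    /// Returns a mutable slice into the shared buffer, offset by
    /// `base`. The `'static` lifetime is a promise the caller must
    /// uphold: the slice must not outlive the `Arc`'d allocation that
    /// backs it.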
pub(crate) fn get_mut_range(
&self,
at: usize,
len: usize,
) -> &'static mut [u8] {
let buf_ptr = self.buf.get();
unsafe {
            // the slice begins at `base + at`, so bounds checking must
            // account for `base` within the shared allocation
            assert!((*buf_ptr).1 >= self.base + at + len);
std::slice::from_raw_parts_mut(
(*buf_ptr).0.add(self.base + at),
len,
)
}
}
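    /// Writes the `SegmentHeader` into the first `SEG_HEADER_LEN`
    /// bytes of the buffer, then bumps the header's salt and offset so
    /// that subsequent reservations begin after it.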
fn store_segment_header(
&mut self,
last: Header,
lsn: Lsn,
max_stable_lsn: Lsn,
) {
debug!("storing lsn {} in beginning of buffer", lsn);
assert!(self.capacity >= SEG_HEADER_LEN);
self.stored_max_stable_lsn = max_stable_lsn;
self.lsn = lsn;
let header = SegmentHeader { lsn, max_stable_lsn, ok: true };
let header_bytes: [u8; SEG_HEADER_LEN] = header.into();
#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
(*self.buf.get()).0,
SEG_HEADER_LEN,
);
}
let last_salt = header::salt(last);
let new_salt = header::bump_salt(last_salt);
let bumped = header::bump_offset(new_salt, SEG_HEADER_LEN);
self.set_header(bumped);
}
pub(crate) fn get_header(&self) -> Header {
debug_delay();
self.header.load(Acquire)
}
pub(crate) fn set_header(&self, new: Header) {
debug_delay();
self.header.store(new, Release);
}
pub(crate) fn cas_header(
&self,
old: Header,
new: Header,
) -> std::result::Result<Header, Header> {
debug_delay();
        // `compare_and_swap` is deprecated; `compare_exchange`
        // preserves the same success/failure semantics here
        self.header
            .compare_exchange(old, new, SeqCst, SeqCst)
            .map(|_| new)
}
}
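/// Tracks which lsn ranges have been fsynced so that the contiguous
/// stable prefix of the log can be advanced, while holding stability
/// back across in-flight atomic batches.
///
/// A minimal sketch of the merge behavior (illustrative only; the
/// type is crate-private, so this is not a runnable doctest):
///
/// ```ignore
/// let mut intervals = StabilityIntervals::new(-1);
/// // a gap at 0..=4 keeps the stable prefix where it was
/// assert_eq!(intervals.mark_fsync((5, 9)), Some(-1));
/// // filling the gap lets the prefix jump to 9
/// assert_eq!(intervals.mark_fsync((0, 4)), Some(9));
/// ```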
#[derive(Debug)]
pub(crate) struct StabilityIntervals {
fsynced_ranges: Vec<(Lsn, Lsn)>,
batches: BTreeMap<Lsn, Lsn>,
stable_lsn: Lsn,
}
impl StabilityIntervals {
fn new(lsn: Lsn) -> StabilityIntervals {
StabilityIntervals {
stable_lsn: lsn,
fsynced_ranges: vec![],
batches: BTreeMap::default(),
}
}
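    /// Registers a batch that must become stable atomically: the
    /// advertised stable lsn will not enter the batch's interval until
    /// the entire batch has been fsynced.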
pub(crate) fn mark_batch(&mut self, interval: (Lsn, Lsn)) {
assert!(interval.0 > self.stable_lsn);
self.batches.insert(interval.0, interval.1);
}
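    /// Records an fsynced interval, merging it into the most recently
    /// pushed range when adjacent, and returns the new advertisable
    /// stable lsn if the contiguous prefix (and any pending batches)
    /// allow it to advance.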
fn mark_fsync(&mut self, interval: (Lsn, Lsn)) -> Option<Lsn> {
trace!(
"pushing interval {:?} into fsynced_ranges {:?}",
interval,
self.fsynced_ranges
);
if let Some((low, high)) = self.fsynced_ranges.last_mut() {
if *low == interval.1 + 1 {
*low = interval.0
} else if *high + 1 == interval.0 {
*high = interval.1
} else {
self.fsynced_ranges.push(interval);
}
} else {
self.fsynced_ranges.push(interval);
}
#[cfg(any(test, feature = "event_log", feature = "lock_free_delays"))]
assert!(
self.fsynced_ranges.len() < 10000,
"intervals is getting strangely long... {:?}",
self
);
self.fsynced_ranges
.sort_unstable_by_key(|&range| std::cmp::Reverse(range));
while let Some(&(low, high)) = self.fsynced_ranges.last() {
assert!(low <= high);
let cur_stable = self.stable_lsn;
assert!(
low > cur_stable,
"somehow, we marked offset {} stable while \
interval {}-{} had not yet been applied!",
cur_stable,
low,
high
);
if cur_stable + 1 == low {
debug!("new highest interval: {} - {}", low, high);
self.fsynced_ranges.pop().unwrap();
self.stable_lsn = high;
} else {
break;
}
}
let mut batch_stable_lsn = None;
        while let Some((&low, &high)) = self.batches.iter().next() {
assert!(
low < high,
"expected batch low mark {} to be below high mark {}",
low,
high
);
if high <= self.stable_lsn {
if let Some(bsl) = batch_stable_lsn {
assert!(bsl < high);
}
batch_stable_lsn = Some(high);
self.batches.remove(&low).unwrap();
} else {
if low <= self.stable_lsn {
batch_stable_lsn = Some(low - 1);
}
break;
}
}
if self.batches.is_empty() {
Some(self.stable_lsn)
} else {
batch_stable_lsn
}
}
}
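/// The top-level coordinator for log writes: owns the active `IoBuf`,
/// the stability bookkeeping, and the `SegmentAccountant`, plus the
/// io_uring submission state when that feature is enabled.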
pub(crate) struct IoBufs {
pub config: RunningConfig,
pub iobuf: AtomicPtr<IoBuf>,
pub intervals: Mutex<StabilityIntervals>,
pub interval_updated: Condvar,
pub stable_lsn: AtomicLsn,
pub max_reserved_lsn: AtomicLsn,
pub max_header_stable_lsn: Arc<AtomicLsn>,
pub segment_accountant: Mutex<SegmentAccountant>,
pub segment_cleaner: SegmentCleaner,
deferred_segment_ops: stack::Stack<SegmentOp>,
#[cfg(feature = "io_uring")]
pub submission_mutex: Mutex<()>,
#[cfg(feature = "io_uring")]
pub io_uring: rio::Rio,
}
impl Drop for IoBufs {
fn drop(&mut self) {
let ptr = self.iobuf.swap(std::ptr::null_mut(), SeqCst);
assert!(!ptr.is_null());
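        // rebuild the Arc that was leaked via `Arc::into_raw` when the
        // buffer was installed, so the final IoBuf is released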
unsafe {
Arc::from_raw(ptr);
}
}
}
impl IoBufs {
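    /// Recovers the log's tail coordinates from the snapshot and
    /// constructs the initial `IoBuf`, stamping a fresh segment
    /// header unless an active segment is being resumed.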
pub fn start(config: RunningConfig, snapshot: &Snapshot) -> Result<IoBufs> {
let segment_cleaner = SegmentCleaner::default();
let mut segment_accountant: SegmentAccountant =
SegmentAccountant::start(
config.clone(),
snapshot,
segment_cleaner.clone(),
)?;
let segment_size = config.segment_size;
let (recovered_lid, recovered_lsn) =
snapshot.recovered_coords(config.segment_size);
let (next_lid, next_lsn) = match (recovered_lid, recovered_lsn) {
(Some(next_lid), Some(next_lsn)) => {
debug!(
"starting log at recovered active \
offset {}, recovered lsn {}",
next_lid, next_lsn
);
(next_lid, next_lsn)
}
(None, None) => {
debug!("starting log for a totally fresh system");
let next_lsn = 0;
let next_lid = segment_accountant.next(next_lsn)?;
(next_lid, next_lsn)
}
(None, Some(next_lsn)) => {
let next_lid = segment_accountant.next(next_lsn)?;
debug!(
"starting log at clean offset {}, recovered lsn {}",
next_lid, next_lsn
);
(next_lid, next_lsn)
}
(Some(_), None) => unreachable!(),
};
assert!(next_lsn >= Lsn::try_from(next_lid).unwrap());
debug!(
"starting IoBufs with next_lsn: {} \
next_lid: {}",
next_lsn, next_lid
);
let stable = next_lsn - 1;
let base = assert_usize(next_lid % segment_size as LogOffset);
let mut iobuf = IoBuf {
buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
header: CachePadded::new(AtomicU64::new(0)),
base,
offset: next_lid,
lsn: next_lsn,
capacity: segment_size - base,
stored_max_stable_lsn: -1,
};
if snapshot.active_segment.is_none() {
iobuf.store_segment_header(0, next_lsn, stable);
}
Ok(IoBufs {
config,
iobuf: AtomicPtr::new(Arc::into_raw(Arc::new(iobuf)) as *mut IoBuf),
intervals: Mutex::new(StabilityIntervals::new(stable)),
interval_updated: Condvar::new(),
stable_lsn: AtomicLsn::new(stable),
max_reserved_lsn: AtomicLsn::new(stable),
max_header_stable_lsn: Arc::new(AtomicLsn::new(next_lsn)),
segment_accountant: Mutex::new(segment_accountant),
segment_cleaner,
deferred_segment_ops: stack::Stack::default(),
#[cfg(feature = "io_uring")]
submission_mutex: Mutex::new(()),
#[cfg(feature = "io_uring")]
io_uring: rio::new()?,
})
}
pub(in crate::pagecache) fn sa_mark_link(
&self,
pid: PageId,
cache_info: CacheInfo,
guard: &Guard,
) {
let op = SegmentOp::Link { pid, cache_info };
self.deferred_segment_ops.push(op, guard);
}
pub(in crate::pagecache) fn sa_mark_replace(
&self,
pid: PageId,
lsn: Lsn,
old_cache_infos: &[CacheInfo],
new_cache_info: CacheInfo,
guard: &Guard,
) -> Result<()> {
debug_delay();
if let Some(mut sa) = self.segment_accountant.try_lock() {
let start = clock();
sa.mark_replace(pid, lsn, old_cache_infos, new_cache_info)?;
for op in self.deferred_segment_ops.take_iter(guard) {
sa.apply_op(op)?;
}
M.accountant_hold.measure(clock() - start);
} else {
let op = SegmentOp::Replace {
pid,
lsn,
old_cache_infos: old_cache_infos.to_vec(),
new_cache_info,
};
self.deferred_segment_ops.push(op, guard);
}
Ok(())
}
pub(in crate::pagecache) fn sa_stabilize(
&self,
lsn: Lsn,
guard: &Guard,
) -> Result<()> {
self.with_sa(|sa| {
for op in self.deferred_segment_ops.take_iter(guard) {
sa.apply_op(op)?;
}
sa.stabilize(lsn)?;
Ok(())
})
}
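    /// Runs `f` with the `SegmentAccountant` lock held, recording
    /// lock-acquisition and hold times in the global metrics.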
pub(in crate::pagecache) fn with_sa<B, F>(&self, f: F) -> B
where
F: FnOnce(&mut SegmentAccountant) -> B,
{
let start = clock();
debug_delay();
let mut sa = self.segment_accountant.lock();
let locked_at = clock();
M.accountant_lock.measure(locked_at - start);
let ret = f(&mut sa);
drop(sa);
M.accountant_hold.measure(clock() - locked_at);
ret
}
pub(crate) fn iter_from(&self, lsn: Lsn) -> LogIter {
trace!("iterating from lsn {}", lsn);
let segments = self.with_sa(|sa| sa.segment_snapshot_iter_from(lsn));
LogIter {
config: self.config.clone(),
max_lsn: Some(self.stable()),
cur_lsn: None,
segment_base: None,
segments,
last_stage: false,
}
}
pub(in crate::pagecache) fn stable(&self) -> Lsn {
debug_delay();
self.stable_lsn.load(Acquire)
}
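    /// Serializes a message into `out_buf`: the header first, then
    /// either the item inline or, for blob (indirect) writes, the
    /// blob's lsn pointer, with the item body itself written to a
    /// separate blob file.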
#[allow(clippy::mut_mut)]
pub(crate) fn encapsulate<T: Serialize + Debug>(
&self,
item: &T,
header: MessageHeader,
mut out_buf: &mut [u8],
blob_id: Option<Lsn>,
) -> Result<()> {
let out_buf_ref: &mut &mut [u8] = &mut out_buf;
{
            // bind the guard to a name: `let _ = ...` would drop the
            // `Measure` immediately and record a ~zero serialization time
            let _measure = Measure::new(&M.serialize);
header.serialize_into(out_buf_ref);
}
if let Some(blob_id) = blob_id {
io_fail!(self, "blob blob write");
write_blob(&self.config, header.kind, blob_id, item)?;
            let _measure = Measure::new(&M.serialize);
blob_id.serialize_into(out_buf_ref);
} else {
            let _measure = Measure::new(&M.serialize);
item.serialize_into(out_buf_ref);
};
assert_eq!(
out_buf_ref.len(),
0,
"trying to serialize header {:?} \
and item {:?} but there were \
buffer leftovers at the end",
header,
item
);
Ok(())
}
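    /// Flushes a sealed iobuf to disk: pads the tail of a maxed
    /// segment with a `Cap` message, issues the write followed by an
    /// fsync (or `sync_file_range`), then marks the written lsn
    /// interval stable and notifies the `SegmentAccountant`.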
pub(crate) fn write_to_log(&self, iobuf: &IoBuf) -> Result<()> {
let _measure = Measure::new(&M.write_to_log);
let header = iobuf.get_header();
let log_offset = iobuf.offset;
let base_lsn = iobuf.lsn;
let capacity = iobuf.capacity;
let segment_size = self.config.segment_size;
assert_eq!(
Lsn::try_from(log_offset % segment_size as LogOffset).unwrap(),
base_lsn % segment_size as Lsn
);
assert_ne!(
log_offset,
LogOffset::max_value(),
"created reservation for uninitialized slot",
);
assert!(header::is_sealed(header));
let bytes_to_write = header::offset(header);
trace!(
"write_to_log log_offset {} lsn {} len {}",
log_offset,
base_lsn,
bytes_to_write
);
let maxed = header::is_maxed(header);
let unused_space = capacity - bytes_to_write;
let should_pad = maxed && unused_space >= MAX_MSG_HEADER_LEN;
if should_pad {
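            // describe the unused tail of the segment with a single Cap
            // message whose body is `pad_len` filler bytes; its crc32 is
            // computed over the header and pad bytes and then written
            // into the message's first four bytes, which appear to hold
            // the header's crc field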
let pad_len = unused_space - MAX_MSG_HEADER_LEN;
let data = iobuf.get_mut_range(bytes_to_write, unused_space);
let segment_number = SegmentNumber(
u64::try_from(base_lsn).unwrap()
/ u64::try_from(self.config.segment_size).unwrap(),
);
let header = MessageHeader {
kind: MessageKind::Cap,
pid: PageId::max_value(),
segment_number,
len: u64::try_from(pad_len).unwrap(),
crc32: 0,
};
let header_bytes = header.serialize();
let padding_bytes = vec![
MessageKind::Corrupted.into();
unused_space - header_bytes.len()
];
#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
data.as_mut_ptr(),
header_bytes.len(),
);
std::ptr::copy_nonoverlapping(
padding_bytes.as_ptr(),
data.as_mut_ptr().add(header_bytes.len()),
padding_bytes.len(),
);
}
let crc32_arr = u32_to_arr(calculate_message_crc32(
&header_bytes,
&padding_bytes[..pad_len],
));
#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
data.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
} else if maxed {
let data = iobuf.get_mut_range(bytes_to_write, unused_space);
#[allow(unsafe_code)]
unsafe {
std::ptr::write_bytes(
data.as_mut_ptr(),
MessageKind::Corrupted.into(),
unused_space,
);
}
}
let total_len = if maxed { capacity } else { bytes_to_write };
let data = iobuf.get_mut_range(0, total_len);
let stored_max_stable_lsn = iobuf.stored_max_stable_lsn;
io_fail!(self, "buffer write");
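        // write the buffer contents at `log_offset`, then make the
        // written range durable before it is marked stable below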
#[cfg(feature = "io_uring")]
{
let mut wrote = 0;
while wrote < total_len {
let to_write = &data[wrote..];
let offset = log_offset + wrote as u64;
let link_mu = self.submission_mutex.lock();
let wrote_completion = self.io_uring.write_at_ordered(
&*self.config.file,
&to_write,
offset,
rio::Ordering::Link,
);
let sync_completion = self.io_uring.sync_file_range(
&*self.config.file,
offset,
to_write.len(),
);
sync_completion.wait()?;
drop(link_mu);
wrote += wrote_completion.wait()?;
}
}
#[cfg(not(feature = "io_uring"))]
{
let f = &self.config.file;
pwrite_all(f, data, log_offset)?;
if !self.config.temporary {
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
let ret = unsafe {
libc::sync_file_range(
f.as_raw_fd(),
i64::try_from(log_offset).unwrap(),
i64::try_from(total_len).unwrap(),
libc::SYNC_FILE_RANGE_WAIT_BEFORE
| libc::SYNC_FILE_RANGE_WRITE
| libc::SYNC_FILE_RANGE_WAIT_AFTER,
)
};
if ret < 0 {
let err = std::io::Error::last_os_error();
if let Some(libc::ENOSYS) = err.raw_os_error() {
f.sync_all()?;
} else {
return Err(err.into());
}
}
}
#[cfg(not(target_os = "linux"))]
f.sync_all()?;
}
}
io_fail!(self, "buffer write post");
if total_len > 0 {
let complete_len = if maxed {
let lsn_idx = base_lsn / segment_size as Lsn;
let next_seg_beginning = (lsn_idx + 1) * segment_size as Lsn;
assert_usize(next_seg_beginning - base_lsn)
} else {
total_len
};
debug!(
"wrote lsns {}-{} to disk at offsets {}-{}, maxed {} complete_len {}",
base_lsn,
base_lsn + total_len as Lsn - 1,
log_offset,
log_offset + total_len as LogOffset - 1,
maxed,
complete_len
);
self.mark_interval(base_lsn, complete_len);
}
M.written_bytes.measure(total_len as u64);
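        // defer bumping the header-stable lsn until the current epoch
        // ends, presumably so segments are not reclaimed while
        // concurrent operations may still hold pointers into them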
let guard = pin();
let max_header_stable_lsn = self.max_header_stable_lsn.clone();
guard.defer(move || {
trace!("bumping atomic header lsn to {}", stored_max_stable_lsn);
bump_atomic_lsn(&max_header_stable_lsn, stored_max_stable_lsn)
});
guard.flush();
let current_max_header_stable_lsn =
self.max_header_stable_lsn.load(Acquire);
self.sa_stabilize(current_max_header_stable_lsn, &guard)
}
fn mark_interval(&self, whence: Lsn, len: usize) {
debug!("mark_interval({}, {})", whence, len);
assert!(
len > 0,
"mark_interval called with an empty length at {}",
whence
);
let mut intervals = self.intervals.lock();
let interval = (whence, whence + len as Lsn - 1);
let updated = intervals.mark_fsync(interval);
        if let Some(new_stable_lsn) = updated {
            trace!("mark_interval new highest lsn {}", new_stable_lsn);
            self.stable_lsn.store(new_stable_lsn, SeqCst);
            #[cfg(feature = "event_log")]
            {
                self.config.event_log.stabilized_lsn(new_stable_lsn + 1);
            }
        }
        // release the mutex before notifying on every path, not just
        // when the stable lsn advanced, so woken waiters can acquire it
        // immediately; this matches the lock-then-notify pattern used
        // elsewhere in this module
        drop(intervals);
let _notified = self.interval_updated.notify_all();
}
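    /// Clones the current `IoBuf`'s `Arc` out of the atomic pointer.
    /// The forgotten clone keeps the refcount balanced: the pointer
    /// stored in `self.iobuf` continues to own one strong reference.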
pub(in crate::pagecache) fn current_iobuf(&self) -> Arc<IoBuf> {
let arc = unsafe { Arc::from_raw(self.iobuf.load(Acquire)) };
#[allow(clippy::mem_forget)]
std::mem::forget(arc.clone());
arc
}
}
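// Seals and writes out the current iobuf if it holds any data,
// returning the number of bytes that had been reserved in it.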
pub(crate) fn roll_iobuf(iobufs: &Arc<IoBufs>) -> Result<usize> {
let iobuf = iobufs.current_iobuf();
let header = iobuf.get_header();
if header::is_sealed(header) {
trace!("skipping roll_iobuf due to already-sealed header");
return Ok(0);
}
if header::offset(header) == 0 {
trace!("skipping roll_iobuf due to empty segment");
} else {
trace!("sealing ioubuf from roll_iobuf");
maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
}
Ok(header::offset(header))
}
pub(in crate::pagecache) fn make_stable(
iobufs: &Arc<IoBufs>,
lsn: Lsn,
) -> Result<usize> {
make_stable_inner(iobufs, lsn, false)
}
pub(in crate::pagecache) fn make_durable(
iobufs: &Arc<IoBufs>,
lsn: Lsn,
) -> Result<usize> {
make_stable_inner(iobufs, lsn, true)
}
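// Blocks until `lsn` is stable. With `partial_durability`, an lsn that
// is already covered by some fsynced interval (or sits below the
// batch-aware stable point) is accepted even when the contiguous
// stable prefix has not yet reached it.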
pub(in crate::pagecache) fn make_stable_inner(
iobufs: &Arc<IoBufs>,
lsn: Lsn,
partial_durability: bool,
) -> Result<usize> {
let _measure = Measure::new(&M.make_stable);
let first_stable = iobufs.stable();
if first_stable >= lsn {
return Ok(0);
}
let mut stable = first_stable;
while stable < lsn {
if let Err(e) = iobufs.config.global_error() {
let intervals = iobufs.intervals.lock();
drop(intervals);
let _notified = iobufs.interval_updated.notify_all();
return Err(e);
}
let iobuf = iobufs.current_iobuf();
let header = iobuf.get_header();
if header::offset(header) == 0
|| header::is_sealed(header)
|| iobuf.lsn > lsn
{
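            // nothing to write, and sealing is either unnecessary or
            // already done; fall through to wait on the condvar below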
} else {
maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
stable = iobufs.stable();
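            // NB: loop again, since the freshly installed iobuf may
            // also contain dirty data that needs to be flushed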
continue;
}
let mut waiter = iobufs.intervals.lock();
if let Err(e) = iobufs.config.global_error() {
drop(waiter);
let _notified = iobufs.interval_updated.notify_all();
return Err(e);
}
stable = iobufs.stable();
if partial_durability {
if waiter.stable_lsn > lsn {
return Ok(assert_usize(stable - first_stable));
}
for (low, high) in &waiter.fsynced_ranges {
if *low <= lsn && *high > lsn {
return Ok(assert_usize(stable - first_stable));
}
}
}
if stable < lsn {
trace!("waiting on cond var for make_stable({})", lsn);
if cfg!(feature = "event_log") {
let timeout = iobufs
.interval_updated
.wait_for(&mut waiter, std::time::Duration::from_secs(30));
if timeout.timed_out() {
fn tn() -> String {
std::thread::current()
.name()
.unwrap_or("unknown")
.to_owned()
}
panic!(
"{} failed to make_stable after 30 seconds. \
waiting to stabilize lsn {}, current stable {} \
intervals: {:?}",
tn(),
lsn,
iobufs.stable(),
waiter
);
}
} else {
iobufs.interval_updated.wait(&mut waiter);
}
} else {
debug!("make_stable({}) returning", lsn);
break;
}
}
Ok(assert_usize(stable - first_stable))
}
pub(in crate::pagecache) fn flush(iobufs: &Arc<IoBufs>) -> Result<usize> {
let _cc = concurrency_control::read();
let max_reserved_lsn = iobufs.max_reserved_lsn.load(Acquire);
make_stable(iobufs, max_reserved_lsn)
}
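// Attempts to seal the given iobuf; on success, installs its successor
// (either a fresh segment or the remainder of the current one) and, if
// no writers remain, schedules the sealed buffer's write on the
// threadpool.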
pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
iobufs: &Arc<IoBufs>,
iobuf: &Arc<IoBuf>,
header: Header,
from_reserve: bool,
) -> Result<()> {
if header::is_sealed(header) {
return Ok(());
}
let lid = iobuf.offset;
let lsn = iobuf.lsn;
let capacity = iobuf.capacity;
let segment_size = iobufs.config.segment_size;
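    // if a reservation is mid-acquisition it can observe an offset
    // beyond capacity; that reservation will fail and retry, so there
    // is nothing to do here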
if header::offset(header) > capacity {
return Ok(());
}
let res_len = header::offset(header);
let maxed = from_reserve || capacity - res_len < MAX_MSG_HEADER_LEN;
let sealed = if maxed {
trace!("setting maxed to true for iobuf with lsn {}", lsn);
header::mk_maxed(header::mk_sealed(header))
} else {
header::mk_sealed(header)
};
let worked = iobuf.cas_header(header, sealed).is_ok();
if !worked {
return Ok(());
}
trace!("sealed iobuf with lsn {}", lsn);
assert!(
capacity + SEG_HEADER_LEN >= res_len,
"res_len of {} higher than buffer capacity {}",
res_len,
capacity
);
assert_ne!(
lid,
LogOffset::max_value(),
"sealing something that should never have \
been claimed (iobuf lsn {})\n{:?}",
lsn,
iobufs
);
let mut next_lsn = lsn;
let measure_assign_offset = Measure::new(&M.assign_offset);
let next_offset = if maxed {
let lsn_idx = lsn / segment_size as Lsn;
next_lsn = (lsn_idx + 1) * segment_size as Lsn;
debug!(
"rolling to new segment after clearing {}-{}",
lid,
lid + res_len as LogOffset,
);
match iobufs.with_sa(|sa| sa.next(next_lsn)) {
Ok(ret) => ret,
Err(e) => {
iobufs.config.set_global_error(e.clone());
let intervals = iobufs.intervals.lock();
drop(intervals);
let _notified = iobufs.interval_updated.notify_all();
return Err(e);
}
}
} else {
debug!(
"advancing offset within the current segment from {} to {}",
lid,
lid + res_len as LogOffset
);
next_lsn += res_len as Lsn;
lid + res_len as LogOffset
};
let next_iobuf = if maxed {
let mut next_iobuf = IoBuf {
buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
header: CachePadded::new(AtomicU64::new(0)),
base: 0,
offset: next_offset,
lsn: next_lsn,
capacity: segment_size,
stored_max_stable_lsn: -1,
};
next_iobuf.store_segment_header(sealed, next_lsn, iobufs.stable());
next_iobuf
} else {
let new_cap = capacity - res_len;
assert_ne!(new_cap, 0);
let last_salt = header::salt(sealed);
let new_salt = header::bump_salt(last_salt);
IoBuf {
buf: iobuf.buf.clone(),
header: CachePadded::new(AtomicU64::new(new_salt)),
base: iobuf.base + res_len,
offset: next_offset,
lsn: next_lsn,
capacity: new_cap,
stored_max_stable_lsn: -1,
}
};
debug_delay();
let intervals = iobufs.intervals.lock();
let old_ptr = iobufs
.iobuf
.swap(Arc::into_raw(Arc::new(next_iobuf)) as *mut IoBuf, SeqCst);
let old_arc = unsafe { Arc::from_raw(old_ptr) };
pin().defer(move || drop(old_arc));
drop(intervals);
let _notified = iobufs.interval_updated.notify_all();
drop(measure_assign_offset);
if header::n_writers(sealed) == 0 {
iobufs.config.global_error()?;
trace!(
"asynchronously writing iobuf with lsn {} to log from maybe_seal",
lsn
);
let iobufs = iobufs.clone();
let iobuf = iobuf.clone();
let _result = threadpool::spawn(move || {
if let Err(e) = iobufs.write_to_log(&iobuf) {
error!(
"hit error while writing iobuf with lsn {}: {:?}",
lsn, e
);
iobufs.config.set_global_error(e);
let intervals = iobufs.intervals.lock();
drop(intervals);
let _notified = iobufs.interval_updated.notify_all();
}
})?;
#[cfg(feature = "event_log")]
_result.wait();
Ok(())
} else {
Ok(())
}
}
impl Debug for IoBufs {
fn fmt(
&self,
formatter: &mut fmt::Formatter<'_>,
) -> std::result::Result<(), fmt::Error> {
formatter.write_fmt(format_args!("IoBufs {{ buf: {:?} }}", self.iobuf))
}
}
impl Debug for IoBuf {
fn fmt(
&self,
formatter: &mut fmt::Formatter<'_>,
) -> std::result::Result<(), fmt::Error> {
let header = self.get_header();
formatter.write_fmt(format_args!(
"\n\tIoBuf {{ lid: {}, n_writers: {}, offset: \
{}, sealed: {} }}",
self.offset,
header::n_writers(header),
header::offset(header),
header::is_sealed(header)
))
}
}