use std::io::{Seek, Write};
#[cfg(feature = "zstd")]
use zstd::block::compress;
use super::*;
/// Size in bytes of the per-message header prepended by `encapsulate`:
/// 1 valid byte + 4 native-endian length bytes + 2 crc16 bytes.
#[doc(hidden)]
pub const HEADER_LEN: usize = 7;
/// A single in-memory log buffer: a byte arena plus the packed atomic
/// state used to coordinate concurrent reservations into it.
struct IoBuf {
    // Raw bytes staged for one contiguous write to the log file. Interior
    // mutability: writers copy into disjoint sub-ranges they claimed via
    // the `header` CAS (see `IoBufs::reserve`).
    buf: UnsafeCell<Vec<u8>>,
    // Packed state word, interpreted as a u32: bit 31 = sealed flag,
    // bits 24-30 = active writer count, bits 0-23 = bump-allocated byte
    // offset into `buf` (see is_sealed / n_writers / offset helpers).
    header: AtomicUsize,
    // File position this buffer will be flushed to; `usize::MAX` marks
    // "not yet assigned / available for recycling".
    log_offset: AtomicUsize,
}
// SAFETY(review): `UnsafeCell` suppresses auto-Sync. Sharing appears sound
// because access to `buf` is serialized by the `header` claim/seal protocol
// (writers own disjoint ranges; the buffer is only read back for flushing
// once sealed with zero writers) — confirm against all callers.
unsafe impl Sync for IoBuf {}
impl Debug for IoBuf {
    /// Render the decoded header fields (log offset, writer count, byte
    /// offset, sealed flag) on their own indented line.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        let hv = self.get_header();
        write!(
            formatter,
            "\n\tIoBuf {{ log_offset: {}, n_writers: {}, offset: {}, sealed: {} }}",
            self.get_log_offset(),
            n_writers(hv),
            offset(hv),
            is_sealed(hv)
        )
    }
}
impl IoBuf {
    /// Allocate a zero-filled buffer of `buf_size` bytes with an empty
    /// header and an unassigned (`usize::MAX`) log offset.
    fn new(buf_size: usize) -> IoBuf {
        IoBuf {
            buf: UnsafeCell::new(vec![0; buf_size]),
            header: AtomicUsize::new(0),
            log_offset: AtomicUsize::new(std::usize::MAX),
        }
    }

    /// Publish the file position this buffer will be flushed to.
    fn set_log_offset(&self, offset: LogID) {
        self.log_offset.store(offset as usize, SeqCst);
    }

    fn get_log_offset(&self) -> LogID {
        self.log_offset.load(SeqCst) as LogID
    }

    fn get_header(&self) -> u32 {
        self.header.load(SeqCst) as u32
    }

    fn set_header(&self, new: u32) {
        self.header.store(new as usize, SeqCst);
    }

    /// Atomically swap the header from `old` to `new`.
    ///
    /// Returns `Ok(new)` on success, or `Err(current)` with the value
    /// actually observed on failure. Uses `compare_exchange` rather than
    /// the deprecated `compare_and_swap`; with SeqCst/SeqCst orderings the
    /// semantics are identical, and the success/failure split is returned
    /// directly instead of being re-derived from the previous value.
    fn cas_header(&self, old: u32, new: u32) -> Result<u32, u32> {
        self.header
            .compare_exchange(old as usize, new as usize, SeqCst, SeqCst)
            .map(|_| new)
            .map_err(|cur| cur as u32)
    }

    /// Atomically swap the log offset from `old` to `new`, with the same
    /// `Ok(new)` / `Err(current)` contract as `cas_header`.
    fn cas_log_offset(&self, old: LogID, new: LogID) -> Result<LogID, LogID> {
        self.log_offset
            .compare_exchange(old as usize, new as usize, SeqCst, SeqCst)
            .map(|_| new)
            .map_err(|cur| cur as LogID)
    }
}
/// The rotating set of in-memory log buffers plus the shared state used to
/// hand out reservations and flush sealed buffers to disk in order.
pub struct IoBufs {
    config: Config,
    // Fixed ring of buffers, indexed by `current_buf % get_io_bufs()`.
    bufs: Vec<IoBuf>,
    // Monotonically increasing count of buffers that have been sealed.
    current_buf: AtomicUsize,
    // Monotonically increasing count of buffers written to the file.
    written_bufs: AtomicUsize,
    // (start, end) byte ranges that hit the file but are not yet part of
    // the contiguous stable prefix; merged by `mark_interval`.
    intervals: Mutex<Vec<(LogID, LogID)>>,
    // Highest log offset up to which the file has been written
    // contiguously.
    stable: AtomicUsize,
    // Dedicated write handle; the Mutex also serializes seek + write_all.
    file_for_writing: Mutex<std::fs::File>,
}
impl Debug for IoBufs {
    /// Render the ring counters and each buffer's decoded state.
    ///
    /// Bug fix: the labels previously read `sealed:` and `written:` while
    /// printing `current_buf` and `written_bufs`, which made the debug
    /// output misleading; labels now name the fields actually shown.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        let current_buf = self.current_buf.load(SeqCst);
        let written_bufs = self.written_bufs.load(SeqCst);
        write!(
            formatter,
            "IoBufs {{ current_buf: {}, written_bufs: {}, bufs: {:?} }}",
            current_buf,
            written_bufs,
            self.bufs
        )
    }
}
impl IoBufs {
    /// Build a ring of io buffers that will append to the end of the
    /// existing log file described by `config`.
    pub fn new(config: Config) -> IoBufs {
        // Resume writing at the current end of the file on disk.
        let disk_offset = {
            let cached_f = config.cached_file();
            let file = cached_f.borrow();
            file.metadata().unwrap().len()
        };
        let bufs = rep_no_copy![IoBuf::new(config.get_io_buf_size()); config.get_io_bufs()];
        let current_buf = 0;
        // Only the first buffer gets a log offset up front; each later
        // buffer is assigned its offset when its predecessor is sealed
        // (see maybe_seal_and_write_iobuf).
        bufs[current_buf].set_log_offset(disk_offset);
        let path = config.get_path().unwrap_or_else(|| config.get_tmp_path());
        let mut options = std::fs::OpenOptions::new();
        options.create(true);
        options.write(true);
        let file = options.open(path).unwrap();
        IoBufs {
            bufs: bufs,
            current_buf: AtomicUsize::new(current_buf),
            written_bufs: AtomicUsize::new(0),
            intervals: Mutex::new(vec![]),
            // Everything already on disk is stable by definition.
            stable: AtomicUsize::new(disk_offset as usize),
            config: config,
            file_for_writing: Mutex::new(file),
        }
    }

    /// The configuration this `IoBufs` was built with.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Index of the currently-active buffer in the ring.
    fn idx(&self) -> usize {
        let current_buf = self.current_buf.load(SeqCst);
        current_buf % self.config.get_io_bufs()
    }

    /// Highest log offset known to have reached the file contiguously
    /// (advanced by `mark_interval`).
    pub(super) fn stable(&self) -> LogID {
        self.stable.load(SeqCst) as LogID
    }

    /// Frame `raw_buf` and claim a slot for it in the current io buffer,
    /// spinning until a claim succeeds. Returns a `Reservation` holding
    /// the destination slice and its final position in the log file.
    pub(super) fn reserve(&self, raw_buf: Vec<u8>) -> Reservation {
        let start = clock();
        // The framed length must fit in the header's 32-bit size field.
        assert_eq!((raw_buf.len() + HEADER_LEN) >> 32, 0);
        let buf = encapsulate(raw_buf, self.config.get_use_compression());
        // A message larger than one io buffer can never be placed.
        assert!(buf.len() <= self.config.get_io_buf_size());
        let mut printed = false;
        // Emit at most one trace line per reserve() call, no matter how
        // many times we spin.
        macro_rules! trace_once {
            ($($msg:expr),*) => {
                if !printed {
                    #[cfg(feature = "log")]
                    trace!($($msg),*);
                    printed = true;
                }};
        }
        let mut spins = 0;
        loop {
            let written_bufs = self.written_bufs.load(SeqCst);
            let current_buf = self.current_buf.load(SeqCst);
            let idx = current_buf % self.config.get_io_bufs();
            spins += 1;
            if spins > 1_000_000 {
                #[cfg(feature = "log")]
                debug!("{:?} stalling in reserve, idx {}", tn(), idx);
                spins = 0;
            }
            // Torn read: current_buf advanced between our two loads above,
            // so the pair is inconsistent — re-read.
            if written_bufs > current_buf {
                trace_once!("({:?}) written ahead of sealed, spinning", tn());
                M.log_looped();
                continue;
            }
            // Every buffer in the ring is sealed but not yet flushed; wait
            // for a flusher to catch up before reusing a slot.
            if current_buf - written_bufs >= self.config.get_io_bufs() {
                trace_once!("({:?}) old io buffer not written yet, spinning", tn());
                M.log_looped();
                continue;
            }
            let iobuf = &self.bufs[idx];
            let header = iobuf.get_header();
            // Sealed buffers accept no new writers; wait for rotation.
            if is_sealed(header) {
                trace_once!("({:?}) io buffer already sealed, spinning", tn());
                M.log_looped();
                continue;
            }
            let buf_offset = offset(header);
            // Not enough room left: seal this buffer so it gets flushed
            // and the ring advances, then retry on the successor.
            if buf_offset as LogID + buf.len() as LogID > self.config.get_io_buf_size() as LogID {
                self.maybe_seal_and_write_iobuf(idx, header);
                trace_once!("({:?}) io buffer too full, spinning", tn());
                M.log_looped();
                continue;
            }
            // Claim [buf_offset, buf_offset + buf.len()) and register as an
            // active writer in a single CAS on the packed header.
            let bumped_offset = bump_offset(header, buf.len() as u32);
            let claimed = incr_writers(bumped_offset);
            assert!(!is_sealed(claimed));
            if iobuf.cas_header(header, claimed).is_err() {
                trace_once!("({:?}) CAS failed while claiming buffer slot, spinning", tn());
                M.log_looped();
                continue;
            }
            assert_ne!(n_writers(claimed), 0);
            let log_offset = iobuf.get_log_offset();
            // A buffer we successfully claimed into must already have been
            // assigned its position in the file.
            assert_ne!(
                log_offset as usize,
                std::usize::MAX,
                "({:?}) fucked up on idx {}\n{:?}",
                tn(),
                idx,
                self
            );
            // The CAS above gave this thread exclusive ownership of the
            // claimed sub-range, so handing out a &mut slice is sound.
            let out_buf = unsafe { (*iobuf.buf.get()).as_mut_slice() };
            let res_start = buf_offset as usize;
            let res_end = res_start + buf.len();
            let destination = &mut (out_buf)[res_start..res_end];
            let reservation_offset = log_offset + buf_offset as LogID;
            M.reserve.measure(clock() - start);
            return Reservation {
                idx: idx,
                iobufs: self,
                data: buf,
                destination: destination,
                flushed: false,
                reservation_offset: reservation_offset,
            };
        }
    }

    /// Drop one writer claim on buffer `idx`. The last writer out of a
    /// sealed buffer is the one responsible for flushing it to disk.
    pub(super) fn exit_reservation(&self, idx: usize) {
        let iobuf = &self.bufs[idx];
        let mut header = iobuf.get_header();
        let mut spins = 0;
        loop {
            spins += 1;
            if spins > 10 {
                #[cfg(feature = "log")]
                debug!("{:?} have spun >10x in decr", tn());
                spins = 0;
            }
            let new_hv = decr_writers(header);
            match iobuf.cas_header(header, new_hv) {
                Ok(new) => {
                    header = new;
                    break;
                }
                Err(new) => {
                    // Lost a race; retry against the freshly observed value.
                    header = new;
                }
            }
        }
        // `header` now holds the value we installed; if we were the last
        // writer of a sealed buffer, flush it.
        if n_writers(header) == 0 && is_sealed(header) {
            self.write_to_log(idx);
        }
    }

    /// Seal and write out the current buffer, if it holds any data and is
    /// not already being sealed by someone else.
    pub(super) fn flush(&self) {
        let idx = self.idx();
        let header = self.bufs[idx].get_header();
        if offset(header) == 0 || is_sealed(header) {
            // Nothing buffered, or a seal/flush is already in progress.
            return;
        }
        self.maybe_seal_and_write_iobuf(idx, header);
    }

    /// Try to seal buffer `idx` (assuming its header is still `header`),
    /// hand the successor buffer its log offset, advance the ring, and —
    /// if no writers remain — write the sealed buffer to disk.
    fn maybe_seal_and_write_iobuf(&self, idx: usize, header: u32) {
        let iobuf = &self.bufs[idx];
        if is_sealed(header) {
            // Someone else sealed it; they own the rotation and flush.
            return;
        }
        let log_offset = iobuf.get_log_offset();
        let sealed = mk_sealed(header);
        if iobuf.cas_header(header, sealed).is_err() {
            // Lost the seal race; the winner advances the ring.
            return;
        }
        #[cfg(feature = "log")]
        trace!("({:?}) {} sealed", tn(), idx);
        let res_len = offset(sealed) as usize;
        let max = std::usize::MAX as LogID;
        assert_ne!(
            log_offset,
            max,
            "({:?}) sealing something that should never have been claimed (idx {})\n{:?}",
            tn(),
            idx,
            self
        );
        // The successor buffer continues exactly where this one ends.
        let next_offset = log_offset + res_len as LogID;
        let next_idx = (idx + 1) % self.config.get_io_bufs();
        let next_iobuf = &self.bufs[next_idx];
        let mut spins = 0;
        // Spin until the successor has been flushed and reset to MAX by
        // write_to_log, then install its new file position.
        while next_iobuf.cas_log_offset(max, next_offset).is_err() {
            spins += 1;
            if spins > 1_000_000 {
                #[cfg(feature = "log")]
                debug!("have spun >1,000,000x in seal of buf {}", idx);
                spins = 0;
            }
        }
        #[cfg(feature = "log")]
        trace!("({:?}) {} log set", tn(), next_idx);
        // Fresh buffer: no offset, no writers, not sealed.
        next_iobuf.set_header(0);
        #[cfg(feature = "log")]
        trace!("({:?}) {} zeroed header", tn(), next_idx);
        // Publish the successor to incoming reserve() calls.
        let _current_buf = self.current_buf.fetch_add(1, SeqCst) + 1;
        #[cfg(feature = "log")]
        trace!(
            "({:?}) {} current_buf",
            tn(),
            _current_buf % self.config.get_io_bufs()
        );
        // If no writers were active at seal time the flush falls to us;
        // otherwise the last writer flushes in exit_reservation.
        if n_writers(sealed) == 0 {
            self.write_to_log(idx);
        }
    }

    /// Write the sealed contents of buffer `idx` to the file at the
    /// buffer's assigned offset, then recycle the buffer. Note: the data
    /// is written but NOT fsynced here; `Drop` performs the final
    /// `sync_all`.
    fn write_to_log(&self, idx: usize) {
        let start = clock();
        let iobuf = &self.bufs[idx];
        let header = iobuf.get_header();
        let log_offset = iobuf.get_log_offset();
        // Half-open [start, end) byte range this flush covers in the file.
        let interval = (log_offset, log_offset + offset(header) as LogID);
        assert_ne!(
            log_offset as usize,
            std::usize::MAX,
            "({:?}) created reservation for uninitialized slot",
            tn()
        );
        let res_len = offset(header) as usize;
        // Callers only reach here with zero active writers on a sealed
        // buffer, so reading its contents is race-free.
        let data = unsafe { (*iobuf.buf.get()).as_mut_slice() };
        let dirty_bytes = &data[0..res_len];
        let mut f = self.file_for_writing.lock().unwrap();
        f.seek(SeekFrom::Start(log_offset)).unwrap();
        f.write_all(dirty_bytes).unwrap();
        // Reset the offset to MAX so maybe_seal_and_write_iobuf can
        // recycle this buffer via cas_log_offset.
        let max = std::usize::MAX as LogID;
        iobuf.set_log_offset(max);
        #[cfg(feature = "log")]
        trace!("({:?}) {} log <- MAX", tn(), idx);
        let _written_bufs = self.written_bufs.fetch_add(1, SeqCst);
        #[cfg(feature = "log")]
        trace!("({:?}) {} written", tn(), _written_bufs % self.config.get_io_bufs());
        self.mark_interval(interval);
        M.write_to_log.measure(clock() - start);
    }

    /// Record that `interval` (a half-open [start, end) byte range) has
    /// hit the file, and advance the `stable` frontier over any prefix of
    /// intervals that is now contiguous with it.
    fn mark_interval(&self, interval: (LogID, LogID)) {
        let mut intervals = self.intervals.lock().unwrap();
        intervals.push(interval);
        // Keep intervals ordered by start so the frontier check below only
        // ever needs to look at the first element.
        intervals.sort();
        while let Some(&(low, high)) = intervals.get(0) {
            let cur_stable = self.stable.load(SeqCst) as LogID;
            if cur_stable == low {
                // This interval begins exactly at the stable frontier, so
                // the frontier can move to its end. Assumes intervals do
                // not overlap — each flush covers a distinct range.
                let old = self.stable.swap(high as usize, SeqCst);
                assert_eq!(old, cur_stable as usize);
                intervals.remove(0);
            } else {
                // Gap before the next interval; wait for the missing flush.
                break;
            }
        }
    }
}
impl Drop for IoBufs {
    /// Flush all buffered data and fsync the log file before the handle
    /// goes away.
    fn drop(&mut self) {
        // One flush per ring slot so every buffer gets a chance to seal,
        // rotate, and reach the file.
        let slots = self.config.get_io_bufs();
        for _ in 0..slots {
            self.flush();
        }
        self.file_for_writing
            .lock()
            .unwrap()
            .sync_all()
            .unwrap();
    }
}
#[inline(always)]
/// Frame `raw_buf` as an on-disk log message: optional zstd compression,
/// then a `HEADER_LEN`-byte header (valid byte = 1, native-endian u32 body
/// length, crc16 of the body) followed by the body itself.
///
/// Improvement: the u32 length is serialized with `to_ne_bytes`, which
/// produces the exact same native-endian bytes as the previous
/// `mem::transmute` but without `unsafe`; the frame is assembled into one
/// preallocated Vec instead of appending three temporary Vecs.
fn encapsulate(raw_buf: Vec<u8>, _use_compression: bool) -> Vec<u8> {
    #[cfg(feature = "zstd")]
    let buf = if _use_compression {
        let start = clock();
        let res = compress(&*raw_buf, 5).unwrap();
        M.compress.measure(clock() - start);
        res
    } else {
        raw_buf
    };
    #[cfg(not(feature = "zstd"))]
    let buf = raw_buf;
    // Native-endian: byte-identical to the old transmute-based encoding.
    let size_bytes = (buf.len() as u32).to_ne_bytes();
    let crc16 = crc16_arr(&buf);
    let mut out = Vec::with_capacity(HEADER_LEN + buf.len());
    out.push(1u8); // valid byte
    out.extend_from_slice(&size_bytes);
    out.extend_from_slice(&crc16);
    assert_eq!(out.len(), HEADER_LEN);
    out.extend_from_slice(&buf);
    out
}
#[inline(always)]
/// True when the seal flag (bit 31) is set in a header word.
fn is_sealed(v: u32) -> bool {
    v & (1 << 31) != 0
}
#[inline(always)]
/// Return `v` with the seal flag (bit 31) set.
fn mk_sealed(v: u32) -> u32 {
    v | 0x8000_0000
}
#[inline(always)]
/// Extract the 7-bit active-writer count (bits 24-30) from a header word.
fn n_writers(v: u32) -> u32 {
    (v >> 24) & 0x7F
}
#[inline(always)]
/// Add one to the writer count (bits 24-30) of a header word.
/// Panics if the count is already at its 7-bit maximum of 127.
fn incr_writers(v: u32) -> u32 {
    assert_ne!(n_writers(v), 127);
    // 1 << 24 is one unit in the writer-count field.
    v + (1 << 24)
}
#[inline(always)]
/// Subtract one from the writer count (bits 24-30) of a header word.
/// Panics if the count is already zero (unbalanced exit_reservation).
fn decr_writers(v: u32) -> u32 {
    assert_ne!(n_writers(v), 0);
    // 1 << 24 is one unit in the writer-count field.
    v - (1 << 24)
}
#[inline(always)]
/// Extract the 24-bit buffer offset (bits 0-23) from a header word.
fn offset(v: u32) -> u32 {
    v & 0x00FF_FFFF
}
#[inline(always)]
/// Advance the 24-bit offset field of a header word by `by` bytes.
/// Panics if `by` itself does not fit in 24 bits; a bump that overflows
/// the field into the writer-count bits is the caller's responsibility
/// (reserve() checks the buffer capacity first).
fn bump_offset(v: u32, by: u32) -> u32 {
    assert_eq!(by >> 24, 0);
    v + by
}