use crate::{
bufpool,
error::{FrozenError, FrozenResult},
ffile, hints, mpscq,
};
use std::{
ptr,
sync::{self, atomic},
thread, time,
};
/// Configuration used to construct a [`WritePipe`].
#[derive(Debug, Clone)]
pub struct WritePipeCfg {
/// Identifier of the owning module; it appears in the flush thread's name and in the
/// error returned when that thread cannot be spawned.
pub module_id: u8,
/// Longest time the background flush thread waits before checking the queue again.
pub flush_duration: time::Duration,
}
/// Batched write pipeline: requests are queued by `write` and written out to the file by a
/// background flush thread that is joined when the pipe is dropped.
#[derive(Debug)]
pub struct WritePipe {
/// State shared with the background flush thread.
core: sync::Arc<Core>,
/// Join handle of the flush thread; taken and joined in `Drop`.
flush_tx_handle: Option<thread::JoinHandle<()>>,
}
unsafe impl Send for WritePipe {}
unsafe impl Sync for WritePipe {}
impl WritePipe {
#[inline]
pub fn new(cfg: WritePipeCfg, file: sync::Arc<ffile::FrozenFile>) -> FrozenResult<Self> {
let core = sync::Arc::new(Core::new(file));
let cloned_core = core.clone();
let flush_tx_handle = match thread::Builder::new()
.name(format!("mod{}_wpipe_flush_tx", cfg.module_id))
.spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
{
Ok(handle) => Some(handle),
Err(observed_error) => return Err(err::new_error(cfg.module_id, err::FXE, observed_error)),
};
Ok(Self { core: core, flush_tx_handle })
}
#[inline]
pub fn write(&self, request: WriteRequest) -> FrozenResult<WriteTicket> {
let _io_lock = self.core.acquire_shared_io_lock();
if let Some(frozen_error) = self.core.completion.error.get() {
return Err(frozen_error);
}
let epoch = self.core.increment_current_epoch();
let internal_req = WriteRequestInternal { request, epoch };
self.core.queue.push(internal_req);
Ok(WriteTicket { epoch, completion: self.core.completion.clone() })
}
}
impl Drop for WritePipe {
    /// Asks the flush thread to stop and blocks until it has exited.
    fn drop(&mut self) {
        {
            // Set `closed` and notify while holding the mutex paired with `flush_cv`. The
            // flush thread keeps this mutex whenever it is not parked in `wait_timeout`, so
            // acquiring it here means the thread is either parked (and will receive the
            // notification) or has not started its loop yet. Notifying without the mutex
            // could fire while the thread was busy flushing; the wakeup was then lost and
            // shutdown stalled for a full flush interval.
            let _flush_guard = self.core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
            self.core.closed.store(true, atomic::Ordering::Release);
            self.core.flush_cv.notify_one();
            // The guard must be released before joining: the flush thread re-acquires it
            // when it wakes up.
        }
        if let Some(handle) = self.flush_tx_handle.take() {
            // A panic in the flush thread is deliberately ignored; there is nothing left to
            // do with it during drop.
            let _ = handle.join();
        }
    }
}
/// A single write submitted to a `WritePipe`.
#[derive(Debug)]
pub struct WriteRequest {
/// Buffers to write. An allocation of length 1 is written with `pwrite`, any other length
/// with `pwritev`.
pub allocation: bufpool::BufPoolAllocation,
/// Position passed to the file's `pwrite`/`pwritev` call.
/// NOTE(review): presumably a buffer-sized slot index rather than a byte offset — confirm
/// against `FrozenFile`.
pub slot_index: usize,
}
/// Receipt for a queued write. Awaiting it yields the ticket's epoch once the shared durable
/// epoch has reached it, or a clone of the recorded flush error.
#[derive(Debug)]
pub struct WriteTicket {
/// Epoch assigned to the write when it was queued.
epoch: u64,
/// Completion state shared with the pipe and all other tickets.
completion: sync::Arc<Completion>,
}
impl WriteTicket {
    /// Returns the epoch that was assigned to this write when it was queued.
    pub const fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Reports whether the shared durable epoch has caught up with this ticket's epoch.
    #[inline]
    fn is_ready(&self) -> bool {
        let durable_epoch = self.completion.durable_epoch.load(atomic::Ordering::Acquire);
        hints::likely(durable_epoch >= self.epoch)
    }
}
impl std::future::Future for WriteTicket {
    type Output = FrozenResult<u64>;

    /// Resolves to `Ok(epoch)` once the ticket's epoch is durable, or to the recorded flush
    /// error. Readiness is checked before the error, both before and after registering the
    /// waker, so a completion that lands in between is not missed.
    ///
    /// NOTE(review): all tickets share one `AtomicWaker`; tickets polled from different
    /// tasks would overwrite each other's waker — confirm this is only awaited from one task.
    #[inline(always)]
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        use std::task::Poll;

        let ticket: &Self = &self;
        // Final outcome if one is available right now: success first, then a recorded error.
        let settled = || -> Option<FrozenResult<u64>> {
            if ticket.is_ready() {
                return Some(Ok(ticket.epoch));
            }
            ticket.completion.error.get().map(Err)
        };

        if let Some(outcome) = settled() {
            return Poll::Ready(outcome);
        }
        ticket.completion.waker.register(cx.waker());
        match settled() {
            Some(outcome) => Poll::Ready(outcome),
            None => Poll::Pending,
        }
    }
}
fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
let mut guard = core.flush_guard.lock().unwrap_or_else(|e| e.into_inner());
loop {
(guard, _) = core.flush_cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
let queued_ops = core.queue.drain();
let closed = core.closed.load(atomic::Ordering::Acquire);
if queued_ops.is_empty() {
if closed {
return;
}
continue;
}
let _io_lock = core.acquire_exclusive_io_lock();
let (_min_index, _max_index, max_epoch) = match core.write_queued_ops(queued_ops) {
Ok(res) => res,
Err(new_error) => {
core.completion.error.set(new_error);
core.completion.waker.wake();
drop(_io_lock);
continue;
}
};
#[cfg(target_os = "linux")]
if let Err(new_error) = core.file.sync_range(_min_index, _max_index - _min_index) {
core.completion.error.set(new_error);
core.completion.waker.wake();
drop(_io_lock);
continue;
}
if let Err(new_error) = core.file.sync() {
core.completion.error.set(new_error);
core.completion.waker.wake();
drop(_io_lock);
continue;
} else {
core.completion.mark_epoch_as_durable(max_epoch);
core.completion.error.del();
}
}
}
/// State shared between a `WritePipe` and its background flush thread.
#[derive(Debug)]
struct Core {
/// Durable-epoch, error and waker state; also handed to every issued `WriteTicket`.
completion: sync::Arc<Completion>,
/// Set when the owning `WritePipe` is dropped; the flush thread exits once it observes
/// this together with an empty queue.
closed: atomic::AtomicBool,
/// Number of epochs handed out so far. Starts at 0, so the first write receives epoch 1.
epoch: atomic::AtomicU64,
/// File that queued requests are written to and synced.
file: sync::Arc<ffile::FrozenFile>,
/// Condition variable the flush thread waits on between flushes; notified on drop.
flush_cv: sync::Condvar,
/// Mutex paired with `flush_cv`; it protects no data of its own.
flush_guard: sync::Mutex<()>,
/// Held shared by writers while they queue a request and exclusively by the flush thread
/// while it writes and syncs a batch.
io_lock: sync::RwLock<()>,
/// Pending requests: pushed by writers, drained by the flush thread.
queue: mpscq::MPSCQueue<WriteRequestInternal>,
}
impl Core {
fn new(file: sync::Arc<ffile::FrozenFile>) -> Self {
Self {
file,
completion: sync::Arc::new(Completion::default()),
closed: atomic::AtomicBool::new(false),
epoch: atomic::AtomicU64::new(0),
flush_cv: sync::Condvar::new(),
flush_guard: sync::Mutex::new(()),
io_lock: sync::RwLock::new(()),
queue: mpscq::MPSCQueue::default(),
}
}
#[inline]
fn acquire_shared_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
self.io_lock.read().unwrap_or_else(|e| e.into_inner())
}
#[inline]
fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
self.io_lock.write().unwrap_or_else(|e| e.into_inner())
}
#[inline(always)]
fn write_queued_ops(&self, queued_ops: Vec<WriteRequestInternal>) -> FrozenResult<(usize, usize, u64)> {
let mut max_epoch = 0;
let mut max_index = 0;
let mut min_index = usize::MAX;
for op in queued_ops {
let ops_len = op.request.allocation.length();
match ops_len {
1 => {
self.file.pwrite(op.request.allocation.first(), op.request.slot_index)?;
}
_ => {
let bufs: Vec<bufpool::BufferPointer> = op.request.allocation.iter().collect();
self.file.pwritev(&bufs, op.request.slot_index)?;
}
}
max_epoch = max_epoch.max(op.epoch);
min_index = min_index.min(op.request.slot_index);
max_index = max_index.max(op.request.slot_index + ops_len);
}
Ok((min_index, max_index, max_epoch))
}
#[inline]
fn increment_current_epoch(&self) -> u64 {
self.epoch.fetch_add(1, atomic::Ordering::AcqRel).wrapping_add(1)
}
}
/// A queued `WriteRequest` together with the epoch it was assigned in `WritePipe::write`.
#[derive(Debug)]
struct WriteRequestInternal {
/// Epoch assigned when the request was queued.
epoch: u64,
/// The caller's original request.
request: WriteRequest,
}
/// Flush outcome shared between the flush thread and every `WriteTicket`.
#[derive(Debug)]
struct Completion {
/// Highest epoch the flush thread has published as durable; tickets compare against it.
durable_epoch: atomic::AtomicU64,
/// Most recently recorded flush failure, if any.
error: FlushError,
/// Waker registered by a pending ticket and woken after each flush outcome.
/// NOTE(review): a single `AtomicWaker` keeps only one registration — confirm tickets are
/// never awaited from more than one task at a time.
waker: futures::task::AtomicWaker,
}
impl Default for Completion {
fn default() -> Self {
Self {
durable_epoch: atomic::AtomicU64::new(0),
waker: futures::task::AtomicWaker::new(),
error: FlushError::default(),
}
}
}
impl Completion {
    /// Publishes `epoch` as durable and wakes the registered waiter.
    ///
    /// The durable epoch only moves forward: `fetch_max` keeps the larger of the stored and
    /// the new value. A plain store would let a batch whose highest epoch is lower than an
    /// already published one pull the value back, leaving a ticket that was already durable
    /// pending again.
    fn mark_epoch_as_durable(&self, epoch: u64) {
        self.durable_epoch.fetch_max(epoch, atomic::Ordering::AcqRel);
        self.waker.wake();
    }
}
#[derive(Debug)]
struct FlushError(atomic::AtomicPtr<FrozenError>);
impl Default for FlushError {
fn default() -> Self {
Self(atomic::AtomicPtr::new(ptr::null_mut()))
}
}
impl Drop for FlushError {
fn drop(&mut self) {
let err_ptr = self.0.load(atomic::Ordering::Acquire);
if !err_ptr.is_null() {
let _ = unsafe { Box::from_raw(err_ptr) };
}
}
}
impl FlushError {
#[inline]
fn get(&self) -> Option<FrozenError> {
let curr_err = self.0.load(atomic::Ordering::Acquire);
if hints::unlikely(!curr_err.is_null()) {
let frozen_error = unsafe { (*curr_err).clone() };
return Some(frozen_error);
}
None
}
#[inline]
fn set(&self, new_error: FrozenError) {
let boxed_error = Box::into_raw(Box::new(new_error));
let old_err = self.0.swap(boxed_error, atomic::Ordering::AcqRel);
if hints::unlikely(!old_err.is_null()) {
let _ = unsafe { Box::from_raw(old_err) };
}
}
#[inline]
fn del(&self) {
let old_err = self.0.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
if hints::unlikely(!old_err.is_null()) {
let _ = unsafe { Box::from_raw(old_err) };
}
}
}
/// Error codes and constructor for failures raised by this file.
mod err {
    use crate::error::{ErrCode, FrozenError};

    /// Domain identifier stamped on every error built by `new_error`.
    const DOMAIN_ID: u8 = 0x02;

    /// The background flush thread could not be spawned.
    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn background flush thread");

    /// Builds a [`FrozenError`] from `module_id`, this file's domain id, `code` and the
    /// displayable `observed_error`.
    #[inline]
    pub fn new_error<E>(module_id: u8, code: ErrCode, observed_error: E) -> FrozenError
    where
        E: std::fmt::Display,
    {
        FrozenError::new_raw(module_id, DOMAIN_ID, code, observed_error)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::BufferSize;
const MODULE_ID: u8 = 0x00;
const BUFFER_SIZE: BufferSize = BufferSize::S128;
const INITIAL_BUFFER_AMOUT: usize = 0x200;
const FLUSH_DURATION: time::Duration = time::Duration::from_millis(1);
/// Builds the fixture used by most tests: a file at `path`, a buffer pool sized for
/// `INITIAL_BUFFER_AMOUT` buffers, and a pipe flushing to that file.
fn new_objects<P: AsRef<std::path::Path>>(path: P) -> (sync::Arc<ffile::FrozenFile>, bufpool::BufPool, WritePipe) {
    let buffer_bytes = BUFFER_SIZE as usize;

    let opened = ffile::FrozenFile::new(ffile::FrozenFileCfg {
        module_id: MODULE_ID,
        path: path.as_ref().to_path_buf(),
        buffer_size: buffer_bytes,
        initial_available_buffers: INITIAL_BUFFER_AMOUT,
    });
    let file = sync::Arc::new(opened.unwrap());

    let pool = bufpool::BufPool::new(bufpool::BufPoolCfg {
        buffer_size: BUFFER_SIZE,
        max_memory: INITIAL_BUFFER_AMOUT * buffer_bytes,
    });

    let pipe = WritePipe::new(
        WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION },
        sync::Arc::clone(&file),
    )
    .unwrap();

    (file, pool, pipe)
}
/// Allocates `n` buffers from `pool` and fills each one with the `BUFFER_SIZE` bytes starting
/// at `buf_ptr`.
fn prep_write(buf_ptr: *const u8, n: usize, pool: &bufpool::BufPool) -> bufpool::BufPoolAllocation {
    let bytes_per_buffer = BUFFER_SIZE as usize;
    let allocation = pool.allocate(n);
    allocation.iter().for_each(|destination| {
        // SAFETY: relies on callers passing a pointer to at least `BUFFER_SIZE` readable
        // bytes and on each pool buffer being at least that large.
        unsafe { ptr::copy_nonoverlapping(buf_ptr, destination, bytes_per_buffer) }
    });
    allocation
}
/// Reads `required` buffers from `file` starting at `read_index` and asserts that every one
/// of them equals `buf`.
fn compare_with_readback(
    buf: &[u8],
    read_index: usize,
    required: usize,
    pool: &bufpool::BufPool,
    file: &ffile::FrozenFile,
) {
    let scratch = pool.allocate(required);
    let targets = scratch.iter().collect::<Vec<bufpool::BufferPointer>>();
    file.preadv(&targets, read_index).unwrap();
    scratch.iter().for_each(|filled| {
        // SAFETY: relies on each pool buffer spanning at least `BUFFER_SIZE` bytes.
        let observed = unsafe { std::slice::from_raw_parts(filled, BUFFER_SIZE as usize) };
        assert_eq!(buf, observed);
    });
}
// Construction and teardown of a pipe that never receives a write.
mod lifecycle {
use super::*;
// A pipe can be constructed over a freshly created file.
#[test]
fn ok_new() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let file_cfg = ffile::FrozenFileCfg {
path,
module_id: MODULE_ID,
buffer_size: BUFFER_SIZE as usize,
initial_available_buffers: INITIAL_BUFFER_AMOUT,
};
let file = sync::Arc::new(ffile::FrozenFile::new(file_cfg).unwrap());
let pipe_cfg = WritePipeCfg { module_id: MODULE_ID, flush_duration: FLUSH_DURATION };
assert!(WritePipe::new(pipe_cfg, file).is_ok());
}
// Dropping an idle pipe returns, i.e. joining the flush thread does not hang.
#[test]
fn ok_drop() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, _, pipe) = new_objects(path);
drop(pipe);
}
}
// Behaviour of dropping a pipe while writes are still queued.
mod shutdown {
use super::*;
// Dropping right after queueing a single write returns.
#[test]
fn ok_drop_before_pending_write_call() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
let request = WriteRequest { allocation, slot_index: 0 };
assert!(pipe.write(request).is_ok());
drop(pipe);
}
// A write queued before the drop must be in the file afterwards: the second scope reopens
// the same path and reads slot 0 back.
#[test]
fn ok_drop_waits_for_pending_write_call() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
{
let (_file, pool, pipe) = new_objects(path.clone());
let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
let request = WriteRequest { allocation, slot_index: 0 };
assert!(pipe.write(request).is_ok());
drop(pipe);
}
{
let (file, pool, _) = new_objects(path);
compare_with_readback(&BUFFER, 0, 1, &pool, &file);
}
}
// Queues `INITIAL_BUFFER_AMOUT` single-buffer writes (all to slot 0) and then drops the
// pipe; the test passes as long as the drop returns.
#[test]
fn ok_drop_does_not_deadlock_when_multiple_pending_writes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
for i in 0..INITIAL_BUFFER_AMOUT {
let buffer = vec![i as u8; BUFFER_SIZE as usize];
let allocation = prep_write(buffer.as_ptr(), 1, &pool);
let request = WriteRequest { allocation, slot_index: 0 };
assert!(pipe.write(request).is_ok());
}
drop(pipe);
}
// The pipe is shared through an `Arc`: the main thread releases its handle while a second
// thread writes through its own clone, so the pipe itself is dropped by whichever handle
// goes away last.
#[test]
fn ok_drop_correctly_waits_for_pending_write_with_multi_threads() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x20; BUFFER_SIZE as usize];
let pipe = sync::Arc::new(pipe);
let pipe2 = sync::Arc::clone(&pipe);
let handle = thread::spawn(move || {
let allocation = prep_write(BUFFER.as_ptr(), 1, &pool);
let request = WriteRequest { allocation, slot_index: 0 };
assert!(pipe2.write(request).is_ok());
});
drop(pipe);
handle.join().unwrap();
}
}
// Queueing writes without awaiting their tickets.
mod pipe_writes {
use super::*;
// A multi-buffer (0x0A buffers) request is accepted.
#[test]
fn ok_write() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
let allocation = prep_write(BUFFER.as_ptr(), 0x0A, &pool);
let request = WriteRequest { allocation, slot_index: 0 };
assert!(pipe.write(request).is_ok());
}
// Epochs of tickets issued one after another from a single thread strictly increase.
#[test]
fn ok_write_epoch_is_monotonic() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
let allocation1 = prep_write(BUFFER.as_ptr(), 1, &pool);
let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
let allocation2 = prep_write(BUFFER.as_ptr(), 1, &pool);
let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 1 }).unwrap();
let allocation3 = prep_write(BUFFER.as_ptr(), 1, &pool);
let ticket3 = pipe.write(WriteRequest { allocation: allocation3, slot_index: 2 }).unwrap();
assert!(ticket3.epoch() > ticket2.epoch());
assert!(ticket2.epoch() > ticket1.epoch());
}
}
// Awaiting tickets and reading the written data back from the file.
mod write_ticket {
use super::*;
// After awaiting the ticket, the written buffers can be read back from the file.
#[test]
fn ok_readback_after_write_with_await() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (file, pool, pipe) = new_objects(path);
const REQUIRED: usize = 0x0A;
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
let write_allocation = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
let request = WriteRequest { allocation: write_allocation, slot_index: 0 };
let ticket = pipe.write(request).unwrap();
let ticket_epoch = ticket.epoch();
let durable_epoch = futures::executor::block_on(ticket).unwrap();
assert!(durable_epoch >= ticket_epoch);
compare_with_readback(&BUFFER, 0, REQUIRED, &pool, &file);
}
// Five back-to-back requests at consecutive slot ranges; only the last ticket is awaited,
// then every range is read back and compared.
#[test]
fn ok_readback_after_batch_write() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (file, pool, pipe) = new_objects(path);
// Each entry is (fill pattern, number of buffers to write).
const BUFFERS: [([u8; BUFFER_SIZE as usize], usize); 5] = [
([0x0Au8; BUFFER_SIZE as usize], 0x1A),
([0x0Bu8; BUFFER_SIZE as usize], 0x1B),
([0x0Cu8; BUFFER_SIZE as usize], 0x1C),
([0x0Du8; BUFFER_SIZE as usize], 0x1D),
([0x0Eu8; BUFFER_SIZE as usize], 0x1E),
];
let mut slot_index = 0;
let mut latest_ticket = None;
for (buf, required) in BUFFERS {
let allocation = prep_write(buf.as_ptr(), required, &pool);
let request = WriteRequest { allocation, slot_index };
let ticket = pipe.write(request).unwrap();
slot_index += required;
latest_ticket = Some(ticket);
}
assert!(latest_ticket.is_some());
if let Some(ticket) = latest_ticket {
let ticket_epoch = ticket.epoch();
let durable_epoch = futures::executor::block_on(ticket).unwrap();
assert!(durable_epoch >= ticket_epoch);
}
let mut read_index = 0;
for (buf, required) in BUFFERS {
compare_with_readback(&buf, read_index, required, &pool, &file);
read_index += required;
}
}
// Two tickets joined inside one `block_on` both resolve successfully, the second with the
// larger epoch. Both requests target slot 0.
#[test]
fn ok_multiple_concurrent_awaits() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("write_single");
let (_file, pool, pipe) = new_objects(path);
const REQUIRED: usize = 0x0A;
const BUFFER: [u8; BUFFER_SIZE as usize] = [0x0Au8; BUFFER_SIZE as usize];
let allocation1 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
let ticket1 = pipe.write(WriteRequest { allocation: allocation1, slot_index: 0 }).unwrap();
let allocation2 = prep_write(BUFFER.as_ptr(), REQUIRED, &pool);
let ticket2 = pipe.write(WriteRequest { allocation: allocation2, slot_index: 0 }).unwrap();
let (e1, e2) = futures::executor::block_on(async { futures::join!(ticket1, ticket2) });
assert!(e1.is_ok());
assert!(e2.is_ok());
assert!(e2.unwrap() > e1.unwrap());
}
// Awaiting only the last of three tickets is enough for all three writes to be readable.
#[test]
fn ok_awaiting_last_ticket_implies_previous_writes_are_durable() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("durability_boundary");
let (file, pool, pipe) = new_objects(path);
const BUFFER_A: [u8; BUFFER_SIZE as usize] = [0xAA; BUFFER_SIZE as usize];
const BUFFER_B: [u8; BUFFER_SIZE as usize] = [0xBB; BUFFER_SIZE as usize];
const BUFFER_C: [u8; BUFFER_SIZE as usize] = [0xCC; BUFFER_SIZE as usize];
let alloc_a = prep_write(BUFFER_A.as_ptr(), 1, &pool);
let ticket_a = pipe.write(WriteRequest { allocation: alloc_a, slot_index: 0 }).unwrap();
let alloc_b = prep_write(BUFFER_B.as_ptr(), 1, &pool);
let ticket_b = pipe.write(WriteRequest { allocation: alloc_b, slot_index: 1 }).unwrap();
let alloc_c = prep_write(BUFFER_C.as_ptr(), 1, &pool);
let ticket_c = pipe.write(WriteRequest { allocation: alloc_c, slot_index: 2 }).unwrap();
let epoch_a = ticket_a.epoch();
let epoch_b = ticket_b.epoch();
let epoch_c = ticket_c.epoch();
let durable_epoch = futures::executor::block_on(ticket_c).unwrap();
assert!(durable_epoch >= epoch_c);
assert!(durable_epoch >= epoch_b);
assert!(durable_epoch >= epoch_a);
compare_with_readback(&BUFFER_A, 0, 1, &pool, &file);
compare_with_readback(&BUFFER_B, 1, 1, &pool, &file);
compare_with_readback(&BUFFER_C, 2, 1, &pool, &file);
}
}
// Writers on several threads sharing one pipe.
mod concurrency {
use super::*;
// Four threads each queue 0x40 single-buffer writes to disjoint slots. The collected
// epochs must be exactly 1..=N with no gaps or duplicates, and awaiting the ticket with
// the highest epoch must resolve to N.
#[test]
fn ok_multi_threaded_writers() {
const THREADS: usize = 4;
const WRITES_PER_THREAD: usize = 0x40;
// Compile-time check that the total number of writes stays below the pool's buffer count.
const _: () = assert!(THREADS * WRITES_PER_THREAD < INITIAL_BUFFER_AMOUT);
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("multi_threaded_writers");
let (_file, pool, pipe) = new_objects(path);
let pipe = sync::Arc::new(pipe);
let pool = sync::Arc::new(pool);
let mut handles = Vec::with_capacity(THREADS);
for tid in 0..THREADS {
let pipe = sync::Arc::clone(&pipe);
let pool = sync::Arc::clone(&pool);
handles.push(thread::spawn(move || {
let mut tickets = Vec::with_capacity(WRITES_PER_THREAD);
for i in 0..WRITES_PER_THREAD {
let buffer = vec![tid as u8; BUFFER_SIZE as usize];
let allocation = prep_write(buffer.as_ptr(), 1, &pool);
// Each thread owns its own contiguous range of slots.
let slot_index = tid * WRITES_PER_THREAD + i;
let ticket = pipe.write(WriteRequest { allocation, slot_index }).unwrap();
tickets.push(ticket);
}
tickets
}));
}
let mut tickets = Vec::new();
for handle in handles {
tickets.extend(handle.join().unwrap());
}
assert_eq!(tickets.len(), THREADS * WRITES_PER_THREAD,);
let mut epochs: Vec<u64> = tickets.iter().map(WriteTicket::epoch).collect();
epochs.sort_unstable();
// Sorted epochs must line up with 1, 2, 3, ... one-to-one.
for (ed, observed) in (1u64..=epochs.len() as u64).zip(epochs.iter().copied()) {
assert_eq!(ed, observed);
}
let latest_ticket = tickets.into_iter().max_by_key(WriteTicket::epoch).unwrap();
let durable_epoch = futures::executor::block_on(latest_ticket).unwrap();
assert_eq!(durable_epoch, (THREADS * WRITES_PER_THREAD) as u64,);
}
}
}