// Platform module that provides `POSIXMMap` (aliased as `TMap` below); only
// compiled for Linux and macOS targets.
#[cfg(any(target_os = "linux", target_os = "macos"))]
mod posix;
use crate::{
error::{ErrCode, FrozenErr, FrozenRes},
ffile::{FFCfg, FrozenFile},
hints,
};
use std::{
fmt,
sync::{self, atomic},
thread, time,
};
/// Domain byte passed to `FrozenErr::new_raw` for every error built by this module.
const ERRDOMAIN: u8 = 0x12;
/// Epoch number returned by `FrozenMMap::write`.
pub type TEpoch = u64;
/// Memory-map implementation used on Linux and macOS targets.
#[cfg(any(target_os = "linux", target_os = "macos"))]
type TMap = posix::POSIXMMap;
/// Module id attached to errors built here. It is set at most once (the first
/// `get_or_init` wins) and read through `mid()`.
static MID: sync::OnceLock<u8> = sync::OnceLock::new();
/// Returns the module id stored in `MID` (non-test builds).
///
/// # Panics
/// Panics if `MID` has not been initialised yet, i.e. when an error is built
/// before any `MID.get_or_init(..)` call has run.
#[cfg(not(test))]
#[inline(always)]
fn mid() -> &'static u8 {
MID.get().unwrap()
}
/// Returns the module id stored in `MID` (test builds).
///
/// Unlike the non-test variant this never panics: an unset `MID` is
/// initialised to `0` on first use.
#[cfg(test)]
#[inline(always)]
fn mid() -> &'static u8 {
MID.get_or_init(|| 0)
}
/// Error codes (numeric id plus static message) used by the `fmmap` module.
pub(in crate::fmmap) mod err {
use super::ErrCode;
// Used by `FMTransaction::write` when writes are not strictly ordered.
pub const HCF: ErrCode = ErrCode::new(0x200, "hault and catch fire");
pub const UNK: ErrCode = ErrCode::new(0x201, "unknown error");
pub const NMM: ErrCode = ErrCode::new(0x202, "not enough memory available on the device");
pub const SYN: ErrCode = ErrCode::new(0x203, "failed to sync/flush data to storage device");
pub const PRM: ErrCode = ErrCode::new(0x204, "missing permissions for IO");
// Recorded by the flush thread when its condvar wait fails.
pub const TXE: ErrCode = ErrCode::new(0x205, "flush_tx paniced inside");
// Returned when the flush thread cannot be spawned.
pub const FXE: ErrCode = ErrCode::new(0x206, "unable to spawn flush_tx");
// Returned when a `Mutex`/`RwLock` reports poisoning.
pub const LPN: ErrCode = ErrCode::new(0x207, "lock poisoned internally");
// The next four codes are produced by `FrozenMMap::validate_t`.
pub const DRP: ErrCode = ErrCode::new(0x208, "type T must not implement `Drop`");
pub const ALN: ErrCode = ErrCode::new(0x209, "type T must be 8-bytes aligned");
pub const SZE: ErrCode = ErrCode::new(0x20A, "`size_of::<T>()` must be multiple of 8 bytes");
pub const ZRO: ErrCode = ErrCode::new(0x20B, "type T must not be zero sized");
}
#[inline]
pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenRes<R> {
let err = FrozenErr::new_raw(*mid(), ERRDOMAIN, code, error);
Err(err)
}
/// Same as `new_err`, but with an empty detail string: only `code` describes
/// the failure.
#[inline]
pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenRes<R> {
    Err(FrozenErr::new_raw(*mid(), ERRDOMAIN, code, ""))
}
#[inline]
pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenErr {
FrozenErr::new_raw(*mid(), ERRDOMAIN, code, error)
}
/// Configuration for opening a `FrozenMMap`.
#[derive(Debug, Clone)]
pub struct FMCfg {
/// Forwarded to the backing file as `FFCfg::initial_chunk_amount`
/// (one chunk per slot of `size_of::<T>()` bytes).
pub initial_count: usize,
/// Timeout used by the background flush thread between wake-ups.
pub flush_duration: time::Duration,
}
/// File-backed memory map viewed as an array of fixed-size slots of type `T`,
/// with a background thread that periodically syncs dirty data.
///
/// `MODULE_ID` is forwarded to the backing `FrozenFile` and used to initialise
/// the module id attached to errors.
#[derive(Debug)]
pub struct FrozenMMap<T, const MODULE_ID: u8>
where
T: Sized + Send + Sync,
{
// State shared with the flush thread.
core: sync::Arc<Core>,
// Join handle of the flush thread; taken (set to `None`) on `delete`/`drop`.
tx: Option<thread::JoinHandle<()>>,
// Ties the slot type `T` to the handle without storing a `T`.
_t: core::marker::PhantomData<T>,
}
// NOTE(review): these impls assert thread-safety manually. The fields are an
// `Arc<Core>` (where `Core` is itself declared `Send + Sync` via `unsafe impl`),
// a `JoinHandle<()>` and `PhantomData<T>` with `T: Send + Sync` — confirm the
// explicit impls are still required and that `Core`'s invariants hold.
unsafe impl<T, const MODULE_ID: u8> Send for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
unsafe impl<T, const MODULE_ID: u8> Sync for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
impl<T, const MODULE_ID: u8> FrozenMMap<T, MODULE_ID>
where
T: Sized + Send + Sync,
{
/// Size in bytes of one slot, i.e. `size_of::<T>()`.
pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
/// Opens (or creates) the backing file at `path`, maps it into memory and
/// starts the background flush thread.
///
/// # Errors
/// Returns an error when `T` fails `validate_t`, when the backing file cannot
/// be opened or measured, when mapping fails, or when the flush thread cannot
/// be spawned.
pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg) -> FrozenRes<Self> {
    // Register the module id before anything that may build an error: outside
    // of tests `mid()` unwraps `MID`, so a validation failure reported before
    // this point would panic instead of returning the error.
    let _ = MID.get_or_init(|| MODULE_ID);
    Self::validate_t()?;

    let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
    // `validate_t` rejected zero-sized `T`, so the division cannot be by zero.
    let total_slots = curr_length / Self::SLOT_SIZE;

    // NOTE(review): soundness relies on `curr_length` matching the length of
    // the file behind `file.fd()` — confirm against `POSIXMMap::new`.
    let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
    let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
    let tx = Core::spawn_tx(core.clone())?;

    Ok(Self {
        core,
        tx: Some(tx),
        _t: core::marker::PhantomData,
    })
}
/// Opens the backing file at `path`, grows it by `additional_slots` via
/// `FrozenFile::grow`, then maps the grown file and starts the background
/// flush thread.
///
/// # Errors
/// Returns an error when `T` fails `validate_t`, when the backing file cannot
/// be opened, grown or measured, when mapping fails, or when the flush thread
/// cannot be spawned.
pub fn new_grown<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg, additional_slots: usize) -> FrozenRes<Self> {
    // Register the module id before anything that may build an error: outside
    // of tests `mid()` unwraps `MID`, so a validation failure reported before
    // this point would panic instead of returning the error.
    let _ = MID.get_or_init(|| MODULE_ID);
    Self::validate_t()?;

    let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
    file.grow(additional_slots)?;

    // Re-read the length: the value reported by `open_file` predates the grow.
    let curr_length = file.length()?;
    // `validate_t` rejected zero-sized `T`, so the division cannot be by zero.
    let total_slots = curr_length / Self::SLOT_SIZE;

    // NOTE(review): soundness relies on `curr_length` matching the length of
    // the file behind `file.fd()` — confirm against `POSIXMMap::new`.
    let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
    let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
    let tx = Core::spawn_tx(core.clone())?;

    Ok(Self {
        core,
        tx: Some(tx),
        _t: core::marker::PhantomData,
    })
}
/// Opens the backing `FrozenFile` with one chunk per slot and returns it
/// together with its current length as reported by `FrozenFile::length`.
fn open_file(path: std::path::PathBuf, cfg: &FMCfg) -> FrozenRes<(FrozenFile, usize)> {
    let file = FrozenFile::new::<MODULE_ID>(FFCfg {
        path,
        chunk_size: Self::SLOT_SIZE,
        initial_chunk_amount: cfg.initial_count,
    })?;
    let length = file.length()?;
    Ok((file, length))
}
/// Checks that `T` is usable as a slot type.
///
/// The checks run in this order and the first failure wins:
/// `T` needs drop (`DRP`), alignment is not exactly 8 (`ALN`), size is zero
/// (`ZRO`), size is not a multiple of 8 (`SZE`).
#[inline]
fn validate_t() -> FrozenRes<()> {
    let align = std::mem::align_of::<T>();
    let size = std::mem::size_of::<T>();

    let violation = if std::mem::needs_drop::<T>() {
        Some(err::DRP)
    } else if align != 8 {
        Some(err::ALN)
    } else if size == 0 {
        Some(err::ZRO)
    } else if size % 8 != 0 {
        // Defensive: a Rust type's size is always a multiple of its
        // alignment, so with `align == 8` this arm is not expected to fire.
        Some(err::SZE)
    } else {
        None
    };

    match violation {
        Some(code) => new_err_default(code),
        None => Ok(()),
    }
}
pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
if let Some(sync_err) = self.core.get_sync_error() {
return Err(sync_err);
}
let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
if durable_epoch >= epoch {
return Ok(());
}
let mut guard = match self.core.durable_lock.lock() {
Ok(g) => g,
Err(e) => return new_err(err::LPN, e),
};
loop {
if let Some(sync_err) = self.core.get_sync_error() {
return Err(sync_err);
}
if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
return Ok(());
}
guard = match self.core.durable_cv.wait(guard) {
Ok(g) => g,
Err(e) => return new_err(err::LPN, e),
};
}
}
/// Runs `f` with a raw pointer to slot `index` while holding that slot's lock,
/// and returns whatever `f` returns.
///
/// # Panics
/// Panics if `index` has no entry in the per-slot lock table.
///
/// # Safety
/// `f` receives a raw pointer into the mapping; dereferencing it is the
/// caller's responsibility.
#[inline(always)]
pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenRes<R> {
    let byte_offset = Self::SLOT_SIZE * index;
    let _slot_guard = self.core.locks.lock(index);
    let slot = unsafe { self.core.map.as_ptr(byte_offset) };
    let output = f(slot);
    Ok(output)
}
/// Runs `f` with a raw mutable pointer to slot `index`, marks the map dirty and
/// returns the new epoch, which can be passed to `wait_for_durability`.
///
/// The shared IO lock and the slot lock are held while `f` runs.
///
/// # Errors
/// Returns the recorded sync error if one is present, or `LPN` if the IO lock
/// is poisoned.
///
/// # Panics
/// Panics if `index` has no entry in the per-slot lock table.
///
/// # Safety
/// `f` receives a raw pointer into the mapping; dereferencing it is the
/// caller's responsibility.
#[inline(always)]
pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenRes<TEpoch> {
    let core = &self.core;
    if let Some(sticky) = core.get_sync_error() {
        return Err(sticky);
    }

    let byte_offset = Self::SLOT_SIZE * index;
    let _io_shared = core.acquire_io_lock()?;
    let _slot_guard = core.locks.lock(index);

    let slot = unsafe { core.map.as_mut_ptr(byte_offset) };
    f(slot);

    core.dirty.store(true, atomic::Ordering::Release);
    Ok(core.incr_curr_epoch())
}
/// Runs `f` with a raw mutable pointer to slot `index` and syncs the map and
/// file before returning.
///
/// Guards are taken in this order: the flush mutex, the exclusive IO lock,
/// then the slot lock. After a successful sync the durable epoch is advanced to
/// the current epoch, and durability waiters are notified if the dirty flag
/// was set.
///
/// # Errors
/// Returns the recorded sync error if one is present, `LPN` on a poisoned lock,
/// or the error reported by the sync itself.
///
/// # Panics
/// Panics if `index` has no entry in the per-slot lock table.
///
/// # Safety
/// `f` receives a raw pointer into the mapping; dereferencing it is the
/// caller's responsibility.
#[inline(always)]
pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenRes<()> {
    let core = &self.core;
    if let Some(sticky) = core.get_sync_error() {
        return Err(sticky);
    }

    let byte_offset = Self::SLOT_SIZE * index;
    let _flush_excluded = core.lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
    let _io_exclusive = core.acquire_exclusive_io_lock()?;
    let _slot_guard = core.locks.lock(index);

    let slot = unsafe { core.map.as_mut_ptr(byte_offset) };
    f(slot);

    core.sync()?;
    core.mark_epoch_durable();

    // Earlier asynchronous writes were covered by the sync above; wake anyone
    // waiting for them.
    if core.dirty.swap(false, atomic::Ordering::AcqRel) {
        let _waiters = core
            .durable_lock
            .lock()
            .map_err(|e| new_err_raw(err::LPN, e))?;
        core.durable_cv.notify_all();
    }
    Ok(())
}
/// Number of slots in the mapping: mapped length divided by `SLOT_SIZE`.
#[inline]
pub fn total_slots(&self) -> usize {
    let mapped_bytes = self.core.curr_length;
    mapped_bytes / Self::SLOT_SIZE
}
#[inline]
pub fn memory_usage(&self) -> usize {
let mmap_bytes = self.core.curr_length;
let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
mmap_bytes + lock_bytes
}
#[inline]
pub fn new_tx(&self) -> FMTransaction<'_, T> {
FMTransaction {
core: &self.core,
ops_vec: Vec::new(),
}
}
/// Stops the flush thread, unmaps the memory and deletes the backing file.
///
/// Pending dirty data is deliberately discarded: the dirty flag is cleared
/// before the flush thread is asked to exit.
///
/// # Errors
/// Returns `LPN` if the IO lock is poisoned, or the error reported by the
/// unmap or by the file deletion.
pub fn delete(&mut self) -> FrozenRes<()> {
    self.core.dirty.store(false, atomic::Ordering::Release);
    self.core.closed.store(true, atomic::Ordering::Release);

    // Wake the flush thread on the condvar it actually waits on (`cv`), as
    // `Drop` does. Notifying `durable_cv` left the thread asleep until its
    // next timeout, stalling the join below for up to one flush interval.
    self.core.cv.notify_one();
    if let Some(handle) = self.tx.take() {
        let _ = handle.join();
    }

    let _lock = self.core.acquire_exclusive_io_lock()?;
    self.munmap()?;
    self.core.file.delete()
}
/// Unmaps the whole mapping (`curr_length` bytes).
#[inline]
fn munmap(&self) -> FrozenRes<()> {
    let core = &self.core;
    unsafe { core.map.unmap(core.curr_length) }
}
}
/// Tear-down: stop the flush thread, free any recorded sync error and unmap
/// the memory unless the map was already closed (e.g. by `delete`).
impl<T, const MODULE_ID: u8> Drop for FrozenMMap<T, MODULE_ID>
where
    T: Sized + Send + Sync,
{
    fn drop(&mut self) {
        // Flag shutdown and wake the flush thread so it can run its final pass.
        let was_closed = self.core.closed.swap(true, atomic::Ordering::Release);
        self.core.cv.notify_one();
        if let Some(flusher) = self.tx.take() {
            let _ = flusher.join();
        }

        // Best effort: a poisoned lock is ignored during tear-down.
        let _io_exclusive = self.core.acquire_exclusive_io_lock();

        // Reclaim the boxed error, if any, so it is not leaked.
        let pending = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if !pending.is_null() {
            unsafe {
                drop(Box::from_raw(pending));
            }
        }

        if was_closed {
            return;
        }
        let _ = self.munmap();
    }
}
/// Human-readable summary: file descriptor, slot count and mapped length.
impl<T, const MODULE_ID: u8> fmt::Display for FrozenMMap<T, MODULE_ID>
where
    T: Sized + Send + Sync,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fd = self.core.file.fd();
        let slots = self.total_slots();
        let len = self.core.curr_length;
        write!(f, "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}", fd, slots, len)
    }
}
/// A batch of slot writes that `commit` applies together under one epoch.
pub struct FMTransaction<'a, T> {
// Shared state of the map this transaction was created from.
core: &'a Core,
// Queued `(slot index, write closure)` pairs, kept in strictly increasing index order.
ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
}
impl<'a, T> FMTransaction<'a, T> {
/// Queues `f` to run against slot `index` at commit time.
///
/// # Errors
/// Returns `HCF` when `index` is not strictly greater than the index of the
/// previously queued write.
///
/// # Safety
/// `f` will later receive a raw pointer into the mapping; dereferencing it is
/// the caller's responsibility.
#[inline(always)]
pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenRes<()>
where
    F: FnOnce(*mut T) + 'a,
{
    let previous = self.ops_vec.last().map(|(queued, _)| *queued);
    if matches!(previous, Some(prev) if index <= prev) {
        return new_err(
            err::HCF,
            "tx writes must be strictly ordered, with no more then single ops on given index",
        );
    }

    self.ops_vec.push((index, Box::new(f)));
    Ok(())
}
#[inline(always)]
pub fn commit(self) -> FrozenRes<u64> {
if let Some(err) = self.core.get_sync_error() {
return Err(err);
}
let _guard = self.core.acquire_io_lock()?;
let mut guards = Vec::with_capacity(self.ops_vec.len());
for (idx, _) in &self.ops_vec {
guards.push(self.core.locks.lock(*idx));
}
for (idx, op) in self.ops_vec {
let offset = idx * std::mem::size_of::<T>();
let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
op(ptr);
}
self.core.dirty.store(true, atomic::Ordering::Release);
let epoch = self.core.incr_curr_epoch();
Ok(epoch)
}
}
/// State shared between a `FrozenMMap` handle and its flush thread.
#[derive(Debug)]
struct Core {
// The memory mapping itself.
map: TMap,
// One spin lock per slot.
locks: Locks,
// Backing file of the mapping.
file: FrozenFile,
// Condvar the flush thread sleeps on (paired with `lock`).
cv: sync::Condvar,
// Mapped length in bytes.
curr_length: usize,
// Mutex paired with `cv`; also taken by `write_sync`.
lock: sync::Mutex<()>,
// Taken shared by `write`/`commit`, exclusively by flushes, `write_sync` and tear-down.
io_lock: sync::RwLock<()>,
// Set by writers, cleared by whoever syncs.
dirty: atomic::AtomicBool,
// Condvar that `wait_for_durability` sleeps on (paired with `durable_lock`).
durable_cv: sync::Condvar,
// Set on `delete`/`drop`; tells the flush thread to exit.
closed: atomic::AtomicBool,
durable_lock: sync::Mutex<()>,
// Timeout of the flush thread's condvar wait.
flush_duration: time::Duration,
// Incremented by every `write`/`commit`.
current_epoch: atomic::AtomicU64,
// Highest epoch recorded as synced.
durable_epoch: atomic::AtomicU64,
// Null, or a pointer obtained from `Box::into_raw` holding the last recorded sync error.
error: atomic::AtomicPtr<sync::Arc<FrozenErr>>,
}
// NOTE(review): manual thread-safety assertion, presumably needed because of
// `TMap` and/or the `AtomicPtr` field — confirm that every access to `map` is
// guarded by the slot locks / IO lock as intended.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
impl Core {
/// Builds the shared state: one lock per slot, clean and open flags, both
/// epochs at zero and no recorded error.
fn new(
    map: TMap,
    file: FrozenFile,
    flush_duration: time::Duration,
    curr_length: usize,
    total_slots: usize,
) -> Self {
    let locks = Locks::new(total_slots);
    Self {
        map,
        locks,
        file,
        cv: sync::Condvar::new(),
        curr_length,
        lock: sync::Mutex::new(()),
        io_lock: sync::RwLock::new(()),
        dirty: atomic::AtomicBool::new(false),
        durable_cv: sync::Condvar::new(),
        closed: atomic::AtomicBool::new(false),
        durable_lock: sync::Mutex::new(()),
        flush_duration,
        current_epoch: atomic::AtomicU64::new(0),
        durable_epoch: atomic::AtomicU64::new(0),
        error: atomic::AtomicPtr::new(std::ptr::null_mut()),
    }
}
/// Syncs the mapping first and, if that succeeds, the backing file.
#[inline]
fn sync(&self) -> FrozenRes<()> {
    let map_synced = unsafe { self.map.sync(self.curr_length) };
    map_synced?;
    self.file.sync()
}
/// Records `err` as the current sync error, freeing the one it replaces.
#[inline(always)]
fn set_sync_error(&self, err: FrozenErr) {
    let fresh = Box::into_raw(Box::new(sync::Arc::new(err)));
    let stale = self.error.swap(fresh, atomic::Ordering::AcqRel);
    if stale.is_null() {
        return;
    }
    unsafe { drop(Box::from_raw(stale)) };
}
#[inline(always)]
fn get_sync_error(&self) -> Option<FrozenErr> {
let ptr = self.error.load(atomic::Ordering::Acquire);
if hints::likely(ptr.is_null()) {
return None;
}
let arc = unsafe { &*ptr }.clone();
Some((*arc).clone())
}
/// Resets the recorded sync error to "none", freeing the previous one if any.
#[inline]
fn clear_sync_error(&self) {
    let stale = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
    if hints::unlikely(!stale.is_null()) {
        unsafe { drop(Box::from_raw(stale)) };
    }
}
/// Takes the IO lock in shared mode; a poisoned lock is reported as `LPN`.
#[inline]
fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
    match self.io_lock.read() {
        Ok(shared) => Ok(shared),
        Err(poisoned) => Err(new_err_raw(err::LPN, poisoned)),
    }
}
/// Takes the IO lock in exclusive mode; a poisoned lock is reported as `LPN`.
#[inline]
fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
    match self.io_lock.write() {
        Ok(exclusive) => Ok(exclusive),
        Err(poisoned) => Err(new_err_raw(err::LPN, poisoned)),
    }
}
/// Atomically bumps the current epoch and returns the new value.
#[inline]
fn incr_curr_epoch(&self) -> u64 {
    let previous = self.current_epoch.fetch_add(1, atomic::Ordering::Release);
    previous + 1
}
/// Publishes the current epoch as the durable epoch.
#[inline]
fn mark_epoch_durable(&self) {
    self.durable_epoch.store(
        self.current_epoch.load(atomic::Ordering::Acquire),
        atomic::Ordering::Release,
    );
}
fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
match thread::Builder::new()
.name("fm-flush-tx".into())
.spawn(move || Self::flush_tx(core))
{
Ok(tx) => Ok(tx),
Err(error) => new_err(err::FXE, error),
}
}
/// Body of the background flush thread.
///
/// Each iteration waits on `cv` for at most `flush_duration`, then:
/// * if nothing is dirty, exits when `closed` is set, otherwise waits again;
/// * if something is dirty, takes the exclusive IO lock, syncs, publishes the
///   epoch observed under that lock as durable and wakes durability waiters.
///
/// Any failure is recorded through `set_sync_error`; lock failures end the thread.
fn flush_tx(core: sync::Arc<Self>) {
let mut guard = match core.lock.lock() {
Ok(g) => g,
Err(error) => {
// NOTE(review): a poisoned lock is reported with `FXE` here but with `LPN`
// further down — confirm which code is intended.
core.set_sync_error(new_err_raw(err::FXE, error));
// NOTE(review): durability waiters sleep on `durable_cv`, not `cv`; this and
// the other `cv.notify_all()` calls on exit paths may leave them parked.
core.cv.notify_all();
return;
}
};
loop {
// Sleep until notified (drop/delete) or until the flush interval elapses.
guard = match core.cv.wait_timeout(guard, core.flush_duration) {
Ok((g, _)) => g,
Err(e) => {
core.set_sync_error(new_err_raw(err::TXE, e));
core.cv.notify_all();
return;
}
};
// Claim the dirty flag: writes landing after this swap set it again and are
// picked up by a later iteration.
let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
let closing = core.closed.load(atomic::Ordering::Acquire);
if !dirty {
if closing {
core.cv.notify_all();
return;
}
continue;
}
// Writers hold the IO lock in shared mode, so taking it exclusively waits for
// in-flight writes to finish.
let io_lock = match core.acquire_exclusive_io_lock() {
Ok(lock) => lock,
Err(e) => {
core.set_sync_error(e);
core.cv.notify_all();
return;
}
};
// Release the flush mutex while syncing; it is re-taken at the end of the iteration.
drop(guard);
// Every epoch handed out so far belongs to a write that completed before the
// exclusive lock was granted.
let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
match core.sync() {
Ok(_) => {
core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
// Taking `durable_lock` before notifying means a waiter is either still
// before its re-check or already parked in `wait`.
let _g = match core.durable_lock.lock() {
Ok(g) => g,
Err(e) => {
// NOTE(review): this exit path does not notify any condvar.
core.set_sync_error(new_err_raw(err::LPN, e));
return;
}
};
core.clear_sync_error();
core.durable_cv.notify_all();
}
Err(err) => {
// NOTE(review): notified without holding `durable_lock`, unlike the success
// path — a waiter between its check and its `wait` could miss this wake-up.
core.set_sync_error(err);
core.durable_cv.notify_all();
}
}
drop(io_lock);
guard = match core.lock.lock() {
Ok(g) => g,
Err(e) => {
core.set_sync_error(new_err_raw(err::LPN, e));
core.durable_cv.notify_all();
return;
}
};
}
}
}
/// Table of per-slot spin locks, one `AtomicU8` per slot.
#[derive(Debug)]
struct Locks(Box<[atomic::AtomicU8]>);
impl Locks {
// Value stored in a slot while it is held.
const LOCK: u8 = 1;
// Value stored in a slot while it is free.
const UNLOCK: u8 = 0;
// Failed attempts below this count busy-spin.
const L1_CONTENTION: usize = 0x10;
// Failed attempts below this count (and at or above L1) yield the thread; beyond it the thread sleeps.
const L2_CONTENTION: usize = 0x20;
/// Creates `cap` locks, all initially unlocked.
fn new(cap: usize) -> Self {
    let slots: Vec<atomic::AtomicU8> = (0..cap)
        .map(|_| atomic::AtomicU8::new(Self::UNLOCK))
        .collect();
    Self(slots.into_boxed_slice())
}
/// Acquires the lock of slot `index`, blocking until it is free, and returns a
/// guard that releases it on drop.
///
/// Back-off escalates with the number of failed attempts: busy-spin, then
/// yield, then sleep for a duration that doubles up to a cap.
///
/// # Panics
/// Panics if `index` is out of bounds of the lock table.
#[inline(always)]
fn lock(&self, index: usize) -> LockGuard<'_> {
    let slot = &self.0[index];
    let mut attempts = 0;

    while slot
        .compare_exchange_weak(
            Self::UNLOCK,
            Self::LOCK,
            atomic::Ordering::Acquire,
            atomic::Ordering::Relaxed,
        )
        .is_err()
    {
        if hints::likely(attempts < Self::L1_CONTENTION) {
            std::hint::spin_loop();
        } else if attempts < Self::L2_CONTENTION {
            thread::yield_now();
        } else {
            // Exponential back-off; the shift is capped at 0x0A.
            let shift = (attempts - Self::L2_CONTENTION).min(0x0A);
            let nanos = 0x30 << shift;
            thread::sleep(time::Duration::from_nanos(nanos));
        }
        attempts += 1;
    }

    LockGuard(slot)
}
}
/// Guard for one slot lock; dropping it stores `Locks::UNLOCK` back into the slot.
struct LockGuard<'a>(&'a atomic::AtomicU8);
impl Drop for LockGuard<'_> {
fn drop(&mut self) {
self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use super::*;
// Short flush interval so the background flush fires quickly in tests.
const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
// Module id used for every map in the tests (same value the test `mid()` falls back to).
const MID: u8 = 0;
// Number of slots every fresh test file starts with.
const INIT_SLOTS: usize = 0x0A;
/// Creates a temporary directory and returns it together with a file path
/// inside it and the default test configuration. The directory handle must be
/// kept alive by the caller for as long as the path is used.
fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FMCfg) {
    let dir = tempfile::tempdir().unwrap();
    let map_path = dir.path().join("tmp_map");
    (
        dir,
        map_path,
        FMCfg {
            initial_count: INIT_SLOTS,
            flush_duration: FLUSH_DURATION,
        },
    )
}
// Creation, re-opening, deletion and drop behaviour of `FrozenMMap`.
mod fm_lifecycle {
use super::*;
// A fresh map starts clean, open, at durable epoch 0, with the configured
// length and no recorded error; a first write becomes durable.
#[test]
fn ok_new() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64, MID>::SLOT_SIZE);
assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
assert!(mmap.wait_for_durability(epoch).is_ok());
}
// The same path can be opened again after the first handle is dropped.
#[test]
fn ok_new_existing() {
let (_dir, path, cfg) = new_tmp();
let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
drop(mmap1);
let mmap2 = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
drop(mmap2);
}
// Re-opening an existing file with a different `initial_count` must fail.
#[test]
fn err_new_when_change_in_cfg() {
let (_dir, path, mut cfg) = new_tmp();
let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
drop(mmap1);
cfg.initial_count = INIT_SLOTS * 2;
assert!(FrozenMMap::<u64, MID>::new(path, cfg).is_err());
}
// `delete` removes the backing file.
#[test]
fn ok_delete() {
let (_dir, path, cfg) = new_tmp();
let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
mmap.delete().unwrap();
assert!(!mmap.core.file.exists().unwrap());
}
// A second `delete` on the same handle reports an error.
#[test]
fn err_delete_after_delete() {
let (_dir, path, cfg) = new_tmp();
let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
mmap.delete().unwrap();
assert!(!mmap.core.file.exists().unwrap());
assert!(mmap.delete().is_err());
}
// A write followed immediately by drop is still visible after re-opening.
#[test]
fn ok_drop_persists_when_dropped_before_bg_flush() {
let (_dir, path, cfg) = new_tmp();
const VAL: u64 = 0x0A;
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
drop(mmap);
}
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
assert_eq!(val, VAL);
}
}
}
// Slot-type validation, both directly and through the constructors.
mod fm_validate_t {
use super::*;
// 8-byte aligned, 16 bytes, no `Drop`: accepted.
#[repr(C, align(8))]
struct OkT {
a: u64,
b: u64,
}
// Two `u32` fields with `repr(C)`: alignment is 4, so it is rejected.
#[repr(C)]
struct BadAlignT {
a: u32,
b: u32,
}
// NOTE(review): this type is `align(4)`, so `validate_t` rejects it at the
// alignment check, which runs before the size check.
#[repr(C, align(4))]
struct BadSizeT {
a: u32,
b: u16,
}
// Correct layout, but implements `Drop`.
#[repr(C, align(8))]
struct DropT(u64);
impl Drop for DropT {
fn drop(&mut self) {}
}
// Zero-sized type with 8-byte alignment.
#[repr(C, align(8))]
struct ZstT;
#[test]
fn ok_validate_t() {
assert!(FrozenMMap::<OkT, MID>::validate_t().is_ok());
}
#[test]
fn err_validate_t_when_drop() {
assert!(FrozenMMap::<DropT, MID>::validate_t().is_err());
}
#[test]
fn err_validate_t_when_not_8_byte_aligned() {
assert!(FrozenMMap::<BadAlignT, MID>::validate_t().is_err());
}
#[test]
fn err_validate_t_when_size_not_multiple_of_8() {
assert!(FrozenMMap::<BadSizeT, MID>::validate_t().is_err());
}
#[test]
fn err_validate_t_when_zero_sized() {
assert!(FrozenMMap::<ZstT, MID>::validate_t().is_err());
}
// The same rejections must surface through `new`.
#[test]
fn err_new_when_t_implements_drop() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<DropT, MID>::new(path, cfg).is_err());
}
#[test]
fn err_new_when_t_is_not_8_byte_aligned() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<BadAlignT, MID>::new(path, cfg).is_err());
}
#[test]
fn err_new_when_t_size_is_not_multiple_of_8() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<BadSizeT, MID>::new(path, cfg).is_err());
}
#[test]
fn err_new_when_t_is_zero_sized() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<ZstT, MID>::new(path, cfg).is_err());
}
// ... and through `new_grown`.
#[test]
fn err_new_grown_when_t_implements_drop() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<DropT, MID>::new_grown(path, cfg, 1).is_err());
}
#[test]
fn err_new_grown_when_t_is_not_8_byte_aligned() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<BadAlignT, MID>::new_grown(path, cfg, 1).is_err());
}
#[test]
fn err_new_grown_when_t_size_is_not_multiple_of_8() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<BadSizeT, MID>::new_grown(path, cfg, 1).is_err());
}
#[test]
fn err_new_grown_when_t_is_zero_sized() {
let (_dir, path, cfg) = new_tmp();
assert!(FrozenMMap::<ZstT, MID>::new_grown(path, cfg, 1).is_err());
}
}
// Growing an existing file through `new_grown`.
mod fm_new_grown {
use super::*;
// Growing by 0x0A slots is reflected in both `total_slots` and `curr_length`.
#[test]
fn ok_new_grown_updates_length() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS);
drop(mmap);
let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
assert_eq!(
mmap.core.curr_length,
(INIT_SLOTS + 0x0A) * FrozenMMap::<u64, MID>::SLOT_SIZE
);
}
// `new_grown` must fail while another handle on the same path is alive.
#[test]
fn err_new_grown_with_preexisting_instance() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS);
assert!(FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).is_err());
}
// Repeated grow cycles accumulate.
#[test]
fn ok_new_grown_cycle() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
drop(mmap);
let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
drop(mmap);
let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
drop(mmap);
let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x100).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
}
// A slot written before growing can be overwritten and read back afterwards.
#[test]
fn ok_write_reopen_grown_read() {
let (_dir, path, cfg) = new_tmp();
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
}
{
let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 0xBB);
}
}
// Data written to the first slot and to the last slot of each grown file
// survives grow/re-open cycles.
#[test]
fn ok_write_reopen_grown_read_cycle() {
let (_dir, path, cfg) = new_tmp();
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
unsafe { mmap.write(0, |v| *v = 1).unwrap() };
}
for i in 0..2 {
let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
let idx = mmap.total_slots() - 1;
unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
drop(mmap);
}
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
let base = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(base, 1);
let last_idx = mmap.total_slots() - 1;
let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
assert_eq!(last, 3);
}
// NOTE(review): covers the same scenario as `err_new_grown_with_preexisting_instance`.
#[test]
fn err_new_grown_while_previous_instance_is_alive() {
let (_dir, path, cfg) = new_tmp();
let _mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
let reopened = FrozenMMap::<u64, MID>::new_grown(&path, cfg, 0x10);
assert!(reopened.is_err());
}
}
// Asynchronous `write` followed by `read`.
mod fm_write_read {
use super::*;
// Write, wait for the epoch to become durable, then read the value back.
#[test]
fn ok_write_wait_read_cycle() {
const VAL: u64 = 0xDEADC0DE;
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
mmap.wait_for_durability(epoch).unwrap();
let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
assert_eq!(val, VAL);
}
// The written value is readable immediately, without waiting for durability.
#[test]
fn ok_write_read_without_wait() {
const VAL: u64 = 0xDEADC0DE;
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
assert_eq!(val, VAL);
}
}
// Synchronous `write_sync` and its interaction with asynchronous writes.
mod fm_write_sync_read {
use super::*;
#[test]
fn ok_write_sync_read() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 0x4C);
}
// NOTE(review): despite its name this test never calls `wait_for_durability`;
// it currently checks the same thing as `ok_write_sync_read`.
#[test]
fn ok_write_sync_wait_returns_immediately() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 0x6A);
}
// An asynchronous write after a synchronous one still becomes durable.
#[test]
fn ok_write_sync_followed_by_async_write() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };
let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
mmap.wait_for_durability(epoch).unwrap();
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 0x14);
}
// A synchronous write also makes an earlier asynchronous epoch durable.
#[test]
fn ok_write_sync_makes_prev_batch_durable() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
mmap.wait_for_durability(async_epoch).unwrap();
let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };
assert_eq!(v1, 1);
assert_eq!(v2, 2);
}
// A synchronously written value is visible after dropping and re-opening.
#[test]
fn ok_write_sync_persists_across_reopen() {
let (_dir, path, cfg) = new_tmp();
const VAL: u64 = 0x1000;
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
}
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, VAL);
}
}
}
// Transactions: ordering rules, epochs, concurrency and persistence.
mod fm_tx {
use super::*;
// Three ordered writes committed together are all visible afterwards.
#[test]
fn ok_tx_basic_multi_write() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(0, |v| *v = 1).unwrap();
tx.write(1, |v| *v = 2).unwrap();
tx.write(2, |v| *v = 3).unwrap();
}
let epoch = tx.commit().unwrap();
mmap.wait_for_durability(epoch).unwrap();
let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
assert_eq!((v0, v1, v2), (1, 2, 3));
}
// A write issued after a commit gets a strictly larger epoch.
#[test]
fn ok_tx_single_epoch() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(0, |v| *v = 0x0A).unwrap();
tx.write(1, |v| *v = 0x14).unwrap();
}
let epoch = tx.commit().unwrap();
let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
assert!(next_epoch > epoch);
}
// Queuing a smaller index after a larger one is rejected.
#[test]
fn err_tx_out_of_order() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(2, |v| *v = 1).unwrap();
let res = tx.write(1, |v| *v = 2);
assert!(res.is_err());
}
}
// Queuing the same index twice is rejected.
#[test]
fn err_tx_duplicate_index() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(1, |v| *v = 1).unwrap();
let res = tx.write(1, |v| *v = 2);
assert!(res.is_err());
}
}
// Two threads commit transactions over disjoint slot ranges.
#[test]
fn ok_tx_concurrent_non_overlapping() {
let (_dir, path, cfg) = new_tmp();
let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
let mut handles = Vec::new();
for i in 0..2 {
let mmap = mmap.clone();
handles.push(thread::spawn(move || {
let mut tx = mmap.new_tx();
unsafe {
tx.write(i * 2, |v| *v = i as u64).unwrap();
tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
}
let epoch = tx.commit().unwrap();
mmap.wait_for_durability(epoch).unwrap();
}));
}
for h in handles {
h.join().unwrap();
}
for i in 0..2 {
let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
assert_eq!((v0, v1), (i as u64, i as u64));
}
}
// Two consecutive transactions on the same slot: the later value wins.
#[test]
fn ok_tx_overwrite_last_wins() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(0, |v| *v = 1).unwrap();
}
tx.commit().unwrap();
let mut tx2 = mmap.new_tx();
unsafe {
tx2.write(0, |v| *v = 2).unwrap();
}
let epoch = tx2.commit().unwrap();
mmap.wait_for_durability(epoch).unwrap();
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 2);
}
// Committed and durable values are visible after dropping and re-opening.
#[test]
fn ok_tx_persists_across_reopen() {
let (_dir, path, cfg) = new_tmp();
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
let mut tx = mmap.new_tx();
unsafe {
tx.write(0, |v| *v = 0x3A).unwrap();
tx.write(1, |v| *v = 0x54).unwrap();
}
let epoch = tx.commit().unwrap();
mmap.wait_for_durability(epoch).unwrap();
}
{
let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
assert_eq!((v0, v1), (0x3A, 0x54));
}
}
}
// Epoch and durability-wait behaviour.
mod fm_durability {
use super::*;
// Dropping right after a completed durability wait must not hang or panic.
#[test]
fn ok_wait_then_drop() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
mmap.wait_for_durability(epoch).unwrap();
drop(mmap);
}
// Epochs returned by successive writes never go backwards.
#[test]
fn ok_epoch_monotonicity() {
let (_dir, path, cfg) = new_tmp();
let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
mmap.wait_for_durability(e1).unwrap();
let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
mmap.wait_for_durability(e2).unwrap();
assert!(e2 >= e1);
}
// Two threads increment the same slot and each waits for its own epoch; the
// slot lock serialises the increments, so the final value is 2.
#[test]
fn ok_wait_for_durability_with_multi_writers() {
let (_dir, path, cfg) = new_tmp();
let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
let mut handles = Vec::new();
for _ in 0..2 {
let mmap = mmap.clone();
handles.push(thread::spawn(move || {
let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
mmap.wait_for_durability(epoch).unwrap();
}));
}
for h in handles {
h.join().unwrap();
}
let val = unsafe { mmap.read(0, |v| *v).unwrap() };
assert_eq!(val, 2);
}
}
// Multi-threaded access to a shared map.
mod fm_concurrency {
use super::*;
// Two threads read different slots in parallel.
#[test]
fn ok_parallel_reads_with_diff_index() {
let (_dir, path, cfg) = new_tmp();
let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
let t1 = {
let mmap = mmap.clone();
thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
};
let t2 = {
let mmap = mmap.clone();
thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
};
assert_eq!(t1.join().unwrap(), 0x10);
assert_eq!(t2.join().unwrap(), 0x20);
}
// Concurrent writers and readers, then drop, then re-open with growth: the
// earlier writes are still visible and the new last slot is usable.
#[test]
fn ok_multi_tx_drop_then_reopen_grown() {
let (_dir, path, cfg) = new_tmp();
{
let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap());
let mut handles = Vec::new();
for i in 0..2u64 {
let mmap = mmap.clone();
handles.push(thread::spawn(move || {
let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
}));
}
for i in 0..2usize {
let mmap = mmap.clone();
handles.push(thread::spawn(move || {
let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
}));
}
for h in handles {
h.join().unwrap();
}
}
{
let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
for i in 0..2u64 {
let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
assert_eq!(val, i + 1);
}
let idx = mmap.total_slots() - 1;
unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
assert_eq!(val, 0xDEAD);
}
}
}
}