#[cfg(any(target_os = "linux", target_os = "macos"))]
mod posix;
use crate::{
error::{ErrCode, FrozenErr, FrozenRes},
ffile::{FFCfg, FrozenFile},
hints,
};
use std::{
cell, fmt, mem,
sync::{self, atomic},
thread, time,
};
/// Error-domain tag for this module; embedded in every `FrozenErr` raised here.
const ERRDOMAIN: u8 = 0x12;
/// Durability epoch type. A write returns the epoch it was issued under;
/// `wait_for_durability` blocks until the durable epoch strictly exceeds it.
pub type TEpoch = u64;
#[cfg(any(target_os = "linux", target_os = "macos"))]
type TMap = posix::POSIXMMap;
/// Module id used in error reporting; initialized once from the first
/// `FMCfg::mid` passed to `FrozenMMap::new` (or 0 in test builds).
static MODULE_ID: sync::OnceLock<u8> = sync::OnceLock::new();
/// Returns the module id recorded by `FrozenMMap::new`.
///
/// # Panics
/// Panics if called before any `FrozenMMap::new` has initialized `MODULE_ID`.
#[cfg(not(test))]
#[inline(always)]
fn mod_id() -> &'static u8 {
    // `expect` states the invariant instead of a bare `unwrap`, so a
    // violation produces an actionable panic message.
    MODULE_ID
        .get()
        .expect("MODULE_ID must be initialized by FrozenMMap::new before errors are raised")
}
/// Test-build variant: lazily defaults the module id to 0 so unit tests can
/// construct errors without first going through `FrozenMMap::new`.
#[cfg(test)]
#[inline(always)]
fn mod_id() -> &'static u8 {
    MODULE_ID.get_or_init(|| 0)
}
/// Error codes raised by the fmmap module, paired with human-readable text.
pub(in crate::fmmap) mod err {
    use super::ErrCode;
    // Message typos fixed: "hault" -> "halt", "paniced" -> "panicked".
    pub const HCF: ErrCode = ErrCode::new(0x200, "halt and catch fire");
    pub const UNK: ErrCode = ErrCode::new(0x201, "unknown error");
    pub const NMM: ErrCode = ErrCode::new(0x202, "not enough memory available on the device");
    pub const SYN: ErrCode = ErrCode::new(0x203, "failed to sync/flush data to storage device");
    pub const PRM: ErrCode = ErrCode::new(0x204, "missing permissions for IO");
    pub const TXE: ErrCode = ErrCode::new(0x205, "flush_tx panicked inside");
    pub const FXE: ErrCode = ErrCode::new(0x206, "unable to spawn flush_tx");
    pub const LPN: ErrCode = ErrCode::new(0x207, "lock poisoned internally");
}
#[inline]
pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenRes<R> {
let err = FrozenErr::new_raw(*mod_id(), ERRDOMAIN, code, error);
Err(err)
}
/// Builds a `FrozenErr` tagged with this module's id and `ERRDOMAIN`.
#[inline]
pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenErr {
    FrozenErr::new_raw(*mod_id(), ERRDOMAIN, code, error)
}
/// Configuration for [`FrozenMMap::new`].
#[derive(Debug, Clone)]
pub struct FMCfg {
    /// Module id recorded in `MODULE_ID` and stamped on every error.
    pub mid: u8,
    /// Path of the backing file.
    pub path: std::path::PathBuf,
    /// Initial number of slots to allocate in the backing file.
    pub initial_count: usize,
    /// Interval between background flushes performed by the flush thread.
    pub flush_duration: time::Duration,
}
/// A typed, slot-addressed view over a memory-mapped file, with a background
/// flush thread providing epoch-based durability guarantees.
#[derive(Debug)]
pub struct FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// State shared with the background flush thread (map, file, locks, epochs).
    core: sync::Arc<Core>,
    /// Join handle of the flush thread; taken (and joined) on delete/drop.
    tx: Option<thread::JoinHandle<()>>,
    /// Marks the slot payload type without storing a `T`.
    _type: core::marker::PhantomData<T>,
}
// SAFETY: all shared mutable state lives in `Core`, whose accesses are guarded
// by its locks and atomics, and `T: Send + Sync` is required by the bounds.
// NOTE(review): soundness ultimately depends on every access path honoring
// `Core::io_lock` — confirm against the posix mapping implementation.
unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
impl<T> FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// Byte size of one slot: the per-slot lock word plus the `T` payload
    /// (see `ObjectInterface`, which is `#[repr(C)]`).
    pub const SLOT_SIZE: usize = std::mem::size_of::<ObjectInterface<T>>();

    /// Number of slots currently addressable in the mapping.
    #[inline]
    pub fn slots(&self) -> usize {
        self.core.curr_length() / Self::SLOT_SIZE
    }

    /// Opens (or creates) the backing file, maps it, and spawns the
    /// background flush thread.
    pub fn new(cfg: FMCfg) -> FrozenRes<Self> {
        let ff_cfg = FFCfg {
            mid: cfg.mid,
            path: cfg.path,
            chunk_size: Self::SLOT_SIZE,
            initial_chunk_amount: cfg.initial_count,
        };
        let file = FrozenFile::new(ff_cfg)?;
        let curr_length = file.length()?;
        // First caller wins; later maps with a different `mid` keep the
        // original id for error reporting.
        let _ = MODULE_ID.get_or_init(|| cfg.mid);
        // SAFETY: `fd` is a freshly opened file of exactly `curr_length`
        // bytes; the mapping is torn down before the file is closed.
        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length));
        let tx = Core::spawn_tx(core.clone())?;
        Ok(Self {
            core,
            tx: Some(tx),
            _type: core::marker::PhantomData,
        })
    }

    /// Blocks until the write that returned `epoch` is durable, i.e. until
    /// the durable epoch strictly exceeds `epoch`, or returns the sticky
    /// sync error if one is set.
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }
        // Fast path: already durable, no lock required.
        let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
        if durable_epoch > epoch {
            return Ok(());
        }
        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return new_err(err::LPN, e),
        };
        // Classic condvar loop: re-check the predicate after every wakeup,
        // since `Condvar::wait` may return spuriously.
        loop {
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }
            if self.core.durable_epoch.load(atomic::Ordering::Acquire) > epoch {
                return Ok(());
            }
            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return new_err(err::LPN, e),
            };
        }
    }

    /// Runs `f` against a shared reference to slot `index`.
    ///
    /// Holds the shared IO lock (concurrent with other reads/writes, mutually
    /// exclusive with flush/grow/delete) plus the per-slot spinlock.
    /// NOTE(review): `index` is not bounds-checked against `slots()`; an
    /// out-of-range index computes a pointer past the mapping — confirm that
    /// callers validate it.
    #[inline(always)]
    pub fn read<R>(&self, index: usize, f: impl FnOnce(&T) -> R) -> FrozenRes<R> {
        let offset = Self::SLOT_SIZE * index;
        let _guard = self.core.acquire_io_lock()?;
        // SAFETY: the mapping stays live while the IO lock is held, and the
        // per-slot lock taken below serializes access to this slot.
        let slot: &ObjectInterface<T> = unsafe { &*self.get_mmap().as_ptr::<T>(offset) };
        let _oi_guard = slot.lock();
        let res = unsafe { f(slot.get()) };
        Ok(res)
    }

    /// Runs `f` against a mutable reference to slot `index`, marks the map
    /// dirty, and returns the epoch to pass to [`Self::wait_for_durability`].
    ///
    /// The epoch is loaded while the shared IO lock is still held; the flush
    /// thread needs the *exclusive* IO lock to sync, so it cannot have made
    /// this write durable before the load — waiting for `durable > epoch`
    /// is therefore sufficient.
    #[inline(always)]
    pub fn write<R>(&self, index: usize, f: impl FnOnce(&mut T) -> R) -> FrozenRes<(R, TEpoch)> {
        // Refuse new writes while a sticky sync error is pending.
        if let Some(err) = self.core.get_sync_error() {
            return Err(err);
        }
        let offset = Self::SLOT_SIZE * index;
        let _guard = self.core.acquire_io_lock()?;
        // SAFETY: see `read`; the per-slot lock grants exclusive slot access.
        let slot: &ObjectInterface<T> = unsafe { &*self.get_mmap().as_ptr::<T>(offset) };
        let _oi_guard = slot.lock();
        let res = unsafe { f(slot.get_mut()) };
        self.core.dirty.store(true, atomic::Ordering::Release);
        let epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
        Ok((res, epoch))
    }

    /// Like [`Self::write`], but syncs to storage before returning.
    ///
    /// Takes `core.lock` first to keep the background flush thread parked,
    /// then the exclusive IO lock. The epoch is bumped and `dirty` consumed
    /// up front, so this sync also makes any prior async writes durable.
    #[inline(always)]
    pub fn write_sync<R>(&self, index: usize, f: impl FnOnce(&mut T) -> R) -> FrozenRes<(R, TEpoch)> {
        if let Some(err) = self.core.get_sync_error() {
            return Err(err);
        }
        let offset = Self::SLOT_SIZE * index;
        let _flush_guard = self.core.lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
        let _guard = self.core.acquire_exclusive_io_lock()?;
        let prev_epoch = self.core.durable_epoch.fetch_add(1, atomic::Ordering::Release);
        let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
        // SAFETY: exclusive IO lock held; mapping is live and unshared.
        let slot: &ObjectInterface<T> = unsafe { &*self.get_mmap().as_ptr::<T>(offset) };
        let _oi_guard = slot.lock();
        let res = unsafe { f(slot.get_mut()) };
        self.core.sync()?;
        // If earlier async writes were pending, their waiters are now
        // satisfied — wake them.
        if prev {
            let _g = self.core.durable_lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
            self.core.durable_cv.notify_all();
        }
        Ok((res, prev_epoch))
    }

    /// Extends the backing file by `count` slots and remaps it.
    ///
    /// Flushes pending data first (waking durability waiters), then unmaps,
    /// grows the file, and installs a fresh mapping.
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        let core = &self.core;
        let _lock = self.core.acquire_exclusive_io_lock()?;
        if core.dirty.swap(false, atomic::Ordering::AcqRel) {
            core.sync()?;
            core.incr_epoch();
            let _g = core.durable_lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
            core.durable_cv.notify_all();
        }
        // SAFETY: exclusive IO lock held, so no reader/writer can touch the
        // mapping while it is torn down and rebuilt.
        // NOTE(review): `munmap` is called explicitly and then the
        // `ManuallyDrop` is dropped as well — this assumes `TMap::drop` does
        // not unmap again (otherwise this double-unmaps). Contrast with
        // `Drop for FrozenMMap`, which only calls `munmap`. Confirm against
        // the posix implementation.
        unsafe {
            self.munmap()?;
            mem::ManuallyDrop::drop(&mut *core.mmap.get());
        }
        core.ffile.grow(count)?;
        let new_len = core.ffile.length()?;
        core.curr_length.store(new_len, atomic::Ordering::Release);
        unsafe {
            let new_map = TMap::new(core.ffile.fd(), new_len)?;
            *core.mmap.get() = mem::ManuallyDrop::new(new_map);
        };
        Ok(())
    }

    /// Flushes pending data, stops and joins the flush thread, unmaps, and
    /// deletes the backing file. Further use of the map after a successful
    /// delete will fail.
    pub fn delete(&mut self) -> FrozenRes<()> {
        let core = &self.core;
        let _lock = core.acquire_exclusive_io_lock()?;
        if core.dirty.swap(false, atomic::Ordering::AcqRel) {
            core.sync()?;
            core.incr_epoch();
            let _g = core.durable_lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
            core.durable_cv.notify_all();
        }
        // Signal the flush thread to exit and wake it from its timed wait.
        core.closed.store(true, atomic::Ordering::Release);
        core.cv.notify_one();
        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }
        self.munmap()?;
        core.ffile.delete()
    }

    /// Unmaps the current mapping at its recorded length.
    #[inline]
    fn munmap(&self) -> FrozenRes<()> {
        let length = self.core.curr_length();
        // SAFETY: callers hold the exclusive IO lock (or are in teardown),
        // so no other reference into the mapping exists.
        unsafe { self.get_mmap().unmap(length) }
    }

    /// Borrows the current mapping.
    #[inline]
    fn get_mmap(&self) -> &mem::ManuallyDrop<TMap> {
        // SAFETY: the mapping is only replaced under the exclusive IO lock;
        // callers hold at least the shared IO lock while using this borrow.
        unsafe { &*self.core.mmap.get() }
    }
}
impl<T> Drop for FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// Signals the flush thread to exit (it performs a final sync if data is
    /// dirty), joins it, frees any sticky error, and unmaps the file.
    /// All errors are ignored — `Drop` cannot fail.
    fn drop(&mut self) {
        self.core.closed.store(true, atomic::Ordering::Release);
        self.core.cv.notify_one();
        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }
        // Exclusive IO lock: no reader/writer can touch the mapping while it
        // is being torn down.
        let _io_lock = self.core.acquire_exclusive_io_lock();
        // Reclaim the boxed sticky error, if one was published.
        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if !ptr.is_null() {
            // SAFETY: `ptr` came from `Box::into_raw` in `set_sync_error`,
            // and the swap above transferred sole ownership to us.
            unsafe {
                drop(Box::from_raw(ptr));
            }
        }
        let _ = self.munmap();
    }
}
impl<T> fmt::Display for FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// Renders as `FrozenMMap{fd: <fd>, len: <len>}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fd = self.core.ffile.fd();
        let len = self.core.curr_length();
        write!(f, "FrozenMMap{{fd: {fd}, len: {len}}}")
    }
}
/// State shared between the `FrozenMMap` handle and its flush thread.
#[derive(Debug)]
struct Core {
    /// Wakes the flush thread early (used for shutdown).
    cv: sync::Condvar,
    /// The backing file (fd, grow, sync, delete).
    ffile: FrozenFile,
    /// Mutex paired with `cv`; also parks the flush thread in `write_sync`.
    lock: sync::Mutex<()>,
    /// Shared for read/write, exclusive for sync/grow/delete/teardown.
    io_lock: sync::RwLock<()>,
    /// Set by async writes; consumed by whoever performs the next sync.
    dirty: atomic::AtomicBool,
    /// Wakes `wait_for_durability` callers after a successful sync.
    durable_cv: sync::Condvar,
    /// Set on delete/drop to stop the flush thread.
    closed: atomic::AtomicBool,
    /// Mutex paired with `durable_cv`.
    durable_lock: sync::Mutex<()>,
    /// Interval between background flushes.
    flush_duration: time::Duration,
    /// Count of completed syncs; a write is durable once this exceeds the
    /// epoch the write returned.
    durable_epoch: atomic::AtomicU64,
    /// Current mapped length in bytes.
    curr_length: atomic::AtomicUsize,
    /// Sticky sync error as a boxed raw pointer (null = no error).
    error: atomic::AtomicPtr<FrozenErr>,
    /// The live mapping; only replaced under the exclusive `io_lock`.
    mmap: cell::UnsafeCell<mem::ManuallyDrop<TMap>>,
}
// SAFETY: `mmap` sits behind an `UnsafeCell` but is only replaced while the
// exclusive `io_lock` is held; `error` is managed through an `AtomicPtr`; the
// remaining fields are inherently thread-safe primitives. NOTE(review):
// soundness depends on every access path honoring that discipline — confirm.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
impl Core {
    /// Plain constructor; all synchronization primitives start unlocked,
    /// clean, open, and at epoch 0.
    fn new(mmap: TMap, ffile: FrozenFile, flush_duration: time::Duration, curr_length: usize) -> Self {
        Self {
            ffile,
            flush_duration,
            cv: sync::Condvar::new(),
            lock: sync::Mutex::new(()),
            io_lock: sync::RwLock::new(()),
            durable_cv: sync::Condvar::new(),
            durable_lock: sync::Mutex::new(()),
            dirty: atomic::AtomicBool::new(false),
            closed: atomic::AtomicBool::new(false),
            durable_epoch: atomic::AtomicU64::new(0),
            curr_length: atomic::AtomicUsize::new(curr_length),
            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
            mmap: cell::UnsafeCell::new(mem::ManuallyDrop::new(mmap)),
        }
    }

    /// Current mapped length in bytes.
    #[inline]
    fn curr_length(&self) -> usize {
        self.curr_length.load(atomic::Ordering::Acquire)
    }

    /// Flushes the mapping to the file, then the file to storage.
    /// Callers must hold the exclusive `io_lock`.
    #[inline]
    fn sync(&self) -> FrozenRes<()> {
        // SAFETY: exclusive IO lock held by the caller; mapping is live.
        unsafe { (*self.mmap.get()).sync(self.curr_length()) }?;
        self.ffile.sync()
    }

    /// Publishes a sticky error, freeing any previously published one.
    ///
    /// NOTE(review): freeing the old box here (and in `clear_sync_error`)
    /// can race with a concurrent `get_sync_error` that has loaded the old
    /// pointer but not yet cloned through it — a potential use-after-free.
    /// An `AtomicPtr` alone does not make load-then-deref safe; confirm
    /// external synchronization, or consider `Mutex<Option<FrozenErr>>`.
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
        if !old.is_null() {
            // SAFETY: `old` came from `Box::into_raw`; the swap transferred
            // ownership to us (but see the race note above).
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Clones out the sticky error, if any. See the race note on
    /// `set_sync_error` regarding the load-then-deref window.
    #[inline(always)]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }
        Some(unsafe { (*ptr).clone() })
    }

    /// Drops the sticky error, if any (called after a successful sync).
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            // SAFETY: see `set_sync_error`.
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Shared IO lock for slot reads/writes.
    #[inline]
    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
        self.io_lock.read().map_err(|e| new_err_raw(err::LPN, e))
    }

    /// Exclusive IO lock for sync/grow/delete/teardown.
    #[inline]
    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
        self.io_lock.write().map_err(|e| new_err_raw(err::LPN, e))
    }

    /// Advances the durable epoch after a successful sync.
    #[inline]
    fn incr_epoch(&self) {
        self.durable_epoch.fetch_add(1, atomic::Ordering::Release);
    }

    /// Spawns the background flush thread over this core.
    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
        match thread::Builder::new()
            .name("fm-flush-tx".into())
            .spawn(move || Self::flush_tx(core))
        {
            Ok(tx) => Ok(tx),
            Err(error) => new_err(err::FXE, error),
        }
    }

    /// Background flush loop: periodically (or when notified) syncs dirty
    /// data, bumps the durable epoch, and wakes waiters. Exits once `closed`
    /// is set and nothing is left to flush. Unrecoverable lock failures are
    /// published via `set_sync_error` before the thread exits.
    fn flush_tx(core: sync::Arc<Self>) {
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                core.set_sync_error(new_err_raw(err::FXE, error));
                return;
            }
        };
        loop {
            // Park until the flush interval elapses or shutdown notifies us.
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(err::TXE, e));
                    return;
                }
            };
            // Claim the dirty flag; a write landing after this point is
            // simply picked up on the next iteration.
            let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
            let closing = core.closed.load(atomic::Ordering::Acquire);
            if !dirty {
                if closing {
                    return;
                }
                continue;
            }
            // NOTE: when closing *and* dirty, this iteration flushes and the
            // thread only exits after one more full timed wait.
            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(e);
                    return;
                }
            };
            // Release the flush mutex during the (possibly slow) sync so
            // `write_sync` is not blocked longer than necessary.
            drop(guard);
            match core.sync() {
                Ok(_) => {
                    core.incr_epoch();
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(err::LPN, e));
                            return;
                        }
                    };
                    core.durable_cv.notify_all();
                    // A successful sync clears any error from a prior
                    // failed flush.
                    core.clear_sync_error();
                }
                Err(err) => core.set_sync_error(err),
            }
            drop(io_lock);
            // Re-take the flush mutex for the next timed wait.
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(err::LPN, e));
                    return;
                }
            };
        }
    }
}
/// In-memory (and on-disk) slot layout: a 64-bit spinlock word followed by
/// the `T` payload. `#[repr(C)]` pins the field order so `SLOT_SIZE` offsets
/// are stable across builds.
///
/// NOTE(review): the lock word lives inside the mapped file, so `sync`
/// persists it; a crash while a slot lock is held leaves a nonzero word on
/// disk, and a reopened map would spin forever on that slot — confirm that
/// recovery zeroes lock words on open.
#[repr(C)]
pub(in crate::fmmap) struct ObjectInterface<T>
where
    T: Sized + Send + Sync,
{
    /// Spinlock word: 0 = unlocked, 1 = held.
    lock: cell::UnsafeCell<u64>,
    /// Slot payload; only accessed while the lock word is held.
    value: cell::UnsafeCell<T>,
}
impl<T> ObjectInterface<T>
where
    T: Sized + Send + Sync,
{
    /// Busy-spin this many times before yielding the thread.
    const MAX_SPINS: usize = 0x10;

    /// Acquires the per-slot spinlock, returning a guard that releases it
    /// on drop. Spins briefly, then falls back to `yield_now`.
    #[inline]
    fn lock(&self) -> OIGuard<'_, T> {
        // SAFETY: the lock word is only ever accessed atomically via
        // `from_ptr`; the pointer is valid and aligned for `self`'s lifetime.
        let atm = unsafe { atomic::AtomicU64::from_ptr(self.lock.get()) };
        let mut spins = 0;
        loop {
            // Weak CAS is fine inside a retry loop; `Acquire` pairs with the
            // `Release` store in `OIGuard::drop`.
            if atm
                .compare_exchange_weak(0, 1, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
                .is_ok()
            {
                return OIGuard { oi: self };
            }
            if hints::likely(spins < Self::MAX_SPINS) {
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
            spins += 1;
        }
    }

    /// Shared access to the payload.
    ///
    /// # Safety
    /// Caller must hold the slot lock (via [`Self::lock`]) and the backing
    /// mapping must be live.
    #[inline]
    unsafe fn get(&self) -> &T {
        &*self.value.get()
    }

    /// Mutable access to the payload.
    ///
    /// # Safety
    /// Caller must hold the slot lock, the mapping must be live, and no other
    /// reference to this payload may exist for the returned borrow's lifetime.
    #[inline]
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self) -> &mut T {
        &mut *self.value.get()
    }
}
/// RAII guard for a held slot spinlock; releases the lock on drop.
struct OIGuard<'a, T>
where
    T: Sized + Send + Sync,
{
    /// The slot whose lock this guard holds.
    oi: &'a ObjectInterface<T>,
}
impl<T> Drop for OIGuard<'_, T>
where
    T: Sized + Send + Sync,
{
    /// Releases the per-slot spinlock taken by `ObjectInterface::lock`.
    fn drop(&mut self) {
        // SAFETY: the lock word is only ever accessed atomically via
        // `from_ptr`; the pointer is valid for the guard's lifetime.
        let atm = unsafe { atomic::AtomicU64::from_ptr(self.oi.lock.get()) };
        // A release store alone publishes every write made while the lock was
        // held; the explicit `fence(Release)` that previously preceded this
        // `store(Release)` added nothing and has been removed.
        atm.store(0, atomic::Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Deliberately tiny flush interval so durability waits resolve quickly.
    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
    const INIT_SLOTS: usize = 0x0A;
    const MOD_ID: u8 = 0;

    /// Creates a temp dir and a config pointing into it. The returned
    /// `TempDir` must be kept alive for the test's duration (it removes the
    /// directory on drop).
    fn new_tmp() -> (tempfile::TempDir, FMCfg) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("tmp_map");
        let cfg = FMCfg {
            path,
            mid: MOD_ID,
            initial_count: INIT_SLOTS,
            flush_duration: FLUSH_DURATION,
        };
        (dir, cfg)
    }

    /// Creation, reopening, deletion, and drop-time persistence.
    mod fm_lifecycle {
        use super::*;
        #[test]
        fn ok_new() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
            // A fresh map starts clean, open, and at epoch 0.
            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
            assert_eq!(
                mmap.core.curr_length.load(atomic::Ordering::Acquire),
                INIT_SLOTS * FrozenMMap::<u8>::SLOT_SIZE
            );
            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
            let (_, epoch) = mmap.write(0, |f| *f = 0x0A).unwrap();
            assert_eq!(epoch, 0);
            assert!(mmap.wait_for_durability(epoch).is_ok());
        }
        #[test]
        fn ok_new_existing() {
            let (_dir, cfg) = new_tmp();
            let mmap1 = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            drop(mmap1);
            // Reopening the same file with the same config must succeed.
            let mmap2 = FrozenMMap::<u8>::new(cfg).unwrap();
            drop(mmap2);
        }
        #[test]
        fn err_new_when_change_in_cfg() {
            let (_dir, mut cfg) = new_tmp();
            let mmap1 = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            drop(mmap1);
            // A mismatched initial_count on reopen is rejected.
            cfg.initial_count = INIT_SLOTS * 2;
            assert!(FrozenMMap::<u8>::new(cfg).is_err());
        }
        #[test]
        fn ok_delete() {
            let (_dir, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            mmap.delete().unwrap();
            assert!(!mmap.core.ffile.exists().unwrap());
        }
        #[test]
        fn err_delete_after_delete() {
            let (_dir, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            mmap.delete().unwrap();
            assert!(!mmap.core.ffile.exists().unwrap());
            // A second delete must fail: the backing file is gone.
            assert!(mmap.delete().is_err());
        }
        #[test]
        fn ok_drop_persists_when_dropped_before_bg_flush() {
            let (_dir, cfg) = new_tmp();
            const VAL: u8 = 0x0A;
            // Drop triggers a final flush even if the background thread has
            // not run yet; the value must survive a reopen.
            {
                let mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
                mmap.write(0, |byte| *byte = VAL).unwrap();
                drop(mmap);
            }
            {
                let mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
                let val = mmap.read(0, |byte| *byte).unwrap();
                assert_eq!(val, VAL);
            }
        }
    }

    /// Growing the mapping and preserving data across remaps.
    mod fm_grow {
        use super::*;
        #[test]
        fn ok_grow_updates_length() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
            assert_eq!(mmap.core.curr_length(), INIT_SLOTS * FrozenMMap::<u8>::SLOT_SIZE);
            mmap.grow(0x0A).unwrap();
            assert_eq!(
                mmap.core.curr_length(),
                (INIT_SLOTS + 0x0A) * FrozenMMap::<u8>::SLOT_SIZE
            );
        }
        #[test]
        fn ok_grow_sync_cycle() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
            for _ in 0..0x0A {
                mmap.grow(0x100).unwrap();
            }
            assert_eq!(
                mmap.core.curr_length(),
                (INIT_SLOTS + (0x0A * 0x100)) * FrozenMMap::<u8>::SLOT_SIZE
            );
        }
        #[test]
        fn ok_write_grow_read() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            mmap.write(0, |v| *v = 0xAA).unwrap();
            mmap.grow(0x10).unwrap();
            // Writes before and after the remap both land in the same slot.
            mmap.write(0, |v| *v = 0xBB).unwrap();
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0xBB);
        }
        #[test]
        fn ok_write_grow_read_cycle() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            mmap.write(0, |v| *v = 1).unwrap();
            for i in 0..5 {
                mmap.grow(0x10).unwrap();
                let idx = mmap.slots() - 1;
                mmap.write(idx, |v| *v = (i + 2) as u64).unwrap();
            }
            // Slot 0 survives every remap; the final grown slot holds the
            // last value written (2..=6).
            let base = mmap.read(0, |v| *v).unwrap();
            assert_eq!(base, 1);
            let last_idx = mmap.slots() - 1;
            let last = mmap.read(last_idx, |v| *v).unwrap();
            assert_eq!(last, 6);
        }
    }

    /// Basic async write / read behavior.
    mod fm_write_read {
        use super::*;
        #[test]
        fn ok_write_wait_read_cycle() {
            const VAL: u32 = 0xDEADC0DE;
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u32>::new(cfg).unwrap();
            let (_, epoch) = mmap.write(0, |ptr| *ptr = VAL).unwrap();
            mmap.wait_for_durability(epoch).unwrap();
            let val = mmap.read(0, |ptr| *ptr).unwrap();
            assert_eq!(val, VAL);
        }
        #[test]
        fn ok_write_read_without_wait() {
            const VAL: u32 = 0xDEADC0DE;
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u32>::new(cfg).unwrap();
            // Reads see writes immediately, durability aside.
            mmap.write(0, |ptr| *ptr = VAL).unwrap();
            let val = mmap.read(0, |ptr| *ptr).unwrap();
            assert_eq!(val, VAL);
        }
    }

    /// Synchronous write behavior and its interaction with async writes.
    mod fm_write_sync_read {
        use super::*;
        #[test]
        fn ok_write_sync_read() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            mmap.write_sync(0, |v| *v = 0x4C).unwrap();
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0x4C);
        }
        #[test]
        fn ok_write_sync_wait_returns_immediately() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            // write_sync bumps the epoch itself, so waiting on the returned
            // epoch must not block.
            let (_, epoch) = mmap.write_sync(0, |v| *v = 0x6A).unwrap();
            mmap.wait_for_durability(epoch).unwrap();
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0x6A);
        }
        #[test]
        fn ok_write_sync_epoch_progression() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            let (_, e1) = mmap.write_sync(0, |v| *v = 1).unwrap();
            let (_, e2) = mmap.write_sync(0, |v| *v = 2).unwrap();
            assert!(e2 >= e1);
            mmap.wait_for_durability(e1).unwrap();
            mmap.wait_for_durability(e2).unwrap();
        }
        #[test]
        fn ok_write_sync_followed_by_async_write() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            mmap.write_sync(0, |v| *v = 0x0A).unwrap();
            let (_, epoch) = mmap.write(0, |v| *v = 0x14).unwrap();
            mmap.wait_for_durability(epoch).unwrap();
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0x14);
        }
        #[test]
        fn ok_write_sync_makes_prev_batch_durable() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            // write_sync consumes the dirty flag, so the earlier async
            // write's durability wait is satisfied by it.
            let (_, async_epoch) = mmap.write(0, |v| *v = 1).unwrap();
            mmap.write_sync(1, |v| *v = 2).unwrap();
            mmap.wait_for_durability(async_epoch).unwrap();
            let v1 = mmap.read(0, |v| *v).unwrap();
            let v2 = mmap.read(1, |v| *v).unwrap();
            assert_eq!(v1, 1);
            assert_eq!(v2, 2);
        }
        #[test]
        fn ok_write_sync_persists_across_reopen() {
            let (dir, cfg) = new_tmp();
            const VAL: u64 = 0x1000;
            {
                let mmap = FrozenMMap::<u64>::new(cfg.clone()).unwrap();
                mmap.write_sync(0, |v| *v = VAL).unwrap();
            }
            {
                let mmap = FrozenMMap::<u64>::new(cfg.clone()).unwrap();
                let val = mmap.read(0, |v| *v).unwrap();
                assert_eq!(val, VAL);
            }
            drop(dir);
        }
    }

    /// Epoch monotonicity and multi-writer durability waits.
    mod fm_durability {
        use super::*;
        #[test]
        fn ok_wait_then_drop() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            let (_, epoch) = mmap.write(0, |v| *v = 7).unwrap();
            mmap.wait_for_durability(epoch).unwrap();
            drop(mmap);
        }
        #[test]
        fn ok_epoch_monotonicity() {
            let (_dir, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
            let (_, e1) = mmap.write(0, |v| *v = 1).unwrap();
            mmap.wait_for_durability(e1).unwrap();
            let (_, e2) = mmap.write(0, |v| *v = 2).unwrap();
            mmap.wait_for_durability(e2).unwrap();
            assert!(e2 >= e1);
        }
        #[test]
        fn ok_wait_for_durability_with_multi_writers() {
            let (_dir, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
            let mut handles = Vec::new();
            // Ten threads each increment slot 0 once and wait for their own
            // epoch; the per-slot lock makes the increments race-free.
            for _ in 0..0x0A {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    let (_, epoch) = mmap.write(0, |v| *v += 1).unwrap();
                    mmap.wait_for_durability(epoch).unwrap();
                }));
            }
            for h in handles {
                h.join().unwrap();
            }
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0x0A);
        }
    }

    /// Concurrent slot access, parallel reads, and concurrent grow.
    mod fm_concurrency {
        use super::*;
        #[test]
        fn ok_oi_lock_with_multi_threads_same_index() {
            let (_dir, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
            let mut handles = Vec::new();
            // 10 threads x 256 increments on one slot; the slot spinlock must
            // make the total exact.
            for _ in 0..0x0A {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    for _ in 0..0x100 {
                        mmap.write(0, |v| *v += 1).unwrap();
                    }
                }));
            }
            for h in handles {
                h.join().unwrap();
            }
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 0x0A * 0x100);
        }
        #[test]
        fn ok_parallel_reads_with_diff_index() {
            let (_dir, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
            mmap.write(0, |v| *v = 0x10).unwrap();
            mmap.write(1, |v| *v = 0x20).unwrap();
            let t1 = {
                let mmap = mmap.clone();
                thread::spawn(move || mmap.read(0, |v| *v).unwrap())
            };
            let t2 = {
                let mmap = mmap.clone();
                thread::spawn(move || mmap.read(1, |v| *v).unwrap())
            };
            assert_eq!(t1.join().unwrap(), 0x10);
            assert_eq!(t2.join().unwrap(), 0x20);
        }
        #[test]
        fn ok_grow_with_multi_threads() {
            let (_dir, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
            const THREADS: usize = 4;
            const GROWS_PER_THREAD: usize = 0x10;
            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    for _ in 0..GROWS_PER_THREAD {
                        mmap.grow(1).unwrap();
                        let idx = mmap.slots() - 1;
                        mmap.write(idx, |v| *v = 0xABCD).unwrap();
                    }
                }));
            }
            for h in handles {
                h.join().unwrap();
            }
            // Each grow adds exactly one slot regardless of interleaving.
            let expected_min = INIT_SLOTS + (THREADS * GROWS_PER_THREAD);
            assert_eq!(mmap.slots(), expected_min);
            let last = mmap.read(mmap.slots() - 1, |v| *v).unwrap();
            assert_eq!(last, 0xABCD);
        }
        #[test]
        fn ok_wait_during_grow_cycle() {
            let (_dir, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
            let mmap2 = mmap.clone();
            // A durability wait racing with a grow (which flushes and wakes
            // waiters) must still complete.
            let t = thread::spawn(move || {
                let (_, epoch) = mmap2.write(0, |v| *v = 42).unwrap();
                mmap2.wait_for_durability(epoch).unwrap();
            });
            mmap.grow(8).unwrap();
            t.join().unwrap();
            let val = mmap.read(0, |v| *v).unwrap();
            assert_eq!(val, 42);
        }
    }
}