#[cfg(any(target_os = "linux", target_os = "macos"))]
mod posix;
use crate::{
error::{FrozenErr, FrozenRes},
ffile::{FFCfg, FrozenFile},
hints,
};
use std::{
cell, fmt, mem,
sync::{self, atomic},
thread, time,
};
/// Epoch counter type tracking completed flush generations.
type TEpoch = u64;
/// Error-domain discriminator embedded in every `FrozenErr` from this module.
const ERRDOMAIN: u8 = 0x12;
#[cfg(any(target_os = "linux", target_os = "macos"))]
type TMap = posix::POSIXMMap;
// Module id stamped into errors; written in `FrozenMMap::new`.
// NOTE(review): `static mut` is unsynchronized — constructing two maps
// concurrently with different mids is a data race; consider `AtomicU8`.
static mut MODULE_ID: u8 = 0;
/// Error codes produced by the `fmmap` module.
///
/// Discriminants are stable `u16` values embedded into `FrozenErr` records;
/// variants must keep their explicit values.
#[repr(u16)]
pub enum FMMapErr {
    /// Halt and catch fire: unrecoverable internal failure.
    Hcf = 0x200,
    /// Unknown error type.
    Unk = 0x201,
    /// No more memory left.
    Nmm = 0x202,
    /// Failed to sync/flush data to the storage device.
    Syn = 0x203,
    /// A thread failed or panicked.
    Txe = 0x204,
    /// A lock was poisoned.
    Lpn = 0x205,
    /// No permission to read or write.
    Prm = 0x208,
    /// Invalid configuration.
    Cfg = 0x209,
}
impl FMMapErr {
    /// Returns the canonical human-readable detail message for this code.
    #[inline]
    fn default_message(&self) -> &'static [u8] {
        match self {
            Self::Cfg => b"invalid cfg",
            Self::Lpn => b"lock poisoned",
            Self::Unk => b"unknown error type",
            Self::Nmm => b"no more memory left",
            // Fixed typo: was "hault and catch fire".
            Self::Hcf => b"halt and catch fire",
            Self::Prm => b"no perm to read or write",
            // Fixed typo: was "paniced".
            Self::Txe => b"thread failed or panicked",
            Self::Syn => b"failed to sync/flush data to storage device",
        }
    }
}
/// Builds an `Err(FrozenErr)` tagged with this module's id and error domain.
#[inline]
pub(in crate::fmmap) fn new_err<R>(res: FMMapErr, message: Vec<u8>) -> FrozenRes<R> {
    let default_detail = res.default_message();
    // SAFETY: MODULE_ID is written during `FrozenMMap::new` before errors are
    // produced.
    let mid = unsafe { MODULE_ID };
    Err(FrozenErr::new(mid, ERRDOMAIN, res as u16, default_detail, message))
}
/// Converts any displayable error into a `FrozenErr` carrying this module's
/// id, the fmmap error domain, and the given error code.
#[inline]
pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(res: FMMapErr, error: E) -> FrozenErr {
    let message = error.to_string().into_bytes();
    let default_detail = res.default_message();
    // SAFETY: MODULE_ID is written during `FrozenMMap::new` before errors are
    // produced.
    let mid = unsafe { MODULE_ID };
    FrozenErr::new(mid, ERRDOMAIN, res as u16, default_detail, message)
}
/// Configuration for constructing a [`FrozenMMap`].
#[derive(Debug, Clone)]
pub struct FMCfg {
    /// Module id stamped into errors originating from this map.
    pub mid: u8,
    /// Path of the backing file.
    pub path: std::path::PathBuf,
    /// Initial number of slots (chunks) to allocate in the backing file.
    pub initial_count: usize,
    /// Interval between background flush cycles.
    pub flush_duration: time::Duration,
}
/// A typed, file-backed memory map with a background flush thread.
///
/// Storage is divided into fixed-size slots, each holding an
/// `ObjectInterface<T>` (a one-byte spinlock plus the value).
#[derive(Debug)]
pub struct FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// State shared with the background flush thread.
    core: sync::Arc<Core>,
    /// Flush-thread join handle; taken during `delete`/`Drop`.
    tx: Option<thread::JoinHandle<()>>,
    _type: core::marker::PhantomData<T>,
}
// SAFETY: all access to the shared `Core` goes through its locks/atomics and
// `T: Send + Sync` is required by the bounds. NOTE(review): soundness also
// depends on `TMap`'s raw-pointer discipline — confirm against
// `posix::POSIXMMap`.
unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
impl<T> FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// Byte size of one slot: an `ObjectInterface<T>` (lock byte + value,
    /// plus alignment padding).
    pub const SLOT_SIZE: usize = std::mem::size_of::<ObjectInterface<T>>();

    /// Number of slots currently addressable in the mapped region.
    #[inline]
    pub fn slots(&self) -> usize {
        self.core.curr_length() / Self::SLOT_SIZE
    }

    /// Opens (or creates) the backing file, maps it into memory, records the
    /// module id for error reporting, and spawns the background flush thread.
    pub fn new(cfg: FMCfg) -> FrozenRes<Self> {
        let ff_cfg = FFCfg {
            mid: cfg.mid,
            path: cfg.path,
            chunk_size: Self::SLOT_SIZE,
            initial_chunk_amount: cfg.initial_count,
        };
        let file = FrozenFile::new(ff_cfg)?;
        let curr_length = file.length()?;
        // NOTE(review): unsynchronized write to a `static mut`; racy if two
        // maps are constructed concurrently with different mids.
        unsafe { MODULE_ID = cfg.mid };
        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length));
        let tx = Core::spawn_tx(core.clone())?;
        Ok(Self {
            core,
            tx: Some(tx),
            _type: core::marker::PhantomData,
        })
    }

    /// Blocks until a flush with epoch strictly greater than `epoch` has
    /// completed, or returns the error recorded by the flush thread.
    ///
    /// NOTE(review): the `durable_epoch == 0` arm returns `Ok` before any
    /// flush has ever run, so epoch-0 writes get no durability guarantee here.
    /// The tests rely on `wait_for_durability(0)` succeeding on a fresh map —
    /// confirm this trade-off is intended.
    pub fn wait_for_durability(&self, epoch: u64) -> FrozenRes<()> {
        // Fail fast if the flush thread already recorded an error.
        if let Some(sync_err) = self.core.get_sync_error() {
            return Err(sync_err);
        }
        let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
        if durable_epoch == 0 || durable_epoch > epoch {
            return Ok(());
        }
        // durable_lock serializes the epoch check against the flusher's
        // notify_all so a wakeup cannot be missed.
        let mut guard = match self.core.durable_lock.lock() {
            Ok(g) => g,
            Err(e) => return Err(new_err_raw(FMMapErr::Lpn, e)),
        };
        loop {
            if let Some(sync_err) = self.core.get_sync_error() {
                return Err(sync_err);
            }
            if self.core.durable_epoch.load(atomic::Ordering::Acquire) > epoch {
                return Ok(());
            }
            // Condvar waits can wake spuriously; the loop re-checks the epoch.
            guard = match self.core.durable_cv.wait(guard) {
                Ok(g) => g,
                Err(e) => return Err(new_err_raw(FMMapErr::Lpn, e)),
            };
        }
    }

    /// Runs `f` against a shared reference to the value in slot `index`,
    /// holding the shared io lock (excludes grow/delete) and the slot's
    /// spinlock.
    ///
    /// NOTE(review): no bounds check against `slots()`; an out-of-range index
    /// dereferences past the mapping — confirm callers guarantee the range.
    #[inline]
    pub fn read<R>(&self, index: usize, f: impl FnOnce(&T) -> R) -> FrozenRes<R> {
        let offset = Self::SLOT_SIZE * index;
        let _guard = self.core.acquire_io_lock()?;
        let slot = unsafe { &*self.get_mmap().as_ptr::<T>(offset) };
        let _oi_guard = slot.lock();
        let res = unsafe { f(slot.get()) };
        Ok(res)
    }

    /// Runs `f` against a mutable reference to the value in slot `index`,
    /// marks the map dirty, and returns `f`'s result together with the epoch
    /// to pass to [`Self::wait_for_durability`].
    #[inline]
    pub fn write<R>(&self, index: usize, f: impl FnOnce(&mut T) -> R) -> FrozenRes<(R, TEpoch)> {
        let offset = Self::SLOT_SIZE * index;
        let _guard = self.core.acquire_io_lock()?;
        let slot = unsafe { &*self.get_mmap().as_ptr::<T>(offset) };
        let _oi_guard = slot.lock();
        let res = unsafe { f(slot.get_mut()) };
        // Epoch observed before this write is flushed; the write is durable
        // once `durable_epoch` exceeds it.
        let epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
        self.core.dirty.store(true, atomic::Ordering::Release);
        Ok((res, epoch))
    }

    /// Flushes pending data, unmaps, grows the backing file by `count` slots,
    /// and remaps at the new length. Holds the exclusive io lock throughout,
    /// so all reads/writes block for the duration.
    pub fn grow(&self, count: usize) -> FrozenRes<()> {
        let core = &self.core;
        let _lock = self.core.acquire_exclusive_io_lock()?;
        // Persist unflushed writes before tearing the mapping down, and wake
        // any durability waiters.
        if core.dirty.swap(false, atomic::Ordering::AcqRel) {
            core.sync()?;
            core.incr_epoch();
            let _g = core.durable_lock.lock().map_err(|e| new_err_raw(FMMapErr::Lpn, e))?;
            core.durable_cv.notify_all();
        }
        unsafe {
            self.munmap()?;
            // NOTE(review): if `TMap` has a `Drop` impl that also unmaps, this
            // double-unmaps after the explicit `munmap` above — confirm.
            mem::ManuallyDrop::drop(&mut *core.mmap.get());
        }
        core.ffile.grow(count)?;
        let new_len = core.ffile.length()?;
        core.curr_length.store(new_len, atomic::Ordering::Release);
        // Rebuild the mapping at the new length; safe to replace because the
        // exclusive io lock is still held.
        unsafe {
            let new_map = TMap::new(core.ffile.fd(), new_len)?;
            *core.mmap.get() = mem::ManuallyDrop::new(new_map);
        };
        Ok(())
    }

    /// Flushes pending data, stops the flush thread, unmaps the region, and
    /// deletes the backing file.
    pub fn delete(&mut self) -> FrozenRes<()> {
        let core = &self.core;
        let _lock = core.acquire_exclusive_io_lock()?;
        // Persist unflushed writes and wake durability waiters first.
        if core.dirty.swap(false, atomic::Ordering::AcqRel) {
            core.sync()?;
            core.incr_epoch();
            let _g = core.durable_lock.lock().map_err(|e| new_err_raw(FMMapErr::Lpn, e))?;
            core.durable_cv.notify_all();
        }
        // Signal the flush thread to exit and wait for it. With `dirty`
        // cleared and the exclusive io lock held, the thread exits without
        // touching the io lock, so joining here cannot deadlock.
        core.closed.store(true, atomic::Ordering::Release);
        core.cv.notify_one();
        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }
        self.munmap()?;
        core.ffile.delete()
    }

    /// Unmaps the current mapping at its full length.
    #[inline]
    fn munmap(&self) -> FrozenRes<()> {
        let length = self.core.curr_length();
        unsafe { self.get_mmap().unmap(length) }
    }

    /// Shared view of the current mapping. `grow` replaces the value under
    /// the exclusive io lock, so callers must hold the io lock appropriately.
    #[inline]
    fn get_mmap(&self) -> &mem::ManuallyDrop<TMap> {
        unsafe { &*self.core.mmap.get() }
    }
}
impl<T> Drop for FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    fn drop(&mut self) {
        // `delete` already set `closed` and unmapped; only unmap here if this
        // drop is the one flipping the flag.
        let not_unmapped = !self.core.closed.swap(true, atomic::Ordering::Release);
        self.core.cv.notify_one();
        // The flush loop re-checks `dirty` after observing `closed`, so any
        // pending writes are flushed before the join returns (see flush_tx).
        if let Some(handle) = self.tx.take() {
            let _ = handle.join();
        }
        let _io_lock = self.core.acquire_exclusive_io_lock();
        // Free any recorded sync error so its heap allocation isn't leaked.
        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if !ptr.is_null() {
            unsafe {
                drop(Box::from_raw(ptr));
            }
        }
        if not_unmapped {
            let _ = self.munmap();
        }
    }
}
impl<T> fmt::Display for FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    /// Renders as `FrozenMMap{fd: <fd>, len: <bytes>}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fd = self.core.ffile.fd();
        let len = self.core.curr_length();
        write!(f, "FrozenMMap{{fd: {}, len: {}}}", fd, len)
    }
}
/// State shared between [`FrozenMMap`] handles and the flush thread.
#[derive(Debug)]
struct Core {
    /// Wakes the flush thread early (e.g. on shutdown); paired with `lock`.
    cv: sync::Condvar,
    /// The backing file.
    ffile: FrozenFile,
    /// Mutex backing `cv`; held by the flush thread between flushes.
    lock: sync::Mutex<()>,
    /// Shared for slot reads/writes, exclusive for flush/grow/delete/drop.
    io_lock: sync::RwLock<()>,
    /// Set after each write; swapped to false by the flusher.
    dirty: atomic::AtomicBool,
    /// Wakes `wait_for_durability` callers; paired with `durable_lock`.
    durable_cv: sync::Condvar,
    /// Shutdown signal for the flush thread.
    closed: atomic::AtomicBool,
    /// Mutex backing `durable_cv`.
    durable_lock: sync::Mutex<()>,
    /// Interval between flush cycles.
    flush_duration: time::Duration,
    /// Count of completed flushes; incremented after each successful sync.
    durable_epoch: atomic::AtomicU64,
    /// Current mapping length in bytes.
    curr_length: atomic::AtomicUsize,
    /// Latest background sync error (heap pointer; null = none).
    error: atomic::AtomicPtr<FrozenErr>,
    /// The mapping. `ManuallyDrop` so `grow` can drop/replace it explicitly;
    /// the `UnsafeCell` is only mutated under the exclusive io lock.
    mmap: cell::UnsafeCell<mem::ManuallyDrop<TMap>>,
}
// SAFETY: `mmap` (the non-Sync `UnsafeCell`) is only mutated under the
// exclusive io lock, and `error` is managed through atomic pointer swaps.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
impl Core {
    /// Bundles the mapping and file with freshly-initialized synchronization
    /// state (clean, open, epoch 0, no recorded error).
    fn new(mmap: TMap, ffile: FrozenFile, flush_duration: time::Duration, curr_length: usize) -> Self {
        Self {
            ffile,
            flush_duration,
            cv: sync::Condvar::new(),
            lock: sync::Mutex::new(()),
            io_lock: sync::RwLock::new(()),
            durable_cv: sync::Condvar::new(),
            durable_lock: sync::Mutex::new(()),
            dirty: atomic::AtomicBool::new(false),
            closed: atomic::AtomicBool::new(false),
            durable_epoch: atomic::AtomicU64::new(0),
            curr_length: atomic::AtomicUsize::new(curr_length),
            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
            mmap: cell::UnsafeCell::new(mem::ManuallyDrop::new(mmap)),
        }
    }

    /// Current mapping length in bytes.
    #[inline]
    fn curr_length(&self) -> usize {
        self.curr_length.load(atomic::Ordering::Acquire)
    }

    /// Flushes the mapped region, then the file itself, to storage.
    #[inline]
    fn sync(&self) -> FrozenRes<()> {
        unsafe { (*self.mmap.get()).sync(self.curr_length()) }?;
        self.ffile.sync()
    }

    /// Records `err` as the latest background sync error, freeing any
    /// previously recorded one.
    #[inline]
    fn set_sync_error(&self, err: FrozenErr) {
        let boxed = Box::into_raw(Box::new(err));
        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
        if !old.is_null() {
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Returns a clone of the recorded sync error, if any.
    ///
    /// NOTE(review): this clones through the raw pointer without taking
    /// ownership; a concurrent `set_sync_error`/`clear_sync_error` frees the
    /// allocation and could race with this clone — confirm readers are
    /// serialized against the flush thread.
    #[inline]
    fn get_sync_error(&self) -> Option<FrozenErr> {
        let ptr = self.error.load(atomic::Ordering::Acquire);
        if hints::likely(ptr.is_null()) {
            return None;
        }
        Some(unsafe { (*ptr).clone() })
    }

    /// Clears any recorded sync error, freeing its allocation.
    #[inline]
    fn clear_sync_error(&self) {
        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
        if hints::unlikely(!old.is_null()) {
            unsafe {
                drop(Box::from_raw(old));
            }
        }
    }

    /// Shared io lock: held by slot reads/writes.
    #[inline]
    fn acquire_io_lock(&self) -> FrozenRes<sync::RwLockReadGuard<'_, ()>> {
        self.io_lock.read().map_err(|e| new_err_raw(FMMapErr::Lpn, e))
    }

    /// Exclusive io lock: held while the mapping is flushed, replaced, or
    /// torn down.
    #[inline]
    fn acquire_exclusive_io_lock(&self) -> FrozenRes<sync::RwLockWriteGuard<'_, ()>> {
        self.io_lock.write().map_err(|e| new_err_raw(FMMapErr::Lpn, e))
    }

    /// Bumps the durable epoch after a successful flush.
    #[inline]
    fn incr_epoch(&self) {
        self.durable_epoch.fetch_add(1, atomic::Ordering::Release);
    }

    /// Spawns the named background flush thread running [`Self::flush_tx`].
    fn spawn_tx(core: sync::Arc<Self>) -> FrozenRes<thread::JoinHandle<()>> {
        match thread::Builder::new()
            .name("fm-flush-tx".into())
            .spawn(move || Self::flush_tx(core))
        {
            Ok(tx) => Ok(tx),
            Err(error) => {
                // NOTE(review): the OS error text and the static message are
                // concatenated with no separator between them.
                let mut error = error.to_string().as_bytes().to_vec();
                error.extend_from_slice(b"Failed to spawn flush thread for FrozenMMap");
                new_err(FMMapErr::Hcf, error)
            }
        }
    }

    /// Body of the background flush thread.
    ///
    /// Each cycle sleeps on `cv` for up to `flush_duration`; on wake it
    /// flushes if `dirty`, bumps the durable epoch, and notifies durability
    /// waiters. Exits once `closed` is observed with nothing left to flush,
    /// or on any unrecoverable error (recorded via `set_sync_error`).
    fn flush_tx(core: sync::Arc<Self>) {
        let mut guard = match core.lock.lock() {
            Ok(g) => g,
            Err(error) => {
                let mut message = error.to_string().as_bytes().to_vec();
                message.extend_from_slice(b"Flush thread died before init could be completed for FrozenMMap");
                let error = FrozenErr::new(
                    unsafe { MODULE_ID },
                    ERRDOMAIN,
                    FMMapErr::Lpn as u16,
                    FMMapErr::Lpn.default_message(),
                    message,
                );
                core.set_sync_error(error);
                return;
            }
        };
        loop {
            // Sleep until notified (shutdown) or the flush interval elapses;
            // spurious/timeout wakeups are fine — the dirty check below gates
            // the actual work.
            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
                Ok((g, _)) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FMMapErr::Txe, e));
                    return;
                }
            };
            let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
            let closing = core.closed.load(atomic::Ordering::Acquire);
            if !dirty {
                // Nothing to flush; exit if shutdown was requested. A dirty
                // shutdown flushes below and exits on the next iteration.
                if closing {
                    return;
                }
                continue;
            }
            // Take the exclusive io lock so no slot writes happen mid-flush;
            // the cv mutex is released only after the io lock is held.
            let io_lock = match core.acquire_exclusive_io_lock() {
                Ok(lock) => lock,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FMMapErr::Lpn, e));
                    return;
                }
            };
            drop(guard);
            match core.sync() {
                Ok(_) => {
                    core.incr_epoch();
                    // Hold durable_lock around notify_all so waiters can't
                    // miss the wakeup between their epoch check and cv wait.
                    let _g = match core.durable_lock.lock() {
                        Ok(g) => g,
                        Err(e) => {
                            core.set_sync_error(new_err_raw(FMMapErr::Lpn, e));
                            return;
                        }
                    };
                    core.durable_cv.notify_all();
                    core.clear_sync_error();
                }
                Err(err) => core.set_sync_error(err),
            }
            drop(io_lock);
            // Re-take the cv mutex for the next wait cycle.
            guard = match core.lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    core.set_sync_error(new_err_raw(FMMapErr::Lpn, e));
                    return;
                }
            };
        }
    }
}
/// In-map slot layout: a one-byte spinlock followed by the stored value.
///
/// `#[repr(C)]` pins the field order so the in-map layout is stable.
#[repr(C)]
pub(in crate::fmmap) struct ObjectInterface<T>
where
    T: Sized + Send + Sync,
{
    /// 0 = unlocked, 1 = locked; guards `value`.
    lock: atomic::AtomicU8,
    /// The stored value; accessed only while `lock` is held.
    value: cell::UnsafeCell<T>,
}
impl<T> ObjectInterface<T>
where
    T: Sized + Send + Sync,
{
    /// Spin this many times before yielding to the scheduler on contention.
    const MAX_SPINS: usize = 0x10;

    /// Acquires the per-slot spinlock, returning a guard that releases it on
    /// drop. Busy-spins briefly, then yields between attempts.
    #[inline]
    fn lock(&self) -> OIGuard<'_, T> {
        let mut attempts: usize = 0;
        loop {
            let acquired = self
                .lock
                .compare_exchange_weak(0, 1, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
                .is_ok();
            if acquired {
                return OIGuard { oi: self };
            }
            if hints::likely(attempts < Self::MAX_SPINS) {
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
            attempts += 1;
        }
    }

    /// Shared reference to the stored value.
    ///
    /// # Safety
    /// Caller must hold the slot lock (via [`Self::lock`]) for the duration
    /// of the borrow.
    #[inline]
    unsafe fn get(&self) -> &T {
        &*self.value.get()
    }

    /// Mutable reference to the stored value.
    ///
    /// # Safety
    /// Caller must hold the slot lock (via [`Self::lock`]) and ensure no
    /// other reference to the value is live.
    #[inline]
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self) -> &mut T {
        &mut *self.value.get()
    }
}
/// RAII guard for an [`ObjectInterface`] slot lock; unlocks on drop.
struct OIGuard<'a, T>
where
    T: Sized + Send + Sync,
{
    oi: &'a ObjectInterface<T>,
}
impl<T> Drop for OIGuard<'_, T>
where
    T: Sized + Send + Sync,
{
    fn drop(&mut self) {
        // Release the slot spinlock; pairs with the Acquire CAS in `lock`.
        self.oi.lock.store(0, atomic::Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::TEST_MID;
use crate::ffile::FFileErr;
const INIT_SLOTS: usize = 0x0A;
const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
/// Creates a temp directory plus a default config pointing inside it; the
/// `TempDir` must be kept alive for the duration of the test.
fn new_tmp() -> (tempfile::TempDir, FMCfg) {
    let tmp_dir = tempfile::tempdir().unwrap();
    let cfg = FMCfg {
        path: tmp_dir.path().join("tmp_map"),
        mid: TEST_MID,
        initial_count: INIT_SLOTS,
        flush_duration: FLUSH_DURATION,
    };
    (tmp_dir, cfg)
}
mod fm_lifecycle {
    use super::*;
    /// Fresh map: clean flags, epoch 0, and `wait_for_durability(0)` is Ok
    /// immediately (the pre-first-flush fast path).
    #[test]
    fn ok_new() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
        assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
        assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
        assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
        assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
        assert_eq!(
            mmap.core.curr_length.load(atomic::Ordering::Acquire),
            INIT_SLOTS * FrozenMMap::<u8>::SLOT_SIZE
        );
        assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
        assert!(mmap.wait_for_durability(0).is_ok());
    }
    /// Reopening an existing backing file with the same cfg succeeds.
    #[test]
    fn ok_new_existing() {
        let (_dir, cfg) = new_tmp();
        let mmap1 = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
        drop(mmap1);
        let mmap2 = FrozenMMap::<u8>::new(cfg).unwrap();
        drop(mmap2);
    }
    /// Reopening with a changed initial_count is rejected as corruption.
    #[test]
    fn err_new_when_change_in_cfg() {
        let (_dir, mut cfg) = new_tmp();
        let mmap1 = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
        drop(mmap1);
        cfg.initial_count = INIT_SLOTS * 2;
        let err = FrozenMMap::<u8>::new(cfg).unwrap_err();
        assert!(err.compare(FFileErr::Cpt as u16));
    }
    /// `delete` removes the backing file.
    #[test]
    fn ok_delete() {
        let (_dir, cfg) = new_tmp();
        let mut mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
        mmap.delete().unwrap();
        assert!(!mmap.core.ffile.exists().unwrap());
    }
    /// A second `delete` fails once the file is gone.
    #[test]
    fn err_delete_after_delete() {
        let (_dir, cfg) = new_tmp();
        let mut mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
        mmap.delete().unwrap();
        assert!(!mmap.core.ffile.exists().unwrap());
        let err = mmap.delete().unwrap_err();
        assert!(err.compare(FFileErr::Inv as u16));
    }
    /// Drop flushes dirty data even if no background flush cycle ran yet.
    #[test]
    fn ok_drop_persists_when_dropped_before_bg_flush() {
        let (_dir, cfg) = new_tmp();
        const VAL: u8 = 0x0A;
        {
            let mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            mmap.write(0, |byte| *byte = VAL).unwrap();
            drop(mmap);
        }
        {
            let mmap = FrozenMMap::<u8>::new(cfg.clone()).unwrap();
            let val = mmap.read(0, |byte| *byte).unwrap();
            assert_eq!(val, VAL);
        }
    }
}
mod fm_grow {
    use super::*;
    /// Growing by `count` adds exactly `count` slots to the mapped length.
    #[test]
    fn ok_grow_updates_length() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
        assert_eq!(mmap.core.curr_length(), INIT_SLOTS * FrozenMMap::<u8>::SLOT_SIZE);
        mmap.grow(0x0A).unwrap();
        assert_eq!(
            mmap.core.curr_length(),
            (INIT_SLOTS + 0x0A) * FrozenMMap::<u8>::SLOT_SIZE
        );
    }
    /// Repeated grows accumulate and keep the map usable.
    #[test]
    fn ok_grow_sync_cycle() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u8>::new(cfg).unwrap();
        for _ in 0..0x0A {
            mmap.grow(0x100).unwrap();
        }
        assert_eq!(
            mmap.core.curr_length(),
            (INIT_SLOTS + (0x0A * 0x100)) * FrozenMMap::<u8>::SLOT_SIZE
        );
    }
    /// Slot 0 stays addressable and writable across a remap.
    #[test]
    fn ok_write_grow_read() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
        mmap.write(0, |v| *v = 0xAA).unwrap();
        mmap.grow(0x10).unwrap();
        mmap.write(0, |v| *v = 0xBB).unwrap();
        let val = mmap.read(0, |v| *v).unwrap();
        assert_eq!(val, 0xBB);
    }
    /// Interleaved grow/write cycles preserve earlier writes.
    #[test]
    fn ok_write_grow_read_cycle() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
        mmap.write(0, |v| *v = 1).unwrap();
        for i in 0..5 {
            mmap.grow(0x10).unwrap();
            let idx = mmap.slots() - 1;
            mmap.write(idx, |v| *v = (i + 2) as u64).unwrap();
        }
        let base = mmap.read(0, |v| *v).unwrap();
        assert_eq!(base, 1);
        let last_idx = mmap.slots() - 1;
        let last = mmap.read(last_idx, |v| *v).unwrap();
        assert_eq!(last, 6);
    }
}
mod fm_write_read {
    use super::*;
    /// A write followed by a durability wait must read back the same value.
    #[test]
    fn ok_write_wait_read_cycle() {
        const EXPECTED: u32 = 0xDEADC0DE;
        let (_tmp, cfg) = new_tmp();
        let map = FrozenMMap::<u32>::new(cfg).unwrap();
        let (_, epoch) = map.write(0, |slot| *slot = EXPECTED).unwrap();
        map.wait_for_durability(epoch).unwrap();
        assert_eq!(map.read(0, |slot| *slot).unwrap(), EXPECTED);
    }
    /// Reads observe in-memory writes even before any flush completes.
    #[test]
    fn ok_write_read_without_wait() {
        const EXPECTED: u32 = 0xDEADC0DE;
        let (_tmp, cfg) = new_tmp();
        let map = FrozenMMap::<u32>::new(cfg).unwrap();
        map.write(0, |slot| *slot = EXPECTED).unwrap();
        assert_eq!(map.read(0, |slot| *slot).unwrap(), EXPECTED);
    }
}
mod fm_durability {
    use super::*;
    /// Waiting for durability and then dropping must not hang or error.
    #[test]
    fn ok_wait_then_drop() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
        let (_, epoch) = mmap.write(0, |v| *v = 7).unwrap();
        mmap.wait_for_durability(epoch).unwrap();
        drop(mmap);
    }
    /// Epochs returned by successive writes never decrease.
    #[test]
    fn ok_epoch_monotonicity() {
        let (_dir, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(cfg).unwrap();
        let (_, e1) = mmap.write(0, |v| *v = 1).unwrap();
        mmap.wait_for_durability(e1).unwrap();
        let (_, e2) = mmap.write(0, |v| *v = 2).unwrap();
        mmap.wait_for_durability(e2).unwrap();
        assert!(e2 >= e1);
    }
    /// Many writers incrementing the same slot, each waiting on its own
    /// epoch, must all complete and the final value must reflect every write.
    #[test]
    fn ok_wait_for_durability_with_multi_writers() {
        let (_dir, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let mmap = mmap.clone();
            handles.push(thread::spawn(move || {
                let (_, epoch) = mmap.write(0, |v| *v += 1).unwrap();
                mmap.wait_for_durability(epoch).unwrap();
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let val = mmap.read(0, |v| *v).unwrap();
        assert_eq!(val, 0x0A);
    }
}
mod fm_concurrency {
    use super::*;
    /// The per-slot spinlock must serialize increments from many threads.
    #[test]
    fn ok_oi_lock_with_multi_threads_same_index() {
        let (_dir, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let mmap = mmap.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..0x100 {
                    mmap.write(0, |v| *v += 1).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let val = mmap.read(0, |v| *v).unwrap();
        assert_eq!(val, 0x0A * 0x100);
    }
    /// Reads of different slots may proceed in parallel under the shared
    /// io lock.
    #[test]
    fn ok_parallel_reads_with_diff_index() {
        let (_dir, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
        mmap.write(0, |v| *v = 0x10).unwrap();
        mmap.write(1, |v| *v = 0x20).unwrap();
        let t1 = {
            let mmap = mmap.clone();
            thread::spawn(move || mmap.read(0, |v| *v).unwrap())
        };
        let t2 = {
            let mmap = mmap.clone();
            thread::spawn(move || mmap.read(1, |v| *v).unwrap())
        };
        assert_eq!(t1.join().unwrap(), 0x10);
        assert_eq!(t2.join().unwrap(), 0x20);
    }
    /// Concurrent grow+write from several threads must neither lose slots
    /// nor corrupt the last-written value.
    #[test]
    fn ok_grow_with_multi_threads() {
        let (_dir, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
        const THREADS: usize = 4;
        const GROWS_PER_THREAD: usize = 0x10;
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let mmap = mmap.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..GROWS_PER_THREAD {
                    mmap.grow(1).unwrap();
                    let idx = mmap.slots() - 1;
                    mmap.write(idx, |v| *v = 0xABCD).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let expected_min = INIT_SLOTS + (THREADS * GROWS_PER_THREAD);
        assert_eq!(mmap.slots(), expected_min);
        let last = mmap.read(mmap.slots() - 1, |v| *v).unwrap();
        assert_eq!(last, 0xABCD);
    }
    /// A durability wait racing a concurrent grow (which also flushes and
    /// notifies) must still complete.
    #[test]
    fn ok_wait_during_grow_cycle() {
        let (_dir, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64>::new(cfg).unwrap());
        let mmap2 = mmap.clone();
        let t = thread::spawn(move || {
            let (_, epoch) = mmap2.write(0, |v| *v = 42).unwrap();
            mmap2.wait_for_durability(epoch).unwrap();
        });
        mmap.grow(8).unwrap();
        t.join().unwrap();
        let val = mmap.read(0, |v| *v).unwrap();
        assert_eq!(val, 42);
    }
}
}