Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//!     module_id: MODULE_ID,
35//!     initial_count: 0x0A,
36//!     immediate_durability: false,
37//!     flush_duration: std::time::Duration::from_micros(0x96),
38//! };
39//!
40//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
41//! assert_eq!(mmap.total_slots(), 0x0A);
42//!
43//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
44//! let _ = futures::executor::block_on(ticket).unwrap();
45//!
46//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
47//! assert_eq!(val, 0xDEADC0DE);
48//!
49//! drop(mmap);
50//!
51//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
52//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
53//!
54//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
55//! assert_eq!(val, 0xDEADC0DE);
56//! ```
57
58#[cfg(any(target_os = "linux", target_os = "macos"))]
59mod posix;
60
61use crate::{
62    ack,
63    error::FrozenResult,
64    ffile::{FrozenFile, FrozenFileCfg},
65    hints,
66};
67use std::{
68    fmt,
69    sync::{self, atomic},
70    thread, time,
71};
72
73/// type for `epoch` used by write ops
74pub type TEpoch = u64;
75
76#[cfg(any(target_os = "linux", target_os = "macos"))]
77type TMap = posix::POSIXMMap;
78
79/// Error codes for [`FrozenMMap`]
80pub(in crate::fmmap) mod err {
81    use crate::error::{ErrCode, FrozenError, FrozenResult};
82
83    /// Domain Id for [`FrozenMMap`] is **18**
84    const ERRDOMAIN: u8 = 0x12;
85
86    /// module id used for [`FrozenMMap`]
87    pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
88
89    #[cfg(not(test))]
90    #[inline(always)]
91    pub fn mid() -> &'static u8 {
92        MID.get().unwrap()
93    }
94
95    #[cfg(test)]
96    #[inline(always)]
97    pub fn mid() -> &'static u8 {
98        MID.get_or_init(|| 0)
99    }
100
101    /// internal fuck up (hault and catch fire)
102    pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
103
104    /// unknown error (fallback)
105    pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
106
107    /// no more memory available
108    pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
109
110    /// syncing error
111    pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
112
113    /// no write/read perm
114    pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
115
116    /// type `T` must not be zero sized
117    pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
118
119    /// flush_tx error (unable to spawn)
120    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
121
122    /// type `T` implements drop
123    pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
124
125    /// type `T` is not 8 bytes aligned
126    pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
127
128    /// `size_of::<T>()` is not multiple of 8
129    pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
130
131    #[inline]
132    pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
133        code: ErrCode,
134        error: E,
135    ) -> FrozenResult<R> {
136        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137        Err(err)
138    }
139
140    #[inline]
141    pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143        Err(err)
144    }
145}
146
147/// Config for [`FrozenMMap`]
148#[derive(Debug, Clone)]
149pub struct FrozenMMapCfg {
150    /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
151    pub module_id: u8,
152
153    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
154    ///
155    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], where initial file length will
156    /// be `chunk_size * initial_count` (bytes)
157    pub initial_count: usize,
158
159    /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
160    /// together, where all write ops in certain time interval falls into a single durable window
161    pub flush_duration: time::Duration,
162
163    /// Enables immediate durability scheduing write operations
164    ///
165    /// When disabled, durability is driven by the background flusher thread with respect to
166    /// [`FrozenMMapCfg::flush_duration`] time interval, allowing multiple write calls to be
167    /// bathced and flushed together into a single sync operation, reducing strain on resources.
168    ///
169    /// When enabled, every successful [`FrozenMMap::write`] and [`FrozenMMapTransaction::commit`]
170    /// immediately wakes the background flusher thread, disregarding the
171    /// [`FrozenMMapCfg::flush_duration`] time interval.
172    ///
173    /// This configuration is intended for workloads where writes are rare and durability latency
174    /// is preferred over batching efficiency.
175    pub immediate_durability: bool,
176}
177
178/// Custom implementation of `mmap(2)`
179///
180/// ## Constraints
181///
182/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
183/// must be a POD type which is safe to persist and later reinterpret from bytes.
184///
185/// Required properties for `T`,
186///
187/// - Must use `#[repr(C)]`
188/// - Should be 8-bytes aligned
189/// - Must not implement [`Drop`]
190/// - `size_of::<T>()` should be multiple of `8`
191///
192/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
193/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
194/// across reopen.
195///
196/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
197/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
198/// layout and must remain valid when the file is reopened in a later process.
199///
200/// ## Example
201///
202/// ```
203/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
204///
205/// const MODULE_ID: u8 = 0;
206///
207/// let dir = tempfile::tempdir().unwrap();
208/// let path = dir.path().join("tmp_frozen_mmap");
209///
210/// let cfg = FrozenMMapCfg {
211///     module_id: MODULE_ID,
212///     initial_count: 0x0A,
213///     immediate_durability: false,
214///     flush_duration: std::time::Duration::from_micros(0x96),
215/// };
216///
217/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
218/// assert_eq!(mmap.total_slots(), 0x0A);
219///
220/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
221/// let _ = futures::executor::block_on(ticket).unwrap();
222///
223/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
224/// assert_eq!(val, 0xDEADC0DE);
225///
226/// drop(mmap);
227///
228/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
229/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
230///
231/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
232/// assert_eq!(val, 0xDEADC0DE);
233/// ```
234#[derive(Debug)]
235pub struct FrozenMMap<T>
236where
237    T: Sized + Send + Sync,
238{
239    core: sync::Arc<Core>,
240    flush_tx_handle: Option<thread::JoinHandle<()>>,
241    _t: core::marker::PhantomData<T>,
242}
243
244unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
245unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
246
247impl<T> FrozenMMap<T>
248where
249    T: Sized + Send + Sync,
250{
251    /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
252    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
253
254    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
255    ///
256    /// ## Multiple Instances
257    ///
258    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
259    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
260    /// error will be thrown.
261    ///
262    /// ## Capacity Growth
263    ///
264    /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
265    /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
266    /// mapping over grown capacity.
267    ///
268    /// ## [`FrozenMMapCfg`]
269    ///
270    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
271    ///
272    /// ## Working
273    ///
274    /// We first create a new [`FrozenFile`] if note already, then map the entire file using
275    /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
276    /// entire lifetime of file.
277    ///
278    /// ## Important
279    ///
280    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
281    /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
282    /// to store config.
283    ///
284    /// ## Example
285    ///
286    /// ```
287    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
288    ///
289    /// const MODULE_ID: u8 = 0;
290    ///
291    /// let dir = tempfile::tempdir().unwrap();
292    /// let path = dir.path().join("tmp_frozen_mmap");
293    ///
294    /// let cfg = FrozenMMapCfg {
295    ///     module_id: MODULE_ID,
296    ///     initial_count: 0x0A,
297    ///     immediate_durability: false,
298    ///     flush_duration: std::time::Duration::from_micros(0x96),
299    /// };
300    ///
301    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
302    /// assert_eq!(mmap.total_slots(), 0x0A);
303    ///
304    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
305    /// let _ = futures::executor::block_on(ticket).unwrap();
306    ///
307    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
308    /// assert_eq!(val, 0xDEADC0DE);
309    /// ```
310    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
311        Self::validate_t()?;
312        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
313        let total_slots = curr_length / Self::SLOT_SIZE;
314
315        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
316        // guarantees that the first caller sets the value and all subsequent calls reuse it
317        let _ = err::MID.get_or_init(|| cfg.module_id);
318
319        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
320        let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
321
322        let cloned_core = sync::Arc::clone(&core);
323        let flush_tx_handle = match thread::Builder::new()
324            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
325            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
326        {
327            Ok(handle) => Some(handle),
328            Err(observed_error) => {
329                return err::new_err(err::FXE, observed_error);
330            }
331        };
332
333        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
334    }
335
336    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
337    /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
338    ///
339    /// ## Multiple Instances
340    ///
341    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
342    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
343    /// error will be thrown.
344    ///
345    /// ## Why not create a [`FrozenMMap::grow`] call?
346    ///
347    /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
348    /// memory mapping in place is tricky in concurrent code, as some threads would still hold
349    /// stale/unmapped pointers to mmap due to preemption from the OS schedular
350    ///
351    /// So, instead of remmaping a live instance, the current API performs growth during open,
352    /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
353    /// instance.
354    ///
355    /// ## [`FrozenMMapCfg`]
356    ///
357    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
358    ///
359    /// ## Working
360    ///
361    /// We first create a new [`FrozenFile`] if note already, then we grow the file using
362    /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
363    /// read/write `T`, which also should stay constant for the entire lifetime of file.
364    ///
365    /// ## Important
366    ///
367    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
368    /// which is used under the hood, one must use config stores like
369    /// [`Rta`](https://crates.io/crates/rta) to store config.
370    ///
371    /// ## Example
372    ///
373    /// ```
374    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
375    ///
376    /// const MODULE_ID: u8 = 0;
377    ///
378    /// let dir = tempfile::tempdir().unwrap();
379    /// let path = dir.path().join("tmp_frozen_mmap");
380    ///
381    /// let cfg = FrozenMMapCfg {
382    ///     module_id: MODULE_ID,
383    ///     initial_count: 0x0A,
384    ///     immediate_durability: false,
385    ///     flush_duration: std::time::Duration::from_micros(0x96),
386    /// };
387    ///
388    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
389    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
390    ///
391    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
392    /// let _ = futures::executor::block_on(ticket).unwrap();
393    ///
394    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
395    /// assert_eq!(val, 0xDEADC0DE);
396    /// ```
397    pub fn new_grown<P: AsRef<std::path::Path>>(
398        path: P,
399        cfg: FrozenMMapCfg,
400        additional_slots: usize,
401    ) -> FrozenResult<Self> {
402        Self::validate_t()?;
403        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
404
405        // we grow the underlying FrozenFile as requested
406        file.grow(additional_slots)?;
407        let curr_length = file.length()?; // we must read the updated (grown) length
408        let total_slots = curr_length / Self::SLOT_SIZE;
409
410        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
411        // guarantees that the first caller sets the value and all subsequent calls reuse it
412        let _ = err::MID.get_or_init(|| cfg.module_id);
413
414        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
415        let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
416
417        let cloned_core = sync::Arc::clone(&core);
418        let flush_tx_handle = match thread::Builder::new()
419            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
420            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
421        {
422            Ok(handle) => Some(handle),
423            Err(observed_error) => {
424                return err::new_err(err::FXE, observed_error);
425            }
426        };
427
428        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
429    }
430
431    /// Create/open a new [`FrozenFile`] instance
432    fn open_file(
433        path: std::path::PathBuf,
434        cfg: &FrozenMMapCfg,
435    ) -> FrozenResult<(FrozenFile, usize)> {
436        let ff_cfg = FrozenFileCfg {
437            path,
438            module_id: cfg.module_id,
439            buffer_size: Self::SLOT_SIZE,
440            initial_available_buffers: cfg.initial_count,
441        };
442
443        let file = FrozenFile::new(ff_cfg)?;
444        let curr_length = file.length()?;
445
446        Ok((file, curr_length))
447    }
448
449    #[inline]
450    fn validate_t() -> FrozenResult<()> {
451        if std::mem::needs_drop::<T>() {
452            return err::new_err_default(err::DRP);
453        }
454
455        let align = std::mem::align_of::<T>();
456        if align != 8 {
457            return err::new_err_default(err::ALN);
458        }
459
460        let size = std::mem::size_of::<T>();
461        if size == 0 {
462            return err::new_err_default(err::ZRO);
463        }
464
465        if size % 8 != 0 {
466            return err::new_err_default(err::SZE);
467        }
468
469        Ok(())
470    }
471
472    /// Read a `T` at given `index` via callback (`f`)
473    ///
474    /// ## Concurrency
475    ///
476    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
477    /// at same index is atomic and thread safe, while operations on different indices may proceed
478    /// fully in parallel
479    ///
480    /// ## Safety
481    ///
482    /// The caller must ensure following:
483    ///
484    /// - given `index` is within bounds
485    /// - underlying memory contains a valid instance of `T`
486    /// - provided callback `f` must not,
487    ///   - write through the pointer
488    ///   - store or leak pointer beyound there lifetime
489    ///
490    /// Violating any of the above may result in undefined behavior
491    ///
492    /// ## Example
493    ///
494    /// ```
495    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
496    ///
497    /// const MODULE_ID: u8 = 0;
498    ///
499    /// let dir = tempfile::tempdir().unwrap();
500    /// let path = dir.path().join("tmp_read_mmap");
501    ///
502    /// let cfg = FrozenMMapCfg {
503    ///     module_id: MODULE_ID,
504    ///     initial_count: 0x02,
505    ///     immediate_durability: false,
506    ///     flush_duration: std::time::Duration::from_micros(0x60),
507    /// };
508    ///
509    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
510    ///
511    /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
512    /// let _ = futures::executor::block_on(ticket).unwrap();
513    ///
514    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
515    /// assert_eq!(val, 0x0A);
516    /// ```
517    #[inline(always)]
518    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
519        let offset = Self::SLOT_SIZE * index;
520        let _lock = self.core.locks.lock(index);
521
522        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
523        // assumption of OS guarantees visibility)
524
525        let ptr = unsafe { self.core.map.as_ptr(offset) };
526        Ok(f(ptr))
527    }
528
529    /// Write/update a `T` at given `index` via callback (`f`)
530    ///
531    /// ## Concurrency
532    ///
533    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
534    /// at same index is atomic and thread safe, while operations on different indices may proceed
535    /// fully in parallel
536    ///
537    /// ## Safety
538    ///
539    /// The caller must ensure following:
540    ///
541    /// - given `index` is within bounds
542    /// - underlying memory contains a valid instance of `T`
543    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
544    ///
545    /// Violating any of the above may result in undefined behavior
546    ///
547    /// ## Example
548    ///
549    /// ```
550    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
551    ///
552    /// const MODULE_ID: u8 = 0;
553    ///
554    /// let dir = tempfile::tempdir().unwrap();
555    /// let path = dir.path().join("tmp_write_mmap");
556    ///
557    /// let cfg = FrozenMMapCfg {
558    ///     module_id: MODULE_ID,
559    ///     initial_count: 0x02,
560    ///     immediate_durability: false,
561    ///     flush_duration: std::time::Duration::from_micros(0x96),
562    /// };
563    ///
564    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
565    ///
566    /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
567    /// let _ = futures::executor::block_on(ticket).unwrap();
568    ///
569    /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
570    /// assert_eq!(val, 0x2B);
571    /// ```
572    #[inline(always)]
573    pub unsafe fn write(
574        &self,
575        index: usize,
576        f: impl FnOnce(*mut T),
577    ) -> FrozenResult<ack::AckTicket> {
578        // propagate prev errors
579        if let Some(err) = self.core.completion.get_err() {
580            return Err(err);
581        }
582
583        let offset = Self::SLOT_SIZE * index;
584
585        let _guard = self.core.acquire_io_lock();
586        let _lock = self.core.locks.lock(index);
587
588        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
589        f(ptr);
590
591        self.core.dirty.store(true, atomic::Ordering::Release);
592        let epoch = self.core.completion.increment_current_epoch();
593
594        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
595    }
596
597    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
598    ///
599    /// ## Working
600    ///
601    /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
602    /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
603    /// conditions
604    ///
605    /// ## Example
606    ///
607    /// ```
608    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
609    ///
610    /// const MODULE_ID: u8 = 0;
611    ///
612    /// let dir = tempfile::tempdir().unwrap();
613    /// let path = dir.path().join("tmp_grow_mmap");
614    ///
615    /// let cfg = FrozenMMapCfg {
616    ///     module_id: MODULE_ID,
617    ///     initial_count: 0x02,
618    ///     immediate_durability: false,
619    ///     flush_duration: std::time::Duration::from_micros(0x96),
620    /// };
621    ///
622    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
623    /// assert_eq!(mmap.total_slots(), 0x02);
624    ///
625    /// drop(mmap);
626    ///
627    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
628    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
629    /// ```
630    #[inline]
631    pub fn total_slots(&self) -> usize {
632        self.core.curr_length / Self::SLOT_SIZE
633    }
634
635    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
636    ///
637    /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
638    /// and OS
639    ///
640    /// ## Example
641    ///
642    /// ```
643    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
644    ///
645    /// const MODULE_ID: u8 = 0;
646    ///
647    /// let dir = tempfile::tempdir().unwrap();
648    /// let path = dir.path().join("tmp_mem_usage");
649    ///
650    /// let cfg = FrozenMMapCfg {
651    ///     module_id: MODULE_ID,
652    ///     initial_count: 0x10,
653    ///     immediate_durability: false,
654    ///     flush_duration: std::time::Duration::from_micros(0x60),
655    /// };
656    ///
657    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
658    ///
659    /// let bytes = mmap.memory_usage();
660    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
661    /// ```
662    #[inline]
663    pub fn memory_usage(&self) -> usize {
664        // memory of mem-maped region
665        let mmap_bytes = self.core.curr_length;
666
667        // memory used for locking
668        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
669
670        mmap_bytes + lock_bytes
671    }
672
673    /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
674    /// operation
675    ///
676    /// ## Overview
677    ///
678    /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
679    /// operation, hence creating a transactional write operation, which gives following guarantees:
680    ///
681    /// - All write ops succeed together
682    /// - Single epoch to track durability of all writes ops
683    /// - Same durability guarantee for all the included write ops
684    ///
685    /// Simply, this preserves atomic durability semantics for multi index updates
686    ///
687    /// ## Example
688    ///
689    /// ```
690    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
691    ///
692    /// const MODULE_ID: u8 = 0;
693    ///
694    /// let dir = tempfile::tempdir().unwrap();
695    /// let path = dir.path().join("tmp_tx");
696    ///
697    /// let cfg = FrozenMMapCfg {
698    ///     module_id: MODULE_ID,
699    ///     initial_count: 0x0A,
700    ///     immediate_durability: false,
701    ///     flush_duration: std::time::Duration::from_micros(50),
702    /// };
703    ///
704    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
705    ///
706    /// let mut tx = mmap.new_tx();
707    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
708    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
709    ///
710    /// let ticket = tx.commit().unwrap();
711    /// let _ = futures::executor::block_on(ticket).unwrap();
712    ///
713    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
714    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
715    ///
716    /// assert_eq!((v0, v1), (0x0A, 0x14));
717    /// ```
718    #[inline]
719    pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
720        FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
721    }
722
723    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
724    ///
725    /// ## Working
726    ///
727    /// When `delete` is called, all read, write, and (background) sync ops are paused
728    /// (indefinitely), whule deletion is done with following steps:
729    ///
730    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
731    /// - if any batch is pending for sync,
732    ///   - swap the flag
733    ///   - call sync manually
734    ///   - incr epoch and update cv
735    /// - brodcast closing so flusher tx could wrap up
736    /// - `munmap(2)` current mapping
737    /// - call delete on [`FrozenFile`]
738    ///
739    /// ## Example
740    ///
741    /// ```
742    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
743    ///
744    /// const MODULE_ID: u8 = 0;
745    ///
746    /// let dir = tempfile::tempdir().unwrap();
747    /// let path = dir.path().join("tmp_delete_mmap");
748    ///
749    /// let cfg = FrozenMMapCfg {
750    ///     module_id: MODULE_ID,
751    ///     initial_count: 0x04,
752    ///     immediate_durability: false,
753    ///     flush_duration: std::time::Duration::from_micros(0x96),
754    /// };
755    ///
756    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
757    /// mmap.delete().unwrap();
758    /// assert!(!path.exists());
759    /// ```
760    pub fn delete(&mut self) -> FrozenResult<()> {
761        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
762        self.core.dirty.store(false, atomic::Ordering::Release);
763        self.core.closed.store(true, atomic::Ordering::Release);
764        self.core.durable_cv.notify_one();
765
766        if let Some(handle) = self.flush_tx_handle.take() {
767            let _ = handle.join();
768        }
769
770        // pause all new write ops
771        let _lock = self.core.acquire_exclusive_io_lock();
772
773        self.munmap()?;
774        self.core.file.delete()
775    }
776
777    /// Flush the dirty mmap data to persistant storage
778    ///
779    /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
780    /// guarantee
781    ///
782    /// ## Example
783    ///
784    /// ```
785    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
786    ///
787    /// const MODULE_ID: u8 = 0;
788    ///
789    /// let dir = tempfile::tempdir().unwrap();
790    /// let path = dir.path().join("tmp_delete_mmap");
791    ///
792    /// let cfg = FrozenMMapCfg {
793    ///     module_id: MODULE_ID,
794    ///     initial_count: 0x04,
795    ///     immediate_durability: false,
796    ///     flush_duration: std::time::Duration::from_micros(0x96),
797    /// };
798    ///
799    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
800    /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
801    /// ```
802    #[inline]
803    pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
804        self.core.map.sync(self.core.curr_length)
805    }
806
807    /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
808    ///
809    /// ## Example
810    ///
811    /// ```
812    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
813    ///
814    /// const MODULE_ID: u8 = 0;
815    ///
816    /// let dir = tempfile::tempdir().unwrap();
817    /// let path = dir.path().join("tmp_delete_mmap");
818    ///
819    /// let cfg = FrozenMMapCfg {
820    ///     module_id: MODULE_ID,
821    ///     initial_count: 0x04,
822    ///     immediate_durability: false,
823    ///     flush_duration: std::time::Duration::from_micros(0x96),
824    /// };
825    ///
826    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
827    /// assert!(unsafe { mmap.flush_file() }.is_ok());
828    /// ```
829    #[inline]
830    pub unsafe fn flush_file(&self) -> FrozenResult<()> {
831        self.core.file.sync()
832    }
833
834    #[inline]
835    fn munmap(&self) -> FrozenResult<()> {
836        let length = self.core.curr_length;
837        unsafe { self.core.map.unmap(length) }
838    }
839}
840
841impl<T> Drop for FrozenMMap<T>
842where
843    T: Sized + Send + Sync,
844{
845    fn drop(&mut self) {
846        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
847        self.core.cv.notify_one(); // notify flusher tx to shut
848
849        if let Some(handle) = self.flush_tx_handle.take() {
850            let _ = handle.join();
851        }
852
853        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
854        let _io_lock = self.core.acquire_exclusive_io_lock();
855
856        if !is_closed {
857            let _ = self.munmap();
858        }
859    }
860}
861
862impl<T> fmt::Display for FrozenMMap<T>
863where
864    T: Sized + Send + Sync,
865{
866    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867        write!(
868            f,
869            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
870            self.core.file.fd(),
871            self.total_slots(),
872            self.core.curr_length,
873        )
874    }
875}
876
877/// ## Why we ignore [`std::sync::PoisonError`]?
878///
879/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
880/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
881/// and are completely seperated from the mutex.
882///
883/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
884/// an inconsistent state of the protected value. Since no state can be left partially modified
885/// under this lock, there is no possible consistency risk to recover from and propagating the
886/// poison error would only introduce unnecessary failures into the allocation path.
887///
888/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
889/// with the recovered guard.
890fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
891    // init phase (acquiring locks)
892    let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());
893
894    // sync loop w/ non-busy waiting
895    loop {
896        (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
897
898        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
899        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
900
901        let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
902        let closing = core.closed.load(atomic::Ordering::Acquire);
903
904        if !dirty {
905            if closing {
906                core.cv.notify_all();
907                return;
908            }
909
910            continue;
911        }
912
913        // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
914        // grow could kick in while sync is in progress
915
916        let io_lock = core.acquire_exclusive_io_lock();
917
918        // INFO: we must drop the guard before syscall, as its a blocking operation and holding
919        // the mutex while the syscall takes place is not a good idea, while we drop the mutex
920        // and acqurie it again, in-between other process could acquire it and use it
921        drop(guard);
922
923        // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
924        // all writes up to `target_epoch` are guaranteed to be part of this flush window,
925        // while anything beyound belongs to the next batch
926        let target_epoch = core.completion.read_current_epoch();
927
928        // NOTE:
929        //
930        // - if sync fails, we update the Core::error w/ the received error object
931        // - we clear it up when another sync call succeeds
932        // - this is valid, as the underlying sync flushes entire mmaped region, hence
933        //   even if the last call failed, and the new one succeeded, we do get the durability
934        //   guarenty for the old data as well
935
936        if let Err(new_error) = core.sync() {
937            core.completion.set_err(new_error);
938        } else {
939            core.completion.mark_epoch_as_durable(target_epoch);
940            core.completion.del_err();
941        }
942
943        // NOTE: notify all listeners currently waiting for durability progress
944        core.completion.notify_all_listeners();
945
946        drop(io_lock);
947        guard = core.acquire_lock();
948    }
949}
950
951/// A context for grouping multi write ops into a single atomic operation
952///
953/// ## Overview
954///
955/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
956///
957/// - All included writes are applied together
958/// - Single epoch is assinged for an entier transaction
959/// - Durability guarantee is same for all included write ops
960///
961/// ## Example
962///
963/// ```
964/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
965///
966/// const MODULE_ID: u8 = 0;
967///
968/// let dir = tempfile::tempdir().unwrap();
969/// let path = dir.path().join("tmp_tx");
970///
971/// let cfg = FrozenMMapCfg {
972///     module_id: MODULE_ID,
973///     initial_count: 0x0A,
974///     immediate_durability: false,
975///     flush_duration: std::time::Duration::from_micros(50),
976/// };
977///
978/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
979///
980/// let mut tx = mmap.new_tx();
981/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
982/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
983/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
984///
985/// let ticket = tx.commit().unwrap();
986/// let _ = futures::executor::block_on(ticket).unwrap();
987///
988/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
989/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
990/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
991///
992/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
993/// ```
994pub struct FrozenMMapTransaction<'a, T> {
995    core: &'a Core,
996    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
997}
998
999impl<'a, T> FrozenMMapTransaction<'a, T> {
1000    /// Append a write op into the [`FrozenMMapTransaction`]
1001    ///
1002    /// ## Requirements
1003    ///
1004    /// Write ops must follow these safety requirements,
1005    ///
1006    /// - No duplicate indices
1007    /// - No out-of-order writes (must be incremental)
1008    ///
1009    /// Violating this constraint would result in [`FrozenError`]
1010    ///
1011    /// ## Safety
1012    ///
1013    /// Same safety requirements as [`FrozenMMap::write`] apply here
1014    ///
1015    /// ## Example
1016    ///
1017    /// ```
1018    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1019    ///
1020    /// const MODULE_ID: u8 = 0;
1021    ///
1022    /// let dir = tempfile::tempdir().unwrap();
1023    /// let path = dir.path().join("tmp_tx");
1024    ///
1025    /// let cfg = FrozenMMapCfg {
1026    ///     module_id: MODULE_ID,
1027    ///     initial_count: 0x10,
1028    ///     immediate_durability: false,
1029    ///     flush_duration: std::time::Duration::from_micros(50),
1030    /// };
1031    ///
1032    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1033    ///
1034    /// let mut tx = mmap.new_tx();
1035    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1036    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1037    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1038    ///
1039    /// let ticket = tx.commit().unwrap();
1040    /// let _ = futures::executor::block_on(ticket).unwrap();
1041    ///
1042    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1043    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1044    /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1045    ///
1046    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1047    /// ```
1048    #[inline(always)]
1049    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1050    where
1051        F: FnOnce(*mut T) + 'a,
1052    {
1053        // NOTE:
1054        //
1055        // This check prevents a potential footgun! For a safe transaction all writes must be,
1056        // - ordered by index (either incr or decr)
1057        // - no multi writes on same index
1058        //
1059        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1060        if let Some((last_idx, _)) = self.ops_vec.last() {
1061            if index <= *last_idx {
1062                return err::new_err(
1063                    err::HCF,
1064                    "tx writes must be strictly ordered, with no more then single ops on given index",
1065                );
1066            }
1067        }
1068
1069        self.ops_vec.push((index, Box::new(f)));
1070        Ok(())
1071    }
1072
1073    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1074    ///
1075    /// ## Guarantees
1076    ///
1077    /// - All writes are applied under a single epoch
1078    /// - All writes belong to the same durability batch
1079    /// - No interleaving with other transactions at epoch level
1080    ///
1081    /// ## Example
1082    ///
1083    /// ```
1084    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1085    ///
1086    /// const MODULE_ID: u8 = 0;
1087    ///
1088    /// let dir = tempfile::tempdir().unwrap();
1089    /// let path = dir.path().join("tmp_tx");
1090    ///
1091    /// let cfg = FrozenMMapCfg {
1092    ///     module_id: MODULE_ID,
1093    ///     initial_count: 0x10,
1094    ///     immediate_durability: false,
1095    ///     flush_duration: std::time::Duration::from_micros(50),
1096    /// };
1097    ///
1098    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1099    ///
1100    /// let mut tx = mmap.new_tx();
1101    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1102    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1103    ///
1104    /// let ticket = tx.commit().unwrap();
1105    /// let _ = futures::executor::block_on(ticket).unwrap();
1106    ///
1107    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1108    /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1109    ///
1110    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1111    /// ```
1112    #[inline(always)]
1113    pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1114        if let Some(err) = self.core.completion.get_err() {
1115            return Err(err);
1116        }
1117
1118        let _guard = self.core.acquire_io_lock();
1119
1120        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1121        let mut guards = Vec::with_capacity(self.ops_vec.len());
1122        for (idx, _) in &self.ops_vec {
1123            guards.push(self.core.locks.lock(*idx));
1124        }
1125
1126        for (idx, op) in self.ops_vec {
1127            let offset = idx * std::mem::size_of::<T>();
1128            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1129            op(ptr);
1130        }
1131
1132        self.core.dirty.store(true, atomic::Ordering::Release);
1133        let epoch = self.core.completion.increment_current_epoch();
1134
1135        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1136    }
1137}
1138
1139#[derive(Debug)]
1140struct Core {
1141    completion: sync::Arc<ack::Completion>,
1142    cv: sync::Condvar,
1143    curr_length: usize,
1144    dirty: atomic::AtomicBool,
1145    io_lock: sync::RwLock<()>,
1146    map: TMap,
1147    locks: Locks,
1148    lock: sync::Mutex<()>,
1149    file: FrozenFile,
1150    durable_cv: sync::Condvar,
1151    closed: atomic::AtomicBool,
1152}
1153
1154unsafe impl Send for Core {}
1155unsafe impl Sync for Core {}
1156
1157impl Core {
1158    fn new(map: TMap, file: FrozenFile, curr_length: usize, total_slots: usize) -> Self {
1159        Self {
1160            map,
1161            file,
1162            curr_length,
1163            completion: sync::Arc::new(ack::Completion::default()),
1164            cv: sync::Condvar::new(),
1165            lock: sync::Mutex::new(()),
1166            io_lock: sync::RwLock::new(()),
1167            locks: Locks::new(total_slots),
1168            durable_cv: sync::Condvar::new(),
1169            dirty: atomic::AtomicBool::new(false),
1170            closed: atomic::AtomicBool::new(false),
1171        }
1172    }
1173
1174    #[inline]
1175    fn sync(&self) -> FrozenResult<()> {
1176        unsafe { self.map.sync(self.curr_length) }?;
1177        self.file.sync()
1178    }
1179
1180    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1181    #[inline]
1182    fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1183        self.lock.lock().unwrap_or_else(|e| e.into_inner())
1184    }
1185
1186    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1187    #[inline]
1188    fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1189        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1190    }
1191
1192    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1193    #[inline]
1194    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1195        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1196    }
1197}
1198
1199#[derive(Debug)]
1200struct Locks(Box<[atomic::AtomicU8]>);
1201
1202impl Locks {
1203    const LOCK: u8 = 1;
1204    const UNLOCK: u8 = 0;
1205
1206    const L1_CONTENTION: usize = 0x10;
1207    const L2_CONTENTION: usize = 0x20;
1208
1209    fn new(cap: usize) -> Self {
1210        let mut slots = Vec::with_capacity(cap);
1211        for _ in 0..cap {
1212            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1213        }
1214
1215        Self(slots.into_boxed_slice())
1216    }
1217
1218    #[inline(always)]
1219    fn lock(&self, index: usize) -> LockGuard<'_> {
1220        let val = &self.0[index];
1221        let mut spins = 0;
1222
1223        loop {
1224            if val
1225                .compare_exchange_weak(
1226                    Self::UNLOCK,
1227                    Self::LOCK,
1228                    atomic::Ordering::Acquire,
1229                    atomic::Ordering::Relaxed,
1230                )
1231                .is_ok()
1232            {
1233                return LockGuard(val);
1234            }
1235
1236            // we must backoff under contention
1237
1238            // NOTE:
1239            //
1240            // We have three levels of backoff,
1241            //
1242            // 1. Busy waiting w/ CPU hint
1243            // 2. Willingly allow preemption by OS schedular
1244            // 3. Sleep the thread (increasing w/ exponenial factor)
1245
1246            if hints::likely(spins < Self::L1_CONTENTION) {
1247                std::hint::spin_loop();
1248            } else if spins < Self::L2_CONTENTION {
1249                thread::yield_now();
1250            } else {
1251                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1252                thread::sleep(time::Duration::from_nanos(ns));
1253            }
1254
1255            spins += 1;
1256        }
1257    }
1258}
1259
1260struct LockGuard<'a>(&'a atomic::AtomicU8);
1261
1262impl Drop for LockGuard<'_> {
1263    fn drop(&mut self) {
1264        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1265    }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270    use super::*;
1271
1272    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1273    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1274
1275    const MODULE_ID: u8 = 0;
1276    const INIT_SLOTS: usize = 0x0A;
1277
1278    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1279        let dir = tempfile::tempdir().unwrap();
1280        let path = dir.path().join("tmp_map");
1281
1282        let cfg = FrozenMMapCfg {
1283            module_id: MODULE_ID,
1284            initial_count: INIT_SLOTS,
1285            immediate_durability: false,
1286            flush_duration: FLUSH_DURATION,
1287        };
1288
1289        (dir, path, cfg)
1290    }
1291
1292    mod fm_lifecycle {
1293        use super::*;
1294
1295        #[test]
1296        fn ok_new() {
1297            let (_dir, path, cfg) = new_tmp();
1298            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1299
1300            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1301            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1302            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1303
1304            assert_eq!(mmap.core.completion.read_current_epoch(), 0);
1305            assert_eq!(mmap.core.completion.read_durable_epoch(), 0);
1306
1307            // satisfies the bg thread was spawned correctly
1308            assert!(mmap.core.completion.get_err().is_none());
1309
1310            // satisfies wait on epoch works
1311            let ticket = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1312
1313            let ticket_epoch = ticket.epoch();
1314            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1315
1316            assert!(durable_epoch >= ticket_epoch);
1317        }
1318
1319        #[test]
1320        fn ok_new_existing() {
1321            let (_dir, path, cfg) = new_tmp();
1322
1323            // create new + close
1324            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1325            drop(mmap1);
1326
1327            // open existing
1328            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1329            drop(mmap2);
1330        }
1331
1332        #[test]
1333        fn err_new_when_change_in_cfg() {
1334            let (_dir, path, mut cfg) = new_tmp();
1335
1336            // create new + close
1337            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1338            drop(mmap1);
1339
1340            // update cfg + opne existing
1341            cfg.initial_count = INIT_SLOTS * 2;
1342            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1343        }
1344
1345        #[test]
1346        fn ok_delete() {
1347            let (_dir, path, cfg) = new_tmp();
1348            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1349
1350            mmap.delete().unwrap();
1351            assert!(!mmap.core.file.exists().unwrap());
1352        }
1353
1354        #[test]
1355        fn err_delete_after_delete() {
1356            let (_dir, path, cfg) = new_tmp();
1357            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1358
1359            mmap.delete().unwrap();
1360            assert!(!mmap.core.file.exists().unwrap());
1361            assert!(mmap.delete().is_err());
1362        }
1363
1364        #[test]
1365        fn ok_drop_persists_when_dropped_before_bg_flush() {
1366            let (_dir, path, cfg) = new_tmp();
1367            const VAL: u64 = 0x0A;
1368
1369            {
1370                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1371                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1372                drop(mmap);
1373            }
1374
1375            {
1376                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1377                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1378                assert_eq!(val, VAL);
1379            }
1380        }
1381    }
1382
1383    mod fm_validate_t {
1384        use super::*;
1385
1386        #[repr(C, align(8))]
1387        struct OkT {
1388            a: u64,
1389            b: u64,
1390        }
1391
1392        #[repr(C)]
1393        struct BadAlignT {
1394            a: u32,
1395            b: u32,
1396        }
1397
1398        #[repr(C, align(4))]
1399        struct BadSizeT {
1400            a: u32,
1401            b: u16,
1402        }
1403
1404        #[repr(C, align(8))]
1405        struct DropT(u64);
1406
1407        impl Drop for DropT {
1408            fn drop(&mut self) {}
1409        }
1410
1411        #[repr(C, align(8))]
1412        struct ZstT;
1413
1414        #[test]
1415        fn ok_validate_t() {
1416            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1417        }
1418
1419        #[test]
1420        fn err_validate_t_when_drop() {
1421            assert!(FrozenMMap::<DropT>::validate_t().is_err());
1422        }
1423
1424        #[test]
1425        fn err_validate_t_when_not_8_byte_aligned() {
1426            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1427        }
1428
1429        #[test]
1430        fn err_validate_t_when_size_not_multiple_of_8() {
1431            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1432        }
1433
1434        #[test]
1435        fn err_validate_t_when_zero_sized() {
1436            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1437        }
1438
1439        #[test]
1440        fn err_new_when_t_implements_drop() {
1441            let (_dir, path, cfg) = new_tmp();
1442            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1443        }
1444
1445        #[test]
1446        fn err_new_when_t_is_not_8_byte_aligned() {
1447            let (_dir, path, cfg) = new_tmp();
1448            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1449        }
1450
1451        #[test]
1452        fn err_new_when_t_size_is_not_multiple_of_8() {
1453            let (_dir, path, cfg) = new_tmp();
1454            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1455        }
1456
1457        #[test]
1458        fn err_new_when_t_is_zero_sized() {
1459            let (_dir, path, cfg) = new_tmp();
1460            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1461        }
1462
1463        #[test]
1464        fn err_new_grown_when_t_implements_drop() {
1465            let (_dir, path, cfg) = new_tmp();
1466            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1467        }
1468
1469        #[test]
1470        fn err_new_grown_when_t_is_not_8_byte_aligned() {
1471            let (_dir, path, cfg) = new_tmp();
1472            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1473        }
1474
1475        #[test]
1476        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1477            let (_dir, path, cfg) = new_tmp();
1478            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1479        }
1480
1481        #[test]
1482        fn err_new_grown_when_t_is_zero_sized() {
1483            let (_dir, path, cfg) = new_tmp();
1484            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1485        }
1486    }
1487
1488    mod fm_new_grown {
1489        use super::*;
1490
1491        #[test]
1492        fn ok_new_grown_updates_length() {
1493            let (_dir, path, cfg) = new_tmp();
1494
1495            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1496            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1497            drop(mmap);
1498
1499            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1500            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1501            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1502        }
1503
1504        #[test]
1505        fn err_new_grown_with_preexisting_instance() {
1506            let (_dir, path, cfg) = new_tmp();
1507
1508            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1509            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1510
1511            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1512        }
1513
1514        #[test]
1515        fn ok_new_grown_cycle() {
1516            let (_dir, path, cfg) = new_tmp();
1517
1518            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1519            drop(mmap);
1520
1521            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1522            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1523            drop(mmap);
1524
1525            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1526            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1527            drop(mmap);
1528
1529            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1530            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1531        }
1532
1533        #[test]
1534        fn ok_write_reopen_grown_read() {
1535            let (_dir, path, cfg) = new_tmp();
1536
1537            {
1538                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1539                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1540            }
1541
1542            {
1543                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1544                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1545
1546                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1547                assert_eq!(val, 0xBB);
1548            }
1549        }
1550
1551        #[test]
1552        fn ok_write_reopen_grown_read_cycle() {
1553            let (_dir, path, cfg) = new_tmp();
1554
1555            {
1556                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1557                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1558            }
1559
1560            for i in 0..2 {
1561                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1562                let idx = mmap.total_slots() - 1;
1563                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1564                drop(mmap);
1565            }
1566
1567            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1568
1569            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1570            assert_eq!(base, 1);
1571
1572            let last_idx = mmap.total_slots() - 1;
1573            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1574            assert_eq!(last, 3);
1575        }
1576
1577        #[test]
1578        fn err_new_grown_while_previous_instance_is_alive() {
1579            let (_dir, path, cfg) = new_tmp();
1580
1581            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1582            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1583
1584            assert!(reopened.is_err());
1585        }
1586    }
1587
1588    mod fm_write_read {
1589        use super::*;
1590
1591        #[test]
1592        fn ok_write_wait_read_cycle() {
1593            const VAL: u64 = 0xDEADC0DE;
1594            let (_dir, path, cfg) = new_tmp();
1595            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1596
1597            // write + sync
1598            let ticket = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1599
1600            let ticket_epoch = ticket.epoch();
1601            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1602
1603            assert!(durable_epoch >= ticket_epoch);
1604
1605            // read + verify
1606            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1607            assert_eq!(val, VAL);
1608        }
1609
1610        #[test]
1611        fn ok_write_read_without_wait() {
1612            const VAL: u64 = 0xDEADC0DE;
1613            let (_dir, path, cfg) = new_tmp();
1614            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1615
1616            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1617            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1618            assert_eq!(val, VAL);
1619        }
1620    }
1621
1622    mod fm_tx {
1623        use super::*;
1624
1625        #[test]
1626        fn ok_tx_basic_multi_write() {
1627            let (_dir, path, cfg) = new_tmp();
1628            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1629
1630            let mut tx = mmap.new_tx();
1631            unsafe {
1632                tx.write(0, |v| *v = 1).unwrap();
1633                tx.write(1, |v| *v = 2).unwrap();
1634                tx.write(2, |v| *v = 3).unwrap();
1635            }
1636
1637            let ticket = tx.commit().unwrap();
1638
1639            let ticket_epoch = ticket.epoch();
1640            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1641
1642            assert!(durable_epoch >= ticket_epoch);
1643
1644            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1645            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1646            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1647
1648            assert_eq!((v0, v1, v2), (1, 2, 3));
1649        }
1650
1651        #[test]
1652        fn ok_tx_single_epoch() {
1653            let (_dir, path, cfg) = new_tmp();
1654            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1655
1656            let mut tx = mmap.new_tx();
1657            unsafe {
1658                tx.write(0, |v| *v = 0x0A).unwrap();
1659                tx.write(1, |v| *v = 0x14).unwrap();
1660            }
1661
1662            // NOTE: next write must have strictly higher epoch then current one
1663
1664            let ticket = tx.commit().unwrap();
1665            let next_ticket = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1666
1667            assert!(next_ticket.epoch() > ticket.epoch());
1668        }
1669
1670        #[test]
1671        fn err_tx_out_of_order() {
1672            let (_dir, path, cfg) = new_tmp();
1673            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1674
1675            let mut tx = mmap.new_tx();
1676            unsafe {
1677                tx.write(2, |v| *v = 1).unwrap();
1678                let res = tx.write(1, |v| *v = 2);
1679                assert!(res.is_err());
1680            }
1681        }
1682
1683        #[test]
1684        fn err_tx_duplicate_index() {
1685            let (_dir, path, cfg) = new_tmp();
1686            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1687
1688            let mut tx = mmap.new_tx();
1689            unsafe {
1690                tx.write(1, |v| *v = 1).unwrap();
1691                let res = tx.write(1, |v| *v = 2);
1692                assert!(res.is_err());
1693            }
1694        }
1695
1696        #[test]
1697        fn ok_tx_concurrent_non_overlapping() {
1698            let (_dir, path, cfg) = new_tmp();
1699            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1700
1701            let mut handles = Vec::new();
1702            for i in 0..2 {
1703                let mmap = mmap.clone();
1704                handles.push(thread::spawn(move || {
1705                    let mut tx = mmap.new_tx();
1706
1707                    unsafe {
1708                        tx.write(i * 2, |v| *v = i as u64).unwrap();
1709                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1710                    }
1711
1712                    let ticket = tx.commit().unwrap();
1713
1714                    let ticket_epoch = ticket.epoch();
1715                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1716
1717                    assert!(durable_epoch >= ticket_epoch);
1718                }));
1719            }
1720
1721            for h in handles {
1722                h.join().unwrap();
1723            }
1724
1725            for i in 0..2 {
1726                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1727                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1728
1729                assert_eq!((v0, v1), (i as u64, i as u64));
1730            }
1731        }
1732
1733        #[test]
1734        fn ok_tx_overwrite_last_wins() {
1735            let (_dir, path, cfg) = new_tmp();
1736            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1737
1738            let mut tx = mmap.new_tx();
1739            unsafe {
1740                tx.write(0, |v| *v = 1).unwrap();
1741            }
1742
1743            tx.commit().unwrap();
1744
1745            let mut tx2 = mmap.new_tx();
1746            unsafe {
1747                tx2.write(0, |v| *v = 2).unwrap();
1748            }
1749
1750            let ticket = tx2.commit().unwrap();
1751
1752            let ticket_epoch = ticket.epoch();
1753            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1754
1755            assert!(durable_epoch >= ticket_epoch);
1756
1757            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1758            assert_eq!(val, 2);
1759        }
1760
1761        #[test]
1762        fn ok_tx_persists_across_reopen() {
1763            let (_dir, path, cfg) = new_tmp();
1764
1765            {
1766                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1767
1768                let mut tx = mmap.new_tx();
1769                unsafe {
1770                    tx.write(0, |v| *v = 0x3A).unwrap();
1771                    tx.write(1, |v| *v = 0x54).unwrap();
1772                }
1773
1774                let ticket = tx.commit().unwrap();
1775
1776                let ticket_epoch = ticket.epoch();
1777                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1778
1779                assert!(durable_epoch >= ticket_epoch);
1780            }
1781
1782            {
1783                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1784
1785                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1786                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1787
1788                assert_eq!((v0, v1), (0x3A, 0x54));
1789            }
1790        }
1791    }
1792
1793    mod fm_durability {
1794        use super::*;
1795
1796        #[test]
1797        fn ok_wait_then_drop() {
1798            let (_dir, path, cfg) = new_tmp();
1799            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1800
1801            let ticket = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1802
1803            let ticket_epoch = ticket.epoch();
1804            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1805
1806            assert!(durable_epoch >= ticket_epoch);
1807
1808            drop(mmap);
1809        }
1810
1811        #[test]
1812        fn ok_epoch_monotonicity() {
1813            let (_dir, path, cfg) = new_tmp();
1814            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1815
1816            let t1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1817            let t2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1818
1819            let durable_epoch = futures::executor::block_on(t2).unwrap();
1820            assert!(durable_epoch >= t1.epoch());
1821        }
1822
1823        #[test]
1824        fn ok_wait_for_durability_with_multi_writers() {
1825            let (_dir, path, cfg) = new_tmp();
1826            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1827
1828            let mut handles = Vec::new();
1829            for _ in 0..2 {
1830                let mmap = mmap.clone();
1831                handles.push(thread::spawn(move || {
1832                    let ticket = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1833
1834                    let ticket_epoch = ticket.epoch();
1835                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1836
1837                    assert!(durable_epoch >= ticket_epoch);
1838                }));
1839            }
1840
1841            for h in handles {
1842                h.join().unwrap();
1843            }
1844
1845            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1846            assert_eq!(val, 2);
1847        }
1848    }
1849
1850    mod fm_concurrency {
1851        use super::*;
1852
1853        #[test]
1854        fn ok_parallel_reads_with_diff_index() {
1855            let (_dir, path, cfg) = new_tmp();
1856            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1857
1858            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1859            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1860
1861            let t1 = {
1862                let mmap = mmap.clone();
1863                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
1864            };
1865
1866            let t2 = {
1867                let mmap = mmap.clone();
1868                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
1869            };
1870
1871            assert_eq!(t1.join().unwrap(), 0x10);
1872            assert_eq!(t2.join().unwrap(), 0x20);
1873        }
1874
1875        #[test]
1876        fn ok_multi_tx_drop_then_reopen_grown() {
1877            let (_dir, path, cfg) = new_tmp();
1878
1879            {
1880                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
1881
1882                let mut handles = Vec::new();
1883                for i in 0..2u64 {
1884                    let mmap = mmap.clone();
1885                    handles.push(thread::spawn(move || {
1886                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
1887                    }));
1888                }
1889
1890                for i in 0..2usize {
1891                    let mmap = mmap.clone();
1892                    handles.push(thread::spawn(move || {
1893                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
1894                    }));
1895                }
1896
1897                for h in handles {
1898                    h.join().unwrap();
1899                }
1900            }
1901
1902            {
1903                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1904                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
1905
1906                for i in 0..2u64 {
1907                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
1908                    assert_eq!(val, i + 1);
1909                }
1910
1911                let idx = mmap.total_slots() - 1;
1912                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
1913
1914                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
1915                assert_eq!(val, 0xDEAD);
1916            }
1917        }
1918    }
1919
1920    mod ack_ticket {
1921        use super::*;
1922
1923        #[test]
1924        fn ok_parallel_waiters_same_epoch_window() {
1925            let (_dir, path, cfg) = new_tmp();
1926            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1927
1928            let barrier = sync::Arc::new(sync::Barrier::new(0x10));
1929
1930            let mut handles = Vec::new();
1931            for i in 0..0x10usize {
1932                let mmap = mmap.clone();
1933                let barrier = barrier.clone();
1934
1935                handles.push(thread::spawn(move || {
1936                    barrier.wait();
1937
1938                    let ticket = unsafe { mmap.write(i % INIT_SLOTS, |v| *v = i as u64).unwrap() };
1939
1940                    let epoch = ticket.epoch();
1941                    let durable = futures::executor::block_on(ticket).unwrap();
1942
1943                    assert!(durable >= epoch);
1944                }));
1945            }
1946
1947            for handle in handles {
1948                handle.join().expect("worker thread panicked");
1949            }
1950        }
1951
1952        #[test]
1953        fn ok_parallel_waiters_same_epoch() {
1954            let completion = sync::Arc::new(ack::Completion::default());
1955
1956            let mut handles = Vec::new();
1957            for _ in 0..0x10 {
1958                let completion = completion.clone();
1959
1960                handles.push(thread::spawn(move || {
1961                    let ticket = ack::AckTicket::new(1, completion);
1962
1963                    assert_eq!(
1964                        futures::executor::block_on(ticket).expect("ticket must complete"),
1965                        1
1966                    );
1967                }));
1968            }
1969
1970            thread::sleep(time::Duration::from_millis(0x0A));
1971
1972            completion.mark_epoch_as_durable(1);
1973            completion.notify_all_listeners();
1974
1975            for handle in handles {
1976                handle.join().expect("worker thread panicked");
1977            }
1978        }
1979    }
1980}