Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
//! These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//!     module_id: MODULE_ID,
35//!     initial_count: 0x0A,
36//!     flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! let _ = futures::executor::block_on(ticket).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61    ack,
62    error::FrozenResult,
63    ffile::{FrozenFile, FrozenFileCfg},
64    hints,
65};
66use std::{
67    fmt,
68    sync::{self, atomic},
69    thread, time,
70};
71
72/// type for `epoch` used by write ops
73pub type TEpoch = u64;
74
75#[cfg(any(target_os = "linux", target_os = "macos"))]
76type TMap = posix::POSIXMMap;
77
78/// Error codes for [`FrozenMMap`]
79pub(in crate::fmmap) mod err {
80    use crate::error::{ErrCode, FrozenError, FrozenResult};
81
82    /// Domain Id for [`FrozenMMap`] is **18**
83    const ERRDOMAIN: u8 = 0x12;
84
85    /// module id used for [`FrozenMMap`]
86    pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
87
88    #[cfg(not(test))]
89    #[inline(always)]
90    pub fn mid() -> &'static u8 {
91        MID.get().unwrap()
92    }
93
94    #[cfg(test)]
95    #[inline(always)]
96    pub fn mid() -> &'static u8 {
97        MID.get_or_init(|| 0)
98    }
99
100    /// internal fuck up (hault and catch fire)
101    pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
102
103    /// unknown error (fallback)
104    pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
105
106    /// no more memory available
107    pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
108
109    /// syncing error
110    pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
111
112    /// no write/read perm
113    pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
114
115    /// type `T` must not be zero sized
116    pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
117
118    /// flush_tx error (unable to spawn)
119    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
120
121    /// type `T` implements drop
122    pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
123
124    /// type `T` is not 8 bytes aligned
125    pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
126
127    /// `size_of::<T>()` is not multiple of 8
128    pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
129
130    #[inline]
131    pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
132        code: ErrCode,
133        error: E,
134    ) -> FrozenResult<R> {
135        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
136        Err(err)
137    }
138
139    #[inline]
140    pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
141        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
142        Err(err)
143    }
144}
145
/// Configuration consumed by the [`FrozenMMap`] constructors
#[derive(Clone, Debug)]
pub struct FrozenMMapCfg {
    /// Identifier attached to every [`crate::error::FrozenError`] raised for this instance, also
    /// used to name the background flusher thread
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], so the initial file length will be
    /// `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by the flusher tx to batch write ops into a durable window, all write
    /// ops which land in the same interval are synced together
    pub flush_duration: time::Duration,
}
162
163/// Custom implementation of `mmap(2)`
164///
165/// ## Constraints
166///
167/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
168/// must be a POD type which is safe to persist and later reinterpret from bytes.
169///
170/// Required properties for `T`,
171///
172/// - Must use `#[repr(C)]`
173/// - Should be 8-bytes aligned
174/// - Must not implement [`Drop`]
175/// - `size_of::<T>()` should be multiple of `8`
176///
177/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
178/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
179/// across reopen.
180///
/// These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
182/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
183/// layout and must remain valid when the file is reopened in a later process.
184///
185/// ## Example
186///
187/// ```
188/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
189///
190/// const MODULE_ID: u8 = 0;
191///
192/// let dir = tempfile::tempdir().unwrap();
193/// let path = dir.path().join("tmp_frozen_mmap");
194///
195/// let cfg = FrozenMMapCfg {
196///     module_id: MODULE_ID,
197///     initial_count: 0x0A,
198///     flush_duration: std::time::Duration::from_micros(0x96),
199/// };
200///
201/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
202/// assert_eq!(mmap.total_slots(), 0x0A);
203///
204/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
205/// let _ = futures::executor::block_on(ticket).unwrap();
206///
207/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
208/// assert_eq!(val, 0xDEADC0DE);
209///
210/// drop(mmap);
211///
212/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
213/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
214///
215/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
216/// assert_eq!(val, 0xDEADC0DE);
217/// ```
218#[derive(Debug)]
219pub struct FrozenMMap<T>
220where
221    T: Sized + Send + Sync,
222{
223    core: sync::Arc<Core>,
224    flush_tx_handle: Option<thread::JoinHandle<()>>,
225    _t: core::marker::PhantomData<T>,
226}
227
228unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
229unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
230
231impl<T> FrozenMMap<T>
232where
233    T: Sized + Send + Sync,
234{
235    /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
236    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
237
238    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
239    ///
240    /// ## Multiple Instances
241    ///
    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
    /// underlying [`FrozenFile`], so trying to create multiple instances of [`FrozenMMap`] over
    /// the same file will return an error.
245    ///
246    /// ## Capacity Growth
247    ///
    /// [`FrozenMMap`] does not support in-place growth of a live mapping. To increase capacity,
    /// drop the current instance and reopen w/ [`FrozenMMap::new_grown`], which provides a memory
    /// mapping over the grown capacity.
251    ///
252    /// ## [`FrozenMMapCfg`]
253    ///
254    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
255    ///
256    /// ## Working
257    ///
    /// We first create a new [`FrozenFile`] if one does not exist yet, then map the entire file
    /// using `mmap(2)`. The entire file must only be read/written as `T`, which must also stay
    /// constant for the entire lifetime of the file.
261    ///
262    /// ## Important
263    ///
264    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
265    /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
266    /// to store config.
267    ///
268    /// ## Example
269    ///
270    /// ```
271    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
272    ///
273    /// const MODULE_ID: u8 = 0;
274    ///
275    /// let dir = tempfile::tempdir().unwrap();
276    /// let path = dir.path().join("tmp_frozen_mmap");
277    ///
278    /// let cfg = FrozenMMapCfg {
279    ///     module_id: MODULE_ID,
280    ///     initial_count: 0x0A,
281    ///     flush_duration: std::time::Duration::from_micros(0x96),
282    /// };
283    ///
284    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
285    /// assert_eq!(mmap.total_slots(), 0x0A);
286    ///
287    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
288    /// let _ = futures::executor::block_on(ticket).unwrap();
289    ///
290    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
291    /// assert_eq!(val, 0xDEADC0DE);
292    /// ```
293    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
294        Self::validate_t()?;
295        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
296        let total_slots = curr_length / Self::SLOT_SIZE;
297
298        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
299        // guarantees that the first caller sets the value and all subsequent calls reuse it
300        let _ = err::MID.get_or_init(|| cfg.module_id);
301
302        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
303        let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
304
305        let cloned_core = sync::Arc::clone(&core);
306        let flush_tx_handle = match thread::Builder::new()
307            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
308            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
309        {
310            Ok(handle) => Some(handle),
311            Err(observed_error) => {
312                return err::new_err(err::FXE, observed_error);
313            }
314        };
315
316        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
317    }
318
319    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
320    /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
321    ///
322    /// ## Multiple Instances
323    ///
    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
    /// underlying [`FrozenFile`], so trying to create multiple instances of [`FrozenMMap`] over
    /// the same file will return an error.
327    ///
328    /// ## Why not create a [`FrozenMMap::grow`] call?
329    ///
    /// Previously, when [`FrozenMMap::grow`] was attempted, it was observed that resizing an active
    /// memory mapping in place is tricky in concurrent code, as some threads could still hold
    /// stale/unmapped pointers into the mmap due to preemption by the OS scheduler.
    ///
    /// So, instead of remapping a live instance, the current API performs growth during open,
    /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
    /// instance.
337    ///
338    /// ## [`FrozenMMapCfg`]
339    ///
340    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
341    ///
342    /// ## Working
343    ///
    /// We first create a new [`FrozenFile`] if one does not exist yet, then we grow the file using
    /// [`FrozenFile::grow`] and map the entire file using `mmap(2)`. The entire file must only be
    /// read/written as `T`, which must also stay constant for the entire lifetime of the file.
347    ///
348    /// ## Important
349    ///
350    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
351    /// which is used under the hood, one must use config stores like
352    /// [`Rta`](https://crates.io/crates/rta) to store config.
353    ///
354    /// ## Example
355    ///
356    /// ```
357    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
358    ///
359    /// const MODULE_ID: u8 = 0;
360    ///
361    /// let dir = tempfile::tempdir().unwrap();
362    /// let path = dir.path().join("tmp_frozen_mmap");
363    ///
364    /// let cfg = FrozenMMapCfg {
365    ///     module_id: MODULE_ID,
366    ///     initial_count: 0x0A,
367    ///     flush_duration: std::time::Duration::from_micros(0x96),
368    /// };
369    ///
370    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
371    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
372    ///
373    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
374    /// let _ = futures::executor::block_on(ticket).unwrap();
375    ///
376    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
377    /// assert_eq!(val, 0xDEADC0DE);
378    /// ```
379    pub fn new_grown<P: AsRef<std::path::Path>>(
380        path: P,
381        cfg: FrozenMMapCfg,
382        additional_slots: usize,
383    ) -> FrozenResult<Self> {
384        Self::validate_t()?;
385        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
386
387        // we grow the underlying FrozenFile as requested
388        file.grow(additional_slots)?;
389        let curr_length = file.length()?; // we must read the updated (grown) length
390        let total_slots = curr_length / Self::SLOT_SIZE;
391
392        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
393        // guarantees that the first caller sets the value and all subsequent calls reuse it
394        let _ = err::MID.get_or_init(|| cfg.module_id);
395
396        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
397        let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
398
399        let cloned_core = sync::Arc::clone(&core);
400        let flush_tx_handle = match thread::Builder::new()
401            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
402            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
403        {
404            Ok(handle) => Some(handle),
405            Err(observed_error) => {
406                return err::new_err(err::FXE, observed_error);
407            }
408        };
409
410        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
411    }
412
413    /// Create/open a new [`FrozenFile`] instance
414    fn open_file(
415        path: std::path::PathBuf,
416        cfg: &FrozenMMapCfg,
417    ) -> FrozenResult<(FrozenFile, usize)> {
418        let ff_cfg = FrozenFileCfg {
419            path,
420            module_id: cfg.module_id,
421            buffer_size: Self::SLOT_SIZE,
422            initial_available_buffers: cfg.initial_count,
423        };
424
425        let file = FrozenFile::new(ff_cfg)?;
426        let curr_length = file.length()?;
427
428        Ok((file, curr_length))
429    }
430
431    #[inline]
432    fn validate_t() -> FrozenResult<()> {
433        if std::mem::needs_drop::<T>() {
434            return err::new_err_default(err::DRP);
435        }
436
437        let align = std::mem::align_of::<T>();
438        if align != 8 {
439            return err::new_err_default(err::ALN);
440        }
441
442        let size = std::mem::size_of::<T>();
443        if size == 0 {
444            return err::new_err_default(err::ZRO);
445        }
446
447        if size % 8 != 0 {
448            return err::new_err_default(err::SZE);
449        }
450
451        Ok(())
452    }
453
454    /// Read a `T` at given `index` via callback (`f`)
455    ///
456    /// ## Concurrency
457    ///
    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
    /// the same index are atomic and thread safe, while operations on different indices may proceed
    /// fully in parallel
461    ///
462    /// ## Safety
463    ///
464    /// The caller must ensure following:
465    ///
466    /// - given `index` is within bounds
467    /// - underlying memory contains a valid instance of `T`
468    /// - provided callback `f` must not,
469    ///   - write through the pointer
    ///   - store or leak the pointer beyond its lifetime
471    ///
472    /// Violating any of the above may result in undefined behavior
473    ///
474    /// ## Example
475    ///
476    /// ```
477    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
478    ///
479    /// const MODULE_ID: u8 = 0;
480    ///
481    /// let dir = tempfile::tempdir().unwrap();
482    /// let path = dir.path().join("tmp_read_mmap");
483    ///
484    /// let cfg = FrozenMMapCfg {
485    ///     module_id: MODULE_ID,
486    ///     initial_count: 0x02,
487    ///     flush_duration: std::time::Duration::from_micros(0x60),
488    /// };
489    ///
490    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
491    ///
492    /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
493    /// let _ = futures::executor::block_on(ticket).unwrap();
494    ///
495    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
496    /// assert_eq!(val, 0x0A);
497    /// ```
498    #[inline(always)]
499    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
500        let offset = Self::SLOT_SIZE * index;
501        let _lock = self.core.locks.lock(index);
502
503        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
504        // assumption of OS guarantees visibility)
505
506        let ptr = unsafe { self.core.map.as_ptr(offset) };
507        Ok(f(ptr))
508    }
509
510    /// Write/update a `T` at given `index` via callback (`f`)
511    ///
512    /// ## Concurrency
513    ///
    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
    /// the same index are atomic and thread safe, while operations on different indices may proceed
    /// fully in parallel
517    ///
518    /// ## Safety
519    ///
520    /// The caller must ensure following:
521    ///
522    /// - given `index` is within bounds
523    /// - underlying memory contains a valid instance of `T`
    /// - provided callback `f` must not store or leak the pointer beyond its lifetime
525    ///
526    /// Violating any of the above may result in undefined behavior
527    ///
528    /// ## Example
529    ///
530    /// ```
531    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
532    ///
533    /// const MODULE_ID: u8 = 0;
534    ///
535    /// let dir = tempfile::tempdir().unwrap();
536    /// let path = dir.path().join("tmp_write_mmap");
537    ///
538    /// let cfg = FrozenMMapCfg {
539    ///     module_id: MODULE_ID,
540    ///     initial_count: 0x02,
541    ///     flush_duration: std::time::Duration::from_micros(0x96),
542    /// };
543    ///
544    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
545    ///
546    /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
547    /// let _ = futures::executor::block_on(ticket).unwrap();
548    ///
549    /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
550    /// assert_eq!(val, 0x2B);
551    /// ```
552    #[inline(always)]
553    pub unsafe fn write(
554        &self,
555        index: usize,
556        f: impl FnOnce(*mut T),
557    ) -> FrozenResult<ack::AckTicket> {
558        // propagate prev errors
559        if let Some(err) = self.core.completion.get_err() {
560            return Err(err);
561        }
562
563        let offset = Self::SLOT_SIZE * index;
564
565        let _guard = self.core.acquire_io_lock();
566        let _lock = self.core.locks.lock(index);
567
568        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
569        f(ptr);
570
571        self.core.dirty.store(true, atomic::Ordering::Release);
572        let epoch = self.core.completion.increment_current_epoch();
573
574        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
575    }
576
577    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
578    ///
579    /// ## Working
580    ///
    /// This call does not perform a syscall, the count is derived from the file length (in bytes)
    /// which was read once, when the [`FrozenMMap`] instance was created. The length does not
    /// change for a live instance, as growth only happens during open ([`FrozenMMap::new_grown`])
584    ///
585    /// ## Example
586    ///
587    /// ```
588    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
589    ///
590    /// const MODULE_ID: u8 = 0;
591    ///
592    /// let dir = tempfile::tempdir().unwrap();
593    /// let path = dir.path().join("tmp_grow_mmap");
594    ///
595    /// let cfg = FrozenMMapCfg {
596    ///     module_id: MODULE_ID,
597    ///     initial_count: 0x02,
598    ///     flush_duration: std::time::Duration::from_micros(0x96),
599    /// };
600    ///
601    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
602    /// assert_eq!(mmap.total_slots(), 0x02);
603    ///
604    /// drop(mmap);
605    ///
606    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
607    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
608    /// ```
609    #[inline]
610    pub fn total_slots(&self) -> usize {
611        self.core.curr_length / Self::SLOT_SIZE
612    }
613
614    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
615    ///
    /// *NOTE:* This is an approximation of the memory used, actual RSS may differ depending on
    /// paging and OS
618    ///
619    /// ## Example
620    ///
621    /// ```
622    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
623    ///
624    /// const MODULE_ID: u8 = 0;
625    ///
626    /// let dir = tempfile::tempdir().unwrap();
627    /// let path = dir.path().join("tmp_mem_usage");
628    ///
629    /// let cfg = FrozenMMapCfg {
630    ///     module_id: MODULE_ID,
631    ///     initial_count: 0x10,
632    ///     flush_duration: std::time::Duration::from_micros(0x60),
633    /// };
634    ///
635    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
636    ///
637    /// let bytes = mmap.memory_usage();
638    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
639    /// ```
640    #[inline]
641    pub fn memory_usage(&self) -> usize {
642        // memory of mem-maped region
643        let mmap_bytes = self.core.curr_length;
644
645        // memory used for locking
646        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
647
648        mmap_bytes + lock_bytes
649    }
650
651    /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
652    /// operation
653    ///
654    /// ## Overview
655    ///
656    /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
657    /// operation, hence creating a transactional write operation, which gives following guarantees:
658    ///
659    /// - All write ops succeed together
660    /// - Single epoch to track durability of all writes ops
661    /// - Same durability guarantee for all the included write ops
662    ///
663    /// Simply, this preserves atomic durability semantics for multi index updates
664    ///
665    /// ## Example
666    ///
667    /// ```
668    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
669    ///
670    /// const MODULE_ID: u8 = 0;
671    ///
672    /// let dir = tempfile::tempdir().unwrap();
673    /// let path = dir.path().join("tmp_tx");
674    ///
675    /// let cfg = FrozenMMapCfg {
676    ///     module_id: MODULE_ID,
677    ///     initial_count: 0x0A,
678    ///     flush_duration: std::time::Duration::from_micros(50),
679    /// };
680    ///
681    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
682    ///
683    /// let mut tx = mmap.new_tx();
684    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
685    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
686    ///
687    /// let ticket = tx.commit().unwrap();
688    /// let _ = futures::executor::block_on(ticket).unwrap();
689    ///
690    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
691    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
692    ///
693    /// assert_eq!((v0, v1), (0x0A, 0x14));
694    /// ```
695    #[inline]
696    pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
697        FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
698    }
699
700    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
701    ///
702    /// ## Working
703    ///
704    /// When `delete` is called, all read, write, and (background) sync ops are paused
705    /// (indefinitely), whule deletion is done with following steps:
706    ///
707    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
708    /// - if any batch is pending for sync,
709    ///   - swap the flag
710    ///   - call sync manually
711    ///   - incr epoch and update cv
712    /// - brodcast closing so flusher tx could wrap up
713    /// - `munmap(2)` current mapping
714    /// - call delete on [`FrozenFile`]
715    ///
716    /// ## Example
717    ///
718    /// ```
719    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
720    ///
721    /// const MODULE_ID: u8 = 0;
722    ///
723    /// let dir = tempfile::tempdir().unwrap();
724    /// let path = dir.path().join("tmp_delete_mmap");
725    ///
726    /// let cfg = FrozenMMapCfg {
727    ///     module_id: MODULE_ID,
728    ///     initial_count: 0x04,
729    ///     flush_duration: std::time::Duration::from_micros(0x96),
730    /// };
731    ///
732    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
733    /// mmap.delete().unwrap();
734    /// assert!(!path.exists());
735    /// ```
736    pub fn delete(&mut self) -> FrozenResult<()> {
737        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
738        self.core.dirty.store(false, atomic::Ordering::Release);
739        self.core.closed.store(true, atomic::Ordering::Release);
740        self.core.durable_cv.notify_one();
741
742        if let Some(handle) = self.flush_tx_handle.take() {
743            let _ = handle.join();
744        }
745
746        // pause all new write ops
747        let _lock = self.core.acquire_exclusive_io_lock();
748
749        self.munmap()?;
750        self.core.file.delete()
751    }
752
753    /// Flush the dirty mmap data to persistant storage
754    ///
755    /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
756    /// guarantee
757    ///
758    /// ## Example
759    ///
760    /// ```
761    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
762    ///
763    /// const MODULE_ID: u8 = 0;
764    ///
765    /// let dir = tempfile::tempdir().unwrap();
766    /// let path = dir.path().join("tmp_delete_mmap");
767    ///
768    /// let cfg = FrozenMMapCfg {
769    ///     module_id: MODULE_ID,
770    ///     initial_count: 0x04,
771    ///     flush_duration: std::time::Duration::from_micros(0x96),
772    /// };
773    ///
774    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
775    /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
776    /// ```
777    #[inline]
778    pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
779        self.core.map.sync(self.core.curr_length)
780    }
781
782    /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
783    ///
784    /// ## Example
785    ///
786    /// ```
787    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
788    ///
789    /// const MODULE_ID: u8 = 0;
790    ///
791    /// let dir = tempfile::tempdir().unwrap();
792    /// let path = dir.path().join("tmp_delete_mmap");
793    ///
794    /// let cfg = FrozenMMapCfg {
795    ///     module_id: MODULE_ID,
796    ///     initial_count: 0x04,
797    ///     flush_duration: std::time::Duration::from_micros(0x96),
798    /// };
799    ///
800    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
801    /// assert!(unsafe { mmap.flush_file() }.is_ok());
802    /// ```
803    #[inline]
804    pub unsafe fn flush_file(&self) -> FrozenResult<()> {
805        self.core.file.sync()
806    }
807
808    #[inline]
809    fn munmap(&self) -> FrozenResult<()> {
810        let length = self.core.curr_length;
811        unsafe { self.core.map.unmap(length) }
812    }
813}
814
815impl<T> Drop for FrozenMMap<T>
816where
817    T: Sized + Send + Sync,
818{
819    fn drop(&mut self) {
820        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
821        self.core.cv.notify_one(); // notify flusher tx to shut
822
823        if let Some(handle) = self.flush_tx_handle.take() {
824            let _ = handle.join();
825        }
826
827        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
828        let _io_lock = self.core.acquire_exclusive_io_lock();
829
830        if !is_closed {
831            let _ = self.munmap();
832        }
833    }
834}
835
836impl<T> fmt::Display for FrozenMMap<T>
837where
838    T: Sized + Send + Sync,
839{
840    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841        write!(
842            f,
843            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
844            self.core.file.fd(),
845            self.total_slots(),
846            self.core.curr_length,
847        )
848    }
849}
850
851/// ## Why we ignore [`std::sync::PoisonError`]?
852///
853/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
854/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
/// and are completely separated from the mutex.
856///
857/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
858/// an inconsistent state of the protected value. Since no state can be left partially modified
859/// under this lock, there is no possible consistency risk to recover from and propagating the
860/// poison error would only introduce unnecessary failures into the allocation path.
861///
862/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
863/// with the recovered guard.
864fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
865    // init phase (acquiring locks)
866    let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());
867
868    // sync loop w/ non-busy waiting
869    loop {
870        (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
871
872        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
873        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
874
875        let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
876        let closing = core.closed.load(atomic::Ordering::Acquire);
877
878        if !dirty {
879            if closing {
880                core.cv.notify_all();
881                return;
882            }
883
884            continue;
885        }
886
887        // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
888        // grow could kick in while sync is in progress
889
890        let io_lock = core.acquire_exclusive_io_lock();
891
892        // INFO: we must drop the guard before syscall, as its a blocking operation and holding
893        // the mutex while the syscall takes place is not a good idea, while we drop the mutex
894        // and acqurie it again, in-between other process could acquire it and use it
895        drop(guard);
896
897        // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
898        // all writes up to `target_epoch` are guaranteed to be part of this flush window,
899        // while anything beyound belongs to the next batch
900        let target_epoch = core.completion.read_current_epoch();
901
902        // NOTE:
903        //
904        // - if sync fails, we update the Core::error w/ the received error object
905        // - we clear it up when another sync call succeeds
906        // - this is valid, as the underlying sync flushes entire mmaped region, hence
907        //   even if the last call failed, and the new one succeeded, we do get the durability
908        //   guarenty for the old data as well
909
910        if let Err(new_error) = core.sync() {
911            core.completion.set_err(new_error);
912        } else {
913            core.completion.mark_epoch_as_durable(target_epoch);
914            core.completion.del_err();
915        }
916
917        // NOTE: notify all listeners currently waiting for durability progress
918        core.completion.notify_all_listeners();
919
920        drop(io_lock);
921        guard = core.acquire_lock();
922    }
923}
924
925/// A context for grouping multi write ops into a single atomic operation
926///
927/// ## Overview
928///
929/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
930///
931/// - All included writes are applied together
932/// - Single epoch is assinged for an entier transaction
933/// - Durability guarantee is same for all included write ops
934///
935/// ## Example
936///
937/// ```
938/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
939///
940/// const MODULE_ID: u8 = 0;
941///
942/// let dir = tempfile::tempdir().unwrap();
943/// let path = dir.path().join("tmp_tx");
944///
945/// let cfg = FrozenMMapCfg {
946///     module_id: MODULE_ID,
947///     initial_count: 0x0A,
948///     flush_duration: std::time::Duration::from_micros(50),
949/// };
950///
951/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
952///
953/// let mut tx = mmap.new_tx();
954/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
955/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
956/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
957///
958/// let ticket = tx.commit().unwrap();
959/// let _ = futures::executor::block_on(ticket).unwrap();
960///
961/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
962/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
963/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
964///
965/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
966/// ```
967pub struct FrozenMMapTransaction<'a, T> {
968    core: &'a Core,
969    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
970}
971
972impl<'a, T> FrozenMMapTransaction<'a, T> {
973    /// Append a write op into the [`FrozenMMapTransaction`]
974    ///
975    /// ## Requirements
976    ///
977    /// Write ops must follow these safety requirements,
978    ///
979    /// - No duplicate indices
980    /// - No out-of-order writes (must be incremental)
981    ///
982    /// Violating this constraint would result in [`FrozenError`]
983    ///
984    /// ## Safety
985    ///
986    /// Same safety requirements as [`FrozenMMap::write`] apply here
987    ///
988    /// ## Example
989    ///
990    /// ```
991    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
992    ///
993    /// const MODULE_ID: u8 = 0;
994    ///
995    /// let dir = tempfile::tempdir().unwrap();
996    /// let path = dir.path().join("tmp_tx");
997    ///
998    /// let cfg = FrozenMMapCfg {
999    ///     module_id: MODULE_ID,
1000    ///     initial_count: 0x10,
1001    ///     flush_duration: std::time::Duration::from_micros(50),
1002    /// };
1003    ///
1004    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1005    ///
1006    /// let mut tx = mmap.new_tx();
1007    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1008    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1009    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1010    ///
1011    /// let ticket = tx.commit().unwrap();
1012    /// let _ = futures::executor::block_on(ticket).unwrap();
1013    ///
1014    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1015    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1016    /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1017    ///
1018    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1019    /// ```
1020    #[inline(always)]
1021    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1022    where
1023        F: FnOnce(*mut T) + 'a,
1024    {
1025        // NOTE:
1026        //
1027        // This check prevents a potential footgun! For a safe transaction all writes must be,
1028        // - ordered by index (either incr or decr)
1029        // - no multi writes on same index
1030        //
1031        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1032        if let Some((last_idx, _)) = self.ops_vec.last() {
1033            if index <= *last_idx {
1034                return err::new_err(
1035                    err::HCF,
1036                    "tx writes must be strictly ordered, with no more then single ops on given index",
1037                );
1038            }
1039        }
1040
1041        self.ops_vec.push((index, Box::new(f)));
1042        Ok(())
1043    }
1044
1045    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1046    ///
1047    /// ## Guarantees
1048    ///
1049    /// - All writes are applied under a single epoch
1050    /// - All writes belong to the same durability batch
1051    /// - No interleaving with other transactions at epoch level
1052    ///
1053    /// ## Example
1054    ///
1055    /// ```
1056    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1057    ///
1058    /// const MODULE_ID: u8 = 0;
1059    ///
1060    /// let dir = tempfile::tempdir().unwrap();
1061    /// let path = dir.path().join("tmp_tx");
1062    ///
1063    /// let cfg = FrozenMMapCfg {
1064    ///     module_id: MODULE_ID,
1065    ///     initial_count: 0x10,
1066    ///     flush_duration: std::time::Duration::from_micros(50),
1067    /// };
1068    ///
1069    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1070    ///
1071    /// let mut tx = mmap.new_tx();
1072    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1073    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1074    ///
1075    /// let ticket = tx.commit().unwrap();
1076    /// let _ = futures::executor::block_on(ticket).unwrap();
1077    ///
1078    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1079    /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1080    ///
1081    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1082    /// ```
1083    #[inline(always)]
1084    pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1085        if let Some(err) = self.core.completion.get_err() {
1086            return Err(err);
1087        }
1088
1089        let _guard = self.core.acquire_io_lock();
1090
1091        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1092        let mut guards = Vec::with_capacity(self.ops_vec.len());
1093        for (idx, _) in &self.ops_vec {
1094            guards.push(self.core.locks.lock(*idx));
1095        }
1096
1097        for (idx, op) in self.ops_vec {
1098            let offset = idx * std::mem::size_of::<T>();
1099            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1100            op(ptr);
1101        }
1102
1103        self.core.dirty.store(true, atomic::Ordering::Release);
1104        let epoch = self.core.completion.increment_current_epoch();
1105
1106        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1107    }
1108}
1109
1110#[derive(Debug)]
1111struct Core {
1112    completion: sync::Arc<ack::Completion>,
1113    cv: sync::Condvar,
1114    curr_length: usize,
1115    dirty: atomic::AtomicBool,
1116    io_lock: sync::RwLock<()>,
1117    map: TMap,
1118    locks: Locks,
1119    lock: sync::Mutex<()>,
1120    file: FrozenFile,
1121    durable_cv: sync::Condvar,
1122    closed: atomic::AtomicBool,
1123}
1124
1125unsafe impl Send for Core {}
1126unsafe impl Sync for Core {}
1127
1128impl Core {
1129    fn new(map: TMap, file: FrozenFile, curr_length: usize, total_slots: usize) -> Self {
1130        Self {
1131            map,
1132            file,
1133            curr_length,
1134            completion: sync::Arc::new(ack::Completion::default()),
1135            cv: sync::Condvar::new(),
1136            lock: sync::Mutex::new(()),
1137            io_lock: sync::RwLock::new(()),
1138            locks: Locks::new(total_slots),
1139            durable_cv: sync::Condvar::new(),
1140            dirty: atomic::AtomicBool::new(false),
1141            closed: atomic::AtomicBool::new(false),
1142        }
1143    }
1144
1145    #[inline]
1146    fn sync(&self) -> FrozenResult<()> {
1147        unsafe { self.map.sync(self.curr_length) }?;
1148        self.file.sync()
1149    }
1150
1151    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1152    #[inline]
1153    fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1154        self.lock.lock().unwrap_or_else(|e| e.into_inner())
1155    }
1156
1157    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1158    #[inline]
1159    fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1160        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1161    }
1162
1163    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1164    #[inline]
1165    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1166        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1167    }
1168}
1169
1170#[derive(Debug)]
1171struct Locks(Box<[atomic::AtomicU8]>);
1172
1173impl Locks {
1174    const LOCK: u8 = 1;
1175    const UNLOCK: u8 = 0;
1176
1177    const L1_CONTENTION: usize = 0x10;
1178    const L2_CONTENTION: usize = 0x20;
1179
1180    fn new(cap: usize) -> Self {
1181        let mut slots = Vec::with_capacity(cap);
1182        for _ in 0..cap {
1183            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1184        }
1185
1186        Self(slots.into_boxed_slice())
1187    }
1188
1189    #[inline(always)]
1190    fn lock(&self, index: usize) -> LockGuard<'_> {
1191        let val = &self.0[index];
1192        let mut spins = 0;
1193
1194        loop {
1195            if val
1196                .compare_exchange_weak(
1197                    Self::UNLOCK,
1198                    Self::LOCK,
1199                    atomic::Ordering::Acquire,
1200                    atomic::Ordering::Relaxed,
1201                )
1202                .is_ok()
1203            {
1204                return LockGuard(val);
1205            }
1206
1207            // we must backoff under contention
1208
1209            // NOTE:
1210            //
1211            // We have three levels of backoff,
1212            //
1213            // 1. Busy waiting w/ CPU hint
1214            // 2. Willingly allow preemption by OS schedular
1215            // 3. Sleep the thread (increasing w/ exponenial factor)
1216
1217            if hints::likely(spins < Self::L1_CONTENTION) {
1218                std::hint::spin_loop();
1219            } else if spins < Self::L2_CONTENTION {
1220                thread::yield_now();
1221            } else {
1222                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1223                thread::sleep(time::Duration::from_nanos(ns));
1224            }
1225
1226            spins += 1;
1227        }
1228    }
1229}
1230
1231struct LockGuard<'a>(&'a atomic::AtomicU8);
1232
1233impl Drop for LockGuard<'_> {
1234    fn drop(&mut self) {
1235        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1236    }
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241    use super::*;
1242
1243    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1244    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1245
1246    const MODULE_ID: u8 = 0;
1247    const INIT_SLOTS: usize = 0x0A;
1248
1249    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1250        let dir = tempfile::tempdir().unwrap();
1251        let path = dir.path().join("tmp_map");
1252
1253        let cfg = FrozenMMapCfg {
1254            module_id: MODULE_ID,
1255            initial_count: INIT_SLOTS,
1256            flush_duration: FLUSH_DURATION,
1257        };
1258
1259        (dir, path, cfg)
1260    }
1261
1262    mod fm_lifecycle {
1263        use super::*;
1264
1265        #[test]
1266        fn ok_new() {
1267            let (_dir, path, cfg) = new_tmp();
1268            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1269
1270            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1271            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1272            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1273
1274            assert_eq!(mmap.core.completion.read_current_epoch(), 0);
1275            assert_eq!(mmap.core.completion.read_durable_epoch(), 0);
1276
1277            // satisfies the bg thread was spawned correctly
1278            assert!(mmap.core.completion.get_err().is_none());
1279
1280            // satisfies wait on epoch works
1281            let ticket = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1282
1283            let ticket_epoch = ticket.epoch();
1284            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1285
1286            assert!(durable_epoch >= ticket_epoch);
1287        }
1288
1289        #[test]
1290        fn ok_new_existing() {
1291            let (_dir, path, cfg) = new_tmp();
1292
1293            // create new + close
1294            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1295            drop(mmap1);
1296
1297            // open existing
1298            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1299            drop(mmap2);
1300        }
1301
1302        #[test]
1303        fn err_new_when_change_in_cfg() {
1304            let (_dir, path, mut cfg) = new_tmp();
1305
1306            // create new + close
1307            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1308            drop(mmap1);
1309
1310            // update cfg + opne existing
1311            cfg.initial_count = INIT_SLOTS * 2;
1312            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1313        }
1314
1315        #[test]
1316        fn ok_delete() {
1317            let (_dir, path, cfg) = new_tmp();
1318            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1319
1320            mmap.delete().unwrap();
1321            assert!(!mmap.core.file.exists().unwrap());
1322        }
1323
1324        #[test]
1325        fn err_delete_after_delete() {
1326            let (_dir, path, cfg) = new_tmp();
1327            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1328
1329            mmap.delete().unwrap();
1330            assert!(!mmap.core.file.exists().unwrap());
1331            assert!(mmap.delete().is_err());
1332        }
1333
1334        #[test]
1335        fn ok_drop_persists_when_dropped_before_bg_flush() {
1336            let (_dir, path, cfg) = new_tmp();
1337            const VAL: u64 = 0x0A;
1338
1339            {
1340                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1341                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1342                drop(mmap);
1343            }
1344
1345            {
1346                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1347                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1348                assert_eq!(val, VAL);
1349            }
1350        }
1351    }
1352
1353    mod fm_validate_t {
1354        use super::*;
1355
1356        #[repr(C, align(8))]
1357        struct OkT {
1358            a: u64,
1359            b: u64,
1360        }
1361
1362        #[repr(C)]
1363        struct BadAlignT {
1364            a: u32,
1365            b: u32,
1366        }
1367
1368        #[repr(C, align(4))]
1369        struct BadSizeT {
1370            a: u32,
1371            b: u16,
1372        }
1373
1374        #[repr(C, align(8))]
1375        struct DropT(u64);
1376
1377        impl Drop for DropT {
1378            fn drop(&mut self) {}
1379        }
1380
1381        #[repr(C, align(8))]
1382        struct ZstT;
1383
1384        #[test]
1385        fn ok_validate_t() {
1386            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1387        }
1388
1389        #[test]
1390        fn err_validate_t_when_drop() {
1391            assert!(FrozenMMap::<DropT>::validate_t().is_err());
1392        }
1393
1394        #[test]
1395        fn err_validate_t_when_not_8_byte_aligned() {
1396            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1397        }
1398
1399        #[test]
1400        fn err_validate_t_when_size_not_multiple_of_8() {
1401            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1402        }
1403
1404        #[test]
1405        fn err_validate_t_when_zero_sized() {
1406            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1407        }
1408
1409        #[test]
1410        fn err_new_when_t_implements_drop() {
1411            let (_dir, path, cfg) = new_tmp();
1412            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1413        }
1414
1415        #[test]
1416        fn err_new_when_t_is_not_8_byte_aligned() {
1417            let (_dir, path, cfg) = new_tmp();
1418            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1419        }
1420
1421        #[test]
1422        fn err_new_when_t_size_is_not_multiple_of_8() {
1423            let (_dir, path, cfg) = new_tmp();
1424            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1425        }
1426
1427        #[test]
1428        fn err_new_when_t_is_zero_sized() {
1429            let (_dir, path, cfg) = new_tmp();
1430            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1431        }
1432
1433        #[test]
1434        fn err_new_grown_when_t_implements_drop() {
1435            let (_dir, path, cfg) = new_tmp();
1436            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1437        }
1438
1439        #[test]
1440        fn err_new_grown_when_t_is_not_8_byte_aligned() {
1441            let (_dir, path, cfg) = new_tmp();
1442            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1443        }
1444
1445        #[test]
1446        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1447            let (_dir, path, cfg) = new_tmp();
1448            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1449        }
1450
1451        #[test]
1452        fn err_new_grown_when_t_is_zero_sized() {
1453            let (_dir, path, cfg) = new_tmp();
1454            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1455        }
1456    }
1457
1458    mod fm_new_grown {
1459        use super::*;
1460
1461        #[test]
1462        fn ok_new_grown_updates_length() {
1463            let (_dir, path, cfg) = new_tmp();
1464
1465            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1466            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1467            drop(mmap);
1468
1469            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1470            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1471            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1472        }
1473
1474        #[test]
1475        fn err_new_grown_with_preexisting_instance() {
1476            let (_dir, path, cfg) = new_tmp();
1477
1478            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1479            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1480
1481            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1482        }
1483
1484        #[test]
1485        fn ok_new_grown_cycle() {
1486            let (_dir, path, cfg) = new_tmp();
1487
1488            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1489            drop(mmap);
1490
1491            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1492            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1493            drop(mmap);
1494
1495            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1496            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1497            drop(mmap);
1498
1499            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1500            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1501        }
1502
1503        #[test]
1504        fn ok_write_reopen_grown_read() {
1505            let (_dir, path, cfg) = new_tmp();
1506
1507            {
1508                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1509                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1510            }
1511
1512            {
1513                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1514                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1515
1516                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1517                assert_eq!(val, 0xBB);
1518            }
1519        }
1520
1521        #[test]
1522        fn ok_write_reopen_grown_read_cycle() {
1523            let (_dir, path, cfg) = new_tmp();
1524
1525            {
1526                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1527                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1528            }
1529
1530            for i in 0..2 {
1531                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1532                let idx = mmap.total_slots() - 1;
1533                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1534                drop(mmap);
1535            }
1536
1537            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1538
1539            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1540            assert_eq!(base, 1);
1541
1542            let last_idx = mmap.total_slots() - 1;
1543            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1544            assert_eq!(last, 3);
1545        }
1546
1547        #[test]
1548        fn err_new_grown_while_previous_instance_is_alive() {
1549            let (_dir, path, cfg) = new_tmp();
1550
1551            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1552            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1553
1554            assert!(reopened.is_err());
1555        }
1556    }
1557
1558    mod fm_write_read {
1559        use super::*;
1560
1561        #[test]
1562        fn ok_write_wait_read_cycle() {
1563            const VAL: u64 = 0xDEADC0DE;
1564            let (_dir, path, cfg) = new_tmp();
1565            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1566
1567            // write + sync
1568            let ticket = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1569
1570            let ticket_epoch = ticket.epoch();
1571            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1572
1573            assert!(durable_epoch >= ticket_epoch);
1574
1575            // read + verify
1576            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1577            assert_eq!(val, VAL);
1578        }
1579
1580        #[test]
1581        fn ok_write_read_without_wait() {
1582            const VAL: u64 = 0xDEADC0DE;
1583            let (_dir, path, cfg) = new_tmp();
1584            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1585
1586            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1587            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1588            assert_eq!(val, VAL);
1589        }
1590    }
1591
1592    mod fm_tx {
1593        use super::*;
1594
1595        #[test]
1596        fn ok_tx_basic_multi_write() {
1597            let (_dir, path, cfg) = new_tmp();
1598            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1599
1600            let mut tx = mmap.new_tx();
1601            unsafe {
1602                tx.write(0, |v| *v = 1).unwrap();
1603                tx.write(1, |v| *v = 2).unwrap();
1604                tx.write(2, |v| *v = 3).unwrap();
1605            }
1606
1607            let ticket = tx.commit().unwrap();
1608
1609            let ticket_epoch = ticket.epoch();
1610            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1611
1612            assert!(durable_epoch >= ticket_epoch);
1613
1614            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1615            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1616            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1617
1618            assert_eq!((v0, v1, v2), (1, 2, 3));
1619        }
1620
1621        #[test]
1622        fn ok_tx_single_epoch() {
1623            let (_dir, path, cfg) = new_tmp();
1624            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1625
1626            let mut tx = mmap.new_tx();
1627            unsafe {
1628                tx.write(0, |v| *v = 0x0A).unwrap();
1629                tx.write(1, |v| *v = 0x14).unwrap();
1630            }
1631
1632            // NOTE: next write must have strictly higher epoch then current one
1633
1634            let ticket = tx.commit().unwrap();
1635            let next_ticket = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1636
1637            assert!(next_ticket.epoch() > ticket.epoch());
1638        }
1639
1640        #[test]
1641        fn err_tx_out_of_order() {
1642            let (_dir, path, cfg) = new_tmp();
1643            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1644
1645            let mut tx = mmap.new_tx();
1646            unsafe {
1647                tx.write(2, |v| *v = 1).unwrap();
1648                let res = tx.write(1, |v| *v = 2);
1649                assert!(res.is_err());
1650            }
1651        }
1652
1653        #[test]
1654        fn err_tx_duplicate_index() {
1655            let (_dir, path, cfg) = new_tmp();
1656            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1657
1658            let mut tx = mmap.new_tx();
1659            unsafe {
1660                tx.write(1, |v| *v = 1).unwrap();
1661                let res = tx.write(1, |v| *v = 2);
1662                assert!(res.is_err());
1663            }
1664        }
1665
1666        #[test]
1667        fn ok_tx_concurrent_non_overlapping() {
1668            let (_dir, path, cfg) = new_tmp();
1669            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1670
1671            let mut handles = Vec::new();
1672            for i in 0..2 {
1673                let mmap = mmap.clone();
1674                handles.push(thread::spawn(move || {
1675                    let mut tx = mmap.new_tx();
1676
1677                    unsafe {
1678                        tx.write(i * 2, |v| *v = i as u64).unwrap();
1679                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1680                    }
1681
1682                    let ticket = tx.commit().unwrap();
1683
1684                    let ticket_epoch = ticket.epoch();
1685                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1686
1687                    assert!(durable_epoch >= ticket_epoch);
1688                }));
1689            }
1690
1691            for h in handles {
1692                h.join().unwrap();
1693            }
1694
1695            for i in 0..2 {
1696                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1697                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1698
1699                assert_eq!((v0, v1), (i as u64, i as u64));
1700            }
1701        }
1702
1703        #[test]
1704        fn ok_tx_overwrite_last_wins() {
1705            let (_dir, path, cfg) = new_tmp();
1706            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1707
1708            let mut tx = mmap.new_tx();
1709            unsafe {
1710                tx.write(0, |v| *v = 1).unwrap();
1711            }
1712
1713            tx.commit().unwrap();
1714
1715            let mut tx2 = mmap.new_tx();
1716            unsafe {
1717                tx2.write(0, |v| *v = 2).unwrap();
1718            }
1719
1720            let ticket = tx2.commit().unwrap();
1721
1722            let ticket_epoch = ticket.epoch();
1723            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1724
1725            assert!(durable_epoch >= ticket_epoch);
1726
1727            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1728            assert_eq!(val, 2);
1729        }
1730
1731        #[test]
1732        fn ok_tx_persists_across_reopen() {
1733            let (_dir, path, cfg) = new_tmp();
1734
1735            {
1736                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1737
1738                let mut tx = mmap.new_tx();
1739                unsafe {
1740                    tx.write(0, |v| *v = 0x3A).unwrap();
1741                    tx.write(1, |v| *v = 0x54).unwrap();
1742                }
1743
1744                let ticket = tx.commit().unwrap();
1745
1746                let ticket_epoch = ticket.epoch();
1747                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1748
1749                assert!(durable_epoch >= ticket_epoch);
1750            }
1751
1752            {
1753                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1754
1755                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1756                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1757
1758                assert_eq!((v0, v1), (0x3A, 0x54));
1759            }
1760        }
1761    }
1762
1763    mod fm_durability {
1764        use super::*;
1765
1766        #[test]
1767        fn ok_wait_then_drop() {
1768            let (_dir, path, cfg) = new_tmp();
1769            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1770
1771            let ticket = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1772
1773            let ticket_epoch = ticket.epoch();
1774            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1775
1776            assert!(durable_epoch >= ticket_epoch);
1777
1778            drop(mmap);
1779        }
1780
1781        #[test]
1782        fn ok_epoch_monotonicity() {
1783            let (_dir, path, cfg) = new_tmp();
1784            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1785
1786            let t1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1787            let t2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1788
1789            let durable_epoch = futures::executor::block_on(t2).unwrap();
1790            assert!(durable_epoch >= t1.epoch());
1791        }
1792
1793        #[test]
1794        fn ok_wait_for_durability_with_multi_writers() {
1795            let (_dir, path, cfg) = new_tmp();
1796            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1797
1798            let mut handles = Vec::new();
1799            for _ in 0..2 {
1800                let mmap = mmap.clone();
1801                handles.push(thread::spawn(move || {
1802                    let ticket = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1803
1804                    let ticket_epoch = ticket.epoch();
1805                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1806
1807                    assert!(durable_epoch >= ticket_epoch);
1808                }));
1809            }
1810
1811            for h in handles {
1812                h.join().unwrap();
1813            }
1814
1815            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1816            assert_eq!(val, 2);
1817        }
1818    }
1819
1820    mod fm_concurrency {
1821        use super::*;
1822
1823        #[test]
1824        fn ok_parallel_reads_with_diff_index() {
1825            let (_dir, path, cfg) = new_tmp();
1826            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1827
1828            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1829            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1830
1831            let t1 = {
1832                let mmap = mmap.clone();
1833                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
1834            };
1835
1836            let t2 = {
1837                let mmap = mmap.clone();
1838                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
1839            };
1840
1841            assert_eq!(t1.join().unwrap(), 0x10);
1842            assert_eq!(t2.join().unwrap(), 0x20);
1843        }
1844
1845        #[test]
1846        fn ok_multi_tx_drop_then_reopen_grown() {
1847            let (_dir, path, cfg) = new_tmp();
1848
1849            {
1850                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
1851
1852                let mut handles = Vec::new();
1853                for i in 0..2u64 {
1854                    let mmap = mmap.clone();
1855                    handles.push(thread::spawn(move || {
1856                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
1857                    }));
1858                }
1859
1860                for i in 0..2usize {
1861                    let mmap = mmap.clone();
1862                    handles.push(thread::spawn(move || {
1863                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
1864                    }));
1865                }
1866
1867                for h in handles {
1868                    h.join().unwrap();
1869                }
1870            }
1871
1872            {
1873                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1874                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
1875
1876                for i in 0..2u64 {
1877                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
1878                    assert_eq!(val, i + 1);
1879                }
1880
1881                let idx = mmap.total_slots() - 1;
1882                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
1883
1884                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
1885                assert_eq!(val, 0xDEAD);
1886            }
1887        }
1888    }
1889
1890    mod ack_ticket {
1891        use super::*;
1892
1893        #[test]
1894        fn ok_parallel_waiters_same_epoch_window() {
1895            let (_dir, path, cfg) = new_tmp();
1896            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1897
1898            let barrier = sync::Arc::new(sync::Barrier::new(0x10));
1899
1900            let mut handles = Vec::new();
1901            for i in 0..0x10usize {
1902                let mmap = mmap.clone();
1903                let barrier = barrier.clone();
1904
1905                handles.push(thread::spawn(move || {
1906                    barrier.wait();
1907
1908                    let ticket = unsafe { mmap.write(i % INIT_SLOTS, |v| *v = i as u64).unwrap() };
1909
1910                    let epoch = ticket.epoch();
1911                    let durable = futures::executor::block_on(ticket).unwrap();
1912
1913                    assert!(durable >= epoch);
1914                }));
1915            }
1916
1917            for handle in handles {
1918                handle.join().expect("worker thread panicked");
1919            }
1920        }
1921
1922        #[test]
1923        fn ok_parallel_waiters_same_epoch() {
1924            let completion = sync::Arc::new(ack::Completion::default());
1925
1926            let mut handles = Vec::new();
1927            for _ in 0..0x10 {
1928                let completion = completion.clone();
1929
1930                handles.push(thread::spawn(move || {
1931                    let ticket = ack::AckTicket::new(1, completion);
1932
1933                    assert_eq!(
1934                        futures::executor::block_on(ticket).expect("ticket must complete"),
1935                        1
1936                    );
1937                }));
1938            }
1939
1940            thread::sleep(time::Duration::from_millis(0x0A));
1941
1942            completion.mark_epoch_as_durable(1);
1943            completion.notify_all_listeners();
1944
1945            for handle in handles {
1946                handle.join().expect("worker thread panicked");
1947            }
1948        }
1949    }
1950}