Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
//! These constraints are enforced because [`FrozenMMap`] does not serialize or deserialize
//! values. It directly reads and writes `T` inside a memory mapped file. That means `T` must
//! have a stable layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//!     module_id: MODULE_ID,
35//!     initial_count: 0x0A,
36//!     immediate_durability: false,
37//!     flush_duration: std::time::Duration::from_micros(0x96),
38//! };
39//!
40//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
41//! assert_eq!(mmap.total_slots(), 0x0A);
42//!
43//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
44//! let _ = futures::executor::block_on(ticket).unwrap();
45//!
46//! let val = unsafe { mmap.read(0, |v| *v) };
47//! assert_eq!(val, 0xDEADC0DE);
48//!
49//! drop(mmap);
50//!
51//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
52//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
53//!
54//! let val = unsafe { reopened.read(0, |v| *v) };
55//! assert_eq!(val, 0xDEADC0DE);
56//! ```
57
58#[cfg(any(target_os = "linux", target_os = "macos"))]
59mod posix;
60
61use crate::{
62    ack,
63    error::FrozenResult,
64    ffile::{FrozenFile, FrozenFileCfg},
65    hints,
66};
67use std::{
68    fmt,
69    sync::{self, atomic},
70    thread, time,
71};
72
73/// type for `epoch` used by write ops
74pub type TEpoch = u64;
75
76#[cfg(any(target_os = "linux", target_os = "macos"))]
77type TMap = posix::POSIXMMap;
78
79/// Error codes for [`FrozenMMap`]
80pub(in crate::fmmap) mod err {
81    use crate::error::{ErrCode, FrozenError, FrozenResult};
82
83    /// Domain Id for [`FrozenMMap`] is **18**
84    const ERRDOMAIN: u8 = 0x12;
85
86    /// module id used for [`FrozenMMap`]
87    pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
88
89    #[cfg(not(test))]
90    #[inline(always)]
91    pub fn mid() -> &'static u8 {
92        MID.get().unwrap()
93    }
94
95    #[cfg(test)]
96    #[inline(always)]
97    pub fn mid() -> &'static u8 {
98        MID.get_or_init(|| 0)
99    }
100
101    /// internal fuck up (hault and catch fire)
102    pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
103
104    /// unknown error (fallback)
105    pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
106
107    /// no more memory available
108    pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
109
110    /// syncing error
111    pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
112
113    /// no write/read perm
114    pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
115
116    /// type `T` must not be zero sized
117    pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
118
119    /// flush_tx error (unable to spawn)
120    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
121
122    /// type `T` implements drop
123    pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
124
125    /// type `T` is not 8 bytes aligned
126    pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
127
128    /// `size_of::<T>()` is not multiple of 8
129    pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
130
131    #[inline]
132    pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
133        code: ErrCode,
134        error: E,
135    ) -> FrozenResult<R> {
136        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137        Err(err)
138    }
139
140    #[inline]
141    pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143        Err(err)
144    }
145}
146
/// Configuration used to construct a [`FrozenMMap`]
///
/// ## Example
///
/// ```
/// use frozen_core::fmmap::FrozenMMapCfg;
///
/// let cfg = FrozenMMapCfg {
///     module_id: 0,
///     initial_count: 0x100,
///     immediate_durability: false,
///     flush_duration: std::time::Duration::from_millis(0x0A),
/// };
///
/// assert_eq!(cfg.initial_count, 0x100);
/// assert!(!cfg.immediate_durability);
/// assert_eq!(cfg.flush_duration.as_millis(), 0x0A);
/// ```
#[derive(Debug, Clone)]
pub struct FrozenMMapCfg {
    /// Identifier attached to errors raised on behalf of this instance (see
    /// [`crate::error::FrozenError`])
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has a size of [`FrozenMMap::<T>::SLOT_SIZE`], so the initial file length is
    /// `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by the background flusher thread to batch write ops into a single
    /// durable window, all write ops issued within one interval are synced together
    pub flush_duration: time::Duration,

    /// Enables immediate durability scheduling for write operations
    ///
    /// When disabled, durability is driven by the background flusher thread with respect to the
    /// [`FrozenMMapCfg::flush_duration`] time interval, allowing multiple write calls to be
    /// batched and flushed together into a single sync operation, reducing strain on resources.
    ///
    /// When enabled, every successful [`FrozenMMap::write`] and [`FrozenMMapTransaction::commit`]
    /// immediately wakes the background flusher thread, disregarding the
    /// [`FrozenMMapCfg::flush_duration`] time interval.
    ///
    /// This configuration is intended for workloads where writes are rare and durability latency
    /// is preferred over batching efficiency.
    pub immediate_durability: bool,
}
194
195/// Custom implementation of `mmap(2)`
196///
197/// ## Constraints
198///
199/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
200/// must be a POD type which is safe to persist and later reinterpret from bytes.
201///
202/// Required properties for `T`,
203///
204/// - Must use `#[repr(C)]`
205/// - Should be 8-bytes aligned
206/// - Must not implement [`Drop`]
207/// - `size_of::<T>()` should be multiple of `8`
208///
209/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
210/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
211/// across reopen.
212///
213/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
214/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
215/// layout and must remain valid when the file is reopened in a later process.
216///
217/// ## Example
218///
219/// ```
220/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
221///
222/// const MODULE_ID: u8 = 0;
223///
224/// let dir = tempfile::tempdir().unwrap();
225/// let path = dir.path().join("tmp_frozen_mmap");
226///
227/// let cfg = FrozenMMapCfg {
228///     module_id: MODULE_ID,
229///     initial_count: 0x0A,
230///     immediate_durability: false,
231///     flush_duration: std::time::Duration::from_micros(0x96),
232/// };
233///
234/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
235/// assert_eq!(mmap.total_slots(), 0x0A);
236///
237/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
238/// let _ = futures::executor::block_on(ticket).unwrap();
239///
240/// let val = unsafe { mmap.read(0, |v| *v) };
241/// assert_eq!(val, 0xDEADC0DE);
242///
243/// drop(mmap);
244///
245/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
246/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
247///
248/// let val = unsafe { reopened.read(0, |v| *v) };
249/// assert_eq!(val, 0xDEADC0DE);
250/// ```
251#[derive(Debug)]
252pub struct FrozenMMap<T>
253where
254    T: Sized + Send + Sync,
255{
256    core: sync::Arc<Core>,
257    flush_tx_handle: Option<thread::JoinHandle<()>>,
258    _t: core::marker::PhantomData<T>,
259}
260
261unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
262unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
263
264impl<T> FrozenMMap<T>
265where
266    T: Sized + Send + Sync,
267{
268    /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
269    ///
270    /// ## Example
271    ///
272    /// ```
273    /// use frozen_core::fmmap::FrozenMMap;
274    ///
275    /// assert_eq!(
276    ///     FrozenMMap::<u64>::SLOT_SIZE,
277    ///     std::mem::size_of::<u64>(),
278    /// );
279    /// ```
280    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
281
282    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
283    ///
284    /// ## Multiple Instances
285    ///
286    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
287    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
288    /// error will be thrown.
289    ///
290    /// ## Capacity Growth
291    ///
292    /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
293    /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
294    /// mapping over grown capacity.
295    ///
296    /// ## [`FrozenMMapCfg`]
297    ///
298    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
299    ///
300    /// ## Working
301    ///
302    /// We first create a new [`FrozenFile`] if note already, then map the entire file using
303    /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
304    /// entire lifetime of file.
305    ///
306    /// ## Important
307    ///
308    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
309    /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
310    /// to store config.
311    ///
312    /// ## Example
313    ///
314    /// ```
315    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
316    ///
317    /// const MODULE_ID: u8 = 0;
318    ///
319    /// let dir = tempfile::tempdir().unwrap();
320    /// let path = dir.path().join("tmp_frozen_mmap");
321    ///
322    /// let cfg = FrozenMMapCfg {
323    ///     module_id: MODULE_ID,
324    ///     initial_count: 0x0A,
325    ///     immediate_durability: false,
326    ///     flush_duration: std::time::Duration::from_micros(0x96),
327    /// };
328    ///
329    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
330    /// assert_eq!(mmap.total_slots(), 0x0A);
331    ///
332    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
333    /// let _ = futures::executor::block_on(ticket).unwrap();
334    ///
335    /// let val = unsafe { mmap.read(0, |v| *v) };
336    /// assert_eq!(val, 0xDEADC0DE);
337    /// ```
338    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
339        Self::validate_t()?;
340        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
341        let total_slots = curr_length / Self::SLOT_SIZE;
342
343        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
344        // guarantees that the first caller sets the value and all subsequent calls reuse it
345        let _ = err::MID.get_or_init(|| cfg.module_id);
346
347        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
348        let core = sync::Arc::new(Core::new(
349            mmap,
350            file,
351            curr_length,
352            total_slots,
353            cfg.immediate_durability,
354        ));
355
356        let cloned_core = sync::Arc::clone(&core);
357        let flush_tx_handle = match thread::Builder::new()
358            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
359            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
360        {
361            Ok(handle) => Some(handle),
362            Err(observed_error) => {
363                return err::new_err(err::FXE, observed_error);
364            }
365        };
366
367        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
368    }
369
370    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
371    /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
372    ///
373    /// ## Multiple Instances
374    ///
375    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
376    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
377    /// error will be thrown.
378    ///
379    /// ## Why not create a [`FrozenMMap::grow`] call?
380    ///
381    /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
382    /// memory mapping in place is tricky in concurrent code, as some threads would still hold
383    /// stale/unmapped pointers to mmap due to preemption from the OS schedular
384    ///
385    /// So, instead of remmaping a live instance, the current API performs growth during open,
386    /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
387    /// instance.
388    ///
389    /// ## [`FrozenMMapCfg`]
390    ///
391    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
392    ///
393    /// ## Working
394    ///
395    /// We first create a new [`FrozenFile`] if note already, then we grow the file using
396    /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
397    /// read/write `T`, which also should stay constant for the entire lifetime of file.
398    ///
399    /// ## Important
400    ///
401    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
402    /// which is used under the hood, one must use config stores like
403    /// [`Rta`](https://crates.io/crates/rta) to store config.
404    ///
405    /// ## Example
406    ///
407    /// ```
408    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
409    ///
410    /// const MODULE_ID: u8 = 0;
411    ///
412    /// let dir = tempfile::tempdir().unwrap();
413    /// let path = dir.path().join("tmp_frozen_mmap");
414    ///
415    /// let cfg = FrozenMMapCfg {
416    ///     module_id: MODULE_ID,
417    ///     initial_count: 0x0A,
418    ///     immediate_durability: false,
419    ///     flush_duration: std::time::Duration::from_micros(0x96),
420    /// };
421    ///
422    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
423    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
424    ///
425    /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
426    /// let _ = futures::executor::block_on(ticket).unwrap();
427    ///
428    /// let val = unsafe { mmap.read(0, |v| *v) };
429    /// assert_eq!(val, 0xDEADC0DE);
430    /// ```
431    pub fn new_grown<P: AsRef<std::path::Path>>(
432        path: P,
433        cfg: FrozenMMapCfg,
434        additional_slots: usize,
435    ) -> FrozenResult<Self> {
436        Self::validate_t()?;
437        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
438
439        // we grow the underlying FrozenFile as requested
440        file.grow(additional_slots)?;
441        let curr_length = file.length()?; // we must read the updated (grown) length
442        let total_slots = curr_length / Self::SLOT_SIZE;
443
444        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
445        // guarantees that the first caller sets the value and all subsequent calls reuse it
446        let _ = err::MID.get_or_init(|| cfg.module_id);
447
448        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
449        let core = sync::Arc::new(Core::new(
450            mmap,
451            file,
452            curr_length,
453            total_slots,
454            cfg.immediate_durability,
455        ));
456
457        let cloned_core = sync::Arc::clone(&core);
458        let flush_tx_handle = match thread::Builder::new()
459            .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
460            .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
461        {
462            Ok(handle) => Some(handle),
463            Err(observed_error) => {
464                return err::new_err(err::FXE, observed_error);
465            }
466        };
467
468        Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
469    }
470
471    /// Create/open a new [`FrozenFile`] instance
472    fn open_file(
473        path: std::path::PathBuf,
474        cfg: &FrozenMMapCfg,
475    ) -> FrozenResult<(FrozenFile, usize)> {
476        let ff_cfg = FrozenFileCfg {
477            path,
478            module_id: cfg.module_id,
479            buffer_size: Self::SLOT_SIZE,
480            initial_available_buffers: cfg.initial_count,
481        };
482
483        let file = FrozenFile::new(ff_cfg)?;
484        let curr_length = file.length()?;
485
486        Ok((file, curr_length))
487    }
488
489    #[inline]
490    fn validate_t() -> FrozenResult<()> {
491        if std::mem::needs_drop::<T>() {
492            return err::new_err_default(err::DRP);
493        }
494
495        let align = std::mem::align_of::<T>();
496        if align != 8 {
497            return err::new_err_default(err::ALN);
498        }
499
500        let size = std::mem::size_of::<T>();
501        if size == 0 {
502            return err::new_err_default(err::ZRO);
503        }
504
505        if size % 8 != 0 {
506            return err::new_err_default(err::SZE);
507        }
508
509        Ok(())
510    }
511
512    /// Read a `T` at given `index` via callback (`f`)
513    ///
514    /// ## Concurrency
515    ///
516    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
517    /// at same index is atomic and thread safe, while operations on different indices may proceed
518    /// fully in parallel
519    ///
520    /// ## Safety
521    ///
522    /// The caller must ensure following:
523    ///
524    /// - given `index` is within bounds
525    /// - underlying memory contains a valid instance of `T`
526    /// - provided callback `f` must not,
527    ///   - write through the pointer
528    ///   - store or leak pointer beyound there lifetime
529    ///
530    /// Violating any of the above may result in undefined behavior
531    ///
532    /// ## Example
533    ///
534    /// ```
535    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
536    ///
537    /// const MODULE_ID: u8 = 0;
538    ///
539    /// let dir = tempfile::tempdir().unwrap();
540    /// let path = dir.path().join("tmp_read_mmap");
541    ///
542    /// let cfg = FrozenMMapCfg {
543    ///     module_id: MODULE_ID,
544    ///     initial_count: 0x02,
545    ///     immediate_durability: false,
546    ///     flush_duration: std::time::Duration::from_micros(0x60),
547    /// };
548    ///
549    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
550    ///
551    /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
552    /// let _ = futures::executor::block_on(ticket).unwrap();
553    ///
554    /// let val = unsafe { mmap.read(0, |v| *v) };
555    /// assert_eq!(val, 0x0A);
556    /// ```
557    #[inline(always)]
558    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> R {
559        let offset = Self::SLOT_SIZE * index;
560        let _lock = self.core.locks.lock(index);
561
562        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
563        // assumption of OS guarantees visibility)
564
565        let ptr = unsafe { self.core.map.as_ptr(offset) };
566        f(ptr)
567    }
568
569    /// Write/update a `T` at given `index` via callback (`f`)
570    ///
571    /// ## Concurrency
572    ///
573    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
574    /// at same index is atomic and thread safe, while operations on different indices may proceed
575    /// fully in parallel
576    ///
577    /// ## Safety
578    ///
579    /// The caller must ensure following:
580    ///
581    /// - given `index` is within bounds
582    /// - underlying memory contains a valid instance of `T`
583    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
584    ///
585    /// Violating any of the above may result in undefined behavior
586    ///
587    /// ## Example
588    ///
589    /// ```
590    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
591    ///
592    /// const MODULE_ID: u8 = 0;
593    ///
594    /// let dir = tempfile::tempdir().unwrap();
595    /// let path = dir.path().join("tmp_write_mmap");
596    ///
597    /// let cfg = FrozenMMapCfg {
598    ///     module_id: MODULE_ID,
599    ///     initial_count: 0x02,
600    ///     immediate_durability: false,
601    ///     flush_duration: std::time::Duration::from_micros(0x96),
602    /// };
603    ///
604    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
605    ///
606    /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
607    /// let _ = futures::executor::block_on(ticket).unwrap();
608    ///
609    /// let val = unsafe { mmap.read(1, |v| *v) };
610    /// assert_eq!(val, 0x2B);
611    /// ```
612    #[inline(always)]
613    pub unsafe fn write(
614        &self,
615        index: usize,
616        f: impl FnOnce(*mut T),
617    ) -> FrozenResult<ack::AckTicket> {
618        // propagate prev errors
619        if let Some(err) = self.core.completion.get_err() {
620            return Err(err);
621        }
622
623        let offset = Self::SLOT_SIZE * index;
624
625        let _guard = self.core.acquire_io_lock();
626        let _lock = self.core.locks.lock(index);
627
628        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
629        f(ptr);
630
631        self.core.dirty.store(true, atomic::Ordering::Release);
632        if self.core.immediate_durability {
633            self.core.cv.notify_one();
634        }
635
636        let epoch = self.core.completion.increment_current_epoch();
637        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
638    }
639
640    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
641    ///
642    /// ## Working
643    ///
644    /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
645    /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
646    /// conditions
647    ///
648    /// ## Example
649    ///
650    /// ```
651    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
652    ///
653    /// const MODULE_ID: u8 = 0;
654    ///
655    /// let dir = tempfile::tempdir().unwrap();
656    /// let path = dir.path().join("tmp_grow_mmap");
657    ///
658    /// let cfg = FrozenMMapCfg {
659    ///     module_id: MODULE_ID,
660    ///     initial_count: 0x02,
661    ///     immediate_durability: false,
662    ///     flush_duration: std::time::Duration::from_micros(0x96),
663    /// };
664    ///
665    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
666    /// assert_eq!(mmap.total_slots(), 0x02);
667    ///
668    /// drop(mmap);
669    ///
670    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
671    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
672    /// ```
673    #[inline]
674    pub fn total_slots(&self) -> usize {
675        self.core.curr_length / Self::SLOT_SIZE
676    }
677
678    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
679    ///
680    /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
681    /// and OS
682    ///
683    /// ## Example
684    ///
685    /// ```
686    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
687    ///
688    /// const MODULE_ID: u8 = 0;
689    ///
690    /// let dir = tempfile::tempdir().unwrap();
691    /// let path = dir.path().join("tmp_mem_usage");
692    ///
693    /// let cfg = FrozenMMapCfg {
694    ///     module_id: MODULE_ID,
695    ///     initial_count: 0x10,
696    ///     immediate_durability: false,
697    ///     flush_duration: std::time::Duration::from_micros(0x60),
698    /// };
699    ///
700    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
701    ///
702    /// let bytes = mmap.memory_usage();
703    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
704    /// ```
705    #[inline]
706    pub fn memory_usage(&self) -> usize {
707        // memory of mem-maped region
708        let mmap_bytes = self.core.curr_length;
709
710        // memory used for locking
711        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
712
713        mmap_bytes + lock_bytes
714    }
715
716    /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
717    /// operation
718    ///
719    /// ## Overview
720    ///
721    /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
722    /// operation, hence creating a transactional write operation, which gives following guarantees:
723    ///
724    /// - All write ops succeed together
725    /// - Single epoch to track durability of all writes ops
726    /// - Same durability guarantee for all the included write ops
727    ///
728    /// Simply, this preserves atomic durability semantics for multi index updates
729    ///
730    /// ## Example
731    ///
732    /// ```
733    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
734    ///
735    /// const MODULE_ID: u8 = 0;
736    ///
737    /// let dir = tempfile::tempdir().unwrap();
738    /// let path = dir.path().join("tmp_tx");
739    ///
740    /// let cfg = FrozenMMapCfg {
741    ///     module_id: MODULE_ID,
742    ///     initial_count: 0x0A,
743    ///     immediate_durability: false,
744    ///     flush_duration: std::time::Duration::from_micros(50),
745    /// };
746    ///
747    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
748    ///
749    /// let mut tx = mmap.new_tx();
750    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
751    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
752    ///
753    /// let ticket = tx.commit().unwrap();
754    /// let _ = futures::executor::block_on(ticket).unwrap();
755    ///
756    /// let v0 = unsafe { mmap.read(0, |v| *v) };
757    /// let v1 = unsafe { mmap.read(1, |v| *v) };
758    ///
759    /// assert_eq!((v0, v1), (0x0A, 0x14));
760    /// ```
761    #[inline]
762    pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
763        FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
764    }
765
766    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
767    ///
768    /// ## Working
769    ///
770    /// When `delete` is called, all read, write, and (background) sync ops are paused
771    /// (indefinitely), whule deletion is done with following steps:
772    ///
773    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
774    /// - if any batch is pending for sync,
775    ///   - swap the flag
776    ///   - call sync manually
777    ///   - incr epoch and update cv
778    /// - brodcast closing so flusher tx could wrap up
779    /// - `munmap(2)` current mapping
780    /// - call delete on [`FrozenFile`]
781    ///
782    /// ## Example
783    ///
784    /// ```
785    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
786    ///
787    /// const MODULE_ID: u8 = 0;
788    ///
789    /// let dir = tempfile::tempdir().unwrap();
790    /// let path = dir.path().join("tmp_delete_mmap");
791    ///
792    /// let cfg = FrozenMMapCfg {
793    ///     module_id: MODULE_ID,
794    ///     initial_count: 0x04,
795    ///     immediate_durability: false,
796    ///     flush_duration: std::time::Duration::from_micros(0x96),
797    /// };
798    ///
799    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
800    /// mmap.delete().unwrap();
801    /// assert!(!path.exists());
802    /// ```
803    pub fn delete(&mut self) -> FrozenResult<()> {
804        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
805        self.core.closed.store(true, atomic::Ordering::Release);
806        if let Some(handle) = self.flush_tx_handle.take() {
807            let _ = handle.join();
808        }
809
810        // pause all new write ops
811        let _lock = self.core.acquire_exclusive_io_lock();
812
813        self.munmap()?;
814        self.core.file.delete()
815    }
816
817    /// Flush the dirty mmap data to persistant storage
818    ///
819    /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
820    /// guarantee
821    ///
822    /// ## Example
823    ///
824    /// ```
825    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
826    ///
827    /// const MODULE_ID: u8 = 0;
828    ///
829    /// let dir = tempfile::tempdir().unwrap();
830    /// let path = dir.path().join("tmp_delete_mmap");
831    ///
832    /// let cfg = FrozenMMapCfg {
833    ///     module_id: MODULE_ID,
834    ///     initial_count: 0x04,
835    ///     immediate_durability: false,
836    ///     flush_duration: std::time::Duration::from_micros(0x96),
837    /// };
838    ///
839    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
840    /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
841    /// ```
842    #[inline]
843    pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
844        self.core.map.sync(self.core.curr_length)
845    }
846
847    /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
848    ///
849    /// ## Example
850    ///
851    /// ```
852    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
853    ///
854    /// const MODULE_ID: u8 = 0;
855    ///
856    /// let dir = tempfile::tempdir().unwrap();
857    /// let path = dir.path().join("tmp_delete_mmap");
858    ///
859    /// let cfg = FrozenMMapCfg {
860    ///     module_id: MODULE_ID,
861    ///     initial_count: 0x04,
862    ///     immediate_durability: false,
863    ///     flush_duration: std::time::Duration::from_micros(0x96),
864    /// };
865    ///
866    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
867    /// assert!(unsafe { mmap.flush_file() }.is_ok());
868    /// ```
869    #[inline]
870    pub unsafe fn flush_file(&self) -> FrozenResult<()> {
871        self.core.file.sync()
872    }
873
874    #[inline]
875    fn munmap(&self) -> FrozenResult<()> {
876        let length = self.core.curr_length;
877        unsafe { self.core.map.unmap(length) }
878    }
879}
880
881impl<T> Drop for FrozenMMap<T>
882where
883    T: Sized + Send + Sync,
884{
885    fn drop(&mut self) {
886        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
887        self.core.cv.notify_one(); // notify flusher tx to shut
888
889        if let Some(handle) = self.flush_tx_handle.take() {
890            let _ = handle.join();
891        }
892
893        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
894        let _io_lock = self.core.acquire_exclusive_io_lock();
895
896        if !is_closed {
897            let _ = self.munmap();
898        }
899    }
900}
901
902impl<T> fmt::Display for FrozenMMap<T>
903where
904    T: Sized + Send + Sync,
905{
906    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907        write!(
908            f,
909            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
910            self.core.file.fd(),
911            self.total_slots(),
912            self.core.curr_length,
913        )
914    }
915}
916
917/// ## Why we ignore [`std::sync::PoisonError`]?
918///
919/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
920/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
921/// and are completely seperated from the mutex.
922///
923/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
924/// an inconsistent state of the protected value. Since no state can be left partially modified
925/// under this lock, there is no possible consistency risk to recover from and propagating the
926/// poison error would only introduce unnecessary failures into the allocation path.
927///
928/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
929/// with the recovered guard.
930fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
931    // init phase (acquiring locks)
932    let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());
933
934    // sync loop w/ non-busy waiting
935    loop {
936        (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
937
938        // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
939        // otherwise, we impose serious deadlock sort of situation for the the flusher tx
940
941        let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
942        let closing = core.closed.load(atomic::Ordering::Acquire);
943
944        if !dirty {
945            if closing {
946                core.cv.notify_all();
947                return;
948            }
949
950            continue;
951        }
952
953        // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
954        // grow could kick in while sync is in progress
955
956        let io_lock = core.acquire_exclusive_io_lock();
957
958        // INFO: we must drop the guard before syscall, as its a blocking operation and holding
959        // the mutex while the syscall takes place is not a good idea, while we drop the mutex
960        // and acqurie it again, in-between other process could acquire it and use it
961        drop(guard);
962
963        // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
964        // all writes up to `target_epoch` are guaranteed to be part of this flush window,
965        // while anything beyound belongs to the next batch
966        let target_epoch = core.completion.read_current_epoch();
967
968        // NOTE:
969        //
970        // - if sync fails, we update the Core::error w/ the received error object
971        // - we clear it up when another sync call succeeds
972        // - this is valid, as the underlying sync flushes entire mmaped region, hence
973        //   even if the last call failed, and the new one succeeded, we do get the durability
974        //   guarenty for the old data as well
975
976        if let Err(new_error) = core.sync() {
977            core.completion.set_err(new_error);
978        } else {
979            core.completion.mark_epoch_as_durable(target_epoch);
980            core.completion.del_err();
981        }
982
983        // NOTE: notify all listeners currently waiting for durability progress
984        core.completion.notify_all_listeners();
985
986        drop(io_lock);
987        guard = core.acquire_lock();
988    }
989}
990
991/// A context for grouping multi write ops into a single atomic operation
992///
993/// ## Overview
994///
995/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
996///
997/// - All included writes are applied together
998/// - Single epoch is assinged for an entier transaction
999/// - Durability guarantee is same for all included write ops
1000///
1001/// ## Example
1002///
1003/// ```
1004/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1005///
1006/// const MODULE_ID: u8 = 0;
1007///
1008/// let dir = tempfile::tempdir().unwrap();
1009/// let path = dir.path().join("tmp_tx");
1010///
1011/// let cfg = FrozenMMapCfg {
1012///     module_id: MODULE_ID,
1013///     initial_count: 0x0A,
1014///     immediate_durability: false,
1015///     flush_duration: std::time::Duration::from_micros(50),
1016/// };
1017///
1018/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1019///
1020/// let mut tx = mmap.new_tx();
1021/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1022/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
1023/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
1024///
1025/// let ticket = tx.commit().unwrap();
1026/// let _ = futures::executor::block_on(ticket).unwrap();
1027///
1028/// let v0 = unsafe { mmap.read(0, |v| *v) };
1029/// let v1 = unsafe { mmap.read(1, |v| *v) };
1030/// let v2 = unsafe { mmap.read(2, |v| *v) };
1031///
1032/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
1033/// ```
1034pub struct FrozenMMapTransaction<'a, T> {
1035    core: &'a Core,
1036    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
1037}
1038
1039impl<'a, T> FrozenMMapTransaction<'a, T> {
1040    /// Append a write op into the [`FrozenMMapTransaction`]
1041    ///
1042    /// ## Requirements
1043    ///
1044    /// Write ops must follow these safety requirements,
1045    ///
1046    /// - No duplicate indices
1047    /// - No out-of-order writes (must be incremental)
1048    ///
1049    /// Violating this constraint would result in [`FrozenError`]
1050    ///
1051    /// ## Safety
1052    ///
1053    /// Same safety requirements as [`FrozenMMap::write`] apply here
1054    ///
1055    /// ## Example
1056    ///
1057    /// ```
1058    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1059    ///
1060    /// const MODULE_ID: u8 = 0;
1061    ///
1062    /// let dir = tempfile::tempdir().unwrap();
1063    /// let path = dir.path().join("tmp_tx");
1064    ///
1065    /// let cfg = FrozenMMapCfg {
1066    ///     module_id: MODULE_ID,
1067    ///     initial_count: 0x10,
1068    ///     immediate_durability: false,
1069    ///     flush_duration: std::time::Duration::from_micros(50),
1070    /// };
1071    ///
1072    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1073    ///
1074    /// let mut tx = mmap.new_tx();
1075    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1076    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1077    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1078    ///
1079    /// let ticket = tx.commit().unwrap();
1080    /// let _ = futures::executor::block_on(ticket).unwrap();
1081    ///
1082    /// let v0 = unsafe { mmap.read(0, |v| *v) };
1083    /// let v1 = unsafe { mmap.read(1, |v| *v) };
1084    /// let v2 = unsafe { mmap.read(2, |v| *v) };
1085    ///
1086    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1087    /// ```
1088    #[inline(always)]
1089    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1090    where
1091        F: FnOnce(*mut T) + 'a,
1092    {
1093        // NOTE:
1094        //
1095        // This check prevents a potential footgun! For a safe transaction all writes must be,
1096        // - ordered by index (either incr or decr)
1097        // - no multi writes on same index
1098        //
1099        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1100        if let Some((last_idx, _)) = self.ops_vec.last() {
1101            if index <= *last_idx {
1102                return err::new_err(
1103                    err::HCF,
1104                    "tx writes must be strictly ordered, with no more then single ops on given index",
1105                );
1106            }
1107        }
1108
1109        self.ops_vec.push((index, Box::new(f)));
1110        Ok(())
1111    }
1112
1113    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1114    ///
1115    /// ## Guarantees
1116    ///
1117    /// - All writes are applied under a single epoch
1118    /// - All writes belong to the same durability batch
1119    /// - No interleaving with other transactions at epoch level
1120    ///
1121    /// ## Example
1122    ///
1123    /// ```
1124    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1125    ///
1126    /// const MODULE_ID: u8 = 0;
1127    ///
1128    /// let dir = tempfile::tempdir().unwrap();
1129    /// let path = dir.path().join("tmp_tx");
1130    ///
1131    /// let cfg = FrozenMMapCfg {
1132    ///     module_id: MODULE_ID,
1133    ///     initial_count: 0x10,
1134    ///     immediate_durability: false,
1135    ///     flush_duration: std::time::Duration::from_micros(50),
1136    /// };
1137    ///
1138    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1139    ///
1140    /// let mut tx = mmap.new_tx();
1141    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1142    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1143    ///
1144    /// let ticket = tx.commit().unwrap();
1145    /// let _ = futures::executor::block_on(ticket).unwrap();
1146    ///
1147    /// let v0 = unsafe { mmap.read(0, |v| *v) };
1148    /// let v1 = unsafe { mmap.read(2, |v| *v) };
1149    ///
1150    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1151    /// ```
1152    #[inline(always)]
1153    pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1154        if let Some(err) = self.core.completion.get_err() {
1155            return Err(err);
1156        }
1157
1158        let _guard = self.core.acquire_io_lock();
1159
1160        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1161        let mut guards = Vec::with_capacity(self.ops_vec.len());
1162        for (idx, _) in &self.ops_vec {
1163            guards.push(self.core.locks.lock(*idx));
1164        }
1165
1166        for (idx, op) in self.ops_vec {
1167            let offset = idx * std::mem::size_of::<T>();
1168            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1169            op(ptr);
1170        }
1171
1172        self.core.dirty.store(true, atomic::Ordering::Release);
1173        if self.core.immediate_durability {
1174            self.core.cv.notify_one();
1175        }
1176
1177        let epoch = self.core.completion.increment_current_epoch();
1178        Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1179    }
1180}
1181
1182#[derive(Debug)]
1183struct Core {
1184    completion: sync::Arc<ack::Completion>,
1185    cv: sync::Condvar,
1186    curr_length: usize,
1187    dirty: atomic::AtomicBool,
1188    io_lock: sync::RwLock<()>,
1189    immediate_durability: bool,
1190    map: TMap,
1191    locks: Locks,
1192    lock: sync::Mutex<()>,
1193    file: FrozenFile,
1194    closed: atomic::AtomicBool,
1195}
1196
1197unsafe impl Send for Core {}
1198unsafe impl Sync for Core {}
1199
1200impl Core {
1201    fn new(
1202        map: TMap,
1203        file: FrozenFile,
1204        curr_length: usize,
1205        total_slots: usize,
1206        immediate_durability: bool,
1207    ) -> Self {
1208        Self {
1209            map,
1210            file,
1211            curr_length,
1212            immediate_durability,
1213            completion: sync::Arc::new(ack::Completion::default()),
1214            cv: sync::Condvar::new(),
1215            lock: sync::Mutex::new(()),
1216            io_lock: sync::RwLock::new(()),
1217            locks: Locks::new(total_slots),
1218            dirty: atomic::AtomicBool::new(false),
1219            closed: atomic::AtomicBool::new(false),
1220        }
1221    }
1222
1223    #[inline]
1224    fn sync(&self) -> FrozenResult<()> {
1225        unsafe { self.map.sync(self.curr_length) }?;
1226        self.file.sync()
1227    }
1228
1229    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1230    #[inline]
1231    fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1232        self.lock.lock().unwrap_or_else(|e| e.into_inner())
1233    }
1234
1235    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1236    #[inline]
1237    fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1238        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1239    }
1240
1241    /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1242    #[inline]
1243    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1244        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1245    }
1246}
1247
1248#[derive(Debug)]
1249struct Locks(Box<[atomic::AtomicU8]>);
1250
1251impl Locks {
1252    const LOCK: u8 = 1;
1253    const UNLOCK: u8 = 0;
1254
1255    const L1_CONTENTION: usize = 0x10;
1256    const L2_CONTENTION: usize = 0x20;
1257
1258    fn new(cap: usize) -> Self {
1259        let mut slots = Vec::with_capacity(cap);
1260        for _ in 0..cap {
1261            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1262        }
1263
1264        Self(slots.into_boxed_slice())
1265    }
1266
1267    #[inline(always)]
1268    fn lock(&self, index: usize) -> LockGuard<'_> {
1269        let val = &self.0[index];
1270        let mut spins = 0;
1271
1272        loop {
1273            if val
1274                .compare_exchange_weak(
1275                    Self::UNLOCK,
1276                    Self::LOCK,
1277                    atomic::Ordering::Acquire,
1278                    atomic::Ordering::Relaxed,
1279                )
1280                .is_ok()
1281            {
1282                return LockGuard(val);
1283            }
1284
1285            // we must backoff under contention
1286
1287            // NOTE:
1288            //
1289            // We have three levels of backoff,
1290            //
1291            // 1. Busy waiting w/ CPU hint
1292            // 2. Willingly allow preemption by OS schedular
1293            // 3. Sleep the thread (increasing w/ exponenial factor)
1294
1295            if hints::likely(spins < Self::L1_CONTENTION) {
1296                std::hint::spin_loop();
1297            } else if spins < Self::L2_CONTENTION {
1298                thread::yield_now();
1299            } else {
1300                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1301                thread::sleep(time::Duration::from_nanos(ns));
1302            }
1303
1304            spins += 1;
1305        }
1306    }
1307}
1308
1309struct LockGuard<'a>(&'a atomic::AtomicU8);
1310
1311impl Drop for LockGuard<'_> {
1312    fn drop(&mut self) {
1313        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1314    }
1315}
1316
1317#[cfg(test)]
1318mod tests {
1319    use super::*;
1320
1321    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1322    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1323
1324    const MODULE_ID: u8 = 0;
1325    const INIT_SLOTS: usize = 0x0A;
1326
1327    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1328        let dir = tempfile::tempdir().unwrap();
1329        let path = dir.path().join("tmp_map");
1330
1331        let cfg = FrozenMMapCfg {
1332            module_id: MODULE_ID,
1333            initial_count: INIT_SLOTS,
1334            immediate_durability: false,
1335            flush_duration: FLUSH_DURATION,
1336        };
1337
1338        (dir, path, cfg)
1339    }
1340
mod fm_lifecycle {
    use super::*;

    /// A fresh map is clean, open, correctly sized, at epoch zero, and its flusher works
    #[test]
    fn ok_new() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let is_dirty = mmap.core.dirty.load(atomic::Ordering::Acquire);
        let is_closed = mmap.core.closed.load(atomic::Ordering::Acquire);
        assert!(!is_dirty);
        assert!(!is_closed);

        let expected_len = INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE;
        assert_eq!(mmap.core.curr_length, expected_len);

        assert_eq!(mmap.core.completion.read_current_epoch(), 0);
        assert_eq!(mmap.core.completion.read_durable_epoch(), 0);

        // satisfies the bg thread was spawned correctly
        assert!(mmap.core.completion.get_err().is_none());

        // satisfies wait on epoch works
        let ticket = unsafe { mmap.write(0, |slot| *slot = 0x0A).unwrap() };
        let issued_at = ticket.epoch();
        let durable_at = futures::executor::block_on(ticket).unwrap();

        assert!(durable_at >= issued_at);
    }

    /// Opening an already existing file w/ an unchanged config succeeds
    #[test]
    fn ok_new_existing() {
        let (_dir, path, cfg) = new_tmp();

        // create new + close
        drop(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

        // open existing
        drop(FrozenMMap::<u64>::new(path, cfg).unwrap());
    }

    /// Opening an existing file w/ a different `initial_count` is rejected
    #[test]
    fn err_new_when_change_in_cfg() {
        let (_dir, path, mut cfg) = new_tmp();

        // create new + close
        drop(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

        // update cfg + open existing
        cfg.initial_count = INIT_SLOTS * 2;
        let reopened = FrozenMMap::<u64>::new(path, cfg);

        assert!(reopened.is_err());
    }

    /// `delete` removes the backing file
    #[test]
    fn ok_delete() {
        let (_dir, path, cfg) = new_tmp();
        let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

        mmap.delete().unwrap();

        let still_exists = mmap.core.file.exists().unwrap();
        assert!(!still_exists);
    }

    /// A second `delete` on the same handle fails
    #[test]
    fn err_delete_after_delete() {
        let (_dir, path, cfg) = new_tmp();
        let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

        mmap.delete().unwrap();

        let still_exists = mmap.core.file.exists().unwrap();
        assert!(!still_exists);

        let second_attempt = mmap.delete();
        assert!(second_attempt.is_err());
    }

    /// A write that was never awaited is still readable after drop + reopen
    #[test]
    fn ok_drop_persists_when_dropped_before_bg_flush() {
        const VAL: u64 = 0x0A;
        let (_dir, path, cfg) = new_tmp();

        // write, then drop right away w/o waiting on the ticket
        let writer = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        unsafe { writer.write(0, |slot| *slot = VAL).unwrap() };
        drop(writer);

        // reopen + verify
        let reader = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        let stored = unsafe { reader.read(0, |slot| *slot) };
        assert_eq!(stored, VAL);
    }
}
1431
mod fm_validate_t {
    use super::*;

    // satisfies every documented requirement: `repr(C)`, 8-byte aligned, 16 bytes, no `Drop`
    #[repr(C, align(8))]
    struct OkT {
        a: u64,
        b: u64,
    }

    // 8 bytes in size, but only as aligned as `u32`
    #[repr(C)]
    struct BadAlignT {
        a: u32,
        b: u32,
    }

    // NOTE: three `u32`s give 12 bytes, which is truly not a multiple of 8. The former
    // `u32 + u16` layout got padded up to 8 bytes, so the size requirement was never the one
    // being violated.
    #[repr(C, align(4))]
    struct BadSizeT {
        a: u32,
        b: u32,
        c: u32,
    }

    #[repr(C, align(8))]
    struct DropT(u64);

    impl Drop for DropT {
        fn drop(&mut self) {}
    }

    #[repr(C, align(8))]
    struct ZstT;

    #[test]
    fn ok_validate_t() {
        assert!(FrozenMMap::<OkT>::validate_t().is_ok());
    }

    #[test]
    fn err_validate_t_when_drop() {
        assert!(FrozenMMap::<DropT>::validate_t().is_err());
    }

    #[test]
    fn err_validate_t_when_not_8_byte_aligned() {
        assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
    }

    #[test]
    fn err_validate_t_when_size_not_multiple_of_8() {
        // guard the fixture itself, so padding can never silently make this test vacuous
        assert_ne!(std::mem::size_of::<BadSizeT>() % 8, 0);

        assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
    }

    #[test]
    fn err_validate_t_when_zero_sized() {
        assert!(FrozenMMap::<ZstT>::validate_t().is_err());
    }

    #[test]
    fn err_new_when_t_implements_drop() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
    }

    #[test]
    fn err_new_when_t_is_not_8_byte_aligned() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
    }

    #[test]
    fn err_new_when_t_size_is_not_multiple_of_8() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
    }

    #[test]
    fn err_new_when_t_is_zero_sized() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
    }

    #[test]
    fn err_new_grown_when_t_implements_drop() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
    }

    #[test]
    fn err_new_grown_when_t_is_not_8_byte_aligned() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
    }

    #[test]
    fn err_new_grown_when_t_size_is_not_multiple_of_8() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
    }

    #[test]
    fn err_new_grown_when_t_is_zero_sized() {
        let (_dir, path, cfg) = new_tmp();
        assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
    }
}
1536
mod fm_new_grown {
    use super::*;

    /// Growing adds the requested slots, and the tracked length follows
    #[test]
    fn ok_new_grown_updates_length() {
        const EXTRA: usize = 0x0A;
        let (_dir, path, cfg) = new_tmp();

        let initial = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        assert_eq!(initial.total_slots(), INIT_SLOTS);
        drop(initial);

        let grown = FrozenMMap::<u64>::new_grown(path, cfg, EXTRA).unwrap();
        let expected_slots = INIT_SLOTS + EXTRA;

        assert_eq!(grown.total_slots(), expected_slots);
        assert_eq!(grown.core.curr_length, expected_slots * FrozenMMap::<u64>::SLOT_SIZE);
    }

    /// Growing is rejected while another instance on the same path is still open
    #[test]
    fn err_new_grown_with_preexisting_instance() {
        let (_dir, path, cfg) = new_tmp();

        let alive = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        assert_eq!(alive.total_slots(), INIT_SLOTS);

        let grown = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A);
        assert!(grown.is_err());
    }

    /// Repeated grow + close rounds keep accumulating slots
    #[test]
    fn ok_new_grown_cycle() {
        const STEP: usize = 0x100;
        let (_dir, path, cfg) = new_tmp();

        drop(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

        for round in 1..=3 {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), STEP).unwrap();
            assert_eq!(grown.total_slots(), INIT_SLOTS + (round * STEP));
            drop(grown);
        }
    }

    /// A slot written before growing can be overwritten and read back afterwards
    #[test]
    fn ok_write_reopen_grown_read() {
        let (_dir, path, cfg) = new_tmp();

        let before = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        unsafe { before.write(0, |slot| *slot = 0xAA).unwrap() };
        drop(before);

        let after = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
        unsafe { after.write(0, |slot| *slot = 0xBB).unwrap() };

        let stored = unsafe { after.read(0, |slot| *slot) };
        assert_eq!(stored, 0xBB);
    }

    /// Data written across several grow rounds is all visible on a plain reopen
    #[test]
    fn ok_write_reopen_grown_read_cycle() {
        let (_dir, path, cfg) = new_tmp();

        let first = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        unsafe { first.write(0, |slot| *slot = 1).unwrap() };
        drop(first);

        // every round grows the map and stamps its (new) last slot w/ 2, then 3
        for stamp in 2u64..4 {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
            let tail = grown.total_slots() - 1;
            unsafe { grown.write(tail, |slot| *slot = stamp).unwrap() };
            drop(grown);
        }

        let reopened = FrozenMMap::<u64>::new(&path, cfg).unwrap();

        let head_val = unsafe { reopened.read(0, |slot| *slot) };
        assert_eq!(head_val, 1);

        let tail = reopened.total_slots() - 1;
        let tail_val = unsafe { reopened.read(tail, |slot| *slot) };
        assert_eq!(tail_val, 3);
    }

    /// Same rejection as above, w/ the first instance merely kept alive
    #[test]
    fn err_new_grown_while_previous_instance_is_alive() {
        let (_dir, path, cfg) = new_tmp();

        let _alive = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

        let grown = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
        assert!(grown.is_err());
    }
}
1636
mod fm_write_read {
    use super::*;

    /// A write, awaited until durable, is readable afterwards
    #[test]
    fn ok_write_wait_read_cycle() {
        const VAL: u64 = 0xDEADC0DE;
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

        // write + sync
        let ticket = unsafe { mmap.write(0, |slot| *slot = VAL).unwrap() };
        let issued_at = ticket.epoch();
        let durable_at = futures::executor::block_on(ticket).unwrap();

        assert!(durable_at >= issued_at);

        // read + verify
        let stored = unsafe { mmap.read(0, |slot| *slot) };
        assert_eq!(stored, VAL);
    }

    /// A write is readable right away, w/o waiting for durability
    #[test]
    fn ok_write_read_without_wait() {
        const VAL: u64 = 0xDEADC0DE;
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

        unsafe { mmap.write(0, |slot| *slot = VAL).unwrap() };

        let stored = unsafe { mmap.read(0, |slot| *slot) };
        assert_eq!(stored, VAL);
    }
}
1670
1671    mod fm_tx {
1672        use super::*;
1673
/// Three writes committed through one tx become durable and are all readable
#[test]
fn ok_tx_basic_multi_write() {
    let (_dir, path, cfg) = new_tmp();
    let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

    let mut tx = mmap.new_tx();
    unsafe { tx.write(0, |slot| *slot = 1) }.unwrap();
    unsafe { tx.write(1, |slot| *slot = 2) }.unwrap();
    unsafe { tx.write(2, |slot| *slot = 3) }.unwrap();

    let ticket = tx.commit().unwrap();
    let issued_at = ticket.epoch();
    let durable_at = futures::executor::block_on(ticket).unwrap();

    assert!(durable_at >= issued_at);

    let stored = (
        unsafe { mmap.read(0, |slot| *slot) },
        unsafe { mmap.read(1, |slot| *slot) },
        unsafe { mmap.read(2, |slot| *slot) },
    );

    assert_eq!(stored, (1, 2, 3));
}
1699
/// A write issued after a commit gets a strictly higher epoch than the tx
#[test]
fn ok_tx_single_epoch() {
    let (_dir, path, cfg) = new_tmp();
    let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

    let mut tx = mmap.new_tx();
    unsafe { tx.write(0, |slot| *slot = 0x0A) }.unwrap();
    unsafe { tx.write(1, |slot| *slot = 0x14) }.unwrap();

    // NOTE: next write must have strictly higher epoch then current one

    let tx_ticket = tx.commit().unwrap();
    let later_ticket = unsafe { mmap.write(2, |slot| *slot = 0x1E).unwrap() };

    assert!(later_ticket.epoch() > tx_ticket.epoch());
}
1718
/// Recording a lower index after a higher one is rejected
#[test]
fn err_tx_out_of_order() {
    let (_dir, path, cfg) = new_tmp();
    let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

    let mut tx = mmap.new_tx();
    unsafe { tx.write(2, |slot| *slot = 1) }.unwrap();

    let rejected = unsafe { tx.write(1, |slot| *slot = 2) };
    assert!(rejected.is_err());
}
1731
/// Recording the same index twice is rejected
#[test]
fn err_tx_duplicate_index() {
    let (_dir, path, cfg) = new_tmp();
    let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

    let mut tx = mmap.new_tx();
    unsafe { tx.write(1, |slot| *slot = 1) }.unwrap();

    let rejected = unsafe { tx.write(1, |slot| *slot = 2) };
    assert!(rejected.is_err());
}
1744
/// Two threads commit transactions over disjoint slot pairs, both land intact
#[test]
fn ok_tx_concurrent_non_overlapping() {
    let (_dir, path, cfg) = new_tmp();
    let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

    // worker `i` owns slots `2i` and `2i + 1`, and stamps both w/ `i`
    let workers: Vec<_> = (0..2usize)
        .map(|i| {
            let mmap = sync::Arc::clone(&mmap);

            thread::spawn(move || {
                let first = i * 2;
                let stamp = i as u64;

                let mut tx = mmap.new_tx();
                unsafe { tx.write(first, move |slot| *slot = stamp) }.unwrap();
                unsafe { tx.write(first + 1, move |slot| *slot = stamp) }.unwrap();

                let ticket = tx.commit().unwrap();
                let issued_at = ticket.epoch();
                let durable_at = futures::executor::block_on(ticket).unwrap();

                assert!(durable_at >= issued_at);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    for i in 0..2usize {
        let first = i * 2;
        let low = unsafe { mmap.read(first, |slot| *slot) };
        let high = unsafe { mmap.read(first + 1, |slot| *slot) };

        assert_eq!((low, high), (i as u64, i as u64));
    }
}
1781
1782        #[test]
1783        fn ok_tx_overwrite_last_wins() {
1784            let (_dir, path, cfg) = new_tmp();
1785            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1786
1787            let mut tx = mmap.new_tx();
1788            unsafe {
1789                tx.write(0, |v| *v = 1).unwrap();
1790            }
1791
1792            tx.commit().unwrap();
1793
1794            let mut tx2 = mmap.new_tx();
1795            unsafe {
1796                tx2.write(0, |v| *v = 2).unwrap();
1797            }
1798
1799            let ticket = tx2.commit().unwrap();
1800
1801            let ticket_epoch = ticket.epoch();
1802            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1803
1804            assert!(durable_epoch >= ticket_epoch);
1805
1806            let val = unsafe { mmap.read(0, |v| *v) };
1807            assert_eq!(val, 2);
1808        }
1809
1810        #[test]
1811        fn ok_tx_persists_across_reopen() {
1812            let (_dir, path, cfg) = new_tmp();
1813
1814            {
1815                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1816
1817                let mut tx = mmap.new_tx();
1818                unsafe {
1819                    tx.write(0, |v| *v = 0x3A).unwrap();
1820                    tx.write(1, |v| *v = 0x54).unwrap();
1821                }
1822
1823                let ticket = tx.commit().unwrap();
1824
1825                let ticket_epoch = ticket.epoch();
1826                let durable_epoch = futures::executor::block_on(ticket).unwrap();
1827
1828                assert!(durable_epoch >= ticket_epoch);
1829            }
1830
1831            {
1832                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1833
1834                let v0 = unsafe { mmap.read(0, |v| *v) };
1835                let v1 = unsafe { mmap.read(1, |v| *v) };
1836
1837                assert_eq!((v0, v1), (0x3A, 0x54));
1838            }
1839        }
1840    }
1841
1842    mod fm_durability {
1843        use super::*;
1844
1845        #[test]
1846        fn ok_wait_then_drop() {
1847            let (_dir, path, cfg) = new_tmp();
1848            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1849
1850            let ticket = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1851
1852            let ticket_epoch = ticket.epoch();
1853            let durable_epoch = futures::executor::block_on(ticket).unwrap();
1854
1855            assert!(durable_epoch >= ticket_epoch);
1856
1857            drop(mmap);
1858        }
1859
1860        #[test]
1861        fn ok_epoch_monotonicity() {
1862            let (_dir, path, cfg) = new_tmp();
1863            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1864
1865            let t1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1866            let t2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1867
1868            let durable_epoch = futures::executor::block_on(t2).unwrap();
1869            assert!(durable_epoch >= t1.epoch());
1870        }
1871
1872        #[test]
1873        fn ok_wait_for_durability_with_multi_writers() {
1874            let (_dir, path, cfg) = new_tmp();
1875            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1876
1877            let mut handles = Vec::new();
1878            for _ in 0..2 {
1879                let mmap = mmap.clone();
1880                handles.push(thread::spawn(move || {
1881                    let ticket = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1882
1883                    let ticket_epoch = ticket.epoch();
1884                    let durable_epoch = futures::executor::block_on(ticket).unwrap();
1885
1886                    assert!(durable_epoch >= ticket_epoch);
1887                }));
1888            }
1889
1890            for h in handles {
1891                h.join().unwrap();
1892            }
1893
1894            let val = unsafe { mmap.read(0, |v| *v) };
1895            assert_eq!(val, 2);
1896        }
1897    }
1898
1899    mod fm_concurrency {
1900        use super::*;
1901
1902        #[test]
1903        fn ok_parallel_reads_with_diff_index() {
1904            let (_dir, path, cfg) = new_tmp();
1905            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1906
1907            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1908            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1909
1910            let t1 = {
1911                let mmap = mmap.clone();
1912                thread::spawn(move || unsafe { mmap.read(0, |v| *v) })
1913            };
1914
1915            let t2 = {
1916                let mmap = mmap.clone();
1917                thread::spawn(move || unsafe { mmap.read(1, |v| *v) })
1918            };
1919
1920            assert_eq!(t1.join().unwrap(), 0x10);
1921            assert_eq!(t2.join().unwrap(), 0x20);
1922        }
1923
1924        #[test]
1925        fn ok_multi_tx_drop_then_reopen_grown() {
1926            let (_dir, path, cfg) = new_tmp();
1927
1928            {
1929                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
1930
1931                let mut handles = Vec::new();
1932                for i in 0..2u64 {
1933                    let mmap = mmap.clone();
1934                    handles.push(thread::spawn(move || {
1935                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
1936                    }));
1937                }
1938
1939                for i in 0..2usize {
1940                    let mmap = mmap.clone();
1941                    handles.push(thread::spawn(move || {
1942                        let _ = unsafe { mmap.read(i, |v| *v) };
1943                    }));
1944                }
1945
1946                for h in handles {
1947                    h.join().unwrap();
1948                }
1949            }
1950
1951            {
1952                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1953                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
1954
1955                for i in 0..2u64 {
1956                    let val = unsafe { mmap.read(i as usize, |v| *v) };
1957                    assert_eq!(val, i + 1);
1958                }
1959
1960                let idx = mmap.total_slots() - 1;
1961                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
1962
1963                let val = unsafe { mmap.read(idx, |v| *v) };
1964                assert_eq!(val, 0xDEAD);
1965            }
1966        }
1967    }
1968
1969    mod ack_ticket {
1970        use super::*;
1971
1972        #[test]
1973        fn ok_parallel_waiters_same_epoch_window() {
1974            let (_dir, path, cfg) = new_tmp();
1975            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1976
1977            let barrier = sync::Arc::new(sync::Barrier::new(0x10));
1978
1979            let mut handles = Vec::new();
1980            for i in 0..0x10usize {
1981                let mmap = mmap.clone();
1982                let barrier = barrier.clone();
1983
1984                handles.push(thread::spawn(move || {
1985                    barrier.wait();
1986
1987                    let ticket = unsafe { mmap.write(i % INIT_SLOTS, |v| *v = i as u64).unwrap() };
1988
1989                    let epoch = ticket.epoch();
1990                    let durable = futures::executor::block_on(ticket).unwrap();
1991
1992                    assert!(durable >= epoch);
1993                }));
1994            }
1995
1996            for handle in handles {
1997                handle.join().expect("worker thread panicked");
1998            }
1999        }
2000
2001        #[test]
2002        fn ok_parallel_waiters_same_epoch() {
2003            let completion = sync::Arc::new(ack::Completion::default());
2004
2005            let mut handles = Vec::new();
2006            for _ in 0..0x10 {
2007                let completion = completion.clone();
2008
2009                handles.push(thread::spawn(move || {
2010                    let ticket = ack::AckTicket::new(1, completion);
2011
2012                    assert_eq!(
2013                        futures::executor::block_on(ticket).expect("ticket must complete"),
2014                        1
2015                    );
2016                }));
2017            }
2018
2019            thread::sleep(time::Duration::from_millis(0x0A));
2020
2021            completion.mark_epoch_as_durable(1);
2022            completion.notify_all_listeners();
2023
2024            for handle in handles {
2025                handle.join().expect("worker thread panicked");
2026            }
2027        }
2028    }
2029}