Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//!     module_id: MODULE_ID,
35//!     initial_count: 0x0A,
36//!     flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! mmap.wait_for_durability(epoch).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61    error::{FrozenError, FrozenResult},
62    ffile::{FrozenFile, FrozenFileCfg},
63    hints,
64};
65use std::{
66    fmt,
67    sync::{self, atomic},
68    thread, time,
69};
70
71/// type for `epoch` used by write ops
72pub type TEpoch = u64;
73
74#[cfg(any(target_os = "linux", target_os = "macos"))]
75type TMap = posix::POSIXMMap;
76
77/// Error codes for [`FrozenMMap`]
78pub(in crate::fmmap) mod err {
79    use crate::error::{ErrCode, FrozenError, FrozenResult};
80
81    /// Domain Id for [`FrozenMMap`] is **18**
82    const ERRDOMAIN: u8 = 0x12;
83
84    /// module id used for [`FrozenMMap`]
85    pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
86
87    #[cfg(not(test))]
88    #[inline(always)]
89    pub fn mid() -> &'static u8 {
90        MID.get().unwrap()
91    }
92
93    #[cfg(test)]
94    #[inline(always)]
95    pub fn mid() -> &'static u8 {
96        MID.get_or_init(|| 0)
97    }
98
99    /// internal fuck up (hault and catch fire)
100    pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
101
102    /// unknown error (fallback)
103    pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
104
105    /// no more memory available
106    pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
107
108    /// syncing error
109    pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
110
111    /// no write/read perm
112    pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
113
114    /// flush_tx error (panic inside)
115    pub const TXE: ErrCode = ErrCode::new(0x0C, "flush_tx paniced inside");
116
117    /// flush_tx error (unable to spawn)
118    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
119
120    /// type `T` implements drop
121    pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
122
123    /// type `T` is not 8 bytes aligned
124    pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
125
126    /// `size_of::<T>()` is not multiple of 8
127    pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
128
129    /// type `T` must not be zero sized
130    pub const ZRO: ErrCode = ErrCode::new(0x18, "type T must not be zero sized");
131
132    #[inline]
133    pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenResult<R> {
134        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
135        Err(err)
136    }
137
138    #[inline]
139    pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
140        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
141        Err(err)
142    }
143
144    #[inline]
145    pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenError {
146        FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
147    }
148}
149
/// Config for [`FrozenMMap`]
#[derive(Debug, Clone)]
pub struct FrozenMMapCfg {
    /// Identifier used for error propagation by [`FrozenError`]
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], where initial file length will
    /// be `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
    /// together, where all write ops in a certain time interval fall into a single durable window
    pub flush_duration: time::Duration,
}
166
167/// Custom implementation of `mmap(2)`
168///
169/// ## Constraints
170///
171/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
172/// must be a POD type which is safe to persist and later reinterpret from bytes.
173///
174/// Required properties for `T`,
175///
176/// - Must use `#[repr(C)]`
177/// - Should be 8-bytes aligned
178/// - Must not implement [`Drop`]
179/// - `size_of::<T>()` should be multiple of `8`
180///
181/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
182/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
183/// across reopen.
184///
185/// These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
186/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
187/// layout and must remain valid when the file is reopened in a later process.
188///
189/// ## Example
190///
191/// ```
192/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
193///
194/// const MODULE_ID: u8 = 0;
195///
196/// let dir = tempfile::tempdir().unwrap();
197/// let path = dir.path().join("tmp_frozen_mmap");
198///
199/// let cfg = FrozenMMapCfg {
200///     module_id: MODULE_ID,
201///     initial_count: 0x0A,
202///     flush_duration: std::time::Duration::from_micros(0x96),
203/// };
204///
205/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
206/// assert_eq!(mmap.total_slots(), 0x0A);
207///
208/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
209/// mmap.wait_for_durability(epoch).unwrap();
210///
211/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
212/// assert_eq!(val, 0xDEADC0DE);
213///
214/// drop(mmap);
215///
216/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
217/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
218///
219/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
220/// assert_eq!(val, 0xDEADC0DE);
221/// ```
222#[derive(Debug)]
223pub struct FrozenMMap<T>
224where
225    T: Sized + Send + Sync,
226{
227    core: sync::Arc<Core>,
228    tx: Option<thread::JoinHandle<()>>,
229    _t: core::marker::PhantomData<T>,
230}
231
232unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
233unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
234
235impl<T> FrozenMMap<T>
236where
237    T: Sized + Send + Sync,
238{
239    /// Memory space required for each slot of `T` in [`FrozenMMap`]
240    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
241
242    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
243    ///
244    /// ## Multiple Instances
245    ///
246    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
247    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
248    /// error will be thrown.
249    ///
250    /// ## Capacity Growth
251    ///
252    /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
253    /// drop the current instance and reopen w/ [`FrozenMMap::new_grown`] which provides memory
254    /// mapping over grown capacity.
255    ///
256    /// ## [`FrozenMMapCfg`]
257    ///
258    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
259    ///
260    /// ## Working
261    ///
262    /// We first create a new [`FrozenFile`] if not present already, then map the entire file using
263    /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
264    /// entire lifetime of file.
265    ///
266    /// ## Important
267    ///
268    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
269    /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
270    /// to store config.
271    ///
272    /// ## Example
273    ///
274    /// ```
275    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
276    ///
277    /// const MODULE_ID: u8 = 0;
278    ///
279    /// let dir = tempfile::tempdir().unwrap();
280    /// let path = dir.path().join("tmp_frozen_mmap");
281    ///
282    /// let cfg = FrozenMMapCfg {
283    ///     module_id: MODULE_ID,
284    ///     initial_count: 0x0A,
285    ///     flush_duration: std::time::Duration::from_micros(0x96),
286    /// };
287    ///
288    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
289    /// assert_eq!(mmap.total_slots(), 0x0A);
290    ///
291    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
292    /// mmap.wait_for_durability(epoch).unwrap();
293    ///
294    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
295    /// assert_eq!(val, 0xDEADC0DE);
296    /// ```
297    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
298        Self::validate_t()?;
299        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
300        let total_slots = curr_length / Self::SLOT_SIZE;
301
302        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
303        // guarantees that the first caller sets the value and all subsequent calls reuse it
304        let _ = err::MID.get_or_init(|| cfg.module_id);
305
306        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
307        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
308
309        // INFO: we spawn the thread for background sync
310        let tx = Core::spawn_tx(core.clone())?;
311
312        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
313    }
314
315    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
316    /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
317    ///
318    /// ## Multiple Instances
319    ///
320    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
321    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
322    /// error will be thrown.
323    ///
324    /// ## Why not create a [`FrozenMMap::grow`] call?
325    ///
326    /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
327    /// memory mapping in place is tricky in concurrent code, as some threads would still hold
328    /// stale/unmapped pointers to mmap due to preemption from the OS scheduler
329    ///
330    /// So, instead of remapping a live instance, the current API performs growth during open,
331    /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
332    /// instance.
333    ///
334    /// ## [`FrozenMMapCfg`]
335    ///
336    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
337    ///
338    /// ## Working
339    ///
340    /// We first create a new [`FrozenFile`] if not present already, then we grow the file using
341    /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
342    /// read/write `T`, which also should stay constant for the entire lifetime of file.
343    ///
344    /// ## Important
345    ///
346    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
347    /// which is used under the hood, one must use config stores like
348    /// [`Rta`](https://crates.io/crates/rta) to store config.
349    ///
350    /// ## Example
351    ///
352    /// ```
353    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
354    ///
355    /// const MODULE_ID: u8 = 0;
356    ///
357    /// let dir = tempfile::tempdir().unwrap();
358    /// let path = dir.path().join("tmp_frozen_mmap");
359    ///
360    /// let cfg = FrozenMMapCfg {
361    ///     module_id: MODULE_ID,
362    ///     initial_count: 0x0A,
363    ///     flush_duration: std::time::Duration::from_micros(0x96),
364    /// };
365    ///
366    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
367    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
368    ///
369    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
370    /// mmap.wait_for_durability(epoch).unwrap();
371    ///
372    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
373    /// assert_eq!(val, 0xDEADC0DE);
374    /// ```
375    pub fn new_grown<P: AsRef<std::path::Path>>(
376        path: P,
377        cfg: FrozenMMapCfg,
378        additional_slots: usize,
379    ) -> FrozenResult<Self> {
380        Self::validate_t()?;
381        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
382
383        // we grow the underlying FrozenFile as requested
384        file.grow(additional_slots)?;
385        let curr_length = file.length()?; // we must read the updated (grown) length
386        let total_slots = curr_length / Self::SLOT_SIZE;
387
388        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
389        // guarantees that the first caller sets the value and all subsequent calls reuse it
390        let _ = err::MID.get_or_init(|| cfg.module_id);
391
392        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
393        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
394
395        // INFO: we spawn the thread for background sync
396        let tx = Core::spawn_tx(core.clone())?;
397
398        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
399    }
400
401    /// Create/open a new [`FrozenFile`] instance
402    fn open_file(path: std::path::PathBuf, cfg: &FrozenMMapCfg) -> FrozenResult<(FrozenFile, usize)> {
403        let ff_cfg = FrozenFileCfg {
404            path,
405            module_id: cfg.module_id,
406            buffer_size: Self::SLOT_SIZE,
407            initial_available_buffers: cfg.initial_count,
408        };
409
410        let file = FrozenFile::new(ff_cfg)?;
411        let curr_length = file.length()?;
412
413        Ok((file, curr_length))
414    }
415
416    #[inline]
417    fn validate_t() -> FrozenResult<()> {
418        if std::mem::needs_drop::<T>() {
419            return err::new_err_default(err::DRP);
420        }
421
422        let align = std::mem::align_of::<T>();
423        if align != 8 {
424            return err::new_err_default(err::ALN);
425        }
426
427        let size = std::mem::size_of::<T>();
428        if size == 0 {
429            return err::new_err_default(err::ZRO);
430        }
431
432        if size % 8 != 0 {
433            return err::new_err_default(err::SZE);
434        }
435
436        Ok(())
437    }
438
439    /// Blocks until given `epoch` becomes durable
440    ///
441    /// ## Batching
442    ///
443    /// With respect to `flush_duration`, all write ops are batched before sync, which is executed
444    /// by flusher tx working in background, while each write is assigned w/ current durable epoch,
445    /// and all writes which observe the exact same epoch, belong to the same durability window, and
446    /// are all sync'ed together
447    ///
448    /// When a background sync succeeds, the internal durable epoch is incremented, indicating that
449    /// all writes that observed the previous epoch are now durable on disk
450    ///
451    /// ## Example
452    ///
453    /// ```
454    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
455    ///
456    /// const MODULE_ID: u8 = 0;
457    ///
458    /// let dir = tempfile::tempdir().unwrap();
459    /// let path = dir.path().join("tmp_wait_epoch");
460    ///
461    /// let cfg = FrozenMMapCfg {
462    ///     module_id: MODULE_ID,
463    ///     initial_count: 0x04,
464    ///     flush_duration: std::time::Duration::from_micros(0x60),
465    /// };
466    ///
467    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
468    ///
469    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
470    /// mmap.wait_for_durability(epoch).unwrap();
471    ///
472    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
473    /// assert_eq!(val, 0x8A);
474    /// ```
475    pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
476        if let Some(sync_err) = self.core.get_sync_error() {
477            return Err(sync_err);
478        }
479
480        let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
481        if durable_epoch >= epoch {
482            return Ok(());
483        }
484
485        let mut guard = self.core.acquire_durable_lock();
486        loop {
487            if let Some(sync_err) = self.core.get_sync_error() {
488                return Err(sync_err);
489            }
490
491            if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
492                return Ok(());
493            }
494
495            // NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
496            guard = self.core.durable_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
497        }
498    }
499
500    /// Read a `T` at given `index` via callback (`f`)
501    ///
502    /// ## Concurrency
503    ///
504    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
505    /// the same index are atomic and thread safe, while operations on different indices may proceed
506    /// fully in parallel
507    ///
508    /// ## Safety
509    ///
510    /// The caller must ensure following:
511    ///
512    /// - given `index` is within bounds
513    /// - underlying memory contains a valid instance of `T`
514    /// - provided callback `f` must not,
515    ///   - write through the pointer
516    ///   - store or leak the pointer beyond its lifetime
517    ///
518    /// Violating any of the above may result in undefined behavior
519    ///
520    /// ## Example
521    ///
522    /// ```
523    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
524    ///
525    /// const MODULE_ID: u8 = 0;
526    ///
527    /// let dir = tempfile::tempdir().unwrap();
528    /// let path = dir.path().join("tmp_read_mmap");
529    ///
530    /// let cfg = FrozenMMapCfg {
531    ///     module_id: MODULE_ID,
532    ///     initial_count: 0x02,
533    ///     flush_duration: std::time::Duration::from_micros(0x60),
534    /// };
535    ///
536    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
537    ///
538    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
539    /// mmap.wait_for_durability(epoch).unwrap();
540    ///
541    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
542    /// assert_eq!(val, 0x0A);
543    /// ```
544    #[inline(always)]
545    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
546        let offset = Self::SLOT_SIZE * index;
547        let _lock = self.core.locks.lock(index);
548
549        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
550        // assumption of OS guarantees visibility)
551
552        let ptr = unsafe { self.core.map.as_ptr(offset) };
553        Ok(f(ptr))
554    }
555
556    /// Write/update a `T` at given `index` via callback (`f`)
557    ///
558    /// ## Concurrency
559    ///
560    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
561    /// the same index are atomic and thread safe, while operations on different indices may proceed
562    /// fully in parallel
563    ///
564    /// ## Safety
565    ///
566    /// The caller must ensure following:
567    ///
568    /// - given `index` is within bounds
569    /// - underlying memory contains a valid instance of `T`
570    /// - provided callback `f` must not store or leak the pointer beyond its lifetime
571    ///
572    /// Violating any of the above may result in undefined behavior
573    ///
574    /// ## Example
575    ///
576    /// ```
577    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
578    ///
579    /// const MODULE_ID: u8 = 0;
580    ///
581    /// let dir = tempfile::tempdir().unwrap();
582    /// let path = dir.path().join("tmp_write_mmap");
583    ///
584    /// let cfg = FrozenMMapCfg {
585    ///     module_id: MODULE_ID,
586    ///     initial_count: 0x02,
587    ///     flush_duration: std::time::Duration::from_micros(0x96),
588    /// };
589    ///
590    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
591    ///
592    /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
593    /// mmap.wait_for_durability(epoch).unwrap();
594    ///
595    /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
596    /// assert_eq!(val, 0x2B);
597    /// ```
598    #[inline(always)]
599    pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
600        // propagate prev errors
601        if let Some(err) = self.core.get_sync_error() {
602            return Err(err);
603        }
604
605        let offset = Self::SLOT_SIZE * index;
606
607        let _guard = self.core.acquire_io_lock();
608        let _lock = self.core.locks.lock(index);
609
610        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
611        f(ptr);
612
613        self.core.dirty.store(true, atomic::Ordering::Release);
614        let epoch = self.core.incr_curr_epoch();
615
616        Ok(epoch)
617    }
618
619    /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
620    ///
621    /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is
622    /// immediately persisted to the underlying storage device
623    ///
624    /// ## Concurrency
625    ///
626    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
627    /// the same index are atomic and thread safe, while operations on different indices may proceed
628    /// fully in parallel.
629    ///
630    /// ## Safety
631    ///
632    /// The caller must ensure following:
633    ///
634    /// - given `index` is within bounds
635    /// - underlying memory contains a valid instance of `T`
636    /// - provided callback `f` must not store or leak the pointer beyond its lifetime
637    ///
638    /// Violating any of the above may result in undefined behavior
639    ///
640    /// ## Example
641    ///
642    /// ```
643    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
644    ///
645    /// const MODULE_ID: u8 = 0;
646    ///
647    /// let dir = tempfile::tempdir().unwrap();
648    /// let path = dir.path().join("tmp_write_sync");
649    ///
650    /// let cfg = FrozenMMapCfg {
651    ///     module_id: MODULE_ID,
652    ///     initial_count: 0x02,
653    ///     flush_duration: std::time::Duration::from_micros(0x96),
654    /// };
655    ///
656    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
657    /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
658    ///
659    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
660    /// assert_eq!(val, 0xC0DE);
661    /// ```
662    #[inline(always)]
663    pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
664        // propagate prev errors
665        if let Some(err) = self.core.get_sync_error() {
666            return Err(err);
667        }
668
669        let offset = Self::SLOT_SIZE * index;
670
671        // block flush_tx scheduling
672        let _flush_guard = self.core.acquire_lock();
673
674        // we use exlusive lock as we perform blocking hard sync
675        let _guard = self.core.acquire_exclusive_io_lock();
676
677        let _lock = self.core.locks.lock(index);
678        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
679        f(ptr);
680
681        // blocking hard sync
682        self.core.sync()?;
683
684        // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as
685        // the current batch is durable
686
687        self.core.mark_epoch_durable();
688        let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
689
690        // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to
691        // sync)
692        if prev {
693            let _g = self.core.acquire_durable_lock();
694            self.core.durable_cv.notify_all();
695        }
696
697        Ok(())
698    }
699
700    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
701    ///
702    /// ## Working
703    ///
704    /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
705    /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
706    /// conditions
707    ///
708    /// ## Example
709    ///
710    /// ```
711    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
712    ///
713    /// const MODULE_ID: u8 = 0;
714    ///
715    /// let dir = tempfile::tempdir().unwrap();
716    /// let path = dir.path().join("tmp_grow_mmap");
717    ///
718    /// let cfg = FrozenMMapCfg {
719    ///     module_id: MODULE_ID,
720    ///     initial_count: 0x02,
721    ///     flush_duration: std::time::Duration::from_micros(0x96),
722    /// };
723    ///
724    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
725    /// assert_eq!(mmap.total_slots(), 0x02);
726    ///
727    /// drop(mmap);
728    ///
729    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
730    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
731    /// ```
732    #[inline]
733    pub fn total_slots(&self) -> usize {
734        self.core.curr_length / Self::SLOT_SIZE
735    }
736
737    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
738    ///
739    /// *NOTE:* This is an approximation of memory used, actual RSS may differ depending on paging
740    /// and OS
741    ///
742    /// ## Example
743    ///
744    /// ```
745    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
746    ///
747    /// const MODULE_ID: u8 = 0;
748    ///
749    /// let dir = tempfile::tempdir().unwrap();
750    /// let path = dir.path().join("tmp_mem_usage");
751    ///
752    /// let cfg = FrozenMMapCfg {
753    ///     module_id: MODULE_ID,
754    ///     initial_count: 0x10,
755    ///     flush_duration: std::time::Duration::from_micros(0x60),
756    /// };
757    ///
758    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
759    ///
760    /// let bytes = mmap.memory_usage();
761    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
762    /// ```
763    #[inline]
764    pub fn memory_usage(&self) -> usize {
765        // memory of mem-maped region
766        let mmap_bytes = self.core.curr_length;
767
768        // memory used for locking
769        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
770
771        mmap_bytes + lock_bytes
772    }
773
774    /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic
775    /// operation
776    ///
777    /// ## Overview
778    ///
779    /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic
780    /// operation, hence creating a transactional write operation, which gives following guarantees:
781    ///
782    /// - All write ops succeed together
783    /// - Single epoch to track durability of all writes ops
784    /// - Same durability guarantee for all the included write ops
785    ///
786    /// Simply, this preserves atomic durability semantics for multi index updates
787    ///
788    /// ## Example
789    ///
790    /// ```
791    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
792    ///
793    /// const MODULE_ID: u8 = 0;
794    ///
795    /// let dir = tempfile::tempdir().unwrap();
796    /// let path = dir.path().join("tmp_tx");
797    ///
798    /// let cfg = FrozenMMapCfg {
799    ///     module_id: MODULE_ID,
800    ///     initial_count: 0x0A,
801    ///     flush_duration: std::time::Duration::from_micros(50),
802    /// };
803    ///
804    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
805    ///
806    /// let mut tx = mmap.new_tx();
807    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
808    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
809    ///
810    /// let epoch = tx.commit().unwrap();
811    /// mmap.wait_for_durability(epoch).unwrap();
812    ///
813    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
814    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
815    ///
816    /// assert_eq!((v0, v1), (0x0A, 0x14));
817    /// ```
818    #[inline]
819    pub fn new_tx(&self) -> FMTransaction<'_, T> {
820        FMTransaction { core: &self.core, ops_vec: Vec::new() }
821    }
822
823    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
824    ///
825    /// ## Working
826    ///
827    /// When `delete` is called, all read, write, and (background) sync ops are paused
828    /// (indefinitely), whule deletion is done with following steps:
829    ///
830    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
831    /// - if any batch is pending for sync,
832    ///   - swap the flag
833    ///   - call sync manually
834    ///   - incr epoch and update cv
835    /// - brodcast closing so flusher tx could wrap up
836    /// - `munmap(2)` current mapping
837    /// - call delete on [`FrozenFile`]
838    ///
839    /// ## Example
840    ///
841    /// ```
842    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
843    ///
844    /// const MODULE_ID: u8 = 0;
845    ///
846    /// let dir = tempfile::tempdir().unwrap();
847    /// let path = dir.path().join("tmp_delete_mmap");
848    ///
849    /// let cfg = FrozenMMapCfg {
850    ///     module_id: MODULE_ID,
851    ///     initial_count: 0x04,
852    ///     flush_duration: std::time::Duration::from_micros(0x96),
853    /// };
854    ///
855    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
856    /// mmap.delete().unwrap();
857    /// assert!(!path.exists());
858    /// ```
859    pub fn delete(&mut self) -> FrozenResult<()> {
860        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
861        self.core.dirty.store(false, atomic::Ordering::Release);
862        self.core.closed.store(true, atomic::Ordering::Release);
863        self.core.durable_cv.notify_one();
864
865        if let Some(handle) = self.tx.take() {
866            let _ = handle.join();
867        }
868
869        // pause all new write ops
870        let _lock = self.core.acquire_exclusive_io_lock();
871
872        self.munmap()?;
873        self.core.file.delete()
874    }
875
876    #[inline]
877    fn munmap(&self) -> FrozenResult<()> {
878        let length = self.core.curr_length;
879        unsafe { self.core.map.unmap(length) }
880    }
881}
882
883impl<T> Drop for FrozenMMap<T>
884where
885    T: Sized + Send + Sync,
886{
887    fn drop(&mut self) {
888        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
889        self.core.cv.notify_one(); // notify flusher tx to shut
890
891        if let Some(handle) = self.tx.take() {
892            let _ = handle.join();
893        }
894
895        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
896        let _io_lock = self.core.acquire_exclusive_io_lock();
897
898        // free up the boxed error (if any)
899        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
900        if !ptr.is_null() {
901            unsafe {
902                drop(Box::from_raw(ptr));
903            }
904        }
905
906        if !is_closed {
907            let _ = self.munmap();
908        }
909    }
910}
911
912impl<T> fmt::Display for FrozenMMap<T>
913where
914    T: Sized + Send + Sync,
915{
916    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
917        write!(
918            f,
919            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
920            self.core.file.fd(),
921            self.total_slots(),
922            self.core.curr_length,
923        )
924    }
925}
926
/// A context for grouping multi write ops into a single atomic operation
///
/// ## Overview
///
/// Use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation.
///
/// - All included writes are applied together
/// - Single epoch is assigned for an entire transaction
/// - Durability guarantee is same for all included write ops
///
/// ## Example
///
/// ```
/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
///
/// const MODULE_ID: u8 = 0;
///
/// let dir = tempfile::tempdir().unwrap();
/// let path = dir.path().join("tmp_tx");
///
/// let cfg = FrozenMMapCfg {
///     module_id: MODULE_ID,
///     initial_count: 0x0A,
///     flush_duration: std::time::Duration::from_micros(50),
/// };
///
/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
///
/// let mut tx = mmap.new_tx();
/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
///
/// let epoch = tx.commit().unwrap();
/// mmap.wait_for_durability(epoch).unwrap();
///
/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
///
/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
/// ```
pub struct FMTransaction<'a, T> {
    // shared state of the parent map; `commit` uses it for the io lock, the slot locks, the
    // mapping itself and the epoch accounting
    core: &'a Core,
    // queued `(slot index, write op)` pairs, `commit` applies them in the order they were pushed
    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
}
973
974impl<'a, T> FMTransaction<'a, T> {
975    /// Append a write op into the [`FMTransaction`]
976    ///
977    /// ## Requirements
978    ///
979    /// Write ops must follow these safety requirements,
980    ///
981    /// - No duplicate indices
982    /// - No out-of-order writes (must be incremental)
983    ///
984    /// Violating this constraint would result in [`FrozenError`]
985    ///
986    /// ## Safety
987    ///
988    /// Same safety requirements as [`FrozenMMap::write`] apply here
989    ///
990    /// ## Example
991    ///
992    /// ```
993    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
994    ///
995    /// const MODULE_ID: u8 = 0;
996    ///
997    /// let dir = tempfile::tempdir().unwrap();
998    /// let path = dir.path().join("tmp_tx");
999    ///
1000    /// let cfg = FrozenMMapCfg {
1001    ///     module_id: MODULE_ID,
1002    ///     initial_count: 0x10,
1003    ///     flush_duration: std::time::Duration::from_micros(50),
1004    /// };
1005    ///
1006    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1007    ///
1008    /// let mut tx = mmap.new_tx();
1009    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1010    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1011    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1012    ///
1013    /// let epoch = tx.commit().unwrap();
1014    /// mmap.wait_for_durability(epoch).unwrap();
1015    ///
1016    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1017    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1018    /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1019    ///
1020    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1021    /// ```
1022    #[inline(always)]
1023    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1024    where
1025        F: FnOnce(*mut T) + 'a,
1026    {
1027        // NOTE:
1028        //
1029        // This check prevents a potential footgun! For a safe transaction all writes must be,
1030        // - ordered by index (either incr or decr)
1031        // - no multi writes on same index
1032        //
1033        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1034        if let Some((last_idx, _)) = self.ops_vec.last() {
1035            if index <= *last_idx {
1036                return err::new_err(
1037                    err::HCF,
1038                    "tx writes must be strictly ordered, with no more then single ops on given index",
1039                );
1040            }
1041        }
1042
1043        self.ops_vec.push((index, Box::new(f)));
1044        Ok(())
1045    }
1046
1047    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1048    ///
1049    /// ## Guarantees
1050    ///
1051    /// - All writes are applied under a single epoch
1052    /// - All writes belong to the same durability batch
1053    /// - No interleaving with other transactions at epoch level
1054    ///
1055    /// ## Example
1056    ///
1057    /// ```
1058    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1059    ///
1060    /// const MODULE_ID: u8 = 0;
1061    ///
1062    /// let dir = tempfile::tempdir().unwrap();
1063    /// let path = dir.path().join("tmp_tx");
1064    ///
1065    /// let cfg = FrozenMMapCfg {
1066    ///     module_id: MODULE_ID,
1067    ///     initial_count: 0x10,
1068    ///     flush_duration: std::time::Duration::from_micros(50),
1069    /// };
1070    ///
1071    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1072    ///
1073    /// let mut tx = mmap.new_tx();
1074    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1075    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1076    ///
1077    /// let epoch = tx.commit().unwrap();
1078    /// mmap.wait_for_durability(epoch).unwrap();
1079    ///
1080    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1081    /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1082    ///
1083    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1084    /// ```
1085    #[inline(always)]
1086    pub fn commit(self) -> FrozenResult<u64> {
1087        if let Some(err) = self.core.get_sync_error() {
1088            return Err(err);
1089        }
1090
1091        let _guard = self.core.acquire_io_lock();
1092
1093        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1094        let mut guards = Vec::with_capacity(self.ops_vec.len());
1095        for (idx, _) in &self.ops_vec {
1096            guards.push(self.core.locks.lock(*idx));
1097        }
1098
1099        for (idx, op) in self.ops_vec {
1100            let offset = idx * std::mem::size_of::<T>();
1101            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1102            op(ptr);
1103        }
1104
1105        self.core.dirty.store(true, atomic::Ordering::Release);
1106        let epoch = self.core.incr_curr_epoch();
1107
1108        Ok(epoch)
1109    }
1110}
1111
/// Shared state of a map, used by the public handle, its transactions and the flusher thread
#[derive(Debug)]
struct Core {
    // the memory mapping itself (flushed, unmapped and dereferenced through this handle)
    map: TMap,
    // per-slot spin locks, one for each slot
    locks: Locks,
    // backing file of the mapping
    file: FrozenFile,
    // the flusher thread parks on this condvar (paired w/ `lock`)
    cv: sync::Condvar,
    // length (in bytes) handed to the flush and unmap calls of `map`
    curr_length: usize,
    // parking mutex for `cv`, protects no data
    lock: sync::Mutex<()>,
    // shared for transaction commits, exclusive for flusher sync and teardown
    io_lock: sync::RwLock<()>,
    // set after writes were applied, swapped back to `false` by the flusher before it syncs
    dirty: atomic::AtomicBool,
    // notified by the flusher after every sync attempt (paired w/ `durable_lock`)
    durable_cv: sync::Condvar,
    // set once the map is being dropped or deleted, tells the flusher to exit
    closed: atomic::AtomicBool,
    // parking mutex for `durable_cv`, protects no data
    durable_lock: sync::Mutex<()>,
    // upper bound of time the flusher parks between two dirty checks
    flush_duration: time::Duration,
    // incremented once per committed batch of writes
    current_epoch: atomic::AtomicU64,
    // epoch covered by the last successful sync
    durable_epoch: atomic::AtomicU64,
    // heap allocated error of the last failed sync, null when no error is recorded
    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
}
1130
// NOTE(review): `Send` and `Sync` are asserted by hand here. Presumably one of the fields (most
// likely `TMap`, which is not visible from here) holds a raw pointer and therefore opts out of
// the auto traits — confirm, and record the invariant that makes sharing `Core` sound.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
1133
1134impl Core {
1135    fn new(
1136        map: TMap,
1137        file: FrozenFile,
1138        flush_duration: time::Duration,
1139        curr_length: usize,
1140        total_slots: usize,
1141    ) -> Self {
1142        Self {
1143            map,
1144            file,
1145            curr_length,
1146            flush_duration,
1147            cv: sync::Condvar::new(),
1148            lock: sync::Mutex::new(()),
1149            io_lock: sync::RwLock::new(()),
1150            locks: Locks::new(total_slots),
1151            durable_cv: sync::Condvar::new(),
1152            durable_lock: sync::Mutex::new(()),
1153            dirty: atomic::AtomicBool::new(false),
1154            closed: atomic::AtomicBool::new(false),
1155            current_epoch: atomic::AtomicU64::new(0),
1156            durable_epoch: atomic::AtomicU64::new(0),
1157            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1158        }
1159    }
1160
1161    #[inline]
1162    fn sync(&self) -> FrozenResult<()> {
1163        unsafe { self.map.sync(self.curr_length) }?;
1164        self.file.sync()
1165    }
1166
1167    #[inline(always)]
1168    fn set_sync_error(&self, err: FrozenError) {
1169        let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1170        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1171
1172        // NOTE: we must free the old error, if any, to avoid mem leaks
1173        if !old.is_null() {
1174            unsafe { drop(Box::from_raw(old)) };
1175        }
1176    }
1177
1178    #[inline(always)]
1179    fn get_sync_error(&self) -> Option<FrozenError> {
1180        let ptr = self.error.load(atomic::Ordering::Acquire);
1181        if hints::likely(ptr.is_null()) {
1182            return None;
1183        }
1184
1185        let arc = unsafe { &*ptr }.clone();
1186        Some((*arc).clone())
1187    }
1188
1189    #[inline]
1190    fn clear_sync_error(&self) {
1191        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1192        if hints::unlikely(!old.is_null()) {
1193            unsafe {
1194                drop(Box::from_raw(old));
1195            }
1196        }
1197    }
1198
1199    /// ## Why we ignore [`std::sync::PoisonError`]?
1200    ///
1201    /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
1202    /// protect any mutable state. All the pool invariants and accounting are maintained via atomics
1203    /// and are completely seperated from the mutex.
1204    ///
1205    /// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
1206    /// an inconsistent state of the protected value. Since no state can be left partially modified
1207    /// under this lock, there is no possible consistency risk to recover from and propagating the
1208    /// poison error would only introduce unnecessary failures into the allocation path.
1209    ///
1210    /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
1211    /// with the recovered guard.
1212    #[inline]
1213    fn acquire_durable_lock(&self) -> sync::MutexGuard<'_, ()> {
1214        self.durable_lock.lock().unwrap_or_else(|e| e.into_inner())
1215    }
1216
1217    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1218    #[inline]
1219    fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1220        self.lock.lock().unwrap_or_else(|e| e.into_inner())
1221    }
1222
1223    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1224    #[inline]
1225    fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1226        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1227    }
1228
1229    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1230    #[inline]
1231    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1232        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1233    }
1234
1235    #[inline]
1236    fn incr_curr_epoch(&self) -> u64 {
1237        self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1238    }
1239
1240    #[inline]
1241    fn mark_epoch_durable(&self) {
1242        let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1243        self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1244    }
1245
1246    fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1247        match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core)) {
1248            Ok(tx) => Ok(tx),
1249            Err(error) => err::new_err(err::FXE, error),
1250        }
1251    }
1252
1253    fn flush_tx(core: sync::Arc<Self>) {
1254        // init phase (acquiring locks)
1255        let mut guard = match core.lock.lock() {
1256            Ok(g) => g,
1257            Err(error) => {
1258                core.set_sync_error(err::new_err_raw(err::FXE, error));
1259                core.cv.notify_all();
1260                return;
1261            }
1262        };
1263
1264        // sync loop w/ non-busy waiting
1265        loop {
1266            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1267                Ok((g, _)) => g,
1268                Err(e) => {
1269                    core.set_sync_error(err::new_err_raw(err::TXE, e));
1270                    core.cv.notify_all();
1271                    return;
1272                }
1273            };
1274
1275            // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
1276            // otherwise, we impose serious deadlock sort of situation for the the flusher tx
1277
1278            let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1279            let closing = core.closed.load(atomic::Ordering::Acquire);
1280
1281            if !dirty {
1282                if closing {
1283                    core.cv.notify_all();
1284                    return;
1285                }
1286
1287                continue;
1288            }
1289
1290            // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1291            // grow could kick in while sync is in progress
1292
1293            let io_lock = core.acquire_exclusive_io_lock();
1294
1295            // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1296            // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1297            // and acqurie it again, in-between other process could acquire it and use it
1298            drop(guard);
1299
1300            // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
1301            // all writes up to `target_epoch` are guaranteed to be part of this flush window,
1302            // while anything beyound belongs to the next batch
1303            let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1304
1305            // NOTE:
1306            //
1307            // - if sync fails, we update the Core::error w/ the received error object
1308            // - we clear it up when another sync call succeeds
1309            // - this is valid, as the underlying sync flushes entire mmaped region, hence
1310            //   even if the last call failed, and the new one succeeded, we do get the durability
1311            //   guarenty for the old data as well
1312
1313            match core.sync() {
1314                Ok(_) => {
1315                    core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1316
1317                    let _g = core.acquire_durable_lock();
1318
1319                    core.clear_sync_error();
1320                    core.durable_cv.notify_all();
1321                }
1322                Err(err) => {
1323                    core.set_sync_error(err);
1324                    core.durable_cv.notify_all();
1325                }
1326            }
1327
1328            drop(io_lock);
1329            guard = core.acquire_lock();
1330        }
1331    }
1332}
1333
/// Table of per-slot spin locks, every slot owns a single [`atomic::AtomicU8`] flag which is
/// either `Locks::LOCK` or `Locks::UNLOCK`
#[derive(Debug)]
struct Locks(Box<[atomic::AtomicU8]>);
1336
1337impl Locks {
1338    const LOCK: u8 = 1;
1339    const UNLOCK: u8 = 0;
1340
1341    const L1_CONTENTION: usize = 0x10;
1342    const L2_CONTENTION: usize = 0x20;
1343
1344    fn new(cap: usize) -> Self {
1345        let mut slots = Vec::with_capacity(cap);
1346        for _ in 0..cap {
1347            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1348        }
1349
1350        Self(slots.into_boxed_slice())
1351    }
1352
1353    #[inline(always)]
1354    fn lock(&self, index: usize) -> LockGuard<'_> {
1355        let val = &self.0[index];
1356        let mut spins = 0;
1357
1358        loop {
1359            if val
1360                .compare_exchange_weak(Self::UNLOCK, Self::LOCK, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
1361                .is_ok()
1362            {
1363                return LockGuard(val);
1364            }
1365
1366            // we must backoff under contention
1367
1368            // NOTE:
1369            //
1370            // We have three levels of backoff,
1371            //
1372            // 1. Busy waiting w/ CPU hint
1373            // 2. Willingly allow preemption by OS schedular
1374            // 3. Sleep the thread (increasing w/ exponenial factor)
1375
1376            if hints::likely(spins < Self::L1_CONTENTION) {
1377                std::hint::spin_loop();
1378            } else if spins < Self::L2_CONTENTION {
1379                thread::yield_now();
1380            } else {
1381                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1382                thread::sleep(time::Duration::from_nanos(ns));
1383            }
1384
1385            spins += 1;
1386        }
1387    }
1388}
1389
/// Guard of a held slot lock, wraps the flag of the slot and frees the slot when dropped
struct LockGuard<'a>(&'a atomic::AtomicU8);
1391
1392impl Drop for LockGuard<'_> {
1393    fn drop(&mut self) {
1394        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1395    }
1396}
1397
1398#[cfg(test)]
1399mod tests {
1400    use super::*;
1401
    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);

    // module id used for every `FrozenMMapCfg` built by `new_tmp`
    const MODULE_ID: u8 = 0;
    // slot count requested (via `initial_count`) for a freshly created map
    const INIT_SLOTS: usize = 0x0A;
1407
1408    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1409        let dir = tempfile::tempdir().unwrap();
1410        let path = dir.path().join("tmp_map");
1411
1412        let cfg = FrozenMMapCfg { module_id: MODULE_ID, initial_count: INIT_SLOTS, flush_duration: FLUSH_DURATION };
1413
1414        (dir, path, cfg)
1415    }
1416
1417    mod fm_lifecycle {
1418        use super::*;
1419
1420        #[test]
1421        fn ok_new() {
1422            let (_dir, path, cfg) = new_tmp();
1423            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1424
1425            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
1426            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1427            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1428            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
1429            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1430
1431            // satisfies the bg thread was spawned correctly
1432            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
1433
1434            // satisfies wait on epoch works
1435            let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1436            assert!(mmap.wait_for_durability(epoch).is_ok());
1437        }
1438
1439        #[test]
1440        fn ok_new_existing() {
1441            let (_dir, path, cfg) = new_tmp();
1442
1443            // create new + close
1444            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1445            drop(mmap1);
1446
1447            // open existing
1448            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1449            drop(mmap2);
1450        }
1451
1452        #[test]
1453        fn err_new_when_change_in_cfg() {
1454            let (_dir, path, mut cfg) = new_tmp();
1455
1456            // create new + close
1457            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1458            drop(mmap1);
1459
1460            // update cfg + opne existing
1461            cfg.initial_count = INIT_SLOTS * 2;
1462            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1463        }
1464
1465        #[test]
1466        fn ok_delete() {
1467            let (_dir, path, cfg) = new_tmp();
1468            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1469
1470            mmap.delete().unwrap();
1471            assert!(!mmap.core.file.exists().unwrap());
1472        }
1473
1474        #[test]
1475        fn err_delete_after_delete() {
1476            let (_dir, path, cfg) = new_tmp();
1477            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1478
1479            mmap.delete().unwrap();
1480            assert!(!mmap.core.file.exists().unwrap());
1481            assert!(mmap.delete().is_err());
1482        }
1483
1484        #[test]
1485        fn ok_drop_persists_when_dropped_before_bg_flush() {
1486            let (_dir, path, cfg) = new_tmp();
1487            const VAL: u64 = 0x0A;
1488
1489            {
1490                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1491                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1492                drop(mmap);
1493            }
1494
1495            {
1496                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1497                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1498                assert_eq!(val, VAL);
1499            }
1500        }
1501    }
1502
1503    mod fm_validate_t {
1504        use super::*;
1505
1506        #[repr(C, align(8))]
1507        struct OkT {
1508            a: u64,
1509            b: u64,
1510        }
1511
1512        #[repr(C)]
1513        struct BadAlignT {
1514            a: u32,
1515            b: u32,
1516        }
1517
1518        #[repr(C, align(4))]
1519        struct BadSizeT {
1520            a: u32,
1521            b: u16,
1522        }
1523
1524        #[repr(C, align(8))]
1525        struct DropT(u64);
1526
1527        impl Drop for DropT {
1528            fn drop(&mut self) {}
1529        }
1530
1531        #[repr(C, align(8))]
1532        struct ZstT;
1533
1534        #[test]
1535        fn ok_validate_t() {
1536            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1537        }
1538
1539        #[test]
1540        fn err_validate_t_when_drop() {
1541            assert!(FrozenMMap::<DropT>::validate_t().is_err());
1542        }
1543
1544        #[test]
1545        fn err_validate_t_when_not_8_byte_aligned() {
1546            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1547        }
1548
1549        #[test]
1550        fn err_validate_t_when_size_not_multiple_of_8() {
1551            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1552        }
1553
1554        #[test]
1555        fn err_validate_t_when_zero_sized() {
1556            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1557        }
1558
1559        #[test]
1560        fn err_new_when_t_implements_drop() {
1561            let (_dir, path, cfg) = new_tmp();
1562            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1563        }
1564
1565        #[test]
1566        fn err_new_when_t_is_not_8_byte_aligned() {
1567            let (_dir, path, cfg) = new_tmp();
1568            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1569        }
1570
1571        #[test]
1572        fn err_new_when_t_size_is_not_multiple_of_8() {
1573            let (_dir, path, cfg) = new_tmp();
1574            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1575        }
1576
1577        #[test]
1578        fn err_new_when_t_is_zero_sized() {
1579            let (_dir, path, cfg) = new_tmp();
1580            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1581        }
1582
1583        #[test]
1584        fn err_new_grown_when_t_implements_drop() {
1585            let (_dir, path, cfg) = new_tmp();
1586            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1587        }
1588
1589        #[test]
1590        fn err_new_grown_when_t_is_not_8_byte_aligned() {
1591            let (_dir, path, cfg) = new_tmp();
1592            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1593        }
1594
1595        #[test]
1596        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1597            let (_dir, path, cfg) = new_tmp();
1598            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1599        }
1600
1601        #[test]
1602        fn err_new_grown_when_t_is_zero_sized() {
1603            let (_dir, path, cfg) = new_tmp();
1604            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1605        }
1606    }
1607
1608    mod fm_new_grown {
1609        use super::*;
1610
1611        #[test]
1612        fn ok_new_grown_updates_length() {
1613            let (_dir, path, cfg) = new_tmp();
1614
1615            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1616            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1617            drop(mmap);
1618
1619            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1620            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1621            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1622        }
1623
1624        #[test]
1625        fn err_new_grown_with_preexisting_instance() {
1626            let (_dir, path, cfg) = new_tmp();
1627
1628            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1629            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1630
1631            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1632        }
1633
1634        #[test]
1635        fn ok_new_grown_cycle() {
1636            let (_dir, path, cfg) = new_tmp();
1637
1638            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1639            drop(mmap);
1640
1641            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1642            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1643            drop(mmap);
1644
1645            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1646            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1647            drop(mmap);
1648
1649            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1650            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1651        }
1652
1653        #[test]
1654        fn ok_write_reopen_grown_read() {
1655            let (_dir, path, cfg) = new_tmp();
1656
1657            {
1658                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1659                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1660            }
1661
1662            {
1663                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1664                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1665
1666                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1667                assert_eq!(val, 0xBB);
1668            }
1669        }
1670
1671        #[test]
1672        fn ok_write_reopen_grown_read_cycle() {
1673            let (_dir, path, cfg) = new_tmp();
1674
1675            {
1676                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1677                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1678            }
1679
1680            for i in 0..2 {
1681                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1682                let idx = mmap.total_slots() - 1;
1683                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1684                drop(mmap);
1685            }
1686
1687            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1688
1689            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1690            assert_eq!(base, 1);
1691
1692            let last_idx = mmap.total_slots() - 1;
1693            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1694            assert_eq!(last, 3);
1695        }
1696
1697        #[test]
1698        fn err_new_grown_while_previous_instance_is_alive() {
1699            let (_dir, path, cfg) = new_tmp();
1700
1701            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1702            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1703
1704            assert!(reopened.is_err());
1705        }
1706    }
1707
1708    mod fm_write_read {
1709        use super::*;
1710
1711        #[test]
1712        fn ok_write_wait_read_cycle() {
1713            const VAL: u64 = 0xDEADC0DE;
1714            let (_dir, path, cfg) = new_tmp();
1715            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1716
1717            // write + sync
1718            let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1719            mmap.wait_for_durability(epoch).unwrap();
1720
1721            // read + verify
1722            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1723            assert_eq!(val, VAL);
1724        }
1725
1726        #[test]
1727        fn ok_write_read_without_wait() {
1728            const VAL: u64 = 0xDEADC0DE;
1729            let (_dir, path, cfg) = new_tmp();
1730            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1731
1732            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1733            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1734            assert_eq!(val, VAL);
1735        }
1736    }
1737
1738    mod fm_write_sync_read {
1739        use super::*;
1740
1741        #[test]
1742        fn ok_write_sync_read() {
1743            let (_dir, path, cfg) = new_tmp();
1744            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1745
1746            unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };
1747
1748            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1749            assert_eq!(val, 0x4C);
1750        }
1751
1752        #[test]
1753        fn ok_write_sync_wait_returns_immediately() {
1754            let (_dir, path, cfg) = new_tmp();
1755            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1756
1757            let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };
1758
1759            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1760            assert_eq!(val, 0x6A);
1761        }
1762
1763        #[test]
1764        fn ok_write_sync_followed_by_async_write() {
1765            let (_dir, path, cfg) = new_tmp();
1766
1767            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1768            unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };
1769
1770            let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
1771            mmap.wait_for_durability(epoch).unwrap();
1772
1773            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1774            assert_eq!(val, 0x14);
1775        }
1776
1777        #[test]
1778        fn ok_write_sync_makes_prev_batch_durable() {
1779            let (_dir, path, cfg) = new_tmp();
1780            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1781
1782            let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1783            unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
1784            mmap.wait_for_durability(async_epoch).unwrap();
1785
1786            let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
1787            let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };
1788
1789            assert_eq!(v1, 1);
1790            assert_eq!(v2, 2);
1791        }
1792
1793        #[test]
1794        fn ok_write_sync_persists_across_reopen() {
1795            let (_dir, path, cfg) = new_tmp();
1796            const VAL: u64 = 0x1000;
1797
1798            {
1799                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1800                unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
1801            }
1802
1803            {
1804                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1805                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1806                assert_eq!(val, VAL);
1807            }
1808        }
1809    }
1810
1811    mod fm_tx {
1812        use super::*;
1813
1814        #[test]
1815        fn ok_tx_basic_multi_write() {
1816            let (_dir, path, cfg) = new_tmp();
1817            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1818
1819            let mut tx = mmap.new_tx();
1820            unsafe {
1821                tx.write(0, |v| *v = 1).unwrap();
1822                tx.write(1, |v| *v = 2).unwrap();
1823                tx.write(2, |v| *v = 3).unwrap();
1824            }
1825
1826            let epoch = tx.commit().unwrap();
1827            mmap.wait_for_durability(epoch).unwrap();
1828
1829            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1830            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1831            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1832
1833            assert_eq!((v0, v1, v2), (1, 2, 3));
1834        }
1835
1836        #[test]
1837        fn ok_tx_single_epoch() {
1838            let (_dir, path, cfg) = new_tmp();
1839            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1840
1841            let mut tx = mmap.new_tx();
1842            unsafe {
1843                tx.write(0, |v| *v = 0x0A).unwrap();
1844                tx.write(1, |v| *v = 0x14).unwrap();
1845            }
1846
1847            // NOTE: next write must have strictly higher epoch then current one
1848
1849            let epoch = tx.commit().unwrap();
1850            let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1851
1852            assert!(next_epoch > epoch);
1853        }
1854
1855        #[test]
1856        fn err_tx_out_of_order() {
1857            let (_dir, path, cfg) = new_tmp();
1858            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1859
1860            let mut tx = mmap.new_tx();
1861            unsafe {
1862                tx.write(2, |v| *v = 1).unwrap();
1863                let res = tx.write(1, |v| *v = 2);
1864                assert!(res.is_err());
1865            }
1866        }
1867
1868        #[test]
1869        fn err_tx_duplicate_index() {
1870            let (_dir, path, cfg) = new_tmp();
1871            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1872
1873            let mut tx = mmap.new_tx();
1874            unsafe {
1875                tx.write(1, |v| *v = 1).unwrap();
1876                let res = tx.write(1, |v| *v = 2);
1877                assert!(res.is_err());
1878            }
1879        }
1880
1881        #[test]
1882        fn ok_tx_concurrent_non_overlapping() {
1883            let (_dir, path, cfg) = new_tmp();
1884            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1885
1886            let mut handles = Vec::new();
1887            for i in 0..2 {
1888                let mmap = mmap.clone();
1889                handles.push(thread::spawn(move || {
1890                    let mut tx = mmap.new_tx();
1891
1892                    unsafe {
1893                        tx.write(i * 2, |v| *v = i as u64).unwrap();
1894                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1895                    }
1896
1897                    let epoch = tx.commit().unwrap();
1898                    mmap.wait_for_durability(epoch).unwrap();
1899                }));
1900            }
1901
1902            for h in handles {
1903                h.join().unwrap();
1904            }
1905
1906            for i in 0..2 {
1907                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1908                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1909
1910                assert_eq!((v0, v1), (i as u64, i as u64));
1911            }
1912        }
1913
1914        #[test]
1915        fn ok_tx_overwrite_last_wins() {
1916            let (_dir, path, cfg) = new_tmp();
1917            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1918
1919            let mut tx = mmap.new_tx();
1920            unsafe {
1921                tx.write(0, |v| *v = 1).unwrap();
1922            }
1923
1924            tx.commit().unwrap();
1925
1926            let mut tx2 = mmap.new_tx();
1927            unsafe {
1928                tx2.write(0, |v| *v = 2).unwrap();
1929            }
1930
1931            let epoch = tx2.commit().unwrap();
1932            mmap.wait_for_durability(epoch).unwrap();
1933
1934            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1935            assert_eq!(val, 2);
1936        }
1937
1938        #[test]
1939        fn ok_tx_persists_across_reopen() {
1940            let (_dir, path, cfg) = new_tmp();
1941
1942            {
1943                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1944
1945                let mut tx = mmap.new_tx();
1946                unsafe {
1947                    tx.write(0, |v| *v = 0x3A).unwrap();
1948                    tx.write(1, |v| *v = 0x54).unwrap();
1949                }
1950
1951                let epoch = tx.commit().unwrap();
1952                mmap.wait_for_durability(epoch).unwrap();
1953            }
1954
1955            {
1956                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1957
1958                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1959                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1960
1961                assert_eq!((v0, v1), (0x3A, 0x54));
1962            }
1963        }
1964    }
1965
1966    mod fm_durability {
1967        use super::*;
1968
1969        #[test]
1970        fn ok_wait_then_drop() {
1971            let (_dir, path, cfg) = new_tmp();
1972            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1973
1974            let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1975            mmap.wait_for_durability(epoch).unwrap();
1976
1977            drop(mmap);
1978        }
1979
1980        #[test]
1981        fn ok_epoch_monotonicity() {
1982            let (_dir, path, cfg) = new_tmp();
1983            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1984
1985            let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1986            mmap.wait_for_durability(e1).unwrap();
1987
1988            let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1989            mmap.wait_for_durability(e2).unwrap();
1990            assert!(e2 >= e1);
1991        }
1992
1993        #[test]
1994        fn ok_wait_for_durability_with_multi_writers() {
1995            let (_dir, path, cfg) = new_tmp();
1996            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1997
1998            let mut handles = Vec::new();
1999            for _ in 0..2 {
2000                let mmap = mmap.clone();
2001                handles.push(thread::spawn(move || {
2002                    let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
2003                    mmap.wait_for_durability(epoch).unwrap();
2004                }));
2005            }
2006
2007            for h in handles {
2008                h.join().unwrap();
2009            }
2010
2011            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
2012            assert_eq!(val, 2);
2013        }
2014    }
2015
2016    mod fm_concurrency {
2017        use super::*;
2018
2019        #[test]
2020        fn ok_parallel_reads_with_diff_index() {
2021            let (_dir, path, cfg) = new_tmp();
2022            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
2023
2024            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
2025            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
2026
2027            let t1 = {
2028                let mmap = mmap.clone();
2029                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
2030            };
2031
2032            let t2 = {
2033                let mmap = mmap.clone();
2034                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
2035            };
2036
2037            assert_eq!(t1.join().unwrap(), 0x10);
2038            assert_eq!(t2.join().unwrap(), 0x20);
2039        }
2040
2041        #[test]
2042        fn ok_multi_tx_drop_then_reopen_grown() {
2043            let (_dir, path, cfg) = new_tmp();
2044
2045            {
2046                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
2047
2048                let mut handles = Vec::new();
2049                for i in 0..2u64 {
2050                    let mmap = mmap.clone();
2051                    handles.push(thread::spawn(move || {
2052                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
2053                    }));
2054                }
2055
2056                for i in 0..2usize {
2057                    let mmap = mmap.clone();
2058                    handles.push(thread::spawn(move || {
2059                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
2060                    }));
2061                }
2062
2063                for h in handles {
2064                    h.join().unwrap();
2065                }
2066            }
2067
2068            {
2069                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
2070                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
2071
2072                for i in 0..2u64 {
2073                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
2074                    assert_eq!(val, i + 1);
2075                }
2076
2077                let idx = mmap.total_slots() - 1;
2078                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
2079
2080                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
2081                assert_eq!(val, 0xDEAD);
2082            }
2083        }
2084    }
2085}