Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//!     module_id: MODULE_ID,
35//!     initial_count: 0x0A,
36//!     flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! mmap.wait_for_durability(epoch).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61    error::{FrozenError, FrozenResult},
62    ffile::{FrozenFile, FrozenFileCfg},
63    hints,
64};
65use std::{
66    fmt,
67    sync::{self, atomic},
68    thread, time,
69};
70
71/// type for `epoch` used by write ops
72pub type TEpoch = u64;
73
74#[cfg(any(target_os = "linux", target_os = "macos"))]
75type TMap = posix::POSIXMMap;
76
77/// Error codes for [`FrozenMMap`]
78pub(in crate::fmmap) mod err {
79    use crate::error::{ErrCode, FrozenError, FrozenResult};
80
81    /// Domain Id for [`FrozenMMap`] is **18**
82    const ERRDOMAIN: u8 = 0x12;
83
84    /// module id used for [`FrozenMMap`]
85    pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
86
87    #[cfg(not(test))]
88    #[inline(always)]
89    pub fn mid() -> &'static u8 {
90        MID.get().unwrap()
91    }
92
93    #[cfg(test)]
94    #[inline(always)]
95    pub fn mid() -> &'static u8 {
96        MID.get_or_init(|| 0)
97    }
98
99    /// internal fuck up (hault and catch fire)
100    pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
101
102    /// unknown error (fallback)
103    pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
104
105    /// no more memory available
106    pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
107
108    /// syncing error
109    pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
110
111    /// no write/read perm
112    pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
113
114    /// flush_tx error (panic inside)
115    pub const TXE: ErrCode = ErrCode::new(0x0C, "flush_tx paniced inside");
116
117    /// flush_tx error (unable to spawn)
118    pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
119
120    /// type `T` implements drop
121    pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
122
123    /// type `T` is not 8 bytes aligned
124    pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
125
126    /// `size_of::<T>()` is not multiple of 8
127    pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
128
129    /// type `T` must not be zero sized
130    pub const ZRO: ErrCode = ErrCode::new(0x18, "type T must not be zero sized");
131
132    #[inline]
133    pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
134        code: ErrCode,
135        error: E,
136    ) -> FrozenResult<R> {
137        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
138        Err(err)
139    }
140
141    #[inline]
142    pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
143        let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
144        Err(err)
145    }
146
147    #[inline]
148    pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(
149        code: ErrCode,
150        error: E,
151    ) -> FrozenError {
152        FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
153    }
154}
155
156/// Config for [`FrozenMMap`]
157#[derive(Debug, Clone)]
158pub struct FrozenMMapCfg {
159    /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
160    pub module_id: u8,
161
162    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
163    ///
164    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], where initial file length will
165    /// be `chunk_size * initial_count` (bytes)
166    pub initial_count: usize,
167
168    /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
169    /// together, where all write ops in certain time interval falls into a single durable window
170    pub flush_duration: time::Duration,
171}
172
173/// Custom implementation of `mmap(2)`
174///
175/// ## Constraints
176///
177/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
178/// must be a POD type which is safe to persist and later reinterpret from bytes.
179///
180/// Required properties for `T`,
181///
182/// - Must use `#[repr(C)]`
183/// - Should be 8-bytes aligned
184/// - Must not implement [`Drop`]
185/// - `size_of::<T>()` should be multiple of `8`
186///
187/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
188/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
189/// across reopen.
190///
191/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
192/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
193/// layout and must remain valid when the file is reopened in a later process.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
199///
200/// const MODULE_ID: u8 = 0;
201///
202/// let dir = tempfile::tempdir().unwrap();
203/// let path = dir.path().join("tmp_frozen_mmap");
204///
205/// let cfg = FrozenMMapCfg {
206///     module_id: MODULE_ID,
207///     initial_count: 0x0A,
208///     flush_duration: std::time::Duration::from_micros(0x96),
209/// };
210///
211/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
212/// assert_eq!(mmap.total_slots(), 0x0A);
213///
214/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
215/// mmap.wait_for_durability(epoch).unwrap();
216///
217/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
218/// assert_eq!(val, 0xDEADC0DE);
219///
220/// drop(mmap);
221///
222/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
223/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
224///
225/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
226/// assert_eq!(val, 0xDEADC0DE);
227/// ```
228#[derive(Debug)]
229pub struct FrozenMMap<T>
230where
231    T: Sized + Send + Sync,
232{
233    core: sync::Arc<Core>,
234    tx: Option<thread::JoinHandle<()>>,
235    _t: core::marker::PhantomData<T>,
236}
237
238unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
239unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
240
241impl<T> FrozenMMap<T>
242where
243    T: Sized + Send + Sync,
244{
245    /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
246    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
247
248    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
249    ///
250    /// ## Multiple Instances
251    ///
252    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
253    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
254    /// error will be thrown.
255    ///
256    /// ## Capacity Growth
257    ///
258    /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
259    /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
260    /// mapping over grown capacity.
261    ///
262    /// ## [`FrozenMMapCfg`]
263    ///
264    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
265    ///
266    /// ## Working
267    ///
268    /// We first create a new [`FrozenFile`] if note already, then map the entire file using
269    /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
270    /// entire lifetime of file.
271    ///
272    /// ## Important
273    ///
274    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
275    /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
276    /// to store config.
277    ///
278    /// ## Example
279    ///
280    /// ```
281    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
282    ///
283    /// const MODULE_ID: u8 = 0;
284    ///
285    /// let dir = tempfile::tempdir().unwrap();
286    /// let path = dir.path().join("tmp_frozen_mmap");
287    ///
288    /// let cfg = FrozenMMapCfg {
289    ///     module_id: MODULE_ID,
290    ///     initial_count: 0x0A,
291    ///     flush_duration: std::time::Duration::from_micros(0x96),
292    /// };
293    ///
294    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
295    /// assert_eq!(mmap.total_slots(), 0x0A);
296    ///
297    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
298    /// mmap.wait_for_durability(epoch).unwrap();
299    ///
300    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
301    /// assert_eq!(val, 0xDEADC0DE);
302    /// ```
303    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
304        Self::validate_t()?;
305        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
306        let total_slots = curr_length / Self::SLOT_SIZE;
307
308        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
309        // guarantees that the first caller sets the value and all subsequent calls reuse it
310        let _ = err::MID.get_or_init(|| cfg.module_id);
311
312        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
313        let core =
314            sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
315
316        // INFO: we spawn the thread for background sync
317        let tx = Core::spawn_tx(core.clone())?;
318
319        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
320    }
321
322    /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
323    /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
324    ///
325    /// ## Multiple Instances
326    ///
327    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
328    /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
329    /// error will be thrown.
330    ///
331    /// ## Why not create a [`FrozenMMap::grow`] call?
332    ///
333    /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
334    /// memory mapping in place is tricky in concurrent code, as some threads would still hold
335    /// stale/unmapped pointers to mmap due to preemption from the OS schedular
336    ///
337    /// So, instead of remmaping a live instance, the current API performs growth during open,
338    /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
339    /// instance.
340    ///
341    /// ## [`FrozenMMapCfg`]
342    ///
343    /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
344    ///
345    /// ## Working
346    ///
347    /// We first create a new [`FrozenFile`] if note already, then we grow the file using
348    /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
349    /// read/write `T`, which also should stay constant for the entire lifetime of file.
350    ///
351    /// ## Important
352    ///
353    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
354    /// which is used under the hood, one must use config stores like
355    /// [`Rta`](https://crates.io/crates/rta) to store config.
356    ///
357    /// ## Example
358    ///
359    /// ```
360    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
361    ///
362    /// const MODULE_ID: u8 = 0;
363    ///
364    /// let dir = tempfile::tempdir().unwrap();
365    /// let path = dir.path().join("tmp_frozen_mmap");
366    ///
367    /// let cfg = FrozenMMapCfg {
368    ///     module_id: MODULE_ID,
369    ///     initial_count: 0x0A,
370    ///     flush_duration: std::time::Duration::from_micros(0x96),
371    /// };
372    ///
373    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
374    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
375    ///
376    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
377    /// mmap.wait_for_durability(epoch).unwrap();
378    ///
379    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
380    /// assert_eq!(val, 0xDEADC0DE);
381    /// ```
382    pub fn new_grown<P: AsRef<std::path::Path>>(
383        path: P,
384        cfg: FrozenMMapCfg,
385        additional_slots: usize,
386    ) -> FrozenResult<Self> {
387        Self::validate_t()?;
388        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
389
390        // we grow the underlying FrozenFile as requested
391        file.grow(additional_slots)?;
392        let curr_length = file.length()?; // we must read the updated (grown) length
393        let total_slots = curr_length / Self::SLOT_SIZE;
394
395        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
396        // guarantees that the first caller sets the value and all subsequent calls reuse it
397        let _ = err::MID.get_or_init(|| cfg.module_id);
398
399        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
400        let core =
401            sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
402
403        // INFO: we spawn the thread for background sync
404        let tx = Core::spawn_tx(core.clone())?;
405
406        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
407    }
408
409    /// Create/open a new [`FrozenFile`] instance
410    fn open_file(
411        path: std::path::PathBuf,
412        cfg: &FrozenMMapCfg,
413    ) -> FrozenResult<(FrozenFile, usize)> {
414        let ff_cfg = FrozenFileCfg {
415            path,
416            module_id: cfg.module_id,
417            buffer_size: Self::SLOT_SIZE,
418            initial_available_buffers: cfg.initial_count,
419        };
420
421        let file = FrozenFile::new(ff_cfg)?;
422        let curr_length = file.length()?;
423
424        Ok((file, curr_length))
425    }
426
427    #[inline]
428    fn validate_t() -> FrozenResult<()> {
429        if std::mem::needs_drop::<T>() {
430            return err::new_err_default(err::DRP);
431        }
432
433        let align = std::mem::align_of::<T>();
434        if align != 8 {
435            return err::new_err_default(err::ALN);
436        }
437
438        let size = std::mem::size_of::<T>();
439        if size == 0 {
440            return err::new_err_default(err::ZRO);
441        }
442
443        if size % 8 != 0 {
444            return err::new_err_default(err::SZE);
445        }
446
447        Ok(())
448    }
449
450    /// Blocks until given `epoch` becomes durable
451    ///
452    /// ## Batching
453    ///
454    /// With respect to `flush_duration`, all write ops are batched before sync, which is executed
455    /// by flusher tx working in background, while each write is assigned w/ current durable epoch,
456    /// and all writes which observe the exact same epoch, belong to the same durability window, and
457    /// are all sync'ed together
458    ///
459    /// When a background sync succeeds, the internal durable epoch is incremented, indicating that
460    /// all writes that observed the previous epoch are now durable on disk
461    ///
462    /// ## Example
463    ///
464    /// ```
465    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
466    ///
467    /// const MODULE_ID: u8 = 0;
468    ///
469    /// let dir = tempfile::tempdir().unwrap();
470    /// let path = dir.path().join("tmp_wait_epoch");
471    ///
472    /// let cfg = FrozenMMapCfg {
473    ///     module_id: MODULE_ID,
474    ///     initial_count: 0x04,
475    ///     flush_duration: std::time::Duration::from_micros(0x60),
476    /// };
477    ///
478    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
479    ///
480    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
481    /// mmap.wait_for_durability(epoch).unwrap();
482    ///
483    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
484    /// assert_eq!(val, 0x8A);
485    /// ```
486    pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
487        if let Some(sync_err) = self.core.get_sync_error() {
488            return Err(sync_err);
489        }
490
491        let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
492        if durable_epoch >= epoch {
493            return Ok(());
494        }
495
496        let mut guard = self.core.acquire_durable_lock();
497        loop {
498            if let Some(sync_err) = self.core.get_sync_error() {
499                return Err(sync_err);
500            }
501
502            if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
503                return Ok(());
504            }
505
506            // NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
507            guard = self.core.durable_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
508        }
509    }
510
511    /// Read a `T` at given `index` via callback (`f`)
512    ///
513    /// ## Concurrency
514    ///
515    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
516    /// at same index is atomic and thread safe, while operations on different indices may proceed
517    /// fully in parallel
518    ///
519    /// ## Safety
520    ///
521    /// The caller must ensure following:
522    ///
523    /// - given `index` is within bounds
524    /// - underlying memory contains a valid instance of `T`
525    /// - provided callback `f` must not,
526    ///   - write through the pointer
527    ///   - store or leak pointer beyound there lifetime
528    ///
529    /// Violating any of the above may result in undefined behavior
530    ///
531    /// ## Example
532    ///
533    /// ```
534    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
535    ///
536    /// const MODULE_ID: u8 = 0;
537    ///
538    /// let dir = tempfile::tempdir().unwrap();
539    /// let path = dir.path().join("tmp_read_mmap");
540    ///
541    /// let cfg = FrozenMMapCfg {
542    ///     module_id: MODULE_ID,
543    ///     initial_count: 0x02,
544    ///     flush_duration: std::time::Duration::from_micros(0x60),
545    /// };
546    ///
547    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
548    ///
549    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
550    /// mmap.wait_for_durability(epoch).unwrap();
551    ///
552    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
553    /// assert_eq!(val, 0x0A);
554    /// ```
555    #[inline(always)]
556    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
557        let offset = Self::SLOT_SIZE * index;
558        let _lock = self.core.locks.lock(index);
559
560        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
561        // assumption of OS guarantees visibility)
562
563        let ptr = unsafe { self.core.map.as_ptr(offset) };
564        Ok(f(ptr))
565    }
566
567    /// Write/update a `T` at given `index` via callback (`f`)
568    ///
569    /// ## Concurrency
570    ///
571    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
572    /// at same index is atomic and thread safe, while operations on different indices may proceed
573    /// fully in parallel
574    ///
575    /// ## Safety
576    ///
577    /// The caller must ensure following:
578    ///
579    /// - given `index` is within bounds
580    /// - underlying memory contains a valid instance of `T`
581    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
582    ///
583    /// Violating any of the above may result in undefined behavior
584    ///
585    /// ## Example
586    ///
587    /// ```
588    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
589    ///
590    /// const MODULE_ID: u8 = 0;
591    ///
592    /// let dir = tempfile::tempdir().unwrap();
593    /// let path = dir.path().join("tmp_write_mmap");
594    ///
595    /// let cfg = FrozenMMapCfg {
596    ///     module_id: MODULE_ID,
597    ///     initial_count: 0x02,
598    ///     flush_duration: std::time::Duration::from_micros(0x96),
599    /// };
600    ///
601    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
602    ///
603    /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
604    /// mmap.wait_for_durability(epoch).unwrap();
605    ///
606    /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
607    /// assert_eq!(val, 0x2B);
608    /// ```
609    #[inline(always)]
610    pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
611        // propagate prev errors
612        if let Some(err) = self.core.get_sync_error() {
613            return Err(err);
614        }
615
616        let offset = Self::SLOT_SIZE * index;
617
618        let _guard = self.core.acquire_io_lock();
619        let _lock = self.core.locks.lock(index);
620
621        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
622        f(ptr);
623
624        self.core.dirty.store(true, atomic::Ordering::Release);
625        let epoch = self.core.incr_curr_epoch();
626
627        Ok(epoch)
628    }
629
630    /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
631    ///
632    /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is
633    /// immediately persisted to the underlying storage device
634    ///
635    /// ## Concurrency
636    ///
637    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
638    /// at same index is atomic and thread safe, while operations on different indices may proceed
639    /// fully in parallel.
640    ///
641    /// ## Safety
642    ///
643    /// The caller must ensure following:
644    ///
645    /// - given `index` is within bounds
646    /// - underlying memory contains a valid instance of `T`
647    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
648    ///
649    /// Violating any of the above may result in undefined behavior
650    ///
651    /// ## Example
652    ///
653    /// ```
654    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
655    ///
656    /// const MODULE_ID: u8 = 0;
657    ///
658    /// let dir = tempfile::tempdir().unwrap();
659    /// let path = dir.path().join("tmp_write_sync");
660    ///
661    /// let cfg = FrozenMMapCfg {
662    ///     module_id: MODULE_ID,
663    ///     initial_count: 0x02,
664    ///     flush_duration: std::time::Duration::from_micros(0x96),
665    /// };
666    ///
667    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
668    /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
669    ///
670    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
671    /// assert_eq!(val, 0xC0DE);
672    /// ```
673    #[inline(always)]
674    pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
675        // propagate prev errors
676        if let Some(err) = self.core.get_sync_error() {
677            return Err(err);
678        }
679
680        let offset = Self::SLOT_SIZE * index;
681
682        // block flush_tx scheduling
683        let _flush_guard = self.core.acquire_lock();
684
685        // we use exlusive lock as we perform blocking hard sync
686        let _guard = self.core.acquire_exclusive_io_lock();
687
688        let _lock = self.core.locks.lock(index);
689        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
690        f(ptr);
691
692        // blocking hard sync
693        self.core.sync()?;
694
695        // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as
696        // the current batch is durable
697
698        self.core.mark_epoch_durable();
699        let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
700
701        // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to
702        // sync)
703        if prev {
704            let _g = self.core.acquire_durable_lock();
705            self.core.durable_cv.notify_all();
706        }
707
708        Ok(())
709    }
710
711    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
712    ///
713    /// ## Working
714    ///
715    /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
716    /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
717    /// conditions
718    ///
719    /// ## Example
720    ///
721    /// ```
722    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
723    ///
724    /// const MODULE_ID: u8 = 0;
725    ///
726    /// let dir = tempfile::tempdir().unwrap();
727    /// let path = dir.path().join("tmp_grow_mmap");
728    ///
729    /// let cfg = FrozenMMapCfg {
730    ///     module_id: MODULE_ID,
731    ///     initial_count: 0x02,
732    ///     flush_duration: std::time::Duration::from_micros(0x96),
733    /// };
734    ///
735    /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
736    /// assert_eq!(mmap.total_slots(), 0x02);
737    ///
738    /// drop(mmap);
739    ///
740    /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
741    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
742    /// ```
743    #[inline]
744    pub fn total_slots(&self) -> usize {
745        self.core.curr_length / Self::SLOT_SIZE
746    }
747
748    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
749    ///
750    /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
751    /// and OS
752    ///
753    /// ## Example
754    ///
755    /// ```
756    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
757    ///
758    /// const MODULE_ID: u8 = 0;
759    ///
760    /// let dir = tempfile::tempdir().unwrap();
761    /// let path = dir.path().join("tmp_mem_usage");
762    ///
763    /// let cfg = FrozenMMapCfg {
764    ///     module_id: MODULE_ID,
765    ///     initial_count: 0x10,
766    ///     flush_duration: std::time::Duration::from_micros(0x60),
767    /// };
768    ///
769    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
770    ///
771    /// let bytes = mmap.memory_usage();
772    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
773    /// ```
774    #[inline]
775    pub fn memory_usage(&self) -> usize {
776        // memory of mem-maped region
777        let mmap_bytes = self.core.curr_length;
778
779        // memory used for locking
780        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
781
782        mmap_bytes + lock_bytes
783    }
784
785    /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic
786    /// operation
787    ///
788    /// ## Overview
789    ///
790    /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic
791    /// operation, hence creating a transactional write operation, which gives following guarantees:
792    ///
793    /// - All write ops succeed together
794    /// - Single epoch to track durability of all writes ops
795    /// - Same durability guarantee for all the included write ops
796    ///
797    /// Simply, this preserves atomic durability semantics for multi index updates
798    ///
799    /// ## Example
800    ///
801    /// ```
802    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
803    ///
804    /// const MODULE_ID: u8 = 0;
805    ///
806    /// let dir = tempfile::tempdir().unwrap();
807    /// let path = dir.path().join("tmp_tx");
808    ///
809    /// let cfg = FrozenMMapCfg {
810    ///     module_id: MODULE_ID,
811    ///     initial_count: 0x0A,
812    ///     flush_duration: std::time::Duration::from_micros(50),
813    /// };
814    ///
815    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
816    ///
817    /// let mut tx = mmap.new_tx();
818    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
819    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
820    ///
821    /// let epoch = tx.commit().unwrap();
822    /// mmap.wait_for_durability(epoch).unwrap();
823    ///
824    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
825    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
826    ///
827    /// assert_eq!((v0, v1), (0x0A, 0x14));
828    /// ```
829    #[inline]
830    pub fn new_tx(&self) -> FMTransaction<'_, T> {
831        FMTransaction { core: &self.core, ops_vec: Vec::new() }
832    }
833
834    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
835    ///
836    /// ## Working
837    ///
838    /// When `delete` is called, all read, write, and (background) sync ops are paused
839    /// (indefinitely), whule deletion is done with following steps:
840    ///
841    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
842    /// - if any batch is pending for sync,
843    ///   - swap the flag
844    ///   - call sync manually
845    ///   - incr epoch and update cv
846    /// - brodcast closing so flusher tx could wrap up
847    /// - `munmap(2)` current mapping
848    /// - call delete on [`FrozenFile`]
849    ///
850    /// ## Example
851    ///
852    /// ```
853    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
854    ///
855    /// const MODULE_ID: u8 = 0;
856    ///
857    /// let dir = tempfile::tempdir().unwrap();
858    /// let path = dir.path().join("tmp_delete_mmap");
859    ///
860    /// let cfg = FrozenMMapCfg {
861    ///     module_id: MODULE_ID,
862    ///     initial_count: 0x04,
863    ///     flush_duration: std::time::Duration::from_micros(0x96),
864    /// };
865    ///
866    /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
867    /// mmap.delete().unwrap();
868    /// assert!(!path.exists());
869    /// ```
870    pub fn delete(&mut self) -> FrozenResult<()> {
871        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
872        self.core.dirty.store(false, atomic::Ordering::Release);
873        self.core.closed.store(true, atomic::Ordering::Release);
874        self.core.durable_cv.notify_one();
875
876        if let Some(handle) = self.tx.take() {
877            let _ = handle.join();
878        }
879
880        // pause all new write ops
881        let _lock = self.core.acquire_exclusive_io_lock();
882
883        self.munmap()?;
884        self.core.file.delete()
885    }
886
887    #[inline]
888    fn munmap(&self) -> FrozenResult<()> {
889        let length = self.core.curr_length;
890        unsafe { self.core.map.unmap(length) }
891    }
892}
893
894impl<T> Drop for FrozenMMap<T>
895where
896    T: Sized + Send + Sync,
897{
898    fn drop(&mut self) {
899        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
900        self.core.cv.notify_one(); // notify flusher tx to shut
901
902        if let Some(handle) = self.tx.take() {
903            let _ = handle.join();
904        }
905
906        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
907        let _io_lock = self.core.acquire_exclusive_io_lock();
908
909        // free up the boxed error (if any)
910        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
911        if !ptr.is_null() {
912            unsafe {
913                drop(Box::from_raw(ptr));
914            }
915        }
916
917        if !is_closed {
918            let _ = self.munmap();
919        }
920    }
921}
922
923impl<T> fmt::Display for FrozenMMap<T>
924where
925    T: Sized + Send + Sync,
926{
927    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
928        write!(
929            f,
930            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
931            self.core.file.fd(),
932            self.total_slots(),
933            self.core.curr_length,
934        )
935    }
936}
937
938/// A context for grouping multi write ops into a single atomic operation
939///
940/// ## Overview
941///
942/// Use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation.
943///
944/// - All included writes are applied together
945/// - Single epoch is assinged for an entier transaction
946/// - Durability guarantee is same for all included write ops
947///
948/// ## Example
949///
950/// ```
951/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
952///
953/// const MODULE_ID: u8 = 0;
954///
955/// let dir = tempfile::tempdir().unwrap();
956/// let path = dir.path().join("tmp_tx");
957///
958/// let cfg = FrozenMMapCfg {
959///     module_id: MODULE_ID,
960///     initial_count: 0x0A,
961///     flush_duration: std::time::Duration::from_micros(50),
962/// };
963///
964/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
965///
966/// let mut tx = mmap.new_tx();
967/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
968/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
969/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
970///
971/// let epoch = tx.commit().unwrap();
972/// mmap.wait_for_durability(epoch).unwrap();
973///
974/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
975/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
976/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
977///
978/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
979/// ```
980pub struct FMTransaction<'a, T> {
981    core: &'a Core,
982    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
983}
984
985impl<'a, T> FMTransaction<'a, T> {
986    /// Append a write op into the [`FMTransaction`]
987    ///
988    /// ## Requirements
989    ///
990    /// Write ops must follow these safety requirements,
991    ///
992    /// - No duplicate indices
993    /// - No out-of-order writes (must be incremental)
994    ///
995    /// Violating this constraint would result in [`FrozenError`]
996    ///
997    /// ## Safety
998    ///
999    /// Same safety requirements as [`FrozenMMap::write`] apply here
1000    ///
1001    /// ## Example
1002    ///
1003    /// ```
1004    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1005    ///
1006    /// const MODULE_ID: u8 = 0;
1007    ///
1008    /// let dir = tempfile::tempdir().unwrap();
1009    /// let path = dir.path().join("tmp_tx");
1010    ///
1011    /// let cfg = FrozenMMapCfg {
1012    ///     module_id: MODULE_ID,
1013    ///     initial_count: 0x10,
1014    ///     flush_duration: std::time::Duration::from_micros(50),
1015    /// };
1016    ///
1017    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1018    ///
1019    /// let mut tx = mmap.new_tx();
1020    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1021    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1022    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1023    ///
1024    /// let epoch = tx.commit().unwrap();
1025    /// mmap.wait_for_durability(epoch).unwrap();
1026    ///
1027    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1028    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1029    /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1030    ///
1031    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1032    /// ```
1033    #[inline(always)]
1034    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1035    where
1036        F: FnOnce(*mut T) + 'a,
1037    {
1038        // NOTE:
1039        //
1040        // This check prevents a potential footgun! For a safe transaction all writes must be,
1041        // - ordered by index (either incr or decr)
1042        // - no multi writes on same index
1043        //
1044        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1045        if let Some((last_idx, _)) = self.ops_vec.last() {
1046            if index <= *last_idx {
1047                return err::new_err(
1048                    err::HCF,
1049                    "tx writes must be strictly ordered, with no more then single ops on given index",
1050                );
1051            }
1052        }
1053
1054        self.ops_vec.push((index, Box::new(f)));
1055        Ok(())
1056    }
1057
1058    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1059    ///
1060    /// ## Guarantees
1061    ///
1062    /// - All writes are applied under a single epoch
1063    /// - All writes belong to the same durability batch
1064    /// - No interleaving with other transactions at epoch level
1065    ///
1066    /// ## Example
1067    ///
1068    /// ```
1069    /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1070    ///
1071    /// const MODULE_ID: u8 = 0;
1072    ///
1073    /// let dir = tempfile::tempdir().unwrap();
1074    /// let path = dir.path().join("tmp_tx");
1075    ///
1076    /// let cfg = FrozenMMapCfg {
1077    ///     module_id: MODULE_ID,
1078    ///     initial_count: 0x10,
1079    ///     flush_duration: std::time::Duration::from_micros(50),
1080    /// };
1081    ///
1082    /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1083    ///
1084    /// let mut tx = mmap.new_tx();
1085    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1086    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1087    ///
1088    /// let epoch = tx.commit().unwrap();
1089    /// mmap.wait_for_durability(epoch).unwrap();
1090    ///
1091    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1092    /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1093    ///
1094    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1095    /// ```
1096    #[inline(always)]
1097    pub fn commit(self) -> FrozenResult<u64> {
1098        if let Some(err) = self.core.get_sync_error() {
1099            return Err(err);
1100        }
1101
1102        let _guard = self.core.acquire_io_lock();
1103
1104        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1105        let mut guards = Vec::with_capacity(self.ops_vec.len());
1106        for (idx, _) in &self.ops_vec {
1107            guards.push(self.core.locks.lock(*idx));
1108        }
1109
1110        for (idx, op) in self.ops_vec {
1111            let offset = idx * std::mem::size_of::<T>();
1112            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1113            op(ptr);
1114        }
1115
1116        self.core.dirty.store(true, atomic::Ordering::Release);
1117        let epoch = self.core.incr_curr_epoch();
1118
1119        Ok(epoch)
1120    }
1121}
1122
1123#[derive(Debug)]
1124struct Core {
1125    map: TMap,
1126    locks: Locks,
1127    file: FrozenFile,
1128    cv: sync::Condvar,
1129    curr_length: usize,
1130    lock: sync::Mutex<()>,
1131    io_lock: sync::RwLock<()>,
1132    dirty: atomic::AtomicBool,
1133    durable_cv: sync::Condvar,
1134    closed: atomic::AtomicBool,
1135    durable_lock: sync::Mutex<()>,
1136    flush_duration: time::Duration,
1137    current_epoch: atomic::AtomicU64,
1138    durable_epoch: atomic::AtomicU64,
1139    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
1140}
1141
1142unsafe impl Send for Core {}
1143unsafe impl Sync for Core {}
1144
1145impl Core {
1146    fn new(
1147        map: TMap,
1148        file: FrozenFile,
1149        flush_duration: time::Duration,
1150        curr_length: usize,
1151        total_slots: usize,
1152    ) -> Self {
1153        Self {
1154            map,
1155            file,
1156            curr_length,
1157            flush_duration,
1158            cv: sync::Condvar::new(),
1159            lock: sync::Mutex::new(()),
1160            io_lock: sync::RwLock::new(()),
1161            locks: Locks::new(total_slots),
1162            durable_cv: sync::Condvar::new(),
1163            durable_lock: sync::Mutex::new(()),
1164            dirty: atomic::AtomicBool::new(false),
1165            closed: atomic::AtomicBool::new(false),
1166            current_epoch: atomic::AtomicU64::new(0),
1167            durable_epoch: atomic::AtomicU64::new(0),
1168            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1169        }
1170    }
1171
1172    #[inline]
1173    fn sync(&self) -> FrozenResult<()> {
1174        unsafe { self.map.sync(self.curr_length) }?;
1175        self.file.sync()
1176    }
1177
1178    #[inline(always)]
1179    fn set_sync_error(&self, err: FrozenError) {
1180        let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1181        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1182
1183        // NOTE: we must free the old error, if any, to avoid mem leaks
1184        if !old.is_null() {
1185            unsafe { drop(Box::from_raw(old)) };
1186        }
1187    }
1188
1189    #[inline(always)]
1190    fn get_sync_error(&self) -> Option<FrozenError> {
1191        let ptr = self.error.load(atomic::Ordering::Acquire);
1192        if hints::likely(ptr.is_null()) {
1193            return None;
1194        }
1195
1196        let arc = unsafe { &*ptr }.clone();
1197        Some((*arc).clone())
1198    }
1199
1200    #[inline]
1201    fn clear_sync_error(&self) {
1202        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1203        if hints::unlikely(!old.is_null()) {
1204            unsafe {
1205                drop(Box::from_raw(old));
1206            }
1207        }
1208    }
1209
1210    /// ## Why we ignore [`std::sync::PoisonError`]?
1211    ///
1212    /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
1213    /// protect any mutable state. All the pool invariants and accounting are maintained via atomics
1214    /// and are completely seperated from the mutex.
1215    ///
1216    /// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
1217    /// an inconsistent state of the protected value. Since no state can be left partially modified
1218    /// under this lock, there is no possible consistency risk to recover from and propagating the
1219    /// poison error would only introduce unnecessary failures into the allocation path.
1220    ///
1221    /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
1222    /// with the recovered guard.
1223    #[inline]
1224    fn acquire_durable_lock(&self) -> sync::MutexGuard<'_, ()> {
1225        self.durable_lock.lock().unwrap_or_else(|e| e.into_inner())
1226    }
1227
1228    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1229    #[inline]
1230    fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1231        self.lock.lock().unwrap_or_else(|e| e.into_inner())
1232    }
1233
1234    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1235    #[inline]
1236    fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1237        self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1238    }
1239
1240    /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1241    #[inline]
1242    fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1243        self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1244    }
1245
1246    #[inline]
1247    fn incr_curr_epoch(&self) -> u64 {
1248        self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1249    }
1250
1251    #[inline]
1252    fn mark_epoch_durable(&self) {
1253        let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1254        self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1255    }
1256
1257    fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1258        match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core))
1259        {
1260            Ok(tx) => Ok(tx),
1261            Err(error) => err::new_err(err::FXE, error),
1262        }
1263    }
1264
1265    fn flush_tx(core: sync::Arc<Self>) {
1266        // init phase (acquiring locks)
1267        let mut guard = match core.lock.lock() {
1268            Ok(g) => g,
1269            Err(error) => {
1270                core.set_sync_error(err::new_err_raw(err::FXE, error));
1271                core.cv.notify_all();
1272                return;
1273            }
1274        };
1275
1276        // sync loop w/ non-busy waiting
1277        loop {
1278            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1279                Ok((g, _)) => g,
1280                Err(e) => {
1281                    core.set_sync_error(err::new_err_raw(err::TXE, e));
1282                    core.cv.notify_all();
1283                    return;
1284                }
1285            };
1286
1287            // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
1288            // otherwise, we impose serious deadlock sort of situation for the the flusher tx
1289
1290            let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1291            let closing = core.closed.load(atomic::Ordering::Acquire);
1292
1293            if !dirty {
1294                if closing {
1295                    core.cv.notify_all();
1296                    return;
1297                }
1298
1299                continue;
1300            }
1301
1302            // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1303            // grow could kick in while sync is in progress
1304
1305            let io_lock = core.acquire_exclusive_io_lock();
1306
1307            // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1308            // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1309            // and acqurie it again, in-between other process could acquire it and use it
1310            drop(guard);
1311
1312            // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
1313            // all writes up to `target_epoch` are guaranteed to be part of this flush window,
1314            // while anything beyound belongs to the next batch
1315            let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1316
1317            // NOTE:
1318            //
1319            // - if sync fails, we update the Core::error w/ the received error object
1320            // - we clear it up when another sync call succeeds
1321            // - this is valid, as the underlying sync flushes entire mmaped region, hence
1322            //   even if the last call failed, and the new one succeeded, we do get the durability
1323            //   guarenty for the old data as well
1324
1325            match core.sync() {
1326                Ok(_) => {
1327                    core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1328
1329                    let _g = core.acquire_durable_lock();
1330
1331                    core.clear_sync_error();
1332                    core.durable_cv.notify_all();
1333                }
1334                Err(err) => {
1335                    core.set_sync_error(err);
1336                    core.durable_cv.notify_all();
1337                }
1338            }
1339
1340            drop(io_lock);
1341            guard = core.acquire_lock();
1342        }
1343    }
1344}
1345
1346#[derive(Debug)]
1347struct Locks(Box<[atomic::AtomicU8]>);
1348
1349impl Locks {
1350    const LOCK: u8 = 1;
1351    const UNLOCK: u8 = 0;
1352
1353    const L1_CONTENTION: usize = 0x10;
1354    const L2_CONTENTION: usize = 0x20;
1355
1356    fn new(cap: usize) -> Self {
1357        let mut slots = Vec::with_capacity(cap);
1358        for _ in 0..cap {
1359            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1360        }
1361
1362        Self(slots.into_boxed_slice())
1363    }
1364
1365    #[inline(always)]
1366    fn lock(&self, index: usize) -> LockGuard<'_> {
1367        let val = &self.0[index];
1368        let mut spins = 0;
1369
1370        loop {
1371            if val
1372                .compare_exchange_weak(
1373                    Self::UNLOCK,
1374                    Self::LOCK,
1375                    atomic::Ordering::Acquire,
1376                    atomic::Ordering::Relaxed,
1377                )
1378                .is_ok()
1379            {
1380                return LockGuard(val);
1381            }
1382
1383            // we must backoff under contention
1384
1385            // NOTE:
1386            //
1387            // We have three levels of backoff,
1388            //
1389            // 1. Busy waiting w/ CPU hint
1390            // 2. Willingly allow preemption by OS schedular
1391            // 3. Sleep the thread (increasing w/ exponenial factor)
1392
1393            if hints::likely(spins < Self::L1_CONTENTION) {
1394                std::hint::spin_loop();
1395            } else if spins < Self::L2_CONTENTION {
1396                thread::yield_now();
1397            } else {
1398                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1399                thread::sleep(time::Duration::from_nanos(ns));
1400            }
1401
1402            spins += 1;
1403        }
1404    }
1405}
1406
1407struct LockGuard<'a>(&'a atomic::AtomicU8);
1408
1409impl Drop for LockGuard<'_> {
1410    fn drop(&mut self) {
1411        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1412    }
1413}
1414
1415#[cfg(test)]
1416mod tests {
1417    use super::*;
1418
1419    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1420    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1421
1422    const MODULE_ID: u8 = 0;
1423    const INIT_SLOTS: usize = 0x0A;
1424
1425    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1426        let dir = tempfile::tempdir().unwrap();
1427        let path = dir.path().join("tmp_map");
1428
1429        let cfg = FrozenMMapCfg {
1430            module_id: MODULE_ID,
1431            initial_count: INIT_SLOTS,
1432            flush_duration: FLUSH_DURATION,
1433        };
1434
1435        (dir, path, cfg)
1436    }
1437
1438    mod fm_lifecycle {
1439        use super::*;
1440
1441        #[test]
1442        fn ok_new() {
1443            let (_dir, path, cfg) = new_tmp();
1444            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1445
1446            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
1447            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1448            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1449            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
1450            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1451
1452            // satisfies the bg thread was spawned correctly
1453            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
1454
1455            // satisfies wait on epoch works
1456            let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1457            assert!(mmap.wait_for_durability(epoch).is_ok());
1458        }
1459
1460        #[test]
1461        fn ok_new_existing() {
1462            let (_dir, path, cfg) = new_tmp();
1463
1464            // create new + close
1465            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1466            drop(mmap1);
1467
1468            // open existing
1469            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1470            drop(mmap2);
1471        }
1472
1473        #[test]
1474        fn err_new_when_change_in_cfg() {
1475            let (_dir, path, mut cfg) = new_tmp();
1476
1477            // create new + close
1478            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1479            drop(mmap1);
1480
1481            // update cfg + opne existing
1482            cfg.initial_count = INIT_SLOTS * 2;
1483            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1484        }
1485
1486        #[test]
1487        fn ok_delete() {
1488            let (_dir, path, cfg) = new_tmp();
1489            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1490
1491            mmap.delete().unwrap();
1492            assert!(!mmap.core.file.exists().unwrap());
1493        }
1494
1495        #[test]
1496        fn err_delete_after_delete() {
1497            let (_dir, path, cfg) = new_tmp();
1498            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1499
1500            mmap.delete().unwrap();
1501            assert!(!mmap.core.file.exists().unwrap());
1502            assert!(mmap.delete().is_err());
1503        }
1504
1505        #[test]
1506        fn ok_drop_persists_when_dropped_before_bg_flush() {
1507            let (_dir, path, cfg) = new_tmp();
1508            const VAL: u64 = 0x0A;
1509
1510            {
1511                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1512                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1513                drop(mmap);
1514            }
1515
1516            {
1517                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1518                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1519                assert_eq!(val, VAL);
1520            }
1521        }
1522    }
1523
1524    mod fm_validate_t {
1525        use super::*;
1526
1527        #[repr(C, align(8))]
1528        struct OkT {
1529            a: u64,
1530            b: u64,
1531        }
1532
1533        #[repr(C)]
1534        struct BadAlignT {
1535            a: u32,
1536            b: u32,
1537        }
1538
1539        #[repr(C, align(4))]
1540        struct BadSizeT {
1541            a: u32,
1542            b: u16,
1543        }
1544
1545        #[repr(C, align(8))]
1546        struct DropT(u64);
1547
1548        impl Drop for DropT {
1549            fn drop(&mut self) {}
1550        }
1551
1552        #[repr(C, align(8))]
1553        struct ZstT;
1554
1555        #[test]
1556        fn ok_validate_t() {
1557            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1558        }
1559
1560        #[test]
1561        fn err_validate_t_when_drop() {
1562            assert!(FrozenMMap::<DropT>::validate_t().is_err());
1563        }
1564
1565        #[test]
1566        fn err_validate_t_when_not_8_byte_aligned() {
1567            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1568        }
1569
1570        #[test]
1571        fn err_validate_t_when_size_not_multiple_of_8() {
1572            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1573        }
1574
1575        #[test]
1576        fn err_validate_t_when_zero_sized() {
1577            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1578        }
1579
1580        #[test]
1581        fn err_new_when_t_implements_drop() {
1582            let (_dir, path, cfg) = new_tmp();
1583            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1584        }
1585
1586        #[test]
1587        fn err_new_when_t_is_not_8_byte_aligned() {
1588            let (_dir, path, cfg) = new_tmp();
1589            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1590        }
1591
1592        #[test]
1593        fn err_new_when_t_size_is_not_multiple_of_8() {
1594            let (_dir, path, cfg) = new_tmp();
1595            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1596        }
1597
1598        #[test]
1599        fn err_new_when_t_is_zero_sized() {
1600            let (_dir, path, cfg) = new_tmp();
1601            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1602        }
1603
1604        #[test]
1605        fn err_new_grown_when_t_implements_drop() {
1606            let (_dir, path, cfg) = new_tmp();
1607            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1608        }
1609
1610        #[test]
1611        fn err_new_grown_when_t_is_not_8_byte_aligned() {
1612            let (_dir, path, cfg) = new_tmp();
1613            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1614        }
1615
1616        #[test]
1617        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1618            let (_dir, path, cfg) = new_tmp();
1619            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1620        }
1621
1622        #[test]
1623        fn err_new_grown_when_t_is_zero_sized() {
1624            let (_dir, path, cfg) = new_tmp();
1625            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1626        }
1627    }
1628
1629    mod fm_new_grown {
1630        use super::*;
1631
1632        #[test]
1633        fn ok_new_grown_updates_length() {
1634            let (_dir, path, cfg) = new_tmp();
1635
1636            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1637            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1638            drop(mmap);
1639
1640            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1641            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1642            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1643        }
1644
1645        #[test]
1646        fn err_new_grown_with_preexisting_instance() {
1647            let (_dir, path, cfg) = new_tmp();
1648
1649            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1650            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1651
1652            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1653        }
1654
1655        #[test]
1656        fn ok_new_grown_cycle() {
1657            let (_dir, path, cfg) = new_tmp();
1658
1659            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1660            drop(mmap);
1661
1662            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1663            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1664            drop(mmap);
1665
1666            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1667            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1668            drop(mmap);
1669
1670            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1671            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1672        }
1673
1674        #[test]
1675        fn ok_write_reopen_grown_read() {
1676            let (_dir, path, cfg) = new_tmp();
1677
1678            {
1679                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1680                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1681            }
1682
1683            {
1684                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1685                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1686
1687                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1688                assert_eq!(val, 0xBB);
1689            }
1690        }
1691
1692        #[test]
1693        fn ok_write_reopen_grown_read_cycle() {
1694            let (_dir, path, cfg) = new_tmp();
1695
1696            {
1697                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1698                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1699            }
1700
1701            for i in 0..2 {
1702                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1703                let idx = mmap.total_slots() - 1;
1704                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1705                drop(mmap);
1706            }
1707
1708            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1709
1710            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1711            assert_eq!(base, 1);
1712
1713            let last_idx = mmap.total_slots() - 1;
1714            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1715            assert_eq!(last, 3);
1716        }
1717
1718        #[test]
1719        fn err_new_grown_while_previous_instance_is_alive() {
1720            let (_dir, path, cfg) = new_tmp();
1721
1722            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1723            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1724
1725            assert!(reopened.is_err());
1726        }
1727    }
1728
1729    mod fm_write_read {
1730        use super::*;
1731
1732        #[test]
1733        fn ok_write_wait_read_cycle() {
1734            const VAL: u64 = 0xDEADC0DE;
1735            let (_dir, path, cfg) = new_tmp();
1736            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1737
1738            // write + sync
1739            let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1740            mmap.wait_for_durability(epoch).unwrap();
1741
1742            // read + verify
1743            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1744            assert_eq!(val, VAL);
1745        }
1746
1747        #[test]
1748        fn ok_write_read_without_wait() {
1749            const VAL: u64 = 0xDEADC0DE;
1750            let (_dir, path, cfg) = new_tmp();
1751            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1752
1753            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1754            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1755            assert_eq!(val, VAL);
1756        }
1757    }
1758
1759    mod fm_write_sync_read {
1760        use super::*;
1761
1762        #[test]
1763        fn ok_write_sync_read() {
1764            let (_dir, path, cfg) = new_tmp();
1765            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1766
1767            unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };
1768
1769            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1770            assert_eq!(val, 0x4C);
1771        }
1772
1773        #[test]
1774        fn ok_write_sync_wait_returns_immediately() {
1775            let (_dir, path, cfg) = new_tmp();
1776            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1777
1778            let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };
1779
1780            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1781            assert_eq!(val, 0x6A);
1782        }
1783
1784        #[test]
1785        fn ok_write_sync_followed_by_async_write() {
1786            let (_dir, path, cfg) = new_tmp();
1787
1788            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1789            unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };
1790
1791            let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
1792            mmap.wait_for_durability(epoch).unwrap();
1793
1794            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1795            assert_eq!(val, 0x14);
1796        }
1797
1798        #[test]
1799        fn ok_write_sync_makes_prev_batch_durable() {
1800            let (_dir, path, cfg) = new_tmp();
1801            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1802
1803            let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1804            unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
1805            mmap.wait_for_durability(async_epoch).unwrap();
1806
1807            let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
1808            let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };
1809
1810            assert_eq!(v1, 1);
1811            assert_eq!(v2, 2);
1812        }
1813
1814        #[test]
1815        fn ok_write_sync_persists_across_reopen() {
1816            let (_dir, path, cfg) = new_tmp();
1817            const VAL: u64 = 0x1000;
1818
1819            {
1820                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1821                unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
1822            }
1823
1824            {
1825                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1826                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1827                assert_eq!(val, VAL);
1828            }
1829        }
1830    }
1831
1832    mod fm_tx {
1833        use super::*;
1834
1835        #[test]
1836        fn ok_tx_basic_multi_write() {
1837            let (_dir, path, cfg) = new_tmp();
1838            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1839
1840            let mut tx = mmap.new_tx();
1841            unsafe {
1842                tx.write(0, |v| *v = 1).unwrap();
1843                tx.write(1, |v| *v = 2).unwrap();
1844                tx.write(2, |v| *v = 3).unwrap();
1845            }
1846
1847            let epoch = tx.commit().unwrap();
1848            mmap.wait_for_durability(epoch).unwrap();
1849
1850            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1851            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1852            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1853
1854            assert_eq!((v0, v1, v2), (1, 2, 3));
1855        }
1856
1857        #[test]
1858        fn ok_tx_single_epoch() {
1859            let (_dir, path, cfg) = new_tmp();
1860            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1861
1862            let mut tx = mmap.new_tx();
1863            unsafe {
1864                tx.write(0, |v| *v = 0x0A).unwrap();
1865                tx.write(1, |v| *v = 0x14).unwrap();
1866            }
1867
1868            // NOTE: next write must have strictly higher epoch then current one
1869
1870            let epoch = tx.commit().unwrap();
1871            let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1872
1873            assert!(next_epoch > epoch);
1874        }
1875
1876        #[test]
1877        fn err_tx_out_of_order() {
1878            let (_dir, path, cfg) = new_tmp();
1879            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1880
1881            let mut tx = mmap.new_tx();
1882            unsafe {
1883                tx.write(2, |v| *v = 1).unwrap();
1884                let res = tx.write(1, |v| *v = 2);
1885                assert!(res.is_err());
1886            }
1887        }
1888
1889        #[test]
1890        fn err_tx_duplicate_index() {
1891            let (_dir, path, cfg) = new_tmp();
1892            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1893
1894            let mut tx = mmap.new_tx();
1895            unsafe {
1896                tx.write(1, |v| *v = 1).unwrap();
1897                let res = tx.write(1, |v| *v = 2);
1898                assert!(res.is_err());
1899            }
1900        }
1901
1902        #[test]
1903        fn ok_tx_concurrent_non_overlapping() {
1904            let (_dir, path, cfg) = new_tmp();
1905            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1906
1907            let mut handles = Vec::new();
1908            for i in 0..2 {
1909                let mmap = mmap.clone();
1910                handles.push(thread::spawn(move || {
1911                    let mut tx = mmap.new_tx();
1912
1913                    unsafe {
1914                        tx.write(i * 2, |v| *v = i as u64).unwrap();
1915                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1916                    }
1917
1918                    let epoch = tx.commit().unwrap();
1919                    mmap.wait_for_durability(epoch).unwrap();
1920                }));
1921            }
1922
1923            for h in handles {
1924                h.join().unwrap();
1925            }
1926
1927            for i in 0..2 {
1928                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1929                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1930
1931                assert_eq!((v0, v1), (i as u64, i as u64));
1932            }
1933        }
1934
1935        #[test]
1936        fn ok_tx_overwrite_last_wins() {
1937            let (_dir, path, cfg) = new_tmp();
1938            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1939
1940            let mut tx = mmap.new_tx();
1941            unsafe {
1942                tx.write(0, |v| *v = 1).unwrap();
1943            }
1944
1945            tx.commit().unwrap();
1946
1947            let mut tx2 = mmap.new_tx();
1948            unsafe {
1949                tx2.write(0, |v| *v = 2).unwrap();
1950            }
1951
1952            let epoch = tx2.commit().unwrap();
1953            mmap.wait_for_durability(epoch).unwrap();
1954
1955            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1956            assert_eq!(val, 2);
1957        }
1958
1959        #[test]
1960        fn ok_tx_persists_across_reopen() {
1961            let (_dir, path, cfg) = new_tmp();
1962
1963            {
1964                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1965
1966                let mut tx = mmap.new_tx();
1967                unsafe {
1968                    tx.write(0, |v| *v = 0x3A).unwrap();
1969                    tx.write(1, |v| *v = 0x54).unwrap();
1970                }
1971
1972                let epoch = tx.commit().unwrap();
1973                mmap.wait_for_durability(epoch).unwrap();
1974            }
1975
1976            {
1977                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1978
1979                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1980                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1981
1982                assert_eq!((v0, v1), (0x3A, 0x54));
1983            }
1984        }
1985    }
1986
1987    mod fm_durability {
1988        use super::*;
1989
1990        #[test]
1991        fn ok_wait_then_drop() {
1992            let (_dir, path, cfg) = new_tmp();
1993            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1994
1995            let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1996            mmap.wait_for_durability(epoch).unwrap();
1997
1998            drop(mmap);
1999        }
2000
2001        #[test]
2002        fn ok_epoch_monotonicity() {
2003            let (_dir, path, cfg) = new_tmp();
2004            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
2005
2006            let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
2007            mmap.wait_for_durability(e1).unwrap();
2008
2009            let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
2010            mmap.wait_for_durability(e2).unwrap();
2011            assert!(e2 >= e1);
2012        }
2013
2014        #[test]
2015        fn ok_wait_for_durability_with_multi_writers() {
2016            let (_dir, path, cfg) = new_tmp();
2017            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
2018
2019            let mut handles = Vec::new();
2020            for _ in 0..2 {
2021                let mmap = mmap.clone();
2022                handles.push(thread::spawn(move || {
2023                    let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
2024                    mmap.wait_for_durability(epoch).unwrap();
2025                }));
2026            }
2027
2028            for h in handles {
2029                h.join().unwrap();
2030            }
2031
2032            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
2033            assert_eq!(val, 2);
2034        }
2035    }
2036
2037    mod fm_concurrency {
2038        use super::*;
2039
2040        #[test]
2041        fn ok_parallel_reads_with_diff_index() {
2042            let (_dir, path, cfg) = new_tmp();
2043            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
2044
2045            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
2046            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
2047
2048            let t1 = {
2049                let mmap = mmap.clone();
2050                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
2051            };
2052
2053            let t2 = {
2054                let mmap = mmap.clone();
2055                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
2056            };
2057
2058            assert_eq!(t1.join().unwrap(), 0x10);
2059            assert_eq!(t2.join().unwrap(), 0x20);
2060        }
2061
2062        #[test]
2063        fn ok_multi_tx_drop_then_reopen_grown() {
2064            let (_dir, path, cfg) = new_tmp();
2065
2066            {
2067                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
2068
2069                let mut handles = Vec::new();
2070                for i in 0..2u64 {
2071                    let mmap = mmap.clone();
2072                    handles.push(thread::spawn(move || {
2073                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
2074                    }));
2075                }
2076
2077                for i in 0..2usize {
2078                    let mmap = mmap.clone();
2079                    handles.push(thread::spawn(move || {
2080                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
2081                    }));
2082                }
2083
2084                for h in handles {
2085                    h.join().unwrap();
2086                }
2087            }
2088
2089            {
2090                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
2091                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
2092
2093                for i in 0..2u64 {
2094                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
2095                    assert_eq!(val, i + 1);
2096                }
2097
2098                let idx = mmap.total_slots() - 1;
2099                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
2100
2101                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
2102                assert_eq!(val, 0xDEAD);
2103            }
2104        }
2105    }
2106}