Skip to main content

frozen_core/fmmap/
mod.rs

1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T` must be a POD type
6//! which is safe to persist and later reinterpret from bytes.
7//!
//! Required properties for `T`:
//!
//! - Must use `#[repr(C)]`
//! - Must be exactly 8-byte aligned (`align_of::<T>() == 8`)
//! - Must not implement [`Drop`] (nor contain fields which need dropping)
//! - Must not be zero-sized, and `size_of::<T>()` must be a multiple of `8`
//!
//! *NOTE:* `T` must not contain heap-owning or process-local pointers like [`Vec`], [`String`], [`Box`], references
//! and function pointers, or other fields whose bit-pattern is not stable across reopen.
//!
//! These constraints exist because [`FrozenMMap`] does not serialize or deserialize values. It directly reads and
//! writes `T` inside a memory-mapped file. That means `T` must have a stable layout and must remain valid when the file
//! is reopened in a later process.
21//!
22//! ## Example
23//!
24//! ```
25//! use frozen_core::fmmap::{FrozenMMap, FMCfg};
26//!
27//! const MODULE_ID: u8 = 0;
28//!
29//! let dir = tempfile::tempdir().unwrap();
30//! let path = dir.path().join("tmp_frozen_mmap");
31//!
32//! let cfg = FMCfg {
33//!     initial_count: 0x0A,
34//!     flush_duration: std::time::Duration::from_micros(0x96),
35//! };
36//!
37//! let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
38//! assert_eq!(mmap.total_slots(), 0x0A);
39//!
40//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
41//! mmap.wait_for_durability(epoch).unwrap();
42//!
43//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
44//! assert_eq!(val, 0xDEADC0DE);
45//!
46//! drop(mmap);
47//!
48//! let reopened = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x05).unwrap();
49//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
50//!
51//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
52//! assert_eq!(val, 0xDEADC0DE);
53//! ```
54
55#[cfg(any(target_os = "linux", target_os = "macos"))]
56mod posix;
57
58use crate::{
59    error::{ErrCode, FrozenError, FrozenResult},
60    ffile::{FFCfg, FrozenFile},
61    hints,
62};
63use std::{
64    fmt,
65    sync::{self, atomic},
66    thread, time,
67};
68
/// Error domain id of [`FrozenMMap`], i.e. **18** (`0x12`)
const ERRDOMAIN: u8 = 18;

/// Type of the `epoch` counter handed out by write ops
pub type TEpoch = u64;
74
75#[cfg(any(target_os = "linux", target_os = "macos"))]
76type TMap = posix::POSIXMMap;
77
/// Module id used for error reporting by [`FrozenMMap`]
///
/// Set once by the first [`FrozenMMap`] constructor call (or, in test builds, lazily by `mid()` itself);
/// `OnceLock` guarantees later calls reuse the first value
static MID: sync::OnceLock<u8> = sync::OnceLock::new();

/// Module id stamped into every error raised by this module
///
/// ## Panics
///
/// Panics if called before a [`FrozenMMap`] constructor initialized [`MID`]
#[cfg(not(test))]
#[inline(always)]
fn mid() -> &'static u8 {
    MID.get().expect("FrozenMMap module id (MID) must be initialized before raising errors")
}

/// Test-only variant, falls back to module id `0` so errors can be raised w/o a constructor
#[cfg(test)]
#[inline(always)]
fn mid() -> &'static u8 {
    MID.get_or_init(|| 0)
}
92
93/// Error codes for [`FrozenMMap`]
94pub(in crate::fmmap) mod err {
95    use super::ErrCode;
96
97    /// (512) internal fuck up (hault and catch fire)
98    pub const HCF: ErrCode = ErrCode::new(0x200, "hault and catch fire");
99
100    /// (513) unknown error (fallback)
101    pub const UNK: ErrCode = ErrCode::new(0x201, "unknown error");
102
103    /// (514) no more memory available
104    pub const NMM: ErrCode = ErrCode::new(0x202, "not enough memory available on the device");
105
106    /// (515) syncing error
107    pub const SYN: ErrCode = ErrCode::new(0x203, "failed to sync/flush data to storage device");
108
109    /// (516) no write/read perm
110    pub const PRM: ErrCode = ErrCode::new(0x204, "missing permissions for IO");
111
112    /// (517) flush_tx error (panic inside)
113    pub const TXE: ErrCode = ErrCode::new(0x205, "flush_tx paniced inside");
114
115    /// (518) flush_tx error (unable to spawn)
116    pub const FXE: ErrCode = ErrCode::new(0x206, "unable to spawn flush_tx");
117
118    /// (519) lock poisoned
119    pub const LPN: ErrCode = ErrCode::new(0x207, "lock poisoned internally");
120
121    /// (520) type `T` implements drop
122    pub const DRP: ErrCode = ErrCode::new(0x208, "type T must not implement `Drop`");
123
124    /// (521) type `T` is not 8 bytes aligned
125    pub const ALN: ErrCode = ErrCode::new(0x209, "type T must be 8-bytes aligned");
126
127    /// (522) `size_of::<T>()` is not multiple of 8
128    pub const SZE: ErrCode = ErrCode::new(0x20A, "`size_of::<T>()` must be multiple of 8 bytes");
129
130    /// (523) type `T` must not be zero sized
131    pub const ZRO: ErrCode = ErrCode::new(0x20B, "type T must not be zero sized");
132}
133
134#[inline]
135pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenResult<R> {
136    let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137    Err(err)
138}
139
140#[inline]
141pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142    let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143    Err(err)
144}
145
146#[inline]
147pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenError {
148    FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
149}
150
/// Config for [`FrozenMMap`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FMCfg {
    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::SLOT_SIZE`] (i.e. `size_of::<T>()`), so a newly created file
    /// starts w/ a length of `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by the background flusher tx to batch write ops into a single durability window;
    /// all write ops issued within one interval are synced together
    pub flush_duration: time::Duration,
}
164
165/// Custom implementation of `mmap(2)`
166///
167/// ## Constraints
168///
169/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T` must be a POD type
170/// which is safe to persist and later reinterpret from bytes.
171///
172/// Required properties for `T`,
173///
174/// - Must use `#[repr(C)]`
175/// - Should be 8-bytes aligned
176/// - Must not implement [`Drop`]
177/// - `size_of::<T>()` should be multiple of `8`
178///
179/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`], [`Box`], references
180/// and function pointers, or other fields whose bit-pattern is not stable across reopen.
181///
182/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It directly reads and
183/// writes `T` inside a memory mapped file. That means `T` must have a stable layout and must remain valid when the file
184/// is reopened in a later process.
185///
186/// ## Example
187///
188/// ```
189/// use frozen_core::fmmap::{FrozenMMap, FMCfg};
190///
191/// const MODULE_ID: u8 = 0;
192///
193/// let dir = tempfile::tempdir().unwrap();
194/// let path = dir.path().join("tmp_frozen_mmap");
195///
196/// let cfg = FMCfg {
197///     initial_count: 0x0A,
198///     flush_duration: std::time::Duration::from_micros(0x96),
199/// };
200///
201/// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
202/// assert_eq!(mmap.total_slots(), 0x0A);
203///
204/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
205/// mmap.wait_for_durability(epoch).unwrap();
206///
207/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
208/// assert_eq!(val, 0xDEADC0DE);
209///
210/// drop(mmap);
211///
212/// let reopened = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x05).unwrap();
213/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
214///
215/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
216/// assert_eq!(val, 0xDEADC0DE);
217/// ```
218#[derive(Debug)]
219pub struct FrozenMMap<T, const MODULE_ID: u8>
220where
221    T: Sized + Send + Sync,
222{
223    core: sync::Arc<Core>,
224    tx: Option<thread::JoinHandle<()>>,
225    _t: core::marker::PhantomData<T>,
226}
227
228unsafe impl<T, const MODULE_ID: u8> Send for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
229unsafe impl<T, const MODULE_ID: u8> Sync for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
230
231impl<T, const MODULE_ID: u8> FrozenMMap<T, MODULE_ID>
232where
233    T: Sized + Send + Sync,
234{
235    /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
236    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
237
238    /// Create a new [`FrozenMMap`] instance w/ given [`FMCfg`]
239    ///
240    /// ## Multiple Instances
241    ///
242    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the underlying [`FrozenFile`],
243    /// when trying to create multiple instances of [`FrozenMMap`], an error will be thrown    ///
244    ///
245    /// ## Capacity Growth
246    ///
247    /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity, drop the current instance
248    /// and reopen w/ [`FrozenMMap::open_grown`] which provides memory mapping over grown capacity
249    ///
250    /// ## [`FMCfg`]
251    ///
252    /// All configs for [`FrozenMMap`] are stored in [`FMCfg`]
253    ///
254    /// ## Working
255    ///
256    /// We first create a new [`FrozenFile`] if note already, then map the entire file using `mmap(2)`,
257    /// the entire file must read/write `T`, which also should stay constant for the entire lifetime of file
258    ///
259    /// ## Important
260    ///
261    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`], which is used under
262    /// the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta) to store config
263    ///
264    /// ## Example
265    ///
266    /// ```
267    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
268    ///
269    /// const MODULE_ID: u8 = 0;
270    ///
271    /// let dir = tempfile::tempdir().unwrap();
272    /// let path = dir.path().join("tmp_frozen_mmap");
273    ///
274    /// let cfg = FMCfg {
275    ///     initial_count: 0x0A,
276    ///     flush_duration: std::time::Duration::from_micros(0x96),
277    /// };
278    ///
279    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
280    /// assert_eq!(mmap.total_slots(), 0x0A);
281    ///
282    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
283    /// mmap.wait_for_durability(epoch).unwrap();
284    ///
285    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
286    /// assert_eq!(val, 0xDEADC0DE);
287    /// ```
288    pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg) -> FrozenResult<Self> {
289        Self::validate_t()?;
290        let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
291        let total_slots = curr_length / Self::SLOT_SIZE;
292
293        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock` guarantees that the
294        // first caller sets the value and all subsequent calls reuse it
295        let _ = MID.get_or_init(|| MODULE_ID);
296
297        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
298        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
299
300        // INFO: we spawn the thread for background sync
301        let tx = Core::spawn_tx(core.clone())?;
302
303        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
304    }
305
306    /// Create a new [`FrozenMMap`] instance w/ given [`FMCfg`], while growing the underlying [`FrozenFile`]
307    /// by `additional_slots` before creating memory mapping
308    ///
309    /// ## Multiple Instances
310    ///
311    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the underlying [`FrozenFile`],
312    /// when trying to create multiple instances of [`FrozenMMap`], an error will be thrown
313    ///
314    /// ## Why not create a [`FrozenMMap::grow`] call?
315    ///
316    /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active memory mapping
317    /// in place is tricky in concurrent code, as some threads would still hold stale/unmapped pointers to mmap
318    /// due to preemption from the OS schedular
319    ///
320    /// So, instead of remmaping a live instance, the current API performs growth during open, making capacity expansion
321    /// an explicit lifecycle operation, and not a side effect on a live instance
322    ///
323    /// ## [`FMCfg`]
324    ///
325    /// All configs for [`FrozenMMap`] are stored in [`FMCfg`]
326    ///
327    /// ## Working
328    ///
329    /// We first create a new [`FrozenFile`] if note already, then we grow the file using [`FrozenFile::grow`]
330    /// then map the entire file using `mmap(2)`, the entire file must read/write `T`, which also should stay
331    /// constant for the entire lifetime of file
332    ///
333    /// ## Important
334    ///
335    /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`], which is used under
336    /// the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta) to store config
337    ///
338    /// ## Example
339    ///
340    /// ```
341    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
342    ///
343    /// const MODULE_ID: u8 = 0;
344    ///
345    /// let dir = tempfile::tempdir().unwrap();
346    /// let path = dir.path().join("tmp_frozen_mmap");
347    ///
348    /// let cfg = FMCfg {
349    ///     initial_count: 0x0A,
350    ///     flush_duration: std::time::Duration::from_micros(0x96),
351    /// };
352    ///
353    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
354    /// assert_eq!(mmap.total_slots(), 0x0A * 2);
355    ///
356    /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
357    /// mmap.wait_for_durability(epoch).unwrap();
358    ///
359    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
360    /// assert_eq!(val, 0xDEADC0DE);
361    /// ```
362    pub fn new_grown<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg, additional_slots: usize) -> FrozenResult<Self> {
363        Self::validate_t()?;
364        let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
365
366        // we grow the underlying FrozenFile as requested
367        file.grow(additional_slots)?;
368        let curr_length = file.length()?; // we must read the updated (grown) length
369        let total_slots = curr_length / Self::SLOT_SIZE;
370
371        // NOTE: The value is used for error logging and is initialized only once, as `OnceLock` guarantees that the
372        // first caller sets the value and all subsequent calls reuse it
373        let _ = MID.get_or_init(|| MODULE_ID);
374
375        let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
376        let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
377
378        // INFO: we spawn the thread for background sync
379        let tx = Core::spawn_tx(core.clone())?;
380
381        Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
382    }
383
384    /// Create/open a new [`FrozenFile`] instance
385    fn open_file(path: std::path::PathBuf, cfg: &FMCfg) -> FrozenResult<(FrozenFile, usize)> {
386        let ff_cfg = FFCfg { path, chunk_size: Self::SLOT_SIZE, initial_chunk_amount: cfg.initial_count };
387
388        let file = FrozenFile::new::<MODULE_ID>(ff_cfg)?;
389        let curr_length = file.length()?;
390
391        Ok((file, curr_length))
392    }
393
394    #[inline]
395    fn validate_t() -> FrozenResult<()> {
396        if std::mem::needs_drop::<T>() {
397            return new_err_default(err::DRP);
398        }
399
400        let align = std::mem::align_of::<T>();
401        if align != 8 {
402            return new_err_default(err::ALN);
403        }
404
405        let size = std::mem::size_of::<T>();
406        if size == 0 {
407            return new_err_default(err::ZRO);
408        }
409
410        if size % 8 != 0 {
411            return new_err_default(err::SZE);
412        }
413
414        Ok(())
415    }
416
417    /// Blocks until given `epoch` becomes durable
418    ///
419    /// ## Batching
420    ///
421    /// With respect to `flush_duration`, all write ops are batched before sync, which is executed by flusher tx
422    /// working in background, while each write is assigned w/ current durable epoch, and all writes which
423    /// observe the exact same epoch, belong to the same durability window, and are all sync'ed together
424    ///
425    /// When a background sync succeeds, the internal durable epoch is incremented, indicating that all writes
426    /// that observed the previous epoch are now durable on disk
427    ///
428    /// ## Example
429    ///
430    /// ```
431    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
432    ///
433    /// const MODULE_ID: u8 = 0;
434    ///
435    /// let dir = tempfile::tempdir().unwrap();
436    /// let path = dir.path().join("tmp_wait_epoch");
437    ///
438    /// let cfg = FMCfg {
439    ///     initial_count: 0x04,
440    ///     flush_duration: std::time::Duration::from_micros(0x60),
441    /// };
442    ///
443    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
444    ///
445    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
446    /// mmap.wait_for_durability(epoch).unwrap();
447    ///
448    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
449    /// assert_eq!(val, 0x8A);
450    /// ```
451    pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
452        if let Some(sync_err) = self.core.get_sync_error() {
453            return Err(sync_err);
454        }
455
456        let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
457        if durable_epoch >= epoch {
458            return Ok(());
459        }
460
461        let mut guard = match self.core.durable_lock.lock() {
462            Ok(g) => g,
463            Err(e) => return new_err(err::LPN, e),
464        };
465
466        loop {
467            if let Some(sync_err) = self.core.get_sync_error() {
468                return Err(sync_err);
469            }
470
471            if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
472                return Ok(());
473            }
474
475            guard = match self.core.durable_cv.wait(guard) {
476                Ok(g) => g,
477                Err(e) => return new_err(err::LPN, e),
478            };
479        }
480    }
481
482    /// Read a `T` at given `index` via callback (`f`)
483    ///
484    /// ## Concurrency
485    ///
486    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for at same index is atomic
487    /// and thread safe, while operations on different indices may proceed fully in parallel
488    ///
489    /// ## Safety
490    ///
491    /// The caller must ensure following:
492    ///
493    /// - given `index` is within bounds
494    /// - underlying memory contains a valid instance of `T`
495    /// - provided callback `f` must not,
496    ///   - write through the pointer
497    ///   - store or leak pointer beyound there lifetime
498    ///
499    /// Violating any of the above may result in undefined behavior
500    ///
501    /// ## Example
502    ///
503    /// ```
504    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
505    ///
506    /// const MODULE_ID: u8 = 0;
507    ///
508    /// let dir = tempfile::tempdir().unwrap();
509    /// let path = dir.path().join("tmp_read_mmap");
510    ///
511    /// let cfg = FMCfg {
512    ///     initial_count: 0x02,
513    ///     flush_duration: std::time::Duration::from_micros(0x60),
514    /// };
515    ///
516    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
517    ///
518    /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
519    /// mmap.wait_for_durability(epoch).unwrap();
520    ///
521    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
522    /// assert_eq!(val, 0x0A);
523    /// ```
524    #[inline(always)]
525    pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
526        let offset = Self::SLOT_SIZE * index;
527        let _lock = self.core.locks.lock(index);
528
529        // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the assumption of
530        // OS guarantees visibility)
531
532        let ptr = unsafe { self.core.map.as_ptr(offset) };
533        Ok(f(ptr))
534    }
535
536    /// Write/update a `T` at given `index` via callback (`f`)
537    ///
538    /// ## Concurrency
539    ///
540    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for at same index is atomic
541    /// and thread safe, while operations on different indices may proceed fully in parallel
542    ///
543    /// ## Safety
544    ///
545    /// The caller must ensure following:
546    ///
547    /// - given `index` is within bounds
548    /// - underlying memory contains a valid instance of `T`
549    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
550    ///
551    /// Violating any of the above may result in undefined behavior
552    ///
553    /// ## Example
554    ///
555    /// ```
556    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
557    ///
558    /// const MODULE_ID: u8 = 0;
559    ///
560    /// let dir = tempfile::tempdir().unwrap();
561    /// let path = dir.path().join("tmp_write_mmap");
562    ///
563    /// let cfg = FMCfg {
564    ///     initial_count: 0x02,
565    ///     flush_duration: std::time::Duration::from_micros(0x96),
566    /// };
567    ///
568    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
569    ///
570    /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
571    /// mmap.wait_for_durability(epoch).unwrap();
572    ///
573    /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
574    /// assert_eq!(val, 0x2B);
575    /// ```
576    #[inline(always)]
577    pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
578        // propagate prev errors
579        if let Some(err) = self.core.get_sync_error() {
580            return Err(err);
581        }
582
583        let offset = Self::SLOT_SIZE * index;
584
585        let _guard = self.core.acquire_io_lock()?;
586        let _lock = self.core.locks.lock(index);
587
588        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
589        f(ptr);
590
591        self.core.dirty.store(true, atomic::Ordering::Release);
592        let epoch = self.core.incr_curr_epoch();
593
594        Ok(epoch)
595    }
596
597    /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
598    ///
599    /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is immediately
600    /// persisted to the underlying storage device
601    ///
602    /// ## Concurrency
603    ///
604    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for at same index is atomic
605    /// and thread safe, while operations on different indices may proceed fully in parallel
606    ///
607    /// ## Safety
608    ///
609    /// The caller must ensure following:
610    ///
611    /// - given `index` is within bounds
612    /// - underlying memory contains a valid instance of `T`
613    /// - provided callback `f` must not  store or leak pointer beyound there lifetime
614    ///
615    /// Violating any of the above may result in undefined behavior
616    ///
617    /// ## Example
618    ///
619    /// ```
620    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
621    ///
622    /// const MODULE_ID: u8 = 0;
623    ///
624    /// let dir = tempfile::tempdir().unwrap();
625    /// let path = dir.path().join("tmp_write_sync");
626    ///
627    /// let cfg = FMCfg {
628    ///     initial_count: 0x02,
629    ///     flush_duration: std::time::Duration::from_micros(0x96),
630    /// };
631    ///
632    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
633    /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
634    ///
635    /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
636    /// assert_eq!(val, 0xC0DE);
637    /// ```
638    #[inline(always)]
639    pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
640        // propagate prev errors
641        if let Some(err) = self.core.get_sync_error() {
642            return Err(err);
643        }
644
645        let offset = Self::SLOT_SIZE * index;
646
647        // block flush_tx scheduling
648        let _flush_guard = self.core.lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
649
650        // we use exlusive lock as we perform blocking hard sync
651        let _guard = self.core.acquire_exclusive_io_lock()?;
652
653        let _lock = self.core.locks.lock(index);
654        let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
655        f(ptr);
656
657        // blocking hard sync
658        self.core.sync()?;
659
660        // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as the
661        // current batch is durable
662
663        self.core.mark_epoch_durable();
664        let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
665
666        // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to sync)
667        if prev {
668            let _g = self.core.durable_lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
669            self.core.durable_cv.notify_all();
670        }
671
672        Ok(())
673    }
674
675    /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
676    ///
677    /// ## Working
678    ///
679    /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the current length of the
680    /// file is not cached anywhere in the pipeline to avoid TOCTAU race conditions
681    ///
682    /// ## Example
683    ///
684    /// ```
685    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
686    ///
687    /// const MODULE_ID: u8 = 0;
688    ///
689    /// let dir = tempfile::tempdir().unwrap();
690    /// let path = dir.path().join("tmp_grow_mmap");
691    ///
692    /// let cfg = FMCfg {
693    ///     initial_count: 0x02,
694    ///     flush_duration: std::time::Duration::from_micros(0x96),
695    /// };
696    ///
697    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
698    /// assert_eq!(mmap.total_slots(), 0x02);
699    ///
700    /// drop(mmap);
701    ///
702    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x03).unwrap();
703    /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
704    /// ```
705    #[inline]
706    pub fn total_slots(&self) -> usize {
707        self.core.curr_length / Self::SLOT_SIZE
708    }
709
710    /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
711    ///
712    /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging and OS
713    ///
714    /// ## Example
715    ///
716    /// ```
717    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
718    ///
719    /// const MODULE_ID: u8 = 0;
720    ///
721    /// let dir = tempfile::tempdir().unwrap();
722    /// let path = dir.path().join("tmp_mem_usage");
723    ///
724    /// let cfg = FMCfg {
725    ///     initial_count: 0x10,
726    ///     flush_duration: std::time::Duration::from_micros(0x60),
727    /// };
728    ///
729    /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
730    ///
731    /// let bytes = mmap.memory_usage();
732    /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
733    /// ```
734    #[inline]
735    pub fn memory_usage(&self) -> usize {
736        // memory of mem-maped region
737        let mmap_bytes = self.core.curr_length;
738
739        // memory used for locking
740        let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
741
742        mmap_bytes + lock_bytes
743    }
744
745    /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic operation
746    ///
747    /// ## Overview
748    ///
749    /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation, hence
750    /// creating a transactional write operation, which gives following guarantees,
751    ///
752    /// - All write ops succeed together
753    /// - Single epoch to track durability of all writes ops
754    /// - Same durability guarantee for all the included write ops
755    ///
756    /// Simply, this preserves atomic durability semantics for multi index updates
757    ///
758    /// ## Example
759    ///
760    /// ```
761    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
762    ///
763    /// const MID: u8 = 0;
764    ///
765    /// let dir = tempfile::tempdir().unwrap();
766    /// let path = dir.path().join("tmp_tx");
767    ///
768    /// let cfg = FMCfg {
769    ///     initial_count: 0x0A,
770    ///     flush_duration: std::time::Duration::from_micros(50),
771    /// };
772    ///
773    /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
774    ///
775    /// let mut tx = mmap.new_tx();
776    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
777    /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
778    ///
779    /// let epoch = tx.commit().unwrap();
780    /// mmap.wait_for_durability(epoch).unwrap();
781    ///
782    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
783    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
784    ///
785    /// assert_eq!((v0, v1), (0x0A, 0x14));
786    /// ```
787    #[inline]
788    pub fn new_tx(&self) -> FMTransaction<'_, T> {
789        FMTransaction { core: &self.core, ops_vec: Vec::new() }
790    }
791
792    /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
793    ///
794    /// ## Working
795    ///
796    /// When `delete` is called, all read, write, and (background) sync ops are paused (indefinitely),
797    /// whule deletion is done with following steps:
798    ///
799    /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
800    /// - if any batch is pending for sync,
801    ///   - swap the flag
802    ///   - call sync manually
803    ///   - incr epoch and update cv
804    /// - brodcast closing so flusher tx could wrap up
805    /// - `munmap(2)` current mapping
806    /// - call delete on [`FrozenFile`]
807    ///
808    /// ## Example
809    ///
810    /// ```
811    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
812    ///
813    /// const MODULE_ID: u8 = 0;
814    ///
815    /// let dir = tempfile::tempdir().unwrap();
816    /// let path = dir.path().join("tmp_delete_mmap");
817    ///
818    /// let cfg = FMCfg {
819    ///     initial_count: 0x04,
820    ///     flush_duration: std::time::Duration::from_micros(0x96),
821    /// };
822    ///
823    /// let mut mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
824    /// mmap.delete().unwrap();
825    /// assert!(!path.exists());
826    /// ```
827    pub fn delete(&mut self) -> FrozenResult<()> {
828        // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
829        self.core.dirty.store(false, atomic::Ordering::Release);
830        self.core.closed.store(true, atomic::Ordering::Release);
831        self.core.durable_cv.notify_one();
832
833        if let Some(handle) = self.tx.take() {
834            let _ = handle.join();
835        }
836
837        // pause all new write ops
838        let _lock = self.core.acquire_exclusive_io_lock()?;
839
840        self.munmap()?;
841        self.core.file.delete()
842    }
843
844    #[inline]
845    fn munmap(&self) -> FrozenResult<()> {
846        let length = self.core.curr_length;
847        unsafe { self.core.map.unmap(length) }
848    }
849}
850
851impl<T, const MODULE_ID: u8> Drop for FrozenMMap<T, MODULE_ID>
852where
853    T: Sized + Send + Sync,
854{
855    fn drop(&mut self) {
856        let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
857        self.core.cv.notify_one(); // notify flusher tx to shut
858
859        if let Some(handle) = self.tx.take() {
860            let _ = handle.join();
861        }
862
863        // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
864        let _io_lock = self.core.acquire_exclusive_io_lock();
865
866        // free up the boxed error (if any)
867        let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
868        if !ptr.is_null() {
869            unsafe {
870                drop(Box::from_raw(ptr));
871            }
872        }
873
874        if !is_closed {
875            let _ = self.munmap();
876        }
877    }
878}
879
880impl<T, const MODULE_ID: u8> fmt::Display for FrozenMMap<T, MODULE_ID>
881where
882    T: Sized + Send + Sync,
883{
884    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
885        write!(
886            f,
887            "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
888            self.core.file.fd(),
889            self.total_slots(),
890            self.core.curr_length,
891        )
892    }
893}
894
895/// A context for grouping multi write ops into a single atomic operation
896///
897/// ## Overview
898///
899/// Use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation.
900///
901/// - All included writes are applied together
902/// - Single epoch is assinged for an entier transaction
903/// - Durability guarantee is same for all included write ops
904///
905/// ## Example
906///
907/// ```
908/// use frozen_core::fmmap::{FrozenMMap, FMCfg};
909///
910/// const MID: u8 = 0;
911///
912/// let dir = tempfile::tempdir().unwrap();
913/// let path = dir.path().join("tmp_tx");
914///
915/// let cfg = FMCfg {
916///     initial_count: 0x0A,
917///     flush_duration: std::time::Duration::from_micros(50),
918/// };
919///
920/// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
921///
922/// let mut tx = mmap.new_tx();
923/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
924/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
925/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
926///
927/// let epoch = tx.commit().unwrap();
928/// mmap.wait_for_durability(epoch).unwrap();
929///
930/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
931/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
932/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
933///
934/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
935/// ```
936pub struct FMTransaction<'a, T> {
937    core: &'a Core,
938    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
939}
940
941impl<'a, T> FMTransaction<'a, T> {
942    /// Append a write op into the [`FMTransaction`]
943    ///
944    /// ## Requirements
945    ///
946    /// Write ops must follow these safety requirements,
947    ///
948    /// - No duplicate indices
949    /// - No out-of-order writes (must be incremental)
950    ///
951    /// Violating this constraint would result in [`FrozenError`]
952    ///
953    /// ## Safety
954    ///
955    /// Same safety requirements as [`FrozenMMap::write`] apply here
956    ///
957    /// ## Example
958    ///
959    /// ```
960    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
961    ///
962    /// const MID: u8 = 0;
963    ///
964    /// let dir = tempfile::tempdir().unwrap();
965    /// let path = dir.path().join("tmp_tx");
966    ///
967    /// let cfg = FMCfg {
968    ///     initial_count: 0x10,
969    ///     flush_duration: std::time::Duration::from_micros(50),
970    /// };
971    ///
972    /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
973    ///
974    /// let mut tx = mmap.new_tx();
975    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
976    /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
977    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
978    ///
979    /// let epoch = tx.commit().unwrap();
980    /// mmap.wait_for_durability(epoch).unwrap();
981    ///
982    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
983    /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
984    /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
985    ///
986    /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
987    /// ```
988    #[inline(always)]
989    pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
990    where
991        F: FnOnce(*mut T) + 'a,
992    {
993        // NOTE:
994        //
995        // This check prevents a potential footgun! For a safe transaction all writes must be,
996        // - ordered by index (either incr or decr)
997        // - no multi writes on same index
998        //
999        // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1000        if let Some((last_idx, _)) = self.ops_vec.last() {
1001            if index <= *last_idx {
1002                return new_err(
1003                    err::HCF,
1004                    "tx writes must be strictly ordered, with no more then single ops on given index",
1005                );
1006            }
1007        }
1008
1009        self.ops_vec.push((index, Box::new(f)));
1010        Ok(())
1011    }
1012
1013    /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1014    ///
1015    /// ## Guarantees
1016    ///
1017    /// - All writes are applied under a single epoch
1018    /// - All writes belong to the same durability batch
1019    /// - No interleaving with other transactions at epoch level
1020    ///
1021    /// ## Example
1022    ///
1023    /// ```
1024    /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
1025    ///
1026    /// const MID: u8 = 0;
1027    ///
1028    /// let dir = tempfile::tempdir().unwrap();
1029    /// let path = dir.path().join("tmp_tx");
1030    ///
1031    /// let cfg = FMCfg {
1032    ///     initial_count: 0x10,
1033    ///     flush_duration: std::time::Duration::from_micros(50),
1034    /// };
1035    ///
1036    /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
1037    ///
1038    /// let mut tx = mmap.new_tx();
1039    /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1040    /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1041    ///
1042    /// let epoch = tx.commit().unwrap();
1043    /// mmap.wait_for_durability(epoch).unwrap();
1044    ///
1045    /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1046    /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1047    ///
1048    /// assert_eq!((v0, v1), (0x0A, 0x0C));
1049    /// ```
1050    #[inline(always)]
1051    pub fn commit(self) -> FrozenResult<u64> {
1052        if let Some(err) = self.core.get_sync_error() {
1053            return Err(err);
1054        }
1055
1056        let _guard = self.core.acquire_io_lock()?;
1057
1058        // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1059        let mut guards = Vec::with_capacity(self.ops_vec.len());
1060        for (idx, _) in &self.ops_vec {
1061            guards.push(self.core.locks.lock(*idx));
1062        }
1063
1064        for (idx, op) in self.ops_vec {
1065            let offset = idx * std::mem::size_of::<T>();
1066            let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1067            op(ptr);
1068        }
1069
1070        self.core.dirty.store(true, atomic::Ordering::Release);
1071        let epoch = self.core.incr_curr_epoch();
1072
1073        Ok(epoch)
1074    }
1075}
1076
1077#[derive(Debug)]
1078struct Core {
1079    map: TMap,
1080    locks: Locks,
1081    file: FrozenFile,
1082    cv: sync::Condvar,
1083    curr_length: usize,
1084    lock: sync::Mutex<()>,
1085    io_lock: sync::RwLock<()>,
1086    dirty: atomic::AtomicBool,
1087    durable_cv: sync::Condvar,
1088    closed: atomic::AtomicBool,
1089    durable_lock: sync::Mutex<()>,
1090    flush_duration: time::Duration,
1091    current_epoch: atomic::AtomicU64,
1092    durable_epoch: atomic::AtomicU64,
1093    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
1094}
1095
1096unsafe impl Send for Core {}
1097unsafe impl Sync for Core {}
1098
1099impl Core {
1100    fn new(
1101        map: TMap,
1102        file: FrozenFile,
1103        flush_duration: time::Duration,
1104        curr_length: usize,
1105        total_slots: usize,
1106    ) -> Self {
1107        Self {
1108            map,
1109            file,
1110            curr_length,
1111            flush_duration,
1112            cv: sync::Condvar::new(),
1113            lock: sync::Mutex::new(()),
1114            io_lock: sync::RwLock::new(()),
1115            locks: Locks::new(total_slots),
1116            durable_cv: sync::Condvar::new(),
1117            durable_lock: sync::Mutex::new(()),
1118            dirty: atomic::AtomicBool::new(false),
1119            closed: atomic::AtomicBool::new(false),
1120            current_epoch: atomic::AtomicU64::new(0),
1121            durable_epoch: atomic::AtomicU64::new(0),
1122            error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1123        }
1124    }
1125
1126    #[inline]
1127    fn sync(&self) -> FrozenResult<()> {
1128        unsafe { self.map.sync(self.curr_length) }?;
1129        self.file.sync()
1130    }
1131
1132    #[inline(always)]
1133    fn set_sync_error(&self, err: FrozenError) {
1134        let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1135        let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1136
1137        // NOTE: we must free the old error, if any, to avoid mem leaks
1138        if !old.is_null() {
1139            unsafe { drop(Box::from_raw(old)) };
1140        }
1141    }
1142
1143    #[inline(always)]
1144    fn get_sync_error(&self) -> Option<FrozenError> {
1145        let ptr = self.error.load(atomic::Ordering::Acquire);
1146        if hints::likely(ptr.is_null()) {
1147            return None;
1148        }
1149
1150        let arc = unsafe { &*ptr }.clone();
1151        Some((*arc).clone())
1152    }
1153
1154    #[inline]
1155    fn clear_sync_error(&self) {
1156        let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1157        if hints::unlikely(!old.is_null()) {
1158            unsafe {
1159                drop(Box::from_raw(old));
1160            }
1161        }
1162    }
1163
1164    #[inline]
1165    fn acquire_io_lock(&self) -> FrozenResult<sync::RwLockReadGuard<'_, ()>> {
1166        self.io_lock.read().map_err(|e| new_err_raw(err::LPN, e))
1167    }
1168
1169    #[inline]
1170    fn acquire_exclusive_io_lock(&self) -> FrozenResult<sync::RwLockWriteGuard<'_, ()>> {
1171        self.io_lock.write().map_err(|e| new_err_raw(err::LPN, e))
1172    }
1173
1174    #[inline]
1175    fn incr_curr_epoch(&self) -> u64 {
1176        self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1177    }
1178
1179    #[inline]
1180    fn mark_epoch_durable(&self) {
1181        let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1182        self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1183    }
1184
1185    fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1186        match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core)) {
1187            Ok(tx) => Ok(tx),
1188            Err(error) => new_err(err::FXE, error),
1189        }
1190    }
1191
1192    fn flush_tx(core: sync::Arc<Self>) {
1193        // init phase (acquiring locks)
1194        let mut guard = match core.lock.lock() {
1195            Ok(g) => g,
1196            Err(error) => {
1197                core.set_sync_error(new_err_raw(err::FXE, error));
1198                core.cv.notify_all();
1199                return;
1200            }
1201        };
1202
1203        // sync loop w/ non-busy waiting
1204        loop {
1205            guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1206                Ok((g, _)) => g,
1207                Err(e) => {
1208                    core.set_sync_error(new_err_raw(err::TXE, e));
1209                    core.cv.notify_all();
1210                    return;
1211                }
1212            };
1213
1214            // NOTE: we must read values of close brodcast before acquire exclusive lock,
1215            // if done otherwise, we impose serious deadlock sort of situation for the the flusher tx
1216
1217            let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1218            let closing = core.closed.load(atomic::Ordering::Acquire);
1219
1220            if !dirty {
1221                if closing {
1222                    core.cv.notify_all();
1223                    return;
1224                }
1225
1226                continue;
1227            }
1228
1229            // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1230            // grow could kick in while sync is in progress
1231
1232            let io_lock = match core.acquire_exclusive_io_lock() {
1233                Ok(lock) => lock,
1234                Err(e) => {
1235                    core.set_sync_error(e);
1236                    core.cv.notify_all();
1237                    return;
1238                }
1239            };
1240
1241            // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1242            // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1243            // and acqurie it again, in-between other process could acquire it and use it
1244            drop(guard);
1245
1246            // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary, all writes
1247            // up to `target_epoch` are guaranteed to be part of this flush window, while anything beyound belongs
1248            // to the next batch
1249            let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1250
1251            // NOTE:
1252            //
1253            // - if sync fails, we update the Core::error w/ the received error object
1254            // - we clear it up when another sync call succeeds
1255            // - this is valid, as the underlying sync flushes entire mmaped region, hence
1256            //   even if the last call failed, and the new one succeeded, we do get the durability
1257            //   guarenty for the old data as well
1258
1259            match core.sync() {
1260                Ok(_) => {
1261                    core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1262
1263                    let _g = match core.durable_lock.lock() {
1264                        Ok(g) => g,
1265                        Err(e) => {
1266                            core.set_sync_error(new_err_raw(err::LPN, e));
1267                            return;
1268                        }
1269                    };
1270
1271                    core.clear_sync_error();
1272                    core.durable_cv.notify_all();
1273                }
1274                Err(err) => {
1275                    core.set_sync_error(err);
1276                    core.durable_cv.notify_all();
1277                }
1278            }
1279
1280            drop(io_lock);
1281            guard = match core.lock.lock() {
1282                Ok(g) => g,
1283                Err(e) => {
1284                    core.set_sync_error(new_err_raw(err::LPN, e));
1285                    core.durable_cv.notify_all();
1286                    return;
1287                }
1288            };
1289        }
1290    }
1291}
1292
1293#[derive(Debug)]
1294struct Locks(Box<[atomic::AtomicU8]>);
1295
1296impl Locks {
1297    const LOCK: u8 = 1;
1298    const UNLOCK: u8 = 0;
1299
1300    const L1_CONTENTION: usize = 0x10;
1301    const L2_CONTENTION: usize = 0x20;
1302
1303    fn new(cap: usize) -> Self {
1304        let mut slots = Vec::with_capacity(cap);
1305        for _ in 0..cap {
1306            slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1307        }
1308
1309        Self(slots.into_boxed_slice())
1310    }
1311
1312    #[inline(always)]
1313    fn lock(&self, index: usize) -> LockGuard<'_> {
1314        let val = &self.0[index];
1315        let mut spins = 0;
1316
1317        loop {
1318            if val
1319                .compare_exchange_weak(Self::UNLOCK, Self::LOCK, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
1320                .is_ok()
1321            {
1322                return LockGuard(val);
1323            }
1324
1325            // we must backoff under contention
1326
1327            // NOTE:
1328            //
1329            // We have three levels of backoff,
1330            //
1331            // 1. Busy waiting w/ CPU hint
1332            // 2. Willingly allow preemption by OS schedular
1333            // 3. Sleep the thread (increasing w/ exponenial factor)
1334
1335            if hints::likely(spins < Self::L1_CONTENTION) {
1336                std::hint::spin_loop();
1337            } else if spins < Self::L2_CONTENTION {
1338                thread::yield_now();
1339            } else {
1340                let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1341                thread::sleep(time::Duration::from_nanos(ns));
1342            }
1343
1344            spins += 1;
1345        }
1346    }
1347}
1348
1349struct LockGuard<'a>(&'a atomic::AtomicU8);
1350
1351impl Drop for LockGuard<'_> {
1352    fn drop(&mut self) {
1353        self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1354    }
1355}
1356
1357#[cfg(test)]
1358mod tests {
1359    use super::*;
1360
1361    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1362    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1363
1364    const MID: u8 = 0;
1365    const INIT_SLOTS: usize = 0x0A;
1366
1367    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FMCfg) {
1368        let dir = tempfile::tempdir().unwrap();
1369        let path = dir.path().join("tmp_map");
1370
1371        let cfg = FMCfg { initial_count: INIT_SLOTS, flush_duration: FLUSH_DURATION };
1372
1373        (dir, path, cfg)
1374    }
1375
1376    mod fm_lifecycle {
1377        use super::*;
1378
1379        #[test]
1380        fn ok_new() {
1381            let (_dir, path, cfg) = new_tmp();
1382            let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1383
1384            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
1385            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1386            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1387            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
1388            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64, MID>::SLOT_SIZE);
1389
1390            // satisfies the bg thread was spawned correctly
1391            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
1392
1393            // satisfies wait on epoch works
1394            let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1395            assert!(mmap.wait_for_durability(epoch).is_ok());
1396        }
1397
1398        #[test]
1399        fn ok_new_existing() {
1400            let (_dir, path, cfg) = new_tmp();
1401
1402            // create new + close
1403            let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1404            drop(mmap1);
1405
1406            // open existing
1407            let mmap2 = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1408            drop(mmap2);
1409        }
1410
1411        #[test]
1412        fn err_new_when_change_in_cfg() {
1413            let (_dir, path, mut cfg) = new_tmp();
1414
1415            // create new + close
1416            let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1417            drop(mmap1);
1418
1419            // update cfg + opne existing
1420            cfg.initial_count = INIT_SLOTS * 2;
1421            assert!(FrozenMMap::<u64, MID>::new(path, cfg).is_err());
1422        }
1423
1424        #[test]
1425        fn ok_delete() {
1426            let (_dir, path, cfg) = new_tmp();
1427            let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1428
1429            mmap.delete().unwrap();
1430            assert!(!mmap.core.file.exists().unwrap());
1431        }
1432
1433        #[test]
1434        fn err_delete_after_delete() {
1435            let (_dir, path, cfg) = new_tmp();
1436            let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1437
1438            mmap.delete().unwrap();
1439            assert!(!mmap.core.file.exists().unwrap());
1440            assert!(mmap.delete().is_err());
1441        }
1442
1443        #[test]
1444        fn ok_drop_persists_when_dropped_before_bg_flush() {
1445            let (_dir, path, cfg) = new_tmp();
1446            const VAL: u64 = 0x0A;
1447
1448            {
1449                let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1450                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1451                drop(mmap);
1452            }
1453
1454            {
1455                let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1456                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1457                assert_eq!(val, VAL);
1458            }
1459        }
1460    }
1461
1462    mod fm_validate_t {
1463        use super::*;
1464
1465        #[repr(C, align(8))]
1466        struct OkT {
1467            a: u64,
1468            b: u64,
1469        }
1470
1471        #[repr(C)]
1472        struct BadAlignT {
1473            a: u32,
1474            b: u32,
1475        }
1476
1477        #[repr(C, align(4))]
1478        struct BadSizeT {
1479            a: u32,
1480            b: u16,
1481        }
1482
1483        #[repr(C, align(8))]
1484        struct DropT(u64);
1485
1486        impl Drop for DropT {
1487            fn drop(&mut self) {}
1488        }
1489
1490        #[repr(C, align(8))]
1491        struct ZstT;
1492
1493        #[test]
1494        fn ok_validate_t() {
1495            assert!(FrozenMMap::<OkT, MID>::validate_t().is_ok());
1496        }
1497
1498        #[test]
1499        fn err_validate_t_when_drop() {
1500            assert!(FrozenMMap::<DropT, MID>::validate_t().is_err());
1501        }
1502
1503        #[test]
1504        fn err_validate_t_when_not_8_byte_aligned() {
1505            assert!(FrozenMMap::<BadAlignT, MID>::validate_t().is_err());
1506        }
1507
1508        #[test]
1509        fn err_validate_t_when_size_not_multiple_of_8() {
1510            assert!(FrozenMMap::<BadSizeT, MID>::validate_t().is_err());
1511        }
1512
1513        #[test]
1514        fn err_validate_t_when_zero_sized() {
1515            assert!(FrozenMMap::<ZstT, MID>::validate_t().is_err());
1516        }
1517
1518        #[test]
1519        fn err_new_when_t_implements_drop() {
1520            let (_dir, path, cfg) = new_tmp();
1521            assert!(FrozenMMap::<DropT, MID>::new(path, cfg).is_err());
1522        }
1523
1524        #[test]
1525        fn err_new_when_t_is_not_8_byte_aligned() {
1526            let (_dir, path, cfg) = new_tmp();
1527            assert!(FrozenMMap::<BadAlignT, MID>::new(path, cfg).is_err());
1528        }
1529
1530        #[test]
1531        fn err_new_when_t_size_is_not_multiple_of_8() {
1532            let (_dir, path, cfg) = new_tmp();
1533            assert!(FrozenMMap::<BadSizeT, MID>::new(path, cfg).is_err());
1534        }
1535
1536        #[test]
1537        fn err_new_when_t_is_zero_sized() {
1538            let (_dir, path, cfg) = new_tmp();
1539            assert!(FrozenMMap::<ZstT, MID>::new(path, cfg).is_err());
1540        }
1541
1542        #[test]
1543        fn err_new_grown_when_t_implements_drop() {
1544            let (_dir, path, cfg) = new_tmp();
1545            assert!(FrozenMMap::<DropT, MID>::new_grown(path, cfg, 1).is_err());
1546        }
1547
1548        #[test]
1549        fn err_new_grown_when_t_is_not_8_byte_aligned() {
1550            let (_dir, path, cfg) = new_tmp();
1551            assert!(FrozenMMap::<BadAlignT, MID>::new_grown(path, cfg, 1).is_err());
1552        }
1553
1554        #[test]
1555        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1556            let (_dir, path, cfg) = new_tmp();
1557            assert!(FrozenMMap::<BadSizeT, MID>::new_grown(path, cfg, 1).is_err());
1558        }
1559
1560        #[test]
1561        fn err_new_grown_when_t_is_zero_sized() {
1562            let (_dir, path, cfg) = new_tmp();
1563            assert!(FrozenMMap::<ZstT, MID>::new_grown(path, cfg, 1).is_err());
1564        }
1565    }
1566
1567    mod fm_new_grown {
1568        use super::*;
1569
1570        #[test]
1571        fn ok_new_grown_updates_length() {
1572            let (_dir, path, cfg) = new_tmp();
1573
1574            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1575            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1576            drop(mmap);
1577
1578            let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).unwrap();
1579            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1580            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64, MID>::SLOT_SIZE);
1581        }
1582
1583        #[test]
1584        fn err_new_grown_with_preexisting_instance() {
1585            let (_dir, path, cfg) = new_tmp();
1586
1587            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1588            assert_eq!(mmap.total_slots(), INIT_SLOTS);
1589
1590            assert!(FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).is_err());
1591        }
1592
1593        #[test]
1594        fn ok_new_grown_cycle() {
1595            let (_dir, path, cfg) = new_tmp();
1596
1597            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1598            drop(mmap);
1599
1600            let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1601            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1602            drop(mmap);
1603
1604            let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1605            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1606            drop(mmap);
1607
1608            let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x100).unwrap();
1609            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1610        }
1611
1612        #[test]
1613        fn ok_write_reopen_grown_read() {
1614            let (_dir, path, cfg) = new_tmp();
1615
1616            {
1617                let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1618                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1619            }
1620
1621            {
1622                let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1623                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1624
1625                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1626                assert_eq!(val, 0xBB);
1627            }
1628        }
1629
1630        #[test]
1631        fn ok_write_reopen_grown_read_cycle() {
1632            let (_dir, path, cfg) = new_tmp();
1633
1634            {
1635                let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1636                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1637            }
1638
1639            for i in 0..2 {
1640                let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1641                let idx = mmap.total_slots() - 1;
1642                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1643                drop(mmap);
1644            }
1645
1646            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
1647
1648            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1649            assert_eq!(base, 1);
1650
1651            let last_idx = mmap.total_slots() - 1;
1652            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1653            assert_eq!(last, 3);
1654        }
1655
1656        #[test]
1657        fn err_new_grown_while_previous_instance_is_alive() {
1658            let (_dir, path, cfg) = new_tmp();
1659
1660            let _mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1661            let reopened = FrozenMMap::<u64, MID>::new_grown(&path, cfg, 0x10);
1662
1663            assert!(reopened.is_err());
1664        }
1665    }
1666
1667    mod fm_write_read {
1668        use super::*;
1669
1670        #[test]
1671        fn ok_write_wait_read_cycle() {
1672            const VAL: u64 = 0xDEADC0DE;
1673            let (_dir, path, cfg) = new_tmp();
1674            let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1675
1676            // write + sync
1677            let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1678            mmap.wait_for_durability(epoch).unwrap();
1679
1680            // read + verify
1681            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1682            assert_eq!(val, VAL);
1683        }
1684
1685        #[test]
1686        fn ok_write_read_without_wait() {
1687            const VAL: u64 = 0xDEADC0DE;
1688            let (_dir, path, cfg) = new_tmp();
1689            let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1690
1691            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1692            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1693            assert_eq!(val, VAL);
1694        }
1695    }
1696
mod fm_write_sync_read {
    //! Tests for `FrozenMMap::write_sync`: plain read-back, mixing it with async `write`,
    //! and persistence of a synchronously written slot across a reopen.

    use super::*;

    /// A value stored through `write_sync` is returned by a later `read` of the same slot.
    #[test]
    fn ok_write_sync_read() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        unsafe { map.write_sync(0, |slot| *slot = 0x4C).unwrap() };

        let got = unsafe { map.read(0, |slot| *slot) }.unwrap();
        assert_eq!(got, 0x4C);
    }

    /// `write_sync` succeeds and its value is readable straight away.
    ///
    /// NOTE(review): despite its name, this test never calls `wait_for_durability`; if
    /// `write_sync` returns an epoch, consider waiting on it here — confirm its return type.
    #[test]
    fn ok_write_sync_wait_returns_immediately() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let _ = unsafe { map.write_sync(0, |slot| *slot = 0x6A) }.unwrap();

        let got = unsafe { map.read(0, |slot| *slot) }.unwrap();
        assert_eq!(got, 0x6A);
    }

    /// An async `write` issued after a `write_sync` to the same slot overwrites it once its
    /// epoch is durable.
    #[test]
    fn ok_write_sync_followed_by_async_write() {
        let (_tmp_dir, path, cfg) = new_tmp();

        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
        unsafe { map.write_sync(0, |slot| *slot = 0x0A).unwrap() };

        let ticket = unsafe { map.write(0, |slot| *slot = 0x14) }.unwrap();
        map.wait_for_durability(ticket).unwrap();

        let got = unsafe { map.read(0, |slot| *slot) }.unwrap();
        assert_eq!(got, 0x14);
    }

    /// An async write followed by a `write_sync` to another slot: waiting on the async epoch
    /// succeeds and both slots hold their values.
    #[test]
    fn ok_write_sync_makes_prev_batch_durable() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let async_ticket = unsafe { map.write(0, |slot| *slot = 1) }.unwrap();
        unsafe { map.write_sync(1, |slot| *slot = 2).unwrap() };
        map.wait_for_durability(async_ticket).unwrap();

        // Tuple elements are evaluated left to right, so slot 0 is read before slot 1.
        let pair = unsafe {
            (
                map.read(0, |slot| *slot).unwrap(),
                map.read(1, |slot| *slot).unwrap(),
            )
        };
        assert_eq!(pair, (1, 2));
    }

    /// A slot written with `write_sync` keeps its value after the map is dropped and the
    /// same file is opened again.
    #[test]
    fn ok_write_sync_persists_across_reopen() {
        const VAL: u64 = 0x1000;
        let (_tmp_dir, path, cfg) = new_tmp();

        // First session: write synchronously, then drop the map at the end of the scope.
        {
            let map = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
            unsafe { map.write_sync(0, |slot| *slot = VAL).unwrap() };
        }

        // Second session: reopen the same file; `reopened` is dropped before `_tmp_dir`.
        let reopened = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
        let got = unsafe { reopened.read(0, |slot| *slot) }.unwrap();
        assert_eq!(got, VAL);
    }
}
1769
mod fm_tx {
    //! Tests for multi-slot transactions created with `FrozenMMap::new_tx`.
    //!
    //! Covered: staging several writes and committing them under one epoch, rejected index
    //! orderings, concurrent transactions on disjoint slots, and persistence across reopen.

    use super::*;

    /// Three writes staged in one transaction are all readable once the commit epoch is durable.
    #[test]
    fn ok_tx_basic_multi_write() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let mut tx = mmap.new_tx();
        unsafe {
            tx.write(0, |v| *v = 1).unwrap();
            tx.write(1, |v| *v = 2).unwrap();
            tx.write(2, |v| *v = 3).unwrap();
        }

        let epoch = tx.commit().unwrap();
        mmap.wait_for_durability(epoch).unwrap();

        let v0 = unsafe { mmap.read(0, |v| *v) }.unwrap();
        let v1 = unsafe { mmap.read(1, |v| *v) }.unwrap();
        let v2 = unsafe { mmap.read(2, |v| *v) }.unwrap();

        assert_eq!((v0, v1, v2), (1, 2, 3));
    }

    /// `commit` returns a single epoch for the whole transaction, and the first write issued
    /// after it must be assigned a strictly higher epoch.
    #[test]
    fn ok_tx_single_epoch() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let mut tx = mmap.new_tx();
        unsafe {
            tx.write(0, |v| *v = 0x0A).unwrap();
            tx.write(1, |v| *v = 0x14).unwrap();
        }

        let epoch = tx.commit().unwrap();

        // The next write must receive a strictly higher epoch than the committed transaction.
        let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E) }.unwrap();

        assert!(next_epoch > epoch);
    }

    /// Staging slot 1 after slot 2 in the same transaction is expected to return an error.
    #[test]
    fn err_tx_out_of_order() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let mut tx = mmap.new_tx();
        unsafe {
            tx.write(2, |v| *v = 1).unwrap();
            let res = tx.write(1, |v| *v = 2);
            assert!(res.is_err());
        }
    }

    /// Staging the same slot twice in one transaction is expected to return an error.
    #[test]
    fn err_tx_duplicate_index() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let mut tx = mmap.new_tx();
        unsafe {
            tx.write(1, |v| *v = 1).unwrap();
            let res = tx.write(1, |v| *v = 2);
            assert!(res.is_err());
        }
    }

    /// Two threads commit transactions on disjoint slot pairs (0,1) and (2,3); both land.
    #[test]
    fn ok_tx_concurrent_non_overlapping() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());

        let mut handles = Vec::new();
        for i in 0..2 {
            let mmap = mmap.clone();
            handles.push(thread::spawn(move || {
                let mut tx = mmap.new_tx();

                unsafe {
                    tx.write(i * 2, |v| *v = i as u64).unwrap();
                    tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
                }

                let epoch = tx.commit().unwrap();
                mmap.wait_for_durability(epoch).unwrap();
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        for i in 0..2 {
            let v0 = unsafe { mmap.read(i * 2, |v| *v) }.unwrap();
            let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v) }.unwrap();

            assert_eq!((v0, v1), (i as u64, i as u64));
        }
    }

    /// Two sequential transactions write the same slot; the later commit's value is read back.
    #[test]
    fn ok_tx_overwrite_last_wins() {
        let (_dir, path, cfg) = new_tmp();
        let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let mut tx = mmap.new_tx();
        unsafe {
            tx.write(0, |v| *v = 1).unwrap();
        }

        // The first commit's epoch is deliberately not awaited; only the final value matters.
        tx.commit().unwrap();

        let mut tx2 = mmap.new_tx();
        unsafe {
            tx2.write(0, |v| *v = 2).unwrap();
        }

        let epoch = tx2.commit().unwrap();
        mmap.wait_for_durability(epoch).unwrap();

        let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
        assert_eq!(val, 2);
    }

    /// Values committed in a transaction and made durable survive dropping and reopening the file.
    #[test]
    fn ok_tx_persists_across_reopen() {
        let (_dir, path, cfg) = new_tmp();

        {
            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 0x3A).unwrap();
                tx.write(1, |v| *v = 0x54).unwrap();
            }

            let epoch = tx.commit().unwrap();
            mmap.wait_for_durability(epoch).unwrap();
        }

        {
            let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();

            let v0 = unsafe { mmap.read(0, |v| *v) }.unwrap();
            let v1 = unsafe { mmap.read(1, |v| *v) }.unwrap();

            assert_eq!((v0, v1), (0x3A, 0x54));
        }
    }
}
1924
mod fm_durability {
    //! Tests for epoch-based durability tracking through `FrozenMMap::wait_for_durability`.

    use super::*;

    /// Waiting on a write's epoch and then explicitly dropping the map must not panic.
    #[test]
    fn ok_wait_then_drop() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let ticket = unsafe { map.write(0, |slot| *slot = 7) }.unwrap();
        map.wait_for_durability(ticket).unwrap();

        // Drop explicitly so the teardown path runs while the temp dir still exists.
        drop(map);
    }

    /// Epochs of two sequential, individually awaited writes never decrease.
    #[test]
    fn ok_epoch_monotonicity() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();

        let first = unsafe { map.write(0, |slot| *slot = 1) }.unwrap();
        map.wait_for_durability(first).unwrap();

        let second = unsafe { map.write(0, |slot| *slot = 2) }.unwrap();
        map.wait_for_durability(second).unwrap();
        assert!(first <= second);
    }

    /// Two threads each increment slot 0 and wait on their own epoch; the final value is 2.
    #[test]
    fn ok_wait_for_durability_with_multi_writers() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let shared = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());

        // Collect eagerly so both threads are spawned before any of them is joined.
        let workers: Vec<_> = (0..2)
            .map(|_| {
                let local = sync::Arc::clone(&shared);
                thread::spawn(move || {
                    let ticket = unsafe { local.write(0, |slot| *slot += 1) }.unwrap();
                    local.wait_for_durability(ticket).unwrap();
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let total = unsafe { shared.read(0, |slot| *slot) }.unwrap();
        assert_eq!(total, 2);
    }
}
1974
mod fm_concurrency {
    //! Multi-threaded access tests: parallel readers and a mixed writer/reader session
    //! followed by reopening the file with `new_grown`.

    use super::*;

    /// Two threads read different slots in parallel and each sees the value written earlier.
    #[test]
    fn ok_parallel_reads_with_diff_index() {
        let (_tmp_dir, path, cfg) = new_tmp();
        let shared = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());

        unsafe { shared.write(0, |slot| *slot = 0x10).unwrap() };
        unsafe { shared.write(1, |slot| *slot = 0x20).unwrap() };

        // Spawns a thread that reads `idx` through its own clone of the shared map.
        let spawn_reader = |idx: usize| {
            let local = sync::Arc::clone(&shared);
            thread::spawn(move || unsafe { local.read(idx, |slot| *slot).unwrap() })
        };

        let first = spawn_reader(0);
        let second = spawn_reader(1);

        assert_eq!(first.join().unwrap(), 0x10);
        assert_eq!(second.join().unwrap(), 0x20);
    }

    /// Concurrent writers and readers on slots 0 and 1, then drop; reopening with
    /// `new_grown` keeps the written values and the newly added last slot is usable.
    ///
    /// NOTE(review): despite the `multi_tx` name, this test uses plain `write` calls rather
    /// than transactions, and it does not wait for durability before dropping the map.
    #[test]
    fn ok_multi_tx_drop_then_reopen_grown() {
        let (_tmp_dir, path, cfg) = new_tmp();

        {
            let shared = sync::Arc::new(FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap());

            let writers = (0..2u64).map(|i| {
                let local = sync::Arc::clone(&shared);
                thread::spawn(move || {
                    let _ = unsafe { local.write(i as usize, |slot| *slot = i + 1) }.unwrap();
                })
            });
            let readers = (0..2usize).map(|i| {
                let local = sync::Arc::clone(&shared);
                thread::spawn(move || {
                    let _ = unsafe { local.read(i, |slot| *slot) }.unwrap();
                })
            });

            // Writers are spawned first, then readers; all are spawned before any join.
            let handles: Vec<_> = writers.chain(readers).collect();
            for handle in handles {
                handle.join().unwrap();
            }
        }

        // `grown` is declared after `_tmp_dir`, so it is dropped before the temp dir.
        let grown = FrozenMMap::<u64, MID>::new_grown(&path, cfg, 0x10).unwrap();
        assert_eq!(grown.total_slots(), INIT_SLOTS + 0x10);

        for i in 0..2u64 {
            let got = unsafe { grown.read(i as usize, |slot| *slot) }.unwrap();
            assert_eq!(got, i + 1);
        }

        let last = grown.total_slots() - 1;
        unsafe { grown.write(last, |slot| *slot = 0xDEAD).unwrap() };

        let got = unsafe { grown.read(last, |slot| *slot) }.unwrap();
        assert_eq!(got, 0xDEAD);
    }
}
2044}