frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T` must be a POD type
6//! which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`], [`Box`], references
16//! and function pointers, or other fields whose bit-pattern is not stable across reopen.
17//!
//! These constraints are enforced because [`FrozenMMap`] does not serialize or deserialize values. It directly reads and
19//! writes `T` inside a memory mapped file. That means `T` must have a stable layout and must remain valid when the file
20//! is reopened in a later process.
21//!
22//! ## Example
23//!
24//! ```
25//! use frozen_core::fmmap::{FrozenMMap, FMCfg};
26//!
27//! const MODULE_ID: u8 = 0;
28//!
29//! let dir = tempfile::tempdir().unwrap();
30//! let path = dir.path().join("tmp_frozen_mmap");
31//!
32//! let cfg = FMCfg {
33//! initial_count: 0x0A,
34//! flush_duration: std::time::Duration::from_micros(0x96),
35//! };
36//!
37//! let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
38//! assert_eq!(mmap.total_slots(), 0x0A);
39//!
40//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
41//! mmap.wait_for_durability(epoch).unwrap();
42//!
43//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
44//! assert_eq!(val, 0xDEADC0DE);
45//!
46//! drop(mmap);
47//!
48//! let reopened = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x05).unwrap();
49//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
50//!
51//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
52//! assert_eq!(val, 0xDEADC0DE);
53//! ```
54
55#[cfg(any(target_os = "linux", target_os = "macos"))]
56mod posix;
57
58use crate::{
59 error::{ErrCode, FrozenError, FrozenResult},
60 ffile::{FFCfg, FrozenFile},
61 hints,
62};
63use std::{
64 fmt,
65 sync::{self, atomic},
66 thread, time,
67};
68
69/// Domain Id for [`FrozenMMap`] is **18**
70const ERRDOMAIN: u8 = 0x12;
71
72/// type for `epoch` used by write ops
73pub type TEpoch = u64;
74
75#[cfg(any(target_os = "linux", target_os = "macos"))]
76type TMap = posix::POSIXMMap;
77
/// Module id attached to every error raised by [`FrozenMMap`]
///
/// The value is written once, by the first constructor call, and reused by every later call.
static MID: sync::OnceLock<u8> = sync::OnceLock::new();

/// Returns the module id registered by the first [`FrozenMMap`] constructor call.
///
/// ## Panics
///
/// Panics when called before any constructor has stored a value in [`MID`]; the message names the broken
/// invariant so the failure can be traced back to an error built too early.
#[cfg(not(test))]
#[inline(always)]
fn mid() -> &'static u8 {
    MID.get().expect("FrozenMMap module id (MID) must be initialised before an error is constructed")
}

/// Test-build variant of `mid()`: falls back to module id `0` when nothing has been registered yet, so error
/// helpers can be exercised without constructing a [`FrozenMMap`] first.
#[cfg(test)]
#[inline(always)]
fn mid() -> &'static u8 {
    MID.get_or_init(|| 0)
}
92
/// Error codes for [`FrozenMMap`]
///
/// The number in parentheses on each constant is the decimal form of the hex code passed to `ErrCode::new`.
pub(in crate::fmmap) mod err {
    use super::ErrCode;

    /// (512) internal failure (halt and catch fire)
    pub const HCF: ErrCode = ErrCode::new(0x200, "hault and catch fire");

    /// (513) unknown error (fallback)
    pub const UNK: ErrCode = ErrCode::new(0x201, "unknown error");

    /// (514) no more memory available
    pub const NMM: ErrCode = ErrCode::new(0x202, "not enough memory available on the device");

    /// (515) syncing error
    pub const SYN: ErrCode = ErrCode::new(0x203, "failed to sync/flush data to storage device");

    /// (516) no write/read perm
    pub const PRM: ErrCode = ErrCode::new(0x204, "missing permissions for IO");

    /// (517) flush_tx error (panic inside)
    pub const TXE: ErrCode = ErrCode::new(0x205, "flush_tx paniced inside");

    /// (518) flush_tx error (unable to spawn)
    pub const FXE: ErrCode = ErrCode::new(0x206, "unable to spawn flush_tx");

    /// (519) lock poisoned (returned when acquiring an internal mutex or waiting on a condvar fails)
    pub const LPN: ErrCode = ErrCode::new(0x207, "lock poisoned internally");

    /// (520) type `T` implements drop (i.e. `std::mem::needs_drop::<T>()` is true)
    pub const DRP: ErrCode = ErrCode::new(0x208, "type T must not implement `Drop`");

    /// (521) type `T` is not 8 bytes aligned
    pub const ALN: ErrCode = ErrCode::new(0x209, "type T must be 8-bytes aligned");

    /// (522) `size_of::<T>()` is not multiple of 8
    pub const SZE: ErrCode = ErrCode::new(0x20A, "`size_of::<T>()` must be multiple of 8 bytes");

    /// (523) type `T` must not be zero sized
    pub const ZRO: ErrCode = ErrCode::new(0x20B, "type T must not be zero sized");
}
133
134#[inline]
135pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenResult<R> {
136 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137 Err(err)
138}
139
140#[inline]
141pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143 Err(err)
144}
145
146#[inline]
147pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenError {
148 FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
149}
150
/// Config for [`FrozenMMap`]
#[derive(Debug, Clone)]
pub struct FMCfg {
    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]. The value is forwarded to the underlying
    /// [`FrozenFile`] as its initial chunk amount (with `SLOT_SIZE` as the chunk size), so the initial file
    /// length will be `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
    /// together, where all write ops in a certain time interval fall into a single durable window
    pub flush_duration: time::Duration,
}
164
165/// Custom implementation of `mmap(2)`
166///
167/// ## Constraints
168///
169/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T` must be a POD type
170/// which is safe to persist and later reinterpret from bytes.
171///
172/// Required properties for `T`,
173///
174/// - Must use `#[repr(C)]`
175/// - Should be 8-bytes aligned
176/// - Must not implement [`Drop`]
177/// - `size_of::<T>()` should be multiple of `8`
178///
179/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`], [`Box`], references
180/// and function pointers, or other fields whose bit-pattern is not stable across reopen.
181///
/// These constraints are enforced because [`FrozenMMap`] does not serialize or deserialize values. It directly reads and
183/// writes `T` inside a memory mapped file. That means `T` must have a stable layout and must remain valid when the file
184/// is reopened in a later process.
185///
186/// ## Example
187///
188/// ```
189/// use frozen_core::fmmap::{FrozenMMap, FMCfg};
190///
191/// const MODULE_ID: u8 = 0;
192///
193/// let dir = tempfile::tempdir().unwrap();
194/// let path = dir.path().join("tmp_frozen_mmap");
195///
196/// let cfg = FMCfg {
197/// initial_count: 0x0A,
198/// flush_duration: std::time::Duration::from_micros(0x96),
199/// };
200///
201/// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
202/// assert_eq!(mmap.total_slots(), 0x0A);
203///
204/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
205/// mmap.wait_for_durability(epoch).unwrap();
206///
207/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
208/// assert_eq!(val, 0xDEADC0DE);
209///
210/// drop(mmap);
211///
212/// let reopened = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x05).unwrap();
213/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
214///
215/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
216/// assert_eq!(val, 0xDEADC0DE);
217/// ```
#[derive(Debug)]
pub struct FrozenMMap<T, const MODULE_ID: u8>
where
    T: Sized + Send + Sync,
{
    // shared state (mapping, backing file, locks, epochs); a clone of this `Arc` is handed to the flusher tx
    core: sync::Arc<Core>,
    // join handle of the background flusher tx; taken (left as `None`) once the thread has been joined
    tx: Option<thread::JoinHandle<()>>,
    // ties the instance to the slot type `T` without storing a value of it
    _t: core::marker::PhantomData<T>,
}
227
228unsafe impl<T, const MODULE_ID: u8> Send for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
229unsafe impl<T, const MODULE_ID: u8> Sync for FrozenMMap<T, MODULE_ID> where T: Sized + Send + Sync {}
230
231impl<T, const MODULE_ID: u8> FrozenMMap<T, MODULE_ID>
232where
233 T: Sized + Send + Sync,
234{
    /// Memory space (in bytes) required for each slot of `T` in [`FrozenMMap`]
    pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
237
238 /// Create a new [`FrozenMMap`] instance w/ given [`FMCfg`]
239 ///
240 /// ## Multiple Instances
241 ///
    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the underlying [`FrozenFile`],
    /// when trying to create multiple instances of [`FrozenMMap`], an error will be thrown
244 ///
245 /// ## Capacity Growth
246 ///
    /// [`FrozenMMap`] does not support in-place growth of a live mapping; to increase capacity, drop the current instance
    /// and reopen w/ [`FrozenMMap::new_grown`], which provides a memory mapping over the grown capacity
249 ///
250 /// ## [`FMCfg`]
251 ///
252 /// All configs for [`FrozenMMap`] are stored in [`FMCfg`]
253 ///
254 /// ## Working
255 ///
    /// We first create a new [`FrozenFile`] if not already present, then map the entire file using `mmap(2)`,
257 /// the entire file must read/write `T`, which also should stay constant for the entire lifetime of file
258 ///
259 /// ## Important
260 ///
261 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`], which is used under
262 /// the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta) to store config
263 ///
264 /// ## Example
265 ///
266 /// ```
267 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
268 ///
269 /// const MODULE_ID: u8 = 0;
270 ///
271 /// let dir = tempfile::tempdir().unwrap();
272 /// let path = dir.path().join("tmp_frozen_mmap");
273 ///
274 /// let cfg = FMCfg {
275 /// initial_count: 0x0A,
276 /// flush_duration: std::time::Duration::from_micros(0x96),
277 /// };
278 ///
279 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
280 /// assert_eq!(mmap.total_slots(), 0x0A);
281 ///
282 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
283 /// mmap.wait_for_durability(epoch).unwrap();
284 ///
285 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
286 /// assert_eq!(val, 0xDEADC0DE);
287 /// ```
288 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg) -> FrozenResult<Self> {
289 Self::validate_t()?;
290 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
291 let total_slots = curr_length / Self::SLOT_SIZE;
292
293 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock` guarantees that the
294 // first caller sets the value and all subsequent calls reuse it
295 let _ = MID.get_or_init(|| MODULE_ID);
296
297 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
298 let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
299
300 // INFO: we spawn the thread for background sync
301 let tx = Core::spawn_tx(core.clone())?;
302
303 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
304 }
305
306 /// Create a new [`FrozenMMap`] instance w/ given [`FMCfg`], while growing the underlying [`FrozenFile`]
307 /// by `additional_slots` before creating memory mapping
308 ///
309 /// ## Multiple Instances
310 ///
    /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the underlying [`FrozenFile`],
312 /// when trying to create multiple instances of [`FrozenMMap`], an error will be thrown
313 ///
314 /// ## Why not create a [`FrozenMMap::grow`] call?
315 ///
316 /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active memory mapping
317 /// in place is tricky in concurrent code, as some threads would still hold stale/unmapped pointers to mmap
    /// due to preemption from the OS scheduler
319 ///
    /// So, instead of remapping a live instance, the current API performs growth during open, making capacity expansion
321 /// an explicit lifecycle operation, and not a side effect on a live instance
322 ///
323 /// ## [`FMCfg`]
324 ///
325 /// All configs for [`FrozenMMap`] are stored in [`FMCfg`]
326 ///
327 /// ## Working
328 ///
    /// We first create a new [`FrozenFile`] if not already present, then we grow the file using [`FrozenFile::grow`]
330 /// then map the entire file using `mmap(2)`, the entire file must read/write `T`, which also should stay
331 /// constant for the entire lifetime of file
332 ///
333 /// ## Important
334 ///
335 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`], which is used under
336 /// the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta) to store config
337 ///
338 /// ## Example
339 ///
340 /// ```
341 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
342 ///
343 /// const MODULE_ID: u8 = 0;
344 ///
345 /// let dir = tempfile::tempdir().unwrap();
346 /// let path = dir.path().join("tmp_frozen_mmap");
347 ///
348 /// let cfg = FMCfg {
349 /// initial_count: 0x0A,
350 /// flush_duration: std::time::Duration::from_micros(0x96),
351 /// };
352 ///
353 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
354 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
355 ///
356 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
357 /// mmap.wait_for_durability(epoch).unwrap();
358 ///
359 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
360 /// assert_eq!(val, 0xDEADC0DE);
361 /// ```
362 pub fn new_grown<P: AsRef<std::path::Path>>(path: P, cfg: FMCfg, additional_slots: usize) -> FrozenResult<Self> {
363 Self::validate_t()?;
364 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
365
366 // we grow the underlying FrozenFile as requested
367 file.grow(additional_slots)?;
368 let curr_length = file.length()?; // we must read the updated (grown) length
369 let total_slots = curr_length / Self::SLOT_SIZE;
370
371 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock` guarantees that the
372 // first caller sets the value and all subsequent calls reuse it
373 let _ = MID.get_or_init(|| MODULE_ID);
374
375 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
376 let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
377
378 // INFO: we spawn the thread for background sync
379 let tx = Core::spawn_tx(core.clone())?;
380
381 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
382 }
383
384 /// Create/open a new [`FrozenFile`] instance
385 fn open_file(path: std::path::PathBuf, cfg: &FMCfg) -> FrozenResult<(FrozenFile, usize)> {
386 let ff_cfg = FFCfg { path, chunk_size: Self::SLOT_SIZE, initial_chunk_amount: cfg.initial_count };
387
388 let file = FrozenFile::new::<MODULE_ID>(ff_cfg)?;
389 let curr_length = file.length()?;
390
391 Ok((file, curr_length))
392 }
393
394 #[inline]
395 fn validate_t() -> FrozenResult<()> {
396 if std::mem::needs_drop::<T>() {
397 return new_err_default(err::DRP);
398 }
399
400 let align = std::mem::align_of::<T>();
401 if align != 8 {
402 return new_err_default(err::ALN);
403 }
404
405 let size = std::mem::size_of::<T>();
406 if size == 0 {
407 return new_err_default(err::ZRO);
408 }
409
410 if size % 8 != 0 {
411 return new_err_default(err::SZE);
412 }
413
414 Ok(())
415 }
416
417 /// Blocks until given `epoch` becomes durable
418 ///
419 /// ## Batching
420 ///
421 /// With respect to `flush_duration`, all write ops are batched before sync, which is executed by flusher tx
422 /// working in background, while each write is assigned w/ current durable epoch, and all writes which
423 /// observe the exact same epoch, belong to the same durability window, and are all sync'ed together
424 ///
425 /// When a background sync succeeds, the internal durable epoch is incremented, indicating that all writes
426 /// that observed the previous epoch are now durable on disk
427 ///
428 /// ## Example
429 ///
430 /// ```
431 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
432 ///
433 /// const MODULE_ID: u8 = 0;
434 ///
435 /// let dir = tempfile::tempdir().unwrap();
436 /// let path = dir.path().join("tmp_wait_epoch");
437 ///
438 /// let cfg = FMCfg {
439 /// initial_count: 0x04,
440 /// flush_duration: std::time::Duration::from_micros(0x60),
441 /// };
442 ///
443 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
444 ///
445 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
446 /// mmap.wait_for_durability(epoch).unwrap();
447 ///
448 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
449 /// assert_eq!(val, 0x8A);
450 /// ```
451 pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
452 if let Some(sync_err) = self.core.get_sync_error() {
453 return Err(sync_err);
454 }
455
456 let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
457 if durable_epoch >= epoch {
458 return Ok(());
459 }
460
461 let mut guard = match self.core.durable_lock.lock() {
462 Ok(g) => g,
463 Err(e) => return new_err(err::LPN, e),
464 };
465
466 loop {
467 if let Some(sync_err) = self.core.get_sync_error() {
468 return Err(sync_err);
469 }
470
471 if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
472 return Ok(());
473 }
474
475 guard = match self.core.durable_cv.wait(guard) {
476 Ok(g) => g,
477 Err(e) => return new_err(err::LPN, e),
478 };
479 }
480 }
481
482 /// Read a `T` at given `index` via callback (`f`)
483 ///
484 /// ## Concurrency
485 ///
    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at the same index are
    /// atomic and thread safe, while operations on different indices may proceed fully in parallel
488 ///
489 /// ## Safety
490 ///
491 /// The caller must ensure following:
492 ///
493 /// - given `index` is within bounds
494 /// - underlying memory contains a valid instance of `T`
495 /// - provided callback `f` must not,
496 /// - write through the pointer
    ///   - store or leak the pointer beyond its lifetime
498 ///
499 /// Violating any of the above may result in undefined behavior
500 ///
501 /// ## Example
502 ///
503 /// ```
504 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
505 ///
506 /// const MODULE_ID: u8 = 0;
507 ///
508 /// let dir = tempfile::tempdir().unwrap();
509 /// let path = dir.path().join("tmp_read_mmap");
510 ///
511 /// let cfg = FMCfg {
512 /// initial_count: 0x02,
513 /// flush_duration: std::time::Duration::from_micros(0x60),
514 /// };
515 ///
516 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
517 ///
518 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
519 /// mmap.wait_for_durability(epoch).unwrap();
520 ///
521 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
522 /// assert_eq!(val, 0x0A);
523 /// ```
524 #[inline(always)]
525 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
526 let offset = Self::SLOT_SIZE * index;
527 let _lock = self.core.locks.lock(index);
528
529 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the assumption of
530 // OS guarantees visibility)
531
532 let ptr = unsafe { self.core.map.as_ptr(offset) };
533 Ok(f(ptr))
534 }
535
536 /// Write/update a `T` at given `index` via callback (`f`)
537 ///
538 /// ## Concurrency
539 ///
    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at the same index are
    /// atomic and thread safe, while operations on different indices may proceed fully in parallel
542 ///
543 /// ## Safety
544 ///
545 /// The caller must ensure following:
546 ///
547 /// - given `index` is within bounds
548 /// - underlying memory contains a valid instance of `T`
    /// - provided callback `f` must not store or leak the pointer beyond its lifetime
550 ///
551 /// Violating any of the above may result in undefined behavior
552 ///
553 /// ## Example
554 ///
555 /// ```
556 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
557 ///
558 /// const MODULE_ID: u8 = 0;
559 ///
560 /// let dir = tempfile::tempdir().unwrap();
561 /// let path = dir.path().join("tmp_write_mmap");
562 ///
563 /// let cfg = FMCfg {
564 /// initial_count: 0x02,
565 /// flush_duration: std::time::Duration::from_micros(0x96),
566 /// };
567 ///
568 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
569 ///
570 /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
571 /// mmap.wait_for_durability(epoch).unwrap();
572 ///
573 /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
574 /// assert_eq!(val, 0x2B);
575 /// ```
576 #[inline(always)]
577 pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
578 // propagate prev errors
579 if let Some(err) = self.core.get_sync_error() {
580 return Err(err);
581 }
582
583 let offset = Self::SLOT_SIZE * index;
584
585 let _guard = self.core.acquire_io_lock()?;
586 let _lock = self.core.locks.lock(index);
587
588 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
589 f(ptr);
590
591 self.core.dirty.store(true, atomic::Ordering::Release);
592 let epoch = self.core.incr_curr_epoch();
593
594 Ok(epoch)
595 }
596
597 /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
598 ///
599 /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is immediately
600 /// persisted to the underlying storage device
601 ///
602 /// ## Concurrency
603 ///
    /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at the same index are
    /// atomic and thread safe, while operations on different indices may proceed fully in parallel
606 ///
607 /// ## Safety
608 ///
609 /// The caller must ensure following:
610 ///
611 /// - given `index` is within bounds
612 /// - underlying memory contains a valid instance of `T`
    /// - provided callback `f` must not store or leak the pointer beyond its lifetime
614 ///
615 /// Violating any of the above may result in undefined behavior
616 ///
617 /// ## Example
618 ///
619 /// ```
620 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
621 ///
622 /// const MODULE_ID: u8 = 0;
623 ///
624 /// let dir = tempfile::tempdir().unwrap();
625 /// let path = dir.path().join("tmp_write_sync");
626 ///
627 /// let cfg = FMCfg {
628 /// initial_count: 0x02,
629 /// flush_duration: std::time::Duration::from_micros(0x96),
630 /// };
631 ///
632 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
633 /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
634 ///
635 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
636 /// assert_eq!(val, 0xC0DE);
637 /// ```
638 #[inline(always)]
639 pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
640 // propagate prev errors
641 if let Some(err) = self.core.get_sync_error() {
642 return Err(err);
643 }
644
645 let offset = Self::SLOT_SIZE * index;
646
647 // block flush_tx scheduling
648 let _flush_guard = self.core.lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
649
650 // we use exlusive lock as we perform blocking hard sync
651 let _guard = self.core.acquire_exclusive_io_lock()?;
652
653 let _lock = self.core.locks.lock(index);
654 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
655 f(ptr);
656
657 // blocking hard sync
658 self.core.sync()?;
659
660 // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as the
661 // current batch is durable
662
663 self.core.mark_epoch_durable();
664 let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
665
666 // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to sync)
667 if prev {
668 let _g = self.core.durable_lock.lock().map_err(|e| new_err_raw(err::LPN, e))?;
669 self.core.durable_cv.notify_all();
670 }
671
672 Ok(())
673 }
674
675 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
676 ///
677 /// ## Working
678 ///
    /// This call does not touch the fs; the count is derived from the mapped length recorded when the instance
    /// was created, which stays fixed because a live mapping is never grown in place
681 ///
682 /// ## Example
683 ///
684 /// ```
685 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
686 ///
687 /// const MODULE_ID: u8 = 0;
688 ///
689 /// let dir = tempfile::tempdir().unwrap();
690 /// let path = dir.path().join("tmp_grow_mmap");
691 ///
692 /// let cfg = FMCfg {
693 /// initial_count: 0x02,
694 /// flush_duration: std::time::Duration::from_micros(0x96),
695 /// };
696 ///
697 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg.clone()).unwrap();
698 /// assert_eq!(mmap.total_slots(), 0x02);
699 ///
700 /// drop(mmap);
701 ///
702 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new_grown(&path, cfg, 0x03).unwrap();
703 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
704 /// ```
705 #[inline]
706 pub fn total_slots(&self) -> usize {
707 self.core.curr_length / Self::SLOT_SIZE
708 }
709
710 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
711 ///
    /// *NOTE:* This is an approximation of memory used, actual RSS may differ depending on paging and OS
713 ///
714 /// ## Example
715 ///
716 /// ```
717 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
718 ///
719 /// const MODULE_ID: u8 = 0;
720 ///
721 /// let dir = tempfile::tempdir().unwrap();
722 /// let path = dir.path().join("tmp_mem_usage");
723 ///
724 /// let cfg = FMCfg {
725 /// initial_count: 0x10,
726 /// flush_duration: std::time::Duration::from_micros(0x60),
727 /// };
728 ///
729 /// let mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
730 ///
731 /// let bytes = mmap.memory_usage();
732 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
733 /// ```
734 #[inline]
735 pub fn memory_usage(&self) -> usize {
736 // memory of mem-maped region
737 let mmap_bytes = self.core.curr_length;
738
739 // memory used for locking
740 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
741
742 mmap_bytes + lock_bytes
743 }
744
745 /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic operation
746 ///
747 /// ## Overview
748 ///
749 /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation, hence
750 /// creating a transactional write operation, which gives following guarantees,
751 ///
752 /// - All write ops succeed together
753 /// - Single epoch to track durability of all writes ops
754 /// - Same durability guarantee for all the included write ops
755 ///
756 /// Simply, this preserves atomic durability semantics for multi index updates
757 ///
758 /// ## Example
759 ///
760 /// ```
761 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
762 ///
763 /// const MID: u8 = 0;
764 ///
765 /// let dir = tempfile::tempdir().unwrap();
766 /// let path = dir.path().join("tmp_tx");
767 ///
768 /// let cfg = FMCfg {
769 /// initial_count: 0x0A,
770 /// flush_duration: std::time::Duration::from_micros(50),
771 /// };
772 ///
773 /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
774 ///
775 /// let mut tx = mmap.new_tx();
776 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
777 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
778 ///
779 /// let epoch = tx.commit().unwrap();
780 /// mmap.wait_for_durability(epoch).unwrap();
781 ///
782 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
783 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
784 ///
785 /// assert_eq!((v0, v1), (0x0A, 0x14));
786 /// ```
787 #[inline]
788 pub fn new_tx(&self) -> FMTransaction<'_, T> {
789 FMTransaction { core: &self.core, ops_vec: Vec::new() }
790 }
791
792 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
793 ///
794 /// ## Working
795 ///
    /// When `delete` is called, all read, write, and (background) sync ops are paused (indefinitely),
    /// while deletion is done with following steps:
798 ///
799 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
800 /// - if any batch is pending for sync,
801 /// - swap the flag
802 /// - call sync manually
803 /// - incr epoch and update cv
    /// - broadcast closing so flusher tx could wrap up
805 /// - `munmap(2)` current mapping
806 /// - call delete on [`FrozenFile`]
807 ///
808 /// ## Example
809 ///
810 /// ```
811 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
812 ///
813 /// const MODULE_ID: u8 = 0;
814 ///
815 /// let dir = tempfile::tempdir().unwrap();
816 /// let path = dir.path().join("tmp_delete_mmap");
817 ///
818 /// let cfg = FMCfg {
819 /// initial_count: 0x04,
820 /// flush_duration: std::time::Duration::from_micros(0x96),
821 /// };
822 ///
823 /// let mut mmap = FrozenMMap::<u64, MODULE_ID>::new(&path, cfg).unwrap();
824 /// mmap.delete().unwrap();
825 /// assert!(!path.exists());
826 /// ```
827 pub fn delete(&mut self) -> FrozenResult<()> {
828 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
829 self.core.dirty.store(false, atomic::Ordering::Release);
830 self.core.closed.store(true, atomic::Ordering::Release);
831 self.core.durable_cv.notify_one();
832
833 if let Some(handle) = self.tx.take() {
834 let _ = handle.join();
835 }
836
837 // pause all new write ops
838 let _lock = self.core.acquire_exclusive_io_lock()?;
839
840 self.munmap()?;
841 self.core.file.delete()
842 }
843
844 #[inline]
845 fn munmap(&self) -> FrozenResult<()> {
846 let length = self.core.curr_length;
847 unsafe { self.core.map.unmap(length) }
848 }
849}
850
851impl<T, const MODULE_ID: u8> Drop for FrozenMMap<T, MODULE_ID>
852where
853 T: Sized + Send + Sync,
854{
855 fn drop(&mut self) {
856 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
857 self.core.cv.notify_one(); // notify flusher tx to shut
858
859 if let Some(handle) = self.tx.take() {
860 let _ = handle.join();
861 }
862
863 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
864 let _io_lock = self.core.acquire_exclusive_io_lock();
865
866 // free up the boxed error (if any)
867 let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
868 if !ptr.is_null() {
869 unsafe {
870 drop(Box::from_raw(ptr));
871 }
872 }
873
874 if !is_closed {
875 let _ = self.munmap();
876 }
877 }
878}
879
880impl<T, const MODULE_ID: u8> fmt::Display for FrozenMMap<T, MODULE_ID>
881where
882 T: Sized + Send + Sync,
883{
884 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
885 write!(
886 f,
887 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
888 self.core.file.fd(),
889 self.total_slots(),
890 self.core.curr_length,
891 )
892 }
893}
894
895/// A context for grouping multi write ops into a single atomic operation
896///
897/// ## Overview
898///
899/// Use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation.
900///
901/// - All included writes are applied together
/// - Single epoch is assigned for an entire transaction
903/// - Durability guarantee is same for all included write ops
904///
905/// ## Example
906///
907/// ```
908/// use frozen_core::fmmap::{FrozenMMap, FMCfg};
909///
910/// const MID: u8 = 0;
911///
912/// let dir = tempfile::tempdir().unwrap();
913/// let path = dir.path().join("tmp_tx");
914///
915/// let cfg = FMCfg {
916/// initial_count: 0x0A,
917/// flush_duration: std::time::Duration::from_micros(50),
918/// };
919///
920/// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
921///
922/// let mut tx = mmap.new_tx();
923/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
924/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
925/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
926///
927/// let epoch = tx.commit().unwrap();
928/// mmap.wait_for_durability(epoch).unwrap();
929///
930/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
931/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
932/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
933///
934/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
935/// ```
936pub struct FMTransaction<'a, T> {
937 core: &'a Core,
938 ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
939}
940
941impl<'a, T> FMTransaction<'a, T> {
942 /// Append a write op into the [`FMTransaction`]
943 ///
944 /// ## Requirements
945 ///
946 /// Write ops must follow these safety requirements,
947 ///
948 /// - No duplicate indices
949 /// - No out-of-order writes (must be incremental)
950 ///
951 /// Violating this constraint would result in [`FrozenError`]
952 ///
953 /// ## Safety
954 ///
955 /// Same safety requirements as [`FrozenMMap::write`] apply here
956 ///
957 /// ## Example
958 ///
959 /// ```
960 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
961 ///
962 /// const MID: u8 = 0;
963 ///
964 /// let dir = tempfile::tempdir().unwrap();
965 /// let path = dir.path().join("tmp_tx");
966 ///
967 /// let cfg = FMCfg {
968 /// initial_count: 0x10,
969 /// flush_duration: std::time::Duration::from_micros(50),
970 /// };
971 ///
972 /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
973 ///
974 /// let mut tx = mmap.new_tx();
975 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
976 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
977 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
978 ///
979 /// let epoch = tx.commit().unwrap();
980 /// mmap.wait_for_durability(epoch).unwrap();
981 ///
982 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
983 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
984 /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
985 ///
986 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
987 /// ```
988 #[inline(always)]
989 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
990 where
991 F: FnOnce(*mut T) + 'a,
992 {
993 // NOTE:
994 //
995 // This check prevents a potential footgun! For a safe transaction all writes must be,
996 // - ordered by index (either incr or decr)
997 // - no multi writes on same index
998 //
999 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1000 if let Some((last_idx, _)) = self.ops_vec.last() {
1001 if index <= *last_idx {
1002 return new_err(
1003 err::HCF,
1004 "tx writes must be strictly ordered, with no more then single ops on given index",
1005 );
1006 }
1007 }
1008
1009 self.ops_vec.push((index, Box::new(f)));
1010 Ok(())
1011 }
1012
1013 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1014 ///
1015 /// ## Guarantees
1016 ///
1017 /// - All writes are applied under a single epoch
1018 /// - All writes belong to the same durability batch
1019 /// - No interleaving with other transactions at epoch level
1020 ///
1021 /// ## Example
1022 ///
1023 /// ```
1024 /// use frozen_core::fmmap::{FrozenMMap, FMCfg};
1025 ///
1026 /// const MID: u8 = 0;
1027 ///
1028 /// let dir = tempfile::tempdir().unwrap();
1029 /// let path = dir.path().join("tmp_tx");
1030 ///
1031 /// let cfg = FMCfg {
1032 /// initial_count: 0x10,
1033 /// flush_duration: std::time::Duration::from_micros(50),
1034 /// };
1035 ///
1036 /// let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
1037 ///
1038 /// let mut tx = mmap.new_tx();
1039 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1040 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1041 ///
1042 /// let epoch = tx.commit().unwrap();
1043 /// mmap.wait_for_durability(epoch).unwrap();
1044 ///
1045 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1046 /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1047 ///
1048 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1049 /// ```
1050 #[inline(always)]
1051 pub fn commit(self) -> FrozenResult<u64> {
1052 if let Some(err) = self.core.get_sync_error() {
1053 return Err(err);
1054 }
1055
1056 let _guard = self.core.acquire_io_lock()?;
1057
1058 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1059 let mut guards = Vec::with_capacity(self.ops_vec.len());
1060 for (idx, _) in &self.ops_vec {
1061 guards.push(self.core.locks.lock(*idx));
1062 }
1063
1064 for (idx, op) in self.ops_vec {
1065 let offset = idx * std::mem::size_of::<T>();
1066 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1067 op(ptr);
1068 }
1069
1070 self.core.dirty.store(true, atomic::Ordering::Release);
1071 let epoch = self.core.incr_curr_epoch();
1072
1073 Ok(epoch)
1074 }
1075}
1076
/// Shared state behind a [`FrozenMMap`], also handed to the background flusher thread (see `Core::flush_tx`)
#[derive(Debug)]
struct Core {
    /// Memory map holding the slots; written through `as_mut_ptr` and flushed with `sync`
    map: TMap,
    /// One lock flag per slot
    locks: Locks,
    /// Backing file, synced right after the map
    file: FrozenFile,
    /// Condvar the flusher sleeps on (paired with `lock`)
    cv: sync::Condvar,
    /// Length handed to the map's `sync`
    curr_length: usize,
    /// Mutex paired with `cv`
    lock: sync::Mutex<()>,
    /// Taken shared by committing writers and exclusively by the flusher while it syncs
    io_lock: sync::RwLock<()>,
    /// Set after a commit applied its writes; swapped back to `false` by the flusher
    dirty: atomic::AtomicBool,
    /// Notified by the flusher after a sync attempt (paired with `durable_lock`)
    durable_cv: sync::Condvar,
    /// Read by the flusher to decide when to exit
    closed: atomic::AtomicBool,
    /// Mutex paired with `durable_cv`
    durable_lock: sync::Mutex<()>,
    /// Upper bound on how long the flusher sleeps between wake-ups
    flush_duration: time::Duration,
    /// Counter bumped once per committed write batch
    current_epoch: atomic::AtomicU64,
    /// Latest epoch recorded as durable
    durable_epoch: atomic::AtomicU64,
    /// Most recent sync error, heap allocated; null while no error is recorded
    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
}

// NOTE(review): these manual impls presumably exist because a field (likely `TMap`) is not automatically
// `Send`/`Sync`; the soundness argument is not visible here — confirm and record it as a `SAFETY:` note
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
1098
1099impl Core {
1100 fn new(
1101 map: TMap,
1102 file: FrozenFile,
1103 flush_duration: time::Duration,
1104 curr_length: usize,
1105 total_slots: usize,
1106 ) -> Self {
1107 Self {
1108 map,
1109 file,
1110 curr_length,
1111 flush_duration,
1112 cv: sync::Condvar::new(),
1113 lock: sync::Mutex::new(()),
1114 io_lock: sync::RwLock::new(()),
1115 locks: Locks::new(total_slots),
1116 durable_cv: sync::Condvar::new(),
1117 durable_lock: sync::Mutex::new(()),
1118 dirty: atomic::AtomicBool::new(false),
1119 closed: atomic::AtomicBool::new(false),
1120 current_epoch: atomic::AtomicU64::new(0),
1121 durable_epoch: atomic::AtomicU64::new(0),
1122 error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1123 }
1124 }
1125
1126 #[inline]
1127 fn sync(&self) -> FrozenResult<()> {
1128 unsafe { self.map.sync(self.curr_length) }?;
1129 self.file.sync()
1130 }
1131
1132 #[inline(always)]
1133 fn set_sync_error(&self, err: FrozenError) {
1134 let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1135 let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1136
1137 // NOTE: we must free the old error, if any, to avoid mem leaks
1138 if !old.is_null() {
1139 unsafe { drop(Box::from_raw(old)) };
1140 }
1141 }
1142
1143 #[inline(always)]
1144 fn get_sync_error(&self) -> Option<FrozenError> {
1145 let ptr = self.error.load(atomic::Ordering::Acquire);
1146 if hints::likely(ptr.is_null()) {
1147 return None;
1148 }
1149
1150 let arc = unsafe { &*ptr }.clone();
1151 Some((*arc).clone())
1152 }
1153
1154 #[inline]
1155 fn clear_sync_error(&self) {
1156 let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1157 if hints::unlikely(!old.is_null()) {
1158 unsafe {
1159 drop(Box::from_raw(old));
1160 }
1161 }
1162 }
1163
1164 #[inline]
1165 fn acquire_io_lock(&self) -> FrozenResult<sync::RwLockReadGuard<'_, ()>> {
1166 self.io_lock.read().map_err(|e| new_err_raw(err::LPN, e))
1167 }
1168
1169 #[inline]
1170 fn acquire_exclusive_io_lock(&self) -> FrozenResult<sync::RwLockWriteGuard<'_, ()>> {
1171 self.io_lock.write().map_err(|e| new_err_raw(err::LPN, e))
1172 }
1173
1174 #[inline]
1175 fn incr_curr_epoch(&self) -> u64 {
1176 self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1177 }
1178
1179 #[inline]
1180 fn mark_epoch_durable(&self) {
1181 let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1182 self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1183 }
1184
1185 fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1186 match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core)) {
1187 Ok(tx) => Ok(tx),
1188 Err(error) => new_err(err::FXE, error),
1189 }
1190 }
1191
1192 fn flush_tx(core: sync::Arc<Self>) {
1193 // init phase (acquiring locks)
1194 let mut guard = match core.lock.lock() {
1195 Ok(g) => g,
1196 Err(error) => {
1197 core.set_sync_error(new_err_raw(err::FXE, error));
1198 core.cv.notify_all();
1199 return;
1200 }
1201 };
1202
1203 // sync loop w/ non-busy waiting
1204 loop {
1205 guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1206 Ok((g, _)) => g,
1207 Err(e) => {
1208 core.set_sync_error(new_err_raw(err::TXE, e));
1209 core.cv.notify_all();
1210 return;
1211 }
1212 };
1213
1214 // NOTE: we must read values of close brodcast before acquire exclusive lock,
1215 // if done otherwise, we impose serious deadlock sort of situation for the the flusher tx
1216
1217 let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1218 let closing = core.closed.load(atomic::Ordering::Acquire);
1219
1220 if !dirty {
1221 if closing {
1222 core.cv.notify_all();
1223 return;
1224 }
1225
1226 continue;
1227 }
1228
1229 // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1230 // grow could kick in while sync is in progress
1231
1232 let io_lock = match core.acquire_exclusive_io_lock() {
1233 Ok(lock) => lock,
1234 Err(e) => {
1235 core.set_sync_error(e);
1236 core.cv.notify_all();
1237 return;
1238 }
1239 };
1240
1241 // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1242 // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1243 // and acqurie it again, in-between other process could acquire it and use it
1244 drop(guard);
1245
1246 // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary, all writes
1247 // up to `target_epoch` are guaranteed to be part of this flush window, while anything beyound belongs
1248 // to the next batch
1249 let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1250
1251 // NOTE:
1252 //
1253 // - if sync fails, we update the Core::error w/ the received error object
1254 // - we clear it up when another sync call succeeds
1255 // - this is valid, as the underlying sync flushes entire mmaped region, hence
1256 // even if the last call failed, and the new one succeeded, we do get the durability
1257 // guarenty for the old data as well
1258
1259 match core.sync() {
1260 Ok(_) => {
1261 core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1262
1263 let _g = match core.durable_lock.lock() {
1264 Ok(g) => g,
1265 Err(e) => {
1266 core.set_sync_error(new_err_raw(err::LPN, e));
1267 return;
1268 }
1269 };
1270
1271 core.clear_sync_error();
1272 core.durable_cv.notify_all();
1273 }
1274 Err(err) => {
1275 core.set_sync_error(err);
1276 core.durable_cv.notify_all();
1277 }
1278 }
1279
1280 drop(io_lock);
1281 guard = match core.lock.lock() {
1282 Ok(g) => g,
1283 Err(e) => {
1284 core.set_sync_error(new_err_raw(err::LPN, e));
1285 core.durable_cv.notify_all();
1286 return;
1287 }
1288 };
1289 }
1290 }
1291}
1292
1293#[derive(Debug)]
1294struct Locks(Box<[atomic::AtomicU8]>);
1295
1296impl Locks {
1297 const LOCK: u8 = 1;
1298 const UNLOCK: u8 = 0;
1299
1300 const L1_CONTENTION: usize = 0x10;
1301 const L2_CONTENTION: usize = 0x20;
1302
1303 fn new(cap: usize) -> Self {
1304 let mut slots = Vec::with_capacity(cap);
1305 for _ in 0..cap {
1306 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1307 }
1308
1309 Self(slots.into_boxed_slice())
1310 }
1311
1312 #[inline(always)]
1313 fn lock(&self, index: usize) -> LockGuard<'_> {
1314 let val = &self.0[index];
1315 let mut spins = 0;
1316
1317 loop {
1318 if val
1319 .compare_exchange_weak(Self::UNLOCK, Self::LOCK, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
1320 .is_ok()
1321 {
1322 return LockGuard(val);
1323 }
1324
1325 // we must backoff under contention
1326
1327 // NOTE:
1328 //
1329 // We have three levels of backoff,
1330 //
1331 // 1. Busy waiting w/ CPU hint
1332 // 2. Willingly allow preemption by OS schedular
1333 // 3. Sleep the thread (increasing w/ exponenial factor)
1334
1335 if hints::likely(spins < Self::L1_CONTENTION) {
1336 std::hint::spin_loop();
1337 } else if spins < Self::L2_CONTENTION {
1338 thread::yield_now();
1339 } else {
1340 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1341 thread::sleep(time::Duration::from_nanos(ns));
1342 }
1343
1344 spins += 1;
1345 }
1346 }
1347}
1348
1349struct LockGuard<'a>(&'a atomic::AtomicU8);
1350
1351impl Drop for LockGuard<'_> {
1352 fn drop(&mut self) {
1353 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1354 }
1355}
1356
1357#[cfg(test)]
1358mod tests {
1359 use super::*;
1360
1361 // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1362 const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1363
1364 const MID: u8 = 0;
1365 const INIT_SLOTS: usize = 0x0A;
1366
1367 fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FMCfg) {
1368 let dir = tempfile::tempdir().unwrap();
1369 let path = dir.path().join("tmp_map");
1370
1371 let cfg = FMCfg { initial_count: INIT_SLOTS, flush_duration: FLUSH_DURATION };
1372
1373 (dir, path, cfg)
1374 }
1375
1376 mod fm_lifecycle {
1377 use super::*;
1378
1379 #[test]
1380 fn ok_new() {
1381 let (_dir, path, cfg) = new_tmp();
1382 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1383
1384 assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
1385 assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1386 assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1387 assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
1388 assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64, MID>::SLOT_SIZE);
1389
1390 // satisfies the bg thread was spawned correctly
1391 assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());
1392
1393 // satisfies wait on epoch works
1394 let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1395 assert!(mmap.wait_for_durability(epoch).is_ok());
1396 }
1397
1398 #[test]
1399 fn ok_new_existing() {
1400 let (_dir, path, cfg) = new_tmp();
1401
1402 // create new + close
1403 let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1404 drop(mmap1);
1405
1406 // open existing
1407 let mmap2 = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1408 drop(mmap2);
1409 }
1410
1411 #[test]
1412 fn err_new_when_change_in_cfg() {
1413 let (_dir, path, mut cfg) = new_tmp();
1414
1415 // create new + close
1416 let mmap1 = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1417 drop(mmap1);
1418
1419 // update cfg + opne existing
1420 cfg.initial_count = INIT_SLOTS * 2;
1421 assert!(FrozenMMap::<u64, MID>::new(path, cfg).is_err());
1422 }
1423
1424 #[test]
1425 fn ok_delete() {
1426 let (_dir, path, cfg) = new_tmp();
1427 let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1428
1429 mmap.delete().unwrap();
1430 assert!(!mmap.core.file.exists().unwrap());
1431 }
1432
1433 #[test]
1434 fn err_delete_after_delete() {
1435 let (_dir, path, cfg) = new_tmp();
1436 let mut mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1437
1438 mmap.delete().unwrap();
1439 assert!(!mmap.core.file.exists().unwrap());
1440 assert!(mmap.delete().is_err());
1441 }
1442
1443 #[test]
1444 fn ok_drop_persists_when_dropped_before_bg_flush() {
1445 let (_dir, path, cfg) = new_tmp();
1446 const VAL: u64 = 0x0A;
1447
1448 {
1449 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1450 unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1451 drop(mmap);
1452 }
1453
1454 {
1455 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1456 let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1457 assert_eq!(val, VAL);
1458 }
1459 }
1460 }
1461
1462 mod fm_validate_t {
1463 use super::*;
1464
1465 #[repr(C, align(8))]
1466 struct OkT {
1467 a: u64,
1468 b: u64,
1469 }
1470
1471 #[repr(C)]
1472 struct BadAlignT {
1473 a: u32,
1474 b: u32,
1475 }
1476
1477 #[repr(C, align(4))]
1478 struct BadSizeT {
1479 a: u32,
1480 b: u16,
1481 }
1482
1483 #[repr(C, align(8))]
1484 struct DropT(u64);
1485
1486 impl Drop for DropT {
1487 fn drop(&mut self) {}
1488 }
1489
1490 #[repr(C, align(8))]
1491 struct ZstT;
1492
1493 #[test]
1494 fn ok_validate_t() {
1495 assert!(FrozenMMap::<OkT, MID>::validate_t().is_ok());
1496 }
1497
1498 #[test]
1499 fn err_validate_t_when_drop() {
1500 assert!(FrozenMMap::<DropT, MID>::validate_t().is_err());
1501 }
1502
1503 #[test]
1504 fn err_validate_t_when_not_8_byte_aligned() {
1505 assert!(FrozenMMap::<BadAlignT, MID>::validate_t().is_err());
1506 }
1507
1508 #[test]
1509 fn err_validate_t_when_size_not_multiple_of_8() {
1510 assert!(FrozenMMap::<BadSizeT, MID>::validate_t().is_err());
1511 }
1512
1513 #[test]
1514 fn err_validate_t_when_zero_sized() {
1515 assert!(FrozenMMap::<ZstT, MID>::validate_t().is_err());
1516 }
1517
1518 #[test]
1519 fn err_new_when_t_implements_drop() {
1520 let (_dir, path, cfg) = new_tmp();
1521 assert!(FrozenMMap::<DropT, MID>::new(path, cfg).is_err());
1522 }
1523
1524 #[test]
1525 fn err_new_when_t_is_not_8_byte_aligned() {
1526 let (_dir, path, cfg) = new_tmp();
1527 assert!(FrozenMMap::<BadAlignT, MID>::new(path, cfg).is_err());
1528 }
1529
1530 #[test]
1531 fn err_new_when_t_size_is_not_multiple_of_8() {
1532 let (_dir, path, cfg) = new_tmp();
1533 assert!(FrozenMMap::<BadSizeT, MID>::new(path, cfg).is_err());
1534 }
1535
1536 #[test]
1537 fn err_new_when_t_is_zero_sized() {
1538 let (_dir, path, cfg) = new_tmp();
1539 assert!(FrozenMMap::<ZstT, MID>::new(path, cfg).is_err());
1540 }
1541
1542 #[test]
1543 fn err_new_grown_when_t_implements_drop() {
1544 let (_dir, path, cfg) = new_tmp();
1545 assert!(FrozenMMap::<DropT, MID>::new_grown(path, cfg, 1).is_err());
1546 }
1547
1548 #[test]
1549 fn err_new_grown_when_t_is_not_8_byte_aligned() {
1550 let (_dir, path, cfg) = new_tmp();
1551 assert!(FrozenMMap::<BadAlignT, MID>::new_grown(path, cfg, 1).is_err());
1552 }
1553
1554 #[test]
1555 fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1556 let (_dir, path, cfg) = new_tmp();
1557 assert!(FrozenMMap::<BadSizeT, MID>::new_grown(path, cfg, 1).is_err());
1558 }
1559
1560 #[test]
1561 fn err_new_grown_when_t_is_zero_sized() {
1562 let (_dir, path, cfg) = new_tmp();
1563 assert!(FrozenMMap::<ZstT, MID>::new_grown(path, cfg, 1).is_err());
1564 }
1565 }
1566
1567 mod fm_new_grown {
1568 use super::*;
1569
1570 #[test]
1571 fn ok_new_grown_updates_length() {
1572 let (_dir, path, cfg) = new_tmp();
1573
1574 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1575 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1576 drop(mmap);
1577
1578 let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).unwrap();
1579 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1580 assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64, MID>::SLOT_SIZE);
1581 }
1582
1583 #[test]
1584 fn err_new_grown_with_preexisting_instance() {
1585 let (_dir, path, cfg) = new_tmp();
1586
1587 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1588 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1589
1590 assert!(FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x0A).is_err());
1591 }
1592
1593 #[test]
1594 fn ok_new_grown_cycle() {
1595 let (_dir, path, cfg) = new_tmp();
1596
1597 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1598 drop(mmap);
1599
1600 let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1601 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1602 drop(mmap);
1603
1604 let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1605 assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1606 drop(mmap);
1607
1608 let mmap = FrozenMMap::<u64, MID>::new_grown(path, cfg, 0x100).unwrap();
1609 assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1610 }
1611
1612 #[test]
1613 fn ok_write_reopen_grown_read() {
1614 let (_dir, path, cfg) = new_tmp();
1615
1616 {
1617 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1618 unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1619 }
1620
1621 {
1622 let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1623 unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1624
1625 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1626 assert_eq!(val, 0xBB);
1627 }
1628 }
1629
1630 #[test]
1631 fn ok_write_reopen_grown_read_cycle() {
1632 let (_dir, path, cfg) = new_tmp();
1633
1634 {
1635 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1636 unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1637 }
1638
1639 for i in 0..2 {
1640 let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1641 let idx = mmap.total_slots() - 1;
1642 unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1643 drop(mmap);
1644 }
1645
1646 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
1647
1648 let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1649 assert_eq!(base, 1);
1650
1651 let last_idx = mmap.total_slots() - 1;
1652 let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1653 assert_eq!(last, 3);
1654 }
1655
1656 #[test]
1657 fn err_new_grown_while_previous_instance_is_alive() {
1658 let (_dir, path, cfg) = new_tmp();
1659
1660 let _mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1661 let reopened = FrozenMMap::<u64, MID>::new_grown(&path, cfg, 0x10);
1662
1663 assert!(reopened.is_err());
1664 }
1665 }
1666
1667 mod fm_write_read {
1668 use super::*;
1669
1670 #[test]
1671 fn ok_write_wait_read_cycle() {
1672 const VAL: u64 = 0xDEADC0DE;
1673 let (_dir, path, cfg) = new_tmp();
1674 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1675
1676 // write + sync
1677 let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1678 mmap.wait_for_durability(epoch).unwrap();
1679
1680 // read + verify
1681 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1682 assert_eq!(val, VAL);
1683 }
1684
1685 #[test]
1686 fn ok_write_read_without_wait() {
1687 const VAL: u64 = 0xDEADC0DE;
1688 let (_dir, path, cfg) = new_tmp();
1689 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1690
1691 unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1692 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1693 assert_eq!(val, VAL);
1694 }
1695 }
1696
1697 mod fm_write_sync_read {
1698 use super::*;
1699
1700 #[test]
1701 fn ok_write_sync_read() {
1702 let (_dir, path, cfg) = new_tmp();
1703 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1704
1705 unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };
1706
1707 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1708 assert_eq!(val, 0x4C);
1709 }
1710
1711 #[test]
1712 fn ok_write_sync_wait_returns_immediately() {
1713 let (_dir, path, cfg) = new_tmp();
1714 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1715
1716 let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };
1717
1718 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1719 assert_eq!(val, 0x6A);
1720 }
1721
1722 #[test]
1723 fn ok_write_sync_followed_by_async_write() {
1724 let (_dir, path, cfg) = new_tmp();
1725
1726 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1727 unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };
1728
1729 let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
1730 mmap.wait_for_durability(epoch).unwrap();
1731
1732 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1733 assert_eq!(val, 0x14);
1734 }
1735
1736 #[test]
1737 fn ok_write_sync_makes_prev_batch_durable() {
1738 let (_dir, path, cfg) = new_tmp();
1739 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1740
1741 let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1742 unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
1743 mmap.wait_for_durability(async_epoch).unwrap();
1744
1745 let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
1746 let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };
1747
1748 assert_eq!(v1, 1);
1749 assert_eq!(v2, 2);
1750 }
1751
1752 #[test]
1753 fn ok_write_sync_persists_across_reopen() {
1754 let (_dir, path, cfg) = new_tmp();
1755 const VAL: u64 = 0x1000;
1756
1757 {
1758 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1759 unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
1760 }
1761
1762 {
1763 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1764 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1765 assert_eq!(val, VAL);
1766 }
1767 }
1768 }
1769
1770 mod fm_tx {
1771 use super::*;
1772
1773 #[test]
1774 fn ok_tx_basic_multi_write() {
1775 let (_dir, path, cfg) = new_tmp();
1776 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1777
1778 let mut tx = mmap.new_tx();
1779 unsafe {
1780 tx.write(0, |v| *v = 1).unwrap();
1781 tx.write(1, |v| *v = 2).unwrap();
1782 tx.write(2, |v| *v = 3).unwrap();
1783 }
1784
1785 let epoch = tx.commit().unwrap();
1786 mmap.wait_for_durability(epoch).unwrap();
1787
1788 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1789 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1790 let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1791
1792 assert_eq!((v0, v1, v2), (1, 2, 3));
1793 }
1794
1795 #[test]
1796 fn ok_tx_single_epoch() {
1797 let (_dir, path, cfg) = new_tmp();
1798 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1799
1800 let mut tx = mmap.new_tx();
1801 unsafe {
1802 tx.write(0, |v| *v = 0x0A).unwrap();
1803 tx.write(1, |v| *v = 0x14).unwrap();
1804 }
1805
1806 // NOTE: next write must have strictly higher epoch then current one
1807
1808 let epoch = tx.commit().unwrap();
1809 let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1810
1811 assert!(next_epoch > epoch);
1812 }
1813
1814 #[test]
1815 fn err_tx_out_of_order() {
1816 let (_dir, path, cfg) = new_tmp();
1817 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1818
1819 let mut tx = mmap.new_tx();
1820 unsafe {
1821 tx.write(2, |v| *v = 1).unwrap();
1822 let res = tx.write(1, |v| *v = 2);
1823 assert!(res.is_err());
1824 }
1825 }
1826
1827 #[test]
1828 fn err_tx_duplicate_index() {
1829 let (_dir, path, cfg) = new_tmp();
1830 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1831
1832 let mut tx = mmap.new_tx();
1833 unsafe {
1834 tx.write(1, |v| *v = 1).unwrap();
1835 let res = tx.write(1, |v| *v = 2);
1836 assert!(res.is_err());
1837 }
1838 }
1839
1840 #[test]
1841 fn ok_tx_concurrent_non_overlapping() {
1842 let (_dir, path, cfg) = new_tmp();
1843 let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
1844
1845 let mut handles = Vec::new();
1846 for i in 0..2 {
1847 let mmap = mmap.clone();
1848 handles.push(thread::spawn(move || {
1849 let mut tx = mmap.new_tx();
1850
1851 unsafe {
1852 tx.write(i * 2, |v| *v = i as u64).unwrap();
1853 tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1854 }
1855
1856 let epoch = tx.commit().unwrap();
1857 mmap.wait_for_durability(epoch).unwrap();
1858 }));
1859 }
1860
1861 for h in handles {
1862 h.join().unwrap();
1863 }
1864
1865 for i in 0..2 {
1866 let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1867 let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1868
1869 assert_eq!((v0, v1), (i as u64, i as u64));
1870 }
1871 }
1872
1873 #[test]
1874 fn ok_tx_overwrite_last_wins() {
1875 let (_dir, path, cfg) = new_tmp();
1876 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1877
1878 let mut tx = mmap.new_tx();
1879 unsafe {
1880 tx.write(0, |v| *v = 1).unwrap();
1881 }
1882
1883 tx.commit().unwrap();
1884
1885 let mut tx2 = mmap.new_tx();
1886 unsafe {
1887 tx2.write(0, |v| *v = 2).unwrap();
1888 }
1889
1890 let epoch = tx2.commit().unwrap();
1891 mmap.wait_for_durability(epoch).unwrap();
1892
1893 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1894 assert_eq!(val, 2);
1895 }
1896
1897 #[test]
1898 fn ok_tx_persists_across_reopen() {
1899 let (_dir, path, cfg) = new_tmp();
1900
1901 {
1902 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap();
1903
1904 let mut tx = mmap.new_tx();
1905 unsafe {
1906 tx.write(0, |v| *v = 0x3A).unwrap();
1907 tx.write(1, |v| *v = 0x54).unwrap();
1908 }
1909
1910 let epoch = tx.commit().unwrap();
1911 mmap.wait_for_durability(epoch).unwrap();
1912 }
1913
1914 {
1915 let mmap = FrozenMMap::<u64, MID>::new(&path, cfg).unwrap();
1916
1917 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1918 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1919
1920 assert_eq!((v0, v1), (0x3A, 0x54));
1921 }
1922 }
1923 }
1924
1925 mod fm_durability {
1926 use super::*;
1927
1928 #[test]
1929 fn ok_wait_then_drop() {
1930 let (_dir, path, cfg) = new_tmp();
1931 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1932
1933 let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1934 mmap.wait_for_durability(epoch).unwrap();
1935
1936 drop(mmap);
1937 }
1938
1939 #[test]
1940 fn ok_epoch_monotonicity() {
1941 let (_dir, path, cfg) = new_tmp();
1942 let mmap = FrozenMMap::<u64, MID>::new(path, cfg).unwrap();
1943
1944 let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1945 mmap.wait_for_durability(e1).unwrap();
1946
1947 let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1948 mmap.wait_for_durability(e2).unwrap();
1949 assert!(e2 >= e1);
1950 }
1951
1952 #[test]
1953 fn ok_wait_for_durability_with_multi_writers() {
1954 let (_dir, path, cfg) = new_tmp();
1955 let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
1956
1957 let mut handles = Vec::new();
1958 for _ in 0..2 {
1959 let mmap = mmap.clone();
1960 handles.push(thread::spawn(move || {
1961 let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1962 mmap.wait_for_durability(epoch).unwrap();
1963 }));
1964 }
1965
1966 for h in handles {
1967 h.join().unwrap();
1968 }
1969
1970 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1971 assert_eq!(val, 2);
1972 }
1973 }
1974
1975 mod fm_concurrency {
1976 use super::*;
1977
1978 #[test]
1979 fn ok_parallel_reads_with_diff_index() {
1980 let (_dir, path, cfg) = new_tmp();
1981 let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(path, cfg).unwrap());
1982
1983 unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1984 unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1985
1986 let t1 = {
1987 let mmap = mmap.clone();
1988 thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
1989 };
1990
1991 let t2 = {
1992 let mmap = mmap.clone();
1993 thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
1994 };
1995
1996 assert_eq!(t1.join().unwrap(), 0x10);
1997 assert_eq!(t2.join().unwrap(), 0x20);
1998 }
1999
2000 #[test]
2001 fn ok_multi_tx_drop_then_reopen_grown() {
2002 let (_dir, path, cfg) = new_tmp();
2003
2004 {
2005 let mmap = sync::Arc::new(FrozenMMap::<u64, MID>::new(&path, cfg.clone()).unwrap());
2006
2007 let mut handles = Vec::new();
2008 for i in 0..2u64 {
2009 let mmap = mmap.clone();
2010 handles.push(thread::spawn(move || {
2011 let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
2012 }));
2013 }
2014
2015 for i in 0..2usize {
2016 let mmap = mmap.clone();
2017 handles.push(thread::spawn(move || {
2018 let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
2019 }));
2020 }
2021
2022 for h in handles {
2023 h.join().unwrap();
2024 }
2025 }
2026
2027 {
2028 let mmap = FrozenMMap::<u64, MID>::new_grown(&path, cfg.clone(), 0x10).unwrap();
2029 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
2030
2031 for i in 0..2u64 {
2032 let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
2033 assert_eq!(val, i + 1);
2034 }
2035
2036 let idx = mmap.total_slots() - 1;
2037 unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
2038
2039 let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
2040 assert_eq!(val, 0xDEAD);
2041 }
2042 }
2043 }
2044}