frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//! module_id: MODULE_ID,
35//! initial_count: 0x0A,
36//! flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! mmap.wait_for_durability(epoch).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61 error::{FrozenError, FrozenResult},
62 ffile::{FrozenFile, FrozenFileCfg},
63 hints,
64};
65use std::{
66 fmt,
67 sync::{self, atomic},
68 thread, time,
69};
70
71/// type for `epoch` used by write ops
72pub type TEpoch = u64;
73
74#[cfg(any(target_os = "linux", target_os = "macos"))]
75type TMap = posix::POSIXMMap;
76
77/// Error codes for [`FrozenMMap`]
78pub(in crate::fmmap) mod err {
79 use crate::error::{ErrCode, FrozenError, FrozenResult};
80
81 /// Domain Id for [`FrozenMMap`] is **18**
82 const ERRDOMAIN: u8 = 0x12;
83
84 /// module id used for [`FrozenMMap`]
85 pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
86
87 #[cfg(not(test))]
88 #[inline(always)]
89 pub fn mid() -> &'static u8 {
90 MID.get().unwrap()
91 }
92
93 #[cfg(test)]
94 #[inline(always)]
95 pub fn mid() -> &'static u8 {
96 MID.get_or_init(|| 0)
97 }
98
99 /// internal fuck up (hault and catch fire)
100 pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
101
102 /// unknown error (fallback)
103 pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
104
105 /// no more memory available
106 pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
107
108 /// syncing error
109 pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
110
111 /// no write/read perm
112 pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
113
114 /// flush_tx error (panic inside)
115 pub const TXE: ErrCode = ErrCode::new(0x0C, "flush_tx paniced inside");
116
117 /// flush_tx error (unable to spawn)
118 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
119
120 /// type `T` implements drop
121 pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
122
123 /// type `T` is not 8 bytes aligned
124 pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
125
126 /// `size_of::<T>()` is not multiple of 8
127 pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
128
129 /// type `T` must not be zero sized
130 pub const ZRO: ErrCode = ErrCode::new(0x18, "type T must not be zero sized");
131
132 #[inline]
133 pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
134 code: ErrCode,
135 error: E,
136 ) -> FrozenResult<R> {
137 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
138 Err(err)
139 }
140
141 #[inline]
142 pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
143 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
144 Err(err)
145 }
146
147 #[inline]
148 pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(
149 code: ErrCode,
150 error: E,
151 ) -> FrozenError {
152 FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
153 }
154}
155
/// Config for [`FrozenMMap`]
#[derive(Debug, Clone)]
pub struct FrozenMMapCfg {
    /// Identifier used for error propagation by [`crate::error::FrozenError`]
    ///
    /// *NOTE:* The id is recorded process-wide by the first [`FrozenMMap`] instance created, and
    /// reused for errors of every later instance
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`](FrozenMMap::SLOT_SIZE), so the
    /// initial length of a newly created file is `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by flusher tx to batch write ops into a single durable window, where
    /// all write ops made within the same interval are synced together
    pub flush_duration: time::Duration,
}
172
173/// Custom implementation of `mmap(2)`
174///
175/// ## Constraints
176///
177/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
178/// must be a POD type which is safe to persist and later reinterpret from bytes.
179///
180/// Required properties for `T`,
181///
182/// - Must use `#[repr(C)]`
183/// - Should be 8-bytes aligned
184/// - Must not implement [`Drop`]
185/// - `size_of::<T>()` should be multiple of `8`
186///
187/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
188/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
189/// across reopen.
190///
191/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
192/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
193/// layout and must remain valid when the file is reopened in a later process.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
199///
200/// const MODULE_ID: u8 = 0;
201///
202/// let dir = tempfile::tempdir().unwrap();
203/// let path = dir.path().join("tmp_frozen_mmap");
204///
205/// let cfg = FrozenMMapCfg {
206/// module_id: MODULE_ID,
207/// initial_count: 0x0A,
208/// flush_duration: std::time::Duration::from_micros(0x96),
209/// };
210///
211/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
212/// assert_eq!(mmap.total_slots(), 0x0A);
213///
214/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
215/// mmap.wait_for_durability(epoch).unwrap();
216///
217/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
218/// assert_eq!(val, 0xDEADC0DE);
219///
220/// drop(mmap);
221///
222/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
223/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
224///
225/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
226/// assert_eq!(val, 0xDEADC0DE);
227/// ```
228#[derive(Debug)]
229pub struct FrozenMMap<T>
230where
231 T: Sized + Send + Sync,
232{
233 core: sync::Arc<Core>,
234 tx: Option<thread::JoinHandle<()>>,
235 _t: core::marker::PhantomData<T>,
236}
237
238unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
239unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
240
241impl<T> FrozenMMap<T>
242where
243 T: Sized + Send + Sync,
244{
245 /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
246 pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
247
248 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
249 ///
250 /// ## Multiple Instances
251 ///
252 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
253 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
254 /// error will be thrown.
255 ///
256 /// ## Capacity Growth
257 ///
258 /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
259 /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
260 /// mapping over grown capacity.
261 ///
262 /// ## [`FrozenMMapCfg`]
263 ///
264 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
265 ///
266 /// ## Working
267 ///
268 /// We first create a new [`FrozenFile`] if note already, then map the entire file using
269 /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
270 /// entire lifetime of file.
271 ///
272 /// ## Important
273 ///
274 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
275 /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
276 /// to store config.
277 ///
278 /// ## Example
279 ///
280 /// ```
281 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
282 ///
283 /// const MODULE_ID: u8 = 0;
284 ///
285 /// let dir = tempfile::tempdir().unwrap();
286 /// let path = dir.path().join("tmp_frozen_mmap");
287 ///
288 /// let cfg = FrozenMMapCfg {
289 /// module_id: MODULE_ID,
290 /// initial_count: 0x0A,
291 /// flush_duration: std::time::Duration::from_micros(0x96),
292 /// };
293 ///
294 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
295 /// assert_eq!(mmap.total_slots(), 0x0A);
296 ///
297 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
298 /// mmap.wait_for_durability(epoch).unwrap();
299 ///
300 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
301 /// assert_eq!(val, 0xDEADC0DE);
302 /// ```
303 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
304 Self::validate_t()?;
305 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
306 let total_slots = curr_length / Self::SLOT_SIZE;
307
308 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
309 // guarantees that the first caller sets the value and all subsequent calls reuse it
310 let _ = err::MID.get_or_init(|| cfg.module_id);
311
312 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
313 let core =
314 sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
315
316 // INFO: we spawn the thread for background sync
317 let tx = Core::spawn_tx(core.clone())?;
318
319 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
320 }
321
322 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
323 /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
324 ///
325 /// ## Multiple Instances
326 ///
327 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
328 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
329 /// error will be thrown.
330 ///
331 /// ## Why not create a [`FrozenMMap::grow`] call?
332 ///
333 /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
334 /// memory mapping in place is tricky in concurrent code, as some threads would still hold
335 /// stale/unmapped pointers to mmap due to preemption from the OS schedular
336 ///
337 /// So, instead of remmaping a live instance, the current API performs growth during open,
338 /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
339 /// instance.
340 ///
341 /// ## [`FrozenMMapCfg`]
342 ///
343 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
344 ///
345 /// ## Working
346 ///
347 /// We first create a new [`FrozenFile`] if note already, then we grow the file using
348 /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
349 /// read/write `T`, which also should stay constant for the entire lifetime of file.
350 ///
351 /// ## Important
352 ///
353 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
354 /// which is used under the hood, one must use config stores like
355 /// [`Rta`](https://crates.io/crates/rta) to store config.
356 ///
357 /// ## Example
358 ///
359 /// ```
360 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
361 ///
362 /// const MODULE_ID: u8 = 0;
363 ///
364 /// let dir = tempfile::tempdir().unwrap();
365 /// let path = dir.path().join("tmp_frozen_mmap");
366 ///
367 /// let cfg = FrozenMMapCfg {
368 /// module_id: MODULE_ID,
369 /// initial_count: 0x0A,
370 /// flush_duration: std::time::Duration::from_micros(0x96),
371 /// };
372 ///
373 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
374 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
375 ///
376 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
377 /// mmap.wait_for_durability(epoch).unwrap();
378 ///
379 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
380 /// assert_eq!(val, 0xDEADC0DE);
381 /// ```
382 pub fn new_grown<P: AsRef<std::path::Path>>(
383 path: P,
384 cfg: FrozenMMapCfg,
385 additional_slots: usize,
386 ) -> FrozenResult<Self> {
387 Self::validate_t()?;
388 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
389
390 // we grow the underlying FrozenFile as requested
391 file.grow(additional_slots)?;
392 let curr_length = file.length()?; // we must read the updated (grown) length
393 let total_slots = curr_length / Self::SLOT_SIZE;
394
395 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
396 // guarantees that the first caller sets the value and all subsequent calls reuse it
397 let _ = err::MID.get_or_init(|| cfg.module_id);
398
399 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
400 let core =
401 sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
402
403 // INFO: we spawn the thread for background sync
404 let tx = Core::spawn_tx(core.clone())?;
405
406 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
407 }
408
409 /// Create/open a new [`FrozenFile`] instance
410 fn open_file(
411 path: std::path::PathBuf,
412 cfg: &FrozenMMapCfg,
413 ) -> FrozenResult<(FrozenFile, usize)> {
414 let ff_cfg = FrozenFileCfg {
415 path,
416 module_id: cfg.module_id,
417 buffer_size: Self::SLOT_SIZE,
418 initial_available_buffers: cfg.initial_count,
419 };
420
421 let file = FrozenFile::new(ff_cfg)?;
422 let curr_length = file.length()?;
423
424 Ok((file, curr_length))
425 }
426
427 #[inline]
428 fn validate_t() -> FrozenResult<()> {
429 if std::mem::needs_drop::<T>() {
430 return err::new_err_default(err::DRP);
431 }
432
433 let align = std::mem::align_of::<T>();
434 if align != 8 {
435 return err::new_err_default(err::ALN);
436 }
437
438 let size = std::mem::size_of::<T>();
439 if size == 0 {
440 return err::new_err_default(err::ZRO);
441 }
442
443 if size % 8 != 0 {
444 return err::new_err_default(err::SZE);
445 }
446
447 Ok(())
448 }
449
450 /// Blocks until given `epoch` becomes durable
451 ///
452 /// ## Batching
453 ///
454 /// With respect to `flush_duration`, all write ops are batched before sync, which is executed
455 /// by flusher tx working in background, while each write is assigned w/ current durable epoch,
456 /// and all writes which observe the exact same epoch, belong to the same durability window, and
457 /// are all sync'ed together
458 ///
459 /// When a background sync succeeds, the internal durable epoch is incremented, indicating that
460 /// all writes that observed the previous epoch are now durable on disk
461 ///
462 /// ## Example
463 ///
464 /// ```
465 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
466 ///
467 /// const MODULE_ID: u8 = 0;
468 ///
469 /// let dir = tempfile::tempdir().unwrap();
470 /// let path = dir.path().join("tmp_wait_epoch");
471 ///
472 /// let cfg = FrozenMMapCfg {
473 /// module_id: MODULE_ID,
474 /// initial_count: 0x04,
475 /// flush_duration: std::time::Duration::from_micros(0x60),
476 /// };
477 ///
478 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
479 ///
480 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
481 /// mmap.wait_for_durability(epoch).unwrap();
482 ///
483 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
484 /// assert_eq!(val, 0x8A);
485 /// ```
486 pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
487 if let Some(sync_err) = self.core.get_sync_error() {
488 return Err(sync_err);
489 }
490
491 let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
492 if durable_epoch >= epoch {
493 return Ok(());
494 }
495
496 let mut guard = self.core.acquire_durable_lock();
497 loop {
498 if let Some(sync_err) = self.core.get_sync_error() {
499 return Err(sync_err);
500 }
501
502 if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
503 return Ok(());
504 }
505
506 // NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
507 guard = self.core.durable_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
508 }
509 }
510
511 /// Read a `T` at given `index` via callback (`f`)
512 ///
513 /// ## Concurrency
514 ///
515 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
516 /// at same index is atomic and thread safe, while operations on different indices may proceed
517 /// fully in parallel
518 ///
519 /// ## Safety
520 ///
521 /// The caller must ensure following:
522 ///
523 /// - given `index` is within bounds
524 /// - underlying memory contains a valid instance of `T`
525 /// - provided callback `f` must not,
526 /// - write through the pointer
527 /// - store or leak pointer beyound there lifetime
528 ///
529 /// Violating any of the above may result in undefined behavior
530 ///
531 /// ## Example
532 ///
533 /// ```
534 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
535 ///
536 /// const MODULE_ID: u8 = 0;
537 ///
538 /// let dir = tempfile::tempdir().unwrap();
539 /// let path = dir.path().join("tmp_read_mmap");
540 ///
541 /// let cfg = FrozenMMapCfg {
542 /// module_id: MODULE_ID,
543 /// initial_count: 0x02,
544 /// flush_duration: std::time::Duration::from_micros(0x60),
545 /// };
546 ///
547 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
548 ///
549 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
550 /// mmap.wait_for_durability(epoch).unwrap();
551 ///
552 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
553 /// assert_eq!(val, 0x0A);
554 /// ```
555 #[inline(always)]
556 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
557 let offset = Self::SLOT_SIZE * index;
558 let _lock = self.core.locks.lock(index);
559
560 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
561 // assumption of OS guarantees visibility)
562
563 let ptr = unsafe { self.core.map.as_ptr(offset) };
564 Ok(f(ptr))
565 }
566
567 /// Write/update a `T` at given `index` via callback (`f`)
568 ///
569 /// ## Concurrency
570 ///
571 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
572 /// at same index is atomic and thread safe, while operations on different indices may proceed
573 /// fully in parallel
574 ///
575 /// ## Safety
576 ///
577 /// The caller must ensure following:
578 ///
579 /// - given `index` is within bounds
580 /// - underlying memory contains a valid instance of `T`
581 /// - provided callback `f` must not store or leak pointer beyound there lifetime
582 ///
583 /// Violating any of the above may result in undefined behavior
584 ///
585 /// ## Example
586 ///
587 /// ```
588 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
589 ///
590 /// const MODULE_ID: u8 = 0;
591 ///
592 /// let dir = tempfile::tempdir().unwrap();
593 /// let path = dir.path().join("tmp_write_mmap");
594 ///
595 /// let cfg = FrozenMMapCfg {
596 /// module_id: MODULE_ID,
597 /// initial_count: 0x02,
598 /// flush_duration: std::time::Duration::from_micros(0x96),
599 /// };
600 ///
601 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
602 ///
603 /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
604 /// mmap.wait_for_durability(epoch).unwrap();
605 ///
606 /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
607 /// assert_eq!(val, 0x2B);
608 /// ```
609 #[inline(always)]
610 pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
611 // propagate prev errors
612 if let Some(err) = self.core.get_sync_error() {
613 return Err(err);
614 }
615
616 let offset = Self::SLOT_SIZE * index;
617
618 let _guard = self.core.acquire_io_lock();
619 let _lock = self.core.locks.lock(index);
620
621 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
622 f(ptr);
623
624 self.core.dirty.store(true, atomic::Ordering::Release);
625 let epoch = self.core.incr_curr_epoch();
626
627 Ok(epoch)
628 }
629
630 /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
631 ///
632 /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is
633 /// immediately persisted to the underlying storage device
634 ///
635 /// ## Concurrency
636 ///
637 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
638 /// at same index is atomic and thread safe, while operations on different indices may proceed
639 /// fully in parallel.
640 ///
641 /// ## Safety
642 ///
643 /// The caller must ensure following:
644 ///
645 /// - given `index` is within bounds
646 /// - underlying memory contains a valid instance of `T`
647 /// - provided callback `f` must not store or leak pointer beyound there lifetime
648 ///
649 /// Violating any of the above may result in undefined behavior
650 ///
651 /// ## Example
652 ///
653 /// ```
654 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
655 ///
656 /// const MODULE_ID: u8 = 0;
657 ///
658 /// let dir = tempfile::tempdir().unwrap();
659 /// let path = dir.path().join("tmp_write_sync");
660 ///
661 /// let cfg = FrozenMMapCfg {
662 /// module_id: MODULE_ID,
663 /// initial_count: 0x02,
664 /// flush_duration: std::time::Duration::from_micros(0x96),
665 /// };
666 ///
667 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
668 /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
669 ///
670 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
671 /// assert_eq!(val, 0xC0DE);
672 /// ```
673 #[inline(always)]
674 pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
675 // propagate prev errors
676 if let Some(err) = self.core.get_sync_error() {
677 return Err(err);
678 }
679
680 let offset = Self::SLOT_SIZE * index;
681
682 // block flush_tx scheduling
683 let _flush_guard = self.core.acquire_lock();
684
685 // we use exlusive lock as we perform blocking hard sync
686 let _guard = self.core.acquire_exclusive_io_lock();
687
688 let _lock = self.core.locks.lock(index);
689 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
690 f(ptr);
691
692 // blocking hard sync
693 self.core.sync()?;
694
695 // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as
696 // the current batch is durable
697
698 self.core.mark_epoch_durable();
699 let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
700
701 // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to
702 // sync)
703 if prev {
704 let _g = self.core.acquire_durable_lock();
705 self.core.durable_cv.notify_all();
706 }
707
708 Ok(())
709 }
710
711 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
712 ///
713 /// ## Working
714 ///
715 /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
716 /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
717 /// conditions
718 ///
719 /// ## Example
720 ///
721 /// ```
722 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
723 ///
724 /// const MODULE_ID: u8 = 0;
725 ///
726 /// let dir = tempfile::tempdir().unwrap();
727 /// let path = dir.path().join("tmp_grow_mmap");
728 ///
729 /// let cfg = FrozenMMapCfg {
730 /// module_id: MODULE_ID,
731 /// initial_count: 0x02,
732 /// flush_duration: std::time::Duration::from_micros(0x96),
733 /// };
734 ///
735 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
736 /// assert_eq!(mmap.total_slots(), 0x02);
737 ///
738 /// drop(mmap);
739 ///
740 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
741 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
742 /// ```
743 #[inline]
744 pub fn total_slots(&self) -> usize {
745 self.core.curr_length / Self::SLOT_SIZE
746 }
747
748 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
749 ///
750 /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
751 /// and OS
752 ///
753 /// ## Example
754 ///
755 /// ```
756 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
757 ///
758 /// const MODULE_ID: u8 = 0;
759 ///
760 /// let dir = tempfile::tempdir().unwrap();
761 /// let path = dir.path().join("tmp_mem_usage");
762 ///
763 /// let cfg = FrozenMMapCfg {
764 /// module_id: MODULE_ID,
765 /// initial_count: 0x10,
766 /// flush_duration: std::time::Duration::from_micros(0x60),
767 /// };
768 ///
769 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
770 ///
771 /// let bytes = mmap.memory_usage();
772 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
773 /// ```
774 #[inline]
775 pub fn memory_usage(&self) -> usize {
776 // memory of mem-maped region
777 let mmap_bytes = self.core.curr_length;
778
779 // memory used for locking
780 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
781
782 mmap_bytes + lock_bytes
783 }
784
785 /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic
786 /// operation
787 ///
788 /// ## Overview
789 ///
790 /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic
791 /// operation, hence creating a transactional write operation, which gives following guarantees:
792 ///
793 /// - All write ops succeed together
794 /// - Single epoch to track durability of all writes ops
795 /// - Same durability guarantee for all the included write ops
796 ///
797 /// Simply, this preserves atomic durability semantics for multi index updates
798 ///
799 /// ## Example
800 ///
801 /// ```
802 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
803 ///
804 /// const MODULE_ID: u8 = 0;
805 ///
806 /// let dir = tempfile::tempdir().unwrap();
807 /// let path = dir.path().join("tmp_tx");
808 ///
809 /// let cfg = FrozenMMapCfg {
810 /// module_id: MODULE_ID,
811 /// initial_count: 0x0A,
812 /// flush_duration: std::time::Duration::from_micros(50),
813 /// };
814 ///
815 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
816 ///
817 /// let mut tx = mmap.new_tx();
818 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
819 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
820 ///
821 /// let epoch = tx.commit().unwrap();
822 /// mmap.wait_for_durability(epoch).unwrap();
823 ///
824 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
825 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
826 ///
827 /// assert_eq!((v0, v1), (0x0A, 0x14));
828 /// ```
829 #[inline]
830 pub fn new_tx(&self) -> FMTransaction<'_, T> {
831 FMTransaction { core: &self.core, ops_vec: Vec::new() }
832 }
833
834 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
835 ///
836 /// ## Working
837 ///
838 /// When `delete` is called, all read, write, and (background) sync ops are paused
839 /// (indefinitely), whule deletion is done with following steps:
840 ///
841 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
842 /// - if any batch is pending for sync,
843 /// - swap the flag
844 /// - call sync manually
845 /// - incr epoch and update cv
846 /// - brodcast closing so flusher tx could wrap up
847 /// - `munmap(2)` current mapping
848 /// - call delete on [`FrozenFile`]
849 ///
850 /// ## Example
851 ///
852 /// ```
853 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
854 ///
855 /// const MODULE_ID: u8 = 0;
856 ///
857 /// let dir = tempfile::tempdir().unwrap();
858 /// let path = dir.path().join("tmp_delete_mmap");
859 ///
860 /// let cfg = FrozenMMapCfg {
861 /// module_id: MODULE_ID,
862 /// initial_count: 0x04,
863 /// flush_duration: std::time::Duration::from_micros(0x96),
864 /// };
865 ///
866 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
867 /// mmap.delete().unwrap();
868 /// assert!(!path.exists());
869 /// ```
870 pub fn delete(&mut self) -> FrozenResult<()> {
871 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
872 self.core.dirty.store(false, atomic::Ordering::Release);
873 self.core.closed.store(true, atomic::Ordering::Release);
874 self.core.durable_cv.notify_one();
875
876 if let Some(handle) = self.tx.take() {
877 let _ = handle.join();
878 }
879
880 // pause all new write ops
881 let _lock = self.core.acquire_exclusive_io_lock();
882
883 self.munmap()?;
884 self.core.file.delete()
885 }
886
887 #[inline]
888 fn munmap(&self) -> FrozenResult<()> {
889 let length = self.core.curr_length;
890 unsafe { self.core.map.unmap(length) }
891 }
892}
893
894impl<T> Drop for FrozenMMap<T>
895where
896 T: Sized + Send + Sync,
897{
898 fn drop(&mut self) {
899 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
900 self.core.cv.notify_one(); // notify flusher tx to shut
901
902 if let Some(handle) = self.tx.take() {
903 let _ = handle.join();
904 }
905
906 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
907 let _io_lock = self.core.acquire_exclusive_io_lock();
908
909 // free up the boxed error (if any)
910 let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
911 if !ptr.is_null() {
912 unsafe {
913 drop(Box::from_raw(ptr));
914 }
915 }
916
917 if !is_closed {
918 let _ = self.munmap();
919 }
920 }
921}
922
923impl<T> fmt::Display for FrozenMMap<T>
924where
925 T: Sized + Send + Sync,
926{
927 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
928 write!(
929 f,
930 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
931 self.core.file.fd(),
932 self.total_slots(),
933 self.core.curr_length,
934 )
935 }
936}
937
/// A context for grouping multi write ops into a single atomic operation
///
/// ## Overview
///
/// Use of [`FMTransaction`] allows grouping multiple write ops into a single atomic operation.
///
/// - All included writes are applied together
/// - Single epoch is assigned for an entire transaction
/// - Durability guarantee is same for all included write ops
///
/// ## Example
///
/// ```
/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
///
/// const MODULE_ID: u8 = 0;
///
/// let dir = tempfile::tempdir().unwrap();
/// let path = dir.path().join("tmp_tx");
///
/// let cfg = FrozenMMapCfg {
///     module_id: MODULE_ID,
///     initial_count: 0x0A,
///     flush_duration: std::time::Duration::from_micros(50),
/// };
///
/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
///
/// let mut tx = mmap.new_tx();
/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
///
/// let epoch = tx.commit().unwrap();
/// mmap.wait_for_durability(epoch).unwrap();
///
/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
///
/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
/// ```
pub struct FMTransaction<'a, T> {
    // shared state of the owning map, the recorded ops are applied into it on commit
    core: &'a Core,
    // recorded `(slot index, op)` pairs, kept in strictly increasing index order by `write`
    ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
}
984
985impl<'a, T> FMTransaction<'a, T> {
986 /// Append a write op into the [`FMTransaction`]
987 ///
988 /// ## Requirements
989 ///
990 /// Write ops must follow these safety requirements,
991 ///
992 /// - No duplicate indices
993 /// - No out-of-order writes (must be incremental)
994 ///
995 /// Violating this constraint would result in [`FrozenError`]
996 ///
997 /// ## Safety
998 ///
999 /// Same safety requirements as [`FrozenMMap::write`] apply here
1000 ///
1001 /// ## Example
1002 ///
1003 /// ```
1004 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1005 ///
1006 /// const MODULE_ID: u8 = 0;
1007 ///
1008 /// let dir = tempfile::tempdir().unwrap();
1009 /// let path = dir.path().join("tmp_tx");
1010 ///
1011 /// let cfg = FrozenMMapCfg {
1012 /// module_id: MODULE_ID,
1013 /// initial_count: 0x10,
1014 /// flush_duration: std::time::Duration::from_micros(50),
1015 /// };
1016 ///
1017 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1018 ///
1019 /// let mut tx = mmap.new_tx();
1020 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1021 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1022 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1023 ///
1024 /// let epoch = tx.commit().unwrap();
1025 /// mmap.wait_for_durability(epoch).unwrap();
1026 ///
1027 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1028 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1029 /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1030 ///
1031 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1032 /// ```
1033 #[inline(always)]
1034 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1035 where
1036 F: FnOnce(*mut T) + 'a,
1037 {
1038 // NOTE:
1039 //
1040 // This check prevents a potential footgun! For a safe transaction all writes must be,
1041 // - ordered by index (either incr or decr)
1042 // - no multi writes on same index
1043 //
1044 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1045 if let Some((last_idx, _)) = self.ops_vec.last() {
1046 if index <= *last_idx {
1047 return err::new_err(
1048 err::HCF,
1049 "tx writes must be strictly ordered, with no more then single ops on given index",
1050 );
1051 }
1052 }
1053
1054 self.ops_vec.push((index, Box::new(f)));
1055 Ok(())
1056 }
1057
1058 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1059 ///
1060 /// ## Guarantees
1061 ///
1062 /// - All writes are applied under a single epoch
1063 /// - All writes belong to the same durability batch
1064 /// - No interleaving with other transactions at epoch level
1065 ///
1066 /// ## Example
1067 ///
1068 /// ```
1069 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1070 ///
1071 /// const MODULE_ID: u8 = 0;
1072 ///
1073 /// let dir = tempfile::tempdir().unwrap();
1074 /// let path = dir.path().join("tmp_tx");
1075 ///
1076 /// let cfg = FrozenMMapCfg {
1077 /// module_id: MODULE_ID,
1078 /// initial_count: 0x10,
1079 /// flush_duration: std::time::Duration::from_micros(50),
1080 /// };
1081 ///
1082 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1083 ///
1084 /// let mut tx = mmap.new_tx();
1085 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1086 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1087 ///
1088 /// let epoch = tx.commit().unwrap();
1089 /// mmap.wait_for_durability(epoch).unwrap();
1090 ///
1091 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1092 /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1093 ///
1094 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1095 /// ```
1096 #[inline(always)]
1097 pub fn commit(self) -> FrozenResult<u64> {
1098 if let Some(err) = self.core.get_sync_error() {
1099 return Err(err);
1100 }
1101
1102 let _guard = self.core.acquire_io_lock();
1103
1104 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1105 let mut guards = Vec::with_capacity(self.ops_vec.len());
1106 for (idx, _) in &self.ops_vec {
1107 guards.push(self.core.locks.lock(*idx));
1108 }
1109
1110 for (idx, op) in self.ops_vec {
1111 let offset = idx * std::mem::size_of::<T>();
1112 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1113 op(ptr);
1114 }
1115
1116 self.core.dirty.store(true, atomic::Ordering::Release);
1117 let epoch = self.core.incr_curr_epoch();
1118
1119 Ok(epoch)
1120 }
1121}
1122
/// State of a [`FrozenMMap`] which is shared with the background flusher tx
#[derive(Debug)]
struct Core {
    /// the memory mapping; written through `as_mut_ptr`, synced/unmapped w/ `curr_length`
    map: TMap,
    /// per-slot spin locks, one entry per slot (`Locks::new(total_slots)`)
    locks: Locks,
    /// backing file, synced right after the mapping is synced
    file: FrozenFile,
    /// wakes the flusher tx (paired w/ `lock`), e.g. on shutdown from `Drop`
    cv: sync::Condvar,
    /// mapped length in bytes
    curr_length: usize,
    /// parking mutex for `cv`, wraps `()` and guards no data
    lock: sync::Mutex<()>,
    /// shared for regular IO ops (e.g. tx commit), exclusive for sync, delete and drop
    io_lock: sync::RwLock<()>,
    /// set once writes are applied, consumed (swapped to `false`) by the flusher tx
    dirty: atomic::AtomicBool,
    /// notified by the flusher tx once a sync attempt finished (successfully or not)
    durable_cv: sync::Condvar,
    /// shutdown flag, set (at least) on drop
    closed: atomic::AtomicBool,
    /// parking mutex for `durable_cv`, wraps `()` and guards no data
    durable_lock: sync::Mutex<()>,
    /// timeout of each wait of the flusher tx between checks for dirty data
    flush_duration: time::Duration,
    /// last epoch handed out to a write or tx
    current_epoch: atomic::AtomicU64,
    /// latest epoch known to be durable
    durable_epoch: atomic::AtomicU64,
    /// boxed last sync error, null when there is none
    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
}
1141
// SAFETY: NOTE(review): these impls are presumably required because `TMap` (and/or
// `FrozenFile`) holds raw pointers / OS handles which are not auto `Send`/`Sync`. Soundness
// relies on all access to the mapping being coordinated through `io_lock`, the per-slot
// `locks` and the atomics of `Core` — TODO confirm against the definition of `TMap`.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
1144
1145impl Core {
1146 fn new(
1147 map: TMap,
1148 file: FrozenFile,
1149 flush_duration: time::Duration,
1150 curr_length: usize,
1151 total_slots: usize,
1152 ) -> Self {
1153 Self {
1154 map,
1155 file,
1156 curr_length,
1157 flush_duration,
1158 cv: sync::Condvar::new(),
1159 lock: sync::Mutex::new(()),
1160 io_lock: sync::RwLock::new(()),
1161 locks: Locks::new(total_slots),
1162 durable_cv: sync::Condvar::new(),
1163 durable_lock: sync::Mutex::new(()),
1164 dirty: atomic::AtomicBool::new(false),
1165 closed: atomic::AtomicBool::new(false),
1166 current_epoch: atomic::AtomicU64::new(0),
1167 durable_epoch: atomic::AtomicU64::new(0),
1168 error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1169 }
1170 }
1171
1172 #[inline]
1173 fn sync(&self) -> FrozenResult<()> {
1174 unsafe { self.map.sync(self.curr_length) }?;
1175 self.file.sync()
1176 }
1177
1178 #[inline(always)]
1179 fn set_sync_error(&self, err: FrozenError) {
1180 let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1181 let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1182
1183 // NOTE: we must free the old error, if any, to avoid mem leaks
1184 if !old.is_null() {
1185 unsafe { drop(Box::from_raw(old)) };
1186 }
1187 }
1188
1189 #[inline(always)]
1190 fn get_sync_error(&self) -> Option<FrozenError> {
1191 let ptr = self.error.load(atomic::Ordering::Acquire);
1192 if hints::likely(ptr.is_null()) {
1193 return None;
1194 }
1195
1196 let arc = unsafe { &*ptr }.clone();
1197 Some((*arc).clone())
1198 }
1199
1200 #[inline]
1201 fn clear_sync_error(&self) {
1202 let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1203 if hints::unlikely(!old.is_null()) {
1204 unsafe {
1205 drop(Box::from_raw(old));
1206 }
1207 }
1208 }
1209
1210 /// ## Why we ignore [`std::sync::PoisonError`]?
1211 ///
1212 /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
1213 /// protect any mutable state. All the pool invariants and accounting are maintained via atomics
1214 /// and are completely seperated from the mutex.
1215 ///
1216 /// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
1217 /// an inconsistent state of the protected value. Since no state can be left partially modified
1218 /// under this lock, there is no possible consistency risk to recover from and propagating the
1219 /// poison error would only introduce unnecessary failures into the allocation path.
1220 ///
1221 /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
1222 /// with the recovered guard.
1223 #[inline]
1224 fn acquire_durable_lock(&self) -> sync::MutexGuard<'_, ()> {
1225 self.durable_lock.lock().unwrap_or_else(|e| e.into_inner())
1226 }
1227
1228 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1229 #[inline]
1230 fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1231 self.lock.lock().unwrap_or_else(|e| e.into_inner())
1232 }
1233
1234 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1235 #[inline]
1236 fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1237 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1238 }
1239
1240 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1241 #[inline]
1242 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1243 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1244 }
1245
1246 #[inline]
1247 fn incr_curr_epoch(&self) -> u64 {
1248 self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1249 }
1250
1251 #[inline]
1252 fn mark_epoch_durable(&self) {
1253 let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1254 self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1255 }
1256
1257 fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1258 match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core))
1259 {
1260 Ok(tx) => Ok(tx),
1261 Err(error) => err::new_err(err::FXE, error),
1262 }
1263 }
1264
1265 fn flush_tx(core: sync::Arc<Self>) {
1266 // init phase (acquiring locks)
1267 let mut guard = match core.lock.lock() {
1268 Ok(g) => g,
1269 Err(error) => {
1270 core.set_sync_error(err::new_err_raw(err::FXE, error));
1271 core.cv.notify_all();
1272 return;
1273 }
1274 };
1275
1276 // sync loop w/ non-busy waiting
1277 loop {
1278 guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1279 Ok((g, _)) => g,
1280 Err(e) => {
1281 core.set_sync_error(err::new_err_raw(err::TXE, e));
1282 core.cv.notify_all();
1283 return;
1284 }
1285 };
1286
1287 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
1288 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
1289
1290 let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1291 let closing = core.closed.load(atomic::Ordering::Acquire);
1292
1293 if !dirty {
1294 if closing {
1295 core.cv.notify_all();
1296 return;
1297 }
1298
1299 continue;
1300 }
1301
1302 // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1303 // grow could kick in while sync is in progress
1304
1305 let io_lock = core.acquire_exclusive_io_lock();
1306
1307 // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1308 // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1309 // and acqurie it again, in-between other process could acquire it and use it
1310 drop(guard);
1311
1312 // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
1313 // all writes up to `target_epoch` are guaranteed to be part of this flush window,
1314 // while anything beyound belongs to the next batch
1315 let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1316
1317 // NOTE:
1318 //
1319 // - if sync fails, we update the Core::error w/ the received error object
1320 // - we clear it up when another sync call succeeds
1321 // - this is valid, as the underlying sync flushes entire mmaped region, hence
1322 // even if the last call failed, and the new one succeeded, we do get the durability
1323 // guarenty for the old data as well
1324
1325 match core.sync() {
1326 Ok(_) => {
1327 core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1328
1329 let _g = core.acquire_durable_lock();
1330
1331 core.clear_sync_error();
1332 core.durable_cv.notify_all();
1333 }
1334 Err(err) => {
1335 core.set_sync_error(err);
1336 core.durable_cv.notify_all();
1337 }
1338 }
1339
1340 drop(io_lock);
1341 guard = core.acquire_lock();
1342 }
1343 }
1344}
1345
/// Per-slot spin locks, one [`atomic::AtomicU8`] per slot (`Locks::LOCK` = held,
/// `Locks::UNLOCK` = free); acquired via [`Locks::lock`], released by dropping the guard
#[derive(Debug)]
struct Locks(Box<[atomic::AtomicU8]>);
1348
1349impl Locks {
1350 const LOCK: u8 = 1;
1351 const UNLOCK: u8 = 0;
1352
1353 const L1_CONTENTION: usize = 0x10;
1354 const L2_CONTENTION: usize = 0x20;
1355
1356 fn new(cap: usize) -> Self {
1357 let mut slots = Vec::with_capacity(cap);
1358 for _ in 0..cap {
1359 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1360 }
1361
1362 Self(slots.into_boxed_slice())
1363 }
1364
1365 #[inline(always)]
1366 fn lock(&self, index: usize) -> LockGuard<'_> {
1367 let val = &self.0[index];
1368 let mut spins = 0;
1369
1370 loop {
1371 if val
1372 .compare_exchange_weak(
1373 Self::UNLOCK,
1374 Self::LOCK,
1375 atomic::Ordering::Acquire,
1376 atomic::Ordering::Relaxed,
1377 )
1378 .is_ok()
1379 {
1380 return LockGuard(val);
1381 }
1382
1383 // we must backoff under contention
1384
1385 // NOTE:
1386 //
1387 // We have three levels of backoff,
1388 //
1389 // 1. Busy waiting w/ CPU hint
1390 // 2. Willingly allow preemption by OS schedular
1391 // 3. Sleep the thread (increasing w/ exponenial factor)
1392
1393 if hints::likely(spins < Self::L1_CONTENTION) {
1394 std::hint::spin_loop();
1395 } else if spins < Self::L2_CONTENTION {
1396 thread::yield_now();
1397 } else {
1398 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1399 thread::sleep(time::Duration::from_nanos(ns));
1400 }
1401
1402 spins += 1;
1403 }
1404 }
1405}
1406
1407struct LockGuard<'a>(&'a atomic::AtomicU8);
1408
1409impl Drop for LockGuard<'_> {
1410 fn drop(&mut self) {
1411 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1412 }
1413}
1414
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
    const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);

    const MODULE_ID: u8 = 0;
    const INIT_SLOTS: usize = 0x0A;

    /// Create a fresh temp dir, a map path inside it and a default [`FrozenMMapCfg`]
    ///
    /// The returned [`tempfile::TempDir`] must be kept alive for the whole test, as dropping it
    /// removes the directory
    fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("tmp_map");

        let cfg = FrozenMMapCfg {
            module_id: MODULE_ID,
            initial_count: INIT_SLOTS,
            flush_duration: FLUSH_DURATION,
        };

        (dir, path, cfg)
    }

    mod fm_lifecycle {
        use super::*;

        #[test]
        fn ok_new() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);

            // satisfies the bg thread was spawned correctly
            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());

            // satisfies wait on epoch works
            let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
            assert!(mmap.wait_for_durability(epoch).is_ok());
        }

        #[test]
        fn ok_new_existing() {
            let (_dir, path, cfg) = new_tmp();

            // create new + close
            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap1);

            // open existing
            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
            drop(mmap2);
        }

        #[test]
        fn err_new_when_change_in_cfg() {
            let (_dir, path, mut cfg) = new_tmp();

            // create new + close
            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap1);

            // update cfg + open existing
            cfg.initial_count = INIT_SLOTS * 2;
            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
        }

        #[test]
        fn ok_delete() {
            let (_dir, path, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

            mmap.delete().unwrap();
            assert!(!mmap.core.file.exists().unwrap());
        }

        #[test]
        fn err_delete_after_delete() {
            let (_dir, path, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

            mmap.delete().unwrap();
            assert!(!mmap.core.file.exists().unwrap());
            assert!(mmap.delete().is_err());
        }

        #[test]
        fn ok_drop_persists_when_dropped_before_bg_flush() {
            let (_dir, path, cfg) = new_tmp();
            const VAL: u64 = 0x0A;

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
                drop(mmap);
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
                assert_eq!(val, VAL);
            }
        }
    }

    mod fm_validate_t {
        use super::*;

        #[repr(C, align(8))]
        struct OkT {
            a: u64,
            b: u64,
        }

        #[repr(C)]
        struct BadAlignT {
            a: u32,
            b: u32,
        }

        #[repr(C, align(4))]
        struct BadSizeT {
            a: u32,
            b: u16,
        }

        #[repr(C, align(8))]
        struct DropT(u64);

        impl Drop for DropT {
            fn drop(&mut self) {}
        }

        #[repr(C, align(8))]
        struct ZstT;

        #[test]
        fn ok_validate_t() {
            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
        }

        #[test]
        fn err_validate_t_when_drop() {
            assert!(FrozenMMap::<DropT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_not_8_byte_aligned() {
            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_size_not_multiple_of_8() {
            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_zero_sized() {
            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
        }

        #[test]
        fn err_new_when_t_implements_drop() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_is_not_8_byte_aligned() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_size_is_not_multiple_of_8() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_is_zero_sized() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_grown_when_t_implements_drop() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_is_not_8_byte_aligned() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_is_zero_sized() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
        }
    }

    mod fm_new_grown {
        use super::*;

        #[test]
        fn ok_new_grown_updates_length() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS);
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
        }

        #[test]
        fn err_new_grown_with_preexisting_instance() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS);

            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
        }

        #[test]
        fn ok_new_grown_cycle() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
        }

        #[test]
        fn ok_write_reopen_grown_read() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
            }

            {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();

                // the value written before the grow must survive the reopen
                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
                assert_eq!(val, 0xAA);

                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };

                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
                assert_eq!(val, 0xBB);
            }
        }

        #[test]
        fn ok_write_reopen_grown_read_cycle() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
            }

            for i in 0..2 {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
                let idx = mmap.total_slots() - 1;
                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
                drop(mmap);
            }

            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();

            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(base, 1);

            let last_idx = mmap.total_slots() - 1;
            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
            assert_eq!(last, 3);
        }

        #[test]
        fn err_new_grown_while_previous_instance_is_alive() {
            let (_dir, path, cfg) = new_tmp();

            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);

            assert!(reopened.is_err());
        }
    }

    mod fm_write_read {
        use super::*;

        #[test]
        fn ok_write_wait_read_cycle() {
            const VAL: u64 = 0xDEADC0DE;
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            // write + sync
            let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
            mmap.wait_for_durability(epoch).unwrap();

            // read + verify
            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
            assert_eq!(val, VAL);
        }

        #[test]
        fn ok_write_read_without_wait() {
            const VAL: u64 = 0xDEADC0DE;
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
            assert_eq!(val, VAL);
        }
    }

    mod fm_write_sync_read {
        use super::*;

        #[test]
        fn ok_write_sync_read() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x4C);
        }

        #[test]
        fn ok_write_sync_wait_returns_immediately() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x6A);
        }

        #[test]
        fn ok_write_sync_followed_by_async_write() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
            unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };

            let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
            mmap.wait_for_durability(epoch).unwrap();

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x14);
        }

        #[test]
        fn ok_write_sync_makes_prev_batch_durable() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
            unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
            mmap.wait_for_durability(async_epoch).unwrap();

            let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
            let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };

            assert_eq!(v1, 1);
            assert_eq!(v2, 2);
        }

        #[test]
        fn ok_write_sync_persists_across_reopen() {
            let (_dir, path, cfg) = new_tmp();
            const VAL: u64 = 0x1000;

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
                assert_eq!(val, VAL);
            }
        }
    }

    mod fm_tx {
        use super::*;

        #[test]
        fn ok_tx_basic_multi_write() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 1).unwrap();
                tx.write(1, |v| *v = 2).unwrap();
                tx.write(2, |v| *v = 3).unwrap();
            }

            let epoch = tx.commit().unwrap();
            mmap.wait_for_durability(epoch).unwrap();

            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };

            assert_eq!((v0, v1, v2), (1, 2, 3));
        }

        #[test]
        fn ok_tx_single_epoch() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 0x0A).unwrap();
                tx.write(1, |v| *v = 0x14).unwrap();
            }

            // NOTE: next write must have strictly higher epoch than the current one

            let epoch = tx.commit().unwrap();
            let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };

            assert!(next_epoch > epoch);
        }

        #[test]
        fn err_tx_out_of_order() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(2, |v| *v = 1).unwrap();
                let res = tx.write(1, |v| *v = 2);
                assert!(res.is_err());
            }
        }

        #[test]
        fn err_tx_duplicate_index() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(1, |v| *v = 1).unwrap();
                let res = tx.write(1, |v| *v = 2);
                assert!(res.is_err());
            }
        }

        #[test]
        fn ok_tx_concurrent_non_overlapping() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

            let mut handles = Vec::new();
            for i in 0..2 {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    let mut tx = mmap.new_tx();

                    unsafe {
                        tx.write(i * 2, |v| *v = i as u64).unwrap();
                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
                    }

                    let epoch = tx.commit().unwrap();
                    mmap.wait_for_durability(epoch).unwrap();
                }));
            }

            for h in handles {
                h.join().unwrap();
            }

            for i in 0..2 {
                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };

                assert_eq!((v0, v1), (i as u64, i as u64));
            }
        }

        #[test]
        fn ok_tx_overwrite_last_wins() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 1).unwrap();
            }

            tx.commit().unwrap();

            let mut tx2 = mmap.new_tx();
            unsafe {
                tx2.write(0, |v| *v = 2).unwrap();
            }

            let epoch = tx2.commit().unwrap();
            mmap.wait_for_durability(epoch).unwrap();

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 2);
        }

        #[test]
        fn ok_tx_persists_across_reopen() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

                let mut tx = mmap.new_tx();
                unsafe {
                    tx.write(0, |v| *v = 0x3A).unwrap();
                    tx.write(1, |v| *v = 0x54).unwrap();
                }

                let epoch = tx.commit().unwrap();
                mmap.wait_for_durability(epoch).unwrap();
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();

                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };

                assert_eq!((v0, v1), (0x3A, 0x54));
            }
        }
    }

    mod fm_durability {
        use super::*;

        #[test]
        fn ok_wait_then_drop() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
            mmap.wait_for_durability(epoch).unwrap();

            drop(mmap);
        }

        #[test]
        fn ok_epoch_monotonicity() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
            mmap.wait_for_durability(e1).unwrap();

            let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
            mmap.wait_for_durability(e2).unwrap();

            // every write is assigned a fresh epoch, so epochs must strictly increase
            assert!(e2 > e1);
        }

        #[test]
        fn ok_wait_for_durability_with_multi_writers() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

            let mut handles = Vec::new();
            for _ in 0..2 {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
                    mmap.wait_for_durability(epoch).unwrap();
                }));
            }

            for h in handles {
                h.join().unwrap();
            }

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 2);
        }
    }

    mod fm_concurrency {
        use super::*;

        #[test]
        fn ok_parallel_reads_with_diff_index() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };

            let t1 = {
                let mmap = mmap.clone();
                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
            };

            let t2 = {
                let mmap = mmap.clone();
                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
            };

            assert_eq!(t1.join().unwrap(), 0x10);
            assert_eq!(t2.join().unwrap(), 0x20);
        }

        #[test]
        fn ok_multi_tx_drop_then_reopen_grown() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

                let mut handles = Vec::new();
                for i in 0..2u64 {
                    let mmap = mmap.clone();
                    handles.push(thread::spawn(move || {
                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
                    }));
                }

                for i in 0..2usize {
                    let mmap = mmap.clone();
                    handles.push(thread::spawn(move || {
                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
                    }));
                }

                for h in handles {
                    h.join().unwrap();
                }
            }

            {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);

                for i in 0..2u64 {
                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
                    assert_eq!(val, i + 1);
                }

                let idx = mmap.total_slots() - 1;
                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };

                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
                assert_eq!(val, 0xDEAD);
            }
        }
    }

    mod fm_locks {
        use super::*;

        #[test]
        fn ok_lock_released_on_guard_drop() {
            let locks = Locks::new(2);

            {
                let _guard = locks.lock(1);
                assert_eq!(locks.0[1].load(atomic::Ordering::Acquire), Locks::LOCK);
                assert_eq!(locks.0[0].load(atomic::Ordering::Acquire), Locks::UNLOCK);
            }

            assert_eq!(locks.0[1].load(atomic::Ordering::Acquire), Locks::UNLOCK);

            // must be re-acquirable once released
            let _guard = locks.lock(1);
            assert_eq!(locks.0[1].load(atomic::Ordering::Acquire), Locks::LOCK);
        }

        #[test]
        fn ok_lock_serializes_critical_sections() {
            let locks = sync::Arc::new(Locks::new(1));
            let counter = sync::Arc::new(atomic::AtomicU64::new(0));

            let handles: Vec<_> = (0..4)
                .map(|_| {
                    let locks = locks.clone();
                    let counter = counter.clone();
                    thread::spawn(move || {
                        for _ in 0..100 {
                            let _guard = locks.lock(0);

                            // split load + store, only correct under mutual exclusion
                            let v = counter.load(atomic::Ordering::Relaxed);
                            counter.store(v + 1, atomic::Ordering::Relaxed);
                        }
                    })
                })
                .collect();

            for h in handles {
                h.join().unwrap();
            }

            assert_eq!(counter.load(atomic::Ordering::Acquire), 400);
        }
    }
}