frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
//! - Must use `#[repr(C)]`
//! - Must have an alignment of exactly 8 bytes (`align_of::<T>() == 8`)
//! - Must not implement [`Drop`], nor contain fields which do
//! - Must not be zero sized
//! - `size_of::<T>()` must be a multiple of `8`
//!
//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
//! across reopen.
//!
//! These constraints exist because [`FrozenMMap`] does not serialize or deserialize values. It
//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//! module_id: MODULE_ID,
35//! initial_count: 0x0A,
36//! flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! mmap.wait_for_durability(epoch).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61 error::{FrozenError, FrozenResult},
62 ffile::{FrozenFile, FrozenFileCfg},
63 hints,
64};
65use std::{
66 fmt,
67 sync::{self, atomic},
68 thread, time,
69};
70
71/// type for `epoch` used by write ops
72pub type TEpoch = u64;
73
74#[cfg(any(target_os = "linux", target_os = "macos"))]
75type TMap = posix::POSIXMMap;
76
77/// Error codes for [`FrozenMMap`]
78pub(in crate::fmmap) mod err {
79 use crate::error::{ErrCode, FrozenError, FrozenResult};
80
81 /// Domain Id for [`FrozenMMap`] is **18**
82 const ERRDOMAIN: u8 = 0x12;
83
84 /// module id used for [`FrozenMMap`]
85 pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
86
87 #[cfg(not(test))]
88 #[inline(always)]
89 pub fn mid() -> &'static u8 {
90 MID.get().unwrap()
91 }
92
93 #[cfg(test)]
94 #[inline(always)]
95 pub fn mid() -> &'static u8 {
96 MID.get_or_init(|| 0)
97 }
98
99 /// internal fuck up (hault and catch fire)
100 pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
101
102 /// unknown error (fallback)
103 pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
104
105 /// no more memory available
106 pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
107
108 /// syncing error
109 pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
110
111 /// no write/read perm
112 pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
113
114 /// flush_tx error (panic inside)
115 pub const TXE: ErrCode = ErrCode::new(0x0C, "flush_tx paniced inside");
116
117 /// flush_tx error (unable to spawn)
118 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
119
120 /// type `T` implements drop
121 pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
122
123 /// type `T` is not 8 bytes aligned
124 pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
125
126 /// `size_of::<T>()` is not multiple of 8
127 pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
128
129 /// type `T` must not be zero sized
130 pub const ZRO: ErrCode = ErrCode::new(0x18, "type T must not be zero sized");
131
132 #[inline]
133 pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenResult<R> {
134 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
135 Err(err)
136 }
137
138 #[inline]
139 pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
140 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
141 Err(err)
142 }
143
144 #[inline]
145 pub(in crate::fmmap) fn new_err_raw<E: std::fmt::Display>(code: ErrCode, error: E) -> FrozenError {
146 FrozenError::new_raw(*mid(), ERRDOMAIN, code, error)
147 }
148}
149
/// Config for [`FrozenMMap`]
#[derive(Debug, Clone)]
pub struct FrozenMMapCfg {
    /// Identifier used for error propagation by [`crate::error::FrozenError`]
    ///
    /// *NOTE:* The id is recorded process wide by the first [`FrozenMMap`] created, later
    /// instances reuse that first value
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::SLOT_SIZE`], so for a newly created file the initial
    /// length will be `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
    /// together, where all write ops in certain time interval fall into a single durable window
    pub flush_duration: time::Duration,
}
166
167/// Custom implementation of `mmap(2)`
168///
169/// ## Constraints
170///
171/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
172/// must be a POD type which is safe to persist and later reinterpret from bytes.
173///
174/// Required properties for `T`,
175///
176/// - Must use `#[repr(C)]`
177/// - Should be 8-bytes aligned
178/// - Must not implement [`Drop`]
179/// - `size_of::<T>()` should be multiple of `8`
180///
181/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
182/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
183/// across reopen.
184///
185/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
186/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
187/// layout and must remain valid when the file is reopened in a later process.
188///
189/// ## Example
190///
191/// ```
192/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
193///
194/// const MODULE_ID: u8 = 0;
195///
196/// let dir = tempfile::tempdir().unwrap();
197/// let path = dir.path().join("tmp_frozen_mmap");
198///
199/// let cfg = FrozenMMapCfg {
200/// module_id: MODULE_ID,
201/// initial_count: 0x0A,
202/// flush_duration: std::time::Duration::from_micros(0x96),
203/// };
204///
205/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
206/// assert_eq!(mmap.total_slots(), 0x0A);
207///
208/// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
209/// mmap.wait_for_durability(epoch).unwrap();
210///
211/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
212/// assert_eq!(val, 0xDEADC0DE);
213///
214/// drop(mmap);
215///
216/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
217/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
218///
219/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
220/// assert_eq!(val, 0xDEADC0DE);
221/// ```
222#[derive(Debug)]
223pub struct FrozenMMap<T>
224where
225 T: Sized + Send + Sync,
226{
227 core: sync::Arc<Core>,
228 tx: Option<thread::JoinHandle<()>>,
229 _t: core::marker::PhantomData<T>,
230}
231
// SAFETY: `FrozenMMap` only holds an `Arc<Core>`, the flusher tx `JoinHandle` and a
// `PhantomData<T>`; all access to the mapped memory goes through `Core`'s io lock and per-slot
// locks (see `read`/`write`/`write_sync`), and `T` itself is required to be `Send + Sync`.
// NOTE(review): soundness relies on `Core` (and the raw mapping inside it) being safe to share
// across threads under that locking scheme — confirm against `Core`'s definition.
unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
234
235impl<T> FrozenMMap<T>
236where
237 T: Sized + Send + Sync,
238{
239 /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
240 pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
241
242 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
243 ///
244 /// ## Multiple Instances
245 ///
246 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
247 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
248 /// error will be thrown.
249 ///
250 /// ## Capacity Growth
251 ///
252 /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
253 /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
254 /// mapping over grown capacity.
255 ///
256 /// ## [`FrozenMMapCfg`]
257 ///
258 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
259 ///
260 /// ## Working
261 ///
262 /// We first create a new [`FrozenFile`] if note already, then map the entire file using
263 /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
264 /// entire lifetime of file.
265 ///
266 /// ## Important
267 ///
268 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
269 /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
270 /// to store config.
271 ///
272 /// ## Example
273 ///
274 /// ```
275 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
276 ///
277 /// const MODULE_ID: u8 = 0;
278 ///
279 /// let dir = tempfile::tempdir().unwrap();
280 /// let path = dir.path().join("tmp_frozen_mmap");
281 ///
282 /// let cfg = FrozenMMapCfg {
283 /// module_id: MODULE_ID,
284 /// initial_count: 0x0A,
285 /// flush_duration: std::time::Duration::from_micros(0x96),
286 /// };
287 ///
288 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
289 /// assert_eq!(mmap.total_slots(), 0x0A);
290 ///
291 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
292 /// mmap.wait_for_durability(epoch).unwrap();
293 ///
294 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
295 /// assert_eq!(val, 0xDEADC0DE);
296 /// ```
297 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
298 Self::validate_t()?;
299 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
300 let total_slots = curr_length / Self::SLOT_SIZE;
301
302 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
303 // guarantees that the first caller sets the value and all subsequent calls reuse it
304 let _ = err::MID.get_or_init(|| cfg.module_id);
305
306 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
307 let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
308
309 // INFO: we spawn the thread for background sync
310 let tx = Core::spawn_tx(core.clone())?;
311
312 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
313 }
314
315 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
316 /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
317 ///
318 /// ## Multiple Instances
319 ///
320 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
321 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
322 /// error will be thrown.
323 ///
324 /// ## Why not create a [`FrozenMMap::grow`] call?
325 ///
326 /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
327 /// memory mapping in place is tricky in concurrent code, as some threads would still hold
328 /// stale/unmapped pointers to mmap due to preemption from the OS schedular
329 ///
330 /// So, instead of remmaping a live instance, the current API performs growth during open,
331 /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
332 /// instance.
333 ///
334 /// ## [`FrozenMMapCfg`]
335 ///
336 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
337 ///
338 /// ## Working
339 ///
340 /// We first create a new [`FrozenFile`] if note already, then we grow the file using
341 /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
342 /// read/write `T`, which also should stay constant for the entire lifetime of file.
343 ///
344 /// ## Important
345 ///
346 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
347 /// which is used under the hood, one must use config stores like
348 /// [`Rta`](https://crates.io/crates/rta) to store config.
349 ///
350 /// ## Example
351 ///
352 /// ```
353 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
354 ///
355 /// const MODULE_ID: u8 = 0;
356 ///
357 /// let dir = tempfile::tempdir().unwrap();
358 /// let path = dir.path().join("tmp_frozen_mmap");
359 ///
360 /// let cfg = FrozenMMapCfg {
361 /// module_id: MODULE_ID,
362 /// initial_count: 0x0A,
363 /// flush_duration: std::time::Duration::from_micros(0x96),
364 /// };
365 ///
366 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
367 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
368 ///
369 /// let epoch = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
370 /// mmap.wait_for_durability(epoch).unwrap();
371 ///
372 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
373 /// assert_eq!(val, 0xDEADC0DE);
374 /// ```
375 pub fn new_grown<P: AsRef<std::path::Path>>(
376 path: P,
377 cfg: FrozenMMapCfg,
378 additional_slots: usize,
379 ) -> FrozenResult<Self> {
380 Self::validate_t()?;
381 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
382
383 // we grow the underlying FrozenFile as requested
384 file.grow(additional_slots)?;
385 let curr_length = file.length()?; // we must read the updated (grown) length
386 let total_slots = curr_length / Self::SLOT_SIZE;
387
388 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
389 // guarantees that the first caller sets the value and all subsequent calls reuse it
390 let _ = err::MID.get_or_init(|| cfg.module_id);
391
392 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
393 let core = sync::Arc::new(Core::new(mmap, file, cfg.flush_duration, curr_length, total_slots));
394
395 // INFO: we spawn the thread for background sync
396 let tx = Core::spawn_tx(core.clone())?;
397
398 Ok(Self { core, tx: Some(tx), _t: core::marker::PhantomData })
399 }
400
401 /// Create/open a new [`FrozenFile`] instance
402 fn open_file(path: std::path::PathBuf, cfg: &FrozenMMapCfg) -> FrozenResult<(FrozenFile, usize)> {
403 let ff_cfg = FrozenFileCfg {
404 path,
405 module_id: cfg.module_id,
406 buffer_size: Self::SLOT_SIZE,
407 initial_available_buffers: cfg.initial_count,
408 };
409
410 let file = FrozenFile::new(ff_cfg)?;
411 let curr_length = file.length()?;
412
413 Ok((file, curr_length))
414 }
415
416 #[inline]
417 fn validate_t() -> FrozenResult<()> {
418 if std::mem::needs_drop::<T>() {
419 return err::new_err_default(err::DRP);
420 }
421
422 let align = std::mem::align_of::<T>();
423 if align != 8 {
424 return err::new_err_default(err::ALN);
425 }
426
427 let size = std::mem::size_of::<T>();
428 if size == 0 {
429 return err::new_err_default(err::ZRO);
430 }
431
432 if size % 8 != 0 {
433 return err::new_err_default(err::SZE);
434 }
435
436 Ok(())
437 }
438
439 /// Blocks until given `epoch` becomes durable
440 ///
441 /// ## Batching
442 ///
443 /// With respect to `flush_duration`, all write ops are batched before sync, which is executed
444 /// by flusher tx working in background, while each write is assigned w/ current durable epoch,
445 /// and all writes which observe the exact same epoch, belong to the same durability window, and
446 /// are all sync'ed together
447 ///
448 /// When a background sync succeeds, the internal durable epoch is incremented, indicating that
449 /// all writes that observed the previous epoch are now durable on disk
450 ///
451 /// ## Example
452 ///
453 /// ```
454 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
455 ///
456 /// const MODULE_ID: u8 = 0;
457 ///
458 /// let dir = tempfile::tempdir().unwrap();
459 /// let path = dir.path().join("tmp_wait_epoch");
460 ///
461 /// let cfg = FrozenMMapCfg {
462 /// module_id: MODULE_ID,
463 /// initial_count: 0x04,
464 /// flush_duration: std::time::Duration::from_micros(0x60),
465 /// };
466 ///
467 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
468 ///
469 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x8A) }.unwrap();
470 /// mmap.wait_for_durability(epoch).unwrap();
471 ///
472 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
473 /// assert_eq!(val, 0x8A);
474 /// ```
475 pub fn wait_for_durability(&self, epoch: u64) -> FrozenResult<()> {
476 if let Some(sync_err) = self.core.get_sync_error() {
477 return Err(sync_err);
478 }
479
480 let durable_epoch = self.core.durable_epoch.load(atomic::Ordering::Acquire);
481 if durable_epoch >= epoch {
482 return Ok(());
483 }
484
485 let mut guard = self.core.acquire_durable_lock();
486 loop {
487 if let Some(sync_err) = self.core.get_sync_error() {
488 return Err(sync_err);
489 }
490
491 if self.core.durable_epoch.load(atomic::Ordering::Acquire) >= epoch {
492 return Ok(());
493 }
494
495 // NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
496 guard = self.core.durable_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
497 }
498 }
499
500 /// Read a `T` at given `index` via callback (`f`)
501 ///
502 /// ## Concurrency
503 ///
504 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
505 /// at same index is atomic and thread safe, while operations on different indices may proceed
506 /// fully in parallel
507 ///
508 /// ## Safety
509 ///
510 /// The caller must ensure following:
511 ///
512 /// - given `index` is within bounds
513 /// - underlying memory contains a valid instance of `T`
514 /// - provided callback `f` must not,
515 /// - write through the pointer
516 /// - store or leak pointer beyound there lifetime
517 ///
518 /// Violating any of the above may result in undefined behavior
519 ///
520 /// ## Example
521 ///
522 /// ```
523 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
524 ///
525 /// const MODULE_ID: u8 = 0;
526 ///
527 /// let dir = tempfile::tempdir().unwrap();
528 /// let path = dir.path().join("tmp_read_mmap");
529 ///
530 /// let cfg = FrozenMMapCfg {
531 /// module_id: MODULE_ID,
532 /// initial_count: 0x02,
533 /// flush_duration: std::time::Duration::from_micros(0x60),
534 /// };
535 ///
536 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
537 ///
538 /// let epoch = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
539 /// mmap.wait_for_durability(epoch).unwrap();
540 ///
541 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
542 /// assert_eq!(val, 0x0A);
543 /// ```
544 #[inline(always)]
545 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
546 let offset = Self::SLOT_SIZE * index;
547 let _lock = self.core.locks.lock(index);
548
549 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
550 // assumption of OS guarantees visibility)
551
552 let ptr = unsafe { self.core.map.as_ptr(offset) };
553 Ok(f(ptr))
554 }
555
556 /// Write/update a `T` at given `index` via callback (`f`)
557 ///
558 /// ## Concurrency
559 ///
560 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
561 /// at same index is atomic and thread safe, while operations on different indices may proceed
562 /// fully in parallel
563 ///
564 /// ## Safety
565 ///
566 /// The caller must ensure following:
567 ///
568 /// - given `index` is within bounds
569 /// - underlying memory contains a valid instance of `T`
570 /// - provided callback `f` must not store or leak pointer beyound there lifetime
571 ///
572 /// Violating any of the above may result in undefined behavior
573 ///
574 /// ## Example
575 ///
576 /// ```
577 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
578 ///
579 /// const MODULE_ID: u8 = 0;
580 ///
581 /// let dir = tempfile::tempdir().unwrap();
582 /// let path = dir.path().join("tmp_write_mmap");
583 ///
584 /// let cfg = FrozenMMapCfg {
585 /// module_id: MODULE_ID,
586 /// initial_count: 0x02,
587 /// flush_duration: std::time::Duration::from_micros(0x96),
588 /// };
589 ///
590 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
591 ///
592 /// let epoch = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
593 /// mmap.wait_for_durability(epoch).unwrap();
594 ///
595 /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
596 /// assert_eq!(val, 0x2B);
597 /// ```
598 #[inline(always)]
599 pub unsafe fn write(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<TEpoch> {
600 // propagate prev errors
601 if let Some(err) = self.core.get_sync_error() {
602 return Err(err);
603 }
604
605 let offset = Self::SLOT_SIZE * index;
606
607 let _guard = self.core.acquire_io_lock();
608 let _lock = self.core.locks.lock(index);
609
610 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
611 f(ptr);
612
613 self.core.dirty.store(true, atomic::Ordering::Release);
614 let epoch = self.core.incr_curr_epoch();
615
616 Ok(epoch)
617 }
618
619 /// Write/update a `T` at given `index` via callback (`f`) w/ instant durability
620 ///
621 /// This function performs a blocking hard-sync, unlike [`FrozenMMap::write`], the update is
622 /// immediately persisted to the underlying storage device
623 ///
624 /// ## Concurrency
625 ///
626 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
627 /// at same index is atomic and thread safe, while operations on different indices may proceed
628 /// fully in parallel.
629 ///
630 /// ## Safety
631 ///
632 /// The caller must ensure following:
633 ///
634 /// - given `index` is within bounds
635 /// - underlying memory contains a valid instance of `T`
636 /// - provided callback `f` must not store or leak pointer beyound there lifetime
637 ///
638 /// Violating any of the above may result in undefined behavior
639 ///
640 /// ## Example
641 ///
642 /// ```
643 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
644 ///
645 /// const MODULE_ID: u8 = 0;
646 ///
647 /// let dir = tempfile::tempdir().unwrap();
648 /// let path = dir.path().join("tmp_write_sync");
649 ///
650 /// let cfg = FrozenMMapCfg {
651 /// module_id: MODULE_ID,
652 /// initial_count: 0x02,
653 /// flush_duration: std::time::Duration::from_micros(0x96),
654 /// };
655 ///
656 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
657 /// unsafe { mmap.write_sync(0, |v| *v = 0xC0DE) }.unwrap();
658 ///
659 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
660 /// assert_eq!(val, 0xC0DE);
661 /// ```
662 #[inline(always)]
663 pub unsafe fn write_sync(&self, index: usize, f: impl FnOnce(*mut T)) -> FrozenResult<()> {
664 // propagate prev errors
665 if let Some(err) = self.core.get_sync_error() {
666 return Err(err);
667 }
668
669 let offset = Self::SLOT_SIZE * index;
670
671 // block flush_tx scheduling
672 let _flush_guard = self.core.acquire_lock();
673
674 // we use exlusive lock as we perform blocking hard sync
675 let _guard = self.core.acquire_exclusive_io_lock();
676
677 let _lock = self.core.locks.lock(index);
678 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
679 f(ptr);
680
681 // blocking hard sync
682 self.core.sync()?;
683
684 // NOTE: we hard sync we flushed an entier batch, so we can skip the sync via flush_tx as
685 // the current batch is durable
686
687 self.core.mark_epoch_durable();
688 let prev = self.core.dirty.swap(false, atomic::Ordering::AcqRel);
689
690 // NOTE: we must also notify cv's waiting for durability (we skip if there was no batch to
691 // sync)
692 if prev {
693 let _g = self.core.acquire_durable_lock();
694 self.core.durable_cv.notify_all();
695 }
696
697 Ok(())
698 }
699
700 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
701 ///
702 /// ## Working
703 ///
704 /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
705 /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
706 /// conditions
707 ///
708 /// ## Example
709 ///
710 /// ```
711 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
712 ///
713 /// const MODULE_ID: u8 = 0;
714 ///
715 /// let dir = tempfile::tempdir().unwrap();
716 /// let path = dir.path().join("tmp_grow_mmap");
717 ///
718 /// let cfg = FrozenMMapCfg {
719 /// module_id: MODULE_ID,
720 /// initial_count: 0x02,
721 /// flush_duration: std::time::Duration::from_micros(0x96),
722 /// };
723 ///
724 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
725 /// assert_eq!(mmap.total_slots(), 0x02);
726 ///
727 /// drop(mmap);
728 ///
729 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
730 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
731 /// ```
732 #[inline]
733 pub fn total_slots(&self) -> usize {
734 self.core.curr_length / Self::SLOT_SIZE
735 }
736
737 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
738 ///
739 /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
740 /// and OS
741 ///
742 /// ## Example
743 ///
744 /// ```
745 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
746 ///
747 /// const MODULE_ID: u8 = 0;
748 ///
749 /// let dir = tempfile::tempdir().unwrap();
750 /// let path = dir.path().join("tmp_mem_usage");
751 ///
752 /// let cfg = FrozenMMapCfg {
753 /// module_id: MODULE_ID,
754 /// initial_count: 0x10,
755 /// flush_duration: std::time::Duration::from_micros(0x60),
756 /// };
757 ///
758 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
759 ///
760 /// let bytes = mmap.memory_usage();
761 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
762 /// ```
763 #[inline]
764 pub fn memory_usage(&self) -> usize {
765 // memory of mem-maped region
766 let mmap_bytes = self.core.curr_length;
767
768 // memory used for locking
769 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
770
771 mmap_bytes + lock_bytes
772 }
773
774 /// Create a new [`FMTransaction`] context for grouping multi write ops into a single atomic
775 /// operation
776 ///
777 /// ## Overview
778 ///
779 /// The use of [`FMTransaction`] allows to group multiple write ops into a single atomic
780 /// operation, hence creating a transactional write operation, which gives following guarantees:
781 ///
782 /// - All write ops succeed together
783 /// - Single epoch to track durability of all writes ops
784 /// - Same durability guarantee for all the included write ops
785 ///
786 /// Simply, this preserves atomic durability semantics for multi index updates
787 ///
788 /// ## Example
789 ///
790 /// ```
791 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
792 ///
793 /// const MODULE_ID: u8 = 0;
794 ///
795 /// let dir = tempfile::tempdir().unwrap();
796 /// let path = dir.path().join("tmp_tx");
797 ///
798 /// let cfg = FrozenMMapCfg {
799 /// module_id: MODULE_ID,
800 /// initial_count: 0x0A,
801 /// flush_duration: std::time::Duration::from_micros(50),
802 /// };
803 ///
804 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
805 ///
806 /// let mut tx = mmap.new_tx();
807 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
808 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
809 ///
810 /// let epoch = tx.commit().unwrap();
811 /// mmap.wait_for_durability(epoch).unwrap();
812 ///
813 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
814 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
815 ///
816 /// assert_eq!((v0, v1), (0x0A, 0x14));
817 /// ```
818 #[inline]
819 pub fn new_tx(&self) -> FMTransaction<'_, T> {
820 FMTransaction { core: &self.core, ops_vec: Vec::new() }
821 }
822
823 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
824 ///
825 /// ## Working
826 ///
827 /// When `delete` is called, all read, write, and (background) sync ops are paused
828 /// (indefinitely), whule deletion is done with following steps:
829 ///
830 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
831 /// - if any batch is pending for sync,
832 /// - swap the flag
833 /// - call sync manually
834 /// - incr epoch and update cv
835 /// - brodcast closing so flusher tx could wrap up
836 /// - `munmap(2)` current mapping
837 /// - call delete on [`FrozenFile`]
838 ///
839 /// ## Example
840 ///
841 /// ```
842 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
843 ///
844 /// const MODULE_ID: u8 = 0;
845 ///
846 /// let dir = tempfile::tempdir().unwrap();
847 /// let path = dir.path().join("tmp_delete_mmap");
848 ///
849 /// let cfg = FrozenMMapCfg {
850 /// module_id: MODULE_ID,
851 /// initial_count: 0x04,
852 /// flush_duration: std::time::Duration::from_micros(0x96),
853 /// };
854 ///
855 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
856 /// mmap.delete().unwrap();
857 /// assert!(!path.exists());
858 /// ```
859 pub fn delete(&mut self) -> FrozenResult<()> {
860 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
861 self.core.dirty.store(false, atomic::Ordering::Release);
862 self.core.closed.store(true, atomic::Ordering::Release);
863 self.core.durable_cv.notify_one();
864
865 if let Some(handle) = self.tx.take() {
866 let _ = handle.join();
867 }
868
869 // pause all new write ops
870 let _lock = self.core.acquire_exclusive_io_lock();
871
872 self.munmap()?;
873 self.core.file.delete()
874 }
875
876 #[inline]
877 fn munmap(&self) -> FrozenResult<()> {
878 let length = self.core.curr_length;
879 unsafe { self.core.map.unmap(length) }
880 }
881}
882
883impl<T> Drop for FrozenMMap<T>
884where
885 T: Sized + Send + Sync,
886{
887 fn drop(&mut self) {
888 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
889 self.core.cv.notify_one(); // notify flusher tx to shut
890
891 if let Some(handle) = self.tx.take() {
892 let _ = handle.join();
893 }
894
895 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
896 let _io_lock = self.core.acquire_exclusive_io_lock();
897
898 // free up the boxed error (if any)
899 let ptr = self.core.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
900 if !ptr.is_null() {
901 unsafe {
902 drop(Box::from_raw(ptr));
903 }
904 }
905
906 if !is_closed {
907 let _ = self.munmap();
908 }
909 }
910}
911
912impl<T> fmt::Display for FrozenMMap<T>
913where
914 T: Sized + Send + Sync,
915{
916 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
917 write!(
918 f,
919 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
920 self.core.file.fd(),
921 self.total_slots(),
922 self.core.curr_length,
923 )
924 }
925}
926
927/// A context for grouping multi write ops into a single atomic operation
928///
929/// ## Overview
930///
931/// Use of [`FMTransaction`] allows to group multiple write ops into a single atomic operation.
932///
933/// - All included writes are applied together
934/// - Single epoch is assinged for an entier transaction
935/// - Durability guarantee is same for all included write ops
936///
937/// ## Example
938///
939/// ```
940/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
941///
942/// const MODULE_ID: u8 = 0;
943///
944/// let dir = tempfile::tempdir().unwrap();
945/// let path = dir.path().join("tmp_tx");
946///
947/// let cfg = FrozenMMapCfg {
948/// module_id: MODULE_ID,
949/// initial_count: 0x0A,
950/// flush_duration: std::time::Duration::from_micros(50),
951/// };
952///
953/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
954///
955/// let mut tx = mmap.new_tx();
956/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
957/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
958/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
959///
960/// let epoch = tx.commit().unwrap();
961/// mmap.wait_for_durability(epoch).unwrap();
962///
963/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
964/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
965/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
966///
967/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
968/// ```
969pub struct FMTransaction<'a, T> {
970 core: &'a Core,
971 ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
972}
973
974impl<'a, T> FMTransaction<'a, T> {
975 /// Append a write op into the [`FMTransaction`]
976 ///
977 /// ## Requirements
978 ///
979 /// Write ops must follow these safety requirements,
980 ///
981 /// - No duplicate indices
982 /// - No out-of-order writes (must be incremental)
983 ///
984 /// Violating this constraint would result in [`FrozenError`]
985 ///
986 /// ## Safety
987 ///
988 /// Same safety requirements as [`FrozenMMap::write`] apply here
989 ///
990 /// ## Example
991 ///
992 /// ```
993 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
994 ///
995 /// const MODULE_ID: u8 = 0;
996 ///
997 /// let dir = tempfile::tempdir().unwrap();
998 /// let path = dir.path().join("tmp_tx");
999 ///
1000 /// let cfg = FrozenMMapCfg {
1001 /// module_id: MODULE_ID,
1002 /// initial_count: 0x10,
1003 /// flush_duration: std::time::Duration::from_micros(50),
1004 /// };
1005 ///
1006 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1007 ///
1008 /// let mut tx = mmap.new_tx();
1009 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1010 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1011 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1012 ///
1013 /// let epoch = tx.commit().unwrap();
1014 /// mmap.wait_for_durability(epoch).unwrap();
1015 ///
1016 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1017 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1018 /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1019 ///
1020 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1021 /// ```
1022 #[inline(always)]
1023 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1024 where
1025 F: FnOnce(*mut T) + 'a,
1026 {
1027 // NOTE:
1028 //
1029 // This check prevents a potential footgun! For a safe transaction all writes must be,
1030 // - ordered by index (either incr or decr)
1031 // - no multi writes on same index
1032 //
1033 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1034 if let Some((last_idx, _)) = self.ops_vec.last() {
1035 if index <= *last_idx {
1036 return err::new_err(
1037 err::HCF,
1038 "tx writes must be strictly ordered, with no more then single ops on given index",
1039 );
1040 }
1041 }
1042
1043 self.ops_vec.push((index, Box::new(f)));
1044 Ok(())
1045 }
1046
1047 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1048 ///
1049 /// ## Guarantees
1050 ///
1051 /// - All writes are applied under a single epoch
1052 /// - All writes belong to the same durability batch
1053 /// - No interleaving with other transactions at epoch level
1054 ///
1055 /// ## Example
1056 ///
1057 /// ```
1058 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1059 ///
1060 /// const MODULE_ID: u8 = 0;
1061 ///
1062 /// let dir = tempfile::tempdir().unwrap();
1063 /// let path = dir.path().join("tmp_tx");
1064 ///
1065 /// let cfg = FrozenMMapCfg {
1066 /// module_id: MODULE_ID,
1067 /// initial_count: 0x10,
1068 /// flush_duration: std::time::Duration::from_micros(50),
1069 /// };
1070 ///
1071 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1072 ///
1073 /// let mut tx = mmap.new_tx();
1074 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1075 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1076 ///
1077 /// let epoch = tx.commit().unwrap();
1078 /// mmap.wait_for_durability(epoch).unwrap();
1079 ///
1080 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1081 /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1082 ///
1083 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1084 /// ```
1085 #[inline(always)]
1086 pub fn commit(self) -> FrozenResult<u64> {
1087 if let Some(err) = self.core.get_sync_error() {
1088 return Err(err);
1089 }
1090
1091 let _guard = self.core.acquire_io_lock();
1092
1093 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1094 let mut guards = Vec::with_capacity(self.ops_vec.len());
1095 for (idx, _) in &self.ops_vec {
1096 guards.push(self.core.locks.lock(*idx));
1097 }
1098
1099 for (idx, op) in self.ops_vec {
1100 let offset = idx * std::mem::size_of::<T>();
1101 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1102 op(ptr);
1103 }
1104
1105 self.core.dirty.store(true, atomic::Ordering::Release);
1106 let epoch = self.core.incr_curr_epoch();
1107
1108 Ok(epoch)
1109 }
1110}
1111
/// State shared between a [`FrozenMMap`], its [`FMTransaction`]s (by reference) and the
/// background flusher thread (through an `Arc`, see [`Core::spawn_tx`]).
#[derive(Debug)]
struct Core {
    // the memory mapping over `file`
    map: TMap,
    // per-slot spin locks (one per slot, `total_slots` of them), taken e.g. by `FMTransaction::commit`
    locks: Locks,
    // backing file of the mapping
    file: FrozenFile,
    // wakes the flusher thread; paired with `lock`
    cv: sync::Condvar,
    // mapped length in bytes (the tests expect `total_slots * SLOT_SIZE`)
    curr_length: usize,
    // parking mutex for `cv`; guards no data
    lock: sync::Mutex<()>,
    // shared side for IO ops (e.g. `commit`), exclusive side for sync and teardown
    io_lock: sync::RwLock<()>,
    // set after writes; swapped back to `false` by the flusher before each sync
    dirty: atomic::AtomicBool,
    // wakes durability waiters after each sync attempt; paired with `durable_lock`
    durable_cv: sync::Condvar,
    // set once on shutdown (in `Drop`); tells the flusher to exit when nothing is dirty
    closed: atomic::AtomicBool,
    // parking mutex for `durable_cv`; guards no data
    durable_lock: sync::Mutex<()>,
    // how long the flusher waits between dirty checks
    flush_duration: time::Duration,
    // epoch of the latest write / commit
    current_epoch: atomic::AtomicU64,
    // highest epoch known to be covered by a successful sync
    durable_epoch: atomic::AtomicU64,
    // last sync error as a leaked `Box<Arc<FrozenError>>`, or null when there is none
    error: atomic::AtomicPtr<sync::Arc<FrozenError>>,
}

// SAFETY: NOTE(review): these manual impls are presumably needed because `TMap` holds a raw
// pointer to the mapping. Soundness relies on every shared mutation going through atomics or
// being serialized by `io_lock` / the per-slot `locks`. Confirm against `TMap`'s definition.
unsafe impl Send for Core {}
unsafe impl Sync for Core {}
1133
1134impl Core {
1135 fn new(
1136 map: TMap,
1137 file: FrozenFile,
1138 flush_duration: time::Duration,
1139 curr_length: usize,
1140 total_slots: usize,
1141 ) -> Self {
1142 Self {
1143 map,
1144 file,
1145 curr_length,
1146 flush_duration,
1147 cv: sync::Condvar::new(),
1148 lock: sync::Mutex::new(()),
1149 io_lock: sync::RwLock::new(()),
1150 locks: Locks::new(total_slots),
1151 durable_cv: sync::Condvar::new(),
1152 durable_lock: sync::Mutex::new(()),
1153 dirty: atomic::AtomicBool::new(false),
1154 closed: atomic::AtomicBool::new(false),
1155 current_epoch: atomic::AtomicU64::new(0),
1156 durable_epoch: atomic::AtomicU64::new(0),
1157 error: atomic::AtomicPtr::new(std::ptr::null_mut()),
1158 }
1159 }
1160
1161 #[inline]
1162 fn sync(&self) -> FrozenResult<()> {
1163 unsafe { self.map.sync(self.curr_length) }?;
1164 self.file.sync()
1165 }
1166
1167 #[inline(always)]
1168 fn set_sync_error(&self, err: FrozenError) {
1169 let boxed = Box::into_raw(Box::new(sync::Arc::new(err)));
1170 let old = self.error.swap(boxed, atomic::Ordering::AcqRel);
1171
1172 // NOTE: we must free the old error, if any, to avoid mem leaks
1173 if !old.is_null() {
1174 unsafe { drop(Box::from_raw(old)) };
1175 }
1176 }
1177
1178 #[inline(always)]
1179 fn get_sync_error(&self) -> Option<FrozenError> {
1180 let ptr = self.error.load(atomic::Ordering::Acquire);
1181 if hints::likely(ptr.is_null()) {
1182 return None;
1183 }
1184
1185 let arc = unsafe { &*ptr }.clone();
1186 Some((*arc).clone())
1187 }
1188
1189 #[inline]
1190 fn clear_sync_error(&self) {
1191 let old = self.error.swap(std::ptr::null_mut(), atomic::Ordering::AcqRel);
1192 if hints::unlikely(!old.is_null()) {
1193 unsafe {
1194 drop(Box::from_raw(old));
1195 }
1196 }
1197 }
1198
1199 /// ## Why we ignore [`std::sync::PoisonError`]?
1200 ///
1201 /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
1202 /// protect any mutable state. All the pool invariants and accounting are maintained via atomics
1203 /// and are completely seperated from the mutex.
1204 ///
1205 /// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
1206 /// an inconsistent state of the protected value. Since no state can be left partially modified
1207 /// under this lock, there is no possible consistency risk to recover from and propagating the
1208 /// poison error would only introduce unnecessary failures into the allocation path.
1209 ///
1210 /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
1211 /// with the recovered guard.
1212 #[inline]
1213 fn acquire_durable_lock(&self) -> sync::MutexGuard<'_, ()> {
1214 self.durable_lock.lock().unwrap_or_else(|e| e.into_inner())
1215 }
1216
1217 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1218 #[inline]
1219 fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1220 self.lock.lock().unwrap_or_else(|e| e.into_inner())
1221 }
1222
1223 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1224 #[inline]
1225 fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1226 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1227 }
1228
1229 /// NOTE: See [`Core::acquire_durable_lock`] implementation for rationale behind poison recovery
1230 #[inline]
1231 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1232 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1233 }
1234
1235 #[inline]
1236 fn incr_curr_epoch(&self) -> u64 {
1237 self.current_epoch.fetch_add(1, atomic::Ordering::Release) + 1
1238 }
1239
1240 #[inline]
1241 fn mark_epoch_durable(&self) {
1242 let curr_epoch = self.current_epoch.load(atomic::Ordering::Acquire);
1243 self.durable_epoch.store(curr_epoch, atomic::Ordering::Release);
1244 }
1245
1246 fn spawn_tx(core: sync::Arc<Self>) -> FrozenResult<thread::JoinHandle<()>> {
1247 match thread::Builder::new().name("fm-flush-tx".into()).spawn(move || Self::flush_tx(core)) {
1248 Ok(tx) => Ok(tx),
1249 Err(error) => err::new_err(err::FXE, error),
1250 }
1251 }
1252
1253 fn flush_tx(core: sync::Arc<Self>) {
1254 // init phase (acquiring locks)
1255 let mut guard = match core.lock.lock() {
1256 Ok(g) => g,
1257 Err(error) => {
1258 core.set_sync_error(err::new_err_raw(err::FXE, error));
1259 core.cv.notify_all();
1260 return;
1261 }
1262 };
1263
1264 // sync loop w/ non-busy waiting
1265 loop {
1266 guard = match core.cv.wait_timeout(guard, core.flush_duration) {
1267 Ok((g, _)) => g,
1268 Err(e) => {
1269 core.set_sync_error(err::new_err_raw(err::TXE, e));
1270 core.cv.notify_all();
1271 return;
1272 }
1273 };
1274
1275 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
1276 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
1277
1278 let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
1279 let closing = core.closed.load(atomic::Ordering::Acquire);
1280
1281 if !dirty {
1282 if closing {
1283 core.cv.notify_all();
1284 return;
1285 }
1286
1287 continue;
1288 }
1289
1290 // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
1291 // grow could kick in while sync is in progress
1292
1293 let io_lock = core.acquire_exclusive_io_lock();
1294
1295 // INFO: we must drop the guard before syscall, as its a blocking operation and holding
1296 // the mutex while the syscall takes place is not a good idea, while we drop the mutex
1297 // and acqurie it again, in-between other process could acquire it and use it
1298 drop(guard);
1299
1300 // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
1301 // all writes up to `target_epoch` are guaranteed to be part of this flush window,
1302 // while anything beyound belongs to the next batch
1303 let target_epoch = core.current_epoch.load(atomic::Ordering::Acquire);
1304
1305 // NOTE:
1306 //
1307 // - if sync fails, we update the Core::error w/ the received error object
1308 // - we clear it up when another sync call succeeds
1309 // - this is valid, as the underlying sync flushes entire mmaped region, hence
1310 // even if the last call failed, and the new one succeeded, we do get the durability
1311 // guarenty for the old data as well
1312
1313 match core.sync() {
1314 Ok(_) => {
1315 core.durable_epoch.store(target_epoch, atomic::Ordering::Release);
1316
1317 let _g = core.acquire_durable_lock();
1318
1319 core.clear_sync_error();
1320 core.durable_cv.notify_all();
1321 }
1322 Err(err) => {
1323 core.set_sync_error(err);
1324 core.durable_cv.notify_all();
1325 }
1326 }
1327
1328 drop(io_lock);
1329 guard = core.acquire_lock();
1330 }
1331 }
1332}
1333
1334#[derive(Debug)]
1335struct Locks(Box<[atomic::AtomicU8]>);
1336
1337impl Locks {
1338 const LOCK: u8 = 1;
1339 const UNLOCK: u8 = 0;
1340
1341 const L1_CONTENTION: usize = 0x10;
1342 const L2_CONTENTION: usize = 0x20;
1343
1344 fn new(cap: usize) -> Self {
1345 let mut slots = Vec::with_capacity(cap);
1346 for _ in 0..cap {
1347 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1348 }
1349
1350 Self(slots.into_boxed_slice())
1351 }
1352
1353 #[inline(always)]
1354 fn lock(&self, index: usize) -> LockGuard<'_> {
1355 let val = &self.0[index];
1356 let mut spins = 0;
1357
1358 loop {
1359 if val
1360 .compare_exchange_weak(Self::UNLOCK, Self::LOCK, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
1361 .is_ok()
1362 {
1363 return LockGuard(val);
1364 }
1365
1366 // we must backoff under contention
1367
1368 // NOTE:
1369 //
1370 // We have three levels of backoff,
1371 //
1372 // 1. Busy waiting w/ CPU hint
1373 // 2. Willingly allow preemption by OS schedular
1374 // 3. Sleep the thread (increasing w/ exponenial factor)
1375
1376 if hints::likely(spins < Self::L1_CONTENTION) {
1377 std::hint::spin_loop();
1378 } else if spins < Self::L2_CONTENTION {
1379 thread::yield_now();
1380 } else {
1381 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1382 thread::sleep(time::Duration::from_nanos(ns));
1383 }
1384
1385 spins += 1;
1386 }
1387 }
1388}
1389
1390struct LockGuard<'a>(&'a atomic::AtomicU8);
1391
1392impl Drop for LockGuard<'_> {
1393 fn drop(&mut self) {
1394 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1395 }
1396}
1397
1398#[cfg(test)]
1399mod tests {
1400 use super::*;
1401
1402 // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1403 const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1404
1405 const MODULE_ID: u8 = 0;
1406 const INIT_SLOTS: usize = 0x0A;
1407
1408 fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1409 let dir = tempfile::tempdir().unwrap();
1410 let path = dir.path().join("tmp_map");
1411
1412 let cfg = FrozenMMapCfg { module_id: MODULE_ID, initial_count: INIT_SLOTS, flush_duration: FLUSH_DURATION };
1413
1414 (dir, path, cfg)
1415 }
1416
    // Lifecycle of a `FrozenMMap`: create, reopen, delete and drop.
    mod fm_lifecycle {
        use super::*;

        // A fresh map carries the configured flush interval and byte length, starts clean (not
        // dirty, not closed, durable epoch 0, no stored sync error) and can make a write durable.
        #[test]
        fn ok_new() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            assert_eq!(mmap.core.flush_duration, FLUSH_DURATION);
            assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
            assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
            assert_eq!(mmap.core.durable_epoch.load(atomic::Ordering::Acquire), 0);
            assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);

            // no sync error recorded, i.e. the bg flusher thread did not fail on start-up
            assert!(mmap.core.error.load(atomic::Ordering::Acquire).is_null());

            // satisfies wait on epoch works
            let epoch = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
            assert!(mmap.wait_for_durability(epoch).is_ok());
        }

        // An existing file can be reopened with the same config after the first instance drops.
        #[test]
        fn ok_new_existing() {
            let (_dir, path, cfg) = new_tmp();

            // create new + close
            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap1);

            // open existing
            let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
            drop(mmap2);
        }

        // Reopening with a different `initial_count` than the file was created with is rejected.
        #[test]
        fn err_new_when_change_in_cfg() {
            let (_dir, path, mut cfg) = new_tmp();

            // create new + close
            let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap1);

            // update cfg + open existing
            cfg.initial_count = INIT_SLOTS * 2;
            assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
        }

        // `delete` removes the backing file.
        #[test]
        fn ok_delete() {
            let (_dir, path, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

            mmap.delete().unwrap();
            assert!(!mmap.core.file.exists().unwrap());
        }

        // A second `delete` on the same instance fails instead of silently succeeding.
        #[test]
        fn err_delete_after_delete() {
            let (_dir, path, cfg) = new_tmp();
            let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

            mmap.delete().unwrap();
            assert!(!mmap.core.file.exists().unwrap());
            assert!(mmap.delete().is_err());
        }

        // Dropping right after a write (without waiting for the bg flush) still keeps the value
        // visible to the next instance opened on the same file.
        #[test]
        fn ok_drop_persists_when_dropped_before_bg_flush() {
            let (_dir, path, cfg) = new_tmp();
            const VAL: u64 = 0x0A;

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
                drop(mmap);
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
                assert_eq!(val, VAL);
            }
        }
    }
1502
    // Validation of the stored type `T` (see the module-level constraints), both directly via
    // `validate_t` and through `new` / `new_grown`.
    mod fm_validate_t {
        use super::*;

        // repr(C), 8-aligned, size 16, no Drop: accepted
        #[repr(C, align(8))]
        struct OkT {
            a: u64,
            b: u64,
        }

        // repr(C) with two u32 fields: size 8 but only 4-byte aligned, so rejected
        #[repr(C)]
        struct BadAlignT {
            a: u32,
            b: u32,
        }

        // NOTE(review): u32 + u16 padded to align 4 gives `size_of` == 8, which IS a multiple of 8.
        // This type is therefore rejected by the alignment rule, not the size rule. Rust sizes are
        // always multiples of the alignment, so an 8-aligned `T` can never fail the size rule on
        // its own.
        #[repr(C, align(4))]
        struct BadSizeT {
            a: u32,
            b: u16,
        }

        // implements Drop: rejected
        #[repr(C, align(8))]
        struct DropT(u64);

        impl Drop for DropT {
            fn drop(&mut self) {}
        }

        // zero-sized: rejected
        #[repr(C, align(8))]
        struct ZstT;

        #[test]
        fn ok_validate_t() {
            assert!(FrozenMMap::<OkT>::validate_t().is_ok());
        }

        #[test]
        fn err_validate_t_when_drop() {
            assert!(FrozenMMap::<DropT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_not_8_byte_aligned() {
            assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_size_not_multiple_of_8() {
            assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
        }

        #[test]
        fn err_validate_t_when_zero_sized() {
            assert!(FrozenMMap::<ZstT>::validate_t().is_err());
        }

        // the same rejections must surface through the constructors

        #[test]
        fn err_new_when_t_implements_drop() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_is_not_8_byte_aligned() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_size_is_not_multiple_of_8() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_when_t_is_zero_sized() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
        }

        #[test]
        fn err_new_grown_when_t_implements_drop() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_is_not_8_byte_aligned() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_size_is_not_multiple_of_8() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
        }

        #[test]
        fn err_new_grown_when_t_is_zero_sized() {
            let (_dir, path, cfg) = new_tmp();
            assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
        }
    }
1607
    // Reopening an existing file with extra slots via `new_grown`.
    mod fm_new_grown {
        use super::*;

        // Growing adds the requested slots and extends the mapped byte length accordingly.
        #[test]
        fn ok_new_grown_updates_length() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS);
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
            assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
        }

        // `new_grown` fails while another instance still has the file open.
        // NOTE(review): same scenario as `err_new_grown_while_previous_instance_is_alive` below.
        #[test]
        fn err_new_grown_with_preexisting_instance() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS);

            assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
        }

        // Repeated grows accumulate: each reopen adds on top of the previous size.
        #[test]
        fn ok_new_grown_cycle() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
            drop(mmap);

            let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
            assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
        }

        // A grown instance can overwrite and read back a slot written before the grow.
        #[test]
        fn ok_write_reopen_grown_read() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
            }

            {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
                unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };

                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
                assert_eq!(val, 0xBB);
            }
        }

        // Writes to the last slot after each grow survive later reopens, as does slot 0.
        // After two grows the last slot holds the value from the second iteration (i = 1 -> 3).
        #[test]
        fn ok_write_reopen_grown_read_cycle() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write(0, |v| *v = 1).unwrap() };
            }

            for i in 0..2 {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
                let idx = mmap.total_slots() - 1;
                unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
                drop(mmap);
            }

            let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();

            let base = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(base, 1);

            let last_idx = mmap.total_slots() - 1;
            let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
            assert_eq!(last, 3);
        }

        #[test]
        fn err_new_grown_while_previous_instance_is_alive() {
            let (_dir, path, cfg) = new_tmp();

            let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);

            assert!(reopened.is_err());
        }
    }
1707
    // Plain (async-durability) write followed by read.
    mod fm_write_read {
        use super::*;

        // write, wait until its epoch is durable, then read it back
        #[test]
        fn ok_write_wait_read_cycle() {
            const VAL: u64 = 0xDEADC0DE;
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            // write + sync
            let epoch = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
            mmap.wait_for_durability(epoch).unwrap();

            // read + verify
            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
            assert_eq!(val, VAL);
        }

        // a read sees the written value immediately, without waiting for durability
        #[test]
        fn ok_write_read_without_wait() {
            const VAL: u64 = 0xDEADC0DE;
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
            let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
            assert_eq!(val, VAL);
        }
    }
1737
    // Synchronous writes (`write_sync`) and how they interact with async writes.
    mod fm_write_sync_read {
        use super::*;

        #[test]
        fn ok_write_sync_read() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            unsafe { mmap.write_sync(0, |v| *v = 0x4C).unwrap() };

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x4C);
        }

        // NOTE(review): despite its name, this test never calls `wait_for_durability`, so it
        // does not check that waiting after `write_sync` returns immediately.
        #[test]
        fn ok_write_sync_wait_returns_immediately() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let _ = unsafe { mmap.write_sync(0, |v| *v = 0x6A).unwrap() };

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x6A);
        }

        // an async write after a sync write on the same slot wins
        #[test]
        fn ok_write_sync_followed_by_async_write() {
            let (_dir, path, cfg) = new_tmp();

            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
            unsafe { mmap.write_sync(0, |v| *v = 0x0A).unwrap() };

            let epoch = unsafe { mmap.write(0, |v| *v = 0x14).unwrap() };
            mmap.wait_for_durability(epoch).unwrap();

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 0x14);
        }

        // after a sync write, waiting on an earlier async epoch succeeds
        #[test]
        fn ok_write_sync_makes_prev_batch_durable() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let async_epoch = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
            unsafe { mmap.write_sync(1, |v| *v = 2).unwrap() };
            mmap.wait_for_durability(async_epoch).unwrap();

            let v1 = unsafe { mmap.read(0, |v| *v).unwrap() };
            let v2 = unsafe { mmap.read(1, |v| *v).unwrap() };

            assert_eq!(v1, 1);
            assert_eq!(v2, 2);
        }

        #[test]
        fn ok_write_sync_persists_across_reopen() {
            let (_dir, path, cfg) = new_tmp();
            const VAL: u64 = 0x1000;

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                unsafe { mmap.write_sync(0, |v| *v = VAL).unwrap() };
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
                let val = unsafe { mmap.read(0, |v| *v).unwrap() };
                assert_eq!(val, VAL);
            }
        }
    }
1810
    // Multi-write transactions (`FMTransaction`).
    mod fm_tx {
        use super::*;

        // all queued writes land after a single commit
        #[test]
        fn ok_tx_basic_multi_write() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 1).unwrap();
                tx.write(1, |v| *v = 2).unwrap();
                tx.write(2, |v| *v = 3).unwrap();
            }

            let epoch = tx.commit().unwrap();
            mmap.wait_for_durability(epoch).unwrap();

            let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
            let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
            let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };

            assert_eq!((v0, v1, v2), (1, 2, 3));
        }

        #[test]
        fn ok_tx_single_epoch() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 0x0A).unwrap();
                tx.write(1, |v| *v = 0x14).unwrap();
            }

            // NOTE: next write must have strictly higher epoch than current one

            let epoch = tx.commit().unwrap();
            let next_epoch = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };

            assert!(next_epoch > epoch);
        }

        // a lower index after a higher one is refused at `write` time
        #[test]
        fn err_tx_out_of_order() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(2, |v| *v = 1).unwrap();
                let res = tx.write(1, |v| *v = 2);
                assert!(res.is_err());
            }
        }

        // the same index twice is refused at `write` time
        #[test]
        fn err_tx_duplicate_index() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(1, |v| *v = 1).unwrap();
                let res = tx.write(1, |v| *v = 2);
                assert!(res.is_err());
            }
        }

        // two threads committing txs over disjoint slots both succeed
        #[test]
        fn ok_tx_concurrent_non_overlapping() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

            let mut handles = Vec::new();
            for i in 0..2 {
                let mmap = mmap.clone();
                handles.push(thread::spawn(move || {
                    let mut tx = mmap.new_tx();

                    unsafe {
                        tx.write(i * 2, |v| *v = i as u64).unwrap();
                        tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
                    }

                    let epoch = tx.commit().unwrap();
                    mmap.wait_for_durability(epoch).unwrap();
                }));
            }

            for h in handles {
                h.join().unwrap();
            }

            for i in 0..2 {
                let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
                let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };

                assert_eq!((v0, v1), (i as u64, i as u64));
            }
        }

        // a later tx on the same slot overwrites an earlier one
        #[test]
        fn ok_tx_overwrite_last_wins() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();

            let mut tx = mmap.new_tx();
            unsafe {
                tx.write(0, |v| *v = 1).unwrap();
            }

            tx.commit().unwrap();

            let mut tx2 = mmap.new_tx();
            unsafe {
                tx2.write(0, |v| *v = 2).unwrap();
            }

            let epoch = tx2.commit().unwrap();
            mmap.wait_for_durability(epoch).unwrap();

            let val = unsafe { mmap.read(0, |v| *v).unwrap() };
            assert_eq!(val, 2);
        }

        #[test]
        fn ok_tx_persists_across_reopen() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

                let mut tx = mmap.new_tx();
                unsafe {
                    tx.write(0, |v| *v = 0x3A).unwrap();
                    tx.write(1, |v| *v = 0x54).unwrap();
                }

                let epoch = tx.commit().unwrap();
                mmap.wait_for_durability(epoch).unwrap();
            }

            {
                let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();

                let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
                let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };

                assert_eq!((v0, v1), (0x3A, 0x54));
            }
        }
    }
1965
1966 mod fm_durability {
1967 use super::*;
1968
1969 #[test]
1970 fn ok_wait_then_drop() {
1971 let (_dir, path, cfg) = new_tmp();
1972 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1973
1974 let epoch = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1975 mmap.wait_for_durability(epoch).unwrap();
1976
1977 drop(mmap);
1978 }
1979
1980 #[test]
1981 fn ok_epoch_monotonicity() {
1982 let (_dir, path, cfg) = new_tmp();
1983 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1984
1985 let e1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1986 mmap.wait_for_durability(e1).unwrap();
1987
1988 let e2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1989 mmap.wait_for_durability(e2).unwrap();
1990 assert!(e2 >= e1);
1991 }
1992
1993 #[test]
1994 fn ok_wait_for_durability_with_multi_writers() {
1995 let (_dir, path, cfg) = new_tmp();
1996 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1997
1998 let mut handles = Vec::new();
1999 for _ in 0..2 {
2000 let mmap = mmap.clone();
2001 handles.push(thread::spawn(move || {
2002 let epoch = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
2003 mmap.wait_for_durability(epoch).unwrap();
2004 }));
2005 }
2006
2007 for h in handles {
2008 h.join().unwrap();
2009 }
2010
2011 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
2012 assert_eq!(val, 2);
2013 }
2014 }
2015
    // Concurrent readers/writers and reopening afterwards.
    mod fm_concurrency {
        use super::*;

        // two threads read two different slots at the same time
        #[test]
        fn ok_parallel_reads_with_diff_index() {
            let (_dir, path, cfg) = new_tmp();
            let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

            unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
            unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };

            let t1 = {
                let mmap = mmap.clone();
                thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
            };

            let t2 = {
                let mmap = mmap.clone();
                thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
            };

            assert_eq!(t1.join().unwrap(), 0x10);
            assert_eq!(t2.join().unwrap(), 0x20);
        }

        // Mixed concurrent writers and readers, then the last `Arc` drops (shutting the instance
        // down). A grown reopen must still see the written values and accept new writes in the
        // added slots.
        #[test]
        fn ok_multi_tx_drop_then_reopen_grown() {
            let (_dir, path, cfg) = new_tmp();

            {
                let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

                let mut handles = Vec::new();
                for i in 0..2u64 {
                    let mmap = mmap.clone();
                    handles.push(thread::spawn(move || {
                        let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
                    }));
                }

                // readers race with the writers; only that they don't fail is checked here
                for i in 0..2usize {
                    let mmap = mmap.clone();
                    handles.push(thread::spawn(move || {
                        let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
                    }));
                }

                for h in handles {
                    h.join().unwrap();
                }
            }

            {
                let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
                assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);

                for i in 0..2u64 {
                    let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
                    assert_eq!(val, i + 1);
                }

                let idx = mmap.total_slots() - 1;
                unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };

                let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
                assert_eq!(val, 0xDEAD);
            }
        }
    }
2085}