frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
//! These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//! module_id: MODULE_ID,
35//! initial_count: 0x0A,
36//! immediate_durability: false,
37//! flush_duration: std::time::Duration::from_micros(0x96),
38//! };
39//!
40//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
41//! assert_eq!(mmap.total_slots(), 0x0A);
42//!
43//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
44//! let _ = futures::executor::block_on(ticket).unwrap();
45//!
46//! let val = unsafe { mmap.read(0, |v| *v) };
47//! assert_eq!(val, 0xDEADC0DE);
48//!
49//! drop(mmap);
50//!
51//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
52//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
53//!
54//! let val = unsafe { reopened.read(0, |v| *v) };
55//! assert_eq!(val, 0xDEADC0DE);
56//! ```
57
58#[cfg(any(target_os = "linux", target_os = "macos"))]
59mod posix;
60
61use crate::{
62 ack,
63 error::FrozenResult,
64 ffile::{FrozenFile, FrozenFileCfg},
65 hints,
66};
67use std::{
68 fmt,
69 sync::{self, atomic},
70 thread, time,
71};
72
/// Integer type used for the `epoch` counter attached to write ops (a plain `u64`)
pub type TEpoch = u64;
75
/// Memory-mapping backend used by [`FrozenMMap`]; on Linux and macOS this resolves to
/// [`posix::POSIXMMap`]
// NOTE(review): no alias for other targets is visible in this part of the file
#[cfg(any(target_os = "linux", target_os = "macos"))]
type TMap = posix::POSIXMMap;
78
79/// Error codes for [`FrozenMMap`]
80pub(in crate::fmmap) mod err {
81 use crate::error::{ErrCode, FrozenError, FrozenResult};
82
83 /// Domain Id for [`FrozenMMap`] is **18**
84 const ERRDOMAIN: u8 = 0x12;
85
86 /// module id used for [`FrozenMMap`]
87 pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
88
89 #[cfg(not(test))]
90 #[inline(always)]
91 pub fn mid() -> &'static u8 {
92 MID.get().unwrap()
93 }
94
95 #[cfg(test)]
96 #[inline(always)]
97 pub fn mid() -> &'static u8 {
98 MID.get_or_init(|| 0)
99 }
100
101 /// internal fuck up (hault and catch fire)
102 pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
103
104 /// unknown error (fallback)
105 pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
106
107 /// no more memory available
108 pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
109
110 /// syncing error
111 pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
112
113 /// no write/read perm
114 pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
115
116 /// type `T` must not be zero sized
117 pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
118
119 /// flush_tx error (unable to spawn)
120 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
121
122 /// type `T` implements drop
123 pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
124
125 /// type `T` is not 8 bytes aligned
126 pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
127
128 /// `size_of::<T>()` is not multiple of 8
129 pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
130
131 #[inline]
132 pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
133 code: ErrCode,
134 error: E,
135 ) -> FrozenResult<R> {
136 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137 Err(err)
138 }
139
140 #[inline]
141 pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143 Err(err)
144 }
145}
146
/// Config for [`FrozenMMap`]
///
/// ## Example
///
/// ```
/// use frozen_core::fmmap::FrozenMMapCfg;
///
/// let cfg = FrozenMMapCfg {
///     module_id: 0,
///     initial_count: 0x100,
///     immediate_durability: false,
///     flush_duration: std::time::Duration::from_millis(0x0A),
/// };
///
/// assert_eq!(cfg.initial_count, 0x100);
/// assert!(!cfg.immediate_durability);
/// assert_eq!(cfg.flush_duration.as_millis(), 0x0A);
/// ```
#[derive(Debug, Clone)]
pub struct FrozenMMapCfg {
    /// Identifier used for error propagation by [`crate::error::FrozenError`]
    ///
    /// The id is also used to name the background flusher thread
    pub module_id: u8,

    /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
    ///
    /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], so the initial file length will
    /// be `SLOT_SIZE * initial_count` (bytes)
    pub initial_count: usize,

    /// Time interval used by flusher tx to batch write ops into a durable window and sync them
    /// together, where all write ops in a given time interval fall into a single durable window
    pub flush_duration: time::Duration,

    /// Enables immediate durability scheduling for write operations
    ///
    /// When disabled, durability is driven by the background flusher thread with respect to
    /// [`FrozenMMapCfg::flush_duration`] time interval, allowing multiple write calls to be
    /// batched and flushed together into a single sync operation, reducing strain on resources.
    ///
    /// When enabled, every successful [`FrozenMMap::write`] and [`FrozenMMapTransaction::commit`]
    /// immediately wakes the background flusher thread, disregarding the
    /// [`FrozenMMapCfg::flush_duration`] time interval.
    ///
    /// This configuration is intended for workloads where writes are rare and durability latency
    /// is preferred over batching efficiency.
    pub immediate_durability: bool,
}
194
195/// Custom implementation of `mmap(2)`
196///
197/// ## Constraints
198///
199/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
200/// must be a POD type which is safe to persist and later reinterpret from bytes.
201///
202/// Required properties for `T`,
203///
204/// - Must use `#[repr(C)]`
205/// - Should be 8-bytes aligned
206/// - Must not implement [`Drop`]
207/// - `size_of::<T>()` should be multiple of `8`
208///
209/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
210/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
211/// across reopen.
212///
/// These constraints are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
214/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
215/// layout and must remain valid when the file is reopened in a later process.
216///
217/// ## Example
218///
219/// ```
220/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
221///
222/// const MODULE_ID: u8 = 0;
223///
224/// let dir = tempfile::tempdir().unwrap();
225/// let path = dir.path().join("tmp_frozen_mmap");
226///
227/// let cfg = FrozenMMapCfg {
228/// module_id: MODULE_ID,
229/// initial_count: 0x0A,
230/// immediate_durability: false,
231/// flush_duration: std::time::Duration::from_micros(0x96),
232/// };
233///
234/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
235/// assert_eq!(mmap.total_slots(), 0x0A);
236///
237/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
238/// let _ = futures::executor::block_on(ticket).unwrap();
239///
240/// let val = unsafe { mmap.read(0, |v| *v) };
241/// assert_eq!(val, 0xDEADC0DE);
242///
243/// drop(mmap);
244///
245/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
246/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
247///
248/// let val = unsafe { reopened.read(0, |v| *v) };
249/// assert_eq!(val, 0xDEADC0DE);
250/// ```
#[derive(Debug)]
pub struct FrozenMMap<T>
where
    T: Sized + Send + Sync,
{
    // state shared w/ the background flusher thread (mapping, file, locks and flags)
    core: sync::Arc<Core>,
    // join handle of the background flusher thread, taken (left as `None`) once it is joined
    flush_tx_handle: Option<thread::JoinHandle<()>>,
    // ties the instance to the slot type `T` w/o storing a value of it
    _t: core::marker::PhantomData<T>,
}
260
// SAFETY: NOTE(review): the fields are `Arc<Core>`, `Option<JoinHandle<()>>` and
// `PhantomData<T>`. Moving `Arc<Core>` into the flusher thread already requires
// `Core: Send + Sync`, so w/ `T: Send + Sync` these manual impls look redundant w/ the auto
// traits; confirm they are not hiding a field of `Core` that is not thread safe
unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
263
264impl<T> FrozenMMap<T>
265where
266 T: Sized + Send + Sync,
267{
/// Memory space (in bytes) required for each slot of `T` in [`FrozenMMap`]
///
/// ## Example
///
/// ```
/// use frozen_core::fmmap::FrozenMMap;
///
/// assert_eq!(
///     FrozenMMap::<u64>::SLOT_SIZE,
///     std::mem::size_of::<u64>(),
/// );
/// ```
pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
281
282 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
283 ///
284 /// ## Multiple Instances
285 ///
/// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
/// underlying [`FrozenFile`]. Trying to create multiple instances of [`FrozenMMap`] over the
/// same file results in an error.
289 ///
290 /// ## Capacity Growth
291 ///
/// [`FrozenMMap`] does not support in-place growth of a live mapping. To increase capacity,
/// drop the current instance and reopen w/ [`FrozenMMap::new_grown`], which provides memory
/// mapping over grown capacity.
295 ///
296 /// ## [`FrozenMMapCfg`]
297 ///
298 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
299 ///
300 /// ## Working
301 ///
/// We first create a new [`FrozenFile`] if not present already, then map the entire file using
/// `mmap(2)`. The entire file must read/write `T`, which also should stay constant for the
/// entire lifetime of file.
305 ///
306 /// ## Important
307 ///
308 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
309 /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
310 /// to store config.
311 ///
312 /// ## Example
313 ///
314 /// ```
315 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
316 ///
317 /// const MODULE_ID: u8 = 0;
318 ///
319 /// let dir = tempfile::tempdir().unwrap();
320 /// let path = dir.path().join("tmp_frozen_mmap");
321 ///
322 /// let cfg = FrozenMMapCfg {
323 /// module_id: MODULE_ID,
324 /// initial_count: 0x0A,
325 /// immediate_durability: false,
326 /// flush_duration: std::time::Duration::from_micros(0x96),
327 /// };
328 ///
329 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
330 /// assert_eq!(mmap.total_slots(), 0x0A);
331 ///
332 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
333 /// let _ = futures::executor::block_on(ticket).unwrap();
334 ///
335 /// let val = unsafe { mmap.read(0, |v| *v) };
336 /// assert_eq!(val, 0xDEADC0DE);
337 /// ```
338 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
339 Self::validate_t()?;
340 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
341 let total_slots = curr_length / Self::SLOT_SIZE;
342
343 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
344 // guarantees that the first caller sets the value and all subsequent calls reuse it
345 let _ = err::MID.get_or_init(|| cfg.module_id);
346
347 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
348 let core = sync::Arc::new(Core::new(
349 mmap,
350 file,
351 curr_length,
352 total_slots,
353 cfg.immediate_durability,
354 ));
355
356 let cloned_core = sync::Arc::clone(&core);
357 let flush_tx_handle = match thread::Builder::new()
358 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
359 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
360 {
361 Ok(handle) => Some(handle),
362 Err(observed_error) => {
363 return err::new_err(err::FXE, observed_error);
364 }
365 };
366
367 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
368 }
369
370 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
371 /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
372 ///
373 /// ## Multiple Instances
374 ///
/// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernel for the
/// underlying [`FrozenFile`]. Trying to create multiple instances of [`FrozenMMap`] over the
/// same file results in an error.
378 ///
379 /// ## Why not create a [`FrozenMMap::grow`] call?
380 ///
/// Previously, when an in-place `FrozenMMap::grow` was attempted, it was observed that resizing
/// an active memory mapping is tricky in concurrent code, as some threads would still hold
/// stale/unmapped pointers to the mmap due to preemption by the OS scheduler.
///
/// So, instead of remapping a live instance, the current API performs growth during open,
/// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
/// instance.
388 ///
389 /// ## [`FrozenMMapCfg`]
390 ///
391 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
392 ///
393 /// ## Working
394 ///
/// We first create a new [`FrozenFile`] if not present already, then grow the file using
/// [`FrozenFile::grow`], and then map the entire file using `mmap(2)`. The entire file must
/// read/write `T`, which also should stay constant for the entire lifetime of file.
398 ///
399 /// ## Important
400 ///
401 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
402 /// which is used under the hood, one must use config stores like
403 /// [`Rta`](https://crates.io/crates/rta) to store config.
404 ///
405 /// ## Example
406 ///
407 /// ```
408 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
409 ///
410 /// const MODULE_ID: u8 = 0;
411 ///
412 /// let dir = tempfile::tempdir().unwrap();
413 /// let path = dir.path().join("tmp_frozen_mmap");
414 ///
415 /// let cfg = FrozenMMapCfg {
416 /// module_id: MODULE_ID,
417 /// initial_count: 0x0A,
418 /// immediate_durability: false,
419 /// flush_duration: std::time::Duration::from_micros(0x96),
420 /// };
421 ///
422 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
423 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
424 ///
425 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
426 /// let _ = futures::executor::block_on(ticket).unwrap();
427 ///
428 /// let val = unsafe { mmap.read(0, |v| *v) };
429 /// assert_eq!(val, 0xDEADC0DE);
430 /// ```
431 pub fn new_grown<P: AsRef<std::path::Path>>(
432 path: P,
433 cfg: FrozenMMapCfg,
434 additional_slots: usize,
435 ) -> FrozenResult<Self> {
436 Self::validate_t()?;
437 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
438
439 // we grow the underlying FrozenFile as requested
440 file.grow(additional_slots)?;
441 let curr_length = file.length()?; // we must read the updated (grown) length
442 let total_slots = curr_length / Self::SLOT_SIZE;
443
444 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
445 // guarantees that the first caller sets the value and all subsequent calls reuse it
446 let _ = err::MID.get_or_init(|| cfg.module_id);
447
448 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
449 let core = sync::Arc::new(Core::new(
450 mmap,
451 file,
452 curr_length,
453 total_slots,
454 cfg.immediate_durability,
455 ));
456
457 let cloned_core = sync::Arc::clone(&core);
458 let flush_tx_handle = match thread::Builder::new()
459 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
460 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
461 {
462 Ok(handle) => Some(handle),
463 Err(observed_error) => {
464 return err::new_err(err::FXE, observed_error);
465 }
466 };
467
468 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
469 }
470
471 /// Create/open a new [`FrozenFile`] instance
472 fn open_file(
473 path: std::path::PathBuf,
474 cfg: &FrozenMMapCfg,
475 ) -> FrozenResult<(FrozenFile, usize)> {
476 let ff_cfg = FrozenFileCfg {
477 path,
478 module_id: cfg.module_id,
479 buffer_size: Self::SLOT_SIZE,
480 initial_available_buffers: cfg.initial_count,
481 };
482
483 let file = FrozenFile::new(ff_cfg)?;
484 let curr_length = file.length()?;
485
486 Ok((file, curr_length))
487 }
488
489 #[inline]
490 fn validate_t() -> FrozenResult<()> {
491 if std::mem::needs_drop::<T>() {
492 return err::new_err_default(err::DRP);
493 }
494
495 let align = std::mem::align_of::<T>();
496 if align != 8 {
497 return err::new_err_default(err::ALN);
498 }
499
500 let size = std::mem::size_of::<T>();
501 if size == 0 {
502 return err::new_err_default(err::ZRO);
503 }
504
505 if size % 8 != 0 {
506 return err::new_err_default(err::SZE);
507 }
508
509 Ok(())
510 }
511
512 /// Read a `T` at given `index` via callback (`f`)
513 ///
514 /// ## Concurrency
515 ///
/// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
/// the same index are atomic and thread safe, while operations on different indices may
/// proceed fully in parallel
519 ///
520 /// ## Safety
521 ///
522 /// The caller must ensure following:
523 ///
524 /// - given `index` is within bounds
525 /// - underlying memory contains a valid instance of `T`
/// - provided callback `f` must not,
///   - write through the pointer
///   - store or leak the pointer beyond the duration of the call
529 ///
530 /// Violating any of the above may result in undefined behavior
531 ///
532 /// ## Example
533 ///
534 /// ```
535 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
536 ///
537 /// const MODULE_ID: u8 = 0;
538 ///
539 /// let dir = tempfile::tempdir().unwrap();
540 /// let path = dir.path().join("tmp_read_mmap");
541 ///
542 /// let cfg = FrozenMMapCfg {
543 /// module_id: MODULE_ID,
544 /// initial_count: 0x02,
545 /// immediate_durability: false,
546 /// flush_duration: std::time::Duration::from_micros(0x60),
547 /// };
548 ///
549 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
550 ///
551 /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
552 /// let _ = futures::executor::block_on(ticket).unwrap();
553 ///
554 /// let val = unsafe { mmap.read(0, |v| *v) };
555 /// assert_eq!(val, 0x0A);
556 /// ```
557 #[inline(always)]
558 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> R {
559 let offset = Self::SLOT_SIZE * index;
560 let _lock = self.core.locks.lock(index);
561
562 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
563 // assumption of OS guarantees visibility)
564
565 let ptr = unsafe { self.core.map.as_ptr(offset) };
566 f(ptr)
567 }
568
569 /// Write/update a `T` at given `index` via callback (`f`)
570 ///
571 /// ## Concurrency
572 ///
/// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes at
/// the same index are atomic and thread safe, while operations on different indices may
/// proceed fully in parallel
576 ///
577 /// ## Safety
578 ///
579 /// The caller must ensure following:
580 ///
581 /// - given `index` is within bounds
582 /// - underlying memory contains a valid instance of `T`
/// - provided callback `f` must not store or leak the pointer beyond the duration of the call
584 ///
585 /// Violating any of the above may result in undefined behavior
586 ///
587 /// ## Example
588 ///
589 /// ```
590 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
591 ///
592 /// const MODULE_ID: u8 = 0;
593 ///
594 /// let dir = tempfile::tempdir().unwrap();
595 /// let path = dir.path().join("tmp_write_mmap");
596 ///
597 /// let cfg = FrozenMMapCfg {
598 /// module_id: MODULE_ID,
599 /// initial_count: 0x02,
600 /// immediate_durability: false,
601 /// flush_duration: std::time::Duration::from_micros(0x96),
602 /// };
603 ///
604 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
605 ///
606 /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
607 /// let _ = futures::executor::block_on(ticket).unwrap();
608 ///
609 /// let val = unsafe { mmap.read(1, |v| *v) };
610 /// assert_eq!(val, 0x2B);
611 /// ```
612 #[inline(always)]
613 pub unsafe fn write(
614 &self,
615 index: usize,
616 f: impl FnOnce(*mut T),
617 ) -> FrozenResult<ack::AckTicket> {
618 // propagate prev errors
619 if let Some(err) = self.core.completion.get_err() {
620 return Err(err);
621 }
622
623 let offset = Self::SLOT_SIZE * index;
624
625 let _guard = self.core.acquire_io_lock();
626 let _lock = self.core.locks.lock(index);
627
628 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
629 f(ptr);
630
631 self.core.dirty.store(true, atomic::Ordering::Release);
632 if self.core.immediate_durability {
633 self.core.cv.notify_one();
634 }
635
636 let epoch = self.core.completion.increment_current_epoch();
637 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
638 }
639
640 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
641 ///
642 /// ## Working
643 ///
644 /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
645 /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
646 /// conditions
647 ///
648 /// ## Example
649 ///
650 /// ```
651 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
652 ///
653 /// const MODULE_ID: u8 = 0;
654 ///
655 /// let dir = tempfile::tempdir().unwrap();
656 /// let path = dir.path().join("tmp_grow_mmap");
657 ///
658 /// let cfg = FrozenMMapCfg {
659 /// module_id: MODULE_ID,
660 /// initial_count: 0x02,
661 /// immediate_durability: false,
662 /// flush_duration: std::time::Duration::from_micros(0x96),
663 /// };
664 ///
665 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
666 /// assert_eq!(mmap.total_slots(), 0x02);
667 ///
668 /// drop(mmap);
669 ///
670 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
671 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
672 /// ```
673 #[inline]
674 pub fn total_slots(&self) -> usize {
675 self.core.curr_length / Self::SLOT_SIZE
676 }
677
678 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
679 ///
/// *NOTE:* This is an approximation of memory used, actual RSS may differ depending on paging
/// and OS
682 ///
683 /// ## Example
684 ///
685 /// ```
686 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
687 ///
688 /// const MODULE_ID: u8 = 0;
689 ///
690 /// let dir = tempfile::tempdir().unwrap();
691 /// let path = dir.path().join("tmp_mem_usage");
692 ///
693 /// let cfg = FrozenMMapCfg {
694 /// module_id: MODULE_ID,
695 /// initial_count: 0x10,
696 /// immediate_durability: false,
697 /// flush_duration: std::time::Duration::from_micros(0x60),
698 /// };
699 ///
700 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
701 ///
702 /// let bytes = mmap.memory_usage();
703 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
704 /// ```
705 #[inline]
706 pub fn memory_usage(&self) -> usize {
707 // memory of mem-maped region
708 let mmap_bytes = self.core.curr_length;
709
710 // memory used for locking
711 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
712
713 mmap_bytes + lock_bytes
714 }
715
716 /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
717 /// operation
718 ///
719 /// ## Overview
720 ///
721 /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
722 /// operation, hence creating a transactional write operation, which gives following guarantees:
723 ///
724 /// - All write ops succeed together
725 /// - Single epoch to track durability of all writes ops
726 /// - Same durability guarantee for all the included write ops
727 ///
728 /// Simply, this preserves atomic durability semantics for multi index updates
729 ///
730 /// ## Example
731 ///
732 /// ```
733 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
734 ///
735 /// const MODULE_ID: u8 = 0;
736 ///
737 /// let dir = tempfile::tempdir().unwrap();
738 /// let path = dir.path().join("tmp_tx");
739 ///
740 /// let cfg = FrozenMMapCfg {
741 /// module_id: MODULE_ID,
742 /// initial_count: 0x0A,
743 /// immediate_durability: false,
744 /// flush_duration: std::time::Duration::from_micros(50),
745 /// };
746 ///
747 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
748 ///
749 /// let mut tx = mmap.new_tx();
750 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
751 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
752 ///
753 /// let ticket = tx.commit().unwrap();
754 /// let _ = futures::executor::block_on(ticket).unwrap();
755 ///
756 /// let v0 = unsafe { mmap.read(0, |v| *v) };
757 /// let v1 = unsafe { mmap.read(1, |v| *v) };
758 ///
759 /// assert_eq!((v0, v1), (0x0A, 0x14));
760 /// ```
761 #[inline]
762 pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
763 FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
764 }
765
766 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
767 ///
768 /// ## Working
769 ///
770 /// When `delete` is called, all read, write, and (background) sync ops are paused
771 /// (indefinitely), whule deletion is done with following steps:
772 ///
773 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
774 /// - if any batch is pending for sync,
775 /// - swap the flag
776 /// - call sync manually
777 /// - incr epoch and update cv
778 /// - brodcast closing so flusher tx could wrap up
779 /// - `munmap(2)` current mapping
780 /// - call delete on [`FrozenFile`]
781 ///
782 /// ## Example
783 ///
784 /// ```
785 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
786 ///
787 /// const MODULE_ID: u8 = 0;
788 ///
789 /// let dir = tempfile::tempdir().unwrap();
790 /// let path = dir.path().join("tmp_delete_mmap");
791 ///
792 /// let cfg = FrozenMMapCfg {
793 /// module_id: MODULE_ID,
794 /// initial_count: 0x04,
795 /// immediate_durability: false,
796 /// flush_duration: std::time::Duration::from_micros(0x96),
797 /// };
798 ///
799 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
800 /// mmap.delete().unwrap();
801 /// assert!(!path.exists());
802 /// ```
803 pub fn delete(&mut self) -> FrozenResult<()> {
804 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
805 self.core.closed.store(true, atomic::Ordering::Release);
806 if let Some(handle) = self.flush_tx_handle.take() {
807 let _ = handle.join();
808 }
809
810 // pause all new write ops
811 let _lock = self.core.acquire_exclusive_io_lock();
812
813 self.munmap()?;
814 self.core.file.delete()
815 }
816
/// Flush the dirty mmap data to persistent storage
818 ///
819 /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
820 /// guarantee
821 ///
822 /// ## Example
823 ///
824 /// ```
825 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
826 ///
827 /// const MODULE_ID: u8 = 0;
828 ///
829 /// let dir = tempfile::tempdir().unwrap();
830 /// let path = dir.path().join("tmp_delete_mmap");
831 ///
832 /// let cfg = FrozenMMapCfg {
833 /// module_id: MODULE_ID,
834 /// initial_count: 0x04,
835 /// immediate_durability: false,
836 /// flush_duration: std::time::Duration::from_micros(0x96),
837 /// };
838 ///
839 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
840 /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
841 /// ```
842 #[inline]
843 pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
844 self.core.map.sync(self.core.curr_length)
845 }
846
847 /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
848 ///
849 /// ## Example
850 ///
851 /// ```
852 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
853 ///
854 /// const MODULE_ID: u8 = 0;
855 ///
856 /// let dir = tempfile::tempdir().unwrap();
857 /// let path = dir.path().join("tmp_delete_mmap");
858 ///
859 /// let cfg = FrozenMMapCfg {
860 /// module_id: MODULE_ID,
861 /// initial_count: 0x04,
862 /// immediate_durability: false,
863 /// flush_duration: std::time::Duration::from_micros(0x96),
864 /// };
865 ///
866 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
867 /// assert!(unsafe { mmap.flush_file() }.is_ok());
868 /// ```
869 #[inline]
870 pub unsafe fn flush_file(&self) -> FrozenResult<()> {
871 self.core.file.sync()
872 }
873
874 #[inline]
875 fn munmap(&self) -> FrozenResult<()> {
876 let length = self.core.curr_length;
877 unsafe { self.core.map.unmap(length) }
878 }
879}
880
881impl<T> Drop for FrozenMMap<T>
882where
883 T: Sized + Send + Sync,
884{
885 fn drop(&mut self) {
886 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
887 self.core.cv.notify_one(); // notify flusher tx to shut
888
889 if let Some(handle) = self.flush_tx_handle.take() {
890 let _ = handle.join();
891 }
892
893 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
894 let _io_lock = self.core.acquire_exclusive_io_lock();
895
896 if !is_closed {
897 let _ = self.munmap();
898 }
899 }
900}
901
902impl<T> fmt::Display for FrozenMMap<T>
903where
904 T: Sized + Send + Sync,
905{
906 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
907 write!(
908 f,
909 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
910 self.core.file.fd(),
911 self.total_slots(),
912 self.core.curr_length,
913 )
914 }
915}
916
/// Body of the background flusher tx, it periodically syncs dirty data and publishes the
/// durability progress
///
/// The thread sleeps on `core.cv` for at most `flush_duration`, and on every wake-up it syncs
/// when the `dirty` flag was set. It exits once `closed` is observed w/ no dirty data left.
///
/// ## Why we ignore [`std::sync::PoisonError`]?
///
/// The mutex used for lock is solely used as a parking primitive for [`std::sync::Condvar`]
/// and does not protect any mutable state. The shared flags used here (`dirty` and `closed`)
/// are atomics and are completely separated from the mutex.
///
/// A poisoned mutex only indicates that another tx panicked while holding the lock, which
/// would normally hint at an inconsistent state of the protected value. Since no state can be
/// left partially modified under this lock, there is no possible consistency risk to recover
/// from, and propagating the poison error would only introduce unnecessary failures into the
/// flush path.
///
/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
/// with the recovered guard.
fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
    // init phase (acquiring locks)
    let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());

    // sync loop w/ non-busy waiting
    loop {
        // park till notified or till `flush_duration` elapses, whichever comes first
        (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());

        // NOTE: we must read the value of the close broadcast before acquiring the exclusive
        // lock, if done otherwise, we risk a deadlock like situation for the flusher tx

        let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
        let closing = core.closed.load(atomic::Ordering::Acquire);

        if !dirty {
            // nothing to sync, either shut down (when closing) or go back to waiting
            if closing {
                core.cv.notify_all();
                return;
            }

            continue;
        }

        // INFO: we must acquire an exclusive IO lock for sync, hence no write or grow could
        // kick in while sync is in progress
        // NOTE(review): `FrozenMMap::read` does not take the io lock, so reads are not
        // paused by this lock

        let io_lock = core.acquire_exclusive_io_lock();

        // INFO: we must drop the guard before the syscall, as it is a blocking operation and
        // holding the mutex while the syscall takes place is not a good idea; between the
        // drop and the re-acquire below, other threads could acquire and use the mutex
        drop(guard);

        // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
        // all writes up to `target_epoch` are guaranteed to be part of this flush window,
        // while anything beyond belongs to the next batch
        let target_epoch = core.completion.read_current_epoch();

        // NOTE:
        //
        // - if sync fails, we record the received error object via `completion.set_err`
        // - we clear it up when another sync call succeeds
        // - this is valid, as the underlying sync flushes the entire mmaped region, hence
        // even if the last call failed, and the new one succeeded, we do get the durability
        // guarantee for the old data as well
        //
        // NOTE(review): after a failed sync `dirty` stays cleared and `FrozenMMap::write`
        // returns the recorded error before writing, so a later successful sync has to be
        // triggered by some other path (not visible here); confirm this is intended

        if let Err(new_error) = core.sync() {
            core.completion.set_err(new_error);
        } else {
            core.completion.mark_epoch_as_durable(target_epoch);
            core.completion.del_err();
        }

        // NOTE: notify all listeners currently waiting for durability progress
        core.completion.notify_all_listeners();

        // release the io lock first, then take the parking mutex back for the next wait
        drop(io_lock);
        guard = core.acquire_lock();
    }
}
990
991/// A context for grouping multi write ops into a single atomic operation
992///
993/// ## Overview
994///
995/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
996///
997/// - All included writes are applied together
998/// - Single epoch is assinged for an entier transaction
999/// - Durability guarantee is same for all included write ops
1000///
1001/// ## Example
1002///
1003/// ```
1004/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1005///
1006/// const MODULE_ID: u8 = 0;
1007///
1008/// let dir = tempfile::tempdir().unwrap();
1009/// let path = dir.path().join("tmp_tx");
1010///
1011/// let cfg = FrozenMMapCfg {
1012/// module_id: MODULE_ID,
1013/// initial_count: 0x0A,
1014/// immediate_durability: false,
1015/// flush_duration: std::time::Duration::from_micros(50),
1016/// };
1017///
1018/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1019///
1020/// let mut tx = mmap.new_tx();
1021/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1022/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
1023/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
1024///
1025/// let ticket = tx.commit().unwrap();
1026/// let _ = futures::executor::block_on(ticket).unwrap();
1027///
1028/// let v0 = unsafe { mmap.read(0, |v| *v) };
1029/// let v1 = unsafe { mmap.read(1, |v| *v) };
1030/// let v2 = unsafe { mmap.read(2, |v| *v) };
1031///
1032/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
1033/// ```
1034pub struct FrozenMMapTransaction<'a, T> {
1035 core: &'a Core,
1036 ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
1037}
1038
1039impl<'a, T> FrozenMMapTransaction<'a, T> {
1040 /// Append a write op into the [`FrozenMMapTransaction`]
1041 ///
1042 /// ## Requirements
1043 ///
1044 /// Write ops must follow these safety requirements,
1045 ///
1046 /// - No duplicate indices
1047 /// - No out-of-order writes (must be incremental)
1048 ///
1049 /// Violating this constraint would result in [`FrozenError`]
1050 ///
1051 /// ## Safety
1052 ///
1053 /// Same safety requirements as [`FrozenMMap::write`] apply here
1054 ///
1055 /// ## Example
1056 ///
1057 /// ```
1058 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1059 ///
1060 /// const MODULE_ID: u8 = 0;
1061 ///
1062 /// let dir = tempfile::tempdir().unwrap();
1063 /// let path = dir.path().join("tmp_tx");
1064 ///
1065 /// let cfg = FrozenMMapCfg {
1066 /// module_id: MODULE_ID,
1067 /// initial_count: 0x10,
1068 /// immediate_durability: false,
1069 /// flush_duration: std::time::Duration::from_micros(50),
1070 /// };
1071 ///
1072 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1073 ///
1074 /// let mut tx = mmap.new_tx();
1075 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1076 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1077 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1078 ///
1079 /// let ticket = tx.commit().unwrap();
1080 /// let _ = futures::executor::block_on(ticket).unwrap();
1081 ///
1082 /// let v0 = unsafe { mmap.read(0, |v| *v) };
1083 /// let v1 = unsafe { mmap.read(1, |v| *v) };
1084 /// let v2 = unsafe { mmap.read(2, |v| *v) };
1085 ///
1086 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1087 /// ```
1088 #[inline(always)]
1089 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1090 where
1091 F: FnOnce(*mut T) + 'a,
1092 {
1093 // NOTE:
1094 //
1095 // This check prevents a potential footgun! For a safe transaction all writes must be,
1096 // - ordered by index (either incr or decr)
1097 // - no multi writes on same index
1098 //
1099 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1100 if let Some((last_idx, _)) = self.ops_vec.last() {
1101 if index <= *last_idx {
1102 return err::new_err(
1103 err::HCF,
1104 "tx writes must be strictly ordered, with no more then single ops on given index",
1105 );
1106 }
1107 }
1108
1109 self.ops_vec.push((index, Box::new(f)));
1110 Ok(())
1111 }
1112
1113 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1114 ///
1115 /// ## Guarantees
1116 ///
1117 /// - All writes are applied under a single epoch
1118 /// - All writes belong to the same durability batch
1119 /// - No interleaving with other transactions at epoch level
1120 ///
1121 /// ## Example
1122 ///
1123 /// ```
1124 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1125 ///
1126 /// const MODULE_ID: u8 = 0;
1127 ///
1128 /// let dir = tempfile::tempdir().unwrap();
1129 /// let path = dir.path().join("tmp_tx");
1130 ///
1131 /// let cfg = FrozenMMapCfg {
1132 /// module_id: MODULE_ID,
1133 /// initial_count: 0x10,
1134 /// immediate_durability: false,
1135 /// flush_duration: std::time::Duration::from_micros(50),
1136 /// };
1137 ///
1138 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1139 ///
1140 /// let mut tx = mmap.new_tx();
1141 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1142 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1143 ///
1144 /// let ticket = tx.commit().unwrap();
1145 /// let _ = futures::executor::block_on(ticket).unwrap();
1146 ///
1147 /// let v0 = unsafe { mmap.read(0, |v| *v) };
1148 /// let v1 = unsafe { mmap.read(2, |v| *v) };
1149 ///
1150 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1151 /// ```
1152 #[inline(always)]
1153 pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1154 if let Some(err) = self.core.completion.get_err() {
1155 return Err(err);
1156 }
1157
1158 let _guard = self.core.acquire_io_lock();
1159
1160 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1161 let mut guards = Vec::with_capacity(self.ops_vec.len());
1162 for (idx, _) in &self.ops_vec {
1163 guards.push(self.core.locks.lock(*idx));
1164 }
1165
1166 for (idx, op) in self.ops_vec {
1167 let offset = idx * std::mem::size_of::<T>();
1168 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1169 op(ptr);
1170 }
1171
1172 self.core.dirty.store(true, atomic::Ordering::Release);
1173 if self.core.immediate_durability {
1174 self.core.cv.notify_one();
1175 }
1176
1177 let epoch = self.core.completion.increment_current_epoch();
1178 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1179 }
1180}
1181
1182#[derive(Debug)]
1183struct Core {
1184 completion: sync::Arc<ack::Completion>,
1185 cv: sync::Condvar,
1186 curr_length: usize,
1187 dirty: atomic::AtomicBool,
1188 io_lock: sync::RwLock<()>,
1189 immediate_durability: bool,
1190 map: TMap,
1191 locks: Locks,
1192 lock: sync::Mutex<()>,
1193 file: FrozenFile,
1194 closed: atomic::AtomicBool,
1195}
1196
1197unsafe impl Send for Core {}
1198unsafe impl Sync for Core {}
1199
1200impl Core {
1201 fn new(
1202 map: TMap,
1203 file: FrozenFile,
1204 curr_length: usize,
1205 total_slots: usize,
1206 immediate_durability: bool,
1207 ) -> Self {
1208 Self {
1209 map,
1210 file,
1211 curr_length,
1212 immediate_durability,
1213 completion: sync::Arc::new(ack::Completion::default()),
1214 cv: sync::Condvar::new(),
1215 lock: sync::Mutex::new(()),
1216 io_lock: sync::RwLock::new(()),
1217 locks: Locks::new(total_slots),
1218 dirty: atomic::AtomicBool::new(false),
1219 closed: atomic::AtomicBool::new(false),
1220 }
1221 }
1222
1223 #[inline]
1224 fn sync(&self) -> FrozenResult<()> {
1225 unsafe { self.map.sync(self.curr_length) }?;
1226 self.file.sync()
1227 }
1228
1229 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1230 #[inline]
1231 fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1232 self.lock.lock().unwrap_or_else(|e| e.into_inner())
1233 }
1234
1235 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1236 #[inline]
1237 fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1238 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1239 }
1240
1241 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1242 #[inline]
1243 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1244 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1245 }
1246}
1247
1248#[derive(Debug)]
1249struct Locks(Box<[atomic::AtomicU8]>);
1250
1251impl Locks {
1252 const LOCK: u8 = 1;
1253 const UNLOCK: u8 = 0;
1254
1255 const L1_CONTENTION: usize = 0x10;
1256 const L2_CONTENTION: usize = 0x20;
1257
1258 fn new(cap: usize) -> Self {
1259 let mut slots = Vec::with_capacity(cap);
1260 for _ in 0..cap {
1261 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1262 }
1263
1264 Self(slots.into_boxed_slice())
1265 }
1266
1267 #[inline(always)]
1268 fn lock(&self, index: usize) -> LockGuard<'_> {
1269 let val = &self.0[index];
1270 let mut spins = 0;
1271
1272 loop {
1273 if val
1274 .compare_exchange_weak(
1275 Self::UNLOCK,
1276 Self::LOCK,
1277 atomic::Ordering::Acquire,
1278 atomic::Ordering::Relaxed,
1279 )
1280 .is_ok()
1281 {
1282 return LockGuard(val);
1283 }
1284
1285 // we must backoff under contention
1286
1287 // NOTE:
1288 //
1289 // We have three levels of backoff,
1290 //
1291 // 1. Busy waiting w/ CPU hint
1292 // 2. Willingly allow preemption by OS schedular
1293 // 3. Sleep the thread (increasing w/ exponenial factor)
1294
1295 if hints::likely(spins < Self::L1_CONTENTION) {
1296 std::hint::spin_loop();
1297 } else if spins < Self::L2_CONTENTION {
1298 thread::yield_now();
1299 } else {
1300 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1301 thread::sleep(time::Duration::from_nanos(ns));
1302 }
1303
1304 spins += 1;
1305 }
1306 }
1307}
1308
1309struct LockGuard<'a>(&'a atomic::AtomicU8);
1310
1311impl Drop for LockGuard<'_> {
1312 fn drop(&mut self) {
1313 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1314 }
1315}
1316
1317#[cfg(test)]
1318mod tests {
1319 use super::*;
1320
1321 // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1322 const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1323
1324 const MODULE_ID: u8 = 0;
1325 const INIT_SLOTS: usize = 0x0A;
1326
1327 fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1328 let dir = tempfile::tempdir().unwrap();
1329 let path = dir.path().join("tmp_map");
1330
1331 let cfg = FrozenMMapCfg {
1332 module_id: MODULE_ID,
1333 initial_count: INIT_SLOTS,
1334 immediate_durability: false,
1335 flush_duration: FLUSH_DURATION,
1336 };
1337
1338 (dir, path, cfg)
1339 }
1340
mod fm_lifecycle {
    use super::*;

    /// A new map starts clean, open and at epoch zero, and a write on it becomes durable
    #[test]
    fn ok_new() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();
        let core = &map.core;

        assert!(!core.dirty.load(atomic::Ordering::Acquire));
        assert!(!core.closed.load(atomic::Ordering::Acquire));
        assert_eq!(core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);

        assert_eq!(core.completion.read_current_epoch(), 0);
        assert_eq!(core.completion.read_durable_epoch(), 0);

        // no error is recorded right after the construction
        assert!(core.completion.get_err().is_none());

        // waiting on the epoch of a write must resolve
        let ticket = unsafe { map.write(0, |slot| *slot = 0x0A) }.unwrap();

        let issued = ticket.epoch();
        let durable = futures::executor::block_on(ticket).unwrap();

        assert!(durable >= issued);
    }

    /// An already existing map can be opened again w/ an unchanged cfg
    #[test]
    fn ok_new_existing() {
        let (_dir, path, cfg) = new_tmp();

        // create new + close
        let created = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        drop(created);

        // open existing + close
        let reopened = FrozenMMap::<u64>::new(path, cfg).unwrap();
        drop(reopened);
    }

    /// Opening an existing map w/ a different `initial_count` is rejected
    #[test]
    fn err_new_when_change_in_cfg() {
        let (_dir, path, cfg) = new_tmp();

        // create new + close
        let created = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        drop(created);

        // update cfg + open existing
        let changed = FrozenMMapCfg { initial_count: INIT_SLOTS * 2, ..cfg };
        let reopened = FrozenMMap::<u64>::new(path, changed);

        assert!(reopened.is_err());
    }

    /// Deleting the map removes the backing file
    #[test]
    fn ok_delete() {
        let (_dir, path, cfg) = new_tmp();
        let mut map = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

        map.delete().unwrap();

        let still_exists = map.core.file.exists().unwrap();
        assert!(!still_exists);
    }

    /// A second delete on the same instance fails
    #[test]
    fn err_delete_after_delete() {
        let (_dir, path, cfg) = new_tmp();
        let mut map = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

        map.delete().unwrap();

        let still_exists = map.core.file.exists().unwrap();
        assert!(!still_exists);

        let second_delete = map.delete();
        assert!(second_delete.is_err());
    }

    /// A value written right before the drop is readable after the map is opened again
    #[test]
    fn ok_drop_persists_when_dropped_before_bg_flush() {
        const VAL: u64 = 0x0A;
        let (_dir, path, cfg) = new_tmp();

        // write + drop, w/o waiting on the ticket
        {
            let writer = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            unsafe { writer.write(0, |slot| *slot = VAL).unwrap() };
            drop(writer);
        }

        // reopen + verify
        {
            let reader = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            let stored = unsafe { reader.read(0, |slot| *slot) };
            assert_eq!(stored, VAL);
        }
    }
}
1431
mod fm_validate_t {
    use super::*;

    /// 8-bytes aligned, size is a multiple of 8, no [`Drop`]
    #[repr(C, align(8))]
    struct OkT {
        a: u64,
        b: u64,
    }

    /// Size is a multiple of 8, but the alignment is only 4
    #[repr(C)]
    struct BadAlignT {
        a: u32,
        b: u32,
    }

    /// Alignment is 4 and the size is not a multiple of 8
    #[repr(C, align(4))]
    struct BadSizeT {
        a: u32,
        b: u16,
    }

    /// Valid layout, but implements [`Drop`]
    #[repr(C, align(8))]
    struct DropT(u64);

    impl Drop for DropT {
        fn drop(&mut self) {}
    }

    /// 8-bytes aligned, but holds no data at all
    #[repr(C, align(8))]
    struct ZstT;

    // direct validation

    #[test]
    fn ok_validate_t() {
        let verdict = FrozenMMap::<OkT>::validate_t();
        assert!(verdict.is_ok());
    }

    #[test]
    fn err_validate_t_when_drop() {
        let verdict = FrozenMMap::<DropT>::validate_t();
        assert!(verdict.is_err());
    }

    #[test]
    fn err_validate_t_when_not_8_byte_aligned() {
        let verdict = FrozenMMap::<BadAlignT>::validate_t();
        assert!(verdict.is_err());
    }

    #[test]
    fn err_validate_t_when_size_not_multiple_of_8() {
        let verdict = FrozenMMap::<BadSizeT>::validate_t();
        assert!(verdict.is_err());
    }

    #[test]
    fn err_validate_t_when_zero_sized() {
        let verdict = FrozenMMap::<ZstT>::validate_t();
        assert!(verdict.is_err());
    }

    // validation through `new`

    #[test]
    fn err_new_when_t_implements_drop() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<DropT>::new(path, cfg);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_when_t_is_not_8_byte_aligned() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<BadAlignT>::new(path, cfg);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_when_t_size_is_not_multiple_of_8() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<BadSizeT>::new(path, cfg);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_when_t_is_zero_sized() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<ZstT>::new(path, cfg);
        assert!(opened.is_err());
    }

    // validation through `new_grown`

    #[test]
    fn err_new_grown_when_t_implements_drop() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<DropT>::new_grown(path, cfg, 1);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_grown_when_t_is_not_8_byte_aligned() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_grown_when_t_size_is_not_multiple_of_8() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1);
        assert!(opened.is_err());
    }

    #[test]
    fn err_new_grown_when_t_is_zero_sized() {
        let (_dir, path, cfg) = new_tmp();
        let opened = FrozenMMap::<ZstT>::new_grown(path, cfg, 1);
        assert!(opened.is_err());
    }
}
1536
mod fm_new_grown {
    use super::*;

    /// Growing by `N` slots is reflected by both the slot count and the mapped length
    #[test]
    fn ok_new_grown_updates_length() {
        const GROW_BY: usize = 0x0A;
        let (_dir, path, cfg) = new_tmp();

        let initial = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        assert_eq!(initial.total_slots(), INIT_SLOTS);
        drop(initial);

        let grown = FrozenMMap::<u64>::new_grown(path, cfg, GROW_BY).unwrap();
        assert_eq!(grown.total_slots(), INIT_SLOTS + GROW_BY);
        assert_eq!(grown.core.curr_length, (INIT_SLOTS + GROW_BY) * FrozenMMap::<u64>::SLOT_SIZE);
    }

    /// Growing is rejected while another instance of the same map is still open
    #[test]
    fn err_new_grown_with_preexisting_instance() {
        let (_dir, path, cfg) = new_tmp();

        let alive = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        assert_eq!(alive.total_slots(), INIT_SLOTS);

        let grown = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A);
        assert!(grown.is_err());
    }

    /// Every reopen w/ growth adds on top of the previous slot count
    #[test]
    fn ok_new_grown_cycle() {
        const GROW_BY: usize = 0x100;
        let (_dir, path, cfg) = new_tmp();

        let initial = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        drop(initial);

        for round in 1..=3usize {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), GROW_BY).unwrap();
            assert_eq!(grown.total_slots(), INIT_SLOTS + (round * GROW_BY));
            drop(grown);
        }
    }

    /// A slot written before the growth can be overwritten and read back afterwards
    #[test]
    fn ok_write_reopen_grown_read() {
        let (_dir, path, cfg) = new_tmp();

        {
            let initial = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            unsafe { initial.write(0, |slot| *slot = 0xAA).unwrap() };
        }

        {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
            unsafe { grown.write(0, |slot| *slot = 0xBB).unwrap() };

            let stored = unsafe { grown.read(0, |slot| *slot) };
            assert_eq!(stored, 0xBB);
        }
    }

    /// Values written across multiple growth rounds are all visible on a plain reopen
    #[test]
    fn ok_write_reopen_grown_read_cycle() {
        let (_dir, path, cfg) = new_tmp();

        {
            let initial = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
            unsafe { initial.write(0, |slot| *slot = 1).unwrap() };
        }

        // every round grows the map and writes into its (new) last slot
        for value in 2..4u64 {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
            let tail = grown.total_slots() - 1;
            unsafe { grown.write(tail, |slot| *slot = value).unwrap() };
            drop(grown);
        }

        let reopened = FrozenMMap::<u64>::new(&path, cfg).unwrap();

        let first = unsafe { reopened.read(0, |slot| *slot) };
        assert_eq!(first, 1);

        let tail = reopened.total_slots() - 1;
        let last = unsafe { reopened.read(tail, |slot| *slot) };
        assert_eq!(last, 3);
    }

    /// Growing is rejected as long as the previous instance has not been dropped
    #[test]
    fn err_new_grown_while_previous_instance_is_alive() {
        let (_dir, path, cfg) = new_tmp();

        let _alive = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
        let grown = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);

        assert!(grown.is_err());
    }
}
1636
mod fm_write_read {
    use super::*;

    /// A written value is readable after its ticket has resolved
    #[test]
    fn ok_write_wait_read_cycle() {
        const VAL: u64 = 0xDEADC0DE;
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        // write + wait for durability
        let ticket = unsafe { map.write(0, |slot| *slot = VAL) }.unwrap();

        let issued = ticket.epoch();
        let durable = futures::executor::block_on(ticket).unwrap();

        assert!(durable >= issued);

        // read + verify
        let stored = unsafe { map.read(0, |slot| *slot) };
        assert_eq!(stored, VAL);
    }

    /// A written value is readable right away, w/o waiting on the ticket
    #[test]
    fn ok_write_read_without_wait() {
        const VAL: u64 = 0xDEADC0DE;
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        unsafe { map.write(0, |slot| *slot = VAL).unwrap() };

        let stored = unsafe { map.read(0, |slot| *slot) };
        assert_eq!(stored, VAL);
    }
}
1670
mod fm_tx {
    use super::*;

    /// All writes of a committed transaction are visible
    #[test]
    fn ok_tx_basic_multi_write() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let mut tx = map.new_tx();
        unsafe { tx.write(0, |slot| *slot = 1) }.unwrap();
        unsafe { tx.write(1, |slot| *slot = 2) }.unwrap();
        unsafe { tx.write(2, |slot| *slot = 3) }.unwrap();

        let ticket = tx.commit().unwrap();

        let issued = ticket.epoch();
        let durable = futures::executor::block_on(ticket).unwrap();

        assert!(durable >= issued);

        let stored = unsafe {
            (
                map.read(0, |slot| *slot),
                map.read(1, |slot| *slot),
                map.read(2, |slot| *slot),
            )
        };

        assert_eq!(stored, (1, 2, 3));
    }

    /// A write issued after a commit gets a strictly higher epoch than the transaction
    #[test]
    fn ok_tx_single_epoch() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let mut tx = map.new_tx();
        unsafe { tx.write(0, |slot| *slot = 0x0A) }.unwrap();
        unsafe { tx.write(1, |slot| *slot = 0x14) }.unwrap();

        let tx_ticket = tx.commit().unwrap();
        let later_ticket = unsafe { map.write(2, |slot| *slot = 0x1E) }.unwrap();

        // NOTE: the later write must have a strictly higher epoch than the transaction
        assert!(later_ticket.epoch() > tx_ticket.epoch());
    }

    /// An index lower than the previously appended one is rejected
    #[test]
    fn err_tx_out_of_order() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let mut tx = map.new_tx();
        unsafe { tx.write(2, |slot| *slot = 1) }.unwrap();

        let rejected = unsafe { tx.write(1, |slot| *slot = 2) };
        assert!(rejected.is_err());
    }

    /// Appending the same index twice is rejected
    #[test]
    fn err_tx_duplicate_index() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let mut tx = map.new_tx();
        unsafe { tx.write(1, |slot| *slot = 1) }.unwrap();

        let rejected = unsafe { tx.write(1, |slot| *slot = 2) };
        assert!(rejected.is_err());
    }

    /// Transactions of different threads on disjoint slots all go through
    #[test]
    fn ok_tx_concurrent_non_overlapping() {
        let (_dir, path, cfg) = new_tmp();
        let map = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

        let workers: Vec<_> = (0..2usize)
            .map(|i| {
                let map = map.clone();
                thread::spawn(move || {
                    let mut tx = map.new_tx();

                    unsafe { tx.write(i * 2, |slot| *slot = i as u64) }.unwrap();
                    unsafe { tx.write(i * 2 + 1, |slot| *slot = i as u64) }.unwrap();

                    let ticket = tx.commit().unwrap();

                    let issued = ticket.epoch();
                    let durable = futures::executor::block_on(ticket).unwrap();

                    assert!(durable >= issued);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        for i in 0..2usize {
            let even = unsafe { map.read(i * 2, |slot| *slot) };
            let odd = unsafe { map.read(i * 2 + 1, |slot| *slot) };

            assert_eq!((even, odd), (i as u64, i as u64));
        }
    }

    /// When two transactions write the same slot one after another, the later value stays
    #[test]
    fn ok_tx_overwrite_last_wins() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let mut first = map.new_tx();
        unsafe { first.write(0, |slot| *slot = 1) }.unwrap();
        first.commit().unwrap();

        let mut second = map.new_tx();
        unsafe { second.write(0, |slot| *slot = 2) }.unwrap();
        let ticket = second.commit().unwrap();

        let issued = ticket.epoch();
        let durable = futures::executor::block_on(ticket).unwrap();

        assert!(durable >= issued);

        let stored = unsafe { map.read(0, |slot| *slot) };
        assert_eq!(stored, 2);
    }

    /// Writes of a durable transaction are visible after the map is opened again
    #[test]
    fn ok_tx_persists_across_reopen() {
        let (_dir, path, cfg) = new_tmp();

        // commit + wait for durability + close
        {
            let map = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();

            let mut tx = map.new_tx();
            unsafe { tx.write(0, |slot| *slot = 0x3A) }.unwrap();
            unsafe { tx.write(1, |slot| *slot = 0x54) }.unwrap();

            let ticket = tx.commit().unwrap();

            let issued = ticket.epoch();
            let durable = futures::executor::block_on(ticket).unwrap();

            assert!(durable >= issued);
        }

        // reopen + verify
        {
            let map = FrozenMMap::<u64>::new(&path, cfg).unwrap();

            let stored = unsafe { (map.read(0, |slot| *slot), map.read(1, |slot| *slot)) };
            assert_eq!(stored, (0x3A, 0x54));
        }
    }
}
1841
mod fm_durability {
    use super::*;

    /// Dropping the map after its only ticket has resolved works
    #[test]
    fn ok_wait_then_drop() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let ticket = unsafe { map.write(0, |slot| *slot = 7) }.unwrap();

        let issued = ticket.epoch();
        let durable = futures::executor::block_on(ticket).unwrap();

        assert!(durable >= issued);

        drop(map);
    }

    /// Durability of a later write covers the epoch of an earlier one
    #[test]
    fn ok_epoch_monotonicity() {
        let (_dir, path, cfg) = new_tmp();
        let map = FrozenMMap::<u64>::new(path, cfg).unwrap();

        let earlier = unsafe { map.write(0, |slot| *slot = 1) }.unwrap();
        let later = unsafe { map.write(0, |slot| *slot = 2) }.unwrap();

        let durable = futures::executor::block_on(later).unwrap();
        assert!(durable >= earlier.epoch());
    }

    /// Concurrent writers on the same slot all get their tickets resolved, no update is lost
    #[test]
    fn ok_wait_for_durability_with_multi_writers() {
        let (_dir, path, cfg) = new_tmp();
        let map = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

        let writers: Vec<_> = (0..2)
            .map(|_| {
                let map = map.clone();
                thread::spawn(move || {
                    let ticket = unsafe { map.write(0, |slot| *slot += 1) }.unwrap();

                    let issued = ticket.epoch();
                    let durable = futures::executor::block_on(ticket).unwrap();

                    assert!(durable >= issued);
                })
            })
            .collect();

        for writer in writers {
            writer.join().unwrap();
        }

        let stored = unsafe { map.read(0, |slot| *slot) };
        assert_eq!(stored, 2);
    }
}
1898
mod fm_concurrency {
    use super::*;

    /// Reads on different slots from different threads observe the written values
    #[test]
    fn ok_parallel_reads_with_diff_index() {
        let (_dir, path, cfg) = new_tmp();
        let map = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

        unsafe { map.write(0, |slot| *slot = 0x10).unwrap() };
        unsafe { map.write(1, |slot| *slot = 0x20).unwrap() };

        let readers: Vec<_> = (0..2usize)
            .map(|idx| {
                let map = map.clone();
                thread::spawn(move || unsafe { map.read(idx, |slot| *slot) })
            })
            .collect();

        let seen: Vec<u64> = readers.into_iter().map(|reader| reader.join().unwrap()).collect();
        assert_eq!(seen, vec![0x10, 0x20]);
    }

    /// Values written by concurrent threads survive the drop and a reopen w/ growth
    #[test]
    fn ok_multi_tx_drop_then_reopen_grown() {
        let (_dir, path, cfg) = new_tmp();

        // concurrent writers and readers, the map is dropped at the end of the scope
        {
            let map = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());

            let writers = (0..2u64).map(|i| {
                let map = map.clone();
                thread::spawn(move || {
                    let _ = unsafe { map.write(i as usize, |slot| *slot = i + 1).unwrap() };
                })
            });

            let readers = (0..2usize).map(|i| {
                let map = map.clone();
                thread::spawn(move || {
                    let _ = unsafe { map.read(i, |slot| *slot) };
                })
            });

            // writers are spawned first, readers afterwards
            let workers: Vec<_> = writers.chain(readers).collect();

            for worker in workers {
                worker.join().unwrap();
            }
        }

        // reopen w/ growth + verify old values + use the new last slot
        {
            let grown = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
            assert_eq!(grown.total_slots(), INIT_SLOTS + 0x10);

            for i in 0..2u64 {
                let stored = unsafe { grown.read(i as usize, |slot| *slot) };
                assert_eq!(stored, i + 1);
            }

            let tail = grown.total_slots() - 1;
            unsafe { grown.write(tail, |slot| *slot = 0xDEAD).unwrap() };

            let stored = unsafe { grown.read(tail, |slot| *slot) };
            assert_eq!(stored, 0xDEAD);
        }
    }
}
1968
mod ack_ticket {
    use super::*;

    /// Tickets of writers released at the same moment are all resolved
    #[test]
    fn ok_parallel_waiters_same_epoch_window() {
        const WRITERS: usize = 0x10;

        let (_dir, path, cfg) = new_tmp();
        let map = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());

        // all writers are released together
        let barrier = sync::Arc::new(sync::Barrier::new(WRITERS));

        let writers: Vec<_> = (0..WRITERS)
            .map(|i| {
                let map = map.clone();
                let barrier = barrier.clone();

                thread::spawn(move || {
                    barrier.wait();

                    let ticket = unsafe { map.write(i % INIT_SLOTS, |slot| *slot = i as u64) }.unwrap();

                    let issued = ticket.epoch();
                    let durable = futures::executor::block_on(ticket).unwrap();

                    assert!(durable >= issued);
                })
            })
            .collect();

        for writer in writers {
            writer.join().expect("worker thread panicked");
        }
    }

    /// Multiple tickets on the very same epoch are all resolved once it is marked durable
    #[test]
    fn ok_parallel_waiters_same_epoch() {
        let completion = sync::Arc::new(ack::Completion::default());

        let waiters: Vec<_> = (0..0x10)
            .map(|_| {
                let completion = completion.clone();

                thread::spawn(move || {
                    let ticket = ack::AckTicket::new(1, completion);
                    let durable = futures::executor::block_on(ticket).expect("ticket must complete");

                    assert_eq!(durable, 1);
                })
            })
            .collect();

        thread::sleep(time::Duration::from_millis(0x0A));

        completion.mark_epoch_as_durable(1);
        completion.notify_all_listeners();

        for waiter in waiters {
            waiter.join().expect("worker thread panicked");
        }
    }
}
2029}