frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//! module_id: MODULE_ID,
35//! initial_count: 0x0A,
36//! immediate_durability: false,
37//! flush_duration: std::time::Duration::from_micros(0x96),
38//! };
39//!
40//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
41//! assert_eq!(mmap.total_slots(), 0x0A);
42//!
43//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
44//! let _ = futures::executor::block_on(ticket).unwrap();
45//!
46//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
47//! assert_eq!(val, 0xDEADC0DE);
48//!
49//! drop(mmap);
50//!
51//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
52//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
53//!
54//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
55//! assert_eq!(val, 0xDEADC0DE);
56//! ```
57
58#[cfg(any(target_os = "linux", target_os = "macos"))]
59mod posix;
60
61use crate::{
62 ack,
63 error::FrozenResult,
64 ffile::{FrozenFile, FrozenFileCfg},
65 hints,
66};
67use std::{
68 fmt,
69 sync::{self, atomic},
70 thread, time,
71};
72
73/// type for `epoch` used by write ops
74pub type TEpoch = u64;
75
76#[cfg(any(target_os = "linux", target_os = "macos"))]
77type TMap = posix::POSIXMMap;
78
79/// Error codes for [`FrozenMMap`]
80pub(in crate::fmmap) mod err {
81 use crate::error::{ErrCode, FrozenError, FrozenResult};
82
83 /// Domain Id for [`FrozenMMap`] is **18**
84 const ERRDOMAIN: u8 = 0x12;
85
86 /// module id used for [`FrozenMMap`]
87 pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
88
89 #[cfg(not(test))]
90 #[inline(always)]
91 pub fn mid() -> &'static u8 {
92 MID.get().unwrap()
93 }
94
95 #[cfg(test)]
96 #[inline(always)]
97 pub fn mid() -> &'static u8 {
98 MID.get_or_init(|| 0)
99 }
100
101 /// internal fuck up (hault and catch fire)
102 pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
103
104 /// unknown error (fallback)
105 pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
106
107 /// no more memory available
108 pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
109
110 /// syncing error
111 pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
112
113 /// no write/read perm
114 pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
115
116 /// type `T` must not be zero sized
117 pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
118
119 /// flush_tx error (unable to spawn)
120 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
121
122 /// type `T` implements drop
123 pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
124
125 /// type `T` is not 8 bytes aligned
126 pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
127
128 /// `size_of::<T>()` is not multiple of 8
129 pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
130
131 #[inline]
132 pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
133 code: ErrCode,
134 error: E,
135 ) -> FrozenResult<R> {
136 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
137 Err(err)
138 }
139
140 #[inline]
141 pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
142 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
143 Err(err)
144 }
145}
146
147/// Config for [`FrozenMMap`]
148#[derive(Debug, Clone)]
149pub struct FrozenMMapCfg {
150 /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
151 pub module_id: u8,
152
153 /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
154 ///
155 /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], where initial file length will
156 /// be `chunk_size * initial_count` (bytes)
157 pub initial_count: usize,
158
159 /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
160 /// together, where all write ops in certain time interval falls into a single durable window
161 pub flush_duration: time::Duration,
162
163 /// Enables immediate durability scheduing write operations
164 ///
165 /// When disabled, durability is driven by the background flusher thread with respect to
166 /// [`FrozenMMapCfg::flush_duration`] time interval, allowing multiple write calls to be
167 /// bathced and flushed together into a single sync operation, reducing strain on resources.
168 ///
169 /// When enabled, every successful [`FrozenMMap::write`] and [`FrozenMMapTransaction::commit`]
170 /// immediately wakes the background flusher thread, disregarding the
171 /// [`FrozenMMapCfg::flush_duration`] time interval.
172 ///
173 /// This configuration is intended for workloads where writes are rare and durability latency
174 /// is preferred over batching efficiency.
175 pub immediate_durability: bool,
176}
177
178/// Custom implementation of `mmap(2)`
179///
180/// ## Constraints
181///
182/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
183/// must be a POD type which is safe to persist and later reinterpret from bytes.
184///
185/// Required properties for `T`,
186///
187/// - Must use `#[repr(C)]`
188/// - Should be 8-bytes aligned
189/// - Must not implement [`Drop`]
190/// - `size_of::<T>()` should be multiple of `8`
191///
192/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
193/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
194/// across reopen.
195///
196/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
197/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
198/// layout and must remain valid when the file is reopened in a later process.
199///
200/// ## Example
201///
202/// ```
203/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
204///
205/// const MODULE_ID: u8 = 0;
206///
207/// let dir = tempfile::tempdir().unwrap();
208/// let path = dir.path().join("tmp_frozen_mmap");
209///
210/// let cfg = FrozenMMapCfg {
211/// module_id: MODULE_ID,
212/// initial_count: 0x0A,
213/// immediate_durability: false,
214/// flush_duration: std::time::Duration::from_micros(0x96),
215/// };
216///
217/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
218/// assert_eq!(mmap.total_slots(), 0x0A);
219///
220/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
221/// let _ = futures::executor::block_on(ticket).unwrap();
222///
223/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
224/// assert_eq!(val, 0xDEADC0DE);
225///
226/// drop(mmap);
227///
228/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
229/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
230///
231/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
232/// assert_eq!(val, 0xDEADC0DE);
233/// ```
234#[derive(Debug)]
235pub struct FrozenMMap<T>
236where
237 T: Sized + Send + Sync,
238{
239 core: sync::Arc<Core>,
240 flush_tx_handle: Option<thread::JoinHandle<()>>,
241 _t: core::marker::PhantomData<T>,
242}
243
244unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
245unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
246
247impl<T> FrozenMMap<T>
248where
249 T: Sized + Send + Sync,
250{
251 /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
252 pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
253
254 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
255 ///
256 /// ## Multiple Instances
257 ///
258 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
259 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
260 /// error will be thrown.
261 ///
262 /// ## Capacity Growth
263 ///
264 /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
265 /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
266 /// mapping over grown capacity.
267 ///
268 /// ## [`FrozenMMapCfg`]
269 ///
270 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
271 ///
272 /// ## Working
273 ///
274 /// We first create a new [`FrozenFile`] if note already, then map the entire file using
275 /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
276 /// entire lifetime of file.
277 ///
278 /// ## Important
279 ///
280 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
281 /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
282 /// to store config.
283 ///
284 /// ## Example
285 ///
286 /// ```
287 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
288 ///
289 /// const MODULE_ID: u8 = 0;
290 ///
291 /// let dir = tempfile::tempdir().unwrap();
292 /// let path = dir.path().join("tmp_frozen_mmap");
293 ///
294 /// let cfg = FrozenMMapCfg {
295 /// module_id: MODULE_ID,
296 /// initial_count: 0x0A,
297 /// immediate_durability: false,
298 /// flush_duration: std::time::Duration::from_micros(0x96),
299 /// };
300 ///
301 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
302 /// assert_eq!(mmap.total_slots(), 0x0A);
303 ///
304 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
305 /// let _ = futures::executor::block_on(ticket).unwrap();
306 ///
307 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
308 /// assert_eq!(val, 0xDEADC0DE);
309 /// ```
310 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
311 Self::validate_t()?;
312 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
313 let total_slots = curr_length / Self::SLOT_SIZE;
314
315 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
316 // guarantees that the first caller sets the value and all subsequent calls reuse it
317 let _ = err::MID.get_or_init(|| cfg.module_id);
318
319 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
320 let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
321
322 let cloned_core = sync::Arc::clone(&core);
323 let flush_tx_handle = match thread::Builder::new()
324 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
325 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
326 {
327 Ok(handle) => Some(handle),
328 Err(observed_error) => {
329 return err::new_err(err::FXE, observed_error);
330 }
331 };
332
333 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
334 }
335
336 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
337 /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
338 ///
339 /// ## Multiple Instances
340 ///
341 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
342 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
343 /// error will be thrown.
344 ///
345 /// ## Why not create a [`FrozenMMap::grow`] call?
346 ///
347 /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
348 /// memory mapping in place is tricky in concurrent code, as some threads would still hold
349 /// stale/unmapped pointers to mmap due to preemption from the OS schedular
350 ///
351 /// So, instead of remmaping a live instance, the current API performs growth during open,
352 /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
353 /// instance.
354 ///
355 /// ## [`FrozenMMapCfg`]
356 ///
357 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
358 ///
359 /// ## Working
360 ///
361 /// We first create a new [`FrozenFile`] if note already, then we grow the file using
362 /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
363 /// read/write `T`, which also should stay constant for the entire lifetime of file.
364 ///
365 /// ## Important
366 ///
367 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
368 /// which is used under the hood, one must use config stores like
369 /// [`Rta`](https://crates.io/crates/rta) to store config.
370 ///
371 /// ## Example
372 ///
373 /// ```
374 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
375 ///
376 /// const MODULE_ID: u8 = 0;
377 ///
378 /// let dir = tempfile::tempdir().unwrap();
379 /// let path = dir.path().join("tmp_frozen_mmap");
380 ///
381 /// let cfg = FrozenMMapCfg {
382 /// module_id: MODULE_ID,
383 /// initial_count: 0x0A,
384 /// immediate_durability: false,
385 /// flush_duration: std::time::Duration::from_micros(0x96),
386 /// };
387 ///
388 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
389 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
390 ///
391 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
392 /// let _ = futures::executor::block_on(ticket).unwrap();
393 ///
394 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
395 /// assert_eq!(val, 0xDEADC0DE);
396 /// ```
397 pub fn new_grown<P: AsRef<std::path::Path>>(
398 path: P,
399 cfg: FrozenMMapCfg,
400 additional_slots: usize,
401 ) -> FrozenResult<Self> {
402 Self::validate_t()?;
403 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
404
405 // we grow the underlying FrozenFile as requested
406 file.grow(additional_slots)?;
407 let curr_length = file.length()?; // we must read the updated (grown) length
408 let total_slots = curr_length / Self::SLOT_SIZE;
409
410 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
411 // guarantees that the first caller sets the value and all subsequent calls reuse it
412 let _ = err::MID.get_or_init(|| cfg.module_id);
413
414 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
415 let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
416
417 let cloned_core = sync::Arc::clone(&core);
418 let flush_tx_handle = match thread::Builder::new()
419 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
420 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
421 {
422 Ok(handle) => Some(handle),
423 Err(observed_error) => {
424 return err::new_err(err::FXE, observed_error);
425 }
426 };
427
428 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
429 }
430
431 /// Create/open a new [`FrozenFile`] instance
432 fn open_file(
433 path: std::path::PathBuf,
434 cfg: &FrozenMMapCfg,
435 ) -> FrozenResult<(FrozenFile, usize)> {
436 let ff_cfg = FrozenFileCfg {
437 path,
438 module_id: cfg.module_id,
439 buffer_size: Self::SLOT_SIZE,
440 initial_available_buffers: cfg.initial_count,
441 };
442
443 let file = FrozenFile::new(ff_cfg)?;
444 let curr_length = file.length()?;
445
446 Ok((file, curr_length))
447 }
448
449 #[inline]
450 fn validate_t() -> FrozenResult<()> {
451 if std::mem::needs_drop::<T>() {
452 return err::new_err_default(err::DRP);
453 }
454
455 let align = std::mem::align_of::<T>();
456 if align != 8 {
457 return err::new_err_default(err::ALN);
458 }
459
460 let size = std::mem::size_of::<T>();
461 if size == 0 {
462 return err::new_err_default(err::ZRO);
463 }
464
465 if size % 8 != 0 {
466 return err::new_err_default(err::SZE);
467 }
468
469 Ok(())
470 }
471
472 /// Read a `T` at given `index` via callback (`f`)
473 ///
474 /// ## Concurrency
475 ///
476 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
477 /// at same index is atomic and thread safe, while operations on different indices may proceed
478 /// fully in parallel
479 ///
480 /// ## Safety
481 ///
482 /// The caller must ensure following:
483 ///
484 /// - given `index` is within bounds
485 /// - underlying memory contains a valid instance of `T`
486 /// - provided callback `f` must not,
487 /// - write through the pointer
488 /// - store or leak pointer beyound there lifetime
489 ///
490 /// Violating any of the above may result in undefined behavior
491 ///
492 /// ## Example
493 ///
494 /// ```
495 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
496 ///
497 /// const MODULE_ID: u8 = 0;
498 ///
499 /// let dir = tempfile::tempdir().unwrap();
500 /// let path = dir.path().join("tmp_read_mmap");
501 ///
502 /// let cfg = FrozenMMapCfg {
503 /// module_id: MODULE_ID,
504 /// initial_count: 0x02,
505 /// immediate_durability: false,
506 /// flush_duration: std::time::Duration::from_micros(0x60),
507 /// };
508 ///
509 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
510 ///
511 /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
512 /// let _ = futures::executor::block_on(ticket).unwrap();
513 ///
514 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
515 /// assert_eq!(val, 0x0A);
516 /// ```
517 #[inline(always)]
518 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
519 let offset = Self::SLOT_SIZE * index;
520 let _lock = self.core.locks.lock(index);
521
522 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
523 // assumption of OS guarantees visibility)
524
525 let ptr = unsafe { self.core.map.as_ptr(offset) };
526 Ok(f(ptr))
527 }
528
529 /// Write/update a `T` at given `index` via callback (`f`)
530 ///
531 /// ## Concurrency
532 ///
533 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
534 /// at same index is atomic and thread safe, while operations on different indices may proceed
535 /// fully in parallel
536 ///
537 /// ## Safety
538 ///
539 /// The caller must ensure following:
540 ///
541 /// - given `index` is within bounds
542 /// - underlying memory contains a valid instance of `T`
543 /// - provided callback `f` must not store or leak pointer beyound there lifetime
544 ///
545 /// Violating any of the above may result in undefined behavior
546 ///
547 /// ## Example
548 ///
549 /// ```
550 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
551 ///
552 /// const MODULE_ID: u8 = 0;
553 ///
554 /// let dir = tempfile::tempdir().unwrap();
555 /// let path = dir.path().join("tmp_write_mmap");
556 ///
557 /// let cfg = FrozenMMapCfg {
558 /// module_id: MODULE_ID,
559 /// initial_count: 0x02,
560 /// immediate_durability: false,
561 /// flush_duration: std::time::Duration::from_micros(0x96),
562 /// };
563 ///
564 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
565 ///
566 /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
567 /// let _ = futures::executor::block_on(ticket).unwrap();
568 ///
569 /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
570 /// assert_eq!(val, 0x2B);
571 /// ```
572 #[inline(always)]
573 pub unsafe fn write(
574 &self,
575 index: usize,
576 f: impl FnOnce(*mut T),
577 ) -> FrozenResult<ack::AckTicket> {
578 // propagate prev errors
579 if let Some(err) = self.core.completion.get_err() {
580 return Err(err);
581 }
582
583 let offset = Self::SLOT_SIZE * index;
584
585 let _guard = self.core.acquire_io_lock();
586 let _lock = self.core.locks.lock(index);
587
588 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
589 f(ptr);
590
591 self.core.dirty.store(true, atomic::Ordering::Release);
592 let epoch = self.core.completion.increment_current_epoch();
593
594 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
595 }
596
597 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
598 ///
599 /// ## Working
600 ///
601 /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
602 /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
603 /// conditions
604 ///
605 /// ## Example
606 ///
607 /// ```
608 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
609 ///
610 /// const MODULE_ID: u8 = 0;
611 ///
612 /// let dir = tempfile::tempdir().unwrap();
613 /// let path = dir.path().join("tmp_grow_mmap");
614 ///
615 /// let cfg = FrozenMMapCfg {
616 /// module_id: MODULE_ID,
617 /// initial_count: 0x02,
618 /// immediate_durability: false,
619 /// flush_duration: std::time::Duration::from_micros(0x96),
620 /// };
621 ///
622 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
623 /// assert_eq!(mmap.total_slots(), 0x02);
624 ///
625 /// drop(mmap);
626 ///
627 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
628 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
629 /// ```
630 #[inline]
631 pub fn total_slots(&self) -> usize {
632 self.core.curr_length / Self::SLOT_SIZE
633 }
634
635 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
636 ///
637 /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
638 /// and OS
639 ///
640 /// ## Example
641 ///
642 /// ```
643 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
644 ///
645 /// const MODULE_ID: u8 = 0;
646 ///
647 /// let dir = tempfile::tempdir().unwrap();
648 /// let path = dir.path().join("tmp_mem_usage");
649 ///
650 /// let cfg = FrozenMMapCfg {
651 /// module_id: MODULE_ID,
652 /// initial_count: 0x10,
653 /// immediate_durability: false,
654 /// flush_duration: std::time::Duration::from_micros(0x60),
655 /// };
656 ///
657 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
658 ///
659 /// let bytes = mmap.memory_usage();
660 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
661 /// ```
662 #[inline]
663 pub fn memory_usage(&self) -> usize {
664 // memory of mem-maped region
665 let mmap_bytes = self.core.curr_length;
666
667 // memory used for locking
668 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
669
670 mmap_bytes + lock_bytes
671 }
672
673 /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
674 /// operation
675 ///
676 /// ## Overview
677 ///
678 /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
679 /// operation, hence creating a transactional write operation, which gives following guarantees:
680 ///
681 /// - All write ops succeed together
682 /// - Single epoch to track durability of all writes ops
683 /// - Same durability guarantee for all the included write ops
684 ///
685 /// Simply, this preserves atomic durability semantics for multi index updates
686 ///
687 /// ## Example
688 ///
689 /// ```
690 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
691 ///
692 /// const MODULE_ID: u8 = 0;
693 ///
694 /// let dir = tempfile::tempdir().unwrap();
695 /// let path = dir.path().join("tmp_tx");
696 ///
697 /// let cfg = FrozenMMapCfg {
698 /// module_id: MODULE_ID,
699 /// initial_count: 0x0A,
700 /// immediate_durability: false,
701 /// flush_duration: std::time::Duration::from_micros(50),
702 /// };
703 ///
704 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
705 ///
706 /// let mut tx = mmap.new_tx();
707 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
708 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
709 ///
710 /// let ticket = tx.commit().unwrap();
711 /// let _ = futures::executor::block_on(ticket).unwrap();
712 ///
713 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
714 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
715 ///
716 /// assert_eq!((v0, v1), (0x0A, 0x14));
717 /// ```
718 #[inline]
719 pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
720 FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
721 }
722
723 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
724 ///
725 /// ## Working
726 ///
727 /// When `delete` is called, all read, write, and (background) sync ops are paused
728 /// (indefinitely), whule deletion is done with following steps:
729 ///
730 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
731 /// - if any batch is pending for sync,
732 /// - swap the flag
733 /// - call sync manually
734 /// - incr epoch and update cv
735 /// - brodcast closing so flusher tx could wrap up
736 /// - `munmap(2)` current mapping
737 /// - call delete on [`FrozenFile`]
738 ///
739 /// ## Example
740 ///
741 /// ```
742 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
743 ///
744 /// const MODULE_ID: u8 = 0;
745 ///
746 /// let dir = tempfile::tempdir().unwrap();
747 /// let path = dir.path().join("tmp_delete_mmap");
748 ///
749 /// let cfg = FrozenMMapCfg {
750 /// module_id: MODULE_ID,
751 /// initial_count: 0x04,
752 /// immediate_durability: false,
753 /// flush_duration: std::time::Duration::from_micros(0x96),
754 /// };
755 ///
756 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
757 /// mmap.delete().unwrap();
758 /// assert!(!path.exists());
759 /// ```
760 pub fn delete(&mut self) -> FrozenResult<()> {
761 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
762 self.core.dirty.store(false, atomic::Ordering::Release);
763 self.core.closed.store(true, atomic::Ordering::Release);
764 self.core.durable_cv.notify_one();
765
766 if let Some(handle) = self.flush_tx_handle.take() {
767 let _ = handle.join();
768 }
769
770 // pause all new write ops
771 let _lock = self.core.acquire_exclusive_io_lock();
772
773 self.munmap()?;
774 self.core.file.delete()
775 }
776
777 /// Flush the dirty mmap data to persistant storage
778 ///
779 /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
780 /// guarantee
781 ///
782 /// ## Example
783 ///
784 /// ```
785 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
786 ///
787 /// const MODULE_ID: u8 = 0;
788 ///
789 /// let dir = tempfile::tempdir().unwrap();
790 /// let path = dir.path().join("tmp_delete_mmap");
791 ///
792 /// let cfg = FrozenMMapCfg {
793 /// module_id: MODULE_ID,
794 /// initial_count: 0x04,
795 /// immediate_durability: false,
796 /// flush_duration: std::time::Duration::from_micros(0x96),
797 /// };
798 ///
799 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
800 /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
801 /// ```
802 #[inline]
803 pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
804 self.core.map.sync(self.core.curr_length)
805 }
806
807 /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
808 ///
809 /// ## Example
810 ///
811 /// ```
812 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
813 ///
814 /// const MODULE_ID: u8 = 0;
815 ///
816 /// let dir = tempfile::tempdir().unwrap();
817 /// let path = dir.path().join("tmp_delete_mmap");
818 ///
819 /// let cfg = FrozenMMapCfg {
820 /// module_id: MODULE_ID,
821 /// initial_count: 0x04,
822 /// immediate_durability: false,
823 /// flush_duration: std::time::Duration::from_micros(0x96),
824 /// };
825 ///
826 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
827 /// assert!(unsafe { mmap.flush_file() }.is_ok());
828 /// ```
829 #[inline]
830 pub unsafe fn flush_file(&self) -> FrozenResult<()> {
831 self.core.file.sync()
832 }
833
834 #[inline]
835 fn munmap(&self) -> FrozenResult<()> {
836 let length = self.core.curr_length;
837 unsafe { self.core.map.unmap(length) }
838 }
839}
840
841impl<T> Drop for FrozenMMap<T>
842where
843 T: Sized + Send + Sync,
844{
845 fn drop(&mut self) {
846 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
847 self.core.cv.notify_one(); // notify flusher tx to shut
848
849 if let Some(handle) = self.flush_tx_handle.take() {
850 let _ = handle.join();
851 }
852
853 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
854 let _io_lock = self.core.acquire_exclusive_io_lock();
855
856 if !is_closed {
857 let _ = self.munmap();
858 }
859 }
860}
861
862impl<T> fmt::Display for FrozenMMap<T>
863where
864 T: Sized + Send + Sync,
865{
866 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867 write!(
868 f,
869 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
870 self.core.file.fd(),
871 self.total_slots(),
872 self.core.curr_length,
873 )
874 }
875}
876
877/// ## Why we ignore [`std::sync::PoisonError`]?
878///
879/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
880/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
881/// and are completely seperated from the mutex.
882///
883/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
884/// an inconsistent state of the protected value. Since no state can be left partially modified
885/// under this lock, there is no possible consistency risk to recover from and propagating the
886/// poison error would only introduce unnecessary failures into the allocation path.
887///
888/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
889/// with the recovered guard.
890fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
891 // init phase (acquiring locks)
892 let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());
893
894 // sync loop w/ non-busy waiting
895 loop {
896 (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
897
898 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
899 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
900
901 let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
902 let closing = core.closed.load(atomic::Ordering::Acquire);
903
904 if !dirty {
905 if closing {
906 core.cv.notify_all();
907 return;
908 }
909
910 continue;
911 }
912
913 // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
914 // grow could kick in while sync is in progress
915
916 let io_lock = core.acquire_exclusive_io_lock();
917
918 // INFO: we must drop the guard before syscall, as its a blocking operation and holding
919 // the mutex while the syscall takes place is not a good idea, while we drop the mutex
920 // and acqurie it again, in-between other process could acquire it and use it
921 drop(guard);
922
923 // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
924 // all writes up to `target_epoch` are guaranteed to be part of this flush window,
925 // while anything beyound belongs to the next batch
926 let target_epoch = core.completion.read_current_epoch();
927
928 // NOTE:
929 //
930 // - if sync fails, we update the Core::error w/ the received error object
931 // - we clear it up when another sync call succeeds
932 // - this is valid, as the underlying sync flushes entire mmaped region, hence
933 // even if the last call failed, and the new one succeeded, we do get the durability
934 // guarenty for the old data as well
935
936 if let Err(new_error) = core.sync() {
937 core.completion.set_err(new_error);
938 } else {
939 core.completion.mark_epoch_as_durable(target_epoch);
940 core.completion.del_err();
941 }
942
943 // NOTE: notify all listeners currently waiting for durability progress
944 core.completion.notify_all_listeners();
945
946 drop(io_lock);
947 guard = core.acquire_lock();
948 }
949}
950
951/// A context for grouping multi write ops into a single atomic operation
952///
953/// ## Overview
954///
955/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
956///
957/// - All included writes are applied together
958/// - Single epoch is assinged for an entier transaction
959/// - Durability guarantee is same for all included write ops
960///
961/// ## Example
962///
963/// ```
964/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
965///
966/// const MODULE_ID: u8 = 0;
967///
968/// let dir = tempfile::tempdir().unwrap();
969/// let path = dir.path().join("tmp_tx");
970///
971/// let cfg = FrozenMMapCfg {
972/// module_id: MODULE_ID,
973/// initial_count: 0x0A,
974/// immediate_durability: false,
975/// flush_duration: std::time::Duration::from_micros(50),
976/// };
977///
978/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
979///
980/// let mut tx = mmap.new_tx();
981/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
982/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
983/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
984///
985/// let ticket = tx.commit().unwrap();
986/// let _ = futures::executor::block_on(ticket).unwrap();
987///
988/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
989/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
990/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
991///
992/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
993/// ```
994pub struct FrozenMMapTransaction<'a, T> {
995 core: &'a Core,
996 ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
997}
998
999impl<'a, T> FrozenMMapTransaction<'a, T> {
1000 /// Append a write op into the [`FrozenMMapTransaction`]
1001 ///
1002 /// ## Requirements
1003 ///
1004 /// Write ops must follow these safety requirements,
1005 ///
1006 /// - No duplicate indices
1007 /// - No out-of-order writes (must be incremental)
1008 ///
1009 /// Violating this constraint would result in [`FrozenError`]
1010 ///
1011 /// ## Safety
1012 ///
1013 /// Same safety requirements as [`FrozenMMap::write`] apply here
1014 ///
1015 /// ## Example
1016 ///
1017 /// ```
1018 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1019 ///
1020 /// const MODULE_ID: u8 = 0;
1021 ///
1022 /// let dir = tempfile::tempdir().unwrap();
1023 /// let path = dir.path().join("tmp_tx");
1024 ///
1025 /// let cfg = FrozenMMapCfg {
1026 /// module_id: MODULE_ID,
1027 /// initial_count: 0x10,
1028 /// immediate_durability: false,
1029 /// flush_duration: std::time::Duration::from_micros(50),
1030 /// };
1031 ///
1032 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1033 ///
1034 /// let mut tx = mmap.new_tx();
1035 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1036 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1037 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1038 ///
1039 /// let ticket = tx.commit().unwrap();
1040 /// let _ = futures::executor::block_on(ticket).unwrap();
1041 ///
1042 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1043 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1044 /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1045 ///
1046 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1047 /// ```
1048 #[inline(always)]
1049 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1050 where
1051 F: FnOnce(*mut T) + 'a,
1052 {
1053 // NOTE:
1054 //
1055 // This check prevents a potential footgun! For a safe transaction all writes must be,
1056 // - ordered by index (either incr or decr)
1057 // - no multi writes on same index
1058 //
1059 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1060 if let Some((last_idx, _)) = self.ops_vec.last() {
1061 if index <= *last_idx {
1062 return err::new_err(
1063 err::HCF,
1064 "tx writes must be strictly ordered, with no more then single ops on given index",
1065 );
1066 }
1067 }
1068
1069 self.ops_vec.push((index, Box::new(f)));
1070 Ok(())
1071 }
1072
1073 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1074 ///
1075 /// ## Guarantees
1076 ///
1077 /// - All writes are applied under a single epoch
1078 /// - All writes belong to the same durability batch
1079 /// - No interleaving with other transactions at epoch level
1080 ///
1081 /// ## Example
1082 ///
1083 /// ```
1084 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1085 ///
1086 /// const MODULE_ID: u8 = 0;
1087 ///
1088 /// let dir = tempfile::tempdir().unwrap();
1089 /// let path = dir.path().join("tmp_tx");
1090 ///
1091 /// let cfg = FrozenMMapCfg {
1092 /// module_id: MODULE_ID,
1093 /// initial_count: 0x10,
1094 /// immediate_durability: false,
1095 /// flush_duration: std::time::Duration::from_micros(50),
1096 /// };
1097 ///
1098 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1099 ///
1100 /// let mut tx = mmap.new_tx();
1101 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1102 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1103 ///
1104 /// let ticket = tx.commit().unwrap();
1105 /// let _ = futures::executor::block_on(ticket).unwrap();
1106 ///
1107 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1108 /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1109 ///
1110 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1111 /// ```
1112 #[inline(always)]
1113 pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1114 if let Some(err) = self.core.completion.get_err() {
1115 return Err(err);
1116 }
1117
1118 let _guard = self.core.acquire_io_lock();
1119
1120 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1121 let mut guards = Vec::with_capacity(self.ops_vec.len());
1122 for (idx, _) in &self.ops_vec {
1123 guards.push(self.core.locks.lock(*idx));
1124 }
1125
1126 for (idx, op) in self.ops_vec {
1127 let offset = idx * std::mem::size_of::<T>();
1128 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1129 op(ptr);
1130 }
1131
1132 self.core.dirty.store(true, atomic::Ordering::Release);
1133 let epoch = self.core.completion.increment_current_epoch();
1134
1135 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1136 }
1137}
1138
1139#[derive(Debug)]
1140struct Core {
1141 completion: sync::Arc<ack::Completion>,
1142 cv: sync::Condvar,
1143 curr_length: usize,
1144 dirty: atomic::AtomicBool,
1145 io_lock: sync::RwLock<()>,
1146 map: TMap,
1147 locks: Locks,
1148 lock: sync::Mutex<()>,
1149 file: FrozenFile,
1150 durable_cv: sync::Condvar,
1151 closed: atomic::AtomicBool,
1152}
1153
1154unsafe impl Send for Core {}
1155unsafe impl Sync for Core {}
1156
1157impl Core {
1158 fn new(map: TMap, file: FrozenFile, curr_length: usize, total_slots: usize) -> Self {
1159 Self {
1160 map,
1161 file,
1162 curr_length,
1163 completion: sync::Arc::new(ack::Completion::default()),
1164 cv: sync::Condvar::new(),
1165 lock: sync::Mutex::new(()),
1166 io_lock: sync::RwLock::new(()),
1167 locks: Locks::new(total_slots),
1168 durable_cv: sync::Condvar::new(),
1169 dirty: atomic::AtomicBool::new(false),
1170 closed: atomic::AtomicBool::new(false),
1171 }
1172 }
1173
1174 #[inline]
1175 fn sync(&self) -> FrozenResult<()> {
1176 unsafe { self.map.sync(self.curr_length) }?;
1177 self.file.sync()
1178 }
1179
1180 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1181 #[inline]
1182 fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1183 self.lock.lock().unwrap_or_else(|e| e.into_inner())
1184 }
1185
1186 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1187 #[inline]
1188 fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1189 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1190 }
1191
1192 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1193 #[inline]
1194 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1195 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1196 }
1197}
1198
1199#[derive(Debug)]
1200struct Locks(Box<[atomic::AtomicU8]>);
1201
1202impl Locks {
1203 const LOCK: u8 = 1;
1204 const UNLOCK: u8 = 0;
1205
1206 const L1_CONTENTION: usize = 0x10;
1207 const L2_CONTENTION: usize = 0x20;
1208
1209 fn new(cap: usize) -> Self {
1210 let mut slots = Vec::with_capacity(cap);
1211 for _ in 0..cap {
1212 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1213 }
1214
1215 Self(slots.into_boxed_slice())
1216 }
1217
1218 #[inline(always)]
1219 fn lock(&self, index: usize) -> LockGuard<'_> {
1220 let val = &self.0[index];
1221 let mut spins = 0;
1222
1223 loop {
1224 if val
1225 .compare_exchange_weak(
1226 Self::UNLOCK,
1227 Self::LOCK,
1228 atomic::Ordering::Acquire,
1229 atomic::Ordering::Relaxed,
1230 )
1231 .is_ok()
1232 {
1233 return LockGuard(val);
1234 }
1235
1236 // we must backoff under contention
1237
1238 // NOTE:
1239 //
1240 // We have three levels of backoff,
1241 //
1242 // 1. Busy waiting w/ CPU hint
1243 // 2. Willingly allow preemption by OS schedular
1244 // 3. Sleep the thread (increasing w/ exponenial factor)
1245
1246 if hints::likely(spins < Self::L1_CONTENTION) {
1247 std::hint::spin_loop();
1248 } else if spins < Self::L2_CONTENTION {
1249 thread::yield_now();
1250 } else {
1251 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1252 thread::sleep(time::Duration::from_nanos(ns));
1253 }
1254
1255 spins += 1;
1256 }
1257 }
1258}
1259
1260struct LockGuard<'a>(&'a atomic::AtomicU8);
1261
1262impl Drop for LockGuard<'_> {
1263 fn drop(&mut self) {
1264 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1265 }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270 use super::*;
1271
1272 // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1273 const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1274
1275 const MODULE_ID: u8 = 0;
1276 const INIT_SLOTS: usize = 0x0A;
1277
1278 fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1279 let dir = tempfile::tempdir().unwrap();
1280 let path = dir.path().join("tmp_map");
1281
1282 let cfg = FrozenMMapCfg {
1283 module_id: MODULE_ID,
1284 initial_count: INIT_SLOTS,
1285 immediate_durability: false,
1286 flush_duration: FLUSH_DURATION,
1287 };
1288
1289 (dir, path, cfg)
1290 }
1291
1292 mod fm_lifecycle {
1293 use super::*;
1294
1295 #[test]
1296 fn ok_new() {
1297 let (_dir, path, cfg) = new_tmp();
1298 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1299
1300 assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1301 assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1302 assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1303
1304 assert_eq!(mmap.core.completion.read_current_epoch(), 0);
1305 assert_eq!(mmap.core.completion.read_durable_epoch(), 0);
1306
1307 // satisfies the bg thread was spawned correctly
1308 assert!(mmap.core.completion.get_err().is_none());
1309
1310 // satisfies wait on epoch works
1311 let ticket = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1312
1313 let ticket_epoch = ticket.epoch();
1314 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1315
1316 assert!(durable_epoch >= ticket_epoch);
1317 }
1318
1319 #[test]
1320 fn ok_new_existing() {
1321 let (_dir, path, cfg) = new_tmp();
1322
1323 // create new + close
1324 let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1325 drop(mmap1);
1326
1327 // open existing
1328 let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1329 drop(mmap2);
1330 }
1331
1332 #[test]
1333 fn err_new_when_change_in_cfg() {
1334 let (_dir, path, mut cfg) = new_tmp();
1335
1336 // create new + close
1337 let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1338 drop(mmap1);
1339
1340 // update cfg + opne existing
1341 cfg.initial_count = INIT_SLOTS * 2;
1342 assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1343 }
1344
1345 #[test]
1346 fn ok_delete() {
1347 let (_dir, path, cfg) = new_tmp();
1348 let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1349
1350 mmap.delete().unwrap();
1351 assert!(!mmap.core.file.exists().unwrap());
1352 }
1353
1354 #[test]
1355 fn err_delete_after_delete() {
1356 let (_dir, path, cfg) = new_tmp();
1357 let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1358
1359 mmap.delete().unwrap();
1360 assert!(!mmap.core.file.exists().unwrap());
1361 assert!(mmap.delete().is_err());
1362 }
1363
1364 #[test]
1365 fn ok_drop_persists_when_dropped_before_bg_flush() {
1366 let (_dir, path, cfg) = new_tmp();
1367 const VAL: u64 = 0x0A;
1368
1369 {
1370 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1371 unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1372 drop(mmap);
1373 }
1374
1375 {
1376 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1377 let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1378 assert_eq!(val, VAL);
1379 }
1380 }
1381 }
1382
1383 mod fm_validate_t {
1384 use super::*;
1385
1386 #[repr(C, align(8))]
1387 struct OkT {
1388 a: u64,
1389 b: u64,
1390 }
1391
1392 #[repr(C)]
1393 struct BadAlignT {
1394 a: u32,
1395 b: u32,
1396 }
1397
1398 #[repr(C, align(4))]
1399 struct BadSizeT {
1400 a: u32,
1401 b: u16,
1402 }
1403
1404 #[repr(C, align(8))]
1405 struct DropT(u64);
1406
1407 impl Drop for DropT {
1408 fn drop(&mut self) {}
1409 }
1410
1411 #[repr(C, align(8))]
1412 struct ZstT;
1413
1414 #[test]
1415 fn ok_validate_t() {
1416 assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1417 }
1418
1419 #[test]
1420 fn err_validate_t_when_drop() {
1421 assert!(FrozenMMap::<DropT>::validate_t().is_err());
1422 }
1423
1424 #[test]
1425 fn err_validate_t_when_not_8_byte_aligned() {
1426 assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1427 }
1428
1429 #[test]
1430 fn err_validate_t_when_size_not_multiple_of_8() {
1431 assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1432 }
1433
1434 #[test]
1435 fn err_validate_t_when_zero_sized() {
1436 assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1437 }
1438
1439 #[test]
1440 fn err_new_when_t_implements_drop() {
1441 let (_dir, path, cfg) = new_tmp();
1442 assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1443 }
1444
1445 #[test]
1446 fn err_new_when_t_is_not_8_byte_aligned() {
1447 let (_dir, path, cfg) = new_tmp();
1448 assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1449 }
1450
1451 #[test]
1452 fn err_new_when_t_size_is_not_multiple_of_8() {
1453 let (_dir, path, cfg) = new_tmp();
1454 assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1455 }
1456
1457 #[test]
1458 fn err_new_when_t_is_zero_sized() {
1459 let (_dir, path, cfg) = new_tmp();
1460 assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1461 }
1462
1463 #[test]
1464 fn err_new_grown_when_t_implements_drop() {
1465 let (_dir, path, cfg) = new_tmp();
1466 assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1467 }
1468
1469 #[test]
1470 fn err_new_grown_when_t_is_not_8_byte_aligned() {
1471 let (_dir, path, cfg) = new_tmp();
1472 assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1473 }
1474
1475 #[test]
1476 fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1477 let (_dir, path, cfg) = new_tmp();
1478 assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1479 }
1480
1481 #[test]
1482 fn err_new_grown_when_t_is_zero_sized() {
1483 let (_dir, path, cfg) = new_tmp();
1484 assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1485 }
1486 }
1487
1488 mod fm_new_grown {
1489 use super::*;
1490
1491 #[test]
1492 fn ok_new_grown_updates_length() {
1493 let (_dir, path, cfg) = new_tmp();
1494
1495 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1496 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1497 drop(mmap);
1498
1499 let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1500 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1501 assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1502 }
1503
1504 #[test]
1505 fn err_new_grown_with_preexisting_instance() {
1506 let (_dir, path, cfg) = new_tmp();
1507
1508 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1509 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1510
1511 assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1512 }
1513
1514 #[test]
1515 fn ok_new_grown_cycle() {
1516 let (_dir, path, cfg) = new_tmp();
1517
1518 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1519 drop(mmap);
1520
1521 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1522 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1523 drop(mmap);
1524
1525 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1526 assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1527 drop(mmap);
1528
1529 let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1530 assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1531 }
1532
1533 #[test]
1534 fn ok_write_reopen_grown_read() {
1535 let (_dir, path, cfg) = new_tmp();
1536
1537 {
1538 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1539 unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1540 }
1541
1542 {
1543 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1544 unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1545
1546 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1547 assert_eq!(val, 0xBB);
1548 }
1549 }
1550
1551 #[test]
1552 fn ok_write_reopen_grown_read_cycle() {
1553 let (_dir, path, cfg) = new_tmp();
1554
1555 {
1556 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1557 unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1558 }
1559
1560 for i in 0..2 {
1561 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1562 let idx = mmap.total_slots() - 1;
1563 unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1564 drop(mmap);
1565 }
1566
1567 let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1568
1569 let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1570 assert_eq!(base, 1);
1571
1572 let last_idx = mmap.total_slots() - 1;
1573 let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1574 assert_eq!(last, 3);
1575 }
1576
1577 #[test]
1578 fn err_new_grown_while_previous_instance_is_alive() {
1579 let (_dir, path, cfg) = new_tmp();
1580
1581 let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1582 let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1583
1584 assert!(reopened.is_err());
1585 }
1586 }
1587
1588 mod fm_write_read {
1589 use super::*;
1590
1591 #[test]
1592 fn ok_write_wait_read_cycle() {
1593 const VAL: u64 = 0xDEADC0DE;
1594 let (_dir, path, cfg) = new_tmp();
1595 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1596
1597 // write + sync
1598 let ticket = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1599
1600 let ticket_epoch = ticket.epoch();
1601 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1602
1603 assert!(durable_epoch >= ticket_epoch);
1604
1605 // read + verify
1606 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1607 assert_eq!(val, VAL);
1608 }
1609
1610 #[test]
1611 fn ok_write_read_without_wait() {
1612 const VAL: u64 = 0xDEADC0DE;
1613 let (_dir, path, cfg) = new_tmp();
1614 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1615
1616 unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1617 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1618 assert_eq!(val, VAL);
1619 }
1620 }
1621
1622 mod fm_tx {
1623 use super::*;
1624
1625 #[test]
1626 fn ok_tx_basic_multi_write() {
1627 let (_dir, path, cfg) = new_tmp();
1628 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1629
1630 let mut tx = mmap.new_tx();
1631 unsafe {
1632 tx.write(0, |v| *v = 1).unwrap();
1633 tx.write(1, |v| *v = 2).unwrap();
1634 tx.write(2, |v| *v = 3).unwrap();
1635 }
1636
1637 let ticket = tx.commit().unwrap();
1638
1639 let ticket_epoch = ticket.epoch();
1640 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1641
1642 assert!(durable_epoch >= ticket_epoch);
1643
1644 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1645 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1646 let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1647
1648 assert_eq!((v0, v1, v2), (1, 2, 3));
1649 }
1650
1651 #[test]
1652 fn ok_tx_single_epoch() {
1653 let (_dir, path, cfg) = new_tmp();
1654 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1655
1656 let mut tx = mmap.new_tx();
1657 unsafe {
1658 tx.write(0, |v| *v = 0x0A).unwrap();
1659 tx.write(1, |v| *v = 0x14).unwrap();
1660 }
1661
1662 // NOTE: next write must have strictly higher epoch then current one
1663
1664 let ticket = tx.commit().unwrap();
1665 let next_ticket = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1666
1667 assert!(next_ticket.epoch() > ticket.epoch());
1668 }
1669
1670 #[test]
1671 fn err_tx_out_of_order() {
1672 let (_dir, path, cfg) = new_tmp();
1673 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1674
1675 let mut tx = mmap.new_tx();
1676 unsafe {
1677 tx.write(2, |v| *v = 1).unwrap();
1678 let res = tx.write(1, |v| *v = 2);
1679 assert!(res.is_err());
1680 }
1681 }
1682
1683 #[test]
1684 fn err_tx_duplicate_index() {
1685 let (_dir, path, cfg) = new_tmp();
1686 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1687
1688 let mut tx = mmap.new_tx();
1689 unsafe {
1690 tx.write(1, |v| *v = 1).unwrap();
1691 let res = tx.write(1, |v| *v = 2);
1692 assert!(res.is_err());
1693 }
1694 }
1695
1696 #[test]
1697 fn ok_tx_concurrent_non_overlapping() {
1698 let (_dir, path, cfg) = new_tmp();
1699 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1700
1701 let mut handles = Vec::new();
1702 for i in 0..2 {
1703 let mmap = mmap.clone();
1704 handles.push(thread::spawn(move || {
1705 let mut tx = mmap.new_tx();
1706
1707 unsafe {
1708 tx.write(i * 2, |v| *v = i as u64).unwrap();
1709 tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1710 }
1711
1712 let ticket = tx.commit().unwrap();
1713
1714 let ticket_epoch = ticket.epoch();
1715 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1716
1717 assert!(durable_epoch >= ticket_epoch);
1718 }));
1719 }
1720
1721 for h in handles {
1722 h.join().unwrap();
1723 }
1724
1725 for i in 0..2 {
1726 let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1727 let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1728
1729 assert_eq!((v0, v1), (i as u64, i as u64));
1730 }
1731 }
1732
1733 #[test]
1734 fn ok_tx_overwrite_last_wins() {
1735 let (_dir, path, cfg) = new_tmp();
1736 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1737
1738 let mut tx = mmap.new_tx();
1739 unsafe {
1740 tx.write(0, |v| *v = 1).unwrap();
1741 }
1742
1743 tx.commit().unwrap();
1744
1745 let mut tx2 = mmap.new_tx();
1746 unsafe {
1747 tx2.write(0, |v| *v = 2).unwrap();
1748 }
1749
1750 let ticket = tx2.commit().unwrap();
1751
1752 let ticket_epoch = ticket.epoch();
1753 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1754
1755 assert!(durable_epoch >= ticket_epoch);
1756
1757 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1758 assert_eq!(val, 2);
1759 }
1760
1761 #[test]
1762 fn ok_tx_persists_across_reopen() {
1763 let (_dir, path, cfg) = new_tmp();
1764
1765 {
1766 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1767
1768 let mut tx = mmap.new_tx();
1769 unsafe {
1770 tx.write(0, |v| *v = 0x3A).unwrap();
1771 tx.write(1, |v| *v = 0x54).unwrap();
1772 }
1773
1774 let ticket = tx.commit().unwrap();
1775
1776 let ticket_epoch = ticket.epoch();
1777 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1778
1779 assert!(durable_epoch >= ticket_epoch);
1780 }
1781
1782 {
1783 let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1784
1785 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1786 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1787
1788 assert_eq!((v0, v1), (0x3A, 0x54));
1789 }
1790 }
1791 }
1792
1793 mod fm_durability {
1794 use super::*;
1795
1796 #[test]
1797 fn ok_wait_then_drop() {
1798 let (_dir, path, cfg) = new_tmp();
1799 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1800
1801 let ticket = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1802
1803 let ticket_epoch = ticket.epoch();
1804 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1805
1806 assert!(durable_epoch >= ticket_epoch);
1807
1808 drop(mmap);
1809 }
1810
1811 #[test]
1812 fn ok_epoch_monotonicity() {
1813 let (_dir, path, cfg) = new_tmp();
1814 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1815
1816 let t1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1817 let t2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1818
1819 let durable_epoch = futures::executor::block_on(t2).unwrap();
1820 assert!(durable_epoch >= t1.epoch());
1821 }
1822
1823 #[test]
1824 fn ok_wait_for_durability_with_multi_writers() {
1825 let (_dir, path, cfg) = new_tmp();
1826 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1827
1828 let mut handles = Vec::new();
1829 for _ in 0..2 {
1830 let mmap = mmap.clone();
1831 handles.push(thread::spawn(move || {
1832 let ticket = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1833
1834 let ticket_epoch = ticket.epoch();
1835 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1836
1837 assert!(durable_epoch >= ticket_epoch);
1838 }));
1839 }
1840
1841 for h in handles {
1842 h.join().unwrap();
1843 }
1844
1845 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1846 assert_eq!(val, 2);
1847 }
1848 }
1849
1850 mod fm_concurrency {
1851 use super::*;
1852
1853 #[test]
1854 fn ok_parallel_reads_with_diff_index() {
1855 let (_dir, path, cfg) = new_tmp();
1856 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1857
1858 unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1859 unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1860
1861 let t1 = {
1862 let mmap = mmap.clone();
1863 thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
1864 };
1865
1866 let t2 = {
1867 let mmap = mmap.clone();
1868 thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
1869 };
1870
1871 assert_eq!(t1.join().unwrap(), 0x10);
1872 assert_eq!(t2.join().unwrap(), 0x20);
1873 }
1874
1875 #[test]
1876 fn ok_multi_tx_drop_then_reopen_grown() {
1877 let (_dir, path, cfg) = new_tmp();
1878
1879 {
1880 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
1881
1882 let mut handles = Vec::new();
1883 for i in 0..2u64 {
1884 let mmap = mmap.clone();
1885 handles.push(thread::spawn(move || {
1886 let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
1887 }));
1888 }
1889
1890 for i in 0..2usize {
1891 let mmap = mmap.clone();
1892 handles.push(thread::spawn(move || {
1893 let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
1894 }));
1895 }
1896
1897 for h in handles {
1898 h.join().unwrap();
1899 }
1900 }
1901
1902 {
1903 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1904 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
1905
1906 for i in 0..2u64 {
1907 let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
1908 assert_eq!(val, i + 1);
1909 }
1910
1911 let idx = mmap.total_slots() - 1;
1912 unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
1913
1914 let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
1915 assert_eq!(val, 0xDEAD);
1916 }
1917 }
1918 }
1919
1920 mod ack_ticket {
1921 use super::*;
1922
1923 #[test]
1924 fn ok_parallel_waiters_same_epoch_window() {
1925 let (_dir, path, cfg) = new_tmp();
1926 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1927
1928 let barrier = sync::Arc::new(sync::Barrier::new(0x10));
1929
1930 let mut handles = Vec::new();
1931 for i in 0..0x10usize {
1932 let mmap = mmap.clone();
1933 let barrier = barrier.clone();
1934
1935 handles.push(thread::spawn(move || {
1936 barrier.wait();
1937
1938 let ticket = unsafe { mmap.write(i % INIT_SLOTS, |v| *v = i as u64).unwrap() };
1939
1940 let epoch = ticket.epoch();
1941 let durable = futures::executor::block_on(ticket).unwrap();
1942
1943 assert!(durable >= epoch);
1944 }));
1945 }
1946
1947 for handle in handles {
1948 handle.join().expect("worker thread panicked");
1949 }
1950 }
1951
1952 #[test]
1953 fn ok_parallel_waiters_same_epoch() {
1954 let completion = sync::Arc::new(ack::Completion::default());
1955
1956 let mut handles = Vec::new();
1957 for _ in 0..0x10 {
1958 let completion = completion.clone();
1959
1960 handles.push(thread::spawn(move || {
1961 let ticket = ack::AckTicket::new(1, completion);
1962
1963 assert_eq!(
1964 futures::executor::block_on(ticket).expect("ticket must complete"),
1965 1
1966 );
1967 }));
1968 }
1969
1970 thread::sleep(time::Duration::from_millis(0x0A));
1971
1972 completion.mark_epoch_as_durable(1);
1973 completion.notify_all_listeners();
1974
1975 for handle in handles {
1976 handle.join().expect("worker thread panicked");
1977 }
1978 }
1979 }
1980}