frozen_core/fmmap/mod.rs
1//! Custom implementation of `mmap(2)`
2//!
3//! ## Constraints
4//!
5//! [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that,
6//! `T` must be a POD type which is safe to persist and later reinterpret from bytes.
7//!
8//! Required properties for `T`,
9//!
10//! - Must use `#[repr(C)]`
11//! - Should be 8-bytes aligned
12//! - Must not implement [`Drop`]
13//! - `size_of::<T>()` should be multiple of `8`
14//!
15//! *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
16//! [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
17//! across reopen.
18//!
19//! These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
20//! directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
21//! layout and must remain valid when the file is reopened in a later process.
22//!
23//! ## Example
24//!
25//! ```
26//! use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
27//!
28//! const MODULE_ID: u8 = 0;
29//!
30//! let dir = tempfile::tempdir().unwrap();
31//! let path = dir.path().join("tmp_frozen_mmap");
32//!
33//! let cfg = FrozenMMapCfg {
34//! module_id: MODULE_ID,
35//! initial_count: 0x0A,
36//! flush_duration: std::time::Duration::from_micros(0x96),
37//! };
38//!
39//! let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
40//! assert_eq!(mmap.total_slots(), 0x0A);
41//!
42//! let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
43//! let _ = futures::executor::block_on(ticket).unwrap();
44//!
45//! let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
46//! assert_eq!(val, 0xDEADC0DE);
47//!
48//! drop(mmap);
49//!
50//! let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
51//! assert_eq!(reopened.total_slots(), 0x0A + 0x05);
52//!
53//! let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
54//! assert_eq!(val, 0xDEADC0DE);
55//! ```
56
57#[cfg(any(target_os = "linux", target_os = "macos"))]
58mod posix;
59
60use crate::{
61 ack,
62 error::FrozenResult,
63 ffile::{FrozenFile, FrozenFileCfg},
64 hints,
65};
66use std::{
67 fmt,
68 sync::{self, atomic},
69 thread, time,
70};
71
72/// type for `epoch` used by write ops
73pub type TEpoch = u64;
74
75#[cfg(any(target_os = "linux", target_os = "macos"))]
76type TMap = posix::POSIXMMap;
77
78/// Error codes for [`FrozenMMap`]
79pub(in crate::fmmap) mod err {
80 use crate::error::{ErrCode, FrozenError, FrozenResult};
81
82 /// Domain Id for [`FrozenMMap`] is **18**
83 const ERRDOMAIN: u8 = 0x12;
84
85 /// module id used for [`FrozenMMap`]
86 pub static MID: std::sync::OnceLock<u8> = std::sync::OnceLock::new();
87
88 #[cfg(not(test))]
89 #[inline(always)]
90 pub fn mid() -> &'static u8 {
91 MID.get().unwrap()
92 }
93
94 #[cfg(test)]
95 #[inline(always)]
96 pub fn mid() -> &'static u8 {
97 MID.get_or_init(|| 0)
98 }
99
100 /// internal fuck up (hault and catch fire)
101 pub const HCF: ErrCode = ErrCode::new(0x02, "hault and catch fire");
102
103 /// unknown error (fallback)
104 pub const UNK: ErrCode = ErrCode::new(0x04, "unknown error");
105
106 /// no more memory available
107 pub const NMM: ErrCode = ErrCode::new(0x06, "not enough memory available on the device");
108
109 /// syncing error
110 pub const SYN: ErrCode = ErrCode::new(0x08, "failed to sync/flush data to storage device");
111
112 /// no write/read perm
113 pub const PRM: ErrCode = ErrCode::new(0x0A, "missing permissions for IO");
114
115 /// type `T` must not be zero sized
116 pub const ZRO: ErrCode = ErrCode::new(0x0C, "type T must not be zero sized");
117
118 /// flush_tx error (unable to spawn)
119 pub const FXE: ErrCode = ErrCode::new(0x10, "unable to spawn flush_tx");
120
121 /// type `T` implements drop
122 pub const DRP: ErrCode = ErrCode::new(0x12, "type T must not implement `Drop`");
123
124 /// type `T` is not 8 bytes aligned
125 pub const ALN: ErrCode = ErrCode::new(0x14, "type T must be 8-bytes aligned");
126
127 /// `size_of::<T>()` is not multiple of 8
128 pub const SZE: ErrCode = ErrCode::new(0x16, "`size_of::<T>()` must be multiple of 8 bytes");
129
130 #[inline]
131 pub(in crate::fmmap) fn new_err<R, E: std::fmt::Display>(
132 code: ErrCode,
133 error: E,
134 ) -> FrozenResult<R> {
135 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, error);
136 Err(err)
137 }
138
139 #[inline]
140 pub(in crate::fmmap) fn new_err_default<R>(code: ErrCode) -> FrozenResult<R> {
141 let err = FrozenError::new_raw(*mid(), ERRDOMAIN, code, "");
142 Err(err)
143 }
144}
145
146/// Config for [`FrozenMMap`]
147#[derive(Debug, Clone)]
148pub struct FrozenMMapCfg {
149 /// Identifier used for error propagation by [`frozen_core::error::FrozenError`]
150 pub module_id: u8,
151
152 /// Number of slots to pre-allocate when [`FrozenMMap`] is initialized
153 ///
154 /// Each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`], where initial file length will
155 /// be `chunk_size * initial_count` (bytes)
156 pub initial_count: usize,
157
158 /// Time interval used by flusher tx, to batch write ops into a durable window and sync them
159 /// together, where all write ops in certain time interval falls into a single durable window
160 pub flush_duration: time::Duration,
161}
162
163/// Custom implementation of `mmap(2)`
164///
165/// ## Constraints
166///
167/// [`FrozenMMap`] treats the mapped file as raw storage for values of `T`. Because of that, `T`
168/// must be a POD type which is safe to persist and later reinterpret from bytes.
169///
170/// Required properties for `T`,
171///
172/// - Must use `#[repr(C)]`
173/// - Should be 8-bytes aligned
174/// - Must not implement [`Drop`]
175/// - `size_of::<T>()` should be multiple of `8`
176///
177/// *NOTE:* `T` must not contain heap owning or process-local pointers like [`Vec`], [`String`],
178/// [`Box`], references and function pointers, or other fields whose bit-pattern is not stable
179/// across reopen.
180///
181/// These constrains are enforced as [`FrozenMMap`] does not serialize or deserialize values. It
182/// directly reads and writes `T` inside a memory mapped file. That means `T` must have a stable
183/// layout and must remain valid when the file is reopened in a later process.
184///
185/// ## Example
186///
187/// ```
188/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
189///
190/// const MODULE_ID: u8 = 0;
191///
192/// let dir = tempfile::tempdir().unwrap();
193/// let path = dir.path().join("tmp_frozen_mmap");
194///
195/// let cfg = FrozenMMapCfg {
196/// module_id: MODULE_ID,
197/// initial_count: 0x0A,
198/// flush_duration: std::time::Duration::from_micros(0x96),
199/// };
200///
201/// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
202/// assert_eq!(mmap.total_slots(), 0x0A);
203///
204/// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
205/// let _ = futures::executor::block_on(ticket).unwrap();
206///
207/// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
208/// assert_eq!(val, 0xDEADC0DE);
209///
210/// drop(mmap);
211///
212/// let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x05).unwrap();
213/// assert_eq!(reopened.total_slots(), 0x0A + 0x05);
214///
215/// let val = unsafe { reopened.read(0, |v| *v) }.unwrap();
216/// assert_eq!(val, 0xDEADC0DE);
217/// ```
218#[derive(Debug)]
219pub struct FrozenMMap<T>
220where
221 T: Sized + Send + Sync,
222{
223 core: sync::Arc<Core>,
224 flush_tx_handle: Option<thread::JoinHandle<()>>,
225 _t: core::marker::PhantomData<T>,
226}
227
228unsafe impl<T> Send for FrozenMMap<T> where T: Sized + Send + Sync {}
229unsafe impl<T> Sync for FrozenMMap<T> where T: Sized + Send + Sync {}
230
231impl<T> FrozenMMap<T>
232where
233 T: Sized + Send + Sync,
234{
235 /// Memory space required for each slot of [`T`] in [`FrozenMMap`]
236 pub const SLOT_SIZE: usize = std::mem::size_of::<T>();
237
238 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`]
239 ///
240 /// ## Multiple Instances
241 ///
242 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
243 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
244 /// error will be thrown.
245 ///
246 /// ## Capacity Growth
247 ///
248 /// [`FrozenMMap`] does not support in-place growth of a live mapping, to increase capacity,
249 /// drop the current instance and reopen w/ [`FrozenMMap::open_grown`] which provides memory
250 /// mapping over grown capacity.
251 ///
252 /// ## [`FrozenMMapCfg`]
253 ///
254 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
255 ///
256 /// ## Working
257 ///
258 /// We first create a new [`FrozenFile`] if note already, then map the entire file using
259 /// `mmap(2)`, the entire file must read/write `T`, which also should stay constant for the
260 /// entire lifetime of file.
261 ///
262 /// ## Important
263 ///
264 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
265 /// which is used under the hood, one must use config stores like [`Rta`](https://crates.io/crates/rta)
266 /// to store config.
267 ///
268 /// ## Example
269 ///
270 /// ```
271 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
272 ///
273 /// const MODULE_ID: u8 = 0;
274 ///
275 /// let dir = tempfile::tempdir().unwrap();
276 /// let path = dir.path().join("tmp_frozen_mmap");
277 ///
278 /// let cfg = FrozenMMapCfg {
279 /// module_id: MODULE_ID,
280 /// initial_count: 0x0A,
281 /// flush_duration: std::time::Duration::from_micros(0x96),
282 /// };
283 ///
284 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
285 /// assert_eq!(mmap.total_slots(), 0x0A);
286 ///
287 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
288 /// let _ = futures::executor::block_on(ticket).unwrap();
289 ///
290 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
291 /// assert_eq!(val, 0xDEADC0DE);
292 /// ```
293 pub fn new<P: AsRef<std::path::Path>>(path: P, cfg: FrozenMMapCfg) -> FrozenResult<Self> {
294 Self::validate_t()?;
295 let (file, curr_length) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
296 let total_slots = curr_length / Self::SLOT_SIZE;
297
298 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
299 // guarantees that the first caller sets the value and all subsequent calls reuse it
300 let _ = err::MID.get_or_init(|| cfg.module_id);
301
302 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
303 let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
304
305 let cloned_core = sync::Arc::clone(&core);
306 let flush_tx_handle = match thread::Builder::new()
307 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
308 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
309 {
310 Ok(handle) => Some(handle),
311 Err(observed_error) => {
312 return err::new_err(err::FXE, observed_error);
313 }
314 };
315
316 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
317 }
318
319 /// Create a new [`FrozenMMap`] instance w/ given [`FrozenMMapCfg`], while growing the
320 /// underlying [`FrozenFile`] by `additional_slots` before creating memory mapping
321 ///
322 /// ## Multiple Instances
323 ///
324 /// For each [`FrozenMMap`] instance, we acquire an exclusive lock from the kernal for the
325 /// underlying [`FrozenFile`], when trying to create multiple instances of [`FrozenMMap`], an
326 /// error will be thrown.
327 ///
328 /// ## Why not create a [`FrozenMMap::grow`] call?
329 ///
330 /// Previously when [`FrozenMMap::grow`] was attempted, it was observed that, resizing an active
331 /// memory mapping in place is tricky in concurrent code, as some threads would still hold
332 /// stale/unmapped pointers to mmap due to preemption from the OS schedular
333 ///
334 /// So, instead of remmaping a live instance, the current API performs growth during open,
335 /// making capacity expansion an explicit lifecycle operation, and not a side effect on a live
336 /// instance.
337 ///
338 /// ## [`FrozenMMapCfg`]
339 ///
340 /// All configs for [`FrozenMMap`] are stored in [`FrozenMMapCfg`]
341 ///
342 /// ## Working
343 ///
344 /// We first create a new [`FrozenFile`] if note already, then we grow the file using
345 /// [`FrozenFile::grow`] then map the entire file using `mmap(2)`, the entire file must
346 /// read/write `T`, which also should stay constant for the entire lifetime of file.
347 ///
348 /// ## Important
349 ///
350 /// The `cfg` must not change any of its properties for the entire life of [`FrozenFile`],
351 /// which is used under the hood, one must use config stores like
352 /// [`Rta`](https://crates.io/crates/rta) to store config.
353 ///
354 /// ## Example
355 ///
356 /// ```
357 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
358 ///
359 /// const MODULE_ID: u8 = 0;
360 ///
361 /// let dir = tempfile::tempdir().unwrap();
362 /// let path = dir.path().join("tmp_frozen_mmap");
363 ///
364 /// let cfg = FrozenMMapCfg {
365 /// module_id: MODULE_ID,
366 /// initial_count: 0x0A,
367 /// flush_duration: std::time::Duration::from_micros(0x96),
368 /// };
369 ///
370 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x0A).unwrap();
371 /// assert_eq!(mmap.total_slots(), 0x0A * 2);
372 ///
373 /// let ticket = unsafe { mmap.write(0, |v| *v = 0xDEADC0DE) }.unwrap();
374 /// let _ = futures::executor::block_on(ticket).unwrap();
375 ///
376 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
377 /// assert_eq!(val, 0xDEADC0DE);
378 /// ```
379 pub fn new_grown<P: AsRef<std::path::Path>>(
380 path: P,
381 cfg: FrozenMMapCfg,
382 additional_slots: usize,
383 ) -> FrozenResult<Self> {
384 Self::validate_t()?;
385 let (file, _) = Self::open_file(path.as_ref().to_path_buf(), &cfg)?;
386
387 // we grow the underlying FrozenFile as requested
388 file.grow(additional_slots)?;
389 let curr_length = file.length()?; // we must read the updated (grown) length
390 let total_slots = curr_length / Self::SLOT_SIZE;
391
392 // NOTE: The value is used for error logging and is initialized only once, as `OnceLock`
393 // guarantees that the first caller sets the value and all subsequent calls reuse it
394 let _ = err::MID.get_or_init(|| cfg.module_id);
395
396 let mmap = unsafe { TMap::new(file.fd(), curr_length) }?;
397 let core = sync::Arc::new(Core::new(mmap, file, curr_length, total_slots));
398
399 let cloned_core = sync::Arc::clone(&core);
400 let flush_tx_handle = match thread::Builder::new()
401 .name(format!("mod{}_fmmap_flush_tx", cfg.module_id))
402 .spawn(move || bg_flush_thread(cloned_core, cfg.flush_duration))
403 {
404 Ok(handle) => Some(handle),
405 Err(observed_error) => {
406 return err::new_err(err::FXE, observed_error);
407 }
408 };
409
410 Ok(Self { core, flush_tx_handle, _t: core::marker::PhantomData })
411 }
412
413 /// Create/open a new [`FrozenFile`] instance
414 fn open_file(
415 path: std::path::PathBuf,
416 cfg: &FrozenMMapCfg,
417 ) -> FrozenResult<(FrozenFile, usize)> {
418 let ff_cfg = FrozenFileCfg {
419 path,
420 module_id: cfg.module_id,
421 buffer_size: Self::SLOT_SIZE,
422 initial_available_buffers: cfg.initial_count,
423 };
424
425 let file = FrozenFile::new(ff_cfg)?;
426 let curr_length = file.length()?;
427
428 Ok((file, curr_length))
429 }
430
431 #[inline]
432 fn validate_t() -> FrozenResult<()> {
433 if std::mem::needs_drop::<T>() {
434 return err::new_err_default(err::DRP);
435 }
436
437 let align = std::mem::align_of::<T>();
438 if align != 8 {
439 return err::new_err_default(err::ALN);
440 }
441
442 let size = std::mem::size_of::<T>();
443 if size == 0 {
444 return err::new_err_default(err::ZRO);
445 }
446
447 if size % 8 != 0 {
448 return err::new_err_default(err::SZE);
449 }
450
451 Ok(())
452 }
453
454 /// Read a `T` at given `index` via callback (`f`)
455 ///
456 /// ## Concurrency
457 ///
458 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
459 /// at same index is atomic and thread safe, while operations on different indices may proceed
460 /// fully in parallel
461 ///
462 /// ## Safety
463 ///
464 /// The caller must ensure following:
465 ///
466 /// - given `index` is within bounds
467 /// - underlying memory contains a valid instance of `T`
468 /// - provided callback `f` must not,
469 /// - write through the pointer
470 /// - store or leak pointer beyound there lifetime
471 ///
472 /// Violating any of the above may result in undefined behavior
473 ///
474 /// ## Example
475 ///
476 /// ```
477 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
478 ///
479 /// const MODULE_ID: u8 = 0;
480 ///
481 /// let dir = tempfile::tempdir().unwrap();
482 /// let path = dir.path().join("tmp_read_mmap");
483 ///
484 /// let cfg = FrozenMMapCfg {
485 /// module_id: MODULE_ID,
486 /// initial_count: 0x02,
487 /// flush_duration: std::time::Duration::from_micros(0x60),
488 /// };
489 ///
490 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
491 ///
492 /// let ticket = unsafe { mmap.write(0, |v| *v = 0x0A) }.unwrap();
493 /// let _ = futures::executor::block_on(ticket).unwrap();
494 ///
495 /// let val = unsafe { mmap.read(0, |v| *v) }.unwrap();
496 /// assert_eq!(val, 0x0A);
497 /// ```
498 #[inline(always)]
499 pub unsafe fn read<R>(&self, index: usize, f: impl FnOnce(*const T) -> R) -> FrozenResult<R> {
500 let offset = Self::SLOT_SIZE * index;
501 let _lock = self.core.locks.lock(index);
502
503 // NOTE: We do avoid acquiring io_lock for read ops to increase the throughput (under the
504 // assumption of OS guarantees visibility)
505
506 let ptr = unsafe { self.core.map.as_ptr(offset) };
507 Ok(f(ptr))
508 }
509
510 /// Write/update a `T` at given `index` via callback (`f`)
511 ///
512 /// ## Concurrency
513 ///
514 /// Internally, [`FrozenMMap`] implements per-slot locking, so concurrent reads and writes for
515 /// at same index is atomic and thread safe, while operations on different indices may proceed
516 /// fully in parallel
517 ///
518 /// ## Safety
519 ///
520 /// The caller must ensure following:
521 ///
522 /// - given `index` is within bounds
523 /// - underlying memory contains a valid instance of `T`
524 /// - provided callback `f` must not store or leak pointer beyound there lifetime
525 ///
526 /// Violating any of the above may result in undefined behavior
527 ///
528 /// ## Example
529 ///
530 /// ```
531 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
532 ///
533 /// const MODULE_ID: u8 = 0;
534 ///
535 /// let dir = tempfile::tempdir().unwrap();
536 /// let path = dir.path().join("tmp_write_mmap");
537 ///
538 /// let cfg = FrozenMMapCfg {
539 /// module_id: MODULE_ID,
540 /// initial_count: 0x02,
541 /// flush_duration: std::time::Duration::from_micros(0x96),
542 /// };
543 ///
544 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
545 ///
546 /// let ticket = unsafe {mmap.write(1, |v| *v = 0x2B) }.unwrap();
547 /// let _ = futures::executor::block_on(ticket).unwrap();
548 ///
549 /// let val = unsafe { mmap.read(1, |v| *v) }.unwrap();
550 /// assert_eq!(val, 0x2B);
551 /// ```
552 #[inline(always)]
553 pub unsafe fn write(
554 &self,
555 index: usize,
556 f: impl FnOnce(*mut T),
557 ) -> FrozenResult<ack::AckTicket> {
558 // propagate prev errors
559 if let Some(err) = self.core.completion.get_err() {
560 return Err(err);
561 }
562
563 let offset = Self::SLOT_SIZE * index;
564
565 let _guard = self.core.acquire_io_lock();
566 let _lock = self.core.locks.lock(index);
567
568 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
569 f(ptr);
570
571 self.core.dirty.store(true, atomic::Ordering::Release);
572 let epoch = self.core.completion.increment_current_epoch();
573
574 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
575 }
576
577 /// Read current available count of slots, where each slot has size of [`FrozenMMap::<T>::SLOT_SIZE`]
578 ///
579 /// ## Working
580 ///
581 /// This call performs a syscall to fetch current length of [`FrozenFile`] from fs, as the
582 /// current length of the file is not cached anywhere in the pipeline to avoid TOCTAU race
583 /// conditions
584 ///
585 /// ## Example
586 ///
587 /// ```
588 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
589 ///
590 /// const MODULE_ID: u8 = 0;
591 ///
592 /// let dir = tempfile::tempdir().unwrap();
593 /// let path = dir.path().join("tmp_grow_mmap");
594 ///
595 /// let cfg = FrozenMMapCfg {
596 /// module_id: MODULE_ID,
597 /// initial_count: 0x02,
598 /// flush_duration: std::time::Duration::from_micros(0x96),
599 /// };
600 ///
601 /// let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
602 /// assert_eq!(mmap.total_slots(), 0x02);
603 ///
604 /// drop(mmap);
605 ///
606 /// let mmap = FrozenMMap::<u64>::new_grown(&path, cfg, 0x03).unwrap();
607 /// assert_eq!(mmap.total_slots(), 0x02 + 0x03);
608 /// ```
609 #[inline]
610 pub fn total_slots(&self) -> usize {
611 self.core.curr_length / Self::SLOT_SIZE
612 }
613
614 /// Read the total memory footprint (in bytes) used by [`FrozenMMap`]
615 ///
616 /// *NOTE:* This is an approzimation of memory used, actual RSS may differ depending on paging
617 /// and OS
618 ///
619 /// ## Example
620 ///
621 /// ```
622 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
623 ///
624 /// const MODULE_ID: u8 = 0;
625 ///
626 /// let dir = tempfile::tempdir().unwrap();
627 /// let path = dir.path().join("tmp_mem_usage");
628 ///
629 /// let cfg = FrozenMMapCfg {
630 /// module_id: MODULE_ID,
631 /// initial_count: 0x10,
632 /// flush_duration: std::time::Duration::from_micros(0x60),
633 /// };
634 ///
635 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
636 ///
637 /// let bytes = mmap.memory_usage();
638 /// assert!(bytes >= mmap.total_slots() * std::mem::size_of::<u64>());
639 /// ```
640 #[inline]
641 pub fn memory_usage(&self) -> usize {
642 // memory of mem-maped region
643 let mmap_bytes = self.core.curr_length;
644
645 // memory used for locking
646 let lock_bytes = self.core.locks.0.len() * std::mem::size_of::<atomic::AtomicU8>();
647
648 mmap_bytes + lock_bytes
649 }
650
651 /// Create a new [`FrozenMMapTransaction`] context for grouping multi write ops into a single atomic
652 /// operation
653 ///
654 /// ## Overview
655 ///
656 /// The use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic
657 /// operation, hence creating a transactional write operation, which gives following guarantees:
658 ///
659 /// - All write ops succeed together
660 /// - Single epoch to track durability of all writes ops
661 /// - Same durability guarantee for all the included write ops
662 ///
663 /// Simply, this preserves atomic durability semantics for multi index updates
664 ///
665 /// ## Example
666 ///
667 /// ```
668 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
669 ///
670 /// const MODULE_ID: u8 = 0;
671 ///
672 /// let dir = tempfile::tempdir().unwrap();
673 /// let path = dir.path().join("tmp_tx");
674 ///
675 /// let cfg = FrozenMMapCfg {
676 /// module_id: MODULE_ID,
677 /// initial_count: 0x0A,
678 /// flush_duration: std::time::Duration::from_micros(50),
679 /// };
680 ///
681 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
682 ///
683 /// let mut tx = mmap.new_tx();
684 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
685 /// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
686 ///
687 /// let ticket = tx.commit().unwrap();
688 /// let _ = futures::executor::block_on(ticket).unwrap();
689 ///
690 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
691 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
692 ///
693 /// assert_eq!((v0, v1), (0x0A, 0x14));
694 /// ```
695 #[inline]
696 pub fn new_tx(&self) -> FrozenMMapTransaction<'_, T> {
697 FrozenMMapTransaction { core: &self.core, ops_vec: Vec::new() }
698 }
699
700 /// Delete the underlying [`FrozenFile`] used for [`FrozenMMap`] from fs
701 ///
702 /// ## Working
703 ///
704 /// When `delete` is called, all read, write, and (background) sync ops are paused
705 /// (indefinitely), whule deletion is done with following steps:
706 ///
707 /// - acquire an exclusive `io_lock` (all other ops are paused indefinitely)
708 /// - if any batch is pending for sync,
709 /// - swap the flag
710 /// - call sync manually
711 /// - incr epoch and update cv
712 /// - brodcast closing so flusher tx could wrap up
713 /// - `munmap(2)` current mapping
714 /// - call delete on [`FrozenFile`]
715 ///
716 /// ## Example
717 ///
718 /// ```
719 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
720 ///
721 /// const MODULE_ID: u8 = 0;
722 ///
723 /// let dir = tempfile::tempdir().unwrap();
724 /// let path = dir.path().join("tmp_delete_mmap");
725 ///
726 /// let cfg = FrozenMMapCfg {
727 /// module_id: MODULE_ID,
728 /// initial_count: 0x04,
729 /// flush_duration: std::time::Duration::from_micros(0x96),
730 /// };
731 ///
732 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
733 /// mmap.delete().unwrap();
734 /// assert!(!path.exists());
735 /// ```
736 pub fn delete(&mut self) -> FrozenResult<()> {
737 // NOTE: we must broadcast that the close is happening to allow flusher tx to wrap up
738 self.core.dirty.store(false, atomic::Ordering::Release);
739 self.core.closed.store(true, atomic::Ordering::Release);
740 self.core.durable_cv.notify_one();
741
742 if let Some(handle) = self.flush_tx_handle.take() {
743 let _ = handle.join();
744 }
745
746 // pause all new write ops
747 let _lock = self.core.acquire_exclusive_io_lock();
748
749 self.munmap()?;
750 self.core.file.delete()
751 }
752
753 /// Flush the dirty mmap data to persistant storage
754 ///
755 /// *NOTE:* This call must be paired w/ [`FrozenMMap::flush_file`] for stronger durability
756 /// guarantee
757 ///
758 /// ## Example
759 ///
760 /// ```
761 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
762 ///
763 /// const MODULE_ID: u8 = 0;
764 ///
765 /// let dir = tempfile::tempdir().unwrap();
766 /// let path = dir.path().join("tmp_delete_mmap");
767 ///
768 /// let cfg = FrozenMMapCfg {
769 /// module_id: MODULE_ID,
770 /// initial_count: 0x04,
771 /// flush_duration: std::time::Duration::from_micros(0x96),
772 /// };
773 ///
774 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
775 /// assert!(unsafe { mmap.flush_mmap() }.is_ok());
776 /// ```
777 #[inline]
778 pub unsafe fn flush_mmap(&self) -> FrozenResult<()> {
779 self.core.map.sync(self.core.curr_length)
780 }
781
782 /// Perform hard flush for the dirty mmap'ed data for stronger durability guarantee
783 ///
784 /// ## Example
785 ///
786 /// ```
787 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
788 ///
789 /// const MODULE_ID: u8 = 0;
790 ///
791 /// let dir = tempfile::tempdir().unwrap();
792 /// let path = dir.path().join("tmp_delete_mmap");
793 ///
794 /// let cfg = FrozenMMapCfg {
795 /// module_id: MODULE_ID,
796 /// initial_count: 0x04,
797 /// flush_duration: std::time::Duration::from_micros(0x96),
798 /// };
799 ///
800 /// let mut mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
801 /// assert!(unsafe { mmap.flush_file() }.is_ok());
802 /// ```
803 #[inline]
804 pub unsafe fn flush_file(&self) -> FrozenResult<()> {
805 self.core.file.sync()
806 }
807
808 #[inline]
809 fn munmap(&self) -> FrozenResult<()> {
810 let length = self.core.curr_length;
811 unsafe { self.core.map.unmap(length) }
812 }
813}
814
815impl<T> Drop for FrozenMMap<T>
816where
817 T: Sized + Send + Sync,
818{
819 fn drop(&mut self) {
820 let is_closed = self.core.closed.swap(true, atomic::Ordering::Release);
821 self.core.cv.notify_one(); // notify flusher tx to shut
822
823 if let Some(handle) = self.flush_tx_handle.take() {
824 let _ = handle.join();
825 }
826
827 // we must acquire an exclusive lock, to prevent dropping while sync, growing or any io ops
828 let _io_lock = self.core.acquire_exclusive_io_lock();
829
830 if !is_closed {
831 let _ = self.munmap();
832 }
833 }
834}
835
836impl<T> fmt::Display for FrozenMMap<T>
837where
838 T: Sized + Send + Sync,
839{
840 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841 write!(
842 f,
843 "FrozenMMap{{fd: {}, total_slots: {}, len: {}}}",
844 self.core.file.fd(),
845 self.total_slots(),
846 self.core.curr_length,
847 )
848 }
849}
850
851/// ## Why we ignore [`std::sync::PoisonError`]?
852///
853/// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
854/// protect any mutable state. All the pool invariants and accounting are maintained via atomics
855/// and are completely seperated from the mutex.
856///
857/// A poisoned mutex only indicates that another tx panicked while holding the lock, and indicates
858/// an inconsistent state of the protected value. Since no state can be left partially modified
859/// under this lock, there is no possible consistency risk to recover from and propagating the
860/// poison error would only introduce unnecessary failures into the allocation path.
861///
862/// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
863/// with the recovered guard.
864fn bg_flush_thread(core: sync::Arc<Core>, flush_duration: time::Duration) {
865 // init phase (acquiring locks)
866 let mut guard = core.lock.lock().unwrap_or_else(|e| e.into_inner());
867
868 // sync loop w/ non-busy waiting
869 loop {
870 (guard, _) = core.cv.wait_timeout(guard, flush_duration).unwrap_or_else(|e| e.into_inner());
871
872 // NOTE: we must read values of close brodcast before acquire exclusive lock, if done
873 // otherwise, we impose serious deadlock sort of situation for the the flusher tx
874
875 let dirty = core.dirty.swap(false, atomic::Ordering::AcqRel);
876 let closing = core.closed.load(atomic::Ordering::Acquire);
877
878 if !dirty {
879 if closing {
880 core.cv.notify_all();
881 return;
882 }
883
884 continue;
885 }
886
887 // INFO: we must acquire an exclusive IO lock for sync, hence no write, read or
888 // grow could kick in while sync is in progress
889
890 let io_lock = core.acquire_exclusive_io_lock();
891
892 // INFO: we must drop the guard before syscall, as its a blocking operation and holding
893 // the mutex while the syscall takes place is not a good idea, while we drop the mutex
894 // and acqurie it again, in-between other process could acquire it and use it
895 drop(guard);
896
897 // NOTE: We snapshot `current_epoch` before sync to establish a strict batch boundary,
898 // all writes up to `target_epoch` are guaranteed to be part of this flush window,
899 // while anything beyound belongs to the next batch
900 let target_epoch = core.completion.read_current_epoch();
901
902 // NOTE:
903 //
904 // - if sync fails, we update the Core::error w/ the received error object
905 // - we clear it up when another sync call succeeds
906 // - this is valid, as the underlying sync flushes entire mmaped region, hence
907 // even if the last call failed, and the new one succeeded, we do get the durability
908 // guarenty for the old data as well
909
910 if let Err(new_error) = core.sync() {
911 core.completion.set_err(new_error);
912 } else {
913 core.completion.mark_epoch_as_durable(target_epoch);
914 core.completion.del_err();
915 }
916
917 // NOTE: notify all listeners currently waiting for durability progress
918 core.completion.notify_all_listeners();
919
920 drop(io_lock);
921 guard = core.acquire_lock();
922 }
923}
924
925/// A context for grouping multi write ops into a single atomic operation
926///
927/// ## Overview
928///
929/// Use of [`FrozenMMapTransaction`] allows to group multiple write ops into a single atomic operation.
930///
931/// - All included writes are applied together
932/// - Single epoch is assinged for an entier transaction
933/// - Durability guarantee is same for all included write ops
934///
935/// ## Example
936///
937/// ```
938/// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
939///
940/// const MODULE_ID: u8 = 0;
941///
942/// let dir = tempfile::tempdir().unwrap();
943/// let path = dir.path().join("tmp_tx");
944///
945/// let cfg = FrozenMMapCfg {
946/// module_id: MODULE_ID,
947/// initial_count: 0x0A,
948/// flush_duration: std::time::Duration::from_micros(50),
949/// };
950///
951/// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
952///
953/// let mut tx = mmap.new_tx();
954/// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
955/// unsafe { tx.write(1, |v| *v = 0x14) }.unwrap();
956/// unsafe { tx.write(2, |v| *v = 0x18) }.unwrap();
957///
958/// let ticket = tx.commit().unwrap();
959/// let _ = futures::executor::block_on(ticket).unwrap();
960///
961/// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
962/// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
963/// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
964///
965/// assert_eq!((v0, v1, v2), (0x0A, 0x14, 0x18));
966/// ```
967pub struct FrozenMMapTransaction<'a, T> {
968 core: &'a Core,
969 ops_vec: Vec<(usize, Box<dyn FnOnce(*mut T) + 'a>)>,
970}
971
972impl<'a, T> FrozenMMapTransaction<'a, T> {
973 /// Append a write op into the [`FrozenMMapTransaction`]
974 ///
975 /// ## Requirements
976 ///
977 /// Write ops must follow these safety requirements,
978 ///
979 /// - No duplicate indices
980 /// - No out-of-order writes (must be incremental)
981 ///
982 /// Violating this constraint would result in [`FrozenError`]
983 ///
984 /// ## Safety
985 ///
986 /// Same safety requirements as [`FrozenMMap::write`] apply here
987 ///
988 /// ## Example
989 ///
990 /// ```
991 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
992 ///
993 /// const MODULE_ID: u8 = 0;
994 ///
995 /// let dir = tempfile::tempdir().unwrap();
996 /// let path = dir.path().join("tmp_tx");
997 ///
998 /// let cfg = FrozenMMapCfg {
999 /// module_id: MODULE_ID,
1000 /// initial_count: 0x10,
1001 /// flush_duration: std::time::Duration::from_micros(50),
1002 /// };
1003 ///
1004 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1005 ///
1006 /// let mut tx = mmap.new_tx();
1007 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1008 /// unsafe { tx.write(1, |v| *v = 0x0B) }.unwrap();
1009 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1010 ///
1011 /// let ticket = tx.commit().unwrap();
1012 /// let _ = futures::executor::block_on(ticket).unwrap();
1013 ///
1014 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1015 /// let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1016 /// let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1017 ///
1018 /// assert_eq!((v0, v1, v2), (0x0A, 0x0B, 0x0C));
1019 /// ```
1020 #[inline(always)]
1021 pub unsafe fn write<F>(&mut self, index: usize, f: F) -> FrozenResult<()>
1022 where
1023 F: FnOnce(*mut T) + 'a,
1024 {
1025 // NOTE:
1026 //
1027 // This check prevents a potential footgun! For a safe transaction all writes must be,
1028 // - ordered by index (either incr or decr)
1029 // - no multi writes on same index
1030 //
1031 // If any of these is violated, there is a certain risk of deadlock in multi tx env's
1032 if let Some((last_idx, _)) = self.ops_vec.last() {
1033 if index <= *last_idx {
1034 return err::new_err(
1035 err::HCF,
1036 "tx writes must be strictly ordered, with no more then single ops on given index",
1037 );
1038 }
1039 }
1040
1041 self.ops_vec.push((index, Box::new(f)));
1042 Ok(())
1043 }
1044
1045 /// Commit the transaction, applying all the writes ops, combined into a single atomic operation
1046 ///
1047 /// ## Guarantees
1048 ///
1049 /// - All writes are applied under a single epoch
1050 /// - All writes belong to the same durability batch
1051 /// - No interleaving with other transactions at epoch level
1052 ///
1053 /// ## Example
1054 ///
1055 /// ```
1056 /// use frozen_core::fmmap::{FrozenMMap, FrozenMMapCfg};
1057 ///
1058 /// const MODULE_ID: u8 = 0;
1059 ///
1060 /// let dir = tempfile::tempdir().unwrap();
1061 /// let path = dir.path().join("tmp_tx");
1062 ///
1063 /// let cfg = FrozenMMapCfg {
1064 /// module_id: MODULE_ID,
1065 /// initial_count: 0x10,
1066 /// flush_duration: std::time::Duration::from_micros(50),
1067 /// };
1068 ///
1069 /// let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1070 ///
1071 /// let mut tx = mmap.new_tx();
1072 /// unsafe { tx.write(0, |v| *v = 0x0A) }.unwrap();
1073 /// unsafe { tx.write(2, |v| *v = 0x0C) }.unwrap();
1074 ///
1075 /// let ticket = tx.commit().unwrap();
1076 /// let _ = futures::executor::block_on(ticket).unwrap();
1077 ///
1078 /// let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1079 /// let v1 = unsafe { mmap.read(2, |v| *v).unwrap() };
1080 ///
1081 /// assert_eq!((v0, v1), (0x0A, 0x0C));
1082 /// ```
1083 #[inline(always)]
1084 pub fn commit(self) -> FrozenResult<ack::AckTicket> {
1085 if let Some(err) = self.core.completion.get_err() {
1086 return Err(err);
1087 }
1088
1089 let _guard = self.core.acquire_io_lock();
1090
1091 // NOTE: we must acquire all locks beforehand, to make sure all the write go through
1092 let mut guards = Vec::with_capacity(self.ops_vec.len());
1093 for (idx, _) in &self.ops_vec {
1094 guards.push(self.core.locks.lock(*idx));
1095 }
1096
1097 for (idx, op) in self.ops_vec {
1098 let offset = idx * std::mem::size_of::<T>();
1099 let ptr = unsafe { self.core.map.as_mut_ptr(offset) };
1100 op(ptr);
1101 }
1102
1103 self.core.dirty.store(true, atomic::Ordering::Release);
1104 let epoch = self.core.completion.increment_current_epoch();
1105
1106 Ok(ack::AckTicket::new(epoch, self.core.completion.clone()))
1107 }
1108}
1109
1110#[derive(Debug)]
1111struct Core {
1112 completion: sync::Arc<ack::Completion>,
1113 cv: sync::Condvar,
1114 curr_length: usize,
1115 dirty: atomic::AtomicBool,
1116 io_lock: sync::RwLock<()>,
1117 map: TMap,
1118 locks: Locks,
1119 lock: sync::Mutex<()>,
1120 file: FrozenFile,
1121 durable_cv: sync::Condvar,
1122 closed: atomic::AtomicBool,
1123}
1124
1125unsafe impl Send for Core {}
1126unsafe impl Sync for Core {}
1127
1128impl Core {
1129 fn new(map: TMap, file: FrozenFile, curr_length: usize, total_slots: usize) -> Self {
1130 Self {
1131 map,
1132 file,
1133 curr_length,
1134 completion: sync::Arc::new(ack::Completion::default()),
1135 cv: sync::Condvar::new(),
1136 lock: sync::Mutex::new(()),
1137 io_lock: sync::RwLock::new(()),
1138 locks: Locks::new(total_slots),
1139 durable_cv: sync::Condvar::new(),
1140 dirty: atomic::AtomicBool::new(false),
1141 closed: atomic::AtomicBool::new(false),
1142 }
1143 }
1144
1145 #[inline]
1146 fn sync(&self) -> FrozenResult<()> {
1147 unsafe { self.map.sync(self.curr_length) }?;
1148 self.file.sync()
1149 }
1150
1151 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1152 #[inline]
1153 fn acquire_lock(&self) -> sync::MutexGuard<'_, ()> {
1154 self.lock.lock().unwrap_or_else(|e| e.into_inner())
1155 }
1156
1157 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1158 #[inline]
1159 fn acquire_io_lock(&self) -> sync::RwLockReadGuard<'_, ()> {
1160 self.io_lock.read().unwrap_or_else(|e| e.into_inner())
1161 }
1162
1163 /// NOTE: See [`bg_flush_thread`] implementation for rationale behind poison recovery
1164 #[inline]
1165 fn acquire_exclusive_io_lock(&self) -> sync::RwLockWriteGuard<'_, ()> {
1166 self.io_lock.write().unwrap_or_else(|e| e.into_inner())
1167 }
1168}
1169
1170#[derive(Debug)]
1171struct Locks(Box<[atomic::AtomicU8]>);
1172
1173impl Locks {
1174 const LOCK: u8 = 1;
1175 const UNLOCK: u8 = 0;
1176
1177 const L1_CONTENTION: usize = 0x10;
1178 const L2_CONTENTION: usize = 0x20;
1179
1180 fn new(cap: usize) -> Self {
1181 let mut slots = Vec::with_capacity(cap);
1182 for _ in 0..cap {
1183 slots.push(atomic::AtomicU8::new(Self::UNLOCK));
1184 }
1185
1186 Self(slots.into_boxed_slice())
1187 }
1188
1189 #[inline(always)]
1190 fn lock(&self, index: usize) -> LockGuard<'_> {
1191 let val = &self.0[index];
1192 let mut spins = 0;
1193
1194 loop {
1195 if val
1196 .compare_exchange_weak(
1197 Self::UNLOCK,
1198 Self::LOCK,
1199 atomic::Ordering::Acquire,
1200 atomic::Ordering::Relaxed,
1201 )
1202 .is_ok()
1203 {
1204 return LockGuard(val);
1205 }
1206
1207 // we must backoff under contention
1208
1209 // NOTE:
1210 //
1211 // We have three levels of backoff,
1212 //
1213 // 1. Busy waiting w/ CPU hint
1214 // 2. Willingly allow preemption by OS schedular
1215 // 3. Sleep the thread (increasing w/ exponenial factor)
1216
1217 if hints::likely(spins < Self::L1_CONTENTION) {
1218 std::hint::spin_loop();
1219 } else if spins < Self::L2_CONTENTION {
1220 thread::yield_now();
1221 } else {
1222 let ns = 0x30 << (spins - Self::L2_CONTENTION).min(0x0A);
1223 thread::sleep(time::Duration::from_nanos(ns));
1224 }
1225
1226 spins += 1;
1227 }
1228 }
1229}
1230
1231struct LockGuard<'a>(&'a atomic::AtomicU8);
1232
1233impl Drop for LockGuard<'_> {
1234 fn drop(&mut self) {
1235 self.0.store(Locks::UNLOCK, atomic::Ordering::Release);
1236 }
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241 use super::*;
1242
1243 // NOTE: we keep this small on purpose, so we won't have to wait at all in tests
1244 const FLUSH_DURATION: time::Duration = time::Duration::from_micros(10);
1245
1246 const MODULE_ID: u8 = 0;
1247 const INIT_SLOTS: usize = 0x0A;
1248
1249 fn new_tmp() -> (tempfile::TempDir, std::path::PathBuf, FrozenMMapCfg) {
1250 let dir = tempfile::tempdir().unwrap();
1251 let path = dir.path().join("tmp_map");
1252
1253 let cfg = FrozenMMapCfg {
1254 module_id: MODULE_ID,
1255 initial_count: INIT_SLOTS,
1256 flush_duration: FLUSH_DURATION,
1257 };
1258
1259 (dir, path, cfg)
1260 }
1261
1262 mod fm_lifecycle {
1263 use super::*;
1264
1265 #[test]
1266 fn ok_new() {
1267 let (_dir, path, cfg) = new_tmp();
1268 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1269
1270 assert!(!mmap.core.dirty.load(atomic::Ordering::Acquire));
1271 assert!(!mmap.core.closed.load(atomic::Ordering::Acquire));
1272 assert_eq!(mmap.core.curr_length, INIT_SLOTS * FrozenMMap::<u64>::SLOT_SIZE);
1273
1274 assert_eq!(mmap.core.completion.read_current_epoch(), 0);
1275 assert_eq!(mmap.core.completion.read_durable_epoch(), 0);
1276
1277 // satisfies the bg thread was spawned correctly
1278 assert!(mmap.core.completion.get_err().is_none());
1279
1280 // satisfies wait on epoch works
1281 let ticket = unsafe { mmap.write(0, |f| *f = 0x0A).unwrap() };
1282
1283 let ticket_epoch = ticket.epoch();
1284 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1285
1286 assert!(durable_epoch >= ticket_epoch);
1287 }
1288
1289 #[test]
1290 fn ok_new_existing() {
1291 let (_dir, path, cfg) = new_tmp();
1292
1293 // create new + close
1294 let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1295 drop(mmap1);
1296
1297 // open existing
1298 let mmap2 = FrozenMMap::<u64>::new(path, cfg).unwrap();
1299 drop(mmap2);
1300 }
1301
1302 #[test]
1303 fn err_new_when_change_in_cfg() {
1304 let (_dir, path, mut cfg) = new_tmp();
1305
1306 // create new + close
1307 let mmap1 = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1308 drop(mmap1);
1309
1310 // update cfg + opne existing
1311 cfg.initial_count = INIT_SLOTS * 2;
1312 assert!(FrozenMMap::<u64>::new(path, cfg).is_err());
1313 }
1314
1315 #[test]
1316 fn ok_delete() {
1317 let (_dir, path, cfg) = new_tmp();
1318 let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1319
1320 mmap.delete().unwrap();
1321 assert!(!mmap.core.file.exists().unwrap());
1322 }
1323
1324 #[test]
1325 fn err_delete_after_delete() {
1326 let (_dir, path, cfg) = new_tmp();
1327 let mut mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1328
1329 mmap.delete().unwrap();
1330 assert!(!mmap.core.file.exists().unwrap());
1331 assert!(mmap.delete().is_err());
1332 }
1333
1334 #[test]
1335 fn ok_drop_persists_when_dropped_before_bg_flush() {
1336 let (_dir, path, cfg) = new_tmp();
1337 const VAL: u64 = 0x0A;
1338
1339 {
1340 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1341 unsafe { mmap.write(0, |byte| *byte = VAL).unwrap() };
1342 drop(mmap);
1343 }
1344
1345 {
1346 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1347 let val = unsafe { mmap.read(0, |byte| *byte).unwrap() };
1348 assert_eq!(val, VAL);
1349 }
1350 }
1351 }
1352
1353 mod fm_validate_t {
1354 use super::*;
1355
1356 #[repr(C, align(8))]
1357 struct OkT {
1358 a: u64,
1359 b: u64,
1360 }
1361
1362 #[repr(C)]
1363 struct BadAlignT {
1364 a: u32,
1365 b: u32,
1366 }
1367
1368 #[repr(C, align(4))]
1369 struct BadSizeT {
1370 a: u32,
1371 b: u16,
1372 }
1373
1374 #[repr(C, align(8))]
1375 struct DropT(u64);
1376
1377 impl Drop for DropT {
1378 fn drop(&mut self) {}
1379 }
1380
1381 #[repr(C, align(8))]
1382 struct ZstT;
1383
1384 #[test]
1385 fn ok_validate_t() {
1386 assert!(FrozenMMap::<OkT>::validate_t().is_ok());
1387 }
1388
1389 #[test]
1390 fn err_validate_t_when_drop() {
1391 assert!(FrozenMMap::<DropT>::validate_t().is_err());
1392 }
1393
1394 #[test]
1395 fn err_validate_t_when_not_8_byte_aligned() {
1396 assert!(FrozenMMap::<BadAlignT>::validate_t().is_err());
1397 }
1398
1399 #[test]
1400 fn err_validate_t_when_size_not_multiple_of_8() {
1401 assert!(FrozenMMap::<BadSizeT>::validate_t().is_err());
1402 }
1403
1404 #[test]
1405 fn err_validate_t_when_zero_sized() {
1406 assert!(FrozenMMap::<ZstT>::validate_t().is_err());
1407 }
1408
1409 #[test]
1410 fn err_new_when_t_implements_drop() {
1411 let (_dir, path, cfg) = new_tmp();
1412 assert!(FrozenMMap::<DropT>::new(path, cfg).is_err());
1413 }
1414
1415 #[test]
1416 fn err_new_when_t_is_not_8_byte_aligned() {
1417 let (_dir, path, cfg) = new_tmp();
1418 assert!(FrozenMMap::<BadAlignT>::new(path, cfg).is_err());
1419 }
1420
1421 #[test]
1422 fn err_new_when_t_size_is_not_multiple_of_8() {
1423 let (_dir, path, cfg) = new_tmp();
1424 assert!(FrozenMMap::<BadSizeT>::new(path, cfg).is_err());
1425 }
1426
1427 #[test]
1428 fn err_new_when_t_is_zero_sized() {
1429 let (_dir, path, cfg) = new_tmp();
1430 assert!(FrozenMMap::<ZstT>::new(path, cfg).is_err());
1431 }
1432
1433 #[test]
1434 fn err_new_grown_when_t_implements_drop() {
1435 let (_dir, path, cfg) = new_tmp();
1436 assert!(FrozenMMap::<DropT>::new_grown(path, cfg, 1).is_err());
1437 }
1438
1439 #[test]
1440 fn err_new_grown_when_t_is_not_8_byte_aligned() {
1441 let (_dir, path, cfg) = new_tmp();
1442 assert!(FrozenMMap::<BadAlignT>::new_grown(path, cfg, 1).is_err());
1443 }
1444
1445 #[test]
1446 fn err_new_grown_when_t_size_is_not_multiple_of_8() {
1447 let (_dir, path, cfg) = new_tmp();
1448 assert!(FrozenMMap::<BadSizeT>::new_grown(path, cfg, 1).is_err());
1449 }
1450
1451 #[test]
1452 fn err_new_grown_when_t_is_zero_sized() {
1453 let (_dir, path, cfg) = new_tmp();
1454 assert!(FrozenMMap::<ZstT>::new_grown(path, cfg, 1).is_err());
1455 }
1456 }
1457
1458 mod fm_new_grown {
1459 use super::*;
1460
1461 #[test]
1462 fn ok_new_grown_updates_length() {
1463 let (_dir, path, cfg) = new_tmp();
1464
1465 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1466 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1467 drop(mmap);
1468
1469 let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).unwrap();
1470 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x0A);
1471 assert_eq!(mmap.core.curr_length, (INIT_SLOTS + 0x0A) * FrozenMMap::<u64>::SLOT_SIZE);
1472 }
1473
1474 #[test]
1475 fn err_new_grown_with_preexisting_instance() {
1476 let (_dir, path, cfg) = new_tmp();
1477
1478 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1479 assert_eq!(mmap.total_slots(), INIT_SLOTS);
1480
1481 assert!(FrozenMMap::<u64>::new_grown(path, cfg, 0x0A).is_err());
1482 }
1483
1484 #[test]
1485 fn ok_new_grown_cycle() {
1486 let (_dir, path, cfg) = new_tmp();
1487
1488 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1489 drop(mmap);
1490
1491 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1492 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x100);
1493 drop(mmap);
1494
1495 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x100).unwrap();
1496 assert_eq!(mmap.total_slots(), INIT_SLOTS + (2 * 0x100));
1497 drop(mmap);
1498
1499 let mmap = FrozenMMap::<u64>::new_grown(path, cfg, 0x100).unwrap();
1500 assert_eq!(mmap.total_slots(), INIT_SLOTS + (3 * 0x100));
1501 }
1502
1503 #[test]
1504 fn ok_write_reopen_grown_read() {
1505 let (_dir, path, cfg) = new_tmp();
1506
1507 {
1508 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1509 unsafe { mmap.write(0, |v| *v = 0xAA).unwrap() };
1510 }
1511
1512 {
1513 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1514 unsafe { mmap.write(0, |v| *v = 0xBB).unwrap() };
1515
1516 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1517 assert_eq!(val, 0xBB);
1518 }
1519 }
1520
1521 #[test]
1522 fn ok_write_reopen_grown_read_cycle() {
1523 let (_dir, path, cfg) = new_tmp();
1524
1525 {
1526 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1527 unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1528 }
1529
1530 for i in 0..2 {
1531 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1532 let idx = mmap.total_slots() - 1;
1533 unsafe { mmap.write(idx, |v| *v = (i + 2) as u64).unwrap() };
1534 drop(mmap);
1535 }
1536
1537 let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1538
1539 let base = unsafe { mmap.read(0, |v| *v).unwrap() };
1540 assert_eq!(base, 1);
1541
1542 let last_idx = mmap.total_slots() - 1;
1543 let last = unsafe { mmap.read(last_idx, |v| *v).unwrap() };
1544 assert_eq!(last, 3);
1545 }
1546
1547 #[test]
1548 fn err_new_grown_while_previous_instance_is_alive() {
1549 let (_dir, path, cfg) = new_tmp();
1550
1551 let _mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1552 let reopened = FrozenMMap::<u64>::new_grown(&path, cfg, 0x10);
1553
1554 assert!(reopened.is_err());
1555 }
1556 }
1557
1558 mod fm_write_read {
1559 use super::*;
1560
1561 #[test]
1562 fn ok_write_wait_read_cycle() {
1563 const VAL: u64 = 0xDEADC0DE;
1564 let (_dir, path, cfg) = new_tmp();
1565 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1566
1567 // write + sync
1568 let ticket = unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1569
1570 let ticket_epoch = ticket.epoch();
1571 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1572
1573 assert!(durable_epoch >= ticket_epoch);
1574
1575 // read + verify
1576 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1577 assert_eq!(val, VAL);
1578 }
1579
1580 #[test]
1581 fn ok_write_read_without_wait() {
1582 const VAL: u64 = 0xDEADC0DE;
1583 let (_dir, path, cfg) = new_tmp();
1584 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1585
1586 unsafe { mmap.write(0, |ptr| *ptr = VAL).unwrap() };
1587 let val = unsafe { mmap.read(0, |ptr| *ptr).unwrap() };
1588 assert_eq!(val, VAL);
1589 }
1590 }
1591
1592 mod fm_tx {
1593 use super::*;
1594
1595 #[test]
1596 fn ok_tx_basic_multi_write() {
1597 let (_dir, path, cfg) = new_tmp();
1598 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1599
1600 let mut tx = mmap.new_tx();
1601 unsafe {
1602 tx.write(0, |v| *v = 1).unwrap();
1603 tx.write(1, |v| *v = 2).unwrap();
1604 tx.write(2, |v| *v = 3).unwrap();
1605 }
1606
1607 let ticket = tx.commit().unwrap();
1608
1609 let ticket_epoch = ticket.epoch();
1610 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1611
1612 assert!(durable_epoch >= ticket_epoch);
1613
1614 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1615 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1616 let v2 = unsafe { mmap.read(2, |v| *v).unwrap() };
1617
1618 assert_eq!((v0, v1, v2), (1, 2, 3));
1619 }
1620
1621 #[test]
1622 fn ok_tx_single_epoch() {
1623 let (_dir, path, cfg) = new_tmp();
1624 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1625
1626 let mut tx = mmap.new_tx();
1627 unsafe {
1628 tx.write(0, |v| *v = 0x0A).unwrap();
1629 tx.write(1, |v| *v = 0x14).unwrap();
1630 }
1631
1632 // NOTE: next write must have strictly higher epoch then current one
1633
1634 let ticket = tx.commit().unwrap();
1635 let next_ticket = unsafe { mmap.write(2, |v| *v = 0x1E).unwrap() };
1636
1637 assert!(next_ticket.epoch() > ticket.epoch());
1638 }
1639
1640 #[test]
1641 fn err_tx_out_of_order() {
1642 let (_dir, path, cfg) = new_tmp();
1643 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1644
1645 let mut tx = mmap.new_tx();
1646 unsafe {
1647 tx.write(2, |v| *v = 1).unwrap();
1648 let res = tx.write(1, |v| *v = 2);
1649 assert!(res.is_err());
1650 }
1651 }
1652
1653 #[test]
1654 fn err_tx_duplicate_index() {
1655 let (_dir, path, cfg) = new_tmp();
1656 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1657
1658 let mut tx = mmap.new_tx();
1659 unsafe {
1660 tx.write(1, |v| *v = 1).unwrap();
1661 let res = tx.write(1, |v| *v = 2);
1662 assert!(res.is_err());
1663 }
1664 }
1665
1666 #[test]
1667 fn ok_tx_concurrent_non_overlapping() {
1668 let (_dir, path, cfg) = new_tmp();
1669 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1670
1671 let mut handles = Vec::new();
1672 for i in 0..2 {
1673 let mmap = mmap.clone();
1674 handles.push(thread::spawn(move || {
1675 let mut tx = mmap.new_tx();
1676
1677 unsafe {
1678 tx.write(i * 2, |v| *v = i as u64).unwrap();
1679 tx.write(i * 2 + 1, |v| *v = i as u64).unwrap();
1680 }
1681
1682 let ticket = tx.commit().unwrap();
1683
1684 let ticket_epoch = ticket.epoch();
1685 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1686
1687 assert!(durable_epoch >= ticket_epoch);
1688 }));
1689 }
1690
1691 for h in handles {
1692 h.join().unwrap();
1693 }
1694
1695 for i in 0..2 {
1696 let v0 = unsafe { mmap.read(i * 2, |v| *v).unwrap() };
1697 let v1 = unsafe { mmap.read(i * 2 + 1, |v| *v).unwrap() };
1698
1699 assert_eq!((v0, v1), (i as u64, i as u64));
1700 }
1701 }
1702
1703 #[test]
1704 fn ok_tx_overwrite_last_wins() {
1705 let (_dir, path, cfg) = new_tmp();
1706 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1707
1708 let mut tx = mmap.new_tx();
1709 unsafe {
1710 tx.write(0, |v| *v = 1).unwrap();
1711 }
1712
1713 tx.commit().unwrap();
1714
1715 let mut tx2 = mmap.new_tx();
1716 unsafe {
1717 tx2.write(0, |v| *v = 2).unwrap();
1718 }
1719
1720 let ticket = tx2.commit().unwrap();
1721
1722 let ticket_epoch = ticket.epoch();
1723 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1724
1725 assert!(durable_epoch >= ticket_epoch);
1726
1727 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1728 assert_eq!(val, 2);
1729 }
1730
1731 #[test]
1732 fn ok_tx_persists_across_reopen() {
1733 let (_dir, path, cfg) = new_tmp();
1734
1735 {
1736 let mmap = FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap();
1737
1738 let mut tx = mmap.new_tx();
1739 unsafe {
1740 tx.write(0, |v| *v = 0x3A).unwrap();
1741 tx.write(1, |v| *v = 0x54).unwrap();
1742 }
1743
1744 let ticket = tx.commit().unwrap();
1745
1746 let ticket_epoch = ticket.epoch();
1747 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1748
1749 assert!(durable_epoch >= ticket_epoch);
1750 }
1751
1752 {
1753 let mmap = FrozenMMap::<u64>::new(&path, cfg).unwrap();
1754
1755 let v0 = unsafe { mmap.read(0, |v| *v).unwrap() };
1756 let v1 = unsafe { mmap.read(1, |v| *v).unwrap() };
1757
1758 assert_eq!((v0, v1), (0x3A, 0x54));
1759 }
1760 }
1761 }
1762
1763 mod fm_durability {
1764 use super::*;
1765
1766 #[test]
1767 fn ok_wait_then_drop() {
1768 let (_dir, path, cfg) = new_tmp();
1769 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1770
1771 let ticket = unsafe { mmap.write(0, |v| *v = 7).unwrap() };
1772
1773 let ticket_epoch = ticket.epoch();
1774 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1775
1776 assert!(durable_epoch >= ticket_epoch);
1777
1778 drop(mmap);
1779 }
1780
1781 #[test]
1782 fn ok_epoch_monotonicity() {
1783 let (_dir, path, cfg) = new_tmp();
1784 let mmap = FrozenMMap::<u64>::new(path, cfg).unwrap();
1785
1786 let t1 = unsafe { mmap.write(0, |v| *v = 1).unwrap() };
1787 let t2 = unsafe { mmap.write(0, |v| *v = 2).unwrap() };
1788
1789 let durable_epoch = futures::executor::block_on(t2).unwrap();
1790 assert!(durable_epoch >= t1.epoch());
1791 }
1792
1793 #[test]
1794 fn ok_wait_for_durability_with_multi_writers() {
1795 let (_dir, path, cfg) = new_tmp();
1796 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1797
1798 let mut handles = Vec::new();
1799 for _ in 0..2 {
1800 let mmap = mmap.clone();
1801 handles.push(thread::spawn(move || {
1802 let ticket = unsafe { mmap.write(0, |v| *v += 1).unwrap() };
1803
1804 let ticket_epoch = ticket.epoch();
1805 let durable_epoch = futures::executor::block_on(ticket).unwrap();
1806
1807 assert!(durable_epoch >= ticket_epoch);
1808 }));
1809 }
1810
1811 for h in handles {
1812 h.join().unwrap();
1813 }
1814
1815 let val = unsafe { mmap.read(0, |v| *v).unwrap() };
1816 assert_eq!(val, 2);
1817 }
1818 }
1819
1820 mod fm_concurrency {
1821 use super::*;
1822
1823 #[test]
1824 fn ok_parallel_reads_with_diff_index() {
1825 let (_dir, path, cfg) = new_tmp();
1826 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1827
1828 unsafe { mmap.write(0, |v| *v = 0x10).unwrap() };
1829 unsafe { mmap.write(1, |v| *v = 0x20).unwrap() };
1830
1831 let t1 = {
1832 let mmap = mmap.clone();
1833 thread::spawn(move || unsafe { mmap.read(0, |v| *v).unwrap() })
1834 };
1835
1836 let t2 = {
1837 let mmap = mmap.clone();
1838 thread::spawn(move || unsafe { mmap.read(1, |v| *v).unwrap() })
1839 };
1840
1841 assert_eq!(t1.join().unwrap(), 0x10);
1842 assert_eq!(t2.join().unwrap(), 0x20);
1843 }
1844
1845 #[test]
1846 fn ok_multi_tx_drop_then_reopen_grown() {
1847 let (_dir, path, cfg) = new_tmp();
1848
1849 {
1850 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(&path, cfg.clone()).unwrap());
1851
1852 let mut handles = Vec::new();
1853 for i in 0..2u64 {
1854 let mmap = mmap.clone();
1855 handles.push(thread::spawn(move || {
1856 let _ = unsafe { mmap.write(i as usize, |v| *v = i + 1).unwrap() };
1857 }));
1858 }
1859
1860 for i in 0..2usize {
1861 let mmap = mmap.clone();
1862 handles.push(thread::spawn(move || {
1863 let _ = unsafe { mmap.read(i, |v| *v).unwrap() };
1864 }));
1865 }
1866
1867 for h in handles {
1868 h.join().unwrap();
1869 }
1870 }
1871
1872 {
1873 let mmap = FrozenMMap::<u64>::new_grown(&path, cfg.clone(), 0x10).unwrap();
1874 assert_eq!(mmap.total_slots(), INIT_SLOTS + 0x10);
1875
1876 for i in 0..2u64 {
1877 let val = unsafe { mmap.read(i as usize, |v| *v).unwrap() };
1878 assert_eq!(val, i + 1);
1879 }
1880
1881 let idx = mmap.total_slots() - 1;
1882 unsafe { mmap.write(idx, |v| *v = 0xDEAD).unwrap() };
1883
1884 let val = unsafe { mmap.read(idx, |v| *v).unwrap() };
1885 assert_eq!(val, 0xDEAD);
1886 }
1887 }
1888 }
1889
1890 mod ack_ticket {
1891 use super::*;
1892
1893 #[test]
1894 fn ok_parallel_waiters_same_epoch_window() {
1895 let (_dir, path, cfg) = new_tmp();
1896 let mmap = sync::Arc::new(FrozenMMap::<u64>::new(path, cfg).unwrap());
1897
1898 let barrier = sync::Arc::new(sync::Barrier::new(0x10));
1899
1900 let mut handles = Vec::new();
1901 for i in 0..0x10usize {
1902 let mmap = mmap.clone();
1903 let barrier = barrier.clone();
1904
1905 handles.push(thread::spawn(move || {
1906 barrier.wait();
1907
1908 let ticket = unsafe { mmap.write(i % INIT_SLOTS, |v| *v = i as u64).unwrap() };
1909
1910 let epoch = ticket.epoch();
1911 let durable = futures::executor::block_on(ticket).unwrap();
1912
1913 assert!(durable >= epoch);
1914 }));
1915 }
1916
1917 for handle in handles {
1918 handle.join().expect("worker thread panicked");
1919 }
1920 }
1921
1922 #[test]
1923 fn ok_parallel_waiters_same_epoch() {
1924 let completion = sync::Arc::new(ack::Completion::default());
1925
1926 let mut handles = Vec::new();
1927 for _ in 0..0x10 {
1928 let completion = completion.clone();
1929
1930 handles.push(thread::spawn(move || {
1931 let ticket = ack::AckTicket::new(1, completion);
1932
1933 assert_eq!(
1934 futures::executor::block_on(ticket).expect("ticket must complete"),
1935 1
1936 );
1937 }));
1938 }
1939
1940 thread::sleep(time::Duration::from_millis(0x0A));
1941
1942 completion.mark_epoch_as_durable(1);
1943 completion.notify_all_listeners();
1944
1945 for handle in handles {
1946 handle.join().expect("worker thread panicked");
1947 }
1948 }
1949 }
1950}