use crate::errors::{MmapIoError, Result};
use crate::mmap::{MapVariant, MemoryMappedFile};
use memmap2::MmapMut;
use parking_lot::RwLockReadGuard;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicU64};
/// Lock state carried by a view for as long as the view exists.
// `dead_code` is allowed because the guard stored in `Locked` is never read in
// this file; it is kept only so that the read lock is released when the view
// (and therefore the guard) is dropped.
#[allow(dead_code)]
enum ViewGuard<'a> {
/// Owns a read guard on the lock around an `MmapMut`; produced for `MapVariant::Rw`.
Locked(RwLockReadGuard<'a, MmapMut>),
/// No lock is held; produced for `MapVariant::Ro` and `MapVariant::Cow`.
None,
}
/// A view of a single value of type `T` that lives inside a memory mapping.
///
/// The view dereferences to `&T`. When it was created from a read-write
/// mapping it also owns a read guard on that mapping's lock, which is released
/// when the view is dropped.
pub struct AtomicView<'a, T> {
// Lock guard (if any) kept alive for the whole lifetime of the view.
_guard: ViewGuard<'a>,
// Address of the value inside the mapping.
ptr: *const T,
// Makes the view behave, for lifetime purposes, as if it stored a `&'a T`.
_marker: PhantomData<&'a T>,
}
// SAFETY: a view only ever exposes `&T` (through `Deref`), so moving it to or
// sharing it with another thread amounts to sharing `&T`, which is sound
// whenever `T: Sync`.
// NOTE(review): a view may also own a `parking_lot` read guard, and these impls
// assert that it is acceptable to release that guard from a different thread.
// `parking_lot` itself only marks its guards `Send` when the `send_guard`
// feature is enabled - confirm this is intended.
unsafe impl<T> Send for AtomicView<'_, T> where T: Sync {}
unsafe impl<T> Sync for AtomicView<'_, T> where T: Sync {}
impl<T> Deref for AtomicView<'_, T> {
    type Target = T;

    /// Borrows the value the view points at.
    fn deref(&self) -> &Self::Target {
        let raw: *const T = self.ptr;
        // SAFETY: the constructors in this file store `ptr` only after checking
        // that the offset is aligned for `T` and that the value lies within the
        // mapping's current length; `_guard` is kept alongside the pointer for
        // as long as the view exists.
        unsafe { &*raw }
    }
}
/// A view of `len` consecutive values of type `T` that live inside a memory mapping.
///
/// The view dereferences to `&[T]`. When it was created from a read-write
/// mapping it also owns a read guard on that mapping's lock, which is released
/// when the view is dropped.
pub struct AtomicSliceView<'a, T> {
// Lock guard (if any) kept alive for the whole lifetime of the view.
_guard: ViewGuard<'a>,
// Address of the first element inside the mapping.
ptr: *const T,
// Number of elements (not bytes) reachable from `ptr`.
len: usize,
// Makes the view behave, for lifetime purposes, as if it stored a `&'a [T]`.
_marker: PhantomData<&'a [T]>,
}
// SAFETY: a slice view only ever exposes `&[T]` (through `Deref`), so moving it
// to or sharing it with another thread amounts to sharing `&[T]`, which is
// sound whenever `T: Sync`.
// NOTE(review): a view may also own a `parking_lot` read guard, and these impls
// assert that it is acceptable to release that guard from a different thread.
// `parking_lot` itself only marks its guards `Send` when the `send_guard`
// feature is enabled - confirm this is intended.
unsafe impl<T> Send for AtomicSliceView<'_, T> where T: Sync {}
unsafe impl<T> Sync for AtomicSliceView<'_, T> where T: Sync {}
impl<T> Deref for AtomicSliceView<'_, T> {
    type Target = [T];

    /// Borrows the `len` elements the view points at as a slice.
    fn deref(&self) -> &Self::Target {
        let (first, count) = (self.ptr, self.len);
        // SAFETY: the constructors in this file store `ptr`/`len` only after
        // checking that the offset is aligned for `T` and that all `len`
        // elements lie within the mapping's current length; `_guard` is kept
        // alongside the pointer for as long as the view exists.
        unsafe { std::slice::from_raw_parts(first, count) }
    }
}
/// Returns the address of the first mapped byte together with whatever guard
/// must accompany pointers derived from it.
///
/// For `MapVariant::Rw` the mapping sits behind a lock, so a read guard is
/// taken and handed back inside `ViewGuard::Locked`. `MapVariant::Ro` and
/// `MapVariant::Cow` are accessed directly and yield `ViewGuard::None`.
fn base_ptr_and_guard(mapping: &MemoryMappedFile) -> (*const u8, ViewGuard<'_>) {
    match &mapping.inner.map {
        MapVariant::Rw(lock) => {
            // The pointer is read through the guard, then the guard itself is
            // moved into the result so the lock stays held by the caller.
            let held = lock.read();
            (held.as_ptr(), ViewGuard::Locked(held))
        }
        MapVariant::Ro(map) => (map.as_ptr(), ViewGuard::None),
        MapVariant::Cow(map) => (map.as_ptr(), ViewGuard::None),
    }
}
/// Validates `offset`/`count` for elements of type `T` and resolves the address
/// of the first element inside `mapping`.
///
/// Every `atomic_*` accessor funnels through this helper so that they all apply
/// the same checks, in the same order:
///
/// 1. `offset` must be a multiple of `align_of::<T>()`, otherwise
///    `MmapIoError::Misaligned` is returned.
/// 2. `offset + count * size_of::<T>()` (computed with saturating arithmetic)
///    must not exceed `mapping.current_len()`, otherwise
///    `MmapIoError::OutOfBounds` is returned. An error from `current_len()`
///    itself is propagated unchanged.
/// 3. `offset` must fit in `usize`, otherwise `MmapIoError::OutOfBounds`.
///
/// On success the element pointer is returned together with the guard obtained
/// from `base_ptr_and_guard`; the caller must store that guard in the view it
/// builds so the pointer is never used without it.
#[cfg(feature = "atomic")]
fn locate_atomics<T>(
    mapping: &MemoryMappedFile,
    offset: u64,
    count: usize,
) -> Result<(*const T, ViewGuard<'_>)> {
    let align = std::mem::align_of::<T>() as u64;
    let elem_size = std::mem::size_of::<T>() as u64;

    if offset % align != 0 {
        return Err(MmapIoError::Misaligned {
            required: align,
            offset,
        });
    }

    // `usize` is never wider than 64 bits, so the widening cast is lossless.
    // Saturation makes an absurdly large request fail the bounds check below
    // instead of wrapping around.
    let span = elem_size.saturating_mul(count as u64);
    let total = mapping.current_len()?;
    let out_of_bounds = || MmapIoError::OutOfBounds {
        offset,
        len: span,
        total,
    };
    if offset.saturating_add(span) > total {
        return Err(out_of_bounds());
    }
    let start: usize = offset.try_into().map_err(|_| out_of_bounds())?;

    // NOTE(review): the length is read before the guard is taken; if the
    // mapping can shrink concurrently between these two steps the bounds check
    // above may be stale - confirm against the resize implementation.
    let (base, guard) = base_ptr_and_guard(mapping);

    // SAFETY: `start + span <= total` was verified above, so the offset pointer
    // stays inside the mapped region as reported by `current_len()`. `offset`
    // is a multiple of `align_of::<T>()`; the base address is assumed to be at
    // least that aligned (true when the mapping starts on a page boundary).
    // NOTE(review): confirm mappings are never created at an unaligned base.
    let ptr = unsafe { base.add(start) }.cast::<T>();
    Ok((ptr, guard))
}

impl MemoryMappedFile {
    /// Returns a view of the `AtomicU64` stored at byte `offset` of the mapping.
    ///
    /// For a read-write mapping the returned view holds the mapping's read lock
    /// until it is dropped.
    ///
    /// # Errors
    ///
    /// * `MmapIoError::Misaligned` if `offset` is not a multiple of
    ///   `align_of::<AtomicU64>()`.
    /// * `MmapIoError::OutOfBounds` if the value would extend past the current
    ///   length of the mapping.
    /// * Any error returned by `current_len()`.
    #[cfg(feature = "atomic")]
    pub fn atomic_u64(&self, offset: u64) -> Result<AtomicView<'_, AtomicU64>> {
        let (ptr, guard) = locate_atomics::<AtomicU64>(self, offset, 1)?;
        Ok(AtomicView {
            _guard: guard,
            ptr,
            _marker: PhantomData,
        })
    }

    /// Returns a view of the `AtomicU32` stored at byte `offset` of the mapping.
    ///
    /// For a read-write mapping the returned view holds the mapping's read lock
    /// until it is dropped.
    ///
    /// # Errors
    ///
    /// * `MmapIoError::Misaligned` if `offset` is not a multiple of
    ///   `align_of::<AtomicU32>()`.
    /// * `MmapIoError::OutOfBounds` if the value would extend past the current
    ///   length of the mapping.
    /// * Any error returned by `current_len()`.
    #[cfg(feature = "atomic")]
    pub fn atomic_u32(&self, offset: u64) -> Result<AtomicView<'_, AtomicU32>> {
        let (ptr, guard) = locate_atomics::<AtomicU32>(self, offset, 1)?;
        Ok(AtomicView {
            _guard: guard,
            ptr,
            _marker: PhantomData,
        })
    }

    /// Returns a view of `count` consecutive `AtomicU64` values starting at
    /// byte `offset` of the mapping.
    ///
    /// For a read-write mapping the returned view holds the mapping's read lock
    /// until it is dropped.
    ///
    /// # Errors
    ///
    /// * `MmapIoError::Misaligned` if `offset` is not a multiple of
    ///   `align_of::<AtomicU64>()`.
    /// * `MmapIoError::OutOfBounds` if the `count` values would extend past the
    ///   current length of the mapping; the reported `len` is the requested
    ///   size in bytes.
    /// * Any error returned by `current_len()`.
    #[cfg(feature = "atomic")]
    pub fn atomic_u64_slice(
        &self,
        offset: u64,
        count: usize,
    ) -> Result<AtomicSliceView<'_, AtomicU64>> {
        let (ptr, guard) = locate_atomics::<AtomicU64>(self, offset, count)?;
        Ok(AtomicSliceView {
            _guard: guard,
            ptr,
            len: count,
            _marker: PhantomData,
        })
    }

    /// Returns a view of `count` consecutive `AtomicU32` values starting at
    /// byte `offset` of the mapping.
    ///
    /// For a read-write mapping the returned view holds the mapping's read lock
    /// until it is dropped.
    ///
    /// # Errors
    ///
    /// * `MmapIoError::Misaligned` if `offset` is not a multiple of
    ///   `align_of::<AtomicU32>()`.
    /// * `MmapIoError::OutOfBounds` if the `count` values would extend past the
    ///   current length of the mapping; the reported `len` is the requested
    ///   size in bytes.
    /// * Any error returned by `current_len()`.
    #[cfg(feature = "atomic")]
    pub fn atomic_u32_slice(
        &self,
        offset: u64,
        count: usize,
    ) -> Result<AtomicSliceView<'_, AtomicU32>> {
        let (ptr, guard) = locate_atomics::<AtomicU32>(self, offset, count)?;
        Ok(AtomicSliceView {
            _guard: guard,
            ptr,
            len: count,
            _marker: PhantomData,
        })
    }
}
// Unit tests for the atomic view accessors. Each test is additionally gated on
// the `atomic` feature, matching the gating of the methods under test.
#[cfg(test)]
mod tests {
use super::*;
use crate::create_mmap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
// Builds a path inside the system temp directory whose file name embeds `name`
// and the current process id.
fn tmp_path(name: &str) -> PathBuf {
let mut p = std::env::temp_dir();
p.push(format!(
"mmap_io_atomic_test_{}_{}",
name,
std::process::id()
));
p
}
// Single `AtomicU64` views: store/load round-trips at offsets 0 and 8, then
// rejected offsets.
#[test]
#[cfg(feature = "atomic")]
fn test_atomic_u64_operations() {
let path = tmp_path("atomic_u64");
// Best-effort removal of a leftover file from an earlier run.
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 64).expect("create");
let atomic = mmap.atomic_u64(0).expect("atomic at 0");
atomic.store(0x1234567890ABCDEF, Ordering::SeqCst);
assert_eq!(atomic.load(Ordering::SeqCst), 0x1234567890ABCDEF);
let atomic2 = mmap.atomic_u64(8).expect("atomic at 8");
atomic2.store(0xFEDCBA0987654321, Ordering::SeqCst);
assert_eq!(atomic2.load(Ordering::SeqCst), 0xFEDCBA0987654321);
// Offsets that are not multiples of 8 must report `Misaligned`.
assert!(matches!(
mmap.atomic_u64(1),
Err(MmapIoError::Misaligned {
required: 8,
offset: 1
})
));
assert!(matches!(
mmap.atomic_u64(7),
Err(MmapIoError::Misaligned {
required: 8,
offset: 7
})
));
// Offset 64 is aligned but leaves no room for 8 bytes in a 64-byte mapping.
assert!(mmap.atomic_u64(64).is_err());
// 57 is not a multiple of 8, so this is rejected by the alignment check,
// which runs before the bounds check.
assert!(mmap.atomic_u64(57).is_err());
// Views are dropped before the backing file is removed.
drop(atomic);
drop(atomic2);
fs::remove_file(&path).expect("cleanup");
}
// Single `AtomicU32` views: store/load round-trips at offsets 0 and 4, then
// rejected offsets.
#[test]
#[cfg(feature = "atomic")]
fn test_atomic_u32_operations() {
let path = tmp_path("atomic_u32");
// Best-effort removal of a leftover file from an earlier run.
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 32).expect("create");
let atomic = mmap.atomic_u32(0).expect("atomic at 0");
atomic.store(0x12345678, Ordering::SeqCst);
assert_eq!(atomic.load(Ordering::SeqCst), 0x12345678);
let atomic2 = mmap.atomic_u32(4).expect("atomic at 4");
atomic2.store(0x87654321, Ordering::SeqCst);
assert_eq!(atomic2.load(Ordering::SeqCst), 0x87654321);
// Offsets that are not multiples of 4 must report `Misaligned`.
assert!(matches!(
mmap.atomic_u32(1),
Err(MmapIoError::Misaligned {
required: 4,
offset: 1
})
));
assert!(matches!(
mmap.atomic_u32(3),
Err(MmapIoError::Misaligned {
required: 4,
offset: 3
})
));
// Offset 32 is aligned but leaves no room for 4 bytes in a 32-byte mapping.
assert!(mmap.atomic_u32(32).is_err());
// 29 is not a multiple of 4, so this is rejected by the alignment check,
// which runs before the bounds check.
assert!(mmap.atomic_u32(29).is_err());
// Views are dropped before the backing file is removed.
drop(atomic);
drop(atomic2);
fs::remove_file(&path).expect("cleanup");
}
// Slice views: write and read back every element, then rejected requests.
#[test]
#[cfg(feature = "atomic")]
fn test_atomic_slices() {
let path = tmp_path("atomic_slices");
// Best-effort removal of a leftover file from an earlier run.
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 128).expect("create");
// Four u64 values occupy bytes 0..32.
let u64_slice = mmap.atomic_u64_slice(0, 4).expect("u64 slice");
assert_eq!(u64_slice.len(), 4);
for (i, atomic) in u64_slice.iter().enumerate() {
atomic.store(i as u64 * 100, Ordering::SeqCst);
}
for (i, atomic) in u64_slice.iter().enumerate() {
assert_eq!(atomic.load(Ordering::SeqCst), i as u64 * 100);
}
drop(u64_slice);
// Eight u32 values occupy bytes 64..96.
let u32_slice = mmap.atomic_u32_slice(64, 8).expect("u32 slice");
assert_eq!(u32_slice.len(), 8);
for (i, atomic) in u32_slice.iter().enumerate() {
atomic.store(i as u32 * 10, Ordering::SeqCst);
}
for (i, atomic) in u32_slice.iter().enumerate() {
assert_eq!(atomic.load(Ordering::SeqCst), i as u32 * 10);
}
drop(u32_slice);
// Misaligned start offsets.
assert!(mmap.atomic_u64_slice(1, 2).is_err());
assert!(mmap.atomic_u32_slice(2, 2).is_err());
// Aligned start offsets whose end lies past byte 128.
assert!(mmap.atomic_u64_slice(120, 2).is_err());
assert!(mmap.atomic_u32_slice(124, 2).is_err());
fs::remove_file(&path).expect("cleanup");
}
// A value written through one mapping is read back through mappings opened
// with `open_ro` and, when the `cow` feature is enabled, `open_cow`.
#[test]
#[cfg(feature = "atomic")]
fn test_atomic_with_different_modes() {
let path = tmp_path("atomic_modes");
// Best-effort removal of a leftover file from an earlier run.
let _ = fs::remove_file(&path);
let mmap = create_mmap(&path, 16).expect("create");
// The view lives in its own scope so it is dropped before `flush` is called.
{
let atomic = mmap.atomic_u64(0).expect("atomic");
atomic.store(42, Ordering::SeqCst);
}
mmap.flush().expect("flush");
drop(mmap);
let mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
{
let atomic = mmap.atomic_u64(0).expect("atomic ro");
assert_eq!(atomic.load(Ordering::SeqCst), 42);
}
drop(mmap);
#[cfg(feature = "cow")]
{
let mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
{
let atomic = mmap.atomic_u64(0).expect("atomic cow");
assert_eq!(atomic.load(Ordering::SeqCst), 42);
}
drop(mmap);
}
fs::remove_file(&path).expect("cleanup");
}
// Four threads each perform 1000 `fetch_add(1)` operations on the same
// `AtomicU64`; the final value must be 4000.
#[test]
#[cfg(feature = "atomic")]
fn test_concurrent_atomic_access() {
use std::sync::Arc;
use std::thread;
let path = tmp_path("concurrent_atomic");
// Best-effort removal of a leftover file from an earlier run.
let _ = fs::remove_file(&path);
let mmap = Arc::new(create_mmap(&path, 8).expect("create"));
// Start the counter from a known value.
{
let atomic = mmap.atomic_u64(0).expect("atomic");
atomic.store(0, Ordering::SeqCst);
}
let handles: Vec<_> = (0..4)
.map(|_| {
let mmap = Arc::clone(&mmap);
thread::spawn(move || {
// Each thread obtains its own view of the same offset.
let atomic = mmap.atomic_u64(0).expect("atomic in thread");
for _ in 0..1000 {
atomic.fetch_add(1, Ordering::SeqCst);
}
})
})
.collect();
for handle in handles {
handle.join().expect("thread join");
}
let atomic = mmap.atomic_u64(0).expect("atomic final");
assert_eq!(atomic.load(Ordering::SeqCst), 4000);
drop(atomic);
// Take the mapping out of the `Arc` (if this is the last reference) and drop
// it before the file is removed; cleanup here is best-effort.
drop(Arc::try_unwrap(mmap).ok());
let _ = fs::remove_file(&path);
}
}