use super::{new_error, FMErr, FRes};
use libc::{
c_void, mmap, msync, munmap, off_t, size_t, EACCES, EBADF, EBUSY, EINVAL, EIO, ENOMEM, EOVERFLOW, MAP_FAILED,
MAP_SHARED, MS_SYNC, PROT_READ, PROT_WRITE,
};
use std::sync::atomic;
/// Handle to a memory region obtained from `mmap` (created by `MMap::new`).
///
/// The mapped length is not stored here; `unmap` and `sync` take it as an argument.
/// NOTE(review): no `Drop` impl is visible in this part of the file, so presumably the
/// owner must call `unmap` explicitly to release the mapping — confirm.
pub(crate) struct MMap {
/// Base address returned by `mmap`.
ptr: *mut c_void,
/// Flipped from `false` to `true` by the first `unmap` call; later `unmap` calls observe
/// it and return early, and `get`/`get_mut` debug-assert that it is still `false`.
unmapped: atomic::AtomicBool,
}
// `MMap` stores a raw `*mut c_void`, so the compiler does not implement `Send`/`Sync`
// for it automatically; these impls opt in by hand.
// NOTE(review): `get`/`get_mut` hand out raw pointers from `&self` without any locking,
// so soundness presumably relies on callers coordinating access to the mapped bytes —
// confirm against the call sites.
unsafe impl Send for MMap {}
unsafe impl Sync for MMap {}
impl MMap {
    /// Creates a shared, readable and writable mapping of `length` bytes over `fd`.
    ///
    /// The mapping always starts at file offset `0` and no address hint is given to `mmap`.
    ///
    /// # Errors
    ///
    /// If `mmap` reports `MAP_FAILED`, the OS error is handed to `new_error` together with
    /// `mid` and one of:
    /// - `FMErr::Hcf` for `EINVAL`, `EBADF`, `EACCES` or `EOVERFLOW`,
    /// - `FMErr::Nmm` for `ENOMEM`,
    /// - `FMErr::Unk` for every other error.
    ///
    /// # Safety
    ///
    /// This is a direct call into `mmap`; neither `fd` nor `length` is validated first.
    /// NOTE(review): presumably `fd` must be open for read/write and the file must span at
    /// least `length` bytes — confirm against callers.
    pub(crate) unsafe fn new(fd: i32, length: size_t, mid: u8) -> FRes<Self> {
        let addr = mmap(
            std::ptr::null_mut(),
            length,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            fd,
            0 as off_t,
        );

        // Happy path first: anything other than MAP_FAILED is a usable mapping.
        if addr != MAP_FAILED {
            return Ok(Self {
                ptr: addr,
                unmapped: atomic::AtomicBool::new(false),
            });
        }

        let os_err = last_os_error();
        let errno = os_err.raw_os_error();
        let reason = if [Some(EINVAL), Some(EBADF), Some(EACCES), Some(EOVERFLOW)].contains(&errno)
        {
            FMErr::Hcf
        } else if errno == Some(ENOMEM) {
            FMErr::Nmm
        } else {
            FMErr::Unk
        };
        new_error(mid, reason, os_err)
    }

    /// Releases the mapping with `munmap`; only the first call does any work.
    ///
    /// The `unmapped` flag is flipped atomically before `munmap` is invoked, so any later
    /// (or concurrently losing) call returns `Ok(())` without touching the mapping. The flag
    /// stays set even when `munmap` itself fails.
    ///
    /// # Errors
    ///
    /// A failing `munmap` is reported through `new_error` with `mid`: `FMErr::Hcf` for
    /// `EINVAL` or `ENOMEM`, `FMErr::Unk` otherwise.
    ///
    /// # Safety
    ///
    /// `length` is forwarded to `munmap` unchecked. NOTE(review): presumably it must equal
    /// the length the mapping was created with, and pointers obtained from `get`/`get_mut`
    /// must not be used afterwards — confirm.
    pub(crate) unsafe fn unmap(&self, length: usize, mid: u8) -> FRes<()> {
        // Exactly one caller wins the false -> true transition and performs the munmap.
        let already_unmapped = self
            .unmapped
            .compare_exchange(false, true, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
            .is_err();
        if already_unmapped {
            return Ok(());
        }

        if munmap(self.ptr, length) == 0 {
            return Ok(());
        }

        let os_err = last_os_error();
        let reason = if [Some(EINVAL), Some(ENOMEM)].contains(&os_err.raw_os_error()) {
            FMErr::Hcf
        } else {
            FMErr::Unk
        };
        new_error(mid, reason, os_err)
    }

    /// Flushes `length` bytes of the mapping with a blocking `msync(MS_SYNC)`.
    ///
    /// # Errors
    ///
    /// Failures are reported through `new_error` with `mid`:
    /// - an interrupted call (`ErrorKind::Interrupted`) is retried without limit,
    /// - `ENOMEM` / `EINVAL` yield `FMErr::Hcf` immediately,
    /// - `EIO` / `EBUSY` are retried up to `MAX_RETRIES` times (yielding the thread between
    ///   attempts) and then yield `FMErr::Syn`,
    /// - anything else yields `FMErr::Unk`.
    ///
    /// # Safety
    ///
    /// `self.ptr` and `length` are passed to `msync` unchecked; unlike `get`/`get_mut`, this
    /// method does not assert that the mapping is still live.
    pub(crate) unsafe fn sync(&self, length: usize, mid: u8) -> FRes<()> {
        const MAX_RETRIES: usize = 4;
        let mut attempts = 0;

        while msync(self.ptr, length, MS_SYNC) != 0 {
            let os_err = last_os_error();

            // An interrupted call is simply reissued and does not count as a retry.
            if os_err.kind() == std::io::ErrorKind::Interrupted {
                continue;
            }

            let errno = os_err.raw_os_error();
            if [Some(ENOMEM), Some(EINVAL)].contains(&errno) {
                return new_error(mid, FMErr::Hcf, os_err);
            }
            if ![Some(EIO), Some(EBUSY)].contains(&errno) {
                return new_error(mid, FMErr::Unk, os_err);
            }

            // EIO / EBUSY: back off briefly and try again until the budget is spent.
            if attempts >= MAX_RETRIES {
                return new_error(mid, FMErr::Syn, os_err);
            }
            attempts += 1;
            std::thread::yield_now();
        }

        Ok(())
    }

    /// Returns a read-only raw pointer to a `T` located `offset` bytes into the mapping.
    ///
    /// In debug builds this asserts that `offset` is a multiple of 8 and that `unmap` has
    /// not been called yet; release builds perform no checks.
    ///
    /// # Safety
    ///
    /// No bounds check is performed: the mapped length is not known to `MMap`, so the caller
    /// must keep `offset` (plus the size of `T`) inside the mapping.
    #[inline]
    pub(crate) unsafe fn get<T>(&self, offset: usize) -> *const T {
        debug_assert!(offset % 8 == 0, "Offset must be 8 bytes aligned");
        let is_unmapped = self.unmapped.load(atomic::Ordering::Acquire);
        debug_assert!(!is_unmapped, "Trying to access dropped mmap");

        let addr = self.ptr.add(offset);
        addr as *const T
    }

    /// Returns a mutable raw pointer to a `T` located `offset` bytes into the mapping.
    ///
    /// In debug builds this asserts that `offset` is a multiple of 8 and that `unmap` has
    /// not been called yet; release builds perform no checks.
    ///
    /// # Safety
    ///
    /// No bounds check is performed, and the pointer is derived from `&self`, so nothing
    /// here prevents several callers from obtaining pointers to the same bytes.
    #[inline]
    pub(crate) unsafe fn get_mut<T>(&self, offset: usize) -> *mut T {
        debug_assert!(offset % 8 == 0, "Offset must be 8 bytes aligned");
        let is_unmapped = self.unmapped.load(atomic::Ordering::Acquire);
        debug_assert!(!is_unmapped, "Trying to access dropped mmap");

        let addr = self.ptr.add(offset);
        addr as *mut T
    }
}
/// Snapshots the calling thread's most recent OS error as a `std::io::Error`.
///
/// Must be called right after the failing libc call, before anything else can overwrite
/// the OS error value.
#[inline]
fn last_os_error() -> std::io::Error {
    use std::io::Error;

    Error::last_os_error()
}
#[cfg(all(test, target_os = "linux"))]
mod tests {
    use super::*;
    use crate::fe::FECheckOk;
    use crate::ff::{FFCfg, FF};
    use std::path::PathBuf;
    use tempfile::{tempdir, TempDir};

    /// Module id passed to every fallible call made by these tests.
    const MID: u8 = 0x00;
    /// Length in bytes used for the backing file and for every mapping.
    const LEN: usize = 0x80;

    /// Builds the `FFCfg` used for the backing file at `path` (auto flush disabled).
    fn get_ff_cfg(path: PathBuf) -> FFCfg {
        FFCfg {
            path,
            module_id: MID,
            auto_flush: false,
            flush_duration: std::time::Duration::from_secs(1),
        }
    }

    /// Creates a temp dir, a backing `FF` inside it, and an `MMap` of `LEN` bytes over it.
    ///
    /// The `TempDir` and `FF` are returned so they stay alive for as long as the caller
    /// keeps them bound.
    fn new_tmp() -> (TempDir, PathBuf, FF, MMap) {
        let dir = tempdir().expect("temp dir");
        let tmp = dir.path().join("tmp_file");
        unsafe {
            let file = FF::new(get_ff_cfg(tmp.clone()), LEN as u64).expect("new FF");
            let mmap = MMap::new(file.fd(), LEN, MID).expect("new MMap");
            (dir, tmp, file, mmap)
        }
    }

    mod map_unmap {
        use super::*;

        #[test]
        fn map_unmap_cycle() {
            let (_dir, _tmp, _file, map) = new_tmp();
            assert!(!map.ptr.is_null());
            assert!(unsafe { map.unmap(LEN, MID) }.check_ok());
        }

        #[test]
        fn map_fails_on_invalid_fd() {
            unsafe { assert!(MMap::new(-1, LEN, MID).is_err()) };
        }

        #[test]
        fn unmap_after_unmap_does_not_fails() {
            let (_dir, _tmp, _file, map) = new_tmp();
            unsafe {
                // Only the first call reaches `munmap`; the rest must be harmless no-ops.
                assert!(map.unmap(LEN, MID).check_ok());
                assert!(map.unmap(LEN, MID).check_ok());
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }
    }

    mod sync {
        use super::*;

        #[test]
        fn msync_works() {
            let (_dir, _tmp, _file, map) = new_tmp();
            unsafe {
                assert!(map.sync(LEN, MID).check_ok());
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }
    }

    mod write_read {
        use super::*;

        #[test]
        fn write_read_cycle() {
            let (_dir, _tmp, _file, map) = new_tmp();
            unsafe {
                let ptr = map.get_mut::<u64>(0);
                *ptr = 0xDEAD_C0DE_DEAD_C0DE;
                assert!(map.sync(LEN, MID).check_ok());
                let val = *map.get::<u64>(0);
                assert_eq!(val, 0xDEAD_C0DE_DEAD_C0DE);
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }

        #[test]
        fn write_read_across_sessions() {
            let (_dir, tmp, file, map) = new_tmp();
            // First session: write, flush, then tear down both the mapping and the file.
            unsafe {
                let ptr = map.get_mut::<u64>(0);
                *ptr = 0xDEAD_C0DE_DEAD_C0DE;
                assert!(map.sync(LEN, MID).check_ok());
                assert!(map.unmap(LEN, MID).check_ok());
                drop(file);
            }
            // Second session: reopen the same path and read the value back via a new mapping.
            unsafe {
                let file = FF::open(get_ff_cfg(tmp)).expect("open existing");
                let map = MMap::new(file.fd(), LEN, MID).expect("new mmap");
                let val = *map.get::<u64>(0);
                assert_eq!(val, 0xDEAD_C0DE_DEAD_C0DE);
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }

        #[test]
        fn mmap_write_is_in_synced_with_file_read() {
            let (_dir, _tmp, file, map) = new_tmp();
            unsafe {
                let ptr = map.get_mut::<u64>(0);
                *ptr = 0xDEAD_C0DE_DEAD_C0DE;
                assert!(map.sync(LEN, MID).check_ok());
                // Read through the file handle rather than the mapping.
                let mut buf = [0u8; 8];
                file.read(buf.as_mut_ptr(), 0, 8).expect("failed to read");
                assert_eq!(u64::from_le_bytes(buf), 0xDEAD_C0DE_DEAD_C0DE);
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }
    }

    mod concurrency {
        use super::*;

        #[test]
        fn munmap_is_thread_safe() {
            let (_dir, _tmp, _file, map) = new_tmp();
            let mut handles = Vec::new();
            let map = std::sync::Arc::new(map);
            for _ in 0..8 {
                let m = map.clone();
                handles.push(std::thread::spawn(move || unsafe {
                    assert!(m.unmap(LEN, MID).check_ok());
                }));
            }
            for h in handles {
                assert!(h
                    .join()
                    .map_err(|e| {
                        eprintln!("\n{:?}\n", e);
                    })
                    .is_ok());
            }
        }

        #[test]
        fn msync_is_thread_safe() {
            let (_dir, _tmp, _file, map) = new_tmp();
            let mut handles = Vec::new();
            let map = std::sync::Arc::new(map);
            // The only write happens before any thread is spawned.
            unsafe {
                *map.get_mut::<u64>(0) = 42;
            }
            for _ in 0..8 {
                let m = map.clone();
                handles.push(std::thread::spawn(move || unsafe {
                    assert!(m.sync(LEN, MID).check_ok());
                }));
            }
            for h in handles {
                assert!(h
                    .join()
                    .map_err(|e| {
                        eprintln!("\n{:?}\n", e);
                    })
                    .is_ok());
            }
            unsafe {
                assert_eq!(*map.get::<u64>(0), 42);
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }

        #[test]
        fn concurrent_writes_then_sync() {
            /// Number of writer threads.
            const WRITERS: u64 = 8;
            /// Bytes reserved per writer; also keeps every offset 8-byte aligned.
            const SLOT: usize = std::mem::size_of::<u64>();
            assert!(WRITERS as usize * SLOT <= LEN);

            let (_dir, _tmp, _file, map) = new_tmp();
            let mut handles = Vec::new();
            let map = std::sync::Arc::new(map);
            for i in 0..WRITERS {
                let m = map.clone();
                handles.push(std::thread::spawn(move || unsafe {
                    // Every thread writes to its own slot: unsynchronised plain writes to a
                    // single shared address would be a data race (undefined behaviour).
                    let ptr = m.get_mut::<u64>(i as usize * SLOT);
                    *ptr = i;
                }));
            }
            for h in handles {
                assert!(h
                    .join()
                    .map_err(|e| {
                        eprintln!("\n{:?}\n", e);
                    })
                    .is_ok());
            }
            unsafe {
                assert!(map.sync(LEN, MID).check_ok());
                // Joining the writers orders their stores before these reads.
                for i in 0..WRITERS {
                    assert_eq!(*map.get::<u64>(i as usize * SLOT), i);
                }
                assert!(map.unmap(LEN, MID).check_ok());
            }
        }
    }
}