use std::fmt;
use std::io;
use std::ops::Deref;
use std::sync::atomic::{
AtomicU64,
Ordering::{Acquire, SeqCst},
};
use std::sync::Arc;
#[cfg(unix)] use libc;
#[derive(Debug)]
pub struct MemAdviseError {
ecode: i32,
}
impl From<MemAdviseError> for io::Error {
fn from(me: MemAdviseError) -> io::Error {
io::Error::new(io::ErrorKind::Other, me)
}
}
impl fmt::Display for MemAdviseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "libc::posix_madvise error return code {}", self.ecode)
}
}
impl std::error::Error for MemAdviseError {}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u64)]
pub enum MemAdvice {
Normal = 0, Random = 0x003FF, Sequential = 0xFFC00, }
impl From<u64> for MemAdvice {
fn from(v: u64) -> Self {
match v {
0 => MemAdvice::Normal,
0x003FF => MemAdvice::Random,
0xFFC00 => MemAdvice::Sequential,
_ => unreachable!("not a MemAdvice repr!"),
}
}
}
#[derive(Debug)]
pub struct MemHandle<T>
where T: Deref<Target=[u8]>
{
mem: Arc<Mem<T>>,
advice: AtomicU64,
}
impl<T> MemHandle<T>
where T: Deref<Target=[u8]>
{
pub fn new(mem: T) -> MemHandle<T> {
MemHandle {
mem: Arc::new(Mem::new(mem)),
advice: AtomicU64::new(MemAdvice::Normal as u64)
}
}
pub fn advise(&self, advice: MemAdvice)
-> Result<MemAdvice, MemAdviseError>
{
let prior = self.advice.swap(advice as u64, SeqCst).into();
if advice == prior {
Ok(prior)
} else {
self.mem.adjust_advice(prior, advice)
}
}
}
impl<T> Clone for MemHandle<T>
where T: Deref<Target=[u8]>
{
fn clone(&self) -> MemHandle<T> {
MemHandle {
mem: self.mem.clone(),
advice: AtomicU64::new(MemAdvice::Normal as u64)
}
}
}
impl<T> Drop for MemHandle<T>
where T: Deref<Target=[u8]>
{
fn drop(&mut self) {
let advice = self.advice.load(Acquire).into();
if advice != MemAdvice::Normal {
self.mem.adjust_advice(advice, MemAdvice::Normal).ok();
}
}
}
impl<T> Deref for MemHandle<T>
where T: Deref<Target=[u8]>
{
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.mem
}
}
#[derive(Debug)]
struct Mem<T>
where T: Deref<Target=[u8]>
{
mem: T,
advisors: AtomicU64,
}
impl<T> Mem<T>
where T: Deref<Target=[u8]>
{
fn new(mem: T) -> Mem<T> {
Mem { mem, advisors: AtomicU64::new(0) }
}
fn adjust_advice(&self, prior: MemAdvice, advice: MemAdvice)
-> Result<MemAdvice, MemAdviseError>
{
debug_assert!(prior != advice);
let mut adv = self.advisors.load(Acquire);
loop {
let old_top = top_most(adv);
let new_adv = decr_advisors(adv, prior);
let new_adv = incr_advisors(new_adv, advice);
let new_top = top_most(new_adv);
match self.advisors.compare_exchange_weak(
adv, new_adv, SeqCst, Acquire)
{
Ok(_) => {
if new_top != old_top {
advise(&self.mem, new_top)?;
return Ok(new_top);
}
return Ok(new_top);
}
Err(x) => adv = x
}
}
}
}
impl<T> Deref for Mem<T>
where T: Deref<Target=[u8]>
{
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.mem
}
}
fn decr_advisors(mut advisors: u64, prior: MemAdvice) -> u64 {
if prior != MemAdvice::Normal {
let mut p = advisors & (prior as u64);
advisors -= p;
if prior == MemAdvice::Sequential { p >>= 10; }
if p > 0 { p -= 1; }
if prior == MemAdvice::Sequential { p <<= 10; }
advisors |= p;
}
advisors
}
fn incr_advisors(mut advisors: u64, advice: MemAdvice) -> u64 {
let mut cur = advisors & (advice as u64);
advisors -= cur;
match advice {
MemAdvice::Normal => {
advisors
}
MemAdvice::Random => {
if cur < 0x3FF { cur += 1; }
advisors | cur
}
MemAdvice::Sequential => {
cur >>= 10;
if cur < 0x3FF { cur += 1; }
cur <<= 10;
advisors | cur
}
}
}
fn top_most(advisors: u64) -> MemAdvice {
if (advisors & (MemAdvice::Sequential as u64)) > 0 {
MemAdvice::Sequential
} else if (advisors & (MemAdvice::Random as u64)) > 0 {
MemAdvice::Random
} else {
MemAdvice::Normal
}
}
#[cfg(unix)]
fn advise<T>(mem: &T, advice: MemAdvice) -> Result<(), MemAdviseError>
where T: Deref<Target=[u8]>
{
let flags: libc::c_int = match advice {
MemAdvice::Normal => libc::POSIX_MADV_NORMAL,
MemAdvice::Random => libc::POSIX_MADV_RANDOM,
MemAdvice::Sequential => libc::POSIX_MADV_SEQUENTIAL,
};
let ptr = &(mem[0]) as *const u8 as *mut libc::c_void;
let res = unsafe { libc::posix_madvise(ptr, mem.len(), flags) };
if res == 0 {
Ok(())
} else {
Err(MemAdviseError { ecode: res })
}
}
#[cfg(not(unix))]
fn advise<T>(_mem: &T, _advice: MemAdvice) -> Result<(), MemAdviseError>
where T: Deref<Target=[u8]>
{
Ok(())
}
#[cfg(test)]
mod tests {
use std::mem;
use crate::mem::MemHandle;
#[test]
fn test_with_any_deref() {
let _m = MemHandle::new(vec![0u8; 1024]);
}
fn is_send<T: Send>() -> bool { true }
fn is_sync<T: Sync>() -> bool { true }
#[test]
fn test_send_sync() {
assert!(is_send::<MemHandle<Vec<u8>>>());
assert!(is_sync::<MemHandle<Vec<u8>>>());
}
#[test]
#[cfg(target_pointer_width = "64")]
fn test_size() {
assert_eq!(mem::size_of::<MemHandle<Vec<u8>>>(), 16);
}
#[cfg(feature = "mmap")]
mod mmap {
use std::io::Write;
use std::thread;
use tempfile::tempfile;
use memmap::Mmap;
use rand::seq::SliceRandom;
use crate::mem::MemHandle;
use crate::mem::MemAdvice::*;
#[test]
fn test_advise_one() {
let map = {
let mut f = tempfile().unwrap();
f.write_all(&vec![1u8; 256 * 1024]).unwrap();
unsafe { Mmap::map(&f) }.unwrap()
};
let mem = MemHandle::new(map);
assert_eq!(mem.advise(Normal).unwrap(), Normal);
assert_eq!(mem.advise(Random).unwrap(), Random);
assert_eq!(mem.advise(Random).unwrap(), Random);
assert_eq!(1u8, mem[0]);
assert_eq!(mem.advise(Sequential).unwrap(), Sequential);
assert_eq!(1u8, mem[128*1024-1]);
assert_eq!(mem.advise(Random).unwrap(), Random);
assert_eq!(1u8, mem[256*1024-1]);
assert_eq!(mem.advise(Normal).unwrap(), Normal);
}
#[test]
fn test_advise_two_random() {
let map = {
let mut f = tempfile().unwrap();
f.write_all(&vec![1u8; 256 * 1024]).unwrap();
unsafe { Mmap::map(&f) }.unwrap()
};
let h1 = MemHandle::new(map);
let h2 = h1.clone();
assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
assert_eq!(1u8, h1[0]);
assert_eq!(h2.advise(Random).unwrap(), Sequential);
assert_eq!(1u8, h2[128*1024-1]);
drop(h1);
assert_eq!(h2.advise(Random).unwrap(), Random);
}
#[test]
fn test_advise_two_normal() {
let map = {
let mut f = tempfile().unwrap();
f.write_all(&vec![1u8; 256 * 1024]).unwrap();
unsafe { Mmap::map(&f) }.unwrap()
};
let h1 = MemHandle::new(map);
let h2 = h1.clone();
assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
drop(h1);
assert_eq!(h2.advise(Normal).unwrap(), Normal);
}
#[test]
fn test_advise_three() {
let map = {
let mut f = tempfile().unwrap();
f.write_all(&vec![1u8; 256 * 1024]).unwrap();
unsafe { Mmap::map(&f) }.unwrap()
};
let h1 = MemHandle::new(map);
let h2 = h1.clone();
let h3 = h2.clone();
assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
assert_eq!(h2.advise(Random).unwrap(), Sequential);
assert_eq!(h3.advise(Random).unwrap(), Sequential);
drop(h1); assert_eq!(h3.advise(Normal).unwrap(), Random); }
#[test]
fn test_advise_threaded() {
let mut rng = rand::thread_rng();
let one_of = vec![Normal, Random, Sequential];
let map = {
let mut f = tempfile().unwrap();
f.write_all(&vec![2u8; 64 * 1024]).unwrap();
unsafe { Mmap::map(&f) }.unwrap()
};
let h0 = MemHandle::new(map);
for _ in 0..47 {
let mut threads = Vec::with_capacity(100);
let advices = (0..13).map(|_| {
*one_of.choose(&mut rng).unwrap()
});
for advice in advices {
let hc = h0.clone();
threads.push(thread::spawn(move || {
let res = hc.advise(advice).expect("advise");
assert!(res >= advice);
}));
}
for t in threads {
t.join().unwrap();
}
}
}
}
}