use std::mem::{size_of, transmute};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
#[cfg(unix)]
pub use memmap2::Advice;
use memmap2::MmapMut;
use crate::traits::{open_cache_file, Cache, CacheStore};
use crate::{OsmNodeCacheError, OsmNodeCacheResult};
/// Callback signature invoked just before the backing file is extended.
///
/// It receives the file's current length and the length it is about to be
/// set to; both values come from file lengths, i.e. they are byte counts.
pub type OnSizeChange = fn(old_size: usize, new_size: usize) -> ();
/// Builder-style options used to open a [`DenseFileCache`].
///
/// Cloning is cheap for the path: the file name is kept behind an `Arc`.
#[derive(Clone)]
pub struct DenseFileCacheOpts {
/// Path of the file that backs the cache.
filename: Arc<PathBuf>,
/// Whether the cache is writable; only `true` is currently accepted by the builder.
write: bool,
/// Whether the cache may grow; only `true` is currently accepted by the builder.
autogrow: bool,
/// Requested initial size (defaults to `1024 * 1024 * 1024`).
/// NOTE(review): no code visible in this file reads this value - confirm whether it is used elsewhere.
init_size: usize,
/// Granularity, in bytes, to which the file length is rounded up when it grows.
/// Must be a multiple of `size_of::<usize>()`, otherwise opening/growing fails.
page_size: usize,
/// Memory-access advice applied to the mapping when the cache is opened
/// (skipped when it is `Advice::Normal`). Unix only.
#[cfg(unix)]
advice: Advice,
/// Optional callback notified right before the file is extended.
on_size_change: Option<OnSizeChange>,
}
impl DenseFileCacheOpts {
    /// Creates options for a cache backed by `filename`.
    ///
    /// Defaults: writable, auto-growing, `init_size` and `page_size` of
    /// `1024 * 1024 * 1024`, no size-change callback and, on Unix,
    /// `Advice::Normal`.
    #[must_use]
    pub fn new(filename: PathBuf) -> Self {
        DenseFileCacheOpts {
            filename: Arc::new(filename),
            write: true,
            autogrow: true,
            init_size: 1024 * 1024 * 1024,
            page_size: 1024 * 1024 * 1024,
            on_size_change: None,
            #[cfg(unix)]
            advice: Advice::Normal,
        }
    }

    /// Selects whether the cache is writable.
    ///
    /// # Panics
    /// Panics when `write` is `false`: read-only caches are not implemented yet.
    #[must_use]
    pub fn write(mut self, write: bool) -> Self {
        if !write {
            todo!("Readonly cache is not supported yet")
        }
        self.write = write;
        self
    }

    /// Sets (or clears, with `None`) the callback notified before the file grows.
    #[must_use]
    pub fn on_size_change(mut self, on_size_change: Option<OnSizeChange>) -> Self {
        self.on_size_change = on_size_change;
        self
    }

    /// Selects whether the cache may grow on demand.
    ///
    /// # Panics
    /// Panics when `autogrow` is `false`: fixed-size caches are not implemented yet.
    #[must_use]
    pub fn autogrow(mut self, autogrow: bool) -> Self {
        if !autogrow {
            todo!("Constant cache size is not supported yet")
        }
        self.autogrow = autogrow;
        self
    }

    /// Stores the requested initial size.
    ///
    /// NOTE(review): the value is recorded, but no code visible in this file
    /// reads it back - confirm where (or whether) it takes effect.
    #[must_use]
    pub fn init_size(mut self, init_size: usize) -> Self {
        self.init_size = init_size;
        self
    }

    /// Sets the granularity, in bytes, to which the file length is rounded up.
    ///
    /// The value is validated when the file is opened or grown, not here.
    #[must_use]
    pub fn page_size(mut self, page_size: usize) -> Self {
        self.page_size = page_size;
        self
    }

    /// Sets the memory-access advice applied to the mapping on open.
    ///
    /// Only available on Unix: both the `Advice` type and the `advice` field
    /// exist only under `#[cfg(unix)]`, so this builder must be gated the same
    /// way or the crate does not compile on other targets.
    #[cfg(unix)]
    #[must_use]
    pub fn advise(mut self, advice: Advice) -> Self {
        self.advice = advice;
        self
    }

    /// Consumes the options and opens the cache.
    ///
    /// # Errors
    /// Returns whatever error opening, sizing or mapping the file produces.
    pub fn open(self) -> OsmNodeCacheResult<DenseFileCache> {
        DenseFileCache::new_opt(self)
    }
}
/// Ensures the cache file is large enough to hold element `index`, then maps it.
///
/// The required byte capacity is `(index + 1) * size_of::<usize>()`, rounded up
/// to a whole number of `opts.page_size` pages. The file is only ever extended,
/// never shrunk; when it is extended, `opts.on_size_change` (if set) is called
/// first with the old and new lengths.
///
/// NOTE(review): the element size is taken from `usize` although the cache
/// exposes 64-bit entries; the two only coincide on 64-bit targets. It is kept
/// here so this function stays consistent with `CacheWriter::len`.
///
/// # Errors
/// * `OsmNodeCacheError::InvalidPageSize` when `opts.page_size` is zero or not a
///   multiple of the element size.
/// * Any error from opening the file, reading its metadata, resizing it or
///   mapping it.
fn resize_and_memmap(index: usize, opts: &DenseFileCacheOpts) -> OsmNodeCacheResult<MmapMut> {
    let element_size = size_of::<usize>();
    // A zero page size passes the modulo test but would divide by zero below,
    // so it is rejected explicitly.
    if opts.page_size == 0 || opts.page_size % element_size != 0 {
        return Err(OsmNodeCacheError::InvalidPageSize {
            page_size: opts.page_size,
            element_size,
        });
    }

    let file = open_cache_file(opts.filename.as_ref())?;
    // Propagate metadata failures instead of panicking on an I/O error.
    let old_size = file.metadata()?.len();

    // Round the required capacity up to a whole number of pages.
    let capacity = (index + 1) * element_size;
    let pages = capacity / opts.page_size + usize::from(capacity % opts.page_size != 0);
    let new_size = (pages * opts.page_size) as u64;

    if old_size < new_size {
        if let Some(notify) = opts.on_size_change {
            notify(to_64_usize(old_size), to_64_usize(new_size));
        }
        file.set_len(new_size)?;
    }

    // SAFETY: mapping a file is only sound while nothing else truncates or
    // rewrites it behind our back. NOTE(review): exclusive ownership of the
    // cache file is assumed here, not enforced.
    Ok(unsafe { MmapMut::map_mut(&file)? })
}
/// Narrows a 64-bit size to the platform's `usize`.
///
/// # Panics
/// Panics when the value does not fit into `usize`, which can only happen on
/// targets whose `usize` is narrower than 64 bits.
fn to_64_usize(old_size: u64) -> usize {
    let narrowed: Result<usize, _> = usize::try_from(old_size);
    narrowed.expect("Unable to convert large u64 to usize on this platform")
}
/// Takes a read lock on the shared memory map and reinterprets its bytes as a
/// slice of `AtomicU64`.
///
/// Returns the read guard (always wrapped in `Some`) together with that slice.
///
/// NOTE(review): the `&[u8]` is transmuted as-is, so the resulting slice appears
/// to keep the *byte* count as its element count; `CacheWriter::len` divides by
/// the element size to get the usable number of entries. Do not bounds-check
/// against `raw_data.len()` directly - confirm.
/// NOTE(review): the returned slice borrows for the lifetime of `memmap`, not of
/// the guard, so it is presumably only valid while the guard is held and the map
/// has not been replaced - confirm at call sites.
///
/// # Panics
/// Panics if the lock is poisoned.
fn lock_and_link(memmap: &RwLock<MmapMut>) -> (Option<RwLockReadGuard<'_, MmapMut>>, &[AtomicU64]) {
let mm = memmap.read().unwrap();
let data_as_u8: &[u8] = mm.as_ref();
let raw_data;
#[allow(clippy::transmute_ptr_to_ptr)]
{
// SAFETY (as far as can be told from here): relies on the mapping being
// suitably aligned for `AtomicU64` and on callers only touching indices
// that lie inside the mapped bytes - TODO confirm.
raw_data = unsafe { transmute::<&[u8], &[AtomicU64]>(data_as_u8) };
}
(Some(mm), raw_data)
}
/// A cache of 64-bit values stored in a memory-mapped file.
///
/// Clones share the same memory map and growth mutex, since both live behind `Arc`.
#[derive(Clone)]
pub struct DenseFileCache {
/// Options the cache was opened with; reused whenever the file has to grow.
opts: DenseFileCacheOpts,
/// The current memory map; replaced under the write lock when the file grows.
memmap: Arc<RwLock<MmapMut>>,
/// Taken by an accessor before it acquires the write lock on `memmap` to grow the file.
mutex: Arc<Mutex<()>>,
}
/// Accessor handed out by `DenseFileCache::get_accessor`; implements `Cache`.
struct CacheWriter<'a> {
/// The cache this accessor reads from and writes to.
parent: &'a DenseFileCache,
/// Read guard on the parent's memory map; set to `None` while the accessor
/// grows the file so that the write lock can be acquired.
mm_setter: Option<RwLockReadGuard<'a, MmapMut>>,
/// View of the mapped bytes as atomics, as produced by `lock_and_link`.
raw_data: &'a [AtomicU64],
}
impl DenseFileCache {
    /// Opens a cache backed by `filename` using the default options.
    ///
    /// # Errors
    /// Returns any error produced while opening, sizing or mapping the file.
    pub fn new(filename: PathBuf) -> OsmNodeCacheResult<Self> {
        let defaults = DenseFileCacheOpts::new(filename);
        defaults.open()
    }

    /// Applies memory-access `advice` to the current mapping (Unix only).
    ///
    /// # Errors
    /// Returns the error reported by the underlying `advise` call.
    ///
    /// # Panics
    /// Panics if the lock guarding the mapping is poisoned.
    #[cfg(unix)]
    pub fn advise(&self, advice: Advice) -> OsmNodeCacheResult<()> {
        let map = self.memmap.read().unwrap();
        map.advise(advice)?;
        Ok(())
    }

    /// Builds the cache from fully specified options.
    ///
    /// The file is sized for at least element 0 and mapped; on Unix, any
    /// advice other than `Advice::Normal` is then applied to the mapping.
    fn new_opt(opts: DenseFileCacheOpts) -> OsmNodeCacheResult<Self> {
        let initial_map = resize_and_memmap(0, &opts)?;
        let memmap = Arc::new(RwLock::new(initial_map));
        let mutex = Arc::new(Mutex::new(()));
        let cache = Self {
            opts,
            memmap,
            mutex,
        };

        #[cfg(unix)]
        {
            let requested = cache.opts.advice;
            if requested != Advice::Normal {
                cache.advise(requested)?;
            }
        }

        Ok(cache)
    }
}
impl CacheStore for DenseFileCache {
    /// Creates a new accessor for this cache.
    ///
    /// The accessor holds a read lock on the memory map for as long as it
    /// lives (it only gives it up temporarily while growing the file).
    fn get_accessor(&self) -> Box<dyn Cache + '_> {
        let (guard, view) = lock_and_link(&self.memmap);
        let writer = CacheWriter {
            parent: self,
            mm_setter: guard,
            raw_data: view,
        };
        Box::new(writer)
    }
}
impl CacheWriter<'_> {
    /// Number of entries this accessor treats as addressable: the length of
    /// its current view divided by `size_of::<usize>()`.
    fn len(&self) -> usize {
        let element_size = size_of::<usize>();
        let view_len = self.raw_data.len();
        view_len / element_size
    }
}
// Element access for `CacheWriter`; `set` grows the backing file on demand.
impl Cache for CacheWriter<'_> {
/// Returns the value stored at `index`.
///
/// # Panics
/// Panics if `index` is not below the accessor's current cache size.
fn get(&self, index: usize) -> u64 {
assert!(
index < self.len(),
"Index {index} exceeds cache size {}",
self.len()
);
self.raw_data[index].load(Ordering::Relaxed)
}
/// Stores `value` at `index`, growing and remapping the file first when
/// `index` is beyond the accessor's current cache size.
///
/// # Panics
/// Panics if a lock is poisoned, or if flushing, resizing or remapping fails.
fn set(&mut self, index: usize, value: u64) {
if index >= self.len() {
// Give up this accessor's read guard first; otherwise the write lock
// requested below could never be granted while we hold it.
self.mm_setter = None;
{
// Only one accessor at a time goes through the growth path.
let _pre_write_lock = self.parent.mutex.lock().unwrap();
// NOTE(review): `self.len()` still reflects this accessor's own view, which
// has not been refreshed since the check above, so this re-check looks like
// it cannot observe growth done by another accessor - confirm intent.
if index >= self.len() {
let p = self.parent;
// Waits until every other accessor has released its read guard.
let mut write_lock = p.memmap.write().unwrap();
// Flush the old mapping before it is replaced.
write_lock.flush().unwrap();
*write_lock = resize_and_memmap(index, &p.opts).unwrap();
}
}
// Re-acquire a read guard and refresh the view against the current mapping.
let (mm_setter, raw_data) = lock_and_link(&self.parent.memmap);
self.mm_setter = mm_setter;
self.raw_data = raw_data;
}
self.raw_data[index].store(value, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::PathBuf;

    use rayon::iter::{ParallelBridge, ParallelIterator};

    use crate::traits::tests::get_random_items;
    use crate::*;

    /// Fills the cache from several parallel tasks, then reads every entry back
    /// from several parallel tasks. A tiny 8-byte page size is used so that the
    /// growth path is exercised often.
    #[test]
    fn dense_file() {
        let path = "./dense_file_test.dat";
        // Start from a clean slate; a missing file is not an error.
        let _ = fs::remove_file(path);
        {
            let options = DenseFileCacheOpts::new(PathBuf::from(path)).page_size(8);
            let store = options.open().unwrap();
            let threads = 10;
            let items = 100_000;

            // Phase 1: concurrent writers, each with its own accessor.
            let writers = (0_usize..threads).par_bridge();
            writers.for_each_with(store.clone(), |store, _thread_id| {
                let mut accessor = store.get_accessor();
                for id in get_random_items(items) {
                    accessor.set(id, id as u64);
                }
            });

            // Phase 2: concurrent readers verify what the writers stored.
            let readers = (0_usize..threads).par_bridge();
            readers.for_each_with(store, |store, _thread_id| {
                let accessor = store.get_accessor();
                for id in get_random_items(items) {
                    assert_eq!(id as u64, accessor.get(id));
                }
            });
        }
        // Best-effort cleanup once every handle to the cache has been dropped.
        let _ = fs::remove_file(path);
    }
}