#![deny(missing_docs)]
use std::cell::UnsafeCell;
use std::fs::OpenOptions;
use std::hash::{Hash, Hasher};
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use arrayvec::ArrayVec;
use lazy_static::lazy_static;
use memmap::MmapMut;
use parking_lot::{Mutex, MutexGuard};
use seahash::SeaHasher;
/// Capacity of the lane table: at most this many lane files are opened or created.
const NUM_LANES: usize = 64;
/// Number of mutexes in the global `SHARDS` table that guards entry writes.
const NUM_SHARDS: usize = 1024;
/// Size in bytes of one page inside a lane file.
const PAGE_SIZE: usize = 4096;
/// Number of pages in lane 0; lane `n` holds `2^n` times as many (see `lane_pages`).
const FIRST_LANE_PAGES: usize = 64;
/// Zero-sized payload of the mutexes in `SHARDS`; it carries no data of its own.
struct Shard;
lazy_static! {
    /// Global table of `NUM_SHARDS` mutexes. `entry_mut` picks one by
    /// `(page ^ slot) % NUM_SHARDS` and holds it while an entry is mutable.
    static ref SHARDS: ArrayVec<[Mutex<Shard>; NUM_SHARDS]> =
        (0..NUM_SHARDS).map(|_| Mutex::new(Shard)).collect();
}
/// Feeds `t` into a freshly constructed `SeaHasher` and returns the resulting
/// 64-bit digest.
#[inline(always)]
fn hash_val<T: Hash>(t: &T) -> u64 {
    let mut state = SeaHasher::new();
    Hash::hash(t, &mut state);
    state.finish()
}
/// Outcome of walking a key's probe chain in `find_key`.
///
/// The `usize` triples are `(lane, page, slot)` coordinates in the form
/// accepted by `entry_mut`.
enum Found<'a, K, V> {
    /// An entry with an equal key was found; borrows that entry.
    Some(&'a Entry<K, V>),
    /// The chain ended (`next == 0`) at a valid entry holding a different key.
    None(usize, usize, usize),
    /// The probed slot failed `Entry::valid`; `insert` treats it as writable.
    Invalid(usize, usize, usize),
}
/// Return value of `Index::insert`: `true` when the key was already present
/// (nothing was written), `false` when a new entry was stored.
pub type AlreadyThere = bool;
/// An insert-only key/value index stored in memory-mapped "lane" files inside
/// a directory.
///
/// Entries are fixed-size records placed in pages; a key is located by
/// following a chain of pages, probing one hashed slot per page.
pub struct Index<K, V> {
    /// Memory maps of the lane files, in lane order. Wrapped in `UnsafeCell`
    /// because `new_lane` appends to it through `&self`.
    lanes: UnsafeCell<ArrayVec<[MmapMut; NUM_LANES]>>,
    /// Directory that holds the lane files.
    path: PathBuf,
    /// Number of pages handed out so far; the lock is held for the whole of
    /// `new_page`, including any lane creation it triggers.
    pages: Mutex<u64>,
    /// Ties the otherwise unused `K`/`V` parameters to the struct.
    _marker: PhantomData<(K, V)>,
}
// SAFETY (NOTE(review): argument sketched from the visible code, not proven):
// entry writes go through `entry_mut`, which holds a shard mutex, and lane
// growth happens inside `new_page` while the `pages` mutex is held. Readers
// access entries without a lock and rely on `Entry::valid` to reject
// incomplete data. These impls place no `Send`/`Sync` bounds on `K` or `V` —
// confirm that is intended.
unsafe impl<K, V> Send for Index<K, V> {}
unsafe impl<K, V> Sync for Index<K, V> {}
/// One fixed-size record as it lives inside a mapped page.
///
/// NOTE(review): the struct is reinterpreted directly from mapped file bytes
/// but carries no `#[repr(C)]`; field layout is therefore chosen by the
/// compiler — confirm files are only shared between identical builds.
#[derive(Debug)]
struct Entry<K, V> {
    /// The stored key.
    key: K,
    /// The stored value.
    val: V,
    /// Absolute page number where the probe chain continues; `0` means the
    /// chain ends here.
    next: u64,
    /// `hash_val(key)` wrapping-added to `hash_val(val)`; checked by `valid`.
    kv_checksum: u64,
    /// Always `next + 1`; checked by `valid`, so zero-filled memory (where
    /// both fields are `0`) never passes.
    next_checksum: u64,
}
/// Mutable access to an entry, bundled with the shard lock that `entry_mut`
/// acquired for it; the lock is released when this value is dropped.
struct EntryMut<'a, K, V> {
    /// The entry inside the mapped page.
    entry: &'a mut Entry<K, V>,
    /// Guard held only for its lifetime; never read.
    _lock: MutexGuard<'a, Shard>,
}
/// Read access to the guarded entry.
impl<'a, K, V> Deref for EntryMut<'a, K, V> {
    type Target = Entry<K, V>;

    fn deref(&self) -> &Self::Target {
        // Explicit shared reborrow of the inner `&mut Entry`.
        &*self.entry
    }
}
/// Write access to the guarded entry.
impl<'a, K, V> DerefMut for EntryMut<'a, K, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Explicit mutable reborrow of the inner `&mut Entry`.
        &mut *self.entry
    }
}
impl<K: Hash, V: Hash> Entry<K, V> {
    /// Builds an entry for `key`/`val` that ends its chain (`next == 0`) and
    /// carries matching checksums.
    fn new(key: K, val: V) -> Self {
        let kv_checksum = hash_val(&key).wrapping_add(hash_val(&val));
        let entry = Entry {
            key,
            val,
            kv_checksum,
            next: 0,
            // Checksum of `next`: always `next + 1`.
            next_checksum: 1,
        };
        debug_assert!(entry.valid());
        entry
    }

    /// Returns `true` when both checksums match the stored fields.
    ///
    /// This runs on bytes that may never have been written as an entry, so
    /// `next` can hold any value, including `u64::MAX`; the `+ 1` must wrap
    /// instead of tripping the debug-build overflow check.
    fn valid(&self) -> bool {
        hash_val(&self.key).wrapping_add(hash_val(&self.val)) == self.kv_checksum
            && self.next.wrapping_add(1) == self.next_checksum
    }

    /// Points this entry at page `next` and updates the matching checksum.
    fn set_next<I: Into<u64>>(&mut self, next: I) {
        let next = next.into();
        self.next = next;
        // Same wrapping rule as `valid`, so the two always agree.
        self.next_checksum = next.wrapping_add(1);
    }
}
impl<K: Hash + Copy + PartialEq, V: Hash + Copy> Index<K, V> {
pub fn new<P: AsRef<Path>>(path: &P) -> io::Result<Self> {
let mut lanes = ArrayVec::new();
for n in 0..NUM_LANES {
let mut pathbuf = PathBuf::from(path.as_ref());
pathbuf.push(&format!("{:02x}", n));
if pathbuf.exists() {
let file =
OpenOptions::new().read(true).write(true).open(&pathbuf)?;
let lane_pages = Self::lane_pages(n);
let file_len = PAGE_SIZE as u64 * lane_pages as u64;
file.set_len(file_len)?;
unsafe { lanes.push(MmapMut::map_mut(&file)?) };
}
}
let mut num_pages = 0;
if let Some(last) = lanes.last() {
let last: &MmapMut = last;
let mut full_pages = 0;
for n in 0..lanes.len().saturating_sub(1) {
println!("lane {}, pages {}", n, Self::lane_pages(n));
full_pages += Self::lane_pages(n)
}
let mut low_bound = 0;
let mut high_bound = Self::lane_pages(lanes.len() - 1) - 1;
while low_bound + 1 != high_bound {
let check = low_bound + (high_bound - low_bound) / 2;
println!(
"low bound: {}, high bound: {}, check {}",
low_bound, high_bound, check,
);
let page_ofs = PAGE_SIZE * check;
for slot in 0..Self::entries_per_page() {
let slot_ofs =
page_ofs + slot * mem::size_of::<Entry<K, V>>();
let ptr = last.as_ptr();
let entry: &Entry<K, V> = unsafe {
mem::transmute(ptr.offset(slot_ofs as isize))
};
if entry.valid() {
low_bound = check;
break;
}
}
if low_bound != check {
high_bound = check
}
}
num_pages = full_pages + high_bound;
}
let index = Index {
lanes: UnsafeCell::new(lanes),
path: PathBuf::from(path.as_ref()),
pages: Mutex::new(num_pages as u64),
_marker: PhantomData,
};
if num_pages == 0 {
assert_eq!(index.new_page()?, 0);
}
Ok(index)
}
/// Returns the number of pages allocated so far across all lanes.
pub fn pages(&self) -> usize {
    let count = self.pages.lock();
    *count as usize
}
/// Number of pages in lane `n`: `FIRST_LANE_PAGES` doubled `n` times.
#[inline(always)]
fn lane_pages(n: usize) -> usize {
    let doublings = 2_usize.pow(n as u32);
    doublings * FIRST_LANE_PAGES
}
/// Number of whole `Entry<K, V>` records that fit into one page.
#[inline(always)]
fn entries_per_page() -> usize {
    let entry_size = mem::size_of::<Entry<K, V>>();
    PAGE_SIZE / entry_size
}
/// Slot to probe inside the page at chain position `depth` for a key whose
/// hash is `key_hash`.
///
/// The hash is re-mixed with the depth so a key lands on a different slot in
/// each page of its chain.
#[inline(always)]
fn slot(key_hash: u64, depth: usize) -> usize {
    // `key_hash` can be any u64, so the sum must wrap rather than hit the
    // debug-build overflow check. Wrapping yields the same slot a release
    // build already computed.
    let mixed = key_hash.wrapping_add(depth as u64);
    (hash_val(&mixed) % Self::entries_per_page() as u64) as usize
}
/// Splits an absolute page number into `(lane, page within that lane)`.
#[inline(always)]
fn lane_page(page: usize) -> (usize, usize) {
    // 1-based count of `FIRST_LANE_PAGES`-sized chunks up to and including
    // the one holding `page`; its floor(log2) is the lane number.
    let chunk = page / FIRST_LANE_PAGES + 1;
    let top_bit = mem::size_of::<usize>() * 8 - 1;
    let lane = top_bit - chunk.leading_zeros() as usize;
    // Lanes 0..lane together hold (2^lane - 1) * FIRST_LANE_PAGES pages.
    let pages_before = (2usize.pow(lane as u32) - 1) * FIRST_LANE_PAGES;
    (lane, page - pages_before)
}
/// Creates (or opens) the file for the next lane, sizes it to the lane's
/// full length, maps it and appends the mapping to `lanes`.
///
/// The only visible caller is `new_page`, which holds the `pages` lock.
fn new_lane(&self) -> io::Result<()> {
    let lanes = self.lanes.get();
    // SAFETY: `lanes` comes from the live `UnsafeCell` owned by `self`.
    let lane_nr = unsafe { (*lanes).len() };
    let byte_len = PAGE_SIZE as u64 * Self::lane_pages(lane_nr) as u64;
    let lane_path = self.path.join(format!("{:02x}", lane_nr));

    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true);
    let file = options.open(&lane_path)?;
    file.set_len(byte_len)?;

    // SAFETY: `map_mut` is unsafe because the file may change behind the
    // mapping; NOTE(review): assumes nothing else modifies it — confirm.
    let map = unsafe { MmapMut::map_mut(&file)? };
    // SAFETY: NOTE(review): mutates `lanes` through `&self`; relies on the
    // caller serializing lane creation — confirm.
    unsafe { (*lanes).push(map) };
    Ok(())
}
/// Allocates the next page and returns its absolute page number, creating a
/// new lane first when that page is the first one of its lane.
///
/// The page counter is left untouched when lane creation fails.
fn new_page(&self) -> io::Result<u64> {
    let mut counter = self.pages.lock();
    let allocated = *counter;
    let (_, page_in_lane) = Self::lane_page(allocated as usize);
    if page_in_lane == 0 {
        self.new_lane()?;
    }
    *counter = allocated + 1;
    Ok(allocated)
}
/// Shared view of the entry at `slot` of `page` within `lane`, taken without
/// any lock.
fn entry(&self, lane: usize, page: usize, slot: usize) -> &Entry<K, V> {
    let byte_ofs = PAGE_SIZE * page + slot * mem::size_of::<Entry<K, V>>();
    // SAFETY: NOTE(review): relies on callers passing coordinates inside the
    // mapped lane, and on `Entry<K, V>` tolerating arbitrary bytes — confirm.
    unsafe {
        let lanes = &*self.lanes.get();
        let base = lanes[lane].as_ptr();
        &*(base.add(byte_ofs) as *const Entry<K, V>)
    }
}
/// Locks the shard selected by `(page ^ slot) % NUM_SHARDS` and returns
/// mutable access to the entry at `slot` of `page` within `lane`, bundled
/// with that lock.
fn entry_mut(
    &self,
    lane: usize,
    page: usize,
    slot: usize,
) -> EntryMut<K, V> {
    // Take the lock before forming the mutable reference.
    let guard = SHARDS[(page ^ slot) % NUM_SHARDS].lock();
    let byte_ofs = PAGE_SIZE * page + slot * mem::size_of::<Entry<K, V>>();
    // SAFETY: NOTE(review): relies on callers passing coordinates inside the
    // mapped lane and on the shard lock being the only route to a mutable
    // entry reference — confirm.
    let entry = unsafe {
        let base = (*self.lanes.get())[lane].as_ptr();
        &mut *(base.add(byte_ofs) as *mut Entry<K, V>)
    };
    EntryMut {
        entry,
        _lock: guard,
    }
}
/// Walks the probe chain for `k`, starting at page 0, and reports where it
/// ended.
///
/// At chain position `depth` the slot `Self::slot(hash, depth)` of the
/// current page is inspected:
/// * an entry failing `valid()` yields `Found::Invalid` with its coordinates;
/// * an entry with an equal key yields `Found::Some`;
/// * a different key with `next == 0` yields `Found::None`;
/// * otherwise the walk continues on page `next`.
///
/// No lock is taken. The visible code never returns `Err`.
fn find_key(&self, k: &K) -> io::Result<Found<K, V>> {
    // The key's hash does not change between probes; compute it once.
    let key_hash = hash_val(k);
    let mut depth = 0;
    let mut abs_page = 0;
    loop {
        let slot = Self::slot(key_hash, depth);
        let (lane, page) = Self::lane_page(abs_page);
        let entry = self.entry(lane, page, slot);
        if !entry.valid() {
            return Ok(Found::Invalid(lane, page, slot));
        }
        if entry.key == *k {
            return Ok(Found::Some(entry));
        }
        if entry.next == 0 {
            return Ok(Found::None(lane, page, slot));
        }
        abs_page = entry.next as usize;
        depth += 1;
    }
}
/// Stores `val` under `key` unless the key is already present.
///
/// Returns `Ok(true)` when an entry with this key already exists; the stored
/// value is left as it is. Returns `Ok(false)` when a new entry was written.
///
/// # Errors
///
/// Returns any I/O error raised while allocating a new page or lane.
pub fn insert(&self, key: K, val: V) -> io::Result<AlreadyThere> {
    // Each pass re-walks the chain from the start. A pass that cannot finish
    // the insert either extended the chain or lost a race, so the next walk
    // sees the updated state.
    loop {
        match self.find_key(&key)? {
            Found::Some(_) => return Ok(true),
            Found::Invalid(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);
                // Re-check under the lock: if the slot became valid in the
                // meantime another thread filled it. Writing here would
                // discard that thread's entry, so retry instead.
                if !entry.valid() {
                    *entry = Entry::new(key, val);
                    return Ok(false);
                }
            }
            Found::None(lane, page, slot) => {
                let mut entry = self.entry_mut(lane, page, slot);
                // A non-zero `next` means another thread already extended
                // the chain; just follow it on the next pass.
                if entry.next == 0 {
                    let next_page = self.new_page()?;
                    entry.set_next(next_page);
                }
            }
        }
        // The shard guard (if any) was dropped with the match arm above.
    }
}
/// Looks up `key` and returns a reference to its stored value, or `None`
/// when the key is not present.
///
/// # Errors
///
/// Propagates any error returned by the internal key lookup.
pub fn get(&self, key: &K) -> io::Result<Option<&V>> {
    let hit = match self.find_key(key)? {
        Found::Some(entry) => Some(&entry.val),
        Found::None(..) | Found::Invalid(..) => None,
    };
    Ok(hit)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use rand::{seq::SliceRandom, thread_rng};
    use tempfile::tempdir;

    use super::*;

    /// Number of keys used by the bulk tests.
    const N: u64 = 1024 * 256;
    /// Number of writer threads (and of reader threads) in `stress`.
    const N_THREADS: usize = 8;

    /// A single inserted pair can be read back.
    #[test]
    fn simple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        index.insert(0, 0).unwrap();
        assert_eq!(index.get(&0).unwrap(), Some(&0));
    }

    /// Many sequential inserts are all readable afterwards.
    #[test]
    fn multiple() {
        let dir = tempdir().unwrap();
        let index = Index::new(&dir).unwrap();
        for key in 0..N {
            index.insert(key, key).unwrap();
        }
        for key in 0..N {
            assert_eq!(index.get(&key).unwrap(), Some(&key));
        }
    }

    /// Reopening the directory recovers the page count and every entry, and
    /// the reopened index accepts further inserts.
    #[test]
    fn reload() {
        let dir = tempdir().unwrap();

        let pages_after_first = {
            let index_a = Index::new(&dir).unwrap();
            for key in 0..N {
                index_a.insert(key, key).unwrap();
            }
            index_a.pages()
        };

        let pages_after_second = {
            let index_b = Index::new(&dir).unwrap();
            assert_eq!(pages_after_first, index_b.pages());
            for key in 0..N {
                assert_eq!(index_b.get(&key).unwrap(), Some(&key));
            }
            for key in N..N * 2 {
                index_b.insert(key, key).unwrap();
            }
            index_b.pages()
        };

        let index_c = Index::new(&dir).unwrap();
        assert_eq!(pages_after_second, index_c.pages());
        for key in 0..N * 2 {
            assert_eq!(index_c.get(&key).unwrap(), Some(&key));
        }
    }

    /// Concurrent writers and readers over the same key set: readers may
    /// miss a key but must never see a wrong value, and every key is present
    /// once all threads have finished.
    #[test]
    fn stress() {
        let dir = tempdir().unwrap();
        let index = Arc::new(Index::new(&dir).unwrap());

        let keys: Vec<u64> = (0..N).collect();
        let mut rng = thread_rng();

        // All write orders are shuffled first, then all read orders.
        let write_orders: Vec<Vec<u64>> = (0..N_THREADS)
            .map(|_| {
                let mut order = keys.clone();
                order.shuffle(&mut rng);
                order
            })
            .collect();
        let read_orders: Vec<Vec<u64>> = (0..N_THREADS)
            .map(|_| {
                let mut order = keys.clone();
                order.shuffle(&mut rng);
                order
            })
            .collect();

        let mut handles = Vec::with_capacity(2 * N_THREADS);
        for (write_order, read_order) in write_orders.into_iter().zip(read_orders) {
            let writer_index = Arc::clone(&index);
            handles.push(thread::spawn(move || {
                for key in write_order {
                    writer_index.insert(key, key).unwrap();
                }
            }));

            let reader_index = Arc::clone(&index);
            handles.push(thread::spawn(move || {
                for key in read_order {
                    if let Some(val) = reader_index.get(&key).unwrap() {
                        assert_eq!(val, &key);
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        for key in 0..N {
            assert_eq!(index.get(&key).unwrap(), Some(&key));
        }
    }
}