use fs2::FileExt;
use memmap;
use skiplist;
use std;
use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::ptr::copy_nonoverlapping;
use std::sync::{Condvar, Mutex, MutexGuard};
use {Db, Representable, RC_ROOT};
/// On-disk format version, stored in the first 8 bytes of the file
/// (little-endian); checked on every open.
pub const CURRENT_VERSION: u64 = 0;
// Word (u64) offsets into page 0's header:
// word 1 = first byte offset past the last allocated page.
const OFF_MAP_LENGTH: isize = 1;
// word 2 = offset of the head of the free-page (bookkeeping) list.
const OFF_CURRENT_FREE: isize = 2;
/// Page size, in bytes.
pub const PAGE_SIZE: usize = 4096;
/// Same as `PAGE_SIZE`, pre-cast to `u64` for offset arithmetic.
pub const PAGE_SIZE_64: u64 = 4096;
/// Byte offset within page 0 where the root table starts (8 bytes/root).
pub const ZERO_HEADER: isize = 24;
/// Default number of page allocations between automatic mmap flushes
/// (see `MutTxn::alloc_page` / `Env::set_flush_limit`).
const DEFAULT_FLUSH_LIMIT: u64 = 4096;
/// Errors that can occur while opening or operating on an environment.
#[derive(Debug)]
pub enum Error {
    /// Underlying I/O error from the filesystem or the memory map.
    IO(std::io::Error),
    /// The memory-mapped file has no room left to allocate a new page.
    NotEnoughSpace,
    /// An internal lock was poisoned by a panicking thread.
    Poison,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
Error::IO(ref err) => write!(f, "IO error: {}", err),
Error::NotEnoughSpace => write!(
f,
"Not enough space. Try opening the environment with a larger size."
),
Error::Poison => write!(f, "Lock poisoning error"),
}
}
}
impl std::error::Error for Error {
    // NOTE(review): `description`/`cause` are the legacy `std::error::Error`
    // API (superseded by `source` in later Rust versions); kept as-is to
    // match the compiler version this crate targets.
    fn description(&self) -> &str {
        match *self {
            Error::IO(ref err) => err.description(),
            Error::NotEnoughSpace => {
                "Not enough space. Try opening the environment with a larger size."
            }
            Error::Poison => "Poison error",
        }
    }
    /// Returns the underlying error, if any (only `Error::IO` has one).
    fn cause(&self) -> Option<&std::error::Error> {
        match *self {
            Error::IO(ref err) => Some(err),
            Error::NotEnoughSpace => None,
            Error::Poison => None,
        }
    }
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Error::IO(e)
}
}
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(_: std::sync::PoisonError<T>) -> Error {
Error::Poison
}
}
/// A database environment: one memory-mapped file plus the synchronization
/// state coordinating read and write transactions on it.
pub struct Env {
    /// Length of the mapping (and of the backing file), in bytes.
    length: u64,
    /// The file backing the mmap; truncated to `first_unused_page` on close.
    backing_file: File,
    /// Number of page allocations between automatic mmap flushes.
    flush_limit: u64,
    /// Inter-process lock file (`db.lock`); `None` when opened via
    /// `new_nolock`.
    lock_file: Option<File>,
    /// The writable mapping itself; taken out (unmapping it) on close.
    mmap: Option<memmap::MmapMut>,
    /// Raw pointer to the first byte of the mapping.
    map: *mut u8,
    /// End of the used region, recorded at commit time and used to truncate
    /// the backing file when the environment is closed.
    first_unused_page: Mutex<u64>,
    /// Logical clock: bumped by every `txn_begin` and every commit.
    clock: Mutex<u64>,
    /// Number of live read transactions.
    txn_counter: Mutex<usize>,
    /// `(clock value at the last commit, number of read transactions that
    /// were alive at that commit)`.
    last_commit_date: Mutex<(u64, usize)>,
    /// Notified when the last reader predating the last commit is dropped.
    concurrent_txns_are_finished: Condvar,
    /// Held (via guard) by the single top-level mutable transaction.
    mutable: Mutex<()>,
}
// SAFETY(review): the raw `*mut u8` into the mmap disables the auto traits.
// Cross-thread access appears to be guarded by the `Mutex`/`Condvar` fields
// and the transaction protocol — TODO confirm all `map` dereferences happen
// under those guards.
unsafe impl Send for Env {}
unsafe impl Sync for Env {}
impl Drop for Env {
fn drop(&mut self) {
if let Some(mmap) = self.mmap.take() {
drop(mmap);
if let Ok(f) = self.first_unused_page.lock() {
if *f > 0 {
self.backing_file.set_len(*f).unwrap_or(());
}
}
}
if let Some(ref mut lock_file) = self.lock_file {
lock_file.unlock().unwrap_or(());
}
}
}
/// A read-only transaction.
pub struct Txn<'env> {
    env: &'env Env,
    /// Value of `Env::clock` when this transaction started; compared against
    /// the last commit date when the transaction is dropped.
    start_date: u64,
}
/// A mutable transaction. `T` is `()` for a top-level transaction and a
/// `&mut` to the parent for a nested one (see `MutTxn::mut_txn_begin`).
pub struct MutTxn<'env, T> {
    env: &'env Env,
    /// Guard on `Env::mutable`; `None` for nested transactions.
    mutable: Option<MutexGuard<'env, ()>>,
    /// `()` at top level, `&mut` parent transaction when nested.
    parent: T,
    /// First byte offset past the last allocated page.
    last_page: u64,
    /// Bookkeeping (free-list) page the allocator is currently consuming.
    current_list_page: Page,
    /// Number of entries on `current_list_page`.
    current_list_length: u64,
    /// Cursor into `current_list_page`'s entries; counts down toward 0.
    current_list_position: u64,
    /// Pages allocated by this transaction (never yet visible on disk).
    occupied_clean_pages: HashSet<u64>,
    /// Pages allocated and then freed within this same transaction;
    /// immediately reusable.
    free_clean_pages: Vec<u64>,
    /// Freed pages that belonged to earlier commits.
    free_pages: Vec<u64>,
    /// Root values overridden by this transaction, keyed by root number;
    /// written to page 0 at commit time.
    roots: HashMap<usize, u64>,
    /// Pages allocated since the last automatic flush (vs. `flush_limit`).
    mut_page_count: u64,
}
impl<'env> Drop for Txn<'env> {
    /// Unregisters this reader. If it started at or before the last commit,
    /// it also decrements the pre-commit reader count and, once that count
    /// reaches zero, wakes a writer waiting in `Env::mut_txn_begin`.
    fn drop(&mut self) {
        let mut m = self.env.txn_counter.lock().unwrap();
        *m -= 1;
        // Shadowing `m` on purpose: switch to the last-commit-date lock.
        let mut m = self.env.last_commit_date.lock().unwrap();
        if self.start_date <= m.0 {
            m.1 -= 1
        }
        if m.1 == 0 {
            self.env.concurrent_txns_are_finished.notify_one()
        }
    }
}
impl<'env, T> Drop for MutTxn<'env, T> {
    /// Dropping an uncommitted mutable transaction discards its changes:
    /// the page-0 header was never updated, so the previous state stays
    /// visible. Only the `mutable` lock guard (if any) is released.
    fn drop(&mut self) {
        debug!("dropping transaction");
        if let Some(ref mut guard) = self.mutable {
            debug!("dropping guard");
            // A no-op read of the guarded `()`; the guard itself is
            // released when `self` is dropped. Presumably kept only to
            // silence an unused-variable warning — TODO confirm.
            **guard
        }
    }
}
/// A snapshot of page usage, as computed by `Txn::statistics`.
#[derive(Debug)]
pub struct Statistics {
    /// Offsets of pages recorded in the on-disk free list.
    pub free_pages: HashSet<u64>,
    /// Offsets of the bookkeeping pages that form the free list itself.
    pub bookkeeping_pages: Vec<u64>,
    /// Total number of pages allocated in the file.
    pub total_pages: usize,
    /// Page reference counts read from the `RC_ROOT` database, if present.
    pub reference_counts: HashMap<u64, u64>,
}
impl Env {
    /// Returns the size in bytes of the database file (`path/db`).
    pub fn file_size<P: AsRef<Path>>(path: P) -> Result<u64, Error> {
        let db_path = path.as_ref().join("db");
        debug!(
            "db_path = {:?}, len = {:?}",
            db_path,
            std::fs::metadata(&db_path)?.len()
        );
        Ok(std::fs::metadata(&db_path)?.len())
    }
    /// Total length of the memory mapping, in bytes.
    pub fn size(&self) -> u64 {
        self.length
    }
    /// Sets how many page allocations may happen between automatic mmap
    /// flushes (see `MutTxn::alloc_page`).
    pub fn set_flush_limit(&mut self, flush: u64) {
        self.flush_limit = flush
    }
    /// Current flush limit (page allocations between automatic flushes).
    pub fn flush_limit(&self) -> u64 {
        self.flush_limit
    }
    /// Opens (or creates) the environment in directory `path`, holding an
    /// exclusive lock on `path/db.lock` to keep other processes out.
    pub fn new<P: AsRef<Path>>(path: P, length: u64) -> Result<Env, Error> {
        let lock_file = OpenOptions::new()
            .read(true)
            .write(true)
            .truncate(false)
            .create(true)
            .open(path.as_ref().join("db").with_extension("lock"))?;
        lock_file.lock_exclusive()?;
        let mut env = unsafe { Self::new_nolock(path, length)? };
        env.lock_file = Some(lock_file);
        Ok(env)
    }
    /// Opens the environment without taking the inter-process lock.
    ///
    /// Unsafe because two processes mapping the same file writably can
    /// corrupt it; the caller must guarantee exclusivity some other way.
    pub unsafe fn new_nolock<P: AsRef<Path>>(path: P, length: u64) -> Result<Env, Error> {
        debug!("length {:?}", length);
        let db_path = path.as_ref().join("db");
        let db_exists = std::fs::metadata(&db_path).is_ok();
        // Never shrink an existing file: map at least its current length.
        let length = if let Ok(meta) = std::fs::metadata(&db_path) {
            std::cmp::max(meta.len(), length)
        } else {
            length
        };
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .truncate(false)
            .create(true)
            .open(&db_path)?;
        debug!("allocate: {:?}", length);
        file.set_len(length)?;
        let mut mmap = memmap::MmapMut::map_mut(&file)?;
        let map = mmap.as_mut_ptr();
        debug!("mmap: {:?} {:?}", map, mmap.len());
        if !db_exists {
            // Fresh file: zero page 0 and stamp the format version into its
            // first 8 bytes, little-endian.
            std::ptr::write_bytes(map, 0, PAGE_SIZE);
            *(map as *mut u64) = CURRENT_VERSION.to_le();
        } else {
            // Existing file: refuse to open a mismatched format version.
            assert!(u64::from_le(*(map as *const u64)) == CURRENT_VERSION)
        }
        debug!(
            "metadata.len() = {:?}",
            std::fs::metadata(&db_path).map(|x| x.len())
        );
        let env = Env {
            length: length,
            mmap: Some(mmap),
            map: map,
            backing_file: file,
            lock_file: None,
            first_unused_page: Mutex::new(0),
            mutable: Mutex::new(()),
            flush_limit: DEFAULT_FLUSH_LIMIT,
            txn_counter: Mutex::new(0),
            clock: Mutex::new(0),
            last_commit_date: Mutex::new((0, 0)),
            concurrent_txns_are_finished: Condvar::new(),
        };
        Ok(env)
    }
    /// Starts a read-only transaction, stamped with the current logical
    /// clock value and counted in `txn_counter`.
    pub fn txn_begin<'env>(&'env self) -> Result<Txn<'env>, Error> {
        let mut read = self.clock.lock()?;
        *read += 1;
        let mut counter = self.txn_counter.lock()?;
        *counter += 1;
        Ok(Txn {
            env: self,
            start_date: *read,
        })
    }
    /// Starts the (single) top-level mutable transaction.
    ///
    /// Blocks until any previous mutable transaction releases the
    /// `mutable` lock, and until every read transaction started before the
    /// last commit has been dropped — presumably so pages those readers
    /// may still see are not reused; TODO confirm.
    pub fn mut_txn_begin<'env>(&'env self) -> Result<MutTxn<'env, ()>, Error> {
        unsafe {
            debug!("taking mutable lock");
            let guard = self.mutable.lock()?;
            debug!("taking last commit lock");
            let mut last_commit = self.last_commit_date.lock()?;
            // `last_commit.1` counts readers that predate the last commit;
            // `Txn::drop` decrements it and signals the condvar at zero.
            while last_commit.1 > 0 {
                last_commit = self.concurrent_txns_are_finished.wait(last_commit)?;
            }
            debug!("lock ok");
            // Page-0 header: end of allocated space, and head of the
            // free-page (bookkeeping) list.
            let last_page = u64::from_le(*((self.map as *const u64).offset(OFF_MAP_LENGTH)));
            let current_list_page =
                u64::from_le(*((self.map as *const u64).offset(OFF_CURRENT_FREE)));
            debug!("map header = {:?}, {:?}", last_page, current_list_page);
            assert!(current_list_page < self.length);
            let current_list_page = Page {
                data: self.map.offset(current_list_page as isize),
                offset: current_list_page,
            };
            // Word 1 of a bookkeeping page is its entry count; offset 0
            // means there is no free list yet.
            let current_list_length = if current_list_page.offset == 0 {
                0
            } else {
                u64::from_le(*((current_list_page.data as *const u64).offset(1)))
            };
            Ok(MutTxn {
                env: self,
                mutable: Some(guard),
                parent: (),
                // A zero header means a fresh file: page 0 is reserved for
                // the header, so allocation starts at the second page.
                last_page: if last_page == 0 {
                    PAGE_SIZE_64
                } else {
                    last_page
                },
                current_list_page: current_list_page,
                current_list_length: current_list_length,
                current_list_position: current_list_length,
                occupied_clean_pages: HashSet::new(),
                free_clean_pages: Vec::new(),
                free_pages: Vec::new(),
                roots: HashMap::new(),
                mut_page_count: 0,
            })
        }
    }
    /// Closes the environment: unmaps the file, truncates it to the last
    /// committed `first_unused_page` (if any commit happened), and releases
    /// the inter-process lock. Idempotent; also run by `Drop`.
    ///
    /// Unsafe: any outstanding `Page` pointers into the mapping become
    /// dangling — TODO confirm callers uphold this.
    pub unsafe fn close(&mut self) {
        if let Some(mmap) = self.mmap.take() {
            drop(mmap);
            if let Ok(f) = self.first_unused_page.lock() {
                if *f > 0 {
                    self.backing_file.set_len(*f).unwrap_or(());
                }
            }
        }
        if let Some(ref mut lock_file) = self.lock_file {
            lock_file.unlock().unwrap_or(());
        }
    }
}
impl<'a> Txn<'a> {
    /// Walks the free-page list (and the reference-count database, if one
    /// exists) to build a usage snapshot. Panics if a page occurs twice in
    /// the free list, which would indicate corruption.
    pub fn statistics(&self) -> Statistics {
        unsafe {
            // Header word 1 of page 0 is the end of allocated space, in
            // bytes; divided by PAGE_SIZE below to yield a page count.
            let total_pages =
                u64::from_le(*((self.env.map as *const u64).offset(OFF_MAP_LENGTH))) as usize;
            let mut free_pages = HashSet::new();
            let mut bookkeeping_pages = Vec::new();
            // Follow the chain of bookkeeping pages from the header's
            // free-list head; each page stores (word 0) the previous page's
            // offset and (word 1) its entry count.
            let mut cur = u64::from_le(*((self.env.map as *const u64).offset(OFF_CURRENT_FREE)));
            while cur != 0 {
                bookkeeping_pages.push(cur);
                let p = self.env.map.offset(cur as isize) as *const u64;
                let prev = u64::from_le(*p);
                let len = u64::from_le(*(p.offset(1)));
                debug!("bookkeeping page: {:?}, {} {}", cur, prev, len);
                {
                    // Entries start at word 2.
                    let mut p: *const u64 = (p).offset(2);
                    let mut i = 0;
                    while i < len {
                        let free_page = u64::from_le(*p);
                        if !free_pages.insert(free_page) {
                            panic!("free page counted twice: {:?}", free_page)
                        }
                        p = p.offset(1);
                        i += 1
                    }
                }
                cur = prev
            }
            let mut reference_counts = HashMap::new();
            // If the reference-count root is set, load the whole
            // (page offset -> count) database into the snapshot.
            let rc_root = self.root_(RC_ROOT);
            if rc_root != 0 {
                use Transaction;
                let db: Db<u64, u64> = Db(rc_root, std::marker::PhantomData);
                reference_counts.extend(self.iter(&db, None))
            }
            Statistics {
                total_pages: total_pages / PAGE_SIZE,
                free_pages,
                bookkeeping_pages,
                reference_counts,
            }
        }
    }
    /// Inserts into `h` the offsets of every page reachable from `root`,
    /// including pages referenced by stored keys and values.
    pub fn references<K: Representable, V: Representable>(
        &self,
        h: &mut HashSet<u64>,
        root: Db<K, V>,
    ) {
        self.references_::<K, V>(h, root.0)
    }
    /// Recursive worker for `references`. Panics if `page` was already
    /// visited, i.e. if the structure shares a page or contains a cycle.
    fn references_<K: Representable, V: Representable>(&self, h: &mut HashSet<u64>, page: u64) {
        if page != 0 {
            if h.contains(&page) {
                panic!("h.contains({:?})", page);
            }
            h.insert(page);
            let page = Page {
                data: unsafe { self.env.map.offset(page as isize) },
                offset: page,
            };
            // Record the pages each key/value points to, then recurse into
            // each child page `r` of the skip list.
            for (_, v, r) in skiplist::iter_all::<_, K, V>(&page) {
                if let Some((k, v)) = v {
                    for offset in k.page_offsets() {
                        h.insert(offset);
                    }
                    for offset in v.page_offsets() {
                        h.insert(offset);
                    }
                }
                self.references_::<K, V>(h, r)
            }
        }
    }
}
/// A read-only view of one page: a raw pointer to its first byte within
/// the mapping, plus its byte offset in the file.
#[derive(Debug, Clone, Copy)]
pub struct Page {
    pub data: *const u8,
    pub offset: u64,
}
/// A writable view of one page: like `Page`, but with a mutable pointer.
#[derive(Debug)]
pub(crate) struct MutPage {
    pub data: *mut u8,
    pub offset: u64,
}
/// Read access to pages and roots, implemented by both read-only and
/// mutable transactions.
pub trait LoadPage {
    /// Returns the page starting at byte offset `off` in the mapping.
    fn load_page(&self, off: u64) -> Page;
    /// Returns the value of root number `num` from the page-0 root table.
    fn root_(&self, num: usize) -> u64;
}
impl<'env> LoadPage for Txn<'env> {
    fn load_page(&self, off: u64) -> Page {
        trace!("load_page: off={:?}, length = {:?}", off, self.env.length);
        // An offset at or past `length` would point outside the mapping.
        assert!(off < self.env.length);
        unsafe {
            Page {
                data: self.env.map.offset(off as isize),
                offset: off,
            }
        }
    }
    fn root_(&self, num: usize) -> u64 {
        // Entry `num` (8 bytes each, starting at byte ZERO_HEADER) must
        // fit inside page 0.
        assert!(ZERO_HEADER as usize + ((num + 1) << 3) < PAGE_SIZE);
        unsafe {
            u64::from_le(*((self.env.map.offset(ZERO_HEADER) as *const u64).offset(num as isize)))
        }
    }
}
impl<'env, A> LoadPage for MutTxn<'env, A> {
    fn load_page(&self, off: u64) -> Page {
        // Same bound check as the read-only version, with a more detailed
        // panic message.
        if off >= self.env.length {
            panic!("{:?} >= {:?}", off, self.env.length)
        }
        unsafe {
            Page {
                data: self.env.map.offset(off as isize),
                offset: off,
            }
        }
    }
    fn root_(&self, num: usize) -> u64 {
        // A root overridden by this transaction shadows the on-disk value
        // until commit.
        if let Some(root) = self.roots.get(&num) {
            *root
        } else {
            assert!(ZERO_HEADER + ((num as isize + 1) << 3) < (PAGE_SIZE as isize));
            unsafe {
                u64::from_le(
                    *((self.env.map.offset(ZERO_HEADER) as *const u64).offset(num as isize)),
                )
            }
        }
    }
}
/// A copy-on-write page handle (a crate-local type shadowing the name
/// `std::borrow::Cow`, not the standard type): either a read-only page
/// from an earlier commit, or a page this transaction already owns and
/// may mutate in place.
#[derive(Debug)]
pub(crate) enum Cow {
    Page(Page),
    MutPage(MutPage),
}
impl<'env, T> MutTxn<'env, T> {
pub fn mut_txn_begin<'txn>(
&'txn mut self,
) -> Result<MutTxn<'env, &'txn mut MutTxn<'env, T>>, Error> {
unsafe {
let mut txn = MutTxn {
env: self.env,
mutable: None,
parent: std::mem::uninitialized(),
last_page: self.last_page,
current_list_page: Page {
data: self.current_list_page.data,
offset: self.current_list_page.offset,
},
current_list_length: self.current_list_length,
current_list_position: self.current_list_position,
occupied_clean_pages: HashSet::new(),
free_clean_pages: Vec::new(),
free_pages: Vec::new(),
roots: self.roots.clone(),
mut_page_count: 0,
};
txn.parent = self;
Ok(txn)
}
}
pub(crate) fn set_root_(&mut self, num: usize, value: u64) {
self.roots.insert(num, value);
}
pub(crate) fn load_cow_page(&mut self, off: u64) -> Cow {
trace!(
"transaction::load_mut_page: {:?} {:?}",
off,
self.occupied_clean_pages
);
assert!(off < self.env.length);
if off != 0 && self.occupied_clean_pages.contains(&off) {
unsafe {
Cow::MutPage(MutPage {
data: self.env.map.offset(off as isize),
offset: off,
})
}
} else {
unsafe {
let d = self.env.map.offset(off as isize);
Cow::Page(Page {
data: d,
offset: off,
})
}
}
}
pub(crate) unsafe fn free_page(&mut self, offset: u64) {
debug!("transaction::free page: {:?}", offset);
if self.occupied_clean_pages.remove(&offset) {
self.free_clean_pages.push(offset);
} else {
self.free_pages.push(offset)
}
}
fn free_pages_pop(&mut self) -> Option<u64> {
debug!(
"free_pages_pop, current_list_position:{}",
self.current_list_position
);
if self.current_list_page.offset == 0 {
None
} else {
if self.current_list_position == 0 {
let previous_page =
unsafe { u64::from_le(*(self.current_list_page.data as *const u64)) };
debug!("free_pages_pop, previous page:{}", previous_page);
if previous_page == 0 {
None
} else {
self.free_pages.push(self.current_list_page.offset);
unsafe {
self.current_list_page = Page {
data: self.env.map.offset(previous_page as isize),
offset: previous_page,
};
self.current_list_length =
u64::from_le(*((self.current_list_page.data as *const u64).offset(1)))
}
self.current_list_position = self.current_list_length;
self.free_pages_pop()
}
} else {
let pos = self.current_list_position;
self.current_list_position -= 1;
debug!(
"free_pages_pop, new position:{}",
self.current_list_position
);
unsafe {
Some(u64::from_le(
*((self.current_list_page.data as *mut u64).offset(1 + pos as isize)),
))
}
}
}
}
pub(crate) fn alloc_page(&mut self) -> Result<MutPage, Error> {
debug!("alloc page");
self.mut_page_count += 1;
if self.mut_page_count >= self.env.flush_limit {
if let Some(ref map) = self.env.mmap {
map.flush()?;
}
self.mut_page_count = 0
}
if let Some(page) = self.free_clean_pages.pop() {
debug!("clean page reuse:{}", page);
self.occupied_clean_pages.insert(page);
Ok(MutPage {
data: unsafe { self.env.map.offset(page as isize) },
offset: page,
})
} else {
if let Some(page) = self.free_pages_pop() {
debug!("using an old free page: {}", page);
self.occupied_clean_pages.insert(page);
Ok(MutPage {
data: unsafe { self.env.map.offset(page as isize) },
offset: page,
})
} else {
let last = self.last_page;
debug!("eating the free space: {}", last);
if self.last_page + PAGE_SIZE_64 < self.env.length {
self.last_page += PAGE_SIZE_64;
self.occupied_clean_pages.insert(last);
Ok(MutPage {
data: unsafe { self.env.map.offset(last as isize) },
offset: last,
})
} else {
Err(Error::NotEnoughSpace)
}
}
}
}
}
/// Transactions that can be committed. Dropping a transaction without
/// calling `commit` discards its changes (see `Drop for MutTxn`).
pub trait Commit {
    /// Consumes the transaction, making its changes effective.
    fn commit(self) -> Result<(), Error>;
}
impl<'a, 'env, T> Commit for MutTxn<'env, &'a mut MutTxn<'env, T>> {
    /// Merges this nested transaction's state back into its parent.
    ///
    /// Nothing is written to disk here: the allocator cursor, the
    /// free-page lists and the root overrides are propagated upward, and
    /// the parent becomes responsible for the eventual durable commit.
    fn commit(self) -> Result<(), Error> {
        // Hand the free-list cursor and the allocation frontier back.
        self.parent.current_list_length = self.current_list_length;
        self.parent.current_list_position = self.current_list_position;
        self.parent.current_list_page = Page {
            data: self.current_list_page.data,
            offset: self.current_list_page.offset,
        };
        self.parent.last_page = self.last_page;
        // Pages touched or freed by the child are now the parent's to
        // manage.
        for &p in self.occupied_clean_pages.iter() {
            self.parent.occupied_clean_pages.insert(p);
        }
        for &p in self.free_clean_pages.iter() {
            self.parent.free_clean_pages.push(p);
        }
        for &p in self.free_pages.iter() {
            self.parent.free_pages.push(p);
        }
        // Root overrides recorded in the child shadow the parent's.
        for (num, value) in self.roots.iter() {
            self.parent.roots.insert(*num, *value);
        }
        Ok(())
    }
}
impl<'env> Commit for MutTxn<'env, ()> {
    /// Commits a top-level mutable transaction.
    ///
    /// Serializes the freed pages into a chain of bookkeeping pages,
    /// flushes the data region, and only then updates and flushes the
    /// page-0 header — so the new state becomes visible last.
    fn commit(mut self) -> Result<(), Error> {
        unsafe {
            // Head of the new free-list chain.
            let mut current_page = try!(self.alloc_page());
            if self.current_list_page.offset != 0 {
                // Re-allocate the current bookkeeping page: copy its two
                // header words plus the entries this transaction has not
                // consumed, then schedule the old copy for freeing.
                debug!("commit: realloc BK, copy {:?}", self.current_list_position);
                copy_nonoverlapping(
                    self.current_list_page.data as *const u64,
                    current_page.data as *mut u64,
                    2 + self.current_list_position as usize,
                );
                *((current_page.data as *mut u64).offset(1)) = self.current_list_position.to_le();
                debug!("freeing BK page {:?}", self.current_list_page.offset);
                self.free_pages.push(self.current_list_page.offset);
            } else {
                // No previous list: start an empty page (word 0 = previous
                // page in the chain, word 1 = entry count).
                *(current_page.data as *mut u64) = 0;
                *((current_page.data as *mut u64).offset(1)) = 0;
            }
            // Drain every freed page into the list, chaining extra
            // bookkeeping pages (themselves taken from the freed pages) as
            // needed.
            while !(self.free_pages.is_empty() && self.free_clean_pages.is_empty()) {
                debug!("commit: pushing");
                let len = u64::from_le(*((current_page.data as *const u64).offset(1)));
                debug!("len={:?}", len);
                // 16 header bytes + len entries + one more 8-byte entry
                // must fit in the page; otherwise chain a new page.
                if 16 + len * 8 + 8 >= PAGE_SIZE_64 {
                    debug!("commit: current is full, len={}", len);
                    let p = self
                        .free_pages
                        .pop()
                        .unwrap_or_else(|| self.free_clean_pages.pop().unwrap());
                    let new_page = MutPage {
                        data: self.env.map.offset(p as isize),
                        offset: p,
                    };
                    debug!("commit {} allocated {:?}", line!(), new_page.offset);
                    // Link the new page to the current one and make it the
                    // new (empty) head.
                    *(new_page.data as *mut u64) = current_page.offset.to_le();
                    *((new_page.data as *mut u64).offset(1)) = 0;
                    current_page = new_page;
                } else {
                    let p = self
                        .free_pages
                        .pop()
                        .unwrap_or_else(|| self.free_clean_pages.pop().unwrap());
                    debug!("commit: push {}", p);
                    // Append entry `p` at word (2 + len) and bump the count.
                    *((current_page.data as *mut u64).offset(1)) = (len + 1).to_le();
                    *((current_page.data as *mut u64).offset(2 + len as isize)) = p.to_le();
                }
            }
            {
                debug!("commit: taking local lock");
                let mut last_commit = self.env.last_commit_date.lock()?;
                debug!("commit: taking txn_counter lock");
                let n_txns = self.env.txn_counter.lock()?;
                debug!("commit: taking clock lock");
                let mut clock = self.env.clock.lock()?;
                debug!("ok");
                // Advance the logical clock and remember how many readers
                // are alive at this commit; the next writer waits for them
                // in `Env::mut_txn_begin`.
                *clock += 1;
                last_commit.0 = *clock;
                last_commit.1 = *n_txns;
                debug!("commit: lock ok");
                // Write the updated root table into page 0.
                for (u, v) in self.roots.iter() {
                    *((self.env.map.offset(ZERO_HEADER) as *mut u64).offset(*u as isize)) =
                        (*v).to_le();
                }
                debug!("env.length = {:?}", self.env.length);
                if let Some(ref mmap) = self.env.mmap {
                    // Flush everything past the first two pages first, then
                    // publish the new header (allocation frontier and
                    // free-list head) and flush the header pages last.
                    mmap.flush_range(2 * PAGE_SIZE, (self.env.length - 2 * PAGE_SIZE_64) as usize)?;
                    *((self.env.map as *mut u64).offset(OFF_MAP_LENGTH)) = self.last_page.to_le();
                    *((self.env.map as *mut u64).offset(OFF_CURRENT_FREE)) =
                        current_page.offset.to_le();
                    mmap.flush_range(0, 2 * PAGE_SIZE)?;
                }
                {
                    // Record the end of used space so `Env::close` can
                    // truncate the backing file.
                    let mut last = self.env.first_unused_page.lock()?;
                    debug!("last_page = {:?}", self.last_page);
                    *last = self.last_page;
                }
                if let Some(guard) = self.mutable.take() {
                    debug!("dropping guard");
                    // No-op read of the guarded `()`; the guard is dropped
                    // (releasing `Env::mutable`) at the end of this block.
                    *guard
                }
                Ok(())
            }
        }
    }
}