use crate::skiplist;
use crate::{Db, Representable, RC_ROOT};
#[cfg(feature = "mmap")]
use fs2::FileExt;
#[cfg(feature = "mmap")]
use memmap;
use parking_lot::lock_api::RawMutex;
use std;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
#[cfg(feature = "mmap")]
use std::fs::OpenOptions;
use std::path::Path;
#[cfg(feature = "mmap")]
use std::path::PathBuf;
use std::ptr::copy_nonoverlapping;
use std::sync::{Condvar, Mutex};
use thiserror::*;
/// Format version written into the first `u64` of page 0 when a database is initialised.
pub const CURRENT_VERSION: u64 = 3;
/// Index (in `u64` units from the start of page 0) of the field that `commit` fills with the
/// transaction's `last_page` value.
const OFF_MAP_LENGTH: isize = 1;
/// Index (in `u64` units from the start of page 0) of the offset of the current free-list
/// ("bookkeeping") page.
const OFF_CURRENT_FREE: isize = 2;
/// Size of a page, in bytes.
pub const PAGE_SIZE: usize = 4096;
/// `PAGE_SIZE` as a `u64`, for arithmetic on page offsets.
pub const PAGE_SIZE_64: u64 = 4096;
/// Byte offset in page 0 at which the table of root page offsets starts.
pub const ZERO_HEADER: isize = 24;
/// Presumably the size in bytes of a page checksum (checksums are read and written as `u32`);
/// not referenced in this part of the file.
pub const CRC_SIZE: isize = 4;
/// Initial value of `Env::flush_limit`: the number of `alloc_page` calls after which all maps
/// are flushed.
const DEFAULT_FLUSH_LIMIT: u64 = 4096;
/// Errors returned by environment and transaction operations.
#[derive(Debug, Error)]
pub enum Error {
/// An I/O error, forwarded transparently from `std::io`.
#[error(transparent)]
IO(#[from] std::io::Error),
/// A mutex was poisoned (see the `From<PoisonError<T>>` impl).
#[error("Lock poisoning error")]
Poison,
/// Reports a Sanakirja version mismatch.
#[error("Sanakirja version mismatch")]
VersionMismatch,
/// A page checksum did not match the page contents.
#[error(transparent)]
CRC(#[from] CRCError),
}
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(_: std::sync::PoisonError<T>) -> Error {
Error::Poison
}
}
/// Error returned by `check_crc` / `check_crc0` when the stored checksum of a page differs
/// from the checksum computed over its contents.
#[derive(Debug, Error)]
#[error("CRC check failed")]
pub struct CRCError{}
/// One contiguous memory segment of the database: either a memory map (feature `mmap`) or a
/// heap allocation (otherwise).
#[derive(Debug)]
#[doc(hidden)]
pub struct Map {
/// Address of the first byte of the segment.
pub ptr: *mut u8,
/// Backing file, or `None` for an anonymous map.
#[cfg(feature = "mmap")]
file: Option<std::fs::File>,
/// The memory map that `ptr` points into.
#[cfg(feature = "mmap")]
mmap: memmap::MmapMut,
/// Layout used to allocate `ptr`; needed again to deallocate it.
#[cfg(not(feature = "mmap"))]
layout: std::alloc::Layout,
/// Length of the segment, in bytes.
pub length: u64,
}
impl Map {
    /// Flushes the memory map, converting any I/O failure into [`Error::IO`].
    #[cfg(feature = "mmap")]
    fn flush(&self) -> Result<(), Error> {
        self.mmap.flush()?;
        Ok(())
    }

    /// Without the `mmap` feature the segment is plain heap memory: nothing to flush.
    #[cfg(not(feature = "mmap"))]
    fn flush(&self) -> Result<(), Error> {
        Ok(())
    }
}
/// A database environment: the memory segments holding the pages, plus the bookkeeping used
/// to coordinate read transactions with the single mutable transaction.
///
/// `T` is the lock marker (`Exclusive` or `Shared`).
pub struct Env<T> {
/// Directory containing the database files; `None` until set by `new_nolock` (anonymous
/// environments never set it).
#[cfg(feature = "mmap")]
path: Option<PathBuf>,
/// Number of `alloc_page` calls after which all maps are flushed.
flush_limit: u64,
/// Holder of the file lock, if one was taken.
#[cfg(feature = "mmap")]
lock_file: Option<T>,
#[cfg(not(feature = "mmap"))]
lock_file: std::marker::PhantomData<T>,
/// Memory segments; `find_offset` appends new ones on demand.
#[doc(hidden)]
pub mmaps: Mutex<Vec<Map>>,
/// Set to the committed transaction's `last_page` by the top-level `commit`.
first_unused_page: Mutex<u64>,
/// Counter incremented by every `txn_begin` and every top-level `commit`.
clock: Mutex<u64>,
/// Number of read transactions currently alive.
txn_counter: Mutex<usize>,
/// `(clock value of the last commit, number of read transactions alive at that commit that
/// have not been dropped yet)`.
last_commit_date: Mutex<(u64, usize)>,
/// Notified by `Txn::drop` when the second component of `last_commit_date` reaches zero.
concurrent_txns_are_finished: Condvar,
/// Raw mutex locked by `Env::mut_txn_begin` and released when a `MutTxn` is dropped.
mutable: parking_lot::RawMutex,
/// Format version selected when the environment was opened.
version: u64,
}
// NOTE(review): these impls are unconditional on `T` and cover the raw pointers stored in
// `Map`; soundness relies on callers synchronising all access to page memory — confirm.
unsafe impl<T> Send for Env<T> {}
unsafe impl<T> Sync for Env<T> {}
impl<T> Env<T> {
    /// Returns the value stored in `first_unused_page`, i.e. the `last_page` recorded by the
    /// most recent top-level commit on this environment (0 if none happened yet).
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting the value is poisoned.
    pub fn len(&self) -> u64 {
        let first_unused = self.first_unused_page.lock().unwrap();
        *first_unused
    }
}
#[cfg(feature = "mmap")]
impl<T> Drop for Env<T> {
    /// Unmaps every segment and then closes its backing file.
    ///
    /// `drop` has exclusive access, so the mutex is bypassed with `get_mut`. A poisoned
    /// mutex is recovered instead of unwrapped: panicking inside `drop` while another panic
    /// is unwinding would abort the process.
    fn drop(&mut self) {
        let maps = match self.mmaps.get_mut() {
            Ok(maps) => maps,
            Err(poisoned) => poisoned.into_inner(),
        };
        for map in maps.drain(..) {
            // Unmap before closing the file, as the original explicit order did.
            drop(map.mmap);
            drop(map.file);
        }
    }
}
#[cfg(not(feature = "mmap"))]
impl<T> Drop for Env<T> {
    /// Frees every heap-allocated segment with the layout it was allocated with.
    /// If the mutex is poisoned nothing is freed.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so no locking is required.
        if let Ok(maps) = self.mmaps.get_mut() {
            maps.drain(..)
                .for_each(|map| unsafe { std::alloc::dealloc(map.ptr, map.layout) });
        }
    }
}
/// A read-only transaction on an environment.
pub struct Txn<L, E: Borrow<Env<L>>> {
/// The environment (or something borrowing as one) this transaction reads from.
env: E,
/// Value of the environment clock right after `txn_begin` incremented it.
start_date: u64,
lock: std::marker::PhantomData<L>,
/// Copy of the environment's format version, used to select the checksum check.
version: u64,
}
/// A mutable transaction. `T` is `()` for a top-level transaction and `&mut MutTxn<..>` for a
/// child transaction created by `MutTxn::mut_txn_begin`.
pub struct MutTxn<E: Borrow<Env<Exclusive>>, T> {
env: E,
/// Parent transaction (`()` at the top level).
parent: T,
/// Offset handed out by `alloc_page` when no free page is available; it then grows by one
/// page.
last_page: u64,
/// Free-list page currently being consumed by `free_pages_pop` (offset 0 means none).
current_list_page: Page,
/// Number of entries stored in `current_list_page`.
current_list_length: u64,
/// Number of entries of `current_list_page` not yet popped.
current_list_position: u64,
/// Pages allocated by this transaction; they skip the checksum check when loaded and get
/// their checksum written at commit.
occupied_clean_pages: HashSet<u64>,
/// Pages allocated and then freed by this transaction; reused first by `alloc_page`.
free_clean_pages: Vec<u64>,
/// Freed pages that were not allocated by this transaction; written to the free list at
/// commit.
free_pages: Vec<u64>,
/// Root updates not yet written to page 0, keyed by root number.
roots: HashMap<usize, u64>,
/// Number of `alloc_page` calls since the maps were last flushed by `alloc_page`.
mut_page_count: u64,
/// Copy of the environment's format version.
version: u64,
}
impl<L, E: Borrow<Env<L>>> Drop for Txn<L, E> {
    /// Unregisters this reader and wakes a waiting writer once every reader that was alive
    /// at the last commit has finished.
    ///
    /// Locks are taken in the order `last_commit_date` -> `txn_counter`, the same order the
    /// top-level `commit` uses. Taking them the other way round (as this code used to) lets a
    /// reader being dropped and a concurrent commit each hold one lock while waiting for the
    /// other. `last_commit_date` stays held for the whole update, so a commit sees this
    /// reader either fully registered or fully gone.
    ///
    /// # Panics
    ///
    /// Panics if either mutex is poisoned.
    fn drop(&mut self) {
        let env = self.env.borrow();
        let mut last_commit = env.last_commit_date.lock().unwrap();
        {
            let mut live_txns = env.txn_counter.lock().unwrap();
            *live_txns -= 1;
        }
        // Only readers that started before the last commit were counted by it.
        if self.start_date <= last_commit.0 {
            last_commit.1 -= 1
        }
        if last_commit.1 == 0 {
            env.concurrent_txns_are_finished.notify_one()
        }
    }
}
// Releases the environment's writer lock (`mutable`) when a mutable transaction is dropped.
// NOTE(review): this impl applies to every `T`, including child transactions created by
// `MutTxn::mut_txn_begin`, which do not take the lock themselves. Dropping (or committing) a
// child therefore releases the lock while its parent is still alive — confirm whether this
// is intended.
impl<E: Borrow<Env<Exclusive>>, T> Drop for MutTxn<E, T> {
fn drop(&mut self) {
debug!("dropping transaction");
unsafe { self.env.borrow().mutable.unlock() }
}
}
/// Snapshot of page usage computed by `Txn::statistics`.
#[derive(Debug, Clone)]
pub struct Statistics {
/// Page offsets listed in the free-list pages.
pub free_pages: HashSet<u64>,
/// Offsets of the free-list ("bookkeeping") pages themselves.
pub bookkeeping_pages: Vec<u64>,
/// The length field of page 0 divided by `PAGE_SIZE`.
pub total_pages: usize,
/// Contents of the reference-count database rooted at `RC_ROOT`, if any.
pub reference_counts: HashMap<u64, u64>,
}
/// Lock marker for an environment opened with an exclusive file lock; owns the lock file.
pub struct Exclusive(std::fs::File);

#[cfg(feature = "mmap")]
impl Drop for Exclusive {
    /// Releases the file lock; a failure to unlock is deliberately ignored.
    fn drop(&mut self) {
        debug!("dropping lock file");
        let _ = self.0.unlock();
    }
}
/// Lock marker for an environment opened with a shared file lock; owns the lock file.
pub struct Shared(std::fs::File);

#[cfg(feature = "mmap")]
impl Drop for Shared {
    /// Releases the file lock; a failure to unlock is deliberately ignored.
    fn drop(&mut self) {
        debug!("dropping lock file");
        let _ = self.0.unlock();
    }
}
impl<T> Env<T> {
/// Returns the length in bytes of the file named `db` inside directory `path`.
///
/// The metadata is queried exactly once. Previously it was queried a second time inside
/// the `debug!` arguments (with `?`), so the number of syscalls and the point where an
/// error surfaced depended on whether debug logging was enabled.
///
/// NOTE(review): `new_nolock` stores the first segment in `db0`, not `db` — confirm which
/// file this function is meant to measure.
///
/// # Errors
///
/// Returns [`Error::IO`] if the file's metadata cannot be read.
pub fn file_size<P: AsRef<Path>>(path: P) -> Result<u64, Error> {
    let db_path = path.as_ref().join("db");
    let len = std::fs::metadata(&db_path)?.len();
    debug!("db_path = {:?}, len = {:?}", db_path, len);
    Ok(len)
}
/// Sets the number of `alloc_page` calls after which a mutable transaction flushes all maps.
pub fn set_flush_limit(&mut self, flush: u64) {
self.flush_limit = flush
}
/// Returns the current flush limit (see `set_flush_limit`).
pub fn flush_limit(&self) -> u64 {
self.flush_limit
}
#[cfg(feature = "mmap")]
/// Opens (creating it if needed) the database stored in directory `path`, without taking
/// any file lock.
///
/// The first segment lives in `<path>/db0`. Its size is the largest of the existing file
/// size, `length` and one page, rounded up to a power of two.
///
/// Compared with the previous implementation:
/// * the one-page minimum also applies to an existing file, so the header reads on page 0
///   can no longer run past the end of a too-short mapping;
/// * a file that exists but is empty is initialised like a new one, since it has no header;
/// * the file metadata is read once, so "exists" and "size" come from the same observation.
///
/// # Safety
///
/// No lock is taken: the caller must make sure no other process or environment modifies
/// the same files concurrently.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, resized or mapped, or if
/// `new_nolock_mmap` fails.
pub unsafe fn new_nolock<P: AsRef<Path>>(path: P, length: u64) -> Result<Self, Error> {
    debug!("length {:?}", length);
    let mut db_path = path.as_ref().join("db0");
    let existing_len = std::fs::metadata(&db_path).ok().map(|meta| meta.len());
    // A missing file, or one without any content yet, must be initialised.
    let initialise = existing_len.map_or(true, |len| len == 0);
    let length = existing_len
        .unwrap_or(0)
        .max(length)
        .max(PAGE_SIZE_64)
        .next_power_of_two();
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .truncate(false)
        .create(true)
        .open(&db_path)?;
    debug!("allocate: {:?}", length);
    file.set_len(length)?;
    debug!(
        "metadata.len() = {:?}",
        std::fs::metadata(&db_path).map(|x| x.len())
    );
    let mmap = memmap::MmapMut::map_mut(&file)?;
    let mut env = Self::new_nolock_mmap(Some(file), length, mmap, initialise)?;
    // Remember the directory, not the segment file, so later segments can be placed next to it.
    db_path.pop();
    env.path = Some(db_path);
    Ok(env)
}
#[cfg(feature = "mmap")]
/// Builds an environment on top of an already-created memory map.
///
/// When `initialise` is true, page 0 is zeroed, stamped with `CURRENT_VERSION` and given a
/// valid page-0 checksum; the environment then runs with `CURRENT_VERSION`. The previous
/// code returned version `1` here while writing `CURRENT_VERSION` to disk, so the same
/// database was treated as a different format before and after being reopened. The
/// checksum is written so that the version-3 check in `txn_begin` passes on a database
/// that has not been committed to yet.
///
/// When `initialise` is false, the version is read from page 0; an unsupported version now
/// yields [`Error::VersionMismatch`] instead of a panic.
///
/// # Safety
///
/// `mmap` must be at least `PAGE_SIZE` bytes long and `length` must be its length.
unsafe fn new_nolock_mmap(
    file: Option<std::fs::File>,
    length: u64,
    mut mmap: memmap::MmapMut,
    initialise: bool,
) -> Result<Self, Error> {
    let map = mmap.as_mut_ptr();
    debug!("mmap: {:?} {:?}", map, mmap.len());
    let version = if initialise {
        std::ptr::write_bytes(map, 0, PAGE_SIZE);
        *(map as *mut u64) = CURRENT_VERSION.to_le();
        crc0(map);
        CURRENT_VERSION
    } else {
        let version = u64::from_le(*(map as *const u64));
        if !(1..=CURRENT_VERSION).contains(&version) {
            return Err(Error::VersionMismatch);
        }
        version
    };
    let env = Env {
        path: None,
        mmaps: Mutex::new(vec![Map {
            ptr: map,
            mmap,
            file,
            length,
        }]),
        lock_file: None,
        first_unused_page: Mutex::new(0),
        mutable: parking_lot::RawMutex::INIT,
        flush_limit: DEFAULT_FLUSH_LIMIT,
        txn_counter: Mutex::new(0),
        clock: Mutex::new(0),
        last_commit_date: Mutex::new((0, 0)),
        concurrent_txns_are_finished: Condvar::new(),
        version,
    };
    Ok(env)
}
#[cfg(not(feature = "mmap"))]
unsafe fn new_nolock_mmap(length: u64, initialise: bool) -> Result<Env<Exclusive>, Error> {
let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
let map = std::alloc::alloc(layout);
let version = if initialise {
std::ptr::write_bytes(map, 0, PAGE_SIZE);
*(map as *mut u64) = CURRENT_VERSION.to_le();
CURRENT_VERSION
} else {
let version = u64::from_le(*(map as *const u64));
assert!(version == 1 || version == 2);
version
};
let env = Env {
mmaps: Mutex::new(vec![Map {
ptr: map,
layout,
length,
}]),
lock_file: std::marker::PhantomData,
first_unused_page: Mutex::new(0),
mutable: parking_lot::RawMutex::INIT,
flush_limit: DEFAULT_FLUSH_LIMIT,
txn_counter: Mutex::new(0),
clock: Mutex::new(0),
last_commit_date: Mutex::new((0, 0)),
concurrent_txns_are_finished: Condvar::new(),
version,
};
Ok(env)
}
}
impl Env<Shared> {
    /// Opens the database in directory `path` after taking a shared lock on
    /// `<path>/db.lock`, blocking until the lock is granted.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock file cannot be opened or locked, or if opening the
    /// database itself fails.
    #[cfg(feature = "mmap")]
    pub fn new_shared<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Shared>, Error> {
        let lock_path = path.as_ref().join("db").with_extension("lock");
        let mut options = OpenOptions::new();
        options.read(true).write(true).truncate(false).create(true);
        let lock_file = options.open(lock_path)?;
        lock_file.lock_shared()?;
        let mut env = unsafe { Self::new_nolock(path, length)? };
        env.lock_file = Some(Shared(lock_file));
        Ok(env)
    }

    /// Same as `new_shared`, but returns an error instead of blocking when the shared lock
    /// cannot be taken immediately.
    #[cfg(feature = "mmap")]
    pub fn try_new_shared<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Shared>, Error> {
        let lock_path = path.as_ref().join("db").with_extension("lock");
        let mut options = OpenOptions::new();
        options.read(true).write(true).truncate(false).create(true);
        let lock_file = options.open(lock_path)?;
        lock_file.try_lock_shared()?;
        let mut env = unsafe { Self::new_nolock(path, length)? };
        env.lock_file = Some(Shared(lock_file));
        Ok(env)
    }
}
impl Env<Exclusive> {
#[cfg(feature = "mmap")]
/// Opens the database in directory `path` after taking an exclusive lock on
/// `<path>/db.lock`, blocking until the lock is granted.
///
/// # Errors
///
/// Returns an error if the lock file cannot be opened or locked, or if opening the
/// database itself fails.
pub fn new<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Exclusive>, Error> {
    let lock_path = path.as_ref().join("db").with_extension("lock");
    let mut options = OpenOptions::new();
    options.read(true).write(true).truncate(false).create(true);
    let lock_file = options.open(lock_path)?;
    lock_file.lock_exclusive()?;
    let mut env = unsafe { Self::new_nolock(path, length)? };
    env.lock_file = Some(Exclusive(lock_file));
    Ok(env)
}
#[cfg(feature = "mmap")]
/// Same as `new`, but returns an error instead of blocking when the exclusive lock on
/// `<path>/db.lock` cannot be taken immediately.
pub fn try_new<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Exclusive>, Error> {
    let lock_path = path.as_ref().join("db").with_extension("lock");
    let mut options = OpenOptions::new();
    options.read(true).write(true).truncate(false).create(true);
    let lock_file = options.open(lock_path)?;
    lock_file.try_lock_exclusive()?;
    let mut env = unsafe { Self::new_nolock(path, length)? };
    env.lock_file = Some(Exclusive(lock_file));
    Ok(env)
}
#[cfg(feature = "mmap")]
/// Creates an environment backed by an anonymous memory map of at least `length` bytes
/// (minimum 4096, rounded up to a power of two); nothing is stored on disk.
pub fn new_anon(length: u64) -> Result<Env<Exclusive>, Error> {
    let length = length.max(4096).next_power_of_two();
    let mmap = memmap::MmapMut::map_anon(length as usize)?;
    unsafe { Self::new_nolock_mmap(None, length, mmap, true) }
}
#[cfg(not(feature = "mmap"))]
/// Creates an in-memory environment of at least `length` bytes (minimum 4096, rounded up
/// to a power of two).
pub fn new_anon(length: u64) -> Result<Env<Exclusive>, Error> {
    let length = length.max(4096).next_power_of_two();
    unsafe { Self::new_nolock_mmap(length, true) }
}
pub fn mut_txn_begin<E: Borrow<Self>>(env: E) -> Result<MutTxn<E, ()>, Error> {
unsafe {
debug!("taking mutable lock");
env.borrow().mutable.lock();
debug!("taking last commit lock");
{
let mut last_commit = env.borrow().last_commit_date.lock()?;
while last_commit.1 > 0 {
last_commit = env
.borrow()
.concurrent_txns_are_finished
.wait(last_commit)?;
}
}
debug!("lock ok");
let last_page = u64::from_le(
*((env.borrow().mmaps.lock().unwrap()[0].ptr as *const u64).offset(OFF_MAP_LENGTH)),
);
if env.borrow().version >= 3 {
}
let current_list_page = u64::from_le(
*((env.borrow().mmaps.lock().unwrap()[0].ptr as *const u64)
.offset(OFF_CURRENT_FREE)),
);
debug!("map header = {:?}, {:?}", last_page, current_list_page);
let current_list_page = Page {
data: env.borrow().find_offset(current_list_page),
offset: current_list_page,
};
let current_list_length = if current_list_page.offset == 0 {
0
} else {
u64::from_le(*((current_list_page.data as *const u64).offset(1)))
};
Ok(MutTxn {
version: env.borrow().version,
env,
parent: (),
last_page: if last_page == 0 {
PAGE_SIZE_64
} else {
last_page
},
current_list_page: current_list_page,
current_list_length: current_list_length,
current_list_position: current_list_length,
occupied_clean_pages: HashSet::new(),
free_clean_pages: Vec::new(),
free_pages: Vec::new(),
roots: HashMap::new(),
mut_page_count: 0,
})
}
}
}
impl<L> Env<L> {
#[cfg(not(feature = "mmap"))]
/// Allocates segment number `i`, whose size is `length0 << i` bytes.
///
/// A failed allocation is reported through `handle_alloc_error`; previously the null
/// pointer was stored in the returned `Map` and later dereferenced by page accesses.
///
/// # Panics
///
/// Panics if the size/alignment pair does not form a valid layout.
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
    let length = length0 << i;
    let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
    let map = unsafe { std::alloc::alloc(layout) };
    if map.is_null() {
        std::alloc::handle_alloc_error(layout)
    }
    Ok(Map {
        ptr: map,
        layout,
        length,
    })
}
#[cfg(feature = "mmap")]
/// Opens segment number `i`, whose size is `length0 << i` bytes: file `db<i>` in the
/// environment directory when the environment has a path, an anonymous map otherwise.
///
/// # Errors
///
/// Returns [`Error::IO`] if the file cannot be opened or resized, or if mapping fails.
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
    let length = length0 << i;
    let (mut mmap, file) = match self.path {
        Some(ref dir) => {
            let db_path = dir.join(&format!("db{}", i));
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .truncate(false)
                .create(true)
                .open(&db_path)?;
            debug!("allocate: {:?}", length);
            file.set_len(length)?;
            debug!(
                "metadata.len() = {:?}",
                std::fs::metadata(&db_path).map(|x| x.len())
            );
            let mmap = unsafe { memmap::MmapMut::map_mut(&file)? };
            (mmap, Some(file))
        }
        None => (memmap::MmapMut::map_anon(length as usize)?, None),
    };
    Ok(Map {
        ptr: mmap.as_mut_ptr(),
        mmap,
        file,
        length,
    })
}
/// Translates a database offset into a pointer inside the segment containing it.
///
/// Segments are walked in order, subtracting each segment's length from `offset` until it
/// falls inside one; segments that do not exist yet are opened on the way.
///
/// # Safety
///
/// The returned pointer is only meaningful while the segment stays mapped/allocated.
///
/// # Panics
///
/// Panics if the `mmaps` mutex is poisoned or if a new segment cannot be opened.
unsafe fn find_offset(&self, mut offset: u64) -> *mut u8 {
    let mut mmaps = self.mmaps.lock().unwrap();
    let mut i = 0;
    loop {
        debug!("find_offset i = {:?}, mmaps = {:?}", i, mmaps);
        if i >= mmaps.len() {
            let base_length = mmaps[0].length;
            let segment = self.open_mmap(i, base_length).unwrap();
            mmaps.push(segment);
        }
        let segment_length = mmaps[i].length;
        if offset < segment_length {
            return mmaps[i].ptr.offset(offset as isize);
        }
        offset -= segment_length;
        i += 1
    }
}
#[cfg(feature = "mmap")]
/// Unmaps every segment and releases the lock file, if any.
///
/// # Safety
///
/// Pointers previously obtained from this environment must not be used afterwards.
///
/// # Panics
///
/// Panics if the `mmaps` mutex is poisoned.
pub unsafe fn close(&mut self) {
    self.mmaps
        .lock()
        .unwrap()
        .drain(..)
        .for_each(|segment| drop(segment.mmap));
    drop(self.lock_file.take());
}
#[cfg(not(feature = "mmap"))]
/// Frees every heap-allocated segment. Does nothing if the `mmaps` mutex is poisoned.
///
/// # Safety
///
/// Pointers previously obtained from this environment must not be used afterwards.
pub unsafe fn close(&mut self) {
    if let Ok(mut segments) = self.mmaps.lock() {
        segments
            .drain(..)
            .for_each(|segment| std::alloc::dealloc(segment.ptr, segment.layout));
    }
}
}
impl<L> Env<L> {
    /// Starts a read-only transaction.
    ///
    /// The reader is registered by incrementing the live-transaction counter and the clock;
    /// the clock value becomes the transaction's start date. For format version 3 and above
    /// the checksum of page 0 is then verified.
    ///
    /// Two defects of the previous implementation are fixed:
    /// * the locks are taken in the order `txn_counter` -> `clock`, the order the top-level
    ///   `commit` uses; the reverse order could deadlock against a concurrent commit;
    /// * the `Txn` is built *before* the checksum is verified, so a failed check drops it and
    ///   unregisters the reader. Before, the counter stayed incremented on failure, which
    ///   made the next writer wait forever for a reader that never existed.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Poison`] if a mutex is poisoned and [`Error::CRC`] if the page-0
    /// checksum does not match.
    pub fn txn_begin<E: Borrow<Self>>(env: E) -> Result<Txn<L, E>, Error> {
        let start_date = {
            let this: &Self = env.borrow();
            let mut counter = this.txn_counter.lock()?;
            let mut clock = this.clock.lock()?;
            *clock += 1;
            *counter += 1;
            *clock
        };
        let txn = Txn {
            version: env.borrow().version,
            env,
            start_date,
            lock: std::marker::PhantomData,
        };
        if txn.version >= 3 {
            let this: &Self = txn.env.borrow();
            let mmaps = this.mmaps.lock()?;
            unsafe { check_crc0(mmaps[0].ptr)? };
        }
        Ok(txn)
    }
}
impl<L, E: Borrow<Env<L>>> Txn<L, E> {
pub fn statistics(&self) -> Statistics {
unsafe {
let total_pages = u64::from_le(
*((self.env.borrow().mmaps.lock().unwrap()[0].ptr as *const u64)
.offset(OFF_MAP_LENGTH)),
) as usize;
let mut free_pages = HashSet::new();
let mut bookkeeping_pages = Vec::new();
let mut cur = u64::from_le(
*((self.env.borrow().mmaps.lock().unwrap()[0].ptr as *const u64)
.offset(OFF_CURRENT_FREE)),
);
while cur != 0 {
bookkeeping_pages.push(cur);
let p = self.env.borrow().find_offset(cur) as *const u64;
let prev = u64::from_le(*p);
let len = u64::from_le(*(p.offset(1)));
debug!("bookkeeping page: {:?}, {} {}", cur, prev, len);
{
let mut p: *const u64 = (p).offset(2);
let mut i = 0;
while i < len {
let free_page = u64::from_le(*p);
if !free_pages.insert(free_page) {
panic!("free page counted twice: {:?}", free_page)
}
p = p.offset(1);
i += 1
}
}
cur = prev
}
let mut reference_counts = HashMap::new();
let rc_root = self.root_(RC_ROOT);
if rc_root != 0 {
use crate::Transaction;
let db: Db<u64, u64> = Db(rc_root, std::marker::PhantomData);
reference_counts.extend(self.iter(&db, None).unwrap().map(|x| x.unwrap()))
}
Statistics {
total_pages: total_pages / PAGE_SIZE,
free_pages,
bookkeeping_pages,
reference_counts,
}
}
}
/// Records in `h`, for every page reached from the root page of `root`, how many times it
/// was reached. Delegates to `references_` with the database's root page offset.
pub fn references<K: Representable, V: Representable>(
&self,
h: &mut HashMap<u64, usize>,
root: Db<K, V>,
) {
self.references_::<K, V>(h, root.0)
}
/// Checks the counts in `refs` against the stored reference counts.
///
/// The pages of the reference-count database itself (if any) are first added to `refs`.
/// A page is consistent when its stored count equals the expected one, or when it is
/// expected once and its stored count is 0 or 1.
///
/// # Panics
///
/// Panics on the first inconsistent page, or if a stored count cannot be read.
pub fn check_references(&self, refs: &mut HashMap<u64, usize>) {
    let rc_root = self.root_(RC_ROOT);
    if rc_root != 0 {
        let db: Db<u64, u64> = Db(rc_root, std::marker::PhantomData);
        self.references::<u64, u64>(refs, db);
    }
    for (page, expected) in refs.iter() {
        debug!("checking rc of {:?}", page);
        let stored = super::rc(self, *page).unwrap() as usize;
        let consistent = *expected == stored || (stored <= 1 && *expected == 1);
        if !consistent {
            panic!("page {:?} has rc {:?}, should have {:?}", page, stored, expected);
        }
    }
}
/// Depth-first walk starting at `page`, counting in `h` how many times each non-null page
/// is reached. The children of a page are only explored the first time it is reached.
///
/// # Panics
///
/// Panics if a visited page fails its checksum check.
fn references_<K: Representable, V: Representable>(
    &self,
    h: &mut HashMap<u64, usize>,
    page: u64,
) {
    use std::collections::hash_map::Entry;
    debug!("starting references, page = {:?}", page);
    let mut pending = vec![page];
    while let Some(offset) = pending.pop() {
        debug!("references_, page = {:?}", offset);
        if offset == 0 {
            continue;
        }
        match h.entry(offset) {
            Entry::Occupied(mut seen) => {
                // Already explored: just count the extra reference.
                *seen.get_mut() += 1;
                continue;
            }
            Entry::Vacant(first_visit) => {
                first_visit.insert(1);
            }
        }
        let data = unsafe { self.env.borrow().find_offset(offset) };
        debug!("off = {:?}", offset);
        unsafe {
            if self.version >= 2 && offset > 0 {
                check_crc(data).unwrap();
            } else if self.version >= 3 {
                check_crc0(data).unwrap();
            }
        }
        let page = Page { data, offset };
        for (_, v, r) in skiplist::iter_all::<_, K, V>(&page) {
            info!("ignoring references in {:?}", v);
            debug!("r = {:?}", r);
            pending.push(r)
        }
    }
}
}
/// A read-only view of a page.
#[derive(Debug, Clone, Copy)]
pub struct Page {
/// Address of the first byte of the page in memory.
pub data: *const u8,
/// Offset of the page in the database.
pub offset: u64,
}
/// A writable view of a page, as returned by `alloc_page` and `load_cow_page`.
#[derive(Debug)]
#[doc(hidden)]
pub struct MutPage {
/// Address of the first byte of the page in memory.
pub data: *mut u8,
/// Offset of the page in the database.
pub offset: u64,
}
/// Page access shared by read-only and mutable transactions.
pub trait LoadPage {
/// Returns the page at offset `off`, or a `CRCError` if its checksum check fails.
fn load_page(&self, off: u64) -> Result<Page, CRCError>;
/// Returns the page offset stored for root number `num`.
fn root_(&self, num: usize) -> u64;
}
impl<L, E: Borrow<Env<L>>> LoadPage for Txn<L, E> {
    /// Locates page `off` and verifies its checksum: the per-page check for non-zero
    /// offsets when the version is at least 2, the page-0 check otherwise when the version
    /// is at least 3.
    fn load_page(&self, off: u64) -> Result<Page, CRCError> {
        trace!("load_page: off={:?}", off);
        let data = unsafe { self.env.borrow().find_offset(off) };
        if self.version >= 2 && off > 0 {
            unsafe { check_crc(data)? };
        } else if self.version >= 3 {
            unsafe { check_crc0(data)? };
        }
        Ok(Page { data, offset: off })
    }

    /// Reads root number `num` from the root table of page 0.
    ///
    /// # Panics
    ///
    /// Panics if the slot for `num` would not fit in page 0, or if the `mmaps` mutex is
    /// poisoned.
    fn root_(&self, num: usize) -> u64 {
        assert!(ZERO_HEADER as usize + ((num + 1) << 3) < PAGE_SIZE);
        let mmaps = self.env.borrow().mmaps.lock().unwrap();
        unsafe {
            let root_table = mmaps[0].ptr.offset(ZERO_HEADER) as *const u64;
            u64::from_le(*root_table.offset(num as isize))
        }
    }
}
impl<E: Borrow<Env<Exclusive>>, A> LoadPage for MutTxn<E, A> {
    /// Locates page `off`. Pages allocated by this transaction skip the checksum check;
    /// other pages are verified like in a read-only transaction.
    fn load_page(&self, off: u64) -> Result<Page, CRCError> {
        trace!("mut load_page: off={:?}", off);
        let data = unsafe { self.env.borrow().find_offset(off) };
        let allocated_here = self.occupied_clean_pages.contains(&off);
        if !allocated_here {
            if self.version >= 2 && off > 0 {
                unsafe { check_crc(data)? };
            } else if self.version >= 3 {
                unsafe { check_crc0(data)? };
            }
        }
        Ok(Page { data, offset: off })
    }

    /// Returns the pending value of root `num` if this transaction set one, and the value
    /// stored in page 0 otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the root has to be read from page 0 and its slot would not fit there, or
    /// if the `mmaps` mutex is poisoned.
    fn root_(&self, num: usize) -> u64 {
        match self.roots.get(&num) {
            Some(&pending) => pending,
            None => {
                assert!(ZERO_HEADER + ((num as isize + 1) << 3) < (PAGE_SIZE as isize));
                let mmaps = self.env.borrow().mmaps.lock().unwrap();
                unsafe {
                    let root_table = mmaps[0].ptr.offset(ZERO_HEADER) as *const u64;
                    u64::from_le(*root_table.offset(num as isize))
                }
            }
        }
    }
}
/// Result of `MutTxn::load_cow_page`.
#[derive(Debug)]
pub(crate) enum Cow {
/// The page was not allocated by the current transaction; only a read-only view is given.
Page(Page),
/// The page was allocated by the current transaction; a writable view is given.
MutPage(MutPage),
}
impl<E: Borrow<Env<Exclusive>> + Clone, T> MutTxn<E, T> {
    /// Starts a child transaction of `self`.
    ///
    /// The child starts from the parent's allocation state (end of data, free-list cursor,
    /// pending roots) with empty page sets of its own. Its changes reach the parent only
    /// when the child is committed.
    pub fn mut_txn_begin<'txn>(&'txn mut self) -> Result<MutTxn<E, &'txn mut MutTxn<E, T>>, Error> {
        let version = self.env.borrow().version;
        let env = self.env.clone();
        let roots = self.roots.clone();
        // `Page` is `Copy`, so the free-list cursor can be copied directly.
        let current_list_page = self.current_list_page;
        let child = MutTxn {
            version,
            env,
            last_page: self.last_page,
            current_list_page,
            current_list_length: self.current_list_length,
            current_list_position: self.current_list_position,
            occupied_clean_pages: HashSet::new(),
            free_clean_pages: Vec::new(),
            free_pages: Vec::new(),
            roots,
            mut_page_count: 0,
            parent: self,
        };
        Ok(child)
    }
}
/// Verifies the checksum of a page other than page 0: the CRC32 of bytes `4..PAGE_SIZE`
/// must equal the little-endian `u32` stored in the first four bytes.
///
/// # Safety
///
/// `p` must point to `PAGE_SIZE` readable bytes, aligned for a `u32` read.
#[cfg(feature = "crc32")]
unsafe fn check_crc(p: *const u8) -> Result<(), CRCError> {
    let stored = u32::from_le(*(p as *const u32));
    let payload = std::slice::from_raw_parts(p.offset(4), PAGE_SIZE - 4);
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(payload);
    let computed = hasher.finalize();
    debug!("check crc {:?} {:?} {:?}", p, stored, computed);
    if computed != stored {
        return Err(CRCError {});
    }
    Ok(())
}

/// Without the `crc32` feature every page is accepted.
#[cfg(not(feature = "crc32"))]
unsafe fn check_crc(_: *const u8) -> Result<(), CRCError> {
    Ok(())
}
/// Verifies the checksum of page 0: the CRC32 of bytes `0..PAGE_SIZE - 4` must equal the
/// little-endian `u32` stored at `u32` index 1023 (the last four bytes of the page).
///
/// # Safety
///
/// `p` must point to `PAGE_SIZE` readable bytes, aligned for a `u32` read.
#[cfg(feature = "crc32")]
unsafe fn check_crc0(p: *const u8) -> Result<(), CRCError> {
    let stored = u32::from_le(*(p as *const u32).offset(1023));
    let payload = std::slice::from_raw_parts(p, PAGE_SIZE - 4);
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(payload);
    let computed = hasher.finalize();
    debug!("check crc0 {:?} {:?} {:?}", p, stored, computed);
    if computed != stored {
        return Err(CRCError {});
    }
    Ok(())
}

/// Without the `crc32` feature page 0 is always accepted.
#[cfg(not(feature = "crc32"))]
unsafe fn check_crc0(_: *const u8) -> Result<(), CRCError> {
    Ok(())
}
impl<E: Borrow<Env<Exclusive>>, T> MutTxn<E, T> {
/// Records `value` as the pending page offset of root number `num`; the top-level commit
/// writes pending roots to page 0.
pub(crate) fn set_root_(&mut self, num: usize, value: u64) {
self.roots.insert(num, value);
}
/// Loads page `off` for possible modification.
///
/// Returns `Cow::MutPage` when `off` is non-zero and the page was allocated by this
/// transaction. Otherwise the page is checksum-verified and returned as `Cow::Page`.
///
/// # Errors
///
/// Returns [`Error::CRC`] if the checksum check fails.
pub(crate) fn load_cow_page(&mut self, off: u64) -> Result<Cow, Error> {
    trace!("load_cow_page: off={:?}", off);
    let data = unsafe { self.env.borrow().find_offset(off) };
    if off != 0 && self.occupied_clean_pages.contains(&off) {
        return Ok(Cow::MutPage(MutPage { data, offset: off }));
    }
    unsafe {
        if self.version >= 2 && off > 0 {
            check_crc(data)?;
        } else if self.version >= 3 {
            check_crc0(data)?;
        }
    }
    Ok(Cow::Page(Page { data, offset: off }))
}
/// Marks the page at `offset` as free.
///
/// A page allocated by this transaction goes to `free_clean_pages` (reusable right away);
/// any other page goes to `free_pages`.
///
/// # Safety
///
/// NOTE(review): no unsafe operation is performed here; presumably the caller must
/// guarantee the page is no longer referenced — confirm.
pub(crate) unsafe fn free_page(&mut self, offset: u64) {
    debug!("transaction::free page: {:?}", offset);
    let allocated_here = self.occupied_clean_pages.remove(&offset);
    let destination = if allocated_here {
        &mut self.free_clean_pages
    } else {
        &mut self.free_pages
    };
    destination.push(offset);
}
/// Pops one page offset from the on-disk free list, or returns `None` when nothing is left.
///
/// Entries are consumed from the end of the current list page. When that page is
/// exhausted and has a predecessor, the exhausted page is added to `free_pages` and the
/// walk continues on the predecessor.
fn free_pages_pop(&mut self) -> Option<u64> {
    loop {
        debug!(
            "free_pages_pop, current_list_position:{}",
            self.current_list_position
        );
        if self.current_list_page.offset == 0 {
            return None;
        }
        if self.current_list_position != 0 {
            let pos = self.current_list_position;
            self.current_list_position -= 1;
            debug!(
                "free_pages_pop, new position:{}",
                self.current_list_position
            );
            // Entry number `pos` (1-based) is stored at `u64` index `1 + pos`.
            let entries = self.current_list_page.data as *mut u64;
            return Some(unsafe { u64::from_le(*(entries.offset(1 + pos as isize))) });
        }
        let previous_page =
            unsafe { u64::from_le(*(self.current_list_page.data as *const u64)) };
        debug!("free_pages_pop, previous page:{}", previous_page);
        if previous_page == 0 {
            return None;
        }
        self.free_pages.push(self.current_list_page.offset);
        unsafe {
            self.current_list_page = Page {
                data: self.env.borrow().find_offset(previous_page),
                offset: previous_page,
            };
            self.current_list_length =
                u64::from_le(*((self.current_list_page.data as *const u64).offset(1)))
        }
        self.current_list_position = self.current_list_length;
    }
}
#[doc(hidden)]
/// Allocates a writable page.
///
/// Sources are tried in this order: a page allocated and freed by this transaction, a
/// page from the on-disk free list, then fresh space at the end of the data. The page is
/// recorded as allocated by this transaction. Every `flush_limit` allocations, all maps
/// are flushed first.
///
/// # Errors
///
/// Returns an error if flushing a map fails.
pub fn alloc_page(&mut self) -> Result<MutPage, Error> {
    debug!("alloc page");
    self.mut_page_count += 1;
    if self.mut_page_count >= self.env.borrow().flush_limit {
        for m in self.env.borrow().mmaps.lock().unwrap().iter() {
            m.flush()?;
        }
        self.mut_page_count = 0
    }
    let offset = if let Some(page) = self.free_clean_pages.pop() {
        debug!("clean page reuse:{}", page);
        page
    } else if let Some(page) = self.free_pages_pop() {
        debug!("using an old free page: {}", page);
        page
    } else {
        let last = self.last_page;
        debug!("eating the free space: {}", last);
        self.last_page += PAGE_SIZE_64;
        last
    };
    self.occupied_clean_pages.insert(offset);
    Ok(MutPage {
        data: unsafe { self.env.borrow().find_offset(offset) },
        offset,
    })
}
}
/// Transactions that can be committed.
pub trait Commit {
/// Consumes the transaction and applies its changes: to the parent transaction for a child,
/// to the environment for a top-level transaction.
fn commit(self) -> Result<(), Error>;
}
impl<'a, E: Borrow<Env<Exclusive>>, T> Commit for MutTxn<E, &'a mut MutTxn<E, T>> {
    /// Merges this child transaction into its parent.
    ///
    /// The parent takes over the child's allocation state (end of data and free-list
    /// cursor) and receives the child's allocated pages, freed pages and pending roots.
    /// Nothing is written to the environment.
    fn commit(self) -> Result<(), Error> {
        let parent = &mut *self.parent;
        parent.last_page = self.last_page;
        parent.current_list_page = self.current_list_page;
        parent.current_list_length = self.current_list_length;
        parent.current_list_position = self.current_list_position;
        parent
            .occupied_clean_pages
            .extend(self.occupied_clean_pages.iter().copied());
        parent
            .free_clean_pages
            .extend(self.free_clean_pages.iter().copied());
        parent.free_pages.extend(self.free_pages.iter().copied());
        parent
            .roots
            .extend(self.roots.iter().map(|(num, root)| (*num, *root)));
        Ok(())
    }
}
impl<E: Borrow<Env<Exclusive>>> MutTxn<E, ()> {
    /// Writes the checksum of every page allocated by this transaction: the CRC32 of bytes
    /// `4..PAGE_SIZE`, stored little-endian in the first four bytes of the page.
    #[cfg(feature = "crc32")]
    fn crc(&self) {
        for &page in self.occupied_clean_pages.iter() {
            unsafe {
                let base = self.env.borrow().find_offset(page);
                let payload = std::slice::from_raw_parts(base.offset(4), PAGE_SIZE - 4);
                let mut hasher = crc32fast::Hasher::new();
                hasher.update(payload);
                *(base as *mut u32) = hasher.finalize().to_le();
            }
        }
    }

    /// Without the `crc32` feature no checksum is written.
    #[cfg(not(feature = "crc32"))]
    fn crc(&self) {}
}
/// Writes the checksum of page 0: the CRC32 of bytes `0..PAGE_SIZE - 4`, stored
/// little-endian at `u32` index 1023 (the last four bytes of the page).
///
/// `ptr` must point to `PAGE_SIZE` writable bytes.
#[cfg(feature = "crc32")]
fn crc0(ptr: *mut u8) {
    unsafe {
        let payload = std::slice::from_raw_parts(ptr, PAGE_SIZE - 4);
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(payload);
        let checksum = hasher.finalize().to_le();
        *(ptr as *mut u32).offset(1023) = checksum;
    }
}

/// Without the `crc32` feature no checksum is written.
#[cfg(not(feature = "crc32"))]
fn crc0(_ptr: *mut u8) {}
impl<E: Borrow<Env<Exclusive>>> Commit for MutTxn<E, ()> {
    /// Commits a top-level transaction to the environment.
    ///
    /// Steps:
    /// 1. write the checksums of the pages allocated by the transaction;
    /// 2. build the new free list: copy the unconsumed part of the current list page into a
    ///    freshly allocated page, then append every freed page, chaining new list pages
    ///    (taken from the freed pages themselves) when one fills up;
    /// 3. under the commit locks, record the commit date and the number of live readers,
    ///    write the pending roots and the header of page 0, and flush the maps (page 0 last).
    ///
    /// The writer lock is *not* released here: `self` is dropped when this function returns,
    /// on success and on every error path, and `Drop for MutTxn` releases it. The explicit
    /// `unlock()` that used to sit at the end of this function released the lock a second
    /// time, which violates the contract of `RawMutex::unlock`. It could also release a lock
    /// that another writer had acquired in the meantime.
    ///
    /// # Errors
    ///
    /// Returns an error if allocating the list page fails, a mutex is poisoned, or a flush
    /// fails.
    fn commit(mut self) -> Result<(), Error> {
        unsafe {
            info!("Commit");
            self.crc();
            let mut current_page = self.alloc_page()?;
            if self.current_list_page.offset != 0 {
                debug!("commit: realloc BK, copy {:?}", self.current_list_position);
                // Two header words (previous page, length) plus the entries not yet consumed.
                copy_nonoverlapping(
                    self.current_list_page.data as *const u64,
                    current_page.data as *mut u64,
                    2 + self.current_list_position as usize,
                );
                *((current_page.data as *mut u64).offset(1)) = self.current_list_position.to_le();
                debug!("freeing BK page {:?}", self.current_list_page.offset);
                self.free_pages.push(self.current_list_page.offset);
            } else {
                *(current_page.data as *mut u64) = 0;
                *((current_page.data as *mut u64).offset(1)) = 0;
            }
            while !(self.free_pages.is_empty() && self.free_clean_pages.is_empty()) {
                debug!("commit: pushing");
                let len = u64::from_le(*((current_page.data as *const u64).offset(1)));
                debug!("len={:?}", len);
                if 16 + len * 8 + 8 >= PAGE_SIZE_64 {
                    // The list page is full: turn one of the freed pages into the next
                    // list page, linked back to the full one.
                    debug!("commit: current is full, len={}", len);
                    let p = self
                        .free_pages
                        .pop()
                        .unwrap_or_else(|| self.free_clean_pages.pop().unwrap());
                    let new_page = MutPage {
                        data: self.env.borrow().find_offset(p),
                        offset: p,
                    };
                    debug!("commit {} allocated {:?}", line!(), new_page.offset);
                    *(new_page.data as *mut u64) = current_page.offset.to_le();
                    *((new_page.data as *mut u64).offset(1)) = 0;
                    current_page = new_page;
                } else {
                    let p = self
                        .free_pages
                        .pop()
                        .unwrap_or_else(|| self.free_clean_pages.pop().unwrap());
                    debug!("commit: push {}", p);
                    *((current_page.data as *mut u64).offset(1)) = (len + 1).to_le();
                    *((current_page.data as *mut u64).offset(2 + len as isize)) = p.to_le();
                }
            }
            {
                // Lock order: last_commit_date -> txn_counter -> clock -> mmaps.
                debug!("commit: taking local lock");
                let mut last_commit = self.env.borrow().last_commit_date.lock()?;
                debug!("commit: taking txn_counter lock");
                let n_txns = self.env.borrow().txn_counter.lock()?;
                debug!("commit: taking clock lock");
                let mut clock = self.env.borrow().clock.lock()?;
                debug!("ok");
                *clock += 1;
                last_commit.0 = *clock;
                last_commit.1 = *n_txns;
                debug!("commit: lock ok");
                let mmaps = self.env.borrow().mmaps.lock().unwrap();
                for (u, v) in self.roots.iter() {
                    *((mmaps[0].ptr.offset(ZERO_HEADER) as *mut u64).offset(*u as isize)) =
                        (*v).to_le();
                }
                for mmap in mmaps.iter().skip(1) {
                    mmap.flush()?;
                }
                *((mmaps[0].ptr as *mut u64).offset(OFF_MAP_LENGTH)) = self.last_page.to_le();
                *((mmaps[0].ptr as *mut u64).offset(OFF_CURRENT_FREE)) =
                    current_page.offset.to_le();
                crc0(mmaps[0].ptr);
                mmaps[0].flush()?;
                {
                    let mut last = self.env.borrow().first_unused_page.lock()?;
                    debug!("last_page = {:?}", self.last_page);
                    *last = self.last_page;
                }
                // No explicit unlock: dropping `self` on return releases the writer lock.
                Ok(())
            }
        }
    }
}